Commit f4a2f385 authored by Tomas Krizek, committed by Petr Špaček

job_manager/submit: script to queue jobs to htcondor

parent d6a39f5b
Respdiff contrib: Job Manager
=============================
Tools to manage respdiff jobs. These scripts are intended for use with
knot-resolver CI.
* ``create.py``: Creates all necessary config files for given test case and
kresd version. Docker and docker-compose are used to run the resolvers. Also
creates a ``run_respdiff.sh`` script which can be executed to run the entire
test, including build, setup and teardown.
* ``submit.py``: Submits the created job to a local HTCondor cluster.
Requirements
------------
Latest upstream versions of docker/docker-compose are recommended!
* docker-ce
* docker-compose
* pip3 install -r contrib/job_manager/requirements.txt
Example Usage
-------------
.. code-block:: console
Create job directories for all 'shortlist*' test cases (default) for v2.4.0
$ ./create.py v2.4.0
/var/tmp/respdiff/v2.4.0/shortlist.fwd-udp6-kresd.udp6.j384
/var/tmp/respdiff/v2.4.0/shortlist.fwd-udp6-unbound.tcp6.j384
/var/tmp/respdiff/v2.4.0/shortlist.fwd-tls6-kresd.udp6.j128
/var/tmp/respdiff/v2.4.0/shortlist.iter.udp6.j384
/var/tmp/respdiff/v2.4.0/shortlist.fwd-udp6-unbound.tls6.j384
/var/tmp/respdiff/v2.4.0/shortlist.iter.tls6.j384
/var/tmp/respdiff/v2.4.0/shortlist.fwd-udp6-unbound.udp6.j384
Create job directory for a specific test case for commit cc036420
$ ./create.py cc036420 -t shortlist.fwd-udp6-kresd.udp6.j384
/var/tmp/respdiff-jobs/cc036420/shortlist.fwd-udp6-kresd.udp6.j384
.. code-block:: console
Submit a prepared job to htcondor cluster to be executed once
$ ./submit.py /var/tmp/respdiff-jobs/cc036420/shortlist.fwd-udp6-kresd.udp6.j384
Submit a prepared job to htcondor cluster, execute it twice with a higher
$ # priority and wait until all jobs are done
$ ./submit.py -c 2 -p 10 -w /var/tmp/respdiff-jobs/cc036420/shortlist.fwd-udp6-kresd.udp6.j384
Submit multiple jobs at once, execute every job twice
$ ./submit.py -c 2 /var/tmp/respdiff/v2.4.0/shortlist.iter.udp6.j384 /var/tmp/respdiff/v2.4.0/shortlist.fwd-udp6-unbound.udp6.j384
.. code-block:: console
Chain create.py and submit.py together to submit all 'shortlist*' test cases
for commit cc036420 to htcondor cluster
$ ./submit.py $(./create.py cc036420)
Executable = run_respdiff.sh
Error = j$(Cluster).$(Process)_stderr
Output = j$(Cluster).$(Process)_stdout
Log = j$(Cluster).$(Process)_log
JobBatchName = {{ batch_name }}
should_transfer_files = YES
when_to_transfer_output = ON_EXIT
transfer_input_files = {{ input_files | join(', ') }}
transfer_output_files = j$(Cluster).$(Process)_report.json, j$(Cluster).$(Process)_report.txt, j$(Cluster).$(Process)_histogram.svg
arguments = $(Cluster) $(Process)
#!/usr/bin/env python3
import argparse
import collections
import contextlib
import glob
import itertools
import logging
import os
import sys
import time
import traceback
from typing import Dict, List, Sequence, Tuple # noqa
import warnings
WAIT_POLLING_PERIOD = 30
JOB_STATUS_RUNNING = 2
def get_all_files(directory: str) -> List[str]:
files = []
for filename in glob.iglob('{}/**'.format(directory), recursive=True):
if os.path.isfile(filename):
files.append(os.path.relpath(filename, directory))
return files
def condor_submit(txn, priority: int) -> int:
directory = os.getcwd()
input_files = get_all_files(directory)
if 'run_respdiff.sh' not in input_files:
raise RuntimeError(
"The provided directory doesn't look like a respdiff job. "
"{}/run_respdiff.sh is missing!".format(directory))
# create batch name from dir structure
commit_dir_path, test_case = os.path.split(directory)
_, commit_dir = os.path.split(commit_dir_path)
batch_name = commit_dir + '_' + test_case
submit = Submit({
'priority': str(priority),
'executable': 'run_respdiff.sh',
'arguments': '$(Cluster) $(Process)',
'error': 'j$(Cluster).$(Process)_stderr',
'output': 'j$(Cluster).$(Process)_stdout',
'log': 'j$(Cluster).$(Process)_log',
'jobbatchname': batch_name,
'should_transfer_files': 'YES',
'when_to_transfer_output': 'ON_EXIT',
'transfer_input_files': ', '.join(input_files),
'transfer_output_files': ', '.join([
'j$(Cluster).$(Process)_report.json',
'j$(Cluster).$(Process)_report.txt',
'j$(Cluster).$(Process)_histogram.svg']),
})
return submit.queue(txn)
@contextlib.contextmanager
def pushd(new_dir):
previous_dir = os.getcwd()
os.chdir(new_dir)
yield
os.chdir(previous_dir)
def condor_wait_for(schedd, job_ids: Sequence[int]) -> None:
prev_remaining = None
prev_running = None
prev_worst_pos = None
while True:
remaining, running, worst_pos = condor_check_status(schedd, job_ids)
if not remaining:
break
# log only status changes
if (remaining != prev_remaining or
running != prev_running or
worst_pos != prev_worst_pos):
logging.info(
" remaning: %2d (running: %2d) worst queue position: %2d",
remaining, running, worst_pos + 1)
prev_remaining = remaining
prev_running = running
prev_worst_pos = worst_pos
time.sleep(WAIT_POLLING_PERIOD)
def condor_check_status(schedd, job_ids: Sequence[int]) -> Tuple[int, int, int]:
all_jobs = schedd.query(True, ['JobPrio', 'ClusterId', 'ProcId', 'JobStatus'])
all_jobs = sorted(
all_jobs,
key=lambda x: (-x['JobPrio'], x['ClusterId'], x['ProcId']))
worst_pos = 0
running = 0
remaining = 0
for i, job in enumerate(all_jobs):
if int(job['ClusterId']) in job_ids:
remaining += 1
if int(job['JobStatus']) == JOB_STATUS_RUNNING:
running += 1
worst_pos = i
return remaining, running, worst_pos
def main() -> None:
parser = argparse.ArgumentParser(
description="Submit prepared jobs to HTCondor cluster")
parser.add_argument(
'-c', '--count', type=int, default=1,
help="How many times to submit job (default: 1)")
parser.add_argument(
'-p', '--priority', type=int, default=5,
help="Set condor job priority, higher means sooner execution (default: 5)")
parser.add_argument(
'-w', '--wait', action='store_true',
help="Wait until all submitted jobs are finished")
parser.add_argument(
'job_dir', nargs='+',
help="Path to the job directory to be submitted")
args = parser.parse_args()
job_ids = collections.defaultdict(list) # type: Dict[str, List[int]]
schedd = Schedd()
with schedd.transaction() as txn:
# submit jobs one-by-one to ensure round-robin job execution (instead of seq)
for _ in range(args.count):
for directory in args.job_dir:
with pushd(directory):
job_ids[directory].append(condor_submit(txn, args.priority))
for directory, jobs in job_ids.items():
logging.info("%s JOB ID(s): %s", directory, ', '.join(str(j) for j in jobs))
job_count = sum(len(jobs) for jobs in job_ids.values())
logging.info("%d job(s) successfully submitted!", job_count)
if args.wait:
logging.info(
'WAITING for jobs to complete. This can be safely interrupted with Ctl+C...')
try:
condor_wait_for(schedd, list(itertools.chain(*job_ids.values())))
except KeyboardInterrupt:
pass
else:
logging.info("All jobs done!")
if __name__ == '__main__':
logging.basicConfig(
format='%(asctime)s %(levelname)8s %(message)s', level=logging.DEBUG)
with warnings.catch_warnings():
warnings.simplefilter("error") # trigger UserWarning which causes ImportError
try:
from htcondor import Submit, Schedd
except (ImportError, UserWarning):
logging.error('HTCondor not detected. Use this script on a submit machine.')
sys.exit(1)
try:
main()
except RuntimeError as exc:
logging.debug(traceback.format_exc())
logging.error(str(exc))
sys.exit(1)
except Exception as exc:
logging.debug(traceback.format_exc())
logging.critical('Unhandled code exception: %s', str(exc))
sys.exit(2)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment