Source code for galaxy.jobs.runners.condor

"""
Job control via the Condor DRM.
"""

import os
import logging

from galaxy import model
from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner
from galaxy.jobs.runners.util.condor import submission_params, build_submit_description
from galaxy.jobs.runners.util.condor import condor_submit, condor_stop
from galaxy.jobs.runners.util.condor import summarize_condor_log

log = logging.getLogger( __name__ )

__all__ = [ 'CondorJobRunner' ]


class CondorJobState( AsynchronousJobState ):
    def __init__( self, **kwargs ):
        """
        Encapsulates state related to a job that is being run via the DRM and
        that we need to monitor.
        """
        super( CondorJobState, self ).__init__( **kwargs )
        self.failed = False
        self.user_log = None
        self.user_log_size = 0


class CondorJobRunner( AsynchronousJobRunner ):
    """
    Job runner backed by a finite pool of worker threads; jobs are scheduled FIFO.
    """
    runner_name = "CondorRunner"

    def __init__( self, app, nworkers ):
        """Initialize this job runner and start the monitor thread"""
        super( CondorJobRunner, self ).__init__( app, nworkers )
        self._init_monitor_thread()
        self._init_worker_threads()

    def queue_job( self, job_wrapper ):
        """Create job script and submit it to the DRM"""
        # prepare the job
        if not self.prepare_job( job_wrapper, include_metadata=True ):
            return

        # command line has been added to the wrapper by prepare_job()
        command_line = job_wrapper.runner_command_line

        # get configured job destination
        job_destination = job_wrapper.job_destination

        # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
        galaxy_id_tag = job_wrapper.get_id_tag()

        # get destination params
        query_params = submission_params(prefix="", **job_destination.params)
        galaxy_slots = query_params.get('request_cpus', None)
        if galaxy_slots:
            galaxy_slots_statement = 'GALAXY_SLOTS="%s"; export GALAXY_SLOTS_CONFIGURED="1"' % galaxy_slots
        else:
            galaxy_slots_statement = 'GALAXY_SLOTS="1"'

        # define job attributes
        cjs = CondorJobState(
            files_dir=self.app.config.cluster_files_directory,
            job_wrapper=job_wrapper
        )

        cluster_directory = self.app.config.cluster_files_directory
        cjs.user_log = os.path.join( cluster_directory, 'galaxy_%s.condor.log' % galaxy_id_tag )
        cjs.register_cleanup_file_attribute( 'user_log' )
        submit_file = os.path.join( cluster_directory, 'galaxy_%s.condor.desc' % galaxy_id_tag )
        executable = cjs.job_file
        build_submit_params = dict(
            executable=executable,
            output=cjs.output_file,
            error=cjs.error_file,
            user_log=cjs.user_log,
            query_params=query_params,
        )

        submit_file_contents = build_submit_description(**build_submit_params)
        script = self.get_job_file(
            job_wrapper,
            exit_code_path=cjs.exit_code_file,
            slots_statement=galaxy_slots_statement,
        )
        try:
            with open( executable, "w" ) as fh:
                fh.write( script )
            os.chmod( executable, 0o750 )
        except Exception:
            job_wrapper.fail( "failure preparing job script", exception=True )
            log.exception( "(%s) failure preparing job script" % galaxy_id_tag )
            return

        try:
            with open( submit_file, "w" ) as fh:
                fh.write( submit_file_contents )
        except Exception:
            if self.app.config.cleanup_job == "always":
                cjs.cleanup()
                # job_wrapper.fail() calls job_wrapper.cleanup()
            job_wrapper.fail( "failure preparing submit file", exception=True )
            log.exception( "(%s) failure preparing submit file" % galaxy_id_tag )
            return

        # job was deleted while we were preparing it
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the queue" % galaxy_id_tag )
            if self.app.config.cleanup_job in ( "always", "onsuccess" ):
                os.unlink( submit_file )
                cjs.cleanup()
                job_wrapper.cleanup()
            return

        log.debug( "(%s) submitting file %s" % ( galaxy_id_tag, executable ) )

        external_job_id, message = condor_submit(submit_file)
        if external_job_id is None:
            log.debug( "condor_submit failed for job %s: %s" % ( galaxy_id_tag, message ) )
            if self.app.config.cleanup_job == "always":
                os.unlink( submit_file )
                cjs.cleanup()
            job_wrapper.fail( "condor_submit failed", exception=True )
            return

        os.unlink( submit_file )

        log.info( "(%s) queued as %s" % ( galaxy_id_tag, external_job_id ) )

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_job_destination( job_destination, external_job_id )

        # Store DRM related state information for job
        cjs.job_id = external_job_id
        cjs.job_destination = job_destination

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put( cjs )
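
    # What does build_submit_description() actually emit? A minimal,
    # hypothetical sketch follows -- the real helper lives in
    # galaxy.jobs.runners.util.condor and may differ. It assembles a standard
    # HTCondor submit description from the values queue_job() passes in; the
    # 'vanilla' universe and the plain "key = value" passthrough of
    # query_params are assumptions made for illustration only.
    @staticmethod
    def _sketch_build_submit_description( executable, output, error, user_log, query_params ):
        submit = [
            'universe = vanilla',            # assumed universe
            'executable = %s' % executable,  # the job script written above
            'output = %s' % output,
            'error = %s' % error,
            'log = %s' % user_log,           # the user log polled by the monitor thread
        ]
        for key, value in query_params.items():
            submit.append( '%s = %s' % ( key, value ) )  # e.g. request_cpus = 4
        submit.append( 'queue' )
        return '\n'.join( submit ) + '\n'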

    def check_watched_items( self ):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.
        """
        new_watched = []
        for cjs in self.watched:
            job_id = cjs.job_id
            galaxy_id_tag = cjs.job_wrapper.get_id_tag()
            try:
                if os.stat( cjs.user_log ).st_size == cjs.user_log_size:
                    new_watched.append( cjs )
                    continue
                # HTCondor user log event codes: 001 execute, 004 evicted,
                # 005 terminated, 007 shadow exception, 009 aborted.
                s1, s4, s7, s5, s9, log_size = summarize_condor_log(cjs.user_log, job_id)
                job_running = s1 and not (s4 or s7)
                job_complete = s5
                job_failed = s9
                cjs.user_log_size = log_size
            except Exception:
                # so we don't kill the monitor thread
                log.exception( "(%s/%s) Unable to check job status" % ( galaxy_id_tag, job_id ) )
                log.warning( "(%s/%s) job will now be errored" % ( galaxy_id_tag, job_id ) )
                cjs.fail_message = "Cluster could not complete job"
                self.work_queue.put( ( self.fail_job, cjs ) )
                continue
            if job_running and not cjs.running:
                log.debug( "(%s/%s) job is now running" % ( galaxy_id_tag, job_id ) )
                cjs.job_wrapper.change_state( model.Job.states.RUNNING )
            if not job_running and cjs.running:
                log.debug( "(%s/%s) job has stopped running" % ( galaxy_id_tag, job_id ) )
                # Will switching from RUNNING to QUEUED confuse Galaxy?
                # cjs.job_wrapper.change_state( model.Job.states.QUEUED )
            if job_complete:
                if cjs.job_wrapper.get_state() != model.Job.states.DELETED:
                    log.debug( "(%s/%s) job has completed" % ( galaxy_id_tag, job_id ) )
                    self.work_queue.put( ( self.finish_job, cjs ) )
                continue
            if job_failed:
                log.debug( "(%s/%s) job failed" % ( galaxy_id_tag, job_id ) )
                cjs.failed = True
                self.work_queue.put( ( self.finish_job, cjs ) )
                continue
            cjs.running = job_running
            new_watched.append( cjs )
        # Replace the watch list with the updated version
        self.watched = new_watched
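
    # How might summarize_condor_log() derive those flags? A hedged sketch,
    # assuming the helper scans the HTCondor user log for the standard event
    # codes listed above that belong to this cluster id. The real
    # implementation is in galaxy.jobs.runners.util.condor; the event-line
    # regex and the bare-cluster-number id are assumptions for illustration.
    @staticmethod
    def _sketch_summarize_condor_log( log_file, external_job_id ):
        import re  # local import so the sketch stays self-contained
        s1 = s4 = s5 = s7 = s9 = False
        with open( log_file ) as fh:
            for line in fh:
                # event records start like: "001 (123.000.000) 01/02 03:04:05 ..."
                m = re.match( r'(\d{3}) \((\d+)\.', line )
                if m and int( m.group( 2 ) ) == int( external_job_id ):
                    code = m.group( 1 )
                    s1 = s1 or code == '001'
                    s4 = s4 or code == '004'
                    s5 = s5 or code == '005'
                    s7 = s7 or code == '007'
                    s9 = s9 or code == '009'
        return s1, s4, s7, s5, s9, os.stat( log_file ).st_size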

    def stop_job( self, job ):
        """Attempts to delete a job from the DRM queue"""
        external_id = job.job_runner_external_id
        failure_message = condor_stop(external_id)
        if failure_message:
            log.debug( "(%s). Failed to stop condor: %s" % ( external_id, failure_message ) )
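
    # condor_stop() is imported from galaxy.jobs.runners.util.condor; a
    # plausible sketch is below, assuming it simply shells out to the
    # condor_rm CLI and returns a failure message on error (None on success).
    # This is an illustration, not the actual helper.
    @staticmethod
    def _sketch_condor_stop( external_id ):
        import subprocess  # local import so the sketch stays self-contained
        proc = subprocess.Popen( [ 'condor_rm', str( external_id ) ],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE )
        stdout, stderr = proc.communicate()
        if proc.returncode != 0:
            return stderr.strip() or 'condor_rm failed'
        return None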

    def recover( self, job, job_wrapper ):
        """Recovers jobs stuck in the queued/running state when Galaxy started"""
        # TODO Check if we need any changes here
        job_id = job.get_job_runner_external_id()
        galaxy_id_tag = job_wrapper.get_id_tag()
        if job_id is None:
            self.put( job_wrapper )
            return
        cjs = CondorJobState( job_wrapper=job_wrapper, files_dir=self.app.config.cluster_files_directory )
        cjs.job_id = str( job_id )
        cjs.command_line = job.get_command_line()
        cjs.job_wrapper = job_wrapper
        cjs.job_destination = job_wrapper.job_destination
        cjs.user_log = os.path.join( self.app.config.cluster_files_directory, 'galaxy_%s.condor.log' % galaxy_id_tag )
        cjs.register_cleanup_file_attribute( 'user_log' )
        self.__old_state_paths( cjs )
        if job.state == model.Job.states.RUNNING:
            log.debug( "(%s/%s) is still in running state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
            cjs.running = True
            self.monitor_queue.put( cjs )
        elif job.state == model.Job.states.QUEUED:
            log.debug( "(%s/%s) is still in DRM queued state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
            cjs.running = False
            self.monitor_queue.put( cjs )

    def __old_state_paths( self, cjs ):
        """For recovery of jobs started prior to standardizing the naming of
        files in the AsynchronousJobState object
        """
        if cjs.job_wrapper is not None:
            user_log = "%s/%s.condor.log" % (self.app.config.cluster_files_directory, cjs.job_wrapper.job_id)
            if not os.path.exists( cjs.user_log ) and os.path.exists( user_log ):
                cjs.output_file = "%s/%s.o" % (self.app.config.cluster_files_directory, cjs.job_wrapper.job_id)
                cjs.error_file = "%s/%s.e" % (self.app.config.cluster_files_directory, cjs.job_wrapper.job_id)
                cjs.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, cjs.job_wrapper.job_id)
                cjs.user_log = user_log