"""
Job control via the DRMAA API.
"""
import json
import logging
import os
import string
import subprocess
import sys
import time
from galaxy import eggs
from galaxy import model
from galaxy.jobs import JobDestination
from galaxy.jobs.handler import DEFAULT_JOB_PUT_FAILURE_MESSAGE
from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner
eggs.require( "drmaa" )
log = logging.getLogger( __name__ )
__all__ = [ 'DRMAAJobRunner' ]
drmaa = None
DRMAA_jobTemplate_attributes = [ 'args', 'remoteCommand', 'outputPath', 'errorPath', 'nativeSpecification',
'jobName', 'email', 'project' ]
[docs]class DRMAAJobRunner( AsynchronousJobRunner ):
"""
Job runner backed by a finite pool of worker threads. FIFO scheduling
"""
runner_name = "DRMAARunner"
def __init__( self, app, nworkers, **kwargs ):
"""Start the job runner"""
global drmaa
runner_param_specs = dict(
drmaa_library_path = dict( map = str, default = os.environ.get( 'DRMAA_LIBRARY_PATH', None ) ),
invalidjobexception_state = dict( map = str, valid = lambda x: x in ( model.Job.states.OK, model.Job.states.ERROR ), default = model.Job.states.OK ),
invalidjobexception_retries = dict( map = int, valid = lambda x: int >= 0, default = 0 ),
internalexception_state = dict( map = str, valid = lambda x: x in ( model.Job.states.OK, model.Job.states.ERROR ), default = model.Job.states.OK ),
internalexception_retries = dict( map = int, valid = lambda x: int >= 0, default = 0 ) )
if 'runner_param_specs' not in kwargs:
kwargs[ 'runner_param_specs' ] = dict()
kwargs[ 'runner_param_specs' ].update( runner_param_specs )
super( DRMAAJobRunner, self ).__init__( app, nworkers, **kwargs )
# This allows multiple drmaa runners (although only one per handler) in the same job config file
if 'drmaa_library_path' in kwargs:
log.info( 'Overriding DRMAA_LIBRARY_PATH due to runner plugin parameter: %s', self.runner_params.drmaa_library_path )
os.environ['DRMAA_LIBRARY_PATH'] = self.runner_params.drmaa_library_path
# We foolishly named this file the same as the name exported by the drmaa
# library... 'import drmaa' imports itself.
drmaa = __import__( "drmaa" )
# Subclasses may need access to state constants
self.drmaa_job_states = drmaa.JobState
# Descriptive state strings pulled from the drmaa lib itself
self.drmaa_job_state_strings = {
drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
drmaa.JobState.RUNNING: 'job is running',
drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
drmaa.JobState.DONE: 'job finished normally',
drmaa.JobState.FAILED: 'job finished, but failed',
}
self.ds = drmaa.Session()
self.ds.initialize()
# external_runJob_script can be None, in which case it's not used.
self.external_runJob_script = app.config.drmaa_external_runjob_script
self.external_killJob_script = app.config.drmaa_external_killjob_script
self.userid = None
self._init_monitor_thread()
self._init_worker_threads()
[docs] def url_to_destination(self, url):
"""Convert a legacy URL to a job destination"""
if not url:
return
native_spec = url.split('/')[2]
if native_spec:
params = dict( nativeSpecification=native_spec )
log.debug( "Converted URL '%s' to destination runner=drmaa, params=%s" % ( url, params ) )
return JobDestination( runner='drmaa', params=params )
else:
log.debug( "Converted URL '%s' to destination runner=drmaa" % url )
return JobDestination( runner='drmaa' )
[docs] def get_native_spec( self, url ):
"""Get any native DRM arguments specified by the site configuration"""
try:
return url.split('/')[2] or None
except:
return None
[docs] def queue_job( self, job_wrapper ):
"""Create job script and submit it to the DRM"""
# prepare the job
if not self.prepare_job( job_wrapper, include_metadata=True ):
return
# get configured job destination
job_destination = job_wrapper.job_destination
# wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
galaxy_id_tag = job_wrapper.get_id_tag()
# define job attributes
job_name = 'g%s' % galaxy_id_tag
if job_wrapper.tool.old_id:
job_name += '_%s' % job_wrapper.tool.old_id
if self.external_runJob_script is None:
job_name += '_%s' % job_wrapper.user
job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) )
ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_name=job_name )
# set up the drmaa job template
jt = self.ds.createJobTemplate()
jt.remoteCommand = ajs.job_file
jt.jobName = ajs.job_name
jt.outputPath = ":%s" % ajs.output_file
jt.errorPath = ":%s" % ajs.error_file
# Avoid a jt.exitCodePath for now - it's only used when finishing.
native_spec = job_destination.params.get('nativeSpecification', None)
if native_spec is not None:
jt.nativeSpecification = native_spec
# fill in the DRM's job run template
script = self.get_job_file(job_wrapper, exit_code_path=ajs.exit_code_file)
try:
fh = file( ajs.job_file, "w" )
fh.write( script )
fh.close()
os.chmod( ajs.job_file, 0755 )
except:
job_wrapper.fail( "failure preparing job script", exception=True )
log.exception( "(%s) failure writing job script" % galaxy_id_tag )
return
# job was deleted while we were preparing it
if job_wrapper.get_state() == model.Job.states.DELETED:
log.debug( "(%s) Job deleted by user before it entered the queue" % galaxy_id_tag )
if self.app.config.cleanup_job in ( "always", "onsuccess" ):
job_wrapper.cleanup()
return
log.debug( "(%s) submitting file %s", galaxy_id_tag, ajs.job_file )
if native_spec:
log.debug( "(%s) native specification is: %s", galaxy_id_tag, native_spec )
# runJob will raise if there's a submit problem
if self.external_runJob_script is None:
# TODO: create a queue for retrying submission indefinitely
# TODO: configurable max tries and sleep
trynum = 0
external_job_id = None
fail_msg = None
while external_job_id is None and trynum < 5:
try:
external_job_id = self.ds.runJob(jt)
break
except ( drmaa.InternalException, drmaa.DeniedByDrmException ), e:
trynum += 1
log.warning( '(%s) drmaa.Session.runJob() failed, will retry: %s', galaxy_id_tag, e )
fail_msg = "Unable to run this job due to a cluster error, please retry it later"
time.sleep( 5 )
except:
log.exception( '(%s) drmaa.Session.runJob() failed unconditionally', galaxy_id_tag )
trynum = 5
else:
log.error( "(%s) All attempts to submit job failed" % galaxy_id_tag )
if not fail_msg:
fail_msg = DEFAULT_JOB_PUT_FAILURE_MESSAGE
job_wrapper.fail( fail_msg )
self.ds.deleteJobTemplate( jt )
return
else:
job_wrapper.change_ownership_for_run()
log.debug( '(%s) submitting with credentials: %s [uid: %s]' % ( galaxy_id_tag, job_wrapper.user_system_pwent[0], job_wrapper.user_system_pwent[2] ) )
filename = self.store_jobtemplate(job_wrapper, jt)
self.userid = job_wrapper.user_system_pwent[2]
external_job_id = self.external_runjob(filename, job_wrapper.user_system_pwent[2]).strip()
log.info( "(%s) queued as %s" % ( galaxy_id_tag, external_job_id ) )
# store runner information for tracking if Galaxy restarts
job_wrapper.set_job_destination( job_destination, external_job_id )
# Store DRM related state information for job
ajs.job_id = external_job_id
ajs.old_state = 'new'
ajs.job_destination = job_destination
# delete the job template
self.ds.deleteJobTemplate( jt )
# Add to our 'queue' of jobs to monitor
self.monitor_queue.put( ajs )
def _complete_terminal_job( self, ajs, drmaa_state, **kwargs ):
"""
Handle a job upon its termination in the DRM. This method is meant to
be overridden by subclasses to improve post-mortem and reporting of
failures.
"""
if drmaa_state == drmaa.JobState.FAILED:
if ajs.job_wrapper.get_state() != model.Job.states.DELETED:
ajs.stop_job = False
ajs.fail_message = "The cluster DRM system terminated this job"
self.work_queue.put( ( self.fail_job, ajs ) )
elif drmaa_state == drmaa.JobState.DONE:
super( DRMAAJobRunner, self )._complete_terminal_job( ajs )
[docs] def check_watched_items( self ):
"""
Called by the monitor thread to look at each watched job and deal
with state changes.
"""
new_watched = []
for ajs in self.watched:
external_job_id = ajs.job_id
galaxy_id_tag = ajs.job_wrapper.get_id_tag()
old_state = ajs.old_state
try:
assert external_job_id not in ( None, 'None' ), '(%s/%s) Invalid job id' % ( galaxy_id_tag, external_job_id )
state = self.ds.jobStatus( external_job_id )
except ( drmaa.InternalException, drmaa.InvalidJobException ), e:
if isinstance( e , drmaa.InvalidJobException ):
ecn = "InvalidJobException".lower()
else:
ecn = "InternalException".lower()
retry_param = ecn.lower() + '_retries'
state_param = ecn.lower() + '_state'
retries = getattr( ajs, retry_param, 0 )
if self.runner_params[ retry_param ] > 0:
if retries < self.runner_params[ retry_param ]:
# will retry check on next iteration
setattr( ajs, retry_param, retries + 1 )
continue
if self.runner_params[ state_param ] == model.Job.states.OK:
log.info( "(%s/%s) job left DRM queue with following message: %s", galaxy_id_tag, external_job_id, e )
self.work_queue.put( ( self.finish_job, ajs ) )
elif self.runner_params[ state_param ] == model.Job.states.ERROR:
log.info( "(%s/%s) job check resulted in %s after %s tries: %s", galaxy_id_tag, external_job_id, ecn, retries, e )
self.work_queue.put( ( self.fail_job, ajs ) )
else:
raise Exception( "%s is set to an invalid value (%s), this should not be possible. See galaxy.jobs.drmaa.__init__()", state_param, self.runner_params[ state_param ] )
continue
except drmaa.DrmCommunicationException, e:
log.warning( "(%s/%s) unable to communicate with DRM: %s", galaxy_id_tag, external_job_id, e )
new_watched.append( ajs )
continue
except Exception, e:
# so we don't kill the monitor thread
log.exception( "(%s/%s) Unable to check job status: %s" % ( galaxy_id_tag, external_job_id, str( e ) ) )
log.warning( "(%s/%s) job will now be errored" % ( galaxy_id_tag, external_job_id ) )
ajs.fail_message = "Cluster could not complete job"
self.work_queue.put( ( self.fail_job, ajs ) )
continue
if state != old_state:
log.debug( "(%s/%s) state change: %s" % ( galaxy_id_tag, external_job_id, self.drmaa_job_state_strings[state] ) )
if state == drmaa.JobState.RUNNING and not ajs.running:
ajs.running = True
ajs.job_wrapper.change_state( model.Job.states.RUNNING )
if state in ( drmaa.JobState.FAILED, drmaa.JobState.DONE ):
self._complete_terminal_job( ajs, drmaa_state = state )
continue
if ajs.check_limits():
self.work_queue.put( ( self.fail_job, ajs ) )
continue
ajs.old_state = state
new_watched.append( ajs )
# Replace the watch list with the updated version
self.watched = new_watched
[docs] def stop_job( self, job ):
"""Attempts to delete a job from the DRM queue"""
try:
ext_id = job.get_job_runner_external_id()
assert ext_id not in ( None, 'None' ), 'External job id is None'
if self.external_killJob_script is None:
self.ds.control( ext_id, drmaa.JobControlAction.TERMINATE )
else:
# FIXME: hardcoded path
subprocess.Popen( [ '/usr/bin/sudo', '-E', self.external_killJob_script, str( ext_id ), str( self.userid ) ], shell=False )
log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.get_id(), ext_id ) )
except drmaa.InvalidJobException:
log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), ext_id ) )
except Exception, e:
log.debug( "(%s/%s) User killed running job, but error encountered removing from DRM queue: %s" % ( job.get_id(), ext_id, e ) )
[docs] def recover( self, job, job_wrapper ):
"""Recovers jobs stuck in the queued/running state when Galaxy started"""
job_id = job.get_job_runner_external_id()
if job_id is None:
self.put( job_wrapper )
return
ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper )
ajs.job_id = str( job_id )
ajs.command_line = job.get_command_line()
ajs.job_wrapper = job_wrapper
ajs.job_destination = job_wrapper.job_destination
self.__old_state_paths( ajs )
if job.state == model.Job.states.RUNNING:
log.debug( "(%s/%s) is still in running state, adding to the DRM queue" % ( job.get_id(), job.get_job_runner_external_id() ) )
ajs.old_state = drmaa.JobState.RUNNING
ajs.running = True
self.monitor_queue.put( ajs )
elif job.get_state() == model.Job.states.QUEUED:
log.debug( "(%s/%s) is still in DRM queued state, adding to the DRM queue" % ( job.get_id(), job.get_job_runner_external_id() ) )
ajs.old_state = drmaa.JobState.QUEUED_ACTIVE
ajs.running = False
self.monitor_queue.put( ajs )
def __old_state_paths( self, ajs ):
"""For recovery of jobs started prior to standardizing the naming of
files in the AsychronousJobState object
"""
if ajs.job_wrapper is not None:
job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, ajs.job_wrapper.job_id)
if not os.path.exists( ajs.job_file ) and os.path.exists( job_file ):
ajs.output_file = "%s.drmout" % os.path.join(os.getcwd(), ajs.job_wrapper.working_directory, ajs.job_wrapper.get_id_tag())
ajs.error_file = "%s.drmerr" % os.path.join(os.getcwd(), ajs.job_wrapper.working_directory, ajs.job_wrapper.get_id_tag())
ajs.exit_code_file = "%s.drmec" % os.path.join(os.getcwd(), ajs.job_wrapper.working_directory, ajs.job_wrapper.get_id_tag())
ajs.job_file = job_file
[docs] def store_jobtemplate(self, job_wrapper, jt):
""" Stores the content of a DRMAA JobTemplate object in a file as a JSON string.
Path is hard-coded, but it's no worse than other path in this module.
Uses Galaxy's JobID, so file is expected to be unique."""
filename = "%s/%s.jt_json" % (self.app.config.cluster_files_directory, job_wrapper.get_id_tag())
data = {}
for attr in DRMAA_jobTemplate_attributes:
try:
data[attr] = getattr(jt, attr)
except:
pass
s = json.dumps(data)
f = open(filename,'w')
f.write(s)
f.close()
log.debug( '(%s) Job script for external submission is: %s' % ( job_wrapper.job_id, filename ) )
return filename
[docs] def external_runjob(self, jobtemplate_filename, username):
""" runs an external script the will QSUB a new job.
The external script will be run with sudo, and will setuid() to the specified user.
Effectively, will QSUB as a different user (then the one used by Galaxy).
"""
script_parts = self.external_runJob_script.split()
script = script_parts[0]
command = [ '/usr/bin/sudo', '-E', script]
for script_argument in script_parts[1:]:
command.append(script_argument)
command.extend( [ str(username), jobtemplate_filename ] )
log.info("Running command %s" % command)
p = subprocess.Popen(command,
shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdoutdata, stderrdata) = p.communicate()
exitcode = p.returncode
#os.unlink(jobtemplate_filename)
if exitcode != 0:
# There was an error in the child process
raise RuntimeError("External_runjob failed (exit code %s)\nChild process reported error:\n%s" % (str(exitcode), stderrdata))
if not stdoutdata.strip():
raise RuntimeError("External_runjob did return the job id: %s" % (stdoutdata))
# The expected output is a single line containing a single numeric value:
# the DRMAA job-ID. If not the case, will throw an error.
jobId = stdoutdata
return jobId