"""
Queue for running deferred code via plugins.
"""
import os, sys, logging, threading
from Queue import Queue, Empty
from galaxy import model
from galaxy.util.bunch import Bunch
log = logging.getLogger( __name__ )
[docs]class DeferredJobQueue( object ):
job_states = Bunch( READY = 'ready',
WAIT = 'wait',
INVALID = 'invalid' )
def __init__( self, app ):
self.app = app
self.sa_session = app.model.context.current
self.queue = Queue()
self.plugins = {}
self._load_plugins()
self.sleeper = Sleeper()
self.running = True
self.waiting_jobs = []
self.__check_jobs_at_startup()
self.monitor_thread = threading.Thread( target=self.__monitor )
self.monitor_thread.start()
log.info( 'Deferred job queue started' )
def _load_plugins( self ):
for fname in os.listdir( os.path.dirname( __file__ ) ):
if not fname.startswith( '_' ) and fname.endswith( '.py' ):
name = fname[:-3]
module_name = 'galaxy.jobs.deferred.' + name
try:
module = __import__( module_name )
except:
log.exception( 'Deferred job plugin appears to exist but is not loadable: %s' % module_name )
continue
for comp in module_name.split( "." )[1:]:
module = getattr( module, comp )
if '__all__' not in dir( module ):
log.error( 'Plugin "%s" does not contain a list of exported classes in __all__' % module_name )
continue
for obj in module.__all__:
display_name = ':'.join( ( module_name, obj ) )
plugin = getattr( module, obj )
for name in ( 'check_job', 'run_job' ):
if name not in dir( plugin ):
log.error( 'Plugin "%s" does not contain required method "%s()"' % ( display_name, name ) )
break
else:
self.plugins[obj] = plugin( self.app )
self.plugins[obj].job_states = self.job_states
log.debug( 'Loaded deferred job plugin: %s' % display_name )
def __check_jobs_at_startup( self ):
waiting_jobs = self.sa_session.query( model.DeferredJob ) \
.filter( model.DeferredJob.state == model.DeferredJob.states.WAITING ).all()
for job in waiting_jobs:
if not self.__check_job_plugin( job ):
continue
if 'check_interval' in dir( self.plugins[job.plugin] ):
job.check_interval = self.plugins[job.plugin].check_interval
log.info( 'Recovered deferred job (id: %s) at startup' % job.id )
# Pass the job ID as opposed to the job, since the monitor thread
# needs to load it in its own threadlocal scoped session.
self.waiting_jobs.append( job.id )
def __monitor( self ):
while self.running:
try:
self.__monitor_step()
except:
log.exception( 'Exception in monitor_step' )
self.sleeper.sleep( 1 )
log.info( 'job queue stopped' )
def __monitor_step( self ):
# TODO: Querying the database with this frequency is bad, we need message passing
new_jobs = self.sa_session.query( model.DeferredJob ) \
.filter( model.DeferredJob.state == model.DeferredJob.states.NEW ).all()
for job in new_jobs:
if not self.__check_job_plugin( job ):
continue
job.state = model.DeferredJob.states.WAITING
self.sa_session.add( job )
self.sa_session.flush()
if 'check_interval' in dir( self.plugins[job.plugin] ):
job.check_interval = self.plugins[job.plugin].check_interval
self.waiting_jobs.append( job )
new_waiting = []
for job in self.waiting_jobs:
try:
# Recovered jobs are passed in by ID
assert type( job ) is int
job = self.sa_session.query( model.DeferredJob ).get( job )
except:
pass
if job.is_check_time:
try:
job_state = self.plugins[job.plugin].check_job( job )
except Exception, e:
self.__fail_job( job )
log.exception( 'Set deferred job %s to error because of an exception in check_job(): %s' % ( job.id, str( e ) ) )
continue
if job_state == self.job_states.READY:
try:
self.plugins[job.plugin].run_job( job )
except Exception, e:
self.__fail_job( job )
log.exception( 'Set deferred job %s to error because of an exception in run_job(): %s' % ( job.id, str( e ) ) )
continue
elif job_state == self.job_states.INVALID:
self.__fail_job( job )
log.error( 'Unable to run deferred job (id: %s): Plugin "%s" marked it as invalid' % ( job.id, job.plugin ) )
continue
else:
new_waiting.append( job )
job.last_check = 'now'
else:
new_waiting.append( job )
self.waiting_jobs = new_waiting
def __check_job_plugin( self, job ):
if job.plugin not in self.plugins:
log.error( 'Invalid deferred job plugin: %s' ) % job.plugin
job.state = model.DeferredJob.states.ERROR
self.sa_session.add( job )
self.sa_session.flush()
return False
return True
def __check_if_ready_to_run( self, job ):
return self.plugins[job.plugin].check_job( job )
def __fail_job( self, job ):
job.state = model.DeferredJob.states.ERROR
self.sa_session.add( job )
self.sa_session.flush()
[docs] def shutdown( self ):
self.running = False
self.sleeper.wake()
[docs]class Sleeper( object ):
"""
Provides a 'sleep' method that sleeps for a number of seconds *unless*
the notify method is called (from a different thread).
"""
def __init__( self ):
self.condition = threading.Condition()
[docs] def sleep( self, seconds ):
self.condition.acquire()
self.condition.wait( seconds )
self.condition.release()
[docs] def wake( self ):
self.condition.acquire()
self.condition.notify()
self.condition.release()
[docs]class FakeTrans( object ):
"""A fake trans for calling the external set metadata tool"""
def __init__( self, app, history=None, user=None):
class Dummy( object ):
def __init__( self ):
self.id = None
self.app = app
self.sa_session = app.model.context.current
self.dummy = Dummy()
if not history:
self.history = Dummy()
else:
self.history = history
if not user:
self.user = Dummy()
else:
self.user = user
self.model = app.model
[docs] def get_galaxy_session( self ):
return self.dummy
[docs] def log_event( self, message, tool_id=None ):
pass
[docs] def get_current_user_roles( self ):
if self.user:
return self.user.all_roles()
else:
return []
[docs] def db_dataset_for( self, dbkey ):
if self.history is None:
return None
datasets = self.sa_session.query( self.app.model.HistoryDatasetAssociation ) \
.filter_by( deleted=False, history_id=self.history.id, extension="len" )
for ds in datasets:
if dbkey == ds.dbkey:
return ds
return None