import logging
import inspect
import os
import sys
log = logging.getLogger( __name__ )
import galaxy.jobs.rules
from galaxy.jobs import stock_rules
from .rule_helper import RuleHelper
DYNAMIC_RUNNER_NAME = "dynamic"
DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url"
ERROR_MESSAGE_NO_RULE_FUNCTION = "Galaxy misconfigured - cannot find dynamic rule function name for destination %s."
ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND = "Galaxy misconfigured - no rule function named %s found in dynamic rule modules."
[docs]class JobMappingException( Exception ):
def __init__( self, failure_message ):
self.failure_message = failure_message
[docs]class JobNotReadyException( Exception ):
def __init__( self, job_state=None, message=None ):
self.job_state = job_state
self.message = message
STOCK_RULES = dict(
choose_one=stock_rules.choose_one,
burst=stock_rules.burst,
docker_dispatch=stock_rules.docker_dispatch,
)
[docs]class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
(in the form of job_wrappers) to job runner url strings.
"""
def __init__( self, job_wrapper, url_to_destination, job_config ):
self.job_wrapper = job_wrapper
self.url_to_destination = url_to_destination
self.job_config = job_config
self.rules_module = galaxy.jobs.rules
if job_config.dynamic_params is not None:
rules_module_name = job_config.dynamic_params['rules_module']
__import__(rules_module_name)
self.rules_module = sys.modules[rules_module_name]
def __get_rule_modules( self ):
unsorted_module_names = self.__get_rule_module_names( )
## Load modules in reverse order to allow hierarchical overrides
## i.e. 000_galaxy_rules.py, 100_site_rules.py, 200_instance_rules.py
module_names = sorted( unsorted_module_names, reverse=True )
modules = []
for rule_module_name in module_names:
try:
module = __import__( rule_module_name )
for comp in rule_module_name.split( "." )[1:]:
module = getattr( module, comp )
modules.append( module )
except BaseException, exception:
exception_str = str( exception )
message = "%s rule module could not be loaded: %s" % ( rule_module_name, exception_str )
log.debug( message )
continue
return modules
def __get_rule_module_names( self ):
rules_dir = self.rules_module.__path__[0]
names = []
for fname in os.listdir( rules_dir ):
if not( fname.startswith( "_" ) ) and fname.endswith( ".py" ):
base_name = self.rules_module.__name__
rule_module_name = "%s.%s" % (base_name, fname[:-len(".py")])
names.append( rule_module_name )
return names
def __invoke_expand_function( self, expand_function, destination_params ):
function_arg_names = inspect.getargspec( expand_function ).args
app = self.job_wrapper.app
possible_args = {
"job_id": self.job_wrapper.job_id,
"tool": self.job_wrapper.tool,
"tool_id": self.job_wrapper.tool.id,
"job_wrapper": self.job_wrapper,
"rule_helper": RuleHelper( app ),
"app": app
}
actual_args = {}
# Send through any job_conf.xml defined args to function
for destination_param in destination_params.keys():
if destination_param in function_arg_names:
actual_args[ destination_param ] = destination_params[ destination_param ]
# Populate needed args
for possible_arg_name in possible_args:
if possible_arg_name in function_arg_names:
actual_args[ possible_arg_name ] = possible_args[ possible_arg_name ]
# Don't hit the DB to load the job object if not needed
require_db = False
for param in ["job", "user", "user_email", "resource_params", "workflow_invocation_uuid"]:
if param in function_arg_names:
require_db = True
break
if require_db:
job = self.job_wrapper.get_job()
user = job.user
user_email = user and str(user.email)
if "job" in function_arg_names:
actual_args[ "job" ] = job
if "user" in function_arg_names:
actual_args[ "user" ] = user
if "user_email" in function_arg_names:
actual_args[ "user_email" ] = user_email
if "resource_params" in function_arg_names:
# Find the dymically inserted resource parameters and give them
# to rule.
param_values = self.__job_params( job )
resource_params = {}
try:
resource_params_raw = param_values[ "__job_resource" ]
if resource_params_raw[ "__job_resource__select" ].lower() in [ "1", "yes", "true" ]:
for key, value in resource_params_raw.iteritems():
resource_params[ key ] = value
except KeyError:
pass
actual_args[ "resource_params" ] = resource_params
if "workflow_invocation_uuid" in function_arg_names:
param_values = job.raw_param_dict( )
workflow_invocation_uuid = param_values.get( "__workflow_invocation_uuid__", None )
actual_args[ "workflow_invocation_uuid" ] = workflow_invocation_uuid
return expand_function( **actual_args )
def __job_params( self, job ):
app = self.job_wrapper.app
param_values = job.get_param_values( app, ignore_errors=True )
return param_values
def __convert_url_to_destination( self, url ):
"""
Job runner URLs are deprecated, but dynamic mapper functions may still
be returning them. Runners are expected to be able to convert these to
destinations.
This method calls
JobHandlerQueue.DefaultJobDispatcher.url_to_destination, which in turn
calls the url_to_destination method for the appropriate runner.
"""
dest = self.url_to_destination( url )
dest['id'] = DYNAMIC_DESTINATION_ID
return dest
def __determine_expand_function_name( self, destination ):
# default look for function with name matching an id of tool, unless one specified
expand_function_name = destination.params.get('function', None)
if not expand_function_name:
for tool_id in self.job_wrapper.tool.all_ids:
if self.__last_rule_module_with_function( tool_id ):
expand_function_name = tool_id
break
return expand_function_name
def __get_expand_function( self, expand_function_name ):
matching_rule_module = self.__last_rule_module_with_function( expand_function_name )
if matching_rule_module:
expand_function = getattr( matching_rule_module, expand_function_name )
return expand_function
else:
message = ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND % ( expand_function_name )
raise Exception( message )
def __last_rule_module_with_function( self, function_name ):
# self.rule_modules is sorted in reverse order, so find first
# wiht function
for rule_module in self.__get_rule_modules( ):
if hasattr( rule_module, function_name ):
return rule_module
return None
def __handle_dynamic_job_destination( self, destination ):
expand_type = destination.params.get('type', "python")
expand_function = None
if expand_type == "python":
expand_function_name = self.__determine_expand_function_name( destination )
if not expand_function_name:
message = ERROR_MESSAGE_NO_RULE_FUNCTION % destination
raise Exception( message )
expand_function = self.__get_expand_function( expand_function_name )
elif expand_type in STOCK_RULES:
expand_function = STOCK_RULES[ expand_type ]
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
return self.__handle_rule( expand_function, destination )
def __handle_rule( self, rule_function, destination ):
job_destination = self.__invoke_expand_function( rule_function, destination.params )
if not isinstance(job_destination, galaxy.jobs.JobDestination):
job_destination_rep = str(job_destination) # Should be either id or url
if '://' in job_destination_rep:
job_destination = self.__convert_url_to_destination(job_destination_rep)
else:
job_destination = self.job_config.get_destination(job_destination_rep)
return job_destination
def __cache_job_destination( self, params, raw_job_destination=None ):
if raw_job_destination is None:
raw_job_destination = self.job_wrapper.tool.get_job_destination( params )
if raw_job_destination.runner == DYNAMIC_RUNNER_NAME:
job_destination = self.__handle_dynamic_job_destination( raw_job_destination )
else:
job_destination = raw_job_destination
self.cached_job_destination = job_destination
[docs] def get_job_destination( self, params ):
"""
Cache the job_destination to avoid recalculation.
"""
if not hasattr( self, 'cached_job_destination' ):
self.__cache_job_destination( params )
return self.cached_job_destination
[docs] def cache_job_destination( self, raw_job_destination ):
self.__cache_job_destination( None, raw_job_destination=raw_job_destination )
return self.cached_job_destination