"""
Module for managing data transfer jobs.
"""
import copy
import logging
import re
import shutil
import urllib2
from galaxy import eggs
from sqlalchemy import and_
from galaxy.util.odict import odict
from galaxy.workflow.modules import module_factory
from galaxy.jobs.actions.post import ActionBox
from galaxy.jobs.deferred import FakeTrans
from galaxy.tools.parameters import visit_input_values
from galaxy.tools.parameters.basic import DataToolParameter
from galaxy.datatypes import sniff
log = logging.getLogger( __name__ )
__all__ = [ 'DataTransfer' ]
class DataTransfer( object ):
check_interval = 15
dataset_name_re = re.compile( r'(dataset\d+)_(name)' )
dataset_datatype_re = re.compile( r'(dataset\d+)_(datatype)' )
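# These patterns match per-dataset form field names such as 'dataset1_name'
# or 'dataset1_datatype'; they are presumably used by the transfer plugins
# that subclass DataTransfer to map submitted form fields back to datasets.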
def __init__( self, app ):
self.app = app
self.sa_session = app.model.context.current
def create_job( self, trans, **kwd ):
raise Exception( "Unimplemented Method" )
def check_job( self, job ):
raise Exception( "Unimplemented Method" )
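# create_job() and check_job() are abstract hooks that concrete transfer
# plugins (e.g. the manual_data_transfer plugin referenced below) are
# expected to override.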
def run_job( self, job ):
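# run_job() handles two deferred job types:
#  * 'init_transfer':   create a TransferJob per dataset, hand the jobs to the
#    transfer manager and mark the relevant SampleDatasets as IN_QUEUE.
#  * 'finish_transfer': import the completed transfer into a LibraryDataset,
#    set metadata and, if the sample has a workflow or history, run the
#    workflow or copy the dataset into the history.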
if job.params[ 'type' ] == 'init_transfer':
# TODO: don't create new downloads on restart.
if job.params[ 'protocol' ] in [ 'http', 'https' ]:
results = []
for result in job.params[ 'results' ].values():
result[ 'transfer_job' ] = self.app.transfer_manager.new( protocol=job.params[ 'protocol' ],
name=result[ 'name' ],
datatype=result[ 'datatype' ],
url=result[ 'url' ] )
results.append( result )
elif job.params[ 'protocol' ] == 'scp':
results = []
result = {}
sample_datasets_dict = job.params[ 'sample_datasets_dict' ]
# sample_datasets_dict looks something like the following. The outer dictionary keys are SampleDataset ids.
# {'7': {'status': 'Not started', 'name': '3.bed', 'file_path': '/tmp/library/3.bed', 'sample_id': 7,
# 'external_service_id': 2, 'error_msg': '', 'size': '8.0K'}}
for sample_dataset_id, sample_dataset_info_dict in sample_datasets_dict.items():
result = {}
result[ 'transfer_job' ] = self.app.transfer_manager.new( protocol=job.params[ 'protocol' ],
host=job.params[ 'host' ],
user_name=job.params[ 'user_name' ],
password=job.params[ 'password' ],
sample_dataset_id=sample_dataset_id,
status=sample_dataset_info_dict[ 'status' ],
name=sample_dataset_info_dict[ 'name' ],
file_path=sample_dataset_info_dict[ 'file_path' ],
sample_id=sample_dataset_info_dict[ 'sample_id' ],
external_service_id=sample_dataset_info_dict[ 'external_service_id' ],
error_msg=sample_dataset_info_dict[ 'error_msg' ],
size=sample_dataset_info_dict[ 'size' ] )
results.append( result )
self.app.transfer_manager.run( [ r[ 'transfer_job' ] for r in results ] )
for result in results:
transfer_job = result.pop( 'transfer_job' )
self.create_job( None,
transfer_job_id=transfer_job.id,
result=transfer_job.params,
sample_id=job.params[ 'sample_id' ] )
# Update the state of the relevant SampleDataset
new_status = self.app.model.SampleDataset.transfer_status.IN_QUEUE
self._update_sample_dataset_status( protocol=job.params[ 'protocol' ],
sample_id=job.params[ 'sample_id' ],
result_dict=transfer_job.params,
new_status=new_status,
error_msg='' )
job.state = self.app.model.DeferredJob.states.OK
self.sa_session.add( job )
self.sa_session.flush()
# TODO: Error handling: failure executing, or errors returned from the manager
if job.params[ 'type' ] == 'finish_transfer':
protocol = job.params[ 'protocol' ]
# Update the state of the relevant SampleDataset
new_status = self.app.model.SampleDataset.transfer_status.ADD_TO_LIBRARY
if protocol in [ 'http', 'https' ]:
result_dict = job.params[ 'result' ]
library_dataset_name = result_dict[ 'name' ]
extension = result_dict[ 'datatype' ]
elif protocol in [ 'scp' ]:
# In this case, job.params will be a dictionary that contains a key named 'result'. The value
# of the result key is a dictionary that looks something like:
# {'sample_dataset_id': '8', 'status': 'Not started', 'protocol': 'scp', 'name': '3.bed',
# 'file_path': '/data/library/3.bed', 'host': '127.0.0.1', 'sample_id': 8, 'external_service_id': 2,
# 'local_path': '/tmp/kjl2Ss4', 'password': 'galaxy', 'user_name': 'gvk', 'error_msg': '', 'size': '8.0K'}
try:
tj = self.sa_session.query( self.app.model.TransferJob ).get( int( job.params['transfer_job_id'] ) )
result_dict = tj.params
result_dict['local_path'] = tj.path
except Exception, e:
log.error( "Updated transfer result unavailable, using old result. Error was: %s" % str( e ) )
result_dict = job.params[ 'result' ]
library_dataset_name = result_dict[ 'name' ]
# Determine the data format (see the relevant TODO item in the manual_data_transfer plugin).
extension = sniff.guess_ext( result_dict[ 'local_path' ], sniff_order=self.app.datatypes_registry.sniff_order )
self._update_sample_dataset_status( protocol=job.params[ 'protocol' ],
sample_id=int( job.params[ 'sample_id' ] ),
result_dict=result_dict,
new_status=new_status,
error_msg='' )
sample = self.sa_session.query( self.app.model.Sample ).get( int( job.params[ 'sample_id' ] ) )
ld = self.app.model.LibraryDataset( folder=sample.folder, name=library_dataset_name )
self.sa_session.add( ld )
self.sa_session.flush()
self.app.security_agent.copy_library_permissions( FakeTrans( self.app ), sample.folder, ld )
ldda = self.app.model.LibraryDatasetDatasetAssociation( name = library_dataset_name,
extension = extension,
dbkey = '?',
library_dataset = ld,
create_dataset = True,
sa_session = self.sa_session )
ldda.message = 'Transferred by the Data Transfer Plugin'
self.sa_session.add( ldda )
self.sa_session.flush()
ldda.state = ldda.states.QUEUED # flushed in the set property
ld.library_dataset_dataset_association_id = ldda.id
self.sa_session.add( ld )
self.sa_session.flush()
try:
# Move the dataset from its temporary location
shutil.move( job.transfer_job.path, ldda.file_name )
ldda.init_meta()
for name, spec in ldda.metadata.spec.items():
if name not in [ 'name', 'info', 'dbkey', 'base_name' ]:
if spec.get( 'default' ):
setattr( ldda.metadata, name, spec.unwrap( spec.get( 'default' ) ) )
self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( self.app.datatypes_registry.set_external_metadata_tool,
FakeTrans( self.app,
history=sample.history,
user=sample.request.user ),
incoming = { 'input1':ldda } )
ldda.state = ldda.states.OK
# TODO: not sure if this flush is necessary
self.sa_session.add( ldda )
self.sa_session.flush()
except Exception, e:
log.exception( 'Failure preparing library dataset for finished transfer job (id: %s) via deferred job (id: %s):' % \
( str( job.transfer_job.id ), str( job.id ) ) )
ldda.state = ldda.states.ERROR
if sample.workflow:
log.debug( "\n\nLogging sample mappings as: %s" % sample.workflow[ 'mappings' ] )
log.debug( "job.params: %s" % job.params )
# We have a workflow. Update all mappings to LDDAs and, once the final one is
# done, call _execute_workflow() with either the provided history or a new one.
sub_done = True
rep_done = False
for k, v in sample.workflow[ 'mappings' ].iteritems():
if 'hda' not in v and v[ 'ds_tag' ].startswith( 'hi|' ):
sample.workflow[ 'mappings' ][ k ][ 'hda' ] = self.app.security.decode_id( v[ 'ds_tag' ][3:] )
for key, value in sample.workflow[ 'mappings' ].iteritems():
if 'url' in value and value[ 'url' ] == job.params[ 'result' ][ 'url' ]:
# DBTODO Make sure all ds| mappings get the URL of the dataset, for linking to later.
# If this dataset maps to what we just finished, update the ldda id in the sample.
sample.workflow[ 'mappings' ][ key ][ 'ldda' ] = ldda.id
rep_done = True
# DBTODO replace the hi| mappings with the hda here. Just rip off the first three chars.
elif 'ldda' not in value and 'hda' not in value:
# We're not done if some mappings still don't have ldda or hda mappings.
sub_done = False
if sub_done and rep_done:
if not sample.history:
new_history = self.app.model.History( name="New History From %s" % sample.name, user=sample.request.user )
self.sa_session.add( new_history )
sample.history = new_history
self.sa_session.flush()
self._execute_workflow( sample )
# Check the workflow for substitution done-ness
self.sa_session.add( sample )
self.sa_session.flush()
elif sample.history:
# We don't have a workflow, but a history was provided.
# No processing, go ahead and chunk everything in the history.
if ldda.dataset.state in [ 'new', 'upload', 'queued', 'running', 'empty', 'discarded' ]:
log.error("Cannot import dataset '%s' to user history since its state is '%s'. " % ( ldda.name, ldda.dataset.state ))
elif ldda.dataset.state in [ 'ok', 'error' ]:
ldda.to_history_dataset_association( target_history=sample.history, add_to_history=True )
# Finished
job.state = self.app.model.DeferredJob.states.OK
self.sa_session.add( job )
self.sa_session.flush()
# Update the state of the relevant SampleDataset
new_status = self.app.model.SampleDataset.transfer_status.COMPLETE
self._update_sample_dataset_status( protocol=job.params[ 'protocol' ],
sample_id=int( job.params[ 'sample_id' ] ),
result_dict=job.params[ 'result' ],
new_status=new_status,
error_msg='' )
if sample.datasets and not sample.untransferred_dataset_files:
# Update the state of the sample to the sample's request type's final state.
new_state = sample.request.type.final_sample_state
self._update_sample_state( sample.id, new_state )
# Update the state of the request, if possible
self._update_request_state( sample.request.id )
def _missing_params( self, params, required_params ):
missing_params = filter( lambda x: x not in params, required_params )
if missing_params:
log.error( 'Job parameters missing required keys: %s' % ', '.join( missing_params ) )
return True
return False
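# _missing_params() returns True (and logs an error) when any of the required
# keys is absent from params, so a plugin's create_job() can bail out early.
# Hypothetical usage (not part of this module):
#
#     if self._missing_params( kwd, [ 'protocol', 'sample_id' ] ):
#         return  # required parameters missing; refuse to create the job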
def _update_sample_dataset_status( self, protocol, sample_id, result_dict, new_status, error_msg=None ):
# result_dict looks something like:
# {'url': '127.0.0.1/data/filtered_subreads.fa', 'name': 'Filtered reads'}
# Check if the new status is a valid transfer status
valid_statuses = [ v[1] for v in self.app.model.SampleDataset.transfer_status.items() ]
# TODO: error checking on valid new_status value
if protocol in [ 'http', 'https' ]:
sample_dataset = self.sa_session.query( self.app.model.SampleDataset ) \
.filter( and_( self.app.model.SampleDataset.table.c.sample_id == sample_id,
self.app.model.SampleDataset.table.c.name == result_dict[ 'name' ],
self.app.model.SampleDataset.table.c.file_path == result_dict[ 'url' ] ) ) \
.first()
elif protocol in [ 'scp' ]:
sample_dataset = self.sa_session.query( self.app.model.SampleDataset ).get( int( result_dict[ 'sample_dataset_id' ] ) )
sample_dataset.status = new_status
sample_dataset.error_msg = error_msg
self.sa_session.add( sample_dataset )
self.sa_session.flush()
def _update_sample_state( self, sample_id, new_state, comment=None ):
sample = self.sa_session.query( self.app.model.Sample ).get( sample_id )
if comment is None:
comment = 'Sample state set to %s' % str( new_state )
event = self.app.model.SampleEvent( sample, new_state, comment )
self.sa_session.add( event )
self.sa_session.flush()
def _update_request_state( self, request_id ):
request = self.sa_session.query( self.app.model.Request ).get( request_id )
# Check whether all the samples of the current request are in the same state
common_state = request.samples_have_common_state
if not common_state:
# If the current request state is complete and one of its samples moved from
# the final sample state, then move the request state to In-progress
if request.is_complete:
message = "At least 1 sample state moved from the final sample state, so now the request's state is (%s)" % request.states.SUBMITTED
event = self.app.model.RequestEvent( request, request.states.SUBMITTED, message )
self.sa_session.add( event )
self.sa_session.flush()
else:
final_state = False
request_type_state = request.type.final_sample_state
if common_state.id == request_type_state.id:
# Since all the samples are in the final state, change the request state to 'Complete'
comment = "All samples of this sequencing request are in the final sample state (%s). " % request_type_state.name
state = request.states.COMPLETE
final_state = True
else:
comment = "All samples of this sequencing request are in the (%s) sample state. " % common_state.name
state = request.states.SUBMITTED
event = self.app.model.RequestEvent( request, state, comment )
self.sa_session.add( event )
self.sa_session.flush()
# TODO: handle email notification if it is configured to be sent when the samples are in this state.
def _execute_workflow( self, sample):
for key, value in sample.workflow['mappings'].iteritems():
if 'hda' not in value and 'ldda' in value:
# If HDA is already here, it's an external input, we're not copying anything.
ldda = self.sa_session.query( self.app.model.LibraryDatasetDatasetAssociation ).get( value['ldda'] )
if ldda.dataset.state in [ 'new', 'upload', 'queued', 'running', 'empty', 'discarded' ]:
log.error("Cannot import dataset '%s' to user history since its state is '%s'. " % ( ldda.name, ldda.dataset.state ))
elif ldda.dataset.state in [ 'ok', 'error' ]:
hda = ldda.to_history_dataset_association( target_history=sample.history, add_to_history=True )
sample.workflow['mappings'][key]['hda'] = hda.id
self.sa_session.add( sample )
self.sa_session.flush()
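# The workflow mapping may have been persisted with string keys (e.g. after a
# round trip through JSON), so make sure integer-keyed lookups used below also
# succeed.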
workflow_dict = sample.workflow
new_wf_dict = copy.deepcopy(workflow_dict)
for key in workflow_dict['mappings']:
if not isinstance(key, int):
new_wf_dict['mappings'][int(key)] = workflow_dict['mappings'][key]
workflow_dict = new_wf_dict
fk_trans = FakeTrans(self.app, history = sample.history, user=sample.request.user)
workflow = self.sa_session.query(self.app.model.Workflow).get(workflow_dict['id'])
if not workflow:
log.error("Workflow mapping failure.")
return
if len( workflow.steps ) == 0:
log.error( "Workflow cannot be run because it does not have any steps" )
return
if workflow.has_cycles:
log.error( "Workflow cannot be run because it contains cycles" )
return
if workflow.has_errors:
log.error( "Workflow cannot be run because of validation errors in some steps" )
return
# Build the state for each step
errors = {}
has_upgrade_messages = False
has_errors = False
# Build a fake dictionary prior to execution.
# Prepare each step
for step in workflow.steps:
step.upgrade_messages = {}
# Construct modules
if step.type == 'tool' or step.type is None:
# Restore the tool state for the step
step.module = module_factory.from_workflow_step( fk_trans, step )
# Fix any missing parameters
step.upgrade_messages = step.module.check_and_update_state()
if step.upgrade_messages:
has_upgrade_messages = True
# Any connected input needs to have value DummyDataset (these
# are not persisted so we need to do it every time)
step.module.add_dummy_datasets( connections=step.input_connections )
# Store state with the step
step.state = step.module.state
# Error dict
if step.tool_errors:
has_errors = True
errors[step.id] = step.tool_errors
else:
# Non-tool step (e.g. a data input); build its module and default runtime state.
step.module = module_factory.from_workflow_step( fk_trans, step )
step.state = step.module.get_runtime_state()
# Connections by input name
step.input_connections_by_name = dict( ( conn.input_name, conn ) for conn in step.input_connections )
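# Second pass over the steps: rebuild each step's module and state immediately
# before execution and collect any per-step errors.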
for step in workflow.steps:
step.upgrade_messages = {}
# Connections by input name
step.input_connections_by_name = \
dict( ( conn.input_name, conn ) for conn in step.input_connections )
# Extract just the arguments for this step by prefix
step_errors = None
if step.type == 'tool' or step.type is None:
module = module_factory.from_workflow_step( fk_trans, step )
# Fix any missing parameters
step.upgrade_messages = module.check_and_update_state()
if step.upgrade_messages:
has_upgrade_messages = True
# Any connected input needs to have value DummyDataset (these
# are not persisted so we need to do it every time)
module.add_dummy_datasets( connections=step.input_connections )
# Get the tool
tool = module.tool
# Get the state
step.state = state = module.state
# Get old errors
old_errors = state.inputs.pop( "__errors__", {} )
if step_errors:
errors[step.id] = state.inputs["__errors__"] = step_errors
# Run each step, connecting outputs to inputs
workflow_invocation = self.app.model.WorkflowInvocation()
workflow_invocation.workflow = workflow
outputs = odict()
for i, step in enumerate( workflow.steps ):
job = None
if step.type == 'tool' or step.type is None:
tool = self.app.toolbox.get_tool( step.tool_id )
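# The callback below replaces each connected DataToolParameter with the output
# dataset produced by the upstream step it is wired to.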
def callback( input, value, prefixed_name, prefixed_label ):
if isinstance( input, DataToolParameter ):
if prefixed_name in step.input_connections_by_name:
conn = step.input_connections_by_name[ prefixed_name ]
return outputs[ conn.output_step.id ][ conn.output_name ]
visit_input_values( tool.inputs, step.state.inputs, callback )
job, out_data = tool.execute( fk_trans, step.state.inputs, history=sample.history)
outputs[ step.id ] = out_data
for pja in step.post_job_actions:
if pja.action_type in ActionBox.immediate_actions:
ActionBox.execute(self.app, self.sa_session, pja, job, replacement_dict=None)
else:
job.add_post_job_action(pja)
else:
job, out_data = step.module.execute( fk_trans, step.state)
outputs[ step.id ] = out_data
if step.id in workflow_dict['mappings']:
data = self.sa_session.query( self.app.model.HistoryDatasetAssociation ).get( workflow_dict['mappings'][step.id]['hda'] )
outputs[ step.id ]['output'] = data
workflow_invocation_step = self.app.model.WorkflowInvocationStep()
workflow_invocation_step.workflow_invocation = workflow_invocation
workflow_invocation_step.workflow_step = step
workflow_invocation_step.job = job
self.sa_session.add( workflow_invocation )
self.sa_session.flush()