Source code for galaxy.jobs.deferred.manual_data_transfer

"""
Generic module for managing manual data transfer jobs using Galaxy's built-in file browser.
This module can be used by various external services that are configured to transfer data manually.
"""
import logging, urllib2, re, shutil
from data_transfer import *

log = logging.getLogger( __name__ )

__all__ = [ 'ManualDataTransferPlugin' ]

[docs]class ManualDataTransferPlugin( DataTransfer ): def __init__( self, app ): super( ManualDataTransferPlugin, self ).__init__( app )
[docs] def create_job( self, trans, **kwd ): if 'sample' in kwd and 'sample_datasets' in kwd and 'external_service' in kwd and 'external_service_type' in kwd: sample = kwd[ 'sample' ] sample_datasets = kwd[ 'sample_datasets' ] external_service = kwd[ 'external_service' ] external_service_type = kwd[ 'external_service_type' ] # TODO: is there a better way to store the protocol? protocol = external_service_type.data_transfer.keys()[0] host = external_service.form_values.content[ 'host' ] user_name = external_service.form_values.content[ 'user_name' ] password = external_service.form_values.content[ 'password' ] # TODO: In the future, we may want to implement a way for the user to associate a selected file with one of # the run outputs configured in the <run_details><results> section of the external service config file. The # following was a first pass at implementing something (the datatype was included in the sample_dataset_dict), # but without a way for the user to associate stuff it's useless. However, allowing the user this ability may # open a can of worms, so maybe we shouldn't do it??? # #for run_result_file_name, run_result_file_datatype in external_service_type.run_details[ 'results' ].items(): # # external_service_type.run_details[ 'results' ] looks something like: {'dataset1_name': 'dataset1_datatype'} # if run_result_file_datatype in external_service.form_values.content: # datatype = external_service.form_values.content[ run_result_file_datatype ] # # When the transfer is automatic (the process used in the SMRT Portal plugin), the datasets and datatypes # can be matched up to those configured in the <run_details><results> settings in the external service type config # (e.g., pacific_biosciences_smrt_portal.xml). However, that's a bit trickier here since the user is manually # selecting files for transfer. sample_datasets_dict = {} for sample_dataset in sample_datasets: sample_dataset_id = sample_dataset.id sample_dataset_dict = dict( sample_id = sample_dataset.sample.id, name = sample_dataset.name, file_path = sample_dataset.file_path, status = sample_dataset.status, error_msg = sample_dataset.error_msg, size = sample_dataset.size, external_service_id = sample_dataset.external_service.id ) sample_datasets_dict[ sample_dataset_id ] = sample_dataset_dict params = { 'type' : 'init_transfer', 'sample_id' : sample.id, 'sample_datasets_dict' : sample_datasets_dict, 'protocol' : protocol, 'host' : host, 'user_name' : user_name, 'password' : password } elif 'transfer_job_id' in kwd: params = { 'type' : 'finish_transfer', 'protocol' : kwd[ 'result' ][ 'protocol' ], 'sample_id' : kwd[ 'sample_id' ], 'result' : kwd[ 'result' ], 'transfer_job_id' : kwd[ 'transfer_job_id' ] } else: log.error( 'No job was created because kwd does not include "samples" and "sample_datasets" or "transfer_job_id".' ) return deferred_job = self.app.model.DeferredJob( state=self.app.model.DeferredJob.states.NEW, plugin='ManualDataTransferPlugin', params=params ) self.sa_session.add( deferred_job ) self.sa_session.flush() log.debug( 'Created a deferred job in the ManualDataTransferPlugin of type: %s' % params[ 'type' ] ) # TODO: error reporting to caller (if possible?)
[docs] def check_job( self, job ): if self._missing_params( job.params, [ 'type' ] ): return self.job_states.INVALID if job.params[ 'type' ] == 'init_transfer': if job.params[ 'protocol' ] in [ 'http', 'https' ]: raise Exception( "Manual data transfer is not yet supported for http(s)." ) elif job.params[ 'protocol' ] == 'scp': if self._missing_params( job.params, [ 'protocol', 'host', 'user_name', 'password', 'sample_id', 'sample_datasets_dict' ] ): return self.job_states.INVALID # TODO: what kind of checks do we need here? return self.job_states.READY return self.job_states.WAIT if job.params[ 'type' ] == 'finish_transfer': if self._missing_params( job.params, [ 'transfer_job_id' ] ): return self.job_states.INVALID # Get the TransferJob object and add it to the DeferredJob so we only look it up once. if not hasattr( job, 'transfer_job' ): job.transfer_job = self.sa_session.query( self.app.model.TransferJob ).get( int( job.params[ 'transfer_job_id' ] ) ) state = self.app.transfer_manager.get_state( job.transfer_job ) if not state: log.error( 'No state for transfer job id: %s' % job.transfer_job.id ) return self.job_states.WAIT if state[ 'state' ] in self.app.model.TransferJob.terminal_states: return self.job_states.READY log.debug( "Checked on finish transfer job %s, not done yet." % job.id ) return self.job_states.WAIT else: log.error( 'Unknown job type for ManualDataTransferPlugin: %s' % str( job.params[ 'type' ] ) ) return self.job_states.INVALID