Source code for tool_shed.util.workflow_util

""" Tool shed helper methods for dealing with workflows - only two methods are
utilized outside of this modules - generate_workflow_image and import_workflow.
"""
import logging
import os

import galaxy.tools
import galaxy.tools.parameters
from galaxy.util import json
from galaxy.util.sanitize_html import sanitize_html
from galaxy.workflow.render import WorkflowCanvas
from galaxy.workflow.steps import attach_ordered_steps
from galaxy.workflow.modules import module_types
from galaxy.workflow.modules import ToolModule
from galaxy.workflow.modules import WorkflowModuleFactory

from tool_shed.tools import tool_validator

from tool_shed.util import encoding_util
from tool_shed.util import metadata_util
from tool_shed.util import shed_util_common as suc

log = logging.getLogger( __name__ )


class RepoToolModule( ToolModule ):

    type = "tool"

    def __init__( self, trans, repository_id, changeset_revision, tools_metadata, tool_id ):
        self.trans = trans
        self.tools_metadata = tools_metadata
        self.tool_id = tool_id
        self.tool = None
        self.errors = None
        self.tv = tool_validator.ToolValidator( trans.app )
        if trans.webapp.name == 'tool_shed':
            # We're in the tool shed.
            for tool_dict in tools_metadata:
                if self.tool_id in [ tool_dict[ 'id' ], tool_dict[ 'guid' ] ]:
                    repository, self.tool, message = self.tv.load_tool_from_changeset_revision( repository_id,
                                                                                                changeset_revision,
                                                                                                tool_dict[ 'tool_config' ] )
                    if message and self.tool is None:
                        self.errors = 'unavailable'
                    break
        else:
            # We're in Galaxy.
            self.tool = trans.app.toolbox.get_tool( self.tool_id )
            if self.tool is None:
                self.errors = 'unavailable'
        self.post_job_actions = {}
        self.workflow_outputs = []
        self.state = None
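    # Illustrative note (not part of the original module): each entry in
    # tools_metadata is a dictionary generated from the repository's metadata.
    # The constructor above reads only the keys shown in this hypothetical sketch:
    #
    #   { 'id': 'Filter1',
    #     'guid': 'toolshed.g2.bx.psu.edu/repos/devteam/filter/Filter1/1.1.0',
    #     'tool_config': 'filtering.xml' }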
    @classmethod
    def from_dict( Class, trans, step_dict, repository_id, changeset_revision, tools_metadata, secure=True ):
        tool_id = step_dict[ 'tool_id' ]
        module = Class( trans, repository_id, changeset_revision, tools_metadata, tool_id )
        module.state = galaxy.tools.DefaultToolState()
        if module.tool is not None:
            module.state.decode( step_dict[ "tool_state" ], module.tool, module.trans.app, secure=secure )
        module.errors = step_dict.get( "tool_errors", None )
        return module
    @classmethod
    def from_workflow_step( Class, trans, step, repository_id, changeset_revision, tools_metadata ):
        module = Class( trans, repository_id, changeset_revision, tools_metadata, step.tool_id )
        module.state = galaxy.tools.DefaultToolState()
        if module.tool:
            module.state.inputs = module.tool.params_from_strings( step.tool_inputs, trans.app, ignore_errors=True )
        else:
            module.state.inputs = {}
        module.errors = step.tool_errors
        return module
    def get_data_inputs( self ):
        data_inputs = []

        def callback( input, value, prefixed_name, prefixed_label ):
            if isinstance( input, galaxy.tools.parameters.DataToolParameter ):
                data_inputs.append( dict( name=prefixed_name,
                                          label=prefixed_label,
                                          extensions=input.extensions ) )

        if self.tool:
            try:
                galaxy.tools.parameters.visit_input_values( self.tool.inputs, self.state.inputs, callback )
            except Exception:
                # TODO have this actually use default parameters?  Fix at
                # refactor; needs to be discussed wrt reproducibility though.
                log.exception( "Tool parse failed for %s -- this indicates that the local tool version is "
                               "incompatible with the version expected by the workflow." % self.tool.id )
        return data_inputs
    def get_data_outputs( self ):
        data_outputs = []
        if self.tool:
            data_inputs = None
            for name, tool_output in self.tool.outputs.iteritems():
                if tool_output.format_source is not None:
                    # Default to the special name "input", which removes restrictions on connections.
                    formats = [ 'input' ]
                    if data_inputs is None:
                        data_inputs = self.get_data_inputs()
                    # Find the input parameter referenced by format_source.
                    for di in data_inputs:
                        # Input names come prefixed with conditional and repeat names separated by '|',
                        # so remove the prefixes when comparing with format_source.
                        if di[ 'name' ] is not None and di[ 'name' ].split( '|' )[ -1 ] == tool_output.format_source:
                            formats = di[ 'extensions' ]
                else:
                    formats = [ tool_output.format ]
                for change_elem in tool_output.change_format:
                    for when_elem in change_elem.findall( 'when' ):
                        format = when_elem.get( 'format', None )
                        if format and format not in formats:
                            formats.append( format )
                data_outputs.append( dict( name=name, extensions=formats ) )
        return data_outputs
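# Illustrative note (not part of the original module): get_data_outputs() above
# mirrors how output formats are declared in a tool's XML config.  Given a
# hypothetical tool config such as
#
#   <outputs>
#       <data name="out1" format="tabular" format_source="input1"/>
#       <data name="out2" format="txt">
#           <change_format>
#               <when input="out_type" value="fasta" format="fasta"/>
#           </change_format>
#       </data>
#   </outputs>
#
# out1 would report the extensions of the 'input1' data parameter (or the special
# 'input' format if that parameter cannot be matched), and out2 would report
# [ 'txt', 'fasta' ], since each <when> element can contribute an extra format.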

class RepoWorkflowModuleFactory( WorkflowModuleFactory ):

    def __init__( self, module_types ):
        self.module_types = module_types
    def from_dict( self, trans, repository_id, changeset_revision, step_dict, tools_metadata, **kwd ):
        """Return a module initialized from the data in the dictionary `step_dict`."""
        type = step_dict[ 'type' ]
        assert type in self.module_types
        module_method_kwds = dict( **kwd )
        if type == "tool":
            module_method_kwds[ 'repository_id' ] = repository_id
            module_method_kwds[ 'changeset_revision' ] = changeset_revision
            module_method_kwds[ 'tools_metadata' ] = tools_metadata
        return self.module_types[ type ].from_dict( trans, step_dict, **module_method_kwds )
    def from_workflow_step( self, trans, repository_id, changeset_revision, tools_metadata, step ):
        """Return a module initialized from the WorkflowStep object `step`."""
        type = step.type
        module_method_kwds = dict()
        if type == "tool":
            module_method_kwds[ 'repository_id' ] = repository_id
            module_method_kwds[ 'changeset_revision' ] = changeset_revision
            module_method_kwds[ 'tools_metadata' ] = tools_metadata
        return self.module_types[ type ].from_workflow_step( trans, step, **module_method_kwds )

tool_shed_module_types = module_types.copy()
tool_shed_module_types[ 'tool' ] = RepoToolModule
module_factory = RepoWorkflowModuleFactory( tool_shed_module_types )

def generate_workflow_image( trans, workflow_name, repository_metadata_id=None, repository_id=None ):
    """
    Return an SVG image representation of a workflow dictionary created when the workflow was exported.  This
    method is called from both Galaxy and the tool shed.  When called from the tool shed, repository_metadata_id
    will have a value and repository_id will be None.  When called from Galaxy, repository_metadata_id will be
    None and repository_id will have a value.
    """
    workflow_name = encoding_util.tool_shed_decode( workflow_name )
    if trans.webapp.name == 'tool_shed':
        # We're in the tool shed.
        repository_metadata = metadata_util.get_repository_metadata_by_id( trans.app, repository_metadata_id )
        repository_id = trans.security.encode_id( repository_metadata.repository_id )
        changeset_revision = repository_metadata.changeset_revision
        metadata = repository_metadata.metadata
    else:
        # We're in Galaxy.
        repository = suc.get_tool_shed_repository_by_id( trans.app, repository_id )
        changeset_revision = repository.changeset_revision
        metadata = repository.metadata
    # metadata[ 'workflows' ] is a list of tuples where each contained tuple is
    # [ <relative path to the .ga file in the repository>, <exported workflow dict> ]
    for workflow_tup in metadata[ 'workflows' ]:
        workflow_dict = workflow_tup[ 1 ]
        if workflow_dict[ 'name' ] == workflow_name:
            break
    if 'tools' in metadata:
        tools_metadata = metadata[ 'tools' ]
    else:
        tools_metadata = []
    workflow, missing_tool_tups = get_workflow_from_dict( trans=trans,
                                                          workflow_dict=workflow_dict,
                                                          tools_metadata=tools_metadata,
                                                          repository_id=repository_id,
                                                          changeset_revision=changeset_revision )
    workflow_canvas = WorkflowCanvas()
    canvas = workflow_canvas.canvas
    # Store px width for boxes of each step.
    for step in workflow.steps:
        step.upgrade_messages = {}
        module = module_factory.from_workflow_step( trans, repository_id, changeset_revision, tools_metadata, step )
        tool_errors = module.type == 'tool' and not module.tool
        module_data_inputs = get_workflow_data_inputs( step, module )
        module_data_outputs = get_workflow_data_outputs( step, module, workflow.steps )
        module_name = get_workflow_module_name( module, missing_tool_tups )
        workflow_canvas.populate_data_for_step( step,
                                                module_name,
                                                module_data_inputs,
                                                module_data_outputs,
                                                tool_errors=tool_errors )
    workflow_canvas.add_steps( highlight_errors=True )
    workflow_canvas.finish()
    trans.response.set_content_type( "image/svg+xml" )
    return canvas.standalone_xml()
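# Illustrative note (not part of the original module): the two calling conventions
# described in the docstring above, with hypothetical encoded ids.
#
#   # Called from the Tool Shed -- pass repository_metadata_id only:
#   svg = generate_workflow_image( trans, encoded_name, repository_metadata_id='f9cad7b01a472135' )
#
#   # Called from Galaxy -- pass the installed repository's encoded id only:
#   svg = generate_workflow_image( trans, encoded_name, repository_id='f2db41e1fa331b3e' )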

def get_workflow_data_inputs( step, module ):
    if module.type == 'tool':
        if module.tool:
            return module.get_data_inputs()
        else:
            data_inputs = []
            for wfsc in step.input_connections:
                data_inputs_dict = {}
                data_inputs_dict[ 'extensions' ] = [ '' ]
                data_inputs_dict[ 'name' ] = wfsc.input_name
                data_inputs_dict[ 'label' ] = 'Unknown'
                data_inputs.append( data_inputs_dict )
            return data_inputs
    return module.get_data_inputs()

def get_workflow_data_outputs( step, module, steps ):
    if module.type == 'tool':
        if module.tool:
            return module.get_data_outputs()
        else:
            data_outputs = []
            data_outputs_dict = {}
            data_outputs_dict[ 'extensions' ] = [ 'input' ]
            found = False
            for workflow_step in steps:
                for wfsc in workflow_step.input_connections:
                    if step.name == wfsc.output_step.name:
                        data_outputs_dict[ 'name' ] = wfsc.output_name
                        found = True
                        break
                if found:
                    break
            if not found:
                # We're at the last step of the workflow.
                data_outputs_dict[ 'name' ] = 'output'
            data_outputs.append( data_outputs_dict )
            return data_outputs
    return module.get_data_outputs()

def get_workflow_from_dict( trans, workflow_dict, tools_metadata, repository_id, changeset_revision ):
    """
    Return an in-memory Workflow object from the dictionary object created when it was exported.  This method
    is called from both Galaxy and the tool shed to retrieve a Workflow object that can be displayed as an SVG
    image.  This method is also called from Galaxy to retrieve a Workflow object that can be used for saving
    to the Galaxy database.
    """
    trans.workflow_building_mode = True
    workflow = trans.model.Workflow()
    workflow.name = workflow_dict[ 'name' ]
    workflow.has_errors = False
    steps = []
    # Keep ids for each step that we need to use to make connections.
    steps_by_external_id = {}
    # Keep track of tools required by the workflow that are not available in
    # the tool shed repository.  Each tuple in the list of missing_tool_tups
    # will be ( tool_id, tool_name, tool_version ).
    missing_tool_tups = []
    # First pass to build step objects and populate basic values.
    for key, step_dict in workflow_dict[ 'steps' ].iteritems():
        # Create the model class for the step.
        step = trans.model.WorkflowStep()
        step.name = step_dict[ 'name' ]
        step.position = step_dict[ 'position' ]
        module = module_factory.from_dict( trans, repository_id, changeset_revision, step_dict, tools_metadata=tools_metadata, secure=False )
        if module.type == 'tool' and module.tool is None:
            # A required tool is not available in the current repository.
            step.tool_errors = 'unavailable'
            missing_tool_tup = ( step_dict[ 'tool_id' ], step_dict[ 'name' ], step_dict[ 'tool_version' ] )
            if missing_tool_tup not in missing_tool_tups:
                missing_tool_tups.append( missing_tool_tup )
        module.save_to_step( step )
        if step.tool_errors:
            workflow.has_errors = True
        # Stick this in the step temporarily.
        step.temp_input_connections = step_dict[ 'input_connections' ]
        if trans.webapp.name == 'galaxy':
            annotation = step_dict.get( 'annotation', '' )
            if annotation:
                annotation = sanitize_html( annotation, 'utf-8', 'text/html' )
                new_step_annotation = trans.model.WorkflowStepAnnotationAssociation()
                new_step_annotation.annotation = annotation
                new_step_annotation.user = trans.user
                step.annotations.append( new_step_annotation )
            # Unpack and add post-job actions.
            post_job_actions = step_dict.get( 'post_job_actions', {} )
            for name, pja_dict in post_job_actions.items():
                trans.model.PostJobAction( pja_dict[ 'action_type' ],
                                           step,
                                           pja_dict[ 'output_name' ],
                                           pja_dict[ 'action_arguments' ] )
        steps.append( step )
        steps_by_external_id[ step_dict[ 'id' ] ] = step
    # Second pass to deal with connections between steps.
    for step in steps:
        # Input connections.
        for input_name, conn_dict in step.temp_input_connections.iteritems():
            if conn_dict:
                output_step = steps_by_external_id[ conn_dict[ 'id' ] ]
                conn = trans.model.WorkflowStepConnection()
                conn.input_step = step
                conn.input_name = input_name
                conn.output_step = output_step
                conn.output_name = conn_dict[ 'output_name' ]
                step.input_connections.append( conn )
        del step.temp_input_connections
    # Order the steps if possible.
    attach_ordered_steps( workflow, steps )
    # Return the in-memory Workflow object for display or later persistence to the Galaxy database.
    return workflow, missing_tool_tups
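# Illustrative note (not part of the original module): a minimal sketch of the
# exported workflow dictionary that get_workflow_from_dict() consumes.  Only the
# keys read above are shown; step '1' connects its 'input1' to step '0's output.
#
#   workflow_dict = {
#       'name': 'Example workflow',
#       'steps': {
#           '0': { 'id': 0, 'type': 'data_input', 'name': 'Input dataset',
#                  'position': { 'left': 10, 'top': 10 }, 'input_connections': {} },
#           '1': { 'id': 1, 'type': 'tool', 'name': 'Filter', 'tool_id': 'Filter1',
#                  'tool_version': '1.1.0', 'tool_state': '{"input1": "null"}',
#                  'position': { 'left': 250, 'top': 10 },
#                  'input_connections': { 'input1': { 'id': 0, 'output_name': 'output' } } } } }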

def get_workflow_module_name( module, missing_tool_tups ):
    module_name = module.get_name()
    if module.type == 'tool' and module_name == 'unavailable':
        for missing_tool_tup in missing_tool_tups:
            missing_tool_id, missing_tool_name, missing_tool_version = missing_tool_tup
            if missing_tool_id == module.tool_id:
                module_name = missing_tool_name
                break
    return module_name

def import_workflow( trans, repository, workflow_name ):
    """Import a workflow contained in an installed tool shed repository into Galaxy (this method is called only from Galaxy)."""
    status = 'done'
    message = ''
    changeset_revision = repository.changeset_revision
    metadata = repository.metadata
    workflows = metadata.get( 'workflows', [] )
    tools_metadata = metadata.get( 'tools', [] )
    workflow_dict = None
    for workflow_data_tuple in workflows:
        # The value of workflow_data_tuple is ( relative_path_to_workflow_file, exported_workflow_dict ).
        relative_path_to_workflow_file, exported_workflow_dict = workflow_data_tuple
        if exported_workflow_dict[ 'name' ] == workflow_name:
            # If the exported workflow is available on disk, import it.
            if os.path.exists( relative_path_to_workflow_file ):
                workflow_file = open( relative_path_to_workflow_file, 'rb' )
                workflow_data = workflow_file.read()
                workflow_file.close()
                workflow_dict = json.loads( workflow_data )
            else:
                # Use the current exported_workflow_dict.
                workflow_dict = exported_workflow_dict
            break
    if workflow_dict:
        # Create the workflow if possible.
        workflow, missing_tool_tups = get_workflow_from_dict( trans=trans,
                                                              workflow_dict=workflow_dict,
                                                              tools_metadata=tools_metadata,
                                                              repository_id=repository.id,
                                                              changeset_revision=changeset_revision )
        # Save the workflow in the Galaxy database.  Pass workflow_dict along to create the annotation at this point.
        stored_workflow = save_workflow( trans, workflow, workflow_dict )
        # Use the latest version of the saved workflow.
        workflow = stored_workflow.latest_workflow
        if workflow_name:
            workflow.name = workflow_name
        # Provide user feedback and show the workflow list.
        if workflow.has_errors:
            message += "Imported, but some steps in this workflow have validation errors. "
            status = "error"
        if workflow.has_cycles:
            message += "Imported, but this workflow contains cycles. "
            status = "error"
        else:
            message += "Workflow <b>%s</b> imported successfully. " % workflow.name
        if missing_tool_tups:
            name_and_id_strs = []
            for missing_tool_tup in missing_tool_tups:
                tool_id, tool_name, tool_version = missing_tool_tup
                name_and_id_strs.append( 'name: %s, id: %s' % ( str( tool_name ), str( tool_id ) ) )
            message += "The following tools required by this workflow are missing from this Galaxy instance: %s. " % \
                '; '.join( name_and_id_strs )
    else:
        workflow = None
        message += 'The workflow named %s is not included in the metadata for revision %s of repository %s. ' % \
            ( str( workflow_name ), str( changeset_revision ), str( repository.name ) )
        status = 'error'
    return workflow, status, message
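# Illustrative note (not part of the original module): a hypothetical caller in
# Galaxy, where `repository` is an installed ToolShedRepository record.
#
#   workflow, status, message = import_workflow( trans, repository, workflow_name )
#   if status == 'error':
#       log.warning( message )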

def save_workflow( trans, workflow, workflow_dict=None ):
    """Use the received in-memory Workflow object for saving to the Galaxy database."""
    stored = trans.model.StoredWorkflow()
    stored.name = workflow.name
    workflow.stored_workflow = stored
    stored.latest_workflow = workflow
    stored.user = trans.user
    if workflow_dict and workflow_dict.get( 'annotation', '' ):
        annotation = sanitize_html( workflow_dict[ 'annotation' ], 'utf-8', 'text/html' )
        new_annotation = trans.model.StoredWorkflowAnnotationAssociation()
        new_annotation.annotation = annotation
        new_annotation.user = trans.user
        stored.annotations.append( new_annotation )
    trans.sa_session.add( stored )
    trans.sa_session.flush()
    # Add a new entry to the Workflows menu.
    if trans.user.stored_workflow_menu_entries is None:
        trans.user.stored_workflow_menu_entries = []
    menuEntry = trans.model.StoredWorkflowMenuEntry()
    menuEntry.stored_workflow = stored
    trans.user.stored_workflow_menu_entries.append( menuEntry )
    trans.sa_session.flush()
    return stored