"""
API operations for Workflows
"""
from __future__ import absolute_import
import uuid
import logging
import copy
import urllib
from sqlalchemy import desc, or_, and_
from galaxy import exceptions, util
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.managers import histories
from galaxy.managers import workflows
from galaxy.web import _future_expose_api as expose_api
from galaxy.web.base.controller import BaseAPIController, url_for, UsesStoredWorkflowMixin
from galaxy.web.base.controller import SharableMixin
from galaxy.workflow.extract import extract_workflow
from galaxy.workflow.run import invoke, queue_invoke
from galaxy.workflow.run_request import build_workflow_run_config
from galaxy.workflow.modules import module_factory
log = logging.getLogger(__name__)
[docs]class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesAnnotations, SharableMixin):
def __init__( self, app ):
super( WorkflowsAPIController, self ).__init__( app )
self.history_manager = histories.HistoryManager( app )
self.workflow_manager = workflows.WorkflowsManager( app )
self.workflow_contents_manager = workflows.WorkflowContentsManager()
@expose_api
[docs] def index(self, trans, **kwd):
"""
GET /api/workflows
Displays a collection of workflows.
:param show_published: if True, show also published workflows
:type show_published: boolean
"""
show_published = util.string_as_bool( kwd.get( 'show_published', 'False' ) )
rval = []
filter1 = ( trans.app.model.StoredWorkflow.user == trans.user )
if show_published:
filter1 = or_( filter1, ( trans.app.model.StoredWorkflow.published == True ) ) #noqa -- sqlalchemy comparison
for wf in trans.sa_session.query( trans.app.model.StoredWorkflow ).filter(
filter1, trans.app.model.StoredWorkflow.table.c.deleted == False ).order_by( #noqa -- sqlalchemy comparison
desc( trans.app.model.StoredWorkflow.table.c.update_time ) ).all():
item = wf.to_dict( value_mapper={ 'id': trans.security.encode_id } )
encoded_id = trans.security.encode_id(wf.id)
item['url'] = url_for('workflow', id=encoded_id)
item['owner'] = wf.user.username
rval.append(item)
for wf_sa in trans.sa_session.query( trans.app.model.StoredWorkflowUserShareAssociation ).filter_by(
user=trans.user ).join( 'stored_workflow' ).filter(
trans.app.model.StoredWorkflow.deleted == False ).order_by( #noqa -- sqlalchemy comparison
desc( trans.app.model.StoredWorkflow.update_time ) ).all():
item = wf_sa.stored_workflow.to_dict( value_mapper={ 'id': trans.security.encode_id } )
encoded_id = trans.security.encode_id(wf_sa.stored_workflow.id)
item['url'] = url_for( 'workflow', id=encoded_id )
item['owner'] = wf_sa.stored_workflow.user.username
rval.append(item)
return rval
@expose_api
[docs] def show(self, trans, id, **kwd):
"""
GET /api/workflows/{encoded_workflow_id}
Displays information needed to run a workflow from the command line.
"""
stored_workflow = self.__get_stored_workflow( trans, id )
if stored_workflow.importable is False and stored_workflow.user != trans.user and not trans.user_is_admin():
if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0:
message = "Workflow is neither importable, nor owned by or shared with current user"
raise exceptions.ItemAccessibilityException( message )
return self.workflow_contents_manager.workflow_to_dict( trans, stored_workflow, style="instance" )
@expose_api
[docs] def create(self, trans, payload, **kwd):
"""
POST /api/workflows
Run or create workflows from the api.
If installed_repository_file or from_history_id is specified a new
workflow will be created for this user. Otherwise, workflow_id must be
specified and this API method will cause a workflow to execute.
:param installed_repository_file The path of a workflow to import. Either workflow_id, installed_repository_file or from_history_id must be specified
:type installed_repository_file str
:param workflow_id: An existing workflow id. Either workflow_id, installed_repository_file or from_history_id must be specified
:type workflow_id: str
:param parameters: If workflow_id is set - see _update_step_parameters()
:type parameters: dict
:param ds_map: If workflow_id is set - a dictionary mapping each input step id to a dictionary with 2 keys: 'src' (which can be 'ldda', 'ld' or 'hda') and 'id' (which should be the id of a LibraryDatasetDatasetAssociation, LibraryDataset or HistoryDatasetAssociation respectively)
:type ds_map: dict
:param no_add_to_history: If workflow_id is set - if present in the payload with any value, the input datasets will not be added to the selected history
:type no_add_to_history: str
:param history: If workflow_id is set - optional history where to run the workflow, either the name of a new history or "hist_id=HIST_ID" where HIST_ID is the id of an existing history. If not specified, the workflow will be run a new unnamed history
:type history: str
:param replacement_params: If workflow_id is set - an optional dictionary used when renaming datasets
:type replacement_params: dict
:param from_history_id: Id of history to extract a workflow from. Either workflow_id, installed_repository_file or from_history_id must be specified
:type from_history_id: str
:param job_ids: If from_history_id is set - optional list of jobs to include when extracting a workflow from history
:type job_ids: str
:param dataset_ids: If from_history_id is set - optional list of HDA `hid`s corresponding to workflow inputs when extracting a workflow from history
:type dataset_ids: str
:param dataset_collection_ids: If from_history_id is set - optional list of HDCA `hid`s corresponding to workflow inputs when extracting a workflow from history
:type dataset_collection_ids: str
:param workflow_name: If from_history_id is set - name of the workflow to create when extracting a workflow from history
:type workflow_name: str
"""
ways_to_create = set( [
'workflow_id',
'installed_repository_file',
'from_history_id',
'shared_workflow_id',
'workflow',
] ).intersection( payload )
if len( ways_to_create ) == 0:
message = "One parameter among - %s - must be specified" % ", ".join( ways_to_create )
raise exceptions.RequestParameterMissingException( message )
if len( ways_to_create ) > 1:
message = "Only one parameter among - %s - must be specified" % ", ".join( ways_to_create )
raise exceptions.RequestParameterInvalidException( message )
if 'installed_repository_file' in payload:
workflow_controller = trans.webapp.controllers[ 'workflow' ]
result = workflow_controller.import_workflow( trans=trans,
cntrller='api',
**payload)
return result
if 'from_history_id' in payload:
from_history_id = payload.get( 'from_history_id' )
from_history_id = self.decode_id( from_history_id )
history = self.history_manager.get_accessible( from_history_id, trans.user, current_history=trans.history )
job_ids = map( self.decode_id, payload.get( 'job_ids', [] ) )
dataset_ids = payload.get( 'dataset_ids', [] )
dataset_collection_ids = payload.get( 'dataset_collection_ids', [] )
workflow_name = payload[ 'workflow_name' ]
stored_workflow = extract_workflow(
trans=trans,
user=trans.get_user(),
history=history,
job_ids=job_ids,
dataset_ids=dataset_ids,
dataset_collection_ids=dataset_collection_ids,
workflow_name=workflow_name,
)
item = stored_workflow.to_dict( value_mapper={ 'id': trans.security.encode_id } )
item[ 'url' ] = url_for( 'workflow', id=item[ 'id' ] )
return item
if 'shared_workflow_id' in payload:
workflow_id = payload[ 'shared_workflow_id' ]
return self.__api_import_shared_workflow( trans, workflow_id, payload )
if 'workflow' in payload:
return self.__api_import_new_workflow( trans, payload, **kwd )
workflow_id = payload.get( 'workflow_id', None )
if not workflow_id:
message = "Invalid workflow_id specified."
raise exceptions.RequestParameterInvalidException( message )
# Get workflow + accessibility check.
stored_workflow = self.__get_stored_accessible_workflow( trans, workflow_id )
workflow = stored_workflow.latest_workflow
run_config = build_workflow_run_config( trans, workflow, payload )
history = run_config.target_history
# invoke may throw MessageExceptions on tool erors, failure
# to match up inputs, etc...
outputs, invocation = invoke(
trans=trans,
workflow=workflow,
workflow_run_config=run_config,
populate_state=True,
)
trans.sa_session.flush()
# Build legacy output - should probably include more information from
# outputs.
rval = {}
rval['history'] = trans.security.encode_id( history.id )
rval['outputs'] = []
for step in workflow.steps:
if step.type == 'tool' or step.type is None:
for v in outputs[ step.id ].itervalues():
rval[ 'outputs' ].append( trans.security.encode_id( v.id ) )
# Newer version of this API just returns the invocation as a dict, to
# facilitate migration - produce the newer style response and blend in
# the older information.
invocation_response = self.__encode_invocation( trans, invocation )
invocation_response.update( rval )
return invocation_response
@expose_api
[docs] def workflow_dict( self, trans, workflow_id, **kwd ):
"""
GET /api/workflows/{encoded_workflow_id}/download
Returns a selected workflow as a json dictionary.
"""
stored_workflow = self.__get_stored_accessible_workflow( trans, workflow_id )
style = kwd.get("style", "export")
ret_dict = self.workflow_contents_manager.workflow_to_dict( trans, stored_workflow, style=style )
if not ret_dict:
# This workflow has a tool that's missing from the distribution
message = "Workflow cannot be exported due to missing tools."
raise exceptions.MessageException( message )
return ret_dict
@expose_api
[docs] def delete( self, trans, id, **kwd ):
"""
DELETE /api/workflows/{encoded_workflow_id}
Deletes a specified workflow
Author: rpark
copied from galaxy.web.controllers.workflows.py (delete)
"""
workflow_id = id
try:
stored_workflow = trans.sa_session.query(self.app.model.StoredWorkflow).get(self.decode_id(workflow_id))
except Exception, e:
trans.response.status = 400
return ("Workflow with ID='%s' can not be found\n Exception: %s") % (workflow_id, str( e ))
# check to see if user has permissions to selected workflow
if stored_workflow.user != trans.user and not trans.user_is_admin():
trans.response.status = 403
return("Workflow is not owned by current user")
# Mark a workflow as deleted
stored_workflow.deleted = True
trans.sa_session.flush()
# TODO: Unsure of response message to let api know that a workflow was successfully deleted
return ( "Workflow '%s' successfully deleted" % stored_workflow.name )
@expose_api
[docs] def import_new_workflow_deprecated(self, trans, payload, **kwd):
"""
POST /api/workflows/upload
Importing dynamic workflows from the api. Return newly generated workflow id.
Author: rpark
# currently assumes payload['workflow'] is a json representation of a workflow to be inserted into the database
Deprecated in favor to POST /api/workflows with encoded 'workflow' in
payload the same way.
"""
return self.__api_import_new_workflow( trans, payload, **kwd )
@expose_api
[docs] def update( self, trans, id, payload, **kwds ):
"""
* PUT /api/workflows/{id}
updates the workflow stored with ``id``
:type id: str
:param id: the encoded id of the workflow to update
:type payload: dict
:param payload: a dictionary containing any or all the
* workflow the json description of the workflow as would be
produced by GET workflows/<id>/download or
given to `POST workflows`
The workflow contents will be updated to target
this.
:rtype: dict
:returns: serialized version of the workflow
"""
stored_workflow = self.__get_stored_workflow( trans, id )
if 'workflow' in payload:
workflow_contents_manager = workflows.WorkflowContentsManager()
workflow, errors = workflow_contents_manager.update_workflow_from_dict(
trans,
stored_workflow,
payload['workflow'],
)
else:
message = "Updating workflow requires dictionary containing 'workflow' attribute with new JSON description."
raise exceptions.RequestParameterInvalidException( message )
return self.workflow_contents_manager.workflow_to_dict( trans, stored_workflow, style="instance" )
@expose_api
[docs] def build_module( self, trans, payload={}):
"""
POST /api/workflows/build_module
Builds module details including a tool model for the workflow editor.
"""
tool_id = payload.get( 'tool_id', None )
tool_version = payload.get( 'tool_version', None )
tool_inputs = payload.get( 'inputs', None )
annotation = payload.get( 'annotation', '' )
# load tool
tool = self._get_tool( tool_id, tool_version=tool_version, user=trans.user )
# initialize module
trans.workflow_building_mode = True
module = module_factory.from_dict( trans, {
'type' : 'tool',
'tool_id' : tool.id,
'tool_state' : None
} )
# create tool model and default tool state (if missing)
tool_model = module.tool.to_json(trans, tool_inputs, is_workflow=True)
module.state.inputs = copy.deepcopy(tool_model['state_inputs'])
return {
'tool_model' : tool_model,
'tool_state' : module.get_state(),
'data_inputs' : module.get_data_inputs(),
'data_outputs' : module.get_data_outputs(),
'tool_errors' : module.get_errors(),
'form_html' : module.get_config_form(),
'annotation' : annotation,
'post_job_actions' : module.get_post_job_actions(tool_inputs)
}
#
# -- Helper methods --
#
def _get_tool( self, id, tool_version=None, user=None ):
id = urllib.unquote_plus( id )
tool = self.app.toolbox.get_tool( id, tool_version )
if not tool or not tool.allow_user_access( user ):
raise exceptions.ObjectNotFound("Could not find tool with id '%s'" % id)
return tool
def __api_import_new_workflow( self, trans, payload, **kwd ):
data = payload['workflow']
publish = util.string_as_bool( payload.get( "publish", False ) )
# If 'publish' set, default to importable.
importable = util.string_as_bool( payload.get( "importable", publish ) )
if publish and not importable:
raise exceptions.RequestParameterInvalidException( "Published workflow must be importable." )
from_dict_kwds = dict(
source="API",
publish=publish,
)
workflow, missing_tool_tups = self._workflow_from_dict( trans, data, **from_dict_kwds )
if importable:
self._make_item_accessible( trans.sa_session, workflow )
trans.sa_session.flush()
# galaxy workflow newly created id
workflow_id = workflow.id
# api encoded, id
encoded_id = trans.security.encode_id(workflow_id)
# return list
rval = []
item = workflow.to_dict(value_mapper={'id': trans.security.encode_id})
item['url'] = url_for('workflow', id=encoded_id)
rval.append(item)
return item
@expose_api
[docs] def import_shared_workflow_deprecated(self, trans, payload, **kwd):
"""
POST /api/workflows/import
Import a workflow shared by other users.
:param workflow_id: the workflow id (required)
:type workflow_id: str
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
# Pull parameters out of payload.
workflow_id = payload.get('workflow_id', None)
if workflow_id is None:
raise exceptions.ObjectAttributeMissingException( "Missing required parameter 'workflow_id'." )
self.__api_import_shared_workflow( trans, workflow_id, payload )
def __api_import_shared_workflow( self, trans, workflow_id, payload, **kwd ):
try:
stored_workflow = self.get_stored_workflow( trans, workflow_id, check_ownership=False )
except:
raise exceptions.ObjectNotFound( "Malformed workflow id ( %s ) specified." % workflow_id )
if stored_workflow.importable is False:
raise exceptions.ItemAccessibilityException( 'The owner of this workflow has disabled imports via this link.' )
elif stored_workflow.deleted:
raise exceptions.ItemDeletionException( "You can't import this workflow because it has been deleted." )
imported_workflow = self._import_shared_workflow( trans, stored_workflow )
item = imported_workflow.to_dict( value_mapper={ 'id': trans.security.encode_id } )
encoded_id = trans.security.encode_id(imported_workflow.id)
item['url'] = url_for('workflow', id=encoded_id)
return item
@expose_api
[docs] def invoke( self, trans, workflow_id, payload, **kwd ):
"""
POST /api/workflows/{encoded_workflow_id}/invocations
Schedule the workflow specified by `workflow_id` to run.
"""
# /usage is awkward in this context but is consistent with the rest of
# this module. Would prefer to redo it all to use /invocation(s).
# Get workflow + accessibility check.
stored_workflow = self.__get_stored_accessible_workflow( trans, workflow_id )
workflow = stored_workflow.latest_workflow
run_config = build_workflow_run_config( trans, workflow, payload )
workflow_scheduler_id = payload.get( "scheduler", None )
# TODO: workflow scheduler hints
work_request_params = dict( scheduler=workflow_scheduler_id )
workflow_invocation = queue_invoke(
trans=trans,
workflow=workflow,
workflow_run_config=run_config,
request_params=work_request_params
)
return self.encode_all_ids( trans, workflow_invocation.to_dict(), recursive=True )
@expose_api
[docs] def index_invocations(self, trans, workflow_id, **kwd):
"""
GET /api/workflows/{workflow_id}/invocations
Get the list of the workflow invocations
:param workflow_id: the workflow id (required)
:type workflow_id: str
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
stored_workflow = self.__get_stored_workflow(trans, workflow_id)
results = self.workflow_manager.build_invocations_query( trans, stored_workflow.id )
out = []
for r in results:
out.append( self.__encode_invocation( trans, r ) )
return out
@expose_api
[docs] def show_invocation(self, trans, workflow_id, invocation_id, **kwd):
"""
GET /api/workflows/{workflow_id}/invocation/{invocation_id}
Get detailed description of workflow invocation
:param workflow_id: the workflow id (required)
:type workflow_id: str
:param invocation_id: the invocation id (required)
:type invocation_id: str
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
decoded_workflow_invocation_id = self.decode_id( invocation_id )
workflow_invocation = self.workflow_manager.get_invocation( trans, decoded_workflow_invocation_id )
if workflow_invocation:
return self.__encode_invocation( trans, workflow_invocation )
return None
@expose_api
[docs] def cancel_invocation(self, trans, workflow_id, invocation_id, **kwd):
"""
DELETE /api/workflows/{workflow_id}/invocation/{invocation_id}
Cancel the specified workflow invocation.
:param workflow_id: the workflow id (required)
:type workflow_id: str
:param invocation_id: the usage id (required)
:type invocation_id: str
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
decoded_workflow_invocation_id = self.decode_id( invocation_id )
workflow_invocation = self.workflow_manager.cancel_invocation( trans, decoded_workflow_invocation_id )
return self.__encode_invocation( trans, workflow_invocation )
@expose_api
[docs] def invocation_step(self, trans, workflow_id, invocation_id, step_id, **kwd):
"""
GET /api/workflows/{workflow_id}/invocation/{invocation_id}/steps/{step_id}
:param workflow_id: the workflow id (required)
:type workflow_id: str
:param invocation_id: the invocation id (required)
:type invocation_id: str
:param step_id: encoded id of the WorkflowInvocationStep (required)
:type step_id: str
:param payload: payload containing update action information
for running workflow.
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
decoded_invocation_step_id = self.decode_id( step_id )
invocation_step = self.workflow_manager.get_invocation_step(
trans,
decoded_invocation_step_id
)
return self.__encode_invocation_step( trans, invocation_step )
@expose_api
[docs] def update_invocation_step(self, trans, workflow_id, invocation_id, step_id, payload, **kwd):
"""
PUT /api/workflows/{workflow_id}/invocation/{invocation_id}/steps/{step_id}
Update state of running workflow step invocation - still very nebulous
but this would be for stuff like confirming paused steps can proceed
etc....
:param workflow_id: the workflow id (required)
:type workflow_id: str
:param invocation_id: the usage id (required)
:type invocation_id: str
:param step_id: encoded id of the WorkflowInvocationStep (required)
:type step_id: str
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
decoded_invocation_step_id = self.decode_id( step_id )
action = payload.get( "action", None )
invocation_step = self.workflow_manager.update_invocation_step(
trans,
decoded_invocation_step_id,
action=action,
)
return self.__encode_invocation_step( trans, invocation_step )
def __encode_invocation_step( self, trans, invocation_step ):
return self.encode_all_ids(
trans,
invocation_step.to_dict( 'element' ),
True
)
def __get_stored_accessible_workflow( self, trans, workflow_id ):
stored_workflow = self.__get_stored_workflow( trans, workflow_id )
# check to see if user has permissions to selected workflow
if stored_workflow.user != trans.user and not trans.user_is_admin():
if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0:
message = "Workflow is not owned by or shared with current user"
raise exceptions.ItemAccessibilityException( message )
return stored_workflow
def __get_stored_workflow( self, trans, workflow_id ):
if util.is_uuid(workflow_id):
# see if they have passed in the UUID for a workflow that is attached to a stored workflow
workflow_uuid = uuid.UUID(workflow_id)
stored_workflow = trans.sa_session.query(trans.app.model.StoredWorkflow).filter( and_(
trans.app.model.StoredWorkflow.latest_workflow_id == trans.app.model.Workflow.id,
trans.app.model.Workflow.uuid == workflow_uuid
)).first()
if stored_workflow is None:
raise exceptions.ObjectNotFound( "Workflow not found: %s" % workflow_id )
else:
workflow_id = self.decode_id( workflow_id )
query = trans.sa_session.query( trans.app.model.StoredWorkflow )
stored_workflow = query.get( workflow_id )
if stored_workflow is None:
raise exceptions.ObjectNotFound( "No such workflow found." )
return stored_workflow
def __encode_invocation( self, trans, invocation, view="element" ):
return self.encode_all_ids(
trans,
invocation.to_dict( view ),
True
)