Source code for galaxy.webapps.galaxy.api.history_contents

"""
API operations on the contents of a history.
"""

from galaxy import exceptions
from galaxy import util

from galaxy.web import _future_expose_api as expose_api
from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous
from galaxy.web import url_for

from galaxy.web.base.controller import BaseAPIController
from galaxy.web.base.controller import UsesLibraryMixin
from galaxy.web.base.controller import UsesLibraryMixinItems
from galaxy.web.base.controller import UsesTagsMixin

from galaxy.managers import histories
from galaxy.managers import hdas
from galaxy.managers.collections_util import api_payload_to_create_params
from galaxy.managers.collections_util import dictify_dataset_collection_instance
from galaxy.util import validation


import logging
log = logging.getLogger( __name__ )


class HistoryContentsController( BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems, UsesTagsMixin ):

    def __init__( self, app ):
        super( HistoryContentsController, self ).__init__( app )
        self.hda_manager = hdas.HDAManager( app )
        self.history_manager = histories.HistoryManager( app )
        self.hda_serializer = hdas.HDASerializer( app )
        self.hda_deserializer = hdas.HDADeserializer( app )

    def _parse_serialization_params( self, kwd, default_view ):
        view = kwd.get( 'view', None )
        keys = kwd.get( 'keys' )
        if isinstance( keys, basestring ):
            keys = keys.split( ',' )
        return dict( view=view, keys=keys, default_view=default_view )

    @expose_api_anonymous
    def index( self, trans, history_id, ids=None, **kwd ):
        """
        index( self, trans, history_id, ids=None, **kwd )
        * GET /api/histories/{history_id}/contents
            return a list of HDA data for the history with the given ``id``

        .. note:: Anonymous users are allowed to get their current history contents

        If ``ids`` is not given, index returns a list of *summary* objects for
        every HDA associated with the given `history_id`.

        If ``ids`` is given, index returns a *more complete* json object for each
        HDA in the ids list.

        :type   history_id: str
        :param  history_id: encoded id string of the HDA's History
        :type   ids:        str
        :param  ids:        (optional) a comma separated list of encoded `HDA` ids
        :type   types:      str
        :param  types:      (optional) kinds of contents to index (currently just
                            dataset, but dataset_collection will be added shortly)

        :rtype:     list
        :returns:   dictionaries containing summary or detailed HDA information
        """
        rval = []

        history = self.history_manager.get_accessible( self.decode_id( history_id ), trans.user,
            current_history=trans.history )

        # Allow passing in type or types - for continuity rest of methods
        # take in type - but this one can be passed multiple types and
        # type=dataset,dataset_collection is a bit silly.
        types = kwd.get( 'type', kwd.get( 'types', None ) ) or []
        if types:
            types = util.listify( types )
        else:
            types = [ 'dataset', 'dataset_collection' ]

        contents_kwds = { 'types': types }
        if ids:
            ids = map( lambda id: self.decode_id( id ), ids.split( ',' ) )
            contents_kwds[ 'ids' ] = ids
            # If explicit ids given, always use detailed result.
            details = 'all'
        else:
            contents_kwds[ 'deleted' ] = kwd.get( 'deleted', None )
            contents_kwds[ 'visible' ] = kwd.get( 'visible', None )
            # details param allows a mixed set of summary and detailed hdas.
            # Ever more convoluted due to backwards compat..., details
            # should be considered deprecated in favor of more specific
            # dataset_details (and to be implemented dataset_collection_details).
            details = kwd.get( 'details', None ) or kwd.get( 'dataset_details', None ) or []
            if details and details != 'all':
                details = util.listify( details )

        for content in history.contents_iter( **contents_kwds ):
            encoded_content_id = trans.security.encode_id( content.id )
            detailed = details == 'all' or ( encoded_content_id in details )

            if isinstance( content, trans.app.model.HistoryDatasetAssociation ):
                view = 'detailed' if detailed else 'summary'
                hda_dict = self.hda_serializer.serialize_to_view( content, view=view, user=trans.user, trans=trans )
                rval.append( hda_dict )

            elif isinstance( content, trans.app.model.HistoryDatasetCollectionAssociation ):
                view = 'element' if detailed else 'collection'
                collection_dict = self.__collection_dict( trans, content, view=view )
                rval.append( collection_dict )

        return rval
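    # Illustrative client-side sketch (not part of this controller) of calling the
    # index endpoint with the `requests` library; the base URL, API key, and encoded
    # history id below are placeholders.
    #
    #     import requests
    #     base_url = 'https://galaxy.example.org/api'
    #     params = { 'key': 'YOUR_API_KEY', 'types': 'dataset,dataset_collection' }
    #     contents = requests.get( '%s/histories/%s/contents' % ( base_url, 'ENCODED_HISTORY_ID' ),
    #                              params=params ).json()
    #     # passing ids=<comma separated encoded ids> instead returns detailed objects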
    def __collection_dict( self, trans, dataset_collection_instance, view="collection" ):
        return dictify_dataset_collection_instance( dataset_collection_instance,
            security=trans.security, parent=dataset_collection_instance.history, view=view )

    @expose_api_anonymous
    def show( self, trans, id, history_id, **kwd ):
        """
        show( self, trans, id, history_id, **kwd )
        * GET /api/histories/{history_id}/contents/{id}
            return detailed information about an HDA within a history

        .. note:: Anonymous users are allowed to get their current history contents

        :type   id:         str
        :param  id:         the encoded id of the HDA to return
        :type   history_id: str
        :param  history_id: encoded id string of the HDA's History

        :rtype:     dict
        :returns:   dictionary containing detailed HDA information
        """
        history = self.history_manager.get_accessible( self.decode_id( history_id ), trans.user,
            current_history=trans.history )
        contents_type = kwd.get( 'type', 'dataset' )
        if contents_type == 'dataset':
            return self.__show_dataset( trans, id, **kwd )
        elif contents_type == 'dataset_collection':
            return self.__show_dataset_collection( trans, id, history_id, **kwd )
        else:
            return self.__handle_unknown_contents_type( trans, contents_type )
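    # Illustrative sketch (assumed client code; placeholder ids, key, and field names):
    # fetching a single dataset, with 'keys' handled by _parse_serialization_params above.
    #
    #     url = '%s/histories/%s/contents/%s' % ( base_url, 'ENCODED_HISTORY_ID', 'ENCODED_HDA_ID' )
    #     detailed = requests.get( url, params={ 'key': 'YOUR_API_KEY' } ).json()
    #     trimmed = requests.get( url, params={ 'key': 'YOUR_API_KEY', 'keys': 'name,state,hid' } ).json()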
    def __show_dataset( self, trans, id, **kwd ):
        hda = self.hda_manager.get_accessible( self.decode_id( id ), trans.user )
        return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans,
            **self._parse_serialization_params( kwd, 'detailed' ) )

    def __show_dataset_collection( self, trans, id, history_id, **kwd ):
        try:
            service = trans.app.dataset_collections_service
            dataset_collection_instance = service.get_dataset_collection_instance(
                trans=trans,
                instance_type='history',
                id=id,
            )
            return self.__collection_dict( trans, dataset_collection_instance, view="element" )
        except Exception, e:
            log.exception( "Error in history API at listing dataset collection: %s", e )
            trans.response.status = 500
            return { 'error': str( e ) }

    @expose_api_anonymous
    def create( self, trans, history_id, payload, **kwd ):
        """
        create( self, trans, history_id, payload, **kwd )
        * POST /api/histories/{history_id}/contents/{type}
            create a new HDA by copying an accessible LibraryDataset

        :type   history_id: str
        :param  history_id: encoded id string of the new HDA's History
        :type   type: str
        :param  type: Type of history content - 'dataset' (default) or
                      'dataset_collection'.
        :type   payload:    dict
        :param  payload:    dictionary structure containing::

            copy from library (for type 'dataset'):
            'source'    = 'library'
            'content'   = [the encoded id from the library dataset]

            copy from history dataset (for type 'dataset'):
            'source'    = 'hda'
            'content'   = [the encoded id from the HDA]

            copy from history dataset collection (for type 'dataset_collection'):
            'source'    = 'hdca'
            'content'   = [the encoded id from the HDCA]

            create new history dataset collection (for type 'dataset_collection'):
            'source'              = 'new_collection' (default 'source' if type is
                                    'dataset_collection' - no need to specify this)
            'collection_type'     = For example, "list", "paired", "list:paired".
            'name'                = Name of new dataset collection.
            'element_identifiers' = Recursive list structure defining collection.
                                    Each element must have 'src' which can be
                                    'hda', 'ldda', 'hdca', or 'new_collection',
                                    as well as a 'name' which is the name of the
                                    element (e.g. "forward" or "reverse" for
                                    paired datasets, or arbitrary sample names
                                    for lists). For all src's except
                                    'new_collection', an encoded 'id' attribute
                                    must be included with the element as well.
                                    'new_collection' sources must define a
                                    'collection_type' and their own list of
                                    (potentially) nested 'element_identifiers'.

        ..note:
            Currently, a user can only copy an HDA from a history that the user owns.

        :rtype:     dict
        :returns:   dictionary containing detailed information for the new HDA
        """
        # TODO: Flesh out create new collection documentation above, need some
        # examples. See also bioblend and API tests for specific examples.
        history = self.history_manager.get_owned( self.decode_id( history_id ), trans.user,
            current_history=trans.history )

        type = payload.get( 'type', 'dataset' )
        if type == 'dataset':
            return self.__create_dataset( trans, history, payload, **kwd )
        elif type == 'dataset_collection':
            return self.__create_dataset_collection( trans, history, payload, **kwd )
        else:
            return self.__handle_unknown_contents_type( trans, type )
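    # Illustrative sketch (assumed client code, placeholder ids): two payload shapes
    # accepted by create - copying an existing HDA and building a new paired collection.
    #
    #     copy_payload = { 'type': 'dataset', 'source': 'hda', 'content': 'ENCODED_HDA_ID' }
    #     collection_payload = {
    #         'type': 'dataset_collection',
    #         'collection_type': 'paired',
    #         'name': 'my paired reads',
    #         'element_identifiers': [
    #             { 'src': 'hda', 'id': 'ENCODED_FORWARD_ID', 'name': 'forward' },
    #             { 'src': 'hda', 'id': 'ENCODED_REVERSE_ID', 'name': 'reverse' },
    #         ],
    #     }
    #     requests.post( '%s/histories/%s/contents' % ( base_url, 'ENCODED_HISTORY_ID' ),
    #                    params={ 'key': 'YOUR_API_KEY' }, json=collection_payload )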
    def __create_dataset( self, trans, history, payload, **kwd ):
        source = payload.get( 'source', None )
        if source not in ( 'library', 'hda' ):
            raise exceptions.RequestParameterInvalidException(
                "'source' must be either 'library' or 'hda': %s" % ( source ) )
        content = payload.get( 'content', None )
        if content is None:
            raise exceptions.RequestParameterMissingException( "'content' id of lda or hda is missing" )

        # copy from library dataset
        hda = None
        if source == 'library':
            ld = self.get_library_dataset( trans, content, check_ownership=False, check_accessible=False )
            #TODO: why would get_library_dataset NOT return a library dataset?
            if type( ld ) is not trans.app.model.LibraryDataset:
                raise exceptions.RequestParameterInvalidException(
                    "Library content id ( %s ) is not a dataset" % content )
            # insert into history
            hda = ld.library_dataset_dataset_association.to_history_dataset_association( history, add_to_history=True )

        # copy an existing, accessible hda
        elif source == 'hda':
            unencoded_hda_id = self.decode_id( content )
            original = self.hda_manager.get_accessible( unencoded_hda_id, trans.user )
            # check for access on history that contains the original hda as well
            self.history_manager.error_unless_accessible( original.history, trans.user, current_history=trans.history )
            data_copy = original.copy( copy_children=True )
            hda = history.add_dataset( data_copy )

        trans.sa_session.flush()
        if not hda:
            return None
        return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans,
            **self._parse_serialization_params( kwd, 'detailed' ) )

    def __create_dataset_collection( self, trans, history, payload, **kwd ):
        source = kwd.get( "source", "new_collection" )
        service = trans.app.dataset_collections_service
        if source == "new_collection":
            create_params = api_payload_to_create_params( payload )
            dataset_collection_instance = service.create(
                trans,
                parent=history,
                **create_params
            )
        elif source == "hdca":
            content = payload.get( 'content', None )
            if content is None:
                raise exceptions.RequestParameterMissingException( "'content' id of target to copy is missing" )
            dataset_collection_instance = service.copy(
                trans=trans,
                parent=history,
                source="hdca",
                encoded_source_id=content,
            )
        else:
            message = "Invalid 'source' parameter in request %s" % source
            raise exceptions.RequestParameterInvalidException( message )

        return self.__collection_dict( trans, dataset_collection_instance, view="element" )

    @expose_api_anonymous
    def update( self, trans, history_id, id, payload, **kwd ):
        """
        update( self, trans, history_id, id, payload, **kwd )
        * PUT /api/histories/{history_id}/contents/{id}
            updates the values for the HDA with the given ``id``

        :type   history_id: str
        :param  history_id: encoded id string of the HDA's History
        :type   id:         str
        :param  id:         the encoded id of the HDA to update
        :type   payload:    dict
        :param  payload:    a dictionary containing any or all the
            fields in :func:`galaxy.model.HistoryDatasetAssociation.to_dict`
            and/or the following:

            * annotation: an annotation for the HDA

        :rtype:     dict
        :returns:   an error object if an error occurred or a dictionary containing
            any values that were different from the original and, therefore, updated
        """
        #TODO: PUT /api/histories/{encoded_history_id} payload = { rating: rating } (w/ no security checks)
        contents_type = kwd.get( 'type', 'dataset' )
        if contents_type == "dataset":
            return self.__update_dataset( trans, history_id, id, payload, **kwd )
        elif contents_type == "dataset_collection":
            return self.__update_dataset_collection( trans, history_id, id, payload, **kwd )
        else:
            return self.__handle_unknown_contents_type( trans, contents_type )
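    # Illustrative sketch (assumed client code, placeholder ids): updating HDA fields
    # via PUT; anonymous users may only change 'deleted' and 'visible'
    # (see __update_dataset below).
    #
    #     update_payload = { 'name': 'renamed dataset', 'annotation': 'trimmed reads', 'visible': False }
    #     requests.put( '%s/histories/%s/contents/%s' % ( base_url, 'ENCODED_HISTORY_ID', 'ENCODED_HDA_ID' ),
    #                   params={ 'key': 'YOUR_API_KEY' }, json=update_payload )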
    def __update_dataset( self, trans, history_id, id, payload, **kwd ):
        # anon user: ensure that history ids match up and the history is the current,
        #   check for uploading, and use only the subset of attribute keys manipulatable by anon users
        if trans.user == None:
            hda = self.hda_manager.by_id( self.decode_id( id ) )
            if hda.history != trans.history:
                raise exceptions.AuthenticationRequired( 'API authentication required for this request' )
            hda = self.hda_manager.error_if_uploading( hda )

            anon_allowed_payload = {}
            if 'deleted' in payload:
                anon_allowed_payload[ 'deleted' ] = payload[ 'deleted' ]
            if 'visible' in payload:
                anon_allowed_payload[ 'visible' ] = payload[ 'visible' ]
            payload = anon_allowed_payload

        # logged in user: use full payload, check state if deleting, and make sure the history is theirs
        else:
            hda = self.hda_manager.get_owned( self.decode_id( id ), trans.user, current_history=trans.history )

            # only check_state if not deleting, otherwise cannot delete uploading files
            check_state = not payload.get( 'deleted', False )
            if check_state:
                hda = self.hda_manager.error_if_uploading( hda )

        # make the actual changes
        #TODO: is this if still needed?
        if hda and isinstance( hda, trans.model.HistoryDatasetAssociation ):
            self.hda_deserializer.deserialize( hda, payload, user=trans.user, trans=trans )
            #TODO: this should be an effect of deleting the hda
            if payload.get( 'deleted', False ):
                self.hda_manager.stop_creating_job( hda )
            return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans,
                **self._parse_serialization_params( kwd, 'detailed' ) )

        return {}

    def __update_dataset_collection( self, trans, history_id, id, payload, **kwd ):
        return trans.app.dataset_collections_service.update( trans, "history", id, payload )

    #TODO: allow anonymous del/purge and test security on this
    @expose_api
    def delete( self, trans, history_id, id, purge=False, **kwd ):
        """
        delete( self, trans, history_id, id, **kwd )
        * DELETE /api/histories/{history_id}/contents/{id}
            delete the HDA with the given ``id``

        .. note:: Currently does not stop any active jobs for which this dataset is an output.

        :type   id:     str
        :param  id:     the encoded id of the HDA to delete
        :type   purge:  bool
        :param  purge:  if True, purge the HDA
        :type   kwd:    dict
        :param  kwd:    (optional) dictionary structure containing:

            * payload:     a dictionary itself containing:
                * purge:   if True, purge the HDA

        .. note:: the payload optionally can be placed in the query string of the request.
            This allows clients that strip the request body to still purge the dataset.

        :rtype:     dict
        :returns:   an error object if an error occurred or a dictionary containing:
            * id:         the encoded id of the HDA,
            * deleted:    if the HDA was marked as deleted,
            * purged:     if the HDA was purged
        """
        contents_type = kwd.get( 'type', 'dataset' )
        if contents_type == "dataset":
            return self.__delete_dataset( trans, history_id, id, purge=purge, **kwd )
        elif contents_type == "dataset_collection":
            trans.app.dataset_collections_service.delete( trans, "history", id )
            return { 'id': id, "deleted": True }
        else:
            return self.__handle_unknown_contents_type( trans, contents_type )
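    # Illustrative sketch (assumed client code, placeholder ids): deleting and purging
    # a dataset; as noted in the docstring, 'purge' can travel in the query string for
    # clients that strip request bodies.
    #
    #     url = '%s/histories/%s/contents/%s' % ( base_url, 'ENCODED_HISTORY_ID', 'ENCODED_HDA_ID' )
    #     requests.delete( url, params={ 'key': 'YOUR_API_KEY', 'purge': True } )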
    def __delete_dataset( self, trans, history_id, id, purge, **kwd ):
        # get purge from the query or from the request body payload (a request body is optional here)
        purge = util.string_as_bool( purge )
        if kwd.get( 'payload', None ):
            # payload takes priority
            purge = util.string_as_bool( kwd[ 'payload' ].get( 'purge', purge ) )

        hda = self.hda_manager.get_owned( self.decode_id( id ), trans.user, current_history=trans.history )
        self.hda_manager.error_if_uploading( hda )

        if purge:
            self.hda_manager.purge( hda )
        else:
            self.hda_manager.delete( hda )
        return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans,
            **self._parse_serialization_params( kwd, 'detailed' ) )

    def __handle_unknown_contents_type( self, trans, contents_type ):
        raise exceptions.UnknownContentsType( 'Unknown contents type: %s' % contents_type )
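# Illustrative note (assumed client usage, not part of this module): the 'view' and
# 'keys' query parameters parsed by _parse_serialization_params shape the responses
# of show/create/update/delete, e.g.
#
#     params = { 'key': 'YOUR_API_KEY', 'view': 'summary' }         # named serializer view
#     params = { 'key': 'YOUR_API_KEY', 'keys': 'name,state,hid' }  # explicit attribute list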