# Source code for galaxy.managers.collections

from galaxy import model
from galaxy.dataset_collections import builder
from galaxy.dataset_collections.matching import MatchingCollections
from galaxy.dataset_collections.registry import DatasetCollectionTypesRegistry
from galaxy.dataset_collections.type_description import CollectionTypeDescriptionFactory
from galaxy.exceptions import ItemAccessibilityException
from galaxy.exceptions import MessageException
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.managers import hdas  # TODO: Refactor all mixin use into managers.
from galaxy.managers import histories
from galaxy.managers import lddas
from galaxy.managers import tags
from galaxy.managers.collections_util import validate_input_element_identifiers
from galaxy.util import odict
from galaxy.util import validation
import logging
# Module-level logger, named after this module.
log = logging.getLogger( __name__ )

# Messages carried by the RequestParameterInvalidException raised when a
# collection creation request lacks its elements or its collection type.
ERROR_INVALID_ELEMENTS_SPECIFICATION = "Create called with invalid parameters, must specify element identifiers."
ERROR_NO_COLLECTION_TYPE = "Create called without specifying a collection type."

class DatasetCollectionManager( object ):
    """
    Abstraction for interfacing with dataset collection instances - ideally
    abstracts out model and plugin details.
    """
    # Sentinel accepted as ``elements``: build an unpopulated collection whose
    # elements are attached later through ``set_collection_elements``.
    ELEMENTS_UNINITIALIZED = object()

    def __init__( self, app ):
        """
        Set up the collection type registry and the helper managers.

        :param app: application object; ``app.model`` (and ``app.security``)
            are read here and ``app`` is passed to every helper constructor.
        """
        self.type_registry = DatasetCollectionTypesRegistry( app )
        self.collection_type_descriptions = CollectionTypeDescriptionFactory( self.type_registry )
        # The listing this was recovered from read
        # ``self.model = app.model = self.hda_manager = ...`` - a chained
        # assignment that would overwrite ``app.model`` with the HDA manager.
        self.model = app.model
        # NOTE(review): the dangling ``=`` in the listing suggests a stripped
        # ``self.security = app.security`` - confirm against upstream.
        self.security = app.security

        self.hda_manager = hdas.HDAManager( app )
        self.history_manager = histories.HistoryManager( app )
        self.tag_manager = tags.TagManager( app )
        self.ldda_manager = lddas.LDDAManager( app )

    def create(
        self,
        trans,
        parent,
        # PRECONDITION: security checks on ability to add to parent
        # occurred during load.
        name,
        collection_type,
        element_identifiers=None,
        elements=None,
        implicit_collection_info=None,
    ):
        """
        Build a dataset collection, wrap it in an association owned by
        ``parent`` and persist that association.

        :param trans: transaction providing ``sa_session`` (and the user used
            when elements are loaded from identifiers).
        :param parent: a ``model.History`` or ``model.LibraryFolder``.
        :param name: name given to the new association.
        :param collection_type: collection type string; must be non-empty.
        :param element_identifiers: element descriptions to load; validated
            unless ``implicit_collection_info`` is supplied.
        :param elements: already loaded elements (or
            ``ELEMENTS_UNINITIALIZED``); when given, nothing is loaded from
            ``element_identifiers``.
        :param implicit_collection_info: optional dict read for the keys
            ``implicit_inputs``, ``outputs`` and ``implicit_output_name``.
        :returns: the persisted association instance.
        :raises MessageException: if ``parent`` is neither a history nor a
            library folder.
        """
        # Trust embedded, newly created objects created by tool subsystem.
        trusted_identifiers = implicit_collection_info is not None

        if element_identifiers and not trusted_identifiers:
            validate_input_element_identifiers( element_identifiers )

        dataset_collection = self.__create_dataset_collection(
            trans=trans,
            collection_type=collection_type,
            element_identifiers=element_identifiers,
            elements=elements,
        )

        if isinstance( parent, model.History ):
            dataset_collection_instance = self.model.HistoryDatasetCollectionAssociation(
                collection=dataset_collection,
                name=name,
            )
            if implicit_collection_info:
                for input_name, input_collection in implicit_collection_info[ "implicit_inputs" ]:
                    dataset_collection_instance.add_implicit_input_collection( input_name, input_collection )
                for output_dataset in implicit_collection_info.get( "outputs" ):
                    if isinstance( output_dataset, model.HistoryDatasetCollectionAssociation ):
                        # NOTE(review): input_name / input_collection are left
                        # over from the loop above (last pair seen) - kept
                        # as found, confirm this is intended.
                        dataset_collection_instance.add_implicit_input_collection( input_name, input_collection )
                    else:
                        # dataset collection, don't need to do anything...
                        pass
                    trans.sa_session.add( output_dataset )

                dataset_collection_instance.implicit_output_name = implicit_collection_info[ "implicit_output_name" ]

            log.debug( "Created collection with %d elements" % ( len( dataset_collection_instance.collection.elements ) ) )
            # Handle setting hid
            parent.add_dataset_collection( dataset_collection_instance )

        elif isinstance( parent, model.LibraryFolder ):
            dataset_collection_instance = self.model.LibraryDatasetCollectionAssociation(
                collection=dataset_collection,
                folder=parent,
                name=name,
            )

        else:
            message = "Internal logic error - create called with unknown parent type %s" % type( parent )
            # Not inside an ``except`` block, so there is no traceback for
            # ``log.exception`` to attach - log a plain error instead.
            log.error( message )
            raise MessageException( message )

        return self.__persist( dataset_collection_instance )

    def create_dataset_collection(
        self,
        trans,
        collection_type,
        elements=None,
    ):
        """
        Build (without persisting) a dataset collection from already loaded
        ``elements``.

        :raises RequestParameterInvalidException: if ``elements`` is None or
            ``collection_type`` is empty.
        """
        return self.__create_dataset_collection(
            trans=trans,
            collection_type=collection_type,
            elements=elements,
        )

    def __create_dataset_collection(
        self,
        trans,
        collection_type,
        element_identifiers=None,
        elements=None,
    ):
        """
        Build a dataset collection from ``elements`` or, when those are not
        given, from objects loaded via ``element_identifiers``.

        :raises RequestParameterInvalidException: if neither source of
            elements is given, or ``collection_type`` is empty.
        """
        if element_identifiers is None and elements is None:
            raise RequestParameterInvalidException( ERROR_INVALID_ELEMENTS_SPECIFICATION )
        if not collection_type:
            raise RequestParameterInvalidException( ERROR_NO_COLLECTION_TYPE )
        collection_type_description = self.collection_type_descriptions.for_collection_type( collection_type )

        # If we have elements, this is an internal request, don't need to load
        # objects from identifiers.
        if elements is None:
            if collection_type_description.has_subcollections( ):
                # Nested collection - recursively create collections and update identifiers.
                self.__recursively_create_collections( trans, element_identifiers )
            elements = self.__load_elements( trans, element_identifiers )
        # else if elements is set, it better be an ordered dict!

        if elements is not self.ELEMENTS_UNINITIALIZED:
            type_plugin = collection_type_description.rank_type_plugin()
            dataset_collection = builder.build_collection( type_plugin, elements )
        else:
            # Placeholder collection; elements arrive later through
            # set_collection_elements.
            dataset_collection = model.DatasetCollection( populated=False )
        dataset_collection.collection_type = collection_type
        return dataset_collection

    def set_collection_elements( self, dataset_collection, dataset_instances ):
        """
        Attach ``dataset_instances`` to a not yet populated collection and
        mark it as populated.

        :returns: the same ``dataset_collection``.
        :raises Exception: if the collection is already populated.
        """
        if dataset_collection.populated:
            raise Exception( "Cannot reset elements of an already populated dataset collection." )

        collection_type = dataset_collection.collection_type
        collection_type_description = self.collection_type_descriptions.for_collection_type( collection_type )
        type_plugin = collection_type_description.rank_type_plugin()
        builder.set_collection_elements( dataset_collection, type_plugin, dataset_instances )
        dataset_collection.mark_as_populated()

        return dataset_collection

    def delete( self, trans, instance_type, id ):
        """
        Flag the collection instance as deleted (ownership is checked while
        it is loaded) and flush the session.
        """
        dataset_collection_instance = self.get_dataset_collection_instance( trans, instance_type, id, check_ownership=True )
        dataset_collection_instance.deleted = True
        trans.sa_session.add( dataset_collection_instance )
        trans.sa_session.flush( )

    def update( self, trans, instance_type, id, payload ):
        """
        Apply ``payload`` to the collection instance (ownership is checked
        while it is loaded).

        When ``trans.user`` is None only the ``deleted`` and ``visible`` keys
        of the payload are considered.

        :returns: whatever ``_set_from_dict`` reports as changed.
        """
        dataset_collection_instance = self.get_dataset_collection_instance( trans, instance_type, id, check_ownership=True )
        if trans.user is None:
            # Anonymous requests may only toggle these two flags.
            payload = dict(
                ( key, payload[ key ] )
                for key in ( 'deleted', 'visible' )
                if key in payload
            )
        payload = self._validate_and_parse_update_payload( payload )
        return self._set_from_dict( trans, dataset_collection_instance, payload )

    def copy(
        self,
        trans,
        parent,
        # PRECONDITION: security checks on ability to add to parent
        # occurred during load.
        source,
        encoded_source_id,
    ):
        """
        Copy the history dataset collection identified by
        ``encoded_source_id`` into ``parent``.

        :param source: must be ``"hdca"`` for now.
        :returns: the newly created copy.
        """
        assert source == "hdca"  # for now
        source_hdca = self.__get_history_collection_instance( trans, encoded_source_id )
        new_hdca = source_hdca.copy()
        parent.add_dataset_collection( new_hdca )
        trans.sa_session.add( new_hdca )
        trans.sa_session.flush()
        # Previously the *source* instance was returned, so callers never saw
        # the object that was actually created.
        return new_hdca

    def _set_from_dict( self, trans, dataset_collection_instance, new_data ):
        """
        Push ``new_data`` into the instance and handle the keys that need the
        current user (``annotation``, ``tags``).

        :returns: the dict of changes reported by the model, plus
            ``annotation`` when one was set.
        """
        # send what we can down into the model
        changed = dataset_collection_instance.set_from_dict( new_data )

        # the rest (often involving the trans) - do here
        if 'annotation' in new_data and trans.get_user():
            dataset_collection_instance.add_item_annotation( trans.sa_session, trans.get_user(), dataset_collection_instance, new_data[ 'annotation' ] )
            changed[ 'annotation' ] = new_data[ 'annotation' ]
        if 'tags' in new_data and trans.get_user():
            self.tag_manager.set_tags_from_list( trans.get_user(), dataset_collection_instance, new_data[ 'tags' ] )

        if changed:
            trans.sa_session.flush()

        return changed

    def _validate_and_parse_update_payload( self, payload ):
        """
        Return a new dict holding only the recognised, validated keys of
        ``payload`` (``name``, ``deleted``, ``visible``, ``tags``); entries
        whose value is None are dropped.
        """
        validated_payload = {}
        for key, val in payload.items():
            if val is None:
                continue
            # Was ``key in ( 'name' )``: a substring test against the string
            # 'name' that also matched keys such as 'n' or 'am'.
            if key == 'name':
                validated_payload[ key ] = validation.validate_and_sanitize_basestring( key, val )
            elif key in ( 'deleted', 'visible' ):
                validated_payload[ key ] = validation.validate_boolean( key, val )
            elif key == 'tags':
                validated_payload[ key ] = validation.validate_and_sanitize_basestring_list( key, val )
        return validated_payload

    def history_dataset_collections( self, history, query ):
        """
        Return, as a list, the active dataset collections of ``history``
        for which ``query.direct_match`` is true.
        """
        # A list comprehension (rather than ``filter``) yields a list on
        # both Python 2 and Python 3.
        return [
            collection
            for collection in history.active_dataset_collections
            if query.direct_match( collection )
        ]

    def __persist( self, dataset_collection_instance ):
        """ Add the instance to the model context, flush, and return it. """
        context = self.model.context
        context.add( dataset_collection_instance )
        context.flush()
        return dataset_collection_instance

    def __recursively_create_collections( self, trans, element_identifiers ):
        """
        Create a collection for every identifier whose ``src`` is
        ``new_collection`` and store it on that identifier under
        ``__object__``.

        :returns: the same ``element_identifiers`` sequence, updated in place.
        """
        for element_identifier in element_identifiers:
            try:
                if not element_identifier[ "src" ] == "new_collection":
                    # not a new collection, keep moving...
                    continue
            except KeyError:
                # No "src" entry to inspect - move along.
                continue

            # element identifier is a dict with src new_collection...
            collection_type = element_identifier.get( "collection_type", None )
            collection = self.__create_dataset_collection(
                trans=trans,
                collection_type=collection_type,
                element_identifiers=element_identifier[ "element_identifiers" ],
            )
            element_identifier[ "__object__" ] = collection

        return element_identifiers

    def __load_elements( self, trans, element_identifiers ):
        """
        Return an ordered dict mapping each identifier's ``name`` to the
        object loaded for it, in the order given.
        """
        elements = odict.odict()
        for element_identifier in element_identifiers:
            elements[ element_identifier[ "name" ] ] = self.__load_element( trans, element_identifier )
        return elements

    def __load_element( self, trans, element_identifier ):
        """
        Resolve one element identifier to a model object.

        :param element_identifier: dict-like
            ``{src=hda|ldda|hdca|new_collection, id=<encoded_id>}``; ``src``
            defaults to ``hda``.
        :raises MessageException: if the identifier has no ``get`` method.
        :raises RequestParameterInvalidException: if ``src`` or ``id`` is
            empty, or ``src`` is not one of hda, ldda, hdca.
        """
        # if not isinstance( element_identifier, dict ):
        #     # Is allowing this to just be the id of an hda too clever? Somewhat
        #     # consistent with other API methods though.
        #     element_identifier = dict( src='hda', id=str( element_identifier ) )

        # Previously created collection already found in request, just pass
        # through as is.
        if "__object__" in element_identifier:
            return element_identifier[ "__object__" ]

        # dataset_identifier is dict {src=hda|ldda|hdca|new_collection, id=<encoded_id>}
        try:
            src_type = element_identifier.get( 'src', 'hda' )
        except AttributeError:
            raise MessageException( "Dataset collection element definition (%s) not dictionary-like." % element_identifier )
        encoded_id = element_identifier.get( 'id', None )
        if not src_type or not encoded_id:
            raise RequestParameterInvalidException( "Problem decoding element identifier %s" % element_identifier )

        if src_type == 'hda':
            decoded_id = self.__decode_id( trans, encoded_id )
            element = self.hda_manager.get_accessible( decoded_id, trans.user )
        elif src_type == 'ldda':
            element = self.ldda_manager.get( trans, encoded_id )
        elif src_type == 'hdca':
            # TODO: Option to copy? Force copy? Copy or allow if not owned?
            element = self.__get_history_collection_instance( trans, encoded_id ).collection
        # TODO: ldca.
        else:
            raise RequestParameterInvalidException( "Unknown src_type parameter supplied '%s'." % src_type )
        return element

    def match_collections( self, collections_to_match ):
        """
        May seem odd to place it here, but planning to grow sophistication and
        get plugin types involved so it will likely make sense in the future.
        """
        return MatchingCollections.for_collections( collections_to_match, self.collection_type_descriptions )

    def get_dataset_collection_instance( self, trans, instance_type, id, **kwds ):
        """
        Load a history or library collection instance by encoded id.

        :param instance_type: ``"history"`` or ``"library"``.
        :param kwds: forwarded to the type specific loader
            (``check_ownership``, ``check_accessible``).
        :returns: the instance, or None when ``instance_type`` is neither of
            the two known values.
        """
        if instance_type == "history":
            return self.__get_history_collection_instance( trans, id, **kwds )
        elif instance_type == "library":
            return self.__get_library_collection_instance( trans, id, **kwds )

    def get_dataset_collection( self, trans, encoded_id ):
        """ Load a dataset collection by encoded id (no access checks). """
        collection_id = self.__decode_id( trans, encoded_id )
        # NOTE(review): the model class passed to ``query`` was missing from
        # the listing; DatasetCollection is the class this module builds.
        collection = trans.sa_session.query( self.model.DatasetCollection ).get( collection_id )
        return collection

    def __decode_id( self, trans, encoded_id ):
        """
        Turn an encoded id into the integer database id.

        NOTE(review): the decoding call was stripped from the listing this
        class was recovered from (only ``int( <id> ) )`` remained);
        ``trans.app.security.decode_id`` is assumed - confirm against
        upstream.
        """
        return int( trans.app.security.decode_id( encoded_id ) )

    def __get_history_collection_instance( self, trans, id, check_ownership=False, check_accessible=True ):
        """
        Load a history dataset collection association by encoded id,
        delegating the ownership / accessibility checks on its history to
        the history manager.
        """
        instance_id = self.__decode_id( trans, id )
        # NOTE(review): the model class passed to ``query`` was missing from
        # the listing; reconstructed from the class used in ``create``.
        collection_instance = trans.sa_session.query( self.model.HistoryDatasetCollectionAssociation ).get( instance_id )
        if check_ownership:
            self.history_manager.error_unless_owner( collection_instance.history, trans.user, current_history=trans.history )
        if check_accessible:
            self.history_manager.error_unless_accessible( collection_instance.history, trans.user, current_history=trans.history )
        return collection_instance

    def __get_library_collection_instance( self, trans, id, check_ownership=False, check_accessible=True ):
        """
        Load a library dataset collection association by encoded id.

        :raises NotImplementedError: if ``check_ownership`` is requested.
        :raises ItemAccessibilityException: if ``check_accessible`` is set
            and the access check fails for the current user.
        """
        if check_ownership:
            # ``NotImplemented`` is a non-callable constant, not an
            # exception; raising it produced an unrelated TypeError.
            raise NotImplementedError( "Functionality (getting library dataset collection with ownership check) unimplemented." )
        instance_id = self.__decode_id( trans, id )
        # NOTE(review): the model class passed to ``query`` was missing from
        # the listing; reconstructed from the class used in ``create``.
        collection_instance = trans.sa_session.query( self.model.LibraryDatasetCollectionAssociation ).get( instance_id )
        if check_accessible:
            # NOTE(review): the callee was stripped from the listing (only
            # its argument list remained);
            # ``trans.app.security_agent.can_access_library_item`` is
            # assumed - confirm against upstream.
            if not trans.app.security_agent.can_access_library_item( trans.get_current_user_roles(), collection_instance, trans.user ):
                raise ItemAccessibilityException( "LibraryDatasetCollectionAssociation is not accessible to the current user", type='error' )
        return collection_instance