# Source code for galaxy.managers.hdas

"""
Manager and Serializer for HDAs.

HistoryDatasetAssociations (HDAs) are datasets contained or created in a
history.
"""

import logging
from gettext import gettext

from galaxy import exceptions

from galaxy.managers import base as manager_base
from galaxy.managers import histories as history_manager

import galaxy.web
import galaxy.datatypes.metadata
from galaxy import objectstore

# Module-level logger; ``get_hda_dict`` logs object-store lookup failures with it.
log = logging.getLogger(__name__)


class HDAManager(manager_base.ModelManager):
    """
    Interface/service object for interacting with HDAs.
    """

    def __init__(self):
        """
        Set up and initialize other managers needed by hdas.
        """
        self.histories_mgr = history_manager.HistoryManager()

    def get(self, trans, unencoded_id, check_ownership=True, check_accessible=True):
        """
        Get an HDA by its unencoded db id, checking ownership (via its history)
        or accessibility (via dataset shares/permissions).

        :param trans: the current transaction; supplies the db session and app model
        :param unencoded_id: the (already decoded) database id of the HDA
        :param check_ownership: passed to :meth:`secure`
        :param check_accessible: passed to :meth:`secure`
        :returns: the HDA, after it has passed :meth:`secure`
        :raises exceptions.ObjectNotFound: if the query finds no HDA with that id
        """
        hda = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(unencoded_id)
        if hda is None:
            raise exceptions.ObjectNotFound()
        return self.secure(trans, hda, check_ownership, check_accessible)

    def secure(self, trans, hda, check_ownership=True, check_accessible=True):
        """
        Check ownership (via its history) or accessibility (via dataset
        shares/permissions).

        :param check_ownership: when true, run :meth:`check_ownership`
        :param check_accessible: when true, run :meth:`check_accessible`
        :returns: the HDA if every requested check passes; the checks raise otherwise
        """
        # all items are accessible to an admin
        if trans.user and trans.user_is_admin():
            return hda
        if check_ownership:
            hda = self.check_ownership(trans, hda)
        if check_accessible:
            hda = self.check_accessible(trans, hda)
        return hda

    def can_access_dataset(self, trans, hda):
        """
        Use security agent to see if current user has access to dataset.

        :returns: the result of the security agent's ``can_access_dataset`` for the
            current user's roles and the HDA's underlying dataset
        """
        current_user_roles = trans.get_current_user_roles()
        return trans.app.security_agent.can_access_dataset(current_user_roles, hda.dataset)

    # TODO: is_owner, is_accessible

    def check_ownership(self, trans, hda):
        """
        Use history to see if current user owns HDA.

        :returns: the HDA if the user is an admin, or owns the containing history
            and can access the underlying dataset
        :raises exceptions.AuthenticationRequired: if there is no logged in user
        :raises exceptions.ItemOwnershipException: if the ownership test fails
        """
        # users that are not logged in are always refused here
        if not trans.user:
            raise exceptions.AuthenticationRequired(
                "Must be logged in to manage Galaxy datasets", type='error')
        if trans.user_is_admin():
            return hda
        # check for ownership of the containing history and accessibility of the underlying dataset
        if self.histories_mgr.is_owner(trans, hda.history) and self.can_access_dataset(trans, hda):
            return hda
        raise exceptions.ItemOwnershipException(
            "HistoryDatasetAssociation is not owned by the current user", type='error')

    def check_accessible(self, trans, hda):
        """
        Raise error if HDA is not accessible.

        :returns: the HDA if the user is an admin or may access both the containing
            history and the underlying dataset
        :raises exceptions.ItemAccessibilityException: if the dataset is not
            accessible (the history manager's own check may raise as well)
        """
        if trans.user and trans.user_is_admin():
            return hda
        # check for access of the containing history...
        self.histories_mgr.check_accessible(trans, hda.history)
        # ...then the underlying dataset
        if self.can_access_dataset(trans, hda):
            return hda
        raise exceptions.ItemAccessibilityException(
            "HistoryDatasetAssociation is not accessible to the current user", type='error')

    def err_if_uploading(self, trans, hda):
        """
        Raise error if HDA is still uploading.

        :returns: the HDA when its state is not ``UPLOAD``
        :raises exceptions.Conflict: if the HDA state is ``UPLOAD``
        """
        if hda.state == trans.model.Dataset.states.UPLOAD:
            raise exceptions.Conflict("Please wait until this dataset finishes uploading")
        return hda

    def get_hda_dict(self, trans, hda):
        """
        Return full details of this HDA in dictionary form.

        Inaccessible HDAs get the truncated form from
        :meth:`get_inaccessible_hda_dict`; HDAs that are both deleted and purged
        are returned before any file, url, metadata or visualization keys are added.

        :returns: a dictionary passed through ``trans.security.encode_dict_ids``
        """
        # precondition: the user's access to this hda has already been checked
        # TODO:?? postcondition: all ids are encoded (is this really what we want at this level?)
        expose_dataset_path = trans.user_is_admin() or trans.app.config.expose_dataset_path
        hda_dict = hda.to_dict(view='element', expose_dataset_path=expose_dataset_path)
        hda_dict['api_type'] = "file"

        # Additional attributes that depend on trans must be added here rather than at the model level.
        can_access_hda = self.can_access_dataset(trans, hda)
        if not (trans.user_is_admin() or can_access_hda):
            return self.get_inaccessible_hda_dict(trans, hda)
        hda_dict['accessible'] = True

        # TODO: I'm unclear as to which access pattern is right
        hda_dict['annotation'] = hda.get_item_annotation_str(trans.sa_session, hda.history.user, hda)

        # ---- return here if deleted AND purged OR can't access
        purged = hda.purged or hda.dataset.purged
        if hda.deleted and purged:
            # TODO: to_dict should really go AFTER this - only summary data
            return trans.security.encode_dict_ids(hda_dict)

        if expose_dataset_path:
            try:
                hda_dict['file_name'] = hda.file_name
            except objectstore.ObjectNotFound:
                # best effort: leave 'file_name' out and record the failure
                log.exception('objectstore.ObjectNotFound, HDA %s.', hda.id)

        hda_dict['download_url'] = galaxy.web.url_for(
            'history_contents_display',
            history_id=trans.security.encode_id(hda.history.id),
            history_content_id=trans.security.encode_id(hda.id))

        # indices, assoc. metadata files, etc.
        meta_files = []
        for meta_type in hda.metadata.spec.keys():
            if isinstance(hda.metadata.spec[meta_type].param, galaxy.datatypes.metadata.FileParameter):
                meta_files.append(dict(file_type=meta_type))
        if meta_files:
            hda_dict['meta_files'] = meta_files

        # currently, the viz reg is optional - handle on/off
        if trans.app.visualizations_registry:
            hda_dict['visualizations'] = trans.app.visualizations_registry.get_visualizations(trans, hda)
        else:
            hda_dict['visualizations'] = hda.get_visualizations()
        # TODO: it may also be wiser to remove from here and add as API call that loads the visualizations
        #   when the visualizations button is clicked (instead of preloading/pre-checking)

        # deleted-but-not-purged HDAs are serialized exactly like live ones
        return trans.security.encode_dict_ids(hda_dict)

    def get_inaccessible_hda_dict(self, trans, hda):
        """
        Return truncated serialization of HDA when inaccessible to user.

        :returns: a dictionary with ``accessible`` set to False, passed through
            ``trans.security.encode_dict_ids``
        """
        return trans.security.encode_dict_ids({
            'id': hda.id,
            'history_id': hda.history.id,
            'hid': hda.hid,
            'name': hda.name,
            'state': hda.state,
            'deleted': hda.deleted,
            'visible': hda.visible,
            'accessible': False,
        })

    def get_hda_dict_with_error(self, trans, hda=None, history_id=None, id=None, error_msg='Error'):
        """
        Return truncated serialization of HDA when error raised getting details.

        :param hda: the HDA, if one was loaded; when falsy, ``id`` and
            ``history_id`` are used and ``hid``/``name`` become ``'(unknown)'``
        :param error_msg: stored under the ``error`` key
        :returns: a dictionary whose ``state`` is always the ``NEW`` dataset state,
            passed through ``trans.security.encode_dict_ids``
        """
        return trans.security.encode_dict_ids({
            'id': hda.id if hda else id,
            'history_id': hda.history.id if hda else history_id,
            'hid': hda.hid if hda else '(unknown)',
            'name': hda.name if hda else '(unknown)',
            'error': error_msg,
            'state': trans.model.Dataset.states.NEW,
        })

    def get_display_apps(self, trans, hda):
        """
        Return a list of dictionaries containing new-style display app urls.

        :returns: one ``dict( label=..., links=[...] )`` per display application
            that has at least one link
        """
        display_apps = []
        # .values() rather than the python 2 only .itervalues()
        for display_app in hda.get_display_applications(trans).values():
            app_links = [
                {
                    'target': link_app.url.get('target_frame', '_blank'),
                    'href': link_app.get_display_url(hda, trans),
                    'text': gettext(link_app.name),
                }
                for link_app in display_app.links.values()
            ]
            if app_links:
                display_apps.append(dict(label=display_app.name, links=app_links))
        return display_apps

    def get_old_display_applications(self, trans, hda):
        """
        Return a list of dictionaries containing old-style display app urls.

        :returns: an empty list when ``enable_old_display_applications`` is off in
            the app config; otherwise one ``dict( label=..., links=[...] )`` per
            display type that has links
        """
        display_apps = []
        if not trans.app.config.enable_old_display_applications:
            return display_apps

        for display_app in hda.datatype.get_display_types():
            target_frame, display_links = hda.datatype.get_display_links(
                hda, display_app, trans.app, trans.request.base)
            if not display_links:
                continue
            display_label = hda.datatype.get_display_label(display_app)
            app_links = [
                {
                    'target': target_frame,
                    'href': display_link,
                    'text': gettext(display_name),
                }
                for display_name, display_link in display_links
            ]
            if app_links:
                display_apps.append(dict(label=display_label, links=app_links))
        return display_apps


# =============================================================================
class HistorySerializer(manager_base.ModelSerializer):
    """
    Interface/service object for serializing HDAs into dictionaries.

    Currently an empty subclass: it adds nothing to ``manager_base.ModelSerializer``.

    NOTE(review): the class is named for histories but described as serializing
    HDAs - confirm which is intended before relying on the name.
    """