Source code for galaxy.managers.base

from galaxy import exceptions

from galaxy import model
from galaxy.model import tool_shed_install


import logging
log = logging.getLogger( __name__ )


[docs]class ModelManager( object ): pass
[docs]class ModelSerializer( object ): pass
[docs]def security_check( trans, item, check_ownership=False, check_accessible=False ): """ Security checks for an item: checks if (a) user owns item or (b) item is accessible to user. This is a generic method for dealing with objects uniformly from the older controller mixin code - however whenever possible the managers for a particular model should be used to perform security checks. """ # all items are accessible to an admin if trans.user_is_admin(): return item # Verify ownership: there is a current user and that user is the same as the item's if check_ownership: if not trans.user: raise exceptions.ItemOwnershipException( "Must be logged in to manage Galaxy items", type='error' ) if item.user != trans.user: raise exceptions.ItemOwnershipException( "%s is not owned by the current user" % item.__class__.__name__, type='error' ) # Verify accessible: # if it's part of a lib - can they access via security # if it's something else (sharable) have they been added to the item's users_shared_with_dot_users if check_accessible: if type( item ) in ( trans.app.model.LibraryFolder, trans.app.model.LibraryDatasetDatasetAssociation, trans.app.model.LibraryDataset ): if not trans.app.security_agent.can_access_library_item( trans.get_current_user_roles(), item, trans.user ): raise exceptions.ItemAccessibilityException( "%s is not accessible to the current user" % item.__class__.__name__, type='error' ) else: if ( item.user != trans.user ) and ( not item.importable ) and ( trans.user not in item.users_shared_with_dot_users ): raise exceptions.ItemAccessibilityException( "%s is not accessible to the current user" % item.__class__.__name__, type='error' ) return item
[docs]def get_class( class_name ): """ Returns the class object that a string denotes. Without this method, we'd have to do eval(<class_name>). """ if class_name == 'History': item_class = model.History elif class_name == 'HistoryDatasetAssociation': item_class = model.HistoryDatasetAssociation elif class_name == 'Page': item_class = model.Page elif class_name == 'StoredWorkflow': item_class = model.StoredWorkflow elif class_name == 'Visualization': item_class = model.Visualization elif class_name == 'Tool': item_class = model.Tool elif class_name == 'Job': item_class = model.Job elif class_name == 'User': item_class = model.User elif class_name == 'Group': item_class = model.Group elif class_name == 'Role': item_class = model.Role elif class_name == 'Quota': item_class = model.Quota elif class_name == 'Library': item_class = model.Library elif class_name == 'LibraryFolder': item_class = model.LibraryFolder elif class_name == 'LibraryDatasetDatasetAssociation': item_class = model.LibraryDatasetDatasetAssociation elif class_name == 'LibraryDataset': item_class = model.LibraryDataset elif class_name == 'ToolShedRepository': item_class = tool_shed_install.ToolShedRepository else: item_class = None return item_class
[docs]def get_object( trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None ): """ Convenience method to get a model object with the specified checks. This is a generic method for dealing with objects uniformly from the older controller mixin code - however whenever possible the managers for a particular model should be used to load objects. """ try: decoded_id = trans.security.decode_id( id ) except: raise exceptions.MessageException( "Malformed %s id ( %s ) specified, unable to decode" % ( class_name, str( id ) ), type='error' ) try: item_class = get_class( class_name ) assert item_class is not None item = trans.sa_session.query( item_class ).get( decoded_id ) assert item is not None except Exception: log.exception( "Invalid %s id ( %s ) specified." % ( class_name, id ) ) raise exceptions.MessageException( "Invalid %s id ( %s ) specified" % ( class_name, id ), type="error" ) if check_ownership or check_accessible: security_check( trans, item, check_ownership, check_accessible ) if deleted is True and not item.deleted: raise exceptions.ItemDeletionException( '%s "%s" is not deleted' % ( class_name, getattr( item, 'name', id ) ), type="warning" ) elif deleted is False and item.deleted: raise exceptions.ItemDeletionException( '%s "%s" is deleted' % ( class_name, getattr( item, 'name', id ) ), type="warning" ) return item