from galaxy import exceptions
from galaxy import model
from galaxy.model import tool_shed_install
import logging
log = logging.getLogger( __name__ )
[docs]class ModelManager( object ):
pass
[docs]class ModelSerializer( object ):
pass
[docs]def security_check( trans, item, check_ownership=False, check_accessible=False ):
""" Security checks for an item: checks if (a) user owns item or (b) item
is accessible to user. This is a generic method for dealing with objects
uniformly from the older controller mixin code - however whenever possible
the managers for a particular model should be used to perform security
checks.
"""
# all items are accessible to an admin
if trans.user_is_admin():
return item
# Verify ownership: there is a current user and that user is the same as the item's
if check_ownership:
if not trans.user:
raise exceptions.ItemOwnershipException( "Must be logged in to manage Galaxy items", type='error' )
if item.user != trans.user:
raise exceptions.ItemOwnershipException( "%s is not owned by the current user" % item.__class__.__name__, type='error' )
# Verify accessible:
# if it's part of a lib - can they access via security
# if it's something else (sharable) have they been added to the item's users_shared_with_dot_users
if check_accessible:
if type( item ) in ( trans.app.model.LibraryFolder, trans.app.model.LibraryDatasetDatasetAssociation, trans.app.model.LibraryDataset ):
if not trans.app.security_agent.can_access_library_item( trans.get_current_user_roles(), item, trans.user ):
raise exceptions.ItemAccessibilityException( "%s is not accessible to the current user" % item.__class__.__name__, type='error' )
else:
if ( item.user != trans.user ) and ( not item.importable ) and ( trans.user not in item.users_shared_with_dot_users ):
raise exceptions.ItemAccessibilityException( "%s is not accessible to the current user" % item.__class__.__name__, type='error' )
return item
[docs]def get_class( class_name ):
""" Returns the class object that a string denotes. Without this method, we'd have to do eval(<class_name>). """
if class_name == 'History':
item_class = model.History
elif class_name == 'HistoryDatasetAssociation':
item_class = model.HistoryDatasetAssociation
elif class_name == 'Page':
item_class = model.Page
elif class_name == 'StoredWorkflow':
item_class = model.StoredWorkflow
elif class_name == 'Visualization':
item_class = model.Visualization
elif class_name == 'Tool':
item_class = model.Tool
elif class_name == 'Job':
item_class = model.Job
elif class_name == 'User':
item_class = model.User
elif class_name == 'Group':
item_class = model.Group
elif class_name == 'Role':
item_class = model.Role
elif class_name == 'Quota':
item_class = model.Quota
elif class_name == 'Library':
item_class = model.Library
elif class_name == 'LibraryFolder':
item_class = model.LibraryFolder
elif class_name == 'LibraryDatasetDatasetAssociation':
item_class = model.LibraryDatasetDatasetAssociation
elif class_name == 'LibraryDataset':
item_class = model.LibraryDataset
elif class_name == 'ToolShedRepository':
item_class = tool_shed_install.ToolShedRepository
else:
item_class = None
return item_class
[docs]def get_object( trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None ):
"""
Convenience method to get a model object with the specified checks. This is
a generic method for dealing with objects uniformly from the older
controller mixin code - however whenever possible the managers for a
particular model should be used to load objects.
"""
try:
decoded_id = trans.security.decode_id( id )
except:
raise exceptions.MessageException( "Malformed %s id ( %s ) specified, unable to decode"
% ( class_name, str( id ) ), type='error' )
try:
item_class = get_class( class_name )
assert item_class is not None
item = trans.sa_session.query( item_class ).get( decoded_id )
assert item is not None
except Exception:
log.exception( "Invalid %s id ( %s ) specified." % ( class_name, id ) )
raise exceptions.MessageException( "Invalid %s id ( %s ) specified" % ( class_name, id ), type="error" )
if check_ownership or check_accessible:
security_check( trans, item, check_ownership, check_accessible )
if deleted is True and not item.deleted:
raise exceptions.ItemDeletionException( '%s "%s" is not deleted'
% ( class_name, getattr( item, 'name', id ) ), type="warning" )
elif deleted is False and item.deleted:
raise exceptions.ItemDeletionException( '%s "%s" is deleted'
% ( class_name, getattr( item, 'name', id ) ), type="warning" )
return item