"""
Manager and Serializer for libraries.
"""
import galaxy.web
from galaxy import exceptions
from galaxy.model import orm
from galaxy.managers import base as manager_base
from galaxy.model.orm import and_, not_, or_
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy.orm.exc import NoResultFound
import logging
# Module-level logger, named after this module via __name__.
log = logging.getLogger( __name__ )
# =============================================================================
class LibraryManager( object ):
    """
    Interface/service object for interacting with libraries.

    Every method takes the current transaction (``trans``) and uses it to
    reach the SQLAlchemy session (``trans.sa_session``), the model classes
    (``trans.app.model``) and the security agent (``trans.app.security_agent``).
    """

    def __init__( self, *args, **kwargs ):
        super( LibraryManager, self ).__init__( *args, **kwargs )

    def get( self, trans, decoded_library_id, check_accessible=True ):
        """
        Get the library from the DB.

        :param  decoded_library_id:       decoded library id
        :type   decoded_library_id:       int
        :param  check_accessible:         flag whether to check that user can access item
        :type   check_accessible:         bool

        :returns:   the requested library
        :rtype:     Library

        :raises: InconsistentDatabase, RequestParameterInvalidException,
                 InternalServerError, and whatever :meth:`secure` raises
        """
        try:
            library = trans.sa_session.query( trans.app.model.Library ).filter( trans.app.model.Library.table.c.id == decoded_library_id ).one()
        except MultipleResultsFound:
            raise exceptions.InconsistentDatabase( 'Multiple libraries found with the same id.' )
        except NoResultFound:
            raise exceptions.RequestParameterInvalidException( 'No library found with the id provided.' )
        except Exception as e:
            # 'as' form is valid on both Python 2.6+ and Python 3.
            raise exceptions.InternalServerError( 'Error loading from the database.' + str( e ) )
        library = self.secure( trans, library, check_accessible )
        return library

    def create( self, trans, name, description='', synopsis='' ):
        """
        Create a new library together with its root folder.

        :param  name:           name of the library (also used for the root folder)
        :param  description:    description of the library
        :param  synopsis:       synopsis of the library

        :returns:   the newly created (and flushed) library
        :rtype:     Library

        :raises: ItemAccessibilityException if the current user is not an admin
        """
        # user_is_admin must be *called*: the bare bound method is always
        # truthy, which would let every user through this check.
        if not trans.user_is_admin():
            raise exceptions.ItemAccessibilityException( 'Only administrators can create libraries.' )
        library = trans.app.model.Library( name=name, description=description, synopsis=synopsis )
        root_folder = trans.app.model.LibraryFolder( name=name, description='' )
        library.root_folder = root_folder
        trans.sa_session.add_all( ( library, root_folder ) )
        trans.sa_session.flush()
        return library

    def update( self, trans, library, name=None, description=None, synopsis=None ):
        """
        Update the given library.

        Only the attributes passed as something other than ``None`` are
        changed; the session is flushed only when at least one was changed.

        :returns:   the (possibly modified) library
        :rtype:     Library

        :raises: ItemAccessibilityException, RequestParameterInvalidException
        """
        changed = False
        if not trans.user_is_admin():
            raise exceptions.ItemAccessibilityException( 'Only administrators can update libraries.' )
        if library.deleted:
            raise exceptions.RequestParameterInvalidException( 'You cannot modify a deleted library. Undelete it first.' )
        if name is not None:
            library.name = name
            changed = True
        if description is not None:
            library.description = description
            changed = True
        if synopsis is not None:
            library.synopsis = synopsis
            changed = True
        if changed:
            trans.sa_session.add( library )
            trans.sa_session.flush()
        return library

    def delete( self, trans, library, undelete=False ):
        """
        Mark given library deleted/undeleted based on the flag.

        :param  undelete:   if truthy the library is marked as not deleted,
                            otherwise it is marked as deleted
        :type   undelete:   bool

        :returns:   the modified library
        :rtype:     Library

        :raises: ItemAccessibilityException if the current user is not an admin
        """
        if not trans.user_is_admin():
            raise exceptions.ItemAccessibilityException( 'Only administrators can delete and undelete libraries.' )
        library.deleted = not undelete
        trans.sa_session.add( library )
        trans.sa_session.flush()
        return library

    def list( self, trans, deleted=False ):
        """
        Return a list of libraries from the DB.

        :param  deleted: for admins: if True, show only ``deleted`` libraries,
                         if False show only ``non-deleted``, if None do not
                         filter on the flag at all. Non-admins only ever get
                         ``non-deleted`` libraries, whatever the flag is.
        :type   deleted: boolean (optional)

        :returns: query that will emit all accessible libraries
        :rtype: sqlalchemy query
        """
        query = trans.sa_session.query( trans.app.model.Library )
        # NOTE: the '== True' / '== False' comparisons below are intentional,
        # they are evaluated on table columns to build the query filter.
        if trans.user_is_admin():
            if deleted is None:
                # Flag is not specified, do not filter on it.
                pass
            elif deleted:
                query = query.filter( trans.app.model.Library.table.c.deleted == True )
            else:
                query = query.filter( trans.app.model.Library.table.c.deleted == False )
        else:
            # Nonadmins can't see deleted libraries
            query = query.filter( trans.app.model.Library.table.c.deleted == False )
            current_user_role_ids = [ role.id for role in trans.get_current_user_roles() ]
            library_access_action = trans.app.security_agent.permitted_actions.LIBRARY_ACCESS.action
            # Libraries that have any access permission row are restricted ...
            restricted_library_ids = [ lp.library_id for lp in ( trans.sa_session.query( trans.model.LibraryPermissions )
                                                                 .filter( trans.model.LibraryPermissions.table.c.action == library_access_action )
                                                                 .distinct() ) ]
            # ... and of those, the ones granted to one of the user's roles are accessible.
            accessible_restricted_library_ids = [ lp.library_id for lp in ( trans.sa_session.query( trans.model.LibraryPermissions )
                                                                            .filter( and_( trans.model.LibraryPermissions.table.c.action == library_access_action,
                                                                                           trans.model.LibraryPermissions.table.c.role_id.in_( current_user_role_ids ) ) ) ) ]
            query = query.filter( or_( not_( trans.model.Library.table.c.id.in_( restricted_library_ids ) ), trans.model.Library.table.c.id.in_( accessible_restricted_library_ids ) ) )
        return query

    def secure( self, trans, library, check_accessible=True ):
        """
        Check if library is accessible to user.

        :param  library:                 library
        :type   library:                 Library
        :param  check_accessible:        flag whether to check that user can access library
        :type   check_accessible:        bool

        :returns:   the original library
        :rtype:     Library

        :raises: whatever :meth:`check_accessible` raises
        """
        # all libraries are accessible to an admin
        if trans.user_is_admin():
            return library
        if check_accessible:
            library = self.check_accessible( trans, library )
        return library

    def check_accessible( self, trans, library ):
        """
        Check whether the library is accessible to current user.

        :returns:   the given library
        :rtype:     Library

        :raises: ObjectNotFound if the user's roles cannot access the library
                 or if the library is deleted
        """
        if not trans.app.security_agent.can_access_library( trans.get_current_user_roles(), library ):
            raise exceptions.ObjectNotFound( 'Library with the id provided was not found.' )
        elif library.deleted:
            raise exceptions.ObjectNotFound( 'Library with the id provided is deleted.' )
        else:
            return library

    def get_library_dict( self, trans, library ):
        """
        Return library data in the form of a dictionary.

        On top of the library's own ``to_dict`` output (with encoded ``id``
        and ``root_folder_id``) the dictionary contains ``can_user_add``,
        ``can_user_modify`` and ``can_user_manage`` and, only when the library
        is public, ``public`` set to True.

        :param  library:       library
        :type   library:       Library

        :returns:   dict with data about the library
        :rtype:     dictionary
        """
        library_dict = library.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id, 'root_folder_id': trans.security.encode_id } )
        if trans.app.security_agent.library_is_public( library, contents=False ):
            library_dict[ 'public' ] = True
        current_user_roles = trans.get_current_user_roles()
        if not trans.user_is_admin():
            library_dict[ 'can_user_add' ] = trans.app.security_agent.can_add_library_item( current_user_roles, library )
            library_dict[ 'can_user_modify' ] = trans.app.security_agent.can_modify_library_item( current_user_roles, library )
            library_dict[ 'can_user_manage' ] = trans.app.security_agent.can_manage_library_item( current_user_roles, library )
        else:
            # Admins can do everything, no need to ask the security agent.
            library_dict[ 'can_user_add' ] = True
            library_dict[ 'can_user_modify' ] = True
            library_dict[ 'can_user_manage' ] = True
        return library_dict

    def get_current_roles( self, trans, library ):
        """
        Load all permissions currently related to the given library.

        Each role is reported as a ``( name, encoded id )`` tuple.

        :param  library:      the model object
        :type   library:      Library

        :rtype:     dictionary
        :returns:   dict of current roles for all available permission types
        """
        access_library_role_list = [ ( access_role.name, trans.security.encode_id( access_role.id ) ) for access_role in self.get_access_roles( trans, library ) ]
        modify_library_role_list = [ ( modify_role.name, trans.security.encode_id( modify_role.id ) ) for modify_role in self.get_modify_roles( trans, library ) ]
        manage_library_role_list = [ ( manage_role.name, trans.security.encode_id( manage_role.id ) ) for manage_role in self.get_manage_roles( trans, library ) ]
        add_library_item_role_list = [ ( add_role.name, trans.security.encode_id( add_role.id ) ) for add_role in self.get_add_roles( trans, library ) ]
        return dict( access_library_role_list=access_library_role_list,
                     modify_library_role_list=modify_library_role_list,
                     manage_library_role_list=manage_library_role_list,
                     add_library_item_role_list=add_library_item_role_list )

    def get_access_roles( self, trans, library ):
        """
        Load access roles for all library permissions

        :returns:   set of the roles returned by ``library.get_access_roles``
        :rtype:     set
        """
        return set( library.get_access_roles( trans ) )

    def get_modify_roles( self, trans, library ):
        """
        Load modify roles for all library permissions

        :returns:   set of the roles the security agent reports for LIBRARY_MODIFY
        :rtype:     set
        """
        return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_MODIFY ) )

    def get_manage_roles( self, trans, library ):
        """
        Load manage roles for all library permissions

        :returns:   set of the roles the security agent reports for LIBRARY_MANAGE
        :rtype:     set
        """
        return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_MANAGE ) )

    def get_add_roles( self, trans, library ):
        """
        Load add roles for all library permissions

        :returns:   set of the roles the security agent reports for LIBRARY_ADD
        :rtype:     set
        """
        return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_ADD ) )

    def set_permission_roles( self, trans, library, access_roles, modify_roles, manage_roles, add_roles ):
        """
        Set permissions on the given library.

        NOTE: not implemented; this method currently has no body, changes
        nothing and returns None.
        """

    def make_public( self, trans, library ):
        """
        Makes the given library public (removes all access roles)

        :returns:   the result of :meth:`is_public` after the change
        """
        trans.app.security_agent.make_library_public( library )
        return self.is_public( trans, library )

    def is_public( self, trans, library ):
        """
        Return true if lib is public.
        """
        return trans.app.security_agent.library_is_public( library )