Source code for galaxy.webapps.galaxy.api.libraries

"""
API operations on a data library.
"""
from galaxy import util
from galaxy import exceptions
from galaxy.managers import libraries, folders, roles
from galaxy.web import _future_expose_api as expose_api
from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous
from galaxy.web.base.controller import BaseAPIController

import logging
log = logging.getLogger( __name__ )


class LibrariesController( BaseAPIController ):
    """
    API controller implementing the operations on data libraries
    (listing, detail, create, update, un/delete and permission handling).
    """

    def __init__( self, app ):
        super( LibrariesController, self ).__init__( app )
        self.folder_manager = folders.FolderManager()
        self.library_manager = libraries.LibraryManager()
        self.role_manager = roles.RoleManager( app )

    @expose_api_anonymous
    def index( self, trans, **kwd ):
        """
        index( self, trans, **kwd )
        * GET /api/libraries:
            Returns a list of summary data for all libraries.

        :param  deleted: if True, show only ``deleted`` libraries, if False show only ``non-deleted``
        :type   deleted: boolean (optional)

        :returns:   list of dictionaries containing library information
        :rtype:     list

        .. seealso:: :attr:`galaxy.model.Library.dict_collection_visible_keys`
        """
        deleted = util.string_as_bool_or_none( kwd.get( 'deleted', None ) )
        query = self.library_manager.list( trans, deleted )
        # Built directly as a list; a local named ``libraries`` would shadow
        # the imported ``libraries`` manager module.
        return [ self.library_manager.get_library_dict( trans, library ) for library in query ]

    def __decode_id( self, trans, encoded_id, object_name=None ):
        """
        Try to decode the id.

        :param  encoded_id:  the encoded id to decode
        :type   encoded_id:  an encoded id string
        :param  object_name: Name of the object the id belongs to. (optional)
        :type   object_name: str

        :returns:   the value returned by ``trans.security.decode_id``

        :raises: MalformedId
        """
        try:
            return trans.security.decode_id( encoded_id )
        except TypeError:
            # Build the message in two explicit branches: a conditional
            # expression binds looser than ``%``, so writing it inline yields
            # an empty message whenever ``object_name`` is None.
            if object_name is not None:
                message = 'Malformed %s id specified, unable to decode.' % object_name
            else:
                message = 'Malformed id specified, unable to decode.'
            raise exceptions.MalformedId( message )

    @expose_api_anonymous
    def show( self, trans, id, deleted='False', **kwd ):
        """
        show( self, trans, id, deleted='False', **kwd )
        * GET /api/libraries/{encoded_id}:
            returns detailed information about a library
        * GET /api/libraries/deleted/{encoded_id}:
            returns detailed information about a ``deleted`` library

        :param  id:      the encoded id of the library
        :type   id:      an encoded id string
        :param  deleted: if True, allow information on a ``deleted`` library
        :type   deleted: boolean

        :returns:   detailed library information
        :rtype:     dictionary

        .. seealso:: :attr:`galaxy.model.Library.dict_element_visible_keys`

        :raises: MalformedId, ObjectNotFound
        """
        # NOTE(review): ``deleted`` is accepted but never used in this method;
        # whether deleted libraries are returned depends entirely on
        # ``library_manager.get`` - confirm this is intended.
        library = self.library_manager.get( trans, self.__decode_id( trans, id, 'library' ) )
        return self.library_manager.get_library_dict( trans, library )

    @expose_api
    def create( self, trans, payload, **kwd ):
        """
        create( self, trans, payload, **kwd )
        * POST /api/libraries:
            Creates a new library. Only ``name`` parameter is required.

        .. note:: Currently, only admin users can create libraries.

        :param  payload: dictionary structure containing::
            'name':         the new library's name (required)
            'description':  the new library's description (optional)
            'synopsis':     the new library's synopsis (optional)
        :type   payload: dict

        :returns:   detailed library information
        :rtype:     dict

        :raises: ItemAccessibilityException, RequestParameterMissingException
        """
        params = util.Params( payload )
        name = util.restore_text( params.get( 'name', None ) )
        if not name:
            raise exceptions.RequestParameterMissingException( "Missing required parameter 'name'." )
        description = util.restore_text( params.get( 'description', '' ) )
        synopsis = util.restore_text( params.get( 'synopsis', '' ) )
        # A missing synopsis may arrive as None or as the literal string 'None'.
        if synopsis in [ 'None', None ]:
            synopsis = ''
        library = self.library_manager.create( trans, name, description, synopsis )
        return self.library_manager.get_library_dict( trans, library )

    @expose_api
    def update( self, trans, id, **kwd ):
        """
        * PATCH /api/libraries/{encoded_id}
           Updates the library defined by an ``encoded_id`` with the data in the payload.

        .. note:: Currently, only admin users can update libraries. Also the library must not be `deleted`.

        :param  id: the encoded id of the library
        :type   id: an encoded id string

        :param  payload: (required) dictionary structure containing::
            'name':         new library's name, cannot be empty
            'description':  new library's description
            'synopsis':     new library's synopsis
        :type   payload: dict

        :returns:   detailed library information
        :rtype:     dict

        :raises: ItemAccessibilityException, MalformedId, ObjectNotFound, RequestParameterInvalidException, RequestParameterMissingException
        """
        library = self.library_manager.get( trans, self.__decode_id( trans, id, 'library' ) )
        payload = kwd.get( 'payload', None )
        if not payload:
            raise exceptions.RequestParameterMissingException( "You did not specify any payload." )
        name = payload.get( 'name', None )
        if name == '':
            raise exceptions.RequestParameterMissingException( "Parameter 'name' of library is required. You cannot remove it." )
        # Every field is always bound so that a partial payload (e.g. only
        # 'name') no longer fails with UnboundLocalError. A field is forwarded
        # only when it is truthy or the empty string; anything else is passed
        # as None.
        # NOTE(review): presumably the manager treats None as "leave
        # unchanged" - verify against LibraryManager.update.
        description = payload.get( 'description', None )
        if not description and description != '':
            description = None
        synopsis = payload.get( 'synopsis', None )
        if not synopsis and synopsis != '':
            synopsis = None
        updated_library = self.library_manager.update( trans, library, name, description, synopsis )
        return self.library_manager.get_library_dict( trans, updated_library )

    @expose_api
    def delete( self, trans, id, **kwd ):
        """
        delete( self, trans, id, **kwd )
        * DELETE /api/libraries/{id}
            marks the library with the given ``id`` as `deleted` (or removes the `deleted` mark if the `undelete` param is true)

        .. note:: Currently, only admin users can un/delete libraries.

        :param  id:       the encoded id of the library to un/delete
        :type   id:       an encoded id string

        :param  undelete: (optional) flag specifying whether the item should be deleted or undeleted, defaults to false:
        :type   undelete: bool

        :returns:   detailed library information
        :rtype:     dictionary

        .. seealso:: :attr:`galaxy.model.Library.dict_element_visible_keys`

        :raises: ItemAccessibilityException, MalformedId, ObjectNotFound
        """
        library = self.library_manager.get( trans, self.__decode_id( trans, id, 'library' ) )
        undelete = util.string_as_bool( kwd.get( 'undelete', False ) )
        library = self.library_manager.delete( trans, library, undelete )
        return self.library_manager.get_library_dict( trans, library )

    def _get_int_param( self, kwd, key, default ):
        """
        Read the optional integer request parameter ``key`` from ``kwd``.

        :param  kwd:     request keyword arguments
        :type   kwd:     dict
        :param  key:     name of the parameter to read
        :type   key:     str
        :param  default: value returned when the parameter is absent (None)
        :type   default: int

        :returns:   the parsed integer, or ``default``
        :rtype:     int

        :raises: RequestParameterInvalidException
        """
        value = kwd.get( key, None )
        if value is None:
            return default
        try:
            return int( value )
        except ( TypeError, ValueError ):
            # Report unparsable client input as a request error instead of
            # letting a bare ValueError escape the API method.
            raise exceptions.RequestParameterInvalidException( "The value of '%s' parameter is invalid. It has to be an integer." % key )

    @expose_api
    def get_permissions( self, trans, encoded_library_id, **kwd ):
        """
        * GET /api/libraries/{id}/permissions

        Load all permissions for the given library id and return it.

        :param  encoded_library_id: the encoded id of the library
        :type   encoded_library_id: an encoded id string

        :param  scope: either 'current' or 'available' (defaults to 'current')
        :type   scope: string

        :param  is_library_access: indicates whether the roles available for the library access are requested
        :type   is_library_access: bool

        :param  page: (only for scope 'available') page of results, defaults to 1
        :type   page: int

        :param  page_limit: (only for scope 'available') page size, defaults to 10
        :type   page_limit: int

        :param  q: (only for scope 'available') query passed to the role lookup
        :type   q: string

        :returns:   dictionary with all applicable permissions' values
        :rtype:     dictionary

        :raises: ObjectNotFound, InsufficientPermissionsException, RequestParameterInvalidException
        """
        current_user_roles = trans.get_current_user_roles()
        is_admin = trans.user_is_admin()
        library = self.library_manager.get( trans, self.__decode_id( trans, encoded_library_id, 'library' ) )
        if not ( is_admin or trans.app.security_agent.can_manage_library_item( current_user_roles, library ) ):
            raise exceptions.InsufficientPermissionsException( 'You do not have proper permission to access permissions of this library.' )

        scope = kwd.get( 'scope', None )
        is_library_access = util.string_as_bool( kwd.get( 'is_library_access', False ) )

        if scope == 'current' or scope is None:
            return self.library_manager.get_current_roles( trans, library )

        # Return roles that are available to select.
        elif scope == 'available':
            page = self._get_int_param( kwd, 'page', 1 )
            page_limit = self._get_int_param( kwd, 'page_limit', 10 )
            query = kwd.get( 'q', None )
            # Locals are not called ``roles`` to avoid shadowing the imported
            # ``roles`` manager module.
            available_roles, total_roles = trans.app.security_agent.get_valid_roles( trans, library, query, page, page_limit, is_library_access )
            return_roles = [ dict( id=trans.security.encode_id( role.id ), name=role.name, type=role.type )
                             for role in available_roles ]
            return dict( roles=return_roles, page=page, page_limit=page_limit, total=total_roles )
        else:
            raise exceptions.RequestParameterInvalidException( "The value of 'scope' parameter is invalid. Allowed values: current, available" )

    def _select_valid_roles( self, trans, library, encoded_role_ids, permission_label, **lookup_kwds ):
        """
        Resolve the given encoded role ids and keep only the roles that the
        security agent reports as valid for ``library``.

        Ids whose role is not valid are not an error; they are collected and
        reported in a single warning in the log.

        :param  encoded_role_ids: encoded ids of the requested roles
        :type   encoded_role_ids: list
        :param  permission_label: permission name used in the log message
        :type   permission_label: str
        :param  lookup_kwds:      extra keyword arguments forwarded unchanged to
                                  ``security_agent.get_valid_roles``
                                  (e.g. ``is_library_access=True``)

        :returns:   the valid roles, in the order they were requested
        :rtype:     list

        :raises: MalformedId (other exceptions may come from ``role_manager.get``)
        """
        if not encoded_role_ids:
            # Nothing requested: skip the valid-roles lookup entirely.
            return []
        # The set of valid roles does not depend on the individual role, so it
        # is fetched once per permission type rather than once per role id.
        valid_roles, total_roles = trans.app.security_agent.get_valid_roles( trans, library, **lookup_kwds )
        accepted_roles = []
        rejected_ids = []
        for role_id in encoded_role_ids:
            role = self.role_manager.get( trans, self.__decode_id( trans, role_id, 'role' ) )
            if role in valid_roles:
                accepted_roles.append( role )
            else:
                rejected_ids.append( role_id )
        if rejected_ids:
            log.warning( "The following roles could not be added to the %s permission: %s", permission_label, rejected_ids )
        return accepted_roles

    @expose_api
    def set_permissions( self, trans, encoded_library_id, **kwd ):
        """
        set_permissions( self, trans, encoded_library_id, **kwd )
        * POST /api/libraries/{encoded_library_id}/permissions

        :param  encoded_library_id: the encoded id of the library to set the permissions of
        :type   encoded_library_id: an encoded id string

        :param  action: (required) describes what action should be performed
                        available actions: remove_restrictions, set_permissions
        :type   action: string

        :param  access_ids[]: list of Role.id defining roles that should have access permission on the library
        :type   access_ids[]: string or list
        :param  add_ids[]:    list of Role.id defining roles that should have add item permission on the library
        :type   add_ids[]:    string or list
        :param  manage_ids[]: list of Role.id defining roles that should have manage permission on the library
        :type   manage_ids[]: string or list
        :param  modify_ids[]: list of Role.id defining roles that should have modify permission on the library
        :type   modify_ids[]: string or list

        When ``action`` is missing but a ``payload`` is present, the request is
        handled by :meth:`set_permissions_old` for backward compatibility.

        :rtype:     dictionary
        :returns:   dict of current roles for all available permission types

        :raises: RequestParameterInvalidException, ObjectNotFound, InsufficientPermissionsException, InternalServerError
                    RequestParameterMissingException
        """
        is_admin = trans.user_is_admin()
        current_user_roles = trans.get_current_user_roles()
        library = self.library_manager.get( trans, self.__decode_id( trans, encoded_library_id, 'library' ) )

        if not ( is_admin or trans.app.security_agent.can_manage_library_item( current_user_roles, library ) ):
            raise exceptions.InsufficientPermissionsException( 'You do not have proper permission to modify permissions of this library.' )

        new_access_roles_ids = util.listify( kwd.get( 'access_ids[]', None ) )
        new_add_roles_ids = util.listify( kwd.get( 'add_ids[]', None ) )
        new_manage_roles_ids = util.listify( kwd.get( 'manage_ids[]', None ) )
        new_modify_roles_ids = util.listify( kwd.get( 'modify_ids[]', None ) )

        action = kwd.get( 'action', None )
        if action is None:
            # ``pop`` with a default: a plain ``del kwd[ 'payload' ]`` raises
            # KeyError when no payload was sent and hides the intended
            # "missing action" error below.
            payload = kwd.pop( 'payload', None )
            if payload is not None:
                return self.set_permissions_old( trans, library, payload, **kwd )
            else:
                raise exceptions.RequestParameterMissingException( 'The mandatory parameter "action" is missing.' )
        elif action == 'remove_restrictions':
            is_public = self.library_manager.make_public( trans, library )
            if not is_public:
                raise exceptions.InternalServerError( 'An error occurred while making library public.' )
        elif action == 'set_permissions':
            # Only the access permission is validated against the roles
            # available for library access; the others use the default lookup.
            valid_access_roles = self._select_valid_roles( trans, library, new_access_roles_ids, 'library access', is_library_access=True )
            valid_add_roles = self._select_valid_roles( trans, library, new_add_roles_ids, 'add library item' )
            valid_manage_roles = self._select_valid_roles( trans, library, new_manage_roles_ids, 'manage library' )
            valid_modify_roles = self._select_valid_roles( trans, library, new_modify_roles_ids, 'modify library' )

            permitted_actions = trans.app.security_agent.permitted_actions
            permissions = {
                permitted_actions.LIBRARY_ACCESS: valid_access_roles,
                permitted_actions.LIBRARY_ADD: valid_add_roles,
                permitted_actions.LIBRARY_MANAGE: valid_manage_roles,
                permitted_actions.LIBRARY_MODIFY: valid_modify_roles,
            }

            trans.app.security_agent.set_all_library_permissions( trans, library, permissions )
            trans.sa_session.refresh( library )
            # Copy the permissions to the root folder
            trans.app.security_agent.copy_library_permissions( trans, library, library.root_folder )
        else:
            raise exceptions.RequestParameterInvalidException( 'The mandatory parameter "action" has an invalid value. '
                                                               'Allowed values are: "remove_restrictions", "set_permissions"' )
        return self.library_manager.get_current_roles( trans, library )

    def set_permissions_old( self, trans, library, payload, **kwd ):
        """
        *** old implementation for backward compatibility ***

        POST /api/libraries/{encoded_library_id}/permissions
        Updates the library permissions.

        For every entry of ``Library.permitted_actions`` the roles are read
        from the payload key ``<action key>_in`` (encoded role ids).

        :param  library: the library whose permissions are being set
        :param  payload: dictionary with the ``*_in`` lists of encoded role ids
        :type   payload: dict

        :returns:   the library in its 'element' dictionary view with encoded ids
        :rtype:     dict
        """
        params = util.Params( payload )
        permissions = {}
        for k, v in trans.app.model.Library.permitted_actions.items():
            role_params = params.get( k + '_in', [] )
            in_roles = [ trans.sa_session.query( trans.app.model.Role ).get( trans.security.decode_id( x ) )
                         for x in util.listify( role_params ) ]
            permissions[ trans.app.security_agent.get_action( v.action ) ] = in_roles
        trans.app.security_agent.set_all_library_permissions( trans, library, permissions )
        trans.sa_session.refresh( library )
        # Copy the permissions to the root folder
        trans.app.security_agent.copy_library_permissions( trans, library, library.root_folder )
        item = library.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id, 'root_folder_id': trans.security.encode_id } )
        return item