Source code for galaxy.webapps.galaxy.api.folders

"""
API operations on library folders.
"""
from galaxy import util
from galaxy import exceptions
from galaxy.managers import folders
from galaxy.web import _future_expose_api as expose_api
from galaxy.web.base.controller import BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy.orm.exc import NoResultFound

import logging
log = logging.getLogger( __name__ )


[docs]class FoldersController( BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems ): def __init__( self, app ): super( FoldersController, self ).__init__( app ) self.folder_manager = folders.FolderManager() @expose_api
[docs] def index( self, trans, **kwd ): """ *GET /api/folders/ This would normally display a list of folders. However, that would be across multiple libraries, so it's not implemented. """ raise exceptions.NotImplemented( 'Listing all accessible library folders is not implemented.' )
@expose_api
[docs] def show( self, trans, id, **kwd ): """ show( self, trans, id, **kwd ) *GET /api/folders/{encoded_folder_id} Displays information about a folder. :param id: the folder's encoded id (required) :type id: an encoded id string (has to be prefixed by 'F') :returns: dictionary including details of the folder :rtype: dict """ folder_id = self.folder_manager.cut_and_decode( trans, id ) folder = self.folder_manager.get( trans, folder_id, check_manageable=False, check_accessible=True ) return_dict = self.folder_manager.get_folder_dict( trans, folder ) return return_dict
@expose_api
[docs] def create( self, trans, encoded_parent_folder_id, **kwd ): """ create( self, trans, encoded_parent_folder_id, **kwd ) *POST /api/folders/{encoded_parent_folder_id} Create a new folder object underneath the one specified in the parameters. :param encoded_parent_folder_id: the parent folder's id (required) :type encoded_parent_folder_id: an encoded id string (should be prefixed by 'F') :param name: the name of the new folder (required) :type name: str :param description: the description of the new folder :type description: str :returns: information about newly created folder, notably including ID :rtype: dictionary :raises: RequestParameterMissingException """ payload = kwd.get( 'payload', None ) if payload is None: raise exceptions.RequestParameterMissingException( "Missing required parameter 'name'." ) name = payload.get( 'name', None ) if name is None: raise exceptions.RequestParameterMissingException( "Missing required parameter 'name'." ) description = payload.get( 'description', '' ) decoded_parent_folder_id = self.folder_manager.cut_and_decode( trans, encoded_parent_folder_id ) parent_folder = self.folder_manager.get( trans, decoded_parent_folder_id ) new_folder = self.folder_manager.create( trans, parent_folder.id, name, description ) return self.folder_manager.get_folder_dict( trans, new_folder )
@expose_api
[docs] def get_permissions( self, trans, encoded_folder_id, **kwd ): """ * GET /api/folders/{id}/permissions Load all permissions for the given folder id and return it. :param encoded_folder_id: the encoded id of the folder :type encoded_folder_id: an encoded id string :param scope: either 'current' or 'available' :type scope: string :returns: dictionary with all applicable permissions' values :rtype: dictionary :raises: ObjectNotFound, InsufficientPermissionsException """ current_user_roles = trans.get_current_user_roles() is_admin = trans.user_is_admin() decoded_folder_id = self.folder_manager.cut_and_decode( trans, encoded_folder_id ) folder = self.folder_manager.get( trans, decoded_folder_id ) if not ( is_admin or trans.app.security_agent.can_manage_library_item( current_user_roles, folder ) ): raise exceptions.InsufficientPermissionsException( 'You do not have proper permission to access permissions of this folder.' ) scope = kwd.get( 'scope', None ) if scope == 'current' or scope is None: return self.folder_manager.get_current_roles( trans, folder ) # Return roles that are available to select. elif scope == 'available': page = kwd.get( 'page', None ) if page is not None: page = int( page ) else: page = 1 page_limit = kwd.get( 'page_limit', None ) if page_limit is not None: page_limit = int( page_limit ) else: page_limit = 10 query = kwd.get( 'q', None ) roles, total_roles = trans.app.security_agent.get_valid_roles( trans, folder, query, page, page_limit ) return_roles = [] for role in roles: role_id = trans.security.encode_id( role.id ) return_roles.append( dict( id=role_id, name=role.name, type=role.type ) ) return dict( roles=return_roles, page=page, page_limit=page_limit, total=total_roles ) else: raise exceptions.RequestParameterInvalidException( "The value of 'scope' parameter is invalid. Alllowed values: current, available" )
@expose_api
[docs] def set_permissions( self, trans, encoded_folder_id, **kwd ): """ def set_permissions( self, trans, encoded_folder_id, **kwd ): *POST /api/folders/{encoded_folder_id}/permissions :param encoded_folder_id: the encoded id of the folder to set the permissions of :type encoded_folder_id: an encoded id string :param action: (required) describes what action should be performed available actions: set_permissions :type action: string :param add_ids[]: list of Role.id defining roles that should have add item permission on the folder :type add_ids[]: string or list :param manage_ids[]: list of Role.id defining roles that should have manage permission on the folder :type manage_ids[]: string or list :param modify_ids[]: list of Role.id defining roles that should have modify permission on the folder :type modify_ids[]: string or list :rtype: dictionary :returns: dict of current roles for all available permission types. :raises: RequestParameterInvalidException, ObjectNotFound, InsufficientPermissionsException, InternalServerError RequestParameterMissingException """ is_admin = trans.user_is_admin() current_user_roles = trans.get_current_user_roles() decoded_folder_id = self.folder_manager.decode_folder_id( trans, self.folder_manager.cut_the_prefix( encoded_folder_id ) ) folder = self.folder_manager.get( trans, decoded_folder_id ) if not ( is_admin or trans.app.security_agent.can_manage_library_item( current_user_roles, folder ) ): raise exceptions.InsufficientPermissionsException( 'You do not have proper permission to modify permissions of this folder.' ) new_add_roles_ids = util.listify( kwd.get( 'add_ids[]', None ) ) new_manage_roles_ids = util.listify( kwd.get( 'manage_ids[]', None ) ) new_modify_roles_ids = util.listify( kwd.get( 'modify_ids[]', None ) ) action = kwd.get( 'action', None ) if action is None: raise exceptions.RequestParameterMissingException( 'The mandatory parameter "action" is missing.' ) elif action == 'set_permissions': # ADD TO LIBRARY ROLES valid_add_roles = [] invalid_add_roles_names = [] for role_id in new_add_roles_ids: role = self._load_role( trans, role_id ) # Check whether role is in the set of allowed roles valid_roles, total_roles = trans.app.security_agent.get_valid_roles( trans, folder ) if role in valid_roles: valid_add_roles.append( role ) else: invalid_add_roles_names.append( role_id ) if len( invalid_add_roles_names ) > 0: log.warning( "The following roles could not be added to the add library item permission: " + str( invalid_add_roles_names ) ) # MANAGE FOLDER ROLES valid_manage_roles = [] invalid_manage_roles_names = [] for role_id in new_manage_roles_ids: role = self._load_role( trans, role_id ) # Check whether role is in the set of allowed roles valid_roles, total_roles = trans.app.security_agent.get_valid_roles( trans, folder ) if role in valid_roles: valid_manage_roles.append( role ) else: invalid_manage_roles_names.append( role_id ) if len( invalid_manage_roles_names ) > 0: log.warning( "The following roles could not be added to the manage folder permission: " + str( invalid_manage_roles_names ) ) # MODIFY FOLDER ROLES valid_modify_roles = [] invalid_modify_roles_names = [] for role_id in new_modify_roles_ids: role = self._load_role( trans, role_id ) # Check whether role is in the set of allowed roles valid_roles, total_roles = trans.app.security_agent.get_valid_roles( trans, folder ) if role in valid_roles: valid_modify_roles.append( role ) else: invalid_modify_roles_names.append( role_id ) if len( invalid_modify_roles_names ) > 0: log.warning( "The following roles could not be added to the modify folder permission: " + str( invalid_modify_roles_names ) ) permissions = { trans.app.security_agent.permitted_actions.LIBRARY_ADD: valid_add_roles } permissions.update( { trans.app.security_agent.permitted_actions.LIBRARY_MANAGE: valid_manage_roles } ) permissions.update( { trans.app.security_agent.permitted_actions.LIBRARY_MODIFY: valid_modify_roles } ) trans.app.security_agent.set_all_library_permissions( trans, folder, permissions ) else: raise exceptions.RequestParameterInvalidException( 'The mandatory parameter "action" has an invalid value.' 'Allowed values are: "set_permissions"' ) return self.folder_manager.get_current_roles( trans, folder )
@expose_api
[docs] def delete( self, trans, id, **kwd ): """ delete( self, trans, id, **kwd ) * DELETE /api/folders/{id} marks the folder with the given ``id`` as `deleted` (or removes the `deleted` mark if the `undelete` param is true) .. note:: Currently, only admin users can un/delete folders. :param id: the encoded id of the folder to un/delete :type id: an encoded id string :param undelete: (optional) flag specifying whether the item should be deleted or undeleted, defaults to false: :type undelete: bool :returns: detailed folder information :rtype: dictionary :raises: ItemAccessibilityException, MalformedId, ObjectNotFound """ folder = self.folder_manager.get( trans, self.folder_manager.cut_and_decode( trans, id ), True ) undelete = util.string_as_bool( kwd.get( 'undelete', False ) ) folder = self.folder_manager.delete( trans, folder, undelete ) folder_dict = self.folder_manager.get_folder_dict( trans, folder ) return folder_dict
@expose_api
[docs] def update( self, trans, id, library_id, payload, **kwd ): """ PUT /api/folders/{encoded_folder_id} """ raise exceptions.NotImplemented( 'Updating folder through this endpoint is not implemented yet.' ) # TODO move to Role manager
def _load_role( self, trans, role_name ): """ Method loads the role from the DB based on the given role name. :param role_name: name of the role to load from the DB :type role_name: string :rtype: Role :returns: the loaded Role object :raises: InconsistentDatabase, RequestParameterInvalidException, InternalServerError """ try: role = trans.sa_session.query( trans.app.model.Role ).filter( trans.model.Role.table.c.name == role_name ).one() except MultipleResultsFound: raise exceptions.InconsistentDatabase( 'Multiple roles found with the same name. Name: ' + str( role_name ) ) except NoResultFound: raise exceptions.RequestParameterInvalidException( 'No role found with the name provided. Name: ' + str( role_name ) ) except Exception, e: raise exceptions.InternalServerError( 'Error loading from the database.' + str(e)) return role