Source code for galaxy.webapps.tool_shed.security
"""Tool Shed Security"""
import ConfigParser
import logging
import os
from datetime import datetime
from datetime import timedelta
from galaxy.util.bunch import Bunch
from galaxy.util import listify
from galaxy.model.orm import and_
log = logging.getLogger(__name__)
[docs]class Action( object ):
def __init__( self, action, description, model ):
self.action = action
self.description = description
self.model = model
[docs]class RBACAgent:
"""Handle Galaxy Tool Shed security"""
permitted_actions = Bunch()
[docs] def associate_components( self, **kwd ):
raise 'No valid method of associating provided components: %s' % kwd
[docs] def associate_user_role( self, user, role ):
raise 'No valid method of associating a user with a role'
[docs] def convert_permitted_action_strings( self, permitted_action_strings ):
"""
When getting permitted actions from an untrusted source like a
form, ensure that they match our actual permitted actions.
"""
return filter( lambda x: x is not None, [ self.permitted_actions.get( action_string ) for action_string in permitted_action_strings ] )
[docs] def get_action( self, name, default=None ):
"""Get a permitted action by its dict key or action name"""
for k, v in self.permitted_actions.items():
if k == name or v.action == name:
return v
return default
[docs] def get_actions( self ):
"""Get all permitted actions as a list of Action objects"""
return self.permitted_actions.__dict__.values()
[docs] def get_item_actions( self, action, item ):
raise 'No valid method of retrieving action (%s) for item %s.' % ( action, item )
[docs]class CommunityRBACAgent( RBACAgent ):
def __init__( self, model, permitted_actions=None ):
self.model = model
if permitted_actions:
self.permitted_actions = permitted_actions
@property
def sa_session( self ):
"""Returns a SQLAlchemy session"""
return self.model.context
[docs] def allow_action( self, roles, action, item ):
"""
Method for checking a permission for the current user ( based on roles ) to perform a
specific action on an item
"""
item_actions = self.get_item_actions( action, item )
if not item_actions:
return action.model == 'restrict'
ret_val = False
for item_action in item_actions:
if item_action.role in roles:
ret_val = True
break
return ret_val
[docs] def associate_components( self, **kwd ):
if 'user' in kwd:
if 'group' in kwd:
return self.associate_user_group( kwd['user'], kwd['group'] )
elif 'role' in kwd:
return self.associate_user_role( kwd['user'], kwd['role'] )
elif 'role' in kwd:
if 'group' in kwd:
return self.associate_group_role( kwd['group'], kwd['role'] )
elif 'repository' in kwd:
return self.associate_repository_category( kwd[ 'repository' ], kwd[ 'category' ] )
raise 'No valid method of associating provided components: %s' % kwd
[docs] def associate_group_role( self, group, role ):
assoc = self.model.GroupRoleAssociation( group, role )
self.sa_session.add( assoc )
self.sa_session.flush()
return assoc
[docs] def associate_user_group( self, user, group ):
assoc = self.model.UserGroupAssociation( user, group )
self.sa_session.add( assoc )
self.sa_session.flush()
return assoc
[docs] def associate_user_role( self, user, role ):
assoc = self.model.UserRoleAssociation( user, role )
self.sa_session.add( assoc )
self.sa_session.flush()
return assoc
[docs] def associate_repository_category( self, repository, category ):
assoc = self.model.RepositoryCategoryAssociation( repository, category )
self.sa_session.add( assoc )
self.sa_session.flush()
return assoc
[docs] def create_private_user_role( self, user ):
# Create private role
role = self.model.Role( name=user.email, description='Private Role for ' + user.email, type=self.model.Role.types.PRIVATE )
self.sa_session.add( role )
self.sa_session.flush()
# Add user to role
self.associate_components( role=role, user=user )
return role
[docs] def get_item_actions( self, action, item ):
# item must be one of: Dataset, Library, LibraryFolder, LibraryDataset, LibraryDatasetDatasetAssociation
return [ permission for permission in item.actions if permission.action == action.action ]
[docs] def get_private_user_role( self, user, auto_create=False ):
role = self.sa_session.query( self.model.Role ) \
.filter( and_( self.model.Role.table.c.name == user.email,
self.model.Role.table.c.type == self.model.Role.types.PRIVATE ) ) \
.first()
if not role:
if auto_create:
return self.create_private_user_role( user )
else:
return None
return role
[docs] def get_repository_reviewer_role( self ):
return self.sa_session.query( self.model.Role ) \
.filter( and_( self.model.Role.table.c.name == 'Repository Reviewer',
self.model.Role.table.c.type == self.model.Role.types.SYSTEM ) ) \
.first()
[docs] def set_entity_group_associations( self, groups=None, users=None, roles=None, delete_existing_assocs=True ):
if groups is None:
groups = []
if users is None:
users = []
if roles is None:
roles = []
for group in groups:
if delete_existing_assocs:
for a in group.roles + group.users:
self.sa_session.delete( a )
self.sa_session.flush()
for role in roles:
self.associate_components( group=group, role=role )
for user in users:
self.associate_components( group=group, user=user )
[docs] def set_entity_role_associations( self, roles=None, users=None, groups=None, repositories=None, delete_existing_assocs=True ):
if roles is None:
roles = []
if users is None:
users = []
if groups is None:
groups = []
if repositories is None:
repositories = []
for role in roles:
if delete_existing_assocs:
for a in role.users + role.groups:
self.sa_session.delete( a )
self.sa_session.flush()
for user in users:
self.associate_components( user=user, role=role )
for group in groups:
self.associate_components( group=group, role=role )
[docs] def set_entity_user_associations( self, users=None, roles=None, groups=None, delete_existing_assocs=True ):
if users is None:
users = []
if roles is None:
roles = []
if groups is None:
groups = []
for user in users:
if delete_existing_assocs:
for a in user.non_private_roles + user.groups:
self.sa_session.delete( a )
self.sa_session.flush()
self.sa_session.refresh( user )
for role in roles:
# Make sure we are not creating an additional association with a PRIVATE role
if role not in user.roles:
self.associate_components( user=user, role=role )
for group in groups:
self.associate_components( user=user, group=group )
[docs] def can_push( self, app, user, repository ):
if user:
return user.username in listify( repository.allow_push( app ) )
return False
[docs] def user_can_administer_repository( self, user, repository ):
"""Return True if the received user can administer the received repository."""
if user:
if repository:
repository_admin_role = repository.admin_role
for rra in repository.roles:
role = rra.role
if role.id == repository_admin_role.id:
# We have the repository's admin role, so see if the user is associated with it.
for ura in role.users:
role_member = ura.user
if role_member.id == user.id:
return True
# The user is not directly associated with the role, so see if they are a member
# of a group that is associated with the role.
for gra in role.groups:
group = gra.group
for uga in group.members:
member = uga.user
if member.id == user.id:
return True
return False
[docs] def user_can_import_repository_archive( self, user, archive_owner ):
# This method should be called only if the current user is not an admin.
if user.username == archive_owner:
return True
# A member of the IUC is authorized to create new repositories that are owned by another user.
iuc_group = self.sa_session.query( self.model.Group ) \
.filter( and_( self.model.Group.table.c.name == 'Intergalactic Utilities Commission',
self.model.Group.table.c.deleted == False ) ) \
.first()
if iuc_group is not None:
for uga in iuc_group.users:
if uga.user.id == user.id:
return True
return False
[docs] def user_can_review_repositories( self, user ):
if user:
roles = user.all_roles()
if roles:
repository_reviewer_role = self.get_repository_reviewer_role()
if repository_reviewer_role:
return repository_reviewer_role in roles
return False
[docs] def user_can_browse_component_review( self, app, repository, component_review, user ):
if component_review and user:
if self.can_push( app, user, repository ):
# A user with write permission on the repository can access private/public component reviews.
return True
else:
if self.user_can_review_repositories( user ):
# Reviewers can access private/public component reviews.
return True
return False
[docs]def get_permitted_actions( filter=None ):
'''Utility method to return a subset of RBACAgent's permitted actions'''
if filter is None:
return RBACAgent.permitted_actions
tmp_bunch = Bunch()
[ tmp_bunch.__dict__.__setitem__(k, v) for k, v in RBACAgent.permitted_actions.items() if k.startswith( filter ) ]
return tmp_bunch