"""
API operations on User objects.
"""
from galaxy.web.base.controller import BaseAPIController
from galaxy.web.base.controller import UsesTagsMixin
from galaxy.web.base.controller import CreatesApiKeysMixin
from galaxy.web.base.controller import CreatesUsersMixin
from galaxy.security.validate_user_input import validate_email
from galaxy.security.validate_user_input import validate_password
from galaxy.security.validate_user_input import validate_publicname
from galaxy.web import _future_expose_api as expose_api
from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous
from galaxy import web
from galaxy import util
from galaxy import exceptions
import logging
log = logging.getLogger( __name__ )
class UserAPIController(BaseAPIController, UsesTagsMixin, CreatesUsersMixin, CreatesApiKeysMixin):
    """
    API controller exposing list/show/create operations on User objects,
    plus API key creation for a given user.
    """

    @expose_api
    def index(self, trans, deleted='False', f_email=None, **kwd):
        """
        GET /api/users
        GET /api/users/deleted
        Displays a collection (list) of users.

        :param deleted: string flag; when true, list deleted users instead of
                        active ones (admins only - others get an empty list)
        :param f_email: optional substring to match against user emails; only
                        honoured for admins or when ``expose_user_email`` is set
        :returns:       list of user dictionaries with encoded ids
        """
        deleted = util.string_as_bool(deleted)
        config = trans.app.config
        user_model = trans.app.model.User
        is_admin = trans.user_is_admin()
        encode_ids = {'id': trans.security.encode_id}

        # only admins can see deleted users
        if deleted and not is_admin:
            return []

        # special case: user can see only their own user
        # special case2: if the galaxy admin has specified that other user email/names are
        #   exposed, we don't want special case #1
        if not deleted and not is_admin and not config.expose_user_name and not config.expose_user_email:
            return [trans.user.to_dict(value_mapper=encode_ids)]

        query = trans.sa_session.query(user_model)
        # Filtering on email would let a caller infer hidden addresses from which
        # users match, so only apply it when the caller may see emails anyway.
        if f_email and (is_admin or config.expose_user_email):
            query = query.filter(user_model.email.like("%%%s%%" % f_email))
        if deleted:
            query = query.filter(user_model.table.c.deleted == True)  # noqa
        else:
            query = query.filter(user_model.table.c.deleted == False)  # noqa

        rval = []
        for user in query:
            item = user.to_dict(value_mapper=encode_ids)
            # Admins and the user themself always see username and email; everyone
            # else only sees the fields the instance is configured to expose.
            if not is_admin and user is not trans.user:
                if not config.expose_user_name:
                    item.pop('username', None)
                if not config.expose_user_email:
                    item.pop('email', None)
            # TODO: move into api_values
            rval.append(item)
        return rval

    @expose_api_anonymous
    def show(self, trans, id, deleted='False', **kwd):
        """
        GET /api/users/{encoded_user_id}
        GET /api/users/deleted/{encoded_user_id}
        GET /api/users/current
        Displays information about a user.

        :param id:      encoded user id, or the literal string ``current``
        :param deleted: string flag passed on to the user lookup
        :returns:       dictionary describing the user; for an anonymous
                        ``current`` request only usage and quota information
        :raises RequestParameterInvalidException: if the user cannot be looked
                        up, or a non-admin requests anyone but their own,
                        non-deleted account
        """
        deleted = util.string_as_bool(deleted)

        if id == "current":
            # user is requesting data about themselves...
            if not trans.user:
                # ...and is anonymous - return usage and quota (if any)
                return self.anon_user_api_value(trans)
            # ...and is logged in - return full
            user = trans.user
        else:
            try:
                user = self.get_user(trans, id, deleted=deleted)
            except Exception:
                # Every lookup failure is reported to the client the same way;
                # keep the real cause in the log.
                log.debug('User lookup failed for id %r', id, exc_info=True)
                raise exceptions.RequestParameterInvalidException('Invalid user id specified', id=id)

        # check that the user is requesting themselves (and they aren't del'd) unless admin.
        # These are explicit checks rather than asserts so they survive `python -O`.
        allowed = user is not None and (
            trans.user_is_admin() or (trans.user == user and not user.deleted)
        )
        if not allowed:
            raise exceptions.RequestParameterInvalidException('Invalid user id specified', id=id)

        item = user.to_dict(view='element', value_mapper={'id': trans.security.encode_id,
                                                          'total_disk_usage': float})
        # add a list of tags used by the user (as strings)
        item['tags_used'] = self.get_user_tags_used(trans, user=user)
        # TODO: move into api_values (needs trans, tho - can we do that with api_keys/@property??)
        # TODO: works with other users (from admin)??
        # NOTE(review): the percentage is computed from `trans`, not from `user` - confirm
        # what an admin sees when viewing another user's record.
        item['quota_percent'] = trans.app.quota_agent.get_percent(trans=trans)
        item['is_admin'] = trans.user_is_admin()
        return item

    @expose_api
    def create(self, trans, payload, **kwd):
        """
        POST /api/users
        Creates a new Galaxy user.

        :param payload: dictionary holding ``remote_user_email`` when the
                        instance uses remote users, otherwise ``username``,
                        ``email`` and ``password``
        :returns:       dictionary describing the created user
        :raises ConfigDoesNotAllowException: user creation is disabled and the
                        requester is not an admin
        :raises NotImplemented: the requester is not an admin
        :raises RequestParameterInvalidException: email, password or public
                        name failed validation
        """
        config = trans.app.config
        is_admin = trans.user_is_admin()

        if not config.allow_user_creation and not is_admin:
            raise exceptions.ConfigDoesNotAllowException('User creation is not allowed in this Galaxy instance')
        if not is_admin:
            raise exceptions.NotImplemented()

        if config.use_remote_user:
            user = trans.get_or_create_remote_user(remote_user_email=payload['remote_user_email'])
        else:
            username = payload['username']
            email = payload['email']
            password = payload['password']
            problems = (
                validate_email(trans, email),
                validate_password(trans, password, password),
                validate_publicname(trans, username),
            )
            # Join only the validators that actually complained so the message
            # does not start with blank lines.
            message = "\n".join(problem for problem in problems if problem).rstrip()
            if message:
                raise exceptions.RequestParameterInvalidException(message)
            user = self.create_user(trans=trans, email=email, username=username, password=password)

        return user.to_dict(view='element', value_mapper={'id': trans.security.encode_id,
                                                          'total_disk_usage': float})

    @expose_api
    @web.require_admin
    def api_key(self, trans, user_id, **kwd):
        """
        POST /api/users/{encoded_user_id}/api_key
        Creates a new API key for specified user.

        :param user_id: encoded id of the user to create the key for
        :returns:       the value returned by ``create_api_key``
        """
        user = self.get_user(trans, user_id)
        return self.create_api_key(trans, user)

    @expose_api
    def update(self, trans, **kwd):
        """
        Updating users is not implemented; always raises NotImplemented.
        """
        raise exceptions.NotImplemented()

    @expose_api
    def delete(self, trans, **kwd):
        """
        Deleting users is not implemented; always raises NotImplemented.
        """
        raise exceptions.NotImplemented()

    @expose_api
    def undelete(self, trans, **kwd):
        """
        Undeleting users is not implemented; always raises NotImplemented.
        """
        raise exceptions.NotImplemented()

    # TODO: move to more basal, common resource than this
    def anon_user_api_value(self, trans):
        """
        Returns data for an anonymous user, truncated to only usage and quota_percent

        :returns: dictionary with ``total_disk_usage``, ``nice_total_disk_usage``
                  and ``quota_percent``
        """
        quota_agent = trans.app.quota_agent
        usage = quota_agent.get_usage(trans)
        return {
            'total_disk_usage': int(usage),
            'nice_total_disk_usage': util.nice_size(usage),
            'quota_percent': quota_agent.get_percent(trans=trans, usage=usage),
        }