from __future__ import absolute_import
from galaxy.web.base.controller import *
from galaxy.web.framework.helpers import time_ago, iff, grids
from galaxy.model.orm import *
from galaxy import model, util
from galaxy.web.form_builder import *
from .requests_common import invalid_id_redirect
import logging, os
log = logging.getLogger( __name__ )
class ExternalServiceGrid( grids.Grid ):
    """Grid definition used by the admin UI to list external services.

    Declares the columns, row operations and global actions rendered by the
    "admin/external_service/grid.mako" template.
    """

    # Custom column types
    class NameColumn( grids.TextColumn ):
        """Text column displaying the name of the external service."""

        def get_value( self, trans, grid, external_service ):
            """Return the ``name`` attribute of ``external_service``."""
            return external_service.name

    class ExternalServiceTypeColumn( grids.TextColumn ):
        """Text column displaying the name of the service's external service type."""

        def get_value( self, trans, grid, external_service ):
            """Return the name of the external service type registered under the
            service's ``external_service_type_id``, or an error string when no
            type is registered under that id.
            """
            type_id = external_service.external_service_type_id
            try:
                return trans.app.external_service_types.all_external_service_types[ type_id ].name
            except KeyError:
                # The type id stored on the service is not (or no longer) registered.
                return 'Error in loading external_service type: %s' % type_id

    # Grid definition
    title = "External Services"
    template = "admin/external_service/grid.mako"
    model_class = model.ExternalService
    default_sort_key = "-create_time"
    num_rows_per_page = 50
    preserve_state = True
    use_paging = True
    # Deleted services are filtered out unless the user asks for them.
    default_filter = dict( deleted="False" )
    columns = [
        NameColumn( "Name",
                    key="name",
                    # Deleted services get no "view" link.
                    link=( lambda item: iff( item.deleted, None, dict( operation="view", id=item.id ) ) ),
                    attach_popup=True,
                    filterable="advanced" ),
        grids.TextColumn( "Description",
                          key='description',
                          filterable="advanced" ),
        ExternalServiceTypeColumn( "External Service Type" ),
        grids.GridColumn( "Last Updated",
                          key="update_time",
                          format=time_ago ),
        grids.DeletedColumn( "Deleted",
                             key="deleted",
                             visible=False,
                             filterable="advanced" )
    ]
    # Free-text search spans the Name and Description columns.
    columns.append( grids.MulticolFilterColumn( "Search",
                                                cols_to_filter=[ columns[0], columns[1] ],
                                                key="free-text-search",
                                                visible=False,
                                                filterable="standard" ) )
    operations = [
        grids.GridOperation( "Edit", allow_multiple=False, condition=( lambda item: not item.deleted ) ),
        grids.GridOperation( "Delete", allow_multiple=True, condition=( lambda item: not item.deleted ) ),
        grids.GridOperation( "Undelete", condition=( lambda item: item.deleted ) ),
    ]
    global_actions = [
        grids.GridAction( "Reload external service types", dict( controller='external_service', action='reload_external_service_types' ) ),
        grids.GridAction( "Create new external service", dict( controller='external_service', action='create_external_service' ) )
    ]
class ExternalService( BaseUIController, UsesFormDefinitionsMixin ):
    """Admin controller for browsing, creating, viewing, editing, deleting and
    undeleting external services, and for reloading external service types.
    """

    external_service_grid = ExternalServiceGrid()

    @web.expose
    @web.require_admin
    def browse_external_services( self, trans, **kwd ):
        """Dispatch a grid operation named in ``kwd['operation']`` to its handler,
        or render the external service grid when no known operation is given.
        """
        if 'operation' in kwd:
            handlers = dict( view=self.view_external_service,
                             edit=self.edit_external_service,
                             delete=self.delete_external_service,
                             undelete=self.undelete_external_service )
            handler = handlers.get( kwd[ 'operation' ].lower() )
            if handler is not None:
                return handler( trans, **kwd )
        # Render the grid view
        return self.external_service_grid( trans, **kwd )

    @web.expose
    @web.require_admin
    def create_external_service( self, trans, **kwd ):
        """Render the creation form, or save a new external service when the
        form's ``create_external_service_button`` was submitted.

        Redirects to the grid with an error when no external service types are
        visible or when the form was submitted without a type selected.
        """
        params = util.Params( kwd )
        message = util.restore_text( params.get( 'message', '' ) )
        status = params.get( 'status', 'done' )
        external_service_type_id = params.get( 'external_service_type_id', 'none' )
        widgets = self.__build_external_service_widgets( trans, external_service=None, **kwd )
        external_service_type = None
        error = False
        if not trans.app.external_service_types.visible_external_service_types:
            error = True
            message = 'There are no visible external_service types in the external_service types config file'
        elif params.get( 'create_external_service_button', False ):
            if external_service_type_id == 'none':
                error = True
                message = 'Provide an external_service_type_id to create a new external service.'
            else:
                self.__save_external_service( trans, **kwd )
                message = 'The external_service has been created.'
                return trans.response.send_redirect( web.url_for( controller='external_service',
                                                                  action='browse_external_services',
                                                                  message=message,
                                                                  status=status ) )
        elif external_service_type_id != 'none':
            # Form submission via refresh_on_change
            trans.app.external_service_types.reload( external_service_type_id )
            external_service_type = self.get_external_service_type( trans, external_service_type_id )
            widgets.extend( external_service_type.form_definition.get_widgets( trans.user, **kwd ) )
        if error:
            return trans.response.send_redirect( web.url_for( controller='external_service',
                                                              action='browse_external_services',
                                                              message=message,
                                                              status='error' ) )
        return trans.fill_template( '/admin/external_service/create_external_service.mako',
                                    widgets=widgets,
                                    message=message,
                                    status=status,
                                    external_service_type=external_service_type )

    def __get_external_service( self, trans, external_service_id ):
        """Return the ExternalService for the encoded ``external_service_id``,
        or None when the id cannot be decoded or no such row exists.
        """
        try:
            # Query.get() returns None (it does not raise) when no row matches.
            return trans.sa_session.query( trans.model.ExternalService ).get( trans.security.decode_id( external_service_id ) )
        except Exception:
            # Best effort: callers answer a failed lookup with an "invalid id" redirect.
            log.debug( "Unable to load external service for id '%s'", external_service_id, exc_info=True )
            return None

    @web.expose
    @web.require_admin
    def view_external_service( self, trans, **kwd ):
        """Render the read-only page for the external service identified by
        ``kwd['id']``; redirect to the grid when the id is invalid.
        """
        external_service_id = kwd.get( 'id', None )
        external_service = self.__get_external_service( trans, external_service_id )
        if external_service is None:
            return invalid_id_redirect( trans, 'external_service', external_service_id, 'external_service', action='browse_external_services' )
        external_service_type = self.get_external_service_type( trans, external_service.external_service_type_id )
        return trans.fill_template( '/admin/external_service/view_external_service.mako',
                                    external_service=external_service,
                                    external_service_type=external_service_type )

    @web.expose
    @web.require_admin
    def edit_external_service( self, trans, **kwd ):
        """Render the edit form for the external service identified by the
        ``id`` parameter, saving the submitted values first when
        ``edit_external_service_button`` was submitted.

        Redirects to the grid when the id is invalid.
        """
        params = util.Params( kwd )
        message = util.restore_text( params.get( 'message', '' ) )
        status = params.get( 'status', 'done' )
        external_service_id = params.get( 'id', None )
        external_service = self.__get_external_service( trans, external_service_id )
        if external_service is None:
            return invalid_id_redirect( trans, 'external_service', external_service_id, 'external_service', action='browse_external_services' )
        if params.get( 'edit_external_service_button', False ):
            external_service = self.__save_external_service( trans, **kwd )
            trans.sa_session.refresh( external_service )
            message = 'Changes made to external service (%s) have been saved' % external_service.name
        widgets = self.__build_external_service_widgets( trans, external_service, **kwd )
        widgets.extend( external_service.form_definition.get_widgets( trans.user, external_service.form_values.content, **kwd ) )
        external_service_type = self.get_external_service_type( trans, external_service.external_service_type_id )
        return trans.fill_template( '/admin/external_service/edit_external_service.mako',
                                    external_service=external_service,
                                    widgets=widgets,
                                    message=message,
                                    status=status,
                                    external_service_type=external_service_type )

    def __save_external_service( self, trans, **kwd ):
        """Create a new external service, or update an existing one when an
        ``id`` parameter is present, from the submitted form values.

        Returns the saved ExternalService.
        """
        params = util.Params( kwd )
        external_service_id = params.get( 'id', None )
        name = util.restore_text( params.get( 'external_service_name', '' ) )
        description = util.restore_text( params.get( 'external_service_description', '' ) )
        version = util.restore_text( params.get( 'external_service_version', '' ) )
        external_service_type_id = params.get( 'external_service_type_id', '' )
        if external_service_id:
            # We're saving changed attributes of an existing external_service.
            external_service = trans.sa_session.query( trans.model.ExternalService ).get( trans.security.decode_id( external_service_id ) )
            external_service.name = name
            external_service.description = description
            external_service.version = version
            external_service.form_values.content = self.get_form_values( trans, trans.user, external_service.form_definition, **kwd )
            trans.sa_session.add( external_service )
            trans.sa_session.add( external_service.form_values )
            trans.sa_session.flush()
        else:
            # We're saving a newly created external_service
            external_service_type = self.get_external_service_type( trans, external_service_type_id )
            external_service = trans.model.ExternalService( name, description, external_service_type_id, version )
            external_service.form_definition = external_service_type.form_definition
            # Get the form values from kwd, some of which may be different than the defaults in the external service
            # type config because the user could have overwritten them.
            values = self.get_form_values( trans, trans.user, external_service.form_definition, **kwd )
            external_service.form_values = trans.model.FormValues( external_service.form_definition, values )
            trans.sa_session.add( external_service )
            trans.sa_session.add( external_service.form_definition )
            trans.sa_session.add( external_service.form_values )
            trans.sa_session.flush()
        return external_service

    @web.expose
    @web.require_admin
    def delete_external_service( self, trans, **kwd ):
        """Mark every external service listed in ``kwd['id']`` as deleted and
        redirect to the grid.

        Processing stops with an "invalid id" redirect at the first id that
        cannot be resolved; services handled before it stay deleted.
        """
        external_service_id_list = util.listify( kwd.get( 'id', '' ) )
        for external_service_id in external_service_id_list:
            external_service = self.__get_external_service( trans, external_service_id )
            if external_service is None:
                return invalid_id_redirect( trans, 'external_service', external_service_id, 'external_service', action='browse_external_services' )
            external_service.deleted = True
            trans.sa_session.add( external_service )
            trans.sa_session.flush()
        message = '%i external services have been deleted' % len( external_service_id_list )
        return trans.response.send_redirect( web.url_for( controller='external_service',
                                                          action='browse_external_services',
                                                          message=message,
                                                          status='done' ) )

    @web.expose
    @web.require_admin
    def undelete_external_service( self, trans, **kwd ):
        """Clear the deleted flag on every external service listed in
        ``kwd['id']`` and redirect to the grid.

        Processing stops with an "invalid id" redirect at the first id that
        cannot be resolved; services handled before it stay undeleted.
        """
        external_service_id_list = util.listify( kwd.get( 'id', '' ) )
        for external_service_id in external_service_id_list:
            external_service = self.__get_external_service( trans, external_service_id )
            if external_service is None:
                return invalid_id_redirect( trans, 'external_service', external_service_id, 'external_service', action='browse_external_services' )
            external_service.deleted = False
            trans.sa_session.add( external_service )
            trans.sa_session.flush()
        message = '%i external services have been undeleted' % len( external_service_id_list )
        # The status is passed literally so that it is defined even when the id list is empty.
        return trans.response.send_redirect( web.url_for( controller='external_service',
                                                          action='browse_external_services',
                                                          message=message,
                                                          status='done' ) )

    @web.expose
    @web.require_admin
    def reload_external_service_types( self, trans, **kwd ):
        """Render the "reload external service types" form, reloading the
        selected type first when ``reload_external_service_type_button`` was
        submitted.

        Redirects to the grid with an error when no external service types are
        visible.
        """
        params = util.Params( kwd )
        message = util.restore_text( params.get( 'message', '' ) )
        status = params.get( 'status', 'done' )
        external_service_type_id = params.get( 'external_service_type_id', 'none' )
        if params.get( 'reload_external_service_type_button', False ):
            # NOTE(review): if the form is submitted without a selection, 'none' is
            # passed to reload() -- confirm that reload() tolerates that value.
            new_external_service_type = trans.app.external_service_types.reload( external_service_type_id )
            status = 'done'
            message = 'Reloaded external service type: %s' % new_external_service_type.name
        external_service_type_select_field = self.__build_external_service_type_select_field( trans,
                                                                                              external_service_type_id,
                                                                                              refresh_on_change=False,
                                                                                              visible_external_service_types_only=False )
        if not trans.app.external_service_types.visible_external_service_types:
            message = 'There are no visible external service types in the external service types config file.'
            status = 'error'
            return trans.response.send_redirect( web.url_for( controller='external_service',
                                                              action='browse_external_services',
                                                              message=message,
                                                              status=status ) )
        return trans.fill_template( '/admin/external_service/reload_external_service_types.mako',
                                    external_service_type_select_field=external_service_type_select_field,
                                    message=message,
                                    status=status )

    def get_external_service_type( self, trans, external_service_type_id, action='browse_external_services' ):
        """Return the external service type registered under
        ``external_service_type_id``.

        When no type is registered under that id, the result of redirecting to
        ``action`` with an error message is returned instead.
        NOTE(review): callers use the return value as a type object, so this
        presumably relies on send_redirect() not returning normally -- confirm.
        """
        try:
            return trans.app.external_service_types.all_external_service_types[ external_service_type_id ]
        except KeyError:
            message = 'Error in loading external service type: %s' % external_service_type_id
            return trans.response.send_redirect( web.url_for( controller='external_service',
                                                              action=action,
                                                              message=message,
                                                              status='error' ) )

    # ===== Methods for building SelectFields used on various admin_requests forms
    def __build_external_service_widgets( self, trans, external_service=None, **kwd ):
        """Return the list of widget dicts (label / widget / helptext) for the
        name, description and version fields.

        Field values come from ``external_service`` when one is given, otherwise
        from the request parameters. The external service type select field is
        appended only when no ``external_service`` is given (i.e. on creation).
        """
        params = util.Params( kwd )
        if external_service:
            name = external_service.name
            description = external_service.description
            version = external_service.version
            seq_type = external_service.external_service_type_id
        else:
            name = util.restore_text( params.get( 'external_service_name', '' ) )
            description = util.restore_text( params.get( 'external_service_description', '' ) )
            version = util.restore_text( params.get( 'external_service_version', '' ) )
            selected_seq_type = params.get( 'external_service_type_id', '' )
            all_types = trans.app.external_service_types.all_external_service_types
            if selected_seq_type in all_types:
                seq_type = all_types[ selected_seq_type ].id
            else:
                seq_type = 'none'
        widgets = [ dict( label='Name',
                          widget=TextField( 'external_service_name', 40, name ),
                          helptext='' ),
                    dict( label='Description',
                          widget=TextField( 'external_service_description', 40, description ),
                          helptext='' ),
                    dict( label='Version',
                          widget=TextField( 'external_service_version', 40, version ),
                          helptext='' ) ]
        # Do not show the external_service_type selectfield when editing a external_service
        if not external_service:
            widgets.append( dict( label='External service type',
                                  widget=self.__build_external_service_type_select_field( trans, seq_type, visible_external_service_types_only=True ),
                                  helptext='') )
        return widgets

    def __build_external_service_type_select_field( self, trans, selected_value, refresh_on_change=True, visible_external_service_types_only=False ):
        """Return a SelectField named 'external_service_type_id' with a
        'Select one' placeholder followed by one option per external service
        type; the option whose value equals ``selected_value`` is selected.

        Only the visible types are listed when
        ``visible_external_service_types_only`` is true, otherwise all types.
        """
        external_service_types = trans.app.external_service_types.all_external_service_types
        if visible_external_service_types_only:
            objs_list = [ external_service_types[ seq_type_id ] for seq_type_id in trans.app.external_service_types.visible_external_service_types ]
        else:
            objs_list = external_service_types.values()
        # NOTE(review): these values are encoded ids while the option values added
        # below are the raw type ids -- confirm this mismatch is intended.
        refresh_on_change_values = [ 'none' ]
        refresh_on_change_values.extend( [ trans.security.encode_id( obj.id ) for obj in objs_list ] )
        select_external_service_type = SelectField( 'external_service_type_id',
                                                    refresh_on_change=refresh_on_change,
                                                    refresh_on_change_values=refresh_on_change_values )
        if selected_value == 'none':
            select_external_service_type.add_option( 'Select one', 'none', selected=True )
        else:
            select_external_service_type.add_option( 'Select one', 'none' )
        for seq_type in objs_list:
            # Append the version to the option label when the type declares one.
            if seq_type.version:
                option_name = " ".join( [ seq_type.name, "version", seq_type.version ] )
            else:
                option_name = seq_type.name
            if selected_value == seq_type.id:
                select_external_service_type.add_option( option_name, seq_type.id, selected=True )
            else:
                select_external_service_type.add_option( option_name, seq_type.id )
        return select_external_service_type