Source code for galaxy.datatypes.display_applications.parameters

#Contains parameters that are used in Display Applications
import logging, urllib
from galaxy.util import string_as_bool
from galaxy.util.bunch import Bunch
from galaxy.util.template import fill_template
from galaxy.web import url_for
import mimetypes

# Default key under which the dataset being displayed is looked up in the
# ``other_values`` mapping handed to display application parameters.
DEFAULT_DATASET_NAME = 'dataset'

class DisplayApplicationParameter(object):
    """Abstract base class for Display Application parameters.

    Concrete subclasses set ``type`` and implement ``get_value``; the
    remaining methods provide defaults for parameters that need no
    preparation step.
    """

    type = None

    @classmethod
    def from_elem(cls, elem, link):
        """Instantiate the parameter class registered for ``elem``'s type.

        ``elem`` must expose ``get``; its ``type`` attribute selects the
        class from ``parameter_type_to_class``.

        Raises AssertionError when no type is given and KeyError when the
        type is not registered.
        """
        param_type = elem.get('type', None)
        assert param_type, 'DisplayApplicationParameter requires a type'
        return parameter_type_to_class[param_type](elem, link)

    def __init__(self, elem, link):
        """Read the common parameter settings from ``elem``.

        Raises AssertionError when ``elem`` carries no ``name``.
        """
        self.name = elem.get('name', None)
        assert self.name, 'DisplayApplicationParameter requires a name'
        self.link = link
        # name used in url for display purposes defaults to name; e.g. want
        # the form of file.ext, where a '.' is not allowed as python variable
        # name/keyword
        self.url = elem.get('url', self.name)
        self.mime_type = elem.get('mimetype', None)
        self.guess_mime_type = string_as_bool(elem.get('guess_mimetype', 'False'))
        # only allow these to be viewed via direct url when explicitly set to
        # viewable
        self.viewable = string_as_bool(elem.get('viewable', 'False'))
        self.strip = string_as_bool(elem.get('strip', 'False'))
        self.strip_https = string_as_bool(elem.get('strip_https', 'False'))
        # Passing query param app_<name>=<value> to dataset controller allows
        # override if this is true.
        self.allow_override = string_as_bool(elem.get('allow_override', 'False'))

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Return the parameter's value; must be provided by subclasses."""
        # Call form of raise: valid on both Python 2 and Python 3, unlike the
        # Python-2-only ``raise Exception, 'msg'`` statement.
        raise Exception('Unimplemented')

    def prepare(self, other_values, dataset_hash, user_hash, trans):
        """Prepare the parameter; by default this just returns its value."""
        return self.get_value(other_values, dataset_hash, user_hash, trans)

    def ready(self, other_values):
        """Report whether the parameter can be used; always True here."""
        return True

    def is_preparing(self, other_values):
        """Report whether preparation is in progress; always False here."""
        return False

    def build_url(self, other_values):
        """Fill the ``url`` template using ``other_values`` as context."""
        return fill_template(self.url, context=other_values)
class DisplayApplicationDataParameter(DisplayApplicationParameter):
    """ Parameter that returns a file_name containing the requested content """

    type = 'data'

    def __init__(self, elem, link):
        """Read the data-specific settings from ``elem``.

        Raises AssertionError when both ``format`` and ``metadata`` are set.
        """
        DisplayApplicationParameter.__init__(self, elem, link)
        self.extensions = elem.get('format', None)
        if self.extensions:
            self.extensions = self.extensions.split(",")
        self.metadata = elem.get('metadata', None)
        # 'dataset' is default name assigned to dataset to be displayed
        self.dataset = elem.get('dataset', DEFAULT_DATASET_NAME)
        assert not (self.extensions and self.metadata), 'A format or a metadata can be defined for a DisplayApplicationParameter, but not both.'
        # data params should be viewable
        self.viewable = string_as_bool(elem.get('viewable', 'True'))
        self.force_url_param = string_as_bool(elem.get('force_url_param', 'False'))
        self.force_conversion = string_as_bool(elem.get('force_conversion', 'False'))

    @property
    def formats(self):
        """Tuple of datatype classes for ``extensions``, or None if unset."""
        if self.extensions:
            get_datatype = self.link.display_application.app.datatypes_registry.get_datatype_by_extension
            # ``type`` here is the builtin; the class attribute of the same
            # name is not visible from inside a method body.
            return tuple(type(get_datatype(ext)) for ext in self.extensions)
        return None

    def _get_dataset_like_object(self, other_values):
        """Resolve the object whose file should be served for this parameter.

        Returns the base dataset, a Bunch describing a metadata file, an
        already converted dataset, or None when a conversion is required but
        no converted file exists yet.

        Raises AssertionError when the base dataset is missing, the metadata
        name is unknown, or no conversion path exists.
        """
        # this returned object has file_name, state, and states attributes
        # equivalent to a DatasetAssociation
        data = other_values.get(self.dataset, None)
        assert data, 'Base dataset could not be found in values provided to DisplayApplicationDataParameter'
        if isinstance(data, DisplayDataValueWrapper):
            data = data.value
        if self.metadata:
            rval = getattr(data.metadata, self.metadata, None)
            # ``__name__`` (not ``name``) so that building the message cannot
            # itself fail with AttributeError and mask the assertion.
            assert rval, 'Unknown metadata name (%s) provided for dataset type (%s).' % (self.metadata, data.datatype.__class__.__name__)
            return Bunch(file_name=rval.file_name, state=data.state, states=data.states, extension='data')
        elif self.extensions and (self.force_conversion or not isinstance(data.datatype, self.formats)):
            for ext in self.extensions:
                rval = data.get_converted_files_by_type(ext)
                if rval:
                    return rval
            assert data.find_conversion_destination(self.formats)[0] is not None, "No conversion path found for data param: %s" % self.name
            return None
        return data

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Return a DisplayDataValueWrapper for the resolved data, or None."""
        data = self._get_dataset_like_object(other_values)
        if data:
            return DisplayDataValueWrapper(data, self, other_values, dataset_hash, user_hash, trans)
        return None

    def prepare(self, other_values, dataset_hash, user_hash, trans):
        """Start a dataset conversion when one is needed, then return the value.

        Raises Exception when an existing converted dataset is in the error
        state.
        """
        data = self._get_dataset_like_object(other_values)
        if not data and self.formats:
            data = other_values.get(self.dataset, None)
            trans.sa_session.refresh(data)
            # start conversion
            # FIXME: Much of this is copied (more than once...); should be
            # some abstract method elsewhere called from here
            # find target ext
            target_ext, converted_dataset = data.find_conversion_destination(self.formats, converter_safe=True)
            if target_ext and not converted_dataset:
                if isinstance(data, DisplayDataValueWrapper):
                    data = data.value
                converted = data.datatype.convert_dataset(trans, data, target_ext, return_output=True, visible=False)
                # list() keeps the first-value lookup working whether
                # values() returns a list or a view.
                new_data = list(converted.values())[0]
                new_data.hid = data.hid
                new_data.name = data.name
                trans.sa_session.add(new_data)
                assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation(parent=data, file_type=target_ext, dataset=new_data, metadata_safe=False)
                trans.sa_session.add(assoc)
                trans.sa_session.flush()
            elif converted_dataset and converted_dataset.state == converted_dataset.states.ERROR:
                raise Exception("Dataset conversion failed for data parameter: %s" % self.name)
        return self.get_value(other_values, dataset_hash, user_hash, trans)

    def is_preparing(self, other_values):
        """True while the resolved data is new, uploading, queued or running."""
        value = self._get_dataset_like_object(other_values)
        if value and value.state in (value.states.NEW, value.states.UPLOAD, value.states.QUEUED, value.states.RUNNING):
            return True
        return False

    def ready(self, other_values):
        """True once the resolved data is in the OK state.

        Raises Exception when the resolved data is in the error state.
        """
        value = self._get_dataset_like_object(other_values)
        if value:
            if value.state == value.states.OK:
                return True
            elif value.state == value.states.ERROR:
                raise Exception('A data display parameter is in the error state: %s' % (self.name))
        return False
class DisplayApplicationTemplateParameter(DisplayApplicationParameter):
    """ Parameter that returns a string containing the requested content """

    type = 'template'

    def __init__(self, elem, link):
        """Store the template text of ``elem`` (empty string when absent)."""
        DisplayApplicationParameter.__init__(self, elem, link)
        template_text = elem.text
        if not template_text:
            template_text = ''
        self.text = template_text

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Fill the template with ``other_values`` and wrap the result.

        The filled text is stripped of surrounding whitespace when the
        parameter's ``strip`` flag is set.
        """
        rendered = fill_template(self.text, context=other_values)
        if self.strip:
            rendered = rendered.strip()
        return DisplayParameterValueWrapper(
            rendered,
            self,
            other_values,
            dataset_hash,
            user_hash,
            trans,
        )
# Maps each parameter class's ``type`` string to the class itself; used by
# DisplayApplicationParameter.from_elem to pick the class to instantiate.
parameter_type_to_class = dict(
    (param_class.type, param_class)
    for param_class in (
        DisplayApplicationDataParameter,
        DisplayApplicationTemplateParameter,
    )
)
class DisplayParameterValueWrapper(object):
    """Wrap a parameter value together with the context needed to build URLs.

    Attribute lookups that are not satisfied by the wrapper itself are
    forwarded to the wrapped ``value``.
    """

    ACTION_NAME = 'param'

    def __init__(self, value, parameter, other_values, dataset_hash, user_hash, trans):
        self.value = value
        self.parameter = parameter
        self.other_values = other_values
        self.trans = trans
        self._dataset_hash = dataset_hash
        self._user_hash = user_hash
        self._url = self.parameter.build_url(self.other_values)

    def __str__(self):
        return str(self.value)

    def mime_type(self):
        """Return the mime type to use when serving this value.

        Order of preference: the parameter's explicit ``mime_type``, a guess
        from the parameter url (when ``guess_mime_type`` is set), and finally
        'text/plain'.
        """
        if self.parameter.mime_type is not None:
            return self.parameter.mime_type
        if self.parameter.guess_mime_type:
            mime, encoding = mimetypes.guess_type(self._url)
            if not mime:
                # Look up the text after the last '.' in the url. The
                # original ``".".split(self._url)`` split the literal "."
                # and therefore always passed "." to the registry.
                extension = self._url.split(".")[-1]
                mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(extension, None)
            if mime:
                return mime
        return 'text/plain'

    @property
    def url(self):
        """Absolute URL of the dataset controller action serving this value."""
        base_url = self.trans.request.base
        if self.parameter.strip_https and base_url[:5].lower() == 'https':
            base_url = "http%s" % base_url[5:]
        return "%s%s" % (
            base_url,
            url_for(
                controller='dataset',
                action="display_application",
                dataset_id=self._dataset_hash,
                user_id=self._user_hash,
                app_name=urllib.quote_plus(self.parameter.link.display_application.id),
                link_name=urllib.quote_plus(self.parameter.link.id),
                app_action=self.action_name,
                action_param=self._url,
            ),
        )

    @property
    def action_name(self):
        """Name of the controller action used in ``url``."""
        return self.ACTION_NAME

    @property
    def qp(self):
        # returns quoted str contents
        return self.other_values['qp'](str(self))

    def __getattr__(self, key):
        # Only reached when normal lookup fails. If 'value' itself is missing
        # (instance not initialised yet, e.g. while being copied), fail
        # cleanly instead of recursing without bound.
        if key == 'value':
            raise AttributeError(key)
        return getattr(self.value, key)
class DisplayDataValueWrapper(DisplayParameterValueWrapper):
    """Value wrapper for data parameters; its string form is a file name."""

    ACTION_NAME = 'data'

    def __str__(self):
        # string of data param is filename
        return str(self.value.file_name)

    def mime_type(self):
        """Return the mime type to use when serving the wrapped data.

        Order of preference: the parameter's explicit ``mime_type``, a guess
        from the parameter url (when ``guess_mime_type`` is set), the wrapped
        value's own ``get_mime()``, and finally the ``get_mime()`` of the
        default dataset in ``other_values``.
        """
        if self.parameter.mime_type is not None:
            return self.parameter.mime_type
        if self.parameter.guess_mime_type:
            mime, encoding = mimetypes.guess_type(self._url)
            if not mime:
                # Look up the text after the last '.' in the url. The
                # original ``".".split(self._url)`` split the literal "."
                # and therefore always passed "." to the registry.
                extension = self._url.split(".")[-1]
                mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(extension, None)
            if mime:
                return mime
        if hasattr(self.value, 'get_mime'):
            return self.value.get_mime()
        return self.other_values[DEFAULT_DATASET_NAME].get_mime()

    @property
    def action_name(self):
        """Controller action name; the base class's when forced to a url param."""
        if self.parameter.force_url_param:
            # Read the base class constant directly. The original
            # ``super(DisplayParameterValueWrapper, self).action_name``
            # resolved against ``object`` and raised AttributeError, and the
            # inherited property would return this class's own 'data' anyway
            # because it reads ``self.ACTION_NAME``.
            return DisplayParameterValueWrapper.ACTION_NAME
        return self.ACTION_NAME

    @property
    def qp(self):
        # returns quoted url contents
        return self.other_values['qp'](self.url)