"""
Support for generating the options for a SelectToolParameter dynamically (based
on the values of other parameters or other aspects of the current state)
"""
import operator, sys, os, logging
import basic, validation
from galaxy.util import string_as_bool
from galaxy.model import User
import galaxy.tools
log = logging.getLogger(__name__)
class Filter( object ):
    """
    A filter takes the current options list and modifies it.

    Subclasses implement ``filter_options`` and optionally
    ``get_dependency_name`` when the filter depends on another input.
    """
    @classmethod
    def from_element( cls, d_option, elem ):
        """Loads the proper filter by the type attribute of elem"""
        type = elem.get( 'type', None )
        assert type is not None, "Required 'type' attribute missing from filter"
        # Look up the concrete Filter subclass in the module-level registry
        return filter_types[type.strip()]( d_option, elem )
    def __init__( self, d_option, elem ):
        # d_option: the owning DynamicOptions instance; elem: the <filter> XML element
        self.dynamic_option = d_option
        self.elem = elem
    def get_dependency_name( self ):
        """Returns the name of any dependencies, otherwise None"""
        return None
    def filter_options( self, options, trans, other_values ):
        """Returns a list of options after the filter is applied"""
        raise TypeError( "Abstract Method" )
class StaticValueFilter( Filter ):
    """
    Filters a list of options on a column by a static value.

    Type: static_value

    Required Attributes:
        value: static value to compare to
        column: column in options to compare with
    Optional Attributes:
        keep: Keep columns matching value (True)
              Discard columns matching value (False)
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        self.value = elem.get( "value", None )
        assert self.value is not None, "Required 'value' attribute missing from filter"
        column = elem.get( "column", None )
        assert column is not None, "Required 'column' attribute missing from filter, when loading from file"
        self.column = d_option.column_spec_to_index( column )
        self.keep = string_as_bool( elem.get( "keep", 'True' ) )
    def filter_options( self, options, trans, other_values ):
        rval = []
        filter_value = self.value
        try:
            # Best effort: expand user properties in the filter value; on any
            # failure (e.g. no user on trans) fall back to the literal value.
            # Narrowed from a bare except so SystemExit/KeyboardInterrupt pass through.
            filter_value = User.expand_user_properties( trans.user, filter_value)
        except Exception:
            pass
        for fields in options:
            if ( self.keep and fields[self.column] == filter_value ) or ( not self.keep and fields[self.column] != filter_value ):
                rval.append( fields )
        return rval
class ParamValueFilter( Filter ):
    """
    Filters a list of options on a column by the value of another input.

    Type: param_value

    Required Attributes:
        - ref: Name of input value
        - column: column in options to compare with
    Optional Attributes:
        - keep: Keep columns matching value (True)
          Discard columns matching value (False)
        - ref_attribute: Period (.) separated attribute chain of input (ref) to use as value for filter
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        self.ref_name = elem.get( "ref", None )
        assert self.ref_name is not None, "Required 'ref' attribute missing from filter"
        column = elem.get( "column", None )
        assert column is not None, "Required 'column' attribute missing from filter"
        self.column = d_option.column_spec_to_index( column )
        self.keep = string_as_bool( elem.get( "keep", 'True' ) )
        ref_attribute = elem.get( "ref_attribute", None )
        # Store the attribute chain as a list of names; empty list means
        # "use the referenced value directly".
        self.ref_attribute = ref_attribute.split( '.' ) if ref_attribute else []
    def get_dependency_name( self ):
        return self.ref_name
    def filter_options( self, options, trans, other_values ):
        if trans is not None and trans.workflow_building_mode:
            return []
        assert self.ref_name in other_values, "Required dependency '%s' not found in incoming values" % self.ref_name
        ref = other_values.get( self.ref_name, None )
        # Walk the attribute chain; bail out with no options if any link is missing.
        for attr_name in self.ref_attribute:
            if not hasattr( ref, attr_name ):
                return []
            ref = getattr( ref, attr_name )
        ref = str( ref )
        if self.keep:
            return [ fields for fields in options if fields[self.column] == ref ]
        return [ fields for fields in options if fields[self.column] != ref ]
class UniqueValueFilter( Filter ):
    """
    Filters a list of options to be unique by a column value.

    Type: unique_value

    Required Attributes:
        column: column in options to compare with
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        column = elem.get( "column", None )
        assert column is not None, "Required 'column' attribute missing from filter"
        self.column = d_option.column_spec_to_index( column )
    def get_dependency_name( self ):
        return self.dynamic_option.dataset_ref_name
    def filter_options( self, options, trans, other_values ):
        rval = []
        # A set gives O(1) membership tests; the previous list made this O(n^2).
        # The first row seen for each column value is kept, preserving order.
        seen = set()
        for fields in options:
            if fields[self.column] not in seen:
                rval.append( fields )
                seen.add( fields[self.column] )
        return rval
class MultipleSplitterFilter( Filter ):
    """
    Turns a single line of options into multiple lines, by splitting a column
    and creating a line for each item.

    Type: multiple_splitter

    Required Attributes:
        column: column(s) in options to split (a comma-separated list is accepted)
    Optional Attributes:
        separator: Split column by this (,)
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        self.separator = elem.get( "separator", "," )
        columns = elem.get( "column", None )
        # The attribute actually read is 'column'; the old message said 'columns'.
        assert columns is not None, "Required 'column' attribute missing from filter"
        self.columns = [ d_option.column_spec_to_index( column ) for column in columns.split( "," ) ]
    def filter_options( self, options, trans, other_values ):
        rval = []
        for fields in options:
            for column in self.columns:
                # Emit one row per split item, substituting the item back into
                # the original column position.
                for field in fields[column].split( self.separator ):
                    rval.append( fields[0:column] + [field] + fields[column+1:] )
        return rval
class AttributeValueSplitterFilter( Filter ):
    """
    Filters a list of attribute-value pairs to be unique attribute names.

    Type: attribute_value_splitter

    Required Attributes:
        column: column(s) in options holding the pairs (a comma-separated list is accepted)
    Optional Attributes:
        pair_separator: Split column by this (,)
        name_val_separator: Split name-value pair by this ( whitespace )
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        self.pair_separator = elem.get( "pair_separator", "," )
        # None means str.split's default: split on any run of whitespace
        self.name_val_separator = elem.get( "name_val_separator", None )
        columns = elem.get( "column", None )
        # The attribute actually read is 'column'; the old message said 'columns'.
        assert columns is not None, "Required 'column' attribute missing from filter"
        # Use column_spec_to_index for consistency with the other filters; it
        # still accepts plain integer specs, so existing configs keep working.
        self.columns = [ d_option.column_spec_to_index( column ) for column in columns.split( "," ) ]
    def filter_options( self, options, trans, other_values ):
        attr_names = []
        rval = []
        for fields in options:
            for column in self.columns:
                for pair in fields[column].split( self.pair_separator ):
                    ary = pair.split( self.name_val_separator )
                    # Ignore anything that does not split into exactly name/value
                    if len( ary ) == 2:
                        name, value = ary
                        if name not in attr_names:
                            # The name is inserted BEFORE the original column,
                            # so emitted rows grow by one field.
                            rval.append( fields[0:column] + [name] + fields[column:] )
                            attr_names.append( name )
        return rval
class AdditionalValueFilter( Filter ):
    """
    Adds a single static value to an options list.

    Type: add_value

    Required Attributes:
        value: value to appear in select list
    Optional Attributes:
        name: Display name to appear in select list (value)
        index: Index of option list to add value (APPEND)
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        self.value = elem.get( "value", None )
        assert self.value is not None, "Required 'value' attribute missing from filter"
        # Fall back to the value itself when no display name is given
        self.name = elem.get( "name", None )
        if self.name is None:
            self.name = self.value
        index = elem.get( "index", None )
        self.index = int( index ) if index is not None else None
    def filter_options( self, options, trans, other_values ):
        rval = list( options )
        # Build a blank row wide enough for every known column
        add_value = [ "" ] * ( self.dynamic_option.largest_index + 1 )
        value_col = self.dynamic_option.columns.get( 'value', 0 )
        name_col = self.dynamic_option.columns.get( 'name', value_col )
        # Set name first, then value, in case they are the same column
        add_value[ name_col ] = self.name
        add_value[ value_col ] = self.value
        if self.index is None:
            rval.append( add_value )
        else:
            rval.insert( self.index, add_value )
        return rval
class RemoveValueFilter( Filter ):
    """
    Removes a value from an options list.

    Type: remove_value

    Required Attributes::
        value: value to remove from select list
            or
        ref: param to refer to
            or
        meta_ref: dataset to refer to
        key: metadata key to compare to
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        self.value = elem.get( "value", None )
        self.ref_name = elem.get( "ref", None )
        self.meta_ref = elem.get( "meta_ref", None )
        self.metadata_key = elem.get( "key", None )
        assert self.value is not None or ( ( self.ref_name is not None or self.meta_ref is not None ) and self.metadata_key is not None ), ValueError( "Required 'value' or 'ref' and 'key' attributes missing from filter" )
        self.multiple = string_as_bool( elem.get( "multiple", "False" ) )
        self.separator = elem.get( "separator", "," )
    def filter_options( self, options, trans, other_values ):
        if trans is not None and trans.workflow_building_mode: return options
        assert self.value is not None or ( self.ref_name is not None and self.ref_name in other_values ) or (self.meta_ref is not None and self.meta_ref in other_values ) or ( trans is not None and trans.workflow_building_mode), Exception( "Required dependency '%s' or '%s' not found in incoming values" % ( self.ref_name, self.meta_ref ) )
        def compare_value( option_value, filter_value ):
            # Returns True when option_value matches filter_value and should
            # therefore be removed from the options list.
            if isinstance( filter_value, list ):
                if self.multiple:
                    option_value = option_value.split( self.separator )
                    for value in filter_value:
                        # BUG FIX: previously tested 'value not in filter_value',
                        # which can never be True (value is drawn from
                        # filter_value), so multi-value filters always matched.
                        if value not in option_value:
                            return False
                    return True
                return option_value in filter_value
            if self.multiple:
                return filter_value in option_value.split( self.separator )
            return option_value == filter_value
        value = self.value
        if value is None:
            if self.ref_name is not None:
                value = other_values.get( self.ref_name )
            else:
                data_ref = other_values.get( self.meta_ref )
                if not isinstance( data_ref, self.dynamic_option.tool_param.tool.app.model.HistoryDatasetAssociation ) and not isinstance( data_ref, galaxy.tools.DatasetFilenameWrapper ):
                    return options #cannot modify options
                value = data_ref.metadata.get( self.metadata_key, None )
        return [ ( disp_name, optval, selected ) for disp_name, optval, selected in options if not compare_value( optval, value ) ]
class SortByColumnFilter( Filter ):
    """
    Sorts an options list by a column

    Type: sort_by

    Required Attributes:
        column: column to sort by
    """
    def __init__( self, d_option, elem ):
        Filter.__init__( self, d_option, elem )
        column = elem.get( "column", None )
        assert column is not None, "Required 'column' attribute missing from filter"
        self.column = d_option.column_spec_to_index( column )
    def filter_options( self, options, trans, other_values ):
        # sorted() is a stable O(n log n) sort; the previous hand-rolled
        # insertion sort was O(n^2) and likewise kept equal keys in input
        # order, so the resulting ordering is identical.
        return sorted( options, key=lambda fields: fields[self.column] )
# Registry mapping a <filter> element's 'type' attribute to the implementing
# class; consulted by Filter.from_element.
filter_types = {
    'data_meta': DataMetaFilter,
    'param_value': ParamValueFilter,
    'static_value': StaticValueFilter,
    'unique_value': UniqueValueFilter,
    'multiple_splitter': MultipleSplitterFilter,
    'attribute_value_splitter': AttributeValueSplitterFilter,
    'add_value': AdditionalValueFilter,
    'remove_value': RemoveValueFilter,
    'sort_by': SortByColumnFilter,
}
class DynamicOptions( object ):
    """Handles dynamically generated SelectToolParameter options"""
    def __init__( self, elem, tool_param ):
        def load_from_parameter( from_parameter, transform_lines = None ):
            # Resolve a dotted attribute path starting from the tool parameter
            obj = self.tool_param
            for field in from_parameter.split( '.' ):
                obj = getattr( obj, field )
            if transform_lines:
                # SECURITY NOTE: transform_lines comes straight from the tool
                # config XML and is eval'd here -- only trusted tool configs
                # should ever be loaded.
                obj = eval( transform_lines )
            return self.parse_file_fields( obj )
        self.tool_param = tool_param
        self.columns = {}
        self.filters = []
        self.file_fields = None
        self.largest_index = 0
        self.dataset_ref_name = None
        # True if the options generation depends on one or more other parameters
        # that are dataset inputs
        self.has_dataset_dependencies = False
        self.validators = []
        self.converter_safe = True
        # Parse the <options> tag
        self.separator = elem.get( 'separator', '\t' )
        self.line_startswith = elem.get( 'startswith', None )
        data_file = elem.get( 'from_file', None )
        self.index_file = None
        self.missing_index_file = None
        dataset_file = elem.get( 'from_dataset', None )
        from_parameter = elem.get( 'from_parameter', None )
        tool_data_table_name = elem.get( 'from_data_table', None )
        # Options are defined from a data table loaded by the app
        self.tool_data_table = None
        self.missing_tool_data_table_name = None
        if tool_data_table_name:
            app = tool_param.tool.app
            if tool_data_table_name in app.tool_data_tables:
                self.tool_data_table = app.tool_data_tables[ tool_data_table_name ]
                # Column definitions are optional, but if provided override those from the table
                if elem.find( "column" ) is not None:
                    self.parse_column_definitions( elem )
                else:
                    self.columns = self.tool_data_table.columns
                # Set self.missing_index_file if the index file to
                # which the tool_data_table refers does not exist.
                if self.tool_data_table.missing_index_file:
                    self.missing_index_file = self.tool_data_table.missing_index_file
            else:
                self.missing_tool_data_table_name = tool_data_table_name
                log.warn( "Data table named '%s' is required by tool but not configured" % tool_data_table_name )
        # Options are defined by parsing tabular text data from a data file
        # on disk, a dataset, or the value of another parameter
        elif data_file is not None or dataset_file is not None or from_parameter is not None:
            self.parse_column_definitions( elem )
            if data_file is not None:
                data_file = data_file.strip()
                # NOTE(review): absolute from_file paths are silently ignored
                # here (file_fields stays None) -- preserved from the original.
                if not os.path.isabs( data_file ):
                    full_path = os.path.join( self.tool_param.tool.app.config.tool_data_path, data_file )
                    if os.path.exists( full_path ):
                        self.index_file = data_file
                        # Close the handle when done parsing (was leaked before)
                        index_fh = open( full_path )
                        try:
                            self.file_fields = self.parse_file_fields( index_fh )
                        finally:
                            index_fh.close()
                    else:
                        self.missing_index_file = data_file
            elif dataset_file is not None:
                self.dataset_ref_name = dataset_file
                self.has_dataset_dependencies = True
                self.converter_safe = False
            elif from_parameter is not None:
                transform_lines = elem.get( 'transform_lines', None )
                self.file_fields = list( load_from_parameter( from_parameter, transform_lines ) )
        # Load filters
        for filter_elem in elem.findall( 'filter' ):
            self.filters.append( Filter.from_element( self, filter_elem ) )
        # Load Validators
        for validator in elem.findall( 'validator' ):
            self.validators.append( validation.Validator.from_element( self.tool_param, validator ) )
        if self.dataset_ref_name:
            tool_param.data_ref = self.dataset_ref_name
    def parse_column_definitions( self, elem ):
        """
        Parse <column name="..." index="..."/> children of elem into
        self.columns (name -> index), tracking the largest index seen.
        A 'value' column is required; 'name' defaults to 'value'.
        """
        for column_elem in elem.findall( 'column' ):
            name = column_elem.get( 'name', None )
            assert name is not None, "Required 'name' attribute missing from column def"
            index = column_elem.get( 'index', None )
            assert index is not None, "Required 'index' attribute missing from column def"
            index = int( index )
            self.columns[name] = index
            if index > self.largest_index:
                self.largest_index = index
        assert 'value' in self.columns, "Required 'value' column missing from column def"
        if 'name' not in self.columns:
            self.columns['name'] = self.columns['value']
    def parse_file_fields( self, reader ):
        """
        Parse tabular lines from reader (any iterable of lines) into a list of
        field lists, skipping comments ('#'), blank lines, and -- when
        self.line_startswith is set -- lines not starting with that prefix.
        """
        rval = []
        field_count = None
        for line in reader:
            if line.startswith( '#' ) or ( self.line_startswith and not line.startswith( self.line_startswith ) ):
                continue
            line = line.rstrip( "\n\r" )
            if line:
                fields = line.split( self.separator )
                # Only keep rows wide enough to cover every declared column
                if self.largest_index < len( fields ):
                    if not field_count:
                        field_count = len( fields )
                    elif field_count != len( fields ):
                        try:
                            name = reader.name
                        except AttributeError:
                            name = "a configuration file"
                        # Perhaps this should be an error, but even a warning is useful.
                        log.warn( "Inconsistent number of fields (%i vs %i) in %s using separator %r, check line: %r" % \
                                  ( field_count, len( fields ), name, self.separator, line ) )
                    rval.append( fields )
        return rval
    def get_dependency_names( self ):
        """
        Return the names of parameters these options depend on -- both data
        and other param types.
        """
        rval = []
        if self.dataset_ref_name:
            rval.append( self.dataset_ref_name )
        for filter in self.filters:
            depend = filter.get_dependency_name()
            if depend:
                rval.append( depend )
        return rval
    def get_fields( self, trans, other_values ):
        """
        Produce the raw option rows (from the referenced dataset, the tool
        data table, or the parsed file fields) and run them through all
        configured filters.
        """
        if self.dataset_ref_name:
            dataset = other_values.get( self.dataset_ref_name, None )
            assert dataset is not None, "Required dataset '%s' missing from input" % self.dataset_ref_name
            if not dataset: return [] #no valid dataset in history
            # Ensure parsing dynamic options does not consume more than a megabyte worth memory.
            path = dataset.file_name
            # getsize is now called once (was computed twice, once unused)
            if os.path.getsize( path ) < 1048576:
                dataset_fh = open( path )
                try:
                    options = self.parse_file_fields( dataset_fh )
                finally:
                    dataset_fh.close()
            else:
                # Pass just the first megabyte to parse_file_fields.
                import StringIO
                log.warn( "Attempting to load options from large file, reading just first megabyte" )
                dataset_fh = open( path, 'r' )
                try:
                    contents = dataset_fh.read( 1048576 )
                finally:
                    dataset_fh.close()
                options = self.parse_file_fields( StringIO.StringIO( contents ) )
        elif self.tool_data_table:
            options = self.tool_data_table.get_fields()
        else:
            options = list( self.file_fields )
        for filter in self.filters:
            options = filter.filter_options( options, trans, other_values )
        return options
    def get_fields_by_value( self, value, trans, other_values ):
        """
        Return a list of fields with column 'value' matching provided value.
        """
        rval = []
        val_index = self.columns[ 'value' ]
        for fields in self.get_fields( trans, other_values ):
            if fields[ val_index ] == value:
                rval.append( fields )
        return rval
    def get_field_by_name_for_value( self, field_name, value, trans, other_values ):
        """
        Get contents of field by name for specified value.
        """
        rval = []
        if isinstance( field_name, int ):
            field_index = field_name
        else:
            assert field_name in self.columns, "Requested '%s' column missing from column def" % field_name
            field_index = self.columns[ field_name ]
        if not isinstance( value, list ):
            value = [value]
        for val in value:
            for fields in self.get_fields_by_value( val, trans, other_values ):
                rval.append( fields[ field_index ] )
        return rval
    def get_options( self, trans, other_values ):
        """
        Return (name, value, selected) tuples for the select list. When no
        data source is configured, the filters alone generate the options.
        """
        rval = []
        if self.file_fields is not None or self.tool_data_table is not None or self.dataset_ref_name is not None:
            options = self.get_fields( trans, other_values )
            for fields in options:
                rval.append( ( fields[self.columns['name']], fields[self.columns['value']], False ) )
        else:
            for filter in self.filters:
                rval = filter.filter_options( rval, trans, other_values )
        return rval
    def column_spec_to_index( self, column_spec ):
        """
        Convert a column specification (as read from the config file), to an
        index. A column specification can just be a number, a column name, or
        a column alias.
        """
        # Name?
        if column_spec in self.columns:
            return self.columns[column_spec]
        # Int?
        return int( column_spec )