Source code for galaxy.tools.test

import os
import os.path
import galaxy.tools.parameters.basic
import galaxy.tools.parameters.grouping
from galaxy.util import string_as_bool
try:
    from nose.tools import nottest
except ImportError:
    nottest = lambda x: x
import logging

log = logging.getLogger( __name__ )

# Fallback 'ftype' reported by test_data_iter when a required file's extra
# dict does not supply one.
DEFAULT_FTYPE = 'auto'
# Fallback 'dbkey' reported by test_data_iter when a required file's extra
# dict does not supply one.
DEFAULT_DBKEY = 'hg17'
# Used by parse_tests when the GALAXY_TEST_DEFAULT_INTERACTOR environment
# variable is not set.
DEFAULT_INTERACTOR = "api"  # Default mechanism test code uses for interacting with Galaxy instance.
# Fallback for a test's 'maxseconds' entry (presumably seconds, per the name).
DEFAULT_MAX_SECS = 120


@nottest
[docs]def parse_tests(tool, tests_source): """ Build ToolTestBuilder objects for each "<test>" elements and return default interactor (if any). """ default_interactor = os.environ.get( 'GALAXY_TEST_DEFAULT_INTERACTOR', DEFAULT_INTERACTOR ) tests_dict = tests_source.parse_tests_to_dict() tests_default_interactor = tests_dict.get( 'interactor', default_interactor ) tests = [] for i, test_dict in enumerate(tests_dict.get('tests', [])): test = ToolTestBuilder( tool, test_dict, i, default_interactor=tests_default_interactor ) tests.append( test ) return tests
class ToolTestBuilder(object):
    """
    Encapsulates information about a tool test, and allows creation of a
    dynamic TestCase class (the unittest framework is very class oriented,
    doing dynamic tests in this way allows better integration).

    Problems hit while interpreting the test description are not raised from
    the constructor; they are recorded on the instance as ``error`` (bool)
    and ``exception`` (the caught exception object).
    """

    def __init__(self, tool, test_dict, i, default_interactor):
        """
        :param tool: tool under test; ``tool.inputs`` and ``tool.id`` are read.
        :param test_dict: dictionary describing a single test.
        :param i: zero-based position of the test, used to build the default
            name ``Test-<i + 1>``.
        :param default_interactor: interactor used when ``test_dict`` does
            not name one.
        """
        name = test_dict.get('name', 'Test-%d' % (i + 1))
        maxseconds = int(test_dict.get('maxseconds', DEFAULT_MAX_SECS))

        self.tool = tool
        self.name = name
        self.maxseconds = maxseconds
        self.required_files = []
        self.inputs = []
        self.outputs = []
        # By default do not make assertions on the number of outputs - but to
        # test filtering allow explicitly stating the number of outputs.
        self.num_outputs = None

        # Give every attribute populated by __handle_test_dict a value up
        # front, so an instance whose description failed to parse
        # (self.error is True) can still be inspected without AttributeError.
        self.interactor = default_interactor
        self.output_collections = []
        self.command_line = None
        self.stdout = None
        self.stderr = None
        self.expect_exit_code = None
        self.expect_failure = False
        self.md5 = None

        self.error = False
        self.exception = None

        self.__handle_test_dict(test_dict, i, default_interactor)

    def test_data(self):
        """
        Iterator over metadata representing the required files for upload.
        """
        return test_data_iter(self.required_files)

    @staticmethod
    def _default_option_value(static_options):
        """
        Return the value of the default option of a static select list.

        ``static_options`` is a non-empty sequence of
        ``(text, value, selected)`` tuples. The first option flagged as
        selected wins; otherwise fall back on the top most option (like the
        GUI).
        """
        for _text, option_value, selected in static_options:
            if selected:
                return option_value
        return static_options[0][1]

    @staticmethod
    def _resolve_select_value(static_options, param_value):
        """
        Translate a select value given in a test into the form value.

        Tests may specify values as either the raw value or the text as it
        appears in the list - the API doesn't and shouldn't accept the text
        value - so convert the text into the form value. A ``param_value``
        that already equals some option value is returned unchanged, as is
        one that matches nothing.
        """
        found_value = False
        value_for_text = None
        for text, opt_value, _selected in static_options or []:
            if param_value == opt_value:
                found_value = True
            if value_for_text is None and param_value == text:
                value_for_text = opt_value
        if not found_value and value_for_text is not None:
            return value_for_text
        return param_value

    def __matching_case_for_value(self, cond, declared_value):
        """
        Find the case of conditional ``cond`` selected by ``declared_value``.

        When ``declared_value`` is None the test parameter's default is used.
        Returns the matching case, or None (after logging) when no case
        matches. Raises Exception when no value can be determined or the
        test parameter is of an unsupported type.
        """
        test_param = cond.test_param
        if isinstance(test_param, galaxy.tools.parameters.basic.BooleanToolParameter):
            if declared_value is None:
                # No explicit value for param in test case, determine from default
                query_value = test_param.checked
            else:
                # Test case supplied value, check cases against this.
                query_value = string_as_bool(declared_value)

            def matches_declared_value(case_value):
                return string_as_bool(case_value) == query_value
        elif isinstance(test_param, galaxy.tools.parameters.basic.SelectToolParameter):
            if declared_value is not None:
                # Test case supplied explicit value to check against.
                def matches_declared_value(case_value):
                    return case_value == declared_value
            elif test_param.static_options:
                # No explicit value in test case, not much to do if options
                # are dynamic but if static options are available can find
                # the one specified as default or fallback on top most
                # option (like GUI).
                default_option = self._default_option_value(test_param.static_options)

                def matches_declared_value(case_value):
                    return case_value == default_option
            else:
                # No explicit value for this param and cannot determine a
                # default - give up. Previously this would just result in a
                # key error exception.
                msg = "Failed to find test parameter value specification required for conditional %s" % cond.name
                raise Exception(msg)
        else:
            # Without a predicate the loop below could only fail with an
            # opaque UnboundLocalError; say what is wrong instead.
            raise Exception("Unsupported test parameter type for conditional %s" % cond.name)

        # Check the tool's defined cases against predicate to determine
        # selected or default.
        for case in cond.cases:
            if matches_declared_value(case.value):
                return case

        msg_template = "%s - Failed to find case matching value (%s) for test parameter specification for conditional %s. Remainder of test behavior is unspecified."
        msg = msg_template % (self.tool.id, declared_value, cond.name)
        log.info(msg)
        return None

    def __split_if_str(self, value):
        """Split a comma separated string into a list; pass anything else through."""
        if isinstance(value, str):
            value = value.split(",")
        return value

    def __handle_test_dict(self, test_dict, i, default_interactor):
        """
        Populate the instance from ``test_dict``.

        Any exception raised while doing so is stored in ``self.exception``
        and flagged through ``self.error`` instead of propagating.
        """
        try:
            # Mechanism test code uses for interacting with Galaxy instance,
            # until 'api' is the default switch this to API to use its new
            # features. Once 'api' is the default set to 'twill' to use legacy
            # features or workarounds.
            self.interactor = test_dict.get('interactor', default_interactor)

            self.inputs = self.__process_raw_inputs(self.tool.inputs, test_dict["inputs"])
            self.outputs = test_dict["outputs"]
            self.output_collections = test_dict["output_collections"]
            num_outputs = test_dict.get('expect_num_outputs', None)
            if num_outputs:
                num_outputs = int(num_outputs)
            self.num_outputs = num_outputs
            self.command_line = test_dict.get("command", None)
            self.stdout = test_dict.get("stdout", None)
            self.stderr = test_dict.get("stderr", None)
            self.expect_exit_code = test_dict.get("expect_exit_code", None)
            self.expect_failure = test_dict.get("expect_failure", False)
            self.md5 = test_dict.get("md5", None)
        except Exception as e:
            self.error = True
            self.exception = e

    def __process_raw_inputs(self, tool_inputs, raw_inputs, parent_context=None):
        """
        Recursively expand flat list of inputs into "tree" form of flat list
        (| using to nest to new levels) structure and expand dataset
        information as proceeding to populate self.required_files.

        ``raw_inputs`` is consumed: entries are removed from it as they are
        matched. Returns a dict keyed by the state name of each input.
        """
        parent_context = parent_context or RootParamContext()
        expanded_inputs = {}
        for tool_input in tool_inputs.values():
            if isinstance(tool_input, galaxy.tools.parameters.grouping.Conditional):
                cond_context = ParamContext(name=tool_input.name, parent_context=parent_context)
                case_context = ParamContext(name=tool_input.test_param.name, parent_context=cond_context)
                raw_input = case_context.extract_value(raw_inputs)
                case_value = raw_input[1] if raw_input else None
                case = self.__matching_case_for_value(tool_input, case_value)
                if case:
                    for input_name, input_value in case.inputs.items():
                        case_inputs = self.__process_raw_inputs({input_name: input_value}, raw_inputs, parent_context=cond_context)
                        expanded_inputs.update(case_inputs)
                    expanded_case_value = self.__split_if_str(case.value)
                    if case_value is not None:
                        # A bit tricky here - we are growing inputs with value
                        # that may be implicit (i.e. not defined by user just
                        # a default defined in tool). So we do not want to grow
                        # expanded_inputs and risk repeat block viewing this
                        # as a new instance with value defined and hence enter
                        # an infinite loop - hence the "case_value is not None"
                        # check.
                        expanded_inputs[case_context.for_state()] = expanded_case_value
            elif isinstance(tool_input, galaxy.tools.parameters.grouping.Section):
                context = ParamContext(name=tool_input.name, parent_context=parent_context)
                for r_value in tool_input.inputs.values():
                    expanded_input = self.__process_raw_inputs({context.for_state(): r_value}, raw_inputs, parent_context=context)
                    if expanded_input:
                        expanded_inputs.update(expanded_input)
            elif isinstance(tool_input, galaxy.tools.parameters.grouping.Repeat):
                # Keep probing successive repeat instances until one of them
                # contributes nothing.
                repeat_index = 0
                while True:
                    context = ParamContext(name=tool_input.name, index=repeat_index, parent_context=parent_context)
                    updated = False
                    for r_value in tool_input.inputs.values():
                        expanded_input = self.__process_raw_inputs({context.for_state(): r_value}, raw_inputs, parent_context=context)
                        if expanded_input:
                            expanded_inputs.update(expanded_input)
                            updated = True
                    if not updated:
                        break
                    repeat_index += 1
            else:
                context = ParamContext(name=tool_input.name, parent_context=parent_context)
                raw_input = context.extract_value(raw_inputs)
                if raw_input:
                    (_raw_name, param_value, param_extra) = raw_input
                    param_value = self.__split_if_str(param_value)
                    if isinstance(tool_input, galaxy.tools.parameters.basic.DataToolParameter):
                        if not isinstance(param_value, list):
                            param_value = [param_value]
                        # Explicit loop: the registration is a side effect and
                        # must run eagerly (map() is lazy on Python 3).
                        for dataset_value in param_value:
                            self.__add_uploaded_dataset(context.for_state(), dataset_value, param_extra, tool_input)
                        processed_value = param_value
                    elif isinstance(tool_input, galaxy.tools.parameters.basic.DataCollectionToolParameter):
                        assert 'collection' in param_extra
                        collection_def = param_extra['collection']
                        for (element_name, element_value, element_extra) in collection_def.collect_inputs():
                            require_file(element_name, element_value, element_extra, self.required_files)
                        processed_value = collection_def
                    elif isinstance(tool_input, galaxy.tools.parameters.basic.SelectToolParameter) and hasattr(tool_input, 'static_options'):
                        # Convert option text into form values, for lists or
                        # singleton values.
                        if isinstance(param_value, list):
                            processed_value = [
                                self._resolve_select_value(tool_input.static_options, item)
                                for item in param_value
                            ]
                        else:
                            processed_value = self._resolve_select_value(tool_input.static_options, param_value)
                    else:
                        processed_value = param_value
                    expanded_inputs[context.for_state()] = processed_value
        return expanded_inputs

    def __add_uploaded_dataset(self, name, value, extra, input_parameter):
        """
        Register ``value`` as a file required for upload.

        A None value is only accepted for optional parameters and is
        returned as is; otherwise the result of require_file is returned.
        """
        if value is None:
            assert input_parameter.optional, '%s is not optional. You must provide a valid filename.' % name
            return value
        return require_file(name, value, extra, self.required_files)
@nottest
def test_data_iter(required_files):
    """
    Yield one upload description dict per ``(fname, extra)`` pair.

    Each dict carries 'fname', 'metadata', 'composite_data', 'ftype' and
    'dbkey' (the last two falling back on DEFAULT_FTYPE / DEFAULT_DBKEY).
    A 'name' key is added when ``extra`` holds an 'edit_attributes' entry of
    type 'name'; any other edit type raises Exception, since currently only
    renaming is supported.
    """
    for fname, extra in required_files:
        data_dict = {
            'fname': fname,
            'metadata': extra.get('metadata', []),
            'composite_data': extra.get('composite_data', []),
            'ftype': extra.get('ftype', DEFAULT_FTYPE),
            'dbkey': extra.get('dbkey', DEFAULT_DBKEY),
        }
        for edit_att in extra.get('edit_attributes', []):
            edit_type = edit_att.get('type', None)
            if edit_type != 'name':
                raise Exception('edit_attributes type (%s) is unimplemented' % edit_type)
            new_name = edit_att.get('value', None)
            assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag'
            data_dict['name'] = new_name
        yield data_dict
def require_file(name, value, extra, required_files):
    """
    Record ``(value, extra)`` in ``required_files`` (once) and return the
    name the uploaded file is to be referred to by.

    If ``extra`` carries 'edit_attributes' entries of type 'name', the value
    of the last one is returned. Otherwise a single trailing '.zip' or '.gz'
    is stripped from ``value`` and its basename is returned.
    """
    entry = (value, extra)
    if entry not in required_files:
        # these files will be uploaded
        required_files.append(entry)

    renames = [
        att for att in extra.get('edit_attributes', [])
        if att.get('type') == 'name'
    ]
    if renames:
        # Only the last name change really matters; it becomes the value
        # used for selects, e.g. for a renamed composite dataset.
        return renames[-1].get('value')

    for suffix in ('.zip', '.gz'):
        if value.endswith(suffix):
            value = value[:-len(suffix)]
            break
    # The file may live in a path other than the root of test-data.
    return os.path.basename(value)
class ParamContext(object):
    """
    One level of nesting (optionally an indexed repeat instance) in the
    "|" separated naming scheme used for tool parameters.
    """

    def __init__(self, name, index=None, parent_context=None):
        self.parent_context = parent_context
        self.name = name
        if index is not None:
            index = int(index)
        self.index = index

    def __own_name(self):
        """This level's name, suffixed with ``_<index>`` when indexed."""
        if self.index is None:
            return self.name
        return "%s_%d" % (self.name, self.index)

    def for_state(self):
        """Fully qualified name: the parent's state name, "|", own name."""
        own = self.__own_name()
        prefix = self.parent_context.for_state()
        return "%s|%s" % (prefix, own) if prefix else own

    def __str__(self):
        return "Context[for_state=%s]" % self.for_state()

    def param_names(self):
        """
        Yield every spelling this parameter may be given under: each of the
        parent's names joined to the own name with "|", then the bare own
        name last.
        """
        own = self.__own_name()
        for inherited in self.parent_context.param_names():
            yield "%s|%s" % (inherited, own)
        yield own

    def extract_value(self, raw_inputs):
        """
        Remove and return the first truthy entry of ``raw_inputs`` found
        under any of ``param_names()``, or None when there is none.
        """
        for candidate in self.param_names():
            hit = self.__raw_param_found(candidate, raw_inputs)
            if hit:
                return hit
        return None

    def __raw_param_found(self, param_name, raw_inputs):
        """
        Remove and return the last entry of ``raw_inputs`` whose first
        element equals ``param_name``; return None if nothing matches.
        """
        positions = [
            pos for pos, entry in enumerate(raw_inputs)
            if entry[0] == param_name
        ]
        if not positions:
            return None
        last = positions[-1]
        found = raw_inputs[last]
        del raw_inputs[last]
        return found
class RootParamContext(object):
    """
    Top of a parameter context chain: contributes no name prefix and no
    parameter names of its own.
    """

    def __init__(self):
        """Nothing to set up; the root context holds no state."""

    def for_state(self):
        """Return the empty state prefix."""
        return str()

    def param_names(self):
        """Return an empty list of names."""
        return list()

    def get_index(self):
        """Return the constant index 0."""
        return int()