# Source code for galaxy.tools.test

import os
import os.path
import uuid
from parameters import basic
from parameters import grouping
from galaxy.util import string_as_bool
try:
    from nose.tools import nottest
except ImportError:
    nottest = lambda x: x
import logging

# Module-level logger, named after this module.
log = logging.getLogger( __name__ )

# Fallback 'ftype' reported by test_data_iter for a required file that does not declare one.
DEFAULT_FTYPE = 'auto'
# Fallback 'dbkey' reported by test_data_iter for a required file that does not declare one.
DEFAULT_DBKEY = 'hg17'
DEFAULT_INTERACTOR = "api"  # Default mechanism test code uses for interacting with Galaxy instance.
# Fallback for the 'maxseconds' attribute of a <test> element (read in ToolTestBuilder.__init__);
# presumably a per-test time limit in seconds - confirm against the code that consumes it.
DEFAULT_MAX_SECS = 120


@nottest
[docs]def parse_tests_elem(tool, tests_elem): """ Build ToolTestBuilder objects for each "<test>" elements and return default interactor (if any). """ default_interactor = os.environ.get( 'GALAXY_TEST_DEFAULT_INTERACTOR', DEFAULT_INTERACTOR ) tests_default_interactor = tests_elem.get( 'interactor', default_interactor ) tests = [] for i, test_elem in enumerate( tests_elem.findall( 'test' ) ): test = ToolTestBuilder( tool, test_elem, i, default_interactor=tests_default_interactor ) tests.append( test ) return tests
[docs]class ToolTestBuilder( object ): """ Encapsulates information about a tool test, and allows creation of a dynamic TestCase class (the unittest framework is very class oriented, doing dynamic tests in this way allows better integration) """ def __init__( self, tool, test_elem, i, default_interactor ): name = test_elem.get( 'name', 'Test-%d' % (i + 1) ) maxseconds = int( test_elem.get( 'maxseconds', DEFAULT_MAX_SECS ) ) self.tool = tool self.name = name self.maxseconds = maxseconds self.required_files = [] self.inputs = [] self.outputs = [] self.num_outputs = None # By default do not making assertions on # number of outputs - but to test filtering # allow explicitly state number of outputs. self.error = False self.exception = None self.__parse_elem( test_elem, i, default_interactor )
[docs] def test_data( self ): """ Iterator over metadata representing the required files for upload. """ return test_data_iter( self.required_files )
def __matching_case_for_value( self, cond, declared_value ): test_param = cond.test_param if isinstance(test_param, basic.BooleanToolParameter): if declared_value is None: # No explicit value for param in test case, determine from default query_value = test_param.checked else: # Test case supplied value, check cases against this. query_value = string_as_bool( declared_value ) matches_declared_value = lambda case_value: string_as_bool(case_value) == query_value elif isinstance(test_param, basic.SelectToolParameter): if declared_value is not None: # Test case supplied explicit value to check against. matches_declared_value = lambda case_value: case_value == declared_value elif test_param.static_options: # No explicit value in test case, not much to do if options are dynamic but # if static options are available can find the one specified as default or # fallback on top most option (like GUI). for (name, value, selected) in test_param.static_options: if selected: default_option = name else: first_option = test_param.static_options[0] first_option_value = first_option[1] default_option = first_option_value matches_declared_value = lambda case_value: case_value == default_option else: # No explicit value for this param and cannot determine a # default - give up. Previously this would just result in a key # error exception. msg = "Failed to find test parameter value specification required for conditional %s" % cond.name raise Exception( msg ) # Check the tool's defined cases against predicate to determine # selected or default. for i, case in enumerate( cond.cases ): if matches_declared_value( case.value ): return case else: msg_template = "%s - Failed to find case matching value (%s) for test parameter specification for conditional %s. Remainder of test behavior is unspecified." 
msg = msg_template % ( self.tool.id, declared_value, cond.name ) log.info( msg ) def __split_if_str( self, value ): split = isinstance(value, str) if split: value = value.split(",") return value def __parse_elem( self, test_elem, i, default_interactor ): try: # Mechanism test code uses for interacting with Galaxy instance, # until 'api' is the default switch this to API to use its new # features. Once 'api' is the default set to 'twill' to use legacy # features or workarounds. self.interactor = test_elem.get( 'interactor', default_interactor ) self.__preprocess_input_elems( test_elem ) self.__parse_inputs_elems( test_elem, i ) self.outputs = parse_output_elems( test_elem ) num_outputs = test_elem.get( 'expect_num_outputs', None ) if num_outputs: num_outputs = int( num_outputs ) self.num_outputs = num_outputs except Exception, e: self.error = True self.exception = e def __preprocess_input_elems( self, test_elem ): expand_input_elems( test_elem ) def __parse_inputs_elems( self, test_elem, i ): raw_inputs = [] for param_elem in test_elem.findall( "param" ): name, value, attrib = parse_param_elem( param_elem, i ) raw_inputs.append( ( name, value, attrib ) ) self.inputs = self.__process_raw_inputs( self.tool.inputs, raw_inputs ) def __process_raw_inputs( self, tool_inputs, raw_inputs, parent_context=None ): """ Recursively expand flat list of inputs into "tree" form of flat list (| using to nest to new levels) structure and expand dataset information as proceeding to populate self.required_files. 
""" parent_context = parent_context or RootParamContext() expanded_inputs = {} for key, value in tool_inputs.items(): if isinstance( value, grouping.Conditional ): cond_context = ParamContext( name=value.name, parent_context=parent_context ) case_context = ParamContext( name=value.test_param.name, parent_context=cond_context ) raw_input = case_context.extract_value( raw_inputs ) case_value = raw_input[ 1 ] if raw_input else None case = self.__matching_case_for_value( value, case_value ) if case: for input_name, input_value in case.inputs.items(): case_inputs = self.__process_raw_inputs( { input_name: input_value }, raw_inputs, parent_context=cond_context ) expanded_inputs.update( case_inputs ) expanded_case_value = self.__split_if_str( case.value ) if case_value is not None: # A bit tricky here - we are growing inputs with value # that may be implicit (i.e. not defined by user just # a default defined in tool). So we do not want to grow # expanded_inputs and risk repeat block viewing this # as a new instance with value defined and hence enter # an infinite loop - hence the "case_value is not None" # check. 
expanded_inputs[ case_context.for_state() ] = expanded_case_value elif isinstance( value, grouping.Repeat ): repeat_index = 0 while True: context = ParamContext( name=value.name, index=repeat_index, parent_context=parent_context ) updated = False for r_name, r_value in value.inputs.iteritems(): expanded_input = self.__process_raw_inputs( { context.for_state() : r_value }, raw_inputs, parent_context=context ) if expanded_input: expanded_inputs.update( expanded_input ) updated = True if not updated: break repeat_index += 1 else: context = ParamContext( name=value.name, parent_context=parent_context ) raw_input = context.extract_value( raw_inputs ) if raw_input: (name, param_value, param_extra) = raw_input param_value = self.__split_if_str( param_value ) if isinstance( value, basic.DataToolParameter ): if not isinstance(param_value, list): param_value = [ param_value ] map( lambda v: self.__add_uploaded_dataset( context.for_state(), v, param_extra, value ), param_value ) processed_value = param_value elif isinstance( value, basic.DataCollectionToolParameter ): assert 'collection' in param_extra collection_def = param_extra[ 'collection' ] for ( name, value, extra ) in collection_def.collect_inputs(): require_file( name, value, extra, self.required_files ) processed_value = collection_def elif isinstance( value, basic.SelectToolParameter ) and hasattr( value, 'static_options' ): # Tests may specify values as either raw value or the value # as they appear in the list - the API doesn't and shouldn't # accept the text value - so we need to convert the text # into the form value. 
def process_param_value( param_value ): found_value = False value_for_text = None if value.static_options: for (text, opt_value, selected) in value.static_options: if param_value == opt_value: found_value = True if value_for_text is None and param_value == text: value_for_text = opt_value if not found_value and value_for_text is not None: processed_value = value_for_text else: processed_value = param_value return processed_value # Do replacement described above for lists or singleton # values. if isinstance( param_value, list ): processed_value = map( process_param_value, param_value ) else: processed_value = process_param_value( param_value ) else: processed_value = param_value expanded_inputs[ context.for_state() ] = processed_value return expanded_inputs def __add_uploaded_dataset( self, name, value, extra, input_parameter ): if value is None: assert input_parameter.optional, '%s is not optional. You must provide a valid filename.' % name return value return require_file( name, value, extra, self.required_files )
@nottest
def test_data_iter( required_files ):
    """
    Yield one upload description dict per ( fname, extra ) pair in
    ``required_files``.

    Each dict carries ``fname``, ``metadata``, ``composite_data``, ``ftype``
    and ``dbkey``; a ``name`` key is added when ``extra`` holds an
    ``edit_attributes`` entry of type 'name'. Any other edit type raises
    Exception.
    """
    for fname, extra in required_files:
        upload = {
            'fname': fname,
            'metadata': extra.get( 'metadata', [] ),
            'composite_data': extra.get( 'composite_data', [] ),
            'ftype': extra.get( 'ftype', DEFAULT_FTYPE ),
            'dbkey': extra.get( 'dbkey', DEFAULT_DBKEY ),
        }
        # currently only renaming is supported
        for edit in extra.get( 'edit_attributes', [] ):
            if edit.get( 'type', None ) != 'name':
                raise Exception( 'edit_attributes type (%s) is unimplemented' % edit.get( 'type', None ) )
            renamed = edit.get( 'value', None )
            assert renamed, 'You must supply the new dataset name as the value tag of the edit_attributes tag'
            upload[ 'name' ] = renamed
        yield upload
def require_file( name, value, extra, required_files ):
    """
    Record ( value, extra ) in ``required_files`` (once) as a file to upload
    and return the name the uploaded dataset will be referred to by.

    When ``extra`` holds 'name' edit attributes the value of the last one is
    returned. Otherwise a trailing '.zip' or '.gz' is dropped from ``value``
    and its basename is returned. ``name`` is accepted but not used.
    """
    entry = ( value, extra )
    if entry not in required_files:
        required_files.append( entry )  # these files will be uploaded

    renames = [ edit for edit in extra.get( 'edit_attributes', [] ) if edit.get( 'type' ) == 'name' ]
    if renames:
        # Only the last name change really matters; the renamed upload is what
        # e.g. a composite dataset has to be selected by.
        return renames[ -1 ].get( 'value' )

    for suffix in ( '.zip', '.gz' ):
        if value.endswith( suffix ):
            value = value[ :-len( suffix ) ]
            break
    # The file may live in a sub-directory of test-data rather than its root.
    return os.path.basename( value )
def parse_param_elem( param_elem, i=0 ):
    """
    Parse a test "<param>" element into a ( name, value, attrib ) tuple.

    ``value`` is the comma-split 'values' attribute when present, else the
    'value' attribute, else None. ``attrib`` is a copy of the element's
    attributes without 'name', plus a 'children' list; when the element has
    children it also gets 'metadata', 'composite_data' and 'edit_attributes'
    lists and, for a "<collection>" child, a 'collection' TestCollectionDef.

    ``i`` is the index of the enclosing test; it is only used to build the
    unique name composite datasets are implicitly renamed to.

    Raises KeyError when the element has no 'name' attribute.
    """
    attrib = dict( param_elem.attrib )
    if 'values' in attrib:
        value = attrib[ 'values' ].split( ',' )
    elif 'value' in attrib:
        value = attrib[ 'value' ]
    else:
        value = None
    # Iterating an element yields its direct children; this replaces
    # Element.getchildren(), which was removed in Python 3.9.
    attrib[ 'children' ] = list( param_elem )
    if attrib[ 'children' ]:
        # At this time, we can assume having children only
        # occurs on DataToolParameter test items but this could
        # change and would cause the below parsing to change
        # based upon differences in children items
        attrib[ 'metadata' ] = []
        attrib[ 'composite_data' ] = []
        attrib[ 'edit_attributes' ] = []
        # Composite datasets need to be renamed uniquely
        composite_data_name = None
        for child in attrib[ 'children' ]:
            if child.tag == 'composite_data':
                attrib[ 'composite_data' ].append( child )
                if composite_data_name is None:
                    # Generate a unique name; each test uses a
                    # fresh history.
                    composite_data_name = '_COMPOSITE_RENAMED_t%d_%s' % ( i, uuid.uuid1().hex )
            elif child.tag == 'metadata':
                attrib[ 'metadata' ].append( child )
            elif child.tag == 'edit_attributes':
                attrib[ 'edit_attributes' ].append( child )
            elif child.tag == 'collection':
                attrib[ 'collection' ] = TestCollectionDef( child )
        if composite_data_name:
            # Composite datasets need implicit renaming;
            # inserted at front of list so explicit declarations
            # take precedence
            attrib[ 'edit_attributes' ].insert( 0, { 'type': 'name', 'value': composite_data_name } )
    name = attrib.pop( 'name' )
    return ( name, value, attrib )
def parse_output_elems( test_elem ):
    """
    Parse every "<output>" child of ``test_elem`` and return the resulting
    ( name, file, attributes ) tuples as a list, in document order.
    """
    return [
        ( name, expected_file, attributes )
        for name, expected_file, attributes in map( __parse_output_elem, test_elem.findall( "output" ) )
    ]
def __parse_output_elem( output_elem ):
    """
    Parse a test "<output>" element into ( name, file, attributes ).

    ``attributes`` additionally maps, under 'primary_datasets', the
    designation of each "<discovered_dataset>" child to its own
    ( file, attributes ) pair.

    Raises Exception when the output has no 'name' or a discovered dataset
    has no 'designation'.
    """
    attrib = dict( output_elem.attrib )
    name = attrib.pop( 'name', None )
    if name is None:
        raise Exception( "Test output does not have a 'name'" )

    expected_file, attributes = __parse_test_attributes( output_elem, attrib )
    primary_datasets = {}
    for primary_elem in ( output_elem.findall( "discovered_dataset" ) or [] ):
        primary_attrib = dict( primary_elem.attrib )
        designation = primary_attrib.pop( 'designation', None )
        if designation is None:
            raise Exception( "Test primary dataset does not have a 'designation'" )
        primary_datasets[ designation ] = __parse_test_attributes( primary_elem, primary_attrib )
    attributes[ "primary_datasets" ] = primary_datasets
    return name, expected_file, attributes


def __parse_test_attributes( output_elem, attrib ):
    """
    Collect the checks declared on an output-like element.

    ``attrib`` is the element's attribute dict; the keys consumed here
    ('file', 'compare', 'lines_diff', 'delta', 'sort') are popped from it.
    Returns ( file, attributes ) where ``file`` may be None.

    Raises Exception when the element declares nothing to check.
    """
    assert_list = __parse_assert_list( output_elem )
    # File no longer required if an list of assertions was present.
    expected_file = attrib.pop( 'file', None )
    attributes = {}
    # Method of comparison
    attributes[ 'compare' ] = attrib.pop( 'compare', 'diff' ).lower()
    # Number of lines to allow to vary in logs (for dates, etc)
    attributes[ 'lines_diff' ] = int( attrib.pop( 'lines_diff', '0' ) )
    # Allow a file size to vary if sim_size compare
    attributes[ 'delta' ] = int( attrib.pop( 'delta', '10000' ) )
    attributes[ 'sort' ] = string_as_bool( attrib.pop( 'sort', False ) )
    if 'ftype' in attrib:
        attributes[ 'ftype' ] = attrib[ 'ftype' ]
    extra_files = [ __parse_extra_files_elem( extra ) for extra in output_elem.findall( 'extra_files' ) ]
    metadata = {}
    for metadata_elem in output_elem.findall( 'metadata' ):
        metadata[ metadata_elem.get( 'name' ) ] = metadata_elem.get( 'value' )
    if not ( assert_list or expected_file or extra_files or metadata ):
        raise Exception( "Test output defines nothing to check (e.g. must have a 'file' check against, assertions to check, etc...)" )
    attributes[ 'assert_list' ] = assert_list
    attributes[ 'extra_files' ] = extra_files
    attributes[ 'metadata' ] = metadata
    return expected_file, attributes


def __parse_assert_list( output_elem ):
    """
    Convert the children of the element's "<assert_contents>" child into a
    list of nested dicts, or return None when there is no such child.
    """
    assert_elem = output_elem.find( "assert_contents" )

    # Trying to keep testing patch as localized as
    # possible, this function should be relocated
    # somewhere more conventional.
    def convert_elem( elem ):
        """ Converts an XML element to the dictionary format used by the assertion checking code. """
        # Iterating an element yields its direct children; this replaces
        # Element.getchildren(), which was removed in Python 3.9.
        return {
            "tag": elem.tag,
            "attributes": dict( elem.attrib ),
            "children": [ convert_elem( child ) for child in elem ],
        }

    if assert_elem is None:
        return None
    return [ convert_elem( assert_child ) for assert_child in assert_elem ]


def __parse_extra_files_elem( extra ):
    """
    Parse an "<extra_files>" element into
    ( type, value, name, attributes ).

    A 'name' attribute is required unless the type is 'directory', and a
    'value' attribute is always required (AssertionError otherwise). Note
    that 'delta' is passed through as the attribute string, not an int.
    """
    # File or directory, when directory, compare basename
    # by basename
    extra_type = extra.get( 'type', 'file' )
    extra_name = extra.get( 'name', None )
    assert extra_type == 'directory' or extra_name is not None, \
        'extra_files type (%s) requires a name attribute' % extra_type
    extra_value = extra.get( 'value', None )
    assert extra_value is not None, 'extra_files requires a value attribute'
    extra_attributes = {}
    extra_attributes[ 'compare' ] = extra.get( 'compare', 'diff' ).lower()
    extra_attributes[ 'delta' ] = extra.get( 'delta', '0' )
    extra_attributes[ 'lines_diff' ] = int( extra.get( 'lines_diff', '0' ) )
    extra_attributes[ 'sort' ] = string_as_bool( extra.get( 'sort', False ) )
    return extra_type, extra_value, extra_name, extra_attributes
class ParamContext( object ):
    """
    Position of a parameter inside the tool's input tree: its own name, an
    optional repeat index, and the context it is nested in.

    ``parent_context`` may be another ParamContext, a RootParamContext, or
    None; None is treated as "no enclosing context".
    """

    def __init__( self, name, index=None, parent_context=None ):
        self.parent_context = parent_context
        self.name = name
        self.index = None if index is None else int( index )

    def __own_name( self ):
        """Name of this level alone, with the repeat index appended if there is one."""
        if self.index is None:
            return self.name
        return "%s_%d" % ( self.name, self.index )

    def for_state( self ):
        """Return the fully qualified state name, levels joined by "|"."""
        name = self.__own_name()
        # The constructor defaults parent_context to None, so guard against
        # it instead of failing with AttributeError.
        parent_for_state = self.parent_context.for_state() if self.parent_context is not None else ""
        if parent_for_state:
            return "%s|%s" % ( parent_for_state, name )
        return name

    def __str__( self ):
        return "Context[for_state=%s]" % self.for_state()

    def param_names( self ):
        """
        Yield every name a test may use for this parameter: each of the
        parent's names joined with this level's name, then this level's
        name on its own.
        """
        name = self.__own_name()
        parent_names = self.parent_context.param_names() if self.parent_context is not None else []
        for parent_context_param in parent_names:
            yield "%s|%s" % ( parent_context_param, name )
        yield name

    def extract_value( self, raw_inputs ):
        """
        Remove and return the first entry of ``raw_inputs`` matching one of
        param_names() (tried in order), or None when nothing matches.
        """
        for param_name in self.param_names():
            value = self.__raw_param_found( param_name, raw_inputs )
            if value:
                return value
        return None

    def __raw_param_found( self, param_name, raw_inputs ):
        """
        Remove and return the last entry of ``raw_inputs`` whose first item
        equals ``param_name``; return None when there is none.
        """
        index = None
        for i, raw_input in enumerate( raw_inputs ):
            if raw_input[ 0 ] == param_name:
                index = i
        if index is None:
            return None
        return raw_inputs.pop( index )
class RootParamContext( object ):
    """
    Context standing for the top of the input tree: it contributes no state
    prefix and no parameter names of its own.
    """

    def __init__( self ):
        """Nothing to initialise; the root context carries no state."""

    def for_state( self ):
        """Return the (empty) state prefix of the root."""
        return ""

    def param_names( self ):
        """Return the (empty) list of names contributed by the root."""
        return []

    def get_index( self ):
        """Return the index of the root, always 0."""
        return 0
class TestCollectionDef( object ):
    """
    Definition of a dataset collection used as test input, parsed from a
    "<collection>" element.

    ``elements`` is a list of ( identifier, item ) pairs where ``item`` is a
    nested TestCollectionDef when the "<element>" wraps a "<collection>",
    and the ( name, value, attrib ) tuple from parse_param_elem otherwise.
    """

    def __init__( self, elem ):
        """
        :param elem: the "<collection>" element; its 'type' attribute is
                     required (KeyError otherwise), 'name' is optional.
        """
        self.elements = []
        attrib = dict( elem.attrib )
        self.collection_type = attrib[ "type" ]
        self.name = attrib.get( "name", "Unnamed Collection" )
        for element in elem.findall( "element" ):
            element_identifier = element.attrib[ "name" ]
            nested_collection_elem = element.find( "collection" )
            # Compare against None explicitly: the truth value of an Element
            # reflects whether it has children, so a childless nested
            # "<collection>" would otherwise be mistaken for "not found".
            if nested_collection_elem is not None:
                item = TestCollectionDef( nested_collection_elem )
            else:
                item = parse_param_elem( element )
            self.elements.append( ( element_identifier, item ) )

    def collect_inputs( self ):
        """
        Return the flat list of leaf ( name, value, attrib ) tuples of this
        collection, descending into nested collections in order.
        """
        inputs = []
        for _identifier, item in self.elements:
            if isinstance( item, TestCollectionDef ):
                inputs.extend( item.collect_inputs() )
            else:
                inputs.append( item )
        return inputs
def expand_input_elems( root_elem, prefix="" ):
    """
    Flatten the "<repeat>" and "<conditional>" children of ``root_elem`` in
    place.

    Every "<param>" ends up as a direct child of ``root_elem`` with its name
    prefixed by the path of the groups it was nested in ("|" separated;
    repeats carry a per-name running index starting at 0), and the group
    elements themselves are removed.
    """
    __append_prefix_to_params( root_elem, prefix )

    # Number of instances seen so far for each repeat name.
    seen = {}
    for repeat_elem in root_elem.findall( 'repeat' ):
        repeat_name = repeat_elem.get( "name" )
        repeat_index = seen.get( repeat_name, -1 ) + 1
        seen[ repeat_name ] = repeat_index

        expand_input_elems( repeat_elem, __prefix_join( prefix, repeat_name, index=repeat_index ) )
        __pull_up_params( root_elem, repeat_elem )
        root_elem.remove( repeat_elem )

    for cond_elem in root_elem.findall( 'conditional' ):
        expand_input_elems( cond_elem, __prefix_join( prefix, cond_elem.get( "name" ) ) )
        __pull_up_params( root_elem, cond_elem )
        root_elem.remove( cond_elem )
def __append_prefix_to_params( elem, prefix ): for param_elem in elem.findall( 'param' ): param_elem.set( "name", __prefix_join( prefix, param_elem.get( "name" ) ) ) def __pull_up_params( parent_elem, child_elem ): for param_elem in child_elem.findall( 'param' ): parent_elem.append( param_elem ) child_elem.remove( param_elem ) def __prefix_join( prefix, name, index=None ): name = name if index is None else "%s_%d" % ( name, index ) return name if not prefix else "%s|%s" % ( prefix, name )