Source code for galaxy.tools

"""
Classes encapsulating galaxy tools and tool configuration.
"""

import binascii
import glob
import json
import logging
import os
import pipes
import re
import shutil
import sys
import string
import tarfile
import tempfile
import threading
import traceback
import types
import urllib

from math import isinf

from galaxy import eggs
eggs.require( "MarkupSafe" )  # MarkupSafe must load before mako
eggs.require( "Mako" )
eggs.require( "elementtree" )
eggs.require( "Paste" )
eggs.require( "SQLAlchemy >= 0.4" )

from cgi import FieldStorage
from elementtree import ElementTree
from mako.template import Template
from paste import httpexceptions
from sqlalchemy import and_

from galaxy import jobs, model
from galaxy.jobs.error_level import StdioErrorLevel
from galaxy.datatypes.metadata import JobExternalOutputMetadataWrapper
from galaxy import exceptions
from galaxy.jobs import ParallelismInfo
from galaxy.tools import watcher
from galaxy.tools.actions import DefaultToolAction
from galaxy.tools.actions.data_source import DataSourceToolAction
from galaxy.tools.actions.data_manager import DataManagerToolAction
from galaxy.tools.deps import build_dependency_manager
from galaxy.tools.deps.requirements import parse_requirements_from_xml
from galaxy.tools.parameters import check_param, params_from_strings, params_to_strings
from galaxy.tools.parameters import output_collect
from galaxy.tools.parameters.basic import (BaseURLToolParameter,
                                           DataToolParameter, HiddenToolParameter, LibraryDatasetToolParameter,
                                           SelectToolParameter, ToolParameter, UnvalidatedValue,
                                           IntegerToolParameter, FloatToolParameter)
from galaxy.tools.parameters.grouping import Conditional, ConditionalWhen, Repeat, UploadDataset
from galaxy.tools.parameters.input_translation import ToolInputTranslator
from galaxy.tools.parameters.output import ToolOutputActionGroup
from galaxy.tools.parameters.validation import LateValidationError
from galaxy.tools.filters import FilterFactory
from galaxy.tools.test import parse_tests_elem
from galaxy.util import listify, parse_xml, rst_to_html, string_as_bool, string_to_object, xml_text, xml_to_string
from galaxy.tools.parameters.meta import expand_meta_parameters
from galaxy.util.bunch import Bunch
from galaxy.util.expressions import ExpressionContext
from galaxy.util.hash_util import hmac_new
from galaxy.util.none_like import NoneDataset
from galaxy.util.odict import odict
from galaxy.util.template import fill_template
from galaxy.web import url_for
from galaxy.web.form_builder import SelectField
from galaxy.web.framework.helpers import escape
from galaxy.model.item_attrs import Dictifiable
from galaxy.model import Workflow
from tool_shed.util import common_util
from tool_shed.util import shed_util_common as suc
from .loader import load_tool, template_macro_params, raw_tool_xml_tree, imported_macro_paths
from .execute import execute as execute_job
from .wrappers import (
    ToolParameterValueWrapper,
    RawObjectWrapper,
    LibraryDatasetValueWrapper,
    InputValueWrapper,
    SelectToolParameterWrapper,
    DatasetFilenameWrapper,
    DatasetListWrapper,
    DatasetCollectionWrapper,
)


log = logging.getLogger( __name__ )

WORKFLOW_PARAMETER_REGULAR_EXPRESSION = re.compile( '''\$\{.+?\}''' )

JOB_RESOURCE_CONDITIONAL_XML = """<conditional name="__job_resource">
    <param name="__job_resource__select" type="select" label="Job Resource Parameters">
        <option value="no">Use default job resource parameters</option>
        <option value="yes">Specify job resource parameters</option>
    </param>
    <when value="no"></when>
    <when value="yes">
    </when>
</conditional>"""


[docs]class ToolNotFoundException( Exception ): pass
[docs]def to_dict_helper( obj, kwargs ): """ Helper function that provides the appropriate kwargs to to_dict an object. """ # Label.to_dict cannot have kwargs. if isinstance( obj, ToolSectionLabel ): kwargs = {} return obj.to_dict( **kwargs )
[docs]class ToolBox( object, Dictifiable ): """Container for a collection of tools""" def __init__( self, config_filenames, tool_root_dir, app ): """ Create a toolbox from the config files named by `config_filenames`, using `tool_root_dir` as the base directory for finding individual tool config files. """ # The shed_tool_confs list contains dictionaries storing information about the tools defined in each # shed-related shed_tool_conf.xml file. self.shed_tool_confs = [] self.tools_by_id = {} self.workflows_by_id = {} # In-memory dictionary that defines the layout of the tool panel. self.tool_panel = odict() self.index = 0 self.data_manager_tools = odict() # File that contains the XML section and tool tags from all tool panel config files integrated into a # single file that defines the tool panel layout. This file can be changed by the Galaxy administrator # (in a way similar to the single tool_conf.xml file in the past) to alter the layout of the tool panel. self.integrated_tool_panel_config = app.config.integrated_tool_panel_config # In-memory dictionary that defines the layout of the tool_panel.xml file on disk. self.integrated_tool_panel = odict() self.integrated_tool_panel_config_has_contents = os.path.exists( self.integrated_tool_panel_config ) and os.stat( self.integrated_tool_panel_config ).st_size > 0 if self.integrated_tool_panel_config_has_contents: self.load_integrated_tool_panel_keys() # The following refers to the tool_path config setting for backward compatibility. The shed-related # (e.g., shed_tool_conf.xml) files include the tool_path attribute within the <toolbox> tag. self.tool_root_dir = tool_root_dir self.app = app self.tool_watcher = watcher.get_watcher( self, app.config ) self.filter_factory = FilterFactory( self ) self.init_dependency_manager() config_filenames = listify( config_filenames ) for config_filename in config_filenames: if os.path.isdir( config_filename ): directory_contents = sorted( os.listdir( config_filename ) ) directory_config_files = [ config_file for config_file in directory_contents if config_file.endswith( ".xml" ) ] config_filenames.remove( config_filename ) config_filenames.extend( directory_config_files ) for config_filename in config_filenames: try: self.init_tools( config_filename ) except: log.exception( "Error loading tools defined in config %s", config_filename ) if self.app.name == 'galaxy' and self.integrated_tool_panel_config_has_contents: # Load self.tool_panel based on the order in self.integrated_tool_panel. self.load_tool_panel() if app.config.update_integrated_tool_panel: # Write the current in-memory integrated_tool_panel to the integrated_tool_panel.xml file. # This will cover cases where the Galaxy administrator manually edited one or more of the tool panel # config files, adding or removing locally developed tools or workflows. The value of integrated_tool_panel # will be False when things like functional tests are the caller. self.fix_integrated_tool_panel_dict() self.write_integrated_tool_panel_config_file()
[docs] def fix_integrated_tool_panel_dict( self ): # HACK: instead of fixing after the fact, I suggest some combination of: # 1) adjusting init_tools() and called methods to get this right # 2) redesigning the code and/or data structure used to read/write integrated_tool_panel.xml for key, value in self.integrated_tool_panel.iteritems(): if isinstance( value, ToolSection ): for section_key, section_value in value.elems.iteritems(): if section_value is None: if isinstance( section_value, Tool ): tool_id = section_key[5:] value.elems[section_key] = self.tools_by_id.get( tool_id ) elif isinstance( section_value, Workflow ): workflow_id = section_key[9:] value.elems[section_key] = self.workflows_by_id.get( workflow_id )
[docs] def init_tools( self, config_filename ): """ Read the configuration file and load each tool. The following tags are currently supported: .. raw:: xml <toolbox> <tool file="data_source/upload.xml"/> # tools outside sections <label text="Basic Tools" id="basic_tools" /> # labels outside sections <workflow id="529fd61ab1c6cc36" /> # workflows outside sections <section name="Get Data" id="getext"> # sections <tool file="data_source/biomart.xml" /> # tools inside sections <label text="In Section" id="in_section" /> # labels inside sections <workflow id="adb5f5c93f827949" /> # workflows inside sections </section> </toolbox> """ if self.app.config.get_bool( 'enable_tool_tags', False ): log.info("removing all tool tag associations (" + str( self.sa_session.query( self.app.model.ToolTagAssociation ).count() ) + ")" ) self.sa_session.query( self.app.model.ToolTagAssociation ).delete() self.sa_session.flush() log.info( "Parsing the tool configuration %s" % config_filename ) tree = parse_xml( config_filename ) root = tree.getroot() tool_path = root.get( 'tool_path' ) if tool_path: # We're parsing a shed_tool_conf file since we have a tool_path attribute. parsing_shed_tool_conf = True # Keep an in-memory list of xml elements to enable persistence of the changing tool config. config_elems = [] else: parsing_shed_tool_conf = False tool_path = self.__resolve_tool_path(tool_path, config_filename) # Only load the panel_dict under certain conditions. load_panel_dict = not self.integrated_tool_panel_config_has_contents for _, elem in enumerate( root ): index = self.index self.index += 1 if parsing_shed_tool_conf: config_elems.append( elem ) if elem.tag == 'tool': self.load_tool_tag_set( elem, self.tool_panel, self.integrated_tool_panel, tool_path, load_panel_dict, guid=elem.get( 'guid' ), index=index ) elif elem.tag == 'workflow': self.load_workflow_tag_set( elem, self.tool_panel, self.integrated_tool_panel, load_panel_dict, index=index ) elif elem.tag == 'section': self.load_section_tag_set( elem, tool_path, load_panel_dict, index=index ) elif elem.tag == 'label': self.load_label_tag_set( elem, self.tool_panel, self.integrated_tool_panel, load_panel_dict, index=index ) elif elem.tag == 'tool_dir': self.load_tooldir_tag_set( elem, self.tool_panel, self.integrated_tool_panel, tool_path, load_panel_dict ) if parsing_shed_tool_conf: shed_tool_conf_dict = dict( config_filename=config_filename, tool_path=tool_path, config_elems=config_elems ) self.shed_tool_confs.append( shed_tool_conf_dict )
[docs] def get_shed_config_dict_by_filename( self, filename, default=None ): for shed_config_dict in self.shed_tool_confs: if shed_config_dict[ 'config_filename' ] == filename: return shed_config_dict return default
def __resolve_tool_path(self, tool_path, config_filename): if not tool_path: # Default to backward compatible config setting. tool_path = self.tool_root_dir else: # Allow use of __tool_conf_dir__ in toolbox config files. tool_conf_dir = os.path.dirname(config_filename) tool_path_vars = {"tool_conf_dir": tool_conf_dir} tool_path = string.Template(tool_path).safe_substitute(tool_path_vars) return tool_path def __add_tool_to_tool_panel( self, tool, panel_component, section=False ): # See if a version of this tool is already loaded into the tool panel. The value of panel_component # will be a ToolSection (if the value of section=True) or self.tool_panel (if section=False). tool_id = str( tool.id ) tool = self.tools_by_id[ tool_id ] if section: panel_dict = panel_component.elems else: panel_dict = panel_component already_loaded = False loaded_version_key = None lineage_id = None for lineage_id in tool.lineage_ids: if lineage_id in self.tools_by_id: loaded_version_key = 'tool_%s' % lineage_id if loaded_version_key in panel_dict: already_loaded = True break if already_loaded: if tool.lineage_ids.index( tool_id ) > tool.lineage_ids.index( lineage_id ): key = 'tool_%s' % tool.id index = panel_dict.keys().index( loaded_version_key ) del panel_dict[ loaded_version_key ] panel_dict.insert( index, key, tool ) log.debug( "Loaded tool id: %s, version: %s into tool panel." % ( tool.id, tool.version ) ) else: inserted = False key = 'tool_%s' % tool.id # The value of panel_component is the in-memory tool panel dictionary. for index, integrated_panel_key in enumerate( self.integrated_tool_panel.keys() ): if key == integrated_panel_key: panel_dict.insert( index, key, tool ) if not inserted: inserted = True if not inserted: # Check the tool's installed versions. for lineage_id in tool.lineage_ids: lineage_id_key = 'tool_%s' % lineage_id for index, integrated_panel_key in enumerate( self.integrated_tool_panel.keys() ): if lineage_id_key == integrated_panel_key: panel_dict.insert( index, key, tool ) if not inserted: inserted = True if not inserted: if tool.guid is None or \ tool.tool_shed is None or \ tool.repository_name is None or \ tool.repository_owner is None or \ tool.installed_changeset_revision is None: # We have a tool that was not installed from the Tool Shed, but is also not yet defined in # integrated_tool_panel.xml, so append it to the tool panel. panel_dict[ key ] = tool log.debug( "Loaded tool id: %s, version: %s into tool panel.." % ( tool.id, tool.version ) ) else: # We are in the process of installing the tool. tool_version = self.__get_tool_version( tool_id ) tool_lineage_ids = tool_version.get_version_ids( self.app, reverse=True ) for lineage_id in tool_lineage_ids: if lineage_id in self.tools_by_id: loaded_version_key = 'tool_%s' % lineage_id if loaded_version_key in panel_dict: if not already_loaded: already_loaded = True if not already_loaded: # If the tool is not defined in integrated_tool_panel.xml, append it to the tool panel. panel_dict[ key ] = tool log.debug( "Loaded tool id: %s, version: %s into tool panel...." % ( tool.id, tool.version ) )
[docs] def load_tool_panel( self ): for key, val in self.integrated_tool_panel.items(): if isinstance( val, Tool ): tool_id = key.replace( 'tool_', '', 1 ) if tool_id in self.tools_by_id: self.__add_tool_to_tool_panel( val, self.tool_panel, section=False ) elif isinstance( val, Workflow ): workflow_id = key.replace( 'workflow_', '', 1 ) if workflow_id in self.workflows_by_id: workflow = self.workflows_by_id[ workflow_id ] self.tool_panel[ key ] = workflow log.debug( "Loaded workflow: %s %s" % ( workflow_id, workflow.name ) ) elif isinstance( val, ToolSectionLabel ): self.tool_panel[ key ] = val elif isinstance( val, ToolSection ): elem = ElementTree.Element( 'section' ) elem.attrib[ 'id' ] = val.id or '' elem.attrib[ 'name' ] = val.name or '' elem.attrib[ 'version' ] = val.version or '' section = ToolSection( elem ) log.debug( "Loading section: %s" % elem.get( 'name' ) ) for section_key, section_val in val.elems.items(): if isinstance( section_val, Tool ): tool_id = section_key.replace( 'tool_', '', 1 ) if tool_id in self.tools_by_id: self.__add_tool_to_tool_panel( section_val, section, section=True ) elif isinstance( section_val, Workflow ): workflow_id = section_key.replace( 'workflow_', '', 1 ) if workflow_id in self.workflows_by_id: workflow = self.workflows_by_id[ workflow_id ] section.elems[ section_key ] = workflow log.debug( "Loaded workflow: %s %s" % ( workflow_id, workflow.name ) ) elif isinstance( section_val, ToolSectionLabel ): if section_val: section.elems[ section_key ] = section_val log.debug( "Loaded label: %s" % ( section_val.text ) ) self.tool_panel[ key ] = section
[docs] def load_integrated_tool_panel_keys( self ): """ Load the integrated tool panel keys, setting values for tools and workflows to None. The values will be reset when the various tool panel config files are parsed, at which time the tools and workflows are loaded. """ tree = parse_xml( self.integrated_tool_panel_config ) root = tree.getroot() for elem in root: if elem.tag == 'tool': key = 'tool_%s' % elem.get( 'id' ) self.integrated_tool_panel[ key ] = None elif elem.tag == 'workflow': key = 'workflow_%s' % elem.get( 'id' ) self.integrated_tool_panel[ key ] = None elif elem.tag == 'section': section = ToolSection( elem ) for section_elem in elem: if section_elem.tag == 'tool': key = 'tool_%s' % section_elem.get( 'id' ) section.elems[ key ] = None elif section_elem.tag == 'workflow': key = 'workflow_%s' % section_elem.get( 'id' ) section.elems[ key ] = None elif section_elem.tag == 'label': key = 'label_%s' % section_elem.get( 'id' ) section.elems[ key ] = None key = elem.get( 'id' ) self.integrated_tool_panel[ key ] = section elif elem.tag == 'label': key = 'label_%s' % elem.get( 'id' ) self.integrated_tool_panel[ key ] = None
[docs] def write_integrated_tool_panel_config_file( self ): """ Write the current in-memory version of the integrated_tool_panel.xml file to disk. Since Galaxy administrators use this file to manage the tool panel, we'll not use xml_to_string() since it doesn't write XML quite right. """ fd, filename = tempfile.mkstemp() os.write( fd, '<?xml version="1.0"?>\n' ) os.write( fd, '<toolbox>\n' ) for key, item in self.integrated_tool_panel.items(): if item: if isinstance( item, Tool ): os.write( fd, ' <tool id="%s" />\n' % item.id ) elif isinstance( item, Workflow ): os.write( fd, ' <workflow id="%s" />\n' % item.id ) elif isinstance( item, ToolSectionLabel ): label_id = item.id or '' label_text = item.text or '' label_version = item.version or '' os.write( fd, ' <label id="%s" text="%s" version="%s" />\n' % ( label_id, label_text, label_version ) ) elif isinstance( item, ToolSection ): section_id = item.id or '' section_name = item.name or '' section_version = item.version or '' os.write( fd, ' <section id="%s" name="%s" version="%s">\n' % ( section_id, section_name, section_version ) ) for section_key, section_item in item.elems.items(): if isinstance( section_item, Tool ): if section_item: os.write( fd, ' <tool id="%s" />\n' % section_item.id ) elif isinstance( section_item, Workflow ): if section_item: os.write( fd, ' <workflow id="%s" />\n' % section_item.id ) elif isinstance( section_item, ToolSectionLabel ): if section_item: label_id = section_item.id or '' label_text = section_item.text or '' label_version = section_item.version or '' os.write( fd, ' <label id="%s" text="%s" version="%s" />\n' % ( label_id, label_text, label_version ) ) os.write( fd, ' </section>\n' ) os.write( fd, '</toolbox>\n' ) os.close( fd ) shutil.move( filename, os.path.abspath( self.integrated_tool_panel_config ) ) os.chmod( self.integrated_tool_panel_config, 0644 )
[docs] def get_tool( self, tool_id, tool_version=None, get_all_versions=False ): """Attempt to locate a tool in the tool box.""" if tool_id in self.tools_by_id and not get_all_versions: #tool_id exactly matches an available tool by id (which is 'old' tool_id or guid) return self.tools_by_id[ tool_id ] #exact tool id match not found, or all versions requested, search for other options, e.g. migrated tools or different versions rval = [] tv = self.__get_tool_version( tool_id ) if tv: tool_version_ids = tv.get_version_ids( self.app ) for tool_version_id in tool_version_ids: if tool_version_id in self.tools_by_id: rval.append( self.tools_by_id[ tool_version_id ] ) if not rval: #still no tool, do a deeper search and try to match by old ids for tool in self.tools_by_id.itervalues(): if tool.old_id == tool_id: rval.append( tool ) if rval: if get_all_versions: return rval else: if tool_version: #return first tool with matching version for tool in rval: if tool.version == tool_version: return tool #No tool matches by version, simply return the first available tool found return rval[0] #We now likely have a Toolshed guid passed in, but no supporting database entries #If the tool exists by exact id and is loaded then provide exact match within a list if tool_id in self.tools_by_id: return[ self.tools_by_id[ tool_id ] ] return None
[docs] def get_loaded_tools_by_lineage( self, tool_id ): """Get all loaded tools associated by lineage to the tool whose id is tool_id.""" tv = self.__get_tool_version( tool_id ) if tv: tool_version_ids = tv.get_version_ids( self.app ) available_tool_versions = [] for tool_version_id in tool_version_ids: if tool_version_id in self.tools_by_id: available_tool_versions.append( self.tools_by_id[ tool_version_id ] ) return available_tool_versions else: if tool_id in self.tools_by_id: tool = self.tools_by_id[ tool_id ] return [ tool ] return []
def __get_tool_version( self, tool_id ): """Return a ToolVersion if one exists for the tool_id""" return self.app.install_model.context.query( self.app.install_model.ToolVersion ) \ .filter( self.app.install_model.ToolVersion.table.c.tool_id == tool_id ) \ .first() def __get_tool_shed_repository( self, tool_shed, name, owner, installed_changeset_revision ): # We store only the port, if one exists, in the database. tool_shed = common_util.remove_protocol_from_tool_shed_url( tool_shed ) return self.app.install_model.context.query( self.app.install_model.ToolShedRepository ) \ .filter( and_( self.app.install_model.ToolShedRepository.table.c.tool_shed == tool_shed, self.app.install_model.ToolShedRepository.table.c.name == name, self.app.install_model.ToolShedRepository.table.c.owner == owner, self.app.install_model.ToolShedRepository.table.c.installed_changeset_revision == installed_changeset_revision ) ) \ .first()
[docs] def get_tool_components( self, tool_id, tool_version=None, get_loaded_tools_by_lineage=False, set_selected=False ): """ Retrieve all loaded versions of a tool from the toolbox and return a select list enabling selection of a different version, the list of the tool's loaded versions, and the specified tool. """ toolbox = self tool_version_select_field = None tools = [] tool = None # Backwards compatibility for datasource tools that have default tool_id configured, but which # are now using only GALAXY_URL. tool_ids = listify( tool_id ) for tool_id in tool_ids: if get_loaded_tools_by_lineage: tools = toolbox.get_loaded_tools_by_lineage( tool_id ) else: tools = toolbox.get_tool( tool_id, tool_version=tool_version, get_all_versions=True ) if tools: tool = toolbox.get_tool( tool_id, tool_version=tool_version, get_all_versions=False ) if len( tools ) > 1: tool_version_select_field = self.build_tool_version_select_field( tools, tool.id, set_selected ) break return tool_version_select_field, tools, tool
[docs] def build_tool_version_select_field( self, tools, tool_id, set_selected ): """Build a SelectField whose options are the ids for the received list of tools.""" options = [] refresh_on_change_values = [] for tool in tools: options.insert( 0, ( tool.version, tool.id ) ) refresh_on_change_values.append( tool.id ) select_field = SelectField( name='tool_id', refresh_on_change=True, refresh_on_change_values=refresh_on_change_values ) for option_tup in options: selected = set_selected and option_tup[ 1 ] == tool_id if selected: select_field.add_option( 'version %s' % option_tup[ 0 ], option_tup[ 1 ], selected=True ) else: select_field.add_option( 'version %s' % option_tup[ 0 ], option_tup[ 1 ] ) return select_field
[docs] def load_tool_tag_set( self, elem, panel_dict, integrated_panel_dict, tool_path, load_panel_dict, guid=None, index=None ): try: path = elem.get( "file" ) repository_id = None if guid is None: tool_shed_repository = None can_load_into_panel_dict = True else: # The tool is contained in an installed tool shed repository, so load # the tool only if the repository has not been marked deleted. tool_shed = elem.find( "tool_shed" ).text repository_name = elem.find( "repository_name" ).text repository_owner = elem.find( "repository_owner" ).text installed_changeset_revision_elem = elem.find( "installed_changeset_revision" ) if installed_changeset_revision_elem is None: # Backward compatibility issue - the tag used to be named 'changeset_revision'. installed_changeset_revision_elem = elem.find( "changeset_revision" ) installed_changeset_revision = installed_changeset_revision_elem.text tool_shed_repository = self.__get_tool_shed_repository( tool_shed, repository_name, repository_owner, installed_changeset_revision ) if tool_shed_repository: # Only load tools if the repository is not deactivated or uninstalled. can_load_into_panel_dict = not tool_shed_repository.deleted repository_id = self.app.security.encode_id( tool_shed_repository.id ) else: # If there is not yet a tool_shed_repository record, we're in the process of installing # a new repository, so any included tools can be loaded into the tool panel. can_load_into_panel_dict = True tool = self.load_tool( os.path.join( tool_path, path ), guid=guid, repository_id=repository_id ) if string_as_bool(elem.get( 'hidden', False )): tool.hidden = True key = 'tool_%s' % str( tool.id ) if can_load_into_panel_dict: if guid is not None: tool.tool_shed = tool_shed tool.repository_name = repository_name tool.repository_owner = repository_owner tool.installed_changeset_revision = installed_changeset_revision tool.guid = guid tool.version = elem.find( "version" ).text # Make sure the tool has a tool_version. if not self.__get_tool_version( tool.id ): tool_version = self.app.install_model.ToolVersion( tool_id=tool.id, tool_shed_repository=tool_shed_repository ) self.app.install_model.context.add( tool_version ) self.app.install_model.context.flush() # Load the tool's lineage ids. tool.lineage_ids = tool.tool_version.get_version_ids( self.app ) if self.app.config.get_bool( 'enable_tool_tags', False ): tag_names = elem.get( "tags", "" ).split( "," ) for tag_name in tag_names: if tag_name == '': continue tag = self.sa_session.query( self.app.model.Tag ).filter_by( name=tag_name ).first() if not tag: tag = self.app.model.Tag( name=tag_name ) self.sa_session.add( tag ) self.sa_session.flush() tta = self.app.model.ToolTagAssociation( tool_id=tool.id, tag_id=tag.id ) self.sa_session.add( tta ) self.sa_session.flush() else: for tagged_tool in tag.tagged_tools: if tagged_tool.tool_id == tool.id: break else: tta = self.app.model.ToolTagAssociation( tool_id=tool.id, tag_id=tag.id ) self.sa_session.add( tta ) self.sa_session.flush() self.__add_tool( tool, load_panel_dict, panel_dict ) # Always load the tool into the integrated_panel_dict, or it will not be included in the integrated_tool_panel.xml file. if key in integrated_panel_dict or index is None: integrated_panel_dict[ key ] = tool else: integrated_panel_dict.insert( index, key, tool ) except: log.exception( "Error reading tool from path: %s" % path )
def __add_tool( self, tool, load_panel_dict, panel_dict ): # Allow for the same tool to be loaded into multiple places in the tool panel. We have to handle # the case where the tool is contained in a repository installed from the tool shed, and the Galaxy # administrator has retrieved updates to the installed repository. In this case, the tool may have # been updated, but the version was not changed, so the tool should always be reloaded here. We used # to only load the tool if it was not found in self.tools_by_id, but performing that check did # not enable this scenario. self.tools_by_id[ tool.id ] = tool if load_panel_dict: self.__add_tool_to_tool_panel( tool, panel_dict, section=isinstance( panel_dict, ToolSection ) )
[docs] def load_workflow_tag_set( self, elem, panel_dict, integrated_panel_dict, load_panel_dict, index=None ): try: # TODO: should id be encoded? workflow_id = elem.get( 'id' ) workflow = self.load_workflow( workflow_id ) self.workflows_by_id[ workflow_id ] = workflow key = 'workflow_' + workflow_id if load_panel_dict: panel_dict[ key ] = workflow # Always load workflows into the integrated_panel_dict. if key in integrated_panel_dict or index is None: integrated_panel_dict[ key ] = workflow else: integrated_panel_dict.insert( index, key, workflow ) except: log.exception( "Error loading workflow: %s" % workflow_id )
[docs] def load_label_tag_set( self, elem, panel_dict, integrated_panel_dict, load_panel_dict, index=None ): label = ToolSectionLabel( elem ) key = 'label_' + label.id if load_panel_dict: panel_dict[ key ] = label if key in integrated_panel_dict or index is None: integrated_panel_dict[ key ] = label else: integrated_panel_dict.insert( index, key, label )
[docs] def load_section_tag_set( self, elem, tool_path, load_panel_dict, index=None ): key = elem.get( "id" ) if key in self.tool_panel: section = self.tool_panel[ key ] elems = section.elems else: section = ToolSection( elem ) elems = section.elems if key in self.integrated_tool_panel: integrated_section = self.integrated_tool_panel[ key ] integrated_elems = integrated_section.elems else: integrated_section = ToolSection( elem ) integrated_elems = integrated_section.elems for sub_index, sub_elem in enumerate( elem ): if sub_elem.tag == 'tool': self.load_tool_tag_set( sub_elem, elems, integrated_elems, tool_path, load_panel_dict, guid=sub_elem.get( 'guid' ), index=sub_index ) elif sub_elem.tag == 'workflow': self.load_workflow_tag_set( sub_elem, elems, integrated_elems, load_panel_dict, index=sub_index ) elif sub_elem.tag == 'label': self.load_label_tag_set( sub_elem, elems, integrated_elems, load_panel_dict, index=sub_index ) elif sub_elem.tag == 'tool_dir': self.load_tooldir_tag_set( sub_elem, elems, tool_path, integrated_elems, load_panel_dict ) if load_panel_dict: self.tool_panel[ key ] = section # Always load sections into the integrated_tool_panel. if key in self.integrated_tool_panel or index is None: self.integrated_tool_panel[ key ] = integrated_section else: self.integrated_tool_panel.insert( index, key, integrated_section )
[docs] def load_tooldir_tag_set(self, sub_elem, elems, tool_path, integrated_elems, load_panel_dict): directory = os.path.join( tool_path, sub_elem.attrib.get("dir") ) recursive = string_as_bool( sub_elem.attrib.get("recursive", True) ) self.__watch_directory( directory, elems, integrated_elems, load_panel_dict, recursive )
def __watch_directory( self, directory, elems, integrated_elems, load_panel_dict, recursive): def quick_load( tool_file, async=True ): try: tool = self.load_tool( tool_file ) self.__add_tool( tool, load_panel_dict, elems ) # Always load the tool into the integrated_panel_dict, or it will not be included in the integrated_tool_panel.xml file. key = 'tool_%s' % str( tool.id ) integrated_elems[ key ] = tool if async: self.load_tool_panel() if self.app.config.update_integrated_tool_panel: # Write the current in-memory integrated_tool_panel to the integrated_tool_panel.xml file. # This will cover cases where the Galaxy administrator manually edited one or more of the tool panel # config files, adding or removing locally developed tools or workflows. The value of integrated_tool_panel # will be False when things like functional tests are the caller. self.fix_integrated_tool_panel_dict() self.write_integrated_tool_panel_config_file() return tool.id except Exception: log.exception("Failed to load potential tool %s." % tool_file) return None tool_loaded = False for name in os.listdir( directory ): child_path = os.path.join(directory, name) if os.path.isdir(child_path) and recursive: self.__watch_directory(child_path, elems, integrated_elems, load_panel_dict, recursive) elif name.endswith( ".xml" ): quick_load( child_path, async=False ) tool_loaded = True if tool_loaded: self.tool_watcher.watch_directory( directory, quick_load )
[docs] def load_tool( self, config_file, guid=None, repository_id=None, **kwds ): """Load a single tool from the file named by `config_file` and return an instance of `Tool`.""" # Parse XML configuration file and get the root element tree = load_tool( config_file ) root = tree.getroot() # Allow specifying a different tool subclass to instantiate if root.find( "type" ) is not None: type_elem = root.find( "type" ) module = type_elem.get( 'module', 'galaxy.tools' ) cls = type_elem.get( 'class' ) mod = __import__( module, globals(), locals(), [cls] ) ToolClass = getattr( mod, cls ) elif root.get( 'tool_type', None ) is not None: ToolClass = tool_types.get( root.get( 'tool_type' ) ) else: # Normal tool - only insert dynamic resource parameters for these # tools. if hasattr( self.app, "job_config" ): # toolshed may not have job_config? tool_id = root.get( 'id' ) if root else None parameters = self.app.job_config.get_tool_resource_parameters( tool_id ) if parameters: inputs = root.find('inputs') # If tool has not inputs, create some so we can insert conditional if not inputs: inputs = ElementTree.fromstring( "<inputs></inputs>") root.append( inputs ) # Insert a conditional allowing user to specify resource parameters. conditional_element = ElementTree.fromstring( JOB_RESOURCE_CONDITIONAL_XML ) when_yes_elem = conditional_element.findall( "when" )[ 1 ] for parameter in parameters: when_yes_elem.append( parameter ) inputs.append( conditional_element ) ToolClass = Tool tool = ToolClass( config_file, root, self.app, guid=guid, repository_id=repository_id, **kwds ) tool_id = tool.id if not tool_id.startswith("__"): # do not monitor special tools written to tmp directory - no reason # to monitor such a large directory. self.tool_watcher.watch_file( config_file, tool.id ) return tool
[docs] def package_tool( self, trans, tool_id ): """ Create a tarball with the tool's xml, help images, and test data. :param trans: the web transaction :param tool_id: the tool ID from app.toolbox :returns: tuple of tarball filename, success True/False, message/None """ message = '' success = True # Make sure the tool is actually loaded. if tool_id not in self.tools_by_id: return None, False, "No tool with id %s" % escape( tool_id ) else: tool = self.tools_by_id[ tool_id ] tarball_files = [] temp_files = [] tool_xml = file( os.path.abspath( tool.config_file ), 'r' ).read() # Retrieve tool help images and rewrite the tool's xml into a temporary file with the path # modified to be relative to the repository root. image_found = False if tool.help is not None: tool_help = tool.help._source # Check each line of the rendered tool help for an image tag that points to a location under static/ for help_line in tool_help.split( '\n' ): image_regex = re.compile( 'img alt="[^"]+" src="\${static_path}/([^"]+)"' ) matches = re.search( image_regex, help_line ) if matches is not None: tool_help_image = matches.group(1) tarball_path = tool_help_image filesystem_path = os.path.abspath( os.path.join( trans.app.config.root, 'static', tool_help_image ) ) if os.path.exists( filesystem_path ): tarball_files.append( ( filesystem_path, tarball_path ) ) image_found = True tool_xml = tool_xml.replace( '${static_path}/%s' % tarball_path, tarball_path ) # If one or more tool help images were found, add the modified tool XML to the tarball instead of the original. if image_found: fd, new_tool_config = tempfile.mkstemp( suffix='.xml' ) os.close( fd ) file( new_tool_config, 'w' ).write( tool_xml ) tool_tup = ( os.path.abspath( new_tool_config ), os.path.split( tool.config_file )[-1] ) temp_files.append( os.path.abspath( new_tool_config ) ) else: tool_tup = ( os.path.abspath( tool.config_file ), os.path.split( tool.config_file )[-1] ) tarball_files.append( tool_tup ) # TODO: This feels hacky. tool_command = tool.command.strip().split()[0] tool_path = os.path.dirname( os.path.abspath( tool.config_file ) ) # Add the tool XML to the tuple that will be used to populate the tarball. if os.path.exists( os.path.join( tool_path, tool_command ) ): tarball_files.append( ( os.path.join( tool_path, tool_command ), tool_command ) ) # Find and add macros and code files. for external_file in tool.get_externally_referenced_paths( os.path.abspath( tool.config_file ) ): external_file_abspath = os.path.abspath( os.path.join( tool_path, external_file ) ) tarball_files.append( ( external_file_abspath, external_file ) ) if os.path.exists( os.path.join( tool_path, "Dockerfile" ) ): tarball_files.append( ( os.path.join( tool_path, "Dockerfile" ), "Dockerfile" ) ) # Find tests, and check them for test data. tests = tool.tests if tests is not None: for test in tests: # Add input file tuples to the list. for input in test.inputs: for input_value in test.inputs[ input ]: input_path = os.path.abspath( os.path.join( 'test-data', input_value ) ) if os.path.exists( input_path ): td_tup = ( input_path, os.path.join( 'test-data', input_value ) ) tarball_files.append( td_tup ) # And add output file tuples to the list. for label, filename, _ in test.outputs: output_filepath = os.path.abspath( os.path.join( 'test-data', filename ) ) if os.path.exists( output_filepath ): td_tup = ( output_filepath, os.path.join( 'test-data', filename ) ) tarball_files.append( td_tup ) for param in tool.input_params: # Check for tool data table definitions. if hasattr( param, 'options' ): if hasattr( param.options, 'tool_data_table' ): data_table = param.options.tool_data_table if hasattr( data_table, 'filenames' ): data_table_definitions = [] for data_table_filename in data_table.filenames: # FIXME: from_shed_config seems to always be False. if not data_table.filenames[ data_table_filename ][ 'from_shed_config' ]: tar_file = data_table.filenames[ data_table_filename ][ 'filename' ] + '.sample' sample_file = os.path.join( data_table.filenames[ data_table_filename ][ 'tool_data_path' ], tar_file ) # Use the .sample file, if one exists. If not, skip this data table. if os.path.exists( sample_file ): tarfile_path, tarfile_name = os.path.split( tar_file ) tarfile_path = os.path.join( 'tool-data', tarfile_name ) sample_name = tarfile_path + '.sample' tarball_files.append( ( sample_file, tarfile_path ) ) data_table_definitions.append( data_table.xml_string ) if len( data_table_definitions ) > 0: # Put the data table definition XML in a temporary file. table_definition = '<?xml version="1.0" encoding="utf-8"?>\n<tables>\n %s</tables>' table_definition = table_definition % '\n'.join( data_table_definitions ) fd, table_conf = tempfile.mkstemp() os.close( fd ) file( table_conf, 'w' ).write( table_definition ) tarball_files.append( ( table_conf, os.path.join( 'tool-data', 'tool_data_table_conf.xml.sample' ) ) ) temp_files.append( table_conf ) # Create the tarball. fd, tarball_archive = tempfile.mkstemp( suffix='.tgz' ) os.close( fd ) tarball = tarfile.open( name=tarball_archive, mode='w:gz' ) # Add the files from the previously generated list. for fspath, tarpath in tarball_files: tarball.add( fspath, arcname=tarpath ) tarball.close() # Delete any temporary files that were generated. for temp_file in temp_files: os.remove( temp_file ) return tarball_archive, True, None return None, False, "An unknown error occurred."
[docs] def reload_tool_by_id( self, tool_id ): """ Attempt to reload the tool identified by 'tool_id', if successful replace the old tool. """ if tool_id not in self.tools_by_id: message = "No tool with id %s" % escape( tool_id ) status = 'error' else: old_tool = self.tools_by_id[ tool_id ] new_tool = self.load_tool( old_tool.config_file ) # The tool may have been installed from a tool shed, so set the tool shed attributes. # Since the tool version may have changed, we don't override it here. new_tool.id = old_tool.id new_tool.guid = old_tool.guid new_tool.tool_shed = old_tool.tool_shed new_tool.repository_name = old_tool.repository_name new_tool.repository_owner = old_tool.repository_owner new_tool.installed_changeset_revision = old_tool.installed_changeset_revision new_tool.old_id = old_tool.old_id # Replace old_tool with new_tool in self.tool_panel tool_key = 'tool_' + tool_id for key, val in self.tool_panel.items(): if key == tool_key: self.tool_panel[ key ] = new_tool break elif key.startswith( 'section' ): if tool_key in val.elems: self.tool_panel[ key ].elems[ tool_key ] = new_tool break self.tools_by_id[ tool_id ] = new_tool message = "Reloaded the tool:<br/>" message += "<b>name:</b> %s<br/>" % old_tool.name message += "<b>id:</b> %s<br/>" % old_tool.id message += "<b>version:</b> %s" % old_tool.version status = 'done' return message, status
[docs] def remove_tool_by_id( self, tool_id ): """ Attempt to remove the tool identified by 'tool_id'. """ if tool_id not in self.tools_by_id: message = "No tool with id %s" % escape( tool_id ) status = 'error' else: tool = self.tools_by_id[ tool_id ] del self.tools_by_id[ tool_id ] tool_key = 'tool_' + tool_id for key, val in self.tool_panel.items(): if key == tool_key: del self.tool_panel[ key ] break elif key.startswith( 'section' ): if tool_key in val.elems: del self.tool_panel[ key ].elems[ tool_key ] break if tool_id in self.data_manager_tools: del self.data_manager_tools[ tool_id ] #TODO: do we need to manually remove from the integrated panel here? message = "Removed the tool:<br/>" message += "<b>name:</b> %s<br/>" % tool.name message += "<b>id:</b> %s<br/>" % tool.id message += "<b>version:</b> %s" % tool.version status = 'done' return message, status
[docs] def load_workflow( self, workflow_id ): """ Return an instance of 'Workflow' identified by `id`, which is encoded in the tool panel. """ id = self.app.security.decode_id( workflow_id ) stored = self.app.model.context.query( self.app.model.StoredWorkflow ).get( id ) return stored.latest_workflow
[docs] def init_dependency_manager( self ): self.dependency_manager = build_dependency_manager( self.app.config )
@property def sa_session( self ): """ Returns a SQLAlchemy session """ return self.app.model.context
[docs] def to_dict( self, trans, in_panel=True, **kwds ): """ to_dict toolbox. """ context = Bunch( toolbox=self, trans=trans, **kwds ) if in_panel: panel_elts = [ val for val in self.tool_panel.itervalues() ] filters = self.filter_factory.build_filters( trans, **kwds ) filtered_panel_elts = [] for index, elt in enumerate( panel_elts ): elt = _filter_for_panel( elt, filters, context ) if elt: filtered_panel_elts.append( elt ) panel_elts = filtered_panel_elts # Produce panel. rval = [] kwargs = dict( trans=trans, link_details=True ) for elt in panel_elts: rval.append( to_dict_helper( elt, kwargs ) ) else: tools = [] for id, tool in self.tools_by_id.items(): tools.append( tool.to_dict( trans, link_details=True ) ) rval = tools return rval
def _filter_for_panel( item, filters, context ): """ Filters tool panel elements so that only those that are compatible with provided filters are kept. """ def _apply_filter( filter_item, filter_list ): for filter_method in filter_list: if not filter_method( context, filter_item ): return False return True if isinstance( item, Tool ): if _apply_filter( item, filters[ 'tool' ] ): return item elif isinstance( item, ToolSectionLabel ): if _apply_filter( item, filters[ 'label' ] ): return item elif isinstance( item, ToolSection ): # Filter section item-by-item. Only show a label if there are # non-filtered tools below it. if _apply_filter( item, filters[ 'section' ] ): cur_label_key = None tools_under_label = False filtered_elems = item.elems.copy() for key, section_item in item.elems.items(): if isinstance( section_item, Tool ): # Filter tool. if _apply_filter( section_item, filters[ 'tool' ] ): tools_under_label = True else: del filtered_elems[ key ] elif isinstance( section_item, ToolSectionLabel ): # If there is a label and it does not have tools, # remove it. if ( cur_label_key and not tools_under_label ) or not _apply_filter( section_item, filters[ 'label' ] ): del filtered_elems[ cur_label_key ] # Reset attributes for new label. cur_label_key = key tools_under_label = False # Handle last label. if cur_label_key and not tools_under_label: del filtered_elems[ cur_label_key ] # Only return section if there are elements. if len( filtered_elems ) != 0: copy = item.copy() copy.elems = filtered_elems return copy return None
[docs]class ToolSection( object, Dictifiable ): """ A group of tools with similar type/purpose that will be displayed as a group in the user interface. """ dict_collection_visible_keys = ( 'id', 'name', 'version' ) def __init__( self, elem=None ): f = lambda elem, val: elem is not None and elem.get( val ) or '' self.name = f( elem, 'name' ) self.id = f( elem, 'id' ) self.version = f( elem, 'version' ) self.elems = odict()
[docs] def copy( self ): copy = ToolSection() copy.name = self.name copy.id = self.id copy.version = self.version copy.elems = self.elems.copy() return copy
[docs] def to_dict( self, trans, link_details=False ): """ Return a dict that includes section's attributes. """ section_dict = super( ToolSection, self ).to_dict() section_elts = [] kwargs = dict( trans=trans, link_details=link_details ) for elt in self.elems.values(): section_elts.append( to_dict_helper( elt, kwargs ) ) section_dict[ 'elems' ] = section_elts return section_dict
[docs]class ToolSectionLabel( object, Dictifiable ): """ A label for a set of tools that can be displayed above groups of tools and sections in the user interface """ dict_collection_visible_keys = ( 'id', 'text', 'version' ) def __init__( self, elem ): self.text = elem.get( "text" ) self.id = elem.get( "id" ) self.version = elem.get( "version" ) or ''
[docs]class DefaultToolState( object ): """ Keeps track of the state of a users interaction with a tool between requests. The default tool state keeps track of the current page (for multipage "wizard" tools) and the values of all """ def __init__( self ): self.page = 0 self.rerun_remap_job_id = None self.inputs = None
[docs] def encode( self, tool, app, secure=True ): """ Convert the data to a string """ # Convert parameters to a dictionary of strings, and save curent # page in that dict value = params_to_strings( tool.inputs, self.inputs, app ) value["__page__"] = self.page value["__rerun_remap_job_id__"] = self.rerun_remap_job_id value = json.dumps( value ) # Make it secure if secure: a = hmac_new( app.config.tool_secret, value ) b = binascii.hexlify( value ) return "%s:%s" % ( a, b ) else: return value
[docs] def decode( self, value, tool, app, secure=True ): """ Restore the state from a string """ if secure: # Extract and verify hash a, b = value.split( ":" ) value = binascii.unhexlify( b ) test = hmac_new( app.config.tool_secret, value ) assert a == test # Restore from string values = json_fix( json.loads( value ) ) self.page = values.pop( "__page__" ) if '__rerun_remap_job_id__' in values: self.rerun_remap_job_id = values.pop( "__rerun_remap_job_id__" ) else: self.rerun_remap_job_id = None self.inputs = params_from_strings( tool.inputs, values, app, ignore_errors=True )
[docs] def copy( self ): """ WARNING! Makes a shallow copy, *SHOULD* rework to have it make a deep copy. """ new_state = DefaultToolState() new_state.page = self.page new_state.rerun_remap_job_id = self.rerun_remap_job_id # This need to be copied. new_state.inputs = self.inputs return new_state
[docs]class ToolOutput( object, Dictifiable ): """ Represents an output datasets produced by a tool. For backward compatibility this behaves as if it were the tuple:: (format, metadata_source, parent) """ dict_collection_visible_keys = ( 'name', 'format', 'label', 'hidden' ) def __init__( self, name, format=None, format_source=None, metadata_source=None, parent=None, label=None, filters=None, actions=None, hidden=False ): self.name = name self.format = format self.format_source = format_source self.metadata_source = metadata_source self.parent = parent self.label = label self.filters = filters or [] self.actions = actions self.hidden = hidden # Tuple emulation def __len__( self ): return 3 def __getitem__( self, index ): if index == 0: return self.format elif index == 1: return self.metadata_source elif index == 2: return self.parent else: raise IndexError( index ) def __iter__( self ): return iter( ( self.format, self.metadata_source, self.parent ) )
[docs]class Tool( object, Dictifiable ): """ Represents a computational tool that can be executed through Galaxy. """ tool_type = 'default' requires_setting_metadata = True default_tool_action = DefaultToolAction dict_collection_visible_keys = ( 'id', 'name', 'version', 'description' ) default_template = 'tool_form.mako' def __init__( self, config_file, root, app, guid=None, repository_id=None ): """Load a tool from the config named by `config_file`""" # Determine the full path of the directory where the tool config is self.config_file = config_file self.tool_dir = os.path.dirname( config_file ) self.app = app self.repository_id = repository_id #setup initial attribute values self.inputs = odict() self.stdio_exit_codes = list() self.stdio_regexes = list() self.inputs_by_page = list() self.display_by_page = list() self.action = '/tool_runner/index' self.target = 'galaxy_main' self.method = 'post' self.check_values = True self.nginx_upload = False self.input_required = False self.display_interface = True self.require_login = False self.rerun = False # Define a place to keep track of all input These # differ from the inputs dictionary in that inputs can be page # elements like conditionals, but input_params are basic form # parameters like SelectField objects. This enables us to more # easily ensure that parameter dependencies like index files or # tool_data_table_conf.xml entries exist. self.input_params = [] # Attributes of tools installed from Galaxy tool sheds. self.tool_shed = None self.repository_name = None self.repository_owner = None self.installed_changeset_revision = None # The tool.id value will be the value of guid, but we'll keep the # guid attribute since it is useful to have. self.guid = guid self.old_id = None self.version = None # Enable easy access to this tool's version lineage. self.lineage_ids = [] #populate toolshed repository info, if available self.populate_tool_shed_info() # Parse XML element containing configuration self.parse( root, guid=guid ) self.external_runJob_script = app.config.drmaa_external_runjob_script @property def sa_session( self ): """Returns a SQLAlchemy session""" return self.app.model.context @property def tool_version( self ): """Return a ToolVersion if one exists for our id""" return self.app.install_model.context.query( self.app.install_model.ToolVersion ) \ .filter( self.app.install_model.ToolVersion.table.c.tool_id == self.id ) \ .first() @property def tool_versions( self ): # If we have versions, return them. tool_version = self.tool_version if tool_version: return tool_version.get_versions( self.app ) return [] @property def tool_version_ids( self ): # If we have versions, return a list of their tool_ids. tool_version = self.tool_version if tool_version: return tool_version.get_version_ids( self.app ) return [] @property def tool_shed_repository( self ): # If this tool is included in an installed tool shed repository, return it. if self.tool_shed: return suc.get_tool_shed_repository_by_shed_name_owner_installed_changeset_revision( self.app, self.tool_shed, self.repository_name, self.repository_owner, self.installed_changeset_revision ) return None def __get_job_tool_configuration(self, job_params=None): """Generalized method for getting this tool's job configuration. :type job_params: dict or None :returns: `galaxy.jobs.JobToolConfiguration` -- JobToolConfiguration that matches this `Tool` and the given `job_params` """ rval = None if len(self.job_tool_configurations) == 1: # If there's only one config, use it rather than wasting time on comparisons rval = self.job_tool_configurations[0] elif job_params is None: for job_tool_config in self.job_tool_configurations: if not job_tool_config.params: rval = job_tool_config break else: for job_tool_config in self.job_tool_configurations: if job_tool_config.params: # There are job params and this config has params defined for param, value in job_params.items(): if param not in job_tool_config.params or job_tool_config.params[param] != job_params[param]: break else: # All params match, use this config rval = job_tool_config break else: rval = job_tool_config assert rval is not None, 'Could not get a job tool configuration for Tool %s with job_params %s, this is a bug' % (self.id, job_params) return rval
[docs] def get_job_handler(self, job_params=None): """Get a suitable job handler for this `Tool` given the provided `job_params`. If multiple handlers are valid for combination of `Tool` and `job_params` (e.g. the defined handler is a handler tag), one will be selected at random. :param job_params: Any params specific to this job (e.g. the job source) :type job_params: dict or None :returns: str -- The id of a job handler for a job run of this `Tool` """ # convert tag to ID if necessary return self.app.job_config.get_handler(self.__get_job_tool_configuration(job_params=job_params).handler)
[docs] def get_job_destination(self, job_params=None): """ :returns: galaxy.jobs.JobDestination -- The destination definition and runner parameters. """ return self.app.job_config.get_destination(self.__get_job_tool_configuration(job_params=job_params).destination)
[docs] def get_panel_section( self ): for key, item in self.app.toolbox.integrated_tool_panel.items(): if item: if isinstance( item, Tool ): if item.id == self.id: return '', '' if isinstance( item, ToolSection ): section_id = item.id or '' section_name = item.name or '' for section_key, section_item in item.elems.items(): if isinstance( section_item, Tool ): if section_item: if section_item.id == self.id: return section_id, section_name return None, None
[docs] def allow_user_access( self, user ): """ :returns: bool -- Whether the user is allowed to access the tool. """ return True
[docs] def parse( self, root, guid=None ): """ Read tool configuration from the element `root` and fill in `self`. """ # Get the (user visible) name of the tool self.name = root.get( "name" ) if not self.name: raise Exception( "Missing tool 'name'" ) # Get the UNIQUE id for the tool self.old_id = root.get( "id" ) if guid is None: self.id = self.old_id else: self.id = guid if not self.id: raise Exception( "Missing tool 'id'" ) self.version = root.get( "version" ) if not self.version: # For backward compatibility, some tools may not have versions yet. self.version = "1.0.0" # Support multi-byte tools self.is_multi_byte = string_as_bool( root.get( "is_multi_byte", False ) ) # Force history to fully refresh after job execution for this tool. # Useful i.e. when an indeterminate number of outputs are created by # a tool. self.force_history_refresh = string_as_bool( root.get( 'force_history_refresh', 'False' ) ) self.display_interface = string_as_bool( root.get( 'display_interface', str( self.display_interface ) ) ) self.require_login = string_as_bool( root.get( 'require_login', str( self.require_login ) ) ) # Load input translator, used by datasource tools to change names/values of incoming parameters self.input_translator = root.find( "request_param_translation" ) if self.input_translator: self.input_translator = ToolInputTranslator.from_element( self.input_translator ) # Command line (template). Optional for tools that do not invoke a local program command = root.find("command") if command is not None and command.text is not None: self.command = command.text.lstrip() # get rid of leading whitespace # Must pre-pend this AFTER processing the cheetah command template self.interpreter = command.get( "interpreter", None ) else: self.command = '' self.interpreter = None # Parameters used to build URL for redirection to external app redirect_url_params = root.find( "redirect_url_params" ) if redirect_url_params is not None and redirect_url_params.text is not None: # get rid of leading / trailing white space redirect_url_params = redirect_url_params.text.strip() # Replace remaining white space with something we can safely split on later # when we are building the params self.redirect_url_params = redirect_url_params.replace( ' ', '**^**' ) else: self.redirect_url_params = '' # Short description of the tool self.description = xml_text(root, "description") # Versioning for tools self.version_string_cmd = None version_cmd = root.find("version_command") if version_cmd is not None: self.version_string_cmd = version_cmd.text.strip() version_cmd_interpreter = version_cmd.get( "interpreter", None ) if version_cmd_interpreter: executable = self.version_string_cmd.split()[0] abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable)) command_line = self.version_string_cmd.replace(executable, abs_executable, 1) self.version_string_cmd = version_cmd_interpreter + " " + command_line # Parallelism for tasks, read from tool config. parallelism = root.find("parallelism") if parallelism is not None and parallelism.get("method"): self.parallelism = ParallelismInfo(parallelism) else: self.parallelism = None # Get JobToolConfiguration(s) valid for this particular Tool. At least # a 'default' will be provided that uses the 'default' handler and # 'default' destination. I thought about moving this to the # job_config, but it makes more sense to store here. -nate self_ids = [ self.id.lower() ] if self.old_id != self.id: # Handle toolshed guids self_ids = [ self.id.lower(), self.id.lower().rsplit('/', 1)[0], self.old_id.lower() ] self.all_ids = self_ids # In the toolshed context, there is no job config. if 'job_config' in dir(self.app): self.job_tool_configurations = self.app.job_config.get_job_tool_configurations(self_ids) # Is this a 'hidden' tool (hidden in tool menu) self.hidden = xml_text(root, "hidden") if self.hidden: self.hidden = string_as_bool(self.hidden) # Load any tool specific code (optional) Edit: INS 5/29/2007, # allow code files to have access to the individual tool's # "module" if it has one. Allows us to reuse code files, etc. self.code_namespace = dict() self.hook_map = {} for code_elem in root.findall("code"): for hook_elem in code_elem.findall("hook"): for key, value in hook_elem.items(): # map hook to function self.hook_map[key] = value file_name = code_elem.get("file") code_path = os.path.join( self.tool_dir, file_name ) execfile( code_path, self.code_namespace ) # Load any tool specific options (optional) self.options = dict( sanitize=True, refresh=False ) for option_elem in root.findall("options"): for option, value in self.options.copy().items(): if isinstance(value, type(False)): self.options[option] = string_as_bool(option_elem.get(option, str(value))) else: self.options[option] = option_elem.get(option, str(value)) self.options = Bunch(** self.options) # Parse tool inputs (if there are any required) self.parse_inputs( root ) # Parse tool help self.parse_help( root ) # Description of outputs produced by an invocation of the tool self.parse_outputs( root ) # Parse result handling for tool exit codes and stdout/stderr messages: self.parse_stdio( root ) # Any extra generated config files for the tool self.config_files = [] conf_parent_elem = root.find("configfiles") if conf_parent_elem: for conf_elem in conf_parent_elem.findall( "configfile" ): name = conf_elem.get( "name" ) filename = conf_elem.get( "filename", None ) text = conf_elem.text self.config_files.append( ( name, filename, text ) ) # Action action_elem = root.find( "action" ) if action_elem is None: self.tool_action = self.default_tool_action() else: module = action_elem.get( 'module' ) cls = action_elem.get( 'class' ) mod = __import__( module, globals(), locals(), [cls]) self.tool_action = getattr( mod, cls )() # User interface hints self.uihints = {} uihints_elem = root.find( "uihints" ) if uihints_elem is not None: for key, value in uihints_elem.attrib.iteritems(): self.uihints[ key ] = value # Tests self.__tests_elem = root.find( "tests" ) self.__tests_populated = False # Requirements (dependencies) requirements, containers = parse_requirements_from_xml( root ) self.requirements = requirements self.containers = containers self.citations = self._parse_citations( root ) # Determine if this tool can be used in workflows self.is_workflow_compatible = self.check_workflow_compatible(root) # Trackster configuration. trackster_conf = root.find( "trackster_conf" ) if trackster_conf is not None: self.trackster_conf = TracksterConfig.parse( trackster_conf ) else: self.trackster_conf = None
@property def tests( self ): if not self.__tests_populated: tests_elem = self.__tests_elem if tests_elem: try: self.__tests = parse_tests_elem( self, tests_elem ) except: log.exception( "Failed to parse tool tests" ) else: self.__tests = None self.__tests_populated = True return self.__tests
[docs] def parse_inputs( self, root ): """ Parse the "<inputs>" element and create appropriate `ToolParameter`s. This implementation supports multiple pages and grouping constructs. """ # Load parameters (optional) input_elem = root.find("inputs") enctypes = set() if input_elem is not None: # Handle properties of the input form self.check_values = string_as_bool( input_elem.get("check_values", self.check_values ) ) self.nginx_upload = string_as_bool( input_elem.get( "nginx_upload", self.nginx_upload ) ) self.action = input_elem.get( 'action', self.action ) # If we have an nginx upload, save the action as a tuple instead of # a string. The actual action needs to get url_for run to add any # prefixes, and we want to avoid adding the prefix to the # nginx_upload_path. This logic is handled in the tool_form.mako # template. if self.nginx_upload and self.app.config.nginx_upload_path: if '?' in urllib.unquote_plus( self.action ): raise Exception( 'URL parameters in a non-default tool action can not be used ' \ 'in conjunction with nginx upload. Please convert them to ' \ 'hidden POST parameters' ) self.action = (self.app.config.nginx_upload_path + '?nginx_redir=', urllib.unquote_plus(self.action)) self.target = input_elem.get( "target", self.target ) self.method = input_elem.get( "method", self.method ) # Parse the actual parameters # Handle multiple page case pages = input_elem.findall( "page" ) for page in ( pages or [ input_elem ] ): display, inputs = self.parse_input_page( page, enctypes ) self.inputs_by_page.append( inputs ) self.inputs.update( inputs ) self.display_by_page.append( display ) else: self.inputs_by_page.append( self.inputs ) self.display_by_page.append( None ) self.display = self.display_by_page[0] self.npages = len( self.inputs_by_page ) self.last_page = len( self.inputs_by_page ) - 1 self.has_multiple_pages = bool( self.last_page ) # Determine the needed enctype for the form if len( enctypes ) == 0: self.enctype = "application/x-www-form-urlencoded" elif len( enctypes ) == 1: self.enctype = enctypes.pop() else: raise Exception( "Conflicting required enctypes: %s" % str( enctypes ) ) # Check if the tool either has no parameters or only hidden (and # thus hardcoded) FIXME: hidden parameters aren't # parameters at all really, and should be passed in a different # way, making this check easier. self.template_macro_params = template_macro_params(root) for param in self.inputs.values(): if not isinstance( param, ( HiddenToolParameter, BaseURLToolParameter ) ): self.input_required = True break
[docs] def parse_help( self, root ): """ Parse the help text for the tool. Formatted in reStructuredText, but stored as Mako to allow for dynamic image paths. This implementation supports multiple pages. """ # TODO: Allow raw HTML or an external link. self.help = root.find("help") self.help_by_page = list() help_header = "" help_footer = "" if self.help is not None: if self.repository_id and self.help.text.find( '.. image:: ' ) >= 0: # Handle tool help image display for tools that are contained in repositories in the tool shed or installed into Galaxy. lock = threading.Lock() lock.acquire( True ) try: self.help.text = suc.set_image_paths( self.app, self.repository_id, self.help.text ) except Exception, e: log.exception( "Exception in parse_help, so images may not be properly displayed:\n%s" % str( e ) ) finally: lock.release() help_pages = self.help.findall( "page" ) help_header = self.help.text try: self.help = Template( rst_to_html(self.help.text), input_encoding='utf-8', output_encoding='utf-8', default_filters=[ 'decode.utf8' ], encoding_errors='replace' ) except: log.exception( "error in help for tool %s" % self.name ) # Multiple help page case if help_pages: for help_page in help_pages: self.help_by_page.append( help_page.text ) help_footer = help_footer + help_page.tail # Each page has to rendered all-together because of backreferences allowed by rst try: self.help_by_page = [ Template( rst_to_html( help_header + x + help_footer ), input_encoding='utf-8', output_encoding='utf-8', default_filters=[ 'decode.utf8' ], encoding_errors='replace' ) for x in self.help_by_page ] except: log.exception( "error in multi-page help for tool %s" % self.name ) # Pad out help pages to match npages ... could this be done better? while len( self.help_by_page ) < self.npages: self.help_by_page.append( self.help )
[docs] def parse_outputs( self, root ): """ Parse <outputs> elements and fill in self.outputs (keyed by name) """ self.outputs = odict() out_elem = root.find("outputs") if not out_elem: return for data_elem in out_elem.findall("data"): output = ToolOutput( data_elem.get("name") ) output.format = data_elem.get("format", "data") output.change_format = data_elem.findall("change_format") output.format_source = data_elem.get("format_source", None) output.metadata_source = data_elem.get("metadata_source", "") output.parent = data_elem.get("parent", None) output.label = xml_text( data_elem, "label" ) output.count = int( data_elem.get("count", 1) ) output.filters = data_elem.findall( 'filter' ) output.from_work_dir = data_elem.get("from_work_dir", None) output.hidden = string_as_bool( data_elem.get("hidden", "") ) output.tool = self output.actions = ToolOutputActionGroup( output, data_elem.find( 'actions' ) ) output.dataset_collectors = output_collect.dataset_collectors_from_elem( data_elem ) self.outputs[ output.name ] = output # TODO: Include the tool's name in any parsing warnings.
[docs] def parse_stdio( self, root ): """ Parse <stdio> element(s) and fill in self.return_codes, self.stderr_rules, and self.stdout_rules. Return codes have a range and an error type (fault or warning). Stderr and stdout rules have a regular expression and an error level (fault or warning). """ try: self.stdio_exit_codes = list() self.stdio_regexes = list() # We should have a single <stdio> element, but handle the case for # multiples. # For every stdio element, add all of the exit_code and regex # subelements that we find: for stdio_elem in ( root.findall( 'stdio' ) ): self.parse_stdio_exit_codes( stdio_elem ) self.parse_stdio_regexes( stdio_elem ) except Exception: log.error( "Exception in parse_stdio! " + str(sys.exc_info()) )
[docs] def parse_stdio_exit_codes( self, stdio_elem ): """ Parse the tool's <stdio> element's <exit_code> subelements. This will add all of those elements, if any, to self.stdio_exit_codes. """ try: # Look for all <exit_code> elements. Each exit_code element must # have a range/value. # Exit-code ranges have precedence over a single exit code. # So if there are value and range attributes, we use the range # attribute. If there is neither a range nor a value, then print # a warning and skip to the next. for exit_code_elem in ( stdio_elem.findall( "exit_code" ) ): exit_code = ToolStdioExitCode() # Each exit code has an optional description that can be # part of the "desc" or "description" attributes: exit_code.desc = exit_code_elem.get( "desc" ) if None == exit_code.desc: exit_code.desc = exit_code_elem.get( "description" ) # Parse the error level: exit_code.error_level = ( self.parse_error_level( exit_code_elem.get( "level" ))) code_range = exit_code_elem.get( "range", "" ) if None == code_range: code_range = exit_code_elem.get( "value", "" ) if None == code_range: log.warning( "Tool stdio exit codes must have " + "a range or value" ) continue # Parse the range. We look for: # :Y # X: # X:Y - Split on the colon. We do not allow a colon # without a beginning or end, though we could. # Also note that whitespace is eliminated. # TODO: Turn this into a single match - it should be # more efficient. code_range = re.sub( "\s", "", code_range ) code_ranges = re.split( ":", code_range ) if ( len( code_ranges ) == 2 ): if ( None == code_ranges[0] or '' == code_ranges[0] ): exit_code.range_start = float( "-inf" ) else: exit_code.range_start = int( code_ranges[0] ) if ( None == code_ranges[1] or '' == code_ranges[1] ): exit_code.range_end = float( "inf" ) else: exit_code.range_end = int( code_ranges[1] ) # If we got more than one colon, then ignore the exit code. elif ( len( code_ranges ) > 2 ): log.warning( "Invalid tool exit_code range %s - ignored" % code_range ) continue # Else we have a singular value. If it's not an integer, then # we'll just write a log message and skip this exit_code. else: try: exit_code.range_start = int( code_range ) except: log.error( code_range ) log.warning( "Invalid range start for tool's exit_code %s: exit_code ignored" % code_range ) continue exit_code.range_end = exit_code.range_start # TODO: Check if we got ">", ">=", "<", or "<=": # Check that the range, regardless of how we got it, # isn't bogus. If we have two infinite values, then # the start must be -inf and the end must be +inf. # So at least warn about this situation: if ( isinf( exit_code.range_start ) and isinf( exit_code.range_end ) ): log.warning( "Tool exit_code range %s will match on " + "all exit codes" % code_range ) self.stdio_exit_codes.append( exit_code ) except Exception: log.error( "Exception in parse_stdio_exit_codes! " + str(sys.exc_info()) ) trace = sys.exc_info()[2] if ( None != trace ): trace_msg = repr( traceback.format_tb( trace ) ) log.error( "Traceback: %s" % trace_msg )
[docs] def parse_stdio_regexes( self, stdio_elem ): """ Look in the tool's <stdio> elem for all <regex> subelements that define how to look for warnings and fatal errors in stdout and stderr. This will add all such regex elements to the Tols's stdio_regexes list. """ try: # Look for every <regex> subelement. The regular expression # will have "match" and "source" (or "src") attributes. for regex_elem in ( stdio_elem.findall( "regex" ) ): # TODO: Fill in ToolStdioRegex regex = ToolStdioRegex() # Each regex has an optional description that can be # part of the "desc" or "description" attributes: regex.desc = regex_elem.get( "desc" ) if None == regex.desc: regex.desc = regex_elem.get( "description" ) # Parse the error level regex.error_level = ( self.parse_error_level( regex_elem.get( "level" ) ) ) regex.match = regex_elem.get( "match", "" ) if None == regex.match: # TODO: Convert the offending XML element to a string log.warning( "Ignoring tool's stdio regex element %s - " "the 'match' attribute must exist" ) continue # Parse the output sources. We look for the "src", "source", # and "sources" attributes, in that order. If there is no # such source, then the source defaults to stderr & stdout. # Look for a comma and then look for "err", "error", "out", # and "output": output_srcs = regex_elem.get( "src" ) if None == output_srcs: output_srcs = regex_elem.get( "source" ) if None == output_srcs: output_srcs = regex_elem.get( "sources" ) if None == output_srcs: output_srcs = "output,error" output_srcs = re.sub( "\s", "", output_srcs ) src_list = re.split( ",", output_srcs ) # Just put together anything to do with "out", including # "stdout", "output", etc. Repeat for "stderr", "error", # and anything to do with "err". If neither stdout nor # stderr were specified, then raise a warning and scan both. for src in src_list: if re.search( "both", src, re.IGNORECASE ): regex.stdout_match = True regex.stderr_match = True if re.search( "out", src, re.IGNORECASE ): regex.stdout_match = True if re.search( "err", src, re.IGNORECASE ): regex.stderr_match = True if (not regex.stdout_match and not regex.stderr_match): log.warning( "Tool id %s: unable to determine if tool " "stream source scanning is output, error, " "or both. Defaulting to use both." % self.id ) regex.stdout_match = True regex.stderr_match = True self.stdio_regexes.append( regex ) except Exception: log.error( "Exception in parse_stdio_exit_codes! " + str(sys.exc_info()) ) trace = sys.exc_info()[2] if ( None != trace ): trace_msg = repr( traceback.format_tb( trace ) ) log.error( "Traceback: %s" % trace_msg )
def _parse_citations( self, root ): citations = [] citations_elem = root.find("citations") if not citations_elem: return citations for citation_elem in citations_elem: if citation_elem.tag != "citation": pass citation = self.app.citations_manager.parse_citation( citation_elem, self.tool_dir ) if citation: citations.append( citation ) return citations # TODO: This method doesn't have to be part of the Tool class.
[docs] def parse_error_level( self, err_level ): """ Parses error level and returns error level enumeration. If unparsable, returns 'fatal' """ return_level = StdioErrorLevel.FATAL try: if err_level: if ( re.search( "log", err_level, re.IGNORECASE ) ): return_level = StdioErrorLevel.LOG elif ( re.search( "warning", err_level, re.IGNORECASE ) ): return_level = StdioErrorLevel.WARNING elif ( re.search( "fatal", err_level, re.IGNORECASE ) ): return_level = StdioErrorLevel.FATAL else: log.debug( "Tool %s: error level %s did not match log/warning/fatal" % ( self.id, err_level ) ) except Exception: log.error( "Exception in parse_error_level " + str(sys.exc_info() ) ) trace = sys.exc_info()[2] if ( None != trace ): trace_msg = repr( traceback.format_tb( trace ) ) log.error( "Traceback: %s" % trace_msg ) return return_level
[docs] def parse_input_page( self, input_elem, enctypes ): """ Parse a page of inputs. This basically just calls 'parse_input_elem', but it also deals with possible 'display' elements which are supported only at the top/page level (not in groups). """ inputs = self.parse_input_elem( input_elem, enctypes ) # Display display_elem = input_elem.find("display") if display_elem is not None: display = xml_to_string(display_elem) else: display = None return display, inputs
[docs] def parse_input_elem( self, parent_elem, enctypes, context=None ): """ Parse a parent element whose children are inputs -- these could be groups (repeat, conditional) or param elements. Groups will be parsed recursively. """ rval = odict() context = ExpressionContext( rval, context ) for elem in parent_elem: # Repeat group if elem.tag == "repeat": group = Repeat() group.name = elem.get( "name" ) group.title = elem.get( "title" ) group.help = elem.get( "help", None ) group.inputs = self.parse_input_elem( elem, enctypes, context ) group.default = int( elem.get( "default", 0 ) ) group.min = int( elem.get( "min", 0 ) ) # Use float instead of int so that 'inf' can be used for no max group.max = float( elem.get( "max", "inf" ) ) assert group.min <= group.max, \ ValueError( "Min repeat count must be less-than-or-equal to the max." ) # Force default to be within min-max range group.default = min( max( group.default, group.min ), group.max ) rval[group.name] = group elif elem.tag == "conditional": group = Conditional() group.name = elem.get( "name" ) group.value_ref = elem.get( 'value_ref', None ) group.value_ref_in_group = string_as_bool( elem.get( 'value_ref_in_group', 'True' ) ) value_from = elem.get( "value_from" ) if value_from: value_from = value_from.split( ':' ) group.value_from = locals().get( value_from[0] ) group.test_param = rval[ group.value_ref ] group.test_param.refresh_on_change = True for attr in value_from[1].split( '.' ): group.value_from = getattr( group.value_from, attr ) for case_value, case_inputs in group.value_from( context, group, self ).iteritems(): case = ConditionalWhen() case.value = case_value if case_inputs: case.inputs = self.parse_input_elem( ElementTree.XML( "<when>%s</when>" % case_inputs ), enctypes, context ) else: case.inputs = odict() group.cases.append( case ) else: # Should have one child "input" which determines the case input_elem = elem.find( "param" ) assert input_elem is not None, "<conditional> must have a child <param>" group.test_param = self.parse_param_elem( input_elem, enctypes, context ) possible_cases = list( group.test_param.legal_values ) # store possible cases, undefined whens will have no inputs # Must refresh when test_param changes group.test_param.refresh_on_change = True # And a set of possible cases for case_elem in elem.findall( "when" ): case = ConditionalWhen() case.value = case_elem.get( "value" ) case.inputs = self.parse_input_elem( case_elem, enctypes, context ) group.cases.append( case ) try: possible_cases.remove( case.value ) except: log.warning( "Tool %s: a when tag has been defined for '%s (%s) --> %s', but does not appear to be selectable." % ( self.id, group.name, group.test_param.name, case.value ) ) for unspecified_case in possible_cases: log.warning( "Tool %s: a when tag has not been defined for '%s (%s) --> %s', assuming empty inputs." % ( self.id, group.name, group.test_param.name, unspecified_case ) ) case = ConditionalWhen() case.value = unspecified_case case.inputs = odict() group.cases.append( case ) rval[group.name] = group elif elem.tag == "upload_dataset": group = UploadDataset() group.name = elem.get( "name" ) group.title = elem.get( "title" ) group.file_type_name = elem.get( 'file_type_name', group.file_type_name ) group.default_file_type = elem.get( 'default_file_type', group.default_file_type ) group.metadata_ref = elem.get( 'metadata_ref', group.metadata_ref ) rval[ group.file_type_name ].refresh_on_change = True rval[ group.file_type_name ].refresh_on_change_values = \ self.app.datatypes_registry.get_composite_extensions() group.inputs = self.parse_input_elem( elem, enctypes, context ) rval[ group.name ] = group elif elem.tag == "param": param = self.parse_param_elem( elem, enctypes, context ) rval[param.name] = param if hasattr( param, 'data_ref' ): param.ref_input = context[ param.data_ref ] self.input_params.append( param ) return rval
[docs] def parse_param_elem( self, input_elem, enctypes, context ): """ Parse a single "<param>" element and return a ToolParameter instance. Also, if the parameter has a 'required_enctype' add it to the set enctypes. """ param = ToolParameter.build( self, input_elem ) param_enctype = param.get_required_enctype() if param_enctype: enctypes.add( param_enctype ) # If parameter depends on any other paramters, we must refresh the # form when it changes for name in param.get_dependencies(): context[ name ].refresh_on_change = True return param
[docs] def populate_tool_shed_info( self ): if self.repository_id is not None and self.app.name == 'galaxy': repository_id = self.app.security.decode_id( self.repository_id ) tool_shed_repository = self.app.install_model.context.query( self.app.install_model.ToolShedRepository ).get( repository_id ) if tool_shed_repository: self.tool_shed = tool_shed_repository.tool_shed self.repository_name = tool_shed_repository.name self.repository_owner = tool_shed_repository.owner self.installed_changeset_revision = tool_shed_repository.installed_changeset_revision
[docs] def check_workflow_compatible( self, root ): """ Determine if a tool can be used in workflows. External tools and the upload tool are currently not supported by workflows. """ # Multiple page tools are not supported -- we're eliminating most # of these anyway if self.has_multiple_pages: return False # This is probably the best bet for detecting external web tools # right now if self.tool_type.startswith( 'data_source' ): return False if not string_as_bool( root.get( "workflow_compatible", "True" ) ): return False # TODO: Anyway to capture tools that dynamically change their own # outputs? return True
[docs] def new_state( self, trans, all_pages=False, history=None ): """ Create a new `DefaultToolState` for this tool. It will be initialized with default values for inputs. Only inputs on the first page will be initialized unless `all_pages` is True, in which case all inputs regardless of page are initialized. """ state = DefaultToolState() state.inputs = {} if all_pages: inputs = self.inputs else: inputs = self.inputs_by_page[ 0 ] self.fill_in_new_state( trans, inputs, state.inputs, history=history ) return state
[docs] def fill_in_new_state( self, trans, inputs, state, context=None, history=None ): """ Fill in a tool state dictionary with default values for all parameters in the dictionary `inputs`. Grouping elements are filled in recursively. """ context = ExpressionContext( state, context ) for input in inputs.itervalues(): state[ input.name ] = input.get_initial_value( trans, context, history=history )
[docs] def get_param_html_map( self, trans, page=0, other_values={} ): """ Return a dictionary containing the HTML representation of each parameter. This is used for rendering display elements. It is currently not compatible with grouping constructs. NOTE: This should be considered deprecated, it is only used for tools with `display` elements. These should be eliminated. """ rval = dict() for key, param in self.inputs_by_page[page].iteritems(): if not isinstance( param, ToolParameter ): raise Exception( "'get_param_html_map' only supported for simple paramters" ) rval[key] = param.get_html( trans, other_values=other_values ) return rval
[docs] def get_param( self, key ): """ Returns the parameter named `key` or None if there is no such parameter. """ return self.inputs.get( key, None )
[docs] def get_hook(self, name): """ Returns an object from the code file referenced by `code_namespace` (this will normally be a callable object) """ if self.code_namespace: # Try to look up hook in self.hook_map, otherwise resort to default if name in self.hook_map and self.hook_map[name] in self.code_namespace: return self.code_namespace[self.hook_map[name]] elif name in self.code_namespace: return self.code_namespace[name] return None
[docs] def visit_inputs( self, value, callback ): """ Call the function `callback` on each parameter of this tool. Visits grouping parameters recursively and constructs unique prefixes for each nested set of The callback method is then called as: `callback( level_prefix, parameter, parameter_value )` """ # HACK: Yet another hack around check_values -- WHY HERE? if not self.check_values: return for input in self.inputs.itervalues(): if isinstance( input, ToolParameter ): callback( "", input, value[input.name] ) else: input.visit_inputs( "", value[input.name], callback )
[docs] def handle_input( self, trans, incoming, history=None, old_errors=None, process_state='update', source='html' ): """ Process incoming parameters for this tool from the dict `incoming`, update the tool state (or create if none existed), and either return to the form or execute the tool (only if 'execute' was clicked and there were no errors). process_state can be either 'update' (to incrementally build up the state over several calls - one repeat per handle for instance) or 'populate' force a complete build of the state and submission all at once (like from API). May want an incremental version of the API also at some point, that is why this is not just called for_api. """ all_pages = ( process_state == "populate" ) # If process_state = update, handle all pages at once. rerun_remap_job_id = None if 'rerun_remap_job_id' in incoming: try: rerun_remap_job_id = trans.app.security.decode_id( incoming[ 'rerun_remap_job_id' ] ) except Exception: message = 'Failure executing tool (attempting to rerun invalid job).' return 'message.mako', dict( status='error', message=message, refresh_frames=[] ) # Fixed set of input parameters may correspond to any number of jobs. # Expand these out to individual parameters for given jobs (tool # executions). expanded_incomings, collection_info = expand_meta_parameters( trans, self, incoming ) if not expanded_incomings: raise exceptions.MessageException( "Tool execution failed, trying to run a tool over an empty collection." ) # Remapping a single job to many jobs doesn't make sense, so disable # remap if multi-runs of tools are being used. if rerun_remap_job_id and len( expanded_incomings ) > 1: message = 'Failure executing tool (cannot create multiple jobs when remapping existing job).' return 'message.mako', dict( status='error', message=message, refresh_frames=[] ) all_states = [] for expanded_incoming in expanded_incomings: state, state_new = self.__fetch_state( trans, expanded_incoming, history, all_pages=all_pages ) all_states.append( state ) if state_new: # This feels a bit like a hack. It allows forcing full processing # of inputs even when there is no state in the incoming dictionary # by providing either 'runtool_btn' (the name of the submit button # on the standard run form) or "URL" (a parameter provided by # external data source tools). if "runtool_btn" not in incoming and "URL" not in incoming: if not self.display_interface: return self.__no_display_interface_response() if len(incoming): self.update_state( trans, self.inputs_by_page[state.page], state.inputs, incoming, old_errors=old_errors or {}, source=source ) return self.default_template, dict( errors={}, tool_state=state, param_values={}, incoming={} ) all_errors = [] all_params = [] for expanded_incoming, expanded_state in zip(expanded_incomings, all_states): errors, params = self.__check_param_values( trans, expanded_incoming, expanded_state, old_errors, process_state, history=history, source=source ) all_errors.append( errors ) all_params.append( params ) if self.__should_refresh_state( incoming ): template, template_vars = self.__handle_state_refresh( trans, state, errors ) else: # User actually clicked next or execute. # If there were errors, we stay on the same page and display # error messages if any( all_errors ): error_message = "One or more errors were found in the input you provided. The specific errors are marked below." template = self.default_template template_vars = dict( errors=errors, tool_state=state, incoming=incoming, error_message=error_message ) # If we've completed the last page we can execute the tool elif all_pages or state.page == self.last_page: execution_tracker = execute_job( trans, self, all_params, history=history, rerun_remap_job_id=rerun_remap_job_id, collection_info=collection_info ) if execution_tracker.successful_jobs: template = 'tool_executed.mako' template_vars = dict( out_data=execution_tracker.output_datasets, num_jobs=len( execution_tracker.successful_jobs ), job_errors=execution_tracker.execution_errors, jobs=execution_tracker.successful_jobs, implicit_collections=execution_tracker.created_collections, ) else: template = 'message.mako' template_vars = dict( status='error', message=execution_tracker.execution_errors[0], refresh_frames=[] ) # Otherwise move on to the next page else: template, template_vars = self.__handle_page_advance( trans, state, errors ) return template, template_vars
def __should_refresh_state( self, incoming ): return not( 'runtool_btn' in incoming or 'URL' in incoming or 'ajax_upload' in incoming )
[docs] def handle_single_execution( self, trans, rerun_remap_job_id, params, history ): """ Return a pair with whether execution is successful as well as either resulting output data or an error message indicating the problem. """ try: params = self.__remove_meta_properties( params ) job, out_data = self.execute( trans, incoming=params, history=history, rerun_remap_job_id=rerun_remap_job_id ) except httpexceptions.HTTPFound, e: #if it's a paste redirect exception, pass it up the stack raise e except Exception, e: log.exception('Exception caught while attempting tool execution:') message = 'Error executing tool: %s' % str(e) return False, message if isinstance( out_data, odict ): return job, out_data.items() else: if isinstance( out_data, str ): message = out_data else: message = 'Failure executing tool (invalid data returned from tool execution)' return False, message
def __handle_state_refresh( self, trans, state, errors ): try: self.find_fieldstorage( state.inputs ) except InterruptedUpload: # If inputs contain a file it won't persist. Most likely this # is an interrupted upload. We should probably find a more # standard method of determining an incomplete POST. return self.handle_interrupted( trans, state.inputs ) except: pass # Just a refresh, render the form with updated state and errors. if not self.display_interface: return self.__no_display_interface_response() return self.default_template, dict( errors=errors, tool_state=state ) def __handle_page_advance( self, trans, state, errors ): state.page += 1 # Fill in the default values for the next page self.fill_in_new_state( trans, self.inputs_by_page[ state.page ], state.inputs ) if not self.display_interface: return self.__no_display_interface_response() return self.default_template, dict( errors=errors, tool_state=state ) def __no_display_interface_response( self ): return 'message.mako', dict( status='info', message="The interface for this tool cannot be displayed", refresh_frames=['everything'] ) def __fetch_state( self, trans, incoming, history, all_pages ): # Get the state or create if not found if "tool_state" in incoming: encoded_state = string_to_object( incoming["tool_state"] ) state = DefaultToolState() state.decode( encoded_state, self, trans.app ) new = False else: state = self.new_state( trans, history=history, all_pages=all_pages ) new = True return state, new def __check_param_values( self, trans, incoming, state, old_errors, process_state, history, source ): # Process incoming data if not( self.check_values ): # If `self.check_values` is false we don't do any checking or # processing on input This is used to pass raw values # through to/from external sites. FIXME: This should be handled # more cleanly, there is no reason why external sites need to # post back to the same URL that the tool interface uses. errors = {} params = incoming else: # Update state for all inputs on the current page taking new # values from `incoming`. if process_state == "update": inputs = self.inputs_by_page[state.page] errors = self.update_state( trans, inputs, state.inputs, incoming, old_errors=old_errors or {}, source=source ) elif process_state == "populate": inputs = self.inputs errors = self.populate_state( trans, inputs, state.inputs, incoming, history, source=source ) else: raise Exception("Unknown process_state type %s" % process_state) # If the tool provides a `validate_input` hook, call it. validate_input = self.get_hook( 'validate_input' ) if validate_input: validate_input( trans, errors, state.inputs, inputs ) params = state.inputs return errors, params
[docs] def find_fieldstorage( self, x ): if isinstance( x, FieldStorage ): raise InterruptedUpload( None ) elif type( x ) is types.DictType: [ self.find_fieldstorage( y ) for y in x.values() ] elif type( x ) is types.ListType: [ self.find_fieldstorage( y ) for y in x ]
[docs] def handle_interrupted( self, trans, inputs ): """ Upon handling inputs, if it appears that we have received an incomplete form, do some cleanup or anything else deemed necessary. Currently this is only likely during file uploads, but this method could be generalized and a method standardized for handling other tools. """ # If the async upload tool has uploading datasets, we need to error them. if 'async_datasets' in inputs and inputs['async_datasets'] not in [ 'None', '', None ]: for id in inputs['async_datasets'].split(','): try: data = self.sa_session.query( trans.model.HistoryDatasetAssociation ).get( int( id ) ) except: log.exception( 'Unable to load precreated dataset (%s) sent in upload form' % id ) continue if trans.user is None and trans.galaxy_session.current_history != data.history: log.error( 'Got a precreated dataset (%s) but it does not belong to anonymous user\'s current session (%s)' % ( data.id, trans.galaxy_session.id ) ) elif data.history.user != trans.user: log.error( 'Got a precreated dataset (%s) but it does not belong to current user (%s)' % ( data.id, trans.user.id ) ) else: data.state = data.states.ERROR data.info = 'Upload of this dataset was interrupted. Please try uploading again or' self.sa_session.add( data ) self.sa_session.flush() # It's unlikely the user will ever see this. return 'message.mako', dict( status='error', message='Your upload was interrupted. If this was uninentional, please retry it.', refresh_frames=[], cont=None )
[docs] def populate_state( self, trans, inputs, state, incoming, history=None, source="html", prefix="", context=None ): errors = dict() # Push this level onto the context stack context = ExpressionContext( state, context ) for input in inputs.itervalues(): key = prefix + input.name if isinstance( input, Repeat ): group_state = state[input.name] # Create list of empty errors for each previously existing state group_errors = [ ] any_group_errors = False rep_index = 0 del group_state[:] # Clear prepopulated defaults if repeat.min set. while True: rep_name = "%s_%d" % ( key, rep_index ) if not any( [ incoming_key.startswith(rep_name) for incoming_key in incoming.keys() ] ): break if rep_index < input.max: new_state = {} new_state['__index__'] = rep_index self.fill_in_new_state( trans, input.inputs, new_state, context, history=history ) group_state.append( new_state ) group_errors.append( {} ) rep_errors = self.populate_state( trans, input.inputs, new_state, incoming, history, source, prefix=rep_name + "|", context=context ) if rep_errors: any_group_errors = True group_errors[rep_index].update( rep_errors ) else: group_errors[-1] = { '__index__': 'Cannot add repeat (max size=%i).' % input.max } any_group_errors = True rep_index += 1 elif isinstance( input, Conditional ): group_state = state[input.name] group_prefix = "%s|" % ( key ) # Deal with the 'test' element and see if its value changed if input.value_ref and not input.value_ref_in_group: # We are referencing an existent parameter, which is not # part of this group test_param_key = prefix + input.test_param.name else: test_param_key = group_prefix + input.test_param.name # Get value of test param and determine current case value, test_param_error = check_param_from_incoming( trans, group_state, input.test_param, incoming, test_param_key, context, source ) if test_param_error: errors[ input.name ] = [ test_param_error ] # Store the value of the test element group_state[ input.test_param.name ] = value else: current_case = input.get_current_case( value, trans ) # Current case has changed, throw away old state group_state = state[input.name] = {} # TODO: we should try to preserve values if we can self.fill_in_new_state( trans, input.cases[current_case].inputs, group_state, context, history=history ) group_errors = self.populate_state( trans, input.cases[current_case].inputs, group_state, incoming, history, source, prefix=group_prefix, context=context, ) if group_errors: errors[ input.name ] = group_errors # Store the current case in a special value group_state['__current_case__'] = current_case # Store the value of the test element group_state[ input.test_param.name ] = value elif isinstance( input, UploadDataset ): group_state = state[input.name] group_errors = [] any_group_errors = False d_type = input.get_datatype( trans, context ) writable_files = d_type.writable_files #remove extra files while len( group_state ) > len( writable_files ): del group_state[-1] # Add new fileupload as needed while len( writable_files ) > len( group_state ): new_state = {} new_state['__index__'] = len( group_state ) self.fill_in_new_state( trans, input.inputs, new_state, context ) group_state.append( new_state ) if any_group_errors: group_errors.append( {} ) # Update state for i, rep_state in enumerate( group_state ): rep_index = rep_state['__index__'] rep_prefix = "%s_%d|" % ( key, rep_index ) rep_errors = self.populate_state( trans, input.inputs, rep_state, incoming, history, source, prefix=rep_prefix, context=context) if rep_errors: any_group_errors = True group_errors.append( rep_errors ) else: group_errors.append( {} ) # Were there *any* errors for any repetition? if any_group_errors: errors[input.name] = group_errors else: value, error = check_param_from_incoming( trans, state, input, incoming, key, context, source ) if error: errors[ input.name ] = error state[ input.name ] = value return errors
[docs] def update_state( self, trans, inputs, state, incoming, source='html', prefix="", context=None, update_only=False, old_errors={}, item_callback=None ): """ Update the tool state in `state` using the user input in `incoming`. This is designed to be called recursively: `inputs` contains the set of inputs being processed, and `prefix` specifies a prefix to add to the name of each input to extract its value from `incoming`. If `update_only` is True, values that are not in `incoming` will not be modified. In this case `old_errors` can be provided, and any errors for parameters which were *not* updated will be preserved. """ errors = dict() # Push this level onto the context stack context = ExpressionContext( state, context ) # Iterate inputs and update (recursively) for input in inputs.itervalues(): key = prefix + input.name if isinstance( input, Repeat ): group_state = state[input.name] # Create list of empty errors for each previously existing state group_errors = [ {} for i in range( len( group_state ) ) ] group_old_errors = old_errors.get( input.name, None ) any_group_errors = False # Check any removals before updating state -- only one # removal can be performed, others will be ignored for i, rep_state in enumerate( group_state ): rep_index = rep_state['__index__'] if key + "_" + str(rep_index) + "_remove" in incoming: if len( group_state ) > input.min: del group_state[i] del group_errors[i] if group_old_errors: del group_old_errors[i] break else: group_errors[i] = { '__index__': 'Cannot remove repeat (min size=%i).' % input.min } any_group_errors = True # Only need to find one that can't be removed due to size, since only # one removal is processed at # a time anyway break elif group_old_errors and group_old_errors[i]: group_errors[i] = group_old_errors[i] any_group_errors = True # Update state max_index = -1 for i, rep_state in enumerate( group_state ): rep_index = rep_state['__index__'] max_index = max( max_index, rep_index ) rep_prefix = "%s_%d|" % ( key, rep_index ) if group_old_errors: rep_old_errors = group_old_errors[i] else: rep_old_errors = {} rep_errors = self.update_state( trans, input.inputs, rep_state, incoming, source=source, prefix=rep_prefix, context=context, update_only=update_only, old_errors=rep_old_errors, item_callback=item_callback ) if rep_errors: any_group_errors = True group_errors[i].update( rep_errors ) # Check for addition if key + "_add" in incoming: if len( group_state ) < input.max: new_state = {} new_state['__index__'] = max_index + 1 self.fill_in_new_state( trans, input.inputs, new_state, context ) group_state.append( new_state ) group_errors.append( {} ) else: group_errors[-1] = { '__index__': 'Cannot add repeat (max size=%i).' % input.max } any_group_errors = True # Were there *any* errors for any repetition? if any_group_errors: errors[input.name] = group_errors elif isinstance( input, Conditional ): group_state = state[input.name] group_old_errors = old_errors.get( input.name, {} ) old_current_case = group_state['__current_case__'] group_prefix = "%s|" % ( key ) # Deal with the 'test' element and see if its value changed if input.value_ref and not input.value_ref_in_group: # We are referencing an existent parameter, which is not # part of this group test_param_key = prefix + input.test_param.name else: test_param_key = group_prefix + input.test_param.name test_param_error = None test_incoming = get_incoming_value( incoming, test_param_key, None ) if test_param_key not in incoming \ and "__force_update__" + test_param_key not in incoming \ and update_only: # Update only, keep previous value and state, but still # recurse in case there are nested changes value = group_state[ input.test_param.name ] current_case = old_current_case if input.test_param.name in old_errors: errors[ input.test_param.name ] = old_errors[ input.test_param.name ] else: # Get value of test param and determine current case value, test_param_error = \ check_param( trans, input.test_param, test_incoming, context, source=source ) try: current_case = input.get_current_case( value, trans ) except ValueError, e: if input.is_job_resource_conditional: # Unless explicitly given job resource parameters # (e.g. from the run tool form) don't populate the # state. Along with other hacks prevents workflow # saving from populating resource defaults - which # are meant to be much more transient than the rest # of tool state. continue #load default initial value if not test_param_error: test_param_error = str( e ) if trans is not None: history = trans.get_history() else: history = None value = input.test_param.get_initial_value( trans, context, history=history ) current_case = input.get_current_case( value, trans ) case_changed = current_case != old_current_case if case_changed: # Current case has changed, throw away old state group_state = state[input.name] = {} # TODO: we should try to preserve values if we can self.fill_in_new_state( trans, input.cases[current_case].inputs, group_state, context ) group_errors = dict() group_old_errors = dict() # If we didn't just change the current case and are coming from HTML - the values # in incoming represent the old values and should not be replaced. If being updated # from the API (json) instead of HTML - form values below the current case # may also be supplied and incoming should be preferred to case defaults. if (not case_changed) or (source != "html"): # Current case has not changed, update children group_errors = self.update_state( trans, input.cases[current_case].inputs, group_state, incoming, prefix=group_prefix, context=context, source=source, update_only=update_only, old_errors=group_old_errors, item_callback=item_callback ) if input.test_param.name in group_old_errors and not test_param_error: test_param_error = group_old_errors[ input.test_param.name ] if test_param_error: group_errors[ input.test_param.name ] = test_param_error if group_errors: errors[ input.name ] = group_errors # Store the current case in a special value group_state['__current_case__'] = current_case # Store the value of the test element group_state[ input.test_param.name ] = value elif isinstance( input, UploadDataset ): group_state = state[input.name] group_errors = [] group_old_errors = old_errors.get( input.name, None ) any_group_errors = False d_type = input.get_datatype( trans, context ) writable_files = d_type.writable_files #remove extra files while len( group_state ) > len( writable_files ): del group_state[-1] if group_old_errors: del group_old_errors[-1] # Update state max_index = -1 for i, rep_state in enumerate( group_state ): rep_index = rep_state['__index__'] max_index = max( max_index, rep_index ) rep_prefix = "%s_%d|" % ( key, rep_index ) if group_old_errors: rep_old_errors = group_old_errors[i] else: rep_old_errors = {} rep_errors = self.update_state( trans, input.inputs, rep_state, incoming, prefix=rep_prefix, context=context, source=source, update_only=update_only, old_errors=rep_old_errors, item_callback=item_callback ) if rep_errors: any_group_errors = True group_errors.append( rep_errors ) else: group_errors.append( {} ) # Add new fileupload as needed offset = 1 while len( writable_files ) > len( group_state ): new_state = {} new_state['__index__'] = max_index + offset offset += 1 self.fill_in_new_state( trans, input.inputs, new_state, context ) group_state.append( new_state ) if any_group_errors: group_errors.append( {} ) # Were there *any* errors for any repetition? if any_group_errors: errors[input.name] = group_errors else: if key not in incoming \ and "__force_update__" + key not in incoming \ and update_only: # No new value provided, and we are only updating, so keep # the old value (which should already be in the state) and # preserve the old error message. if input.name in old_errors: errors[ input.name ] = old_errors[ input.name ] else: incoming_value = get_incoming_value( incoming, key, None ) value, error = check_param( trans, input, incoming_value, context, source=source ) # If a callback was provided, allow it to process the value input_name = input.name if item_callback: old_value = state.get( input_name, None ) value, error = item_callback( trans, key, input, value, error, old_value, context ) if error: errors[ input_name ] = error state[ input_name ] = value meta_properties = self.__meta_properties_for_state( key, incoming, incoming_value, value, input_name ) state.update( meta_properties ) return errors
def __remove_meta_properties( self, incoming ): result = incoming.copy() meta_property_suffixes = [ "__multirun__", "__collection_multirun__", ] for key, value in incoming.iteritems(): if any( map( lambda s: key.endswith(s), meta_property_suffixes ) ): del result[ key ] return result def __meta_properties_for_state( self, key, incoming, incoming_val, state_val, input_name ): meta_properties = {} meta_property_suffixes = [ "__multirun__", "__collection_multirun__", ] for meta_property_suffix in meta_property_suffixes: multirun_key = "%s|%s" % ( key, meta_property_suffix ) if multirun_key in incoming: multi_value = incoming[ multirun_key ] meta_properties[ "%s|%s" % ( input_name, meta_property_suffix ) ] = multi_value return meta_properties @property def params_with_missing_data_table_entry( self ): """ Return all parameters that are dynamically generated select lists whose options require an entry not currently in the tool_data_table_conf.xml file. """ params = [] for input_param in self.input_params: if isinstance( input_param, SelectToolParameter ) and input_param.is_dynamic: options = input_param.options if options and options.missing_tool_data_table_name and input_param not in params: params.append( input_param ) return params @property def params_with_missing_index_file( self ): """ Return all parameters that are dynamically generated select lists whose options refer to a missing .loc file. """ params = [] for input_param in self.input_params: if isinstance( input_param, SelectToolParameter ) and input_param.is_dynamic: options = input_param.options if options and options.missing_index_file and input_param not in params: params.append( input_param ) return params
[docs] def get_static_param_values( self, trans ): """ Returns a map of parameter names and values if the tool does not require any user input. Will raise an exception if any parameter does require input. """ args = dict() for key, param in self.inputs.iteritems(): if isinstance( param, HiddenToolParameter ): args[key] = model.User.expand_user_properties( trans.user, param.value ) elif isinstance( param, BaseURLToolParameter ): args[key] = param.get_value( trans ) else: raise Exception( "Unexpected parameter type" ) return args
[docs] def execute( self, trans, incoming={}, set_output_hid=True, history=None, **kwargs ): """ Execute the tool using parameter values in `incoming`. This just dispatches to the `ToolAction` instance specified by `self.tool_action`. In general this will create a `Job` that when run will build the tool's outputs, e.g. `DefaultToolAction`. """ return self.tool_action.execute( self, trans, incoming=incoming, set_output_hid=set_output_hid, history=history, **kwargs )
[docs] def params_to_strings( self, params, app ): return params_to_strings( self.inputs, params, app )
[docs] def params_from_strings( self, params, app, ignore_errors=False ): return params_from_strings( self.inputs, params, app, ignore_errors )
[docs] def check_and_update_param_values( self, values, trans, update_values=True, allow_workflow_parameters=False ): """ Check that all parameters have values, and fill in with default values where necessary. This could be called after loading values from a database in case new parameters have been added. """ messages = {} self.check_and_update_param_values_helper( self.inputs, values, trans, messages, update_values=update_values, allow_workflow_parameters=allow_workflow_parameters ) return messages
[docs] def check_and_update_param_values_helper( self, inputs, values, trans, messages, context=None, prefix="", update_values=True, allow_workflow_parameters=False ): """ Recursive helper for `check_and_update_param_values_helper` """ context = ExpressionContext( values, context ) for input in inputs.itervalues(): # No value, insert the default if input.name not in values: if isinstance( input, Conditional ): cond_messages = {} if not input.is_job_resource_conditional: cond_messages = { input.test_param.name: "No value found for '%s%s', used default" % ( prefix, input.label ) } messages[ input.name ] = cond_messages test_value = input.test_param.get_initial_value( trans, context ) current_case = input.get_current_case( test_value, trans ) self.check_and_update_param_values_helper( input.cases[ current_case ].inputs, {}, trans, cond_messages, context, prefix, allow_workflow_parameters=allow_workflow_parameters ) elif isinstance( input, Repeat ): if input.min: messages[ input.name ] = [] for i in range( input.min ): rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 ) rep_dict = dict() messages[ input.name ].append( rep_dict ) self.check_and_update_param_values_helper( input.inputs, {}, trans, rep_dict, context, rep_prefix, allow_workflow_parameters=allow_workflow_parameters ) else: messages[ input.name ] = "No value found for '%s%s', used default" % ( prefix, input.label ) values[ input.name ] = input.get_initial_value( trans, context ) # Value, visit recursively as usual else: if isinstance( input, Repeat ): for i, d in enumerate( values[ input.name ] ): rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 ) self.check_and_update_param_values_helper( input.inputs, d, trans, messages, context, rep_prefix, allow_workflow_parameters=allow_workflow_parameters ) elif isinstance( input, Conditional ): group_values = values[ input.name ] if input.test_param.name not in group_values: # No test param invalidates the whole conditional values[ input.name ] = group_values = input.get_initial_value( trans, context ) messages[ input.test_param.name ] = "No value found for '%s%s', used default" % ( prefix, input.test_param.label ) current_case = group_values['__current_case__'] for child_input in input.cases[current_case].inputs.itervalues(): messages[ child_input.name ] = "Value no longer valid for '%s%s', replaced with default" % ( prefix, child_input.label ) else: current = group_values["__current_case__"] self.check_and_update_param_values_helper( input.cases[current].inputs, group_values, trans, messages, context, prefix, allow_workflow_parameters=allow_workflow_parameters ) else: # Regular tool parameter, no recursion needed try: ck_param = True if allow_workflow_parameters and isinstance( values[ input.name ], basestring ): if WORKFLOW_PARAMETER_REGULAR_EXPRESSION.search( values[ input.name ] ): ck_param = False #this will fail when a parameter's type has changed to a non-compatible one: e.g. conditional group changed to dataset input if ck_param: input.value_from_basic( input.value_to_basic( values[ input.name ], trans.app ), trans.app, ignore_errors=False ) except: messages[ input.name ] = "Value no longer valid for '%s%s', replaced with default" % ( prefix, input.label ) if update_values: values[ input.name ] = input.get_initial_value( trans, context )
[docs] def handle_unvalidated_param_values( self, input_values, app ): """ Find any instances of `UnvalidatedValue` within input_values and validate them (by calling `ToolParameter.from_html` and `ToolParameter.validate`). """ # No validation is done when check_values is False if not self.check_values: return self.handle_unvalidated_param_values_helper( self.inputs, input_values, app )
[docs] def handle_unvalidated_param_values_helper( self, inputs, input_values, app, context=None, prefix="" ): """ Recursive helper for `handle_unvalidated_param_values` """ context = ExpressionContext( input_values, context ) for input in inputs.itervalues(): if isinstance( input, Repeat ): for i, d in enumerate( input_values[ input.name ] ): rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 ) self.handle_unvalidated_param_values_helper( input.inputs, d, app, context, rep_prefix ) elif isinstance( input, Conditional ): values = input_values[ input.name ] current = values["__current_case__"] # NOTE: The test param doesn't need to be checked since # there would be no way to tell what case to use at # workflow build time. However I'm not sure if we are # actually preventing such a case explicately. self.handle_unvalidated_param_values_helper( input.cases[current].inputs, values, app, context, prefix ) else: # Regular tool parameter value = input_values[ input.name ] if isinstance( value, UnvalidatedValue ): try: # Convert from html representation if value.value is None: # If value.value is None, it could not have been # submited via html form and therefore .from_html # can't be guaranteed to work value = None else: value = input.from_html( value.value, None, context ) # Do any further validation on the value input.validate( value, None ) except Exception, e: # Wrap an re-raise any generated error so we can # generate a more informative message message = "Failed runtime validation of %s%s (%s)" \ % ( prefix, input.label, e ) raise LateValidationError( message ) input_values[ input.name ] = value
[docs] def handle_job_failure_exception( self, e ): """ Called by job.fail when an exception is generated to allow generation of a better error message (returning None yields the default behavior) """ message = None # If the exception was generated by late validation, use its error # message (contains the parameter name and value) if isinstance( e, LateValidationError ): message = e.message return message
[docs] def build_dependency_shell_commands( self ): """Return a list of commands to be run to populate the current environment to include this tools requirements.""" return self.app.toolbox.dependency_manager.dependency_shell_commands( self.requirements, installed_tool_dependencies=self.installed_tool_dependencies )
@property def installed_tool_dependencies(self): if self.tool_shed_repository: installed_tool_dependencies = self.tool_shed_repository.tool_dependencies_installed_or_in_error else: installed_tool_dependencies = None return installed_tool_dependencies
[docs] def build_redirect_url_params( self, param_dict ): """ Substitute parameter values into self.redirect_url_params """ if not self.redirect_url_params: return redirect_url_params = None # Substituting parameter values into the url params redirect_url_params = fill_template( self.redirect_url_params, context=param_dict ) # Remove newlines redirect_url_params = redirect_url_params.replace( "\n", " " ).replace( "\r", " " ) return redirect_url_params
[docs] def parse_redirect_url( self, data, param_dict ): """ Parse the REDIRECT_URL tool param. Tools that send data to an external application via a redirect must include the following 3 tool params: 1) REDIRECT_URL - the url to which the data is being sent 2) DATA_URL - the url to which the receiving application will send an http post to retrieve the Galaxy data 3) GALAXY_URL - the url to which the external application may post data as a response """ redirect_url = param_dict.get( 'REDIRECT_URL' ) redirect_url_params = self.build_redirect_url_params( param_dict ) # Add the parameters to the redirect url. We're splitting the param # string on '**^**' because the self.parse() method replaced white # space with that separator. params = redirect_url_params.split( '**^**' ) rup_dict = {} for param in params: p_list = param.split( '=' ) p_name = p_list[0] p_val = p_list[1] rup_dict[ p_name ] = p_val DATA_URL = param_dict.get( 'DATA_URL', None ) assert DATA_URL is not None, "DATA_URL parameter missing in tool config." DATA_URL += "/%s/display" % str( data.id ) redirect_url += "?DATA_URL=%s" % DATA_URL # Add the redirect_url_params to redirect_url for p_name in rup_dict: redirect_url += "&%s=%s" % ( p_name, rup_dict[ p_name ] ) # Add the current user email to redirect_url if data.history.user: USERNAME = str( data.history.user.email ) else: USERNAME = 'Anonymous' redirect_url += "&USERNAME=%s" % USERNAME return redirect_url
[docs] def call_hook( self, hook_name, *args, **kwargs ): """ Call the custom code hook function identified by 'hook_name' if any, and return the results """ try: code = self.get_hook( hook_name ) if code: return code( *args, **kwargs ) except Exception, e: original_message = '' if len( e.args ): original_message = e.args[0] e.args = ( "Error in '%s' hook '%s', original message: %s" % ( self.name, hook_name, original_message ), ) raise
[docs] def exec_before_job( self, app, inp_data, out_data, param_dict={} ): pass
[docs] def exec_after_process( self, app, inp_data, out_data, param_dict, job=None ): pass
[docs] def job_failed( self, job_wrapper, message, exception=False ): """ Called when a job has failed """ pass
[docs] def collect_associated_files( self, output, job_working_directory ): """ Find extra files in the job working directory and move them into the appropriate dataset's files directory """ for name, hda in output.items(): temp_file_path = os.path.join( job_working_directory, "dataset_%s_files" % ( hda.dataset.id ) ) extra_dir = None try: # This skips creation of directories - object store # automatically creates them. However, empty directories will # not be created in the object store at all, which might be a # problem. for root, dirs, files in os.walk( temp_file_path ): extra_dir = root.replace(job_working_directory, '', 1).lstrip(os.path.sep) for f in files: self.app.object_store.update_from_file(hda.dataset, extra_dir=extra_dir, alt_name=f, file_name=os.path.join(root, f), create=True, preserve_symlinks=True ) # Clean up after being handled by object store. # FIXME: If the object (e.g., S3) becomes async, this will # cause issues so add it to the object store functionality? if extra_dir is not None: # there was an extra_files_path dir, attempt to remove it shutil.rmtree(temp_file_path) except Exception, e: log.debug( "Error in collect_associated_files: %s" % ( e ) ) continue
[docs] def collect_child_datasets( self, output, job_working_directory ): """ Look for child dataset files, create HDA and attach to parent. """ children = {} # Loop through output file names, looking for generated children in # form of 'child_parentId_designation_visibility_extension' for name, outdata in output.items(): filenames = [] if 'new_file_path' in self.app.config.collect_outputs_from: filenames.extend( glob.glob(os.path.join(self.app.config.new_file_path, "child_%i_*" % outdata.id) ) ) if 'job_working_directory' in self.app.config.collect_outputs_from: filenames.extend( glob.glob(os.path.join(job_working_directory, "child_%i_*" % outdata.id) ) ) for filename in filenames: if not name in children: children[name] = {} fields = os.path.basename(filename).split("_") fields.pop(0) parent_id = int(fields.pop(0)) designation = fields.pop(0) visible = fields.pop(0).lower() if visible == "visible": visible = True else: visible = False ext = fields.pop(0).lower() child_dataset = self.app.model.HistoryDatasetAssociation( extension=ext, parent_id=outdata.id, designation=designation, visible=visible, dbkey=outdata.dbkey, create_dataset=True, sa_session=self.sa_session ) self.app.security_agent.copy_dataset_permissions( outdata.dataset, child_dataset.dataset ) # Move data from temp location to dataset location self.app.object_store.update_from_file(child_dataset.dataset, file_name=filename, create=True) self.sa_session.add( child_dataset ) self.sa_session.flush() child_dataset.set_size() child_dataset.name = "Secondary Dataset (%s)" % ( designation ) child_dataset.init_meta() child_dataset.set_meta() child_dataset.set_peek() # Associate new dataset with job job = None for assoc in outdata.creating_job_associations: job = assoc.job break if job: assoc = self.app.model.JobToOutputDatasetAssociation( '__new_child_file_%s|%s__' % ( name, designation ), child_dataset ) assoc.job = job self.sa_session.add( assoc ) self.sa_session.flush() child_dataset.state = outdata.state self.sa_session.add( child_dataset ) self.sa_session.flush() # Add child to return dict children[name][designation] = child_dataset # Need to update all associated output hdas, i.e. history was # shared with job running for dataset in outdata.dataset.history_associations: if outdata == dataset: continue # Create new child dataset child_data = child_dataset.copy( parent_id=dataset.id ) self.sa_session.add( child_data ) self.sa_session.flush() return children
[docs] def collect_primary_datasets( self, output, job_working_directory, input_ext ): """ Find any additional datasets generated by a tool and attach (for cases where number of outputs is not known in advance). """ return output_collect.collect_primary_datasets( self, output, job_working_directory, input_ext )
[docs] def to_dict( self, trans, link_details=False, io_details=False ): """ Returns dict of tool. """ # Basic information tool_dict = super( Tool, self ).to_dict() # Add link details. if link_details: # Add details for creating a hyperlink to the tool. if not isinstance( self, DataSourceTool ): link = url_for( controller='tool_runner', tool_id=self.id ) else: link = url_for( controller='tool_runner', action='data_source_redirect', tool_id=self.id ) # Basic information tool_dict.update( { 'link': link, 'min_width': self.uihints.get( 'minwidth', -1 ), 'target': self.target } ) # Add input and output details. if io_details: tool_dict[ 'inputs' ] = [ input.to_dict( trans ) for input in self.inputs.values() ] tool_dict[ 'outputs' ] = [ output.to_dict() for output in self.outputs.values() ] tool_dict[ 'panel_section_id' ], tool_dict[ 'panel_section_name' ] = self.get_panel_section() return tool_dict
[docs] def get_default_history_by_trans( self, trans, create=False ): return trans.get_history( create=create )
@classmethod
[docs] def get_externally_referenced_paths( self, path ): """ Return relative paths to externally referenced files by the tool described by file at `path`. External components should not assume things about the structure of tool xml files (this is the tool's responsibility). """ tree = raw_tool_xml_tree(path) root = tree.getroot() external_paths = [] for code_elem in root.findall( 'code' ): external_path = code_elem.get( 'file' ) if external_path: external_paths.append( external_path ) external_paths.extend( imported_macro_paths( root ) ) # May also need to load external citation files as well at some point. return external_paths
[docs]class OutputParameterJSONTool( Tool ): """ Alternate implementation of Tool that provides parameters and other values JSONified within the contents of an output dataset """ tool_type = 'output_parameter_json' def _prepare_json_list( self, param_list ): rval = [] for value in param_list: if isinstance( value, dict ): rval.append( self._prepare_json_param_dict( value ) ) elif isinstance( value, list ): rval.append( self._prepare_json_list( value ) ) else: rval.append( str( value ) ) return rval def _prepare_json_param_dict( self, param_dict ): rval = {} for key, value in param_dict.iteritems(): if isinstance( value, dict ): rval[ key ] = self._prepare_json_param_dict( value ) elif isinstance( value, list ): rval[ key ] = self._prepare_json_list( value ) else: rval[ key ] = str( value ) return rval
[docs] def exec_before_job( self, app, inp_data, out_data, param_dict=None ): if param_dict is None: param_dict = {} json_params = {} json_params[ 'param_dict' ] = self._prepare_json_param_dict( param_dict ) # it would probably be better to store the original incoming parameters here, instead of the Galaxy modified ones? json_params[ 'output_data' ] = [] json_params[ 'job_config' ] = dict( GALAXY_DATATYPES_CONF_FILE=param_dict.get( 'GALAXY_DATATYPES_CONF_FILE' ), GALAXY_ROOT_DIR=param_dict.get( 'GALAXY_ROOT_DIR' ), TOOL_PROVIDED_JOB_METADATA_FILE=jobs.TOOL_PROVIDED_JOB_METADATA_FILE ) json_filename = None for i, ( out_name, data ) in enumerate( out_data.iteritems() ): #use wrapped dataset to access certain values wrapped_data = param_dict.get( out_name ) #allow multiple files to be created file_name = str( wrapped_data ) extra_files_path = str( wrapped_data.files_path ) data_dict = dict( out_data_name=out_name, ext=data.ext, dataset_id=data.dataset.id, hda_id=data.id, file_name=file_name, extra_files_path=extra_files_path ) json_params[ 'output_data' ].append( data_dict ) if json_filename is None: json_filename = file_name out = open( json_filename, 'w' ) out.write( json.dumps( json_params ) ) out.close()
[docs]class DataSourceTool( OutputParameterJSONTool ): """ Alternate implementation of Tool for data_source tools -- those that allow the user to query and extract data from another web site. """ tool_type = 'data_source' default_tool_action = DataSourceToolAction def _build_GALAXY_URL_parameter( self ): return ToolParameter.build( self, ElementTree.XML( '<param name="GALAXY_URL" type="baseurl" value="/tool_runner?tool_id=%s" />' % self.id ) )
[docs] def parse_inputs( self, root ): super( DataSourceTool, self ).parse_inputs( root ) if 'GALAXY_URL' not in self.inputs: self.inputs[ 'GALAXY_URL' ] = self._build_GALAXY_URL_parameter() self.inputs_by_page[0][ 'GALAXY_URL' ] = self.inputs[ 'GALAXY_URL' ]
[docs] def exec_before_job( self, app, inp_data, out_data, param_dict=None ): if param_dict is None: param_dict = {} dbkey = param_dict.get( 'dbkey' ) info = param_dict.get( 'info' ) data_type = param_dict.get( 'data_type' ) name = param_dict.get( 'name' ) json_params = {} json_params[ 'param_dict' ] = self._prepare_json_param_dict( param_dict ) # it would probably be better to store the original incoming parameters here, instead of the Galaxy modified ones? json_params[ 'output_data' ] = [] json_params[ 'job_config' ] = dict( GALAXY_DATATYPES_CONF_FILE=param_dict.get( 'GALAXY_DATATYPES_CONF_FILE' ), GALAXY_ROOT_DIR=param_dict.get( 'GALAXY_ROOT_DIR' ), TOOL_PROVIDED_JOB_METADATA_FILE=jobs.TOOL_PROVIDED_JOB_METADATA_FILE ) json_filename = None for i, ( out_name, data ) in enumerate( out_data.iteritems() ): #use wrapped dataset to access certain values wrapped_data = param_dict.get( out_name ) #allow multiple files to be created cur_base_param_name = 'GALAXY|%s|' % out_name cur_name = param_dict.get( cur_base_param_name + 'name', name ) cur_dbkey = param_dict.get( cur_base_param_name + 'dkey', dbkey ) cur_info = param_dict.get( cur_base_param_name + 'info', info ) cur_data_type = param_dict.get( cur_base_param_name + 'data_type', data_type ) if cur_name: data.name = cur_name if not data.info and cur_info: data.info = cur_info if cur_dbkey: data.dbkey = cur_dbkey if cur_data_type: data.extension = cur_data_type file_name = str( wrapped_data ) extra_files_path = str( wrapped_data.files_path ) data_dict = dict( out_data_name=out_name, ext=data.ext, dataset_id=data.dataset.id, hda_id=data.id, file_name=file_name, extra_files_path=extra_files_path ) json_params[ 'output_data' ].append( data_dict ) if json_filename is None: json_filename = file_name out = open( json_filename, 'w' ) out.write( json.dumps( json_params ) ) out.close()
[docs]class AsyncDataSourceTool( DataSourceTool ): tool_type = 'data_source_async' def _build_GALAXY_URL_parameter( self ): return ToolParameter.build( self, ElementTree.XML( '<param name="GALAXY_URL" type="baseurl" value="/async/%s" />' % self.id ) )
[docs]class DataDestinationTool( Tool ): tool_type = 'data_destination'
[docs]class SetMetadataTool( Tool ): """ Tool implementation for special tool that sets metadata on an existing dataset. """ tool_type = 'set_metadata' requires_setting_metadata = False
[docs] def exec_after_process( self, app, inp_data, out_data, param_dict, job=None ): for name, dataset in inp_data.iteritems(): external_metadata = JobExternalOutputMetadataWrapper( job ) if external_metadata.external_metadata_set_successfully( dataset, app.model.context ): dataset.metadata.from_JSON_dict( external_metadata.get_output_filenames_by_dataset( dataset, app.model.context ).filename_out ) else: dataset._state = model.Dataset.states.FAILED_METADATA self.sa_session.add( dataset ) self.sa_session.flush() return # If setting external metadata has failed, how can we inform the # user? For now, we'll leave the default metadata and set the state # back to its original. dataset.datatype.after_setting_metadata( dataset ) if job and job.tool_id == '1.0.0': dataset.state = param_dict.get( '__ORIGINAL_DATASET_STATE__' ) else: # Revert dataset.state to fall back to dataset.dataset.state dataset._state = None # Need to reset the peek, which may rely on metadata dataset.set_peek() self.sa_session.add( dataset ) self.sa_session.flush()
[docs] def job_failed( self, job_wrapper, message, exception=False ): job = job_wrapper.sa_session.query( model.Job ).get( job_wrapper.job_id ) if job: inp_data = {} for dataset_assoc in job.input_datasets: inp_data[dataset_assoc.name] = dataset_assoc.dataset return self.exec_after_process( job_wrapper.app, inp_data, {}, job_wrapper.get_param_dict(), job=job )
[docs]class ExportHistoryTool( Tool ): tool_type = 'export_history'
[docs]class ImportHistoryTool( Tool ): tool_type = 'import_history'
[docs]class GenomeIndexTool( Tool ): tool_type = 'index_genome'
[docs]class DataManagerTool( OutputParameterJSONTool ): tool_type = 'manage_data' default_tool_action = DataManagerToolAction def __init__( self, config_file, root, app, guid=None, data_manager_id=None, **kwds ): self.data_manager_id = data_manager_id super( DataManagerTool, self ).__init__( config_file, root, app, guid=guid, **kwds ) if self.data_manager_id is None: self.data_manager_id = self.id
[docs] def exec_after_process( self, app, inp_data, out_data, param_dict, job=None, **kwds ): assert self.allow_user_access( job.user ), "You must be an admin to access this tool." #run original exec_after_process super( DataManagerTool, self ).exec_after_process( app, inp_data, out_data, param_dict, job=job, **kwds ) #process results of tool if job and job.state == job.states.ERROR: return #Job state may now be 'running' instead of previous 'error', but datasets are still set to e.g. error for dataset in out_data.itervalues(): if dataset.state != dataset.states.OK: return data_manager_id = job.data_manager_association.data_manager_id data_manager = self.app.data_managers.get_manager( data_manager_id, None ) assert data_manager is not None, "Invalid data manager (%s) requested. It may have been removed before the job completed." % ( data_manager_id ) data_manager.process_result( out_data )
[docs] def get_default_history_by_trans( self, trans, create=False ): def _create_data_manager_history( user ): history = trans.app.model.History( name='Data Manager History (automatically created)', user=user ) data_manager_association = trans.app.model.DataManagerHistoryAssociation( user=user, history=history ) trans.sa_session.add_all( ( history, data_manager_association ) ) trans.sa_session.flush() return history user = trans.user assert user, 'You must be logged in to use this tool.' assert self.allow_user_access( user ), "You must be an admin to access this tool." history = user.data_manager_histories if not history: #create if create: history = _create_data_manager_history( user ) else: history = None else: for history in reversed( history ): history = history.history if not history.deleted: break if history.deleted: if create: history = _create_data_manager_history( user ) else: history = None return history
[docs] def allow_user_access( self, user ): """ :returns: bool -- Whether the user is allowed to access the tool. Data Manager tools are only accessible to admins. """ if super( DataManagerTool, self ).allow_user_access( user ) and self.app.config.is_admin_user( user ): return True if user: user = user.id log.debug( "User (%s) attempted to access a data manager tool (%s), but is not an admin.", user, self.id ) return False # Populate tool_type to ToolClass mappings
tool_types = {} for tool_class in [ Tool, SetMetadataTool, OutputParameterJSONTool, DataManagerTool, DataSourceTool, AsyncDataSourceTool, DataDestinationTool ]: tool_types[ tool_class.tool_type ] = tool_class # ---- Utility classes to be factored out -----------------------------------
[docs]class TracksterConfig: """ Trackster configuration encapsulation. """ def __init__( self, actions ): self.actions = actions @staticmethod
[docs] def parse( root ): actions = [] for action_elt in root.findall( "action" ): actions.append( SetParamAction.parse( action_elt ) ) return TracksterConfig( actions )
[docs]class SetParamAction: """ Set parameter action. """ def __init__( self, name, output_name ): self.name = name self.output_name = output_name @staticmethod
[docs] def parse( elt ): """ Parse action from element. """ return SetParamAction( elt.get( "name" ), elt.get( "output_name" ) )
[docs]class BadValue( object ): def __init__( self, value ): self.value = value
[docs]class ToolStdioRegex( object ): """ This is a container for the <stdio> element's regex subelement. The regex subelement has a "match" attribute, a "sources" attribute that contains "output" and/or "error", and a "level" attribute that contains "warning" or "fatal". """ def __init__( self ): self.match = "" self.stdout_match = False self.stderr_match = False # TODO: Define a common class or constant for error level: self.error_level = "fatal" self.desc = ""
[docs]class ToolStdioExitCode( object ): """ This is a container for the <stdio> element's <exit_code> subelement. The exit_code element has a range of exit codes and the error level. """ def __init__( self ): self.range_start = float( "-inf" ) self.range_end = float( "inf" ) # TODO: Define a common class or constant for error level: self.error_level = "fatal" self.desc = ""
[docs]def json_fix( val ): if isinstance( val, list ): return [ json_fix( v ) for v in val ] elif isinstance( val, dict ): return dict( [ ( json_fix( k ), json_fix( v ) ) for ( k, v ) in val.iteritems() ] ) elif isinstance( val, unicode ): return val.encode( "utf8" ) else: return val
[docs]def check_param_from_incoming( trans, state, input, incoming, key, context, source ): """ Unlike "update" state, this preserves default if no incoming value found. This lets API user specify just a subset of params and allow defaults to be used when available. """ default_input_value = state.get( input.name, None ) incoming_value = get_incoming_value( incoming, key, default_input_value ) value, error = check_param( trans, input, incoming_value, context, source=source ) return value, error
[docs]def get_incoming_value( incoming, key, default ): """ Fetch value from incoming dict directly or check special nginx upload created variants of this key. """ if "__" + key + "__is_composite" in incoming: composite_keys = incoming["__" + key + "__keys"].split() value = dict() for composite_key in composite_keys: value[composite_key] = incoming[key + "_" + composite_key] return value else: return incoming.get( key, default )
[docs]class InterruptedUpload( Exception ): pass