Source code for galaxy.tools
"""
Classes encapsulating galaxy tools and tool configuration.
"""
import binascii
import glob
import json
import logging
import os
import re
import shutil
import threading
import types
import urllib
import copy
from galaxy import eggs, util
eggs.require( "MarkupSafe" ) # MarkupSafe must load before mako
eggs.require( "Mako" )
eggs.require( "Paste" )
eggs.require( "SQLAlchemy >= 0.4" )
from cgi import FieldStorage
from xml.etree import ElementTree
from mako.template import Template
from paste import httpexceptions
from galaxy import model
from galaxy.datatypes.metadata import JobExternalOutputMetadataWrapper
from galaxy import exceptions
from galaxy.tools.actions import DefaultToolAction
from galaxy.tools.actions.upload import UploadToolAction
from galaxy.tools.actions.data_source import DataSourceToolAction
from galaxy.tools.actions.data_manager import DataManagerToolAction
from galaxy.tools.deps import build_dependency_manager
from galaxy.tools.parameters import params_to_incoming, check_param, params_from_strings, params_to_strings, visit_input_values
from galaxy.tools.parameters import output_collect
from galaxy.tools.parameters.basic import (BaseURLToolParameter,
DataToolParameter, DataCollectionToolParameter, HiddenToolParameter, LibraryDatasetToolParameter,
SelectToolParameter, ToolParameter, UnvalidatedValue,
IntegerToolParameter, FloatToolParameter)
from galaxy.tools.parameters.grouping import Conditional, ConditionalWhen, Repeat, Section, UploadDataset
from galaxy.tools.parameters.input_translation import ToolInputTranslator
from galaxy.tools.parameters.output import ToolOutputActionGroup
from galaxy.tools.parameters.validation import LateValidationError
from galaxy.tools.test import parse_tests
from galaxy.tools.parser import get_tool_source
from galaxy.tools.parser.xml import XmlPageSource
from galaxy.tools.toolbox import AbstractToolBox
from galaxy.util import rst_to_html, string_as_bool, string_to_object
from galaxy.tools.parameters.meta import expand_meta_parameters
from galaxy.util.bunch import Bunch
from galaxy.util.expressions import ExpressionContext
from galaxy.util.hash_util import hmac_new, is_hashable
from galaxy.util.odict import odict
from galaxy.util.template import fill_template
from galaxy.web import url_for
from galaxy.model.item_attrs import Dictifiable
from tool_shed.util import shed_util_common as suc
from .loader import template_macro_params, raw_tool_xml_tree, imported_macro_paths
from .execute import execute as execute_job
from .wrappers import (
ToolParameterValueWrapper,
RawObjectWrapper,
LibraryDatasetValueWrapper,
InputValueWrapper,
SelectToolParameterWrapper,
DatasetFilenameWrapper,
DatasetListWrapper,
DatasetCollectionWrapper,
)
import galaxy.jobs
log = logging.getLogger( __name__ )
WORKFLOW_PARAMETER_REGULAR_EXPRESSION = re.compile( '''\$\{.+?\}''' )
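# Illustrative note: this pattern matches workflow parameter placeholders of
# the form "${name}". A quick sketch (names are made up):
#   WORKFLOW_PARAMETER_REGULAR_EXPRESSION.findall( 'cutoff is ${cutoff}' )
#   # => ['${cutoff}']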
JOB_RESOURCE_CONDITIONAL_XML = """<conditional name="__job_resource">
<param name="__job_resource__select" type="select" label="Job Resource Parameters">
<option value="no">Use default job resource parameters</option>
<option value="yes">Specify job resource parameters</option>
</param>
<when value="no"></when>
<when value="yes">
</when>
</conditional>"""
HELP_UNINITIALIZED = threading.Lock()
class ToolBox( AbstractToolBox ):
""" A derivative of AbstractToolBox with knowledge about Tool internals -
how to construct them, action types, dependency management, etc....
"""
def __init__( self, config_filenames, tool_root_dir, app ):
super( ToolBox, self ).__init__(
config_filenames=config_filenames,
tool_root_dir=tool_root_dir,
app=app,
)
self._init_dependency_manager()
@property
def tools_by_id( self ):
# Deprecated method, TODO - eliminate calls to this in test/.
return self._tools_by_id
    def create_tool( self, config_file, repository_id=None, guid=None, **kwds ):
tool_source = get_tool_source( config_file, getattr( self.app.config, "enable_beta_tool_formats", False ) )
# Allow specifying a different tool subclass to instantiate
tool_module = tool_source.parse_tool_module()
if tool_module is not None:
module, cls = tool_module
mod = __import__( module, globals(), locals(), [cls] )
ToolClass = getattr( mod, cls )
elif tool_source.parse_tool_type():
tool_type = tool_source.parse_tool_type()
ToolClass = tool_types.get( tool_type )
else:
# Normal tool - only insert dynamic resource parameters for these
# tools.
root = getattr( tool_source, "root", None )
# TODO: mucking with the XML directly like this is terrible,
# modify inputs directly post load if possible.
if root is not None and hasattr( self.app, "job_config" ): # toolshed may not have job_config?
tool_id = root.get( 'id' )
parameters = self.app.job_config.get_tool_resource_parameters( tool_id )
if parameters:
inputs = root.find('inputs')
                # If the tool has no inputs, create some so we can insert the conditional
if inputs is None:
inputs = ElementTree.fromstring( "<inputs></inputs>")
root.append( inputs )
# Insert a conditional allowing user to specify resource parameters.
conditional_element = ElementTree.fromstring( JOB_RESOURCE_CONDITIONAL_XML )
when_yes_elem = conditional_element.findall( "when" )[ 1 ]
for parameter in parameters:
when_yes_elem.append( parameter )
inputs.append( conditional_element )
ToolClass = Tool
tool = ToolClass( config_file, tool_source, self.app, guid=guid, repository_id=repository_id, **kwds )
return tool
def _init_dependency_manager( self ):
self.dependency_manager = build_dependency_manager( self.app.config )
    def handle_datatypes_changed( self ):
""" Refresh upload tools when new datatypes are added. """
for tool_id in self._tools_by_id:
tool = self._tools_by_id[ tool_id ]
if isinstance( tool.tool_action, UploadToolAction ):
self.reload_tool_by_id( tool_id )
class DefaultToolState( object ):
    """
    Keeps track of the state of a user's interaction with a tool between
    requests. The default tool state keeps track of the current page (for
    multipage "wizard" tools) and the values of all parameters.
    """
def __init__( self ):
self.page = 0
self.rerun_remap_job_id = None
self.inputs = None
    def encode( self, tool, app, secure=True ):
"""
Convert the data to a string
"""
        # Convert parameters to a dictionary of strings, and save current
# page in that dict
value = params_to_strings( tool.inputs, self.inputs, app )
value["__page__"] = self.page
value["__rerun_remap_job_id__"] = self.rerun_remap_job_id
value = json.dumps( value )
# Make it secure
if secure:
a = hmac_new( app.config.tool_secret, value )
b = binascii.hexlify( value )
return "%s:%s" % ( a, b )
else:
return value
    def decode( self, value, tool, app, secure=True ):
"""
Restore the state from a string
"""
if secure:
# Extract and verify hash
a, b = value.split( ":" )
value = binascii.unhexlify( b )
test = hmac_new( app.config.tool_secret, value )
assert a == test
# Restore from string
values = json_fix( json.loads( value ) )
self.page = values.pop( "__page__" )
if '__rerun_remap_job_id__' in values:
self.rerun_remap_job_id = values.pop( "__rerun_remap_job_id__" )
else:
self.rerun_remap_job_id = None
self.inputs = params_from_strings( tool.inputs, values, app, ignore_errors=True )
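    # Round-trip sketch (illustrative; assumes a `tool` and an `app` with a
    # configured tool_secret in scope):
    #   state = DefaultToolState()
    #   state.inputs = {}
    #   blob = state.encode( tool, app )   # "<hmac_hex>:<payload_hex>"
    #   restored = DefaultToolState()
    #   restored.decode( blob, tool, app ) # AssertionError if the HMAC mismatches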
    def copy( self ):
        """
        WARNING! Makes a shallow copy; *should* be reworked to make a deep
        copy.
        """
new_state = DefaultToolState()
new_state.page = self.page
new_state.rerun_remap_job_id = self.rerun_remap_job_id
        # This needs to be copied.
new_state.inputs = self.inputs
return new_state
class ToolOutputBase( object, Dictifiable ):
def __init__( self, name, label=None, filters=None, hidden=False ):
super( ToolOutputBase, self ).__init__()
self.name = name
self.label = label
self.filters = filters or []
self.hidden = hidden
self.collection = False
class ToolOutput( ToolOutputBase ):
    """
    Represents an output dataset produced by a tool. For backward
compatibility this behaves as if it were the tuple::
(format, metadata_source, parent)
"""
dict_collection_visible_keys = ( 'name', 'format', 'label', 'hidden' )
def __init__( self, name, format=None, format_source=None, metadata_source=None,
parent=None, label=None, filters=None, actions=None, hidden=False,
implicit=False ):
super( ToolOutput, self ).__init__( name, label=label, filters=filters, hidden=hidden )
self.format = format
self.format_source = format_source
self.metadata_source = metadata_source
self.parent = parent
self.actions = actions
# Initialize default values
self.change_format = []
self.implicit = implicit
# Tuple emulation
def __len__( self ):
return 3
def __getitem__( self, index ):
if index == 0:
return self.format
elif index == 1:
return self.metadata_source
elif index == 2:
return self.parent
else:
raise IndexError( index )
def __iter__( self ):
return iter( ( self.format, self.metadata_source, self.parent ) )
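    # Tuple-emulation sketch (illustrative): legacy callers can still unpack a
    # ToolOutput as the old 3-tuple:
    #   out = ToolOutput( "out1", format="tabular" )
    #   format, metadata_source, parent = out
    #   assert len( out ) == 3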
class ToolOutputCollection( ToolOutputBase ):
"""
Represents a HistoryDatasetCollectionAssociation of output datasets produced by a tool.
<outputs>
<dataset_collection type="list" label="${tool.name} on ${on_string} fasta">
<discover_datasets pattern="__name__" ext="fasta" visible="True" directory="outputFiles" />
</dataset_collection>
<dataset_collection type="paired" label="${tool.name} on ${on_string} paired reads">
<data name="forward" format="fastqsanger" />
<data name="reverse" format="fastqsanger"/>
</dataset_collection>
    </outputs>
"""
def __init__(
self,
name,
structure,
label=None,
filters=None,
hidden=False,
default_format="data",
default_format_source=None,
default_metadata_source=None,
inherit_format=False,
inherit_metadata=False
):
super( ToolOutputCollection, self ).__init__( name, label=label, filters=filters, hidden=hidden )
self.collection = True
self.default_format = default_format
self.structure = structure
self.outputs = odict()
self.inherit_format = inherit_format
self.inherit_metadata = inherit_metadata
self.metadata_source = default_metadata_source
self.format_source = default_format_source
self.change_format = [] # TODO
    def known_outputs( self, inputs, type_registry ):
if self.dynamic_structure:
return []
def to_part( ( element_identifier, output ) ):
return ToolOutputCollectionPart( self, element_identifier, output )
        # This check is probably not right - should verify structured_like
        # or that we have outputs and that all outputs have names.
if len( self.outputs ) > 1:
outputs = self.outputs
else:
# either must have specified structured_like or something worse
if self.structure.structured_like:
collection_prototype = inputs[ self.structure.structured_like ].collection
else:
collection_prototype = type_registry.prototype( self.structure.collection_type )
# TODO: Handle nested structures.
outputs = odict()
for element in collection_prototype.elements:
name = element.element_identifier
format = self.default_format
if self.inherit_format:
format = element.dataset_instance.ext
output = ToolOutput(
name,
format=format,
format_source=self.format_source,
metadata_source=self.metadata_source,
implicit=True,
)
if self.inherit_metadata:
output.metadata_source = element.dataset_instance
outputs[ element.element_identifier ] = output
return map( to_part, outputs.items() )
@property
def dynamic_structure(self):
return self.structure.dynamic
@property
def dataset_collectors(self):
if not self.dynamic_structure:
raise Exception("dataset_collectors called for output collection with static structure")
return self.structure.dataset_collectors
class ToolOutputCollectionStructure( object ):
def __init__(
self,
collection_type,
structured_like,
dataset_collectors,
):
self.collection_type = collection_type
self.structured_like = structured_like
self.dataset_collectors = dataset_collectors
if collection_type is None and structured_like is None and dataset_collectors is None:
raise ValueError( "Output collection types must be specify type of structured_like" )
if dataset_collectors and structured_like:
raise ValueError( "Cannot specify dynamic structure (discovered_datasets) and structured_like attribute." )
self.dynamic = dataset_collectors is not None
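    # Construction sketch (illustrative): at least one structure source must be
    # given, and dynamic discovery excludes structured_like:
    #   ToolOutputCollectionStructure( "list", None, None )       # static type
    #   ToolOutputCollectionStructure( None, "input1", None )     # mirror input "input1"
    #   ToolOutputCollectionStructure( None, None, collectors )   # dynamic discovery
    #   ToolOutputCollectionStructure( None, None, None )         # raises ValueError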
class ToolOutputCollectionPart( object ):
def __init__( self, output_collection_def, element_identifier, output_def ):
self.output_collection_def = output_collection_def
self.element_identifier = element_identifier
self.output_def = output_def
@property
def effective_output_name( self ):
name = self.output_collection_def.name
part_name = self.element_identifier
effective_output_name = "%s|__part__|%s" % ( name, part_name )
return effective_output_name
    @staticmethod
    def is_named_collection_part_name( name ):
        return "|__part__|" in name
    @staticmethod
    def split_output_name( name ):
        assert ToolOutputCollectionPart.is_named_collection_part_name( name )
        return name.split("|__part__|")
class Tool( object, Dictifiable ):
"""
Represents a computational tool that can be executed through Galaxy.
"""
tool_type = 'default'
requires_setting_metadata = True
default_tool_action = DefaultToolAction
dict_collection_visible_keys = ( 'id', 'name', 'version', 'description' )
default_template = 'tool_form.mako'
def __init__( self, config_file, tool_source, app, guid=None, repository_id=None, allow_code_files=True ):
"""Load a tool from the config named by `config_file`"""
# Determine the full path of the directory where the tool config is
self.config_file = config_file
self.tool_dir = os.path.dirname( config_file )
self.app = app
self.repository_id = repository_id
self._allow_code_files = allow_code_files
#setup initial attribute values
self.inputs = odict()
self.stdio_exit_codes = list()
self.stdio_regexes = list()
self.inputs_by_page = list()
self.display_by_page = list()
self.action = '/tool_runner/index'
self.target = 'galaxy_main'
self.method = 'post'
self.check_values = True
self.nginx_upload = False
self.input_required = False
self.display_interface = True
self.require_login = False
self.rerun = False
        # Define a place to keep track of all input parameters. These
# differ from the inputs dictionary in that inputs can be page
# elements like conditionals, but input_params are basic form
# parameters like SelectField objects. This enables us to more
# easily ensure that parameter dependencies like index files or
# tool_data_table_conf.xml entries exist.
self.input_params = []
# Attributes of tools installed from Galaxy tool sheds.
self.tool_shed = None
self.repository_name = None
self.repository_owner = None
self.installed_changeset_revision = None
# The tool.id value will be the value of guid, but we'll keep the
# guid attribute since it is useful to have.
self.guid = guid
self.old_id = None
self.version = None
# Enable easy access to this tool's version lineage.
self.lineage_ids = []
#populate toolshed repository info, if available
self.populate_tool_shed_info()
# Parse XML element containing configuration
self.parse( tool_source, guid=guid )
self.external_runJob_script = app.config.drmaa_external_runjob_script
@property
def sa_session( self ):
"""Returns a SQLAlchemy session"""
return self.app.model.context
@property
def tool_version( self ):
"""Return a ToolVersion if one exists for our id"""
return self.app.install_model.context.query( self.app.install_model.ToolVersion ) \
.filter( self.app.install_model.ToolVersion.table.c.tool_id == self.id ) \
.first()
@property
def tool_versions( self ):
# If we have versions, return them.
tool_version = self.tool_version
if tool_version:
return tool_version.get_versions( self.app )
return []
@property
def tool_shed_repository( self ):
# If this tool is included in an installed tool shed repository, return it.
if self.tool_shed:
return suc.get_tool_shed_repository_by_shed_name_owner_installed_changeset_revision( self.app,
self.tool_shed,
self.repository_name,
self.repository_owner,
self.installed_changeset_revision )
return None
@property
def produces_collections( self ):
return any( o.collection for o in self.outputs.values() )
def __get_job_tool_configuration(self, job_params=None):
"""Generalized method for getting this tool's job configuration.
:type job_params: dict or None
:returns: `galaxy.jobs.JobToolConfiguration` -- JobToolConfiguration that matches this `Tool` and the given `job_params`
"""
rval = None
if len(self.job_tool_configurations) == 1:
# If there's only one config, use it rather than wasting time on comparisons
rval = self.job_tool_configurations[0]
elif job_params is None:
for job_tool_config in self.job_tool_configurations:
if not job_tool_config.params:
rval = job_tool_config
break
else:
for job_tool_config in self.job_tool_configurations:
if job_tool_config.params:
# There are job params and this config has params defined
for param, value in job_params.items():
if param not in job_tool_config.params or job_tool_config.params[param] != job_params[param]:
break
else:
# All params match, use this config
rval = job_tool_config
break
else:
rval = job_tool_config
assert rval is not None, 'Could not get a job tool configuration for Tool %s with job_params %s, this is a bug' % (self.id, job_params)
return rval
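    # Selection sketch (hypothetical params): with job_params={'source': 'trackster'},
    # a JobToolConfiguration whose params equal {'source': 'trackster'} is chosen;
    # configs without params serve as the fallback, and a single defined config is
    # returned without any comparison.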
    def get_job_handler(self, job_params=None):
        """Get a suitable job handler for this `Tool` given the provided `job_params`. If multiple handlers are valid for the combination of `Tool` and `job_params` (e.g. the defined handler is a handler tag), one will be selected at random.
:param job_params: Any params specific to this job (e.g. the job source)
:type job_params: dict or None
:returns: str -- The id of a job handler for a job run of this `Tool`
"""
# convert tag to ID if necessary
return self.app.job_config.get_handler(self.__get_job_tool_configuration(job_params=job_params).handler)
    def get_job_destination(self, job_params=None):
"""
:returns: galaxy.jobs.JobDestination -- The destination definition and runner parameters.
"""
return self.app.job_config.get_destination(self.__get_job_tool_configuration(job_params=job_params).destination)
    def get_panel_section( self ):
return self.app.toolbox.get_integrated_section_for_tool( self )
    def allow_user_access( self, user, attempting_access=True ):
"""
:returns: bool -- Whether the user is allowed to access the tool.
"""
return True
    def parse( self, tool_source, guid=None ):
        """
        Read tool configuration from `tool_source` and fill in `self`.
"""
# Get the (user visible) name of the tool
self.name = tool_source.parse_name()
if not self.name:
raise Exception( "Missing tool 'name'" )
# Get the UNIQUE id for the tool
self.old_id = tool_source.parse_id()
if guid is None:
self.id = self.old_id
else:
self.id = guid
if not self.id:
raise Exception( "Missing tool 'id'" )
self.version = tool_source.parse_version()
if not self.version:
# For backward compatibility, some tools may not have versions yet.
self.version = "1.0.0"
# Support multi-byte tools
self.is_multi_byte = tool_source.parse_is_multi_byte()
# Legacy feature, ignored by UI.
self.force_history_refresh = False
self.display_interface = tool_source.parse_display_interface( default=self.display_interface )
self.require_login = tool_source.parse_require_login( self.require_login )
request_param_translation_elem = tool_source.parse_request_param_translation_elem()
if request_param_translation_elem is not None:
# Load input translator, used by datasource tools to change names/values of incoming parameters
self.input_translator = ToolInputTranslator.from_element( request_param_translation_elem )
else:
self.input_translator = None
# Command line (template). Optional for tools that do not invoke a local program
command = tool_source.parse_command()
if command is not None:
self.command = command.lstrip() # get rid of leading whitespace
# Must pre-pend this AFTER processing the cheetah command template
self.interpreter = tool_source.parse_interpreter()
else:
self.command = ''
self.interpreter = None
# Parameters used to build URL for redirection to external app
redirect_url_params = tool_source.parse_redirect_url_params_elem()
if redirect_url_params is not None and redirect_url_params.text is not None:
# get rid of leading / trailing white space
redirect_url_params = redirect_url_params.text.strip()
# Replace remaining white space with something we can safely split on later
# when we are building the params
self.redirect_url_params = redirect_url_params.replace( ' ', '**^**' )
else:
self.redirect_url_params = ''
# Short description of the tool
self.description = tool_source.parse_description()
# Versioning for tools
self.version_string_cmd = None
version_command = tool_source.parse_version_command()
if version_command is not None:
self.version_string_cmd = version_command.strip()
version_cmd_interpreter = tool_source.parse_version_command_interpreter()
if version_cmd_interpreter:
executable = self.version_string_cmd.split()[0]
abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable))
command_line = self.version_string_cmd.replace(executable, abs_executable, 1)
self.version_string_cmd = version_cmd_interpreter + " " + command_line
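        # Rewrite sketch (hypothetical tool): a version_command of
        # "print_version.py -v" with interpreter "python" and tool_dir
        # "/tools/mytool" becomes "python /tools/mytool/print_version.py -v";
        # only the first occurrence of the executable is replaced.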
# Parallelism for tasks, read from tool config.
self.parallelism = tool_source.parse_parallelism()
# Get JobToolConfiguration(s) valid for this particular Tool. At least
# a 'default' will be provided that uses the 'default' handler and
# 'default' destination. I thought about moving this to the
# job_config, but it makes more sense to store here. -nate
self_ids = [ self.id.lower() ]
if self.old_id != self.id:
# Handle toolshed guids
self_ids = [ self.id.lower(), self.id.lower().rsplit('/', 1)[0], self.old_id.lower() ]
self.all_ids = self_ids
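        # Expansion sketch for a tool shed guid (hypothetical values):
        #   id = "toolshed.g2.bx.psu.edu/repos/devteam/bwa/bwa/0.1.2", old_id = "bwa"
        # gives self_ids = [
        #   "toolshed.g2.bx.psu.edu/repos/devteam/bwa/bwa/0.1.2",
        #   "toolshed.g2.bx.psu.edu/repos/devteam/bwa/bwa",
        #   "bwa" ]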
# In the toolshed context, there is no job config.
if 'job_config' in dir(self.app):
self.job_tool_configurations = self.app.job_config.get_job_tool_configurations(self_ids)
# Is this a 'hidden' tool (hidden in tool menu)
self.hidden = tool_source.parse_hidden()
self.__parse_legacy_features(tool_source)
# Load any tool specific options (optional)
self.options = dict( sanitize=True, refresh=False )
self.__update_options_dict( tool_source )
self.options = Bunch(** self.options)
# Parse tool inputs (if there are any required)
self.parse_inputs( tool_source )
# Parse tool help
self.parse_help( tool_source )
# Description of outputs produced by an invocation of the tool
self.parse_outputs( tool_source )
# Parse result handling for tool exit codes and stdout/stderr messages:
self.parse_stdio( tool_source )
# Any extra generated config files for the tool
self.__parse_config_files(tool_source)
# Action
action = tool_source.parse_action_module()
if action is None:
self.tool_action = self.default_tool_action()
else:
module, cls = action
mod = __import__( module, globals(), locals(), [cls])
self.tool_action = getattr( mod, cls )()
# Tests
self.__parse_tests(tool_source)
# Requirements (dependencies)
requirements, containers = tool_source.parse_requirements_and_containers()
self.requirements = requirements
self.containers = containers
self.citations = self._parse_citations( tool_source )
# Determine if this tool can be used in workflows
self.is_workflow_compatible = self.check_workflow_compatible(tool_source)
self.__parse_trackster_conf( tool_source )
def __parse_legacy_features(self, tool_source):
self.code_namespace = dict()
self.hook_map = {}
self.uihints = {}
if not hasattr(tool_source, 'root'):
return
# TODO: Move following logic into XmlToolSource.
root = tool_source.root
# Load any tool specific code (optional) Edit: INS 5/29/2007,
# allow code files to have access to the individual tool's
# "module" if it has one. Allows us to reuse code files, etc.
if self._allow_code_files:
for code_elem in root.findall("code"):
for hook_elem in code_elem.findall("hook"):
for key, value in hook_elem.items():
# map hook to function
self.hook_map[key] = value
file_name = code_elem.get("file")
code_path = os.path.join( self.tool_dir, file_name )
execfile( code_path, self.code_namespace )
# User interface hints
uihints_elem = root.find( "uihints" )
if uihints_elem is not None:
for key, value in uihints_elem.attrib.iteritems():
self.uihints[ key ] = value
def __update_options_dict(self, tool_source):
# TODO: Move following logic into ToolSource abstraction.
if not hasattr(tool_source, 'root'):
return
root = tool_source.root
for option_elem in root.findall("options"):
for option, value in self.options.copy().items():
if isinstance(value, type(False)):
self.options[option] = string_as_bool(option_elem.get(option, str(value)))
else:
self.options[option] = option_elem.get(option, str(value))
def __parse_tests(self, tool_source):
self.__tests_source = tool_source
self.__tests_populated = False
def __parse_config_files(self, tool_source):
self.config_files = []
if not hasattr(tool_source, 'root'):
return
root = tool_source.root
conf_parent_elem = root.find("configfiles")
if conf_parent_elem is not None:
for conf_elem in conf_parent_elem.findall( "configfile" ):
name = conf_elem.get( "name" )
filename = conf_elem.get( "filename", None )
text = conf_elem.text
self.config_files.append( ( name, filename, text ) )
def __parse_trackster_conf(self, tool_source):
self.trackster_conf = None
if not hasattr(tool_source, 'root'):
return
# Trackster configuration.
trackster_conf = tool_source.root.find( "trackster_conf" )
if trackster_conf is not None:
self.trackster_conf = TracksterConfig.parse( trackster_conf )
@property
def tests( self ):
if not self.__tests_populated:
tests_source = self.__tests_source
if tests_source:
try:
self.__tests = parse_tests( self, tests_source )
except:
self.__tests = None
log.exception( "Failed to parse tool tests" )
else:
self.__tests = None
self.__tests_populated = True
return self.__tests
    def parse_inputs( self, tool_source ):
"""
Parse the "<inputs>" element and create appropriate `ToolParameter`s.
This implementation supports multiple pages and grouping constructs.
"""
# Load parameters (optional)
pages = tool_source.parse_input_pages()
enctypes = set()
if pages.inputs_defined:
if hasattr(pages, "input_elem"):
input_elem = pages.input_elem
# Handle properties of the input form
self.check_values = string_as_bool( input_elem.get("check_values", self.check_values ) )
self.nginx_upload = string_as_bool( input_elem.get( "nginx_upload", self.nginx_upload ) )
self.action = input_elem.get( 'action', self.action )
# If we have an nginx upload, save the action as a tuple instead of
# a string. The actual action needs to get url_for run to add any
# prefixes, and we want to avoid adding the prefix to the
# nginx_upload_path. This logic is handled in the tool_form.mako
# template.
if self.nginx_upload and self.app.config.nginx_upload_path:
if '?' in urllib.unquote_plus( self.action ):
raise Exception( 'URL parameters in a non-default tool action can not be used '
'in conjunction with nginx upload. Please convert them to '
'hidden POST parameters' )
self.action = (self.app.config.nginx_upload_path + '?nginx_redir=',
urllib.unquote_plus(self.action))
self.target = input_elem.get( "target", self.target )
self.method = input_elem.get( "method", self.method )
# Parse the actual parameters
# Handle multiple page case
for page_source in pages.page_sources:
display, inputs = self.parse_input_page( page_source, enctypes )
self.inputs_by_page.append( inputs )
self.inputs.update( inputs )
self.display_by_page.append( display )
else:
self.inputs_by_page.append( self.inputs )
self.display_by_page.append( None )
self.display = self.display_by_page[0]
self.npages = len( self.inputs_by_page )
self.last_page = len( self.inputs_by_page ) - 1
self.has_multiple_pages = bool( self.last_page )
# Determine the needed enctype for the form
if len( enctypes ) == 0:
self.enctype = "application/x-www-form-urlencoded"
elif len( enctypes ) == 1:
self.enctype = enctypes.pop()
else:
raise Exception( "Conflicting required enctypes: %s" % str( enctypes ) )
        # Check if the tool either has no parameters or only hidden (and
        # thus hardcoded) ones. FIXME: hidden parameters aren't
        # parameters at all really, and should be passed in a different
        # way, making this check easier.
template_macros = {}
if hasattr(tool_source, 'root'):
template_macros = template_macro_params(tool_source.root)
self.template_macro_params = template_macros
for param in self.inputs.values():
if not isinstance( param, ( HiddenToolParameter, BaseURLToolParameter ) ):
self.input_required = True
break
    def parse_help( self, tool_source ):
"""
Parse the help text for the tool. Formatted in reStructuredText, but
stored as Mako to allow for dynamic image paths.
This implementation supports multiple pages.
"""
# TODO: Allow raw HTML or an external link.
self.__help = HELP_UNINITIALIZED
self.__help_by_page = HELP_UNINITIALIZED
self.__help_source = tool_source
    def parse_outputs( self, tool_source ):
"""
Parse <outputs> elements and fill in self.outputs (keyed by name)
"""
self.outputs, self.output_collections = tool_source.parse_outputs(self)
# TODO: Include the tool's name in any parsing warnings.
    def parse_stdio( self, tool_source ):
"""
Parse <stdio> element(s) and fill in self.return_codes,
self.stderr_rules, and self.stdout_rules. Return codes have a range
and an error type (fault or warning). Stderr and stdout rules have
a regular expression and an error level (fault or warning).
"""
exit_codes, regexes = tool_source.parse_stdio()
self.stdio_exit_codes = exit_codes
self.stdio_regexes = regexes
def _parse_citations( self, tool_source ):
# TODO: Move following logic into ToolSource abstraction.
if not hasattr(tool_source, 'root'):
return []
root = tool_source.root
citations = []
citations_elem = root.find("citations")
if citations_elem is None:
return citations
for citation_elem in citations_elem:
if citation_elem.tag != "citation":
pass
citation = self.app.citations_manager.parse_citation( citation_elem, self.tool_dir )
if citation:
citations.append( citation )
return citations
    def parse_input_page( self, page_source, enctypes ):
"""
Parse a page of inputs. This basically just calls 'parse_input_elem',
but it also deals with possible 'display' elements which are supported
only at the top/page level (not in groups).
"""
inputs = self.parse_input_elem( page_source, enctypes )
# Display
display = page_source.parse_display()
return display, inputs
    def parse_input_elem( self, page_source, enctypes, context=None ):
"""
Parse a parent element whose children are inputs -- these could be
groups (repeat, conditional) or param elements. Groups will be parsed
recursively.
"""
rval = odict()
context = ExpressionContext( rval, context )
for input_source in page_source.parse_input_sources():
# Repeat group
input_type = input_source.parse_input_type()
if input_type == "repeat":
group = Repeat()
group.name = input_source.get( "name" )
group.title = input_source.get( "title" )
group.help = input_source.get( "help", None )
page_source = input_source.parse_nested_inputs_source()
group.inputs = self.parse_input_elem( page_source, enctypes, context )
group.default = int( input_source.get( "default", 0 ) )
group.min = int( input_source.get( "min", 0 ) )
# Use float instead of int so that 'inf' can be used for no max
group.max = float( input_source.get( "max", "inf" ) )
assert group.min <= group.max, \
ValueError( "Min repeat count must be less-than-or-equal to the max." )
# Force default to be within min-max range
group.default = min( max( group.default, group.min ), group.max )
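                # Clamping sketch (illustrative): with min=1 and max=5, a
                # configured default of 10 becomes 5 and a default of 0 becomes 1.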
rval[group.name] = group
elif input_type == "conditional":
group = Conditional()
group.name = input_source.get( "name" )
group.value_ref = input_source.get( 'value_ref', None )
group.value_ref_in_group = input_source.get_bool( 'value_ref_in_group', True )
value_from = input_source.get("value_from", None)
if value_from:
value_from = value_from.split( ':' )
group.value_from = locals().get( value_from[0] )
group.test_param = rval[ group.value_ref ]
group.test_param.refresh_on_change = True
for attr in value_from[1].split( '.' ):
group.value_from = getattr( group.value_from, attr )
for case_value, case_inputs in group.value_from( context, group, self ).iteritems():
case = ConditionalWhen()
case.value = case_value
if case_inputs:
page_source = XmlPageSource( ElementTree.XML( "<when>%s</when>" % case_inputs ) )
case.inputs = self.parse_input_elem( page_source, enctypes, context )
else:
case.inputs = odict()
group.cases.append( case )
else:
# Should have one child "input" which determines the case
test_param_input_source = input_source.parse_test_input_source()
group.test_param = self.parse_param_elem( test_param_input_source, enctypes, context )
possible_cases = list( group.test_param.legal_values ) # store possible cases, undefined whens will have no inputs
# Must refresh when test_param changes
group.test_param.refresh_on_change = True
# And a set of possible cases
for (value, case_inputs_source) in input_source.parse_when_input_sources():
case = ConditionalWhen()
case.value = value
case.inputs = self.parse_input_elem( case_inputs_source, enctypes, context )
group.cases.append( case )
try:
possible_cases.remove( case.value )
except:
log.warning( "Tool %s: a when tag has been defined for '%s (%s) --> %s', but does not appear to be selectable." %
( self.id, group.name, group.test_param.name, case.value ) )
for unspecified_case in possible_cases:
log.warning( "Tool %s: a when tag has not been defined for '%s (%s) --> %s', assuming empty inputs." %
( self.id, group.name, group.test_param.name, unspecified_case ) )
case = ConditionalWhen()
case.value = unspecified_case
case.inputs = odict()
group.cases.append( case )
rval[group.name] = group
elif input_type == "section":
group = Section()
group.name = input_source.get( "name" )
group.title = input_source.get( "title" )
group.help = input_source.get( "help", None )
group.expanded = input_source.get_bool( "expanded", False )
page_source = input_source.parse_nested_inputs_source()
group.inputs = self.parse_input_elem( page_source, enctypes, context )
rval[group.name] = group
elif input_type == "upload_dataset":
elem = input_source.elem()
group = UploadDataset()
group.name = elem.get( "name" )
group.title = elem.get( "title" )
group.file_type_name = elem.get( 'file_type_name', group.file_type_name )
group.default_file_type = elem.get( 'default_file_type', group.default_file_type )
group.metadata_ref = elem.get( 'metadata_ref', group.metadata_ref )
rval[ group.file_type_name ].refresh_on_change = True
rval[ group.file_type_name ].refresh_on_change_values = \
self.app.datatypes_registry.get_composite_extensions()
group_page_source = XmlPageSource(elem)
group.inputs = self.parse_input_elem( group_page_source, enctypes, context )
rval[ group.name ] = group
elif input_type == "param":
param = self.parse_param_elem( input_source, enctypes, context )
rval[param.name] = param
if hasattr( param, 'data_ref' ):
param.ref_input = context[ param.data_ref ]
self.input_params.append( param )
return rval
    def parse_param_elem( self, input_source, enctypes, context ):
"""
Parse a single "<param>" element and return a ToolParameter instance.
Also, if the parameter has a 'required_enctype' add it to the set
enctypes.
"""
param = ToolParameter.build( self, input_source )
param_enctype = param.get_required_enctype()
if param_enctype:
enctypes.add( param_enctype )
        # If the parameter depends on any other parameters, we must refresh the
        # form when it changes
for name in param.get_dependencies():
context[ name ].refresh_on_change = True
return param
    def populate_tool_shed_info( self ):
if self.repository_id is not None and self.app.name == 'galaxy':
repository_id = self.app.security.decode_id( self.repository_id )
tool_shed_repository = self.app.install_model.context.query( self.app.install_model.ToolShedRepository ).get( repository_id )
if tool_shed_repository:
self.tool_shed = tool_shed_repository.tool_shed
self.repository_name = tool_shed_repository.name
self.repository_owner = tool_shed_repository.owner
self.installed_changeset_revision = tool_shed_repository.installed_changeset_revision
@property
def help(self):
if self.__help is HELP_UNINITIALIZED:
self.__ensure_help()
return self.__help
@property
def help_by_page(self):
if self.__help_by_page is HELP_UNINITIALIZED:
self.__ensure_help()
return self.__help_by_page
def __ensure_help(self):
with HELP_UNINITIALIZED:
if self.__help is HELP_UNINITIALIZED:
                self.__initialize_help()
    def __initialize_help(self):
tool_source = self.__help_source
self.__help = None
self.__help_by_page = []
help_header = ""
help_footer = ""
help_text = tool_source.parse_help()
if help_text is not None:
if self.repository_id and help_text.find( '.. image:: ' ) >= 0:
# Handle tool help image display for tools that are contained in repositories in the tool shed or installed into Galaxy.
try:
help_text = suc.set_image_paths( self.app, self.repository_id, help_text )
except Exception, e:
log.exception( "Exception in parse_help, so images may not be properly displayed:\n%s" % str( e ) )
try:
self.__help = Template( rst_to_html(help_text), input_encoding='utf-8',
output_encoding='utf-8', default_filters=[ 'decode.utf8' ],
encoding_errors='replace' )
except:
log.exception( "error in help for tool %s" % self.name )
# Handle deprecated multi-page help text in XML case.
if hasattr(tool_source, "root"):
help_elem = tool_source.root.find("help")
help_header = help_text
help_pages = help_elem.findall( "page" )
# Multiple help page case
if help_pages:
for help_page in help_pages:
self.__help_by_page.append( help_page.text )
help_footer = help_footer + help_page.tail
                    # Each page has to be rendered all together because of backreferences allowed by rst
try:
self.__help_by_page = [ Template( rst_to_html( help_header + x + help_footer ),
input_encoding='utf-8', output_encoding='utf-8',
default_filters=[ 'decode.utf8' ],
encoding_errors='replace' )
for x in self.__help_by_page ]
except:
log.exception( "error in multi-page help for tool %s" % self.name )
# Pad out help pages to match npages ... could this be done better?
while len( self.__help_by_page ) < self.npages:
self.__help_by_page.append( self.__help )
    def find_output_def( self, name ):
        # name is JobToOutputDatasetAssociation name.
        # TODO: too defensive, just throw IndexError and catch somewhere
        # up that stack.
if ToolOutputCollectionPart.is_named_collection_part_name( name ):
collection_name, part = ToolOutputCollectionPart.split_output_name( name )
collection_def = self.output_collections.get( collection_name, None )
if not collection_def:
return None
return collection_def.outputs.get( part, None )
else:
return self.outputs.get( name, None )
    def check_workflow_compatible( self, tool_source ):
"""
Determine if a tool can be used in workflows. External tools and the
upload tool are currently not supported by workflows.
"""
# Multiple page tools are not supported -- we're eliminating most
# of these anyway
if self.has_multiple_pages:
return False
# This is probably the best bet for detecting external web tools
# right now
if self.tool_type.startswith( 'data_source' ):
return False
if self.produces_collections:
# Someday we will get there!
return False
if hasattr( tool_source, "root"):
root = tool_source.root
if not string_as_bool( root.get( "workflow_compatible", "True" ) ):
return False
        # TODO: Any way to capture tools that dynamically change their own
        # outputs?
return True
    def new_state( self, trans, all_pages=False, history=None ):
"""
Create a new `DefaultToolState` for this tool. It will be initialized
with default values for inputs.
Only inputs on the first page will be initialized unless `all_pages` is
True, in which case all inputs regardless of page are initialized.
"""
state = DefaultToolState()
state.inputs = {}
if all_pages:
inputs = self.inputs
else:
inputs = self.inputs_by_page[ 0 ]
self.fill_in_new_state( trans, inputs, state.inputs, history=history )
return state
    def fill_in_new_state( self, trans, inputs, state, context=None, history=None ):
"""
Fill in a tool state dictionary with default values for all parameters
in the dictionary `inputs`. Grouping elements are filled in recursively.
"""
context = ExpressionContext( state, context )
for input in inputs.itervalues():
state[ input.name ] = input.get_initial_value( trans, context, history=history )
    def get_param_html_map( self, trans, page=0, other_values={} ):
"""
Return a dictionary containing the HTML representation of each
parameter. This is used for rendering display elements. It is
currently not compatible with grouping constructs.
NOTE: This should be considered deprecated, it is only used for tools
with `display` elements. These should be eliminated.
"""
rval = dict()
for key, param in self.inputs_by_page[page].iteritems():
if not isinstance( param, ToolParameter ):
raise Exception( "'get_param_html_map' only supported for simple paramters" )
rval[key] = param.get_html( trans, other_values=other_values )
return rval
    def get_param( self, key ):
"""
Returns the parameter named `key` or None if there is no such
parameter.
"""
return self.inputs.get( key, None )
    def get_hook(self, name):
"""
Returns an object from the code file referenced by `code_namespace`
(this will normally be a callable object)
"""
if self.code_namespace:
# Try to look up hook in self.hook_map, otherwise resort to default
if name in self.hook_map and self.hook_map[name] in self.code_namespace:
return self.code_namespace[self.hook_map[name]]
elif name in self.code_namespace:
return self.code_namespace[name]
return None
    def visit_inputs( self, value, callback ):
        """
        Call the function `callback` on each parameter of this tool. Visits
        grouping parameters recursively and constructs unique prefixes for
        each nested set of inputs. The callback method is then called as:
        `callback( level_prefix, parameter, parameter_value )`
"""
# HACK: Yet another hack around check_values -- WHY HERE?
if not self.check_values:
return
for input in self.inputs.itervalues():
if isinstance( input, ToolParameter ):
callback( "", input, value[input.name] )
else:
input.visit_inputs( "", value[input.name], callback )
    def handle_input( self, trans, incoming, history=None, old_errors=None, process_state='update', source='html' ):
"""
Process incoming parameters for this tool from the dict `incoming`,
update the tool state (or create if none existed), and either return
to the form or execute the tool (only if 'execute' was clicked and
there were no errors).
        process_state can be either 'update' (to incrementally build up the state
        over several calls - one repeat per handle for instance) or 'populate' to
        force a complete build of the state and submission all at once (as from
        the API). We may want an incremental version of the API at some point as
        well, which is why this is not just called for_api.
"""
        all_pages = ( process_state == "populate" )  # If process_state is 'populate', handle all pages at once.
rerun_remap_job_id = None
if 'rerun_remap_job_id' in incoming:
try:
rerun_remap_job_id = trans.app.security.decode_id( incoming[ 'rerun_remap_job_id' ] )
except Exception, exception:
log.error( str( exception ) )
message = 'Failure executing tool (attempting to rerun invalid job).'
return 'message.mako', dict( status='error', message=message, refresh_frames=[] )
# Fixed set of input parameters may correspond to any number of jobs.
# Expand these out to individual parameters for given jobs (tool
# executions).
expanded_incomings, collection_info = expand_meta_parameters( trans, self, incoming )
if not expanded_incomings:
raise exceptions.MessageException( "Tool execution failed, trying to run a tool over an empty collection." )
# Remapping a single job to many jobs doesn't make sense, so disable
# remap if multi-runs of tools are being used.
if rerun_remap_job_id and len( expanded_incomings ) > 1:
message = 'Failure executing tool (cannot create multiple jobs when remapping existing job).'
return 'message.mako', dict( status='error', message=message, refresh_frames=[] )
all_states = []
for expanded_incoming in expanded_incomings:
state, state_new = self.__fetch_state( trans, expanded_incoming, history, all_pages=all_pages )
all_states.append( state )
if state_new:
# This feels a bit like a hack. It allows forcing full processing
# of inputs even when there is no state in the incoming dictionary
# by providing either 'runtool_btn' (the name of the submit button
# on the standard run form) or "URL" (a parameter provided by
# external data source tools).
if "runtool_btn" not in incoming and "URL" not in incoming:
if not self.display_interface:
return self.__no_display_interface_response()
if len(incoming):
self.update_state( trans, self.inputs_by_page[state.page], state.inputs, incoming, old_errors=old_errors or {}, source=source )
return self.default_template, dict( errors={}, tool_state=state, param_values={}, incoming={} )
all_errors = []
all_params = []
for expanded_incoming, expanded_state in zip(expanded_incomings, all_states):
errors, params = self.__check_param_values( trans, expanded_incoming, expanded_state, old_errors, process_state, history=history, source=source )
all_errors.append( errors )
all_params.append( params )
if self.__should_refresh_state( incoming ):
template, template_vars = self.__handle_state_refresh( trans, state, errors )
else:
# User actually clicked next or execute.
# If there were errors, we stay on the same page and display
# error messages
if any( all_errors ):
error_message = "One or more errors were found in the input you provided. The specific errors are marked below."
template = self.default_template
template_vars = dict( errors=errors, tool_state=state, incoming=incoming, error_message=error_message )
# If we've completed the last page we can execute the tool
elif all_pages or state.page == self.last_page:
execution_tracker = execute_job( trans, self, all_params, history=history, rerun_remap_job_id=rerun_remap_job_id, collection_info=collection_info )
if execution_tracker.successful_jobs:
template = 'tool_executed.mako'
template_vars = dict(
out_data=execution_tracker.output_datasets,
num_jobs=len( execution_tracker.successful_jobs ),
job_errors=execution_tracker.execution_errors,
jobs=execution_tracker.successful_jobs,
output_collections=execution_tracker.output_collections,
implicit_collections=execution_tracker.implicit_collections,
)
else:
template = 'message.mako'
template_vars = dict( status='error', message=execution_tracker.execution_errors[0], refresh_frames=[] )
# Otherwise move on to the next page
else:
template, template_vars = self.__handle_page_advance( trans, state, errors )
return template, template_vars
def __should_refresh_state( self, incoming ):
return not( 'runtool_btn' in incoming or 'URL' in incoming or 'ajax_upload' in incoming )
    def handle_single_execution( self, trans, rerun_remap_job_id, params, history, mapping_over_collection ):
"""
Return a pair with whether execution is successful as well as either
resulting output data or an error message indicating the problem.
"""
try:
params = self.__remove_meta_properties( params )
job, out_data = self.execute( trans, incoming=params, history=history, rerun_remap_job_id=rerun_remap_job_id, mapping_over_collection=mapping_over_collection )
except httpexceptions.HTTPFound, e:
#if it's a paste redirect exception, pass it up the stack
raise e
except Exception, e:
log.exception('Exception caught while attempting tool execution:')
message = 'Error executing tool: %s' % str(e)
return False, message
if isinstance( out_data, odict ):
return job, out_data.items()
else:
if isinstance( out_data, str ):
message = out_data
else:
message = 'Failure executing tool (invalid data returned from tool execution)'
return False, message
def __handle_state_refresh( self, trans, state, errors ):
try:
self.find_fieldstorage( state.inputs )
except InterruptedUpload:
# If inputs contain a file it won't persist. Most likely this
# is an interrupted upload. We should probably find a more
# standard method of determining an incomplete POST.
return self.handle_interrupted( trans, state.inputs )
except:
pass
# Just a refresh, render the form with updated state and errors.
if not self.display_interface:
return self.__no_display_interface_response()
return self.default_template, dict( errors=errors, tool_state=state )
def __handle_page_advance( self, trans, state, errors ):
state.page += 1
# Fill in the default values for the next page
self.fill_in_new_state( trans, self.inputs_by_page[ state.page ], state.inputs )
if not self.display_interface:
return self.__no_display_interface_response()
return self.default_template, dict( errors=errors, tool_state=state )
def __no_display_interface_response( self ):
return 'message.mako', dict( status='info', message="The interface for this tool cannot be displayed", refresh_frames=['everything'] )
def __fetch_state( self, trans, incoming, history, all_pages ):
# Get the state or create if not found
if "tool_state" in incoming:
encoded_state = string_to_object( incoming["tool_state"] )
state = DefaultToolState()
state.decode( encoded_state, self, trans.app )
new = False
else:
state = self.new_state( trans, history=history, all_pages=all_pages )
new = True
return state, new
def __check_param_values( self, trans, incoming, state, old_errors, process_state, history, source ):
# Process incoming data
if not( self.check_values ):
            # If `self.check_values` is false we don't do any checking or
            # processing on the input. This is used to pass raw values
# through to/from external sites. FIXME: This should be handled
# more cleanly, there is no reason why external sites need to
# post back to the same URL that the tool interface uses.
errors = {}
params = incoming
else:
# Update state for all inputs on the current page taking new
# values from `incoming`.
if process_state == "update":
inputs = self.inputs_by_page[state.page]
errors = self.update_state( trans, inputs, state.inputs, incoming, old_errors=old_errors or {}, source=source )
elif process_state == "populate":
inputs = self.inputs
errors = self.populate_state( trans, inputs, state.inputs, incoming, history, source=source )
else:
raise Exception("Unknown process_state type %s" % process_state)
# If the tool provides a `validate_input` hook, call it.
validate_input = self.get_hook( 'validate_input' )
if validate_input:
validate_input( trans, errors, state.inputs, inputs )
params = state.inputs
return errors, params
    def find_fieldstorage( self, x ):
if isinstance( x, FieldStorage ):
raise InterruptedUpload( None )
elif type( x ) is types.DictType:
[ self.find_fieldstorage( y ) for y in x.values() ]
elif type( x ) is types.ListType:
[ self.find_fieldstorage( y ) for y in x ]
    def handle_interrupted( self, trans, inputs ):
"""
Upon handling inputs, if it appears that we have received an incomplete
form, do some cleanup or anything else deemed necessary. Currently
this is only likely during file uploads, but this method could be
generalized and a method standardized for handling other tools.
"""
# If the async upload tool has uploading datasets, we need to error them.
if 'async_datasets' in inputs and inputs['async_datasets'] not in [ 'None', '', None ]:
for id in inputs['async_datasets'].split(','):
try:
data = self.sa_session.query( trans.model.HistoryDatasetAssociation ).get( int( id ) )
except:
log.exception( 'Unable to load precreated dataset (%s) sent in upload form' % id )
continue
if trans.user is None and trans.galaxy_session.current_history != data.history:
log.error( 'Got a precreated dataset (%s) but it does not belong to anonymous user\'s current session (%s)'
% ( data.id, trans.galaxy_session.id ) )
elif data.history.user != trans.user:
log.error( 'Got a precreated dataset (%s) but it does not belong to current user (%s)'
% ( data.id, trans.user.id ) )
else:
data.state = data.states.ERROR
data.info = 'Upload of this dataset was interrupted. Please try uploading again or'
self.sa_session.add( data )
self.sa_session.flush()
# It's unlikely the user will ever see this.
return 'message.mako', dict( status='error',
                                     message='Your upload was interrupted. If this was unintentional, please retry it.',
refresh_frames=[], cont=None )
    def populate_state( self, trans, inputs, state, incoming, history=None, source="html", prefix="", context=None ):
errors = dict()
# Push this level onto the context stack
context = ExpressionContext( state, context )
for input in inputs.itervalues():
key = prefix + input.name
if isinstance( input, Repeat ):
group_state = state[input.name]
# Create list of empty errors for each previously existing state
group_errors = [ ]
any_group_errors = False
rep_index = 0
del group_state[:] # Clear prepopulated defaults if repeat.min set.
while True:
rep_name = "%s_%d" % ( key, rep_index )
if not any( [ incoming_key.startswith(rep_name) for incoming_key in incoming.keys() ] ):
break
if rep_index < input.max:
new_state = {}
new_state['__index__'] = rep_index
self.fill_in_new_state( trans, input.inputs, new_state, context, history=history )
group_state.append( new_state )
group_errors.append( {} )
rep_errors = self.populate_state( trans,
input.inputs,
new_state,
incoming,
history,
source,
prefix=rep_name + "|",
context=context )
if rep_errors:
any_group_errors = True
group_errors[rep_index].update( rep_errors )
else:
group_errors[-1] = { '__index__': 'Cannot add repeat (max size=%i).' % input.max }
any_group_errors = True
rep_index += 1
elif isinstance( input, Conditional ):
group_state = state[input.name]
group_prefix = "%s|" % ( key )
# Deal with the 'test' element and see if its value changed
if input.value_ref and not input.value_ref_in_group:
# We are referencing an existent parameter, which is not
# part of this group
test_param_key = prefix + input.test_param.name
else:
test_param_key = group_prefix + input.test_param.name
# Get value of test param and determine current case
value, test_param_error = check_param_from_incoming( trans,
group_state,
input.test_param,
incoming,
test_param_key,
context,
source )
if test_param_error:
errors[ input.name ] = [ test_param_error ]
# Store the value of the test element
group_state[ input.test_param.name ] = value
else:
current_case = input.get_current_case( value, trans )
# Current case has changed, throw away old state
group_state = state[input.name] = {}
# TODO: we should try to preserve values if we can
self.fill_in_new_state( trans, input.cases[current_case].inputs, group_state, context, history=history )
group_errors = self.populate_state( trans,
input.cases[current_case].inputs,
group_state,
incoming,
history,
source,
prefix=group_prefix,
context=context,
)
if group_errors:
errors[ input.name ] = group_errors
# Store the current case in a special value
group_state['__current_case__'] = current_case
# Store the value of the test element
group_state[ input.test_param.name ] = value
elif isinstance( input, Section ):
group_state = state[input.name]
group_prefix = "%s|" % ( key )
self.fill_in_new_state( trans, input.inputs, group_state, context, history=history )
group_errors = self.populate_state( trans,
input.inputs,
group_state,
incoming,
history,
source,
prefix=group_prefix,
context=context )
if group_errors:
errors[ input.name ] = group_errors
elif isinstance( input, UploadDataset ):
group_state = state[input.name]
group_errors = []
any_group_errors = False
d_type = input.get_datatype( trans, context )
writable_files = d_type.writable_files
#remove extra files
while len( group_state ) > len( writable_files ):
del group_state[-1]
# Add new fileupload as needed
while len( writable_files ) > len( group_state ):
new_state = {}
new_state['__index__'] = len( group_state )
self.fill_in_new_state( trans, input.inputs, new_state, context )
group_state.append( new_state )
if any_group_errors:
group_errors.append( {} )
# Update state
for i, rep_state in enumerate( group_state ):
rep_index = rep_state['__index__']
rep_prefix = "%s_%d|" % ( key, rep_index )
rep_errors = self.populate_state( trans,
input.inputs,
rep_state,
incoming,
history,
source,
prefix=rep_prefix,
context=context)
if rep_errors:
any_group_errors = True
group_errors.append( rep_errors )
else:
group_errors.append( {} )
# Were there *any* errors for any repetition?
if any_group_errors:
errors[input.name] = group_errors
else:
value, error = check_param_from_incoming( trans, state, input, incoming, key, context, source )
if error:
errors[ input.name ] = error
state[ input.name ] = value
return errors
    def update_state( self, trans, inputs, state, incoming, source='html', prefix="", context=None,
update_only=False, old_errors={}, item_callback=None ):
"""
Update the tool state in `state` using the user input in `incoming`.
This is designed to be called recursively: `inputs` contains the
set of inputs being processed, and `prefix` specifies a prefix to
add to the name of each input to extract its value from `incoming`.
If `update_only` is True, values that are not in `incoming` will
not be modified. In this case `old_errors` can be provided, and any
errors for parameters which were *not* updated will be preserved.
"""
errors = dict()
# Push this level onto the context stack
context = ExpressionContext( state, context )
# Iterate inputs and update (recursively)
for input in inputs.itervalues():
key = prefix + input.name
if isinstance( input, Repeat ):
group_state = state[input.name]
# Create list of empty errors for each previously existing state
group_errors = [ {} for i in range( len( group_state ) ) ]
group_old_errors = old_errors.get( input.name, None )
any_group_errors = False
# Check any removals before updating state -- only one
# removal can be performed, others will be ignored
for i, rep_state in enumerate( group_state ):
rep_index = rep_state['__index__']
if key + "_" + str(rep_index) + "_remove" in incoming:
if len( group_state ) > input.min:
del group_state[i]
del group_errors[i]
if group_old_errors:
del group_old_errors[i]
break
else:
group_errors[i] = { '__index__': 'Cannot remove repeat (min size=%i).' % input.min }
any_group_errors = True
# Only need to find one that can't be removed due to size, since only
# one removal is processed at a time anyway
break
elif group_old_errors and group_old_errors[i]:
group_errors[i] = group_old_errors[i]
any_group_errors = True
# Update state
max_index = -1
for i, rep_state in enumerate( group_state ):
rep_index = rep_state['__index__']
max_index = max( max_index, rep_index )
rep_prefix = "%s_%d|" % ( key, rep_index )
if group_old_errors:
rep_old_errors = group_old_errors[i]
else:
rep_old_errors = {}
rep_errors = self.update_state( trans,
input.inputs,
rep_state,
incoming,
source=source,
prefix=rep_prefix,
context=context,
update_only=update_only,
old_errors=rep_old_errors,
item_callback=item_callback )
if rep_errors:
any_group_errors = True
group_errors[i].update( rep_errors )
# Check for addition
if key + "_add" in incoming:
if len( group_state ) < input.max:
new_state = {}
new_state['__index__'] = max_index + 1
self.fill_in_new_state( trans, input.inputs, new_state, context )
group_state.append( new_state )
group_errors.append( {} )
else:
group_errors[-1] = { '__index__': 'Cannot add repeat (max size=%i).' % input.max }
any_group_errors = True
# Were there *any* errors for any repetition?
if any_group_errors:
errors[input.name] = group_errors
elif isinstance( input, Conditional ):
group_state = state[input.name]
group_old_errors = old_errors.get( input.name, {} )
old_current_case = group_state['__current_case__']
group_prefix = "%s|" % ( key )
# Deal with the 'test' element and see if its value changed
if input.value_ref and not input.value_ref_in_group:
# We are referencing an existing parameter that is not
# part of this group
test_param_key = prefix + input.test_param.name
else:
test_param_key = group_prefix + input.test_param.name
test_param_error = None
test_incoming = get_incoming_value( incoming, test_param_key, None )
if test_param_key not in incoming \
and "__force_update__" + test_param_key not in incoming \
and update_only:
# Update only, keep previous value and state, but still
# recurse in case there are nested changes
value = group_state[ input.test_param.name ]
current_case = old_current_case
if input.test_param.name in old_errors:
errors[ input.test_param.name ] = old_errors[ input.test_param.name ]
else:
# Get value of test param and determine current case
value, test_param_error = \
check_param( trans, input.test_param, test_incoming, context, source=source )
try:
current_case = input.get_current_case( value, trans )
except ValueError, e:
if input.is_job_resource_conditional:
# Unless explicitly given job resource parameters
# (e.g. from the run tool form) don't populate the
# state. Along with other hacks prevents workflow
# saving from populating resource defaults - which
# are meant to be much more transient than the rest
# of tool state.
continue
#load default initial value
if not test_param_error:
test_param_error = str( e )
if trans is not None:
history = trans.get_history()
else:
history = None
value = input.test_param.get_initial_value( trans, context, history=history )
current_case = input.get_current_case( value, trans )
case_changed = current_case != old_current_case
if case_changed:
# Current case has changed, throw away old state
group_state = state[input.name] = {}
# TODO: we should try to preserve values if we can
self.fill_in_new_state( trans, input.cases[current_case].inputs, group_state, context )
group_errors = dict()
group_old_errors = dict()
# If we didn't just change the current case and are coming from HTML - the values
# in incoming represent the old values and should not be replaced. If being updated
# from the API (json) instead of HTML - form values below the current case
# may also be supplied and incoming should be preferred to case defaults.
if (not case_changed) or (source != "html"):
# Current case has not changed, update children
group_errors = self.update_state( trans,
input.cases[current_case].inputs,
group_state,
incoming,
prefix=group_prefix,
context=context,
source=source,
update_only=update_only,
old_errors=group_old_errors,
item_callback=item_callback )
if input.test_param.name in group_old_errors and not test_param_error:
test_param_error = group_old_errors[ input.test_param.name ]
if test_param_error:
group_errors[ input.test_param.name ] = test_param_error
if group_errors:
errors[ input.name ] = group_errors
# Store the current case in a special value
group_state['__current_case__'] = current_case
# Store the value of the test element
group_state[ input.test_param.name ] = value
elif isinstance( input, Section ):
group_state = state[input.name]
group_old_errors = old_errors.get( input.name, {} )
group_prefix = "%s|" % ( key )
group_errors = self.update_state( trans,
input.inputs,
group_state,
incoming,
prefix=group_prefix,
context=context,
source=source,
update_only=update_only,
old_errors=group_old_errors,
item_callback=item_callback )
if group_errors:
errors[ input.name ] = group_errors
elif isinstance( input, UploadDataset ):
group_state = state[input.name]
group_errors = []
group_old_errors = old_errors.get( input.name, None )
any_group_errors = False
d_type = input.get_datatype( trans, context )
writable_files = d_type.writable_files
#remove extra files
while len( group_state ) > len( writable_files ):
del group_state[-1]
if group_old_errors:
del group_old_errors[-1]
# Update state
max_index = -1
for i, rep_state in enumerate( group_state ):
rep_index = rep_state['__index__']
max_index = max( max_index, rep_index )
rep_prefix = "%s_%d|" % ( key, rep_index )
if group_old_errors:
rep_old_errors = group_old_errors[i]
else:
rep_old_errors = {}
rep_errors = self.update_state( trans,
input.inputs,
rep_state,
incoming,
prefix=rep_prefix,
context=context,
source=source,
update_only=update_only,
old_errors=rep_old_errors,
item_callback=item_callback )
if rep_errors:
any_group_errors = True
group_errors.append( rep_errors )
else:
group_errors.append( {} )
# Add new fileupload as needed
offset = 1
while len( writable_files ) > len( group_state ):
new_state = {}
new_state['__index__'] = max_index + offset
offset += 1
self.fill_in_new_state( trans, input.inputs, new_state, context )
group_state.append( new_state )
if any_group_errors:
group_errors.append( {} )
# Were there *any* errors for any repetition?
if any_group_errors:
errors[input.name] = group_errors
else:
if key not in incoming \
and "__force_update__" + key not in incoming \
and update_only:
# No new value provided, and we are only updating, so keep
# the old value (which should already be in the state) and
# preserve the old error message.
if input.name in old_errors:
errors[ input.name ] = old_errors[ input.name ]
else:
incoming_value = get_incoming_value( incoming, key, None )
value, error = check_param( trans, input, incoming_value, context, source=source )
# If a callback was provided, allow it to process the value
input_name = input.name
if item_callback:
old_value = state.get( input_name, None )
value, error = item_callback( trans, key, input, value, error, old_value, context )
if error:
errors[ input_name ] = error
state[ input_name ] = value
meta_properties = self.__meta_properties_for_state( key, incoming, incoming_value, value, input_name )
state.update( meta_properties )
return errors
def __remove_meta_properties( self, incoming ):
result = incoming.copy()
meta_property_suffixes = [
"__multirun__",
"__collection_multirun__",
]
for key, value in incoming.iteritems():
if any( key.endswith( s ) for s in meta_property_suffixes ):
del result[ key ]
return result
def __meta_properties_for_state( self, key, incoming, incoming_val, state_val, input_name ):
meta_properties = {}
meta_property_suffixes = [
"__multirun__",
"__collection_multirun__",
]
for meta_property_suffix in meta_property_suffixes:
multirun_key = "%s|%s" % ( key, meta_property_suffix )
if multirun_key in incoming:
multi_value = incoming[ multirun_key ]
meta_properties[ "%s|%s" % ( input_name, meta_property_suffix ) ] = multi_value
return meta_properties
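# Illustration of the helper above (hypothetical values): with key "input1",
# input_name "input1" and incoming containing
# { "input1|__multirun__": [ "3", "4" ] }, it returns
# { "input1|__multirun__": [ "3", "4" ] }, carrying the multi-run selection
# forward into the tool state.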
@property
def params_with_missing_data_table_entry( self ):
"""
Return all parameters that are dynamically generated select lists whose
options require an entry not currently in the tool_data_table_conf.xml file.
"""
params = []
for input_param in self.input_params:
if isinstance( input_param, SelectToolParameter ) and input_param.is_dynamic:
options = input_param.options
if options and options.missing_tool_data_table_name and input_param not in params:
params.append( input_param )
return params
@property
def params_with_missing_index_file( self ):
"""
Return all parameters that are dynamically generated
select lists whose options refer to a missing .loc file.
"""
params = []
for input_param in self.input_params:
if isinstance( input_param, SelectToolParameter ) and input_param.is_dynamic:
options = input_param.options
if options and options.missing_index_file and input_param not in params:
params.append( input_param )
return params
def get_static_param_values( self, trans ):
"""
Returns a map of parameter names and values if the tool does not
require any user input. Will raise an exception if any parameter
does require input.
"""
args = dict()
for key, param in self.inputs.iteritems():
if isinstance( param, HiddenToolParameter ):
args[key] = model.User.expand_user_properties( trans.user, param.value )
elif isinstance( param, BaseURLToolParameter ):
args[key] = param.get_value( trans )
else:
raise Exception( "Unexpected parameter type" )
return args
def execute( self, trans, incoming={}, set_output_hid=True, history=None, **kwargs ):
"""
Execute the tool using parameter values in `incoming`. This just
dispatches to the `ToolAction` instance specified by
`self.tool_action`. In general this will create a `Job` that
when run will build the tool's outputs, e.g. `DefaultToolAction`.
"""
return self.tool_action.execute( self, trans, incoming=incoming, set_output_hid=set_output_hid, history=history, **kwargs )
def params_to_strings( self, params, app ):
return params_to_strings( self.inputs, params, app )
def params_from_strings( self, params, app, ignore_errors=False ):
return params_from_strings( self.inputs, params, app, ignore_errors )
def check_and_update_param_values( self, values, trans, update_values=True, allow_workflow_parameters=False ):
"""
Check that all parameters have values, and fill in with default
values where necessary. This could be called after loading values
from a database in case new parameters have been added.
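
A typical call (illustrative; assumes a loaded `tool`, a `values` dict
deserialized from the database and a request `trans`)::

messages = tool.check_and_update_param_values( values, trans )
if messages:
# `values` has been filled in with defaults where parameters
# were missing or no longer valid
log.debug( "Updated stale tool state: %s", messages )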
"""
messages = {}
self.check_and_update_param_values_helper( self.inputs, values, trans, messages, update_values=update_values, allow_workflow_parameters=allow_workflow_parameters )
return messages
def check_and_update_param_values_helper( self, inputs, values, trans, messages, context=None, prefix="", update_values=True, allow_workflow_parameters=False ):
"""
Recursive helper for `check_and_update_param_values`
"""
context = ExpressionContext( values, context )
for input in inputs.itervalues():
# No value, insert the default
if input.name not in values:
if isinstance( input, Conditional ):
cond_messages = {}
if not input.is_job_resource_conditional:
cond_messages = { input.test_param.name: "No value found for '%s%s', used default" % ( prefix, input.label ) }
messages[ input.name ] = cond_messages
test_value = input.test_param.get_initial_value( trans, context )
current_case = input.get_current_case( test_value, trans )
self.check_and_update_param_values_helper( input.cases[ current_case ].inputs, {}, trans, cond_messages, context, prefix, allow_workflow_parameters=allow_workflow_parameters )
elif isinstance( input, Repeat ):
if input.min:
messages[ input.name ] = []
for i in range( input.min ):
rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 )
rep_dict = dict()
messages[ input.name ].append( rep_dict )
self.check_and_update_param_values_helper( input.inputs, {}, trans, rep_dict, context, rep_prefix, allow_workflow_parameters=allow_workflow_parameters )
else:
messages[ input.name ] = "No value found for '%s%s', used default" % ( prefix, input.label )
values[ input.name ] = input.get_initial_value( trans, context )
# Value, visit recursively as usual
else:
if isinstance( input, Repeat ):
for i, d in enumerate( values[ input.name ] ):
rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 )
self.check_and_update_param_values_helper( input.inputs, d, trans, messages, context, rep_prefix, allow_workflow_parameters=allow_workflow_parameters )
elif isinstance( input, Conditional ):
group_values = values[ input.name ]
use_initial_value = False
if '__current_case__' in group_values:
if int( group_values['__current_case__'] ) >= len( input.cases ):
use_initial_value = True
else:
use_initial_value = True
if input.test_param.name not in group_values or use_initial_value:
# No test param invalidates the whole conditional
values[ input.name ] = group_values = input.get_initial_value( trans, context )
messages[ input.test_param.name ] = "No value found for '%s%s', used default" % ( prefix, input.test_param.label )
current_case = group_values['__current_case__']
for child_input in input.cases[current_case].inputs.itervalues():
messages[ child_input.name ] = "Value no longer valid for '%s%s', replaced with default" % ( prefix, child_input.label )
else:
current = group_values["__current_case__"]
self.check_and_update_param_values_helper( input.cases[current].inputs, group_values, trans, messages, context, prefix, allow_workflow_parameters=allow_workflow_parameters )
else:
# Regular tool parameter, no recursion needed
try:
ck_param = True
if allow_workflow_parameters and isinstance( values[ input.name ], basestring ):
if WORKFLOW_PARAMETER_REGULAR_EXPRESSION.search( values[ input.name ] ):
ck_param = False
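# e.g. a runtime workflow parameter value such as "${num_reads}"
# (hypothetical) is deliberately left unvalidated here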
# This will fail when a parameter's type has changed to an incompatible one, e.g. a conditional group changed to a dataset input
if ck_param:
input.value_from_basic( input.value_to_basic( values[ input.name ], trans.app ), trans.app, ignore_errors=False )
except:
messages[ input.name ] = "Value no longer valid for '%s%s', replaced with default" % ( prefix, input.label )
if update_values:
values[ input.name ] = input.get_initial_value( trans, context )
def handle_unvalidated_param_values( self, input_values, app ):
"""
Find any instances of `UnvalidatedValue` within input_values and
validate them (by calling `ToolParameter.from_html` and
`ToolParameter.validate`).
"""
# No validation is done when check_values is False
if not self.check_values:
return
self.handle_unvalidated_param_values_helper( self.inputs, input_values, app )
def handle_unvalidated_param_values_helper( self, inputs, input_values, app, context=None, prefix="" ):
"""
Recursive helper for `handle_unvalidated_param_values`
"""
context = ExpressionContext( input_values, context )
for input in inputs.itervalues():
if isinstance( input, Repeat ):
for i, d in enumerate( input_values[ input.name ] ):
rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 )
self.handle_unvalidated_param_values_helper( input.inputs, d, app, context, rep_prefix )
elif isinstance( input, Conditional ):
values = input_values[ input.name ]
current = values["__current_case__"]
# NOTE: The test param doesn't need to be checked since
# there would be no way to tell what case to use at
# workflow build time. However, I'm not sure we are
# actually preventing such a case explicitly.
self.handle_unvalidated_param_values_helper( input.cases[current].inputs, values, app, context, prefix )
else:
# Regular tool parameter
value = input_values[ input.name ]
if isinstance( value, UnvalidatedValue ):
try:
# Convert from html representation
if value.value is None:
# If value.value is None, it could not have been
# submitted via html form and therefore .from_html
# can't be guaranteed to work
value = None
else:
value = input.from_html( value.value, None, context )
# Do any further validation on the value
input.validate( value, None )
except Exception, e:
# Wrap and re-raise any generated error so we can
# generate a more informative message
message = "Failed runtime validation of %s%s (%s)" \
% ( prefix, input.label, e )
raise LateValidationError( message )
input_values[ input.name ] = value
def handle_job_failure_exception( self, e ):
"""
Called by job.fail when an exception is generated to allow generation
of a better error message (returning None yields the default behavior)
"""
message = None
# If the exception was generated by late validation, use its error
# message (contains the parameter name and value)
if isinstance( e, LateValidationError ):
message = e.message
return message
def build_dependency_shell_commands( self ):
"""Return a list of commands to be run to populate the current environment with this tool's requirements."""
return self.app.toolbox.dependency_manager.dependency_shell_commands(
self.requirements,
installed_tool_dependencies=self.installed_tool_dependencies,
tool_dir=self.tool_dir,
)
@property
def installed_tool_dependencies(self):
if self.tool_shed_repository:
installed_tool_dependencies = self.tool_shed_repository.tool_dependencies_installed_or_in_error
else:
installed_tool_dependencies = None
return installed_tool_dependencies
def build_redirect_url_params( self, param_dict ):
"""
Substitute parameter values into self.redirect_url_params
"""
if not self.redirect_url_params:
return
redirect_url_params = None
# Substituting parameter values into the url params
redirect_url_params = fill_template( self.redirect_url_params, context=param_dict )
# Remove newlines
redirect_url_params = redirect_url_params.replace( "\n", " " ).replace( "\r", " " )
return redirect_url_params
def parse_redirect_url( self, data, param_dict ):
"""
Parse the REDIRECT_URL tool param. Tools that send data to an external
application via a redirect must include the following 3 tool params:
1) REDIRECT_URL - the url to which the data is being sent
2) DATA_URL - the url to which the receiving application will send an
http post to retrieve the Galaxy data
3) GALAXY_URL - the url to which the external application may post
data as a response
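
The url params arrive '**^**'-separated (a minimal, runnable doctest):

>>> "param1=value1**^**param2=value2".split( '**^**' )
['param1=value1', 'param2=value2']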
"""
redirect_url = param_dict.get( 'REDIRECT_URL' )
redirect_url_params = self.build_redirect_url_params( param_dict )
# Add the parameters to the redirect url. We're splitting the param
# string on '**^**' because the self.parse() method replaced white
# space with that separator.
params = redirect_url_params.split( '**^**' )
rup_dict = {}
for param in params:
p_list = param.split( '=' )
p_name = p_list[0]
p_val = p_list[1]
rup_dict[ p_name ] = p_val
DATA_URL = param_dict.get( 'DATA_URL', None )
assert DATA_URL is not None, "DATA_URL parameter missing in tool config."
DATA_URL += "/%s/display" % str( data.id )
redirect_url += "?DATA_URL=%s" % DATA_URL
# Add the redirect_url_params to redirect_url
for p_name in rup_dict:
redirect_url += "&%s=%s" % ( p_name, rup_dict[ p_name ] )
# Add the current user email to redirect_url
if data.history.user:
USERNAME = str( data.history.user.email )
else:
USERNAME = 'Anonymous'
redirect_url += "&USERNAME=%s" % USERNAME
return redirect_url
def call_hook( self, hook_name, *args, **kwargs ):
"""
Call the custom code hook function identified by 'hook_name' if any,
and return the results
"""
try:
code = self.get_hook( hook_name )
if code:
return code( *args, **kwargs )
except Exception, e:
original_message = ''
if len( e.args ):
original_message = e.args[0]
e.args = ( "Error in '%s' hook '%s', original message: %s" % ( self.name, hook_name, original_message ), )
raise
def job_failed( self, job_wrapper, message, exception=False ):
"""
Called when a job has failed
"""
pass
def collect_child_datasets( self, output, job_working_directory ):
"""
Look for child dataset files, create HDA and attach to parent.
"""
children = {}
# Loop through output file names, looking for generated children in
# form of 'child_parentId_designation_visibility_extension'
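# For example, a (hypothetical) file named "child_42_mySubset_visible_bed"
# would attach a visible child of dataset 42 with designation "mySubset"
# and extension "bed".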
for name, outdata in output.items():
filenames = []
if 'new_file_path' in self.app.config.collect_outputs_from:
filenames.extend( glob.glob(os.path.join(self.app.config.new_file_path, "child_%i_*" % outdata.id) ) )
if 'job_working_directory' in self.app.config.collect_outputs_from:
filenames.extend( glob.glob(os.path.join(job_working_directory, "child_%i_*" % outdata.id) ) )
for filename in filenames:
if name not in children:
children[name] = {}
fields = os.path.basename(filename).split("_")
fields.pop(0)
parent_id = int(fields.pop(0))
designation = fields.pop(0)
visible = fields.pop(0).lower()
if visible == "visible":
visible = True
else:
visible = False
ext = fields.pop(0).lower()
child_dataset = self.app.model.HistoryDatasetAssociation( extension=ext,
parent_id=outdata.id,
designation=designation,
visible=visible,
dbkey=outdata.dbkey,
create_dataset=True,
sa_session=self.sa_session )
self.app.security_agent.copy_dataset_permissions( outdata.dataset, child_dataset.dataset )
# Move data from temp location to dataset location
self.app.object_store.update_from_file(child_dataset.dataset, file_name=filename, create=True)
self.sa_session.add( child_dataset )
self.sa_session.flush()
child_dataset.set_size()
child_dataset.name = "Secondary Dataset (%s)" % ( designation )
child_dataset.init_meta()
child_dataset.set_meta()
child_dataset.set_peek()
# Associate new dataset with job
job = None
for assoc in outdata.creating_job_associations:
job = assoc.job
break
if job:
assoc = self.app.model.JobToOutputDatasetAssociation( '__new_child_file_%s|%s__' % ( name, designation ), child_dataset )
assoc.job = job
self.sa_session.add( assoc )
self.sa_session.flush()
child_dataset.state = outdata.state
self.sa_session.add( child_dataset )
self.sa_session.flush()
# Add child to return dict
children[name][designation] = child_dataset
# Need to update all associated output hdas, i.e. if the history was
# shared while the job was running
for dataset in outdata.dataset.history_associations:
if outdata == dataset:
continue
# Create new child dataset
child_data = child_dataset.copy( parent_id=dataset.id )
self.sa_session.add( child_data )
self.sa_session.flush()
return children
def collect_primary_datasets( self, output, job_working_directory, input_ext ):
"""
Find any additional datasets generated by a tool and attach (for
cases where number of outputs is not known in advance).
"""
return output_collect.collect_primary_datasets( self, output, job_working_directory, input_ext )
def collect_dynamic_collections( self, output, **kwds ):
""" Find files corresponding to dynamically structured collections.
"""
return output_collect.collect_dynamic_collections( self, output, **kwds )
def to_dict( self, trans, link_details=False, io_details=False ):
""" Returns dict of tool. """
# Basic information
tool_dict = super( Tool, self ).to_dict()
# Add link details.
if link_details:
# Add details for creating a hyperlink to the tool.
if not isinstance( self, DataSourceTool ):
link = url_for( controller='tool_runner', tool_id=self.id )
else:
link = url_for( controller='tool_runner', action='data_source_redirect', tool_id=self.id )
# Basic information
tool_dict.update( { 'link': link,
'min_width': self.uihints.get( 'minwidth', -1 ),
'target': self.target } )
# Add input and output details.
if io_details:
tool_dict[ 'inputs' ] = [ input.to_dict( trans ) for input in self.inputs.values() ]
tool_dict[ 'outputs' ] = [ output.to_dict() for output in self.outputs.values() ]
tool_dict[ 'panel_section_id' ], tool_dict[ 'panel_section_name' ] = self.get_panel_section()
return tool_dict
def to_json( self, trans, kwd={}, is_workflow=False ):
"""
Recursively creates a tool dictionary containing repeats, dynamic options and updated states.
"""
job_id = kwd.get('__job_id__', None)
dataset_id = kwd.get('__dataset_id__', None)
# load job details if provided
job = None
if job_id:
try:
job_id = trans.security.decode_id( job_id )
job = trans.sa_session.query( trans.app.model.Job ).get( job_id )
except Exception, exception:
trans.response.status = 500
log.error('Failed to retrieve job.')
return { 'error': 'Failed to retrieve job.' }
elif dataset_id:
try:
dataset_id = trans.security.decode_id( dataset_id )
data = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( dataset_id )
if not ( trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), data.dataset ) ):
trans.response.status = 500
log.error('User has no access to dataset.')
return { 'error': 'User has no access to dataset.' }
job = data.creating_job
if not job:
trans.response.status = 500
log.error('Creating job not found.')
return { 'error': 'Creating job not found.' }
except Exception, exception:
trans.response.status = 500
log.error('Failed to get job information.')
return { 'error': 'Failed to get job information.' }
# load job parameters into incoming
tool_message = ''
if job:
try:
job_params = job.get_param_values( trans.app, ignore_errors=True )
job_messages = self.check_and_update_param_values( job_params, trans, update_values=False )
self._map_source_to_history( trans, self.inputs, job_params )
tool_message = self._compare_tool_version(trans, job)
params_to_incoming( kwd, self.inputs, job_params, trans.app, to_html=False )
except Exception, exception:
trans.response.status = 500
log.error( str( exception ) )
return { 'error': str( exception ) }
# create parameter object
params = galaxy.util.Params( kwd, sanitize=False )
# convert value to jsonifiable value
def jsonify(v):
# check if value is numeric
isnumber = False
try:
float(v)
isnumber = True
except Exception:
pass
# fix hda parsing
if isinstance(v, trans.app.model.HistoryDatasetAssociation):
return {
'id' : trans.security.encode_id(v.id),
'src' : 'hda'
}
elif isinstance(v, trans.app.model.HistoryDatasetCollectionAssociation):
return {
'id' : trans.security.encode_id(v.id),
'src' : 'hdca'
}
elif isinstance(v, bool):
if v is True:
return 'true'
else:
return 'false'
elif isinstance(v, basestring) or isnumber:
return v
else:
return None
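# Illustrative outputs of jsonify(): True -> 'true', 42 -> 42,
# an HDA instance -> { 'id': <encoded id>, 'src': 'hda' },
# any other object -> None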
# ensures that input dictionary is jsonifiable
def sanitize(d, key='value'):
# get current value
value = d.get(key)
# jsonify by type
if d['type'] in ['data']:
if isinstance(value, list):
value = [ jsonify(v) for v in value ]
else:
value = [ jsonify(value) ]
if None in value:
value = None
else:
value = { 'values': value }
elif isinstance(value, list):
value = [ jsonify(v) for v in value ]
else:
value = jsonify(value)
# update in place
d[key] = value
# check the current state of a value and update it if necessary
def check_state(trans, input, default_value, context):
value = default_value
error = 'State validation failed.'
# skip dynamic fields if deactivated
if is_workflow and input.is_dynamic:
return [value, None]
# validate value content
try:
# resolves the inconsistent definition of boolean parameters (see base.py) without modifying shared code
if input.type == 'boolean' and isinstance(default_value, basestring):
value, error = [string_as_bool(default_value), None]
else:
value, error = check_param(trans, input, default_value, context)
except Exception, err:
log.error('Checking parameter %s failed. %s', input.name, str(err))
return [value, error]
# populates state with incoming url parameters
def populate_state(trans, inputs, state, errors, incoming, prefix="", context=None ):
context = ExpressionContext(state, context)
for input in inputs.itervalues():
state[input.name] = input.get_initial_value(trans, context)
key = prefix + input.name
if input.type == 'repeat':
group_state = state[input.name]
rep_index = 0
del group_state[:]
while True:
rep_name = "%s_%d" % (key, rep_index)
if not any([incoming_key.startswith(rep_name) for incoming_key in incoming.keys()]) and rep_index >= input.min:
break
if rep_index < input.max:
new_state = {}
new_state['__index__'] = rep_index
group_state.append(new_state)
populate_state(trans, input.inputs, new_state, errors, incoming, prefix=rep_name + "|", context=context)
rep_index += 1
elif input.type == 'conditional':
group_state = state[input.name]
group_prefix = "%s|" % ( key )
test_param_key = group_prefix + input.test_param.name
default_value = incoming.get(test_param_key, group_state.get(input.test_param.name, None))
value, error = check_state(trans, input.test_param, default_value, context)
if error:
errors[test_param_key] = error
else:
try:
current_case = input.get_current_case(value, trans)
group_state = state[input.name] = {}
populate_state( trans, input.cases[current_case].inputs, group_state, errors, incoming, prefix=group_prefix, context=context)
group_state['__current_case__'] = current_case
except Exception:
errors[test_param_key] = 'The selected case is unavailable/invalid.'
group_state[input.test_param.name] = value
elif input.type == 'section':
group_state = state[input.name]
group_prefix = "%s|" % ( key )
populate_state(trans, input.inputs, group_state, errors, incoming, prefix=group_prefix, context=context)
else:
default_value = incoming.get(key, state.get(input.name, None))
value, error = check_state(trans, input, default_value, context)
if error:
errors[key] = error
state[input.name] = value
# builds tool model including all attributes
def iterate(group_inputs, inputs, state_inputs, other_values=None):
other_values = ExpressionContext( state_inputs, other_values )
for input_index, input in enumerate( inputs.itervalues() ):
# create model dictionary
tool_dict = input.to_dict(trans)
if tool_dict is None:
continue
# state for subsection/group
group_state = state_inputs.get(input.name, {})
# iterate and update values
if input.type == 'repeat':
group_cache = tool_dict['cache'] = {}
for i in range( len( group_state ) ):
group_cache[i] = {}
iterate( group_cache[i], input.inputs, group_state[i], other_values )
elif input.type == 'conditional':
if 'test_param' in tool_dict:
test_param = tool_dict['test_param']
test_param['default_value'] = jsonify(input.test_param.get_initial_value(trans, other_values))
test_param['value'] = jsonify(group_state.get(test_param['name'], test_param['default_value']))
for i in range( len( tool_dict['cases'] ) ):
current_state = {}
if i == group_state.get('__current_case__', None):
current_state = group_state
iterate(tool_dict['cases'][i]['inputs'], input.cases[i].inputs, current_state, other_values)
elif input.type == 'section':
iterate( tool_dict['inputs'], input.inputs, group_state, other_values )
else:
# identify name
input_name = tool_dict.get('name')
# expand input dictionary incl. repeats and dynamic_parameters
try:
tool_dict = input.to_dict(trans, other_values=other_values)
except Exception:
log.exception('tools::to_json() - Skipping parameter expansion for %s.' % input_name)
# backup default value
try:
tool_dict['default_value'] = input.get_initial_value(trans, other_values)
except Exception:
log.exception('tools::to_json() - Getting initial value failed %s.' % input_name)
# get initial value failed due to improper late validation
tool_dict['default_value'] = None
# update input value from tool state
tool_dict['value'] = state_inputs.get(input_name, tool_dict['default_value'])
# sanitize values
sanitize(tool_dict, 'value')
sanitize(tool_dict, 'default_value')
# backup final input dictionary
group_inputs[input_index] = tool_dict
# sanitization of the final tool state
def sanitize_state(state):
keys = None
if isinstance(state, dict):
keys = state
elif isinstance(state, list):
keys = range( len(state) )
if keys:
for k in keys:
if isinstance(state[k], dict) or isinstance(state[k], list):
sanitize_state(state[k])
else:
state[k] = jsonify(state[k])
# expand incoming parameters (parameters might trigger multiple tool executions,
# here we select the first execution only in order to resolve dynamic parameters)
expanded_incomings, _ = expand_meta_parameters( trans, self, params.__dict__ )
params.__dict__ = expanded_incomings[ 0 ]
# do param translation here, used by datasource tools
if self.input_translator:
self.input_translator.translate( params )
# initialize and populate tool state
state_inputs = {}
state_errors = {}
populate_state(trans, self.inputs, state_inputs, state_errors, params.__dict__)
# create basic tool model
tool_model = self.to_dict(trans)
tool_model['inputs'] = {}
# build tool model and tool state
iterate(tool_model['inputs'], self.inputs, state_inputs, '')
# sanitize tool state
sanitize_state(state_inputs)
# load tool help
tool_help = ''
if self.help:
tool_help = self.help
tool_help = tool_help.render( static_path=url_for( '/static' ), host_url=url_for('/', qualified=True) )
if type( tool_help ) is not unicode:
tool_help = unicode( tool_help, 'utf-8')
# check if citations exist
tool_citations = False
if self.citations:
tool_citations = True
# get tool versions
tool_versions = []
tools = self.app.toolbox.get_loaded_tools_by_lineage(self.id)
for t in tools:
if t.version not in tool_versions:
tool_versions.append(t.version)
# add information with underlying requirements and their versions
tool_requirements = []
if self.requirements:
for requirement in self.requirements:
tool_requirements.append({
'name' : requirement.name,
'version' : requirement.version
})
# add toolshed url
sharable_url = None
if self.tool_shed_repository:
sharable_url = self.tool_shed_repository.get_sharable_url( trans.app )
# add additional properties
tool_model.update({
'help' : tool_help,
'citations' : tool_citations,
'biostar_url' : trans.app.config.biostar_url,
'sharable_url' : sharable_url,
'message' : tool_message,
'versions' : tool_versions,
'requirements' : tool_requirements,
'errors' : state_errors,
'state_inputs' : state_inputs,
'job_remap' : self._get_job_remap(job)
})
# check for errors
if 'error' in tool_message:
return tool_message
# return enriched tool model
return tool_model
def _get_job_remap( self, job ):
if job:
if job.state == job.states.ERROR:
try:
if any( jtod.dataset.dependent_jobs for jtod in job.output_datasets ):
return True
except Exception, exception:
log.error( str( exception ) )
return False
def _map_source_to_history(self, trans, tool_inputs, params):
# Need to remap dataset parameters. Job parameters point to original
# dataset used; parameter should be the analogous dataset in the
# current history.
history = trans.get_history()
# Create index for hdas.
hda_source_dict = {}
for hda in history.datasets:
key = '%s_%s' % (hda.hid, hda.dataset.id)
hda_source_dict[ hda.dataset.id ] = hda_source_dict[ key ] = hda
# Ditto for dataset collections.
hdca_source_dict = {}
for hdca in history.dataset_collections:
key = '%s_%s' % (hdca.hid, hdca.collection.id)
hdca_source_dict[ hdca.collection.id ] = hdca_source_dict[ key ] = hdca
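# Both indexes map the raw id and a "hid_id" composite key to the same
# object, e.g. (hypothetical) an HDA with hid 3 and dataset id 15 is
# reachable as hda_source_dict[ 15 ] and as hda_source_dict[ '3_15' ].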
# Map dataset or collection to current history
def map_to_history(value):
id = None
source = None
if isinstance(value, trans.app.model.HistoryDatasetAssociation):
id = value.dataset.id
source = hda_source_dict
elif isinstance(value, trans.app.model.HistoryDatasetCollectionAssociation):
id = value.collection.id
source = hdca_source_dict
else:
return None
key = '%s_%s' % (value.hid, id)
if key in source:
return source[ key ]
elif id in source:
return source[ id ]
else:
return None
# Unpack unvalidated values to strings; they'll be validated when the
# form is submitted (this happens when re-running a job that was
# initially run by a workflow)
#This needs to be done recursively through grouping parameters
def mapping_callback( input, value, prefixed_name, prefixed_label ):
if isinstance( value, UnvalidatedValue ):
try:
return input.to_html_value( value.value, trans.app )
except Exception, e:
# Need to determine when (if ever) the to_html_value call could fail.
log.debug( "Failed to use input.to_html_value to determine value of unvalidated parameter, defaulting to string: %s" % ( e ) )
return str( value )
if isinstance( input, DataToolParameter ):
if isinstance(value, list):
values = []
for val in value:
new_val = map_to_history( val )
if new_val:
values.append( new_val )
else:
values.append( val )
return values
else:
return map_to_history( value )
elif isinstance( input, DataCollectionToolParameter ):
return map_to_history( value )
visit_input_values( tool_inputs, params, mapping_callback )
def _compare_tool_version( self, trans, job ):
"""
Compares a tool version with the tool version from a job (from ToolRunner).
"""
tool_id = job.tool_id
tool_version = job.tool_version
message = ''
try:
select_field, tools, tool = self.app.toolbox.get_tool_components( tool_id, tool_version=tool_version, get_loaded_tools_by_lineage=False, set_selected=True )
if tool is None:
trans.response.status = 500
return { 'error': 'This dataset was created by an obsolete tool (%s). Can\'t re-run.' % tool_id }
if ( self.id != tool_id and self.old_id != tool_id ) or self.version != tool_version:
if self.id == tool_id:
if tool_version is None:
# for some reason jobs don't always keep track of the tool version.
message = ''
else:
message = 'This job was initially run with tool version "%s", which is currently not available. ' % tool_version
if len( tools ) > 1:
message += 'You can re-run the job with the selected tool or choose another derivation of the tool.'
else:
message += 'You can re-run the job with this tool version, which is a derivation of the original tool.'
else:
if len( tools ) > 1:
message = 'This job was initially run with tool version "%s", which is currently not available. ' % tool_version
message += 'You can re-run the job with the selected tool or choose another derivation of the tool.'
else:
message = 'This job was initially run with tool id "%s", version "%s", which is ' % ( tool_id, tool_version )
message += 'currently not available. You can re-run the job with this tool, which is a derivation of the original tool.'
except Exception, error:
trans.response.status = 500
return { 'error': str (error) }
# Can't re-run upload, external data sources, etc. The workflow-compatible check will proxy this for now.
#if not self.is_workflow_compatible:
# trans.response.status = 500
# return { 'error': 'The \'%s\' tool does currently not support re-running.' % self.name }
return message
def get_default_history_by_trans( self, trans, create=False ):
return trans.get_history( create=create )
@classmethod
def get_externally_referenced_paths( cls, path ):
""" Return relative paths to externally referenced files by the tool
described by file at `path`. External components should not assume things
about the structure of tool xml files (this is the tool's responsibility).
"""
tree = raw_tool_xml_tree(path)
root = tree.getroot()
external_paths = []
for code_elem in root.findall( 'code' ):
external_path = code_elem.get( 'file' )
if external_path:
external_paths.append( external_path )
external_paths.extend( imported_macro_paths( root ) )
# May also need to load external citation files as well at some point.
return external_paths
class OutputParameterJSONTool( Tool ):
"""
Alternate implementation of Tool that provides parameters and other values
JSONified within the contents of an output dataset
"""
tool_type = 'output_parameter_json'
def _prepare_json_list( self, param_list ):
rval = []
for value in param_list:
if isinstance( value, dict ):
rval.append( self._prepare_json_param_dict( value ) )
elif isinstance( value, list ):
rval.append( self._prepare_json_list( value ) )
else:
rval.append( str( value ) )
return rval
def _prepare_json_param_dict( self, param_dict ):
rval = {}
for key, value in param_dict.iteritems():
if isinstance( value, dict ):
rval[ key ] = self._prepare_json_param_dict( value )
elif isinstance( value, list ):
rval[ key ] = self._prepare_json_list( value )
else:
rval[ key ] = str( value )
return rval
def exec_before_job( self, app, inp_data, out_data, param_dict=None ):
if param_dict is None:
param_dict = {}
json_params = {}
json_params[ 'param_dict' ] = self._prepare_json_param_dict( param_dict ) # it would probably be better to store the original incoming parameters here, instead of the Galaxy modified ones?
json_params[ 'output_data' ] = []
json_params[ 'job_config' ] = dict( GALAXY_DATATYPES_CONF_FILE=param_dict.get( 'GALAXY_DATATYPES_CONF_FILE' ), GALAXY_ROOT_DIR=param_dict.get( 'GALAXY_ROOT_DIR' ), TOOL_PROVIDED_JOB_METADATA_FILE=galaxy.jobs.TOOL_PROVIDED_JOB_METADATA_FILE )
json_filename = None
for i, ( out_name, data ) in enumerate( out_data.iteritems() ):
#use wrapped dataset to access certain values
wrapped_data = param_dict.get( out_name )
#allow multiple files to be created
file_name = str( wrapped_data )
extra_files_path = str( wrapped_data.files_path )
data_dict = dict( out_data_name=out_name,
ext=data.ext,
dataset_id=data.dataset.id,
hda_id=data.id,
file_name=file_name,
extra_files_path=extra_files_path )
json_params[ 'output_data' ].append( data_dict )
if json_filename is None:
json_filename = file_name
out = open( json_filename, 'w' )
out.write( json.dumps( json_params ) )
out.close()
class DataSourceTool( OutputParameterJSONTool ):
"""
Alternate implementation of Tool for data_source tools -- those that
allow the user to query and extract data from another web site.
"""
tool_type = 'data_source'
default_tool_action = DataSourceToolAction
def _build_GALAXY_URL_parameter( self ):
return ToolParameter.build( self, ElementTree.XML( '<param name="GALAXY_URL" type="baseurl" value="/tool_runner?tool_id=%s" />' % self.id ) )
def parse_inputs( self, tool_source ):
super( DataSourceTool, self ).parse_inputs( tool_source )
# Open all data_source tools in _top.
self.target = '_top'
if 'GALAXY_URL' not in self.inputs:
self.inputs[ 'GALAXY_URL' ] = self._build_GALAXY_URL_parameter()
self.inputs_by_page[0][ 'GALAXY_URL' ] = self.inputs[ 'GALAXY_URL' ]
def exec_before_job( self, app, inp_data, out_data, param_dict=None ):
if param_dict is None:
param_dict = {}
dbkey = param_dict.get( 'dbkey' )
info = param_dict.get( 'info' )
data_type = param_dict.get( 'data_type' )
name = param_dict.get( 'name' )
json_params = {}
json_params[ 'param_dict' ] = self._prepare_json_param_dict( param_dict ) # it would probably be better to store the original incoming parameters here, instead of the Galaxy modified ones?
json_params[ 'output_data' ] = []
json_params[ 'job_config' ] = dict( GALAXY_DATATYPES_CONF_FILE=param_dict.get( 'GALAXY_DATATYPES_CONF_FILE' ), GALAXY_ROOT_DIR=param_dict.get( 'GALAXY_ROOT_DIR' ), TOOL_PROVIDED_JOB_METADATA_FILE=galaxy.jobs.TOOL_PROVIDED_JOB_METADATA_FILE )
json_filename = None
for i, ( out_name, data ) in enumerate( out_data.iteritems() ):
#use wrapped dataset to access certain values
wrapped_data = param_dict.get( out_name )
#allow multiple files to be created
cur_base_param_name = 'GALAXY|%s|' % out_name
cur_name = param_dict.get( cur_base_param_name + 'name', name )
cur_dbkey = param_dict.get( cur_base_param_name + 'dbkey', dbkey )
cur_info = param_dict.get( cur_base_param_name + 'info', info )
cur_data_type = param_dict.get( cur_base_param_name + 'data_type', data_type )
if cur_name:
data.name = cur_name
if not data.info and cur_info:
data.info = cur_info
if cur_dbkey:
data.dbkey = cur_dbkey
if cur_data_type:
data.extension = cur_data_type
file_name = str( wrapped_data )
extra_files_path = str( wrapped_data.files_path )
data_dict = dict( out_data_name=out_name,
ext=data.ext,
dataset_id=data.dataset.id,
hda_id=data.id,
file_name=file_name,
extra_files_path=extra_files_path )
json_params[ 'output_data' ].append( data_dict )
if json_filename is None:
json_filename = file_name
out = open( json_filename, 'w' )
out.write( json.dumps( json_params ) )
out.close()
class AsyncDataSourceTool( DataSourceTool ):
tool_type = 'data_source_async'
def _build_GALAXY_URL_parameter( self ):
return ToolParameter.build( self, ElementTree.XML( '<param name="GALAXY_URL" type="baseurl" value="/async/%s" />' % self.id ) )
class SetMetadataTool( Tool ):
"""
Tool implementation for special tool that sets metadata on an existing
dataset.
"""
tool_type = 'set_metadata'
requires_setting_metadata = False
def exec_after_process( self, app, inp_data, out_data, param_dict, job=None ):
for name, dataset in inp_data.iteritems():
external_metadata = JobExternalOutputMetadataWrapper( job )
if external_metadata.external_metadata_set_successfully( dataset, app.model.context ):
dataset.metadata.from_JSON_dict( external_metadata.get_output_filenames_by_dataset( dataset, app.model.context ).filename_out )
else:
dataset._state = model.Dataset.states.FAILED_METADATA
self.sa_session.add( dataset )
self.sa_session.flush()
# If setting external metadata has failed, how can we inform the
# user? For now, we'll leave the default metadata and set the state
# back to its original.
return
dataset.datatype.after_setting_metadata( dataset )
if job and job.tool_id == '1.0.0':
dataset.state = param_dict.get( '__ORIGINAL_DATASET_STATE__' )
else:
# Revert dataset.state to fall back to dataset.dataset.state
dataset._state = None
# Need to reset the peek, which may rely on metadata
dataset.set_peek()
self.sa_session.add( dataset )
self.sa_session.flush()
def job_failed( self, job_wrapper, message, exception=False ):
job = job_wrapper.sa_session.query( model.Job ).get( job_wrapper.job_id )
if job:
inp_data = {}
for dataset_assoc in job.input_datasets:
inp_data[dataset_assoc.name] = dataset_assoc.dataset
return self.exec_after_process( job_wrapper.app, inp_data, {}, job_wrapper.get_param_dict(), job=job )
class DataManagerTool( OutputParameterJSONTool ):
tool_type = 'manage_data'
default_tool_action = DataManagerToolAction
def __init__( self, config_file, root, app, guid=None, data_manager_id=None, **kwds ):
self.data_manager_id = data_manager_id
super( DataManagerTool, self ).__init__( config_file, root, app, guid=guid, **kwds )
if self.data_manager_id is None:
self.data_manager_id = self.id
def exec_after_process( self, app, inp_data, out_data, param_dict, job=None, **kwds ):
assert self.allow_user_access( job.user ), "You must be an admin to access this tool."
#run original exec_after_process
super( DataManagerTool, self ).exec_after_process( app, inp_data, out_data, param_dict, job=job, **kwds )
#process results of tool
if job and job.state == job.states.ERROR:
return
#Job state may now be 'running' instead of previous 'error', but datasets are still set to e.g. error
for dataset in out_data.itervalues():
if dataset.state != dataset.states.OK:
return
data_manager_id = job.data_manager_association.data_manager_id
data_manager = self.app.data_managers.get_manager( data_manager_id, None )
assert data_manager is not None, "Invalid data manager (%s) requested. It may have been removed before the job completed." % ( data_manager_id )
data_manager.process_result( out_data )
def get_default_history_by_trans( self, trans, create=False ):
def _create_data_manager_history( user ):
history = trans.app.model.History( name='Data Manager History (automatically created)', user=user )
data_manager_association = trans.app.model.DataManagerHistoryAssociation( user=user, history=history )
trans.sa_session.add_all( ( history, data_manager_association ) )
trans.sa_session.flush()
return history
user = trans.user
assert user, 'You must be logged in to use this tool.'
assert self.allow_user_access( user ), "You must be an admin to access this tool."
history = user.data_manager_histories
if not history:
#create
if create:
history = _create_data_manager_history( user )
else:
history = None
else:
# Use the most recent non-deleted data manager history
for history_association in reversed( history ):
history = history_association.history
if not history.deleted:
break
if history.deleted:
if create:
history = _create_data_manager_history( user )
else:
history = None
return history
def allow_user_access( self, user, attempting_access=True ):
"""
:param user: model object representing user.
:type user: galaxy.model.User
:param attempting_access: is the user attempting to do something with
the tool (set False for incidental checks like toolbox
listing)
:type attempting_access: bool
:returns: bool -- Whether the user is allowed to access the tool.
Data Manager tools are only accessible to admins.
"""
if super( DataManagerTool, self ).allow_user_access( user ) and self.app.config.is_admin_user( user ):
return True
# If this is just an incidental check - do not log the scary message
# about users attempting to do something problematic.
if attempting_access:
if user:
user = user.id
log.debug( "User (%s) attempted to access a data manager tool (%s), but is not an admin.", user, self.id )
return False
# Populate tool_type to ToolClass mappings
tool_types = {}
for tool_class in [ Tool, SetMetadataTool, OutputParameterJSONTool,
DataManagerTool, DataSourceTool, AsyncDataSourceTool,
DataDestinationTool ]:
tool_types[ tool_class.tool_type ] = tool_class
# ---- Utility classes to be factored out -----------------------------------
class TracksterConfig:
""" Trackster configuration encapsulation. """
def __init__( self, actions ):
self.actions = actions
@staticmethod
def parse( root ):
actions = []
for action_elt in root.findall( "action" ):
actions.append( SetParamAction.parse( action_elt ) )
return TracksterConfig( actions )
class SetParamAction:
""" Set parameter action. """
def __init__( self, name, output_name ):
self.name = name
self.output_name = output_name
@staticmethod
def parse( elt ):
""" Parse action from element. """
return SetParamAction( elt.get( "name" ), elt.get( "output_name" ) )
def json_fix( val ):
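"""
Recursively utf8-encode any unicode strings found in `val` (a minimal,
runnable doctest):

>>> json_fix( { u'key': [ u'value', 1 ] } )
{'key': ['value', 1]}
"""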
if isinstance( val, list ):
return [ json_fix( v ) for v in val ]
elif isinstance( val, dict ):
return dict( [ ( json_fix( k ), json_fix( v ) ) for ( k, v ) in val.iteritems() ] )
elif isinstance( val, unicode ):
return val.encode( "utf8" )
else:
return val
def check_param_from_incoming( trans, state, input, incoming, key, context, source ):
"""
Unlike "update" state, this preserves default if no incoming value found.
This lets API user specify just a subset of params and allow defaults to be
used when available.
"""
default_input_value = state.get( input.name, None )
incoming_value = get_incoming_value( incoming, key, default_input_value )
value, error = check_param( trans, input, incoming_value, context, source=source )
return value, error
def get_incoming_value( incoming, key, default ):
"""
Fetch value from incoming dict directly or check special nginx upload
created variants of this key.
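
A minimal, runnable doctest for the composite-key convention (the key name
'f' and its parts are hypothetical):

>>> incoming = { '__f__is_composite': 'true', '__f__keys': 'a b', 'f_a': 1, 'f_b': 2 }
>>> sorted( get_incoming_value( incoming, 'f', None ).items() )
[('a', 1), ('b', 2)]
>>> get_incoming_value( {}, 'f', 'default' )
'default'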
"""
if "__" + key + "__is_composite" in incoming:
composite_keys = incoming["__" + key + "__keys"].split()
value = dict()
for composite_key in composite_keys:
value[composite_key] = incoming[key + "_" + composite_key]
return value
else:
return incoming.get( key, default )