import logging
import os
import shutil
import galaxy.tools
from galaxy import util
from galaxy.datatypes import checkers
from galaxy.util.expressions import ExpressionContext
from galaxy.web.form_builder import SelectField
from tool_shed.util import basic_util
# Module-level logger, named after this module.
log = logging.getLogger( __name__ )
def copy_sample_file( app, filename, dest_path=None ):
    """
    Copy the sample file `filename` (xxx.sample) into `dest_path` twice: once under its own
    name (dest_path/xxx.sample) and once with the trailing '.sample' removed (dest_path/xxx).

    :param app: application object; `app.config.tool_data_path` supplies the default
                destination directory.
    :param filename: path to the sample file to copy.
    :param dest_path: destination directory.  When None, the absolute form of
                      `app.config.tool_data_path` is used.

    The dest_path/xxx.sample copy is always refreshed (unless source and destination are the
    same file), while dest_path/xxx is only created when it does not exist yet.
    """
    sample_suffix = '.sample'
    if dest_path is None:
        dest_path = os.path.abspath( app.config.tool_data_path )
    sample_file_name = basic_util.strip_path( filename )
    # Remove only a trailing '.sample'.  A blanket str.replace() would also eat any
    # '.sample' occurring in the middle of the name (a.sample.loc.sample -> a.loc).
    if sample_file_name.endswith( sample_suffix ):
        copied_file = sample_file_name[ :-len( sample_suffix ) ]
    else:
        copied_file = sample_file_name
    full_source_path = os.path.abspath( filename )
    full_destination_path = os.path.join( dest_path, sample_file_name )
    copied_file_path = os.path.join( dest_path, copied_file )
    # Don't copy a file to itself - not sure how this happens, but sometimes it does...
    # Both sides are made absolute so that a relative dest_path cannot hide a self-copy.
    if full_source_path != os.path.abspath( full_destination_path ):
        # It's ok to overwrite the .sample version of the file.
        shutil.copy( full_source_path, full_destination_path )
    # Only create the .loc file if it does not yet exist.  We don't overwrite it in case it
    # contains stuff proprietary to the local instance.
    if not os.path.exists( copied_file_path ):
        shutil.copy( full_source_path, copied_file_path )
def copy_sample_files( app, sample_files, tool_path=None, sample_files_copied=None, dest_path=None ):
    """
    Copy every appropriate entry of `sample_files` to `dest_path`, skipping entries that are
    already listed in `sample_files_copied`.

    Tool shed repositories can contain files ending in .sample that are not data index
    files, so each candidate is vetted with is_data_index_sample_file() before it is handed
    to copy_sample_file().  When `tool_path` is given, it is prepended to the file name
    before vetting and copying.  `dest_path` is passed through to copy_sample_file()
    unchanged.
    """
    # Sample files that must never be copied, matched on the bare file name.
    excluded_names = [ 'tool_data_table_conf.xml.sample' ]
    already_copied = util.listify( sample_files_copied )
    for filename in sample_files:
        bare_name = os.path.basename( filename )
        if bare_name in excluded_names or filename in already_copied:
            continue
        candidate = os.path.join( tool_path, filename ) if tool_path else filename
        # Attempt to ensure we're copying an appropriate file.
        if is_data_index_sample_file( candidate ):
            copy_sample_file( app, candidate, dest_path=dest_path )
def handle_missing_index_file( app, tool_path, sample_files, repository_tools_tups, sample_files_copied ):
    """
    For each ( path, guid, tool ) tuple in `repository_tools_tups`, look at the tool's
    parameters whose options are missing an index file, copy the matching xxx.sample file
    from `sample_files` when one exists, and then reload the tool.

    Both `repository_tools_tups` (entries replaced with the reloaded tool) and
    `sample_files_copied` (appended to) are modified in place and also returned as a
    2-tuple.  According to the original documentation this is called from Galaxy when a
    repository is being installed, not from the tool shed.
    """
    for position, tool_tup in enumerate( repository_tools_tups ):
        relative_config, guid, tool = tool_tup
        for param in tool.params_with_missing_index_file:
            options = param.options
            index_file_name = basic_util.strip_path( options.missing_index_file )
            # NOTE(review): membership is tested on the path-stripped name, while the
            # un-stripped options.missing_index_file is what gets appended below.
            if index_file_name in sample_files_copied:
                continue
            # The repository must contain the required xxx.loc.sample file.
            wanted_sample_name = '%s.sample' % index_file_name
            for candidate in sample_files:
                if basic_util.strip_path( candidate ) != wanted_sample_name:
                    continue
                copy_sample_file( app, candidate )
                data_table = options.tool_data_table
                if data_table and data_table.missing_index_file:
                    data_table.handle_found_index_file( options.missing_index_file )
                sample_files_copied.append( options.missing_index_file )
                # Only the first matching sample file is used.
                break
        # Reload the tool into the local list of repository_tools_tups.
        reloaded_tool = app.toolbox.load_tool( os.path.join( tool_path, relative_config ), guid=guid )
        repository_tools_tups[ position ] = ( relative_config, guid, reloaded_tool )
    return repository_tools_tups, sample_files_copied
def is_column_based( fname, sep='\t', skip=0, is_multi_byte=False ):
    """
    Return True when the file looks column based with respect to the separator `sep`.

    The rows returned by get_headers() are examined after dropping the first `skip` of them.
    Rows that are empty, whose first field is empty, or whose first field starts with '#'
    are ignored.  The file is column based when the first remaining row has at least two
    fields and every remaining row has that same number of fields.
    """
    # NOTE(review): get_headers is not imported in the visible part of this module -
    # confirm it is defined or imported elsewhere in the file.
    rows = get_headers( fname, sep, is_multi_byte=is_multi_byte )
    if not rows:
        return False
    candidates = rows[ skip: ]

    def is_data_row( row ):
        # A data row is non-empty, has a non-empty first field and is not a '#' comment.
        return bool( row and row[ 0 ] and not row[ 0 ].startswith( '#' ) )

    first_data_row = next( ( row for row in candidates if is_data_row( row ) ), None )
    if first_data_row is None:
        return False
    width = len( first_data_row )
    if width < 2:
        return False
    return all( len( row ) == width for row in candidates if is_data_row( row ) )
def is_data_index_sample_file( file_path ):
    """
    Best-effort check of whether a .sample file is a data index file that is appropriate
    to copy when a tool shed repository is being installed into a Galaxy instance.

    Returns True for column based (tabular) files, False for files recognized as html,
    image, binary, bz2, gzip or zip, and True for anything else.
    """
    # Currently most data index files are tabular, so check that first.  We'll assume that
    # if the file is tabular, it's ok to copy.
    if is_column_based( file_path ):
        return True
    # Any one of these content types disqualifies the file.  The checks short-circuit in
    # this order.
    is_unwanted_type = (
        checkers.check_html( file_path ) or
        checkers.check_image( file_path ) or
        checkers.check_binary( name=file_path ) or
        checkers.is_bz2( file_path ) or
        checkers.is_gzip( file_path ) or
        checkers.check_zip( file_path )
    )
    # Default to copying the file if none of the above are true.
    return not is_unwanted_type
def new_state( trans, tool, invalid=False ):
    """
    Create a new `DefaultToolState` for the received tool.  Only inputs on the first page
    will be initialized.

    :param trans: transaction object handed to the tool when computing initial values.
    :param tool: the tool to build state for.
    :param invalid: when True, an empty state (no inputs) is returned immediately.
    :returns: the result of `tool.new_state( trans )` when that succeeds, otherwise a
              `DefaultToolState` whose `inputs` were filled from `tool.inputs_by_page[ 0 ]`.
    """
    state = galaxy.tools.DefaultToolState()
    state.inputs = {}
    if invalid:
        # We're attempting to display a tool in the tool shed that has been determined to have errors, so is invalid.
        return state
    try:
        # Attempt to generate the tool state using the standard Galaxy-side code
        return tool.new_state( trans )
    except Exception as e:
        # Fall back to building tool state as below
        log.debug( 'Failed to build tool state for tool "%s" using standard method, will try to fall back on custom method: %s', tool.id, e )
    inputs = tool.inputs_by_page[ 0 ]
    context = ExpressionContext( state.inputs, parent=None )
    # values() rather than the Python 2-only itervalues(), so any mapping type works.
    for tool_input in inputs.values():
        try:
            state.inputs[ tool_input.name ] = tool_input.get_initial_value( trans, context )
        except Exception as e:
            # Best effort: keep going, but leave a trace instead of swallowing silently.
            log.debug( 'Failed to get initial value for input "%s", using an empty list: %s', tool_input.name, e )
            # FIXME: not all values should be an empty list
            state.inputs[ tool_input.name ] = []
    return state
def panel_entry_per_tool( tool_section_dict ):
    """
    Return True if `tool_section_dict` holds one panel entry per tool, False if it is a
    single plain section description (or is empty / None).

    Per-tool form (returns True):
        {<Tool guid> :
           [{ tool_config : <tool_config_file>,
              id: <ToolSection id>,
              version : <ToolSection version>,
              name : <ToolSection name>}]}

    Plain section form (returns False):
        { id: <ToolSection id>, version : <ToolSection version>, name : <ToolSection name>}
    """
    if not tool_section_dict:
        return False
    section_keys = ( 'id', 'version', 'name' )
    if len( tool_section_dict ) != len( section_keys ):
        return True
    # Iterate over the keys themselves.  Unpacking each key into ( k, v ) would split the
    # key string into characters and misclassify (or crash on) the plain section form.
    return any( key not in section_keys for key in tool_section_dict )