Source code for tool_shed.util.tool_util

import logging
import os
import shutil

import galaxy.tools
from galaxy import util
from galaxy.datatypes import checkers
from galaxy.util.expressions import ExpressionContext
from galaxy.web.form_builder import SelectField

from tool_shed.util import basic_util

log = logging.getLogger( __name__ )


[docs]def build_shed_tool_conf_select_field( app ): """Build a SelectField whose options are the keys in app.toolbox.shed_tool_confs.""" options = [] for dynamic_tool_conf_filename in app.toolbox.dynamic_conf_filenames(): if dynamic_tool_conf_filename.startswith( './' ): option_label = dynamic_tool_conf_filename.replace( './', '', 1 ) else: option_label = dynamic_tool_conf_filename options.append( ( option_label, dynamic_tool_conf_filename ) ) select_field = SelectField( name='shed_tool_conf' ) for option_tup in options: select_field.add_option( option_tup[ 0 ], option_tup[ 1 ] ) return select_field
[docs]def build_tool_panel_section_select_field( app ): """Build a SelectField whose options are the sections of the current in-memory toolbox.""" options = [] for section_id, section_name in app.toolbox.get_sections(): options.append( ( section_name, section_id ) ) select_field = SelectField( name='tool_panel_section_id', display='radio' ) for option_tup in options: select_field.add_option( option_tup[ 0 ], option_tup[ 1 ] ) return select_field
[docs]def copy_sample_file( app, filename, dest_path=None ): """ Copy xxx.sample to dest_path/xxx.sample and dest_path/xxx. The default value for dest_path is ~/tool-data. """ if dest_path is None: dest_path = os.path.abspath( app.config.tool_data_path ) sample_file_name = basic_util.strip_path( filename ) copied_file = sample_file_name.replace( '.sample', '' ) full_source_path = os.path.abspath( filename ) full_destination_path = os.path.join( dest_path, sample_file_name ) # Don't copy a file to itself - not sure how this happens, but sometimes it does... if full_source_path != full_destination_path: # It's ok to overwrite the .sample version of the file. shutil.copy( full_source_path, full_destination_path ) # Only create the .loc file if it does not yet exist. We don't overwrite it in case it # contains stuff proprietary to the local instance. if not os.path.exists( os.path.join( dest_path, copied_file ) ): shutil.copy( full_source_path, os.path.join( dest_path, copied_file ) )
[docs]def copy_sample_files( app, sample_files, tool_path=None, sample_files_copied=None, dest_path=None ): """ Copy all appropriate files to dest_path in the local Galaxy environment that have not already been copied. Those that have been copied are contained in sample_files_copied. The default value for dest_path is ~/tool-data. We need to be careful to copy only appropriate files here because tool shed repositories can contain files ending in .sample that should not be copied to the ~/tool-data directory. """ filenames_not_to_copy = [ 'tool_data_table_conf.xml.sample' ] sample_files_copied = util.listify( sample_files_copied ) for filename in sample_files: filename_sans_path = os.path.split( filename )[ 1 ] if filename_sans_path not in filenames_not_to_copy and filename not in sample_files_copied: if tool_path: filename = os.path.join( tool_path, filename ) # Attempt to ensure we're copying an appropriate file. if is_data_index_sample_file( filename ): copy_sample_file( app, filename, dest_path=dest_path )
[docs]def generate_message_for_invalid_tools( app, invalid_file_tups, repository, metadata_dict, as_html=True, displaying_invalid_tool=False ): if as_html: new_line = '<br/>' bold_start = '<b>' bold_end = '</b>' else: new_line = '\n' bold_start = '' bold_end = '' message = '' if app.name == 'galaxy': tip_rev = str( repository.changeset_revision ) else: tip_rev = str( repository.tip( app ) ) if not displaying_invalid_tool: if metadata_dict: message += "Metadata may have been defined for some items in revision '%s'. " % tip_rev message += "Correct the following problems if necessary and reset metadata.%s" % new_line else: message += "Metadata cannot be defined for revision '%s' so this revision cannot be automatically " % tip_rev message += "installed into a local Galaxy instance. Correct the following problems and reset metadata.%s" % new_line for itc_tup in invalid_file_tups: tool_file, exception_msg = itc_tup if exception_msg.find( 'No such file or directory' ) >= 0: exception_items = exception_msg.split() missing_file_items = exception_items[ 7 ].split( '/' ) missing_file = missing_file_items[ -1 ].rstrip( '\'' ) if missing_file.endswith( '.loc' ): sample_ext = '%s.sample' % missing_file else: sample_ext = missing_file correction_msg = "This file refers to a missing file %s%s%s. " % \ ( bold_start, str( missing_file ), bold_end ) correction_msg += "Upload a file named %s%s%s to the repository to correct this error." % \ ( bold_start, sample_ext, bold_end ) else: if as_html: correction_msg = exception_msg else: correction_msg = exception_msg.replace( '<br/>', new_line ).replace( '<b>', bold_start ).replace( '</b>', bold_end ) message += "%s%s%s - %s%s" % ( bold_start, tool_file, bold_end, correction_msg, new_line ) return message
[docs]def get_headers( fname, sep, count=60, is_multi_byte=False ): """Returns a list with the first 'count' lines split by 'sep'.""" headers = [] for idx, line in enumerate( file( fname ) ): line = line.rstrip( '\n\r' ) if is_multi_byte: line = unicode( line, 'utf-8' ) sep = sep.encode( 'utf-8' ) headers.append( line.split( sep ) ) if idx == count: break return headers
[docs]def get_tool_path_install_dir( partial_install_dir, shed_tool_conf_dict, tool_dict, config_elems ): for elem in config_elems: if elem.tag == 'tool': if elem.get( 'guid' ) == tool_dict[ 'guid' ]: tool_path = shed_tool_conf_dict[ 'tool_path' ] relative_install_dir = os.path.join( tool_path, partial_install_dir ) return tool_path, relative_install_dir elif elem.tag == 'section': for section_elem in elem: if section_elem.tag == 'tool': if section_elem.get( 'guid' ) == tool_dict[ 'guid' ]: tool_path = shed_tool_conf_dict[ 'tool_path' ] relative_install_dir = os.path.join( tool_path, partial_install_dir ) return tool_path, relative_install_dir return None, None
[docs]def handle_missing_index_file( app, tool_path, sample_files, repository_tools_tups, sample_files_copied ): """ Inspect each tool to see if it has any input parameters that are dynamically generated select lists that depend on a .loc file. This method is not called from the tool shed, but from Galaxy when a repository is being installed. """ for index, repository_tools_tup in enumerate( repository_tools_tups ): tup_path, guid, repository_tool = repository_tools_tup params_with_missing_index_file = repository_tool.params_with_missing_index_file for param in params_with_missing_index_file: options = param.options missing_file_name = basic_util.strip_path( options.missing_index_file ) if missing_file_name not in sample_files_copied: # The repository must contain the required xxx.loc.sample file. for sample_file in sample_files: sample_file_name = basic_util.strip_path( sample_file ) if sample_file_name == '%s.sample' % missing_file_name: copy_sample_file( app, sample_file ) if options.tool_data_table and options.tool_data_table.missing_index_file: options.tool_data_table.handle_found_index_file( options.missing_index_file ) sample_files_copied.append( options.missing_index_file ) break # Reload the tool into the local list of repository_tools_tups. repository_tool = app.toolbox.load_tool( os.path.join( tool_path, tup_path ), guid=guid ) repository_tools_tups[ index ] = ( tup_path, guid, repository_tool ) return repository_tools_tups, sample_files_copied
[docs]def is_column_based( fname, sep='\t', skip=0, is_multi_byte=False ): """See if the file is column based with respect to a separator.""" headers = get_headers( fname, sep, is_multi_byte=is_multi_byte ) count = 0 if not headers: return False for hdr in headers[ skip: ]: if hdr and hdr[ 0 ] and not hdr[ 0 ].startswith( '#' ): if len( hdr ) > 1: count = len( hdr ) break if count < 2: return False for hdr in headers[ skip: ]: if hdr and hdr[ 0 ] and not hdr[ 0 ].startswith( '#' ): if len( hdr ) != count: return False return True
[docs]def is_data_index_sample_file( file_path ): """ Attempt to determine if a .sample file is appropriate for copying to ~/tool-data when a tool shed repository is being installed into a Galaxy instance. """ # Currently most data index files are tabular, so check that first. We'll assume that # if the file is tabular, it's ok to copy. if is_column_based( file_path ): return True # If the file is any of the following, don't copy it. if checkers.check_html( file_path ): return False if checkers.check_image( file_path ): return False if checkers.check_binary( name=file_path ): return False if checkers.is_bz2( file_path ): return False if checkers.is_gzip( file_path ): return False if checkers.check_zip( file_path ): return False # Default to copying the file if none of the above are true. return True
[docs]def new_state( trans, tool, invalid=False ): """Create a new `DefaultToolState` for the received tool. Only inputs on the first page will be initialized.""" state = galaxy.tools.DefaultToolState() state.inputs = {} if invalid: # We're attempting to display a tool in the tool shed that has been determined to have errors, so is invalid. return state try: # Attempt to generate the tool state using the standard Galaxy-side code return tool.new_state( trans ) except Exception, e: # Fall back to building tool state as below log.debug( 'Failed to build tool state for tool "%s" using standard method, will try to fall back on custom method: %s', tool.id, e ) inputs = tool.inputs_by_page[ 0 ] context = ExpressionContext( state.inputs, parent=None ) for input in inputs.itervalues(): try: state.inputs[ input.name ] = input.get_initial_value( trans, context ) except: # FIXME: not all values should be an empty list state.inputs[ input.name ] = [] return state
[docs]def panel_entry_per_tool( tool_section_dict ): # Return True if tool_section_dict looks like this. # {<Tool guid> : # [{ tool_config : <tool_config_file>, # id: <ToolSection id>, # version : <ToolSection version>, # name : <TooSection name>}]} # But not like this. # { id: <ToolSection id>, version : <ToolSection version>, name : <TooSection name>} if not tool_section_dict: return False if len( tool_section_dict ) != 3: return True for k, v in tool_section_dict: if k not in [ 'id', 'version', 'name' ]: return True return False
[docs]def reload_upload_tools( app ): if hasattr( app, 'toolbox' ): app.toolbox.handle_datatypes_changed()