# Source code for tool_shed.util.common_util
import json
import logging
import os
import urllib2
from galaxy import util
from galaxy.util.odict import odict
from galaxy.web import url_for
from tool_shed.util import encoding_util
from tool_shed.util import xml_util
# Module-level logger, named after this module.
log = logging.getLogger( __name__ )
# Owner name passed to the Tool Shed by check_for_missing_tools() when it looks up the
# repositories listed in a tool migration file.
REPOSITORY_OWNER = 'devteam'
def accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies):
    """
    Append each entry of tool_dependencies that is not already present to all_tool_dependencies.

    Nothing is added when tool_shed_accessible is false or tool_dependencies is empty / None.
    The all_tool_dependencies list is modified in place and also returned.
    """
    if not tool_shed_accessible or not tool_dependencies:
        return all_tool_dependencies
    for tool_dependency in tool_dependencies:
        if tool_dependency not in all_tool_dependencies:
            all_tool_dependencies.append(tool_dependency)
    return all_tool_dependencies
def check_for_missing_tools(app, tool_panel_configs, latest_tool_migration_script_number):
    """
    Determine which tools named in the current tool migration file are still referenced by the
    local (non-shed) tool panel configs.

    app -- application object; passed through to the Tool Shed lookup helpers.
    tool_panel_configs -- iterable of tool panel config file paths to scan.
    latest_tool_migration_script_number -- integer used to build the '%04d_tools.xml' file name.

    Returns a ( tool_shed_accessible, missing_tool_configs_dict ) tuple.  When the migration
    file cannot be parsed the result is ( False, <empty odict> ).  Raises Exception when the
    tool shed named in the migration file has no URL in the tool shed registry.
    """
    # Get the 000x_tools.xml file associated with the current migrate_tools version number.
    tools_xml_file_path = os.path.abspath(
        os.path.join('scripts', 'migrate_tools', '%04d_tools.xml' % latest_tool_migration_script_number))
    tree, error_message = xml_util.parse_xml(tools_xml_file_path)
    if tree is None:
        return False, odict()
    root = tree.getroot()
    tool_shed = root.get('name')
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
    if not tool_shed_url:
        exception_msg = '\n\nThe entry for the main Galaxy tool shed at %s is missing from the %s file.  ' % (tool_shed, app.config.tool_sheds_config)
        exception_msg += 'The entry for this tool shed must always be available in this file, so re-add it before attempting to start your Galaxy server.\n'
        raise Exception(exception_msg)
    # Maps each migrated tool config file name to its dependency information, for later
    # checking against the proprietary tool_panel_config.
    migrated_tool_configs_dict = odict()
    missing_tool_configs_dict = odict()
    # The default behavior is that the tool shed is down.
    tool_shed_accessible = False
    for elem in root:
        if elem.tag != 'repository':
            continue
        repository_dependencies = []
        all_tool_dependencies = []
        repository_name = elem.get('name')
        changeset_revision = elem.get('changeset_revision')
        tool_shed_accessible, repository_dependencies_dict = get_repository_dependencies(app,
                                                                                         tool_shed_url,
                                                                                         repository_name,
                                                                                         REPOSITORY_OWNER,
                                                                                         changeset_revision)
        if not tool_shed_accessible:
            # The tool shed could not be reached, so there is no point in asking about the rest.
            break
        # Accumulate all tool dependencies defined for repository dependencies for display to the user.
        for rd_key, rd_tups in repository_dependencies_dict.items():
            if rd_key in ('root_key', 'description'):
                continue
            for rd_tup in rd_tups:
                # Use dedicated names here: reusing changeset_revision would make the lookup for
                # the repository itself (below) use a dependency's revision instead of its own.
                parsed_rd_tup = parse_repository_dependency_tuple(rd_tup)
                rd_name, rd_owner, rd_changeset_revision = parsed_rd_tup[1:4]
                tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
                                                                                tool_shed_url,
                                                                                rd_name,
                                                                                rd_owner,
                                                                                rd_changeset_revision)
                all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies)
        tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
                                                                        tool_shed_url,
                                                                        repository_name,
                                                                        REPOSITORY_OWNER,
                                                                        changeset_revision)
        all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies)
        for tool_elem in elem.findall('tool'):
            tool_config_file_name = tool_elem.get('file')
            if tool_config_file_name:
                # We currently do nothing with repository dependencies except install them (we do not display repositories that will be
                # installed to the user).  However, we'll store them in the following dictionary in case we choose to display them in the
                # future.
                dependencies_dict = dict(tool_dependencies=all_tool_dependencies,
                                         repository_dependencies=repository_dependencies)
                migrated_tool_configs_dict[tool_config_file_name] = dependencies_dict
    if tool_shed_accessible:
        # Parse the proprietary tool_panel_configs (the default is tool_conf.xml) and generate the list of missing tool config file names.
        for tool_panel_config in tool_panel_configs:
            tree, error_message = xml_util.parse_xml(tool_panel_config)
            if not tree:
                continue
            for elem in tree.getroot():
                if elem.tag == 'tool':
                    missing_tool_configs_dict = check_tool_tag_set(elem, migrated_tool_configs_dict, missing_tool_configs_dict)
                elif elem.tag == 'section':
                    for section_elem in elem:
                        if section_elem.tag == 'tool':
                            missing_tool_configs_dict = check_tool_tag_set(section_elem, migrated_tool_configs_dict, missing_tool_configs_dict)
    return tool_shed_accessible, missing_tool_configs_dict
def check_tool_tag_set(elem, migrated_tool_configs_dict, missing_tool_configs_dict):
    """
    Record elem's tool config in missing_tool_configs_dict if it is one of the migrated tools.

    elem -- object with a get() method; its 'file' value is the tool config path.
    migrated_tool_configs_dict -- maps migrated tool config names to their dependency info.
    missing_tool_configs_dict -- updated in place and returned.

    A migrated entry matches when its key equals either the full 'file' value or just its
    base name; the stored key is always the base name.
    """
    file_path = elem.get('file', None)
    if file_path:
        name = os.path.split(file_path)[1]
        candidates = (file_path, name)
        for migrated_tool_config in migrated_tool_configs_dict:
            if migrated_tool_config in candidates:
                missing_tool_configs_dict[name] = migrated_tool_configs_dict[migrated_tool_config]
    return missing_tool_configs_dict
def generate_clone_url_for_installed_repository(app, repository):
    """
    Generate the URL for cloning a repository that has been installed into a Galaxy instance.

    The base URL is looked up in the tool shed registry from repository.tool_shed, then joined
    with 'repos', the repository owner and the repository name.
    """
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, str(repository.tool_shed))
    return url_join(tool_shed_url, 'repos', str(repository.owner), str(repository.name))
def generate_clone_url_for_repository_in_tool_shed(user, repository):
    """
    Generate the URL for cloning a repository that is in the tool shed.

    When user is truthy, '<user.username>@' is inserted between the protocol and the host so
    the clone URL carries the user name; otherwise the plain URL is returned.
    """
    base_url = url_for('/', qualified=True).rstrip('/')
    owner = repository.user.username
    if not user:
        return '%s/repos/%s/%s' % (base_url, owner, repository.name)
    # Split only on the first '://' so the two-value unpack cannot fail on a later occurrence.
    protocol, base = base_url.split('://', 1)
    return '%s://%s@%s/repos/%s/%s' % (protocol, user.username, base, owner, repository.name)
def generate_clone_url_from_repo_info_tup(app, repo_info_tup):
    """
    Generate the URL for cloning a repository given a tuple of toolshed, name, owner, changeset_revision.

    Example tuple: ['http://localhost:9009', 'blast_datatypes', 'test', '461a4216e8ab', False]
    """
    parsed_tup = parse_repository_dependency_tuple(repo_info_tup)
    toolshed, name, owner = parsed_tup[:3]
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, toolshed)
    # Don't include the changeset_revision in clone urls.
    return url_join(tool_shed_url, 'repos', owner, name)
def get_non_shed_tool_panel_configs(app):
    """
    Get the non-shed related tool panel configs - there can be more than one, and the default is tool_conf.xml.

    Iterates app.config.tool_configs and returns the file names whose XML root has no
    tool_path attribute.  Files that cannot be parsed are skipped.
    """
    config_filenames = []
    for config_filename in app.config.tool_configs:
        tree, error_message = xml_util.parse_xml(config_filename)
        if tree is None:
            continue
        # Any config file that includes a tool_path attribute in the root tag set like the following is shed-related.
        # <toolbox tool_path="../shed_tools">
        if tree.getroot().get('tool_path', None) is None:
            config_filenames.append(config_filename)
    return config_filenames
def get_repository_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
    """
    Ask the Tool Shed for the repository dependencies of one repository revision.

    Returns a ( tool_shed_accessible, repository_dependencies_dict ) tuple.  tool_shed_accessible
    is False when the request raised any exception (the URL and exception are printed); the
    dictionary is empty in that case and when the response is at most two characters long.
    """
    repository_dependencies_dict = {}
    url = '%s/repository/get_repository_dependencies?name=%s&owner=%s&changeset_revision=%s' % \
        (tool_shed_url, repository_name, repository_owner, changeset_revision)
    try:
        raw_text = tool_shed_get(app, tool_shed_url, url)
    except Exception as e:
        # Best effort: an unreachable tool shed is reported to the caller, not raised.
        print("The URL\n%s\nraised the exception:\n%s\n" % (url, str(e)))
        return False, repository_dependencies_dict
    # NOTE(review): presumably a response of two characters or fewer is an empty payload - confirm.
    if len(raw_text) > 2:
        encoded_text = json.loads(raw_text)
        repository_dependencies_dict = encoding_util.tool_shed_decode(encoded_text)
    return True, repository_dependencies_dict
def get_protocol_from_tool_shed_url(tool_shed_url):
    """
    Return the lower-cased protocol from the received tool_shed_url if it exists, otherwise 'http'.

    A value of None (or anything else that is not string-like) also yields 'http'; the failure
    is logged unless the value was None.
    """
    try:
        if tool_shed_url.find('://') > 0:
            return tool_shed_url.split('://')[0].lower()
    except Exception as e:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if tool_shed_url is not None:
            log.exception("Handled exception getting the protocol from Tool Shed URL %s:\n%s", str(tool_shed_url), str(e))
    # Default to HTTP protocol.
    return 'http'
def get_tool_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
    """
    Ask the Tool Shed for the tool dependencies of one repository revision.

    Returns a ( tool_shed_accessible, tool_dependencies ) tuple, where tool_dependencies is a
    list of ( name, version, type ) tuples.  tool_shed_accessible is False when the request
    raised any exception (the URL and exception are printed); the list is then empty.
    """
    tool_dependencies = []
    url = '%s/repository/get_tool_dependencies?name=%s&owner=%s&changeset_revision=%s' % \
        (tool_shed_url, repository_name, repository_owner, changeset_revision)
    try:
        text = tool_shed_get(app, tool_shed_url, url)
    except Exception as e:
        # Best effort: an unreachable tool shed is reported to the caller, not raised.
        print("The URL\n%s\nraised the exception:\n%s\n" % (url, str(e)))
        return False, tool_dependencies
    if text:
        tool_dependencies_dict = encoding_util.tool_shed_decode(text)
        for requirements_dict in tool_dependencies_dict.values():
            tool_dependencies.append((requirements_dict['name'],
                                      requirements_dict['version'],
                                      requirements_dict['type']))
    return True, tool_dependencies
def get_tool_shed_repository_ids(as_string=False, **kwd):
    """
    Collect tool shed repository ids from the received keyword arguments.

    Ids are read from 'tool_shed_repository_ids', falling back to 'id' when that is empty; a
    single 'tool_shed_repository_id' is appended if not already present.  'ordered_tsr_ids' is
    consulted only when util.listify() returned None for the keys above.

    Returns a list of ids, or a comma-separated string of them when as_string is true.  With
    nothing to return the result is [] (or '' when as_string is true).
    """
    tsrid = kwd.get('tool_shed_repository_id', None)
    tsridslist = util.listify(kwd.get('tool_shed_repository_ids', None))
    if not tsridslist:
        tsridslist = util.listify(kwd.get('id', None))
    if tsridslist is not None:
        if tsrid is not None and tsrid not in tsridslist:
            tsridslist.append(tsrid)
    else:
        # NOTE(review): if util.listify() never returns None this fallback is unreachable - confirm.
        tsridslist = util.listify(kwd.get('ordered_tsr_ids', None))
    if tsridslist is not None:
        if as_string:
            return ','.join(tsridslist)
        return tsridslist
    # The original evaluated a bare '' here without returning it, so string callers got [].
    if as_string:
        return ''
    return []
def get_tool_shed_url_from_tool_shed_registry(app, tool_shed):
    """
    The value of tool_shed is something like: toolshed.g2.bx.psu.edu.  We need the URL to this tool shed, which is
    something like: http://toolshed.g2.bx.psu.edu/

    Returns the first URL in app.tool_shed_registry.tool_sheds that contains tool_shed (with any
    protocol removed), stripped of trailing slashes, or None when there is no match or tool_shed
    is None.
    """
    cleaned_tool_shed = remove_protocol_from_tool_shed_url(tool_shed)
    if cleaned_tool_shed is None:
        # Searching a URL for None would raise a TypeError; there is nothing to look up.
        return None
    for shed_url in app.tool_shed_registry.tool_sheds.values():
        if shed_url.find(cleaned_tool_shed) >= 0:
            return shed_url.rstrip('/')
    # The tool shed from which the repository was originally installed must no longer be configured in tool_sheds_conf.xml.
    return None
def handle_galaxy_url(trans, **kwd):
    """
    Return the Galaxy URL for this request, keeping it in the 'toolshedgalaxyurl' cookie.

    When a truthy 'galaxy_url' keyword is received it is stored in the cookie and returned;
    otherwise the value currently held by the cookie is returned.
    """
    galaxy_url = kwd.get('galaxy_url', None)
    if galaxy_url:
        trans.set_cookie(galaxy_url, name='toolshedgalaxyurl')
        return galaxy_url
    return trans.get_cookie(name='toolshedgalaxyurl')
def handle_tool_shed_url_protocol(app, shed_url):
    """
    Handle secure and insecure HTTP protocol since they may change over time.

    When app.name is 'galaxy' the URL is looked up in the tool shed registry using shed_url with
    its protocol removed; otherwise the current application's own qualified base URL is used.
    If anything raises, shed_url is returned unchanged (and the failure is logged unless
    shed_url is None).
    """
    try:
        if app.name == 'galaxy':
            url = remove_protocol_from_tool_shed_url(shed_url)
            return get_tool_shed_url_from_tool_shed_registry(app, url)
        return str(url_for('/', qualified=True)).rstrip('/')
    except Exception as e:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if shed_url is not None:
            log.exception("Handled exception removing protocol from URL %s:\n%s", str(shed_url), str(e))
        return shed_url
def parse_repository_dependency_tuple(repository_dependency_tuple, contains_error=False):
    """
    Normalize a repository dependency tuple to its full length.

    The received sequence starts with tool_shed, name, owner, changeset_revision, optionally
    followed by prior_installation_required and then only_if_compiling_contained_td.  When
    contains_error is true the last item is an error value.

    Returns ( tool_shed, name, owner, changeset_revision, prior_installation_required,
    only_if_compiling_contained_td ), with the error value appended when contains_error is true.
    Raises ValueError when the sequence has an unsupported length.
    """
    items = list(repository_dependency_tuple)
    trailing = 1 if contains_error else 0
    if not 4 + trailing <= len(items) <= 6 + trailing:
        raise ValueError('Unexpected repository dependency tuple length %d: %s' % (len(items), str(items)))
    error_items = [items.pop()] if contains_error else []
    # Default both prior_installation_required and only_if_compiling_contained_td to False in cases where metadata should be reset on the
    # repository containing the repository_dependency definition.
    defaults = ['False', 'False']
    optional = items[4:]
    normalized = items[:4] + optional + defaults[len(optional):]
    return tuple(normalized + error_items)
def remove_port_from_tool_shed_url(tool_shed_url):
    """
    Return a partial Tool Shed URL, eliminating the port if it exists.

    Everything from the first ':' onward is dropped (when the ':' is not the first character),
    then trailing slashes are stripped.  If the value is not string-like it is returned
    unchanged (and the failure is logged unless the value is None).
    """
    try:
        if tool_shed_url.find(':') > 0:
            # Eliminate the port, if any, since it will result in an invalid directory name.
            new_tool_shed_url = tool_shed_url.split(':')[0]
        else:
            new_tool_shed_url = tool_shed_url
        return new_tool_shed_url.rstrip('/')
    except Exception as e:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if tool_shed_url is not None:
            log.exception("Handled exception removing the port from Tool Shed URL %s:\n%s", str(tool_shed_url), str(e))
        return tool_shed_url
def remove_protocol_and_port_from_tool_shed_url(tool_shed_url):
    """
    Return a partial Tool Shed URL, eliminating the protocol and/or port if either exists.

    The protocol is removed first, then the port.
    """
    without_protocol = remove_protocol_from_tool_shed_url(tool_shed_url)
    return remove_port_from_tool_shed_url(without_protocol)
def remove_protocol_and_user_from_clone_url(repository_clone_url):
    """
    Return a URL that can be used to clone a repository, eliminating the protocol and user if either exists.

    Trailing slashes are stripped from the result.
    """
    if repository_clone_url.find('@') > 0:
        # We have an url that includes an authenticated user, something like:
        # http://test@bx.psu.edu:9009/repos/some_username/column
        # Split once only, so a later '@' does not truncate the rest of the URL.
        tmp_url = repository_clone_url.split('@', 1)[1]
    elif repository_clone_url.find('//') > 0:
        # We have an url that includes only a protocol, something like:
        # http://bx.psu.edu:9009/repos/some_username/column
        # Split once only, so a later '//' does not truncate the rest of the URL.
        tmp_url = repository_clone_url.split('//', 1)[1]
    else:
        tmp_url = repository_clone_url
    return tmp_url.rstrip('/')
def remove_protocol_from_tool_shed_url(tool_shed_url):
    """
    Return a partial Tool Shed URL, eliminating the protocol if it exists.

    Trailing slashes are stripped.  If the value is not string-like it is returned unchanged
    (and the failure is logged unless the value is None).
    """
    try:
        if tool_shed_url.find('://') > 0:
            # Split once only, so a later '://' does not truncate the rest of the URL.
            new_tool_shed_url = tool_shed_url.split('://', 1)[1]
        else:
            new_tool_shed_url = tool_shed_url
        return new_tool_shed_url.rstrip('/')
    except Exception as e:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if tool_shed_url is not None:
            log.exception("Handled exception removing the protocol from Tool Shed URL %s:\n%s", str(tool_shed_url), str(e))
        return tool_shed_url
def tool_shed_get(app, tool_shed_url, uri):
    """
    Make contact with the tool shed via the uri provided.

    app -- application object providing tool_shed_registry.
    tool_shed_url -- used to look up a password manager in the registry; when one exists,
                     HTTP basic authentication is added to the opener.
    uri -- the address that is actually opened.

    Returns the response body.  Exceptions raised while opening or reading are not caught here.
    Note that the opener built here is also installed as urllib2's global default opener.
    """
    registry = app.tool_shed_registry
    # urllib2 auto-detects system proxies, when passed a Proxyhandler.
    # Refer: https://docs.python.org/2/howto/urllib2.html#proxies
    proxy = urllib2.ProxyHandler()
    urlopener = urllib2.build_opener(proxy)
    urllib2.install_opener(urlopener)
    password_mgr = registry.password_manager_for_url(tool_shed_url)
    if password_mgr is not None:
        auth_handler = urllib2.HTTPBasicAuthHandler(password_mgr)
        urlopener.add_handler(auth_handler)
    response = urlopener.open(uri)
    try:
        return response.read()
    finally:
        # Close the connection even when reading the body fails.
        response.close()
def url_join(*args):
    """
    Return a URL produced by joining the received parts with single '/' separators.

    Leading and trailing slashes are stripped from every part, and parts that are None are
    skipped.  With no usable parts the result is the empty string.
    """
    return '/'.join([arg.strip('/') for arg in args if arg is not None])