# -*- coding: utf-8 -*-
"""
Utility functions used systemwide.
"""
from __future__ import absolute_import
import binascii
import collections
import errno
import grp
import logging
import os
import random
import re
import shutil
import smtplib
import stat
import string
import sys
import time
import tempfile
import threading
import urlparse
from galaxy.util import json
from datetime import datetime
from email.MIMEText import MIMEText
from os.path import relpath
from hashlib import md5
from itertools import izip
from galaxy import eggs
eggs.require( 'docutils' )
import docutils.core
import docutils.writers.html4css1
from xml.etree import ElementTree, ElementInclude
eggs.require( "wchartype" )
import wchartype
from .inflection import Inflector, English
# Shared inflector instance built from the package-local inflection module.
inflector = Inflector(English)
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Re-entrant lock shared by every function wrapped with synchronized().
_lock = threading.RLock()
# Default number of bytes read per chunk by the streaming helpers in this module.
CHUNK_SIZE = 65536 # 64k
DATABASE_MAX_STRING_SIZE = 32768
DATABASE_MAX_STRING_SIZE_PRETTY = '32K'
# Leading magic bytes of gzip and bzip2 data.
gzip_magic = '\037\213'
bz2_magic = 'BZh'
# Default text encoding; can be overridden with the GALAXY_DEFAULT_ENCODING environment variable.
DEFAULT_ENCODING = os.environ.get('GALAXY_DEFAULT_ENCODING', 'utf-8')
NULL_CHAR = '\000'
# Characters whose presence makes is_binary() report a value as binary.
BINARY_CHARS = [ NULL_CHAR ]
def is_multi_byte( chars ):
    """
    Report whether any character in ``chars`` is classified by ``wchartype``
    as a wide / East Asian character.

    Characters are examined in order.  The first character that cannot be
    converted with ``unicode()`` ends the scan with ``False`` (the value is
    then probably binary); the first character matching any of the
    ``wchartype`` predicates ends it with ``True``.
    """
    predicates = (
        wchartype.is_asian, wchartype.is_full_width,
        wchartype.is_kanji, wchartype.is_hiragana,
        wchartype.is_katakana, wchartype.is_half_katakana,
        wchartype.is_hangul, wchartype.is_full_digit,
        wchartype.is_full_letter,
    )
    for raw in chars:
        try:
            candidate = unicode( raw )
        except UnicodeDecodeError:
            # Probably binary
            return False
        if any( predicate( candidate ) for predicate in predicates ):
            return True
    return False
def is_binary( value, binary_chars=None ):
    """
    Tell whether ``value`` contains any of the marker characters.

    By default the only marker is the null byte (the behaviour of grep and
    friends).  This may misfire for utf-16 data, but so would ASCII encoding.

    >>> is_binary( string.printable )
    False
    >>> is_binary( '\\xce\\x94' )
    False
    >>> is_binary( '\\000' )
    True
    """
    markers = BINARY_CHARS if binary_chars is None else binary_chars
    return any( marker in value for marker in markers )
def is_uuid( value ):
    """
    Return True if ``str( value )`` is exactly a lower-case hexadecimal UUID
    in 8-4-4-4-12 form, otherwise False.

    The pattern is anchored at both ends, so a valid UUID followed by extra
    characters is rejected.

    >>> is_uuid( "123e4567-e89b-12d3-a456-426655440000" )
    True
    >>> is_uuid( "0x3242340298902834" )
    False
    >>> is_uuid( "123e4567-e89b-12d3-a456-426655440000-and-more" )
    False
    """
    # re.match anchors the start; \Z anchors the very end of the string.
    uuid_re = re.compile( r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\Z" )
    return uuid_re.match( str( value ) ) is not None
def synchronized(func):
    """
    Decorator that serializes calls to ``func`` through the module-wide
    re-entrant lock, so only one thread at a time runs any synchronized
    function.

    The wrapper keeps the name and docstring of ``func`` so that
    introspection and doctest discovery still see the original function.
    """
    import functools

    @functools.wraps(func)
    def caller(*params, **kparams):
        # Blocks until the lock is free; always released, even on error.
        with _lock:
            return func(*params, **kparams)
    return caller
def file_iter(fname, sep=None):
    """
    Generator over the lines of the file ``fname``, yielding each line split
    on ``sep`` (``str.split`` semantics; ``None`` splits on whitespace).

    Lines whose first character is ``#`` are skipped.  Blank lines are not
    filtered out: they are yielded as whatever ``split`` returns for them
    (an empty list when ``sep`` is None).  The file is closed when the
    generator finishes or is closed.

    >>> lines = [ line for line in file_iter(__file__) ]
    >>> len(lines) != 0
    True
    """
    with open(fname) as handle:
        for line in handle:
            if line and line[0] != '#':
                yield line.split(sep)
def file_reader( fp, chunk_size=CHUNK_SIZE ):
    """
    Generator yielding the contents of the open file object ``fp`` in chunks
    of at most ``chunk_size`` bytes (64k by default).

    ``fp`` is closed when the generator ends, whether it is exhausted, is
    closed early by the consumer, or a read raises.
    """
    try:
        while True:
            data = fp.read( chunk_size )
            if not data:
                break
            yield data
    finally:
        fp.close()
def unique_id(KEY_SIZE=128):
    """
    Produce a random identifier: the md5 hex digest of the decimal text of
    ``KEY_SIZE`` random bits.

    >>> ids = [ unique_id() for i in range(1000) ]
    >>> len(set(ids))
    1000
    """
    random_number = random.getrandbits( KEY_SIZE )
    # The decimal digits are pure ASCII, so encoding leaves the hashed bytes unchanged.
    digest = md5( str( random_number ).encode( 'ascii' ) )
    return digest.hexdigest()
def parse_xml( fname ):
    """
    Parse the XML file ``fname``, expand its XInclude directives and return
    the resulting ``ElementTree``.
    """
    # A tree builder with a no-op doctype handler avoids the deprecation
    # warning raised when parsing a file that carries a DOCTYPE.
    class DoctypeSafeCallbackTarget( ElementTree.TreeBuilder ):
        def doctype( *args ):
            pass

    parser = ElementTree.XMLParser( target=DoctypeSafeCallbackTarget() )
    document = ElementTree.ElementTree()
    document_root = document.parse( fname, parser=parser )
    ElementInclude.include( document_root )
    return document
def parse_xml_string(xml_string):
    """Parse XML text and return its root element."""
    return ElementTree.fromstring(xml_string)
def xml_to_string( elem, pretty=False ):
    """
    Serialize an XML element to a string, optionally re-indenting it first
    with ``pretty_print_xml`` (which modifies ``elem`` in place).

    If serialization raises ``TypeError`` the element is assumed to be a
    comment and is rendered from its ``text``; without a ``text`` attribute
    the error is re-raised.
    """
    target = pretty_print_xml( elem ) if pretty else elem
    try:
        return ElementTree.tostring( target )
    except TypeError:
        # we assume this is a comment
        if not hasattr( target, 'text' ):
            raise
        return "<!-- %s -->\n" % ( target.text )
def xml_element_compare( elem1, elem2 ):
    """
    Compare two XML elements by their dictionary form.  Arguments that are
    already dicts are used as they are; anything else is converted with
    ``xml_element_to_dict`` first.
    """
    left = elem1 if isinstance( elem1, dict ) else xml_element_to_dict( elem1 )
    right = elem2 if isinstance( elem2, dict ) else xml_element_to_dict( elem2 )
    return left == right
def xml_element_list_compare( elem_list1, elem_list2 ):
    """
    Compare two sequences of XML elements position by position, using the
    dictionary form produced by ``xml_element_to_dict``.
    """
    def as_dicts( elems ):
        return [ xml_element_to_dict( elem ) for elem in elems ]

    return as_dicts( elem_list1 ) == as_dicts( elem_list2 )
def xml_element_to_dict( elem ):
    """
    Convert an XML element into a single-key dictionary ``{ tag: content }``.

    ``content`` is:

    - ``None`` for an element with no attributes, children or text;
    - the stripped text for an element that has only text;
    - otherwise a dict holding child elements under their tag (a list when
      the tag repeats), attributes under ``"@name"`` and text under
      ``"#text"``.

    Elements with children but no attributes are supported; child entries
    are never overwritten by the element's own (whitespace) text.
    """
    children = list( elem )
    # A container dict is needed whenever attributes or children must be stored.
    if elem.attrib or children:
        content = {}
    else:
        content = None
    rval = { elem.tag: content }
    if children:
        grouped = {}
        for child_dict in map( xml_element_to_dict, children ):
            for key, value in child_dict.items():
                grouped.setdefault( key, [] ).append( value )
        for key, values in grouped.items():
            # A tag that occurs once is stored directly, repeated tags as a list.
            content[ key ] = values[0] if len( values ) == 1 else values
    for key, value in elem.attrib.items():
        content[ "@%s" % key ] = value
    if elem.text:
        text = elem.text.strip()
        if elem.attrib or ( text and children ):
            content[ '#text' ] = text
        elif not children:
            # Text-only element: the text itself is the content.
            rval[ elem.tag ] = text
    return rval
def pretty_print_xml( elem, level=0 ):
    """
    Re-indent ``elem`` and its descendants in place by rewriting blank
    ``text`` / ``tail`` whitespace, then return ``elem``.

    ``level`` is the nesting depth of ``elem``; text or tails that contain
    non-whitespace characters are left untouched.
    """
    pad = ' '
    newline_indent = "\n" + level * pad

    def blank( text ):
        return not text or not text.strip()

    if not len( elem ):
        # Leaf element: only its tail is adjusted, and only below the root.
        if level and blank( elem.tail ):
            elem.tail = newline_indent + pad
        return elem
    if blank( elem.text ):
        elem.text = newline_indent + pad + pad
    if blank( elem.tail ):
        elem.tail = newline_indent
    for child in elem:
        pretty_print_xml( child, level + 1 )
    if blank( elem.tail ):
        elem.tail = newline_indent
    return elem
def get_file_size( value, default=None ):
    """
    Best-effort size lookup for a path or a file-like object.

    Tried in order: ``os.path.getsize( value )``, then
    ``os.path.getsize( value.name )``, then seeking to the end of ``value``
    (the original position is restored afterwards).  ``default`` is returned
    when every strategy fails.  Only ``Exception`` subclasses are treated as
    failure, so KeyboardInterrupt and SystemExit propagate.
    """
    # 1. value is itself a filesystem path
    try:
        return os.path.getsize( value )
    except Exception:
        pass
    # 2. value is a file object exposing the path it was opened from
    try:
        return os.path.getsize( value.name )
    except Exception:
        pass
    # 3. value is a seekable stream: measure by seeking to its end
    try:
        offset = value.tell()
        value.seek( 0, 2 )
        rval = value.tell()
        value.seek( offset )
        return rval
    except Exception:
        return default
def shrink_stream_by_size( value, size, join_by="..", left_larger=True, beginning_on_size_error=False, end_on_size_error=False ):
    """
    Read the stream ``value`` and return at most ``size`` characters of it.

    A stream no larger than ``size`` (per ``get_file_size``) is read from its
    current position in full.  A larger one is reduced to its beginning and
    its end joined by ``join_by``; an odd spare character goes to the left
    part when ``left_larger`` is true, otherwise to the right.

    When ``size`` is smaller than ``len( join_by ) + 2`` the first ``size``
    characters (``beginning_on_size_error``) or the last ``size`` characters
    (``end_on_size_error``) are returned and the stream position restored;
    with neither flag set ``ValueError`` is raised.
    """
    if not get_file_size( value ) > size:
        # Small enough: return everything from the current position.
        rval = ''
        while True:
            data = value.read( CHUNK_SIZE )
            if not data:
                return rval
            rval += data
    start = value.tell()
    len_join_by = len( join_by )
    min_size = len_join_by + 2
    if size < min_size:
        if beginning_on_size_error:
            rval = value.read( size )
            value.seek( start )
            return rval
        if end_on_size_error:
            value.seek( -size, 2 )
            rval = value.read( size )
            value.seek( start )
            return rval
        raise ValueError( 'With the provided join_by value (%s), the minimum size value is %i.' % ( join_by, min_size ) )
    left_index = right_index = int( ( size - len_join_by ) / 2 )
    if left_index + right_index + len_join_by < size:
        if left_larger:
            left_index += 1
        else:
            right_index += 1
    head = value.read( left_index )
    value.seek( -right_index, 2 )
    return head + join_by + value.read( right_index )
def shrink_string_by_size( value, size, join_by="..", left_larger=True, beginning_on_size_error=False, end_on_size_error=False ):
    """
    Shorten ``value`` to ``size`` characters by keeping its beginning and
    end and putting ``join_by`` in between.  Strings that already fit are
    returned unchanged.

    An odd spare character goes to the left part when ``left_larger`` is
    true, otherwise to the right.  When ``size`` is smaller than
    ``len( join_by ) + 2`` the first / last ``size`` characters are returned
    if the matching ``*_on_size_error`` flag is set; otherwise ``ValueError``
    is raised.
    """
    if len( value ) <= size:
        return value
    sep_len = len( join_by )
    smallest = sep_len + 2
    if size < smallest:
        if beginning_on_size_error:
            return value[:size]
        if end_on_size_error:
            return value[-size:]
        raise ValueError( 'With the provided join_by value (%s), the minimum size value is %i.' % ( join_by, smallest ) )
    head = tail = int( ( size - sep_len ) / 2 )
    if head + tail + sep_len < size:
        if left_larger:
            head += 1
        else:
            tail += 1
    return "%s%s%s" % ( value[:head], join_by, value[-tail:] )
def pretty_print_time_interval( time=False, precise=False ):
    """
    Describe how long ago ``time`` was, e.g. 'an hour ago', 'yesterday',
    '3 months ago', 'just now'.

    ``time`` may be a ``datetime``, an Epoch timestamp (any real number) or
    a false value meaning "now".  Any other value raises ``TypeError``.
    With ``precise`` false only coarse buckets ('today', 'less than a week',
    ...) are returned.  Times in the future yield an empty string.

    credit: http://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
    """
    import numbers
    now = datetime.now()
    if type( time ) is int:
        diff = now - datetime.fromtimestamp( time )
    elif isinstance( time, datetime ):
        diff = now - time
    elif not time:
        diff = now - now
    elif isinstance( time, numbers.Real ):
        # float / long timestamps
        diff = now - datetime.fromtimestamp( time )
    else:
        raise TypeError( 'time must be a datetime or an Epoch timestamp, got %r' % ( time, ) )
    second_diff = diff.seconds
    day_diff = diff.days
    if day_diff < 0:
        return ''
    if not precise:
        if day_diff == 0:
            return "today"
        if day_diff == 1:
            return "yesterday"
        if day_diff < 7:
            return "less than a week"
        if day_diff < 31:
            return "less than a month"
        if day_diff < 365:
            return "less than a year"
        return "a few years ago"
    # Floor division keeps the counts whole regardless of division mode.
    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str( second_diff ) + " seconds ago"
        if second_diff < 120:
            return "a minute ago"
        if second_diff < 3600:
            return str( second_diff // 60 ) + " minutes ago"
        if second_diff < 7200:
            return "an hour ago"
        if second_diff < 86400:
            return str( second_diff // 3600 ) + " hours ago"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str( day_diff ) + " days ago"
    if day_diff < 31:
        return str( day_diff // 7 ) + " weeks ago"
    if day_diff < 365:
        return str( day_diff // 30 ) + " months ago"
    return str( day_diff // 365 ) + " years ago"
def pretty_print_json(json_data, is_json_string=False):
    """
    Render ``json_data`` as JSON text with sorted keys and a 4-space indent.
    When ``is_json_string`` is true, ``json_data`` is JSON text that is
    parsed first.
    """
    data = json.loads(json_data) if is_json_string else json_data
    return json.dumps(data, sort_keys=True, indent=4)
# Characters that pass through the sanitizing helpers unchanged.
valid_chars = set(string.letters + string.digits + " -=_.()/+*^,:?!")
# Characters that are allowed but are replaced by the escape token they map to;
# restore_text() applies the reverse substitution.
mapped_chars = { '>': '__gt__',
                 '<': '__lt__',
                 "'": '__sq__',
                 '"': '__dq__',
                 '[': '__ob__',
                 ']': '__cb__',
                 '{': '__oc__',
                 '}': '__cc__',
                 '@': '__at__',
                 '\n': '__cn__',
                 '\r': '__cr__',
                 '\t': '__tc__',
                 '#': '__pd__'}
def restore_text( text, character_map=mapped_chars ):
    """
    Undo sanitizing: replace every escape token found in ``text`` with the
    character it stands for in ``character_map``.  Empty or None input is
    returned as it is.
    """
    if not text:
        return text
    restored = text
    for original, escaped in character_map.items():
        restored = restored.replace( escaped, original )
    return restored
def sanitize_text( text, valid_characters=valid_chars, character_map=mapped_chars, invalid_character='X' ):
    """
    Restrict the characters allowed in ``text``.

    Accepts a string or a list of strings (a list yields a list); any other
    value is converted with ``smart_str`` first.  Characters in
    ``valid_characters`` are kept, characters in ``character_map`` are
    replaced by their mapped token and everything else becomes
    ``invalid_character``.
    """
    if isinstance( text, list ):
        return [ sanitize_text( item,
                                valid_characters=valid_characters,
                                character_map=character_map,
                                invalid_character=invalid_character ) for item in text ]
    if not isinstance( text, basestring ):
        text = smart_str( text )
    # invalid_character must be forwarded, otherwise the helper falls back to 'X'.
    return _sanitize_text_helper( text,
                                  valid_characters=valid_characters,
                                  character_map=character_map,
                                  invalid_character=invalid_character )
def _sanitize_text_helper( text, valid_characters=valid_chars, character_map=mapped_chars, invalid_character='X' ):
    """
    Sanitize a single string character by character: keep valid characters,
    substitute mapped ones and replace everything else with
    ``invalid_character``.
    """
    def translate( char ):
        if char in valid_characters:
            return char
        if char in character_map:
            return character_map[ char ]
        return invalid_character  # makes debugging easier

    return ''.join( [ translate( char ) for char in text ] )
def sanitize_lists_to_string( values, valid_characters=valid_chars, character_map=mapped_chars, invalid_character='X' ):
    """
    Sanitize ``values`` into one string.  A (possibly nested) list is
    sanitized element by element and the results are joined with commas;
    any other value is handed to ``sanitize_text``.
    """
    options = dict( valid_characters=valid_characters,
                    character_map=character_map,
                    invalid_character=invalid_character )
    if not isinstance( values, list ):
        return sanitize_text( values, **options )
    return ",".join( [ sanitize_lists_to_string( value, **options ) for value in values ] )
def sanitize_param( value, valid_characters=valid_chars, character_map=mapped_chars, invalid_character='X' ):
    """
    Clean an incoming parameter: a string is sanitized with
    ``sanitize_text``, a list element by element.  Any other type raises
    ``Exception``.
    """
    options = dict( valid_characters=valid_characters,
                    character_map=character_map,
                    invalid_character=invalid_character )
    if isinstance( value, basestring ):
        return sanitize_text( value, **options )
    if isinstance( value, list ):
        return [ sanitize_text( item, **options ) for item in value ]
    raise Exception('Unknown parameter type (%s)' % ( type( value ) ))
# Characters kept as they are in a sanitized filename portion.
valid_filename_chars = set( string.ascii_letters + string.digits + '_.' )
# Sanitized results that are not usable as a filename.
invalid_filenames = [ '', '.', '..' ]


def sanitize_for_filename( text, default=None ):
    """
    Make ``text`` safe for use as a portion of a filename by replacing every
    character outside ``valid_filename_chars`` with an underscore.

    When the result is not a usable name ('', '.' or '..') ``default`` is
    returned, or a sanitized unique id when ``default`` is None.  Deliberately
    aggressive; no maximum length is enforced.
    """
    cleaned = ''.join( [ char if char in valid_filename_chars else '_' for char in text ] )
    if cleaned not in invalid_filenames:
        return cleaned
    if default is not None:
        return default
    return sanitize_for_filename( str( unique_id() ) )
def mask_password_from_url( url ):
    """
    Hide the password in a connection URL such as the database connection
    in galaxy.ini.  URLs without a password are returned unchanged.

    >>> mask_password_from_url( 'sqlite+postgresql://user:password@localhost/' )
    'sqlite+postgresql://user:********@localhost/'
    >>> mask_password_from_url( 'amqp://user:amqp@localhost' )
    'amqp://user:********@localhost'
    >>> mask_password_from_url( 'amqp://localhost')
    'amqp://localhost'
    """
    parts = urlparse.urlsplit(url)
    secret = parts.password
    if not secret:
        return url
    if url.count(secret) == 1:
        # Plain replacement leaves every other character of the URL as it was.
        return url.replace(secret, "********")
    # The password text occurs more than once, so only the credentials part
    # of the network location is rewritten; re-assembling the URL can alter
    # the input in ways other than masking, hence the preferred path above.
    credentials = "%s:%s" % (parts.username, secret)
    masked_netloc = parts.netloc.replace(credentials, '%s:********' % parts.username)
    return urlparse.urlunsplit(parts._replace(netloc=masked_netloc))
def ready_name_for_url( raw_name ):
    """
    Turn a string (e.g. an object name) into a URL-ready slug: runs of
    whitespace become '-', every character other than ASCII letters, digits
    and '-' is dropped, and one trailing '-' is removed.

    >>> ready_name_for_url( "My Cool Object" )
    'My-Cool-Object'
    >>> ready_name_for_url( "!My Cool Object!" )
    'My-Cool-Object'
    >>> ready_name_for_url( "Hello₩◎ґʟⅾ" )
    'Hello'
    """
    dashed = re.sub( r"\s+", "-", raw_name )
    slug = re.sub( r"[^a-zA-Z0-9\-]", "", dashed )
    return slug[:-1] if slug.endswith('-') else slug
def in_directory( file, directory, local_path_module=os.path ):
    """
    Return True if ``file`` is ``directory`` itself or lies beneath it.

    Both paths are made absolute with ``local_path_module.abspath`` (which
    also collapses ``..`` components) and then compared on path-component
    boundaries, so ``/a/bc/d.rst`` is not considered to be inside ``/a/b``.
    Symbolic links are not resolved.

    e.g. /a/b/c/d.rst is in /a/b; /a/bc/d.rst is not.
    """
    # Make both absolute.
    directory = local_path_module.abspath( directory )
    file = local_path_module.abspath( file )
    if file == directory:
        return True
    separator = getattr( local_path_module, 'sep', os.sep )
    # Compare against "directory + separator" so a sibling whose name merely
    # starts with the directory name does not match.
    prefix = directory if directory.endswith( separator ) else directory + separator
    return file.startswith( prefix )
def merge_sorted_iterables( operator, *iterables ):
    """
    Lazily merge any number of iterables, each already sorted by
    ``operator( element )``, into one sorted stream.  On equal keys the
    element from the earlier iterable comes first.  Calling it without
    iterables yields nothing.

    >>> operator = lambda x: x
    >>> list( merge_sorted_iterables( operator, [1,2,3], [4,5] ) )
    [1, 2, 3, 4, 5]
    >>> list( merge_sorted_iterables( operator, [4, 5], [1,2,3] ) )
    [1, 2, 3, 4, 5]
    >>> list( merge_sorted_iterables( operator, [1, 4, 5], [2], [3] ) )
    [1, 2, 3, 4, 5]
    >>> list( merge_sorted_iterables( operator ) )
    []
    """
    if not iterables:
        return
    first_iterable = iterables[ 0 ]
    if len( iterables ) == 1:
        for el in first_iterable:
            yield el
    else:
        # Merge the first iterable with the (recursive) merge of the rest.
        remainder = merge_sorted_iterables( operator, *iterables[ 1: ] )
        for el in __merge_two_sorted_iterables( operator, iter( first_iterable ), remainder ):
            yield el


def __merge_two_sorted_iterables( operator, iterable1, iterable2 ):
    """
    Merge two sorted iterators.  ``unset`` marks a side whose head element
    has been consumed and must be fetched again.
    """
    unset = object()
    next_1 = unset
    next_2 = unset
    while True:
        try:
            if next_1 is unset:
                next_1 = next( iterable1 )
            if next_2 is unset:
                next_2 = next( iterable2 )
        except StopIteration:
            # One side is exhausted; the pending heads are flushed below.
            break
        if operator( next_2 ) < operator( next_1 ):
            yield next_2
            next_2 = unset
        else:
            yield next_1
            next_1 = unset
    if next_1 is not unset:
        yield next_1
    if next_2 is not unset:
        yield next_2
    for el in iterable1:
        yield el
    for el in iterable2:
        yield el
class Params( object ):
    """
    Stores and 'sanitizes' parameters. Alphanumeric characters and the
    non-alphanumeric ones that are deemed safe are let to pass through (see L{valid_chars}).
    Some non-safe characters are escaped to safe forms, for example C{>} becomes C{__gt__}
    (see L{mapped_chars}). All other characters are replaced with C{X}.

    Operates on string or list values only (HTTP parameters).

    >>> values = { 'status':'on', 'symbols':[  'alpha', '<>', '$rm&#!' ]  }
    >>> par = Params(values)
    >>> par.status
    'on'
    >>> par.value == None      # missing attributes return None
    True
    >>> par.get('price', 0)
    0
    >>> par.symbols            # replaces unknown symbols with X
    ['alpha', '__lt____gt__', 'XrmX__pd__!']
    >>> sorted(par.flatten())  # flattening to a list
    [('status', 'on'), ('symbols', 'XrmX__pd__!'), ('symbols', '__lt____gt__'), ('symbols', 'alpha')]
    """

    # is NEVER_SANITIZE required now that sanitizing for tool parameters can be controlled on a per parameter basis and occurs via InputValueWrappers?
    NEVER_SANITIZE = ['file_data', 'url_paste', 'URL', 'filesystem_paths']

    def __init__( self, params, sanitize=True ):
        if not sanitize:
            self.__dict__.update(params)
            return
        for key, value in params.items():
            # Both ungrouped names and grouped names ("...|name") are checked
            # against NEVER_SANITIZE. Anything relying on NEVER_SANITIZE
            # should be changed to not require this and NEVER_SANITIZE should
            # be removed.
            exempt = key in self.NEVER_SANITIZE or any(
                key.endswith( "|%s" % nonsanitize_parameter )
                for nonsanitize_parameter in self.NEVER_SANITIZE )
            self.__dict__[ key ] = value if exempt else sanitize_param( value )

    def flatten(self):
        """
        Return a list of (key, value) tuples; a key whose value is a list
        contributes one tuple per list element.
        """
        flat = []
        for key, value in self.__dict__.items():
            members = value if isinstance(value, list) else [ value ]
            flat.extend( [ (key, member) for member in members ] )
        return flat

    def __getattr__(self, name):
        """Only reached for attributes that do not exist: those read as None."""
        return None

    def get(self, key, default):
        return self.__dict__.get(key, default)

    def __str__(self):
        return '%s' % self.__dict__

    def __len__(self):
        return len(self.__dict__)

    def __iter__(self):
        return iter(self.__dict__)

    def update(self, values):
        self.__dict__.update(values)
def rst_to_html( s ):
    """
    Convert a blob of reStructuredText to HTML with docutils and return it
    as unicode.  Non-blank docutils warnings are sent to the "docutils"
    logger.
    """
    docutils_log = logging.getLogger( "docutils" )

    class FakeStream( object ):
        """Warning stream that forwards non-blank messages to the logger."""
        def write( self, text ):
            if len( text ) > 0 and not text.isspace():
                docutils_log.warning( text )

    template_path = os.path.join(os.path.dirname(__file__), "docutils_template.txt")
    settings_overrides = {
        "embed_stylesheet": False,
        "template": template_path,
        "warning_stream": FakeStream(),
        # without this option the rendering differs a lot depending on the
        # number of sections in the help content.
        "doctitle_xform": False,
    }
    html = docutils.core.publish_string( s,
                                         writer=docutils.writers.html4css1.Writer(),
                                         settings_overrides=settings_overrides )
    return unicodify( html )
def xml_text(root, name=None):
    """
    Return text found on ``root``.

    Without ``name`` this is the text of ``root`` itself.  With ``name`` a
    non-empty attribute of that name wins; otherwise the text of the first
    child element with that tag is used.  Line breaks are removed and the
    result is stripped; an empty string is returned when nothing is found.
    """
    elem = root
    if name is not None:
        # Try attribute first
        attribute_value = root.get(name)
        if attribute_value:
            return attribute_value
        # Then try as element
        elem = root.find(name)
    if elem is None or not elem.text:
        # No luck, return empty string
        return ''
    return ''.join(elem.text.splitlines()).strip()
# asbool implementation pulled from PasteDeploy
truthy = frozenset(['true', 'yes', 'on', 'y', 't', '1'])
falsy = frozenset(['false', 'no', 'off', 'n', 'f', '0'])


def asbool(obj):
    """
    Interpret ``obj`` as a boolean.  Strings are stripped, lower-cased and
    looked up in ``truthy`` / ``falsy`` (``ValueError`` if in neither);
    every other object goes through ``bool()``.
    """
    if not isinstance(obj, basestring):
        return bool(obj)
    obj = obj.strip().lower()
    if obj in truthy:
        return True
    if obj in falsy:
        return False
    raise ValueError("String is not true/false: %r" % obj)
def string_as_bool( string ):
    """
    Return True when ``str( string )`` is 'true', 'yes' or 'on'
    (case-insensitive), otherwise False.
    """
    return str( string ).lower() in ( 'true', 'yes', 'on' )
def string_as_bool_or_none( string ):
    """
    Returns True, None or False based on the argument:
        True if passed True, 'True', 'Yes', or 'On'
        None if passed None or 'None'
        False otherwise

    Note: string comparison is case-insensitive so lowercase versions of those
    function equivalently.
    """
    lowered = str( string ).lower()
    if lowered == 'none':
        return None
    return lowered in ( 'true', 'yes', 'on' )
def listify( item, do_strip=False ):
    """
    Coerce ``item`` into a list.

    Any false value (None, '', 0, ...) gives an empty list, a list is
    returned as is, a string containing commas is split on them (each token
    stripped when ``do_strip`` is true) and everything else is wrapped in a
    one-element list.
    """
    if not item:
        return []
    if isinstance( item, list ):
        return item
    if isinstance( item, basestring ) and item.count( ',' ):
        tokens = item.split( ',' )
        return [ token.strip() for token in tokens ] if do_strip else tokens
    return [ item ]
def commaify(amount):
    """
    Insert thousands separators into the leading digit run of the numeric
    string ``amount``, e.g. '1234567' becomes '1,234,567'.
    """
    current = amount
    while True:
        # Each pass inserts one more comma, working from the right.
        grouped = re.sub(r"^(-?\d+)(\d{3})", r'\g<1>,\g<2>', current)
        if grouped == current:
            return grouped
        current = grouped
def roundify(amount, sfs=2):
    """
    Take a number in string form and truncate it to ``sfs`` significant
    figures; the remaining positions are filled with zeros.
    """
    surplus = len(amount) - sfs
    if surplus <= 0:
        return amount
    return amount[:sfs] + '0' * surplus
def unicodify( value, encoding=DEFAULT_ENCODING, error='replace', default=None ):
    """
    Return ``value`` as a unicode string.

    Unicode input is returned as it is.  Anything else is converted with
    ``str()`` and decoded using ``encoding`` and the error handler ``error``.
    ``default`` is returned when that conversion raises; KeyboardInterrupt
    and SystemExit are not swallowed.
    """
    if isinstance( value, unicode ):
        return value
    try:
        return unicode( str( value ), encoding, error )
    except Exception:
        return default
def smart_str(s, encoding='utf-8', strings_only=False, errors='strict'):
    """
    Returns a bytestring version of 's', encoded as specified in 'encoding'.

    If strings_only is True, None and ints are returned unconverted.

    Adapted from an older, simpler version of django.utils.encoding.smart_str.
    """
    if strings_only and isinstance(s, (type(None), int)):
        return s
    if isinstance(s, unicode):
        return s.encode(encoding, errors)
    if isinstance(s, basestring):
        # Byte strings are taken to be utf-8 and only re-encoded when a
        # different target encoding is requested.
        if s and encoding != 'utf-8':
            return s.decode('utf-8', errors).encode(encoding, errors)
        return s
    try:
        return str(s)
    except UnicodeEncodeError:
        return unicode(s).encode(encoding, errors)
def object_to_string( obj ):
    """Return the hexadecimal representation of the binary data ``obj``."""
    hex_form = binascii.hexlify( obj )
    return hex_form
def string_to_object( s ):
    """Return the binary data encoded by the hexadecimal string ``s``."""
    binary_form = binascii.unhexlify( s )
    return binary_form
class ParamsWithSpecs( collections.defaultdict ):
    """
    Dictionary of parameters checked against per-parameter specs.

    ``specs`` maps a parameter name to a dict that may hold ``'map'`` (a
    callable applied to the supplied value), ``'valid'`` (a predicate called
    with the supplied, unmapped value) and ``'default'`` (returned for
    parameters that were not supplied).  Values are reachable both by key
    and as attributes.  The ``_param_*_error`` hooks raise
    ``NotImplementedError`` here and are presumably meant to be overridden
    by subclasses.
    """

    def __init__( self, specs=None, params=None ):
        self.specs = specs or dict()
        self.params = params or dict()
        for name in list( self.params ):
            value = self.params[ name ]
            if name not in self.specs:
                self._param_unknown_error( name )
            spec = self.specs[ name ]
            if 'map' in spec:
                try:
                    self.params[ name ] = spec[ 'map' ]( value )
                except Exception:
                    self._param_map_error( name, value )
            if 'valid' in spec and not spec[ 'valid' ]( value ):
                self._param_vaildation_error( name, value )
        self.update( self.params )

    def __missing__( self, name ):
        # Parameters that were not supplied fall back to the spec default.
        return self.specs[ name ][ 'default' ]

    def __getattr__( self, name ):
        return self[ name ]

    def _param_unknown_error( self, name ):
        raise NotImplementedError()

    def _param_map_error( self, name, value ):
        raise NotImplementedError()

    def _param_vaildation_error( self, name, value ):
        raise NotImplementedError()
def compare_urls( url1, url2, compare_scheme=True, compare_hostname=True, compare_path=True ):
    """
    Loosely compare two URLs.  For each enabled component (scheme, hostname,
    path) the URLs are considered different only when both provide that
    component and the values differ; a component missing from either URL is
    ignored.
    """
    first = urlparse.urlparse( url1 )
    second = urlparse.urlparse( url2 )
    components = (
        ( compare_scheme, first.scheme, second.scheme ),
        ( compare_hostname, first.hostname, second.hostname ),
        ( compare_path, first.path, second.path ),
    )
    for enabled, left, right in components:
        if enabled and left and right and left != right:
            return False
    return True
def read_dbnames(filename):
    """
    Read build names from a tab-separated builds file and return them as a
    ``DBNames`` list of ``( build_key, display_name )`` tuples.

    Order of the result: the '?' (unspecified) entry first, then UCSC builds
    grouped per species in display-name order with the highest revision
    first, then a separator entry and the manual builds (those whose key is
    an integer).  Lines starting with '#' and malformed lines are skipped.
    If the file cannot be read an error is printed; an empty result is
    replaced by the single default entry.

    When ``filename`` is None the sample builds file under ``tool-data`` is
    used.
    """
    class DBNames( list ):
        default_value = "?"
        default_name = "unspecified (?)"
    db_names = DBNames()
    try:
        ucsc_builds = {}
        man_builds = []  # assume these are integers
        name_to_db_base = {}
        if filename is None:
            # Should only be happening with the galaxy.tools.parameters.basic:GenomeBuildParameter docstring unit test
            filename = os.path.join( 'tool-data', 'shared', 'ucsc', 'builds.txt.sample' )
        # Trailing digits of a UCSC build key are its revision number.
        build_rev_re = re.compile( r'\d+$' )
        with open( filename ) as builds_file:
            for line in builds_file:
                try:
                    if line[0:1] == "#":
                        continue
                    fields = line.replace("\r", "").replace("\n", "").split("\t")
                    # Special case of unspecified build is at top of list
                    if fields[0] == "?":
                        db_names.insert(0, (fields[0], fields[1]))
                        continue
                    try:
                        int(fields[0])
                    except ValueError:
                        # UCSC build
                        db_base = fields[0].rstrip('0123456789')
                        if db_base not in ucsc_builds:
                            ucsc_builds[db_base] = []
                            name_to_db_base[fields[1]] = db_base
                        # we want to sort within a species numerically by revision number
                        revisions = build_rev_re.findall(fields[0])
                        build_rev = int(revisions[0]) if revisions else 0
                        ucsc_builds[db_base].append((build_rev, fields[0], fields[1]))
                    else:
                        # manual build (i.e. microbes)
                        man_builds.append((fields[1], fields[0]))
                except Exception:
                    # Malformed line (e.g. missing columns): skip it.
                    continue
        for name in sorted( name_to_db_base ):
            db_base = name_to_db_base[name]
            newest_first = sorted( ucsc_builds[db_base], reverse=True )
            ucsc_builds[db_base] = [(build, label) for _, build, label in newest_first]
            db_names = DBNames( db_names + ucsc_builds[db_base] )
        if len( db_names ) > 1 and len( man_builds ) > 0:
            db_names.append( ( db_names.default_value, '----- Additional Species Are Below -----' ) )
        man_builds.sort()
        man_builds = [(build, label) for label, build in man_builds]
        db_names = DBNames( db_names + man_builds )
    except Exception as e:
        print( "ERROR: Unable to read builds file: %s" % e )
    if len(db_names) < 1:
        db_names = DBNames( [( db_names.default_value, db_names.default_name )] )
    return db_names
def read_build_sites( filename, check_builds=True ):
    """
    Read db names to ucsc mappings from a tab-separated file (this file
    should probably be merged with the builds file read by read_dbnames).

    Each usable line yields ``{'name': ..., 'url': ...}`` plus, when
    ``check_builds`` is true, ``'builds'``: the third column split on
    commas.  Lines starting with '#' and lines with too few columns are
    skipped.  If the file cannot be read an error is printed and the sites
    collected so far (possibly none) are returned.
    """
    build_sites = []
    try:
        with open( filename ) as sites_file:
            for line in sites_file:
                if line[0:1] == "#":
                    continue
                fields = line.replace("\r", "").replace("\n", "").split("\t")
                try:
                    site_dict = {'name': fields[0], 'url': fields[1]}
                    if check_builds:
                        site_dict['builds'] = fields[2].split(",")
                except IndexError:
                    # Not enough columns on this line: skip it.
                    continue
                build_sites.append( site_dict )
    except Exception:
        print( "ERROR: Unable to read builds for site file %s" % filename )
    return build_sites
def relativize_symlinks( path, start=None, followlinks=False):
    """
    Walk ``path`` and re-create every symbolic link listed among a
    directory's files so that it points to its target through a relative
    path.

    The relative path is computed from ``start`` when given, otherwise from
    the directory that contains the link.  ``followlinks`` is passed on to
    ``os.walk``.
    """
    for root, dirs, files in os.walk( path, followlinks=followlinks ):
        rel_start = root if start is None else start
        for file_name in files:
            link_path = os.path.join( root, file_name )
            if not os.path.islink( link_path ):
                continue
            rel_path = relpath( os.readlink( link_path ), rel_start )
            os.remove( link_path )
            os.symlink( rel_path, link_path )
def stringify_dictionary_keys( in_dict ):
    """
    Return a new dictionary whose keys are ``str()`` versions of the keys of
    ``in_dict``.  Only the top level is converted (no recursion).  Useful
    because unicode keys are not valid for expansion into keyword arguments
    on method calls.
    """
    return dict( [ ( str( key ), in_dict[ key ] ) for key in in_dict ] )
def recursively_stringify_dictionary_keys( d ):
    """
    Return a copy of ``d`` in which the keys of every dictionary, at any
    depth inside dicts and lists, are encoded with ``DEFAULT_ENCODING``.
    Values that are neither dict nor list are returned unchanged.
    """
    if isinstance(d, list):
        return [recursively_stringify_dictionary_keys(item) for item in d]
    if not isinstance(d, dict):
        return d
    converted = {}
    for key, value in d.items():
        converted[ key.encode( DEFAULT_ENCODING ) ] = recursively_stringify_dictionary_keys(value)
    return converted
def mkstemp_ln( src, prefix='mkstemp_ln_' ):
    """
    From tempfile._mkstemp_inner, generate a hard link to ``src`` in the
    same directory under a random name starting with ``prefix`` and return
    its absolute path.  Created so we can persist the underlying file of a
    NamedTemporaryFile upon its closure.

    Raises ``IOError`` when no free name is found in ``tempfile.TMP_MAX``
    attempts; other ``OSError`` failures from ``os.link`` propagate.
    """
    link_dir = os.path.dirname(src)
    candidates = tempfile._get_candidate_names()
    for _attempt in xrange(tempfile.TMP_MAX):
        link_path = os.path.join(link_dir, prefix + next(candidates))
        try:
            os.link( src, link_path )
        except OSError as e:
            if e.errno == errno.EEXIST:
                continue  # try again
            raise
        return os.path.abspath(link_path)
    raise IOError(errno.EEXIST, "No usable temporary file name found")
def umask_fix_perms( path, umask, unmasked_perms, gid=None ):
    """
    umask-friendly permissions fixing: set the mode of ``path`` to
    ``unmasked_perms`` with the ``umask`` bits cleared and, when ``gid`` is
    given, change its group to ``gid``.

    Failures are logged, never raised; if ``path`` cannot be stat'ed nothing
    is changed.
    """
    perms = unmasked_perms & ~umask
    try:
        st = os.stat( path )
    except OSError:
        log.exception( 'Unable to set permissions or group on %s' % path )
        return
    # fix modes
    current_mode = stat.S_IMODE( st.st_mode )
    if current_mode != perms:
        try:
            os.chmod( path, perms )
        except Exception as e:
            details = ( oct( umask ), path, oct( perms ), oct( current_mode ), e )
            log.warning( 'Unable to honor umask (%s) for %s, tried to set: %s but mode remains %s, error was: %s' % details )
    # fix group
    if gid is None or st.st_gid == gid:
        return
    try:
        os.chown( path, -1, gid )
    except Exception as e:
        # Prefer group records in the message; fall back to the raw ids.
        try:
            desired_group = grp.getgrgid( gid )
            current_group = grp.getgrgid( st.st_gid )
        except:
            desired_group = gid
            current_group = st.st_gid
        details = ( desired_group, path, current_group, e )
        log.warning( 'Unable to honor primary group (%s) for %s, group remains %s, error was: %s' % details )
def docstring_trim(docstring):
    """
    Trim a Python docstring (algorithm from
    http://www.python.org/dev/peps/pep-0257/): expand tabs, remove the
    indentation shared by all lines after the first, strip trailing
    whitespace and drop leading and trailing blank lines.  A false value
    gives an empty string.
    """
    if not docstring:
        return ''
    # Convert tabs to spaces (following the normal Python rules)
    # and split into a list of lines:
    lines = docstring.expandtabs().splitlines()
    first_line, continuation = lines[0], lines[1:]
    # Indentation of every non-blank line after the first:
    indents = [len(line) - len(line.lstrip()) for line in continuation if line.lstrip()]
    # Remove indentation (first line is special):
    trimmed = [first_line.strip()]
    if indents:
        margin = min(indents)
        trimmed.extend([line[margin:].rstrip() for line in continuation])
    # Strip off trailing and leading blank lines:
    while trimmed and not trimmed[-1]:
        trimmed.pop()
    while trimmed and not trimmed[0]:
        trimmed.pop(0)
    # Return a single string:
    return '\n'.join(trimmed)
def nice_size(size):
    """
    Returns a readably formatted string with the size, using 1024-based
    units from bytes up to EB.  Negative sizes keep their sign.  Values that
    cannot be converted to a number, or that are too large for the known
    units, give '??? bytes'.

    >>> nice_size(100)
    '100 bytes'
    >>> nice_size(10000)
    '9.8 KB'
    >>> nice_size(1000000)
    '976.6 KB'
    >>> nice_size(100000000)
    '95.4 MB'
    """
    words = [ 'bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB' ]
    prefix = ''
    try:
        size = float( size )
    except ( TypeError, ValueError, OverflowError ):
        return '??? bytes'
    if size < 0:
        size = abs( size )
        prefix = '-'
    for ind, word in enumerate( words ):
        step = 1024 ** ( ind + 1 )
        if step > size:
            size = size / float( 1024 ** ind )
            if word == 'bytes':  # No decimals for bytes
                return "%s%d bytes" % ( prefix, size )
            return "%s%.1f %s" % ( prefix, size, word )
    return '??? bytes'
def size_to_bytes( size ):
    """
    Returns a number of bytes if given a reasonably formatted string with
    the size, e.g. '10', '1kb', '1.5 MB', '2G' or '100 bytes' (1024-based
    units, case-insensitive).

    Raises ``AssertionError`` when the string has no recognisable format.
    The check is an explicit raise, so it also works when Python runs with
    assertions disabled (-O).
    """
    # Assume input in bytes if we can convert directly to an int
    try:
        return int( size )
    except ( TypeError, ValueError, OverflowError ):
        pass
    # Otherwise it must have non-numeric characters
    size_match = re.match( r'([\d\.]+)\s*([tgmk]b?|b|bytes?)$', size.lower() )
    if size_match is None:
        raise AssertionError( 'Unable to parse a size from: %r' % ( size, ) )
    number = float( size_match.group(1) )
    # The first letter of the unit selects the power of 1024.
    exponents = { 't': 4, 'g': 3, 'm': 2, 'k': 1, 'b': 0 }
    exponent = exponents[ size_match.group(2)[0] ]
    return int( number * 1024 ** exponent )
def send_mail( frm, to, subject, body, config ):
    """
    Sends an email.

    ``to`` may be a single address, a comma separated string or a list.  The
    body is encoded as ASCII with unencodable characters replaced.  Server
    settings are read from ``config`` (``smtp_server``, optional
    ``smtp_ssl``, ``smtp_username``, ``smtp_password``).  Without a
    configured server the message is only logged.  On a plain connection
    STARTTLS is attempted; login and HELO errors close the connection and
    are re-raised.
    """
    recipients = listify( to )
    msg = MIMEText( body.encode( 'ascii', 'replace' ) )
    msg[ 'To' ] = ', '.join( recipients )
    msg[ 'From' ] = frm
    msg[ 'Subject' ] = subject
    if config.smtp_server is None:
        log.error( "Mail is not configured for this Galaxy instance." )
        log.info( msg )
        return
    smtp_ssl = asbool( getattr(config, 'smtp_ssl', False ) )
    s = smtplib.SMTP_SSL() if smtp_ssl else smtplib.SMTP()
    s.connect( config.smtp_server )
    if not smtp_ssl:
        # Plain connection: try to upgrade it, carry on unencrypted if that is not possible.
        try:
            s.starttls()
            log.debug( 'Initiated SSL/TLS connection to SMTP server: %s' % config.smtp_server )
        except RuntimeError as e:
            log.warning( 'SSL/TLS support is not available to your Python interpreter: %s' % e )
        except smtplib.SMTPHeloError as e:
            log.error( "The server didn't reply properly to the HELO greeting: %s" % e )
            s.close()
            raise
        except smtplib.SMTPException as e:
            log.warning( 'The server does not support the STARTTLS extension: %s' % e )
    if config.smtp_username and config.smtp_password:
        try:
            s.login( config.smtp_username, config.smtp_password )
        except smtplib.SMTPException as e:
            # Every login failure is fatal; only the logged message differs.
            if isinstance( e, smtplib.SMTPHeloError ):
                template = "The server didn't reply properly to the HELO greeting: %s"
            elif isinstance( e, smtplib.SMTPAuthenticationError ):
                template = "The server didn't accept the username/password combination: %s"
            else:
                template = "No suitable authentication method was found: %s"
            log.error( template % e )
            s.close()
            raise
    s.sendmail( frm, recipients, msg.as_string() )
    s.quit()
def force_symlink( source, link_name ):
    """
    Create the symbolic link ``link_name`` pointing to ``source``, replacing
    whatever already exists under that name.  ``OSError`` other than
    "already exists" propagates.
    """
    try:
        os.symlink( source, link_name )
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        os.remove( link_name )
        os.symlink( source, link_name )
def move_merge( source, target ):
    """
    Move ``source`` to ``target``, merging directories.

    shutil.move places a moved directory *inside* an existing target
    directory; here the target always stays the target: when both are
    directories the entries of ``source`` are moved into ``target`` one by
    one (recursively).  In every other case this is a plain ``shutil.move``
    whose result is returned.
    """
    both_directories = os.path.isdir( source ) and os.path.isdir( target )
    if not both_directories:
        return shutil.move( source, target )
    for name in os.listdir( source ):
        move_merge( os.path.join( source, name ), os.path.join( target, name ) )
def safe_str_cmp(a, b):
    """
    Safely compare two strings in a timing-attack-resistant manner: for
    strings of equal length every character pair is examined regardless of
    where the first difference occurs.
    """
    if len(a) != len(b):
        return False
    difference = 0
    for left, right in izip(a, b):
        # Accumulate differing bits instead of returning at the first mismatch.
        difference |= ord(left) ^ ord(right)
    return difference == 0
# Location three directory levels above the first entry of this package's __path__.
galaxy_root_path = os.path.join(__path__[0], "..", "..", "..")


def galaxy_directory():
    """Return ``galaxy_root_path`` as an absolute, normalized path."""
    root_directory = os.path.abspath(galaxy_root_path)
    return root_directory
class ExecutionTimer(object):
    """
    Stopwatch started on construction; ``str()`` of an instance gives the
    time elapsed since then, formatted as "(N.NNN ms)".
    """

    def __init__(self):
        self.begin = time.time()

    def __str__(self):
        elapsed_seconds = time.time() - self.begin
        return "(%0.3f ms)" % (elapsed_seconds * 1000.0)
# When executed directly as a script, run the doctests embedded in this module's docstrings.
if __name__ == '__main__':
    import doctest
    doctest.testmod(sys.modules[__name__], verbose=False)