import os
import shutil
import logging
import tempfile
import json
import datetime
from galaxy import model
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.model.orm import eagerload, eagerload_all
from galaxy.tools.parameters.basic import UnvalidatedValue
from galaxy.util.json import dumps, loads
from galaxy.web.framework.helpers import to_unicode
from sqlalchemy.sql import expression
log = logging.getLogger(__name__)
EXPORT_HISTORY_TEXT = """
<tool id="__EXPORT_HISTORY__" name="Export History" version="0.1" tool_type="export_history">
    <type class="ExportHistoryTool" module="galaxy.tools"/>
    <action module="galaxy.tools.actions.history_imp_exp" class="ExportHistoryToolAction"/>
    <command>python $export_history $__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__ $output_file</command>
    <inputs>
        <param name="__HISTORY_TO_EXPORT__" type="hidden"/>
        <param name="compress" type="boolean"/>
        <param name="__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__" type="hidden"/>
    </inputs>
    <configfiles>
        <configfile name="export_history">from galaxy.tools.imp_exp.export_history import main; main()</configfile>
    </configfiles>
    <outputs>
        <data format="gzip" name="output_file"/>
    </outputs>
</tool>
"""
def load_history_imp_exp_tools( toolbox ):
""" Adds tools for importing/exporting histories to archives. """
# Use the same process as load_external_metadata_tool; see that method
# for why tool description files are created on the fly.
tool_xml_text = EXPORT_HISTORY_TEXT
# Load export tool.
tmp_file = tempfile.NamedTemporaryFile()
tmp_file.write( tool_xml_text )
tmp_file.flush()
history_exp_tool = toolbox.load_hidden_tool( tmp_file.name )
log.debug( "Loaded history export tool: %s", history_exp_tool.id )
# Load import tool.
tool_xml = os.path.join( os.getcwd(), "lib/galaxy/tools/imp_exp/imp_history_from_archive.xml" )
history_imp_tool = toolbox.load_hidden_tool( tool_xml )
log.debug( "Loaded history import tool: %s", history_imp_tool.id )
class JobImportHistoryArchiveWrapper( object, UsesAnnotations ):
"""
Class provides support for performing jobs that import a history from
an archive.
"""
def __init__( self, app, job_id ):
self.app = app
self.job_id = job_id
self.sa_session = self.app.model.context
def cleanup_after_job( self ):
""" Set history, datasets, and jobs' attributes and clean up archive directory. """
#
# Helper methods.
#
def file_in_dir( file_path, a_dir ):
""" Returns true if file is in directory. """
abs_file_path = os.path.abspath( file_path )
return os.path.split( abs_file_path )[0] == a_dir
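# Note that file_in_dir checks direct containment only; with hypothetical paths:
#     file_in_dir( '/archive/datasets/ds_1.dat', '/archive/datasets' )      -> True
#     file_in_dir( '/archive/datasets/sub/ds_1.dat', '/archive/datasets' )  -> False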
def read_file_contents( file_path ):
""" Read and return the entire contents of a file. """
fp = open( file_path, 'rb' )
buffsize = 1048576
chunks = []
while True:
# Read in fixed-size chunks; an empty read signals EOF.
chunk = fp.read( buffsize )
if not chunk:
break
chunks.append( chunk )
fp.close()
return ''.join( chunks )
def get_tag_str( tag, value ):
""" Builds a tag string for a tag, value pair. """
if not value:
return tag
else:
return tag + ":" + value
#
# Import history.
#
jiha = self.sa_session.query( model.JobImportHistoryArchive ).filter_by( job_id=self.job_id ).first()
if jiha:
try:
archive_dir = jiha.archive_dir
user = jiha.job.user
#
# Create history.
#
history_attr_file_name = os.path.join( archive_dir, 'history_attrs.txt')
history_attr_str = read_file_contents( history_attr_file_name )
history_attrs = loads( history_attr_str )
# Create history.
new_history = model.History( name='imported from archive: %s' % history_attrs['name'].encode( 'utf-8' ),
user=user )
new_history.importing = True
new_history.hid_counter = history_attrs['hid_counter']
new_history.genome_build = history_attrs['genome_build']
self.sa_session.add( new_history )
jiha.history = new_history
self.sa_session.flush()
# Add annotation, tags.
if user:
self.add_item_annotation( self.sa_session, user, new_history, history_attrs[ 'annotation' ] )
"""
TODO: figure out how to add tags to item.
for tag, value in history_attrs[ 'tags' ].items():
trans.app.tag_handler.apply_item_tags( trans, trans.user, new_history, get_tag_str( tag, value ) )
"""
#
# Create datasets.
#
datasets_attrs_file_name = os.path.join( archive_dir, 'datasets_attrs.txt')
datasets_attr_str = read_file_contents( datasets_attrs_file_name )
datasets_attrs = loads( datasets_attr_str )
if os.path.exists( datasets_attrs_file_name + ".provenance" ):
provenance_attr_str = read_file_contents( datasets_attrs_file_name + ".provenance" )
provenance_attrs = loads( provenance_attr_str )
datasets_attrs += provenance_attrs
# Get counts of how often each dataset file is used; a file can
# be linked to multiple dataset objects (HDAs).
datasets_usage_counts = {}
for dataset_attrs in datasets_attrs:
temp_dataset_file_name = \
os.path.abspath( os.path.join( archive_dir, dataset_attrs['file_name'] ) )
datasets_usage_counts[ temp_dataset_file_name ] = \
datasets_usage_counts.get( temp_dataset_file_name, 0 ) + 1
# Create datasets.
for dataset_attrs in datasets_attrs:
metadata = dataset_attrs['metadata']
# Create dataset and HDA.
hda = model.HistoryDatasetAssociation( name=dataset_attrs['name'].encode( 'utf-8' ),
extension=dataset_attrs['extension'],
info=dataset_attrs['info'].encode( 'utf-8' ),
blurb=dataset_attrs['blurb'],
peek=dataset_attrs['peek'],
designation=dataset_attrs['designation'],
visible=dataset_attrs['visible'],
dbkey=metadata['dbkey'],
metadata=metadata,
history=new_history,
create_dataset=True,
sa_session=self.sa_session )
if 'uuid' in dataset_attrs:
hda.dataset.uuid = dataset_attrs["uuid"]
if dataset_attrs.get('exported', True) is False:
hda.state = hda.states.DISCARDED
hda.deleted = True
hda.purged = True
else:
hda.state = hda.states.OK
self.sa_session.add( hda )
self.sa_session.flush()
new_history.add_dataset( hda, genome_build=None )
hda.hid = dataset_attrs['hid'] # Overwrite default hid set when HDA added to history.
# TODO: Is there a way to recover permissions? Is this needed?
# permissions = trans.app.security_agent.history_get_default_permissions( new_history )
# trans.app.security_agent.set_all_dataset_permissions( hda.dataset, permissions )
self.sa_session.flush()
if dataset_attrs.get('exported', True) is True:
# Do security check and move/copy dataset data.
temp_dataset_file_name = \
os.path.abspath( os.path.join( archive_dir, dataset_attrs['file_name'] ) )
if not file_in_dir( temp_dataset_file_name, os.path.join( archive_dir, "datasets" ) ):
raise Exception( "Invalid dataset path: %s" % temp_dataset_file_name )
if datasets_usage_counts[ temp_dataset_file_name ] == 1:
shutil.move( temp_dataset_file_name, hda.file_name )
else:
datasets_usage_counts[ temp_dataset_file_name ] -= 1
shutil.copyfile( temp_dataset_file_name, hda.file_name )
hda.dataset.set_total_size() # update the filesize record in the database
# Set tags, annotations.
if user:
self.add_item_annotation( self.sa_session, user, hda, dataset_attrs[ 'annotation' ] )
# TODO: Set tags.
"""
for tag, value in dataset_attrs[ 'tags' ].items():
trans.app.tag_handler.apply_item_tags( trans, trans.user, hda, get_tag_str( tag, value ) )
self.sa_session.flush()
"""
# Although metadata was set above, it must be regenerated to recover the BAI index for BAM datasets.
if hda.extension == 'bam':
self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_app(
self.app.datatypes_registry.set_external_metadata_tool, self.app, jiha.job.session_id,
new_history.id, jiha.job.user, incoming={ 'input1': hda }, overwrite=False
)
#
# Create jobs.
#
# Read jobs attributes.
jobs_attr_file_name = os.path.join( archive_dir, 'jobs_attrs.txt')
jobs_attr_str = read_file_contents( jobs_attr_file_name )
# Decode jobs attributes.
def as_hda( obj_dct ):
""" Hook to 'decode' an HDA; method uses history and HID to get the HDA represented by
the encoded object. This only works because HDAs are created above. """
if obj_dct.get( '__HistoryDatasetAssociation__', False ):
return self.sa_session.query( model.HistoryDatasetAssociation
).filter_by( history=new_history, hid=obj_dct['hid'] ).first()
return obj_dct
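# An encoded HDA in jobs_attrs.txt looks roughly like (abridged; see
# HistoryDatasetAssociationEncoder in setup_job below):
#     { "__HistoryDatasetAssociation__": true, "hid": 3, "name": "...", ... }
# so as_hda resolves it to the HDA with hid 3 in the new history.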
jobs_attrs = loads( jobs_attr_str, object_hook=as_hda )
# Create each job.
for job_attrs in jobs_attrs:
imported_job = model.Job()
imported_job.user = user
# TODO: set session?
# imported_job.session = trans.get_galaxy_session().id
imported_job.history = new_history
imported_job.imported = True
imported_job.tool_id = job_attrs[ 'tool_id' ]
imported_job.tool_version = job_attrs[ 'tool_version' ]
imported_job.set_state( job_attrs[ 'state' ] )
imported_job.info = job_attrs.get('info', None)
imported_job.exit_code = job_attrs.get('exit_code', None)
imported_job.traceback = job_attrs.get('traceback', None)
imported_job.stdout = job_attrs.get('stdout', None)
imported_job.stderr = job_attrs.get('stderr', None)
imported_job.command_line = job_attrs.get('command_line', None)
try:
imported_job.create_time = datetime.datetime.strptime( job_attrs["create_time"], "%Y-%m-%dT%H:%M:%S.%f" )
imported_job.update_time = datetime.datetime.strptime( job_attrs["update_time"], "%Y-%m-%dT%H:%M:%S.%f" )
except ( KeyError, ValueError ):
# Timestamps are optional in the archive; keep defaults if missing or malformed.
pass
self.sa_session.add( imported_job )
self.sa_session.flush()
class HistoryDatasetAssociationIDEncoder( json.JSONEncoder ):
""" Custom JSONEncoder for a HistoryDatasetAssociation that encodes an HDA as its ID. """
def default( self, obj ):
""" Encode an HDA, default encoding for everything else. """
if isinstance( obj, model.HistoryDatasetAssociation ):
return obj.id
return json.JSONEncoder.default( self, obj )
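# For example, dumps( hda, cls=HistoryDatasetAssociationIDEncoder ) serializes an
# HDA parameter value as its integer database id (e.g. "42").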
# Set parameters. May be useful to look at metadata.py for creating parameters.
# TODO: there may be a better way to set parameters, e.g.:
# for name, value in tool.params_to_strings( incoming, trans.app ).iteritems():
# job.add_parameter( name, value )
# to make this work, we'd need to flesh out the HDA objects. The code below is
# relatively similar.
for name, value in job_attrs[ 'params' ].items():
# Transform parameter values when necessary.
if isinstance( value, model.HistoryDatasetAssociation ):
# HDA input: use hid to find input.
input_hda = self.sa_session.query( model.HistoryDatasetAssociation ) \
.filter_by( history=new_history, hid=value.hid ).first()
value = input_hda.id
# print "added parameter %s-->%s to job %i" % ( name, value, imported_job.id )
imported_job.add_parameter( name, dumps( value, cls=HistoryDatasetAssociationIDEncoder ) )
# TODO: Connect jobs to input datasets.
# Connect jobs to output datasets.
for output_hid in job_attrs[ 'output_datasets' ]:
# print "%s job has output dataset %i" % (imported_job.id, output_hid)
output_hda = self.sa_session.query( model.HistoryDatasetAssociation
).filter_by(history=new_history, hid=output_hid ).first()
if output_hda:
imported_job.add_output_dataset( output_hda.name, output_hda )
# Connect jobs to input datasets.
if 'input_mapping' in job_attrs:
for input_name, input_hid in job_attrs[ 'input_mapping' ].items():
input_hda = self.sa_session.query( model.HistoryDatasetAssociation ) \
.filter_by( history=new_history, hid=input_hid ).first()
if input_hda:
imported_job.add_input_dataset( input_name, input_hda )
self.sa_session.flush()
# Done importing.
new_history.importing = False
self.sa_session.flush()
# Cleanup.
if os.path.exists( archive_dir ):
shutil.rmtree( archive_dir )
except Exception as e:
jiha.job.stderr = ( jiha.job.stderr or '' ) + "Error cleaning up history import job: %s" % e
self.sa_session.flush()
class JobExportHistoryArchiveWrapper( object, UsesAnnotations ):
"""
Class provides support for performing jobs that export a history to an
archive.
"""
def __init__( self, job_id ):
self.job_id = job_id
def get_history_datasets( self, trans, history ):
"""
Returns history's datasets.
"""
query = ( trans.sa_session.query( trans.model.HistoryDatasetAssociation )
.filter( trans.model.HistoryDatasetAssociation.history == history )
.options( eagerload( "children" ) )
.join( "dataset" )
.options( eagerload_all( "dataset.actions" ) )
.order_by( trans.model.HistoryDatasetAssociation.hid )
.filter( trans.model.HistoryDatasetAssociation.deleted == expression.false() )
.filter( trans.model.Dataset.purged == expression.false() ) )
return query.all()
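# Note: this query already excludes deleted HDAs and purged datasets at the
# database level, so the include_deleted handling in setup_job below only
# applies to datasets the query actually returns.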
# TODO: should use db_session rather than trans in this method.
def setup_job( self, trans, jeha, include_hidden=False, include_deleted=False ):
""" Perform setup for job to export a history into an archive. Method generates
attribute files for export, sets the corresponding attributes in the jeha
object, and returns a command line for running the job. The command line
includes the command, inputs, and options; it does not include the output
file because it must be set at runtime. """
#
# Helper methods/classes.
#
def get_item_tag_dict( item ):
""" Create dictionary of an item's tags. """
tags = {}
for tag in item.tags:
tag_user_tname = to_unicode( tag.user_tname )
tag_user_value = to_unicode( tag.user_value )
tags[ tag_user_tname ] = tag_user_value
return tags
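# For example, tags 'group:rnaseq' and 'favorite' would yield roughly
# { u'group': u'rnaseq', u'favorite': None } (a tag without a value has a
# None user_value).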
def prepare_metadata( metadata ):
""" Prepare metatdata for exporting. """
for name, value in metadata.items():
# Metadata files are not needed for export because they can be
# regenerated; items() returns a list copy in Python 2, so deleting
# entries while iterating is safe.
if isinstance( value, trans.app.model.MetadataFile ):
del metadata[ name ]
return metadata
class HistoryDatasetAssociationEncoder( json.JSONEncoder ):
""" Custom JSONEncoder for a HistoryDatasetAssociation. """
def default( self, obj ):
""" Encode an HDA, default encoding for everything else. """
if isinstance( obj, trans.app.model.HistoryDatasetAssociation ):
rval = {
"__HistoryDatasetAssociation__": True,
"create_time": obj.create_time.__str__(),
"update_time": obj.update_time.__str__(),
"hid": obj.hid,
"name": to_unicode( obj.name ),
"info": to_unicode( obj.info ),
"blurb": obj.blurb,
"peek": obj.peek,
"extension": obj.extension,
"metadata": prepare_metadata( dict( obj.metadata.items() ) ),
"parent_id": obj.parent_id,
"designation": obj.designation,
"deleted": obj.deleted,
"visible": obj.visible,
"file_name": obj.file_name,
"uuid": ( lambda uuid: str( uuid ) if uuid else None )( obj.dataset.uuid ),
"annotation": to_unicode( getattr( obj, 'annotation', '' ) ),
"tags": get_item_tag_dict( obj ),
}
if not obj.visible and not include_hidden:
rval['exported'] = False
elif obj.deleted and not include_deleted:
rval['exported'] = False
else:
rval['exported'] = True
return rval
if isinstance( obj, UnvalidatedValue ):
return obj.__str__()
return json.JSONEncoder.default( self, obj )
#
# Create attributes/metadata files for export.
#
temp_output_dir = tempfile.mkdtemp()
# Write history attributes to file.
history = jeha.history
history_attrs = {
"create_time": history.create_time.__str__(),
"update_time": history.update_time.__str__(),
"name": to_unicode( history.name ),
"hid_counter": history.hid_counter,
"genome_build": history.genome_build,
"annotation": to_unicode( self.get_item_annotation_str( trans.sa_session, history.user, history ) ),
"tags": get_item_tag_dict( history ),
"includes_hidden_datasets": include_hidden,
"includes_deleted_datasets": include_deleted
}
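# The serialized history_attrs.txt is a single JSON object, e.g. (illustrative
# values):
#     { "name": "RNA-seq run", "hid_counter": 12, "genome_build": "hg19",
#       "annotation": "...", "tags": {}, "includes_hidden_datasets": false, ... }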
history_attrs_filename = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
with open( history_attrs_filename, 'w' ) as history_attrs_out:
history_attrs_out.write( dumps( history_attrs ) )
jeha.history_attrs_filename = history_attrs_filename
# Write datasets' attributes to file.
datasets = self.get_history_datasets( trans, history )
included_datasets = []
datasets_attrs = []
provenance_attrs = []
for dataset in datasets:
dataset.annotation = self.get_item_annotation_str( trans.sa_session, history.user, dataset )
if (not dataset.visible and not include_hidden) or (dataset.deleted and not include_deleted):
provenance_attrs.append( dataset )
else:
datasets_attrs.append( dataset )
included_datasets.append( dataset )
datasets_attrs_filename = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
with open( datasets_attrs_filename, 'w' ) as datasets_attrs_out:
datasets_attrs_out.write( dumps( datasets_attrs, cls=HistoryDatasetAssociationEncoder ) )
jeha.datasets_attrs_filename = datasets_attrs_filename
provenance_attrs_out = open( datasets_attrs_filename + ".provenance", 'w' )
provenance_attrs_out.write( dumps( provenance_attrs, cls=HistoryDatasetAssociationEncoder ) )
provenance_attrs_out.close()
#
# Write jobs attributes file.
#
# Get all jobs associated with included HDAs.
jobs_dict = {}
for hda in included_datasets:
# Get the associated job, if any. If this HDA was copied from another,
# we need to find the job that created the original HDA.
job_hda = hda
while job_hda.copied_from_history_dataset_association: # should this check library datasets as well?
job_hda = job_hda.copied_from_history_dataset_association
if not job_hda.creating_job_associations:
# No creating job found for this HDA.
continue
# Get the job object.
job = None
for assoc in job_hda.creating_job_associations:
job = assoc.job
break
if not job:
# No viable job.
continue
jobs_dict[ job.id ] = job
# Get jobs' attributes.
jobs_attrs = []
for job in jobs_dict.values():
job_attrs = {}
job_attrs[ 'tool_id' ] = job.tool_id
job_attrs[ 'tool_version' ] = job.tool_version
job_attrs[ 'state' ] = job.state
job_attrs[ 'info' ] = job.info
job_attrs[ 'traceback' ] = job.traceback
job_attrs[ 'command_line' ] = job.command_line
job_attrs[ 'stderr' ] = job.stderr
job_attrs[ 'stdout' ] = job.stdout
job_attrs[ 'exit_code' ] = job.exit_code
job_attrs[ 'create_time' ] = job.create_time.isoformat()
job_attrs[ 'update_time' ] = job.update_time.isoformat()
# Get the job's parameters
try:
params_objects = job.get_param_values( trans.app )
except Exception:
# Could not get job params; skip this job.
continue
job_attrs[ 'params' ] = dict( params_objects )
# -- Get input, output datasets. --
input_datasets = []
input_mapping = {}
for assoc in job.input_datasets:
# Optional data inputs will not have a dataset.
if assoc.dataset:
input_datasets.append( assoc.dataset.hid )
input_mapping[assoc.name] = assoc.dataset.hid
job_attrs[ 'input_datasets' ] = input_datasets
job_attrs[ 'input_mapping'] = input_mapping
output_datasets = [ assoc.dataset.hid for assoc in job.output_datasets ]
job_attrs[ 'output_datasets' ] = output_datasets
jobs_attrs.append( job_attrs )
jobs_attrs_filename = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
with open( jobs_attrs_filename, 'w' ) as jobs_attrs_out:
jobs_attrs_out.write( dumps( jobs_attrs, cls=HistoryDatasetAssociationEncoder ) )
jeha.jobs_attrs_filename = jobs_attrs_filename
#
# Create and return command line for running tool.
#
options = ""
if jeha.compressed:
options = "-G"
return "%s %s %s %s" % ( options, history_attrs_filename,
datasets_attrs_filename,
jobs_attrs_filename )
def cleanup_after_job( self, db_session ):
""" Remove temporary directory and attribute files generated during setup for this job. """
# Get jeha for job.
jeha = db_session.query( model.JobExportHistoryArchive ).filter_by( job_id=self.job_id ).first()
if jeha:
for filename in [ jeha.history_attrs_filename, jeha.datasets_attrs_filename, jeha.jobs_attrs_filename ]:
try:
os.remove( filename )
except Exception as e:
log.debug( 'Failed to cleanup attributes file (%s): %s', filename, e )
temp_dir = os.path.split( jeha.history_attrs_filename )[0]
try:
shutil.rmtree( temp_dir )
except Exception as e:
log.debug( 'Error deleting directory containing attribute files (%s): %s', temp_dir, e )