"""
Manager and Serializer for histories.
Histories are containers for datasets or dataset collections
created (or copied) by users over the course of an analysis.
"""
from galaxy import model
from galaxy.managers import sharable
from galaxy.managers import deletable
from galaxy.managers import hdas
from galaxy.managers import collections_util
import logging
log = logging.getLogger( __name__ )
class HistoryManager( sharable.SharableModelManager, deletable.PurgableManagerMixin ):
    """
    Manager for histories.

    Adds history-specific behavior on top of the sharable and purgable manager
    bases: copying, handling of anonymous users (who are limited to their
    current history), purging of a history together with its datasets, and
    getting/setting the current history through ``trans``.
    """
    # model class and association classes this manager operates on
    model_class = model.History
    foreign_key_name = 'history'
    user_share_model = model.HistoryUserShareAssociation

    tag_assoc = model.HistoryTagAssociation
    annotation_assoc = model.HistoryAnnotationAssociation
    rating_assoc = model.HistoryRatingAssociation

    # TODO: incorporate imp/exp (or alias to)

    def __init__( self, app, *args, **kwargs ):
        """
        Set up the parent manager(s) and an HDA manager (used when purging).
        """
        super( HistoryManager, self ).__init__( app, *args, **kwargs )
        self.hda_manager = hdas.HDAManager( app )

    def copy( self, history, user, **kwargs ):
        """
        Copy and return the given `history`.

        `user` is passed to ``history.copy`` as ``target_user``; any other
        keyword arguments are forwarded unchanged.
        """
        return history.copy( target_user=user, **kwargs )

    # .... sharable
    # overriding to handle anonymous users' current histories in both cases
    def by_user( self, user, current_history=None, **kwargs ):
        """
        Get all the histories for a given user.

        For an anonymous user this is a list containing only `current_history`
        (or an empty list if there is none); otherwise the parent class'
        `by_user` is used and `kwargs` are forwarded to it.
        """
        # handle default and/or anonymous user (which still may not have a history yet)
        if self.user_manager.is_anonymous( user ):
            return [ current_history ] if current_history else []
        return super( HistoryManager, self ).by_user( user, **kwargs )

    def is_owner( self, history, user, current_history=None, **kwargs ):
        """
        True if the current user is the owner of the given history.

        An anonymous user is only considered the owner of `current_history`.
        """
        # anon users are only allowed to view their current history
        if self.user_manager.is_anonymous( user ):
            if current_history and history == current_history:
                return True
            return False
        return super( HistoryManager, self ).is_owner( history, user )

    # TODO: possibly to sharable or base
    def most_recent( self, user, filters=None, current_history=None, **kwargs ):
        """
        Return the most recently updated history for the user.

        If user is anonymous, return the current history. If the user is anonymous
        and the current history is deleted (or there is none), return None.
        """
        if self.user_manager.is_anonymous( user ):
            return None if ( not current_history or current_history.deleted ) else current_history
        # sort newest-first: with limit=1 an ascending sort would return the
        # *least* recently updated history instead
        desc_update_time = self.model_class.table.c.update_time.desc()
        filters = self._munge_filters( filters, self.model_class.user_id == user.id )
        # TODO: normalize this return value
        return self.query( filters=filters, order_by=desc_update_time, limit=1, **kwargs ).first()

    # .... purgable
    def purge( self, history, flush=True, **kwargs ):
        """
        Purge this history and all HDAs, Collections, and Datasets inside this history.

        The purge-allowed check is made first (before anything is modified).
        """
        self.hda_manager.dataset_manager.error_unless_dataset_purge_allowed()

        # First purge all the datasets
        for hda in history.datasets:
            if not hda.purged:
                self.hda_manager.purge( hda, flush=True )

        # Now mark the history as purged
        super( HistoryManager, self ).purge( history, flush=flush, **kwargs )

    # .... current
    # TODO: make something to bypass the anon user + current history permissions issue
    # def is_current_users_current_history( self, history, trans ):
    #     pass

    def get_current( self, trans ):
        """
        Return the current history (as given by ``trans.get_history()``).
        """
        # TODO: trans
        return trans.get_history()

    def set_current( self, trans, history ):
        """
        Set the current history (via ``trans.set_history``) and return it.
        """
        # TODO: trans
        trans.set_history( history )
        return history

    def set_current_by_id( self, trans, history_id ):
        """
        Set the current history by an id and return that history.
        """
        return self.set_current( trans, self.by_id( history_id ) )

    # TODO: replace or move to serializer
    def _get_history_data( self, trans, history ):
        """
        Returns a dictionary containing ``history`` and ``contents``, serialized
        history and an array of serialized history contents respectively.

        Errors are logged rather than raised: a failure serializing a single
        collection leaves an empty dictionary in its place, any other failure
        adds an ``error`` message to the history dictionary.
        """
        # TODO: instantiate here? really?
        history_serializer = HistorySerializer( self.app )
        hda_serializer = hdas.HDASerializer( self.app )
        history_dictionary = {}
        contents_dictionaries = []
        try:
            history_dictionary = history_serializer.serialize_to_view( history, view='detailed',
                                                                       user=trans.user, trans=trans )

            for content in history.contents_iter( types=[ 'dataset', 'dataset_collection' ] ):
                contents_dict = {}

                if isinstance( content, model.HistoryDatasetAssociation ):
                    contents_dict = hda_serializer.serialize_to_view( content, view='detailed',
                                                                      user=trans.user, trans=trans )

                elif isinstance( content, model.HistoryDatasetCollectionAssociation ):
                    try:
                        service = self.app.dataset_collections_service
                        collection = service.get_dataset_collection_instance(
                            trans=trans,
                            instance_type='history',
                            id=self.app.security.encode_id( content.id ),
                        )
                        serializer = collections_util.dictify_dataset_collection_instance
                        contents_dict = serializer( collection,
                                                    security=self.app.security,
                                                    parent=collection.history,
                                                    view="element" )

                    # 'as' form is valid on both python 2.6+ and 3
                    except Exception as exc:
                        log.exception( "Error in history API at listing dataset collection: %s", exc )
                        # TODO: return some dict with the error

                contents_dictionaries.append( contents_dict )

        except Exception as exc:
            user_id = str( trans.user.id ) if trans.user else '(anonymous)'
            log.exception( 'Error bootstrapping history for user %s: %s', user_id, str( exc ) )
            message = ( 'An error occurred getting the history data from the server. '
                        'Please contact a Galaxy administrator if the problem persists.' )
            history_dictionary[ 'error' ] = message

        return { 'history': history_dictionary,
                 'contents': contents_dictionaries }
class HistorySerializer( sharable.SharableModelSerializer, deletable.PurgableSerializerMixin ):
    """
    Interface/service object for serializing histories into dictionaries.
    """
    SINGLE_CHAR_ABBR = 'h'

    def __init__( self, app ):
        """
        Create the helper managers/serializers and register the 'summary'
        and 'detailed' views ('summary' being the default).
        """
        super( HistorySerializer, self ).__init__( app )

        self.history_manager = HistoryManager( app )
        self.hda_manager = hdas.HDAManager( app )
        self.hda_serializer = hdas.HDASerializer( app )

        self.default_view = 'summary'

        summary_keys = [
            'id',
            'model_class',
            'name',
            'deleted',
            'purged',
            # 'count'
            'url',
            # TODO: why these?
            'published',
            'annotation',
            'tags',
        ]
        self.add_view( 'summary', summary_keys )

        detailed_keys = [
            'contents_url',
            # 'hdas',
            'empty',
            'size', 'nice_size',
            'user_id',
            'create_time', 'update_time',
            'importable', 'slug', 'username_and_slug',
            'genome_build',
            # TODO: remove the next three - instead getting the same info from the 'hdas' list
            'state',
            'state_details',
            'state_ids',
        ]
        # for histories, the detailed view also pulls in every key of the summary view
        self.add_view( 'detailed', detailed_keys, include_keys_from='summary' )

    # assumes: outgoing to json.dumps and sanitized
    def add_serializers( self ):
        """
        Register the per-key serializer callables for histories on top of
        those added by the parent class and the purgable mixin.
        """
        super( HistorySerializer, self ).add_serializers()
        deletable.PurgableSerializerMixin.add_serializers( self )

        history_serializers = {
            'model_class'   : lambda *a, **c: 'History',
            'id'            : self.serialize_id,
            'create_time'   : self.serialize_date,
            'update_time'   : self.serialize_date,
            'size'          : lambda i, k, **c: int( i.get_disk_size() ),
            'nice_size'     : lambda i, k, **c: i.get_disk_size( nice_size=True ),
            'state'         : self.serialize_history_state,

            'url'           : lambda i, k, **c: self.url_for( 'history', id=self.app.security.encode_id( i.id ) ),
            'contents_url'  : lambda i, k, **c: self.url_for( 'history_contents',
                                                              history_id=self.app.security.encode_id( i.id ) ),

            'empty'         : lambda i, k, **c: ( len( i.datasets ) + len( i.dataset_collections ) ) <= 0,
            'count'         : lambda i, k, **c: len( i.datasets ),
            'hdas'          : lambda i, k, **c: [ self.app.security.encode_id( hda.id ) for hda in i.datasets ],
            'state_details' : self.serialize_state_counts,
            'state_ids'     : self.serialize_state_ids,
            'contents'      : self.serialize_contents
        }
        self.serializers.update( history_serializers )

    # remove this
    def serialize_state_ids( self, history, key, **context ):
        """
        Map every possible dataset state to a list of the (encoded) ids of
        the HDAs in `history` that are currently in that state.
        """
        # start with an empty id list for each known state
        ids_by_state = dict( ( state, [] ) for state in model.Dataset.states.values() )

        # TODO:?? collections and coll. states?
        for hda in history.datasets:
            # TODO: do not encode ids at this layer
            ids_by_state[ hda.state ].append( self.app.security.encode_id( hda.id ) )
        return ids_by_state

    # remove this
    def serialize_state_counts( self, history, key, exclude_deleted=True, exclude_hidden=False, **context ):
        """
        Map every possible dataset state to the number of HDAs in `history`
        that are in that state, optionally skipping deleted and/or hidden HDAs.
        """
        # TODO: the default flags above may not make a lot of sense (T,T?)
        counts_by_state = dict.fromkeys( model.Dataset.states.values(), 0 )

        # TODO:?? collections and coll. states?
        for hda in history.datasets:
            if ( exclude_deleted and hda.deleted ) or ( exclude_hidden and not hda.visible ):
                continue
            counts_by_state[ hda.state ] += 1
        return counts_by_state

    # TODO: remove this (is state used/useful?)
    def serialize_history_state( self, history, key, **context ):
        """
        Derive a single state for the history from the states of its HDAs
        (deleted HDAs included). Falls back to ERROR when no rule matches.
        """
        states = model.Dataset.states

        # TODO: history_state and state_counts are classically calc'd at the same time
        #   so this is rel. ineff. - if we keep this...
        counts = self.serialize_state_counts( history, 'counts', exclude_deleted=False, **context )
        total = sum( counts.values() )

        if total == 0:
            return states.NEW

        if ( counts[ states.RUNNING ] > 0
                or counts[ states.SETTING_METADATA ] > 0
                or counts[ states.UPLOAD ] > 0 ):
            return states.RUNNING

        # TODO: this method may be more useful if we *also* polled the histories jobs here too
        if counts[ states.QUEUED ] > 0:
            return states.QUEUED

        if ( counts[ states.ERROR ] > 0
                or counts[ states.FAILED_METADATA ] > 0 ):
            return states.ERROR

        if counts[ states.OK ] == total:
            return states.OK

        # (default to ERROR)
        return states.ERROR

    def serialize_contents( self, history, *args, **context ):
        """
        Return one dictionary per dataset/dataset collection in `history`:
        the 'detailed' serialization for HDAs, an empty dictionary otherwise.
        """
        # collections are not serialized here yet, e.g.:
        #   contents_dict = self._serialize_collection( trans, content )
        return [
            self.hda_serializer.serialize_to_view( content, view='detailed', **context )
            if isinstance( content, model.HistoryDatasetAssociation ) else {}
            for content in history.contents_iter( types=[ 'dataset', 'dataset_collection' ] )
        ]

    # TODO: remove trans
    def _serialize_collection( self, trans, collection ):
        """
        Look up the history collection instance for `collection` through the
        collections service and return its 'element' view dictionary.
        """
        encoded_id = self.app.security.encode_id( collection.id )
        instance = self.app.dataset_collections_service.get_dataset_collection_instance(
            trans=trans,
            instance_type='history',
            id=encoded_id,
        )
        return collections_util.dictify_dataset_collection_instance(
            instance,
            security=self.app.security,
            parent=instance.history,
            view="element" )
class HistoryDeserializer( sharable.SharableModelDeserializer, deletable.PurgableDeserializerMixin ):
    """
    Interface/service object for validating and deserializing dictionaries into histories.
    """
    model_manager_class = HistoryManager

    def __init__( self, app ):
        """
        Initialize the parent deserializer and alias its manager as `history_manager`.
        """
        super( HistoryDeserializer, self ).__init__( app )
        self.history_manager = self.manager

    def add_deserializers( self ):
        """
        Register the history-specific key deserializers in addition to those
        of the parent class and the purgable mixin.
        """
        super( HistoryDeserializer, self ).add_deserializers()
        deletable.PurgableDeserializerMixin.add_deserializers( self )

        history_deserializers = {
            'name'          : self.deserialize_basestring,
            'genome_build'  : self.deserialize_genome_build,
        }
        self.deserializers.update( history_deserializers )
class HistoryFilters( sharable.SharableModelFilters, deletable.PurgableFiltersMixin ):
    """
    Filter parser registration for histories.
    """
    model_class = model.History

    def _add_parsers( self ):
        """
        Register ORM filter parsers for the history-specific columns on top
        of those added by the parent class and the purgable mixin.
        """
        super( HistoryFilters, self )._add_parsers()
        deletable.PurgableFiltersMixin._add_parsers( self )

        # both text columns allow the same set of operators
        text_ops = ( 'eq', 'contains', 'like' )
        history_parsers = {
            # history specific
            'name'          : { 'op': text_ops },
            'genome_build'  : { 'op': text_ops },
        }
        self.orm_filter_parsers.update( history_parsers )