from sqlalchemy.sql.expression import func
from sqlalchemy.orm.collections import InstrumentedList
# Cannot import galaxy.model b/c it creates a circular import graph.
import galaxy
import logging
import datetime
import uuid
log = logging.getLogger( __name__ )
[docs]class RuntimeException( Exception ):
pass
[docs]class UsesItemRatings:
"""
Mixin for getting and setting item ratings.
Class makes two assumptions:
(1) item-rating association table is named <item_class>RatingAssocation
(2) item-rating association table has a column with a foreign key referencing
item table that contains the item's id.
"""
[docs] def get_ave_item_rating_data( self, db_session, item, webapp_model=None ):
""" Returns the average rating for an item."""
if webapp_model is None:
webapp_model = galaxy.model
item_rating_assoc_class = self._get_item_rating_assoc_class( item, webapp_model=webapp_model )
if not item_rating_assoc_class:
raise RuntimeException( "Item does not have ratings: %s" % item.__class__.__name__ )
item_id_filter = self._get_item_id_filter_str( item, item_rating_assoc_class )
ave_rating = db_session.query( func.avg( item_rating_assoc_class.rating ) ).filter( item_id_filter ).scalar()
# Convert ave_rating to float; note: if there are no item ratings, ave rating is None.
if ave_rating:
ave_rating = float( ave_rating )
else:
ave_rating = 0
num_ratings = int( db_session.query( func.count( item_rating_assoc_class.rating ) ).filter( item_id_filter ).scalar() )
return ( ave_rating, num_ratings )
[docs] def rate_item( self, db_session, user, item, rating, webapp_model=None ):
""" Rate an item. Return type is <item_class>RatingAssociation. """
if webapp_model is None:
webapp_model = galaxy.model
item_rating = self.get_user_item_rating( db_session, user, item, webapp_model=webapp_model )
if not item_rating:
# User has not yet rated item; create rating.
item_rating_assoc_class = self._get_item_rating_assoc_class( item, webapp_model=webapp_model )
item_rating = item_rating_assoc_class()
item_rating.user = user
item_rating.set_item( item )
item_rating.rating = rating
db_session.add( item_rating )
db_session.flush()
elif item_rating.rating != rating:
# User has rated item; update rating.
item_rating.rating = rating
db_session.flush()
return item_rating
[docs] def get_user_item_rating( self, db_session, user, item, webapp_model=None ):
""" Returns user's rating for an item. Return type is <item_class>RatingAssociation. """
if webapp_model is None:
webapp_model = galaxy.model
item_rating_assoc_class = self._get_item_rating_assoc_class( item, webapp_model=webapp_model )
if not item_rating_assoc_class:
raise RuntimeException( "Item does not have ratings: %s" % item.__class__.__name__ )
# Query rating table by user and item id.
item_id_filter = self._get_item_id_filter_str( item, item_rating_assoc_class )
return db_session.query( item_rating_assoc_class ).filter_by( user=user ).filter( item_id_filter ).first()
def _get_item_rating_assoc_class( self, item, webapp_model=None ):
""" Returns an item's item-rating association class. """
if webapp_model is None:
webapp_model = galaxy.model
item_rating_assoc_class = '%sRatingAssociation' % item.__class__.__name__
return getattr( webapp_model, item_rating_assoc_class, None )
def _get_item_id_filter_str( self, item, item_rating_assoc_class, webapp_model=None ):
# Get foreign key in item-rating association table that references item table.
if webapp_model is None:
webapp_model = galaxy.model
item_fk = None
for fk in item_rating_assoc_class.table.foreign_keys:
if fk.references( item.table ):
item_fk = fk
break
if not item_fk:
raise RuntimeException( "Cannot find item id column in item-rating association table: %s, %s" % item_rating_assoc_class.__name__, item_rating_assoc_class.table.name )
# TODO: can we provide a better filter than a raw string?
return "%s=%i" % ( item_fk.parent.name, item.id )
[docs]class UsesAnnotations:
""" Mixin for getting and setting item annotations. """
[docs] def get_item_annotation_str( self, db_session, user, item ):
""" Returns a user's annotation string for an item. """
annotation_obj = self.get_item_annotation_obj( db_session, user, item )
if annotation_obj:
return galaxy.util.unicodify( annotation_obj.annotation )
return None
[docs] def get_item_annotation_obj( self, db_session, user, item ):
""" Returns a user's annotation object for an item. """
# Get annotation association class.
annotation_assoc_class = self._get_annotation_assoc_class( item )
if not annotation_assoc_class:
return None
# Get annotation association object.
annotation_assoc = db_session.query( annotation_assoc_class ).filter_by( user=user )
# TODO: use filtering like that in _get_item_id_filter_str()
if item.__class__ == galaxy.model.History:
annotation_assoc = annotation_assoc.filter_by( history=item )
elif item.__class__ == galaxy.model.HistoryDatasetAssociation:
annotation_assoc = annotation_assoc.filter_by( hda=item )
elif item.__class__ == galaxy.model.StoredWorkflow:
annotation_assoc = annotation_assoc.filter_by( stored_workflow=item )
elif item.__class__ == galaxy.model.WorkflowStep:
annotation_assoc = annotation_assoc.filter_by( workflow_step=item )
elif item.__class__ == galaxy.model.Page:
annotation_assoc = annotation_assoc.filter_by( page=item )
elif item.__class__ == galaxy.model.Visualization:
annotation_assoc = annotation_assoc.filter_by( visualization=item )
return annotation_assoc.first()
[docs] def add_item_annotation( self, db_session, user, item, annotation ):
""" Add or update an item's annotation; a user can only have a single annotation for an item. """
# Get/create annotation association object.
annotation_assoc = self.get_item_annotation_obj( db_session, user, item )
if not annotation_assoc:
annotation_assoc_class = self._get_annotation_assoc_class( item )
if not annotation_assoc_class:
return None
annotation_assoc = annotation_assoc_class()
item.annotations.append( annotation_assoc )
annotation_assoc.user = user
# Set annotation.
annotation_assoc.annotation = annotation
return annotation_assoc
[docs] def delete_item_annotation( self, db_session, user, item):
annotation_assoc = self.get_item_annotation_obj( db_session, user, item )
if annotation_assoc:
db_session.delete(annotation_assoc)
db_session.flush()
[docs] def copy_item_annotation( self, db_session, source_user, source_item, target_user, target_item ):
""" Copy an annotation from a user/item source to a user/item target. """
if source_user and target_user:
annotation_str = self.get_item_annotation_str( db_session, source_user, source_item )
if annotation_str:
annotation = self.add_item_annotation( db_session, target_user, target_item, annotation_str )
return annotation
return None
def _get_annotation_assoc_class( self, item ):
""" Returns an item's item-annotation association class. """
class_name = '%sAnnotationAssociation' % item.__class__.__name__
return getattr( galaxy.model, class_name, None )
[docs]class Dictifiable:
""" Mixin that enables objects to be converted to dictionaries. This is useful
when for sharing objects across boundaries, such as the API, tool scripts,
and JavaScript code. """
[docs] def to_dict( self, view='collection', value_mapper=None ):
"""
Return item dictionary.
"""
if not value_mapper:
value_mapper = {}
def get_value( key, item ):
"""
Recursive helper function to get item values.
"""
# FIXME: why use exception here? Why not look for key in value_mapper
# first and then default to to_dict?
try:
return item.to_dict( view=view, value_mapper=value_mapper )
except:
if key in value_mapper:
return value_mapper.get( key )( item )
if type(item) == datetime.datetime:
return item.isoformat()
elif type(item) == uuid.UUID:
return str(item)
# Leaving this for future reference, though we may want a more
# generic way to handle special type mappings going forward.
# If the item is of a class that needs to be 'stringified' before being put into a JSON data structure
# elif type(item) in []:
# return str(item)
return item
# Create dict to represent item.
rval = dict(
model_class=self.__class__.__name__
)
# Fill item dict with visible keys.
try:
visible_keys = self.__getattribute__( 'dict_' + view + '_visible_keys' )
except AttributeError:
raise Exception( 'Unknown Dictifiable view: %s' % view )
for key in visible_keys:
try:
item = self.__getattribute__( key )
if type( item ) == InstrumentedList:
rval[ key ] = []
for i in item:
rval[ key ].append( get_value( key, i ) )
else:
rval[ key ] = get_value( key, item )
except AttributeError:
rval[ key ] = None
return rval