import logging
import re
from sqlalchemy.sql import select
from sqlalchemy.sql.expression import func
log = logging.getLogger( __name__ )
# Item-specific information needed to perform tagging.
[docs]class ItemTagAssocInfo( object ):
def __init__( self, item_class, tag_assoc_class, item_id_col ):
self.item_class = item_class
self.tag_assoc_class = tag_assoc_class
self.item_id_col = item_id_col
[docs]class TagManager( object ):
"""
Manages CRUD operations related to tagging objects.
"""
def __init__( self, app ):
self.app = app
# Minimum tag length.
self.min_tag_len = 2
# Maximum tag length.
self.max_tag_len = 255
# Tag separator.
self.tag_separators = ',;'
# Hierarchy separator.
self.hierarchy_separator = '.'
# Key-value separator.
self.key_value_separators = "=:"
# Initialize with known classes - add to this in subclasses.
self.item_tag_assoc_info = {}
[docs] def get_tag_assoc_class( self, item_class ):
"""Returns tag association class for item class."""
return self.item_tag_assoc_info[item_class.__name__].tag_assoc_class
[docs] def get_id_col_in_item_tag_assoc_table( self, item_class ):
"""Returns item id column in class' item-tag association table."""
return self.item_tag_assoc_info[item_class.__name__].item_id_col
[docs] def remove_item_tag( self, user, item, tag_name ):
"""Remove a tag from an item."""
# Get item tag association.
item_tag_assoc = self._get_item_tag_assoc( user, item, tag_name )
# Remove association.
if item_tag_assoc:
# Delete association.
self.app.model.context.delete( item_tag_assoc )
item.tags.remove( item_tag_assoc )
return True
return False
[docs] def item_has_tag( self, user, item, tag ):
"""Returns true if item is has a given tag."""
# Get tag name.
if isinstance( tag, basestring ):
tag_name = tag
elif isinstance( tag, self.app.model.Tag ):
tag_name = tag.name
# Check for an item-tag association to see if item has a given tag.
item_tag_assoc = self._get_item_tag_assoc( user, item, tag_name )
if item_tag_assoc:
return True
return False
[docs] def apply_item_tag( self, user, item, name, value=None ):
# Use lowercase name for searching/creating tag.
lc_name = name.lower()
# Get or create item-tag association.
item_tag_assoc = self._get_item_tag_assoc( user, item, lc_name )
if not item_tag_assoc:
# Create item-tag association.
# Create tag; if None, skip the tag (and log error).
tag = self._get_or_create_tag( lc_name )
if not tag:
log.warn( "Failed to create tag with name %s" % lc_name )
return
# Create tag association based on item class.
item_tag_assoc_class = self.get_tag_assoc_class( item.__class__ )
item_tag_assoc = item_tag_assoc_class()
# Add tag to association.
item.tags.append( item_tag_assoc )
item_tag_assoc.tag = tag
item_tag_assoc.user = user
# Apply attributes to item-tag association. Strip whitespace from user name and tag.
lc_value = None
if value:
lc_value = value.lower()
item_tag_assoc.user_tname = name
item_tag_assoc.user_value = value
item_tag_assoc.value = lc_value
return item_tag_assoc
[docs] def get_tag_by_id( self, tag_id ):
"""Get a Tag object from a tag id."""
return self.app.model.context.query( self.app.model.Tag ).filter_by( id=tag_id ).first()
[docs] def get_tag_by_name( self, tag_name ):
"""Get a Tag object from a tag name (string)."""
if tag_name:
return self.app.model.context.query( self.app.model.Tag ).filter_by( name=tag_name.lower() ).first()
return None
def _create_tag( self, tag_str ):
"""Create a Tag object from a tag string."""
tag_hierarchy = tag_str.split( self.hierarchy_separator )
tag_prefix = ""
parent_tag = None
for sub_tag in tag_hierarchy:
# Get or create subtag.
tag_name = tag_prefix + self._scrub_tag_name( sub_tag )
tag = self.app.model.context.query( self.app.model.Tag ).filter_by( name=tag_name).first()
if not tag:
tag = self.app.model.Tag( type=0, name=tag_name )
# Set tag parent.
tag.parent = parent_tag
# Update parent and tag prefix.
parent_tag = tag
tag_prefix = tag.name + self.hierarchy_separator
return tag
def _get_or_create_tag( self, tag_str ):
"""Get or create a Tag object from a tag string."""
# Scrub tag; if tag is None after being scrubbed, return None.
scrubbed_tag_str = self._scrub_tag_name( tag_str )
if not scrubbed_tag_str:
return None
# Get item tag.
tag = self.get_tag_by_name( scrubbed_tag_str )
# Create tag if necessary.
if tag is None:
tag = self._create_tag( scrubbed_tag_str )
return tag
def _get_item_tag_assoc( self, user, item, tag_name ):
"""
Return ItemTagAssociation object for a user, item, and tag string; returns None if there is
no such association.
"""
scrubbed_tag_name = self._scrub_tag_name( tag_name )
for item_tag_assoc in item.tags:
if ( item_tag_assoc.user == user ) and ( item_tag_assoc.user_tname == scrubbed_tag_name ):
return item_tag_assoc
return None
def _scrub_tag_value( self, value ):
"""Scrub a tag value."""
# Gracefully handle None:
if not value:
return None
# Remove whitespace from value.
reg_exp = re.compile( '\s' )
scrubbed_value = re.sub( reg_exp, "", value )
return scrubbed_value
def _scrub_tag_name( self, name ):
"""Scrub a tag name."""
# Gracefully handle None:
if not name:
return None
# Remove whitespace from name.
reg_exp = re.compile( '\s' )
scrubbed_name = re.sub( reg_exp, "", name )
# Ignore starting ':' char.
if scrubbed_name.startswith( self.hierarchy_separator ):
scrubbed_name = scrubbed_name[1:]
# If name is too short or too long, return None.
if len( scrubbed_name ) < self.min_tag_len or len( scrubbed_name ) > self.max_tag_len:
return None
return scrubbed_name
def _scrub_tag_name_list( self, tag_name_list ):
"""Scrub a tag name list."""
scrubbed_tag_list = list()
for tag in tag_name_list:
scrubbed_tag_list.append( self._scrub_tag_name( tag ) )
return scrubbed_tag_list
def _get_name_value_pair( self, tag_str ):
"""Get name, value pair from a tag string."""
# Use regular expression to parse name, value.
reg_exp = re.compile( "[" + self.key_value_separators + "]" )
name_value_pair = reg_exp.split( tag_str )
# Add empty slot if tag does not have value.
if len( name_value_pair ) < 2:
name_value_pair.append( None )
return name_value_pair
[docs]class GalaxyTagManager( TagManager ):
def __init__( self, app ):
from galaxy import model
TagManager.__init__( self, app )
self.item_tag_assoc_info["History"] = ItemTagAssocInfo( model.History,
model.HistoryTagAssociation,
model.HistoryTagAssociation.table.c.history_id )
self.item_tag_assoc_info["HistoryDatasetAssociation"] = \
ItemTagAssocInfo( model.HistoryDatasetAssociation,
model.HistoryDatasetAssociationTagAssociation,
model.HistoryDatasetAssociationTagAssociation.table.c.history_dataset_association_id )
self.item_tag_assoc_info["Page"] = ItemTagAssocInfo( model.Page,
model.PageTagAssociation,
model.PageTagAssociation.table.c.page_id )
self.item_tag_assoc_info["StoredWorkflow"] = ItemTagAssocInfo( model.StoredWorkflow,
model.StoredWorkflowTagAssociation,
model.StoredWorkflowTagAssociation.table.c.stored_workflow_id )
self.item_tag_assoc_info["Visualization"] = ItemTagAssocInfo( model.Visualization,
model.VisualizationTagAssociation,
model.VisualizationTagAssociation.table.c.visualization_id )