Source code for galaxy.webapps.galaxy.controllers.tag

"""
Tags Controller: handles tagging/untagging of entities
and provides autocomplete support.
"""

from galaxy import web
from galaxy.web.base.controller import BaseUIController, UsesTagsMixin

from sqlalchemy.sql import select
from sqlalchemy.sql.expression import and_, func

import logging
log = logging.getLogger( __name__ )

[docs]class TagsController ( BaseUIController, UsesTagsMixin ): @web.expose @web.require_login( "edit item tags" )
[docs] def get_tagging_elt_async( self, trans, item_id, item_class, elt_context="" ): """ Returns HTML for editing an item's tags. """ item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) if not item: return trans.show_error_message( "No item of class %s with id %s " % ( item_class, item_id ) ) return trans.fill_template( "/tagging_common.mako", tag_type="individual", user=trans.user, tagged_item=item, elt_context=elt_context, in_form=False, input_size="22", tag_click_fn="default_tag_click_fn", use_toggle_link=False )
@web.expose @web.require_login( "add tag to an item" )
[docs] def add_tag_async( self, trans, item_id=None, item_class=None, new_tag=None, context=None ): """ Add tag to an item. """ # Apply tag. item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user self.get_tag_handler( trans ).apply_item_tags( user, item, new_tag.encode( 'utf-8' ) ) trans.sa_session.flush() # Log. params = dict( item_id=item.id, item_class=item_class, tag=new_tag ) trans.log_action( user, unicode( "tag" ), context, params )
@web.expose @web.require_login( "remove tag from an item" )
[docs] def remove_tag_async( self, trans, item_id=None, item_class=None, tag_name=None, context=None ): """ Remove tag from an item. """ # Remove tag. item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user self.get_tag_handler( trans ).remove_item_tag( user, item, tag_name.encode( 'utf-8' ) ) trans.sa_session.flush() # Log. params = dict( item_id=item.id, item_class=item_class, tag=tag_name ) trans.log_action( user, unicode( "untag" ), context, params ) # Retag an item. All previous tags are deleted and new tags are applied.
@web.expose @web.require_login( "Apply a new set of tags to an item; previous tags are deleted." )
[docs] def retag_async( self, trans, item_id=None, item_class=None, new_tags=None ): """ Apply a new set of tags to an item; previous tags are deleted. """ # Apply tags. item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user self.get_tag_handler( trans ).delete_item_tags( user, item ) self.get_tag_handler( trans ).apply_item_tags( user, item, new_tags.encode( 'utf-8' ) ) trans.sa_session.flush()
@web.expose @web.require_login( "get autocomplete data for an item's tags" )
[docs] def tag_autocomplete_data( self, trans, q=None, limit=None, timestamp=None, item_id=None, item_class=None ): """ Get autocomplete data for an item's tags. """ # Get item, do security check, and get autocomplete data. item = None if item_id is not None: item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user item_class = self.get_class( item_class ) q = '' if q is None else q q = q.encode( 'utf-8' ) if q.find( ":" ) == -1: return self._get_tag_autocomplete_names( trans, q, limit, timestamp, user, item, item_class ) else: return self._get_tag_autocomplete_values( trans, q, limit, timestamp, user, item, item_class )
def _get_tag_autocomplete_names( self, trans, q, limit, timestamp, user=None, item=None, item_class=None ): """ Returns autocomplete data for tag names ordered from most frequently used to least frequently used. """ # Get user's item tags and usage counts. # Get item's class object and item-tag association class. if item is None and item_class is None: raise RuntimeError( "Both item and item_class cannot be None" ) elif item is not None: item_class = item.__class__ item_tag_assoc_class = self.get_tag_handler( trans ).get_tag_assoc_class( item_class ) # Build select statement. cols_to_select = [ item_tag_assoc_class.table.c.tag_id, func.count( '*' ) ] from_obj = item_tag_assoc_class.table.join( item_class.table ).join( trans.app.model.Tag.table ) where_clause = and_( trans.app.model.Tag.table.c.name.like( q + "%" ), item_tag_assoc_class.table.c.user_id == user.id ) order_by = [ func.count( "*" ).desc() ] group_by = item_tag_assoc_class.table.c.tag_id # Do query and get result set. query = select( columns=cols_to_select, from_obj=from_obj, whereclause=where_clause, group_by=group_by, order_by=order_by, limit=limit ) result_set = trans.sa_session.execute( query ) # Create and return autocomplete data. ac_data = "#Header|Your Tags\n" for row in result_set: tag = self.get_tag_handler( trans ).get_tag_by_id( trans, row[0] ) # Exclude tags that are already applied to the item. if ( item is not None ) and ( self.get_tag_handler( trans ).item_has_tag( trans, trans.user, item, tag ) ): continue # Add tag to autocomplete data. Use the most frequent name that user # has employed for the tag. tag_names = self._get_usernames_for_tag( trans, trans.user, tag, item_class, item_tag_assoc_class ) ac_data += tag_names[0] + "|" + tag_names[0] + "\n" return ac_data def _get_tag_autocomplete_values( self, trans, q, limit, timestamp, user=None, item=None, item_class=None ): """ Returns autocomplete data for tag values ordered from most frequently used to least frequently used. """ tag_name_and_value = q.split( ":" ) tag_name = tag_name_and_value[0] tag_value = tag_name_and_value[1] tag = self.get_tag_handler( trans ).get_tag_by_name( trans, tag_name ) # Don't autocomplete if tag doesn't exist. if tag is None: return "" # Get item's class object and item-tag association class. if item is None and item_class is None: raise RuntimeError( "Both item and item_class cannot be None" ) elif item is not None: item_class = item.__class__ item_tag_assoc_class = self.get_tag_handler( trans ).get_tag_assoc_class( item_class ) # Build select statement. cols_to_select = [ item_tag_assoc_class.table.c.value, func.count( '*' ) ] from_obj = item_tag_assoc_class.table.join( item_class.table ).join( trans.app.model.Tag.table ) where_clause = and_( item_tag_assoc_class.table.c.user_id == user.id, trans.app.model.Tag.table.c.id == tag.id, item_tag_assoc_class.table.c.value.like( tag_value + "%" ) ) order_by = [ func.count("*").desc(), item_tag_assoc_class.table.c.value ] group_by = item_tag_assoc_class.table.c.value # Do query and get result set. query = select( columns=cols_to_select, from_obj=from_obj, whereclause=where_clause, group_by=group_by, order_by=order_by, limit=limit ) result_set = trans.sa_session.execute( query ) # Create and return autocomplete data. ac_data = "#Header|Your Values for '%s'\n" % ( tag_name ) tag_uname = self._get_usernames_for_tag( trans, trans.user, tag, item_class, item_tag_assoc_class )[0] for row in result_set: ac_data += tag_uname + ":" + row[0] + "|" + row[0] + "\n" return ac_data def _get_usernames_for_tag( self, trans, user, tag, item_class, item_tag_assoc_class ): """ Returns an ordered list of the user names for a tag; list is ordered from most popular to least popular name. """ # Build select stmt. cols_to_select = [ item_tag_assoc_class.table.c.user_tname, func.count( '*' ) ] where_clause = and_( item_tag_assoc_class.table.c.user_id == user.id, item_tag_assoc_class.table.c.tag_id == tag.id ) group_by = item_tag_assoc_class.table.c.user_tname order_by = [ func.count( "*" ).desc() ] # Do query and get result set. query = select( columns=cols_to_select, whereclause=where_clause, group_by=group_by, order_by=order_by ) result_set = trans.sa_session.execute( query ) user_tag_names = list() for row in result_set: user_tag_names.append( row[0] ) return user_tag_names def _get_item( self, trans, item_class_name, id ): """ Get an item based on type and id. """ item_class = self.get_tag_handler( trans ).item_tag_assoc_info[item_class_name].item_class item = trans.sa_session.query( item_class ).filter( item_class.id == id)[0] return item