Source code for tool_shed.util.review_util

import logging
import os
from galaxy.model.orm import and_
from galaxy.util.odict import odict
from tool_shed.util import hg_util
import tool_shed.util.shed_util_common as suc

log = logging.getLogger( __name__ )

[docs]def can_browse_repository_reviews( app, user, repository ): """ Determine if there are any reviews of the received repository for which the current user has permission to browse any component reviews. """ if user: for review in repository.reviews: for component_review in review.component_reviews: if app.security_agent.user_can_browse_component_review( app, repository, component_review, user ): return True return False
[docs]def changeset_revision_reviewed_by_user( user, repository, changeset_revision ): """Determine if the current changeset revision has been reviewed by the current user.""" for review in repository.reviews: if review.changeset_revision == changeset_revision and review.user == user: return True return False
[docs]def get_component( app, id ): """Get a component from the database.""" sa_session = app.model.context.current return sa_session.query( app.model.Component ).get( app.security.decode_id( id ) )
[docs]def get_component_review( app, id ): """Get a component_review from the database""" sa_session = app.model.context.current return sa_session.query( app.model.ComponentReview ).get( app.security.decode_id( id ) )
[docs]def get_component_by_name( app, name ): """Get a component from the database via a name.""" sa_session = app.model.context.current return sa_session.query( app.model.Component ) \ .filter( app.model.Component.table.c.name==name ) \ .first()
[docs]def get_component_review_by_repository_review_id_component_id( app, repository_review_id, component_id ): """Get a component_review from the database via repository_review_id and component_id.""" sa_session = app.model.context.current return sa_session.query( app.model.ComponentReview ) \ .filter( and_( app.model.ComponentReview.table.c.repository_review_id == app.security.decode_id( repository_review_id ), app.model.ComponentReview.table.c.component_id == app.security.decode_id( component_id ) ) ) \ .first()
[docs]def get_components( app ): sa_session = app.model.context.current return sa_session.query( app.model.Component ) \ .order_by( app.model.Component.name ) \ .all()
[docs]def get_previous_repository_reviews( app, repository, changeset_revision ): """ Return an ordered dictionary of repository reviews up to and including the received changeset revision. """ repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False ) reviewed_revision_hashes = [ review.changeset_revision for review in repository.reviews ] previous_reviews_dict = odict() for changeset in hg_util.reversed_upper_bounded_changelog( repo, changeset_revision ): previous_changeset_revision = str( repo.changectx( changeset ) ) if previous_changeset_revision in reviewed_revision_hashes: previous_rev, previous_changeset_revision_label = \ hg_util.get_rev_label_from_changeset_revision( repo, previous_changeset_revision ) revision_reviews = get_reviews_by_repository_id_changeset_revision( app, app.security.encode_id( repository.id ), previous_changeset_revision ) previous_reviews_dict[ previous_changeset_revision ] = \ dict( changeset_revision_label=previous_changeset_revision_label, reviews=revision_reviews ) return previous_reviews_dict
[docs]def get_review( app, id ): """Get a repository_review from the database via id.""" sa_session = app.model.context.current return sa_session.query( app.model.RepositoryReview ).get( app.security.decode_id( id ) )
[docs]def get_review_by_repository_id_changeset_revision_user_id( app, repository_id, changeset_revision, user_id ): """ Get a repository_review from the database via repository id, changeset_revision and user_id. """ sa_session = app.model.context.current return sa_session.query( app.model.RepositoryReview ) \ .filter( and_( app.model.RepositoryReview.repository_id == app.security.decode_id( repository_id ), app.model.RepositoryReview.changeset_revision == changeset_revision, app.model.RepositoryReview.user_id == app.security.decode_id( user_id ) ) ) \ .first()
[docs]def get_reviews_by_repository_id_changeset_revision( app, repository_id, changeset_revision ): """Get all repository_reviews from the database via repository id and changeset_revision.""" sa_session = app.model.context.current return sa_session.query( app.model.RepositoryReview ) \ .filter( and_( app.model.RepositoryReview.repository_id == app.security.decode_id( repository_id ), app.model.RepositoryReview.changeset_revision == changeset_revision ) ) \ .all()
[docs]def has_previous_repository_reviews( app, repository, changeset_revision ): """ Determine if a repository has a changeset revision review prior to the received changeset revision. """ repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False ) reviewed_revision_hashes = [ review.changeset_revision for review in repository.reviews ] for changeset in hg_util.reversed_upper_bounded_changelog( repo, changeset_revision ): previous_changeset_revision = str( repo.changectx( changeset ) ) if previous_changeset_revision in reviewed_revision_hashes: return True return False