import logging
import os
from galaxy.model.orm import and_
from galaxy.util.odict import odict
from tool_shed.util import hg_util
import tool_shed.util.shed_util_common as suc
log = logging.getLogger( __name__ )
def can_browse_repository_reviews( app, user, repository ):
    """
    Determine if there are any reviews of the received repository for which the
    current user has permission to browse any component reviews.

    :param app: application object; only ``app.security_agent`` is used here.
    :param user: the current user; a false value (e.g. ``None``) yields ``False``
                 without consulting the security agent.
    :param repository: object exposing ``reviews``; each review exposes
                       ``component_reviews``.
    :returns: ``True`` as soon as the security agent allows the user to browse
              one component review, otherwise ``False``.
    """
    # Without a user there is nobody to check permissions for.
    if not user:
        return False
    # any() stops at the first component review the user may browse.
    return any( app.security_agent.user_can_browse_component_review( app,
                                                                     repository,
                                                                     component_review,
                                                                     user )
                for review in repository.reviews
                for component_review in review.component_reviews )
def changeset_revision_reviewed_by_user( user, repository, changeset_revision ):
    """
    Determine if the current changeset revision has been reviewed by the current user.

    :param user: the user whose reviews are being looked for.
    :param repository: object exposing ``reviews``; each review exposes
                       ``changeset_revision`` and ``user``.
    :param changeset_revision: the changeset revision to match.
    :returns: ``True`` if some review matches both the revision and the user,
              otherwise ``False``.
    """
    return any( review.changeset_revision == changeset_revision and review.user == user
                for review in repository.reviews )
def get_component( app, id ):
    """
    Get a component from the database.

    :param app: application object providing ``model.context.current`` (the
                session), ``model.Component`` and ``security.decode_id``.
    :param id: encoded component id; it is decoded before the lookup.
    :returns: whatever ``query( Component ).get( decoded_id )`` returns
              (presumably ``None`` when no row matches -- SQLAlchemy
              ``Query.get`` semantics, confirm against the session in use).
    """
    sa_session = app.model.context.current
    decoded_id = app.security.decode_id( id )
    return sa_session.query( app.model.Component ).get( decoded_id )
def get_component_review( app, id ):
    """
    Get a component_review from the database.

    :param app: application object providing ``model.context.current`` (the
                session), ``model.ComponentReview`` and ``security.decode_id``.
    :param id: encoded component review id; it is decoded before the lookup.
    :returns: whatever ``query( ComponentReview ).get( decoded_id )`` returns
              (presumably ``None`` when no row matches -- SQLAlchemy
              ``Query.get`` semantics, confirm against the session in use).
    """
    sa_session = app.model.context.current
    decoded_id = app.security.decode_id( id )
    return sa_session.query( app.model.ComponentReview ).get( decoded_id )
def get_component_by_name( app, name ):
    """
    Get a component from the database via a name.

    :param app: application object providing ``model.context.current`` (the
                session) and ``model.Component``.
    :param name: value compared against ``Component.table.c.name``.
    :returns: the result of ``.first()`` on the filtered query, i.e. the first
              matching component (presumably ``None`` when nothing matches --
              SQLAlchemy ``Query.first`` semantics).
    """
    sa_session = app.model.context.current
    # Keep the column on the left so the column's own ``==`` builds the criterion.
    name_matches = app.model.Component.table.c.name == name
    return ( sa_session.query( app.model.Component )
                       .filter( name_matches )
                       .first() )
def get_component_review_by_repository_review_id_component_id( app, repository_review_id, component_id ):
    """
    Get a component_review from the database via repository_review_id and component_id.

    :param app: application object providing ``model.context.current`` (the
                session), ``model.ComponentReview`` and ``security.decode_id``.
    :param repository_review_id: encoded repository review id; decoded before filtering.
    :param component_id: encoded component id; decoded before filtering.
    :returns: the result of ``.first()`` on the filtered query (presumably
              ``None`` when nothing matches -- SQLAlchemy ``Query.first`` semantics).
    """
    sa_session = app.model.context.current
    review_columns = app.model.ComponentReview.table.c
    # Both conditions must hold; the ids are decoded in the same order as they are listed.
    criteria = and_( review_columns.repository_review_id == app.security.decode_id( repository_review_id ),
                     review_columns.component_id == app.security.decode_id( component_id ) )
    return sa_session.query( app.model.ComponentReview ).filter( criteria ).first()
def get_components( app ):
    """
    Get all components from the database, ordered by name.

    :param app: application object providing ``model.context.current`` (the
                session) and ``model.Component``.
    :returns: the result of ``.all()`` on the query ordered by ``Component.name``.
    """
    sa_session = app.model.context.current
    ordered_query = sa_session.query( app.model.Component ).order_by( app.model.Component.name )
    return ordered_query.all()
def get_previous_repository_reviews( app, repository, changeset_revision ):
    """
    Return an ordered dictionary of repository reviews up to and including the
    received changeset revision.

    Keys are the reviewed changeset revision hashes, inserted in the order in
    which ``hg_util.reversed_upper_bounded_changelog`` yields them.  Each value
    is a dict with two keys: ``changeset_revision_label`` (the label returned by
    ``hg_util.get_rev_label_from_changeset_revision``) and ``reviews`` (the
    result of ``get_reviews_by_repository_id_changeset_revision`` for that hash).

    :param app: application object; passed to the hg and query helpers and
                used for ``security.encode_id``.
    :param repository: repository object exposing ``id`` and ``reviews``.
    :param changeset_revision: upper bound handed to the changelog walk.
    """
    repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
    # A set keeps the membership test below cheap no matter how many reviews exist.
    reviewed_revision_hashes = set( review.changeset_revision for review in repository.reviews )
    previous_reviews_dict = odict()
    for changeset in hg_util.reversed_upper_bounded_changelog( repo, changeset_revision ):
        previous_changeset_revision = str( repo.changectx( changeset ) )
        if previous_changeset_revision not in reviewed_revision_hashes:
            continue
        # Only the label is needed; the numeric revision is discarded.
        _, previous_changeset_revision_label = \
            hg_util.get_rev_label_from_changeset_revision( repo, previous_changeset_revision )
        revision_reviews = get_reviews_by_repository_id_changeset_revision( app,
                                                                            app.security.encode_id( repository.id ),
                                                                            previous_changeset_revision )
        previous_reviews_dict[ previous_changeset_revision ] = \
            dict( changeset_revision_label=previous_changeset_revision_label,
                  reviews=revision_reviews )
    return previous_reviews_dict
def get_review( app, id ):
    """
    Get a repository_review from the database via id.

    :param app: application object providing ``model.context.current`` (the
                session), ``model.RepositoryReview`` and ``security.decode_id``.
    :param id: encoded repository review id; it is decoded before the lookup.
    :returns: whatever ``query( RepositoryReview ).get( decoded_id )`` returns
              (presumably ``None`` when no row matches -- SQLAlchemy
              ``Query.get`` semantics, confirm against the session in use).
    """
    sa_session = app.model.context.current
    decoded_id = app.security.decode_id( id )
    return sa_session.query( app.model.RepositoryReview ).get( decoded_id )
def get_review_by_repository_id_changeset_revision_user_id( app, repository_id, changeset_revision, user_id ):
    """
    Get a repository_review from the database via repository id, changeset_revision
    and user_id.

    :param app: application object providing ``model.context.current`` (the
                session), ``model.RepositoryReview`` and ``security.decode_id``.
    :param repository_id: encoded repository id; decoded before filtering.
    :param changeset_revision: changeset revision compared as given (not decoded).
    :param user_id: encoded user id; decoded before filtering.
    :returns: the result of ``.first()`` on the filtered query (presumably
              ``None`` when nothing matches -- SQLAlchemy ``Query.first`` semantics).
    """
    sa_session = app.model.context.current
    review_model = app.model.RepositoryReview
    # All three conditions must hold for a review to match.
    criteria = and_( review_model.repository_id == app.security.decode_id( repository_id ),
                     review_model.changeset_revision == changeset_revision,
                     review_model.user_id == app.security.decode_id( user_id ) )
    return sa_session.query( review_model ).filter( criteria ).first()
def get_reviews_by_repository_id_changeset_revision( app, repository_id, changeset_revision ):
    """
    Get all repository_reviews from the database via repository id and changeset_revision.

    :param app: application object providing ``model.context.current`` (the
                session), ``model.RepositoryReview`` and ``security.decode_id``.
    :param repository_id: encoded repository id; decoded before filtering.
    :param changeset_revision: changeset revision compared as given (not decoded).
    :returns: the result of ``.all()`` on the filtered query.
    """
    sa_session = app.model.context.current
    review_model = app.model.RepositoryReview
    # Both conditions must hold for a review to be included.
    criteria = and_( review_model.repository_id == app.security.decode_id( repository_id ),
                     review_model.changeset_revision == changeset_revision )
    return sa_session.query( review_model ).filter( criteria ).all()
def has_previous_repository_reviews( app, repository, changeset_revision ):
    """
    Determine if a repository has a changeset revision review prior to the
    received changeset revision.

    The revisions inspected are exactly those yielded by
    ``hg_util.reversed_upper_bounded_changelog( repo, changeset_revision )``.
    NOTE(review): if that helper includes the received revision itself, a review
    of the received revision also makes this return ``True`` -- confirm.

    :param app: application object passed to ``hg_util.get_repo_for_repository``.
    :param repository: repository object exposing ``reviews``.
    :param changeset_revision: upper bound handed to the changelog walk.
    :returns: ``True`` if any inspected revision has a review, otherwise ``False``.
    """
    repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
    # A set keeps each membership test cheap while walking the changelog.
    reviewed_revision_hashes = set( review.changeset_revision for review in repository.reviews )
    # any() stops walking the changelog at the first reviewed revision.
    return any( str( repo.changectx( changeset ) ) in reviewed_revision_hashes
                for changeset in hg_util.reversed_upper_bounded_changelog( repo, changeset_revision ) )