Skip to content

Commit

Permalink
ckanext-activity - speed up activity stream loading
Browse files Browse the repository at this point in the history
    - Use subselect instead of union for more effective indexing
  • Loading branch information
duttonw committed Apr 8, 2024
1 parent 7eedfc1 commit 52f3c99
Showing 1 changed file with 56 additions and 50 deletions.
106 changes: 56 additions & 50 deletions ckanext/activity/model/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import annotations

import datetime
from typing import Any, Iterable, Optional, Type, TypeVar
from typing import Any, Iterable, Optional, Type, TypeVar, Union, List, Tuple
from typing_extensions import TypeAlias

from sqlalchemy.orm import relationship, backref, Mapped, defer, load_only
Expand All @@ -12,8 +12,11 @@
ForeignKey,
or_,
and_,
union_all,
not_,
text,
select,
literal,
)

from ckan.common import config
Expand Down Expand Up @@ -49,7 +52,7 @@ class Activity(domain_object.DomainObject, BaseModel): # type: ignore
# legacy revision_id values are used by migrate_package_activity.py
revision_id = Column("revision_id", types.UnicodeText)
activity_type = Column("activity_type", types.UnicodeText)
data = Column("data", _types.JsonDictType)
data = Column("data", _types.JsonDictType) # Big JSON BLOB
permission_labels = Column("permission_labels", types.Text)

def __init__(
Expand Down Expand Up @@ -253,30 +256,33 @@ def _activities_union_all(*qlist: QActivity) -> QActivity:
Return union of two or more activity queries sorted by timestamp,
and remove duplicates
"""
q, *rest = qlist
for query in rest:
q = q.union(query)

q: QActivity = (
model.Session.query(Activity)
.options(_activities_load_only_without_data())
.select_entity_from(union_all(*[q.subquery().select() for q in qlist]))
.distinct(Activity.timestamp)
)
return q


def _activities_from_user_query(user_id: str) -> QActivity:
def _activities_from_user_query(user_id: Union[str, List[str]]) -> QActivity:
"""Return an SQLAlchemy query for all activities from user_id."""
q = model.Session.query(Activity)
q = q.options(_activities_load_only_without_data())
q = q.filter(Activity.user_id == user_id)
q = q.filter(Activity.user_id.in_(_to_list(user_id))) # type: ignore
return q


def _activities_about_user_query(user_id: str) -> QActivity:
def _activities_about_user_query(user_id: Union[str, List[str]]) -> QActivity:
"""Return an SQLAlchemy query for all activities about user_id."""
q = model.Session.query(Activity)
q = q.options(_activities_load_only_without_data())
q = q.filter(Activity.object_id == user_id)
q = q.filter(Activity.object_id.in_(_to_list(user_id))) # type: ignore
return q


def _user_activity_query(user_id: str, limit: int) -> QActivity:
def _user_activity_query(
user_id: Union[str, List[str]], limit: int) -> QActivity:
"""Return an SQLAlchemy query for all activities from or about user_id."""
q1 = _activities_limit(_activities_from_user_query(user_id), limit)
q2 = _activities_limit(_activities_about_user_query(user_id), limit)
Expand Down Expand Up @@ -337,11 +343,17 @@ def user_activity_list(
return results


def _package_activity_query(package_id: str) -> QActivity:
def _to_list(vals: Union[List[str], Tuple[str], str]):
if isinstance(vals, (list, tuple)):
return vals
return [vals]


def _package_activity_query(package_id: Union[str, List[str]]) -> QActivity:
"""Return an SQLAlchemy query for all activities about package_id."""
q = model.Session.query(Activity) \
.options(_activities_defer_data()) \
.filter_by(object_id=package_id)
.filter(Activity.object_id.in_(_to_list(package_id))) # type: ignore
return q


Expand Down Expand Up @@ -409,18 +421,15 @@ def package_activity_list(
return results


def _group_activity_query(group_id: str) -> QActivity:
def _group_activity_query(group_id: Union[str, List[str]]) -> QActivity:
"""Return an SQLAlchemy query for all activities about group_id.
Returns a query for all activities whose object is either the group itself
or one of the group's datasets.
"""
group = model.Group.get(group_id)
if not group:
# Return a query with no results.
return model.Session.query(Activity).filter(text("0=1"))

groups = _to_list(group_id)
q: QActivity = (
model.Session.query(Activity)
.options(_activities_defer_data())
Expand All @@ -441,20 +450,20 @@ def _group_activity_query(group_id: str) -> QActivity:
or_(
# active dataset in the group
and_(
model.Member.group_id == group_id,
model.Member.group_id.in_(groups), # type: ignore
model.Member.state == "active",
model.Package.state == "active",
),
# deleted dataset in the group
and_(
model.Member.group_id == group_id,
model.Member.group_id.in_(groups), # type: ignore
model.Member.state == "deleted",
model.Package.state == "deleted",
),
# (we want to avoid showing changes to an active dataset that
# was once in this group)
# activity on the group itself
Activity.object_id == group_id,
Activity.object_id.in_(groups), # type: ignore
)
)
)
Expand All @@ -479,21 +488,27 @@ def _organization_activity_query(org_id: str) -> QActivity:
q: QActivity = (
model.Session.query(Activity)
.options(_activities_defer_data())
.outerjoin(
model.Package,
and_(
model.Package.id == Activity.object_id,
model.Package.private == False, # noqa
),
)
.filter(
# We only care about activity either on the org itself or on
# packages within that org.
# FIXME: This means that activity that occurred while a package
# belonged to an org but was then removed will not show up. This may
# not be desired but is consistent with legacy behaviour.
or_(
model.Package.owner_org == org_id, Activity.object_id == org_id
#
# Use subselect instead of outer join so that it can all
# be indexable
Activity.object_id.in_( # type: ignore
select([model.Package.id]) # type: ignore
.where(
and_(
model.Package.private == False, # noqa
model.Package.owner_org == org_id
)
)
.union(
# select the org itself
select([literal(org_id)])
)
)
)
)
Expand Down Expand Up @@ -632,12 +647,9 @@ def _activities_from_users_followed_by_user_query(
.options(_activities_load_only_without_data()) \
.filter(text("0=1"))

return _activities_union_all(
*[
_user_activity_query(follower.object_id, limit)
for follower in follower_objects
]
)
return _user_activity_query(
[follower.object_id for follower in follower_objects],
limit)


def _activities_from_datasets_followed_by_user_query(
Expand All @@ -652,14 +664,10 @@ def _activities_from_datasets_followed_by_user_query(
.options(_activities_load_only_without_data()) \
.filter(text("0=1"))

return _activities_union_all(
*[
_activities_limit(
_package_activity_query(follower.object_id), limit
)
for follower in follower_objects
]
)
return _activities_limit(
_package_activity_query(
[follower.object_id for follower in follower_objects]),
limit)


def _activities_from_groups_followed_by_user_query(
Expand All @@ -680,12 +688,10 @@ def _activities_from_groups_followed_by_user_query(
.options(_activities_load_only_without_data()) \
.filter(text("0=1"))

return _activities_union_all(
*[
_activities_limit(_group_activity_query(follower.object_id), limit)
for follower in follower_objects
]
)
return _activities_limit(
_group_activity_query(
[follower.object_id for follower in follower_objects]),
limit)


def _activities_from_everything_followed_by_user_query(
Expand Down

0 comments on commit 52f3c99

Please sign in to comment.