Created generic redis class for activitystreams

This commit is contained in:
Mouse Reeve 2021-04-05 11:05:37 -07:00
parent d25c68f887
commit bd8858830a
3 changed files with 133 additions and 73 deletions

View file

@ -1,18 +1,13 @@
""" access the activity streams stored in redis """ """ access the activity streams stored in redis """
from abc import ABC
from django.dispatch import receiver from django.dispatch import receiver
from django.db.models import signals, Q from django.db.models import signals, Q
import redis
from bookwyrm import models, settings from bookwyrm import models
from bookwyrm.redis_store import RedisStore, r
from bookwyrm.views.helpers import privacy_filter from bookwyrm.views.helpers import privacy_filter
r = redis.Redis(
host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0
)
class ActivityStream(RedisStore):
class ActivityStream(ABC):
""" a category of activity stream (like home, local, federated) """ """ a category of activity stream (like home, local, federated) """
def stream_id(self, user): def stream_id(self, user):
@ -23,58 +18,38 @@ class ActivityStream(ABC):
""" the redis key for this user's unread count for this stream """ """ the redis key for this user's unread count for this stream """
return "{}-unread".format(self.stream_id(user)) return "{}-unread".format(self.stream_id(user))
def get_value(self, status): # pylint: disable=no-self-use def get_rank(self, obj): # pylint: disable=no-self-use
""" the status id and the rank (ie, published date) """ """ the sort rank of a status, which is published date """
return {status.id: status.published_date.timestamp()} return obj.published_date.timestamp()
def add_status(self, status): def add_status(self, status):
""" add a status to users' feeds """ """ add a status to users' feeds """
value = self.get_value(status) # the pipeline contains all the addp-to-stream activities
# we want to do this as a bulk operation, hence "pipeline" pipeline = self.add_object_to_related_stores(status, execute=False)
pipeline = r.pipeline()
for user in self.stream_users(status): for user in self.get_audience(status):
# add the status to the feed
pipeline.zadd(self.stream_id(user), value)
pipeline.zremrangebyrank(
self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH
)
# add to the unread status count # add to the unread status count
pipeline.incr(self.unread_id(user)) pipeline.incr(self.unread_id(user))
# and go!
pipeline.execute()
def remove_status(self, status): # and go!
""" remove a status from all feeds """
pipeline = r.pipeline()
for user in self.stream_users(status):
pipeline.zrem(self.stream_id(user), -1, status.id)
pipeline.execute() pipeline.execute()
def add_user_statuses(self, viewer, user): def add_user_statuses(self, viewer, user):
""" add a user's statuses to another user's feed """ """ add a user's statuses to another user's feed """
pipeline = r.pipeline() statuses = privacy_filter(viewer, user.status_set.all())
statuses = user.status_set.all()[: settings.MAX_STREAM_LENGTH] self.bulk_add_objects_to_store(statuses, self.stream_id(viewer))
for status in statuses:
pipeline.zadd(self.stream_id(viewer), self.get_value(status))
if statuses:
pipeline.zremrangebyrank(
self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH
)
pipeline.execute()
def remove_user_statuses(self, viewer, user): def remove_user_statuses(self, viewer, user):
""" remove a user's status from another user's feed """ """ remove a user's status from another user's feed """
pipeline = r.pipeline() statuses = user.status_set.all()
for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]: self.bulk_remove_objects_from_store(statuses, self.stream_id(viewer))
pipeline.lrem(self.stream_id(viewer), -1, status.id)
pipeline.execute()
def get_activity_stream(self, user): def get_activity_stream(self, user):
""" load the ids for statuses to be displayed """ """ load the statuses to be displayed """
# clear unreads for this feed # clear unreads for this feed
r.set(self.unread_id(user), 0) r.set(self.unread_id(user), 0)
statuses = r.zrevrange(self.stream_id(user), 0, -1) statuses = super().get_store(self.stream_id(user))
return ( return (
models.Status.objects.select_subclasses() models.Status.objects.select_subclasses()
.filter(id__in=statuses) .filter(id__in=statuses)
@ -85,23 +60,11 @@ class ActivityStream(ABC):
""" get the unread status count for this user's feed """ """ get the unread status count for this user's feed """
return int(r.get(self.unread_id(user)) or 0) return int(r.get(self.unread_id(user)) or 0)
def populate_stream(self, user): def populate_streamse(self, user):
""" go from zero to a timeline """ """ go from zero to a timeline """
pipeline = r.pipeline() super().populate_store(self.stream_id(user))
statuses = self.stream_statuses(user)
stream_id = self.stream_id(user) def get_audience(self, status): # pylint: disable=no-self-use
for status in statuses.all()[: settings.MAX_STREAM_LENGTH]:
pipeline.zadd(stream_id, self.get_value(status))
# only trim the stream if statuses were added
if statuses.exists():
pipeline.zremrangebyrank(
self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH
)
pipeline.execute()
def stream_users(self, status): # pylint: disable=no-self-use
""" given a status, what users should see it """ """ given a status, what users should see it """
# direct messages don't appeard in feeds, direct comments/reviews/etc do # direct messages don't appeard in feeds, direct comments/reviews/etc do
if status.privacy == "direct" and status.status_type == "Note": if status.privacy == "direct" and status.status_type == "Note":
@ -129,7 +92,10 @@ class ActivityStream(ABC):
) )
return audience.distinct() return audience.distinct()
def stream_statuses(self, user): # pylint: disable=no-self-use def get_stores_for_object(self, obj):
return [self.stream_id(u) for u in self.get_audience(obj)]
def get_statuses_for_user(self, user): # pylint: disable=no-self-use
""" given a user, what statuses should they see on this stream """ """ given a user, what statuses should they see on this stream """
return privacy_filter( return privacy_filter(
user, user,
@ -137,14 +103,18 @@ class ActivityStream(ABC):
privacy_levels=["public", "unlisted", "followers"], privacy_levels=["public", "unlisted", "followers"],
) )
def get_objects_for_store(self, store):
user = models.User.objects.get(id=store.split('-')[0])
return self.get_statuses_for_user(user)
class HomeStream(ActivityStream): class HomeStream(ActivityStream):
""" users you follow """ """ users you follow """
key = "home" key = "home"
def stream_users(self, status): def get_audience(self, status):
audience = super().stream_users(status) audience = super().get_audience(status)
if not audience: if not audience:
return [] return []
return audience.filter( return audience.filter(
@ -152,7 +122,7 @@ class HomeStream(ActivityStream):
| Q(following=status.user) # if the user is following the author | Q(following=status.user) # if the user is following the author
).distinct() ).distinct()
def stream_statuses(self, user): def get_statuses_for_user(self, user):
return privacy_filter( return privacy_filter(
user, user,
models.Status.objects.select_subclasses(), models.Status.objects.select_subclasses(),
@ -166,13 +136,13 @@ class LocalStream(ActivityStream):
key = "local" key = "local"
def stream_users(self, status): def get_audience(self, status):
# this stream wants no part in non-public statuses # this stream wants no part in non-public statuses
if status.privacy != "public" or not status.user.local: if status.privacy != "public" or not status.user.local:
return [] return []
return super().stream_users(status) return super().get_audience(status)
def stream_statuses(self, user): def get_statuses_for_user(self, user):
# all public statuses by a local user # all public statuses by a local user
return privacy_filter( return privacy_filter(
user, user,
@ -186,13 +156,13 @@ class FederatedStream(ActivityStream):
key = "federated" key = "federated"
def stream_users(self, status): def get_audience(self, status):
# this stream wants no part in non-public statuses # this stream wants no part in non-public statuses
if status.privacy != "public": if status.privacy != "public":
return [] return []
return super().stream_users(status) return super().get_audience(status)
def stream_statuses(self, user): def get_statuses_for_user(self, user):
return privacy_filter( return privacy_filter(
user, user,
models.Status.objects.select_subclasses(), models.Status.objects.select_subclasses(),
@ -217,7 +187,7 @@ def add_status_on_create(sender, instance, created, *args, **kwargs):
if instance.deleted: if instance.deleted:
for stream in streams.values(): for stream in streams.values():
stream.remove_status(instance) stream.remove_object_from_related_stores(instance)
return return
if not created: if not created:
@ -234,7 +204,7 @@ def remove_boost_on_delete(sender, instance, *args, **kwargs):
""" boosts are deleted """ """ boosts are deleted """
# we're only interested in new statuses # we're only interested in new statuses
for stream in streams.values(): for stream in streams.values():
stream.remove_status(instance) stream.remove_object_from_related_stores(instance)
@receiver(signals.post_save, sender=models.UserFollows) @receiver(signals.post_save, sender=models.UserFollows)
@ -248,7 +218,7 @@ def add_statuses_on_follow(sender, instance, created, *args, **kwargs):
@receiver(signals.post_delete, sender=models.UserFollows) @receiver(signals.post_delete, sender=models.UserFollows)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def remove_statuses_on_unfollow(sender, instance, *args, **kwargs): def remove_objectes_on_unfollow(sender, instance, *args, **kwargs):
""" remove statuses from a feed on unfollow """ """ remove statuses from a feed on unfollow """
if not instance.user_subject.local: if not instance.user_subject.local:
return return
@ -257,7 +227,7 @@ def remove_statuses_on_unfollow(sender, instance, *args, **kwargs):
@receiver(signals.post_save, sender=models.UserBlocks) @receiver(signals.post_save, sender=models.UserBlocks)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def remove_statuses_on_block(sender, instance, *args, **kwargs): def remove_objectes_on_block(sender, instance, *args, **kwargs):
""" remove statuses from all feeds on block """ """ remove statuses from all feeds on block """
# blocks apply ot all feeds # blocks apply ot all feeds
if instance.user_subject.local: if instance.user_subject.local:
@ -294,4 +264,4 @@ def populate_streams_on_account_create(sender, instance, created, *args, **kwarg
return return
for stream in streams.values(): for stream in streams.values():
stream.populate_stream(instance) stream.populate_streams(instance)

91
bookwyrm/redis_store.py Normal file
View file

@ -0,0 +1,91 @@
""" access the activity stores stored in redis """
from abc import ABC, abstractmethod
import redis
from bookwyrm import settings
# module-level redis client used by the stores below; it connects to the
# activity redis host/port configured in settings and uses database 0
r = redis.Redis(
    host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0
)
class RedisStore(ABC):
    """ sets of ranked, related objects, like statuses for a user's feed

    Each store is a redis sorted set: members are object ids and scores are
    whatever ``get_rank`` returns. Subclasses must implement ``get_rank``,
    ``get_stores_for_object`` and ``get_objects_for_store``.
    """

    # upper bound on how many members a single store keeps
    max_length = settings.MAX_STREAM_LENGTH

    def get_value(self, obj):
        """ the object and rank, as the {member: score} mapping zadd expects """
        return {obj.id: self.get_rank(obj)}

    def _trim_store(self, pipeline, store):
        """ queue a trim so that the store keeps its top-ranked members """
        # ranks count up from the lowest score, and negative ranks count back
        # from the highest, so removing 0..-(max_length + 1) leaves exactly
        # the max_length highest-scored members (and is a no-op when the
        # store is not over the limit)
        pipeline.zremrangebyrank(store, 0, -1 * (self.max_length + 1))

    def add_object_to_related_stores(self, obj, execute=True):
        """ add an object to all suitable stores

        With ``execute=False`` the queued (not yet executed) pipeline is
        returned so the caller can append more commands before running it;
        otherwise the pipeline is executed and its result returned.
        """
        value = self.get_value(obj)
        # we want to do this as a bulk operation, hence "pipeline"
        pipeline = r.pipeline()
        for store in self.get_stores_for_object(obj):
            # add the object to the store
            pipeline.zadd(store, value)
            # trim the store
            self._trim_store(pipeline, store)
        if not execute:
            return pipeline
        # and go!
        return pipeline.execute()

    def remove_object_from_related_stores(self, obj):
        """ remove an object from all stores """
        pipeline = r.pipeline()
        for store in self.get_stores_for_object(obj):
            # zrem takes only the members to remove (unlike lrem, it has no
            # count argument)
            pipeline.zrem(store, obj.id)
        pipeline.execute()

    def bulk_add_objects_to_store(self, objs, store):
        """ add a list of objects to a given store """
        pipeline = r.pipeline()
        # slice once and materialize, so a queryset is only evaluated for the
        # rows that are actually added
        batch = list(objs[: self.max_length])
        for obj in batch:
            pipeline.zadd(store, self.get_value(obj))
        # only trim the store if objects were added
        if batch:
            self._trim_store(pipeline, store)
        pipeline.execute()

    def bulk_remove_objects_from_store(self, objs, store):
        """ remove a list of objects from a given store """
        pipeline = r.pipeline()
        # NOTE(review): only the first max_length objects are removed; confirm
        # that callers never need to remove objects beyond that slice
        for obj in objs[: self.max_length]:
            pipeline.zrem(store, obj.id)
        pipeline.execute()

    def get_store(self, store):  # pylint: disable=no-self-use
        """ load the values in a store, highest rank first """
        return r.zrevrange(store, 0, -1)

    def populate_store(self, store):
        """ go from zero to a store """
        pipeline = r.pipeline()
        queryset = self.get_objects_for_store(store)
        # evaluate the sliced queryset once instead of iterating it and then
        # running a separate exists() query
        batch = list(queryset[: self.max_length])
        for obj in batch:
            pipeline.zadd(store, self.get_value(obj))
        # only trim the store if objects were added
        if batch:
            self._trim_store(pipeline, store)
        pipeline.execute()

    @abstractmethod
    def get_objects_for_store(self, store):
        """ a queryset of what should go in a store, used for populating it """

    @abstractmethod
    def get_stores_for_object(self, obj):
        """ the stores that an object belongs in """

    @abstractmethod
    def get_rank(self, obj):
        """ how to rank an object """

View file

@ -31,7 +31,6 @@ class Feed(View):
tab = "home" tab = "home"
activities = activitystreams.streams[tab].get_activity_stream(request.user) activities = activitystreams.streams[tab].get_activity_stream(request.user)
paginated = Paginator(activities, PAGE_LENGTH) paginated = Paginator(activities, PAGE_LENGTH)
suggested_users = get_suggested_users(request.user) suggested_users = get_suggested_users(request.user)