diff --git a/bookwyrm/activitystreams.py b/bookwyrm/activitystreams.py index 279079c81..8d1e83018 100644 --- a/bookwyrm/activitystreams.py +++ b/bookwyrm/activitystreams.py @@ -1,18 +1,13 @@ """ access the activity streams stored in redis """ -from abc import ABC from django.dispatch import receiver from django.db.models import signals, Q -import redis -from bookwyrm import models, settings +from bookwyrm import models +from bookwyrm.redis_store import RedisStore, r from bookwyrm.views.helpers import privacy_filter -r = redis.Redis( - host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0 -) - -class ActivityStream(ABC): +class ActivityStream(RedisStore): """ a category of activity stream (like home, local, federated) """ def stream_id(self, user): @@ -23,58 +18,38 @@ class ActivityStream(ABC): """ the redis key for this user's unread count for this stream """ return "{}-unread".format(self.stream_id(user)) - def get_value(self, status): # pylint: disable=no-self-use - """ the status id and the rank (ie, published date) """ - return {status.id: status.published_date.timestamp()} + def get_rank(self, obj): # pylint: disable=no-self-use + """ the sort rank of a status, which is published date """ + return obj.published_date.timestamp() def add_status(self, status): """ add a status to users' feeds """ - value = self.get_value(status) - # we want to do this as a bulk operation, hence "pipeline" - pipeline = r.pipeline() - for user in self.stream_users(status): - # add the status to the feed - pipeline.zadd(self.stream_id(user), value) - pipeline.zremrangebyrank( - self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH - ) + # the pipeline contains all the addp-to-stream activities + pipeline = self.add_object_to_related_stores(status, execute=False) + + for user in self.get_audience(status): # add to the unread status count pipeline.incr(self.unread_id(user)) - # and go! - pipeline.execute() - def remove_status(self, status): - """ remove a status from all feeds """ - pipeline = r.pipeline() - for user in self.stream_users(status): - pipeline.zrem(self.stream_id(user), -1, status.id) + # and go! pipeline.execute() def add_user_statuses(self, viewer, user): """ add a user's statuses to another user's feed """ - pipeline = r.pipeline() - statuses = user.status_set.all()[: settings.MAX_STREAM_LENGTH] - for status in statuses: - pipeline.zadd(self.stream_id(viewer), self.get_value(status)) - if statuses: - pipeline.zremrangebyrank( - self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH - ) - pipeline.execute() + statuses = privacy_filter(viewer, user.status_set.all()) + self.bulk_add_objects_to_store(statuses, self.stream_id(viewer)) def remove_user_statuses(self, viewer, user): """ remove a user's status from another user's feed """ - pipeline = r.pipeline() - for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.lrem(self.stream_id(viewer), -1, status.id) - pipeline.execute() + statuses = user.status_set.all() + self.bulk_remove_objects_from_store(statuses, self.stream_id(viewer)) def get_activity_stream(self, user): - """ load the ids for statuses to be displayed """ + """ load the statuses to be displayed """ # clear unreads for this feed r.set(self.unread_id(user), 0) - statuses = r.zrevrange(self.stream_id(user), 0, -1) + statuses = super().get_store(self.stream_id(user)) return ( models.Status.objects.select_subclasses() .filter(id__in=statuses) @@ -85,23 +60,11 @@ class ActivityStream(ABC): """ get the unread status count for this user's feed """ return int(r.get(self.unread_id(user)) or 0) - def populate_stream(self, user): + def populate_streamse(self, user): """ go from zero to a timeline """ - pipeline = r.pipeline() - statuses = self.stream_statuses(user) + super().populate_store(self.stream_id(user)) - stream_id = self.stream_id(user) - for status in statuses.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.zadd(stream_id, self.get_value(status)) - - # only trim the stream if statuses were added - if statuses.exists(): - pipeline.zremrangebyrank( - self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH - ) - pipeline.execute() - - def stream_users(self, status): # pylint: disable=no-self-use + def get_audience(self, status): # pylint: disable=no-self-use """ given a status, what users should see it """ # direct messages don't appeard in feeds, direct comments/reviews/etc do if status.privacy == "direct" and status.status_type == "Note": @@ -129,7 +92,10 @@ class ActivityStream(ABC): ) return audience.distinct() - def stream_statuses(self, user): # pylint: disable=no-self-use + def get_stores_for_object(self, obj): + return [self.stream_id(u) for u in self.get_audience(obj)] + + def get_statuses_for_user(self, user): # pylint: disable=no-self-use """ given a user, what statuses should they see on this stream """ return privacy_filter( user, @@ -137,14 +103,18 @@ class ActivityStream(ABC): privacy_levels=["public", "unlisted", "followers"], ) + def get_objects_for_store(self, store): + user = models.User.objects.get(id=store.split('-')[0]) + return self.get_statuses_for_user(user) + class HomeStream(ActivityStream): """ users you follow """ key = "home" - def stream_users(self, status): - audience = super().stream_users(status) + def get_audience(self, status): + audience = super().get_audience(status) if not audience: return [] return audience.filter( @@ -152,7 +122,7 @@ class HomeStream(ActivityStream): | Q(following=status.user) # if the user is following the author ).distinct() - def stream_statuses(self, user): + def get_statuses_for_user(self, user): return privacy_filter( user, models.Status.objects.select_subclasses(), @@ -166,13 +136,13 @@ class LocalStream(ActivityStream): key = "local" - def stream_users(self, status): + def get_audience(self, status): # this stream wants no part in non-public statuses if status.privacy != "public" or not status.user.local: return [] - return super().stream_users(status) + return super().get_audience(status) - def stream_statuses(self, user): + def get_statuses_for_user(self, user): # all public statuses by a local user return privacy_filter( user, @@ -186,13 +156,13 @@ class FederatedStream(ActivityStream): key = "federated" - def stream_users(self, status): + def get_audience(self, status): # this stream wants no part in non-public statuses if status.privacy != "public": return [] - return super().stream_users(status) + return super().get_audience(status) - def stream_statuses(self, user): + def get_statuses_for_user(self, user): return privacy_filter( user, models.Status.objects.select_subclasses(), @@ -217,7 +187,7 @@ def add_status_on_create(sender, instance, created, *args, **kwargs): if instance.deleted: for stream in streams.values(): - stream.remove_status(instance) + stream.remove_object_from_related_stores(instance) return if not created: @@ -234,7 +204,7 @@ def remove_boost_on_delete(sender, instance, *args, **kwargs): """ boosts are deleted """ # we're only interested in new statuses for stream in streams.values(): - stream.remove_status(instance) + stream.remove_object_from_related_stores(instance) @receiver(signals.post_save, sender=models.UserFollows) @@ -248,7 +218,7 @@ def add_statuses_on_follow(sender, instance, created, *args, **kwargs): @receiver(signals.post_delete, sender=models.UserFollows) # pylint: disable=unused-argument -def remove_statuses_on_unfollow(sender, instance, *args, **kwargs): +def remove_objectes_on_unfollow(sender, instance, *args, **kwargs): """ remove statuses from a feed on unfollow """ if not instance.user_subject.local: return @@ -257,7 +227,7 @@ def remove_statuses_on_unfollow(sender, instance, *args, **kwargs): @receiver(signals.post_save, sender=models.UserBlocks) # pylint: disable=unused-argument -def remove_statuses_on_block(sender, instance, *args, **kwargs): +def remove_objectes_on_block(sender, instance, *args, **kwargs): """ remove statuses from all feeds on block """ # blocks apply ot all feeds if instance.user_subject.local: @@ -294,4 +264,4 @@ def populate_streams_on_account_create(sender, instance, created, *args, **kwarg return for stream in streams.values(): - stream.populate_stream(instance) + stream.populate_streams(instance) diff --git a/bookwyrm/redis_store.py b/bookwyrm/redis_store.py new file mode 100644 index 000000000..8255fcbda --- /dev/null +++ b/bookwyrm/redis_store.py @@ -0,0 +1,91 @@ +""" access the activity stores stored in redis """ +from abc import ABC, abstractmethod +import redis + +from bookwyrm import settings + +r = redis.Redis( + host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0 +) + + +class RedisStore(ABC): + """ sets of ranked, related objects, like statuses for a user's feed """ + max_length = settings.MAX_STREAM_LENGTH + + def get_value(self, obj): + """ the object and rank """ + return {obj.id: self.get_rank(obj)} + + def add_object_to_related_stores(self, obj, execute=True): + """ add an object to all suitable stores """ + value = self.get_value(obj) + # we want to do this as a bulk operation, hence "pipeline" + pipeline = r.pipeline() + for store in self.get_stores_for_object(obj): + # add the status to the feed + pipeline.zadd(store, value) + # trim the store + pipeline.zremrangebyrank( + store, 0, -1 * self.max_length + ) + if not execute: + return pipeline + # and go! + return pipeline.execute() + + def remove_object_from_related_stores(self, obj): + """ remove an object from all stores """ + pipeline = r.pipeline() + for store in self.get_stores_for_object(obj): + pipeline.zrem(store, -1, obj.id) + pipeline.execute() + + def bulk_add_objects_to_store(self, objs, store): + """ add a list of objects to a given store """ + pipeline = r.pipeline() + for obj in objs[:self.max_length]: + pipeline.zadd(store, self.get_value(obj)) + if objs: + pipeline.zremrangebyrank( + store, 0, -1 * self.max_length + ) + pipeline.execute() + + def bulk_remove_objects_from_store(self, objs, store): + """ remoev a list of objects from a given store """ + pipeline = r.pipeline() + for obj in objs[:self.max_length]: + pipeline.zrem(store, -1, obj.id) + pipeline.execute() + + def get_store(self, store): # pylint: disable=no-self-use + """ load the values in a store """ + return r.zrevrange(store, 0, -1) + + def populate_store(self, store): + """ go from zero to a store """ + pipeline = r.pipeline() + queryset = self.get_objects_for_store(store) + + for obj in queryset[:self.max_length]: + pipeline.zadd(store, self.get_value(obj)) + + # only trim the store if objects were added + if queryset.exists(): + pipeline.zremrangebyrank( + store, 0, -1 * self.max_length + ) + pipeline.execute() + + @abstractmethod + def get_objects_for_store(self, store): + """ a queryset of what should go in a store, used for populating it """ + + @abstractmethod + def get_stores_for_object(self, obj): + """ the stores that an object belongs in """ + + @abstractmethod + def get_rank(self, obj): + """ how to rank an object """ diff --git a/bookwyrm/views/feed.py b/bookwyrm/views/feed.py index e4be50e33..cda115867 100644 --- a/bookwyrm/views/feed.py +++ b/bookwyrm/views/feed.py @@ -31,7 +31,6 @@ class Feed(View): tab = "home" activities = activitystreams.streams[tab].get_activity_stream(request.user) - paginated = Paginator(activities, PAGE_LENGTH) suggested_users = get_suggested_users(request.user)