diff --git a/bookwyrm/activitystreams.py b/bookwyrm/activitystreams.py index 279079c8..949ae9da 100644 --- a/bookwyrm/activitystreams.py +++ b/bookwyrm/activitystreams.py @@ -1,18 +1,13 @@ """ access the activity streams stored in redis """ -from abc import ABC from django.dispatch import receiver from django.db.models import signals, Q -import redis -from bookwyrm import models, settings +from bookwyrm import models +from bookwyrm.redis_store import RedisStore, r from bookwyrm.views.helpers import privacy_filter -r = redis.Redis( - host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0 -) - -class ActivityStream(ABC): +class ActivityStream(RedisStore): """ a category of activity stream (like home, local, federated) """ def stream_id(self, user): @@ -23,58 +18,40 @@ class ActivityStream(ABC): """ the redis key for this user's unread count for this stream """ return "{}-unread".format(self.stream_id(user)) - def get_value(self, status): # pylint: disable=no-self-use - """ the status id and the rank (ie, published date) """ - return {status.id: status.published_date.timestamp()} + def get_rank(self, obj): # pylint: disable=no-self-use + """ statuses are sorted by date published """ + return obj.published_date.timestamp() def add_status(self, status): """ add a status to users' feeds """ - value = self.get_value(status) - # we want to do this as a bulk operation, hence "pipeline" - pipeline = r.pipeline() - for user in self.stream_users(status): - # add the status to the feed - pipeline.zadd(self.stream_id(user), value) - pipeline.zremrangebyrank( - self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH - ) + # the pipeline contains all the add-to-stream activities + pipeline = self.add_object_to_related_stores(status, execute=False) + + for user in self.get_audience(status): # add to the unread status count pipeline.incr(self.unread_id(user)) - # and go! - pipeline.execute() - def remove_status(self, status): - """ remove a status from all feeds """ - pipeline = r.pipeline() - for user in self.stream_users(status): - pipeline.zrem(self.stream_id(user), -1, status.id) + # and go! pipeline.execute() def add_user_statuses(self, viewer, user): """ add a user's statuses to another user's feed """ - pipeline = r.pipeline() - statuses = user.status_set.all()[: settings.MAX_STREAM_LENGTH] - for status in statuses: - pipeline.zadd(self.stream_id(viewer), self.get_value(status)) - if statuses: - pipeline.zremrangebyrank( - self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH - ) - pipeline.execute() + # only add the statuses that the viewer should be able to see (ie, not dms) + statuses = privacy_filter(viewer, user.status_set.all()) + self.bulk_add_objects_to_store(statuses, self.stream_id(viewer)) def remove_user_statuses(self, viewer, user): """ remove a user's status from another user's feed """ - pipeline = r.pipeline() - for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.lrem(self.stream_id(viewer), -1, status.id) - pipeline.execute() + # remove all so that followers only statuses are removed + statuses = user.status_set.all() + self.bulk_remove_objects_from_store(statuses, self.stream_id(viewer)) def get_activity_stream(self, user): - """ load the ids for statuses to be displayed """ + """ load the statuses to be displayed """ # clear unreads for this feed r.set(self.unread_id(user), 0) - statuses = r.zrevrange(self.stream_id(user), 0, -1) + statuses = self.get_store(self.stream_id(user)) return ( models.Status.objects.select_subclasses() .filter(id__in=statuses) @@ -85,23 +62,11 @@ class ActivityStream(ABC): """ get the unread status count for this user's feed """ return int(r.get(self.unread_id(user)) or 0) - def populate_stream(self, user): + def populate_streams(self, user): """ go from zero to a timeline """ - pipeline = r.pipeline() - statuses = self.stream_statuses(user) + self.populate_store(self.stream_id(user)) - stream_id = self.stream_id(user) - for status in statuses.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.zadd(stream_id, self.get_value(status)) - - # only trim the stream if statuses were added - if statuses.exists(): - pipeline.zremrangebyrank( - self.stream_id(user), 0, -1 * settings.MAX_STREAM_LENGTH - ) - pipeline.execute() - - def stream_users(self, status): # pylint: disable=no-self-use + def get_audience(self, status): # pylint: disable=no-self-use """ given a status, what users should see it """ # direct messages don't appeard in feeds, direct comments/reviews/etc do if status.privacy == "direct" and status.status_type == "Note": @@ -129,7 +94,10 @@ class ActivityStream(ABC): ) return audience.distinct() - def stream_statuses(self, user): # pylint: disable=no-self-use + def get_stores_for_object(self, obj): + return [self.stream_id(u) for u in self.get_audience(obj)] + + def get_statuses_for_user(self, user): # pylint: disable=no-self-use """ given a user, what statuses should they see on this stream """ return privacy_filter( user, @@ -137,14 +105,18 @@ class ActivityStream(ABC): privacy_levels=["public", "unlisted", "followers"], ) + def get_objects_for_store(self, store): + user = models.User.objects.get(id=store.split("-")[0]) + return self.get_statuses_for_user(user) + class HomeStream(ActivityStream): """ users you follow """ key = "home" - def stream_users(self, status): - audience = super().stream_users(status) + def get_audience(self, status): + audience = super().get_audience(status) if not audience: return [] return audience.filter( @@ -152,7 +124,7 @@ class HomeStream(ActivityStream): | Q(following=status.user) # if the user is following the author ).distinct() - def stream_statuses(self, user): + def get_statuses_for_user(self, user): return privacy_filter( user, models.Status.objects.select_subclasses(), @@ -166,13 +138,13 @@ class LocalStream(ActivityStream): key = "local" - def stream_users(self, status): + def get_audience(self, status): # this stream wants no part in non-public statuses if status.privacy != "public" or not status.user.local: return [] - return super().stream_users(status) + return super().get_audience(status) - def stream_statuses(self, user): + def get_statuses_for_user(self, user): # all public statuses by a local user return privacy_filter( user, @@ -186,13 +158,13 @@ class FederatedStream(ActivityStream): key = "federated" - def stream_users(self, status): + def get_audience(self, status): # this stream wants no part in non-public statuses if status.privacy != "public": return [] - return super().stream_users(status) + return super().get_audience(status) - def stream_statuses(self, user): + def get_statuses_for_user(self, user): return privacy_filter( user, models.Status.objects.select_subclasses(), @@ -217,7 +189,7 @@ def add_status_on_create(sender, instance, created, *args, **kwargs): if instance.deleted: for stream in streams.values(): - stream.remove_status(instance) + stream.remove_object_from_related_stores(instance) return if not created: @@ -234,7 +206,7 @@ def remove_boost_on_delete(sender, instance, *args, **kwargs): """ boosts are deleted """ # we're only interested in new statuses for stream in streams.values(): - stream.remove_status(instance) + stream.remove_object_from_related_stores(instance) @receiver(signals.post_save, sender=models.UserFollows) @@ -294,4 +266,4 @@ def populate_streams_on_account_create(sender, instance, created, *args, **kwarg return for stream in streams.values(): - stream.populate_stream(instance) + stream.populate_streams(instance) diff --git a/bookwyrm/redis_store.py b/bookwyrm/redis_store.py new file mode 100644 index 00000000..4236d6df --- /dev/null +++ b/bookwyrm/redis_store.py @@ -0,0 +1,86 @@ +""" access the activity stores stored in redis """ +from abc import ABC, abstractmethod +import redis + +from bookwyrm import settings + +r = redis.Redis( + host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0 +) + + +class RedisStore(ABC): + """ sets of ranked, related objects, like statuses for a user's feed """ + + max_length = settings.MAX_STREAM_LENGTH + + def get_value(self, obj): + """ the object and rank """ + return {obj.id: self.get_rank(obj)} + + def add_object_to_related_stores(self, obj, execute=True): + """ add an object to all suitable stores """ + value = self.get_value(obj) + # we want to do this as a bulk operation, hence "pipeline" + pipeline = r.pipeline() + for store in self.get_stores_for_object(obj): + # add the status to the feed + pipeline.zadd(store, value) + # trim the store + pipeline.zremrangebyrank(store, 0, -1 * self.max_length) + if not execute: + return pipeline + # and go! + return pipeline.execute() + + def remove_object_from_related_stores(self, obj): + """ remove an object from all stores """ + pipeline = r.pipeline() + for store in self.get_stores_for_object(obj): + pipeline.zrem(store, -1, obj.id) + pipeline.execute() + + def bulk_add_objects_to_store(self, objs, store): + """ add a list of objects to a given store """ + pipeline = r.pipeline() + for obj in objs[: self.max_length]: + pipeline.zadd(store, self.get_value(obj)) + if objs: + pipeline.zremrangebyrank(store, 0, -1 * self.max_length) + pipeline.execute() + + def bulk_remove_objects_from_store(self, objs, store): + """ remoev a list of objects from a given store """ + pipeline = r.pipeline() + for obj in objs[: self.max_length]: + pipeline.zrem(store, -1, obj.id) + pipeline.execute() + + def get_store(self, store): # pylint: disable=no-self-use + """ load the values in a store """ + return r.zrevrange(store, 0, -1) + + def populate_store(self, store): + """ go from zero to a store """ + pipeline = r.pipeline() + queryset = self.get_objects_for_store(store) + + for obj in queryset[: self.max_length]: + pipeline.zadd(store, self.get_value(obj)) + + # only trim the store if objects were added + if queryset.exists(): + pipeline.zremrangebyrank(store, 0, -1 * self.max_length) + pipeline.execute() + + @abstractmethod + def get_objects_for_store(self, store): + """ a queryset of what should go in a store, used for populating it """ + + @abstractmethod + def get_stores_for_object(self, obj): + """ the stores that an object belongs in """ + + @abstractmethod + def get_rank(self, obj): + """ how to rank an object """ diff --git a/bookwyrm/tests/models/test_status_model.py b/bookwyrm/tests/models/test_status_model.py index 208bf3ab..4263c457 100644 --- a/bookwyrm/tests/models/test_status_model.py +++ b/bookwyrm/tests/models/test_status_model.py @@ -116,7 +116,9 @@ class Status(TestCase): def test_status_to_activity_tombstone(self, *_): """ subclass of the base model version with a "pure" serializer """ - with patch("bookwyrm.activitystreams.ActivityStream.remove_status"): + with patch( + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" + ): status = models.Status.objects.create( content="test content", user=self.local_user, diff --git a/bookwyrm/tests/test_activitystreams.py b/bookwyrm/tests/test_activitystreams.py index 88ca4693..b4efeba3 100644 --- a/bookwyrm/tests/test_activitystreams.py +++ b/bookwyrm/tests/test_activitystreams.py @@ -47,18 +47,18 @@ class Activitystreams(TestCase): "{}-test-unread".format(self.local_user.id), ) - def test_abstractstream_stream_users(self, *_): + def test_abstractstream_get_audience(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="public" ) - users = self.test_stream.stream_users(status) + users = self.test_stream.get_audience(status) # remote users don't have feeds self.assertFalse(self.remote_user in users) self.assertTrue(self.local_user in users) self.assertTrue(self.another_user in users) - def test_abstractstream_stream_users_direct(self, *_): + def test_abstractstream_get_audience_direct(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, @@ -66,7 +66,7 @@ class Activitystreams(TestCase): privacy="direct", ) status.mention_users.add(self.local_user) - users = self.test_stream.stream_users(status) + users = self.test_stream.get_audience(status) self.assertEqual(users, []) status = models.Comment.objects.create( @@ -76,22 +76,22 @@ class Activitystreams(TestCase): book=self.book, ) status.mention_users.add(self.local_user) - users = self.test_stream.stream_users(status) + users = self.test_stream.get_audience(status) self.assertTrue(self.local_user in users) self.assertFalse(self.another_user in users) self.assertFalse(self.remote_user in users) - def test_abstractstream_stream_users_followers_remote_user(self, *_): + def test_abstractstream_get_audience_followers_remote_user(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="followers", ) - users = self.test_stream.stream_users(status) + users = self.test_stream.get_audience(status) self.assertFalse(users.exists()) - def test_abstractstream_stream_users_followers_self(self, *_): + def test_abstractstream_get_audience_followers_self(self, *_): """ get a list of users that should see a status """ status = models.Comment.objects.create( user=self.local_user, @@ -99,12 +99,12 @@ class Activitystreams(TestCase): privacy="direct", book=self.book, ) - users = self.test_stream.stream_users(status) + users = self.test_stream.get_audience(status) self.assertTrue(self.local_user in users) self.assertFalse(self.another_user in users) self.assertFalse(self.remote_user in users) - def test_abstractstream_stream_users_followers_with_mention(self, *_): + def test_abstractstream_get_audience_followers_with_mention(self, *_): """ get a list of users that should see a status """ status = models.Comment.objects.create( user=self.remote_user, @@ -114,12 +114,12 @@ class Activitystreams(TestCase): ) status.mention_users.add(self.local_user) - users = self.test_stream.stream_users(status) + users = self.test_stream.get_audience(status) self.assertTrue(self.local_user in users) self.assertFalse(self.another_user in users) self.assertFalse(self.remote_user in users) - def test_abstractstream_stream_users_followers_with_relationship(self, *_): + def test_abstractstream_get_audience_followers_with_relationship(self, *_): """ get a list of users that should see a status """ self.remote_user.followers.add(self.local_user) status = models.Comment.objects.create( @@ -128,77 +128,77 @@ class Activitystreams(TestCase): privacy="direct", book=self.book, ) - users = self.test_stream.stream_users(status) + users = self.test_stream.get_audience(status) self.assertFalse(self.local_user in users) self.assertFalse(self.another_user in users) self.assertFalse(self.remote_user in users) - def test_homestream_stream_users(self, *_): + def test_homestream_get_audience(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="public" ) - users = activitystreams.HomeStream().stream_users(status) + users = activitystreams.HomeStream().get_audience(status) self.assertFalse(users.exists()) - def test_homestream_stream_users_with_mentions(self, *_): + def test_homestream_get_audience_with_mentions(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="public" ) status.mention_users.add(self.local_user) - users = activitystreams.HomeStream().stream_users(status) + users = activitystreams.HomeStream().get_audience(status) self.assertFalse(self.local_user in users) self.assertFalse(self.another_user in users) - def test_homestream_stream_users_with_relationship(self, *_): + def test_homestream_get_audience_with_relationship(self, *_): """ get a list of users that should see a status """ self.remote_user.followers.add(self.local_user) status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="public" ) - users = activitystreams.HomeStream().stream_users(status) + users = activitystreams.HomeStream().get_audience(status) self.assertTrue(self.local_user in users) self.assertFalse(self.another_user in users) - def test_localstream_stream_users_remote_status(self, *_): + def test_localstream_get_audience_remote_status(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="public" ) - users = activitystreams.LocalStream().stream_users(status) + users = activitystreams.LocalStream().get_audience(status) self.assertEqual(users, []) - def test_localstream_stream_users_local_status(self, *_): + def test_localstream_get_audience_local_status(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.local_user, content="hi", privacy="public" ) - users = activitystreams.LocalStream().stream_users(status) + users = activitystreams.LocalStream().get_audience(status) self.assertTrue(self.local_user in users) self.assertTrue(self.another_user in users) - def test_localstream_stream_users_unlisted(self, *_): + def test_localstream_get_audience_unlisted(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.local_user, content="hi", privacy="unlisted" ) - users = activitystreams.LocalStream().stream_users(status) + users = activitystreams.LocalStream().get_audience(status) self.assertEqual(users, []) - def test_federatedstream_stream_users(self, *_): + def test_federatedstream_get_audience(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="public" ) - users = activitystreams.FederatedStream().stream_users(status) + users = activitystreams.FederatedStream().get_audience(status) self.assertTrue(self.local_user in users) self.assertTrue(self.another_user in users) - def test_federatedstream_stream_users_unlisted(self, *_): + def test_federatedstream_get_audience_unlisted(self, *_): """ get a list of users that should see a status """ status = models.Status.objects.create( user=self.remote_user, content="hi", privacy="unlisted" ) - users = activitystreams.FederatedStream().stream_users(status) + users = activitystreams.FederatedStream().get_audience(status) self.assertEqual(users, []) diff --git a/bookwyrm/tests/test_templatetags.py b/bookwyrm/tests/test_templatetags.py index 61136c2e..b4dc517f 100644 --- a/bookwyrm/tests/test_templatetags.py +++ b/bookwyrm/tests/test_templatetags.py @@ -85,7 +85,9 @@ class TemplateTags(TestCase): second_child = models.Status.objects.create( reply_parent=parent, user=self.user, content="hi" ) - with patch("bookwyrm.activitystreams.ActivityStream.remove_status"): + with patch( + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" + ): third_child = models.Status.objects.create( reply_parent=parent, user=self.user, diff --git a/bookwyrm/tests/views/test_inbox.py b/bookwyrm/tests/views/test_inbox.py index f44a79c6..44a29a92 100644 --- a/bookwyrm/tests/views/test_inbox.py +++ b/bookwyrm/tests/views/test_inbox.py @@ -444,7 +444,7 @@ class Inbox(TestCase): "object": {"id": self.status.remote_id, "type": "Tombstone"}, } with patch( - "bookwyrm.activitystreams.ActivityStream.remove_status" + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" ) as redis_mock: views.inbox.activity_task(activity) self.assertTrue(redis_mock.called) @@ -477,7 +477,7 @@ class Inbox(TestCase): "object": {"id": self.status.remote_id, "type": "Tombstone"}, } with patch( - "bookwyrm.activitystreams.ActivityStream.remove_status" + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" ) as redis_mock: views.inbox.activity_task(activity) self.assertTrue(redis_mock.called) @@ -666,7 +666,7 @@ class Inbox(TestCase): }, } with patch( - "bookwyrm.activitystreams.ActivityStream.remove_status" + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" ) as redis_mock: views.inbox.activity_task(activity) self.assertTrue(redis_mock.called) diff --git a/bookwyrm/tests/views/test_interaction.py b/bookwyrm/tests/views/test_interaction.py index 297eeb73..8d2c63ff 100644 --- a/bookwyrm/tests/views/test_interaction.py +++ b/bookwyrm/tests/views/test_interaction.py @@ -164,7 +164,7 @@ class InteractionViews(TestCase): self.assertEqual(models.Boost.objects.count(), 1) self.assertEqual(models.Notification.objects.count(), 1) with patch( - "bookwyrm.activitystreams.ActivityStream.remove_status" + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" ) as redis_mock: view(request, status.id) self.assertTrue(redis_mock.called) diff --git a/bookwyrm/tests/views/test_status.py b/bookwyrm/tests/views/test_status.py index e7fc62d5..5eb13b6b 100644 --- a/bookwyrm/tests/views/test_status.py +++ b/bookwyrm/tests/views/test_status.py @@ -177,7 +177,9 @@ class StatusViews(TestCase): content="hi", book=self.book, user=self.local_user ) - with patch("bookwyrm.activitystreams.ActivityStream.remove_status") as mock: + with patch( + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" + ) as mock: result = view(request, status.id) self.assertTrue(mock.called) result.render() @@ -196,7 +198,9 @@ class StatusViews(TestCase): book=self.book, rating=2.0, user=self.local_user ) - with patch("bookwyrm.activitystreams.ActivityStream.remove_status") as mock: + with patch( + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" + ) as mock: result = view(request, status.id) self.assertFalse(mock.called) self.assertEqual(result.status_code, 400) @@ -214,7 +218,9 @@ class StatusViews(TestCase): content="hi", user=self.local_user ) - with patch("bookwyrm.activitystreams.ActivityStream.remove_status") as mock: + with patch( + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" + ) as mock: result = view(request, status.id) self.assertFalse(mock.called) self.assertEqual(result.status_code, 400) @@ -316,7 +322,7 @@ class StatusViews(TestCase): request.user = self.local_user with patch( - "bookwyrm.activitystreams.ActivityStream.remove_status" + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" ) as redis_mock: view(request, status.id) self.assertTrue(redis_mock.called) @@ -351,7 +357,7 @@ class StatusViews(TestCase): request.user.is_superuser = True with patch( - "bookwyrm.activitystreams.ActivityStream.remove_status" + "bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores" ) as redis_mock: view(request, status.id) self.assertTrue(redis_mock.called) diff --git a/bookwyrm/views/feed.py b/bookwyrm/views/feed.py index e4be50e3..cda11586 100644 --- a/bookwyrm/views/feed.py +++ b/bookwyrm/views/feed.py @@ -31,7 +31,6 @@ class Feed(View): tab = "home" activities = activitystreams.streams[tab].get_activity_stream(request.user) - paginated = Paginator(activities, PAGE_LENGTH) suggested_users = get_suggested_users(request.user)