Merge pull request #2784 from WesleyAC/add-status-cache-get-audience

Only call get_audience once in add_status
This commit is contained in:
Mouse Reeve 2023-04-07 06:43:04 -07:00 committed by GitHub
commit 4e3513bd41
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 41 deletions

View file

@ -38,11 +38,14 @@ class ActivityStream(RedisStore):
def add_status(self, status, increment_unread=False):
"""add a status to users' feeds"""
audience = self.get_audience(status)
# the pipeline contains all the add-to-stream activities
pipeline = self.add_object_to_related_stores(status, execute=False)
pipeline = self.add_object_to_stores(
status, self.get_stores_for_users(audience), execute=False
)
if increment_unread:
for user_id in self.get_audience(status):
for user_id in audience:
# add to the unread status count
pipeline.incr(self.unread_id(user_id))
# add to the unread status count for status type
@ -147,8 +150,9 @@ class ActivityStream(RedisStore):
trace.get_current_span().set_attribute("stream_id", self.key)
return [user.id for user in self._get_audience(status)]
def get_stores_for_object(self, obj):
return [self.stream_id(user_id) for user_id in self.get_audience(obj)]
def get_stores_for_users(self, user_ids):
"""convert a list of user ids into redis store ids"""
return [self.stream_id(user_id) for user_id in user_ids]
def get_statuses_for_user(self, user): # pylint: disable=no-self-use
"""given a user, what statuses should they see on this stream"""
@ -514,7 +518,9 @@ def remove_status_task(status_ids):
for stream in streams.values():
for status in statuses:
stream.remove_object_from_related_stores(status)
stream.remove_object_from_stores(
status, stream.get_stores_for_users(stream.get_audience(status))
)
@app.task(queue=HIGH, ignore_result=True)
@ -563,10 +569,10 @@ def handle_boost_task(boost_id):
for stream in streams.values():
# people who should see the boost (not people who see the original status)
audience = stream.get_stores_for_object(instance)
stream.remove_object_from_related_stores(boosted, stores=audience)
audience = stream.get_stores_for_users(stream.get_audience(instance))
stream.remove_object_from_stores(boosted, audience)
for status in old_versions:
stream.remove_object_from_related_stores(status, stores=audience)
stream.remove_object_from_stores(status, audience)
def get_status_type(status):

View file

@ -24,8 +24,7 @@ class ListsStream(RedisStore):
def add_list(self, book_list):
"""add a list to users' feeds"""
# the pipeline contains all the add-to-stream activities
self.add_object_to_related_stores(book_list)
self.add_object_to_stores(book_list, self.get_stores_for_object(book_list))
def add_user_lists(self, viewer, user):
"""add a user's lists to another user's feed"""
@ -98,6 +97,7 @@ class ListsStream(RedisStore):
return audience.distinct()
def get_stores_for_object(self, obj):
"""the stores that an object belongs in"""
return [self.stream_id(u) for u in self.get_audience(obj)]
def get_lists_for_user(self, user): # pylint: disable=no-self-use
@ -233,7 +233,7 @@ def remove_list_task(list_id, re_add=False):
# delete for every store
stores = [ListsStream().stream_id(idx) for idx in stores]
ListsStream().remove_object_from_related_stores(list_id, stores=stores)
ListsStream().remove_object_from_stores(list_id, stores)
if re_add:
add_list_task.delay(list_id)

View file

@ -16,12 +16,12 @@ class RedisStore(ABC):
"""the object and rank"""
return {obj.id: self.get_rank(obj)}
def add_object_to_related_stores(self, obj, execute=True):
"""add an object to all suitable stores"""
def add_object_to_stores(self, obj, stores, execute=True):
"""add an object to a given set of stores"""
value = self.get_value(obj)
# we want to do this as a bulk operation, hence "pipeline"
pipeline = r.pipeline()
for store in self.get_stores_for_object(obj):
for store in stores:
# add the status to the feed
pipeline.zadd(store, value)
# trim the store
@ -32,14 +32,14 @@ class RedisStore(ABC):
# and go!
return pipeline.execute()
def remove_object_from_related_stores(self, obj, stores=None):
# pylint: disable=no-self-use
def remove_object_from_stores(self, obj, stores):
"""remove an object from all stores"""
# if the stores are provided, the object can just be an id
if stores and isinstance(obj, int):
obj_id = obj
else:
obj_id = obj.id
stores = self.get_stores_for_object(obj) if stores is None else stores
pipeline = r.pipeline()
for store in stores:
pipeline.zrem(store, -1, obj_id)
@ -82,10 +82,6 @@ class RedisStore(ABC):
def get_objects_for_store(self, store):
"""a queryset of what should go in a store, used for populating it"""
@abstractmethod
def get_stores_for_object(self, obj):
"""the stores that an object belongs in"""
@abstractmethod
def get_rank(self, obj):
"""how to rank an object"""

View file

@ -52,6 +52,7 @@ class SuggestedUsers(RedisStore):
)
def get_stores_for_object(self, obj):
"""the stores that an object belongs in"""
return [self.store_id(u) for u in self.get_users_for_object(obj)]
def get_users_for_object(self, obj): # pylint: disable=no-self-use
@ -260,7 +261,9 @@ def rerank_user_task(user_id, update_only=False):
def remove_user_task(user_id):
"""do the hard work in celery"""
user = models.User.objects.get(id=user_id)
suggested_users.remove_object_from_related_stores(user)
suggested_users.remove_object_from_stores(
user, suggested_users.get_stores_for_object(user)
)
@app.task(queue=MEDIUM, ignore_result=True)
@ -274,7 +277,9 @@ def remove_suggestion_task(user_id, suggested_user_id):
def bulk_remove_instance_task(instance_id):
"""remove a bunch of users from recs"""
for user in models.User.objects.filter(federated_server__id=instance_id):
suggested_users.remove_object_from_related_stores(user)
suggested_users.remove_object_from_stores(
user, suggested_users.get_stores_for_object(user)
)
@app.task(queue=LOW, ignore_result=True)

View file

@ -75,7 +75,7 @@ class Activitystreams(TestCase):
def test_remove_status_task(self):
"""remove a status from all streams"""
with patch(
"bookwyrm.activitystreams.ActivityStream.remove_object_from_related_stores"
"bookwyrm.activitystreams.ActivityStream.remove_object_from_stores"
) as mock:
activitystreams.remove_status_task(self.status.id)
self.assertEqual(mock.call_count, 3)
@ -132,8 +132,8 @@ class Activitystreams(TestCase):
self.assertEqual(args[0], self.local_user)
self.assertEqual(args[1], self.another_user)
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_stores")
@patch("bookwyrm.models.activitypub_mixin.broadcast_task.apply_async")
def test_boost_to_another_timeline(self, *_):
"""boost from a non-follower doesn't remove original status from feed"""
@ -144,7 +144,7 @@ class Activitystreams(TestCase):
user=self.another_user,
)
with patch(
"bookwyrm.activitystreams.HomeStream.remove_object_from_related_stores"
"bookwyrm.activitystreams.HomeStream.remove_object_from_stores"
) as mock:
activitystreams.handle_boost_task(boost.id)
@ -152,10 +152,10 @@ class Activitystreams(TestCase):
self.assertEqual(mock.call_count, 1)
call_args = mock.call_args
self.assertEqual(call_args[0][0], status)
self.assertEqual(call_args[1]["stores"], [f"{self.another_user.id}-home"])
self.assertEqual(call_args[0][1], [f"{self.another_user.id}-home"])
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_stores")
@patch("bookwyrm.models.activitypub_mixin.broadcast_task.apply_async")
def test_boost_to_another_timeline_remote(self, *_):
"""boost from a remote non-follower doesn't remove original status from feed"""
@ -166,7 +166,7 @@ class Activitystreams(TestCase):
user=self.remote_user,
)
with patch(
"bookwyrm.activitystreams.HomeStream.remove_object_from_related_stores"
"bookwyrm.activitystreams.HomeStream.remove_object_from_stores"
) as mock:
activitystreams.handle_boost_task(boost.id)
@ -174,10 +174,10 @@ class Activitystreams(TestCase):
self.assertEqual(mock.call_count, 1)
call_args = mock.call_args
self.assertEqual(call_args[0][0], status)
self.assertEqual(call_args[1]["stores"], [])
self.assertEqual(call_args[0][1], [])
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_stores")
@patch("bookwyrm.models.activitypub_mixin.broadcast_task.apply_async")
def test_boost_to_following_timeline(self, *_):
"""add a boost and deduplicate the boosted status on the timeline"""
@ -189,17 +189,17 @@ class Activitystreams(TestCase):
user=self.another_user,
)
with patch(
"bookwyrm.activitystreams.HomeStream.remove_object_from_related_stores"
"bookwyrm.activitystreams.HomeStream.remove_object_from_stores"
) as mock:
activitystreams.handle_boost_task(boost.id)
self.assertTrue(mock.called)
call_args = mock.call_args
self.assertEqual(call_args[0][0], status)
self.assertTrue(f"{self.another_user.id}-home" in call_args[1]["stores"])
self.assertTrue(f"{self.local_user.id}-home" in call_args[1]["stores"])
self.assertTrue(f"{self.another_user.id}-home" in call_args[0][1])
self.assertTrue(f"{self.local_user.id}-home" in call_args[0][1])
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_related_stores")
@patch("bookwyrm.activitystreams.LocalStream.remove_object_from_stores")
@patch("bookwyrm.activitystreams.BooksStream.remove_object_from_stores")
@patch("bookwyrm.models.activitypub_mixin.broadcast_task.apply_async")
def test_boost_to_same_timeline(self, *_):
"""add a boost and deduplicate the boosted status on the timeline"""
@ -210,10 +210,10 @@ class Activitystreams(TestCase):
user=self.local_user,
)
with patch(
"bookwyrm.activitystreams.HomeStream.remove_object_from_related_stores"
"bookwyrm.activitystreams.HomeStream.remove_object_from_stores"
) as mock:
activitystreams.handle_boost_task(boost.id)
self.assertTrue(mock.called)
call_args = mock.call_args
self.assertEqual(call_args[0][0], status)
self.assertEqual(call_args[1]["stores"], [f"{self.local_user.id}-home"])
self.assertEqual(call_args[0][1], [f"{self.local_user.id}-home"])

View file

@ -59,7 +59,7 @@ class Activitystreams(TestCase):
def test_remove_list_task(self, *_):
"""remove a list from all streams"""
with patch(
"bookwyrm.lists_stream.ListsStream.remove_object_from_related_stores"
"bookwyrm.lists_stream.ListsStream.remove_object_from_stores"
) as mock:
lists_stream.remove_list_task(self.list.id)
self.assertEqual(mock.call_count, 1)