Merge pull request #855 from mouse-reeve/activitystream-sort

Use sorted set for activitystreams
This commit is contained in:
Mouse Reeve 2021-04-03 11:33:56 -07:00 committed by GitHub
commit 99185bbb3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 9 deletions

View file

@ -23,15 +23,21 @@ class ActivityStream(ABC):
""" the redis key for this user's unread count for this stream """
return "{}-unread".format(self.stream_id(user))
def get_value(self, status): # pylint: disable=no-self-use
""" the status id and the rank (ie, published date) """
return {status.id: status.published_date.timestamp()}
def add_status(self, status):
""" add a status to users' feeds """
value = self.get_value(status)
# we want to do this as a bulk operation, hence "pipeline"
pipeline = r.pipeline()
for user in self.stream_users(status):
# add the status to the feed
pipeline.lpush(self.stream_id(user), status.id)
pipeline.ltrim(self.stream_id(user), 0, settings.MAX_STREAM_LENGTH)
pipeline.zadd(self.stream_id(user), value)
pipeline.zremrangebyrank(
self.stream_id(user), settings.MAX_STREAM_LENGTH, -1
)
# add to the unread status count
pipeline.incr(self.unread_id(user))
# and go!
@ -47,8 +53,13 @@ class ActivityStream(ABC):
def add_user_statuses(self, viewer, user):
""" add a user's statuses to another user's feed """
pipeline = r.pipeline()
for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]:
pipeline.lpush(self.stream_id(viewer), status.id)
statuses = user.status_set.all()[: settings.MAX_STREAM_LENGTH]
for status in statuses:
pipeline.zadd(self.stream_id(viewer), self.get_value(status))
if statuses:
pipeline.zremrangebyrank(
self.stream_id(user), settings.MAX_STREAM_LENGTH, -1
)
pipeline.execute()
def remove_user_statuses(self, viewer, user):
@ -63,7 +74,7 @@ class ActivityStream(ABC):
# clear unreads for this feed
r.set(self.unread_id(user), 0)
statuses = r.lrange(self.stream_id(user), 0, -1)
statuses = r.zrevrange(self.stream_id(user), 0, -1)
return (
models.Status.objects.select_subclasses()
.filter(id__in=statuses)
@ -81,7 +92,11 @@ class ActivityStream(ABC):
stream_id = self.stream_id(user)
for status in statuses.all()[: settings.MAX_STREAM_LENGTH]:
pipeline.lpush(stream_id, status.id)
pipeline.zadd(stream_id, self.get_value(status))
# only trim the stream if statuses were added
if statuses.exists():
pipeline.zremrangebyrank(stream_id, settings.MAX_STREAM_LENGTH, -1)
pipeline.execute()
def stream_users(self, status): # pylint: disable=no-self-use
@ -271,7 +286,7 @@ def add_statuses_on_unblock(sender, instance, *args, **kwargs):
@receiver(signals.post_save, sender=models.User)
# pylint: disable=unused-argument
def populate_feed_on_account_create(sender, instance, created, *args, **kwargs):
def populate_streams_on_account_create(sender, instance, created, *args, **kwargs):
""" build a user's feeds when they join """
if not created or not instance.local:
return

2
bw-dev
View file

@ -109,7 +109,7 @@ case "$CMD" in
black)
makeitblack
;;
populate_feeds)
populate_streams)
execweb python manage.py populate_streams
;;
*)