diff --git a/bookwyrm/activitystreams.py b/bookwyrm/activitystreams.py index d811aa49..d9236ec9 100644 --- a/bookwyrm/activitystreams.py +++ b/bookwyrm/activitystreams.py @@ -23,15 +23,21 @@ class ActivityStream(ABC): """ the redis key for this user's unread count for this stream """ return "{}-unread".format(self.stream_id(user)) + def get_value(self, status): # pylint: disable=no-self-use + """ the status id and the rank (ie, published date) """ + return {status.id: status.published_date.timestamp()} + def add_status(self, status): """ add a status to users' feeds """ + value = self.get_value(status) # we want to do this as a bulk operation, hence "pipeline" pipeline = r.pipeline() for user in self.stream_users(status): # add the status to the feed - pipeline.lpush(self.stream_id(user), status.id) - pipeline.ltrim(self.stream_id(user), 0, settings.MAX_STREAM_LENGTH) - + pipeline.zadd(self.stream_id(user), value) + pipeline.zremrangebyrank( + self.stream_id(user), settings.MAX_STREAM_LENGTH, -1 + ) # add to the unread status count pipeline.incr(self.unread_id(user)) # and go! @@ -47,8 +53,13 @@ class ActivityStream(ABC): def add_user_statuses(self, viewer, user): """ add a user's statuses to another user's feed """ pipeline = r.pipeline() - for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.lpush(self.stream_id(viewer), status.id) + statuses = user.status_set.all()[: settings.MAX_STREAM_LENGTH] + for status in statuses: + pipeline.zadd(self.stream_id(viewer), self.get_value(status)) + if statuses: + pipeline.zremrangebyrank( + self.stream_id(user), settings.MAX_STREAM_LENGTH, -1 + ) pipeline.execute() def remove_user_statuses(self, viewer, user): @@ -63,7 +74,7 @@ class ActivityStream(ABC): # clear unreads for this feed r.set(self.unread_id(user), 0) - statuses = r.lrange(self.stream_id(user), 0, -1) + statuses = r.zrevrange(self.stream_id(user), 0, -1) return ( models.Status.objects.select_subclasses() .filter(id__in=statuses) @@ -81,7 +92,11 @@ class ActivityStream(ABC): stream_id = self.stream_id(user) for status in statuses.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.lpush(stream_id, status.id) + pipeline.zadd(stream_id, self.get_value(status)) + + # only trim the stream if statuses were added + if statuses.exists(): + pipeline.zremrangebyrank(stream_id, settings.MAX_STREAM_LENGTH, -1) pipeline.execute() def stream_users(self, status): # pylint: disable=no-self-use @@ -271,7 +286,7 @@ def add_statuses_on_unblock(sender, instance, *args, **kwargs): @receiver(signals.post_save, sender=models.User) # pylint: disable=unused-argument -def populate_feed_on_account_create(sender, instance, created, *args, **kwargs): +def populate_streams_on_account_create(sender, instance, created, *args, **kwargs): """ build a user's feeds when they join """ if not created or not instance.local: return diff --git a/bw-dev b/bw-dev index b6406f2c..b9c4b2a1 100755 --- a/bw-dev +++ b/bw-dev @@ -109,7 +109,7 @@ case "$CMD" in black) makeitblack ;; - populate_feeds) + populate_streams) execweb python manage.py populate_streams ;; *)