From 7f483af8d35369d8871c7a0449179f688e52f0f6 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Fri, 7 Jul 2023 15:14:06 -0600 Subject: [PATCH] Rework Stator to use a next field and no async --- activities/admin.py | 6 +- .../migrations/0017_stator_next_change.py | 234 +++++++++++++++ activities/models/emoji.py | 4 +- activities/models/fan_out.py | 140 +++++---- activities/models/hashtag.py | 6 - activities/models/post.py | 57 ++-- activities/models/post_interaction.py | 4 +- api/views/instance.py | 16 +- core/exceptions.py | 4 - docs/releases/0.10.rst | 40 +++ setup.cfg | 4 + stator/graph.py | 9 + stator/management/commands/runstator.py | 3 +- stator/models.py | 211 ++++++-------- stator/runner.py | 252 +++++++++------- takahe/__init__.py | 2 +- tests/activities/models/test_post.py | 18 +- tests/activities/models/test_post_targets.py | 24 +- .../activities/models/test_timeline_event.py | 37 +-- tests/users/models/test_follow.py | 4 +- tests/users/views/test_auth.py | 2 +- tests/users/views/test_import_export.py | 2 +- users/migrations/0019_stator_next_change.py | 271 ++++++++++++++++++ users/migrations/0020_alter_identity_local.py | 18 ++ users/models/block.py | 2 +- users/models/follow.py | 2 +- users/models/identity.py | 4 +- users/models/inbox_message.py | 89 +++--- 28 files changed, 1004 insertions(+), 461 deletions(-) create mode 100644 activities/migrations/0017_stator_next_change.py create mode 100644 docs/releases/0.10.rst create mode 100644 users/migrations/0019_stator_next_change.py create mode 100644 users/migrations/0020_alter_identity_local.py diff --git a/activities/admin.py b/activities/admin.py index 24ef6e5..a00fcee 100644 --- a/activities/admin.py +++ b/activities/admin.py @@ -210,8 +210,8 @@ class TimelineEventAdmin(admin.ModelAdmin): @admin.register(FanOut) class FanOutAdmin(admin.ModelAdmin): - list_display = ["id", "state", "created", "state_attempted", "type", "identity"] - list_filter = (IdentityLocalFilter, "type", "state", "state_attempted") + list_display = ["id", "state", "created", "state_next_attempt", "type", "identity"] + list_filter = (IdentityLocalFilter, "type", "state") raw_id_fields = ["subject_post", "subject_post_interaction"] autocomplete_fields = ["identity"] readonly_fields = ["created", "updated", "state_changed"] @@ -229,7 +229,7 @@ class FanOutAdmin(admin.ModelAdmin): @admin.register(PostInteraction) class PostInteractionAdmin(admin.ModelAdmin): - list_display = ["id", "state", "state_attempted", "type", "identity", "post"] + list_display = ["id", "state", "state_next_attempt", "type", "identity", "post"] list_filter = (IdentityLocalFilter, "type", "state") raw_id_fields = ["post"] autocomplete_fields = ["identity"] diff --git a/activities/migrations/0017_stator_next_change.py b/activities/migrations/0017_stator_next_change.py new file mode 100644 index 0000000..fe39802 --- /dev/null +++ b/activities/migrations/0017_stator_next_change.py @@ -0,0 +1,234 @@ +# Generated by Django 4.2.1 on 2023-07-05 22:18 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("activities", "0016_index_together_migration"), + ] + + operations = [ + migrations.RemoveIndex( + model_name="emoji", + name="activities__state_r_aa72ec_idx", + ), + migrations.RemoveIndex( + model_name="emoji", + name="ix_emoji_state_attempted", + ), + migrations.RemoveIndex( + model_name="emoji", + name="ix_emoji_state_locked", + ), + migrations.RemoveIndex( + model_name="fanout", + name="ix_fanout_state_attempted", + ), + migrations.RemoveIndex( + model_name="fanout", + name="ix_fanout_state_locked", + ), + migrations.RemoveIndex( + model_name="fanout", + name="activities__state_r_aae3b4_idx", + ), + migrations.RemoveIndex( + model_name="hashtag", + name="ix_hashtag_state_attempted", + ), + migrations.RemoveIndex( + model_name="hashtag", + name="ix_hashtag_state_locked", + ), + migrations.RemoveIndex( + model_name="hashtag", + name="activities__state_r_5703be_idx", + ), + migrations.RemoveIndex( + model_name="post", + name="ix_post_state_attempted", + ), + migrations.RemoveIndex( + model_name="post", + name="ix_post_state_locked", + ), + migrations.RemoveIndex( + model_name="post", + name="activities__state_r_b8f1ff_idx", + ), + migrations.RemoveIndex( + model_name="postattachment", + name="ix_postattachm_state_attempted", + ), + migrations.RemoveIndex( + model_name="postattachment", + name="ix_postattachm_state_locked", + ), + migrations.RemoveIndex( + model_name="postattachment", + name="activities__state_r_4e981c_idx", + ), + migrations.RemoveIndex( + model_name="postinteraction", + name="activities__state_r_981d8c_idx", + ), + migrations.RemoveIndex( + model_name="postinteraction", + name="ix_postinterac_state_attempted", + ), + migrations.RemoveIndex( + model_name="postinteraction", + name="ix_postinterac_state_locked", + ), + migrations.RemoveField( + model_name="emoji", + name="state_attempted", + ), + migrations.RemoveField( + model_name="emoji", + name="state_ready", + ), + migrations.RemoveField( + model_name="fanout", + name="state_attempted", + ), + migrations.RemoveField( + model_name="fanout", + name="state_ready", + ), + migrations.RemoveField( + model_name="hashtag", + name="state_attempted", + ), + migrations.RemoveField( + model_name="hashtag", + name="state_ready", + ), + migrations.RemoveField( + model_name="post", + name="state_attempted", + ), + migrations.RemoveField( + model_name="post", + name="state_ready", + ), + migrations.RemoveField( + model_name="postattachment", + name="state_attempted", + ), + migrations.RemoveField( + model_name="postattachment", + name="state_ready", + ), + migrations.RemoveField( + model_name="postinteraction", + name="state_attempted", + ), + migrations.RemoveField( + model_name="postinteraction", + name="state_ready", + ), + migrations.AddField( + model_name="emoji", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="fanout", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="hashtag", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="post", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="postattachment", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="postinteraction", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AlterField( + model_name="emoji", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="fanout", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="hashtag", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="post", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="postattachment", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="postinteraction", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AddIndex( + model_name="emoji", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_emoji_state_next", + ), + ), + migrations.AddIndex( + model_name="fanout", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_fanout_state_next", + ), + ), + migrations.AddIndex( + model_name="hashtag", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_hashtag_state_next", + ), + ), + migrations.AddIndex( + model_name="post", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_post_state_next", + ), + ), + migrations.AddIndex( + model_name="postattachment", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_postattachm_state_next", + ), + ), + migrations.AddIndex( + model_name="postinteraction", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_postinterac_state_next", + ), + ), + ] diff --git a/activities/models/emoji.py b/activities/models/emoji.py index 57d8707..c1ba84b 100644 --- a/activities/models/emoji.py +++ b/activities/models/emoji.py @@ -127,7 +127,7 @@ class Emoji(StatorModel): class Meta: unique_together = ("domain", "shortcode") - indexes = StatorModel.Meta.indexes + indexes: list = [] # We need this so Stator can add its own class urls(urlman.Urls): admin = "/admin/emoji/" @@ -314,11 +314,11 @@ class Emoji(StatorModel): emoji.remote_url = icon["url"] emoji.mimetype = mimetype emoji.category = category - emoji.transition_set_state("outdated") if emoji.file: emoji.file.delete(save=True) else: emoji.save() + emoji.transition_perform("outdated") return emoji emoji = cls.objects.create( diff --git a/activities/models/fan_out.py b/activities/models/fan_out.py index 9bdc2fd..308664b 100644 --- a/activities/models/fan_out.py +++ b/activities/models/fan_out.py @@ -1,5 +1,5 @@ import httpx -from asgiref.sync import sync_to_async +from asgiref.sync import async_to_sync from django.db import models from activities.models.timeline_event import TimelineEvent @@ -19,26 +19,24 @@ class FanOutStates(StateGraph): new.times_out_to(failed, seconds=86400 * 3) @classmethod - async def handle_new(cls, instance: "FanOut"): + def handle_new(cls, instance: "FanOut"): """ Sends the fan-out to the right inbox. """ - fan_out = await instance.afetch_full() - # Don't try to fan out to identities that are not fetched yet - if not (fan_out.identity.local or fan_out.identity.inbox_uri): + if not (instance.identity.local or instance.identity.inbox_uri): return - match (fan_out.type, fan_out.identity.local): + match (instance.type, instance.identity.local): # Handle creating/updating local posts case ((FanOut.Types.post | FanOut.Types.post_edited), True): - post = await fan_out.subject_post.afetch_full() + post = instance.subject_post # If the author of the post is blocked or muted, skip out if ( - await Block.objects.active() - .filter(source=fan_out.identity, target=post.author) - .aexists() + Block.objects.active() + .filter(source=instance.identity, target=post.author) + .exists() ): return cls.skipped # Make a timeline event directly @@ -48,42 +46,42 @@ class FanOutStates(StateGraph): add = True mentioned = {identity.id for identity in post.mentions.all()} if post.in_reply_to: - followed = await sync_to_async(set)( - fan_out.identity.outbound_follows.filter( + followed = set( + instance.identity.outbound_follows.filter( state__in=FollowStates.group_active() ).values_list("target_id", flat=True) ) interested_in = followed.union( - {post.author_id, fan_out.identity_id} + {post.author_id, instance.identity_id} ) add = (post.author_id in followed) and ( bool(mentioned.intersection(interested_in)) ) if add: - await sync_to_async(TimelineEvent.add_post)( - identity=fan_out.identity, + TimelineEvent.add_post( + identity=instance.identity, post=post, ) # We might have been mentioned if ( - fan_out.identity.id in mentioned - and fan_out.identity_id != post.author_id + instance.identity.id in mentioned + and instance.identity_id != post.author_id ): - await sync_to_async(TimelineEvent.add_mentioned)( - identity=fan_out.identity, + TimelineEvent.add_mentioned( + identity=instance.identity, post=post, ) # Handle sending remote posts create case (FanOut.Types.post, False): - post = await fan_out.subject_post.afetch_full() + post = instance.subject_post # Sign it and send it try: - await post.author.signed_request( + async_to_sync(post.author.signed_request)( method="post", uri=( - fan_out.identity.shared_inbox_uri - or fan_out.identity.inbox_uri + instance.identity.shared_inbox_uri + or instance.identity.inbox_uri ), body=canonicalise(post.to_create_ap()), ) @@ -92,14 +90,14 @@ class FanOutStates(StateGraph): # Handle sending remote posts update case (FanOut.Types.post_edited, False): - post = await fan_out.subject_post.afetch_full() + post = instance.subject_post # Sign it and send it try: - await post.author.signed_request( + async_to_sync(post.author.signed_request)( method="post", uri=( - fan_out.identity.shared_inbox_uri - or fan_out.identity.inbox_uri + instance.identity.shared_inbox_uri + or instance.identity.inbox_uri ), body=canonicalise(post.to_update_ap()), ) @@ -108,24 +106,24 @@ class FanOutStates(StateGraph): # Handle deleting local posts case (FanOut.Types.post_deleted, True): - post = await fan_out.subject_post.afetch_full() - if fan_out.identity.local: + post = instance.subject_post + if instance.identity.local: # Remove all timeline events mentioning it - await TimelineEvent.objects.filter( - identity=fan_out.identity, + TimelineEvent.objects.filter( + identity=instance.identity, subject_post=post, - ).adelete() + ).delete() # Handle sending remote post deletes case (FanOut.Types.post_deleted, False): - post = await fan_out.subject_post.afetch_full() + post = instance.subject_post # Send it to the remote inbox try: - await post.author.signed_request( + async_to_sync(post.author.signed_request)( method="post", uri=( - fan_out.identity.shared_inbox_uri - or fan_out.identity.inbox_uri + instance.identity.shared_inbox_uri + or instance.identity.inbox_uri ), body=canonicalise(post.to_delete_ap()), ) @@ -134,38 +132,38 @@ class FanOutStates(StateGraph): # Handle local boosts/likes case (FanOut.Types.interaction, True): - interaction = await fan_out.subject_post_interaction.afetch_full() + interaction = instance.subject_post_interaction # If the author of the interaction is blocked or their notifications # are muted, skip out if ( - await Block.objects.active() + Block.objects.active() .filter( models.Q(mute=False) | models.Q(include_notifications=True), - source=fan_out.identity, + source=instance.identity, target=interaction.identity, ) - .aexists() + .exists() ): return cls.skipped # If blocked/muted the underlying post author, skip out if ( - await Block.objects.active() + Block.objects.active() .filter( - source=fan_out.identity, + source=instance.identity, target_id=interaction.post.author_id, ) - .aexists() + .exists() ): return cls.skipped # Make a timeline event directly - await sync_to_async(TimelineEvent.add_post_interaction)( - identity=fan_out.identity, + TimelineEvent.add_post_interaction( + identity=instance.identity, interaction=interaction, ) # Handle sending remote boosts/likes/votes/pins case (FanOut.Types.interaction, False): - interaction = await fan_out.subject_post_interaction.afetch_full() + interaction = instance.subject_post_interaction # Send it to the remote inbox try: if interaction.type == interaction.Types.vote: @@ -174,11 +172,11 @@ class FanOutStates(StateGraph): body = interaction.to_add_ap() else: body = interaction.to_create_ap() - await interaction.identity.signed_request( + async_to_sync(interaction.identity.signed_request)( method="post", uri=( - fan_out.identity.shared_inbox_uri - or fan_out.identity.inbox_uri + instance.identity.shared_inbox_uri + or instance.identity.inbox_uri ), body=canonicalise(body), ) @@ -187,28 +185,28 @@ class FanOutStates(StateGraph): # Handle undoing local boosts/likes case (FanOut.Types.undo_interaction, True): # noqa:F841 - interaction = await fan_out.subject_post_interaction.afetch_full() + interaction = instance.subject_post_interaction # Delete any local timeline events - await sync_to_async(TimelineEvent.delete_post_interaction)( - identity=fan_out.identity, + TimelineEvent.delete_post_interaction( + identity=instance.identity, interaction=interaction, ) # Handle sending remote undoing boosts/likes/pins case (FanOut.Types.undo_interaction, False): # noqa:F841 - interaction = await fan_out.subject_post_interaction.afetch_full() + interaction = instance.subject_post_interaction # Send an undo to the remote inbox try: if interaction.type == interaction.Types.pin: body = interaction.to_remove_ap() else: body = interaction.to_undo_ap() - await interaction.identity.signed_request( + async_to_sync(interaction.identity.signed_request)( method="post", uri=( - fan_out.identity.shared_inbox_uri - or fan_out.identity.inbox_uri + instance.identity.shared_inbox_uri + or instance.identity.inbox_uri ), body=canonicalise(body), ) @@ -217,32 +215,30 @@ class FanOutStates(StateGraph): # Handle sending identity edited to remote case (FanOut.Types.identity_edited, False): - identity = await fan_out.subject_identity.afetch_full() + identity = instance.subject_identity try: - await identity.signed_request( + async_to_sync(identity.signed_request)( method="post", uri=( - fan_out.identity.shared_inbox_uri - or fan_out.identity.inbox_uri - ), - body=canonicalise( - await sync_to_async(fan_out.subject_identity.to_update_ap)() + instance.identity.shared_inbox_uri + or instance.identity.inbox_uri ), + body=canonicalise(instance.subject_identity.to_update_ap()), ) except httpx.RequestError: return # Handle sending identity deleted to remote case (FanOut.Types.identity_deleted, False): - identity = await fan_out.subject_identity.afetch_full() + identity = instance.subject_identity try: - await identity.signed_request( + async_to_sync(identity.signed_request)( method="post", uri=( - fan_out.identity.shared_inbox_uri - or fan_out.identity.inbox_uri + instance.identity.shared_inbox_uri + or instance.identity.inbox_uri ), - body=canonicalise(fan_out.subject_identity.to_delete_ap()), + body=canonicalise(instance.subject_identity.to_delete_ap()), ) except httpx.RequestError: return @@ -255,14 +251,14 @@ class FanOutStates(StateGraph): # Created identities make a timeline event case (FanOut.Types.identity_created, True): - await sync_to_async(TimelineEvent.add_identity_created)( - identity=fan_out.identity, - new_identity=fan_out.subject_identity, + TimelineEvent.add_identity_created( + identity=instance.identity, + new_identity=instance.subject_identity, ) case _: raise ValueError( - f"Cannot fan out with type {fan_out.type} local={fan_out.identity.local}" + f"Cannot fan out with type {instance.type} local={instance.identity.local}" ) return cls.sent diff --git a/activities/models/hashtag.py b/activities/models/hashtag.py index 894973c..7903a7b 100644 --- a/activities/models/hashtag.py +++ b/activities/models/hashtag.py @@ -22,12 +22,8 @@ class HashtagStates(StateGraph): """ Computes the stats and other things for a Hashtag """ - from time import time - from .post import Post - start = time() - posts_query = Post.objects.local_public().tagged_with(instance) total = await posts_query.acount() @@ -57,7 +53,6 @@ class HashtagStates(StateGraph): instance.stats_updated = timezone.now() await sync_to_async(instance.save)() - print(f"Updated hashtag {instance.hashtag} in {time() - start:.5f} seconds") return cls.updated @@ -86,7 +81,6 @@ class HashtagManager(models.Manager): class Hashtag(StatorModel): - MAXIMUM_LENGTH = 100 # Normalized hashtag without the '#' diff --git a/activities/models/post.py b/activities/models/post.py index ae856b8..e03d8d7 100644 --- a/activities/models/post.py +++ b/activities/models/post.py @@ -8,7 +8,7 @@ from urllib.parse import urlparse import httpx import urlman -from asgiref.sync import async_to_sync, sync_to_async +from asgiref.sync import async_to_sync from django.contrib.postgres.indexes import GinIndex from django.contrib.postgres.search import SearchVector from django.db import models, transaction @@ -63,45 +63,44 @@ class PostStates(StateGraph): edited_fanned_out.transitions_to(deleted) @classmethod - async def targets_fan_out(cls, post: "Post", type_: str) -> None: + def targets_fan_out(cls, post: "Post", type_: str) -> None: # Fan out to each target - for follow in await post.aget_targets(): - await FanOut.objects.acreate( + for follow in post.get_targets(): + FanOut.objects.create( identity=follow, type=type_, subject_post=post, ) @classmethod - async def handle_new(cls, instance: "Post"): + def handle_new(cls, instance: "Post"): """ Creates all needed fan-out objects for a new Post. """ - post = await instance.afetch_full() # Only fan out if the post was published in the last day or it's local # (we don't want to fan out anything older that that which is remote) - if post.local or (timezone.now() - post.published) < datetime.timedelta(days=1): - await cls.targets_fan_out(post, FanOut.Types.post) - await post.ensure_hashtags() + if instance.local or (timezone.now() - instance.published) < datetime.timedelta( + days=1 + ): + cls.targets_fan_out(instance, FanOut.Types.post) + instance.ensure_hashtags() return cls.fanned_out @classmethod - async def handle_deleted(cls, instance: "Post"): + def handle_deleted(cls, instance: "Post"): """ Creates all needed fan-out objects needed to delete a Post. """ - post = await instance.afetch_full() - await cls.targets_fan_out(post, FanOut.Types.post_deleted) + cls.targets_fan_out(instance, FanOut.Types.post_deleted) return cls.deleted_fanned_out @classmethod - async def handle_edited(cls, instance: "Post"): + def handle_edited(cls, instance: "Post"): """ Creates all needed fan-out objects for an edited Post. """ - post = await instance.afetch_full() - await cls.targets_fan_out(post, FanOut.Types.post_edited) - await post.ensure_hashtags() + cls.targets_fan_out(instance, FanOut.Types.post_edited) + instance.ensure_hashtags() return cls.edited_fanned_out @@ -324,7 +323,7 @@ class Post(StatorModel): fields=["visibility", "local", "created"], name="ix_post_local_public_created", ), - ] + StatorModel.Meta.indexes + ] class urls(urlman.Urls): view = "{self.author.urls.view}posts/{self.id}/" @@ -375,8 +374,6 @@ class Post(StatorModel): .first() ) - ain_reply_to_post = sync_to_async(in_reply_to_post) - ### Content cleanup and extraction ### def clean_type_data(self, value): PostTypeData.parse_obj(value) @@ -552,6 +549,8 @@ class Post(StatorModel): attachment.name = attrs.description attachment.save() + self.transition_perform(PostStates.edited) + @classmethod def mentions_from_content(cls, content, author) -> set[Identity]: mention_hits = FediverseHtmlParser(content, find_mentions=True).mentions @@ -572,7 +571,7 @@ class Post(StatorModel): mentions.add(identity) return mentions - async def ensure_hashtags(self) -> None: + def ensure_hashtags(self) -> None: """ Ensure any of the already parsed hashtags from this Post have a corresponding Hashtag record. @@ -580,10 +579,10 @@ class Post(StatorModel): # Ensure hashtags if self.hashtags: for hashtag in self.hashtags: - tag, _ = await Hashtag.objects.aget_or_create( + tag, _ = Hashtag.objects.get_or_create( hashtag=hashtag[: Hashtag.MAXIMUM_LENGTH], ) - await tag.atransition_perform(HashtagStates.outdated) + tag.transition_perform(HashtagStates.outdated) def calculate_stats(self, save=True): """ @@ -739,33 +738,33 @@ class Post(StatorModel): "object": object, } - async def aget_targets(self) -> Iterable[Identity]: + def get_targets(self) -> Iterable[Identity]: """ Returns a list of Identities that need to see posts and their changes """ targets = set() - async for mention in self.mentions.all(): + for mention in self.mentions.all(): targets.add(mention) # Then, if it's not mentions only, also deliver to followers and all hashtag followers if self.visibility != Post.Visibilities.mentioned: - async for follower in self.author.inbound_follows.filter( + for follower in self.author.inbound_follows.filter( state__in=FollowStates.group_active() ).select_related("source"): targets.add(follower.source) if self.hashtags: - async for follow in HashtagFollow.objects.by_hashtags( + for follow in HashtagFollow.objects.by_hashtags( self.hashtags ).prefetch_related("identity"): targets.add(follow.identity) # If it's a reply, always include the original author if we know them - reply_post = await self.ain_reply_to_post() + reply_post = self.in_reply_to_post() if reply_post: targets.add(reply_post.author) # And if it's a reply to one of our own, we have to re-fan-out to # the original author's followers if reply_post.author.local: - async for follower in reply_post.author.inbound_follows.filter( + for follower in reply_post.author.inbound_follows.filter( state__in=FollowStates.group_active() ).select_related("source"): targets.add(follower.source) @@ -782,7 +781,7 @@ class Post(StatorModel): .filter(mute=False) .select_related("target") ) - async for block in blocks: + for block in blocks: try: targets.remove(block.target) except KeyError: diff --git a/activities/models/post_interaction.py b/activities/models/post_interaction.py index 953b2f3..90d00ec 100644 --- a/activities/models/post_interaction.py +++ b/activities/models/post_interaction.py @@ -179,9 +179,7 @@ class PostInteraction(StatorModel): updated = models.DateTimeField(auto_now=True) class Meta: - indexes = [ - models.Index(fields=["type", "identity", "post"]) - ] + StatorModel.Meta.indexes + indexes = [models.Index(fields=["type", "identity", "post"])] ### Display helpers ### diff --git a/api/views/instance.py b/api/views/instance.py index 165c12f..58d7455 100644 --- a/api/views/instance.py +++ b/api/views/instance.py @@ -1,4 +1,5 @@ from django.conf import settings +from django.core.cache import cache from hatchway import api_view from activities.models import Post @@ -10,6 +11,15 @@ from users.models import Domain, Identity @api_view.get def instance_info_v1(request): + # The stats are expensive to calculate, so don't do it very often + stats = cache.get("instance_info_stats") + if stats is None: + stats = { + "user_count": Identity.objects.filter(local=True).count(), + "status_count": Post.objects.filter(local=True).not_hidden().count(), + "domain_count": Domain.objects.count(), + } + cache.set("instance_info_stats", stats, timeout=300) return { "uri": request.headers.get("host", settings.SETUP.MAIN_DOMAIN), "title": Config.system.site_name, @@ -18,11 +28,7 @@ def instance_info_v1(request): "email": "", "version": f"takahe/{__version__}", "urls": {}, - "stats": { - "user_count": Identity.objects.filter(local=True).count(), - "status_count": Post.objects.filter(local=True).not_hidden().count(), - "domain_count": Domain.objects.count(), - }, + "stats": stats, "thumbnail": Config.system.site_banner, "languages": ["en"], "registrations": (Config.system.signup_allowed), diff --git a/core/exceptions.py b/core/exceptions.py index 49f5b9b..558e67f 100644 --- a/core/exceptions.py +++ b/core/exceptions.py @@ -1,6 +1,5 @@ import traceback -from asgiref.sync import sync_to_async from django.conf import settings @@ -40,6 +39,3 @@ def capture_exception(exception: BaseException, scope=None, **scope_args): capture_exception(exception, scope, **scope_args) elif settings.DEBUG: traceback.print_exc() - - -acapture_exception = sync_to_async(capture_exception, thread_sensitive=False) diff --git a/docs/releases/0.10.rst b/docs/releases/0.10.rst new file mode 100644 index 0000000..d33ccdb --- /dev/null +++ b/docs/releases/0.10.rst @@ -0,0 +1,40 @@ +0.9 +=== + +*Released: Not Yet Released* + +This release is a polish release that is prepping us for the road to 1.0. + +This release's major changes: + +* Stator, the background task system, has been significantly reworked to require + smaller indexes, spend less time scheduling, and has had most of its async + nature removed, as this both reduces deadlocks and improves performance in + most situations (the context switching was costing more than the gains from + talking to other servers asynchronously). + +* TBC + +If you'd like to help with code, design, or other areas, see +:doc:`/contributing` to see how to get in touch. + +You can download images from `Docker Hub `_, +or use the image name ``jointakahe/takahe:0.10``. + + +Upgrade Notes +------------- + +Migrations +~~~~~~~~~~ + +There are new database migrations; they are backwards-compatible, but contain +very significant index changes to all of the main tables that may cause the +PostgreSQL deadlock detector to trigger if you attempt to apply them while your +site is live. + +We recommend: + +* Temporarily stopping all instances of the webserver and Stator +* Applying the migration (should be less than a few minutes on most installs) +* Restarting the instances of webserver and Stator diff --git a/setup.cfg b/setup.cfg index 0d26527..4775e47 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,6 +12,10 @@ addopts = --tb=short --ds=takahe.settings --import-mode=importlib filterwarnings = ignore:There is no current event loop ignore:No directory at + ignore:DateTimeField Post.created + ignore:'index_together' is deprecated + ignore:Deprecated call to + ignore:pkg_resources is deprecated as an API [mypy] warn_unused_ignores = True diff --git a/stator/graph.py b/stator/graph.py index 7019e3f..4390523 100644 --- a/stator/graph.py +++ b/stator/graph.py @@ -13,6 +13,7 @@ class StateGraph: initial_state: ClassVar["State"] terminal_states: ClassVar[set["State"]] automatic_states: ClassVar[set["State"]] + deletion_states: ClassVar[set["State"]] def __init_subclass__(cls) -> None: # Collect state members @@ -33,6 +34,7 @@ class StateGraph: # Check the graph layout terminal_states = set() automatic_states = set() + deletion_states = set() initial_state = None for state in cls.states.values(): # Check for multiple initial states @@ -42,6 +44,9 @@ class StateGraph: f"The graph has more than one initial state: {initial_state} and {state}" ) initial_state = state + # Collect states that require deletion handling (they can be terminal or not) + if state.delete_after: + deletion_states.add(state) # Collect terminal states if state.terminal: state.externally_progressed = True @@ -74,6 +79,7 @@ class StateGraph: cls.initial_state = initial_state cls.terminal_states = terminal_states cls.automatic_states = automatic_states + cls.deletion_states = deletion_states # Generate choices cls.choices = [(name, name) for name in cls.states.keys()] @@ -98,6 +104,9 @@ class State: self.attempt_immediately = attempt_immediately self.force_initial = force_initial self.delete_after = delete_after + # Deletes are also only attempted on try_intervals + if self.delete_after and not self.try_interval: + self.try_interval = self.delete_after self.parents: set["State"] = set() self.children: set["State"] = set() self.timeout_state: State | None = None diff --git a/stator/management/commands/runstator.py b/stator/management/commands/runstator.py index a93ea8b..cf09333 100644 --- a/stator/management/commands/runstator.py +++ b/stator/management/commands/runstator.py @@ -1,6 +1,5 @@ from typing import cast -from asgiref.sync import async_to_sync from django.apps import apps from django.core.management.base import BaseCommand @@ -84,6 +83,6 @@ class Command(BaseCommand): run_for=run_for, ) try: - async_to_sync(runner.run)() + runner.run() except KeyboardInterrupt: print("Ctrl-C received") diff --git a/stator/models.py b/stator/models.py index aacaddb..e17a0c7 100644 --- a/stator/models.py +++ b/stator/models.py @@ -1,8 +1,8 @@ import datetime import traceback -from typing import ClassVar, cast +from typing import ClassVar -from asgiref.sync import sync_to_async +from asgiref.sync import async_to_sync, iscoroutinefunction from django.db import models, transaction from django.db.models.signals import class_prepared from django.utils import timezone @@ -47,19 +47,15 @@ def add_stator_indexes(sender, **kwargs): if issubclass(sender, StatorModel): indexes = [ models.Index( - fields=["state", "state_attempted"], - name=f"ix_{sender.__name__.lower()[:11]}_state_attempted", - ), - models.Index( - fields=["state_locked_until", "state"], - condition=models.Q(state_locked_until__isnull=False), - name=f"ix_{sender.__name__.lower()[:11]}_state_locked", + fields=["state", "state_next_attempt", "state_locked_until"], + name=f"ix_{sender.__name__.lower()[:11]}_state_next", ), ] if not sender._meta.indexes: # Meta.indexes needs to not be None to trigger Django behaviors sender.Meta.indexes = [] + sender._meta.indexes = [] for idx in indexes: sender._meta.indexes.append(idx) @@ -81,30 +77,26 @@ class StatorModel(models.Model): concrete model yourself. """ - SCHEDULE_BATCH_SIZE = 1000 + CLEAN_BATCH_SIZE = 1000 + DELETE_BATCH_SIZE = 500 state: StateField - # If this row is up for transition attempts (which it always is on creation!) - state_ready = models.BooleanField(default=True) - # When the state last actually changed, or the date of instance creation state_changed = models.DateTimeField(auto_now_add=True) - # When the last state change for the current state was attempted - # (and not successful, as this is cleared on transition) - state_attempted = models.DateTimeField(blank=True, null=True) + # When the next state change should be attempted (null means immediately) + state_next_attempt = models.DateTimeField(blank=True, null=True) # If a lock is out on this row, when it is locked until # (we don't identify the lock owner, as there's no heartbeats) - state_locked_until = models.DateTimeField(null=True, blank=True) + state_locked_until = models.DateTimeField(null=True, blank=True, db_index=True) # Collection of subclasses of us subclasses: ClassVar[list[type["StatorModel"]]] = [] class Meta: abstract = True - indexes = [models.Index(fields=["state_ready", "state_locked_until", "state"])] def __init_subclass__(cls) -> None: if cls is not StatorModel: @@ -118,52 +110,6 @@ class StatorModel(models.Model): def state_age(self) -> float: return (timezone.now() - self.state_changed).total_seconds() - @classmethod - async def atransition_schedule_due(cls, now=None): - """ - Finds instances of this model that need to run and schedule them. - """ - if now is None: - now = timezone.now() - q = models.Q() - for state in cls.state_graph.states.values(): - state = cast(State, state) - if not state.externally_progressed: - q = q | models.Q( - ( - models.Q( - state_attempted__lte=( - now - - datetime.timedelta( - seconds=cast(float, state.try_interval) - ) - ) - ) - | models.Q(state_attempted__isnull=True) - ), - state=state.name, - ) - select_query = cls.objects.filter(q)[: cls.SCHEDULE_BATCH_SIZE] - await cls.objects.filter(pk__in=select_query).aupdate(state_ready=True) - - @classmethod - async def atransition_delete_due(cls, now=None): - """ - Finds instances of this model that need to be deleted and deletes them. - """ - if now is None: - now = timezone.now() - for state in cls.state_graph.states.values(): - state = cast(State, state) - if state.delete_after: - select_query = cls.objects.filter( - state=state, - state_changed__lte=( - now - datetime.timedelta(seconds=state.delete_after) - ), - )[: cls.SCHEDULE_BATCH_SIZE] - await cls.objects.filter(pk__in=select_query).adelete() - @classmethod def transition_get_with_lock( cls, number: int, lock_expiry: datetime.datetime @@ -172,11 +118,17 @@ class StatorModel(models.Model): Returns up to `number` tasks for execution, having locked them. """ with transaction.atomic(): + # Query for `number` rows that: + # - Have a next_attempt that's either null or in the past + # - Have one of the states we care about + # Then, sort them by next_attempt NULLS FIRST, so that we handle the + # rows in a roughly FIFO order. selected = list( cls.objects.filter( - state_locked_until__isnull=True, - state_ready=True, + models.Q(state_next_attempt__isnull=True) + | models.Q(state_next_attempt__lte=timezone.now()), state__in=cls.state_graph.automatic_states, + state_locked_until__isnull=True, )[:number].select_for_update() ) cls.objects.filter(pk__in=[i.pk for i in selected]).update( @@ -185,44 +137,56 @@ class StatorModel(models.Model): return selected @classmethod - async def atransition_get_with_lock( - cls, number: int, lock_expiry: datetime.datetime - ) -> list["StatorModel"]: - return await sync_to_async(cls.transition_get_with_lock)(number, lock_expiry) + def transition_delete_due(cls) -> int | None: + """ + Finds instances of this model that need to be deleted and deletes them + in small batches. Returns how many were deleted. + """ + if cls.state_graph.deletion_states: + constraints = models.Q() + for state in cls.state_graph.deletion_states: + constraints |= models.Q( + state=state, + state_changed__lte=( + timezone.now() - datetime.timedelta(seconds=state.delete_after) + ), + ) + select_query = cls.objects.filter( + models.Q(state_next_attempt__isnull=True) + | models.Q(state_next_attempt__lte=timezone.now()), + constraints, + )[: cls.DELETE_BATCH_SIZE] + return cls.objects.filter(pk__in=select_query).delete()[0] + return None @classmethod - async def atransition_ready_count(cls) -> int: + def transition_ready_count(cls) -> int: """ Returns how many instances are "queued" """ - return await cls.objects.filter( + return cls.objects.filter( + models.Q(state_next_attempt__isnull=True) + | models.Q(state_next_attempt__lte=timezone.now()), state_locked_until__isnull=True, - state_ready=True, state__in=cls.state_graph.automatic_states, - ).acount() + ).count() @classmethod - async def atransition_clean_locks(cls): + def transition_clean_locks(cls): + """ + Deletes stale locks (in batches, to avoid a giant query) + """ select_query = cls.objects.filter(state_locked_until__lte=timezone.now())[ - : cls.SCHEDULE_BATCH_SIZE + : cls.CLEAN_BATCH_SIZE ] - await cls.objects.filter(pk__in=select_query).aupdate(state_locked_until=None) + cls.objects.filter(pk__in=select_query).update(state_locked_until=None) - def transition_schedule(self): - """ - Adds this instance to the queue to get its state transition attempted. - - The scheduler will call this, but you can also call it directly if you - know it'll be ready and want to lower latency. - """ - self.state_ready = True - self.save() - - async def atransition_attempt(self) -> State | None: + def transition_attempt(self) -> State | None: """ Attempts to transition the current state by running its handler(s). """ current_state: State = self.state_graph.states[self.state] + # If it's a manual progression state don't even try # We shouldn't really be here in this case, but it could be a race condition if current_state.externally_progressed: @@ -230,12 +194,17 @@ class StatorModel(models.Model): f"Warning: trying to progress externally progressed state {self.state}!" ) return None + + # Try running its handler function try: - next_state = await current_state.handler(self) # type: ignore + if iscoroutinefunction(current_state.handler): + next_state = async_to_sync(current_state.handler)(self) + else: + next_state = current_state.handler(self) except TryAgainLater: pass except BaseException as e: - await exceptions.acapture_exception(e) + exceptions.capture_exception(e) traceback.print_exc() else: if next_state: @@ -247,20 +216,24 @@ class StatorModel(models.Model): raise ValueError( f"Cannot transition from {current_state} to {next_state} - not a declared transition" ) - await self.atransition_perform(next_state) + self.transition_perform(next_state) return next_state - # See if it timed out + + # See if it timed out since its last state change if ( current_state.timeout_value and current_state.timeout_value <= (timezone.now() - self.state_changed).total_seconds() ): - await self.atransition_perform(current_state.timeout_state) + self.transition_perform(current_state.timeout_state) # type: ignore return current_state.timeout_state - await self.__class__.objects.filter(pk=self.pk).aupdate( - state_attempted=timezone.now(), + + # Nothing happened, set next execution and unlock it + self.__class__.objects.filter(pk=self.pk).update( + state_next_attempt=( + timezone.now() + datetime.timedelta(seconds=current_state.try_interval) # type: ignore + ), state_locked_until=None, - state_ready=False, ) return None @@ -273,27 +246,6 @@ class StatorModel(models.Model): state, ) - atransition_perform = sync_to_async(transition_perform) - - def transition_set_state(self, state: State | str): - """ - Sets the instance to the given state name for when it is saved. - """ - if isinstance(state, State): - state = state.name - if state not in self.state_graph.states: - raise ValueError(f"Invalid state {state}") - self.state = state # type: ignore - self.state_changed = timezone.now() - self.state_locked_until = None - - if self.state_graph.states[state].attempt_immediately: - self.state_attempted = None - self.state_ready = True - else: - self.state_attempted = timezone.now() - self.state_ready = False - @classmethod def transition_perform_queryset( cls, @@ -303,26 +255,27 @@ class StatorModel(models.Model): """ Transitions every instance in the queryset to the given state name, forcibly. """ + # Really ensure we have the right state object if isinstance(state, State): - state = state.name - if state not in cls.state_graph.states: - raise ValueError(f"Invalid state {state}") + state_obj = cls.state_graph.states[state.name] + else: + state_obj = cls.state_graph.states[state] # See if it's ready immediately (if not, delay until first try_interval) - if cls.state_graph.states[state].attempt_immediately: + if state_obj.attempt_immediately or state_obj.try_interval is None: queryset.update( - state=state, + state=state_obj, state_changed=timezone.now(), - state_attempted=None, + state_next_attempt=None, state_locked_until=None, - state_ready=True, ) else: queryset.update( - state=state, + state=state_obj, state_changed=timezone.now(), - state_attempted=timezone.now(), + state_next_attempt=( + timezone.now() + datetime.timedelta(seconds=state_obj.try_interval) + ), state_locked_until=None, - state_ready=False, ) @@ -355,10 +308,6 @@ class Stats(models.Model): instance.statistics[key] = {} return instance - @classmethod - async def aget_for_model(cls, model: type[StatorModel]) -> "Stats": - return await sync_to_async(cls.get_for_model)(model) - def set_queued(self, number: int): """ Sets the current queued amount. diff --git a/stator/runner.py b/stator/runner.py index 278cfca..3212c1c 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -1,14 +1,13 @@ -import asyncio import datetime import os import signal import time import traceback import uuid -from collections.abc import Callable +from concurrent.futures import Future, ThreadPoolExecutor -from asgiref.sync import async_to_sync, sync_to_async from django.conf import settings +from django.db import close_old_connections from django.utils import timezone from core import exceptions, sentry @@ -16,26 +15,30 @@ from core.models import Config from stator.models import StatorModel, Stats -class LoopingTask: +class LoopingTimer: """ - Wrapper for having a coroutine go in the background and only have one - copy running at a time. + Triggers check() to be true once every `interval`. """ - def __init__(self, callable: Callable): - self.callable = callable - self.task: asyncio.Task | None = None + next_run: float | None = None - def run(self) -> bool: - # If we have a task object, see if we can clear it up - if self.task is not None: - if self.task.done(): - self.task = None + def __init__(self, interval: float, trigger_at_start=True): + self.interval = interval + self.trigger_at_start = trigger_at_start + + def check(self) -> bool: + # See if it's our first time being called + if self.next_run is None: + # Set up the next call based on trigger_at_start + if self.trigger_at_start: + self.next_run = time.monotonic() else: - return False - # OK, launch a new task - self.task = asyncio.create_task(self.callable()) - return True + self.next_run = time.monotonic() + self.interval + # See if it's time to run the next call + if time.monotonic() >= self.next_run: + self.next_run = time.monotonic() + self.interval + return True + return False class StatorRunner: @@ -47,12 +50,13 @@ class StatorRunner: def __init__( self, models: list[type[StatorModel]], - concurrency: int = getattr(settings, "STATOR_CONCURRENCY", 50), + concurrency: int = getattr(settings, "STATOR_CONCURRENCY", 30), concurrency_per_model: int = getattr( settings, "STATOR_CONCURRENCY_PER_MODEL", 15 ), liveness_file: str | None = None, - schedule_interval: int = 30, + schedule_interval: int = 60, + delete_interval: int = 30, lock_expiry: int = 300, run_for: int = 0, ): @@ -62,53 +66,52 @@ class StatorRunner: self.concurrency_per_model = concurrency_per_model self.liveness_file = liveness_file self.schedule_interval = schedule_interval + self.delete_interval = delete_interval self.lock_expiry = lock_expiry self.run_for = run_for self.minimum_loop_delay = 0.5 self.maximum_loop_delay = 5 + self.tasks: list[Future] = [] # Set up SIGALRM handler signal.signal(signal.SIGALRM, self.alarm_handler) - async def run(self): + def run(self): sentry.set_takahe_app("stator") self.handled = {} self.started = time.monotonic() - self.last_clean = time.monotonic() - self.schedule_interval - self.tasks = [] + self.executor = ThreadPoolExecutor(max_workers=self.concurrency) self.loop_delay = self.minimum_loop_delay - self.schedule_task = LoopingTask(self.run_scheduling) - self.fetch_task = LoopingTask(self.fetch_and_process_tasks) - self.config_task = LoopingTask(self.load_config) + self.scheduling_timer = LoopingTimer(self.schedule_interval) + self.deletion_timer = LoopingTimer(self.delete_interval) # For the first time period, launch tasks print("Running main task loop") try: with sentry.configure_scope() as scope: while True: - # Do we need to do cleaning? - if (time.monotonic() - self.last_clean) >= self.schedule_interval: - # Set up the watchdog timer (each time we do this the - # previous one is cancelled) + # See if we need to run cleaning + if self.scheduling_timer.check(): + # Set up the watchdog timer (each time we do this the previous one is cancelled) signal.alarm(self.schedule_interval * 2) - # Refresh the config - self.config_task.run() - if self.schedule_task.run(): - print("Running cleaning and scheduling") - else: - print("Previous scheduling still running...!") # Write liveness file if configured if self.liveness_file: with open(self.liveness_file, "w") as fh: fh.write(str(int(time.time()))) - self.last_clean = time.monotonic() + # Refresh the config + self.load_config() + # Do scheduling (stale lock deletion and stats gathering) + self.run_scheduling() # Clear the cleaning breadcrumbs/extra for the main part of the loop sentry.scope_clear(scope) - self.remove_completed_tasks() + self.clean_tasks() - # Fetching is kind of blocking, so we need to do this - # as a separate coroutine - self.fetch_task.run() + # See if we need to add deletion tasks + if self.deletion_timer.check(): + self.add_deletion_tasks() + + # Fetch and run any new handlers we can fit + self.add_transition_tasks() # Are we in limited run mode? if ( @@ -126,22 +129,19 @@ class StatorRunner: self.loop_delay * 1.5, self.maximum_loop_delay, ) - await asyncio.sleep(self.loop_delay) + time.sleep(self.loop_delay) # Clear the Sentry breadcrumbs and extra for next loop sentry.scope_clear(scope) except KeyboardInterrupt: pass + # Wait for tasks to finish print("Waiting for tasks to complete") - while True: - self.remove_completed_tasks() - if not self.tasks: - break - # Prevent busylooping - await asyncio.sleep(0.5) + self.executor.shutdown() + + # We're done print("Complete") - return self.handled def alarm_handler(self, signum, frame): """ @@ -151,103 +151,141 @@ class StatorRunner: print("Watchdog timeout exceeded") os._exit(2) - async def load_config(self): + def load_config(self): """ Refreshes config from the DB """ - Config.system = await Config.aload_system() + Config.system = Config.load_system() - async def run_scheduling(self): + def run_scheduling(self): """ - Do any transition cleanup tasks + Deletes stale locks for models, and submits their stats. """ - if self.handled: - print("Tasks processed since last flush:") - for label, number in self.handled.items(): - print(f" {label}: {number}") - else: - print("No tasks handled since last flush.") with sentry.start_transaction(op="task", name="stator.run_scheduling"): for model in self.models: - print(f"Scheduling {model._meta.label_lower}") - await self.submit_stats(model) - print(" Cleaning locks") - await model.atransition_clean_locks() - print(" Scheduling due items") - await model.atransition_schedule_due() - print(" Deleting due items") - await model.atransition_delete_due() + print( + f"{model._meta.label_lower}: Scheduling ({self.handled.get(model._meta.label_lower, 0)} handled)" + ) + self.submit_stats(model) + model.transition_clean_locks() - async def submit_stats(self, model): + def submit_stats(self, model: type[StatorModel]): """ - Pop some statistics into the database + Pop some statistics into the database from our local info for the given model """ - stats_instance = await Stats.aget_for_model(model) + stats_instance = Stats.get_for_model(model) if stats_instance.model_label in self.handled: stats_instance.add_handled(self.handled[stats_instance.model_label]) del self.handled[stats_instance.model_label] - stats_instance.set_queued(await model.atransition_ready_count()) + stats_instance.set_queued(model.transition_ready_count()) stats_instance.trim_data() - await sync_to_async(stats_instance.save)() + stats_instance.save() - async def fetch_and_process_tasks(self): + def add_transition_tasks(self, call_inline=False): + """ + Adds a transition thread for as many instances as we can, given capacity + and batch size limits. + """ # Calculate space left for tasks space_remaining = self.concurrency - len(self.tasks) # Fetch new tasks for model in self.models: if space_remaining > 0: - for instance in await model.atransition_get_with_lock( + for instance in model.transition_get_with_lock( number=min(space_remaining, self.concurrency_per_model), lock_expiry=( timezone.now() + datetime.timedelta(seconds=self.lock_expiry) ), ): - self.tasks.append( - asyncio.create_task(self.run_transition(instance)) - ) + if call_inline: + task_transition(instance, in_thread=False) + else: + self.tasks.append( + self.executor.submit(task_transition, instance) + ) self.handled[model._meta.label_lower] = ( self.handled.get(model._meta.label_lower, 0) + 1 ) space_remaining -= 1 - async def run_transition(self, instance: StatorModel): + def add_deletion_tasks(self, call_inline=False): """ - Wrapper for atransition_attempt with fallback error handling + Adds a deletion thread for each model """ - task_name = f"stator.run_transition:{instance._meta.label_lower}#{{id}} from {instance.state}" - with sentry.start_transaction(op="task", name=task_name): - sentry.set_context( - "instance", - { - "model": instance._meta.label_lower, - "pk": instance.pk, - "state": instance.state, - "state_age": instance.state_age, - }, - ) + # Yes, this potentially goes over the capacity limit - it's fine. + for model in self.models: + if model.state_graph.deletion_states: + if call_inline: + task_deletion(model, in_thread=False) + else: + self.tasks.append(self.executor.submit(task_deletion, model)) - try: - print( - f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}" - ) - await instance.atransition_attempt() - except BaseException as e: - await exceptions.acapture_exception(e) - traceback.print_exc() - - def remove_completed_tasks(self): + def clean_tasks(self): """ - Removes all completed asyncio.Tasks from our local in-progress list + Removes any tasks that are done and handles exceptions if they + raised them. """ - self.tasks = [t for t in self.tasks if not t.done()] + new_tasks = [] + for task in self.tasks: + if task.done(): + try: + task.result() + except BaseException as e: + exceptions.capture_exception(e) + traceback.print_exc() + else: + new_tasks.append(task) + self.tasks = new_tasks - async def run_single_cycle(self): + def run_single_cycle(self): """ Testing entrypoint to advance things just one cycle, and allow errors to propagate out. """ - await asyncio.wait_for(self.fetch_and_process_tasks(), timeout=1) - for task in self.tasks: - await task + self.add_deletion_tasks(call_inline=True) + self.add_transition_tasks(call_inline=True) - run_single_cycle_sync = async_to_sync(run_single_cycle) + +def task_transition(instance: StatorModel, in_thread: bool = True): + """ + Runs one state transition/action. + """ + task_name = f"stator.task_transition:{instance._meta.label_lower}#{{id}} from {instance.state}" + started = time.monotonic() + with sentry.start_transaction(op="task", name=task_name): + sentry.set_context( + "instance", + { + "model": instance._meta.label_lower, + "pk": instance.pk, + "state": instance.state, + "state_age": instance.state_age, + }, + ) + result = instance.transition_attempt() + duration = time.monotonic() - started + if result: + print( + f"{instance._meta.label_lower}: {instance.pk}: {instance.state} -> {result} ({duration:.2f}s)" + ) + else: + print( + f"{instance._meta.label_lower}: {instance.pk}: {instance.state} unchanged ({duration:.2f}s)" + ) + if in_thread: + close_old_connections() + + +def task_deletion(model: type[StatorModel], in_thread: bool = True): + """ + Runs one model deletion set. + """ + # Loop, running deletions every second, until there are no more to do + while True: + deleted = model.transition_delete_due() + if not deleted: + break + print(f"{model._meta.label_lower}: Deleted {deleted} stale items") + time.sleep(1) + if in_thread: + close_old_connections() diff --git a/takahe/__init__.py b/takahe/__init__.py index 3e2f46a..2aceaf4 100644 --- a/takahe/__init__.py +++ b/takahe/__init__.py @@ -1 +1 @@ -__version__ = "0.9.0" +__version__ = "0.10.0-dev" diff --git a/tests/activities/models/test_post.py b/tests/activities/models/test_post.py index 3b28b14..9b094a4 100644 --- a/tests/activities/models/test_post.py +++ b/tests/activities/models/test_post.py @@ -68,7 +68,7 @@ def test_ensure_hashtag(identity: Identity, config_system, stator): author=identity, content="Hello, #testtag", ) - stator.run_single_cycle_sync() + stator.run_single_cycle() assert post.hashtags == ["testtag"] assert Hashtag.objects.filter(hashtag="testtag").exists() # Excessively long hashtag @@ -76,7 +76,7 @@ def test_ensure_hashtag(identity: Identity, config_system, stator): author=identity, content="Hello, #thisisahashtagthatiswaytoolongandissignificantlyaboveourmaximumlimitofonehundredcharacterswhytheywouldbethislongidontknow", ) - stator.run_single_cycle_sync() + stator.run_single_cycle() assert post.hashtags == [ "thisisahashtagthatiswaytoolongandissignificantlyaboveourmaximumlimitofonehundredcharacterswhytheywou" ] @@ -226,19 +226,19 @@ def test_post_transitions(identity, stator): ) # Test: | --> new --> fanned_out assert post.state == str(PostStates.new) - stator.run_single_cycle_sync() + stator.run_single_cycle() post = Post.objects.get(id=post.id) assert post.state == str(PostStates.fanned_out) # Test: fanned_out --> (forced) edited --> edited_fanned_out Post.transition_perform(post, PostStates.edited) - stator.run_single_cycle_sync() + stator.run_single_cycle() post = Post.objects.get(id=post.id) assert post.state == str(PostStates.edited_fanned_out) # Test: edited_fanned_out --> (forced) deleted --> deleted_fanned_out Post.transition_perform(post, PostStates.deleted) - stator.run_single_cycle_sync() + stator.run_single_cycle() post = Post.objects.get(id=post.id) assert post.state == str(PostStates.deleted_fanned_out) @@ -392,7 +392,7 @@ def test_inbound_posts( InboxMessage.objects.create(message=message) # Run stator and ensure that made the post - stator.run_single_cycle_sync() + stator.run_single_cycle() post = Post.objects.get(object_uri="https://remote.test/test-post") assert post.content == "post version one" assert post.published.day == 13 @@ -416,7 +416,7 @@ def test_inbound_posts( InboxMessage.objects.create(message=message) # Run stator and ensure that edited the post - stator.run_single_cycle_sync() + stator.run_single_cycle() post = Post.objects.get(object_uri="https://remote.test/test-post") assert post.content == "post version two" assert post.edited.day == 14 @@ -455,7 +455,7 @@ def test_inbound_posts( InboxMessage.objects.create(message=message) # Run stator and ensure that deleted the post - stator.run_single_cycle_sync() + stator.run_single_cycle() assert not Post.objects.filter(object_uri="https://remote.test/test-post").exists() # Create an inbound new post message with only contentMap @@ -474,7 +474,7 @@ def test_inbound_posts( InboxMessage.objects.create(message=message) # Run stator and ensure that made the post - stator.run_single_cycle_sync() + stator.run_single_cycle() post = Post.objects.get(object_uri="https://remote.test/test-map-only") assert post.content == "post with only content map" assert post.published.day == 13 diff --git a/tests/activities/models/test_post_targets.py b/tests/activities/models/test_post_targets.py index 9aa209d..acc068b 100644 --- a/tests/activities/models/test_post_targets.py +++ b/tests/activities/models/test_post_targets.py @@ -1,5 +1,4 @@ import pytest -from asgiref.sync import async_to_sync from activities.models import Post from users.models import Block, Domain, Follow, Identity @@ -16,7 +15,7 @@ def test_post_targets_simple(identity, other_identity, remote_identity): author=identity, local=True, ) - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {identity} # Test remote reply targets original post author @@ -26,7 +25,7 @@ def test_post_targets_simple(identity, other_identity, remote_identity): local=False, in_reply_to=post.absolute_object_uri(), ) - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {identity} # Test a post with local and remote mentions @@ -38,14 +37,14 @@ def test_post_targets_simple(identity, other_identity, remote_identity): # Mentions are targeted post.mentions.add(remote_identity) post.mentions.add(other_identity) - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() # Targets everyone assert targets == {identity, other_identity, remote_identity} # Test remote post with mentions post.local = False post.save() - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() # Only targets locals who are mentioned assert targets == {other_identity} @@ -89,7 +88,7 @@ def test_post_targets_shared(identity, other_identity): post.mentions.add(other_identity) post.mentions.add(remote1) post.mentions.add(remote2) - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() # We should only have one of remote1 or remote2 in there as they share a # shared inbox URI @@ -120,13 +119,12 @@ def test_post_local_only(identity, other_identity, remote_identity): # Remote mention is not targeted post.mentions.add(remote_identity) - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {identity, other_identity} @pytest.mark.django_db def test_post_followers(identity, other_identity, remote_identity): - Follow.objects.create(source=other_identity, target=identity) Follow.objects.create(source=remote_identity, target=identity) @@ -137,26 +135,26 @@ def test_post_followers(identity, other_identity, remote_identity): local=True, visibility=Post.Visibilities.public, ) - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {identity, other_identity, remote_identity} # Remote post only targets local followers, not the author post.local = False post.save() - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {other_identity} # Local Only post only targets local followers post.local = True post.visibility = Post.Visibilities.local_only post.save() - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {identity, other_identity} # Mentioned posts do not target unmentioned followers post.visibility = Post.Visibilities.mentioned post.save() - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {identity} @@ -179,5 +177,5 @@ def test_post_blocked(identity, other_identity, remote_identity): post.mentions.add(other_identity) # The muted block should be in targets, the full block should not - targets = async_to_sync(post.aget_targets)() + targets = post.get_targets() assert targets == {identity, other_identity} diff --git a/tests/activities/models/test_timeline_event.py b/tests/activities/models/test_timeline_event.py index 82837c9..75af52f 100644 --- a/tests/activities/models/test_timeline_event.py +++ b/tests/activities/models/test_timeline_event.py @@ -53,9 +53,10 @@ def test_mentioned( elif blocked == "mute": Block.create_local_mute(identity, author) - # Run stator twice - to make fanouts and then process them - stator.run_single_cycle_sync() - stator.run_single_cycle_sync() + # Run stator thrice - to receive the post, make fanouts and then process them + stator.run_single_cycle() + stator.run_single_cycle() + stator.run_single_cycle() if blocked in ["full", "mute"]: # Verify we were not mentioned @@ -121,9 +122,10 @@ def test_interaction_local_post( elif blocked == "mute_with_notifications": Block.create_local_mute(identity, interactor, include_notifications=True) - # Run stator twice - to make fanouts and then process them - stator.run_single_cycle_sync() - stator.run_single_cycle_sync() + # Run stator thrice - to receive the post, make fanouts and then process them + stator.run_single_cycle() + stator.run_single_cycle() + stator.run_single_cycle() timeline_event_type = ( TimelineEvent.Types.boosted if type == "boost" else TimelineEvent.Types.liked @@ -177,9 +179,10 @@ def test_old_new_post( } InboxMessage.objects.create(message=message) - # Run stator twice - to make fanouts and then process them - stator.run_single_cycle_sync() - stator.run_single_cycle_sync() + # Run stator thrice - to receive the post, make fanouts and then process them + stator.run_single_cycle() + stator.run_single_cycle() + stator.run_single_cycle() if old: # Verify it did not appear on the timeline @@ -229,9 +232,10 @@ def test_clear_timeline( } InboxMessage.objects.create(message=message) - # Run stator twice - to make fanouts and then process them - stator.run_single_cycle_sync() - stator.run_single_cycle_sync() + # Run stator thrice - to receive the post, make fanouts and then process them + stator.run_single_cycle() + stator.run_single_cycle() + stator.run_single_cycle() # Make sure it appeared on our timeline as a post and a mentioned assert TimelineEvent.objects.filter( @@ -248,7 +252,7 @@ def test_clear_timeline( service.unfollow(remote_identity) # Run stator once to process the timeline clear message - stator.run_single_cycle_sync() + stator.run_single_cycle() # Verify that the right things vanished assert not TimelineEvent.objects.filter( @@ -308,9 +312,10 @@ def test_hashtag_followed( elif blocked == "mute": Block.create_local_mute(identity, author) - # Run stator twice - to make fanouts and then process them - stator.run_single_cycle_sync() - stator.run_single_cycle_sync() + # Run stator thrice - to receive the post, make fanouts and then process them + stator.run_single_cycle() + stator.run_single_cycle() + stator.run_single_cycle() if blocked in ["full", "mute"]: # Verify post is not in timeline diff --git a/tests/users/models/test_follow.py b/tests/users/models/test_follow.py index d23dc1e..ff66801 100644 --- a/tests/users/models/test_follow.py +++ b/tests/users/models/test_follow.py @@ -27,7 +27,7 @@ def test_follow( url="https://remote.test/@test/inbox/", status_code=202, ) - stator.run_single_cycle_sync() + stator.run_single_cycle() outbound_data = json.loads(httpx_mock.get_request().content) assert outbound_data["type"] == "Follow" assert outbound_data["actor"] == identity.actor_uri @@ -52,5 +52,5 @@ def test_follow( } InboxMessage.objects.create(message=message) # Run stator and ensure that accepted our follow - stator.run_single_cycle_sync() + stator.run_single_cycle() assert Follow.objects.get(pk=follow.pk).state == FollowStates.accepted diff --git a/tests/users/views/test_auth.py b/tests/users/views/test_auth.py index 9653a1b..b831195 100644 --- a/tests/users/views/test_auth.py +++ b/tests/users/views/test_auth.py @@ -120,5 +120,5 @@ def test_signup_email(client, config_system, stator): # Run Stator and verify it sends the email assert len(mail.outbox) == 0 - stator.run_single_cycle_sync() + stator.run_single_cycle() assert len(mail.outbox) == 1 diff --git a/tests/users/views/test_import_export.py b/tests/users/views/test_import_export.py index b50aa80..1273eb0 100644 --- a/tests/users/views/test_import_export.py +++ b/tests/users/views/test_import_export.py @@ -37,7 +37,7 @@ def test_import_following( assert InboxMessage.objects.count() == 1 # Run stator to process it - stator.run_single_cycle_sync() + stator.run_single_cycle() # See if we're now following that identity assert identity.outbound_follows.filter(target=remote_identity).count() == 1 diff --git a/users/migrations/0019_stator_next_change.py b/users/migrations/0019_stator_next_change.py new file mode 100644 index 0000000..8217604 --- /dev/null +++ b/users/migrations/0019_stator_next_change.py @@ -0,0 +1,271 @@ +# Generated by Django 4.2.1 on 2023-07-05 22:18 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("users", "0018_index_together_migration"), + ] + + operations = [ + migrations.RemoveIndex( + model_name="block", + name="users_block_state_r_e016a2_idx", + ), + migrations.RemoveIndex( + model_name="block", + name="ix_block_state_attempted", + ), + migrations.RemoveIndex( + model_name="block", + name="ix_block_state_locked", + ), + migrations.RemoveIndex( + model_name="domain", + name="ix_domain_state_attempted", + ), + migrations.RemoveIndex( + model_name="domain", + name="ix_domain_state_locked", + ), + migrations.RemoveIndex( + model_name="domain", + name="users_domai_state_r_42b328_idx", + ), + migrations.RemoveIndex( + model_name="follow", + name="users_follo_state_r_d1dbc2_idx", + ), + migrations.RemoveIndex( + model_name="follow", + name="ix_follow_state_attempted", + ), + migrations.RemoveIndex( + model_name="follow", + name="ix_follow_state_locked", + ), + migrations.RemoveIndex( + model_name="identity", + name="users_ident_state_r_6fdeee_idx", + ), + migrations.RemoveIndex( + model_name="identity", + name="ix_identity_state_attempted", + ), + migrations.RemoveIndex( + model_name="identity", + name="ix_identity_state_locked", + ), + migrations.RemoveIndex( + model_name="inboxmessage", + name="ix_inboxmessag_state_attempted", + ), + migrations.RemoveIndex( + model_name="inboxmessage", + name="ix_inboxmessag_state_locked", + ), + migrations.RemoveIndex( + model_name="inboxmessage", + name="users_inbox_state_r_00fce2_idx", + ), + migrations.RemoveIndex( + model_name="passwordreset", + name="ix_passwordres_state_attempted", + ), + migrations.RemoveIndex( + model_name="passwordreset", + name="ix_passwordres_state_locked", + ), + migrations.RemoveIndex( + model_name="passwordreset", + name="users_passw_state_r_f54f10_idx", + ), + migrations.RemoveIndex( + model_name="report", + name="ix_report_state_attempted", + ), + migrations.RemoveIndex( + model_name="report", + name="ix_report_state_locked", + ), + migrations.RemoveIndex( + model_name="report", + name="users_repor_state_r_345b80_idx", + ), + migrations.RemoveField( + model_name="block", + name="state_attempted", + ), + migrations.RemoveField( + model_name="block", + name="state_ready", + ), + migrations.RemoveField( + model_name="domain", + name="state_attempted", + ), + migrations.RemoveField( + model_name="domain", + name="state_ready", + ), + migrations.RemoveField( + model_name="follow", + name="state_attempted", + ), + migrations.RemoveField( + model_name="follow", + name="state_ready", + ), + migrations.RemoveField( + model_name="identity", + name="state_attempted", + ), + migrations.RemoveField( + model_name="identity", + name="state_ready", + ), + migrations.RemoveField( + model_name="inboxmessage", + name="state_attempted", + ), + migrations.RemoveField( + model_name="inboxmessage", + name="state_ready", + ), + migrations.RemoveField( + model_name="passwordreset", + name="state_attempted", + ), + migrations.RemoveField( + model_name="passwordreset", + name="state_ready", + ), + migrations.RemoveField( + model_name="report", + name="state_attempted", + ), + migrations.RemoveField( + model_name="report", + name="state_ready", + ), + migrations.AddField( + model_name="block", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="domain", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="follow", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="identity", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="inboxmessage", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="passwordreset", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AddField( + model_name="report", + name="state_next_attempt", + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AlterField( + model_name="block", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="domain", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="follow", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="identity", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="inboxmessage", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="passwordreset", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AlterField( + model_name="report", + name="state_locked_until", + field=models.DateTimeField(blank=True, db_index=True, null=True), + ), + migrations.AddIndex( + model_name="block", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_block_state_next", + ), + ), + migrations.AddIndex( + model_name="follow", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_follow_state_next", + ), + ), + migrations.AddIndex( + model_name="identity", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_identity_state_next", + ), + ), + migrations.AddIndex( + model_name="inboxmessage", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_inboxmessag_state_next", + ), + ), + migrations.AddIndex( + model_name="passwordreset", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_passwordres_state_next", + ), + ), + migrations.AddIndex( + model_name="report", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_report_state_next", + ), + ), + migrations.AddIndex( + model_name="domain", + index=models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_domain_state_next", + ), + ), + ] diff --git a/users/migrations/0020_alter_identity_local.py b/users/migrations/0020_alter_identity_local.py new file mode 100644 index 0000000..2f163e4 --- /dev/null +++ b/users/migrations/0020_alter_identity_local.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.1 on 2023-07-07 20:37 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("users", "0019_stator_next_change"), + ] + + operations = [ + migrations.AlterField( + model_name="identity", + name="local", + field=models.BooleanField(db_index=True), + ), + ] diff --git a/users/models/block.py b/users/models/block.py index 3ede7b8..e9280d6 100644 --- a/users/models/block.py +++ b/users/models/block.py @@ -136,7 +136,7 @@ class Block(StatorModel): class Meta: unique_together = [("source", "target", "mute")] - indexes = StatorModel.Meta.indexes + indexes: list = [] # We need this so Stator can add its own def __str__(self): return f"#{self.id}: {self.source} blocks {self.target}" diff --git a/users/models/follow.py b/users/models/follow.py index e044481..a9b6063 100644 --- a/users/models/follow.py +++ b/users/models/follow.py @@ -146,7 +146,7 @@ class Follow(StatorModel): class Meta: unique_together = [("source", "target")] - indexes = StatorModel.Meta.indexes + indexes: list = [] # We need this so Stator can add its own def __str__(self): return f"#{self.id}: {self.source} → {self.target}" diff --git a/users/models/identity.py b/users/models/identity.py index 12cf665..28a5284 100644 --- a/users/models/identity.py +++ b/users/models/identity.py @@ -158,7 +158,7 @@ class Identity(StatorModel): state = StateField(IdentityStates) - local = models.BooleanField() + local = models.BooleanField(db_index=True) users = models.ManyToManyField( "users.User", related_name="identities", @@ -227,7 +227,7 @@ class Identity(StatorModel): class Meta: verbose_name_plural = "identities" unique_together = [("username", "domain")] - indexes = StatorModel.Meta.indexes + indexes: list = [] # We need this so Stator can add its own class urls(urlman.Urls): view = "/@{self.username}@{self.domain_id}/" diff --git a/users/models/inbox_message.py b/users/models/inbox_message.py index 5d057f9..8500190 100644 --- a/users/models/inbox_message.py +++ b/users/models/inbox_message.py @@ -1,4 +1,3 @@ -from asgiref.sync import sync_to_async from django.db import models from stator.models import State, StateField, StateGraph, StatorModel @@ -13,61 +12,59 @@ class InboxMessageStates(StateGraph): processed.transitions_to(purge) # Delete after release (back compat) @classmethod - async def handle_received(cls, instance: "InboxMessage"): + def handle_received(cls, instance: "InboxMessage"): from activities.models import Post, PostInteraction, TimelineEvent from users.models import Block, Follow, Identity, Report from users.services import IdentityService match instance.message_type: case "follow": - await sync_to_async(Follow.handle_request_ap)(instance.message) + Follow.handle_request_ap(instance.message) case "block": - await sync_to_async(Block.handle_ap)(instance.message) + Block.handle_ap(instance.message) case "announce": - await sync_to_async(PostInteraction.handle_ap)(instance.message) + PostInteraction.handle_ap(instance.message) case "like": - await sync_to_async(PostInteraction.handle_ap)(instance.message) + PostInteraction.handle_ap(instance.message) case "create": match instance.message_object_type: case "note": if instance.message_object_has_content: - await sync_to_async(Post.handle_create_ap)(instance.message) + Post.handle_create_ap(instance.message) else: # Notes without content are Interaction candidates - await sync_to_async(PostInteraction.handle_ap)( - instance.message - ) + PostInteraction.handle_ap(instance.message) case "question": - await sync_to_async(Post.handle_create_ap)(instance.message) + Post.handle_create_ap(instance.message) case unknown: if unknown in Post.Types.names: - await sync_to_async(Post.handle_create_ap)(instance.message) + Post.handle_create_ap(instance.message) case "update": match instance.message_object_type: case "note": - await sync_to_async(Post.handle_update_ap)(instance.message) + Post.handle_update_ap(instance.message) case "person": - await sync_to_async(Identity.handle_update_ap)(instance.message) + Identity.handle_update_ap(instance.message) case "service": - await sync_to_async(Identity.handle_update_ap)(instance.message) + Identity.handle_update_ap(instance.message) case "group": - await sync_to_async(Identity.handle_update_ap)(instance.message) + Identity.handle_update_ap(instance.message) case "organization": - await sync_to_async(Identity.handle_update_ap)(instance.message) + Identity.handle_update_ap(instance.message) case "application": - await sync_to_async(Identity.handle_update_ap)(instance.message) + Identity.handle_update_ap(instance.message) case "question": - await sync_to_async(Post.handle_update_ap)(instance.message) + Post.handle_update_ap(instance.message) case unknown: if unknown in Post.Types.names: - await sync_to_async(Post.handle_update_ap)(instance.message) + Post.handle_update_ap(instance.message) case "accept": match instance.message_object_type: case "follow": - await sync_to_async(Follow.handle_accept_ap)(instance.message) + Follow.handle_accept_ap(instance.message) case None: # It's a string object, but these will only be for Follows - await sync_to_async(Follow.handle_accept_ap)(instance.message) + Follow.handle_accept_ap(instance.message) case unknown: raise ValueError( f"Cannot handle activity of type accept.{unknown}" @@ -75,10 +72,10 @@ class InboxMessageStates(StateGraph): case "reject": match instance.message_object_type: case "follow": - await sync_to_async(Follow.handle_reject_ap)(instance.message) + Follow.handle_reject_ap(instance.message) case None: # It's a string object, but these will only be for Follows - await sync_to_async(Follow.handle_reject_ap)(instance.message) + Follow.handle_reject_ap(instance.message) case unknown: raise ValueError( f"Cannot handle activity of type reject.{unknown}" @@ -86,17 +83,13 @@ class InboxMessageStates(StateGraph): case "undo": match instance.message_object_type: case "follow": - await sync_to_async(Follow.handle_undo_ap)(instance.message) + Follow.handle_undo_ap(instance.message) case "block": - await sync_to_async(Block.handle_undo_ap)(instance.message) + Block.handle_undo_ap(instance.message) case "like": - await sync_to_async(PostInteraction.handle_undo_ap)( - instance.message - ) + PostInteraction.handle_undo_ap(instance.message) case "announce": - await sync_to_async(PostInteraction.handle_undo_ap)( - instance.message - ) + PostInteraction.handle_undo_ap(instance.message) case "http://litepub.social/ns#emojireact": # We're ignoring emoji reactions for now pass @@ -107,31 +100,31 @@ class InboxMessageStates(StateGraph): case "delete": # If there is no object type, we need to see if it's a profile or a post if not isinstance(instance.message["object"], dict): - if await Identity.objects.filter( + if Identity.objects.filter( actor_uri=instance.message["object"] - ).aexists(): - await sync_to_async(Identity.handle_delete_ap)(instance.message) - elif await Post.objects.filter( + ).exists(): + Identity.handle_delete_ap(instance.message) + elif Post.objects.filter( object_uri=instance.message["object"] - ).aexists(): - await sync_to_async(Post.handle_delete_ap)(instance.message) + ).exists(): + Post.handle_delete_ap(instance.message) else: # It is presumably already deleted pass else: match instance.message_object_type: case "tombstone": - await sync_to_async(Post.handle_delete_ap)(instance.message) + Post.handle_delete_ap(instance.message) case "note": - await sync_to_async(Post.handle_delete_ap)(instance.message) + Post.handle_delete_ap(instance.message) case unknown: raise ValueError( f"Cannot handle activity of type delete.{unknown}" ) case "add": - await sync_to_async(PostInteraction.handle_add_ap)(instance.message) + PostInteraction.handle_add_ap(instance.message) case "remove": - await sync_to_async(PostInteraction.handle_remove_ap)(instance.message) + PostInteraction.handle_remove_ap(instance.message) case "move": # We're ignoring moves for now pass @@ -140,19 +133,15 @@ class InboxMessageStates(StateGraph): pass case "flag": # Received reports - await sync_to_async(Report.handle_ap)(instance.message) + Report.handle_ap(instance.message) case "__internal__": match instance.message_object_type: case "fetchpost": - await sync_to_async(Post.handle_fetch_internal)( - instance.message["object"] - ) + Post.handle_fetch_internal(instance.message["object"]) case "cleartimeline": - await sync_to_async(TimelineEvent.handle_clear_timeline)( - instance.message["object"] - ) + TimelineEvent.handle_clear_timeline(instance.message["object"]) case "addfollow": - await sync_to_async(IdentityService.handle_internal_add_follow)( + IdentityService.handle_internal_add_follow( instance.message["object"] ) case unknown: