mirror of
https://github.com/bookwyrm-social/bookwyrm.git
synced 2025-01-11 01:35:28 +00:00
Attempt to complete inbox requests synchronously
When an inbox activity comes in from another fediverse instance, the behavior prior to this commit was always to immediately give a 200 response to the external server and then create a celery activity (usually in the MEDIUM_PRIORITY queue) to complete it. Instead, this would receive a request and try to complete it without making any http requests (which would make the request take too long to process). If an external request is required to complete the activity, a task is created and added to the queue. Ideally, this will cause some tasks to happen very promptly, and reduce the load on celery, which would help queued tasks happen more quickly as well. One downside is that this will make completing http requests from external servers slowing (since it's doing a bunch of thinking before responding).
This commit is contained in:
parent
db207065ce
commit
779d2b0694
4 changed files with 160 additions and 48 deletions
|
@ -100,9 +100,27 @@ class ActivityObject:
|
|||
|
||||
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments
|
||||
def to_model(
|
||||
self, model=None, instance=None, allow_create=True, save=True, overwrite=True
|
||||
self,
|
||||
model=None,
|
||||
instance=None,
|
||||
allow_create=True,
|
||||
save=True,
|
||||
overwrite=True,
|
||||
allow_external_connections=True,
|
||||
):
|
||||
"""convert from an activity to a model instance"""
|
||||
"""convert from an activity to a model instance. Args:
|
||||
model: the django model that this object is being converted to
|
||||
(will guess if not known)
|
||||
instance: an existing database entry that is going to be updated by
|
||||
this activity
|
||||
allow_create: whether a new object should be created if there is no
|
||||
existing object is provided or found matching the remote_id
|
||||
save: store in the database if true, return an unsaved model obj if false
|
||||
overwrite: replace fields in the database with this activity if true,
|
||||
only update blank fields if false
|
||||
allow_external_connections: look up missing data if true,
|
||||
throw an exception if false and an external connection is needed
|
||||
"""
|
||||
model = model or get_model_from_type(self.type)
|
||||
|
||||
# only reject statuses if we're potentially creating them
|
||||
|
@ -127,7 +145,10 @@ class ActivityObject:
|
|||
for field in instance.simple_fields:
|
||||
try:
|
||||
changed = field.set_field_from_activity(
|
||||
instance, self, overwrite=overwrite
|
||||
instance,
|
||||
self,
|
||||
overwrite=overwrite,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
if changed:
|
||||
update_fields.append(field.name)
|
||||
|
@ -138,7 +159,11 @@ class ActivityObject:
|
|||
# too early and jank up users
|
||||
for field in instance.image_fields:
|
||||
changed = field.set_field_from_activity(
|
||||
instance, self, save=save, overwrite=overwrite
|
||||
instance,
|
||||
self,
|
||||
save=save,
|
||||
overwrite=overwrite,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
if changed:
|
||||
update_fields.append(field.name)
|
||||
|
@ -162,7 +187,11 @@ class ActivityObject:
|
|||
# add many to many fields, which have to be set post-save
|
||||
for field in instance.many_to_many_fields:
|
||||
# mention books/users, for example
|
||||
field.set_field_from_activity(instance, self)
|
||||
field.set_field_from_activity(
|
||||
instance,
|
||||
self,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
|
||||
# reversed relationships in the models
|
||||
for (
|
||||
|
@ -266,10 +295,22 @@ def get_model_from_type(activity_type):
|
|||
return model[0]
|
||||
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def resolve_remote_id(
|
||||
remote_id, model=None, refresh=False, save=True, get_activity=False
|
||||
remote_id,
|
||||
model=None,
|
||||
refresh=False,
|
||||
save=True,
|
||||
get_activity=False,
|
||||
allow_external_connections=True,
|
||||
):
|
||||
"""take a remote_id and return an instance, creating if necessary"""
|
||||
"""take a remote_id and return an instance, creating if necessary. Args:
|
||||
remote_id: the unique url for looking up the object in the db or by http
|
||||
model: a string or object representing the model that corresponds to the object
|
||||
save: whether to return an unsaved database entry or a saved one
|
||||
get_activity: whether to return the activitypub object or the model object
|
||||
allow_external_connections: whether to make http connections
|
||||
"""
|
||||
if model: # a bonus check we can do if we already know the model
|
||||
if isinstance(model, str):
|
||||
model = apps.get_model(f"bookwyrm.{model}", require_ready=True)
|
||||
|
@ -277,6 +318,13 @@ def resolve_remote_id(
|
|||
if result and not refresh:
|
||||
return result if not get_activity else result.to_activity_dataclass()
|
||||
|
||||
# The above block will return the object if it already exists in the database.
|
||||
# If it doesn't, an external connection would be needed, so check if that's cool
|
||||
if not allow_external_connections:
|
||||
raise ActivitySerializerError(
|
||||
"Unable to serialize object without making external HTTP requests"
|
||||
)
|
||||
|
||||
# load the data and create the object
|
||||
try:
|
||||
data = get_data(remote_id)
|
||||
|
|
|
@ -14,12 +14,12 @@ class Verb(ActivityObject):
|
|||
actor: str
|
||||
object: ActivityObject
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""usually we just want to update and save"""
|
||||
# self.object may return None if the object is invalid in an expected way
|
||||
# ie, Question type
|
||||
if self.object:
|
||||
self.object.to_model()
|
||||
self.object.to_model(allow_external_connections=allow_external_connections)
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
|
@ -42,7 +42,7 @@ class Delete(Verb):
|
|||
cc: List[str] = field(default_factory=lambda: [])
|
||||
type: str = "Delete"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""find and delete the activity object"""
|
||||
if not self.object:
|
||||
return
|
||||
|
@ -52,7 +52,11 @@ class Delete(Verb):
|
|||
model = apps.get_model("bookwyrm.User")
|
||||
obj = model.find_existing_by_remote_id(self.object)
|
||||
else:
|
||||
obj = self.object.to_model(save=False, allow_create=False)
|
||||
obj = self.object.to_model(
|
||||
save=False,
|
||||
allow_create=False,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
|
||||
if obj:
|
||||
obj.delete()
|
||||
|
@ -67,11 +71,13 @@ class Update(Verb):
|
|||
to: List[str]
|
||||
type: str = "Update"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""update a model instance from the dataclass"""
|
||||
if not self.object:
|
||||
return
|
||||
self.object.to_model(allow_create=False)
|
||||
self.object.to_model(
|
||||
allow_create=False, allow_external_connections=allow_external_connections
|
||||
)
|
||||
|
||||
|
||||
@dataclass(init=False)
|
||||
|
@ -80,7 +86,7 @@ class Undo(Verb):
|
|||
|
||||
type: str = "Undo"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""find and remove the activity object"""
|
||||
if isinstance(self.object, str):
|
||||
# it may be that something should be done with these, but idk what
|
||||
|
@ -92,13 +98,28 @@ class Undo(Verb):
|
|||
model = None
|
||||
if self.object.type == "Follow":
|
||||
model = apps.get_model("bookwyrm.UserFollows")
|
||||
obj = self.object.to_model(model=model, save=False, allow_create=False)
|
||||
obj = self.object.to_model(
|
||||
model=model,
|
||||
save=False,
|
||||
allow_create=False,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
if not obj:
|
||||
# this could be a follow request not a follow proper
|
||||
model = apps.get_model("bookwyrm.UserFollowRequest")
|
||||
obj = self.object.to_model(model=model, save=False, allow_create=False)
|
||||
obj = self.object.to_model(
|
||||
model=model,
|
||||
save=False,
|
||||
allow_create=False,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
else:
|
||||
obj = self.object.to_model(model=model, save=False, allow_create=False)
|
||||
obj = self.object.to_model(
|
||||
model=model,
|
||||
save=False,
|
||||
allow_create=False,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
if not obj:
|
||||
# if we don't have the object, we can't undo it. happens a lot with boosts
|
||||
return
|
||||
|
@ -112,9 +133,9 @@ class Follow(Verb):
|
|||
object: str
|
||||
type: str = "Follow"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""relationship save"""
|
||||
self.to_model()
|
||||
self.to_model(allow_external_connections=allow_external_connections)
|
||||
|
||||
|
||||
@dataclass(init=False)
|
||||
|
@ -124,9 +145,9 @@ class Block(Verb):
|
|||
object: str
|
||||
type: str = "Block"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""relationship save"""
|
||||
self.to_model()
|
||||
self.to_model(allow_external_connections=allow_external_connections)
|
||||
|
||||
|
||||
@dataclass(init=False)
|
||||
|
@ -136,7 +157,7 @@ class Accept(Verb):
|
|||
object: Follow
|
||||
type: str = "Accept"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""accept a request"""
|
||||
obj = self.object.to_model(save=False, allow_create=True)
|
||||
obj.accept()
|
||||
|
@ -149,7 +170,7 @@ class Reject(Verb):
|
|||
object: Follow
|
||||
type: str = "Reject"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""reject a follow request"""
|
||||
obj = self.object.to_model(save=False, allow_create=False)
|
||||
obj.reject()
|
||||
|
@ -163,7 +184,7 @@ class Add(Verb):
|
|||
object: CollectionItem
|
||||
type: str = "Add"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""figure out the target to assign the item to a collection"""
|
||||
target = resolve_remote_id(self.target)
|
||||
item = self.object.to_model(save=False)
|
||||
|
@ -177,7 +198,7 @@ class Remove(Add):
|
|||
|
||||
type: str = "Remove"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""find and remove the activity object"""
|
||||
obj = self.object.to_model(save=False, allow_create=False)
|
||||
if obj:
|
||||
|
@ -191,9 +212,9 @@ class Like(Verb):
|
|||
object: str
|
||||
type: str = "Like"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""like"""
|
||||
self.to_model()
|
||||
self.to_model(allow_external_connections=allow_external_connections)
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
|
@ -207,6 +228,6 @@ class Announce(Verb):
|
|||
object: str
|
||||
type: str = "Announce"
|
||||
|
||||
def action(self):
|
||||
def action(self, allow_external_connections=True):
|
||||
"""boost"""
|
||||
self.to_model()
|
||||
self.to_model(allow_external_connections=allow_external_connections)
|
||||
|
|
|
@ -67,7 +67,9 @@ class ActivitypubFieldMixin:
|
|||
self.activitypub_field = activitypub_field
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def set_field_from_activity(self, instance, data, overwrite=True):
|
||||
def set_field_from_activity(
|
||||
self, instance, data, overwrite=True, allow_external_connections=True
|
||||
):
|
||||
"""helper function for assinging a value to the field. Returns if changed"""
|
||||
try:
|
||||
value = getattr(data, self.get_activitypub_field())
|
||||
|
@ -76,7 +78,9 @@ class ActivitypubFieldMixin:
|
|||
if self.get_activitypub_field() != "attributedTo":
|
||||
raise
|
||||
value = getattr(data, "actor")
|
||||
formatted = self.field_from_activity(value)
|
||||
formatted = self.field_from_activity(
|
||||
value, allow_external_connections=allow_external_connections
|
||||
)
|
||||
if formatted is None or formatted is MISSING or formatted == {}:
|
||||
return False
|
||||
|
||||
|
@ -116,7 +120,7 @@ class ActivitypubFieldMixin:
|
|||
return {self.activitypub_wrapper: value}
|
||||
return value
|
||||
|
||||
def field_from_activity(self, value):
|
||||
def field_from_activity(self, value, allow_external_connections=True):
|
||||
"""formatter to convert activitypub into a model value"""
|
||||
if value and hasattr(self, "activitypub_wrapper"):
|
||||
value = value.get(self.activitypub_wrapper)
|
||||
|
@ -138,7 +142,7 @@ class ActivitypubRelatedFieldMixin(ActivitypubFieldMixin):
|
|||
self.load_remote = load_remote
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def field_from_activity(self, value):
|
||||
def field_from_activity(self, value, allow_external_connections=True):
|
||||
if not value:
|
||||
return None
|
||||
|
||||
|
@ -159,7 +163,11 @@ class ActivitypubRelatedFieldMixin(ActivitypubFieldMixin):
|
|||
if not self.load_remote:
|
||||
# only look in the local database
|
||||
return related_model.find_existing_by_remote_id(value)
|
||||
return activitypub.resolve_remote_id(value, model=related_model)
|
||||
return activitypub.resolve_remote_id(
|
||||
value,
|
||||
model=related_model,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
|
||||
|
||||
class RemoteIdField(ActivitypubFieldMixin, models.CharField):
|
||||
|
@ -219,7 +227,9 @@ class PrivacyField(ActivitypubFieldMixin, models.CharField):
|
|||
super().__init__(*args, max_length=255, choices=PrivacyLevels, default="public")
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def set_field_from_activity(self, instance, data, overwrite=True):
|
||||
def set_field_from_activity(
|
||||
self, instance, data, overwrite=True, allow_external_connections=True
|
||||
):
|
||||
if not overwrite:
|
||||
return False
|
||||
|
||||
|
@ -234,7 +244,11 @@ class PrivacyField(ActivitypubFieldMixin, models.CharField):
|
|||
break
|
||||
if not user_field:
|
||||
raise ValidationError("No user field found for privacy", data)
|
||||
user = activitypub.resolve_remote_id(getattr(data, user_field), model="User")
|
||||
user = activitypub.resolve_remote_id(
|
||||
getattr(data, user_field),
|
||||
model="User",
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
|
||||
if to == [self.public]:
|
||||
setattr(instance, self.name, "public")
|
||||
|
@ -295,13 +309,17 @@ class ManyToManyField(ActivitypubFieldMixin, models.ManyToManyField):
|
|||
self.link_only = link_only
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def set_field_from_activity(self, instance, data, overwrite=True):
|
||||
def set_field_from_activity(
|
||||
self, instance, data, overwrite=True, allow_external_connections=True
|
||||
):
|
||||
"""helper function for assigning a value to the field"""
|
||||
if not overwrite and getattr(instance, self.name).exists():
|
||||
return False
|
||||
|
||||
value = getattr(data, self.get_activitypub_field())
|
||||
formatted = self.field_from_activity(value)
|
||||
formatted = self.field_from_activity(
|
||||
value, allow_external_connections=allow_external_connections
|
||||
)
|
||||
if formatted is None or formatted is MISSING:
|
||||
return False
|
||||
getattr(instance, self.name).set(formatted)
|
||||
|
@ -313,7 +331,7 @@ class ManyToManyField(ActivitypubFieldMixin, models.ManyToManyField):
|
|||
return f"{value.instance.remote_id}/{self.name}"
|
||||
return [i.remote_id for i in value.all()]
|
||||
|
||||
def field_from_activity(self, value):
|
||||
def field_from_activity(self, value, allow_external_connections=True):
|
||||
if value is None or value is MISSING:
|
||||
return None
|
||||
if not isinstance(value, list):
|
||||
|
@ -326,7 +344,11 @@ class ManyToManyField(ActivitypubFieldMixin, models.ManyToManyField):
|
|||
except ValidationError:
|
||||
continue
|
||||
items.append(
|
||||
activitypub.resolve_remote_id(remote_id, model=self.related_model)
|
||||
activitypub.resolve_remote_id(
|
||||
remote_id,
|
||||
model=self.related_model,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
@ -353,7 +375,7 @@ class TagField(ManyToManyField):
|
|||
)
|
||||
return tags
|
||||
|
||||
def field_from_activity(self, value):
|
||||
def field_from_activity(self, value, allow_external_connections=True):
|
||||
if not isinstance(value, list):
|
||||
return None
|
||||
items = []
|
||||
|
@ -366,7 +388,11 @@ class TagField(ManyToManyField):
|
|||
# tags can contain multiple types
|
||||
continue
|
||||
items.append(
|
||||
activitypub.resolve_remote_id(link.href, model=self.related_model)
|
||||
activitypub.resolve_remote_id(
|
||||
link.href,
|
||||
model=self.related_model,
|
||||
allow_external_connections=allow_external_connections,
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
@ -391,10 +417,14 @@ class ImageField(ActivitypubFieldMixin, models.ImageField):
|
|||
super().__init__(*args, **kwargs)
|
||||
|
||||
# pylint: disable=arguments-differ,arguments-renamed
|
||||
def set_field_from_activity(self, instance, data, save=True, overwrite=True):
|
||||
def set_field_from_activity(
|
||||
self, instance, data, save=True, overwrite=True, allow_external_connections=True
|
||||
):
|
||||
"""helper function for assinging a value to the field"""
|
||||
value = getattr(data, self.get_activitypub_field())
|
||||
formatted = self.field_from_activity(value)
|
||||
formatted = self.field_from_activity(
|
||||
value, allow_external_connections=allow_external_connections
|
||||
)
|
||||
if formatted is None or formatted is MISSING:
|
||||
return False
|
||||
|
||||
|
@ -426,7 +456,7 @@ class ImageField(ActivitypubFieldMixin, models.ImageField):
|
|||
|
||||
return activitypub.Document(url=url, name=alt)
|
||||
|
||||
def field_from_activity(self, value):
|
||||
def field_from_activity(self, value, allow_external_connections=True):
|
||||
image_slug = value
|
||||
# when it's an inline image (User avatar/icon, Book cover), it's a json
|
||||
# blob, but when it's an attached image, it's just a url
|
||||
|
@ -481,7 +511,7 @@ class DateTimeField(ActivitypubFieldMixin, models.DateTimeField):
|
|||
return None
|
||||
return value.isoformat()
|
||||
|
||||
def field_from_activity(self, value):
|
||||
def field_from_activity(self, value, allow_external_connections=True):
|
||||
try:
|
||||
date_value = dateutil.parser.parse(value)
|
||||
try:
|
||||
|
@ -495,7 +525,7 @@ class DateTimeField(ActivitypubFieldMixin, models.DateTimeField):
|
|||
class HtmlField(ActivitypubFieldMixin, models.TextField):
|
||||
"""a text field for storing html"""
|
||||
|
||||
def field_from_activity(self, value):
|
||||
def field_from_activity(self, value, allow_external_connections=True):
|
||||
if not value or value == MISSING:
|
||||
return None
|
||||
return clean(value)
|
||||
|
|
|
@ -64,7 +64,7 @@ class Inbox(View):
|
|||
high = ["Follow", "Accept", "Reject", "Block", "Unblock", "Undo"]
|
||||
|
||||
priority = HIGH if activity_json["type"] in high else MEDIUM
|
||||
activity_task.apply_async(args=(activity_json,), queue=priority)
|
||||
sometimes_async_activity_task(activity_json, queue=priority)
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
|
@ -102,6 +102,19 @@ def raise_is_blocked_activity(activity_json):
|
|||
raise PermissionDenied()
|
||||
|
||||
|
||||
def sometimes_async_activity_task(activity_json, queue=MEDIUM):
|
||||
"""Sometimes we can effectively respond to a request without queuing a new task,
|
||||
and whever that is possible, we should do it."""
|
||||
activity = activitypub.parse(activity_json)
|
||||
|
||||
# try resolving this activity without making any http requests
|
||||
try:
|
||||
activity.action(allow_external_connections=False)
|
||||
except activitypub.ActivitySerializerError:
|
||||
# if that doesn't work, run it asynchronously
|
||||
activity_task.apply_async(args=(activity_json,), queue=queue)
|
||||
|
||||
|
||||
@app.task(queue=MEDIUM)
|
||||
def activity_task(activity_json):
|
||||
"""do something with this json we think is legit"""
|
||||
|
|
Loading…
Reference in a new issue