One more try to get the fetch_account/sync_pins/post relationship and parallelism fixed (#634)

This commit is contained in:
Osma Ahvenlampi 2023-08-27 00:16:14 +03:00 committed by GitHub
parent 555046ac4d
commit 2a0bbf0d5d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 30 deletions

View file

@ -834,25 +834,20 @@ class Post(StatorModel):
# If the author is not fetched yet, try again later # If the author is not fetched yet, try again later
if author.domain is None: if author.domain is None:
if fetch_author: if fetch_author:
author.fetch_actor() if not author.fetch_actor() or author.domain is None:
# perhaps the entire "try again" logic below
# could be replaced with TryAgainLater for
# _all_ fetches, to let it handle pinned posts?
if author.domain is None:
raise TryAgainLater() raise TryAgainLater()
else: else:
raise TryAgainLater() raise TryAgainLater()
# If the post is from a blocked domain, stop and drop # If the post is from a blocked domain, stop and drop
if author.domain.recursively_blocked(): if author.domain.recursively_blocked():
raise cls.DoesNotExist("Post is from a blocked domain") raise cls.DoesNotExist("Post is from a blocked domain")
# parallelism may cause another simultaneous worker thread
# to try to create the same post - so watch for that and
# try to avoid failing the entire transaction
try: try:
# try again, because fetch_actor() also fetches pinned posts # wrapped in a transaction to avoid breaking the outer
post = cls.objects.select_related("author__domain").get( # transaction
object_uri=data["id"] with transaction.atomic():
)
except cls.DoesNotExist:
# finally, create a stub
try:
post = cls.objects.create( post = cls.objects.create(
object_uri=data["id"], object_uri=data["id"],
author=author, author=author,
@ -861,24 +856,23 @@ class Post(StatorModel):
type=data["type"], type=data["type"],
) )
created = True created = True
except IntegrityError as dupe: except IntegrityError:
# there's still some kind of race condition here # despite previous checks, a parallel thread managed
# it's far more rare, but sometimes we fire an # to create the same object already
# IntegrityError on activities_post_object_uri_key post = cls.by_object_uri(object_uri=data["id"])
# this transaction is now aborted and anything following
# in the caller function will fail in the database.
raise TryAgainLater() from dupe
else: else:
raise cls.DoesNotExist(f"No post with ID {data['id']}", data) raise cls.DoesNotExist(f"No post with ID {data['id']}", data)
if update or created: if update or created:
post.type = data["type"] post.type = data["type"]
post.url = data.get("url", data["id"])
if post.type in (cls.Types.article, cls.Types.question): if post.type in (cls.Types.article, cls.Types.question):
post.type_data = PostTypeData(__root__=data).__root__ post.type_data = PostTypeData(__root__=data).__root__
try: try:
# apparently sometimes posts (Pages?) in the fediverse # apparently sometimes posts (Pages?) in the fediverse
# don't have content?! # don't have content, but this shouldn't be a total failure
post.content = get_value_or_map(data, "content", "contentMap") post.content = get_value_or_map(data, "content", "contentMap")
except KeyError: except ActivityPubFormatError as err:
capture_message(f"{err} on {post.url}")
post.content = None post.content = None
# Document types have names, not summaries # Document types have names, not summaries
post.summary = data.get("summary") or data.get("name") post.summary = data.get("summary") or data.get("name")
@ -886,7 +880,6 @@ class Post(StatorModel):
post.content = post.summary post.content = post.summary
post.summary = None post.summary = None
post.sensitive = data.get("sensitive", False) post.sensitive = data.get("sensitive", False)
post.url = data.get("url", data["id"])
post.published = parse_ld_date(data.get("published")) post.published = parse_ld_date(data.get("published"))
post.edited = parse_ld_date(data.get("updated")) post.edited = parse_ld_date(data.get("updated"))
post.in_reply_to = data.get("inReplyTo") post.in_reply_to = data.get("inReplyTo")
@ -930,6 +923,9 @@ class Post(StatorModel):
# These have no IDs, so we have to wipe them each time # These have no IDs, so we have to wipe them each time
post.attachments.all().delete() post.attachments.all().delete()
for attachment in get_list(data, "attachment"): for attachment in get_list(data, "attachment"):
if "url" not in attachment and "href" in attachment:
# Links have hrefs, while other Objects have urls
attachment["url"] = attachment["href"]
if "focalPoint" in attachment: if "focalPoint" in attachment:
try: try:
focal_x, focal_y = attachment["focalPoint"] focal_x, focal_y = attachment["focalPoint"]
@ -940,7 +936,9 @@ class Post(StatorModel):
mimetype = attachment.get("mediaType") mimetype = attachment.get("mediaType")
if not mimetype or not isinstance(mimetype, str): if not mimetype or not isinstance(mimetype, str):
if "url" not in attachment: if "url" not in attachment:
raise ActivityPubFormatError("No URL present on attachment") raise ActivityPubFormatError(
f"No URL present on attachment in {post.url}"
)
mimetype, _ = mimetypes.guess_type(attachment["url"]) mimetype, _ = mimetypes.guess_type(attachment["url"])
if not mimetype: if not mimetype:
mimetype = "application/octet-stream" mimetype = "application/octet-stream"
@ -956,7 +954,11 @@ class Post(StatorModel):
) )
# Calculate stats in case we have existing replies # Calculate stats in case we have existing replies
post.calculate_stats(save=False) post.calculate_stats(save=False)
with transaction.atomic():
# if we don't commit the transaction here, there's a chance
# the parent fetch below goes into an infinite loop
post.save() post.save()
# Potentially schedule a fetch of the reply parent, and recalculate # Potentially schedule a fetch of the reply parent, and recalculate
# its stats if it's here already. # its stats if it's here already.
if post.in_reply_to: if post.in_reply_to:

View file

@ -6,7 +6,7 @@ from urllib.parse import urlparse
import httpx import httpx
import urlman import urlman
from django.conf import settings from django.conf import settings
from django.db import IntegrityError, models from django.db import IntegrityError, models, transaction
from django.utils import timezone from django.utils import timezone
from django.utils.functional import lazy from django.utils.functional import lazy
from lxml import etree from lxml import etree
@ -439,7 +439,13 @@ class Identity(StatorModel):
# to the DB until the fetch succeeds # to the DB until the fetch succeeds
return cls(actor_uri=uri, local=False) return cls(actor_uri=uri, local=False)
else: else:
return cls.objects.create(actor_uri=uri, local=False) # parallelism may cause another simultaneous worker thread
# to try to create the same identity - so use database level
# constructs to avoid an integrity error
identity, created = cls.objects.update_or_create(
actor_uri=uri, local=False
)
return identity
else: else:
raise cls.DoesNotExist(f"No identity found with actor_uri {uri}") raise cls.DoesNotExist(f"No identity found with actor_uri {uri}")
@ -860,8 +866,19 @@ class Identity(StatorModel):
}, },
) )
return False return False
try:
document = canonicalise(response.json(), include_security=True) document = canonicalise(response.json(), include_security=True)
except ValueError:
# servers with empty or invalid responses are inevitable
capture_message(
f"Invalid response fetching actor at {self.actor_uri}",
extras={
"identity": self.pk,
"domain": self.domain_id,
"content": response.content,
},
)
return False
if "type" not in document: if "type" not in document:
return False return False
self.name = document.get("name") self.name = document.get("name")
@ -923,6 +940,9 @@ class Identity(StatorModel):
# Mark as fetched # Mark as fetched
self.fetched = timezone.now() self.fetched = timezone.now()
try: try:
with transaction.atomic():
# if we don't wrap this in its own transaction, the exception
# handler is guaranteed to fail
self.save() self.save()
except IntegrityError as e: except IntegrityError as e:
# See if we can fetch a PK and save there # See if we can fetch a PK and save there
@ -934,6 +954,7 @@ class Identity(StatorModel):
f"Could not save Identity at end of actor fetch: {e}" f"Could not save Identity at end of actor fetch: {e}"
) )
self.pk: int | None = other_row.pk self.pk: int | None = other_row.pk
with transaction.atomic():
self.save() self.save()
# Fetch pinned posts after identity has been fetched and saved # Fetch pinned posts after identity has been fetched and saved