mirror of
https://git.pleroma.social/pleroma/pleroma.git
synced 2025-01-10 17:25:26 +00:00
[#1149] Replaced RetryQueue with oban-based retries.
This commit is contained in:
parent
cdfd02e904
commit
23d279e03e
18 changed files with 106 additions and 395 deletions
config
docs
lib/pleroma
mix.exsmix.lockpriv/repo/migrations
test
|
@ -440,13 +440,7 @@ config :pleroma, Pleroma.User,
|
||||||
"web"
|
"web"
|
||||||
]
|
]
|
||||||
|
|
||||||
config :pleroma, Pleroma.Web.Federator.RetryQueue,
|
job_queues = [
|
||||||
enabled: false,
|
|
||||||
max_jobs: 20,
|
|
||||||
initial_timeout: 30,
|
|
||||||
max_retries: 5
|
|
||||||
|
|
||||||
config :pleroma_job_queue, :queues,
|
|
||||||
federator_incoming: 50,
|
federator_incoming: 50,
|
||||||
federator_outgoing: 50,
|
federator_outgoing: 50,
|
||||||
web_push: 50,
|
web_push: 50,
|
||||||
|
@ -454,6 +448,15 @@ config :pleroma_job_queue, :queues,
|
||||||
transmogrifier: 20,
|
transmogrifier: 20,
|
||||||
scheduled_activities: 10,
|
scheduled_activities: 10,
|
||||||
background: 5
|
background: 5
|
||||||
|
]
|
||||||
|
|
||||||
|
config :pleroma_job_queue, :queues, job_queues
|
||||||
|
|
||||||
|
config :pleroma, Oban,
|
||||||
|
repo: Pleroma.Repo,
|
||||||
|
verbose: false,
|
||||||
|
prune: {:maxage, 60 * 60 * 24 * 7},
|
||||||
|
queues: job_queues
|
||||||
|
|
||||||
config :pleroma, :fetch_initial_posts,
|
config :pleroma, :fetch_initial_posts,
|
||||||
enabled: false,
|
enabled: false,
|
||||||
|
|
|
@ -62,6 +62,10 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock
|
||||||
|
|
||||||
config :pleroma_job_queue, disabled: true
|
config :pleroma_job_queue, disabled: true
|
||||||
|
|
||||||
|
config :pleroma, Oban,
|
||||||
|
queues: false,
|
||||||
|
prune: :disabled
|
||||||
|
|
||||||
config :pleroma, Pleroma.ScheduledActivity,
|
config :pleroma, Pleroma.ScheduledActivity,
|
||||||
daily_user_limit: 2,
|
daily_user_limit: 2,
|
||||||
total_user_limit: 3,
|
total_user_limit: 3,
|
||||||
|
|
|
@ -412,13 +412,6 @@ config :pleroma_job_queue, :queues,
|
||||||
|
|
||||||
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
|
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
|
||||||
|
|
||||||
## Pleroma.Web.Federator.RetryQueue
|
|
||||||
|
|
||||||
* `enabled`: If set to `true`, failed federation jobs will be retried
|
|
||||||
* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
|
|
||||||
* `initial_timeout`: The initial timeout in seconds
|
|
||||||
* `max_retries`: The maximum number of times a federation job is retried
|
|
||||||
|
|
||||||
## Pleroma.Web.Metadata
|
## Pleroma.Web.Metadata
|
||||||
* `providers`: a list of metadata providers to enable. Providers available:
|
* `providers`: a list of metadata providers to enable. Providers available:
|
||||||
* Pleroma.Web.Metadata.Providers.OpenGraph
|
* Pleroma.Web.Metadata.Providers.OpenGraph
|
||||||
|
|
|
@ -120,8 +120,8 @@ defmodule Pleroma.Application do
|
||||||
hackney_pool_children() ++
|
hackney_pool_children() ++
|
||||||
[
|
[
|
||||||
%{
|
%{
|
||||||
id: Pleroma.Web.Federator.RetryQueue,
|
id: Oban,
|
||||||
start: {Pleroma.Web.Federator.RetryQueue, :start_link, []}
|
start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}
|
||||||
},
|
},
|
||||||
%{
|
%{
|
||||||
id: Pleroma.Web.OAuth.Token.CleanWorker,
|
id: Pleroma.Web.OAuth.Token.CleanWorker,
|
||||||
|
|
|
@ -85,6 +85,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def publish_one(%{actor_id: actor_id} = params) do
|
||||||
|
actor = User.get_by_id(actor_id)
|
||||||
|
|
||||||
|
params
|
||||||
|
|> Map.delete(:actor_id)
|
||||||
|
|> Map.put(:actor, actor)
|
||||||
|
|> publish_one()
|
||||||
|
end
|
||||||
|
|
||||||
defp should_federate?(inbox, public) do
|
defp should_federate?(inbox, public) do
|
||||||
if public do
|
if public do
|
||||||
true
|
true
|
||||||
|
@ -160,7 +169,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
||||||
Publishes an activity with BCC to all relevant peers.
|
Publishes an activity with BCC to all relevant peers.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
|
def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
|
||||||
|
when is_list(bcc) and bcc != [] do
|
||||||
public = is_public?(activity)
|
public = is_public?(activity)
|
||||||
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
||||||
|
|
||||||
|
@ -187,7 +197,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
||||||
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
|
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
json: json,
|
json: json,
|
||||||
actor: actor,
|
actor_id: actor.id,
|
||||||
id: activity.data["id"],
|
id: activity.data["id"],
|
||||||
unreachable_since: unreachable_since
|
unreachable_since: unreachable_since
|
||||||
})
|
})
|
||||||
|
@ -222,7 +232,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
||||||
%{
|
%{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
json: json,
|
json: json,
|
||||||
actor: actor,
|
actor_id: actor.id,
|
||||||
id: activity.data["id"],
|
id: activity.data["id"],
|
||||||
unreachable_since: unreachable_since
|
unreachable_since: unreachable_since
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ defmodule Pleroma.Web.Federator do
|
||||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||||
alias Pleroma.Web.ActivityPub.Utils
|
alias Pleroma.Web.ActivityPub.Utils
|
||||||
alias Pleroma.Web.Federator.Publisher
|
alias Pleroma.Web.Federator.Publisher
|
||||||
alias Pleroma.Web.Federator.RetryQueue
|
|
||||||
alias Pleroma.Web.OStatus
|
alias Pleroma.Web.OStatus
|
||||||
alias Pleroma.Web.Websub
|
alias Pleroma.Web.Websub
|
||||||
|
|
||||||
|
@ -130,19 +129,6 @@ defmodule Pleroma.Web.Federator do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform(
|
|
||||||
:publish_single_websub,
|
|
||||||
%{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
|
|
||||||
) do
|
|
||||||
case Websub.publish_one(params) do
|
|
||||||
{:ok, _} ->
|
|
||||||
:ok
|
|
||||||
|
|
||||||
{:error, _} ->
|
|
||||||
RetryQueue.enqueue(params, Websub)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def perform(type, _) do
|
def perform(type, _) do
|
||||||
Logger.debug(fn -> "Unknown task: #{type}" end)
|
Logger.debug(fn -> "Unknown task: #{type}" end)
|
||||||
{:error, "Don't know what to do with this"}
|
{:error, "Don't know what to do with this"}
|
||||||
|
|
|
@ -6,7 +6,6 @@ defmodule Pleroma.Web.Federator.Publisher do
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Config
|
alias Pleroma.Config
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.Web.Federator.RetryQueue
|
|
||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
|
@ -30,23 +29,10 @@ defmodule Pleroma.Web.Federator.Publisher do
|
||||||
Enqueue publishing a single activity.
|
Enqueue publishing a single activity.
|
||||||
"""
|
"""
|
||||||
@spec enqueue_one(module(), Map.t()) :: :ok
|
@spec enqueue_one(module(), Map.t()) :: :ok
|
||||||
def enqueue_one(module, %{} = params),
|
def enqueue_one(module, %{} = params) do
|
||||||
do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
|
%{module: to_string(module), params: params}
|
||||||
|
|> Pleroma.Workers.Publisher.new()
|
||||||
@spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
|
|> Pleroma.Repo.insert()
|
||||||
def perform(:publish_one, module, params) do
|
|
||||||
case apply(module, :publish_one, [params]) do
|
|
||||||
{:ok, _} ->
|
|
||||||
:ok
|
|
||||||
|
|
||||||
{:error, _e} ->
|
|
||||||
RetryQueue.enqueue(params, module)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def perform(type, _, _) do
|
|
||||||
Logger.debug("Unknown task: #{type}")
|
|
||||||
{:error, "Don't know what to do with this"}
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
|
|
@ -1,239 +0,0 @@
|
||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.Web.Federator.RetryQueue do
|
|
||||||
use GenServer
|
|
||||||
|
|
||||||
require Logger
|
|
||||||
|
|
||||||
def init(args) do
|
|
||||||
queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
|
|
||||||
|
|
||||||
{:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def start_link do
|
|
||||||
enabled =
|
|
||||||
if Pleroma.Config.get(:env) == :test,
|
|
||||||
do: true,
|
|
||||||
else: Pleroma.Config.get([__MODULE__, :enabled], false)
|
|
||||||
|
|
||||||
if enabled do
|
|
||||||
Logger.info("Starting retry queue")
|
|
||||||
|
|
||||||
linkres =
|
|
||||||
GenServer.start_link(
|
|
||||||
__MODULE__,
|
|
||||||
%{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
|
|
||||||
name: __MODULE__
|
|
||||||
)
|
|
||||||
|
|
||||||
maybe_kickoff_timer()
|
|
||||||
linkres
|
|
||||||
else
|
|
||||||
Logger.info("Retry queue disabled")
|
|
||||||
:ignore
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def enqueue(data, transport, retries \\ 0) do
|
|
||||||
GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_stats do
|
|
||||||
GenServer.call(__MODULE__, :get_stats)
|
|
||||||
end
|
|
||||||
|
|
||||||
def reset_stats do
|
|
||||||
GenServer.call(__MODULE__, :reset_stats)
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_retry_params(retries) do
|
|
||||||
if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
|
|
||||||
{:drop, "Max retries reached"}
|
|
||||||
else
|
|
||||||
{:retry, growth_function(retries)}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_retry_timer_interval do
|
|
||||||
Pleroma.Config.get([:retry_queue, :interval], 1000)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp ets_count_expires(table, current_time) do
|
|
||||||
:ets.select_count(
|
|
||||||
table,
|
|
||||||
[
|
|
||||||
{
|
|
||||||
{:"$1", :"$2"},
|
|
||||||
[{:"=<", :"$1", {:const, current_time}}],
|
|
||||||
[true]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp ets_pop_n_expired(table, current_time, desired) do
|
|
||||||
{popped, _continuation} =
|
|
||||||
:ets.select(
|
|
||||||
table,
|
|
||||||
[
|
|
||||||
{
|
|
||||||
{:"$1", :"$2"},
|
|
||||||
[{:"=<", :"$1", {:const, current_time}}],
|
|
||||||
[:"$_"]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
desired
|
|
||||||
)
|
|
||||||
|
|
||||||
popped
|
|
||||||
|> Enum.each(fn e ->
|
|
||||||
:ets.delete_object(table, e)
|
|
||||||
end)
|
|
||||||
|
|
||||||
popped
|
|
||||||
end
|
|
||||||
|
|
||||||
def maybe_start_job(running_jobs, queue_table) do
|
|
||||||
# we don't want to hit the ets or the DateTime more times than we have to
|
|
||||||
# could optimize slightly further by not using the count, and instead grabbing
|
|
||||||
# up to N objects early...
|
|
||||||
current_time = DateTime.to_unix(DateTime.utc_now())
|
|
||||||
n_running_jobs = :sets.size(running_jobs)
|
|
||||||
|
|
||||||
if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
|
|
||||||
n_ready_jobs = ets_count_expires(queue_table, current_time)
|
|
||||||
|
|
||||||
if n_ready_jobs > 0 do
|
|
||||||
# figure out how many we could start
|
|
||||||
available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
|
|
||||||
start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
|
|
||||||
else
|
|
||||||
running_jobs
|
|
||||||
end
|
|
||||||
else
|
|
||||||
running_jobs
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
|
|
||||||
running_jobs
|
|
||||||
end
|
|
||||||
|
|
||||||
defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
|
|
||||||
when available_job_slots > 0 do
|
|
||||||
candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
|
|
||||||
|
|
||||||
candidates
|
|
||||||
|> List.foldl(running_jobs, fn {_, e}, rj ->
|
|
||||||
{:ok, pid} = Task.start(fn -> worker(e) end)
|
|
||||||
mref = Process.monitor(pid)
|
|
||||||
:sets.add_element(mref, rj)
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def worker({:send, data, transport, retries}) do
|
|
||||||
case transport.publish_one(data) do
|
|
||||||
{:ok, _} ->
|
|
||||||
GenServer.cast(__MODULE__, :inc_delivered)
|
|
||||||
:delivered
|
|
||||||
|
|
||||||
{:error, _reason} ->
|
|
||||||
enqueue(data, transport, retries)
|
|
||||||
:retry
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
|
|
||||||
{:reply, %{delivered: delivery_count, dropped: drop_count}, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
|
|
||||||
{:reply, %{delivered: delivery_count, dropped: drop_count},
|
|
||||||
%{state | delivered: 0, dropped: 0}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:reset_stats, state) do
|
|
||||||
{:noreply, %{state | delivered: 0, dropped: 0}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(
|
|
||||||
{:maybe_enqueue, data, transport, retries},
|
|
||||||
%{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
|
|
||||||
) do
|
|
||||||
case get_retry_params(retries) do
|
|
||||||
{:retry, timeout} ->
|
|
||||||
:ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
|
|
||||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
|
||||||
{:noreply, %{state | running_jobs: running_jobs}}
|
|
||||||
|
|
||||||
{:drop, message} ->
|
|
||||||
Logger.debug(message)
|
|
||||||
{:noreply, %{state | dropped: drop_count + 1}}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:kickoff_timer, state) do
|
|
||||||
retry_interval = get_retry_timer_interval()
|
|
||||||
Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
|
|
||||||
{:noreply, %{state | delivered: delivery_count + 1}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
|
|
||||||
{:noreply, %{state | dropped: drop_count + 1}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
|
|
||||||
case transport.publish_one(data) do
|
|
||||||
{:ok, _} ->
|
|
||||||
{:noreply, %{state | delivered: delivery_count + 1}}
|
|
||||||
|
|
||||||
{:error, _reason} ->
|
|
||||||
enqueue(data, transport, retries)
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info(
|
|
||||||
:retry_timer_run,
|
|
||||||
%{queue_table: queue_table, running_jobs: running_jobs} = state
|
|
||||||
) do
|
|
||||||
maybe_kickoff_timer()
|
|
||||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
|
||||||
{:noreply, %{state | running_jobs: running_jobs}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
|
|
||||||
%{running_jobs: running_jobs, queue_table: queue_table} = state
|
|
||||||
running_jobs = :sets.del_element(ref, running_jobs)
|
|
||||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
|
||||||
{:noreply, %{state | running_jobs: running_jobs}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info(unknown, state) do
|
|
||||||
Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
if Pleroma.Config.get(:env) == :test do
|
|
||||||
defp growth_function(_retries) do
|
|
||||||
_shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
|
|
||||||
DateTime.to_unix(DateTime.utc_now()) - 1
|
|
||||||
end
|
|
||||||
else
|
|
||||||
defp growth_function(retries) do
|
|
||||||
round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
|
|
||||||
DateTime.to_unix(DateTime.utc_now())
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp maybe_kickoff_timer do
|
|
||||||
GenServer.cast(__MODULE__, :kickoff_timer)
|
|
||||||
end
|
|
||||||
end
|
|
|
@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def publish_one(%{recipient_id: recipient_id} = params) do
|
||||||
|
recipient = User.get_by_id(recipient_id)
|
||||||
|
|
||||||
|
params
|
||||||
|
|> Map.delete(:recipient_id)
|
||||||
|
|> Map.put(:recipient, recipient)
|
||||||
|
|> publish_one()
|
||||||
|
end
|
||||||
|
|
||||||
def publish_one(_), do: :noop
|
def publish_one(_), do: :noop
|
||||||
|
|
||||||
@supported_activities [
|
@supported_activities [
|
||||||
|
@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
|
||||||
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
|
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
|
||||||
|
|
||||||
Publisher.enqueue_one(__MODULE__, %{
|
Publisher.enqueue_one(__MODULE__, %{
|
||||||
recipient: remote_user,
|
recipient_id: remote_user.id,
|
||||||
feed: feed,
|
feed: feed,
|
||||||
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
|
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
|
||||||
})
|
})
|
||||||
|
|
14
lib/pleroma/workers/publisher.ex
Normal file
14
lib/pleroma/workers/publisher.ex
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.Publisher do
|
||||||
|
use Oban.Worker, queue: "federator_outgoing", max_attempts: 5
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%Oban.Job{args: %{module: module_name, params: params}}) do
|
||||||
|
module_name
|
||||||
|
|> String.to_atom()
|
||||||
|
|> apply(:publish_one, [params])
|
||||||
|
end
|
||||||
|
end
|
1
mix.exs
1
mix.exs
|
@ -101,6 +101,7 @@ defmodule Pleroma.Mixfile do
|
||||||
{:phoenix_ecto, "~> 4.0"},
|
{:phoenix_ecto, "~> 4.0"},
|
||||||
{:ecto_sql, "~> 3.1"},
|
{:ecto_sql, "~> 3.1"},
|
||||||
{:postgrex, ">= 0.13.5"},
|
{:postgrex, ">= 0.13.5"},
|
||||||
|
{:oban, "~> 0.6"},
|
||||||
{:gettext, "~> 0.15"},
|
{:gettext, "~> 0.15"},
|
||||||
{:comeonin, "~> 4.1.1"},
|
{:comeonin, "~> 4.1.1"},
|
||||||
{:pbkdf2_elixir, "~> 0.12.3"},
|
{:pbkdf2_elixir, "~> 0.12.3"},
|
||||||
|
|
1
mix.lock
1
mix.lock
|
@ -55,6 +55,7 @@
|
||||||
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
|
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
|
||||||
"mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
|
"mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
|
||||||
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
|
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
|
||||||
|
"oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
||||||
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
|
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
|
||||||
"pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
|
"pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
|
||||||
"phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
"phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.AddObanJobsTable do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
defdelegate up, to: Oban.Migrations
|
||||||
|
defdelegate down, to: Oban.Migrations
|
||||||
|
end
|
|
@ -12,9 +12,9 @@ defmodule Pleroma.UserTest do
|
||||||
alias Pleroma.Web.CommonAPI
|
alias Pleroma.Web.CommonAPI
|
||||||
|
|
||||||
use Pleroma.DataCase
|
use Pleroma.DataCase
|
||||||
|
use Oban.Testing, repo: Pleroma.Repo
|
||||||
|
|
||||||
import Pleroma.Factory
|
import Pleroma.Factory
|
||||||
import Mock
|
|
||||||
|
|
||||||
setup_all do
|
setup_all do
|
||||||
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
|
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
|
||||||
|
@ -1034,11 +1034,7 @@ defmodule Pleroma.UserTest do
|
||||||
refute Activity.get_by_id(repeat.id)
|
refute Activity.get_by_id(repeat.id)
|
||||||
end
|
end
|
||||||
|
|
||||||
test_with_mock "it sends out User Delete activity",
|
test "it sends out User Delete activity", %{user: user} do
|
||||||
%{user: user},
|
|
||||||
Pleroma.Web.ActivityPub.Publisher,
|
|
||||||
[:passthrough],
|
|
||||||
[] do
|
|
||||||
config_path = [:instance, :federating]
|
config_path = [:instance, :federating]
|
||||||
initial_setting = Pleroma.Config.get(config_path)
|
initial_setting = Pleroma.Config.get(config_path)
|
||||||
Pleroma.Config.put(config_path, true)
|
Pleroma.Config.put(config_path, true)
|
||||||
|
@ -1048,11 +1044,8 @@ defmodule Pleroma.UserTest do
|
||||||
|
|
||||||
{:ok, _user} = User.delete(user)
|
{:ok, _user} = User.delete(user)
|
||||||
|
|
||||||
assert called(
|
assert [%{args: %{"params" => %{"inbox" => "http://mastodon.example.org/inbox"}}}] =
|
||||||
Pleroma.Web.ActivityPub.Publisher.publish_one(%{
|
all_enqueued(worker: Pleroma.Workers.Publisher)
|
||||||
inbox: "http://mastodon.example.org/inbox"
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
Pleroma.Config.put(config_path, initial_setting)
|
Pleroma.Config.put(config_path, initial_setting)
|
||||||
end
|
end
|
||||||
|
|
|
@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
|
||||||
assert called(
|
assert called(
|
||||||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
|
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
|
||||||
inbox: "https://domain.com/users/nick1/inbox",
|
inbox: "https://domain.com/users/nick1/inbox",
|
||||||
actor: actor,
|
actor_id: actor.id,
|
||||||
id: note_activity.data["id"]
|
id: note_activity.data["id"]
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
|
@ -6,7 +6,10 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
alias Pleroma.Instances
|
alias Pleroma.Instances
|
||||||
alias Pleroma.Web.CommonAPI
|
alias Pleroma.Web.CommonAPI
|
||||||
alias Pleroma.Web.Federator
|
alias Pleroma.Web.Federator
|
||||||
|
|
||||||
use Pleroma.DataCase
|
use Pleroma.DataCase
|
||||||
|
use Oban.Testing, repo: Pleroma.Repo
|
||||||
|
|
||||||
import Pleroma.Factory
|
import Pleroma.Factory
|
||||||
import Mock
|
import Mock
|
||||||
|
|
||||||
|
@ -22,15 +25,6 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Publisher.perform" do
|
|
||||||
test "call `perform` with unknown task" do
|
|
||||||
assert {
|
|
||||||
:error,
|
|
||||||
"Don't know what to do with this"
|
|
||||||
} = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "Publish an activity" do
|
describe "Publish an activity" do
|
||||||
setup do
|
setup do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
|
@ -73,10 +67,7 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Targets reachability filtering in `publish`" do
|
describe "Targets reachability filtering in `publish`" do
|
||||||
test_with_mock "it federates only to reachable instances via AP",
|
test "it federates only to reachable instances via AP" do
|
||||||
Pleroma.Web.ActivityPub.Publisher,
|
|
||||||
[:passthrough],
|
|
||||||
[] do
|
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
|
|
||||||
{inbox1, inbox2} =
|
{inbox1, inbox2} =
|
||||||
|
@ -104,20 +95,13 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
{:ok, _activity} =
|
{:ok, _activity} =
|
||||||
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
|
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
|
||||||
|
|
||||||
assert called(
|
expected_dt = NaiveDateTime.to_iso8601(dt)
|
||||||
Pleroma.Web.ActivityPub.Publisher.publish_one(%{
|
|
||||||
inbox: inbox1,
|
|
||||||
unreachable_since: dt
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2}))
|
assert [%{args: %{"params" => %{"inbox" => ^inbox1, "unreachable_since" => ^expected_dt}}}] =
|
||||||
|
all_enqueued(worker: Pleroma.Workers.Publisher)
|
||||||
end
|
end
|
||||||
|
|
||||||
test_with_mock "it federates only to reachable instances via Websub",
|
test "it federates only to reachable instances via Websub" do
|
||||||
Pleroma.Web.Websub,
|
|
||||||
[:passthrough],
|
|
||||||
[] do
|
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
websub_topic = Pleroma.Web.OStatus.feed_path(user)
|
websub_topic = Pleroma.Web.OStatus.feed_path(user)
|
||||||
|
|
||||||
|
@ -142,23 +126,25 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
|
|
||||||
{:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
|
{:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
|
||||||
|
|
||||||
assert called(
|
expected_callback = sub2.callback
|
||||||
Pleroma.Web.Websub.publish_one(%{
|
expected_dt = NaiveDateTime.to_iso8601(dt)
|
||||||
callback: sub2.callback,
|
|
||||||
unreachable_since: dt
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback}))
|
assert [
|
||||||
|
%{
|
||||||
|
args: %{
|
||||||
|
"params" => %{
|
||||||
|
"callback" => ^expected_callback,
|
||||||
|
"unreachable_since" => ^expected_dt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
] = all_enqueued(worker: Pleroma.Workers.Publisher)
|
||||||
end
|
end
|
||||||
|
|
||||||
test_with_mock "it federates only to reachable instances via Salmon",
|
test "it federates only to reachable instances via Salmon" do
|
||||||
Pleroma.Web.Salmon,
|
|
||||||
[:passthrough],
|
|
||||||
[] do
|
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
|
|
||||||
remote_user1 =
|
_remote_user1 =
|
||||||
insert(:user, %{
|
insert(:user, %{
|
||||||
local: false,
|
local: false,
|
||||||
nickname: "nick1@domain.com",
|
nickname: "nick1@domain.com",
|
||||||
|
@ -174,6 +160,8 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
info: %{salmon: "https://domain2.com/salmon"}
|
info: %{salmon: "https://domain2.com/salmon"}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
remote_user2_id = remote_user2.id
|
||||||
|
|
||||||
dt = NaiveDateTime.utc_now()
|
dt = NaiveDateTime.utc_now()
|
||||||
Instances.set_unreachable(remote_user2.ap_id, dt)
|
Instances.set_unreachable(remote_user2.ap_id, dt)
|
||||||
|
|
||||||
|
@ -182,14 +170,18 @@ defmodule Pleroma.Web.FederatorTest do
|
||||||
{:ok, _activity} =
|
{:ok, _activity} =
|
||||||
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
|
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
|
||||||
|
|
||||||
assert called(
|
expected_dt = NaiveDateTime.to_iso8601(dt)
|
||||||
Pleroma.Web.Salmon.publish_one(%{
|
|
||||||
recipient: remote_user2,
|
|
||||||
unreachable_since: dt
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1}))
|
assert [
|
||||||
|
%{
|
||||||
|
args: %{
|
||||||
|
"params" => %{
|
||||||
|
"recipient_id" => ^remote_user2_id,
|
||||||
|
"unreachable_since" => ^expected_dt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
] = all_enqueued(worker: Pleroma.Workers.Publisher)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule MockActivityPub do
|
|
||||||
def publish_one({ret, waiter}) do
|
|
||||||
send(waiter, :complete)
|
|
||||||
{ret, "success"}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defmodule Pleroma.Web.Federator.RetryQueueTest do
|
|
||||||
use Pleroma.DataCase
|
|
||||||
alias Pleroma.Web.Federator.RetryQueue
|
|
||||||
|
|
||||||
@small_retry_count 0
|
|
||||||
@hopeless_retry_count 10
|
|
||||||
|
|
||||||
setup do
|
|
||||||
RetryQueue.reset_stats()
|
|
||||||
end
|
|
||||||
|
|
||||||
test "RetryQueue responds to stats request" do
|
|
||||||
assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats()
|
|
||||||
end
|
|
||||||
|
|
||||||
test "failed posts are retried" do
|
|
||||||
{:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
|
|
||||||
|
|
||||||
wait_task =
|
|
||||||
Task.async(fn ->
|
|
||||||
receive do
|
|
||||||
:complete -> :ok
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
|
|
||||||
RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count)
|
|
||||||
Task.await(wait_task)
|
|
||||||
assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats()
|
|
||||||
end
|
|
||||||
|
|
||||||
test "posts that have been tried too many times are dropped" do
|
|
||||||
{:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
|
|
||||||
|
|
||||||
RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count)
|
|
||||||
assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats()
|
|
||||||
end
|
|
||||||
end
|
|
|
@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do
|
||||||
|
|
||||||
Salmon.publish(user, activity)
|
Salmon.publish(user, activity)
|
||||||
|
|
||||||
assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user}))
|
assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue