mirror of
https://git.pleroma.social/pleroma/pleroma.git
synced 2025-01-21 22:48:13 +00:00
Fork the Lifeline plugin to raise Lazarus
This commit is contained in:
parent
0ea63d824e
commit
83286a1d9f
1 changed files with 189 additions and 0 deletions
189
lib/pleroma/oban/plugins/lazarus.ex
Normal file
189
lib/pleroma/oban/plugins/lazarus.ex
Normal file
|
@ -0,0 +1,189 @@
|
|||
# Oban is originally Apache licensed which is where this came from
|
||||
# It needs to go into a separate repo and be its own project
|
||||
# and continue using the Apache license
|
||||
|
||||
defmodule Pleroma.Oban.Plugins.Lazarus do
|
||||
@moduledoc """
|
||||
Naively transition jobs stuck `executing` back to `available`.
|
||||
|
||||
The `Lazarus` plugin periodically rescues orphaned jobs, i.e. jobs that are stuck in the
|
||||
`executing` state because the node was shut down before the job could finish. Rescuing is
|
||||
purely based on time, rather than any heuristic about the job's expected execution time or
|
||||
whether the node is still alive.
|
||||
|
||||
If an executing job has exhausted all attempts and max_attempts > 1, the Lazarus plugin
|
||||
will mark it `discarded` rather than `available`.
|
||||
|
||||
## Using the Plugin
|
||||
|
||||
Rescue orphaned jobs that are still `executing` after the default of 60 minutes:
|
||||
|
||||
config :my_app, Oban,
|
||||
plugins: [Pleroma.Oban.Plugins.Lazarus],
|
||||
...
|
||||
|
||||
Override the default period to rescue orphans after a more aggressive period of 5 minutes:
|
||||
|
||||
config :my_app, Oban,
|
||||
plugins: [{Pleroma.Oban.Plugins.Lazarus, rescue_after: :timer.minutes(5)}],
|
||||
...
|
||||
|
||||
## Options
|
||||
|
||||
* `:interval` — the number of milliseconds between rescue attempts. The default is `60_000ms`.
|
||||
|
||||
* `:rescue_after` — the maximum amount of time, in milliseconds, that a job may execute before
|
||||
being rescued. 60 minutes by default, and rescuing is performed once a minute.
|
||||
|
||||
## Instrumenting with Telemetry
|
||||
|
||||
The `Oban.Plugins.Lifeline` plugin adds the following metadata to the `[:oban, :plugin, :stop]`
|
||||
event:
|
||||
|
||||
* `:rescued_jobs` — a list of jobs transitioned back to `available`
|
||||
|
||||
* `:discarded_jobs` — a list of jobs transitioned to `discarded`
|
||||
|
||||
_Note: jobs only include `id`, `queue`, `state` fields._
|
||||
"""
|
||||
|
||||
@behaviour Oban.Plugin
|
||||
|
||||
use GenServer
|
||||
|
||||
import Ecto.Query, only: [select: 3, where: 3]
|
||||
|
||||
alias Oban.{Job, Peer, Plugin, Repo, Validation}
|
||||
alias __MODULE__, as: State
|
||||
|
||||
@type option ::
|
||||
Plugin.option()
|
||||
| {:interval, timeout()}
|
||||
| {:rescue_after, pos_integer()}
|
||||
|
||||
defstruct [
|
||||
:conf,
|
||||
:timer,
|
||||
interval: :timer.minutes(1),
|
||||
rescue_after: :timer.minutes(60)
|
||||
]
|
||||
|
||||
@doc false
|
||||
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
|
||||
def child_spec(opts), do: super(opts)
|
||||
|
||||
@impl Plugin
|
||||
@spec start_link([option()]) :: GenServer.on_start()
|
||||
def start_link(opts) do
|
||||
{name, opts} = Keyword.pop(opts, :name)
|
||||
|
||||
GenServer.start_link(__MODULE__, struct!(State, opts), name: name)
|
||||
end
|
||||
|
||||
@impl Plugin
|
||||
def validate(opts) do
|
||||
Validation.validate_schema(opts,
|
||||
conf: :any,
|
||||
name: :any,
|
||||
interval: :pos_integer,
|
||||
rescue_after: :pos_integer
|
||||
)
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def init(state) do
|
||||
:telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__})
|
||||
|
||||
{:ok, schedule_rescue(state)}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def terminate(_reason, %State{timer: timer}) do
|
||||
if is_reference(timer), do: Process.cancel_timer(timer)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info(:rescue, %State{} = state) do
|
||||
meta = %{conf: state.conf, plugin: __MODULE__}
|
||||
|
||||
:telemetry.span([:oban, :plugin], meta, fn ->
|
||||
case check_leadership_and_rescue_jobs(state) do
|
||||
{:ok, extra} when is_map(extra) ->
|
||||
{:ok, Map.merge(meta, extra)}
|
||||
|
||||
error ->
|
||||
{:error, Map.put(meta, :error, error)}
|
||||
end
|
||||
end)
|
||||
|
||||
{:noreply, schedule_rescue(state)}
|
||||
end
|
||||
|
||||
# Scheduling
|
||||
|
||||
defp schedule_rescue(state) do
|
||||
timer = Process.send_after(self(), :rescue, state.interval)
|
||||
|
||||
%{state | timer: timer}
|
||||
end
|
||||
|
||||
# Rescuing
|
||||
|
||||
defp check_leadership_and_rescue_jobs(state) do
|
||||
if Peer.leader?(state.conf) do
|
||||
Repo.transaction(state.conf, fn ->
|
||||
time = DateTime.add(DateTime.utc_now(), -state.rescue_after, :millisecond)
|
||||
base = where(Job, [j], j.state == "executing" and j.attempted_at < ^time)
|
||||
|
||||
{rescued_count, rescued} = transition_available(base, state)
|
||||
{discard_count, discard} = transition_discarded(base, state)
|
||||
|
||||
%{
|
||||
discarded_count: discard_count,
|
||||
discarded_jobs: discard,
|
||||
rescued_count: rescued_count,
|
||||
rescued_jobs: rescued
|
||||
}
|
||||
end)
|
||||
else
|
||||
{:ok, %{}}
|
||||
end
|
||||
end
|
||||
|
||||
# Rescue stuck max_attempts: 1 jobs from the dead and try again
|
||||
# until it gives us a clear error
|
||||
# Others that have tried multiple times can be discarded.
|
||||
defp transition_available(base, state) do
|
||||
query =
|
||||
base
|
||||
|> where([j], j.max_attempts == 1)
|
||||
|> where([j], j.attempt == j.max_attempts)
|
||||
|> select([j], map(j, [:id, :queue, :state]))
|
||||
|
||||
{resurrected_count, resurrected} =
|
||||
Repo.update_all(state.conf, query, set: [state: "available", attempt: 0])
|
||||
|
||||
query =
|
||||
base
|
||||
|> where([j], j.attempt < j.max_attempts)
|
||||
|> select([j], map(j, [:id, :queue, :state]))
|
||||
|
||||
{rescued_count, rescued} =
|
||||
Repo.update_all(state.conf, query, set: [state: "available"])
|
||||
|
||||
{resurrected_count + rescued_count, resurrected ++ rescued}
|
||||
end
|
||||
|
||||
defp transition_discarded(base, state) do
|
||||
query =
|
||||
base
|
||||
|> where([j], j.attempt >= j.max_attempts)
|
||||
|> select([j], map(j, [:id, :queue, :state]))
|
||||
|
||||
Repo.update_all(state.conf, query,
|
||||
set: [state: "discarded", discarded_at: DateTime.utc_now()]
|
||||
)
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue