diff --git a/changelog.d/user-imports.fix b/changelog.d/user-imports.fix new file mode 100644 index 000000000..0076c73d7 --- /dev/null +++ b/changelog.d/user-imports.fix @@ -0,0 +1 @@ +Imports of blocks, mutes, and follows would retry repeatedly due to incorrect error handling and all work executed in a single job diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex index 11905237c..b79fa88eb 100644 --- a/lib/pleroma/user/import.ex +++ b/lib/pleroma/user/import.ex @@ -5,6 +5,7 @@ defmodule Pleroma.User.Import do use Ecto.Schema + alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.CommonAPI alias Pleroma.Workers.BackgroundWorker @@ -12,80 +13,99 @@ defmodule Pleroma.User.Import do require Logger @spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()} - def perform(:mutes_import, %User{} = user, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier), - {:ok, _} <- User.mute(user, muted_user) do - muted_user - else - error -> handle_error(:mutes_import, identifier, error) - end - end - ) + def perform(:mute_import, %User{} = user, actor) do + with {:ok, %User{} = muted_user} <- User.get_or_fetch(actor), + {_, false} <- {:existing_mute, User.mutes_user?(user, muted_user)}, + {:ok, _} <- User.mute(user, muted_user) do + {:ok, muted_user} + else + {:existing_mute, true} -> :ok + error -> handle_error(:mutes_import, actor, error) + end end - def perform(:blocks_import, %User{} = blocker, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier), - {:ok, _block} <- CommonAPI.block(blocked, blocker) do - blocked - else - error -> handle_error(:blocks_import, identifier, error) - end - end - ) + def perform(:block_import, %User{} = user, actor) do + with {:ok, %User{} = blocked} <- User.get_or_fetch(actor), + {_, false} <- {:existing_block, User.blocks_user?(user, blocked)}, + {:ok, _block} <- CommonAPI.block(blocked, user) do + {:ok, blocked} + else + {:existing_block, true} -> :ok + error -> handle_error(:blocks_import, actor, error) + end end - def perform(:follow_import, %User{} = follower, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = followed} <- User.get_or_fetch(identifier), - {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed), - {:ok, _, _, _} <- CommonAPI.follow(followed, follower) do - followed - else - error -> handle_error(:follow_import, identifier, error) - end - end - ) + def perform(:follow_import, %User{} = user, actor) do + with {:ok, %User{} = followed} <- User.get_or_fetch(actor), + {_, false} <- {:existing_follow, User.following?(user, followed)}, + {:ok, user, followed} <- User.maybe_direct_follow(user, followed), + {:ok, _, _, _} <- CommonAPI.follow(followed, user) do + {:ok, followed} + else + {:existing_follow, true} -> :ok + error -> handle_error(:follow_import, actor, error) + end end - def perform(_, _, _), do: :ok - defp handle_error(op, user_id, error) do Logger.debug("#{op} failed for #{user_id} with: #{inspect(error)}") error end - def blocks_import(%User{} = blocker, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "blocks_import", - "user_id" => blocker.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def blocks_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "block_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end - def follow_import(%User{} = follower, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "follow_import", - "user_id" => follower.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def follows_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "follow_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end - def mutes_import(%User{} = user, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "mutes_import", - "user_id" => user.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def mutes_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "mute_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end end diff --git a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex index 96466f192..d65c30dab 100644 --- a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex @@ -38,8 +38,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do |> Enum.map(&(&1 |> String.trim() |> String.trim_leading("@"))) |> Enum.reject(&(&1 == "")) - User.Import.follow_import(follower, identifiers) - json(conn, "job started") + User.Import.follows_import(follower, identifiers) + json(conn, "jobs started") end def blocks( @@ -55,7 +55,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do defp do_block(%{assigns: %{user: blocker}} = conn, list) do User.Import.blocks_import(blocker, prepare_user_identifiers(list)) - json(conn, "job started") + json(conn, "jobs started") end def mutes( @@ -71,7 +71,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do defp do_mute(%{assigns: %{user: user}} = conn, list) do User.Import.mutes_import(user, prepare_user_identifiers(list)) - json(conn, "job started") + json(conn, "jobs started") end defp prepare_user_identifiers(list) do diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 60da2d5ca..4737c6ea2 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -19,10 +19,10 @@ defmodule Pleroma.Workers.BackgroundWorker do User.perform(:force_password_reset, user) end - def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}}) - when op in ["blocks_import", "follow_import", "mutes_import"] do + def perform(%Job{args: %{"op" => op, "user_id" => user_id, "actor" => actor}}) + when op in ["block_import", "follow_import", "mute_import"] do user = User.get_cached_by_id(user_id) - {:ok, User.Import.perform(String.to_existing_atom(op), user, identifiers)} + User.Import.perform(String.to_existing_atom(op), user, actor) end def perform(%Job{ diff --git a/test/pleroma/user/import_test.exs b/test/pleroma/user/import_test.exs index f75305e0e..1d6469a4f 100644 --- a/test/pleroma/user/import_test.exs +++ b/test/pleroma/user/import_test.exs @@ -25,11 +25,12 @@ defmodule Pleroma.User.ImportTest do user3.nickname ] - {:ok, job} = User.Import.follow_import(user1, identifiers) + {:ok, jobs} = User.Import.follows_import(user1, identifiers) + + for job <- jobs do + assert {:ok, %User{}} = ObanHelpers.perform(job) + end - assert {:ok, result} = ObanHelpers.perform(job) - assert is_list(result) - assert result == [refresh_record(user2), refresh_record(user3)] assert User.following?(user1, user2) assert User.following?(user1, user3) end @@ -44,11 +45,12 @@ defmodule Pleroma.User.ImportTest do user3.nickname ] - {:ok, job} = User.Import.blocks_import(user1, identifiers) + {:ok, jobs} = User.Import.blocks_import(user1, identifiers) + + for job <- jobs do + assert {:ok, %User{}} = ObanHelpers.perform(job) + end - assert {:ok, result} = ObanHelpers.perform(job) - assert is_list(result) - assert result == [user2, user3] assert User.blocks?(user1, user2) assert User.blocks?(user1, user3) end @@ -63,11 +65,12 @@ defmodule Pleroma.User.ImportTest do user3.nickname ] - {:ok, job} = User.Import.mutes_import(user1, identifiers) + {:ok, jobs} = User.Import.mutes_import(user1, identifiers) + + for job <- jobs do + assert {:ok, %User{}} = ObanHelpers.perform(job) + end - assert {:ok, result} = ObanHelpers.perform(job) - assert is_list(result) - assert result == [user2, user3] assert User.mutes?(user1, user2) assert User.mutes?(user1, user3) end diff --git a/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs b/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs index 52a62e416..efdc743e3 100644 --- a/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs +++ b/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs @@ -22,7 +22,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do test "it returns HTTP 200", %{conn: conn} do user2 = insert(:user) - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/follow_import", %{"list" => "#{user2.ap_id}"}) @@ -38,7 +38,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do "Account address,Show boosts\n#{user2.ap_id},true" end} ]) do - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/follow_import", %{ @@ -46,9 +46,9 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do }) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == [refresh_record(user2)] - assert [%Pleroma.User{follower_count: 1}] = job_result + assert [{:ok, updated_user}] = ObanHelpers.perform_all() + assert updated_user.id == user2.id + assert updated_user.follower_count == 1 end end @@ -63,7 +63,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do }) |> json_response_and_validate_schema(200) - assert response == "job started" + assert response == "jobs started" end test "requires 'follow' or 'write:follows' permissions" do @@ -102,14 +102,20 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do ] |> Enum.join("\n") - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/follow_import", %{"list" => identifiers}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == Enum.map(users, &refresh_record/1) + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert returned_users == Enum.map(users, &refresh_record/1) end end @@ -120,7 +126,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do test "it returns HTTP 200", %{conn: conn} do user2 = insert(:user) - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/blocks_import", %{"list" => "#{user2.ap_id}"}) @@ -133,7 +139,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do with_mocks([ {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end} ]) do - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/blocks_import", %{ @@ -141,8 +147,14 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do }) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert returned_users == users end end @@ -159,14 +171,25 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do ] |> Enum.join(" ") - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/blocks_import", %{"list" => identifiers}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users + results = ObanHelpers.perform_all() + + returned_user_ids = + for {_, user} <- results do + user.id + end + + original_user_ids = + for user <- users do + user.id + end + + assert match?(^original_user_ids, returned_user_ids) end end @@ -177,24 +200,25 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do test "it returns HTTP 200", %{user: user, conn: conn} do user2 = insert(:user) - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/mutes_import", %{"list" => "#{user2.ap_id}"}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == [user2] + [{:ok, result_user}] = ObanHelpers.perform_all() + + assert result_user == refresh_record(user2) assert Pleroma.User.mutes?(user, user2) end test "it imports mutes users from file", %{user: user, conn: conn} do - users = [user2, user3] = insert_list(2, :user) + [user2, user3] = insert_list(2, :user) with_mocks([ {File, [], read!: fn "mutes_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end} ]) do - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/mutes_import", %{ @@ -202,14 +226,19 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do }) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users - assert Enum.all?(users, &Pleroma.User.mutes?(user, &1)) + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1)) end end test "it imports mutes with different nickname variations", %{user: user, conn: conn} do - users = [user2, user3, user4, user5, user6] = insert_list(5, :user) + [user2, user3, user4, user5, user6] = insert_list(5, :user) identifiers = [ @@ -221,15 +250,20 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do ] |> Enum.join(" ") - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/mutes_import", %{"list" => identifiers}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users - assert Enum.all?(users, &Pleroma.User.mutes?(user, &1)) + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1)) end end end