mirror of
https://git.pleroma.social/pleroma/pleroma.git
synced 2025-01-24 16:08:09 +00:00
Allow unified streaming endpoint
This commit is contained in:
parent
14b1b9c9b0
commit
2b5636bf12
4 changed files with 26 additions and 8 deletions
|
@ -32,8 +32,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
||||||
req
|
req
|
||||||
end
|
end
|
||||||
|
|
||||||
|
topics =
|
||||||
|
if topic do
|
||||||
|
[topic]
|
||||||
|
else
|
||||||
|
[]
|
||||||
|
end
|
||||||
|
|
||||||
{:cowboy_websocket, req,
|
{:cowboy_websocket, req,
|
||||||
%{user: user, topic: topic, oauth_token: oauth_token, count: 0, timer: nil},
|
%{user: user, topics: topics, oauth_token: oauth_token, count: 0, timer: nil},
|
||||||
%{idle_timeout: @timeout}}
|
%{idle_timeout: @timeout}}
|
||||||
else
|
else
|
||||||
{:error, :bad_topic} ->
|
{:error, :bad_topic} ->
|
||||||
|
@ -50,10 +57,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
||||||
|
|
||||||
def websocket_init(state) do
|
def websocket_init(state) do
|
||||||
Logger.debug(
|
Logger.debug(
|
||||||
"#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}"
|
"#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics}"
|
||||||
)
|
)
|
||||||
|
|
||||||
Streamer.add_socket(state.topic, state.oauth_token)
|
Enum.each(state.topics, fn topic -> Streamer.add_socket(topic, state.oauth_token) end)
|
||||||
{:ok, %{state | timer: timer()}}
|
{:ok, %{state | timer: timer()}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -109,10 +116,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
||||||
|
|
||||||
def terminate(reason, _req, state) do
|
def terminate(reason, _req, state) do
|
||||||
Logger.debug(
|
Logger.debug(
|
||||||
"#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}"
|
"#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics || "?"}: #{inspect(reason)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
Streamer.remove_socket(state.topic)
|
Enum.each(state.topics, fn topic -> Streamer.remove_socket(topic) end)
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -59,10 +59,14 @@ defmodule Pleroma.Web.Streamer do
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc "Expand and authorizes a stream"
|
@doc "Expand and authorizes a stream"
|
||||||
@spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
|
@spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, Map.t()) ::
|
||||||
{:ok, topic :: String.t()} | {:error, :bad_topic}
|
{:ok, topic :: String.t() | nil} | {:error, :bad_topic}
|
||||||
def get_topic(stream, user, oauth_token, params \\ %{})
|
def get_topic(stream, user, oauth_token, params \\ %{})
|
||||||
|
|
||||||
|
def get_topic(nil = _stream, _user, _oauth_token, _params) do
|
||||||
|
{:ok, nil}
|
||||||
|
end
|
||||||
|
|
||||||
# Allow all public steams if the instance allows unauthenticated access.
|
# Allow all public steams if the instance allows unauthenticated access.
|
||||||
# Otherwise, only allow users with valid oauth tokens.
|
# Otherwise, only allow users with valid oauth tokens.
|
||||||
def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
|
def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
|
||||||
|
|
|
@ -33,7 +33,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
|
||||||
|
|
||||||
test "refuses invalid requests" do
|
test "refuses invalid requests" do
|
||||||
capture_log(fn ->
|
capture_log(fn ->
|
||||||
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket()
|
|
||||||
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
|
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
|
||||||
Process.sleep(30)
|
Process.sleep(30)
|
||||||
end)
|
end)
|
||||||
|
@ -49,6 +48,10 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "allows unified stream" do
|
||||||
|
assert {:ok, _} = start_socket()
|
||||||
|
end
|
||||||
|
|
||||||
test "allows public streams without authentication" do
|
test "allows public streams without authentication" do
|
||||||
assert {:ok, _} = start_socket("?stream=public")
|
assert {:ok, _} = start_socket("?stream=public")
|
||||||
assert {:ok, _} = start_socket("?stream=public:local")
|
assert {:ok, _} = start_socket("?stream=public:local")
|
||||||
|
|
|
@ -22,6 +22,10 @@ defmodule Pleroma.Web.StreamerTest do
|
||||||
setup do: clear_config([:instance, :skip_thread_containment])
|
setup do: clear_config([:instance, :skip_thread_containment])
|
||||||
|
|
||||||
describe "get_topic/_ (unauthenticated)" do
|
describe "get_topic/_ (unauthenticated)" do
|
||||||
|
test "allows no stream" do
|
||||||
|
assert {:ok, nil} = Streamer.get_topic(nil, nil, nil)
|
||||||
|
end
|
||||||
|
|
||||||
test "allows public" do
|
test "allows public" do
|
||||||
assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
|
assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
|
||||||
assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
|
assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
|
||||||
|
|
Loading…
Reference in a new issue