mirror of
https://git.pleroma.social/pleroma/pleroma.git
synced 2024-11-14 04:52:17 +00:00
201 lines
5.4 KiB
Elixir
201 lines
5.4 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Integration.WebsocketClient do
|
|
@moduledoc """
|
|
A WebSocket client used to test Mastodon API streaming
|
|
|
|
Based on Phoenix Framework's WebsocketClient
|
|
https://github.com/phoenixframework/phoenix/blob/master/test/support/websocket_client.exs
|
|
"""
|
|
|
|
use GenServer
|
|
import Kernel, except: [send: 2]
|
|
|
|
defstruct [
|
|
:conn,
|
|
:request_ref,
|
|
:websocket,
|
|
:caller,
|
|
:status,
|
|
:resp_headers,
|
|
:sender,
|
|
closing?: false
|
|
]
|
|
|
|
@doc """
|
|
Starts the WebSocket client for given ws URL. `Phoenix.Socket.Message`s
|
|
received from the server are forwarded to the sender pid.
|
|
"""
|
|
def connect(sender, url, headers \\ []) do
|
|
with {:ok, socket} <- GenServer.start_link(__MODULE__, {sender}),
|
|
{:ok, :connected} <- GenServer.call(socket, {:connect, url, headers}) do
|
|
{:ok, socket}
|
|
end
|
|
end
|
|
|
|
@doc """
|
|
Closes the socket
|
|
"""
|
|
def close(socket) do
|
|
GenServer.cast(socket, :close)
|
|
end
|
|
|
|
@doc """
|
|
Sends a low-level text message to the client.
|
|
"""
|
|
def send_text(server_pid, msg) do
|
|
GenServer.call(server_pid, {:text, msg})
|
|
end
|
|
|
|
@doc false
|
|
def init({sender}) do
|
|
state = %__MODULE__{sender: sender}
|
|
|
|
{:ok, state}
|
|
end
|
|
|
|
@doc false
|
|
def handle_call({:connect, url, headers}, from, state) do
|
|
uri = URI.parse(url)
|
|
|
|
http_scheme =
|
|
case uri.scheme do
|
|
"ws" -> :http
|
|
"wss" -> :https
|
|
end
|
|
|
|
ws_scheme =
|
|
case uri.scheme do
|
|
"ws" -> :ws
|
|
"wss" -> :wss
|
|
end
|
|
|
|
path =
|
|
case uri.query do
|
|
nil -> uri.path
|
|
query -> uri.path <> "?" <> query
|
|
end
|
|
|
|
with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port),
|
|
{:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, headers) do
|
|
state = %{state | conn: conn, request_ref: ref, caller: from}
|
|
{:noreply, state}
|
|
else
|
|
{:error, reason} ->
|
|
{:reply, {:error, reason}, state}
|
|
|
|
{:error, conn, reason} ->
|
|
{:reply, {:error, reason}, put_in(state.conn, conn)}
|
|
end
|
|
end
|
|
|
|
@doc false
|
|
def handle_info(message, state) do
|
|
case Mint.WebSocket.stream(state.conn, message) do
|
|
{:ok, conn, responses} ->
|
|
state = put_in(state.conn, conn) |> handle_responses(responses)
|
|
if state.closing?, do: do_close(state), else: {:noreply, state}
|
|
|
|
{:error, conn, reason, _responses} ->
|
|
state = put_in(state.conn, conn) |> reply({:error, reason})
|
|
{:noreply, state}
|
|
|
|
:unknown ->
|
|
{:noreply, state}
|
|
end
|
|
end
|
|
|
|
defp do_close(state) do
|
|
# Streaming a close frame may fail if the server has already closed
|
|
# for writing.
|
|
_ = stream_frame(state, :close)
|
|
Mint.HTTP.close(state.conn)
|
|
{:stop, :normal, state}
|
|
end
|
|
|
|
defp handle_responses(state, responses)
|
|
|
|
defp handle_responses(%{request_ref: ref} = state, [{:status, ref, status} | rest]) do
|
|
put_in(state.status, status)
|
|
|> handle_responses(rest)
|
|
end
|
|
|
|
defp handle_responses(%{request_ref: ref} = state, [{:headers, ref, resp_headers} | rest]) do
|
|
put_in(state.resp_headers, resp_headers)
|
|
|> handle_responses(rest)
|
|
end
|
|
|
|
defp handle_responses(%{request_ref: ref} = state, [{:done, ref} | rest]) do
|
|
case Mint.WebSocket.new(state.conn, ref, state.status, state.resp_headers) do
|
|
{:ok, conn, websocket} ->
|
|
%{state | conn: conn, websocket: websocket, status: nil, resp_headers: nil}
|
|
|> reply({:ok, :connected})
|
|
|> handle_responses(rest)
|
|
|
|
{:error, conn, reason} ->
|
|
put_in(state.conn, conn)
|
|
|> reply({:error, reason})
|
|
end
|
|
end
|
|
|
|
defp handle_responses(%{request_ref: ref, websocket: websocket} = state, [
|
|
{:data, ref, data} | rest
|
|
])
|
|
when websocket != nil do
|
|
case Mint.WebSocket.decode(websocket, data) do
|
|
{:ok, websocket, frames} ->
|
|
put_in(state.websocket, websocket)
|
|
|> handle_frames(frames)
|
|
|> handle_responses(rest)
|
|
|
|
{:error, websocket, reason} ->
|
|
put_in(state.websocket, websocket)
|
|
|> reply({:error, reason})
|
|
end
|
|
end
|
|
|
|
defp handle_responses(state, [_response | rest]) do
|
|
handle_responses(state, rest)
|
|
end
|
|
|
|
defp handle_responses(state, []), do: state
|
|
|
|
defp handle_frames(state, frames) do
|
|
{frames, state} =
|
|
Enum.flat_map_reduce(frames, state, fn
|
|
# prepare to close the connection when a close frame is received
|
|
{:close, _code, _data}, state ->
|
|
{[], put_in(state.closing?, true)}
|
|
|
|
frame, state ->
|
|
{[frame], state}
|
|
end)
|
|
|
|
Enum.each(frames, &Kernel.send(state.sender, &1))
|
|
|
|
state
|
|
end
|
|
|
|
defp reply(state, response) do
|
|
if state.caller, do: GenServer.reply(state.caller, response)
|
|
put_in(state.caller, nil)
|
|
end
|
|
|
|
# Encodes a frame as a binary and sends it along the wire, keeping `conn`
|
|
# and `websocket` up to date in `state`.
|
|
defp stream_frame(state, frame) do
|
|
with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame),
|
|
state = put_in(state.websocket, websocket),
|
|
{:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do
|
|
{:ok, put_in(state.conn, conn)}
|
|
else
|
|
{:error, %Mint.WebSocket{} = websocket, reason} ->
|
|
{:error, put_in(state.websocket, websocket), reason}
|
|
|
|
{:error, conn, reason} ->
|
|
{:error, put_in(state.conn, conn), reason}
|
|
end
|
|
end
|
|
end
|