diff --git a/assets/js/app.js b/assets/js/app.js index 7abeb7f..08093cd 100644 --- a/assets/js/app.js +++ b/assets/js/app.js @@ -195,6 +195,7 @@ window.addEventListener("phx:page-loading-stop", info => topbar.hide()) window.addEventListener("phx:page-loading-stop", routeUpdated) window.addEventListener("js:exec", e => e.target[e.detail.call](...e.detail.args)) +window.addEventListener("phx:remove-el", e => document.getElementById(e.detail.id).remove()) // connect if there are any LiveViews on the page liveSocket.getSocket().onOpen(() => execJS("#connection-status", "js-hide")) diff --git a/lib/live_beats/accounts.ex b/lib/live_beats/accounts.ex index 9b3070e..6ec2ac0 100644 --- a/lib/live_beats/accounts.ex +++ b/lib/live_beats/accounts.ex @@ -21,6 +21,10 @@ defmodule LiveBeats.Accounts do Repo.all(from u in User, limit: ^Keyword.fetch!(opts, :limit)) end + def get_users_map(user_ids) when is_list(user_ids) do + Repo.all(from u in User, where: u.id in ^user_ids, select: {u.id, u}) + end + def lists_users_by_active_profile(id, opts) do Repo.all( from u in User, where: u.active_profile_user_id == ^id, limit: ^Keyword.fetch!(opts, :limit) diff --git a/lib/live_beats/application.ex b/lib/live_beats/application.ex index e40f2a0..e6ff89f 100644 --- a/lib/live_beats/application.ex +++ b/lib/live_beats/application.ex @@ -17,8 +17,16 @@ defmodule LiveBeats.Application do LiveBeatsWeb.Telemetry, # Start the PubSub system {Phoenix.PubSub, name: LiveBeats.PubSub}, + # start presence + LiveBeatsWeb.Presence, + {Phoenix.Presence.Client, + client: LiveBeats.PresenceClient, + pubsub: LiveBeats.PubSub, + presence: LiveBeatsWeb.Presence, + name: PresenceClient}, # Start the Endpoint (http/https) LiveBeatsWeb.Endpoint + # Start a worker by calling: LiveBeats.Worker.start_link(arg) # {LiveBeats.Worker, arg} ] diff --git a/lib/live_beats/presence/phoenix_presence_client.ex b/lib/live_beats/presence/phoenix_presence_client.ex new file mode 100644 index 0000000..afda887 --- /dev/null +++ b/lib/live_beats/presence/phoenix_presence_client.ex @@ -0,0 +1,203 @@ +defmodule Phoenix.Presence.Client do + use GenServer + + @callback init(state :: term) :: {:ok, new_state :: term} + @callback handle_join(topic :: String.t(), key :: String.t(), meta :: [map()], state :: term) :: + {:ok, term} + @callback handle_leave(topic :: String.t(), key :: String.t(), meta :: [map()], state :: term) :: + {:ok, term} + + @doc """ + TODO + + ## Options + + * `:pubsub` - The required name of the pubsub server + * `:presence` - The required name of the presence module + * `:client` - The required callback module + """ + def start_link(opts) do + case Keyword.fetch(opts, :name) do + {:ok, name} -> + GenServer.start_link(__MODULE__, opts, name: name) + + :error -> + GenServer.start_link(__MODULE__, opts) + end + end + + def track(pid \\ PresenceClient, topic, key, meta) do + GenServer.call(pid, {:track, self(), topic, to_string(key), meta}) + end + + def untrack(pid \\ PresenceClient, topic, key) do + GenServer.call(pid, {:untrack, self(), topic, to_string(key)}) + end + + def init(opts) do + client = Keyword.fetch!(opts, :client) + {:ok, client_state} = client.init(%{}) + + state = %{ + topics: %{}, + client: client, + pubsub: Keyword.fetch!(opts, :pubsub), + presence_mod: Keyword.fetch!(opts, :presence), + client_state: client_state + } + + {:ok, state} + end + + def handle_info(%{topic: topic, event: "presence_diff", payload: diff}, state) do + {:noreply, merge_diff(state, topic, diff)} + end + + def handle_call(:state, _from, state) do + {:reply, state, state} + end + + def handle_call({:track, pid, topic, key, meta}, _from, state) do + {:reply, :ok, track_pid(state, pid, topic, key, meta)} + end + + def handle_call({:untrack, pid, topic, key}, _from, state) do + {:reply, :ok, untrack_pid(state, pid, topic, key)} + end + + defp track_pid(state, pid, topic, key, meta) do + # presences are handled when the presence_diff event is received + case Map.fetch(state.topics, topic) do + {:ok, _topic_content} -> + state.presence_mod.track(pid, topic, key, meta) + state + + :error -> + # subscribe to topic we weren't yet tracking + Phoenix.PubSub.subscribe(state.pubsub, topic) + state.presence_mod.track(pid, topic, key, meta) + state + end + end + + defp untrack_pid(state, pid, topic, key) do + if Map.has_key?(state.topics, topic) do + state.presence_mod.untrack(pid, topic, key) + else + state + end + end + + defp merge_diff(state, topic, %{leaves: leaves, joins: joins}) do + # add new topic if needed + updated_state = + if Map.has_key?(state.topics, topic) do + state + else + update_topics_state(:add_new_topic, state, topic) + end + + # merge diff into state.topics + {updated_state, _topic} = Enum.reduce(joins, {updated_state, topic}, &handle_join/2) + {updated_state, _topic} = Enum.reduce(leaves, {updated_state, topic}, &handle_leave/2) + + # if no more presences for given topic, unsubscribe and remove topic + if topic_presences_count(updated_state, topic) == 0 do + Phoenix.PubSub.unsubscribe(state.pubsub, topic) + update_topics_state(:remove_topic, updated_state, topic) + else + updated_state + end + end + + defp handle_join({joined_key, meta}, {state, topic}) do + joined_meta = Map.get(meta, :metas, []) + + updated_state = + update_topics_state(:add_new_presence_or_metas, state, topic, joined_key, joined_meta) + + {:ok, updated_client_state} = + state.client.handle_join(topic, joined_key, meta, state.client_state) + + updated_state = Map.put(updated_state, :client_state, updated_client_state) + + {updated_state, topic} + end + + defp handle_leave({left_key, meta}, {state, topic}) do + updated_state = update_topics_state(:remove_presence_or_metas, state, topic, left_key, meta) + + {:ok, updated_client_state} = + state.client.handle_leave(topic, left_key, meta, state.client_state) + + updated_state = Map.put(updated_state, :client_state, updated_client_state) + + {updated_state, topic} + end + + defp update_topics_state(:add_new_topic, %{topics: topics} = state, topic) do + updated_topics = Map.put_new(topics, topic, %{}) + Map.put(state, :topics, updated_topics) + end + + defp update_topics_state(:remove_topic, %{topics: topics} = state, topic) do + updated_topics = Map.delete(topics, topic) + Map.put(state, :topics, updated_topics) + end + + defp update_topics_state( + :add_new_presence_or_metas, + %{topics: topics} = state, + topic, + key, + new_metas + ) do + topic_info = topics[topic] + + updated_topic = + case Map.fetch(topic_info, key) do + # existing presence, add new metas + {:ok, existing_metas} -> + remaining_metas = new_metas -- existing_metas + updated_metas = existing_metas ++ remaining_metas + Map.put(topic_info, key, updated_metas) + + :error -> + # there are no presences for that key + Map.put(topic_info, key, new_metas) + end + + updated_topics = Map.put(topics, topic, updated_topic) + + Map.put(state, :topics, updated_topics) + end + + defp update_topics_state( + :remove_presence_or_metas, + %{topics: topics} = state, + topic, + key, + deleted_metas + ) do + topic_info = topics[topic] + + state_metas = Map.get(topic_info, key, []) + remaining_metas = state_metas -- Map.get(deleted_metas, :metas, []) + + updated_topic = + case remaining_metas do + # delete presence + [] -> Map.delete(topic_info, key) + # delete metas + _ -> Map.put(topic_info, key, remaining_metas) + end + + updated_topics = Map.put(topics, topic, updated_topic) + + Map.put(state, :topics, updated_topics) + end + + defp topic_presences_count(state, topic) do + map_size(state.topics[topic]) + end +end diff --git a/lib/live_beats/presence/presence_client.ex b/lib/live_beats/presence/presence_client.ex new file mode 100644 index 0000000..0ddec5e --- /dev/null +++ b/lib/live_beats/presence/presence_client.ex @@ -0,0 +1,57 @@ +defmodule LiveBeats.PresenceClient do + @behaviour Phoenix.Presence.Client + + @presence LiveBeatsWeb.Presence + @pubsub LiveBeats.PubSub + + def list(topic) do + @presence.list(topic) + end + + @impl Phoenix.Presence.Client + def init(_opts) do + # user-land state + {:ok, %{}} + end + + @impl Phoenix.Presence.Client + def handle_join(topic, _key, presence, state) do + active_users_topic = + topic + |> profile_identifier() + |> active_users_topic() + + Phoenix.PubSub.local_broadcast( + @pubsub, + active_users_topic, + {__MODULE__, %{user_joined: presence}} + ) + + {:ok, state} + end + + @impl Phoenix.Presence.Client + def handle_leave(topic, _key, presence, state) do + active_users_topic = + topic + |> profile_identifier() + |> active_users_topic() + + Phoenix.PubSub.local_broadcast( + @pubsub, + active_users_topic, + {__MODULE__, %{user_left: presence}} + ) + + {:ok, state} + end + + defp active_users_topic(user_id) do + "active_users:#{user_id}" + end + + defp profile_identifier(topic) do + "active_profile:" <> identifier = topic + identifier + end +end diff --git a/lib/live_beats_web/channels/presence.ex b/lib/live_beats_web/channels/presence.ex index 9378943..d203309 100644 --- a/lib/live_beats_web/channels/presence.ex +++ b/lib/live_beats_web/channels/presence.ex @@ -5,20 +5,30 @@ defmodule LiveBeatsWeb.Presence do See the [`Phoenix.Presence`](http://hexdocs.pm/phoenix/Phoenix.Presence.html) docs for more details. """ - use Phoenix.Presence, otp_app: :live_beats, - pubsub_server: LiveBeats.PubSub + use Phoenix.Presence, + otp_app: :live_beats, + pubsub_server: LiveBeats.PubSub import Phoenix.LiveView.Helpers import LiveBeatsWeb.LiveHelpers + @pubsub LiveBeats.PubSub + + alias LiveBeats.Accounts def listening_now(assigns) do ~H"""
-

Who's Listening

-