Merge pull request #77 from fly-apps/cm-active-users-sync

Active users sync
This commit is contained in:
Chris McCord 2024-06-05 14:57:48 -04:00 committed by GitHub
commit 07e7e279a9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 88 additions and 23 deletions

View file

@ -21,10 +21,11 @@ defmodule LiveBeats.Application do
def start(_type, _args) do def start(_type, _args) do
parent = FLAME.Parent.get() parent = FLAME.Parent.get()
LiveBeats.MediaLibrary.attach() LiveBeats.MediaLibrary.attach()
whisper_serving? = parent || FLAME.Backend.impl() != FLAME.FlyBackend
children = children =
[ [
{Nx.Serving, name: LiveBeats.WhisperServing, serving: load_serving()}, whisper_serving? && {Nx.Serving, name: LiveBeats.WhisperServing, serving: load_serving()},
!parent && {DNSCluster, query: Application.get_env(:wps, :dns_cluster_query) || :ignore}, !parent && {DNSCluster, query: Application.get_env(:wps, :dns_cluster_query) || :ignore},
{Task.Supervisor, name: LiveBeats.TaskSupervisor}, {Task.Supervisor, name: LiveBeats.TaskSupervisor},
# Start the Ecto repository # Start the Ecto repository

View file

@ -23,7 +23,28 @@ defmodule LiveBeats.MediaLibrary do
def handle_execute({Accounts, %Accounts.Events.PublicSettingsChanged{user: user}}) do def handle_execute({Accounts, %Accounts.Events.PublicSettingsChanged{user: user}}) do
profile = get_profile!(user) profile = get_profile!(user)
broadcast!(user.id, %Events.PublicProfileUpdated{profile: profile})
active? =
Repo.exists?(
from(s in Song,
inner_join: u in LiveBeats.Accounts.User,
on: s.user_id == ^user.id,
where: s.status in [:playing],
limit: 1
)
)
if active? do
broadcast_active_profile!(profile)
else
broadcast_inactive_profile!(profile)
end
broadcast!(user.id, %Events.PublicProfileUpdated{profile: profile, active?: active?})
end
def subscribe_to_active_profiles do
Phoenix.PubSub.subscribe(@pubsub, "active_profiles")
end end
def subscribe_to_profile(%Profile{} = profile) do def subscribe_to_profile(%Profile{} = profile) do
@ -53,6 +74,8 @@ defmodule LiveBeats.MediaLibrary do
end end
def play_song(%Song{} = song) do def play_song(%Song{} = song) do
user = Accounts.get_user!(song.user_id)
played_at = played_at =
cond do cond do
playing?(song) -> playing?(song) ->
@ -86,6 +109,7 @@ defmodule LiveBeats.MediaLibrary do
elapsed = elapsed_playback(new_song) elapsed = elapsed_playback(new_song)
broadcast_active_profile!(get_profile!(user))
broadcast!(song.user_id, %Events.Play{song: song, elapsed: elapsed}) broadcast!(song.user_id, %Events.Play{song: song, elapsed: elapsed})
new_song new_song
@ -98,6 +122,8 @@ defmodule LiveBeats.MediaLibrary do
end end
def pause_song(%Song{} = song) do def pause_song(%Song{} = song) do
user = Accounts.get_user!(song.user_id)
now = DateTime.truncate(DateTime.utc_now(), :second) now = DateTime.truncate(DateTime.utc_now(), :second)
set = [status: :paused, paused_at: now] set = [status: :paused, paused_at: now]
pause_query = from(s in Song, where: s.id == ^song.id, update: [set: ^set]) pause_query = from(s in Song, where: s.id == ^song.id, update: [set: ^set])
@ -114,6 +140,7 @@ defmodule LiveBeats.MediaLibrary do
|> Multi.update_all(:now_paused, fn _ -> pause_query end, []) |> Multi.update_all(:now_paused, fn _ -> pause_query end, [])
|> Repo.transaction() |> Repo.transaction()
broadcast_inactive_profile!(get_profile!(user))
broadcast!(song.user_id, %Events.Pause{song: song}) broadcast!(song.user_id, %Events.Pause{song: song})
end end
@ -554,6 +581,16 @@ defmodule LiveBeats.MediaLibrary do
Phoenix.PubSub.broadcast_from!(@pubsub, pid, topic(user_id), {__MODULE__, msg}) Phoenix.PubSub.broadcast_from!(@pubsub, pid, topic(user_id), {__MODULE__, msg})
end end
defp broadcast_active_profile!(%Profile{} = profile) do
msg = %Events.PublicProfileActive{profile: profile}
Phoenix.PubSub.broadcast!(@pubsub, "active_profiles", {__MODULE__, msg})
end
defp broadcast_inactive_profile!(%Profile{} = profile) do
msg = %Events.PublicProfileInActive{profile: profile}
Phoenix.PubSub.broadcast!(@pubsub, "active_profiles", {__MODULE__, msg})
end
defp topic(user_id) when is_integer(user_id), do: "profile:#{user_id}" defp topic(user_id) when is_integer(user_id), do: "profile:#{user_id}"
defp validate_songs_limit(user_songs, new_songs_count) do defp validate_songs_limit(user_songs, new_songs_count) do

View file

@ -8,6 +8,14 @@ defmodule LiveBeats.MediaLibrary.Events do
end end
defmodule PublicProfileUpdated do defmodule PublicProfileUpdated do
defstruct profile: nil, active?: false
end
defmodule PublicProfileActive do
defstruct profile: nil
end
defmodule PublicProfileInActive do
defstruct profile: nil defstruct profile: nil
end end

View file

@ -574,8 +574,8 @@ defmodule LiveBeatsWeb.CoreComponents do
slot :col, required: true do slot :col, required: true do
attr :label, :string attr :label, :string
attr :class, :string attr :class, :any
attr :class!, :string attr :class!, :any
end end
def table(assigns) do def table(assigns) do

View file

@ -12,9 +12,10 @@ defmodule LiveBeatsWeb.Layouts do
<h3 class="px-3 text-xs font-semibold text-gray-500 uppercase tracking-wider" id={@id}> <h3 class="px-3 text-xs font-semibold text-gray-500 uppercase tracking-wider" id={@id}>
Active Users Active Users
</h3> </h3>
<div class="mt-1 space-y-1" role="group" aria-labelledby={@id}> <div id={"#{@id}-stream"} phx-update="stream" class="mt-1 space-y-1" role="group" aria-labelledby={@id}>
<%= for user <- @users do %>
<.link <.link
:for={{id, user} <- @users}
id={id}
navigate={profile_path(user)} navigate={profile_path(user)}
class="group flex items-center px-3 py-2 text-base leading-5 font-medium text-gray-600 rounded-md hover:text-gray-900 hover:bg-gray-50" class="group flex items-center px-3 py-2 text-base leading-5 font-medium text-gray-600 rounded-md hover:text-gray-900 hover:bg-gray-50"
> >
@ -23,7 +24,6 @@ defmodule LiveBeatsWeb.Layouts do
<%= user.username %> <%= user.username %>
</span> </span>
</.link> </.link>
<% end %>
</div> </div>
</div> </div>
""" """

View file

@ -57,7 +57,7 @@
<%= if @current_user do %> <%= if @current_user do %>
<.sidebar_nav_links current_user={@current_user} active_tab={@active_tab} /> <.sidebar_nav_links current_user={@current_user} active_tab={@active_tab} />
<% end %> <% end %>
<.sidebar_active_users id="desktop-active-users" users={@active_users} /> <.sidebar_active_users id="mobile-active-users" users={@streams.mobile_active_users} />
</nav> </nav>
</div> </div>
</div> </div>
@ -121,7 +121,7 @@
<.sidebar_nav_links current_user={@current_user} active_tab={@active_tab} /> <.sidebar_nav_links current_user={@current_user} active_tab={@active_tab} />
<% end %> <% end %>
<!-- Secondary navigation --> <!-- Secondary navigation -->
<.sidebar_active_users id="mobile-active-users" users={@active_users} /> <.sidebar_active_users id="desktop-active-users" users={@streams.active_users} />
</nav> </nav>
</div> </div>
</div> </div>

View file

@ -7,7 +7,7 @@ defmodule LiveBeatsWeb.Endpoint do
@session_options [ @session_options [
store: :cookie, store: :cookie,
key: "_live_beats_key_v1", key: "_live_beats_key_v1",
signing_salt: "9OALgV62" signing_salt: "1a9OALgV62"
] ]
socket "/live", Phoenix.LiveView.Socket, socket "/live", Phoenix.LiveView.Socket,

View file

@ -6,12 +6,22 @@ defmodule LiveBeatsWeb.Nav do
alias LiveBeatsWeb.{ProfileLive, SettingsLive} alias LiveBeatsWeb.{ProfileLive, SettingsLive}
def on_mount(:default, _params, _session, socket) do def on_mount(:default, _params, _session, socket) do
if connected?(socket) do
MediaLibrary.subscribe_to_active_profiles()
end
active_users = MediaLibrary.list_active_profiles(limit: 20)
{:cont, {:cont,
socket socket
|> assign(active_users: MediaLibrary.list_active_profiles(limit: 20)) |> stream_configure(:mobile_active_users, dom_id: &"mobile_active-#{&1.user_id}")
|> stream_configure(:active_users, dom_id: &"active-#{&1.user_id}")
|> stream(:active_users, active_users)
|> stream(:mobile_active_users, active_users)
|> assign(:region, System.get_env("FLY_REGION") || "iad") |> assign(:region, System.get_env("FLY_REGION") || "iad")
|> attach_hook(:active_tab, :handle_params, &handle_active_tab_params/3) |> attach_hook(:active_tab, :handle_params, &handle_active_tab_params/3)
|> attach_hook(:ping, :handle_event, &handle_event/3)} |> attach_hook(:ping, :handle_event, &handle_event/3)
|> attach_hook(:active_profile_changes, :handle_info, &handle_info/2)}
end end
defp handle_active_tab_params(params, _url, socket) do defp handle_active_tab_params(params, _url, socket) do
@ -41,6 +51,16 @@ defmodule LiveBeatsWeb.Nav do
defp handle_event(_, _, socket), do: {:cont, socket} defp handle_event(_, _, socket), do: {:cont, socket}
defp handle_info({MediaLibrary, %MediaLibrary.Events.PublicProfileActive{} = update}, socket) do
{:halt, stream_insert(socket, :active_users, update.profile)}
end
defp handle_info({MediaLibrary, %MediaLibrary.Events.PublicProfileInActive{} = update}, socket) do
{:halt, stream_delete(socket, :active_users, update.profile)}
end
defp handle_info(_, socket), do: {:cont, socket}
defp rate_limited_ping_broadcast(socket, %Accounts.User{} = user, rtt) when is_integer(rtt) do defp rate_limited_ping_broadcast(socket, %Accounts.User{} = user, rtt) when is_integer(rtt) do
now = System.system_time(:millisecond) now = System.system_time(:millisecond)
last_ping_at = socket.assigns[:last_ping_at] last_ping_at = socket.assigns[:last_ping_at]

View file

@ -17,8 +17,7 @@ defmodule LiveBeatsWeb.ProfileLive.SongEntryComponent do
<% else %> <% else %>
Title Title
<span class="text-gray-400"> <span class="text-gray-400">
(calculating duration (calculating duration)
<.spinner class="inline-block animate-spin h-2.5 w-2.5 text-gray-400" />)
</span> </span>
<% end %> <% end %>
</label> </label>