From f835e075ea512aca7330243ccf3835588ee2aeab Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Mon, 9 Dec 2024 16:10:28 +1100 Subject: [PATCH] livekit: add room-timeout Produces an error message and disconnects when there is no other participant in the room for room-timeout milliseconds. Part-of: --- net/webrtc/src/livekit_signaller/imp.rs | 180 ++++++++++++++++++++---- 1 file changed, 150 insertions(+), 30 deletions(-) diff --git a/net/webrtc/src/livekit_signaller/imp.rs b/net/webrtc/src/livekit_signaller/imp.rs index 141aba6c2..21f5148a9 100644 --- a/net/webrtc/src/livekit_signaller/imp.rs +++ b/net/webrtc/src/livekit_signaller/imp.rs @@ -17,6 +17,7 @@ use std::sync::LazyLock; use std::sync::{Arc, Mutex}; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use tokio::time::{Duration, Instant}; use livekit_api::access_token::{AccessToken, VideoGrants}; use livekit_api::signal_client; @@ -45,6 +46,7 @@ struct Settings { producer_peer_id: Option, excluded_produder_peer_ids: Vec, timeout: u32, + room_timeout: u32, } impl Default for Settings { @@ -61,6 +63,7 @@ impl Default for Settings { producer_peer_id: None, excluded_produder_peer_ids: vec![], timeout: DEFAULT_TRACK_PUBLISH_TIMEOUT, + room_timeout: 0, } } } @@ -71,6 +74,7 @@ pub struct Signaller { connection: Mutex>, join_canceller: Mutex>, signal_task_canceller: Mutex>, + room_timeout_task_canceller: Mutex>, } struct Channels { @@ -86,6 +90,9 @@ struct Connection { channels: Option, participants: HashMap, state: ConnectionState, + room_timeout_task: Option>, + last_participant_lost_at: Option, + our_participant_sid: String, } #[derive(Debug, Default, Copy, Clone, PartialEq, Eq, glib::Enum)] @@ -319,15 +326,6 @@ impl Signaller { } proto::signal_response::Message::Update(update) => { - if !self.is_subscriber() { - gst::trace!( - CAT, - imp = self, - "Ignoring update in non-subscriber mode: {:?}", - update - ); - return; - } gst::debug!(CAT, imp = self, "Update: {:?}", update); for participant in update.participants { self.on_participant(&participant, true) @@ -342,6 +340,72 @@ impl Signaller { } } + fn start_room_timeout_task(&self) { + let Some(connection) = &mut *self.connection.lock().unwrap() else { + return; + }; + + let weak_imp = self.downgrade(); + let room_timeout_task = RUNTIME.spawn(async move { + if let Some(imp) = weak_imp.upgrade() { + imp.room_timeout_task().await; + } + }); + connection.room_timeout_task = Some(room_timeout_task); + } + + fn stop_room_timeout_task(&self) { + if let Some(canceller) = self.room_timeout_task_canceller.lock().unwrap().take() { + canceller.abort(); + } + let room_timeout_task = self + .connection + .lock() + .unwrap() + .as_mut() + .and_then(|connection| connection.room_timeout_task.take()); + if let Some(room_timeout_task) = room_timeout_task { + block_on(room_timeout_task).unwrap(); + } + } + + async fn room_timeout_task(&self) { + loop { + let room_timeout = self.settings.lock().unwrap().clone().room_timeout; + if room_timeout == 0 { + return; + } + let room_timeout = Duration::from_millis(room_timeout as u64); + let now = Instant::now(); + let end = { + let Some(state) = &*self.connection.lock().unwrap() else { + return; + }; + state + .last_participant_lost_at + .map(|initial| initial + room_timeout) + .unwrap_or(now + Duration::from_secs(60)) + }; + + if end < now { + self.raise_error("Room timeout reached with no other participants".to_owned()); + return; + } + let fut = tokio::time::sleep_until(end); + + match wait_async(&self.room_timeout_task_canceller, fut, 0).await { + Ok(()) => (), + Err(err) => match err { + WaitError::FutureAborted => { + gst::debug!(CAT, imp = self, "Closing room_timeout_task"); + break; + } + WaitError::FutureError(err) => self.raise_error(err.to_string()), + }, + } + } + } + fn send_sdp_answer(&self, _session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) { let weak_imp = self.downgrade(); let sessdesc = sessdesc.clone(); @@ -498,38 +562,63 @@ impl Signaller { fn on_participant(&self, participant: &proto::ParticipantInfo, new_connection: bool) { gst::debug!(CAT, imp = self, "{:?}", participant); - if !participant.is_publisher { - return; - } let peer_sid = &participant.sid; let peer_identity = &participant.identity; - match self.producer_peer_id() { - Some(id) if id == *peer_sid => { - gst::debug!(CAT, imp = self, "matching peer sid {id:?}"); - } - Some(id) if id == *peer_identity => { - gst::debug!(CAT, imp = self, "matching peer identity {id:?}"); - } - None => { - if self.is_peer_excluded(peer_sid) || self.is_peer_excluded(peer_identity) { - gst::debug!(CAT, imp = self, "ignoring excluded peer {participant:?}"); - return; + + if participant.is_publisher && self.is_subscriber() { + match self.producer_peer_id() { + Some(id) if id == *peer_sid => { + gst::debug!(CAT, imp = self, "matching peer sid {id:?}"); } - gst::debug!(CAT, imp = self, "catch-all mode, matching {participant:?}"); + Some(id) if id == *peer_identity => { + gst::debug!(CAT, imp = self, "matching peer identity {id:?}"); + } + None => { + if self.is_peer_excluded(peer_sid) || self.is_peer_excluded(peer_identity) { + gst::debug!(CAT, imp = self, "ignoring excluded peer {participant:?}"); + return; + } + gst::debug!(CAT, imp = self, "catch-all mode, matching {participant:?}"); + } + _ => return, } - _ => return, + return; } - if participant.state != proto::participant_info::State::Disconnected as i32 { - if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() { + let mut connection = self.connection.lock().unwrap(); + if let Some(ref mut connection) = &mut *connection { + if participant.state == proto::participant_info::State::Disconnected as i32 { + connection.participants.remove(&participant.sid); + if connection + .participants + .iter() + .next() + .is_none_or(|(id, _part)| id == &connection.our_participant_sid) + { + gst::debug!( + CAT, + imp = self, + "no other participants, starting room timeout" + ); + connection.last_participant_lost_at = Some(Instant::now()); + } + } else if participant.sid != connection.our_participant_sid { if !connection.participants.contains_key(&participant.sid) { connection .participants .insert(participant.sid.clone(), participant.clone()); } + gst::debug!( + CAT, + imp = self, + "have at least one other participant, unsetting room-timeout" + ); + connection.last_participant_lost_at = None; } - } else if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() { - connection.participants.remove(&participant.sid); + } + + if !participant.is_publisher || !self.is_subscriber() { + return; } let meta = Some(&participant.metadata) @@ -703,7 +792,7 @@ impl SignallableImpl for Signaller { } }); - let connection = Connection { + let mut connection = Connection { signal_client, signal_task, pending_tracks: Default::default(), @@ -711,8 +800,18 @@ impl SignallableImpl for Signaller { channels: None, participants: HashMap::default(), state: ConnectionState::ServerConnected, + last_participant_lost_at: Some(Instant::now()), + room_timeout_task: None, + our_participant_sid: join_response + .participant + .map(|p| p.sid.clone()) + .unwrap_or_default(), }; + if !join_response.other_participants.is_empty() { + connection.last_participant_lost_at = None; + } *imp.connection.lock().unwrap() = Some(connection); + imp.start_room_timeout_task(); imp.obj().notify("connection-state"); if imp.is_subscriber() { @@ -834,6 +933,9 @@ impl SignallableImpl for Signaller { if let Some(canceller) = &*self.signal_task_canceller.lock().unwrap() { canceller.abort(); } + if let Some(canceller) = &*self.room_timeout_task_canceller.lock().unwrap() { + canceller.abort(); + } if let Some(connection) = self.connection.lock().unwrap().take() { block_on(connection.signal_task).unwrap(); @@ -939,6 +1041,14 @@ impl ObjectImpl for Signaller { .default_value(ConnectionState::ServerDisconnected) .read_only() .build(), + glib::ParamSpecUInt::builder("room-timeout") + .nick("Room timeout") + .blurb("How many milliseconds to stay in the room if there are no other participants in the room (0 = disabled)") + .minimum(0) + .maximum(u32::MAX) + .default_value(0) + .flags(glib::ParamFlags::READWRITE) + .build(), ] }); @@ -984,6 +1094,15 @@ impl ObjectImpl for Signaller { .map(|id| id.to_string()) .collect::>() } + "room-timeout" => { + let val = value.get().unwrap(); + if settings.room_timeout != val { + settings.room_timeout = val; + drop(settings); + self.stop_room_timeout_task(); + self.start_room_timeout_task(); + } + } _ => unimplemented!(), } } @@ -1029,6 +1148,7 @@ impl ObjectImpl for Signaller { }; connection.state.to_value() } + "room-timeout" => settings.room_timeout.to_value(), _ => unimplemented!(), } }