livekit: add room-timeout

Produces an error message and disconnects when there is no other participant
in the room for room-timeout milliseconds.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2307>
This commit is contained in:
Matthew Waters 2024-12-09 16:10:28 +11:00 committed by GStreamer Marge Bot
parent a5f0b5947b
commit f835e075ea

View file

@ -17,6 +17,7 @@ use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use livekit_api::access_token::{AccessToken, VideoGrants};
use livekit_api::signal_client;
@ -45,6 +46,7 @@ struct Settings {
producer_peer_id: Option<String>,
excluded_produder_peer_ids: Vec<String>,
timeout: u32,
room_timeout: u32,
}
impl Default for Settings {
@ -61,6 +63,7 @@ impl Default for Settings {
producer_peer_id: None,
excluded_produder_peer_ids: vec![],
timeout: DEFAULT_TRACK_PUBLISH_TIMEOUT,
room_timeout: 0,
}
}
}
@ -71,6 +74,7 @@ pub struct Signaller {
connection: Mutex<Option<Connection>>,
join_canceller: Mutex<Option<futures::future::AbortHandle>>,
signal_task_canceller: Mutex<Option<futures::future::AbortHandle>>,
room_timeout_task_canceller: Mutex<Option<futures::future::AbortHandle>>,
}
struct Channels {
@ -86,6 +90,9 @@ struct Connection {
channels: Option<Channels>,
participants: HashMap<String, proto::ParticipantInfo>,
state: ConnectionState,
room_timeout_task: Option<JoinHandle<()>>,
last_participant_lost_at: Option<Instant>,
our_participant_sid: String,
}
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, glib::Enum)]
@ -319,15 +326,6 @@ impl Signaller {
}
proto::signal_response::Message::Update(update) => {
if !self.is_subscriber() {
gst::trace!(
CAT,
imp = self,
"Ignoring update in non-subscriber mode: {:?}",
update
);
return;
}
gst::debug!(CAT, imp = self, "Update: {:?}", update);
for participant in update.participants {
self.on_participant(&participant, true)
@ -342,6 +340,72 @@ impl Signaller {
}
}
fn start_room_timeout_task(&self) {
let Some(connection) = &mut *self.connection.lock().unwrap() else {
return;
};
let weak_imp = self.downgrade();
let room_timeout_task = RUNTIME.spawn(async move {
if let Some(imp) = weak_imp.upgrade() {
imp.room_timeout_task().await;
}
});
connection.room_timeout_task = Some(room_timeout_task);
}
fn stop_room_timeout_task(&self) {
if let Some(canceller) = self.room_timeout_task_canceller.lock().unwrap().take() {
canceller.abort();
}
let room_timeout_task = self
.connection
.lock()
.unwrap()
.as_mut()
.and_then(|connection| connection.room_timeout_task.take());
if let Some(room_timeout_task) = room_timeout_task {
block_on(room_timeout_task).unwrap();
}
}
async fn room_timeout_task(&self) {
loop {
let room_timeout = self.settings.lock().unwrap().clone().room_timeout;
if room_timeout == 0 {
return;
}
let room_timeout = Duration::from_millis(room_timeout as u64);
let now = Instant::now();
let end = {
let Some(state) = &*self.connection.lock().unwrap() else {
return;
};
state
.last_participant_lost_at
.map(|initial| initial + room_timeout)
.unwrap_or(now + Duration::from_secs(60))
};
if end < now {
self.raise_error("Room timeout reached with no other participants".to_owned());
return;
}
let fut = tokio::time::sleep_until(end);
match wait_async(&self.room_timeout_task_canceller, fut, 0).await {
Ok(()) => (),
Err(err) => match err {
WaitError::FutureAborted => {
gst::debug!(CAT, imp = self, "Closing room_timeout_task");
break;
}
WaitError::FutureError(err) => self.raise_error(err.to_string()),
},
}
}
}
fn send_sdp_answer(&self, _session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) {
let weak_imp = self.downgrade();
let sessdesc = sessdesc.clone();
@ -498,38 +562,63 @@ impl Signaller {
fn on_participant(&self, participant: &proto::ParticipantInfo, new_connection: bool) {
gst::debug!(CAT, imp = self, "{:?}", participant);
if !participant.is_publisher {
return;
}
let peer_sid = &participant.sid;
let peer_identity = &participant.identity;
match self.producer_peer_id() {
Some(id) if id == *peer_sid => {
gst::debug!(CAT, imp = self, "matching peer sid {id:?}");
}
Some(id) if id == *peer_identity => {
gst::debug!(CAT, imp = self, "matching peer identity {id:?}");
}
None => {
if self.is_peer_excluded(peer_sid) || self.is_peer_excluded(peer_identity) {
gst::debug!(CAT, imp = self, "ignoring excluded peer {participant:?}");
return;
if participant.is_publisher && self.is_subscriber() {
match self.producer_peer_id() {
Some(id) if id == *peer_sid => {
gst::debug!(CAT, imp = self, "matching peer sid {id:?}");
}
gst::debug!(CAT, imp = self, "catch-all mode, matching {participant:?}");
Some(id) if id == *peer_identity => {
gst::debug!(CAT, imp = self, "matching peer identity {id:?}");
}
None => {
if self.is_peer_excluded(peer_sid) || self.is_peer_excluded(peer_identity) {
gst::debug!(CAT, imp = self, "ignoring excluded peer {participant:?}");
return;
}
gst::debug!(CAT, imp = self, "catch-all mode, matching {participant:?}");
}
_ => return,
}
_ => return,
return;
}
if participant.state != proto::participant_info::State::Disconnected as i32 {
if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() {
let mut connection = self.connection.lock().unwrap();
if let Some(ref mut connection) = &mut *connection {
if participant.state == proto::participant_info::State::Disconnected as i32 {
connection.participants.remove(&participant.sid);
if connection
.participants
.iter()
.next()
.is_none_or(|(id, _part)| id == &connection.our_participant_sid)
{
gst::debug!(
CAT,
imp = self,
"no other participants, starting room timeout"
);
connection.last_participant_lost_at = Some(Instant::now());
}
} else if participant.sid != connection.our_participant_sid {
if !connection.participants.contains_key(&participant.sid) {
connection
.participants
.insert(participant.sid.clone(), participant.clone());
}
gst::debug!(
CAT,
imp = self,
"have at least one other participant, unsetting room-timeout"
);
connection.last_participant_lost_at = None;
}
} else if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() {
connection.participants.remove(&participant.sid);
}
if !participant.is_publisher || !self.is_subscriber() {
return;
}
let meta = Some(&participant.metadata)
@ -703,7 +792,7 @@ impl SignallableImpl for Signaller {
}
});
let connection = Connection {
let mut connection = Connection {
signal_client,
signal_task,
pending_tracks: Default::default(),
@ -711,8 +800,18 @@ impl SignallableImpl for Signaller {
channels: None,
participants: HashMap::default(),
state: ConnectionState::ServerConnected,
last_participant_lost_at: Some(Instant::now()),
room_timeout_task: None,
our_participant_sid: join_response
.participant
.map(|p| p.sid.clone())
.unwrap_or_default(),
};
if !join_response.other_participants.is_empty() {
connection.last_participant_lost_at = None;
}
*imp.connection.lock().unwrap() = Some(connection);
imp.start_room_timeout_task();
imp.obj().notify("connection-state");
if imp.is_subscriber() {
@ -834,6 +933,9 @@ impl SignallableImpl for Signaller {
if let Some(canceller) = &*self.signal_task_canceller.lock().unwrap() {
canceller.abort();
}
if let Some(canceller) = &*self.room_timeout_task_canceller.lock().unwrap() {
canceller.abort();
}
if let Some(connection) = self.connection.lock().unwrap().take() {
block_on(connection.signal_task).unwrap();
@ -939,6 +1041,14 @@ impl ObjectImpl for Signaller {
.default_value(ConnectionState::ServerDisconnected)
.read_only()
.build(),
glib::ParamSpecUInt::builder("room-timeout")
.nick("Room timeout")
.blurb("How many milliseconds to stay in the room if there are no other participants in the room (0 = disabled)")
.minimum(0)
.maximum(u32::MAX)
.default_value(0)
.flags(glib::ParamFlags::READWRITE)
.build(),
]
});
@ -984,6 +1094,15 @@ impl ObjectImpl for Signaller {
.map(|id| id.to_string())
.collect::<Vec<String>>()
}
"room-timeout" => {
let val = value.get().unwrap();
if settings.room_timeout != val {
settings.room_timeout = val;
drop(settings);
self.stop_room_timeout_task();
self.start_room_timeout_task();
}
}
_ => unimplemented!(),
}
}
@ -1029,6 +1148,7 @@ impl ObjectImpl for Signaller {
};
connection.state.to_value()
}
"room-timeout" => settings.room_timeout.to_value(),
_ => unimplemented!(),
}
}