From abf97ba2a97b85122465401037a6e8c674066d3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Brzezi=C5=84ski?= Date: Mon, 14 Apr 2025 16:36:17 +0200 Subject: [PATCH] webrtc: Allow producers to start a session for a chosen consumer Part-of: --- .../examples/webrtc-precise-sync-recv.rs | 2 +- net/webrtc/protocol/src/lib.rs | 17 +- net/webrtc/signalling/src/handlers/mod.rs | 149 +++++++++++++----- net/webrtc/src/signaller/imp.rs | 20 ++- net/webrtc/src/webrtcsrc/imp.rs | 2 +- 5 files changed, 140 insertions(+), 50 deletions(-) diff --git a/net/webrtc/examples/webrtc-precise-sync-recv.rs b/net/webrtc/examples/webrtc-precise-sync-recv.rs index b9122c745..3b7119707 100644 --- a/net/webrtc/examples/webrtc-precise-sync-recv.rs +++ b/net/webrtc/examples/webrtc-precise-sync-recv.rs @@ -666,7 +666,7 @@ async fn listen( .unwrap_or_else(|| Err(anyhow!("Signaller ended session"))) .context("List response")? { - FromSignaller::List { producers } => { + FromSignaller::List { producers, .. } => { for peer in producers { spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta) .context("Spawning consumer")?; diff --git a/net/webrtc/protocol/src/lib.rs b/net/webrtc/protocol/src/lib.rs index 2e668d148..0ba0d92d1 100644 --- a/net/webrtc/protocol/src/lib.rs +++ b/net/webrtc/protocol/src/lib.rs @@ -35,9 +35,12 @@ pub enum OutgoingMessage { EndSession(EndSessionMessage), /// Messages directly forwarded from one peer to another Peer(PeerMessage), - /// Provides the current list of consumer peers + /// Provides the current list of producers and passive consumers (awaiting session initiation) #[serde(rename_all = "camelCase")] - List { producers: Vec }, + List { + producers: Vec, + consumers: Vec, + }, /// Notifies that an error occurred with the peer's current session #[serde(rename_all = "camelCase")] Error { details: String }, @@ -51,6 +54,8 @@ pub enum PeerRole { Producer, /// Register as a listener Listener, + /// Register as a passive consumer (awaiting a session initiated by a producer peer) + Consumer, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)] @@ -71,11 +76,15 @@ impl PeerStatus { pub fn listening(&self) -> bool { self.roles.iter().any(|t| matches!(t, PeerRole::Listener)) } + + pub fn consuming(&self) -> bool { + self.roles.iter().any(|t| matches!(t, PeerRole::Consumer)) + } } #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -/// Ask the server to start a session with a producer peer +/// Ask the server to start a session (either as a producer or a consumer) pub struct StartSessionMessage { /// Identifies the peer pub peer_id: String, @@ -143,7 +152,7 @@ pub enum IncomingMessage { NewPeer, /// Set current peer status SetPeerStatus(PeerStatus), - /// Start a session with a producer peer + /// Start a session with another peer StartSession(StartSessionMessage), /// End an existing session EndSession(EndSessionMessage), diff --git a/net/webrtc/signalling/src/handlers/mod.rs b/net/webrtc/signalling/src/handlers/mod.rs index f27db2a50..543c3c201 100644 --- a/net/webrtc/signalling/src/handlers/mod.rs +++ b/net/webrtc/signalling/src/handlers/mod.rs @@ -83,10 +83,10 @@ impl Handler { } p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status), p::IncomingMessage::StartSession(message) => { - self.start_session(&message.peer_id, peer_id, message.offer.as_deref()) + self.start_session(peer_id, &message.peer_id, message.offer.as_deref()) } p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg), - p::IncomingMessage::List => self.list_producers(peer_id), + p::IncomingMessage::List => self.list_producers_and_consumers(peer_id), p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id), } } @@ -203,22 +203,26 @@ impl Handler { Ok(()) } - /// List producer peers + /// List producer and consumer peers #[instrument(level = "debug", skip(self))] - fn list_producers(&mut self, peer_id: &str) -> Result<(), Error> { + fn list_producers_and_consumers(&mut self, peer_id: &str) -> Result<(), Error> { + let filter_peers = |f: fn(&PeerStatus) -> bool| { + self.peers + .iter() + .filter_map(|(peer_id, peer)| { + f(peer).then_some(p::Peer { + id: peer_id.clone(), + meta: peer.meta.clone(), + }) + }) + .collect() + }; + self.items.push_back(( peer_id.to_string(), p::OutgoingMessage::List { - producers: self - .peers - .iter() - .filter_map(|(peer_id, peer)| { - peer.producing().then_some(p::Peer { - id: peer_id.clone(), - meta: peer.meta.clone(), - }) - }) - .collect(), + producers: filter_peers(PeerStatus::producing), + consumers: filter_peers(PeerStatus::consuming), }, )); @@ -239,8 +243,17 @@ impl Handler { return Ok(()); } + if status.producing() && status.consuming() { + bail!( + "Cannot register peer {} as both producer and passive consumer", + peer_id + ); + } + if old_status.producing() && !status.producing() { self.stop_producer(peer_id); + } else if old_status.consuming() && !status.consuming() { + self.stop_consumer(peer_id); } let mut status = status.clone(); @@ -261,7 +274,7 @@ impl Handler { )); } - info!(peer_id = %peer_id, "registered as a producer"); + info!(peer_id = %peer_id, "registered as {:?}", status.roles); Ok(()) } @@ -270,27 +283,32 @@ impl Handler { #[instrument(level = "debug", skip(self))] fn start_session( &mut self, - producer_id: &str, - consumer_id: &str, + from_id: &str, + to_id: &str, offer: Option<&str>, ) -> Result<(), Error> { - self.peers.get(producer_id).map_or_else( - || Err(anyhow!("No producer with ID: '{producer_id}'")), - |peer| { - if !peer.producing() { - Err(anyhow!( - "Peer with id {} is not registered as a producer", - producer_id - )) - } else { - Ok(peer) - } - }, - )?; + let from = self + .peers + .get(from_id) + .ok_or_else(|| anyhow!("Peer with ID '{}' not found", from_id))?; + let to = self + .peers + .get(to_id) + .ok_or_else(|| anyhow!("Peer with ID '{}' not found", to_id))?; - self.peers - .get(consumer_id) - .map_or_else(|| Err(anyhow!("No consumer with ID: '{consumer_id}'")), Ok)?; + let (producer_id, consumer_id) = if to.producing() { + (to_id, from_id) + } else if to.consuming() { + (from_id, to_id) + } else { + bail!( + "Missing a producer or a consumer: id {} roles {:?}, id {} roles {:?}", + from_id, + from.roles, + to_id, + to.roles + ); + }; let session_id = uuid::Uuid::new_v4().to_string(); self.sessions.insert( @@ -444,7 +462,8 @@ mod tests { meta: Some(json!( {"display-name": "foobar".to_string() })), - }] + }], + consumers: vec!() } ); } @@ -1031,7 +1050,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Error { - details: "No producer with ID: 'producer'".into() + details: "Peer with ID 'producer' not found".into() } ); } @@ -1238,7 +1257,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Error { - details: "No consumer with ID: 'consumer'".into() + details: "Peer with ID 'consumer' not found".into() } ); } @@ -1372,4 +1391,62 @@ mod tests { .get(&session0_id) .expect("Session should remain"); } + + #[tokio::test] + async fn test_start_session_passive_consumer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + new_peer(&mut tx, &mut handler, "consumer").await; + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Consumer], + meta: None, + peer_id: None, + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + + new_peer(&mut tx, &mut handler, "producer").await; + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "consumer".to_string(), + offer: None, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::StartSession { + peer_id: "consumer".to_string(), + session_id, + offer: None, + } + ); + } } diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index fe40d9877..58e7bcccc 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -155,9 +155,12 @@ impl Signaller { }; if let super::WebRTCSignallerRole::Consumer = role { - if !connect_to_first_producer { - self.producer_peer_id() - .ok_or_else(|| anyhow!("No target producer peer id set"))?; + if !connect_to_first_producer && self.producer_peer_id().is_none() { + gst::info!( + CAT, + imp = self, + "No producer peer id set, listening for producer session requests" + ); } } @@ -273,7 +276,7 @@ impl Signaller { super::WebRTCSignallerRole::Consumer => p::PeerStatus { meta: meta.clone(), peer_id: Some(peer_id.to_string()), - roles: vec![], + roles: vec![p::PeerRole::Consumer], }, super::WebRTCSignallerRole::Producer => p::PeerStatus { meta: meta.clone(), @@ -373,10 +376,10 @@ impl Signaller { match msg { p::OutgoingMessage::Welcome { peer_id } => { self.set_status(meta, &peer_id); - if self.producer_peer_id().is_none() { - self.send(p::IncomingMessage::List); - } else { + if self.producer_peer_id().is_some() { self.start_session(); + } else if self.connect_to_first_producer() { + self.send(p::IncomingMessage::List); } } p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { @@ -510,10 +513,11 @@ impl Signaller { ); } }, - p::OutgoingMessage::List { producers } => { + p::OutgoingMessage::List { producers, .. } => { let mut settings = self.settings.lock().unwrap(); let role = settings.role; if matches!(role, super::WebRTCSignallerRole::Consumer) + && settings.connect_to_first_producer && settings.producer_peer_id.is_none() { if let Some(producer) = producers.first() { diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 5ca6d865a..a70ac1396 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -1117,7 +1117,7 @@ impl BaseWebRTCSrc { glib::closure!( #[watch] instance, - move |_signaler: glib::Object, session_id: &str| { + move |_signaller: glib::Object, session_id: &str| { let this = instance.imp(); let mut state = this.state.lock().unwrap(); let Some(session) = state.sessions.remove(session_id) else {