webrtc: Allow producers to start a session for a chosen consumer

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2206>
This commit is contained in:
Piotr Brzeziński 2025-04-14 16:36:17 +02:00 committed by GStreamer Marge Bot
parent 0afe3e1846
commit abf97ba2a9
5 changed files with 140 additions and 50 deletions

View file

@ -666,7 +666,7 @@ async fn listen(
.unwrap_or_else(|| Err(anyhow!("Signaller ended session"))) .unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
.context("List response")? .context("List response")?
{ {
FromSignaller::List { producers } => { FromSignaller::List { producers, .. } => {
for peer in producers { for peer in producers {
spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta) spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta)
.context("Spawning consumer")?; .context("Spawning consumer")?;

View file

@ -35,9 +35,12 @@ pub enum OutgoingMessage {
EndSession(EndSessionMessage), EndSession(EndSessionMessage),
/// Messages directly forwarded from one peer to another /// Messages directly forwarded from one peer to another
Peer(PeerMessage), Peer(PeerMessage),
/// Provides the current list of consumer peers /// Provides the current list of producers and passive consumers (awaiting session initiation)
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
List { producers: Vec<Peer> }, List {
producers: Vec<Peer>,
consumers: Vec<Peer>,
},
/// Notifies that an error occurred with the peer's current session /// Notifies that an error occurred with the peer's current session
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
Error { details: String }, Error { details: String },
@ -51,6 +54,8 @@ pub enum PeerRole {
Producer, Producer,
/// Register as a listener /// Register as a listener
Listener, Listener,
/// Register as a passive consumer (awaiting a session initiated by a producer peer)
Consumer,
} }
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)] #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)]
@ -71,11 +76,15 @@ impl PeerStatus {
pub fn listening(&self) -> bool { pub fn listening(&self) -> bool {
self.roles.iter().any(|t| matches!(t, PeerRole::Listener)) self.roles.iter().any(|t| matches!(t, PeerRole::Listener))
} }
pub fn consuming(&self) -> bool {
self.roles.iter().any(|t| matches!(t, PeerRole::Consumer))
}
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
/// Ask the server to start a session with a producer peer /// Ask the server to start a session (either as a producer or a consumer)
pub struct StartSessionMessage { pub struct StartSessionMessage {
/// Identifies the peer /// Identifies the peer
pub peer_id: String, pub peer_id: String,
@ -143,7 +152,7 @@ pub enum IncomingMessage {
NewPeer, NewPeer,
/// Set current peer status /// Set current peer status
SetPeerStatus(PeerStatus), SetPeerStatus(PeerStatus),
/// Start a session with a producer peer /// Start a session with another peer
StartSession(StartSessionMessage), StartSession(StartSessionMessage),
/// End an existing session /// End an existing session
EndSession(EndSessionMessage), EndSession(EndSessionMessage),

View file

@ -83,10 +83,10 @@ impl Handler {
} }
p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status), p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status),
p::IncomingMessage::StartSession(message) => { p::IncomingMessage::StartSession(message) => {
self.start_session(&message.peer_id, peer_id, message.offer.as_deref()) self.start_session(peer_id, &message.peer_id, message.offer.as_deref())
} }
p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg), p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg),
p::IncomingMessage::List => self.list_producers(peer_id), p::IncomingMessage::List => self.list_producers_and_consumers(peer_id),
p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id), p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id),
} }
} }
@ -203,22 +203,26 @@ impl Handler {
Ok(()) Ok(())
} }
/// List producer peers /// List producer and consumer peers
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
fn list_producers(&mut self, peer_id: &str) -> Result<(), Error> { fn list_producers_and_consumers(&mut self, peer_id: &str) -> Result<(), Error> {
let filter_peers = |f: fn(&PeerStatus) -> bool| {
self.peers
.iter()
.filter_map(|(peer_id, peer)| {
f(peer).then_some(p::Peer {
id: peer_id.clone(),
meta: peer.meta.clone(),
})
})
.collect()
};
self.items.push_back(( self.items.push_back((
peer_id.to_string(), peer_id.to_string(),
p::OutgoingMessage::List { p::OutgoingMessage::List {
producers: self producers: filter_peers(PeerStatus::producing),
.peers consumers: filter_peers(PeerStatus::consuming),
.iter()
.filter_map(|(peer_id, peer)| {
peer.producing().then_some(p::Peer {
id: peer_id.clone(),
meta: peer.meta.clone(),
})
})
.collect(),
}, },
)); ));
@ -239,8 +243,17 @@ impl Handler {
return Ok(()); return Ok(());
} }
if status.producing() && status.consuming() {
bail!(
"Cannot register peer {} as both producer and passive consumer",
peer_id
);
}
if old_status.producing() && !status.producing() { if old_status.producing() && !status.producing() {
self.stop_producer(peer_id); self.stop_producer(peer_id);
} else if old_status.consuming() && !status.consuming() {
self.stop_consumer(peer_id);
} }
let mut status = status.clone(); let mut status = status.clone();
@ -261,7 +274,7 @@ impl Handler {
)); ));
} }
info!(peer_id = %peer_id, "registered as a producer"); info!(peer_id = %peer_id, "registered as {:?}", status.roles);
Ok(()) Ok(())
} }
@ -270,27 +283,32 @@ impl Handler {
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
fn start_session( fn start_session(
&mut self, &mut self,
producer_id: &str, from_id: &str,
consumer_id: &str, to_id: &str,
offer: Option<&str>, offer: Option<&str>,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.peers.get(producer_id).map_or_else( let from = self
|| Err(anyhow!("No producer with ID: '{producer_id}'")), .peers
|peer| { .get(from_id)
if !peer.producing() { .ok_or_else(|| anyhow!("Peer with ID '{}' not found", from_id))?;
Err(anyhow!( let to = self
"Peer with id {} is not registered as a producer", .peers
producer_id .get(to_id)
)) .ok_or_else(|| anyhow!("Peer with ID '{}' not found", to_id))?;
} else {
Ok(peer)
}
},
)?;
self.peers let (producer_id, consumer_id) = if to.producing() {
.get(consumer_id) (to_id, from_id)
.map_or_else(|| Err(anyhow!("No consumer with ID: '{consumer_id}'")), Ok)?; } else if to.consuming() {
(from_id, to_id)
} else {
bail!(
"Missing a producer or a consumer: id {} roles {:?}, id {} roles {:?}",
from_id,
from.roles,
to_id,
to.roles
);
};
let session_id = uuid::Uuid::new_v4().to_string(); let session_id = uuid::Uuid::new_v4().to_string();
self.sessions.insert( self.sessions.insert(
@ -444,7 +462,8 @@ mod tests {
meta: Some(json!( meta: Some(json!(
{"display-name": "foobar".to_string() {"display-name": "foobar".to_string()
})), })),
}] }],
consumers: vec!()
} }
); );
} }
@ -1031,7 +1050,7 @@ mod tests {
assert_eq!( assert_eq!(
sent_message, sent_message,
p::OutgoingMessage::Error { p::OutgoingMessage::Error {
details: "No producer with ID: 'producer'".into() details: "Peer with ID 'producer' not found".into()
} }
); );
} }
@ -1238,7 +1257,7 @@ mod tests {
assert_eq!( assert_eq!(
sent_message, sent_message,
p::OutgoingMessage::Error { p::OutgoingMessage::Error {
details: "No consumer with ID: 'consumer'".into() details: "Peer with ID 'consumer' not found".into()
} }
); );
} }
@ -1372,4 +1391,62 @@ mod tests {
.get(&session0_id) .get(&session0_id)
.expect("Session should remain"); .expect("Session should remain");
} }
#[tokio::test]
async fn test_start_session_passive_consumer() {
let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx));
new_peer(&mut tx, &mut handler, "consumer").await;
let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus {
roles: vec![p::PeerRole::Consumer],
meta: None,
peer_id: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
.unwrap();
new_peer(&mut tx, &mut handler, "producer").await;
let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus {
roles: vec![p::PeerRole::Producer],
meta: None,
peer_id: None,
});
tx.send(("producer".to_string(), Some(message)))
.await
.unwrap();
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "consumer".to_string(),
offer: None,
});
tx.send(("producer".to_string(), Some(message)))
.await
.unwrap();
let (peer_id, sent_message) = handler.next().await.unwrap();
assert_eq!(peer_id, "consumer");
let session_id = match sent_message {
p::OutgoingMessage::SessionStarted {
ref peer_id,
ref session_id,
} => {
assert_eq!(peer_id, "producer");
session_id.to_string()
}
_ => panic!("SessionStarted message missing"),
};
let (peer_id, sent_message) = handler.next().await.unwrap();
assert_eq!(peer_id, "producer");
assert_eq!(
sent_message,
p::OutgoingMessage::StartSession {
peer_id: "consumer".to_string(),
session_id,
offer: None,
}
);
}
} }

View file

@ -155,9 +155,12 @@ impl Signaller {
}; };
if let super::WebRTCSignallerRole::Consumer = role { if let super::WebRTCSignallerRole::Consumer = role {
if !connect_to_first_producer { if !connect_to_first_producer && self.producer_peer_id().is_none() {
self.producer_peer_id() gst::info!(
.ok_or_else(|| anyhow!("No target producer peer id set"))?; CAT,
imp = self,
"No producer peer id set, listening for producer session requests"
);
} }
} }
@ -273,7 +276,7 @@ impl Signaller {
super::WebRTCSignallerRole::Consumer => p::PeerStatus { super::WebRTCSignallerRole::Consumer => p::PeerStatus {
meta: meta.clone(), meta: meta.clone(),
peer_id: Some(peer_id.to_string()), peer_id: Some(peer_id.to_string()),
roles: vec![], roles: vec![p::PeerRole::Consumer],
}, },
super::WebRTCSignallerRole::Producer => p::PeerStatus { super::WebRTCSignallerRole::Producer => p::PeerStatus {
meta: meta.clone(), meta: meta.clone(),
@ -373,10 +376,10 @@ impl Signaller {
match msg { match msg {
p::OutgoingMessage::Welcome { peer_id } => { p::OutgoingMessage::Welcome { peer_id } => {
self.set_status(meta, &peer_id); self.set_status(meta, &peer_id);
if self.producer_peer_id().is_none() { if self.producer_peer_id().is_some() {
self.send(p::IncomingMessage::List);
} else {
self.start_session(); self.start_session();
} else if self.connect_to_first_producer() {
self.send(p::IncomingMessage::List);
} }
} }
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
@ -510,10 +513,11 @@ impl Signaller {
); );
} }
}, },
p::OutgoingMessage::List { producers } => { p::OutgoingMessage::List { producers, .. } => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let role = settings.role; let role = settings.role;
if matches!(role, super::WebRTCSignallerRole::Consumer) if matches!(role, super::WebRTCSignallerRole::Consumer)
&& settings.connect_to_first_producer
&& settings.producer_peer_id.is_none() && settings.producer_peer_id.is_none()
{ {
if let Some(producer) = producers.first() { if let Some(producer) = producers.first() {

View file

@ -1117,7 +1117,7 @@ impl BaseWebRTCSrc {
glib::closure!( glib::closure!(
#[watch] #[watch]
instance, instance,
move |_signaler: glib::Object, session_id: &str| { move |_signaller: glib::Object, session_id: &str| {
let this = instance.imp(); let this = instance.imp();
let mut state = this.state.lock().unwrap(); let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.remove(session_id) else { let Some(session) = state.sessions.remove(session_id) else {