mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-09-02 09:43:48 +00:00
webrtc: Allow producers to start a session for a chosen consumer
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2206>
This commit is contained in:
parent
0afe3e1846
commit
abf97ba2a9
5 changed files with 140 additions and 50 deletions
|
@ -666,7 +666,7 @@ async fn listen(
|
|||
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
|
||||
.context("List response")?
|
||||
{
|
||||
FromSignaller::List { producers } => {
|
||||
FromSignaller::List { producers, .. } => {
|
||||
for peer in producers {
|
||||
spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta)
|
||||
.context("Spawning consumer")?;
|
||||
|
|
|
@ -35,9 +35,12 @@ pub enum OutgoingMessage {
|
|||
EndSession(EndSessionMessage),
|
||||
/// Messages directly forwarded from one peer to another
|
||||
Peer(PeerMessage),
|
||||
/// Provides the current list of consumer peers
|
||||
/// Provides the current list of producers and passive consumers (awaiting session initiation)
|
||||
#[serde(rename_all = "camelCase")]
|
||||
List { producers: Vec<Peer> },
|
||||
List {
|
||||
producers: Vec<Peer>,
|
||||
consumers: Vec<Peer>,
|
||||
},
|
||||
/// Notifies that an error occurred with the peer's current session
|
||||
#[serde(rename_all = "camelCase")]
|
||||
Error { details: String },
|
||||
|
@ -51,6 +54,8 @@ pub enum PeerRole {
|
|||
Producer,
|
||||
/// Register as a listener
|
||||
Listener,
|
||||
/// Register as a passive consumer (awaiting a session initiated by a producer peer)
|
||||
Consumer,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)]
|
||||
|
@ -71,11 +76,15 @@ impl PeerStatus {
|
|||
pub fn listening(&self) -> bool {
|
||||
self.roles.iter().any(|t| matches!(t, PeerRole::Listener))
|
||||
}
|
||||
|
||||
pub fn consuming(&self) -> bool {
|
||||
self.roles.iter().any(|t| matches!(t, PeerRole::Consumer))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
/// Ask the server to start a session with a producer peer
|
||||
/// Ask the server to start a session (either as a producer or a consumer)
|
||||
pub struct StartSessionMessage {
|
||||
/// Identifies the peer
|
||||
pub peer_id: String,
|
||||
|
@ -143,7 +152,7 @@ pub enum IncomingMessage {
|
|||
NewPeer,
|
||||
/// Set current peer status
|
||||
SetPeerStatus(PeerStatus),
|
||||
/// Start a session with a producer peer
|
||||
/// Start a session with another peer
|
||||
StartSession(StartSessionMessage),
|
||||
/// End an existing session
|
||||
EndSession(EndSessionMessage),
|
||||
|
|
|
@ -83,10 +83,10 @@ impl Handler {
|
|||
}
|
||||
p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status),
|
||||
p::IncomingMessage::StartSession(message) => {
|
||||
self.start_session(&message.peer_id, peer_id, message.offer.as_deref())
|
||||
self.start_session(peer_id, &message.peer_id, message.offer.as_deref())
|
||||
}
|
||||
p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg),
|
||||
p::IncomingMessage::List => self.list_producers(peer_id),
|
||||
p::IncomingMessage::List => self.list_producers_and_consumers(peer_id),
|
||||
p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id),
|
||||
}
|
||||
}
|
||||
|
@ -203,22 +203,26 @@ impl Handler {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// List producer peers
|
||||
/// List producer and consumer peers
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
fn list_producers(&mut self, peer_id: &str) -> Result<(), Error> {
|
||||
fn list_producers_and_consumers(&mut self, peer_id: &str) -> Result<(), Error> {
|
||||
let filter_peers = |f: fn(&PeerStatus) -> bool| {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter_map(|(peer_id, peer)| {
|
||||
f(peer).then_some(p::Peer {
|
||||
id: peer_id.clone(),
|
||||
meta: peer.meta.clone(),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
self.items.push_back((
|
||||
peer_id.to_string(),
|
||||
p::OutgoingMessage::List {
|
||||
producers: self
|
||||
.peers
|
||||
.iter()
|
||||
.filter_map(|(peer_id, peer)| {
|
||||
peer.producing().then_some(p::Peer {
|
||||
id: peer_id.clone(),
|
||||
meta: peer.meta.clone(),
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
producers: filter_peers(PeerStatus::producing),
|
||||
consumers: filter_peers(PeerStatus::consuming),
|
||||
},
|
||||
));
|
||||
|
||||
|
@ -239,8 +243,17 @@ impl Handler {
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
if status.producing() && status.consuming() {
|
||||
bail!(
|
||||
"Cannot register peer {} as both producer and passive consumer",
|
||||
peer_id
|
||||
);
|
||||
}
|
||||
|
||||
if old_status.producing() && !status.producing() {
|
||||
self.stop_producer(peer_id);
|
||||
} else if old_status.consuming() && !status.consuming() {
|
||||
self.stop_consumer(peer_id);
|
||||
}
|
||||
|
||||
let mut status = status.clone();
|
||||
|
@ -261,7 +274,7 @@ impl Handler {
|
|||
));
|
||||
}
|
||||
|
||||
info!(peer_id = %peer_id, "registered as a producer");
|
||||
info!(peer_id = %peer_id, "registered as {:?}", status.roles);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -270,27 +283,32 @@ impl Handler {
|
|||
#[instrument(level = "debug", skip(self))]
|
||||
fn start_session(
|
||||
&mut self,
|
||||
producer_id: &str,
|
||||
consumer_id: &str,
|
||||
from_id: &str,
|
||||
to_id: &str,
|
||||
offer: Option<&str>,
|
||||
) -> Result<(), Error> {
|
||||
self.peers.get(producer_id).map_or_else(
|
||||
|| Err(anyhow!("No producer with ID: '{producer_id}'")),
|
||||
|peer| {
|
||||
if !peer.producing() {
|
||||
Err(anyhow!(
|
||||
"Peer with id {} is not registered as a producer",
|
||||
producer_id
|
||||
))
|
||||
} else {
|
||||
Ok(peer)
|
||||
}
|
||||
},
|
||||
)?;
|
||||
let from = self
|
||||
.peers
|
||||
.get(from_id)
|
||||
.ok_or_else(|| anyhow!("Peer with ID '{}' not found", from_id))?;
|
||||
let to = self
|
||||
.peers
|
||||
.get(to_id)
|
||||
.ok_or_else(|| anyhow!("Peer with ID '{}' not found", to_id))?;
|
||||
|
||||
self.peers
|
||||
.get(consumer_id)
|
||||
.map_or_else(|| Err(anyhow!("No consumer with ID: '{consumer_id}'")), Ok)?;
|
||||
let (producer_id, consumer_id) = if to.producing() {
|
||||
(to_id, from_id)
|
||||
} else if to.consuming() {
|
||||
(from_id, to_id)
|
||||
} else {
|
||||
bail!(
|
||||
"Missing a producer or a consumer: id {} roles {:?}, id {} roles {:?}",
|
||||
from_id,
|
||||
from.roles,
|
||||
to_id,
|
||||
to.roles
|
||||
);
|
||||
};
|
||||
|
||||
let session_id = uuid::Uuid::new_v4().to_string();
|
||||
self.sessions.insert(
|
||||
|
@ -444,7 +462,8 @@ mod tests {
|
|||
meta: Some(json!(
|
||||
{"display-name": "foobar".to_string()
|
||||
})),
|
||||
}]
|
||||
}],
|
||||
consumers: vec!()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
@ -1031,7 +1050,7 @@ mod tests {
|
|||
assert_eq!(
|
||||
sent_message,
|
||||
p::OutgoingMessage::Error {
|
||||
details: "No producer with ID: 'producer'".into()
|
||||
details: "Peer with ID 'producer' not found".into()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
@ -1238,7 +1257,7 @@ mod tests {
|
|||
assert_eq!(
|
||||
sent_message,
|
||||
p::OutgoingMessage::Error {
|
||||
details: "No consumer with ID: 'consumer'".into()
|
||||
details: "Peer with ID 'consumer' not found".into()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
@ -1372,4 +1391,62 @@ mod tests {
|
|||
.get(&session0_id)
|
||||
.expect("Session should remain");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_start_session_passive_consumer() {
|
||||
let (mut tx, rx) = mpsc::unbounded();
|
||||
let mut handler = Handler::new(Box::pin(rx));
|
||||
|
||||
new_peer(&mut tx, &mut handler, "consumer").await;
|
||||
let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus {
|
||||
roles: vec![p::PeerRole::Consumer],
|
||||
meta: None,
|
||||
peer_id: None,
|
||||
});
|
||||
tx.send(("consumer".to_string(), Some(message)))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
new_peer(&mut tx, &mut handler, "producer").await;
|
||||
let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus {
|
||||
roles: vec![p::PeerRole::Producer],
|
||||
meta: None,
|
||||
peer_id: None,
|
||||
});
|
||||
tx.send(("producer".to_string(), Some(message)))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
|
||||
peer_id: "consumer".to_string(),
|
||||
offer: None,
|
||||
});
|
||||
tx.send(("producer".to_string(), Some(message)))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (peer_id, sent_message) = handler.next().await.unwrap();
|
||||
assert_eq!(peer_id, "consumer");
|
||||
let session_id = match sent_message {
|
||||
p::OutgoingMessage::SessionStarted {
|
||||
ref peer_id,
|
||||
ref session_id,
|
||||
} => {
|
||||
assert_eq!(peer_id, "producer");
|
||||
session_id.to_string()
|
||||
}
|
||||
_ => panic!("SessionStarted message missing"),
|
||||
};
|
||||
|
||||
let (peer_id, sent_message) = handler.next().await.unwrap();
|
||||
assert_eq!(peer_id, "producer");
|
||||
assert_eq!(
|
||||
sent_message,
|
||||
p::OutgoingMessage::StartSession {
|
||||
peer_id: "consumer".to_string(),
|
||||
session_id,
|
||||
offer: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,9 +155,12 @@ impl Signaller {
|
|||
};
|
||||
|
||||
if let super::WebRTCSignallerRole::Consumer = role {
|
||||
if !connect_to_first_producer {
|
||||
self.producer_peer_id()
|
||||
.ok_or_else(|| anyhow!("No target producer peer id set"))?;
|
||||
if !connect_to_first_producer && self.producer_peer_id().is_none() {
|
||||
gst::info!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"No producer peer id set, listening for producer session requests"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -273,7 +276,7 @@ impl Signaller {
|
|||
super::WebRTCSignallerRole::Consumer => p::PeerStatus {
|
||||
meta: meta.clone(),
|
||||
peer_id: Some(peer_id.to_string()),
|
||||
roles: vec![],
|
||||
roles: vec![p::PeerRole::Consumer],
|
||||
},
|
||||
super::WebRTCSignallerRole::Producer => p::PeerStatus {
|
||||
meta: meta.clone(),
|
||||
|
@ -373,10 +376,10 @@ impl Signaller {
|
|||
match msg {
|
||||
p::OutgoingMessage::Welcome { peer_id } => {
|
||||
self.set_status(meta, &peer_id);
|
||||
if self.producer_peer_id().is_none() {
|
||||
self.send(p::IncomingMessage::List);
|
||||
} else {
|
||||
if self.producer_peer_id().is_some() {
|
||||
self.start_session();
|
||||
} else if self.connect_to_first_producer() {
|
||||
self.send(p::IncomingMessage::List);
|
||||
}
|
||||
}
|
||||
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
|
||||
|
@ -510,10 +513,11 @@ impl Signaller {
|
|||
);
|
||||
}
|
||||
},
|
||||
p::OutgoingMessage::List { producers } => {
|
||||
p::OutgoingMessage::List { producers, .. } => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
let role = settings.role;
|
||||
if matches!(role, super::WebRTCSignallerRole::Consumer)
|
||||
&& settings.connect_to_first_producer
|
||||
&& settings.producer_peer_id.is_none()
|
||||
{
|
||||
if let Some(producer) = producers.first() {
|
||||
|
|
|
@ -1117,7 +1117,7 @@ impl BaseWebRTCSrc {
|
|||
glib::closure!(
|
||||
#[watch]
|
||||
instance,
|
||||
move |_signaler: glib::Object, session_id: &str| {
|
||||
move |_signaller: glib::Object, session_id: &str| {
|
||||
let this = instance.imp();
|
||||
let mut state = this.state.lock().unwrap();
|
||||
let Some(session) = state.sessions.remove(session_id) else {
|
||||
|
|
Loading…
Reference in a new issue