mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-09-03 10:13:47 +00:00
webrtc: Allow producers to start a session for a chosen consumer
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2206>
This commit is contained in:
parent
0afe3e1846
commit
abf97ba2a9
5 changed files with 140 additions and 50 deletions
|
@ -666,7 +666,7 @@ async fn listen(
|
||||||
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
|
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
|
||||||
.context("List response")?
|
.context("List response")?
|
||||||
{
|
{
|
||||||
FromSignaller::List { producers } => {
|
FromSignaller::List { producers, .. } => {
|
||||||
for peer in producers {
|
for peer in producers {
|
||||||
spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta)
|
spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta)
|
||||||
.context("Spawning consumer")?;
|
.context("Spawning consumer")?;
|
||||||
|
|
|
@ -35,9 +35,12 @@ pub enum OutgoingMessage {
|
||||||
EndSession(EndSessionMessage),
|
EndSession(EndSessionMessage),
|
||||||
/// Messages directly forwarded from one peer to another
|
/// Messages directly forwarded from one peer to another
|
||||||
Peer(PeerMessage),
|
Peer(PeerMessage),
|
||||||
/// Provides the current list of consumer peers
|
/// Provides the current list of producers and passive consumers (awaiting session initiation)
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
List { producers: Vec<Peer> },
|
List {
|
||||||
|
producers: Vec<Peer>,
|
||||||
|
consumers: Vec<Peer>,
|
||||||
|
},
|
||||||
/// Notifies that an error occurred with the peer's current session
|
/// Notifies that an error occurred with the peer's current session
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
Error { details: String },
|
Error { details: String },
|
||||||
|
@ -51,6 +54,8 @@ pub enum PeerRole {
|
||||||
Producer,
|
Producer,
|
||||||
/// Register as a listener
|
/// Register as a listener
|
||||||
Listener,
|
Listener,
|
||||||
|
/// Register as a passive consumer (awaiting a session initiated by a producer peer)
|
||||||
|
Consumer,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)]
|
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)]
|
||||||
|
@ -71,11 +76,15 @@ impl PeerStatus {
|
||||||
pub fn listening(&self) -> bool {
|
pub fn listening(&self) -> bool {
|
||||||
self.roles.iter().any(|t| matches!(t, PeerRole::Listener))
|
self.roles.iter().any(|t| matches!(t, PeerRole::Listener))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn consuming(&self) -> bool {
|
||||||
|
self.roles.iter().any(|t| matches!(t, PeerRole::Consumer))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
/// Ask the server to start a session with a producer peer
|
/// Ask the server to start a session (either as a producer or a consumer)
|
||||||
pub struct StartSessionMessage {
|
pub struct StartSessionMessage {
|
||||||
/// Identifies the peer
|
/// Identifies the peer
|
||||||
pub peer_id: String,
|
pub peer_id: String,
|
||||||
|
@ -143,7 +152,7 @@ pub enum IncomingMessage {
|
||||||
NewPeer,
|
NewPeer,
|
||||||
/// Set current peer status
|
/// Set current peer status
|
||||||
SetPeerStatus(PeerStatus),
|
SetPeerStatus(PeerStatus),
|
||||||
/// Start a session with a producer peer
|
/// Start a session with another peer
|
||||||
StartSession(StartSessionMessage),
|
StartSession(StartSessionMessage),
|
||||||
/// End an existing session
|
/// End an existing session
|
||||||
EndSession(EndSessionMessage),
|
EndSession(EndSessionMessage),
|
||||||
|
|
|
@ -83,10 +83,10 @@ impl Handler {
|
||||||
}
|
}
|
||||||
p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status),
|
p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status),
|
||||||
p::IncomingMessage::StartSession(message) => {
|
p::IncomingMessage::StartSession(message) => {
|
||||||
self.start_session(&message.peer_id, peer_id, message.offer.as_deref())
|
self.start_session(peer_id, &message.peer_id, message.offer.as_deref())
|
||||||
}
|
}
|
||||||
p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg),
|
p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg),
|
||||||
p::IncomingMessage::List => self.list_producers(peer_id),
|
p::IncomingMessage::List => self.list_producers_and_consumers(peer_id),
|
||||||
p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id),
|
p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -203,22 +203,26 @@ impl Handler {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// List producer peers
|
/// List producer and consumer peers
|
||||||
#[instrument(level = "debug", skip(self))]
|
#[instrument(level = "debug", skip(self))]
|
||||||
fn list_producers(&mut self, peer_id: &str) -> Result<(), Error> {
|
fn list_producers_and_consumers(&mut self, peer_id: &str) -> Result<(), Error> {
|
||||||
|
let filter_peers = |f: fn(&PeerStatus) -> bool| {
|
||||||
|
self.peers
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(peer_id, peer)| {
|
||||||
|
f(peer).then_some(p::Peer {
|
||||||
|
id: peer_id.clone(),
|
||||||
|
meta: peer.meta.clone(),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
self.items.push_back((
|
self.items.push_back((
|
||||||
peer_id.to_string(),
|
peer_id.to_string(),
|
||||||
p::OutgoingMessage::List {
|
p::OutgoingMessage::List {
|
||||||
producers: self
|
producers: filter_peers(PeerStatus::producing),
|
||||||
.peers
|
consumers: filter_peers(PeerStatus::consuming),
|
||||||
.iter()
|
|
||||||
.filter_map(|(peer_id, peer)| {
|
|
||||||
peer.producing().then_some(p::Peer {
|
|
||||||
id: peer_id.clone(),
|
|
||||||
meta: peer.meta.clone(),
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
},
|
},
|
||||||
));
|
));
|
||||||
|
|
||||||
|
@ -239,8 +243,17 @@ impl Handler {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if status.producing() && status.consuming() {
|
||||||
|
bail!(
|
||||||
|
"Cannot register peer {} as both producer and passive consumer",
|
||||||
|
peer_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if old_status.producing() && !status.producing() {
|
if old_status.producing() && !status.producing() {
|
||||||
self.stop_producer(peer_id);
|
self.stop_producer(peer_id);
|
||||||
|
} else if old_status.consuming() && !status.consuming() {
|
||||||
|
self.stop_consumer(peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut status = status.clone();
|
let mut status = status.clone();
|
||||||
|
@ -261,7 +274,7 @@ impl Handler {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(peer_id = %peer_id, "registered as a producer");
|
info!(peer_id = %peer_id, "registered as {:?}", status.roles);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -270,27 +283,32 @@ impl Handler {
|
||||||
#[instrument(level = "debug", skip(self))]
|
#[instrument(level = "debug", skip(self))]
|
||||||
fn start_session(
|
fn start_session(
|
||||||
&mut self,
|
&mut self,
|
||||||
producer_id: &str,
|
from_id: &str,
|
||||||
consumer_id: &str,
|
to_id: &str,
|
||||||
offer: Option<&str>,
|
offer: Option<&str>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
self.peers.get(producer_id).map_or_else(
|
let from = self
|
||||||
|| Err(anyhow!("No producer with ID: '{producer_id}'")),
|
.peers
|
||||||
|peer| {
|
.get(from_id)
|
||||||
if !peer.producing() {
|
.ok_or_else(|| anyhow!("Peer with ID '{}' not found", from_id))?;
|
||||||
Err(anyhow!(
|
let to = self
|
||||||
"Peer with id {} is not registered as a producer",
|
.peers
|
||||||
producer_id
|
.get(to_id)
|
||||||
))
|
.ok_or_else(|| anyhow!("Peer with ID '{}' not found", to_id))?;
|
||||||
} else {
|
|
||||||
Ok(peer)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
|
|
||||||
self.peers
|
let (producer_id, consumer_id) = if to.producing() {
|
||||||
.get(consumer_id)
|
(to_id, from_id)
|
||||||
.map_or_else(|| Err(anyhow!("No consumer with ID: '{consumer_id}'")), Ok)?;
|
} else if to.consuming() {
|
||||||
|
(from_id, to_id)
|
||||||
|
} else {
|
||||||
|
bail!(
|
||||||
|
"Missing a producer or a consumer: id {} roles {:?}, id {} roles {:?}",
|
||||||
|
from_id,
|
||||||
|
from.roles,
|
||||||
|
to_id,
|
||||||
|
to.roles
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
let session_id = uuid::Uuid::new_v4().to_string();
|
let session_id = uuid::Uuid::new_v4().to_string();
|
||||||
self.sessions.insert(
|
self.sessions.insert(
|
||||||
|
@ -444,7 +462,8 @@ mod tests {
|
||||||
meta: Some(json!(
|
meta: Some(json!(
|
||||||
{"display-name": "foobar".to_string()
|
{"display-name": "foobar".to_string()
|
||||||
})),
|
})),
|
||||||
}]
|
}],
|
||||||
|
consumers: vec!()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -1031,7 +1050,7 @@ mod tests {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
sent_message,
|
sent_message,
|
||||||
p::OutgoingMessage::Error {
|
p::OutgoingMessage::Error {
|
||||||
details: "No producer with ID: 'producer'".into()
|
details: "Peer with ID 'producer' not found".into()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -1238,7 +1257,7 @@ mod tests {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
sent_message,
|
sent_message,
|
||||||
p::OutgoingMessage::Error {
|
p::OutgoingMessage::Error {
|
||||||
details: "No consumer with ID: 'consumer'".into()
|
details: "Peer with ID 'consumer' not found".into()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -1372,4 +1391,62 @@ mod tests {
|
||||||
.get(&session0_id)
|
.get(&session0_id)
|
||||||
.expect("Session should remain");
|
.expect("Session should remain");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_start_session_passive_consumer() {
|
||||||
|
let (mut tx, rx) = mpsc::unbounded();
|
||||||
|
let mut handler = Handler::new(Box::pin(rx));
|
||||||
|
|
||||||
|
new_peer(&mut tx, &mut handler, "consumer").await;
|
||||||
|
let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus {
|
||||||
|
roles: vec![p::PeerRole::Consumer],
|
||||||
|
meta: None,
|
||||||
|
peer_id: None,
|
||||||
|
});
|
||||||
|
tx.send(("consumer".to_string(), Some(message)))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
new_peer(&mut tx, &mut handler, "producer").await;
|
||||||
|
let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus {
|
||||||
|
roles: vec![p::PeerRole::Producer],
|
||||||
|
meta: None,
|
||||||
|
peer_id: None,
|
||||||
|
});
|
||||||
|
tx.send(("producer".to_string(), Some(message)))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
|
||||||
|
peer_id: "consumer".to_string(),
|
||||||
|
offer: None,
|
||||||
|
});
|
||||||
|
tx.send(("producer".to_string(), Some(message)))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let (peer_id, sent_message) = handler.next().await.unwrap();
|
||||||
|
assert_eq!(peer_id, "consumer");
|
||||||
|
let session_id = match sent_message {
|
||||||
|
p::OutgoingMessage::SessionStarted {
|
||||||
|
ref peer_id,
|
||||||
|
ref session_id,
|
||||||
|
} => {
|
||||||
|
assert_eq!(peer_id, "producer");
|
||||||
|
session_id.to_string()
|
||||||
|
}
|
||||||
|
_ => panic!("SessionStarted message missing"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let (peer_id, sent_message) = handler.next().await.unwrap();
|
||||||
|
assert_eq!(peer_id, "producer");
|
||||||
|
assert_eq!(
|
||||||
|
sent_message,
|
||||||
|
p::OutgoingMessage::StartSession {
|
||||||
|
peer_id: "consumer".to_string(),
|
||||||
|
session_id,
|
||||||
|
offer: None,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -155,9 +155,12 @@ impl Signaller {
|
||||||
};
|
};
|
||||||
|
|
||||||
if let super::WebRTCSignallerRole::Consumer = role {
|
if let super::WebRTCSignallerRole::Consumer = role {
|
||||||
if !connect_to_first_producer {
|
if !connect_to_first_producer && self.producer_peer_id().is_none() {
|
||||||
self.producer_peer_id()
|
gst::info!(
|
||||||
.ok_or_else(|| anyhow!("No target producer peer id set"))?;
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"No producer peer id set, listening for producer session requests"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +276,7 @@ impl Signaller {
|
||||||
super::WebRTCSignallerRole::Consumer => p::PeerStatus {
|
super::WebRTCSignallerRole::Consumer => p::PeerStatus {
|
||||||
meta: meta.clone(),
|
meta: meta.clone(),
|
||||||
peer_id: Some(peer_id.to_string()),
|
peer_id: Some(peer_id.to_string()),
|
||||||
roles: vec![],
|
roles: vec![p::PeerRole::Consumer],
|
||||||
},
|
},
|
||||||
super::WebRTCSignallerRole::Producer => p::PeerStatus {
|
super::WebRTCSignallerRole::Producer => p::PeerStatus {
|
||||||
meta: meta.clone(),
|
meta: meta.clone(),
|
||||||
|
@ -373,10 +376,10 @@ impl Signaller {
|
||||||
match msg {
|
match msg {
|
||||||
p::OutgoingMessage::Welcome { peer_id } => {
|
p::OutgoingMessage::Welcome { peer_id } => {
|
||||||
self.set_status(meta, &peer_id);
|
self.set_status(meta, &peer_id);
|
||||||
if self.producer_peer_id().is_none() {
|
if self.producer_peer_id().is_some() {
|
||||||
self.send(p::IncomingMessage::List);
|
|
||||||
} else {
|
|
||||||
self.start_session();
|
self.start_session();
|
||||||
|
} else if self.connect_to_first_producer() {
|
||||||
|
self.send(p::IncomingMessage::List);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
|
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
|
||||||
|
@ -510,10 +513,11 @@ impl Signaller {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
p::OutgoingMessage::List { producers } => {
|
p::OutgoingMessage::List { producers, .. } => {
|
||||||
let mut settings = self.settings.lock().unwrap();
|
let mut settings = self.settings.lock().unwrap();
|
||||||
let role = settings.role;
|
let role = settings.role;
|
||||||
if matches!(role, super::WebRTCSignallerRole::Consumer)
|
if matches!(role, super::WebRTCSignallerRole::Consumer)
|
||||||
|
&& settings.connect_to_first_producer
|
||||||
&& settings.producer_peer_id.is_none()
|
&& settings.producer_peer_id.is_none()
|
||||||
{
|
{
|
||||||
if let Some(producer) = producers.first() {
|
if let Some(producer) = producers.first() {
|
||||||
|
|
|
@ -1117,7 +1117,7 @@ impl BaseWebRTCSrc {
|
||||||
glib::closure!(
|
glib::closure!(
|
||||||
#[watch]
|
#[watch]
|
||||||
instance,
|
instance,
|
||||||
move |_signaler: glib::Object, session_id: &str| {
|
move |_signaller: glib::Object, session_id: &str| {
|
||||||
let this = instance.imp();
|
let this = instance.imp();
|
||||||
let mut state = this.state.lock().unwrap();
|
let mut state = this.state.lock().unwrap();
|
||||||
let Some(session) = state.sessions.remove(session_id) else {
|
let Some(session) = state.sessions.remove(session_id) else {
|
||||||
|
|
Loading…
Reference in a new issue