webrtcsink: Port to the 'webrtcsrc' signaller object/interface

With contributions from:
Matthew Waters <matthew@centricular.com>

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1141>
This commit is contained in:
Thibault Saunier 2022-11-18 21:43:03 -03:00 committed by Matthew Waters
parent 538e2e0c9e
commit 8236f3e5e7
12 changed files with 825 additions and 1406 deletions

View file

@ -1,7 +1,7 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use super::protocol as p; use super::protocol as p;
use crate::webrtcsink::WebRTCSink; use crate::signaller::{Signallable, SignallableImpl};
use crate::RUNTIME; use crate::RUNTIME;
use anyhow::{anyhow, Error}; use anyhow::{anyhow, Error};
use async_tungstenite::tungstenite::Message as WsMessage; use async_tungstenite::tungstenite::Message as WsMessage;
@ -84,7 +84,7 @@ pub struct Signaller {
} }
impl Signaller { impl Signaller {
fn handle_message(element: &WebRTCSink, msg: String) { fn handle_message(&self, msg: String) {
if let Ok(msg) = serde_json::from_str::<p::IncomingMessage>(&msg) { if let Ok(msg) = serde_json::from_str::<p::IncomingMessage>(&msg) {
match BASE64.decode(&msg.message_payload.into_bytes()) { match BASE64.decode(&msg.message_payload.into_bytes()) {
Ok(payload) => { Ok(payload) => {
@ -98,21 +98,27 @@ impl Signaller {
msg.sender_client_id, msg.sender_client_id,
sdp_msg.sdp sdp_msg.sdp
); );
if let Err(err) = element.start_session( self.obj().emit_by_name::<()>(
&msg.sender_client_id, "session-requested",
&msg.sender_client_id, &[&msg.sender_client_id, &msg.sender_client_id],
Some(&gst_webrtc::WebRTCSessionDescription::new( );
gst_webrtc::WebRTCSDPType::Offer, self.obj().emit_by_name::<()>(
gst_sdp::SDPMessage::parse_buffer(sdp_msg.sdp.as_bytes()) "session-description",
&[
&msg.sender_client_id,
&gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
gst_sdp::SDPMessage::parse_buffer(
sdp_msg.sdp.as_bytes(),
)
.unwrap(), .unwrap(),
)), ),
) { ],
gst::warning!(CAT, obj: element, "{err}"); );
}
} else { } else {
gst::warning!( gst::warning!(
CAT, CAT,
obj: element, imp: self,
"Failed to parse SDP_OFFER: {payload}" "Failed to parse SDP_OFFER: {payload}"
); );
} }
@ -127,18 +133,19 @@ impl Signaller {
ice_msg.sdp_m_line_index, ice_msg.sdp_m_line_index,
ice_msg.sdp_mid ice_msg.sdp_mid
); );
if let Err(err) = element.handle_ice( self.obj().emit_by_name::<()>(
&msg.sender_client_id, "handle-ice",
Some(ice_msg.sdp_m_line_index), &[
Some(ice_msg.sdp_mid), &msg.sender_client_id,
&ice_msg.candidate, &ice_msg.sdp_m_line_index,
) { &Some(ice_msg.sdp_mid),
gst::warning!(CAT, obj: element, "{err}"); &ice_msg.candidate,
} ],
);
} else { } else {
gst::warning!( gst::warning!(
CAT, CAT,
obj: element, imp: self,
"Failed to parse ICE_CANDIDATE: {payload}" "Failed to parse ICE_CANDIDATE: {payload}"
); );
} }
@ -146,7 +153,7 @@ impl Signaller {
_ => { _ => {
gst::log!( gst::log!(
CAT, CAT,
obj: element, imp: self,
"Ignoring unsupported message type {}", "Ignoring unsupported message type {}",
msg.message_type msg.message_type
); );
@ -156,20 +163,24 @@ impl Signaller {
Err(e) => { Err(e) => {
gst::error!( gst::error!(
CAT, CAT,
obj: element, imp: self,
"Failed to decode message payload from server: {e}" "Failed to decode message payload from server: {e}"
); );
element.handle_signalling_error( self.obj().emit_by_name::<()>(
anyhow!("Failed to decode message payload from server: {e}").into(), "error",
&[&format!(
"{:?}",
anyhow!("Failed to decode message payload from server: {e}")
)],
); );
} }
} }
} else { } else {
gst::log!(CAT, obj: element, "Unknown message from server: [{msg}]"); gst::log!(CAT, imp: self, "Unknown message from server: [{msg}]");
} }
} }
async fn connect(&self, element: &WebRTCSink) -> Result<(), Error> { async fn connect(&self) -> Result<(), Error> {
let settings = self.settings.lock().unwrap().clone(); let settings = self.settings.lock().unwrap().clone();
let connector = if let Some(path) = settings.cafile { let connector = if let Some(path) = settings.cafile {
@ -341,8 +352,7 @@ impl Signaller {
.collect(); .collect();
gst::info!(CAT, "Ice servers: {:?}", ice_servers); gst::info!(CAT, "Ice servers: {:?}", ice_servers);
self.obj().connect_closure(
element.connect_closure(
"consumer-added", "consumer-added",
false, false,
glib::closure!(|_webrtcsink: &gst::Element, glib::closure!(|_webrtcsink: &gst::Element,
@ -409,7 +419,7 @@ impl Signaller {
let (ws, _) = let (ws, _) =
async_tungstenite::tokio::connect_async_with_tls_connector(url, connector).await?; async_tungstenite::tokio::connect_async_with_tls_connector(url, connector).await?;
gst::info!(CAT, obj: element, "connected"); gst::info!(CAT, imp: self, "connected");
// Channel for asynchronously sending out websocket message // Channel for asynchronously sending out websocket message
let (mut ws_sink, mut ws_stream) = ws.split(); let (mut ws_sink, mut ws_stream) = ws.split();
@ -418,7 +428,7 @@ impl Signaller {
// up of messages as with unbounded // up of messages as with unbounded
let (mut _websocket_sender, mut websocket_receiver) = let (mut _websocket_sender, mut websocket_receiver) =
mpsc::channel::<p::OutgoingMessage>(1000); mpsc::channel::<p::OutgoingMessage>(1000);
let element_clone = element.downgrade(); let imp = self.downgrade();
let ping_timeout = settings.ping_timeout; let ping_timeout = settings.ping_timeout;
let send_task_handle = task::spawn(async move { let send_task_handle = task::spawn(async move {
loop { loop {
@ -429,10 +439,10 @@ impl Signaller {
.await .await
{ {
Ok(Some(msg)) => { Ok(Some(msg)) => {
if let Some(element) = element_clone.upgrade() { if let Some(imp) = imp.upgrade() {
gst::trace!( gst::trace!(
CAT, CAT,
obj: element, imp: imp,
"Sending websocket message {}", "Sending websocket message {}",
serde_json::to_string(&msg).unwrap() serde_json::to_string(&msg).unwrap()
); );
@ -450,8 +460,8 @@ impl Signaller {
} }
} }
if let Some(element) = element_clone.upgrade() { if let Some(imp) = imp.upgrade() {
gst::info!(CAT, obj: element, "Done sending"); gst::info!(CAT, imp: imp, "Done sending");
} }
ws_sink.send(WsMessage::Close(None)).await?; ws_sink.send(WsMessage::Close(None)).await?;
@ -460,29 +470,26 @@ impl Signaller {
Ok::<(), Error>(()) Ok::<(), Error>(())
}); });
let element_clone = element.downgrade(); let imp = self.downgrade();
let receive_task_handle = task::spawn(async move { let receive_task_handle = task::spawn(async move {
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
if let Some(element) = element_clone.upgrade() { if let Some(imp) = imp.upgrade() {
match msg { match msg {
Ok(WsMessage::Text(msg)) => { Ok(WsMessage::Text(msg)) => {
gst::trace!(CAT, "received message [{msg}]"); gst::trace!(CAT, "received message [{msg}]");
Signaller::handle_message(&element, msg); imp.handle_message(msg);
} }
Ok(WsMessage::Close(reason)) => { Ok(WsMessage::Close(reason)) => {
gst::info!( gst::info!(CAT, imp: imp, "websocket connection closed: {:?}", reason);
CAT, imp.obj().emit_by_name::<()>("shutdown", &[]);
obj: element,
"websocket connection closed: {:?}",
reason
);
element.shutdown();
break; break;
} }
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
element imp.obj().emit_by_name::<()>(
.handle_signalling_error(anyhow!("Error receiving: {err}").into()); "error",
&[&format!("{:?}", anyhow!("Error receiving: {err}"))],
);
break; break;
} }
} }
@ -491,8 +498,8 @@ impl Signaller {
} }
} }
if let Some(element) = element_clone.upgrade() { if let Some(imp) = imp.upgrade() {
gst::info!(CAT, obj: element, "Stopped websocket receiving"); gst::info!(CAT, imp: imp, "Stopped websocket receiving");
} }
}); });
@ -503,24 +510,22 @@ impl Signaller {
Ok(()) Ok(())
} }
}
pub fn start(&self, element: &WebRTCSink) { impl SignallableImpl for Signaller {
fn start(&self) {
let this = self.obj().clone(); let this = self.obj().clone();
let element_clone = element.clone(); let imp = self.downgrade();
task::spawn(async move { task::spawn(async move {
let this = this.imp(); if let Some(imp) = imp.upgrade() {
if let Err(err) = this.connect(&element_clone).await { if let Err(err) = imp.connect().await {
element_clone.handle_signalling_error(err.into()); this.emit_by_name::<()>("error", &[&format!("{:?}", anyhow!(err))]);
}
} }
}); });
} }
pub fn handle_sdp( fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
&self,
element: &WebRTCSink,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let msg = p::OutgoingMessage { let msg = p::OutgoingMessage {
@ -537,20 +542,22 @@ impl Signaller {
}; };
if let Some(mut sender) = state.websocket_sender.clone() { if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade(); let imp = self.downgrade();
RUNTIME.spawn(async move { RUNTIME.spawn(async move {
if let Err(err) = sender.send(msg).await { if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() { if let Some(imp) = imp.upgrade() {
element.handle_signalling_error(anyhow!("Error: {err}").into()); imp.obj().emit_by_name::<()>(
"error",
&[&format!("{:?}", anyhow!("Error: {err}"))],
);
} }
} }
}); });
} }
} }
pub fn handle_ice( fn add_ice(
&self, &self,
element: &WebRTCSink,
session_id: &str, session_id: &str,
candidate: &str, candidate: &str,
sdp_m_line_index: Option<u32>, sdp_m_line_index: Option<u32>,
@ -573,48 +580,43 @@ impl Signaller {
}; };
if let Some(mut sender) = state.websocket_sender.clone() { if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade(); let imp = self.downgrade();
RUNTIME.spawn(async move { RUNTIME.spawn(async move {
if let Err(err) = sender.send(msg).await { if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() { if let Some(imp) = imp.upgrade() {
element.handle_signalling_error(anyhow!("Error: {err}").into()); imp.obj().emit_by_name::<()>(
"error",
&[&format!("{:?}", anyhow!("Error: {err}"))],
);
} }
} }
}); });
} }
} }
pub fn stop(&self, element: &WebRTCSink) { fn stop(&self) {
gst::info!(CAT, obj: element, "Stopping now"); gst::info!(CAT, imp: self, "Stopping now");
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let send_task_handle = state.send_task_handle.take(); let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take(); let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() { if let Some(mut sender) = state.websocket_sender.take() {
let element = element.downgrade(); let imp = self.downgrade();
RUNTIME.block_on(async move { RUNTIME.block_on(async move {
sender.close_channel(); sender.close_channel();
if let Some(handle) = send_task_handle { if let Some(handle) = send_task_handle {
if let Err(err) = handle.await { if let Err(err) = handle.await {
if let Some(element) = element.upgrade() { if let Some(imp) = imp.upgrade() {
gst::warning!( gst::warning!(CAT, imp: imp, "Error while joining send task: {err}");
CAT,
obj: element,
"Error while joining send task: {err}"
);
} }
} }
} }
if let Some(handle) = receive_task_handle { if let Some(handle) = receive_task_handle {
if let Err(err) = handle.await { if let Err(err) = handle.await {
if let Some(element) = element.upgrade() { if let Some(imp) = imp.upgrade() {
gst::warning!( gst::warning!(CAT, imp: imp, "Error while joining receive task: {err}");
CAT,
obj: element,
"Error while joining receive task: {err}"
);
} }
} }
} }
@ -622,8 +624,8 @@ impl Signaller {
} }
} }
pub fn end_session(&self, element: &WebRTCSink, session_id: &str) { fn end_session(&self, session_id: &str) {
gst::info!(CAT, obj: element, "Signalling session {session_id} ended"); gst::info!(CAT, imp: self, "Signalling session {session_id} ended");
// We can seemingly not do anything beyond that // We can seemingly not do anything beyond that
} }
@ -634,6 +636,7 @@ impl ObjectSubclass for Signaller {
const NAME: &'static str = "GstAwsKvsWebRTCSinkSignaller"; const NAME: &'static str = "GstAwsKvsWebRTCSinkSignaller";
type Type = super::AwsKvsSignaller; type Type = super::AwsKvsSignaller;
type ParentType = glib::Object; type ParentType = glib::Object;
type Interfaces = (Signallable,);
} }
impl ObjectImpl for Signaller { impl ObjectImpl for Signaller {

View file

@ -1,63 +1,18 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::webrtcsink::{Signallable, WebRTCSink}; use crate::signaller::Signallable;
use gst::glib; use gst::glib;
use gst::subclass::prelude::*;
use std::error::Error;
mod imp; mod imp;
mod protocol; mod protocol;
glib::wrapper! { glib::wrapper! {
pub struct AwsKvsSignaller(ObjectSubclass<imp::Signaller>); pub struct AwsKvsSignaller(ObjectSubclass<imp::Signaller>) @implements Signallable;
} }
unsafe impl Send for AwsKvsSignaller {} unsafe impl Send for AwsKvsSignaller {}
unsafe impl Sync for AwsKvsSignaller {} unsafe impl Sync for AwsKvsSignaller {}
impl Signallable for AwsKvsSignaller {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.start(element);
Ok(())
}
fn handle_sdp(
&mut self,
element: &WebRTCSink,
peer_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.handle_sdp(element, peer_id, sdp);
Ok(())
}
fn handle_ice(
&mut self,
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid);
Ok(())
}
fn stop(&mut self, element: &WebRTCSink) {
let signaller = self.imp();
signaller.stop(element);
}
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) {
let signaller = self.imp();
signaller.end_session(element, session_id);
}
}
impl Default for AwsKvsSignaller { impl Default for AwsKvsSignaller {
fn default() -> Self { fn default() -> Self {
glib::Object::new() glib::Object::new()

View file

@ -11,7 +11,7 @@ use once_cell::sync::Lazy;
use tokio::runtime; use tokio::runtime;
mod aws_kvs_signaller; mod aws_kvs_signaller;
mod signaller; pub mod signaller;
pub mod utils; pub mod utils;
pub mod webrtcsink; pub mod webrtcsink;
pub mod webrtcsrc; pub mod webrtcsrc;

View file

@ -102,7 +102,11 @@ unsafe impl prelude::ObjectInterface for Signallable {
* session * session
*/ */
Signal::builder("session-requested") Signal::builder("session-requested")
.param_types([str::static_type(), str::static_type()]) .param_types([
str::static_type(),
str::static_type(),
gst_webrtc::WebRTCSessionDescription::static_type(),
])
.build(), .build(),
/** /**
* GstRSWebRTCSignallableIface::error: * GstRSWebRTCSignallableIface::error:
@ -197,6 +201,34 @@ unsafe impl prelude::ObjectInterface for Signallable {
None None
}) })
.build(), .build(),
/**
* GstRSWebRTCSignallableIface::shutdown:
* @self: The object implementing #GstRSWebRTCSignallableIface
*/
Signal::builder("shutdown").build(),
/**
* GstRSWebRTCSignallableIface::consumer-added:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @peer_id: Id of the consumer
* @webrtcbin: The internal WebRTCBin element
*
* This signal can be used to tweak @webrtcbin, creating a data
* channel for example.
*/
Signal::builder("consumer-added")
.param_types([String::static_type(), gst::Element::static_type()])
.build(),
/**
* GstRSWebRTCSignallableIface::consumer-removed:
* @consumer_id: Identifier of the consumer that was removed
* @webrtcbin: The webrtcbin connected to the newly removed consumer
*
* This signal is emitted right after the connection with a consumer
* has been dropped.
*/
glib::subclass::Signal::builder("consumer-removed")
.param_types([String::static_type(), gst::Element::static_type()])
.build(),
] ]
}); });
SIGNALS.as_ref() SIGNALS.as_ref()

View file

@ -1,48 +1,51 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::{webrtcsink::WebRTCSink, RUNTIME}; use crate::signaller::{prelude::*, Signallable};
use crate::utils::{gvalue_to_json, serialize_json_object};
use crate::RUNTIME;
use anyhow::{anyhow, Error}; use anyhow::{anyhow, Error};
use async_tungstenite::tungstenite::Message as WsMessage; use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::prelude::*; use futures::prelude::*;
use gst::glib;
use gst::glib::prelude::*; use gst::glib::prelude::*;
use gst::glib::{self, Type};
use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst_plugin_webrtc_protocol as p; use gst_plugin_webrtc_protocol as p;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::collections::HashMap; use std::collections::HashSet;
use std::path::PathBuf; use std::ops::ControlFlow;
use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
use tokio::task; use std::time::Duration;
use tokio::{task, time::timeout};
use url::Url;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { use super::CAT;
gst::DebugCategory::new(
"webrtcsink-signaller",
gst::DebugColorFlags::empty(),
Some("WebRTC sink signaller"),
)
});
#[derive(Default)] #[derive(Debug, Eq, PartialEq, Clone, Copy, glib::Enum, Default)]
struct State { #[repr(u32)]
/// Sender for the websocket messages #[enum_type(name = "GstRSWebRTCSignallerRole")]
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>, pub enum WebRTCSignallerRole {
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>, #[default]
receive_task_handle: Option<task::JoinHandle<()>>, Consumer,
Producer,
Listener,
} }
#[derive(Clone)] pub struct Settings {
struct Settings { uri: Url,
address: Option<String>, producer_peer_id: Option<String>,
cafile: Option<PathBuf>, cafile: Option<String>,
role: WebRTCSignallerRole,
} }
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Self { Self {
address: Some("ws://127.0.0.1:8443".to_string()), uri: Url::from_str("ws://127.0.0.1:8443").unwrap(),
cafile: None, producer_peer_id: None,
cafile: Default::default(),
role: Default::default(),
} }
} }
} }
@ -53,11 +56,61 @@ pub struct Signaller {
settings: Mutex<Settings>, settings: Mutex<Settings>,
} }
impl Signaller { #[derive(Default)]
async fn connect(&self, element: &WebRTCSink) -> Result<(), Error> { struct State {
let settings = self.settings.lock().unwrap().clone(); /// Sender for the websocket messages
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
receive_task_handle: Option<task::JoinHandle<()>>,
producers: HashSet<String>,
}
let connector = if let Some(path) = settings.cafile { impl Signaller {
fn uri(&self) -> Url {
self.settings.lock().unwrap().uri.clone()
}
fn set_uri(&self, uri: &str) -> Result<(), Error> {
let mut settings = self.settings.lock().unwrap();
let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?;
if let Some(peer_id) = uri
.query_pairs()
.find(|(k, _)| k == "peer-id")
.map(|v| v.1.to_string())
{
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"Setting peer-id doesn't make sense for {:?}",
settings.role
);
} else {
settings.producer_peer_id = Some(peer_id);
}
}
if let Some(peer_id) = &settings.producer_peer_id {
uri.query_pairs_mut()
.clear()
.append_pair("peer-id", peer_id);
}
settings.uri = uri;
Ok(())
}
async fn connect(&self) -> Result<(), Error> {
let obj = self.obj();
let role = self.settings.lock().unwrap().role;
if let super::WebRTCSignallerRole::Consumer = role {
self.producer_peer_id()
.ok_or_else(|| anyhow!("No target producer peer id set"))?;
}
let connector = if let Some(path) = obj.property::<Option<String>>("cafile") {
let cert = tokio::fs::read_to_string(&path).await?; let cert = tokio::fs::read_to_string(&path).await?;
let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?; let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?;
let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder(); let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder();
@ -67,178 +120,68 @@ impl Signaller {
None None
}; };
let (ws, _) = async_tungstenite::tokio::connect_async_with_tls_connector( let mut uri = self.uri();
settings.address.unwrap(), uri.set_query(None);
connector, let (ws, _) = timeout(
// FIXME: Make the timeout configurable
Duration::from_secs(20),
async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector),
) )
.await?; .await??;
gst::info!(CAT, obj: element, "connected"); gst::info!(CAT, imp: self, "connected");
// Channel for asynchronously sending out websocket message // Channel for asynchronously sending out websocket message
let (mut ws_sink, mut ws_stream) = ws.split(); let (mut ws_sink, mut ws_stream) = ws.split();
// 1000 is completely arbitrary, we simply don't want infinite piling // 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded // up of messages as with unbounded
let (mut websocket_sender, mut websocket_receiver) = let (websocket_sender, mut websocket_receiver) = mpsc::channel::<p::IncomingMessage>(1000);
mpsc::channel::<p::IncomingMessage>(1000); let send_task_handle =
let element_clone = element.downgrade(); RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
let send_task_handle = task::spawn(async move { while let Some(msg) = websocket_receiver.next().await {
while let Some(msg) = websocket_receiver.next().await { gst::log!(CAT, "Sending websocket message {:?}", msg);
if let Some(element) = element_clone.upgrade() { ws_sink
gst::trace!(CAT, obj: element, "Sending websocket message {:?}", msg); .send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await?;
} }
ws_sink
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await?;
}
if let Some(element) = element_clone.upgrade() { let msg = "Done sending";
gst::info!(CAT, obj: element, "Done sending"); this.map_or_else(|| gst::info!(CAT, "{msg}"),
} |this| gst::info!(CAT, imp: this, "{msg}")
);
ws_sink.send(WsMessage::Close(None)).await?; ws_sink.send(WsMessage::Close(None)).await?;
ws_sink.close().await?; ws_sink.close().await?;
Ok::<(), Error>(()) Ok::<(), Error>(())
}); }));
let meta = if let Some(meta) = element.property::<Option<gst::Structure>>("meta") { let obj = self.obj();
serialize_value(&meta.to_value()) let meta =
} else { if let Some(meta) = obj.emit_by_name::<Option<gst::Structure>>("request-meta", &[]) {
None gvalue_to_json(&meta.to_value())
}; } else {
websocket_sender None
.send(p::IncomingMessage::SetPeerStatus(p::PeerStatus { };
roles: vec![p::PeerRole::Producer],
meta,
peer_id: None,
}))
.await?;
let element_clone = element.downgrade(); let receive_task_handle =
let receive_task_handle = task::spawn(async move { RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
if let Some(element) = element_clone.upgrade() { if let Some(ref this) = this {
match msg { if let ControlFlow::Break(_) = this.handle_message(msg, &meta) {
Ok(WsMessage::Text(msg)) => {
gst::trace!(CAT, obj: element, "Received message {}", msg);
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg {
p::OutgoingMessage::Welcome { peer_id } => {
gst::info!(
CAT,
obj: element,
"We are registered with the server, our peer id is {}",
peer_id
);
}
p::OutgoingMessage::StartSession {
session_id,
peer_id,
} => {
if let Err(err) =
element.start_session(&session_id, &peer_id, None)
{
gst::warning!(CAT, obj: element, "{}", err);
}
}
p::OutgoingMessage::EndSession(session_info) => {
if let Err(err) =
element.end_session(&session_info.session_id)
{
gst::warning!(CAT, obj: element, "{}", err);
}
}
p::OutgoingMessage::Peer(p::PeerMessage {
session_id,
peer_message,
}) => match peer_message {
p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => {
if let Err(err) = element.handle_sdp(
&session_id,
&gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Answer,
gst_sdp::SDPMessage::parse_buffer(
sdp.as_bytes(),
)
.unwrap(),
),
) {
gst::warning!(CAT, obj: element, "{}", err);
}
}
p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
..
}) => {
gst::warning!(
CAT,
obj: element,
"Ignoring offer from peer"
);
}
p::PeerMessageInner::Ice {
candidate,
sdp_m_line_index,
} => {
if let Err(err) = element.handle_ice(
&session_id,
Some(sdp_m_line_index),
None,
&candidate,
) {
gst::warning!(CAT, obj: element, "{}", err);
}
}
},
_ => {
gst::warning!(
CAT,
obj: element,
"Ignoring unsupported message {:?}",
msg
);
}
}
} else {
gst::error!(
CAT,
obj: element,
"Unknown message from server: {}",
msg
);
element.handle_signalling_error(
anyhow!("Unknown message from server: {}", msg).into(),
);
}
}
Ok(WsMessage::Close(reason)) => {
gst::info!(
CAT,
obj: element,
"websocket connection closed: {:?}",
reason
);
break;
}
Ok(_) => (),
Err(err) => {
element.handle_signalling_error(
anyhow!("Error receiving: {}", err).into(),
);
break; break;
} }
} else {
break;
} }
} else {
break;
} }
}
if let Some(element) = element_clone.upgrade() { let msg = "Stopped websocket receiving";
gst::info!(CAT, obj: element, "Stopped websocket receiving"); this.map_or_else(|| gst::info!(CAT, "{msg}"),
} |this| gst::info!(CAT, imp: this, "{msg}")
}); );
}));
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.websocket_sender = Some(websocket_sender); state.websocket_sender = Some(websocket_sender);
@ -248,53 +191,372 @@ impl Signaller {
Ok(()) Ok(())
} }
pub fn start(&self, element: &WebRTCSink) { fn set_status(&self, meta: &Option<serde_json::Value>, peer_id: &str) {
let this = self.obj().clone(); let role = self.settings.lock().unwrap().role;
let element_clone = element.clone(); self.send(p::IncomingMessage::SetPeerStatus(match role {
task::spawn(async move { super::WebRTCSignallerRole::Consumer => p::PeerStatus {
let this = this.imp(); meta: meta.clone(),
if let Err(err) = this.connect(&element_clone).await { peer_id: Some(peer_id.to_string()),
element_clone.handle_signalling_error(err.into()); roles: vec![],
} },
}); super::WebRTCSignallerRole::Producer => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![p::PeerRole::Producer],
},
super::WebRTCSignallerRole::Listener => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![p::PeerRole::Listener],
},
}));
} }
pub fn handle_sdp( fn producer_peer_id(&self) -> Option<String> {
&self, let settings = self.settings.lock().unwrap();
element: &WebRTCSink,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) {
let state = self.state.lock().unwrap();
let msg = p::IncomingMessage::Peer(p::PeerMessage { settings.producer_peer_id.clone()
session_id: session_id.to_string(), }
peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(), fn send(&self, msg: p::IncomingMessage) {
}), let state = self.state.lock().unwrap();
if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender.send(msg).await {
this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
}
}));
}
}
pub fn start_session(&self) {
let role = self.settings.lock().unwrap().role;
if matches!(role, super::WebRTCSignallerRole::Consumer) {
let target_producer = self.producer_peer_id().unwrap();
self.send(p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: target_producer.clone(),
}));
gst::info!(
CAT,
imp: self,
"Started session with producer peer id {target_producer}",
);
}
}
fn handle_message(
&self,
msg: Result<WsMessage, async_tungstenite::tungstenite::Error>,
meta: &Option<serde_json::Value>,
) -> ControlFlow<()> {
match msg {
Ok(WsMessage::Text(msg)) => {
gst::trace!(CAT, imp: self, "Received message {}", msg);
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg {
p::OutgoingMessage::Welcome { peer_id } => {
self.set_status(meta, &peer_id);
self.start_session();
}
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
meta,
roles,
peer_id,
}) => {
let meta = meta.and_then(|m| match m {
serde_json::Value::Object(v) => Some(serialize_json_object(&v)),
_ => {
gst::error!(CAT, imp: self, "Invalid json value: {m:?}");
None
}
});
let peer_id =
peer_id.expect("Status changed should always contain a peer ID");
let mut state = self.state.lock().unwrap();
if roles.iter().any(|r| matches!(r, p::PeerRole::Producer)) {
if !state.producers.contains(&peer_id) {
state.producers.insert(peer_id.clone());
drop(state);
self.obj()
.emit_by_name::<()>("producer-added", &[&peer_id, &meta]);
}
} else if state.producers.remove(&peer_id) {
drop(state);
self.obj()
.emit_by_name::<()>("producer-removed", &[&peer_id, &meta]);
}
}
p::OutgoingMessage::SessionStarted {
peer_id,
session_id,
} => {
self.obj()
.emit_by_name::<()>("session-started", &[&session_id, &peer_id]);
}
p::OutgoingMessage::StartSession {
session_id,
peer_id,
} => {
assert!(matches!(
self.obj().property::<WebRTCSignallerRole>("role"),
super::WebRTCSignallerRole::Producer
));
self.obj().emit_by_name::<()>(
"session-requested",
&[
&session_id,
&peer_id,
&None::<gst_webrtc::WebRTCSessionDescription>,
],
);
}
p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) => {
gst::info!(CAT, imp: self, "Session {session_id} ended");
self.obj()
.emit_by_name::<()>("session-ended", &[&session_id]);
}
p::OutgoingMessage::Peer(p::PeerMessage {
session_id,
peer_message,
}) => match peer_message {
p::PeerMessageInner::Sdp(reply) => {
let (sdp, desc_type) = match reply {
p::SdpMessage::Answer { sdp } => {
(sdp, gst_webrtc::WebRTCSDPType::Answer)
}
p::SdpMessage::Offer { sdp } => {
(sdp, gst_webrtc::WebRTCSDPType::Offer)
}
};
let sdp = match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
Ok(sdp) => sdp,
Err(err) => {
self.obj().emit_by_name::<()>(
"error",
&[&format!("Error parsing SDP: {sdp} {err:?}")],
);
return ControlFlow::Break(());
}
};
let desc =
gst_webrtc::WebRTCSessionDescription::new(desc_type, sdp);
self.obj().emit_by_name::<()>(
"session-description",
&[&session_id, &desc],
);
}
p::PeerMessageInner::Ice {
candidate,
sdp_m_line_index,
} => {
let sdp_mid: Option<String> = None;
self.obj().emit_by_name::<()>(
"handle-ice",
&[&session_id, &sdp_m_line_index, &sdp_mid, &candidate],
);
}
},
p::OutgoingMessage::Error { details } => {
self.obj().emit_by_name::<()>(
"error",
&[&format!("Error message from server: {details}")],
);
}
_ => {
gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg);
}
}
} else {
gst::error!(CAT, imp: self, "Unknown message from server: {}", msg);
self.obj().emit_by_name::<()>(
"error",
&[&format!("Unknown message from server: {}", msg)],
);
}
}
Ok(WsMessage::Close(reason)) => {
gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason);
return ControlFlow::Break(());
}
Ok(_) => (),
Err(err) => {
self.obj()
.emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
return ControlFlow::Break(());
}
}
ControlFlow::Continue(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for Signaller {
const NAME: &'static str = "GstWebRTCSignaller";
type Type = super::Signaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("uri")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("producer-peer-id")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("cafile")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::Consumer)
.flags(glib::ParamFlags::READWRITE)
.build(),
]
}); });
if let Some(mut sender) = state.websocket_sender.clone() { PROPS.as_ref()
let element = element.downgrade(); }
RUNTIME.spawn(async move {
if let Err(err) = sender.send(msg).await { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
if let Some(element) = element.upgrade() { match pspec.name() {
element.handle_signalling_error(anyhow!("Error: {}", err).into()); "uri" => {
if let Err(e) = self.set_uri(value.get::<&str>().expect("type checked upstream")) {
gst::error!(CAT, "Couldn't set URI: {e:?}");
}
}
"producer-peer-id" => {
let mut settings = self.settings.lock().unwrap();
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"Setting `producer-peer-id` doesn't make sense for {:?}",
settings.role
);
} else {
settings.producer_peer_id = value
.get::<Option<String>>()
.expect("type checked upstream");
}
}
"cafile" => {
self.settings.lock().unwrap().cafile = value
.get::<Option<String>>()
.expect("type checked upstream")
}
"role" => {
self.settings.lock().unwrap().role = value
.get::<WebRTCSignallerRole>()
.expect("type checked upstream")
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"uri" => settings.uri.to_string().to_value(),
"producer-peer-id" => {
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"`producer-peer-id` doesn't make sense for {:?}",
settings.role
);
}
settings.producer_peer_id.to_value()
}
"cafile" => settings.cafile.to_value(),
"role" => settings.role.to_value(),
_ => unimplemented!(),
}
}
}
impl SignallableImpl for Signaller {
fn start(&self) {
gst::info!(CAT, imp: self, "Starting");
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = this.connect().await {
this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
}
}));
}
fn stop(&self) {
gst::info!(CAT, imp: self, "Stopping now");
let mut state = self.state.lock().unwrap();
let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() {
RUNTIME.block_on(async move {
sender.close_channel();
if let Some(handle) = send_task_handle {
if let Err(err) = handle.await {
gst::warning!(CAT, imp: self, "Error while joining send task: {}", err);
}
}
if let Some(handle) = receive_task_handle {
if let Err(err) = handle.await {
gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err);
} }
} }
}); });
} }
} }
pub fn handle_ice( fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
gst::debug!(CAT, imp: self, "Sending SDP {sdp:#?}");
let role = self.settings.lock().unwrap().role;
let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer);
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_owned(),
peer_message: p::PeerMessageInner::Sdp(if is_consumer {
p::SdpMessage::Answer {
sdp: sdp.sdp().as_text().unwrap(),
}
} else {
p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
}
}),
});
self.send(msg);
}
fn add_ice(
&self, &self,
element: &WebRTCSink,
session_id: &str, session_id: &str,
candidate: &str, candidate: &str,
sdp_m_line_index: Option<u32>, sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>, _sdp_mid: Option<String>,
) { ) {
let state = self.state.lock().unwrap(); gst::debug!(
CAT,
imp: self,
"Adding ice candidate {candidate:?} for {sdp_m_line_index:?} on session {session_id}"
);
let msg = p::IncomingMessage::Peer(p::PeerMessage { let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_string(), session_id: session_id.to_string(),
@ -304,185 +566,27 @@ impl Signaller {
}, },
}); });
if let Some(mut sender) = state.websocket_sender.clone() { self.send(msg);
let element = element.downgrade();
RUNTIME.spawn(async move {
if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err).into());
}
}
});
}
} }
pub fn stop(&self, element: &WebRTCSink) { fn end_session(&self, session_id: &str) {
gst::info!(CAT, obj: element, "Stopping now"); gst::debug!(CAT, imp: self, "Signalling session done {}", session_id);
let mut state = self.state.lock().unwrap();
let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() {
let element = element.downgrade();
RUNTIME.block_on(async move {
sender.close_channel();
if let Some(handle) = send_task_handle {
if let Err(err) = handle.await {
if let Some(element) = element.upgrade() {
gst::warning!(
CAT,
obj: element,
"Error while joining send task: {}",
err
);
}
}
}
if let Some(handle) = receive_task_handle {
if let Err(err) = handle.await {
if let Some(element) = element.upgrade() {
gst::warning!(
CAT,
obj: element,
"Error while joining receive task: {}",
err
);
}
}
}
});
}
}
pub fn end_session(&self, element: &WebRTCSink, session_id: &str) {
gst::debug!(CAT, obj: element, "Signalling session {} ended", session_id);
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let session_id = session_id.to_string(); let session_id = session_id.to_string();
let element = element.downgrade();
if let Some(mut sender) = state.websocket_sender.clone() { if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(async move { RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender if let Err(err) = sender
.send(p::IncomingMessage::EndSession(p::EndSessionMessage { .send(p::IncomingMessage::EndSession(p::EndSessionMessage {
session_id: session_id.to_string(), session_id,
})) }))
.await .await
{ {
if let Some(element) = element.upgrade() { this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
element.handle_signalling_error(anyhow!("Error: {}", err).into());
}
} }
}); }));
} }
} }
} }
#[glib::object_subclass] impl GstObjectImpl for Signaller {}
impl ObjectSubclass for Signaller {
const NAME: &'static str = "GstWebRTCSinkSignaller";
type Type = super::Signaller;
type ParentType = glib::Object;
}
impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("address")
.nick("Address")
.blurb("Address of the signalling server")
.default_value("ws://127.0.0.1:8443")
.build(),
glib::ParamSpecString::builder("cafile")
.nick("CA file")
.blurb("Path to a Certificate file to add to the set of roots the TLS connector will trust")
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"address" => {
let address: Option<_> = value.get().expect("type checked upstream");
if let Some(address) = address {
gst::info!(CAT, "Signaller address set to {}", address);
let mut settings = self.settings.lock().unwrap();
settings.address = Some(address);
} else {
gst::error!(CAT, "address can't be None");
}
}
"cafile" => {
let value: String = value.get().unwrap();
let mut settings = self.settings.lock().unwrap();
settings.cafile = Some(value.into());
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"address" => self.settings.lock().unwrap().address.to_value(),
"cafile" => {
let settings = self.settings.lock().unwrap();
let cafile = settings.cafile.as_ref();
cafile.and_then(|file| file.to_str()).to_value()
}
_ => unimplemented!(),
}
}
}
fn serialize_value(val: &gst::glib::Value) -> Option<serde_json::Value> {
match val.type_() {
Type::STRING => Some(val.get::<String>().unwrap().into()),
Type::BOOL => Some(val.get::<bool>().unwrap().into()),
Type::I32 => Some(val.get::<i32>().unwrap().into()),
Type::U32 => Some(val.get::<u32>().unwrap().into()),
Type::I_LONG | Type::I64 => Some(val.get::<i64>().unwrap().into()),
Type::U_LONG | Type::U64 => Some(val.get::<u64>().unwrap().into()),
Type::F32 => Some(val.get::<f32>().unwrap().into()),
Type::F64 => Some(val.get::<f64>().unwrap().into()),
_ => {
if let Ok(s) = val.get::<gst::Structure>() {
serde_json::to_value(
s.iter()
.filter_map(|(name, value)| {
serialize_value(value).map(|value| (name.to_string(), value))
})
.collect::<HashMap<String, serde_json::Value>>(),
)
.ok()
} else if let Ok(a) = val.get::<gst::Array>() {
serde_json::to_value(
a.iter()
.filter_map(|value| serialize_value(value))
.collect::<Vec<serde_json::Value>>(),
)
.ok()
} else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) {
Some(
values
.iter()
.map(|value| value.nick())
.collect::<Vec<&str>>()
.join("+")
.into(),
)
} else if let Ok(value) = val.serialize() {
Some(value.as_str().into())
} else {
gst::warning!(CAT, "Can't convert {} to json", val.type_().name());
None
}
}
}
}

View file

@ -1,64 +1,46 @@
// SPDX-License-Identifier: MPL-2.0 mod iface;
use crate::webrtcsink::{Signallable, WebRTCSink};
use gst::glib;
use gst::subclass::prelude::*;
use std::error::Error;
mod imp; mod imp;
use gst::glib;
glib::wrapper! { use once_cell::sync::Lazy;
pub struct Signaller(ObjectSubclass<imp::Signaller>); // Expose traits and objects from the module itself so it exactly looks like
// generated bindings
pub use imp::WebRTCSignallerRole;
pub mod prelude {
pub use {super::SignallableExt, super::SignallableImpl};
} }
unsafe impl Send for Signaller {} pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
unsafe impl Sync for Signaller {} gst::DebugCategory::new(
"webrtcsrc-signaller",
gst::DebugColorFlags::empty(),
Some("WebRTC src signaller"),
)
});
impl Signallable for Signaller { glib::wrapper! {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>> { pub struct Signallable(ObjectInterface<iface::Signallable>);
let signaller = self.imp(); }
signaller.start(element);
Ok(()) glib::wrapper! {
} pub struct Signaller(ObjectSubclass <imp::Signaller>) @implements Signallable;
fn handle_sdp(
&mut self,
element: &WebRTCSink,
peer_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.handle_sdp(element, peer_id, sdp);
Ok(())
}
fn handle_ice(
&mut self,
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid);
Ok(())
}
fn stop(&mut self, element: &WebRTCSink) {
let signaller = self.imp();
signaller.stop(element);
}
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) {
let signaller = self.imp();
signaller.end_session(element, session_id);
}
} }
impl Default for Signaller { impl Default for Signaller {
fn default() -> Self { fn default() -> Self {
glib::Object::new() glib::Object::builder().build()
} }
} }
impl Signaller {
pub fn new(mode: WebRTCSignallerRole) -> Self {
glib::Object::builder().property("role", &mode).build()
}
}
pub use iface::SignallableExt;
pub use iface::SignallableImpl;
pub use iface::SignallableImplExt;
unsafe impl Send for Signallable {}
unsafe impl Sync for Signallable {}

View file

@ -20,7 +20,7 @@ use std::sync::Mutex;
use super::homegrown_cc::CongestionController; use super::homegrown_cc::CongestionController;
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
use crate::aws_kvs_signaller::AwsKvsSignaller; use crate::aws_kvs_signaller::AwsKvsSignaller;
use crate::signaller::Signaller as DefaultSignaller; use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
use crate::RUNTIME; use crate::RUNTIME;
use std::collections::BTreeMap; use std::collections::BTreeMap;
@ -187,9 +187,21 @@ struct NavigationEvent {
event: gst_video::NavigationEvent, event: gst_video::NavigationEvent,
} }
// Used to ensure signal are disconnected when a new signaller is is
#[allow(dead_code)]
struct SignallerSignals {
error: glib::SignalHandlerId,
request_meta: glib::SignalHandlerId,
session_requested: glib::SignalHandlerId,
session_ended: glib::SignalHandlerId,
session_description: glib::SignalHandlerId,
handle_ice: glib::SignalHandlerId,
shutdown: glib::SignalHandlerId,
}
/* Our internal state */ /* Our internal state */
struct State { struct State {
signaller: Box<dyn super::SignallableObject>, signaller: Signallable,
signaller_state: SignallerState, signaller_state: SignallerState,
sessions: HashMap<String, Session>, sessions: HashMap<String, Session>,
codecs: BTreeMap<i32, Codec>, codecs: BTreeMap<i32, Codec>,
@ -205,6 +217,7 @@ struct State {
streams: HashMap<String, InputStream>, streams: HashMap<String, InputStream>,
navigation_handler: Option<NavigationEventHandler>, navigation_handler: Option<NavigationEventHandler>,
mids: HashMap<String, String>, mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
} }
fn create_navigation_event(sink: &super::WebRTCSink, msg: &str) { fn create_navigation_event(sink: &super::WebRTCSink, msg: &str) {
@ -303,10 +316,10 @@ impl Default for Settings {
impl Default for State { impl Default for State {
fn default() -> Self { fn default() -> Self {
let signaller = DefaultSignaller::default(); let signaller = Signaller::new(WebRTCSignallerRole::Producer);
Self { Self {
signaller: Box::new(signaller), signaller: signaller.upcast(),
signaller_state: SignallerState::Stopped, signaller_state: SignallerState::Stopped,
sessions: HashMap::new(), sessions: HashMap::new(),
codecs: BTreeMap::new(), codecs: BTreeMap::new(),
@ -318,6 +331,7 @@ impl Default for State {
streams: HashMap::new(), streams: HashMap::new(),
navigation_handler: None, navigation_handler: None,
mids: HashMap::new(), mids: HashMap::new(),
signaller_signals: Default::default(),
} }
} }
} }
@ -756,12 +770,7 @@ impl VideoEncoder {
} }
impl State { impl State {
fn finalize_session( fn finalize_session(&mut self, session: &mut Session, signal: bool) {
&mut self,
element: &super::WebRTCSink,
session: &mut Session,
signal: bool,
) {
gst::info!(CAT, "Ending session {}", session.id); gst::info!(CAT, "Ending session {}", session.id);
session.pipeline.debug_to_dot_file_with_ts( session.pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(), gst::DebugGraphDetails::all(),
@ -777,18 +786,13 @@ impl State {
}); });
if signal { if signal {
self.signaller.session_ended(element, &session.peer_id); self.signaller.end_session(&session.id);
} }
} }
fn end_session( fn end_session(&mut self, session_id: &str, signal: bool) -> Option<Session> {
&mut self,
element: &super::WebRTCSink,
session_id: &str,
signal: bool,
) -> Option<Session> {
if let Some(mut session) = self.sessions.remove(session_id) { if let Some(mut session) = self.sessions.remove(session_id) {
self.finalize_session(element, &mut session, signal); self.finalize_session(&mut session, signal);
Some(session) Some(session)
} else { } else {
None None
@ -800,23 +804,13 @@ impl State {
&& element.current_state() >= gst::State::Paused && element.current_state() >= gst::State::Paused
&& self.codec_discovery_done && self.codec_discovery_done
{ {
if let Err(err) = self.signaller.start(element) { self.signaller.start();
gst::error!(CAT, obj: element, "error: {}", err);
gst::element_error!(
element,
gst::StreamError::Failed,
["Failed to start signaller {}", err]
);
} else {
gst::info!(CAT, "Started signaller");
self.signaller_state = SignallerState::Started;
}
} }
} }
fn maybe_stop_signaller(&mut self, element: &super::WebRTCSink) { fn maybe_stop_signaller(&mut self, _element: &super::WebRTCSink) {
if self.signaller_state == SignallerState::Started { if self.signaller_state == SignallerState::Started {
self.signaller.stop(element); self.signaller.stop();
self.signaller_state = SignallerState::Stopped; self.signaller_state = SignallerState::Stopped;
gst::info!(CAT, "Stopped signaller"); gst::info!(CAT, "Stopped signaller");
} }
@ -1380,7 +1374,7 @@ impl WebRTCSink {
let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect();
for id in session_ids { for id in session_ids {
state.end_session(element, &id, true); state.end_session(&id, true);
} }
state state
@ -1406,10 +1400,100 @@ impl WebRTCSink {
Ok(()) Ok(())
} }
fn connect_signaller(&self, signaler: &Signallable) {
let instance = &*self.obj();
let _ = self.state.lock().unwrap().signaller_signals.insert(SignallerSignals {
error: signaler.connect_closure(
"error",
false,
glib::closure!(@watch instance => move |_signaler: glib::Object, error: String| {
gst::element_error!(
instance,
gst::StreamError::Failed,
["Signalling error: {}", error]
);
})
),
request_meta: signaler.connect_closure(
"request-meta",
false,
glib::closure!(@watch instance => move |_signaler: glib::Object| -> Option<gst::Structure> {
let meta = instance.imp().settings.lock().unwrap().meta.clone();
meta
})
),
session_requested: signaler.connect_closure(
"session-requested",
false,
glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str, peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{
if let Err(err) = instance.imp().start_session(session_id, peer_id, offer) {
gst::warning!(CAT, "{}", err);
}
})
),
session_description: signaler.connect_closure(
"session-description",
false,
glib::closure!(@watch instance => move |
_signaler: glib::Object,
peer_id: &str,
session_description: &gst_webrtc::WebRTCSessionDescription| {
if session_description.type_() == gst_webrtc::WebRTCSDPType::Answer {
instance.imp().handle_sdp_answer(instance, peer_id, session_description);
} else {
gst::error!(CAT, obj: instance, "Unsupported SDP Type");
}
}
),
),
handle_ice: signaler.connect_closure(
"handle-ice",
false,
glib::closure!(@watch instance => move |
_signaler: glib::Object,
session_id: &str,
sdp_m_line_index: u32,
_sdp_mid: Option<String>,
candidate: &str| {
instance
.imp()
.handle_ice(session_id, Some(sdp_m_line_index), None, candidate);
}),
),
session_ended: signaler.connect_closure(
"session-ended",
false,
glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str|{
if let Err(err) = instance.imp().remove_session(instance, session_id, false) {
gst::warning!(CAT, "{}", err);
}
})
),
shutdown: signaler.connect_closure(
"shutdown",
false,
glib::closure!(@watch instance => move |_signaler: glib::Object|{
instance.imp().shutdown(instance);
})
),
});
}
/// When using a custom signaller /// When using a custom signaller
pub fn set_signaller(&self, signaller: Box<dyn super::SignallableObject>) -> Result<(), Error> { pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> {
let sigobj = signaller.clone();
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
self.connect_signaller(&sigobj);
state.signaller = signaller; state.signaller = signaller;
Ok(()) Ok(())
@ -1434,27 +1518,18 @@ impl WebRTCSink {
fn on_offer_created( fn on_offer_created(
&self, &self,
element: &super::WebRTCSink, _element: &super::WebRTCSink,
offer: gst_webrtc::WebRTCSessionDescription, offer: gst_webrtc::WebRTCSessionDescription,
session_id: &str, session_id: &str,
) { ) {
let mut state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get(session_id) { if let Some(session) = state.sessions.get(session_id) {
session session
.webrtcbin .webrtcbin
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]); .emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
if let Err(err) = state.signaller.handle_sdp(element, session_id, &offer) { state.signaller.send_sdp(session_id, &offer);
gst::warning!(
CAT,
"Failed to handle SDP for session {}: {}",
session_id,
err
);
state.end_session(element, session_id, true);
}
} }
} }
@ -1482,24 +1557,14 @@ impl WebRTCSink {
.webrtcbin .webrtcbin
.emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]); .emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
if let Err(err) = state.signaller.handle_sdp(element, session_id, &answer) { state.signaller.send_sdp(session_id, &answer);
gst::warning!( let session_id = session.id.clone();
CAT,
"Failed to handle SDP for session {}: {}",
session_id,
err
);
state.finalize_session(element, &mut session, true); state.sessions.insert(session.id.clone(), session);
} else {
let session_id = session.id.clone();
state.sessions.insert(session.id.clone(), session); drop(state);
drop(state); self.on_remote_description_set(element, session_id)
self.on_remote_description_set(element, session_id)
}
} }
} }
@ -1802,34 +1867,20 @@ impl WebRTCSink {
fn on_ice_candidate( fn on_ice_candidate(
&self, &self,
element: &super::WebRTCSink, _element: &super::WebRTCSink,
session_id: String, session_id: String,
sdp_m_line_index: u32, sdp_m_line_index: u32,
candidate: String, candidate: String,
) { ) {
let mut state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let Err(err) = state.signaller.handle_ice( state
element, .signaller
&session_id, .add_ice(&session_id, &candidate, Some(sdp_m_line_index), None)
&candidate,
Some(sdp_m_line_index),
None,
) {
gst::warning!(
CAT,
"Failed to handle ICE in session {}: {}",
session_id,
err
);
state.end_session(element, &session_id, true);
}
} }
/// Called by the signaller to add a new session /// Called by the signaller to add a new session
pub fn start_session( pub fn start_session(
&self, &self,
element: &super::WebRTCSink,
session_id: &str, session_id: &str,
peer_id: &str, peer_id: &str,
offer: Option<&gst_webrtc::WebRTCSessionDescription>, offer: Option<&gst_webrtc::WebRTCSessionDescription>,
@ -1838,6 +1889,7 @@ impl WebRTCSink {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let peer_id = peer_id.to_string(); let peer_id = peer_id.to_string();
let session_id = session_id.to_string(); let session_id = session_id.to_string();
let element = self.obj().clone();
if state.sessions.contains_key(&session_id) { if state.sessions.contains_key(&session_id) {
return Err(WebRTCSinkError::DuplicateSessionId(session_id)); return Err(WebRTCSinkError::DuplicateSessionId(session_id));
@ -2167,16 +2219,16 @@ impl WebRTCSink {
}); });
if settings.enable_data_channel_navigation { if settings.enable_data_channel_navigation {
state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin)); state.navigation_handler = Some(NavigationEventHandler::new(&element, &webrtcbin));
} }
state.sessions.insert(session_id.to_string(), session); state.sessions.insert(session_id.to_string(), session);
let element_clone = element.downgrade();
let mut streams: Vec<InputStream> = state.streams.values().cloned().collect(); let mut streams: Vec<InputStream> = state.streams.values().cloned().collect();
streams.sort_by_key(|s| s.serial); streams.sort_by_key(|s| s.serial);
let element_clone = element.downgrade();
let offer_clone = offer.cloned(); let offer_clone = offer.cloned();
RUNTIME.spawn(async move { RUNTIME.spawn(async move {
if let Some(element) = element_clone.upgrade() { if let Some(element) = element_clone.upgrade() {
@ -2275,6 +2327,12 @@ impl WebRTCSink {
// so that application code can create data channels at the correct // so that application code can create data channels at the correct
// moment. // moment.
element.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); element.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
{
let state = this.state.lock().unwrap();
state
.signaller
.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
}
// We don't connect to on-negotiation-needed, this in order to call the above // We don't connect to on-negotiation-needed, this in order to call the above
// signal without holding the state lock: // signal without holding the state lock:
@ -2315,7 +2373,10 @@ impl WebRTCSink {
return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())); return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string()));
} }
if let Some(session) = state.end_session(element, session_id, signal) { if let Some(session) = state.end_session(session_id, signal) {
state
.signaller
.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]);
drop(state); drop(state);
element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]);
} }
@ -2482,7 +2543,7 @@ impl WebRTCSink {
}); });
if remove { if remove {
state.finalize_session(element, &mut session, true); state.finalize_session(&mut session, true);
} else { } else {
state.sessions.insert(session.id.clone(), session); state.sessions.insert(session.id.clone(), session);
} }
@ -2492,34 +2553,37 @@ impl WebRTCSink {
/// Called by the signaller with an ice candidate /// Called by the signaller with an ice candidate
pub fn handle_ice( pub fn handle_ice(
&self, &self,
_element: &super::WebRTCSink,
session_id: &str, session_id: &str,
sdp_m_line_index: Option<u32>, sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>, _sdp_mid: Option<String>,
candidate: &str, candidate: &str,
) -> Result<(), WebRTCSinkError> { ) {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?; let sdp_m_line_index = match sdp_m_line_index {
Some(sdp_m_line_index) => sdp_m_line_index,
None => {
gst::warning!(CAT, "No mandatory SDP m-line index");
return;
}
};
if let Some(session) = state.sessions.get(session_id) { if let Some(session) = state.sessions.get(session_id) {
gst::trace!(CAT, "adding ice candidate for session {}", session_id); gst::trace!(CAT, "adding ice candidate for session {}", session_id);
session session
.webrtcbin .webrtcbin
.emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
Ok(())
} else { } else {
Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) gst::warning!(CAT, "No consumer with ID {session_id}");
} }
} }
/// Called by the signaller with an answer to our offer pub fn handle_sdp_answer(
pub fn handle_sdp(
&self, &self,
element: &super::WebRTCSink, element: &super::WebRTCSink,
session_id: &str, session_id: &str,
desc: &gst_webrtc::WebRTCSessionDescription, desc: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), WebRTCSinkError> { ) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(session_id) { if let Some(session) = state.sessions.get_mut(session_id) {
@ -2544,12 +2608,14 @@ impl WebRTCSink {
media_idx, media_idx,
media_str media_str
); );
state.end_session(element, session_id, true); state.end_session(session_id, true);
return Err(WebRTCSinkError::ConsumerRefusedMedia { gst::warning!(
session_id: session_id.to_string(), CAT,
media_idx, obj: element,
}); "Consumer refused media {session_id}, {media_idx}"
);
return;
} }
} }
@ -2568,12 +2634,10 @@ impl WebRTCSink {
session_id, session_id,
); );
state.end_session(element, session_id, true); state.end_session(session_id, true);
return Err(WebRTCSinkError::ConsumerNoValidPayload { gst::warning!(CAT, obj: element, "Consumer did not provide valid payload for media sesion: {session_id} media_ix: {media_idx}");
session_id: session_id.to_string(), return;
media_idx,
});
} }
} }
@ -2592,10 +2656,8 @@ impl WebRTCSink {
session session
.webrtcbin .webrtcbin
.emit_by_name::<()>("set-remote-description", &[desc, &promise]); .emit_by_name::<()>("set-remote-description", &[desc, &promise]);
Ok(())
} else { } else {
Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) gst::warning!(CAT, "No consumer with ID {session_id}");
} }
} }
@ -3246,6 +3308,9 @@ impl ObjectImpl for WebRTCSink {
fn constructed(&self) { fn constructed(&self) {
self.parent_constructed(); self.parent_constructed();
let signaller = self.state.lock().unwrap().signaller.clone();
self.connect_signaller(&signaller);
let obj = self.obj(); let obj = self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
@ -3424,15 +3489,7 @@ impl ChildProxyImpl for WebRTCSink {
fn child_by_name(&self, name: &str) -> Option<glib::Object> { fn child_by_name(&self, name: &str) -> Option<glib::Object> {
match name { match name {
"signaller" => Some( "signaller" => Some(self.state.lock().unwrap().signaller.clone().upcast()),
self.state
.lock()
.unwrap()
.signaller
.as_ref()
.as_ref()
.clone(),
),
_ => None, _ => None,
} }
} }
@ -3463,7 +3520,7 @@ impl ObjectImpl for AwsKvsWebRTCSink {
let element = self.obj(); let element = self.obj();
let ws = element.upcast_ref::<super::WebRTCSink>().imp(); let ws = element.upcast_ref::<super::WebRTCSink>().imp();
let _ = ws.set_signaller(Box::<AwsKvsSignaller>::default()); let _ = ws.set_signaller(AwsKvsSignaller::default().upcast());
} }
} }

View file

@ -1,5 +1,6 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::signaller::Signallable;
/** /**
* element-webrtcsink: * element-webrtcsink:
* *
@ -9,9 +10,9 @@
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use std::error::Error;
mod homegrown_cc; mod homegrown_cc;
mod imp; mod imp;
glib::wrapper! { glib::wrapper! {
@ -42,101 +43,21 @@ pub enum WebRTCSinkError {
}, },
} }
pub trait Signallable: Sync + Send + 'static { impl Default for WebRTCSink {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>>; fn default() -> Self {
glib::Object::new()
fn handle_sdp( }
&mut self,
element: &WebRTCSink,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Box<dyn Error>>;
/// sdp_mid is exposed for future proofing, see
/// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174,
/// at the moment sdp_m_line_index will always be Some and sdp_mid will always
/// be None
fn handle_ice(
&mut self,
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>>;
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str);
fn stop(&mut self, element: &WebRTCSink);
} }
/// When providing a signaller, we expect it to both be a GObject
/// and be Signallable. This is arguably a bit strange, but exposing
/// a GInterface from rust is at the moment a bit awkward, so I went
/// for a rust interface for now. The reason the signaller needs to be
/// a GObject is to make its properties available through the GstChildProxy
/// interface.
pub trait SignallableObject: AsRef<glib::Object> + Signallable {}
impl<T: AsRef<glib::Object> + Signallable> SignallableObject for T {}
impl WebRTCSink { impl WebRTCSink {
pub fn with_signaller(signaller: Box<dyn SignallableObject>) -> Self { pub fn with_signaller(signaller: Signallable) -> Self {
let ret = glib::Object::new::<WebRTCSink>(); let ret: WebRTCSink = glib::Object::new();
let ws = ret.imp(); let ws = ret.imp();
ws.set_signaller(signaller).unwrap(); ws.set_signaller(signaller).unwrap();
ret ret
} }
pub fn handle_sdp(
&self,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), WebRTCSinkError> {
let ws = self.imp();
ws.handle_sdp(self, session_id, sdp)
}
/// sdp_mid is exposed for future proofing, see
/// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174,
/// at the moment sdp_m_line_index must be Some
pub fn handle_ice(
&self,
session_id: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
candidate: &str,
) -> Result<(), WebRTCSinkError> {
let ws = self.imp();
ws.handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate)
}
pub fn handle_signalling_error(&self, error: Box<dyn Error + Send + Sync>) {
let ws = self.imp();
ws.handle_signalling_error(self, anyhow::anyhow!(error));
}
pub fn shutdown(&self) {
let ws = self.imp();
ws.shutdown(self);
}
pub fn start_session(
&self,
session_id: &str,
peer_id: &str,
offer: Option<&gst_webrtc::WebRTCSessionDescription>,
) -> Result<(), WebRTCSinkError> {
let ws = self.imp();
ws.start_session(self, session_id, peer_id, offer)
}
pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> {
let ws = self.imp();
ws.remove_session(self, session_id, false)
}
} }
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]

View file

@ -2,7 +2,7 @@
use gst::prelude::*; use gst::prelude::*;
use crate::webrtcsrc::signaller::{prelude::*, Signallable, Signaller}; use crate::signaller::{prelude::*, Signallable, Signaller};
use crate::webrtcsrc::WebRTCSrcPad; use crate::webrtcsrc::WebRTCSrcPad;
use anyhow::{Context, Error}; use anyhow::{Context, Error};
use core::ops::Deref; use core::ops::Deref;

View file

@ -1,9 +1,4 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::webrtcsrc::signaller::WebRTCSignallerRole;
use gst::prelude::*;
use gst::{glib, prelude::StaticType};
/** /**
* element-webrtcsrc: * element-webrtcsrc:
* *
@ -38,11 +33,11 @@ use gst::{glib, prelude::StaticType};
*/ */
mod imp; mod imp;
mod pad; mod pad;
pub mod signaller;
pub use signaller::{SignallableImpl, SignallableImplExt}; use crate::signaller::Signallable;
use crate::signaller::WebRTCSignallerRole;
use self::signaller::Signallable; use gst::prelude::*;
use gst::{glib, prelude::StaticType};
glib::wrapper! { glib::wrapper! {
pub struct WebRTCSrc(ObjectSubclass<imp::WebRTCSrc>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; pub struct WebRTCSrc(ObjectSubclass<imp::WebRTCSrc>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;

View file

@ -1,584 +0,0 @@
use crate::utils::{gvalue_to_json, serialize_json_object};
use crate::webrtcsrc::signaller::{prelude::*, Signallable};
use crate::RUNTIME;
use anyhow::{anyhow, Error};
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use gst::glib;
use gst::glib::prelude::*;
use gst::subclass::prelude::*;
use gst_plugin_webrtc_protocol as p;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::ops::ControlFlow;
use std::str::FromStr;
use std::sync::Mutex;
use std::time::Duration;
use tokio::{task, time::timeout};
use url::Url;
use super::CAT;
#[derive(Debug, Eq, PartialEq, Clone, Copy, glib::Enum, Default)]
#[repr(u32)]
#[enum_type(name = "GstRSWebRTCSignallerRole")]
pub enum WebRTCSignallerRole {
#[default]
Consumer,
Producer,
Listener,
}
pub struct Settings {
uri: Url,
producer_peer_id: Option<String>,
cafile: Option<String>,
role: WebRTCSignallerRole,
}
impl Default for Settings {
fn default() -> Self {
Self {
uri: Url::from_str("ws://127.0.0.1:8443").unwrap(),
producer_peer_id: None,
cafile: Default::default(),
role: Default::default(),
}
}
}
#[derive(Default)]
pub struct Signaller {
state: Mutex<State>,
settings: Mutex<Settings>,
}
#[derive(Default)]
struct State {
/// Sender for the websocket messages
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
receive_task_handle: Option<task::JoinHandle<()>>,
producers: HashSet<String>,
}
impl Signaller {
fn uri(&self) -> Url {
self.settings.lock().unwrap().uri.clone()
}
fn set_uri(&self, uri: &str) -> Result<(), Error> {
let mut settings = self.settings.lock().unwrap();
let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?;
if let Some(peer_id) = uri
.query_pairs()
.find(|(k, _)| k == "peer-id")
.map(|v| v.1.to_string())
{
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"Setting peer-id doesn't make sense for {:?}",
settings.role
);
} else {
settings.producer_peer_id = Some(peer_id);
}
}
if let Some(peer_id) = &settings.producer_peer_id {
uri.query_pairs_mut()
.clear()
.append_pair("peer-id", peer_id);
}
settings.uri = uri;
Ok(())
}
async fn connect(&self) -> Result<(), Error> {
let obj = self.obj();
let role = self.settings.lock().unwrap().role;
if let super::WebRTCSignallerRole::Consumer = role {
self.producer_peer_id()
.ok_or_else(|| anyhow!("No target producer peer id set"))?;
}
let connector = if let Some(path) = obj.property::<Option<String>>("cafile") {
let cert = tokio::fs::read_to_string(&path).await?;
let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?;
let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder();
let connector = connector_builder.add_root_certificate(cert).build()?;
Some(tokio_native_tls::TlsConnector::from(connector))
} else {
None
};
let mut uri = self.uri();
uri.set_query(None);
let (ws, _) = timeout(
// FIXME: Make the timeout configurable
Duration::from_secs(20),
async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector),
)
.await??;
gst::info!(CAT, imp: self, "connected");
// Channel for asynchronously sending out websocket message
let (mut ws_sink, mut ws_stream) = ws.split();
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (websocket_sender, mut websocket_receiver) = mpsc::channel::<p::IncomingMessage>(1000);
let send_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
while let Some(msg) = websocket_receiver.next().await {
gst::log!(CAT, "Sending websocket message {:?}", msg);
ws_sink
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await?;
}
let msg = "Done sending";
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|this| gst::info!(CAT, imp: this, "{msg}")
);
ws_sink.send(WsMessage::Close(None)).await?;
ws_sink.close().await?;
Ok::<(), Error>(())
}));
let obj = self.obj();
let meta =
if let Some(meta) = obj.emit_by_name::<Option<gst::Structure>>("request-meta", &[]) {
gvalue_to_json(&meta.to_value())
} else {
None
};
let receive_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
if let Some(ref this) = this {
if let ControlFlow::Break(_) = this.handle_message(msg, &meta) {
break;
}
} else {
break;
}
}
let msg = "Stopped websocket receiving";
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|this| gst::info!(CAT, imp: this, "{msg}")
);
}));
let mut state = self.state.lock().unwrap();
state.websocket_sender = Some(websocket_sender);
state.send_task_handle = Some(send_task_handle);
state.receive_task_handle = Some(receive_task_handle);
Ok(())
}
fn set_status(&self, meta: &Option<serde_json::Value>, peer_id: &str) {
let role = self.settings.lock().unwrap().role;
self.send(p::IncomingMessage::SetPeerStatus(match role {
super::WebRTCSignallerRole::Consumer => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![],
},
super::WebRTCSignallerRole::Producer => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![p::PeerRole::Producer],
},
super::WebRTCSignallerRole::Listener => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![p::PeerRole::Listener],
},
}));
}
fn producer_peer_id(&self) -> Option<String> {
let settings = self.settings.lock().unwrap();
settings.producer_peer_id.clone()
}
fn send(&self, msg: p::IncomingMessage) {
let state = self.state.lock().unwrap();
if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender.send(msg).await {
this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
}
}));
}
}
pub fn start_session(&self) {
let role = self.settings.lock().unwrap().role;
if matches!(role, super::WebRTCSignallerRole::Consumer) {
let target_producer = self.producer_peer_id().unwrap();
self.send(p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: target_producer.clone(),
}));
gst::info!(
CAT,
imp: self,
"Started session with producer peer id {target_producer}",
);
}
}
fn handle_message(
&self,
msg: Result<WsMessage, async_tungstenite::tungstenite::Error>,
meta: &Option<serde_json::Value>,
) -> ControlFlow<()> {
match msg {
Ok(WsMessage::Text(msg)) => {
gst::trace!(CAT, imp: self, "Received message {}", msg);
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg {
p::OutgoingMessage::Welcome { peer_id } => {
self.set_status(meta, &peer_id);
self.start_session();
}
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
meta,
roles,
peer_id,
}) => {
let meta = meta.and_then(|m| match m {
serde_json::Value::Object(v) => Some(serialize_json_object(&v)),
_ => {
gst::error!(CAT, imp: self, "Invalid json value: {m:?}");
None
}
});
let peer_id =
peer_id.expect("Status changed should always contain a peer ID");
let mut state = self.state.lock().unwrap();
if roles.iter().any(|r| matches!(r, p::PeerRole::Producer)) {
if !state.producers.contains(&peer_id) {
state.producers.insert(peer_id.clone());
drop(state);
self.obj()
.emit_by_name::<()>("producer-added", &[&peer_id, &meta]);
}
} else if state.producers.remove(&peer_id) {
drop(state);
self.obj()
.emit_by_name::<()>("producer-removed", &[&peer_id, &meta]);
}
}
p::OutgoingMessage::SessionStarted {
peer_id,
session_id,
} => {
self.obj()
.emit_by_name::<()>("session-started", &[&session_id, &peer_id]);
}
p::OutgoingMessage::StartSession {
session_id,
peer_id,
} => {
assert!(matches!(
self.obj().property::<WebRTCSignallerRole>("role"),
super::WebRTCSignallerRole::Producer
));
self.obj()
.emit_by_name::<()>("session-requested", &[&session_id, &peer_id]);
}
p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) => {
gst::info!(CAT, imp: self, "Session {session_id} ended");
self.obj()
.emit_by_name::<()>("session-ended", &[&session_id]);
}
p::OutgoingMessage::Peer(p::PeerMessage {
session_id,
peer_message,
}) => match peer_message {
p::PeerMessageInner::Sdp(reply) => {
let (sdp, desc_type) = match reply {
p::SdpMessage::Answer { sdp } => {
(sdp, gst_webrtc::WebRTCSDPType::Answer)
}
p::SdpMessage::Offer { sdp } => {
(sdp, gst_webrtc::WebRTCSDPType::Offer)
}
};
let sdp = match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
Ok(sdp) => sdp,
Err(err) => {
self.obj().emit_by_name::<()>(
"error",
&[&format!("Error parsing SDP: {sdp} {err:?}")],
);
return ControlFlow::Break(());
}
};
let desc =
gst_webrtc::WebRTCSessionDescription::new(desc_type, sdp);
self.obj().emit_by_name::<()>(
"session-description",
&[&session_id, &desc],
);
}
p::PeerMessageInner::Ice {
candidate,
sdp_m_line_index,
} => {
let sdp_mid: Option<String> = None;
self.obj().emit_by_name::<()>(
"handle-ice",
&[&session_id, &sdp_m_line_index, &sdp_mid, &candidate],
);
}
},
p::OutgoingMessage::Error { details } => {
self.obj().emit_by_name::<()>(
"error",
&[&format!("Error message from server: {details}")],
);
}
_ => {
gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg);
}
}
} else {
gst::error!(CAT, imp: self, "Unknown message from server: {}", msg);
self.obj().emit_by_name::<()>(
"error",
&[&format!("Unknown message from server: {}", msg)],
);
}
}
Ok(WsMessage::Close(reason)) => {
gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason);
return ControlFlow::Break(());
}
Ok(_) => (),
Err(err) => {
self.obj()
.emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
return ControlFlow::Break(());
}
}
ControlFlow::Continue(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for Signaller {
const NAME: &'static str = "GstWebRTCSignaller";
type Type = super::Signaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("uri")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("producer-peer-id")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("cafile")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::Consumer)
.flags(glib::ParamFlags::READWRITE)
.build(),
]
});
PROPS.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"uri" => {
if let Err(e) = self.set_uri(value.get::<&str>().expect("type checked upstream")) {
gst::error!(CAT, "Couldn't set URI: {e:?}");
}
}
"producer-peer-id" => {
let mut settings = self.settings.lock().unwrap();
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"Setting `producer-peer-id` doesn't make sense for {:?}",
settings.role
);
} else {
settings.producer_peer_id = value
.get::<Option<String>>()
.expect("type checked upstream");
}
}
"cafile" => {
self.settings.lock().unwrap().cafile = value
.get::<Option<String>>()
.expect("type checked upstream")
}
"role" => {
self.settings.lock().unwrap().role = value
.get::<WebRTCSignallerRole>()
.expect("type checked upstream")
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"uri" => settings.uri.to_string().to_value(),
"producer-peer-id" => {
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"`producer-peer-id` doesn't make sense for {:?}",
settings.role
);
}
settings.producer_peer_id.to_value()
}
"cafile" => settings.cafile.to_value(),
"role" => settings.role.to_value(),
_ => unimplemented!(),
}
}
}
impl SignallableImpl for Signaller {
fn start(&self) {
gst::info!(CAT, imp: self, "Starting");
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = this.connect().await {
this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
}
}));
}
fn stop(&self) {
gst::info!(CAT, imp: self, "Stopping now");
let mut state = self.state.lock().unwrap();
let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() {
RUNTIME.block_on(async move {
sender.close_channel();
if let Some(handle) = send_task_handle {
if let Err(err) = handle.await {
gst::warning!(CAT, imp: self, "Error while joining send task: {}", err);
}
}
if let Some(handle) = receive_task_handle {
if let Err(err) = handle.await {
gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err);
}
}
});
}
}
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
gst::debug!(CAT, imp: self, "Sending SDP {sdp:#?}");
let role = self.settings.lock().unwrap().role;
let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer);
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_owned(),
peer_message: p::PeerMessageInner::Sdp(if is_consumer {
p::SdpMessage::Answer {
sdp: sdp.sdp().as_text().unwrap(),
}
} else {
p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
}
}),
});
self.send(msg);
}
fn add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
) {
gst::debug!(
CAT,
imp: self,
"Adding ice candidate {candidate:?} for {sdp_m_line_index:?} on session {session_id}"
);
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_string(),
peer_message: p::PeerMessageInner::Ice {
candidate: candidate.to_string(),
sdp_m_line_index: sdp_m_line_index.unwrap(),
},
});
self.send(msg);
}
fn end_session(&self, session_id: &str) {
gst::debug!(CAT, imp: self, "Signalling session done {}", session_id);
let state = self.state.lock().unwrap();
let session_id = session_id.to_string();
if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
session_id,
}))
.await
{
this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
}
}));
}
}
}
impl GstObjectImpl for Signaller {}

View file

@ -1,46 +0,0 @@
mod iface;
mod imp;
use gst::glib;
use once_cell::sync::Lazy;
// Expose traits and objects from the module itself so it exactly looks like
// generated bindings
pub use imp::WebRTCSignallerRole;
pub mod prelude {
pub use {super::SignallableExt, super::SignallableImpl};
}
pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"webrtcsrc-signaller",
gst::DebugColorFlags::empty(),
Some("WebRTC src signaller"),
)
});
glib::wrapper! {
pub struct Signallable(ObjectInterface<iface::Signallable>);
}
glib::wrapper! {
pub struct Signaller(ObjectSubclass <imp::Signaller>) @implements Signallable;
}
impl Default for Signaller {
fn default() -> Self {
glib::Object::builder().build()
}
}
impl Signaller {
pub fn new(mode: WebRTCSignallerRole) -> Self {
glib::Object::builder().property("role", &mode).build()
}
}
pub use iface::SignallableExt;
pub use iface::SignallableImpl;
pub use iface::SignallableImplExt;
unsafe impl Send for Signallable {}
unsafe impl Sync for Signallable {}