diff --git a/Cargo.lock b/Cargo.lock index b4cbd654..bd69693c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2926,12 +2926,14 @@ dependencies = [ "data-encoding", "fastrand", "futures", + "gst-plugin-rtp", "gst-plugin-version-helper", "gst-plugin-webrtc-signalling-protocol", "gstreamer", "gstreamer-app", "gstreamer-audio", "gstreamer-base", + "gstreamer-net", "gstreamer-rtp", "gstreamer-sdp", "gstreamer-utils", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index c5ef0379..8619e00e 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -61,6 +61,8 @@ rand = "0.8" once_cell.workspace = true [dev-dependencies] +gst-net = { workspace = true, features = ["v1_20"] } +gst-plugin-rtp = { path = "../rtp" } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } tracing-log = "0.2" @@ -103,3 +105,9 @@ name = "webrtcsink-high-quality-tune" [[example]] name = "webrtcsink-custom-signaller" + +[[example]] +name = "webrtc-precise-sync-send" + +[[example]] +name = "webrtc-precise-sync-recv" diff --git a/net/webrtc/examples/README.md b/net/webrtc/examples/README.md index a2a9ce9e..16340b48 100644 --- a/net/webrtc/examples/README.md +++ b/net/webrtc/examples/README.md @@ -21,3 +21,89 @@ run an example client. An example of custom signaller implementation, see the corresponding [README](webrtcsink-custom-signaller/README.md) for more details on code and usage. + +## WebRTC precise synchronization example + +This example demonstrates a sender / receiver setup which ensures precise +synchronization of multiple streams in a single session. + +[RFC 6051]-style rapid synchronization of RTP streams is available as an option. +Se the [Instantaneous RTP synchronization...] blog post for details about this +mode and an example based on RTSP instead of WebRTC. + +[RFC 6051]: https://datatracker.ietf.org/doc/html/rfc6051 +[Instantaneous RTP synchronization...]: https://coaxion.net/blog/2022/05/instantaneous-rtp-synchronization-retrieval-of-absolute-sender-clock-times-with-gstreamer/ + +### Signaller + +The example uses the default WebRTC signaller. Launch it using the following +command: + +```shell +cargo run --bin gst-webrtc-signalling-server +``` + +### Receiver + +The receiver awaits for new audio & video stream publishers and render the +streams using auto sink elements. Launch it using the following command: + +```shell +cargo r --example webrtc-precise-sync-recv +``` + +The default configuration should work for a local test. For a multi-host setup, +see the available options: + +```shell +cargo r --example webrtc-precise-sync-recv -- --help +``` + +E.g.: the following will force `avdec_h264` over hardware decoders, activate +debug logs for the receiver and connect to the signalling server at the +specified address: + +```shell +GST_PLUGIN_FEATURE_RANK=avdec_h264:512 \ +WEBRTC_PRECISE_SYNC_RECV_LOG=debug \ +cargo r --example webrtc-precise-sync-recv -- --server 192.168.1.22 +``` + +### Sender + +The sender publishes audio & video test streams. Launch it using the following +command: + +```shell +cargo r --example webrtc-precise-sync-send +``` + +The default configuration should work for a local test. For a multi-host setup, +to set the number of audio / video streams, to enable rapid synchronization or +to force the video encoder, see the available options: + +```shell +cargo r --example webrtc-precise-sync-send -- --help +``` + +E.g.: the following will force H264 and `x264enc` over hardware encoders, +activate debug logs for the sender and connect to the signalling server at the +specified address: + +```shell +GST_PLUGIN_FEATURE_RANK=264enc:512 \ +WEBRTC_PRECISE_SYNC_SEND_LOG=debug \ +cargo r --example webrtc-precise-sync-send -- \ + --server 192.168.1.22 --video-caps video/x-h264 +``` + +### The pipeline latency + +The `--pipeline-latency` argument configures a static latency of 1s by default. +This needs to be higher than the sum of the sender latency and the receiver +latency of the receiver with the highest latency. As this can't be known +automatically and depends on many factors, this has to be known for the overall +system and configured accordingly. + +The default configuration is on the safe side and favors synchronization over +low latency. Depending on the use case, shorter or larger values should be used. diff --git a/net/webrtc/examples/webrtc-precise-sync-recv.rs b/net/webrtc/examples/webrtc-precise-sync-recv.rs new file mode 100644 index 00000000..fdc2c804 --- /dev/null +++ b/net/webrtc/examples/webrtc-precise-sync-recv.rs @@ -0,0 +1,682 @@ +use anyhow::{anyhow, bail, Context}; +use async_tungstenite::tungstenite::Message; +use futures::prelude::*; +use futures::{pin_mut, select, select_biased}; +use gst::prelude::*; +use tracing::{debug, error, info, trace}; +use url::Url; + +use std::pin::Pin; +use std::sync::Arc; + +use gst_plugin_webrtc_protocol::{ + IncomingMessage as ToSignaller, OutgoingMessage as FromSignaller, PeerRole, PeerStatus, +}; + +#[derive(Debug, Default, Clone, clap::Parser)] +struct Args { + #[clap(long, help = "Pipeline latency (ms)", default_value = "1000")] + pub pipeline_latency: u64, + + #[clap(long, help = "RTP jitterbuffer latency (ms)", default_value = "40")] + pub rtp_latency: u32, + + #[clap( + long, + help = "Maximum duration in seconds to wait for clock synchronization", + default_value = "5" + )] + pub clock_sync_timeout: u64, + + #[clap(long, help = "NTP server host", default_value = "pool.ntp.org")] + pub ntp_server: String, + + #[clap(long, help = "Signalling server host", default_value = "localhost")] + pub server: String, + + #[clap(long, help = "Signalling server port", default_value = "8443")] + pub port: u32, + + #[clap(long, help = "use tls")] + pub use_tls: bool, +} + +impl Args { + pub fn scheme(&self) -> &str { + if self.use_tls { + "wss" + } else { + "ws" + } + } +} + +fn spawn_consumer( + signaller_url: &Url, + pipeline: &gst::Pipeline, + args: Arc, + peer_id: String, + meta: Option, +) -> anyhow::Result<()> { + info!(%peer_id, ?meta, "Spawning consumer"); + + let bin = gst::Bin::with_name(&peer_id); + pipeline.add(&bin).context("Adding consumer bin")?; + + let webrtcsrc = gst::ElementFactory::make("webrtcsrc") + .build() + .context("Creating webrtcsrc")?; + + bin.add(&webrtcsrc).context("Adding webrtcsrc")?; + + let signaller = webrtcsrc.property::("signaller"); + signaller.set_property("uri", signaller_url.as_str()); + signaller.set_property("producer-peer-id", &peer_id); + + signaller.connect("webrtcbin-ready", false, { + let cli_args = args.clone(); + move |args| { + let webrtcbin = args[2].get::().unwrap(); + webrtcbin.set_property("latency", cli_args.rtp_latency); + + let rtpbin = webrtcbin + .downcast_ref::() + .unwrap() + .by_name("rtpbin") + .unwrap(); + + rtpbin.set_property("add-reference-timestamp-meta", true); + + // Configure for network synchronization via the RTP NTP timestamps. + // This requires that sender and receiver are synchronized to the same + // clock. + rtpbin.set_property_from_str("buffer-mode", "synced"); + rtpbin.set_property("ntp-sync", true); + + // Don't bother updating inter-stream offsets if the difference to the previous + // configuration is less than 1ms. The synchronization will have rounding errors + // in the range of the RTP clock rate, i.e. 1/90000s and 1/48000s in this case + rtpbin.set_property("min-ts-offset", gst::ClockTime::MSECOND); + + None + } + }); + + webrtcsrc.connect_pad_added({ + move |webrtcsrc, pad| { + let Some(bin) = webrtcsrc + .parent() + .map(|b| b.downcast::().expect("webrtcsrc added to a bin")) + else { + return; + }; + + if pad.name().starts_with("audio") { + let audioconvert = gst::ElementFactory::make("audioconvert") + .build() + .expect("Checked in prepare()"); + let audioresample = gst::ElementFactory::make("audioresample") + .build() + .expect("Checked in prepare()"); + // Decouple processing from sync a bit + let queue = gst::ElementFactory::make("queue") + .property("max-size-buffers", 1u32) + .property("max-size-bytes", 0u32) + .property("max-size-time", 0u64) + .build() + .expect("Checked in prepare()"); + let audiosink = gst::ElementFactory::make("autoaudiosink") + .build() + .expect("Checked in prepare()"); + bin.add_many([&audioconvert, &audioresample, &queue, &audiosink]) + .unwrap(); + + pad.link(&audioconvert.static_pad("sink").unwrap()).unwrap(); + gst::Element::link_many([&audioconvert, &audioresample, &queue, &audiosink]) + .unwrap(); + + audiosink.sync_state_with_parent().unwrap(); + queue.sync_state_with_parent().unwrap(); + audioresample.sync_state_with_parent().unwrap(); + audioconvert.sync_state_with_parent().unwrap(); + } else if pad.name().starts_with("video") { + // Create a timeoverlay element to render the timestamps from + // the reference timestamp metadata on top of the video frames + // in the bottom left. + // + // Also add a pad probe on its sink pad to log the same timestamp to + // stdout on each frame. + let timeoverlay = gst::ElementFactory::make("timeoverlay") + .property_from_str("time-mode", "reference-timestamp") + .property_from_str("valignment", "bottom") + .build() + .expect("Checked in prepare()"); + let sinkpad = timeoverlay + .static_pad("video_sink") + .expect("Failed to get timeoverlay sinkpad"); + sinkpad + .add_probe(gst::PadProbeType::BUFFER, |_pad, info| { + if let Some(gst::PadProbeData::Buffer(ref buffer)) = info.data { + if let Some(meta) = buffer.meta::() { + trace!(timestamp = %meta.timestamp(), "Have sender clock time"); + } else { + trace!("Have no sender clock time"); + } + } + + gst::PadProbeReturn::Ok + }) + .expect("Failed to add timeoverlay pad probe"); + + let videoconvert = gst::ElementFactory::make("videoconvert") + .build() + .expect("Checked in prepare()"); + // Decouple processing from sync a bit + let queue = gst::ElementFactory::make("queue") + .property("max-size-buffers", 1u32) + .property("max-size-bytes", 0u32) + .property("max-size-time", 0u64) + .build() + .expect("Checked in prepare()"); + let videosink = gst::ElementFactory::make("autovideosink") + .build() + .expect("Checked in prepare()"); + + bin.add_many([&timeoverlay, &videoconvert, &queue, &videosink]) + .unwrap(); + + pad.link(&sinkpad).unwrap(); + gst::Element::link_many([&timeoverlay, &videoconvert, &queue, &videosink]).unwrap(); + + videosink.sync_state_with_parent().unwrap(); + queue.sync_state_with_parent().unwrap(); + videoconvert.sync_state_with_parent().unwrap(); + timeoverlay.sync_state_with_parent().unwrap(); + } + } + }); + + signaller.connect("session-ended", true, { + let bin = bin.downgrade(); + move |_| { + info!(%peer_id, "Session ended"); + + let Some(bin) = bin.upgrade() else { + return Some(false.into()); + }; + + bin.call_async(|bin| { + let _ = bin.set_state(gst::State::Null); + + if let Some(pipeline) = bin.parent().map(|p| { + p.downcast::() + .expect("Bin added to the pipeline") + }) { + let _ = pipeline.remove(bin); + } + }); + + Some(false.into()) + } + }); + + bin.sync_state_with_parent() + .context("Syncing consumer bin with pipeline")?; + + Ok(()) +} + +#[derive(Debug, Default)] +struct App { + args: Arc, + pipeline: Option, + listener_abort_hdl: Option, + listener_task_hdl: Option>>>, +} + +impl App { + fn new(args: Args) -> Self { + App { + args: args.into(), + ..Default::default() + } + } + + #[inline(always)] + fn pipeline(&self) -> &gst::Pipeline { + self.pipeline.as_ref().expect("Set in prepare") + } + + async fn prepare_and_run(&mut self) -> anyhow::Result<()> { + self.prepare().await.context("Preparing")?; + self.run().await.context("Running")?; + + Ok(()) + } + + async fn prepare(&mut self) -> anyhow::Result<()> { + debug!("Preparing"); + + // Check availability of elements which might be created in webrtcsrc.connect_pad_added() + let mut missing_elements = String::new(); + for elem in [ + "queue", + "audioconvert", + "audioresample", + "autovideosink", + "timeoverlay", + "videoconvert", + "autovideosink", + ] { + if gst::ElementFactory::find(elem).is_none() { + missing_elements.push_str("\n\t- "); + missing_elements.push_str(elem); + } + } + if !missing_elements.is_empty() { + bail!("Missing elements:{}", missing_elements); + } + + debug!("Syncing to NTP clock {}", self.args.ntp_server); + + // Create the NTP clock and wait for synchronization. + let clock = tokio::task::spawn_blocking({ + let clock = + gst_net::NtpClock::new(None, &self.args.ntp_server, 123, gst::ClockTime::ZERO); + let clock_sync_timeout = gst::ClockTime::from_seconds(self.args.clock_sync_timeout); + move || -> anyhow::Result { + clock.wait_for_sync(clock_sync_timeout)?; + + Ok(clock) + } + }) + .await + .context("Syncing to NTP clock")??; + + info!("Synced to NTP clock"); + + self.pipeline = Some(gst::Pipeline::new()); + self.pipeline().use_clock(Some(&clock)); + + // Set the base time of the pipeline statically to zero so that running + // time and clock time are the same. This easies debugging. + self.pipeline().set_base_time(gst::ClockTime::ZERO); + self.pipeline().set_start_time(gst::ClockTime::NONE); + + // Configure a static latency (1s by default). + // This needs to be higher than the sum of the sender latency and + // the receiver latency of the receiver with the highest latency. + // As this can't be known automatically and depends on many factors, + // this has to be known for the overall system and configured accordingly. + self.pipeline() + .set_latency(gst::ClockTime::from_mseconds(self.args.pipeline_latency)); + + let signaller_url = Url::parse(&format!( + "{}://{}:{}", + self.args.scheme(), + self.args.server, + self.args.port, + ))?; + + let (signaller_tx, signaller_rx) = connect_as_listener(&signaller_url) + .await + .context("Connecting as listener")?; + + let (listener_abort_hdl, listener_abort_reg) = future::AbortHandle::new_pair(); + self.listener_abort_hdl = Some(listener_abort_hdl); + + self.listener_task_hdl = Some( + tokio::task::spawn(listener_task( + listener_abort_reg, + signaller_tx, + signaller_rx, + signaller_url, + self.pipeline().clone(), + self.args.clone(), + )) + .fuse(), + ); + + Ok(()) + } + + async fn run(&mut self) -> anyhow::Result<()> { + debug!("Running"); + + let bus = self.pipeline().bus().context("Getting the pipeline bus")?; + let mut bus_stream = bus.stream(); + + self.pipeline() + .call_async_future(|pipeline| pipeline.set_state(gst::State::Playing)) + .await + .context("Setting pipeline to Playing")?; + + loop { + select_biased! { + // Don't take listener_task_hdl: we will need it in teardown() + listener_res = self.listener_task_hdl.as_mut().expect("defined in prepare") => { + listener_res.context("Signaller listener task")??; + + info!("Breaking due to signaller listener termination"); + break; + }, + bus_msg = bus_stream.next() => { + use gst::MessageView::*; + + let Some(msg) = bus_msg else { break }; + match msg.view() { + Error(msg) => { + let err = msg.error(); + let src_name = msg.src().map(|src| src.name()); + + bail!( + "Element {} error message: {err:#}", + src_name.as_deref().unwrap_or("UNKNOWN"), + ); + } + Latency(msg) => { + info!( + "Latency requirements have changed for element {}", + msg.src().map(|src| src.name()).as_deref().unwrap_or("UNKNOWN"), + ); + if let Err(err) = self.pipeline().recalculate_latency() { + error!(%err, "Error recalculating latency"); + } + } + _ => (), + } + } + } + } + + Ok(()) + } + + /// Tears this `App` down and deallocates all its resources by consuming `self`. + async fn teardown(mut self) { + debug!("Tearing down"); + + if let Some(ref pipeline) = self.pipeline { + // For debugging purposes: + // define the GST_DEBUG_DUMP_DOT_DIR env var to generate a dot file. + pipeline.debug_to_dot_file( + gst::DebugGraphDetails::all(), + "webrtc-precise-sync-recv-tearing-down", + ); + } + + if let Some(listener_abort_hdl) = self.listener_abort_hdl.take() { + listener_abort_hdl.abort(); + } + + if let Some(pipeline) = self.pipeline.take() { + let _ = pipeline + .call_async_future(|pipeline| pipeline.set_state(gst::State::Null)) + .await; + } + + if let Some(listener_task_hdl) = self.listener_task_hdl.take() { + use future::FusedFuture; + if !listener_task_hdl.is_terminated() { + let _ = listener_task_hdl.await; + } + } + } +} + +async fn connect_as_listener( + signaller_url: &Url, +) -> anyhow::Result<( + Pin>>, + Pin>>>, +)> { + async fn register( + mut signaller_tx: Pin<&mut impl Sink>, + mut signaller_rx: Pin<&mut impl Stream>>, + ) -> anyhow::Result<()> { + match signaller_rx + .next() + .await + .unwrap_or_else(|| Err(anyhow!("Signaller ended session"))) + .context("Expecting Welcome")? + { + FromSignaller::Welcome { peer_id } => { + info!(%peer_id, "Got Welcomed by signaller"); + } + other => bail!("Expected Welcome, got {other:?}"), + } + + debug!("Registering as listener"); + + signaller_tx + .send(ToSignaller::SetPeerStatus(PeerStatus { + roles: vec![PeerRole::Listener], + ..Default::default() + })) + .await + .context("Sending SetPeerStatus")?; + + loop { + let msg = signaller_rx + .next() + .await + .unwrap_or_else(|| Err(anyhow!("Signaller ended session"))) + .context("SetPeerStatus response")?; + + match msg { + FromSignaller::PeerStatusChanged(peer_status) if peer_status.listening() => break, + FromSignaller::EndSession(_) => bail!("Signaller ended session unexpectedly"), + _ => (), + } + } + + Ok(()) + } + + debug!("Connecting to Signaller"); + + let (ws, _) = async_tungstenite::tokio::connect_async(signaller_url) + .await + .context("Connecting to signaller")?; + let (ws_tx, ws_rx) = ws.split(); + + let mut signaller_tx = Box::pin(ws_tx.sink_err_into::().with( + |msg: ToSignaller| { + future::ok(Message::Text( + serde_json::to_string(&msg).expect("msg is serializable"), + )) + }, + )); + + let mut signaller_rx = Box::pin(ws_rx.filter_map(|msg| { + future::ready(match msg { + Ok(Message::Text(msg)) => match serde_json::from_str::(&msg) { + Ok(message) => Some(Ok(message)), + Err(err) => Some(Err(anyhow!( + "Failed to deserialize signaller message: {err:#}", + ))), + }, + Ok(Message::Close(_)) => Some(Err(anyhow!("Connection closed"))), + Ok(Message::Ping(_)) => None, + Ok(other) => Some(Err(anyhow!("Unexpected message {other:?}"))), + Err(err) => Some(Err(err.into())), + }) + })); + + if let Err(err) = register(signaller_tx.as_mut(), signaller_rx.as_mut()) + .await + .context("Registering as listener") + { + debug!("Closing signaller websocket due to {err:#}"); + let _ = signaller_tx.close().await; + + return Err(err); + } + + Ok((signaller_tx, signaller_rx)) +} + +async fn listen( + signaller_tx: &mut Pin>>, + mut signaller_rx: Pin>>>, + signaller_url: Url, + pipeline: gst::Pipeline, + args: Arc, +) -> anyhow::Result<()> { + debug!("Looking for already registered producers"); + + signaller_tx + .send(ToSignaller::List) + .await + .context("Sending List")?; + + loop { + match signaller_rx + .next() + .await + .unwrap_or_else(|| Err(anyhow!("Signaller ended session"))) + .context("List response")? + { + FromSignaller::List { producers } => { + for peer in producers { + spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta) + .context("Spawning consumer")?; + } + break; + } + FromSignaller::EndSession(_) => bail!("Signaller ended session unexpectedly"), + _ => (), + } + } + + debug!("Listening to signaller"); + + loop { + match signaller_rx + .next() + .await + .unwrap_or_else(|| Err(anyhow!("Signaller ended session"))) + .context("Listening to signaller")? + { + FromSignaller::PeerStatusChanged(peer_status) if peer_status.producing() => { + spawn_consumer( + &signaller_url, + &pipeline, + args.clone(), + peer_status.peer_id.expect("producer with peer_id"), + peer_status.meta, + ) + .context("Spawning consumer")?; + } + FromSignaller::EndSession(_) => { + info!("Signaller ended session"); + break; + } + other => trace!(msg = ?other, "Ignoring signaller message"), + } + } + + Ok(()) +} + +/// Wrapper around the listener. +/// +/// Ensures the websocket is properly closed even if an error occurs or +/// the listener is aborted. +async fn listener_task( + abort_reg: future::AbortRegistration, + mut signaller_tx: Pin>>, + signaller_rx: Pin>>>, + signaller_url: Url, + pipeline: gst::Pipeline, + args: Arc, +) -> anyhow::Result<()> { + let res = future::Abortable::new( + listen( + &mut signaller_tx, + signaller_rx, + signaller_url, + pipeline, + args, + ), + abort_reg, + ) + .await; + + debug!("Closing signaller websocket"); + let _ = signaller_tx.close().await; + + if let Ok(listener_res) = res { + if listener_res.is_err() { + listener_res?; + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + use clap::Parser; + use tracing_subscriber::prelude::*; + + let args = Args::parse(); + + tracing_log::LogTracer::init().context("Setting logger")?; + let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTC_PRECISE_SYNC_RECV_LOG") + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_target(true) + .with_span_events( + tracing_subscriber::fmt::format::FmtSpan::NEW + | tracing_subscriber::fmt::format::FmtSpan::CLOSE, + ); + let subscriber = tracing_subscriber::Registry::default() + .with(env_filter) + .with(fmt_layer); + tracing::subscriber::set_global_default(subscriber).context("Setting tracing subscriber")?; + + gst::init()?; + gstrswebrtc::plugin_register_static()?; + gstrsrtp::plugin_register_static()?; + + debug!("Starting"); + + let mut res = Ok(()); + let mut app = App::new(args); + + { + let ctrl_c = tokio::signal::ctrl_c().fuse(); + pin_mut!(ctrl_c); + + let prepare_and_run = app.prepare_and_run().fuse(); + pin_mut!(prepare_and_run); + + select! { + _ctrl_c_res = ctrl_c => { + info!("Shutting down due to user request"); + } + app_res = prepare_and_run => { + if let Err(ref err) = app_res { + error!("Shutting down due to application error: {err:#}"); + } else { + info!("Shutting down due to application termination"); + } + + res = app_res; + } + } + } + + app.teardown().await; + + debug!("Quitting"); + + unsafe { + // Needed for certain tracers to write data + gst::deinit(); + } + + res +} diff --git a/net/webrtc/examples/webrtc-precise-sync-send.rs b/net/webrtc/examples/webrtc-precise-sync-send.rs new file mode 100644 index 00000000..cd61a3a9 --- /dev/null +++ b/net/webrtc/examples/webrtc-precise-sync-send.rs @@ -0,0 +1,357 @@ +use anyhow::{bail, Context}; +use futures::prelude::*; +use gst::prelude::*; +use gst_rtp::prelude::*; +use tracing::{debug, error, info}; +use url::Url; + +const VIDEO_PATTERNS: [&str; 3] = ["ball", "smpte", "snow"]; + +#[derive(Debug, Default, Clone, clap::Parser)] +struct Args { + #[clap( + long, + help = "Number of audio streams. Use 0 to disable audio", + default_value = "1" + )] + pub audio_streams: usize, + + #[clap( + long, + help = "Number of video streams. Use 0 to disable video", + default_value = "1" + )] + pub video_streams: usize, + + #[clap(long, help = "Force video caps (ex. 'video/x-h264')")] + pub video_caps: Option, + + #[clap(long, help = "Use RFC 6051 64-bit NTP timestamp RTP header extension.")] + pub enable_rapid_sync: bool, + + #[clap( + long, + help = "Maximum duration in seconds to wait for clock synchronization", + default_value = "5" + )] + pub clock_sync_timeout: u64, + + #[clap(long, help = "NTP server host", default_value = "pool.ntp.org")] + pub ntp_server: String, + + #[clap(long, help = "Signalling server host", default_value = "localhost")] + pub server: String, + + #[clap(long, help = "Signalling server port", default_value = "8443")] + pub port: u32, + + #[clap(long, help = "use tls")] + pub use_tls: bool, +} + +impl Args { + fn scheme(&self) -> &str { + if self.use_tls { + "wss" + } else { + "ws" + } + } +} + +#[derive(Debug, Default)] +struct App { + args: Args, + pipeline: Option, +} + +impl App { + fn new(args: Args) -> Self { + App { + args, + ..Default::default() + } + } + + #[inline(always)] + fn pipeline(&self) -> &gst::Pipeline { + self.pipeline.as_ref().expect("Set in prepare") + } + + async fn prepare_and_run(&mut self) -> anyhow::Result<()> { + self.prepare().await.context("Preparing")?; + self.run().await.context("Running")?; + + Ok(()) + } + + async fn prepare(&mut self) -> anyhow::Result<()> { + debug!("Preparing"); + + debug!("Syncing to NTP clock {}", self.args.ntp_server); + + // Create the NTP clock and wait for synchronization. + let clock = tokio::task::spawn_blocking({ + let clock = + gst_net::NtpClock::new(None, &self.args.ntp_server, 123, gst::ClockTime::ZERO); + let clock_sync_timeout = gst::ClockTime::from_seconds(self.args.clock_sync_timeout); + move || -> anyhow::Result { + clock.wait_for_sync(clock_sync_timeout)?; + + Ok(clock) + } + }) + .await + .context("Syncing to NTP clock")??; + + info!("Synced to NTP clock"); + + self.pipeline = Some(gst::Pipeline::new()); + self.pipeline().use_clock(Some(&clock)); + + // Set the base time of the pipeline statically to zero so that running + // time and clock time are the same and timeoverlay can be used to render + // the clock time over the video frames. + // + // This is needed for no other reasons. + self.pipeline().set_base_time(gst::ClockTime::ZERO); + self.pipeline().set_start_time(gst::ClockTime::NONE); + + let signaller_url = Url::parse(&format!( + "{}://{}:{}", + self.args.scheme(), + self.args.server, + self.args.port, + ))?; + + let webrtcsink = gst::ElementFactory::make("webrtcsink") + // See: + // * https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/497 + // * https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/3301 + .property("do-fec", false) + .build() + .context("Creating webrtcsink")?; + + self.pipeline().add(&webrtcsink).unwrap(); + + let signaller = webrtcsink.property::("signaller"); + signaller.set_property("uri", signaller_url.as_str()); + + signaller.connect("webrtcbin-ready", false, |args| { + let webrtcbin = args[2].get::().unwrap(); + + let rtpbin = webrtcbin + .downcast_ref::() + .unwrap() + .by_name("rtpbin") + .unwrap(); + + // Use local pipeline clock time as RTP NTP time source instead of using + // the local wallclock time converted to the NTP epoch. + rtpbin.set_property_from_str("ntp-time-source", "clock-time"); + + // Use the capture time instead of the send time for the RTP / NTP timestamp + // mapping. The difference between the two options is the capture/encoder/etc. + // latency that is introduced before sending. + rtpbin.set_property("rtcp-sync-send-time", false); + + None + }); + + webrtcsink.connect("encoder-setup", true, |args| { + let enc = args[3].get::().unwrap(); + if enc.is::() { + // Make sure the audio encoder tracks upstream timestamps. + enc.set_property("perfect-timestamp", false); + } + + Some(true.to_value()) + }); + + if self.args.enable_rapid_sync { + webrtcsink.connect("payloader-setup", false, |args| { + let payloader = args[3].get::().unwrap(); + + // Add RFC6051 64-bit NTP timestamp RTP header extension. + let hdr_ext = gst_rtp::RTPHeaderExtension::create_from_uri( + "urn:ietf:params:rtp-hdrext:ntp-64", + ) + .expect("Creating NTP 64-bit RTP header extension"); + hdr_ext.set_id(1); + payloader.emit_by_name::<()>("add-extension", &[&hdr_ext]); + + Some(true.into()) + }); + } + + for idx in 0..self.args.audio_streams { + let audiosrc = gst::ElementFactory::make("audiotestsrc") + .property("is-live", true) + .property("freq", (idx + 1) as f64 * 440.0) + .property("volume", 0.2f64) + .build() + .context("Creating audiotestsrc")?; + self.pipeline().add(&audiosrc).context("Adding audiosrc")?; + + audiosrc + .link_pads(None, &webrtcsink, Some("audio_%u")) + .context("Linking audiosrc")?; + } + + for idx in 0..self.args.video_streams { + let videosrc = gst::ElementFactory::make("videotestsrc") + .property("is-live", true) + .property_from_str("pattern", VIDEO_PATTERNS[idx % VIDEO_PATTERNS.len()]) + .build() + .context("Creating videotestsrc")?; + let video_overlay = gst::ElementFactory::make("timeoverlay") + .property_from_str("time-mode", "running-time") + .build() + .context("Creating timeoverlay")?; + + self.pipeline() + .add_many([&videosrc, &video_overlay]) + .expect("adding video elements"); + + videosrc + .link_filtered( + &video_overlay, + &gst::Caps::builder("video/x-raw") + .field("width", 800i32) + .field("height", 600i32) + .build(), + ) + .context("Linking videosrc to timeoverlay")?; + + video_overlay + .link_pads(None, &webrtcsink, Some("video_%u")) + .context("Linking video overlay")?; + } + + if let Some(ref video_caps) = self.args.video_caps { + webrtcsink.set_property("video-caps", &gst::Caps::builder(video_caps).build()); + } + + Ok(()) + } + + async fn run(&mut self) -> anyhow::Result<()> { + debug!("Running"); + + let bus = self.pipeline().bus().context("Getting the pipeline bus")?; + let mut bus_stream = bus.stream(); + + self.pipeline() + .call_async_future(|pipeline| pipeline.set_state(gst::State::Playing)) + .await + .context("Setting pipeline to Playing")?; + + while let Some(bus_msg) = bus_stream.next().await { + use gst::MessageView::*; + + match bus_msg.view() { + Error(msg) => { + let err = msg.error(); + let src_name = msg.src().map(|src| src.name()); + + bail!( + "Element {} error message: {err:#}", + src_name.as_deref().unwrap_or("UNKNOWN"), + ); + } + Latency(msg) => { + info!( + "Latency requirements have changed for element {}", + msg.src() + .map(|src| src.name()) + .as_deref() + .unwrap_or("UNKNOWN"), + ); + if let Err(err) = self.pipeline().recalculate_latency() { + error!(%err, "Error recalculating latency"); + } + } + _ => (), + } + } + + Ok(()) + } + + /// Tears this `App` down and deallocates all its resources by consuming `self`. + async fn teardown(mut self) { + debug!("Tearing down"); + + if let Some(pipeline) = self.pipeline.take() { + let _ = pipeline + .call_async_future(|pipeline| pipeline.set_state(gst::State::Null)) + .await; + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + use clap::Parser; + use tracing_subscriber::prelude::*; + + let args = Args::parse(); + + tracing_log::LogTracer::init().context("Setting logger")?; + let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTC_PRECISE_SYNC_SEND_LOG") + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_target(true) + .with_span_events( + tracing_subscriber::fmt::format::FmtSpan::NEW + | tracing_subscriber::fmt::format::FmtSpan::CLOSE, + ); + let subscriber = tracing_subscriber::Registry::default() + .with(env_filter) + .with(fmt_layer); + tracing::subscriber::set_global_default(subscriber).context("Setting tracing subscriber")?; + + gst::init()?; + gstrswebrtc::plugin_register_static()?; + gstrsrtp::plugin_register_static()?; + + debug!("Starting"); + + let mut res = Ok(()); + let mut app = App::new(args); + + { + let ctrl_c = tokio::signal::ctrl_c().fuse(); + tokio::pin!(ctrl_c); + + let prepare_and_run = app.prepare_and_run().fuse(); + tokio::pin!(prepare_and_run); + + futures::select! { + _ctrl_c_res = ctrl_c => { + info!("Shutting down due to user request"); + } + app_res = prepare_and_run => { + if let Err(ref err) = app_res { + error!("Shutting down due to application error: {err:#}"); + } else { + info!("Shutting down due to application termination"); + } + + res = app_res; + } + } + } + + app.teardown().await; + + debug!("Quitting"); + + unsafe { + // Needed for certain tracers to write data + gst::deinit(); + } + + res +}