webrtc: add precise synchronization example

This example demonstrates a sender / receiver setup which ensures precise
synchronization of multiple streams in a single session.

[RFC 6051]-style rapid synchronization of RTP streams is available as an option.
See the [Instantaneous RTP synchronization...] blog post for details about this
mode and an example based on RTSP instead of WebRTC.

[RFC 6051]: https://datatracker.ietf.org/doc/html/rfc6051
[Instantaneous RTP synchronization...]: https://coaxion.net/blog/2022/05/instantaneous-rtp-synchronization-retrieval-of-absolute-sender-clock-times-with-gstreamer/

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1463>
François Laignel 2024-04-03 19:10:40 +02:00
parent b5cbc47cf7
commit cc43935036
5 changed files with 1135 additions and 0 deletions

Cargo.lock (generated)

@@ -2926,12 +2926,14 @@ dependencies = [
"data-encoding",
"fastrand",
"futures",
"gst-plugin-rtp",
"gst-plugin-version-helper",
"gst-plugin-webrtc-signalling-protocol",
"gstreamer",
"gstreamer-app",
"gstreamer-audio",
"gstreamer-base",
"gstreamer-net",
"gstreamer-rtp",
"gstreamer-sdp",
"gstreamer-utils",

net/webrtc/Cargo.toml

@@ -61,6 +61,8 @@ rand = "0.8"
once_cell.workspace = true
[dev-dependencies]
gst-net = { workspace = true, features = ["v1_20"] }
gst-plugin-rtp = { path = "../rtp" }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
tracing-log = "0.2"
@@ -103,3 +105,9 @@ name = "webrtcsink-high-quality-tune"
[[example]]
name = "webrtcsink-custom-signaller"
[[example]]
name = "webrtc-precise-sync-send"
[[example]]
name = "webrtc-precise-sync-recv"

net/webrtc/examples/README.md

@@ -21,3 +21,89 @@ run an example client.
An example of custom signaller implementation, see the corresponding
[README](webrtcsink-custom-signaller/README.md) for more details on code and usage.
## WebRTC precise synchronization example
This example demonstrates a sender / receiver setup which ensures precise
synchronization of multiple streams in a single session.
[RFC 6051]-style rapid synchronization of RTP streams is available as an option.
See the [Instantaneous RTP synchronization...] blog post for details about this
mode and an example based on RTSP instead of WebRTC.
[RFC 6051]: https://datatracker.ietf.org/doc/html/rfc6051
[Instantaneous RTP synchronization...]: https://coaxion.net/blog/2022/05/instantaneous-rtp-synchronization-retrieval-of-absolute-sender-clock-times-with-gstreamer/
### Signaller
The example uses the default WebRTC signaller. Launch it using the following
command:
```shell
cargo run --bin gst-webrtc-signalling-server
```
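The examples connect to port `8443` by default. For a multi-host setup, the
server can be bound to a specific address. A sketch, assuming the server
exposes clap-style `--host` and `--port` options (verify with `--help`):
```shell
# Assumed options, check them first with:
#   cargo run --bin gst-webrtc-signalling-server -- --help
cargo run --bin gst-webrtc-signalling-server -- --host 0.0.0.0 --port 8443
```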
### Receiver
The receiver waits for new audio & video stream publishers and renders the
streams using auto sink elements. Launch it using the following command:
```shell
cargo r --example webrtc-precise-sync-recv
```
The default configuration should work for a local test. For a multi-host setup,
see the available options:
```shell
cargo r --example webrtc-precise-sync-recv -- --help
```
E.g.: the following will prefer `avdec_h264` over hardware decoders, activate
debug logs for the receiver and connect to the signalling server at the
specified address:
```shell
GST_PLUGIN_FEATURE_RANK=avdec_h264:512 \
WEBRTC_PRECISE_SYNC_RECV_LOG=debug \
cargo r --example webrtc-precise-sync-recv -- --server 192.168.1.22
```
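E.g.: the following uses a custom NTP server (placeholder host) together with
a 100ms RTP jitterbuffer latency and a 10s clock synchronization timeout, all
flags provided by this example:
```shell
# time.example.org is a placeholder, use a reachable NTP host.
cargo r --example webrtc-precise-sync-recv -- \
    --ntp-server time.example.org \
    --rtp-latency 100 --clock-sync-timeout 10
```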
### Sender
The sender publishes audio & video test streams. Launch it using the following
command:
```shell
cargo r --example webrtc-precise-sync-send
```
The default configuration should work for a local test. For a multi-host setup,
to set the number of audio / video streams, to enable rapid synchronization or
to force the video encoder, see the available options:
```shell
cargo r --example webrtc-precise-sync-send -- --help
```
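E.g.: the following publishes two audio and two video test streams and enables
[RFC 6051] rapid synchronization via the 64-bit NTP timestamp RTP header
extension:
```shell
cargo r --example webrtc-precise-sync-send -- \
    --audio-streams 2 --video-streams 2 --enable-rapid-sync
```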
E.g.: the following will force H264 with `x264enc` preferred over hardware
encoders, activate debug logs for the sender and connect to the signalling
server at the specified address:
```shell
GST_PLUGIN_FEATURE_RANK=x264enc:512 \
WEBRTC_PRECISE_SYNC_SEND_LOG=debug \
cargo r --example webrtc-precise-sync-send -- \
--server 192.168.1.22 --video-caps video/x-h264
```
### The pipeline latency
The `--pipeline-latency` argument configures a static latency, 1s by default.
This needs to be higher than the sum of the sender latency and the latency of
the receiver with the highest latency. As this can't be known automatically and
depends on many factors, it has to be determined for the overall system and
configured accordingly.
The default configuration is on the safe side and favors synchronization over
low latency. Depending on the use case, shorter or longer values should be used.
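E.g.: the following favors an even safer synchronization margin over latency
by configuring a 2s static latency on the receiver (value for illustration
only):
```shell
cargo r --example webrtc-precise-sync-recv -- --pipeline-latency 2000
```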

net/webrtc/examples/webrtc-precise-sync-recv.rs

@@ -0,0 +1,682 @@
use anyhow::{anyhow, bail, Context};
use async_tungstenite::tungstenite::Message;
use futures::prelude::*;
use futures::{pin_mut, select, select_biased};
use gst::prelude::*;
use tracing::{debug, error, info, trace};
use url::Url;
use std::pin::Pin;
use std::sync::Arc;
use gst_plugin_webrtc_protocol::{
IncomingMessage as ToSignaller, OutgoingMessage as FromSignaller, PeerRole, PeerStatus,
};
#[derive(Debug, Default, Clone, clap::Parser)]
struct Args {
#[clap(long, help = "Pipeline latency (ms)", default_value = "1000")]
pub pipeline_latency: u64,
#[clap(long, help = "RTP jitterbuffer latency (ms)", default_value = "40")]
pub rtp_latency: u32,
#[clap(
long,
help = "Maximum duration in seconds to wait for clock synchronization",
default_value = "5"
)]
pub clock_sync_timeout: u64,
#[clap(long, help = "NTP server host", default_value = "pool.ntp.org")]
pub ntp_server: String,
#[clap(long, help = "Signalling server host", default_value = "localhost")]
pub server: String,
#[clap(long, help = "Signalling server port", default_value = "8443")]
pub port: u32,
#[clap(long, help = "use tls")]
pub use_tls: bool,
}
impl Args {
pub fn scheme(&self) -> &str {
if self.use_tls {
"wss"
} else {
"ws"
}
}
}
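/// Spawns a consumer bin with a `webrtcsrc` handling the streams published
/// by `peer_id` and adds it to the `pipeline`.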
fn spawn_consumer(
signaller_url: &Url,
pipeline: &gst::Pipeline,
args: Arc<Args>,
peer_id: String,
meta: Option<serde_json::Value>,
) -> anyhow::Result<()> {
info!(%peer_id, ?meta, "Spawning consumer");
let bin = gst::Bin::with_name(&peer_id);
pipeline.add(&bin).context("Adding consumer bin")?;
let webrtcsrc = gst::ElementFactory::make("webrtcsrc")
.build()
.context("Creating webrtcsrc")?;
bin.add(&webrtcsrc).context("Adding webrtcsrc")?;
let signaller = webrtcsrc.property::<gst::glib::Object>("signaller");
signaller.set_property("uri", signaller_url.as_str());
signaller.set_property("producer-peer-id", &peer_id);
signaller.connect("webrtcbin-ready", false, {
let cli_args = args.clone();
move |args| {
let webrtcbin = args[2].get::<gst::Element>().unwrap();
webrtcbin.set_property("latency", cli_args.rtp_latency);
let rtpbin = webrtcbin
.downcast_ref::<gst::Bin>()
.unwrap()
.by_name("rtpbin")
.unwrap();
rtpbin.set_property("add-reference-timestamp-meta", true);
// Configure for network synchronization via the RTP NTP timestamps.
// This requires that sender and receiver are synchronized to the same
// clock.
rtpbin.set_property_from_str("buffer-mode", "synced");
rtpbin.set_property("ntp-sync", true);
// Don't bother updating inter-stream offsets if the difference to the previous
// configuration is less than 1ms. The synchronization will have rounding errors
// in the range of the RTP clock rate, i.e. 1/90000s and 1/48000s in this case
rtpbin.set_property("min-ts-offset", gst::ClockTime::MSECOND);
None
}
});
webrtcsrc.connect_pad_added({
move |webrtcsrc, pad| {
let Some(bin) = webrtcsrc
.parent()
.map(|b| b.downcast::<gst::Bin>().expect("webrtcsrc added to a bin"))
else {
return;
};
if pad.name().starts_with("audio") {
let audioconvert = gst::ElementFactory::make("audioconvert")
.build()
.expect("Checked in prepare()");
let audioresample = gst::ElementFactory::make("audioresample")
.build()
.expect("Checked in prepare()");
// Decouple processing from sync a bit
let queue = gst::ElementFactory::make("queue")
.property("max-size-buffers", 1u32)
.property("max-size-bytes", 0u32)
.property("max-size-time", 0u64)
.build()
.expect("Checked in prepare()");
let audiosink = gst::ElementFactory::make("autoaudiosink")
.build()
.expect("Checked in prepare()");
bin.add_many([&audioconvert, &audioresample, &queue, &audiosink])
.unwrap();
pad.link(&audioconvert.static_pad("sink").unwrap()).unwrap();
gst::Element::link_many([&audioconvert, &audioresample, &queue, &audiosink])
.unwrap();
audiosink.sync_state_with_parent().unwrap();
queue.sync_state_with_parent().unwrap();
audioresample.sync_state_with_parent().unwrap();
audioconvert.sync_state_with_parent().unwrap();
} else if pad.name().starts_with("video") {
// Create a timeoverlay element to render the timestamps from
// the reference timestamp metadata on top of the video frames
// in the bottom left.
//
// Also add a pad probe on its sink pad to log the same timestamp to
// stdout on each frame.
let timeoverlay = gst::ElementFactory::make("timeoverlay")
.property_from_str("time-mode", "reference-timestamp")
.property_from_str("valignment", "bottom")
.build()
.expect("Checked in prepare()");
let sinkpad = timeoverlay
.static_pad("video_sink")
.expect("Failed to get timeoverlay sinkpad");
sinkpad
.add_probe(gst::PadProbeType::BUFFER, |_pad, info| {
if let Some(gst::PadProbeData::Buffer(ref buffer)) = info.data {
if let Some(meta) = buffer.meta::<gst::ReferenceTimestampMeta>() {
trace!(timestamp = %meta.timestamp(), "Have sender clock time");
} else {
trace!("Have no sender clock time");
}
}
gst::PadProbeReturn::Ok
})
.expect("Failed to add timeoverlay pad probe");
let videoconvert = gst::ElementFactory::make("videoconvert")
.build()
.expect("Checked in prepare()");
// Decouple processing from sync a bit
let queue = gst::ElementFactory::make("queue")
.property("max-size-buffers", 1u32)
.property("max-size-bytes", 0u32)
.property("max-size-time", 0u64)
.build()
.expect("Checked in prepare()");
let videosink = gst::ElementFactory::make("autovideosink")
.build()
.expect("Checked in prepare()");
bin.add_many([&timeoverlay, &videoconvert, &queue, &videosink])
.unwrap();
pad.link(&sinkpad).unwrap();
gst::Element::link_many([&timeoverlay, &videoconvert, &queue, &videosink]).unwrap();
videosink.sync_state_with_parent().unwrap();
queue.sync_state_with_parent().unwrap();
videoconvert.sync_state_with_parent().unwrap();
timeoverlay.sync_state_with_parent().unwrap();
}
}
});
signaller.connect("session-ended", true, {
let bin = bin.downgrade();
move |_| {
info!(%peer_id, "Session ended");
let Some(bin) = bin.upgrade() else {
return Some(false.into());
};
bin.call_async(|bin| {
let _ = bin.set_state(gst::State::Null);
if let Some(pipeline) = bin.parent().map(|p| {
p.downcast::<gst::Pipeline>()
.expect("Bin added to the pipeline")
}) {
let _ = pipeline.remove(bin);
}
});
Some(false.into())
}
});
bin.sync_state_with_parent()
.context("Syncing consumer bin with pipeline")?;
Ok(())
}
#[derive(Debug, Default)]
struct App {
args: Arc<Args>,
pipeline: Option<gst::Pipeline>,
listener_abort_hdl: Option<future::AbortHandle>,
listener_task_hdl: Option<future::Fuse<tokio::task::JoinHandle<anyhow::Result<()>>>>,
}
impl App {
fn new(args: Args) -> Self {
App {
args: args.into(),
..Default::default()
}
}
#[inline(always)]
fn pipeline(&self) -> &gst::Pipeline {
self.pipeline.as_ref().expect("Set in prepare")
}
async fn prepare_and_run(&mut self) -> anyhow::Result<()> {
self.prepare().await.context("Preparing")?;
self.run().await.context("Running")?;
Ok(())
}
async fn prepare(&mut self) -> anyhow::Result<()> {
debug!("Preparing");
// Check availability of elements which might be created in webrtcsrc.connect_pad_added()
let mut missing_elements = String::new();
for elem in [
"queue",
"audioconvert",
"audioresample",
"autovideosink",
"timeoverlay",
"videoconvert",
"autovideosink",
] {
if gst::ElementFactory::find(elem).is_none() {
missing_elements.push_str("\n\t- ");
missing_elements.push_str(elem);
}
}
if !missing_elements.is_empty() {
bail!("Missing elements:{}", missing_elements);
}
debug!("Syncing to NTP clock {}", self.args.ntp_server);
// Create the NTP clock and wait for synchronization.
let clock = tokio::task::spawn_blocking({
let clock =
gst_net::NtpClock::new(None, &self.args.ntp_server, 123, gst::ClockTime::ZERO);
let clock_sync_timeout = gst::ClockTime::from_seconds(self.args.clock_sync_timeout);
move || -> anyhow::Result<gst_net::NtpClock> {
clock.wait_for_sync(clock_sync_timeout)?;
Ok(clock)
}
})
.await
.context("Syncing to NTP clock")??;
info!("Synced to NTP clock");
self.pipeline = Some(gst::Pipeline::new());
self.pipeline().use_clock(Some(&clock));
// Set the base time of the pipeline statically to zero so that running
// time and clock time are the same. This eases debugging.
self.pipeline().set_base_time(gst::ClockTime::ZERO);
self.pipeline().set_start_time(gst::ClockTime::NONE);
// Configure a static latency (1s by default).
// This needs to be higher than the sum of the sender latency and
// the receiver latency of the receiver with the highest latency.
// As this can't be known automatically and depends on many factors,
// this has to be known for the overall system and configured accordingly.
self.pipeline()
.set_latency(gst::ClockTime::from_mseconds(self.args.pipeline_latency));
let signaller_url = Url::parse(&format!(
"{}://{}:{}",
self.args.scheme(),
self.args.server,
self.args.port,
))?;
let (signaller_tx, signaller_rx) = connect_as_listener(&signaller_url)
.await
.context("Connecting as listener")?;
let (listener_abort_hdl, listener_abort_reg) = future::AbortHandle::new_pair();
self.listener_abort_hdl = Some(listener_abort_hdl);
self.listener_task_hdl = Some(
tokio::task::spawn(listener_task(
listener_abort_reg,
signaller_tx,
signaller_rx,
signaller_url,
self.pipeline().clone(),
self.args.clone(),
))
.fuse(),
);
Ok(())
}
async fn run(&mut self) -> anyhow::Result<()> {
debug!("Running");
let bus = self.pipeline().bus().context("Getting the pipeline bus")?;
let mut bus_stream = bus.stream();
self.pipeline()
.call_async_future(|pipeline| pipeline.set_state(gst::State::Playing))
.await
.context("Setting pipeline to Playing")?;
loop {
select_biased! {
// Don't take listener_task_hdl: we will need it in teardown()
listener_res = self.listener_task_hdl.as_mut().expect("defined in prepare") => {
listener_res.context("Signaller listener task")??;
info!("Breaking due to signaller listener termination");
break;
},
bus_msg = bus_stream.next() => {
use gst::MessageView::*;
let Some(msg) = bus_msg else { break };
match msg.view() {
Error(msg) => {
let err = msg.error();
let src_name = msg.src().map(|src| src.name());
bail!(
"Element {} error message: {err:#}",
src_name.as_deref().unwrap_or("UNKNOWN"),
);
}
Latency(msg) => {
info!(
"Latency requirements have changed for element {}",
msg.src().map(|src| src.name()).as_deref().unwrap_or("UNKNOWN"),
);
if let Err(err) = self.pipeline().recalculate_latency() {
error!(%err, "Error recalculating latency");
}
}
_ => (),
}
}
}
}
Ok(())
}
/// Tears this `App` down and deallocates all its resources by consuming `self`.
async fn teardown(mut self) {
debug!("Tearing down");
if let Some(ref pipeline) = self.pipeline {
// For debugging purposes:
// define the GST_DEBUG_DUMP_DOT_DIR env var to generate a dot file.
pipeline.debug_to_dot_file(
gst::DebugGraphDetails::all(),
"webrtc-precise-sync-recv-tearing-down",
);
}
if let Some(listener_abort_hdl) = self.listener_abort_hdl.take() {
listener_abort_hdl.abort();
}
if let Some(pipeline) = self.pipeline.take() {
let _ = pipeline
.call_async_future(|pipeline| pipeline.set_state(gst::State::Null))
.await;
}
if let Some(listener_task_hdl) = self.listener_task_hdl.take() {
use future::FusedFuture;
if !listener_task_hdl.is_terminated() {
let _ = listener_task_hdl.await;
}
}
}
}
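/// Connects to the signaller, registers as a listener and returns the
/// signaller message sink & stream pair.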
async fn connect_as_listener(
signaller_url: &Url,
) -> anyhow::Result<(
Pin<Box<impl Sink<ToSignaller, Error = anyhow::Error>>>,
Pin<Box<impl Stream<Item = anyhow::Result<FromSignaller>>>>,
)> {
async fn register(
mut signaller_tx: Pin<&mut impl Sink<ToSignaller, Error = anyhow::Error>>,
mut signaller_rx: Pin<&mut impl Stream<Item = anyhow::Result<FromSignaller>>>,
) -> anyhow::Result<()> {
match signaller_rx
.next()
.await
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
.context("Expecting Welcome")?
{
FromSignaller::Welcome { peer_id } => {
info!(%peer_id, "Got Welcomed by signaller");
}
other => bail!("Expected Welcome, got {other:?}"),
}
debug!("Registering as listener");
signaller_tx
.send(ToSignaller::SetPeerStatus(PeerStatus {
roles: vec![PeerRole::Listener],
..Default::default()
}))
.await
.context("Sending SetPeerStatus")?;
loop {
let msg = signaller_rx
.next()
.await
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
.context("SetPeerStatus response")?;
match msg {
FromSignaller::PeerStatusChanged(peer_status) if peer_status.listening() => break,
FromSignaller::EndSession(_) => bail!("Signaller ended session unexpectedly"),
_ => (),
}
}
Ok(())
}
debug!("Connecting to Signaller");
let (ws, _) = async_tungstenite::tokio::connect_async(signaller_url)
.await
.context("Connecting to signaller")?;
let (ws_tx, ws_rx) = ws.split();
let mut signaller_tx = Box::pin(ws_tx.sink_err_into::<anyhow::Error>().with(
|msg: ToSignaller| {
future::ok(Message::Text(
serde_json::to_string(&msg).expect("msg is serializable"),
))
},
));
let mut signaller_rx = Box::pin(ws_rx.filter_map(|msg| {
future::ready(match msg {
Ok(Message::Text(msg)) => match serde_json::from_str::<FromSignaller>(&msg) {
Ok(message) => Some(Ok(message)),
Err(err) => Some(Err(anyhow!(
"Failed to deserialize signaller message: {err:#}",
))),
},
Ok(Message::Close(_)) => Some(Err(anyhow!("Connection closed"))),
Ok(Message::Ping(_)) => None,
Ok(other) => Some(Err(anyhow!("Unexpected message {other:?}"))),
Err(err) => Some(Err(err.into())),
})
}));
if let Err(err) = register(signaller_tx.as_mut(), signaller_rx.as_mut())
.await
.context("Registering as listener")
{
debug!("Closing signaller websocket due to {err:#}");
let _ = signaller_tx.close().await;
return Err(err);
}
Ok((signaller_tx, signaller_rx))
}
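/// Spawns a consumer for each producer already registered to the signaller,
/// then keeps handling signaller messages until the session ends.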
async fn listen(
signaller_tx: &mut Pin<Box<impl Sink<ToSignaller, Error = anyhow::Error>>>,
mut signaller_rx: Pin<Box<impl Stream<Item = anyhow::Result<FromSignaller>>>>,
signaller_url: Url,
pipeline: gst::Pipeline,
args: Arc<Args>,
) -> anyhow::Result<()> {
debug!("Looking for already registered producers");
signaller_tx
.send(ToSignaller::List)
.await
.context("Sending List")?;
loop {
match signaller_rx
.next()
.await
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
.context("List response")?
{
FromSignaller::List { producers } => {
for peer in producers {
spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta)
.context("Spawning consumer")?;
}
break;
}
FromSignaller::EndSession(_) => bail!("Signaller ended session unexpectedly"),
_ => (),
}
}
debug!("Listening to signaller");
loop {
match signaller_rx
.next()
.await
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
.context("Listening to signaller")?
{
FromSignaller::PeerStatusChanged(peer_status) if peer_status.producing() => {
spawn_consumer(
&signaller_url,
&pipeline,
args.clone(),
peer_status.peer_id.expect("producer with peer_id"),
peer_status.meta,
)
.context("Spawning consumer")?;
}
FromSignaller::EndSession(_) => {
info!("Signaller ended session");
break;
}
other => trace!(msg = ?other, "Ignoring signaller message"),
}
}
Ok(())
}
/// Wrapper around the listener.
///
/// Ensures the websocket is properly closed even if an error occurs or
/// the listener is aborted.
async fn listener_task(
abort_reg: future::AbortRegistration,
mut signaller_tx: Pin<Box<impl Sink<ToSignaller, Error = anyhow::Error>>>,
signaller_rx: Pin<Box<impl Stream<Item = anyhow::Result<FromSignaller>>>>,
signaller_url: Url,
pipeline: gst::Pipeline,
args: Arc<Args>,
) -> anyhow::Result<()> {
let res = future::Abortable::new(
listen(
&mut signaller_tx,
signaller_rx,
signaller_url,
pipeline,
args,
),
abort_reg,
)
.await;
debug!("Closing signaller websocket");
let _ = signaller_tx.close().await;
if let Ok(listener_res) = res {
listener_res?;
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
use clap::Parser;
use tracing_subscriber::prelude::*;
let args = Args::parse();
tracing_log::LogTracer::init().context("Setting logger")?;
let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTC_PRECISE_SYNC_RECV_LOG")
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let fmt_layer = tracing_subscriber::fmt::layer()
.with_thread_ids(true)
.with_target(true)
.with_span_events(
tracing_subscriber::fmt::format::FmtSpan::NEW
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(fmt_layer);
tracing::subscriber::set_global_default(subscriber).context("Setting tracing subscriber")?;
gst::init()?;
gstrswebrtc::plugin_register_static()?;
gstrsrtp::plugin_register_static()?;
debug!("Starting");
let mut res = Ok(());
let mut app = App::new(args);
{
let ctrl_c = tokio::signal::ctrl_c().fuse();
pin_mut!(ctrl_c);
let prepare_and_run = app.prepare_and_run().fuse();
pin_mut!(prepare_and_run);
select! {
_ctrl_c_res = ctrl_c => {
info!("Shutting down due to user request");
}
app_res = prepare_and_run => {
if let Err(ref err) = app_res {
error!("Shutting down due to application error: {err:#}");
} else {
info!("Shutting down due to application termination");
}
res = app_res;
}
}
}
app.teardown().await;
debug!("Quitting");
unsafe {
// Needed for certain tracers to write data
gst::deinit();
}
res
}

net/webrtc/examples/webrtc-precise-sync-send.rs

@@ -0,0 +1,357 @@
use anyhow::{bail, Context};
use futures::prelude::*;
use gst::prelude::*;
use gst_rtp::prelude::*;
use tracing::{debug, error, info};
use url::Url;
const VIDEO_PATTERNS: [&str; 3] = ["ball", "smpte", "snow"];
#[derive(Debug, Default, Clone, clap::Parser)]
struct Args {
#[clap(
long,
help = "Number of audio streams. Use 0 to disable audio",
default_value = "1"
)]
pub audio_streams: usize,
#[clap(
long,
help = "Number of video streams. Use 0 to disable video",
default_value = "1"
)]
pub video_streams: usize,
#[clap(long, help = "Force video caps (ex. 'video/x-h264')")]
pub video_caps: Option<String>,
#[clap(long, help = "Use RFC 6051 64-bit NTP timestamp RTP header extension.")]
pub enable_rapid_sync: bool,
#[clap(
long,
help = "Maximum duration in seconds to wait for clock synchronization",
default_value = "5"
)]
pub clock_sync_timeout: u64,
#[clap(long, help = "NTP server host", default_value = "pool.ntp.org")]
pub ntp_server: String,
#[clap(long, help = "Signalling server host", default_value = "localhost")]
pub server: String,
#[clap(long, help = "Signalling server port", default_value = "8443")]
pub port: u32,
#[clap(long, help = "use tls")]
pub use_tls: bool,
}
impl Args {
fn scheme(&self) -> &str {
if self.use_tls {
"wss"
} else {
"ws"
}
}
}
#[derive(Debug, Default)]
struct App {
args: Args,
pipeline: Option<gst::Pipeline>,
}
impl App {
fn new(args: Args) -> Self {
App {
args,
..Default::default()
}
}
#[inline(always)]
fn pipeline(&self) -> &gst::Pipeline {
self.pipeline.as_ref().expect("Set in prepare")
}
async fn prepare_and_run(&mut self) -> anyhow::Result<()> {
self.prepare().await.context("Preparing")?;
self.run().await.context("Running")?;
Ok(())
}
async fn prepare(&mut self) -> anyhow::Result<()> {
debug!("Preparing");
debug!("Syncing to NTP clock {}", self.args.ntp_server);
// Create the NTP clock and wait for synchronization.
let clock = tokio::task::spawn_blocking({
let clock =
gst_net::NtpClock::new(None, &self.args.ntp_server, 123, gst::ClockTime::ZERO);
let clock_sync_timeout = gst::ClockTime::from_seconds(self.args.clock_sync_timeout);
move || -> anyhow::Result<gst_net::NtpClock> {
clock.wait_for_sync(clock_sync_timeout)?;
Ok(clock)
}
})
.await
.context("Syncing to NTP clock")??;
info!("Synced to NTP clock");
self.pipeline = Some(gst::Pipeline::new());
self.pipeline().use_clock(Some(&clock));
// Set the base time of the pipeline statically to zero so that running
// time and clock time are the same and timeoverlay can be used to render
// the clock time over the video frames.
//
// This is not needed for any other reason.
self.pipeline().set_base_time(gst::ClockTime::ZERO);
self.pipeline().set_start_time(gst::ClockTime::NONE);
let signaller_url = Url::parse(&format!(
"{}://{}:{}",
self.args.scheme(),
self.args.server,
self.args.port,
))?;
let webrtcsink = gst::ElementFactory::make("webrtcsink")
// See:
// * https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/497
// * https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/3301
.property("do-fec", false)
.build()
.context("Creating webrtcsink")?;
self.pipeline().add(&webrtcsink).unwrap();
let signaller = webrtcsink.property::<gst::glib::Object>("signaller");
signaller.set_property("uri", signaller_url.as_str());
signaller.connect("webrtcbin-ready", false, |args| {
let webrtcbin = args[2].get::<gst::Element>().unwrap();
let rtpbin = webrtcbin
.downcast_ref::<gst::Bin>()
.unwrap()
.by_name("rtpbin")
.unwrap();
// Use local pipeline clock time as RTP NTP time source instead of using
// the local wallclock time converted to the NTP epoch.
rtpbin.set_property_from_str("ntp-time-source", "clock-time");
// Use the capture time instead of the send time for the RTP / NTP timestamp
// mapping. The difference between the two options is the capture/encoder/etc.
// latency that is introduced before sending.
rtpbin.set_property("rtcp-sync-send-time", false);
None
});
webrtcsink.connect("encoder-setup", true, |args| {
let enc = args[3].get::<gst::Element>().unwrap();
if enc.is::<gst_audio::AudioEncoder>() {
// Make sure the audio encoder tracks upstream timestamps.
enc.set_property("perfect-timestamp", false);
}
Some(true.to_value())
});
if self.args.enable_rapid_sync {
webrtcsink.connect("payloader-setup", false, |args| {
let payloader = args[3].get::<gst::Element>().unwrap();
// Add RFC6051 64-bit NTP timestamp RTP header extension.
let hdr_ext = gst_rtp::RTPHeaderExtension::create_from_uri(
"urn:ietf:params:rtp-hdrext:ntp-64",
)
.expect("Creating NTP 64-bit RTP header extension");
hdr_ext.set_id(1);
payloader.emit_by_name::<()>("add-extension", &[&hdr_ext]);
Some(true.into())
});
}
for idx in 0..self.args.audio_streams {
let audiosrc = gst::ElementFactory::make("audiotestsrc")
.property("is-live", true)
.property("freq", (idx + 1) as f64 * 440.0)
.property("volume", 0.2f64)
.build()
.context("Creating audiotestsrc")?;
self.pipeline().add(&audiosrc).context("Adding audiosrc")?;
audiosrc
.link_pads(None, &webrtcsink, Some("audio_%u"))
.context("Linking audiosrc")?;
}
for idx in 0..self.args.video_streams {
let videosrc = gst::ElementFactory::make("videotestsrc")
.property("is-live", true)
.property_from_str("pattern", VIDEO_PATTERNS[idx % VIDEO_PATTERNS.len()])
.build()
.context("Creating videotestsrc")?;
let video_overlay = gst::ElementFactory::make("timeoverlay")
.property_from_str("time-mode", "running-time")
.build()
.context("Creating timeoverlay")?;
self.pipeline()
.add_many([&videosrc, &video_overlay])
.expect("adding video elements");
videosrc
.link_filtered(
&video_overlay,
&gst::Caps::builder("video/x-raw")
.field("width", 800i32)
.field("height", 600i32)
.build(),
)
.context("Linking videosrc to timeoverlay")?;
video_overlay
.link_pads(None, &webrtcsink, Some("video_%u"))
.context("Linking video overlay")?;
}
if let Some(ref video_caps) = self.args.video_caps {
webrtcsink.set_property("video-caps", &gst::Caps::builder(video_caps).build());
}
Ok(())
}
async fn run(&mut self) -> anyhow::Result<()> {
debug!("Running");
let bus = self.pipeline().bus().context("Getting the pipeline bus")?;
let mut bus_stream = bus.stream();
self.pipeline()
.call_async_future(|pipeline| pipeline.set_state(gst::State::Playing))
.await
.context("Setting pipeline to Playing")?;
while let Some(bus_msg) = bus_stream.next().await {
use gst::MessageView::*;
match bus_msg.view() {
Error(msg) => {
let err = msg.error();
let src_name = msg.src().map(|src| src.name());
bail!(
"Element {} error message: {err:#}",
src_name.as_deref().unwrap_or("UNKNOWN"),
);
}
Latency(msg) => {
info!(
"Latency requirements have changed for element {}",
msg.src()
.map(|src| src.name())
.as_deref()
.unwrap_or("UNKNOWN"),
);
if let Err(err) = self.pipeline().recalculate_latency() {
error!(%err, "Error recalculating latency");
}
}
_ => (),
}
}
Ok(())
}
/// Tears this `App` down and deallocates all its resources by consuming `self`.
async fn teardown(mut self) {
debug!("Tearing down");
if let Some(pipeline) = self.pipeline.take() {
let _ = pipeline
.call_async_future(|pipeline| pipeline.set_state(gst::State::Null))
.await;
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
use clap::Parser;
use tracing_subscriber::prelude::*;
let args = Args::parse();
tracing_log::LogTracer::init().context("Setting logger")?;
let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTC_PRECISE_SYNC_SEND_LOG")
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let fmt_layer = tracing_subscriber::fmt::layer()
.with_thread_ids(true)
.with_target(true)
.with_span_events(
tracing_subscriber::fmt::format::FmtSpan::NEW
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(fmt_layer);
tracing::subscriber::set_global_default(subscriber).context("Setting tracing subscriber")?;
gst::init()?;
gstrswebrtc::plugin_register_static()?;
gstrsrtp::plugin_register_static()?;
debug!("Starting");
let mut res = Ok(());
let mut app = App::new(args);
{
let ctrl_c = tokio::signal::ctrl_c().fuse();
tokio::pin!(ctrl_c);
let prepare_and_run = app.prepare_and_run().fuse();
tokio::pin!(prepare_and_run);
futures::select! {
_ctrl_c_res = ctrl_c => {
info!("Shutting down due to user request");
}
app_res = prepare_and_run => {
if let Err(ref err) = app_res {
error!("Shutting down due to application error: {err:#}");
} else {
info!("Shutting down due to application termination");
}
res = app_res;
}
}
}
app.teardown().await;
debug!("Quitting");
unsafe {
// Needed for certain tracers to write data
gst::deinit();
}
res
}