diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index c5d5a0da..3d4a22b6 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -8063,6 +8063,18 @@ "type": "GstWebRTCSinkCongestionControl", "writable": true }, + "do-clock-signalling": { + "blurb": "Whether PTP or NTP clock & RTP/clock offset should be signalled according to RFC 7273", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, "do-fec": { "blurb": "Whether the element should negotiate and send FEC data", "conditionally-available": false, diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 8619e00e..26265b0c 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -13,6 +13,7 @@ gst = { workspace = true, features = ["v1_20", "serde"] } gst-app = { workspace = true, features = ["v1_20"] } gst-audio = { workspace = true, features = ["v1_20", "serde"] } gst-video = { workspace = true, features = ["v1_20", "serde"] } +gst-net = { workspace = true, features = ["v1_20"] } gst-webrtc = { workspace = true, features = ["v1_20"] } gst-sdp = { workspace = true, features = ["v1_20"] } gst-rtp = { workspace = true, features = ["v1_20"] } @@ -61,7 +62,6 @@ rand = "0.8" once_cell.workspace = true [dev-dependencies] -gst-net = { workspace = true, features = ["v1_20"] } gst-plugin-rtp = { path = "../rtp" } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } diff --git a/net/webrtc/examples/README.md b/net/webrtc/examples/README.md index 16340b48..4088e8f6 100644 --- a/net/webrtc/examples/README.md +++ b/net/webrtc/examples/README.md @@ -31,7 +31,11 @@ synchronization of multiple streams in a single session. Se the [Instantaneous RTP synchronization...] blog post for details about this mode and an example based on RTSP instead of WebRTC. +The examples can also be used for [RFC 7273] NTP or PTP clock signalling and +synchronization. + [RFC 6051]: https://datatracker.ietf.org/doc/html/rfc6051 +[RFC 7273]: https://datatracker.ietf.org/doc/html/rfc7273 [Instantaneous RTP synchronization...]: https://coaxion.net/blog/2022/05/instantaneous-rtp-synchronization-retrieval-of-absolute-sender-clock-times-with-gstreamer/ ### Signaller @@ -64,7 +68,7 @@ debug logs for the receiver and connect to the signalling server at the specified address: ```shell -GST_PLUGIN_FEATURE_RANK=avdec_h264:512 \ +GST_PLUGIN_FEATURE_RANK=avdec_h264:MAX \ WEBRTC_PRECISE_SYNC_RECV_LOG=debug \ cargo r --example webrtc-precise-sync-recv -- --server 192.168.1.22 ``` @@ -91,7 +95,7 @@ activate debug logs for the sender and connect to the signalling server at the specified address: ```shell -GST_PLUGIN_FEATURE_RANK=264enc:512 \ +GST_PLUGIN_FEATURE_RANK=264enc:MAX \ WEBRTC_PRECISE_SYNC_SEND_LOG=debug \ cargo r --example webrtc-precise-sync-send -- \ --server 192.168.1.22 --video-caps video/x-h264 @@ -107,3 +111,21 @@ system and configured accordingly. The default configuration is on the safe side and favors synchronization over low latency. Depending on the use case, shorter or larger values should be used. + +### RFC 7273 NTP or PTP clock signalling and synchronization + +For [RFC 7273] NTP or PTP clock signalling and synchronization, you can use +commands such as: + +#### Receiver + +```shell +cargo r --example webrtc-precise-sync-recv -- --expect-clock-signalling +``` + +#### Sender + +```shell +cargo r --example webrtc-precise-sync-send -- --clock ntp --do-clock-signalling \ + --video-streams 0 --audio-streams 2 +``` diff --git a/net/webrtc/examples/webrtc-precise-sync-recv.rs b/net/webrtc/examples/webrtc-precise-sync-recv.rs index fdc2c804..d787dbfe 100644 --- a/net/webrtc/examples/webrtc-precise-sync-recv.rs +++ b/net/webrtc/examples/webrtc-precise-sync-recv.rs @@ -15,11 +15,8 @@ use gst_plugin_webrtc_protocol::{ #[derive(Debug, Default, Clone, clap::Parser)] struct Args { - #[clap(long, help = "Pipeline latency (ms)", default_value = "1000")] - pub pipeline_latency: u64, - - #[clap(long, help = "RTP jitterbuffer latency (ms)", default_value = "40")] - pub rtp_latency: u32, + #[clap(long, help = "Initial clock type", default_value = "ntp")] + pub clock: Clock, #[clap( long, @@ -28,9 +25,24 @@ struct Args { )] pub clock_sync_timeout: u64, + #[clap( + long, + help = "Expect RFC 7273 PTP or NTP clock & RTP/clock offset signalling" + )] + pub expect_clock_signalling: bool, + #[clap(long, help = "NTP server host", default_value = "pool.ntp.org")] pub ntp_server: String, + #[clap(long, help = "PTP domain", default_value = "0")] + pub ptp_domain: u32, + + #[clap(long, help = "Pipeline latency (ms)", default_value = "1000")] + pub pipeline_latency: u64, + + #[clap(long, help = "RTP jitterbuffer latency (ms)", default_value = "40")] + pub rtp_latency: u32, + #[clap(long, help = "Signalling server host", default_value = "localhost")] pub server: String, @@ -49,6 +61,43 @@ impl Args { "ws" } } + + async fn get_synced_clock(&self) -> anyhow::Result { + debug!("Syncing to {:?}", self.clock); + + // Create the requested clock and wait for synchronization. + let clock = match self.clock { + Clock::System => gst::SystemClock::obtain(), + Clock::Ntp => gst_net::NtpClock::new(None, &self.ntp_server, 123, gst::ClockTime::ZERO) + .upcast::(), + Clock::Ptp => { + gst_net::PtpClock::init(None, &[])?; + gst_net::PtpClock::new(None, self.ptp_domain)?.upcast() + } + }; + + let clock_sync_timeout = gst::ClockTime::from_seconds(self.clock_sync_timeout); + let clock = + tokio::task::spawn_blocking(move || -> Result { + clock.wait_for_sync(clock_sync_timeout)?; + Ok(clock) + }) + .await + .with_context(|| format!("Syncing to {:?}", self.clock))? + .with_context(|| format!("Syncing to {:?}", self.clock))?; + + info!("Synced to {:?}", self.clock); + + Ok(clock) + } +} + +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, clap::ValueEnum)] +pub enum Clock { + #[default] + Ntp, + Ptp, + System, } fn spawn_consumer( @@ -67,6 +116,13 @@ fn spawn_consumer( .build() .context("Creating webrtcsrc")?; + if args.expect_clock_signalling { + // Discard retransmission in RFC 7273 mode. See: + // * https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/issues/914 + // * https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/1574 + webrtcsrc.set_property("do-retransmission", false); + } + bin.add(&webrtcsrc).context("Adding webrtcsrc")?; let signaller = webrtcsrc.property::("signaller"); @@ -91,7 +147,13 @@ fn spawn_consumer( // This requires that sender and receiver are synchronized to the same // clock. rtpbin.set_property_from_str("buffer-mode", "synced"); - rtpbin.set_property("ntp-sync", true); + + if cli_args.expect_clock_signalling { + // Synchronize incoming packets using signalled RFC 7273 clock. + rtpbin.set_property("rfc7273-sync", true); + } else if cli_args.clock == Clock::Ntp { + rtpbin.set_property("ntp-sync", true); + } // Don't bother updating inter-stream offsets if the difference to the previous // configuration is less than 1ms. The synchronization will have rounding errors @@ -140,6 +202,8 @@ fn spawn_consumer( audioresample.sync_state_with_parent().unwrap(); audioconvert.sync_state_with_parent().unwrap(); } else if pad.name().starts_with("video") { + use std::sync::atomic::{AtomicBool, Ordering}; + // Create a timeoverlay element to render the timestamps from // the reference timestamp metadata on top of the video frames // in the bottom left. @@ -154,18 +218,30 @@ fn spawn_consumer( let sinkpad = timeoverlay .static_pad("video_sink") .expect("Failed to get timeoverlay sinkpad"); + let ref_ts_caps_set = AtomicBool::new(false); sinkpad - .add_probe(gst::PadProbeType::BUFFER, |_pad, info| { + .add_probe(gst::PadProbeType::BUFFER, { + let timeoverlay = timeoverlay.downgrade(); + move |_pad, info| { if let Some(gst::PadProbeData::Buffer(ref buffer)) = info.data { if let Some(meta) = buffer.meta::() { - trace!(timestamp = %meta.timestamp(), "Have sender clock time"); + if !ref_ts_caps_set.fetch_or(true, Ordering::SeqCst) { + if let Some(timeoverlay) = timeoverlay.upgrade() { + let reference = meta.reference(); + timeoverlay.set_property("reference-timestamp-caps", reference.to_owned()); + + info!(%reference, timestamp = %meta.timestamp(), "Have sender clock time"); + } + } else { + trace!(timestamp = %meta.timestamp(), "Have sender clock time"); + } } else { trace!("Have no sender clock time"); } } gst::PadProbeReturn::Ok - }) + }}) .expect("Failed to add timeoverlay pad probe"); let videoconvert = gst::ElementFactory::make("videoconvert") @@ -277,26 +353,9 @@ impl App { bail!("Missing elements:{}", missing_elements); } - debug!("Syncing to NTP clock {}", self.args.ntp_server); - - // Create the NTP clock and wait for synchronization. - let clock = tokio::task::spawn_blocking({ - let clock = - gst_net::NtpClock::new(None, &self.args.ntp_server, 123, gst::ClockTime::ZERO); - let clock_sync_timeout = gst::ClockTime::from_seconds(self.args.clock_sync_timeout); - move || -> anyhow::Result { - clock.wait_for_sync(clock_sync_timeout)?; - - Ok(clock) - } - }) - .await - .context("Syncing to NTP clock")??; - - info!("Synced to NTP clock"); - self.pipeline = Some(gst::Pipeline::new()); - self.pipeline().use_clock(Some(&clock)); + self.pipeline() + .use_clock(Some(&self.args.get_synced_clock().await?)); // Set the base time of the pipeline statically to zero so that running // time and clock time are the same. This easies debugging. diff --git a/net/webrtc/examples/webrtc-precise-sync-send.rs b/net/webrtc/examples/webrtc-precise-sync-send.rs index cd61a3a9..863980ab 100644 --- a/net/webrtc/examples/webrtc-precise-sync-send.rs +++ b/net/webrtc/examples/webrtc-precise-sync-send.rs @@ -9,6 +9,28 @@ const VIDEO_PATTERNS: [&str; 3] = ["ball", "smpte", "snow"]; #[derive(Debug, Default, Clone, clap::Parser)] struct Args { + #[clap(long, help = "Clock type", default_value = "ntp")] + pub clock: Clock, + + #[clap( + long, + help = "Maximum duration in seconds to wait for clock synchronization", + default_value = "5" + )] + pub clock_sync_timeout: u64, + + #[clap( + long, + help = "Enable RFC 7273 PTP or NTP clock & RTP/clock offset signalling" + )] + pub do_clock_signalling: bool, + + #[clap(long, help = "NTP server host", default_value = "pool.ntp.org")] + pub ntp_server: String, + + #[clap(long, help = "PTP domain", default_value = "0")] + pub ptp_domain: u32, + #[clap( long, help = "Number of audio streams. Use 0 to disable audio", @@ -29,16 +51,6 @@ struct Args { #[clap(long, help = "Use RFC 6051 64-bit NTP timestamp RTP header extension.")] pub enable_rapid_sync: bool, - #[clap( - long, - help = "Maximum duration in seconds to wait for clock synchronization", - default_value = "5" - )] - pub clock_sync_timeout: u64, - - #[clap(long, help = "NTP server host", default_value = "pool.ntp.org")] - pub ntp_server: String, - #[clap(long, help = "Signalling server host", default_value = "localhost")] pub server: String, @@ -57,6 +69,43 @@ impl Args { "ws" } } + + async fn get_synced_clock(&self) -> anyhow::Result { + debug!("Syncing to {:?}", self.clock); + + // Create the requested clock and wait for synchronization. + let clock = match self.clock { + Clock::System => gst::SystemClock::obtain(), + Clock::Ntp => gst_net::NtpClock::new(None, &self.ntp_server, 123, gst::ClockTime::ZERO) + .upcast::(), + Clock::Ptp => { + gst_net::PtpClock::init(None, &[])?; + gst_net::PtpClock::new(None, self.ptp_domain)?.upcast() + } + }; + + let clock_sync_timeout = gst::ClockTime::from_seconds(self.clock_sync_timeout); + let clock = + tokio::task::spawn_blocking(move || -> Result { + clock.wait_for_sync(clock_sync_timeout)?; + Ok(clock) + }) + .await + .with_context(|| format!("Syncing to {:?}", self.clock))? + .with_context(|| format!("Syncing to {:?}", self.clock))?; + + info!("Synced to {:?}", self.clock); + + Ok(clock) + } +} + +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, clap::ValueEnum)] +pub enum Clock { + #[default] + Ntp, + Ptp, + System, } #[derive(Debug, Default)] @@ -88,26 +137,9 @@ impl App { async fn prepare(&mut self) -> anyhow::Result<()> { debug!("Preparing"); - debug!("Syncing to NTP clock {}", self.args.ntp_server); - - // Create the NTP clock and wait for synchronization. - let clock = tokio::task::spawn_blocking({ - let clock = - gst_net::NtpClock::new(None, &self.args.ntp_server, 123, gst::ClockTime::ZERO); - let clock_sync_timeout = gst::ClockTime::from_seconds(self.args.clock_sync_timeout); - move || -> anyhow::Result { - clock.wait_for_sync(clock_sync_timeout)?; - - Ok(clock) - } - }) - .await - .context("Syncing to NTP clock")??; - - info!("Synced to NTP clock"); - self.pipeline = Some(gst::Pipeline::new()); - self.pipeline().use_clock(Some(&clock)); + self.pipeline() + .use_clock(Some(&self.args.get_synced_clock().await?)); // Set the base time of the pipeline statically to zero so that running // time and clock time are the same and timeoverlay can be used to render @@ -129,6 +161,7 @@ impl App { // * https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/497 // * https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/3301 .property("do-fec", false) + .property("do-clock-signalling", self.args.do_clock_signalling) .build() .context("Creating webrtcsink")?; diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index d182e74e..d504e36c 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -61,6 +61,7 @@ const DEFAULT_CONGESTION_CONTROL: WebRTCSinkCongestionControl = if cfg!(feature }; const DEFAULT_DO_FEC: bool = true; const DEFAULT_DO_RETRANSMISSION: bool = true; +const DEFAULT_DO_CLOCK_SIGNALLING: bool = false; const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false; const DEFAULT_ICE_TRANSPORT_POLICY: WebRTCICETransportPolicy = WebRTCICETransportPolicy::All; const DEFAULT_START_BITRATE: u32 = 2048000; @@ -87,6 +88,7 @@ struct Settings { cc_info: CCInfo, do_fec: bool, do_retransmission: bool, + do_clock_signalling: bool, enable_data_channel_navigation: bool, meta: Option, ice_transport_policy: WebRTCICETransportPolicy, @@ -517,6 +519,7 @@ impl Default for Settings { }, do_fec: DEFAULT_DO_FEC, do_retransmission: DEFAULT_DO_RETRANSMISSION, + do_clock_signalling: DEFAULT_DO_CLOCK_SIGNALLING, enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, meta: None, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, @@ -1298,12 +1301,29 @@ impl Session { element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); } + let sdp = self.sdp.as_ref().unwrap(); + let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap(); + + let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown"); + + sdp.attributes_to_caps(global_caps.get_mut().unwrap()) + .unwrap(); + sdp_media + .attributes_to_caps(global_caps.get_mut().unwrap()) + .unwrap(); + + let caps = sdp_media + .caps_from_media(payload) + .unwrap() + .intersect(&global_caps); + element.imp().configure_payloader( &self.peer_id, stream_name, &payloader, &codec, Some(webrtc_pad.ssrc), + Some(&caps), ExtensionConfigurationType::Skip, )?; @@ -1319,21 +1339,6 @@ impl Session { .property::("transceiver"); transceiver.set_property("codec-preferences", None::); - let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown"); - - let sdp = self.sdp.as_ref().unwrap(); - let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap(); - - sdp.attributes_to_caps(global_caps.get_mut().unwrap()) - .unwrap(); - sdp_media - .attributes_to_caps(global_caps.get_mut().unwrap()) - .unwrap(); - - let caps = sdp_media - .caps_from_media(payload) - .unwrap() - .intersect(&global_caps); let s = caps.structure(0).unwrap(); let mut filtered_s = gst::Structure::new_empty("application/x-rtp"); @@ -1635,6 +1640,7 @@ impl BaseWebRTCSink { } } + #[allow(clippy::too_many_arguments)] fn configure_payloader( &self, peer_id: &str, @@ -1642,6 +1648,7 @@ impl BaseWebRTCSink { payloader: &gst::Element, codec: &Codec, ssrc: Option, + caps: Option<&gst::Caps>, extension_configuration_type: ExtensionConfigurationType, ) -> Result<(), Error> { self.obj() @@ -1658,6 +1665,41 @@ impl BaseWebRTCSink { payloader.set_property("ssrc", ssrc); } + if self.settings.lock().unwrap().do_clock_signalling { + if let Some(caps) = caps { + let clock = self + .obj() + .clock() + .expect("element added & pipeline Playing"); + + if clock.is::() || clock.is::() { + // RFC 7273 defines the "mediaclk:direct" attribute as the RTP timestamp + // value at the clock's epoch (time of origin). It was initialised to + // 0 in the SDP offer. + // + // Let's set the payloader's offset so the RTP timestamps + // are generated accordingly. + let clock_rate = + caps.structure(0) + .unwrap() + .get::("clock-rate") + .context("Setting payloader offset")? as u64; + let basetime = self + .obj() + .base_time() + .expect("element added & pipeline Playing"); + let Some(rtp_basetime) = basetime + .nseconds() + .mul_div_ceil(clock_rate, *gst::ClockTime::SECOND) + else { + anyhow::bail!("Failed to compute RTP base time. clock-rate: {clock_rate}"); + }; + + payloader.set_property("timestamp-offset", (rtp_basetime & 0xffff_ffff) as u32); + } + } + } + self.configure_congestion_control(payloader, codec, extension_configuration_type) } @@ -1778,6 +1820,58 @@ impl BaseWebRTCSink { let payloader_caps_mut = payloader_caps.make_mut(); payloader_caps_mut.set("ssrc", ssrc); + if element.imp().settings.lock().unwrap().do_clock_signalling { + // Add RFC7273 attributes when using an NTP or PTP clock + let clock = element.clock().expect("element added and pipeline playing"); + + let ts_refclk = if clock.is::() { + gst::debug!(CAT, obj: element, "Found NTP clock"); + + let addr = clock.property::("address"); + let port = clock.property::("port"); + + Some(if port == 123 { + format!("ntp={addr}") + } else { + format!("ntp={addr}:{port}") + }) + } else if clock.is::() { + gst::debug!(CAT, obj: element, "Found PTP clock"); + + let clock_id = clock.property::("grandmaster-clock-id"); + let domain = clock.property::("domain"); + + Some(format!( + "ptp=IEEE1588-2008:{:02x}-{:02x}-{:02x}-{:02x}-{:02x}-{:02x}-{:02x}-{:02x}{}", + (clock_id >> 56) & 0xff, + (clock_id >> 48) & 0xff, + (clock_id >> 40) & 0xff, + (clock_id >> 32) & 0xff, + (clock_id >> 24) & 0xff, + (clock_id >> 16) & 0xff, + (clock_id >> 8) & 0xff, + clock_id & 0xff, + if domain == 0 { + "".to_string() + } else { + format!(":{domain}") + }, + )) + } else { + None + }; + + if let Some(ts_refclk) = ts_refclk.as_deref() { + payloader_caps_mut.set("ts-refclk", Some(ts_refclk)); + // Set the offset to 0, we will adjust the payloader offsets + // when the payloaders are available. + payloader_caps_mut.set("mediaclk", Some("direct=0")); + } else { + payloader_caps_mut.set("ts-refclk", Some("local")); + payloader_caps_mut.set("mediaclk", Some("sender")); + } + } + gst::info!( CAT, obj: element, @@ -3283,6 +3377,7 @@ impl BaseWebRTCSink { &payloader, &codec, None, + None, extension_configuration_type, )?; @@ -3807,6 +3902,12 @@ impl ObjectImpl for BaseWebRTCSink { .default_value(DEFAULT_DO_RETRANSMISSION) .mutable_ready() .build(), + glib::ParamSpecBoolean::builder("do-clock-signalling") + .nick("Do clock signalling") + .blurb("Whether PTP or NTP clock & RTP/clock offset should be signalled according to RFC 7273") + .default_value(DEFAULT_DO_CLOCK_SIGNALLING) + .mutable_ready() + .build(), glib::ParamSpecBoolean::builder("enable-data-channel-navigation") .nick("Enable data channel navigation") .blurb("Enable navigation events through a dedicated WebRTCDataChannel") @@ -3884,6 +3985,10 @@ impl ObjectImpl for BaseWebRTCSink { let mut settings = self.settings.lock().unwrap(); settings.do_retransmission = value.get::().expect("type checked upstream"); } + "do-clock-signalling" => { + let mut settings = self.settings.lock().unwrap(); + settings.do_clock_signalling = value.get::().expect("type checked upstream"); + } "enable-data-channel-navigation" => { let mut settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation = @@ -3947,6 +4052,10 @@ impl ObjectImpl for BaseWebRTCSink { let settings = self.settings.lock().unwrap(); settings.do_retransmission.to_value() } + "do-clock-signalling" => { + let settings = self.settings.lock().unwrap(); + settings.do_clock_signalling.to_value() + } "enable-data-channel-navigation" => { let settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation.to_value()