From 960529d90d42ed2369644151675be41a27786657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 26 Jun 2024 11:53:40 +0300 Subject: [PATCH] livesync: Add sync property for allowing to output buffers as soon as they arrive By default livesync will wait for each buffer on the clock. If sync is set to false, it will output buffers immediately once they're available and only waits on the clock for outputting gap filler buffers. Part-of: --- docs/plugins/gst_plugins_cache.json | 12 +++++++++ utils/livesync/src/livesync/imp.rs | 38 +++++++++++++++++++++++------ 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 50689988e..154dbf545 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -3410,6 +3410,18 @@ "readable": true, "type": "gboolean", "writable": true + }, + "sync": { + "blurb": "Synchronize buffers to the clock", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "true", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true } }, "rank": "none" diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs index 96e8f2913..be5ab2e93 100644 --- a/utils/livesync/src/livesync/imp.rs +++ b/utils/livesync/src/livesync/imp.rs @@ -89,6 +89,9 @@ struct State { /// See `PROP_SINGLE_SEGMENT` single_segment: bool, + /// See `PROP_SYNC` + sync: bool, + /// Latency reported by upstream upstream_latency: Option, @@ -162,6 +165,7 @@ struct State { const PROP_LATENCY: &str = "latency"; const PROP_LATE_THRESHOLD: &str = "late-threshold"; const PROP_SINGLE_SEGMENT: &str = "single-segment"; +const PROP_SYNC: &str = "sync"; const PROP_IN: &str = "in"; const PROP_DROP: &str = "drop"; @@ -181,6 +185,7 @@ impl Default for State { latency: DEFAULT_LATENCY, late_threshold: DEFAULT_LATE_THRESHOLD, single_segment: false, + sync: true, upstream_latency: None, playing: false, eos: false, @@ -291,7 +296,7 @@ impl ObjectSubclass for LiveSync { impl ObjectImpl for LiveSync { fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy<[glib::ParamSpec; 7]> = Lazy::new(|| { + static PROPERTIES: Lazy<[glib::ParamSpec; 8]> = Lazy::new(|| { [ glib::ParamSpecUInt64::builder(PROP_LATENCY) .nick("Latency") @@ -318,6 +323,11 @@ impl ObjectImpl for LiveSync { .blurb("Timestamp buffers and eat segments so as to appear as one segment") .mutable_ready() .build(), + glib::ParamSpecBoolean::builder(PROP_SYNC) + .nick("Sync") + .blurb("Synchronize buffers to the clock") + .mutable_ready() + .build(), glib::ParamSpecUInt64::builder(PROP_IN) .nick("Frames input") .blurb("Number of incoming frames accepted") @@ -369,6 +379,10 @@ impl ObjectImpl for LiveSync { state.single_segment = value.get().unwrap(); } + PROP_SYNC => { + state.sync = value.get().unwrap(); + } + _ => unimplemented!(), } } @@ -379,6 +393,7 @@ impl ObjectImpl for LiveSync { PROP_LATENCY => state.latency.to_value(), PROP_LATE_THRESHOLD => state.late_threshold.to_value(), PROP_SINGLE_SEGMENT => state.single_segment.to_value(), + PROP_SYNC => state.sync.to_value(), PROP_IN => state.num_in.to_value(), PROP_DROP => state.num_drop.to_value(), PROP_OUT => state.num_out.to_value(), @@ -950,6 +965,14 @@ impl LiveSync { state.in_timestamp = timestamp; self.cond.notify_all(); + // If we're not strictly syncing to the clock but output buffers as soon as they arrive + // then also wake up the source pad task now in case it's waiting on the clock. + if !state.sync { + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + } + Ok(gst::FlowSuccess::Ok) } @@ -1008,7 +1031,12 @@ impl LiveSync { } state.srcresult?; - if let Some(out_timestamp) = state.out_timestamp { + // Synchronize to the clock if requested to do so, or when the queue is currently empty + // and we might have to introduce a gap buffer. + if let Some(out_timestamp) = (state.sync || state.queue.is_empty()) + .then_some(state.out_timestamp) + .flatten() + { let sync_ts = out_timestamp.end; let element = self.obj(); @@ -1038,12 +1066,8 @@ impl LiveSync { if jitter.is_negative() {"-"} else {""}, gst::ClockTime::from_nseconds(jitter.unsigned_abs())); - if res == Err(gst::ClockError::Unscheduled) { - return Err(gst::FlowError::Flushing); - } - - state.srcresult?; state.clock_id = None; + state.srcresult?; } let in_item = state.queue.pop_front();