diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 50689988..154dbf54 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -3410,6 +3410,18 @@ "readable": true, "type": "gboolean", "writable": true + }, + "sync": { + "blurb": "Synchronize buffers to the clock", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "true", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true } }, "rank": "none" diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs index 96e8f291..be5ab2e9 100644 --- a/utils/livesync/src/livesync/imp.rs +++ b/utils/livesync/src/livesync/imp.rs @@ -89,6 +89,9 @@ struct State { /// See `PROP_SINGLE_SEGMENT` single_segment: bool, + /// See `PROP_SYNC` + sync: bool, + /// Latency reported by upstream upstream_latency: Option, @@ -162,6 +165,7 @@ struct State { const PROP_LATENCY: &str = "latency"; const PROP_LATE_THRESHOLD: &str = "late-threshold"; const PROP_SINGLE_SEGMENT: &str = "single-segment"; +const PROP_SYNC: &str = "sync"; const PROP_IN: &str = "in"; const PROP_DROP: &str = "drop"; @@ -181,6 +185,7 @@ impl Default for State { latency: DEFAULT_LATENCY, late_threshold: DEFAULT_LATE_THRESHOLD, single_segment: false, + sync: true, upstream_latency: None, playing: false, eos: false, @@ -291,7 +296,7 @@ impl ObjectSubclass for LiveSync { impl ObjectImpl for LiveSync { fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy<[glib::ParamSpec; 7]> = Lazy::new(|| { + static PROPERTIES: Lazy<[glib::ParamSpec; 8]> = Lazy::new(|| { [ glib::ParamSpecUInt64::builder(PROP_LATENCY) .nick("Latency") @@ -318,6 +323,11 @@ impl ObjectImpl for LiveSync { .blurb("Timestamp buffers and eat segments so as to appear as one segment") .mutable_ready() .build(), + glib::ParamSpecBoolean::builder(PROP_SYNC) + .nick("Sync") + .blurb("Synchronize buffers to the clock") + .mutable_ready() + .build(), glib::ParamSpecUInt64::builder(PROP_IN) .nick("Frames input") .blurb("Number of incoming frames accepted") @@ -369,6 +379,10 @@ impl ObjectImpl for LiveSync { state.single_segment = value.get().unwrap(); } + PROP_SYNC => { + state.sync = value.get().unwrap(); + } + _ => unimplemented!(), } } @@ -379,6 +393,7 @@ impl ObjectImpl for LiveSync { PROP_LATENCY => state.latency.to_value(), PROP_LATE_THRESHOLD => state.late_threshold.to_value(), PROP_SINGLE_SEGMENT => state.single_segment.to_value(), + PROP_SYNC => state.sync.to_value(), PROP_IN => state.num_in.to_value(), PROP_DROP => state.num_drop.to_value(), PROP_OUT => state.num_out.to_value(), @@ -950,6 +965,14 @@ impl LiveSync { state.in_timestamp = timestamp; self.cond.notify_all(); + // If we're not strictly syncing to the clock but output buffers as soon as they arrive + // then also wake up the source pad task now in case it's waiting on the clock. + if !state.sync { + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + } + Ok(gst::FlowSuccess::Ok) } @@ -1008,7 +1031,12 @@ impl LiveSync { } state.srcresult?; - if let Some(out_timestamp) = state.out_timestamp { + // Synchronize to the clock if requested to do so, or when the queue is currently empty + // and we might have to introduce a gap buffer. + if let Some(out_timestamp) = (state.sync || state.queue.is_empty()) + .then_some(state.out_timestamp) + .flatten() + { let sync_ts = out_timestamp.end; let element = self.obj(); @@ -1038,12 +1066,8 @@ impl LiveSync { if jitter.is_negative() {"-"} else {""}, gst::ClockTime::from_nseconds(jitter.unsigned_abs())); - if res == Err(gst::ClockError::Unscheduled) { - return Err(gst::FlowError::Flushing); - } - - state.srcresult?; state.clock_id = None; + state.srcresult?; } let in_item = state.queue.pop_front();