From 42385c81becabb075e0edfbbf2a9ec1376df6edc Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 12 Oct 2022 15:35:11 +0200 Subject: [PATCH] Add livesync plugin It attempts to produce a (nearly) gapless live stream by synchronizing its output to the running time and forwarding the next input buffer if its start is (nearly) flush with the end of the last output buffer. If the input buffer is missing or too far in the future, it duplicates the last output buffer with adjusted timestamps. If it is operating on a raw audio stream, it will fill duplicate buffers with silence. If an input buffer arrives too late, it is thrown away. If the last input buffer was accepted too long ago (according to `late-threshold`), a late input buffer is accepted anyway, but immediately considered a duplicate. Due to the silence-filling, this has no effect on audio, but video gets a "slideshow" effect instead of freezing completely. The "many-repeats" property will be notified when this element has recently duplicated a lot of buffers or recovered from such a state. Co-authored-by: Vivia Nikolaidou Part-of: --- Cargo.toml | 2 + README.md | 3 + docs/plugins/gst_plugins_cache.json | 136 +++ meson.build | 1 + utils/livesync/Cargo.toml | 58 ++ utils/livesync/LICENSE-MPL-2.0 | 1 + utils/livesync/build.rs | 3 + utils/livesync/examples/gtk_livesync.rs | 175 ++++ utils/livesync/src/lib.rs | 36 + utils/livesync/src/livesync/imp.rs | 1202 +++++++++++++++++++++++ utils/livesync/src/livesync/mod.rs | 28 + utils/livesync/tests/livesync.rs | 198 ++++ 12 files changed, 1843 insertions(+) create mode 100644 utils/livesync/Cargo.toml create mode 120000 utils/livesync/LICENSE-MPL-2.0 create mode 100644 utils/livesync/build.rs create mode 100644 utils/livesync/examples/gtk_livesync.rs create mode 100644 utils/livesync/src/lib.rs create mode 100644 utils/livesync/src/livesync/imp.rs create mode 100644 utils/livesync/src/livesync/mod.rs create mode 100644 utils/livesync/tests/livesync.rs diff --git a/Cargo.toml b/Cargo.toml index a8cf771d..96e090c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ members = [ "text/wrap", "utils/fallbackswitch", + "utils/livesync", "utils/togglerecord", "utils/tracers", "utils/uriplaylistbin", @@ -84,6 +85,7 @@ default-members = [ "text/wrap", "utils/fallbackswitch", + "utils/livesync", "utils/togglerecord", "utils/tracers", "utils/uriplaylistbin", diff --git a/README.md b/README.md index 575dbd5d..2b50eb4d 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,9 @@ You will find the following plugins in this repository: configuring a fallback audio/video if there are problems with the main source. + - `livesync`: Element to maintain a continuous live stream from a + potentially unstable source. + - `togglerecord`: Element to enable starting and stopping multiple streams together. - `tracers`: Plugin with multiple tracers: diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 42779a54..1a05277b 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -2497,6 +2497,142 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "livesync": { + "description": "Livesync Plugin", + "elements": { + "livesync": { + "author": "Jan Alexander Steffens (heftig) ", + "description": "Outputs livestream, inserting gap frames when input lags", + "hierarchy": [ + "GstLiveSync", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Filter", + "long-name": "Live Synchronizer", + "pad-templates": { + "sink": { + "caps": "ANY", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "ANY", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "drop": { + "blurb": "Number of incoming frames dropped", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "duplicate": { + "blurb": "Number of outgoing frames duplicated", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "in": { + "blurb": "Number of incoming frames accepted", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "late-threshold": { + "blurb": "Maximum time spent (in nanoseconds) before accepting one late buffer; -1 = never", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "2000000000", + "max": "18446744073709551615", + "min": "1000000000", + "mutable": "playing", + "readable": true, + "type": "guint64", + "writable": true + }, + "latency": { + "blurb": "Additional latency to allow upstream to take longer to produce buffers for the current position (in nanoseconds)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "9223372036854775807", + "min": "0", + "mutable": "playing", + "readable": true, + "type": "guint64", + "writable": true + }, + "out": { + "blurb": "Number of outgoing frames produced", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "single-segment": { + "blurb": "Timestamp buffers and eat segments so as to appear as one segment", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstlivesync", + "license": "MPL", + "other-types": {}, + "package": "gst-plugin-livesync", + "source": "gst-plugin-livesync", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "mp4": { "description": "GStreamer Rust MP4 Plugin", "elements": { diff --git a/meson.build b/meson.build index a800e3cd..37d62c17 100644 --- a/meson.build +++ b/meson.build @@ -67,6 +67,7 @@ plugins = { 'gst-plugin-textwrap': 'libgsttextwrap', 'gst-plugin-fallbackswitch': 'libgstfallbackswitch', + 'gst-plugin-livesync': 'libgstlivesync', 'gst-plugin-togglerecord': 'libgsttogglerecord', 'gst-plugin-tracers': 'libgstrstracers', 'gst-plugin-uriplaylistbin': 'libgsturiplaylistbin', diff --git a/utils/livesync/Cargo.toml b/utils/livesync/Cargo.toml new file mode 100644 index 00000000..ccd6e206 --- /dev/null +++ b/utils/livesync/Cargo.toml @@ -0,0 +1,58 @@ +[package] +name = "gst-plugin-livesync" +version = "0.9.0-alpha.1" +authors = ["Jan Alexander Steffens (heftig) "] +license = "MPL-2.0" +description = "Livesync Plugin" +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +edition = "2021" +rust-version = "1.63" + +[dependencies] +gio = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true } +gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs", optional = true } +muldiv = "1.0" +num-rational = { version = "0.4", default-features = false, features = [] } +once_cell = "1.0" +parking_lot = "0.12" + +[dev-dependencies] +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } + +[lib] +name = "gstlivesync" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[[example]] +name = "gtk-livesync" +path = "examples/gtk_livesync.rs" +required-features = ["gtk", "gio", "gst-plugin-gtk4"] + +[[test]] +name = "livesync" +path = "tests/livesync.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +static = [] +capi = [] +doc = ["gst/v1_18"] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-audio-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/utils/livesync/LICENSE-MPL-2.0 b/utils/livesync/LICENSE-MPL-2.0 new file mode 120000 index 00000000..eb5d24fe --- /dev/null +++ b/utils/livesync/LICENSE-MPL-2.0 @@ -0,0 +1 @@ +../../LICENSE-MPL-2.0 \ No newline at end of file diff --git a/utils/livesync/build.rs b/utils/livesync/build.rs new file mode 100644 index 00000000..cda12e57 --- /dev/null +++ b/utils/livesync/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/utils/livesync/examples/gtk_livesync.rs b/utils/livesync/examples/gtk_livesync.rs new file mode 100644 index 00000000..0a24acdf --- /dev/null +++ b/utils/livesync/examples/gtk_livesync.rs @@ -0,0 +1,175 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gio::prelude::*; +use gst::{glib, prelude::*}; +use gtk::prelude::*; +use std::cell::Cell; + +struct DroppingProbe(glib::WeakRef, Option); + +impl DroppingProbe { + fn install(pad: &gst::Pad) -> Self { + let probe_id = pad + .add_probe(gst::PadProbeType::BUFFER, |_, _| gst::PadProbeReturn::Drop) + .unwrap(); + Self(pad.downgrade(), Some(probe_id)) + } +} + +impl Drop for DroppingProbe { + fn drop(&mut self) { + if let Some((pad, probe_id)) = self.0.upgrade().zip(self.1.take()) { + pad.remove_probe(probe_id); + } + } +} + +fn create_pipeline() -> gst::Pipeline { + gst::parse_launch( + r#"videotestsrc name=vsrc is-live=1 + ! video/x-raw,framerate=60/1,width=800,height=600 + ! timeoverlay text="Pre:" + ! queue + ! livesync latency=50000000 + ! videorate + ! timeoverlay text="Post:" halignment=right + ! queue + ! gtk4paintablesink name=vsink + audiotestsrc name=asrc is-live=1 + ! audio/x-raw,channels=2 + ! queue + ! livesync latency=50000000 + ! audiorate + ! queue + ! autoaudiosink + "#, + ) + .expect("Failed to create pipeline") + .downcast() + .unwrap() +} + +fn create_window(app: >k::Application) { + let pipeline = create_pipeline(); + let video_src_pad = pipeline.by_name("vsrc").unwrap().static_pad("src").unwrap(); + let audio_src_pad = pipeline.by_name("asrc").unwrap().static_pad("src").unwrap(); + + let window = gtk::ApplicationWindow::new(app); + window.set_default_size(800, 684); + + let vbox = gtk::Box::new(gtk::Orientation::Vertical, 0); + + let picture = gtk::Picture::new(); + let paintable = pipeline + .by_name("vsink") + .unwrap() + .property::("paintable"); + picture.set_paintable(Some(&paintable)); + vbox.append(&picture); + + let action_bar = gtk::ActionBar::new(); + vbox.append(&action_bar); + + let offset_spin = gtk::SpinButton::with_range(0.0, 500.0, 100.0); + action_bar.pack_start(&offset_spin); + + { + let video_src_pad = video_src_pad.clone(); + let audio_src_pad = audio_src_pad.clone(); + offset_spin.connect_value_notify(move |offset_spin| { + const MSECOND: f64 = gst::ClockTime::MSECOND.nseconds() as _; + + let offset = (offset_spin.value() * -MSECOND) as i64; + video_src_pad.set_offset(offset); + audio_src_pad.set_offset(offset); + }); + } + + let drop_button = gtk::ToggleButton::with_label("Drop Signal"); + action_bar.pack_end(&drop_button); + + let drop_ids = Cell::new(None); + drop_button.connect_toggled(move |drop_button| { + if drop_button.is_active() { + let video_probe = DroppingProbe::install(&video_src_pad); + let audio_probe = DroppingProbe::install(&audio_src_pad); + drop_ids.set(Some((video_probe, audio_probe))); + } else { + drop_ids.set(None); + } + }); + + { + let bus = pipeline.bus().unwrap(); + let window = window.downgrade(); + bus.add_watch_local(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + if let Some(window) = window.upgrade() { + window.close(); + } + } + + MessageView::Error(err) => { + eprintln!( + "Error from {}: {} ({:?})", + msg.src().map(|s| s.path_string()).as_deref().unwrap_or(""), + err.error(), + err.debug(), + ); + if let Some(window) = window.upgrade() { + window.close(); + } + } + + _ => (), + }; + + glib::Continue(true) + }) + .unwrap(); + } + + { + let pipeline = pipeline.clone(); + window.connect_realize(move |_| { + pipeline + .set_state(gst::State::Playing) + .expect("Failed to start pipeline"); + }); + } + + window.connect_unrealize(move |_| { + pipeline + .set_state(gst::State::Null) + .expect("Failed to stop pipeline"); + }); + + window.set_child(Some(&vbox)); + window.show(); +} + +fn main() { + let app = gtk::Application::new( + Some("gtk-plugins-rs.gtk-livesync"), + gio::ApplicationFlags::FLAGS_NONE, + ); + + app.connect_startup(move |_app| { + gst::init().expect("Failed to initialize GStreamer"); + gstlivesync::plugin_register_static().expect("Failed to register livesync plugin"); + gstgtk4::plugin_register_static().expect("Failed to register gstgtk4 plugin"); + }); + + app.connect_activate(create_window); + app.run(); +} diff --git a/utils/livesync/src/lib.rs b/utils/livesync/src/lib.rs new file mode 100644 index 00000000..9d646c36 --- /dev/null +++ b/utils/livesync/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)] + +/** + * plugin-livesync: + * + * Since: plugins-rs-0.9.0 + */ +use gst::glib; + +mod livesync; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + livesync::register(plugin) +} + +gst::plugin_define!( + livesync, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known) + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs new file mode 100644 index 00000000..63c15678 --- /dev/null +++ b/utils/livesync/src/livesync/imp.rs @@ -0,0 +1,1202 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::{ + glib::{self, translate::IntoGlib}, + prelude::*, + subclass::prelude::*, +}; +use once_cell::sync::Lazy; +use parking_lot::{Condvar, Mutex, MutexGuard}; +use std::sync::mpsc; + +/// Offset for the segment in single-segment mode, to handle negative DTS +const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "livesync", + gst::DebugColorFlags::empty(), + Some("debug category for the livesync element"), + ) +}); + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BufferLateness { + OnTime, + LateUnderThreshold, + LateOverThreshold, +} + +#[derive(Debug)] +enum Item { + Buffer(gst::Buffer, BufferLateness), + Event(gst::Event), + // SAFETY: Item needs to wait until the query and the receiver has returned + Query(std::ptr::NonNull, mpsc::SyncSender), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct Timestamps { + start: gst::ClockTime, + end: gst::ClockTime, +} + +// SAFETY: Need to be able to pass *mut gst::QueryRef +unsafe impl Send for Item {} + +#[derive(Debug)] +pub struct LiveSync { + state: Mutex, + cond: Condvar, + sinkpad: gst::Pad, + srcpad: gst::Pad, +} + +#[derive(Debug)] +struct State { + latency: gst::ClockTime, + late_threshold: Option, + + upstream_latency: Option, + fallback_duration: gst::ClockTime, + + eos: bool, + segment: Option>, + + srcresult: Result, + playing: bool, + sent_segment: bool, + clock_id: Option, + + in_caps: Option, + in_audio_info: Option, + out_audio_info: Option, + + in_item: Option, + out_buffer: Option, + + in_timestamp: Option, + out_timestamp: Option, + + num_in: u64, + num_drop: u64, + num_out: u64, + num_duplicate: u64, + single_segment: bool, +} + +const PROP_LATENCY: &str = "latency"; +const PROP_LATE_THRESHOLD: &str = "late-threshold"; + +const PROP_IN: &str = "in"; +const PROP_DROP: &str = "drop"; +const PROP_OUT: &str = "out"; +const PROP_DUPLICATE: &str = "duplicate"; +const PROP_SINGLE_SEGMENT: &str = "single-segment"; + +const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::ZERO; +const DEFAULT_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(100); +const MINIMUM_LATE_THRESHOLD: gst::ClockTime = gst::ClockTime::from_seconds(1); +const DEFAULT_LATE_THRESHOLD: Option = Some(gst::ClockTime::from_seconds(2)); + +impl Default for State { + fn default() -> Self { + Self { + latency: DEFAULT_LATENCY, + late_threshold: DEFAULT_LATE_THRESHOLD, + upstream_latency: None, + fallback_duration: DEFAULT_DURATION, + eos: false, + segment: None, + srcresult: Err(gst::FlowError::Flushing), + playing: false, + sent_segment: false, + clock_id: None, + in_caps: None, + in_audio_info: None, + out_audio_info: None, + in_item: None, + out_buffer: None, + in_timestamp: None, + out_timestamp: None, + num_in: 0, + num_drop: 0, + num_out: 0, + num_duplicate: 0, + single_segment: false, + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for LiveSync { + const NAME: &'static str = "GstLiveSync"; + type Type = super::LiveSync; + type ParentType = gst::Element; + + fn with_class(class: &Self::Class) -> Self { + let sinkpad = + gst::Pad::builder_with_template(&class.pad_template("sink").unwrap(), Some("sink")) + .activatemode_function(|pad, parent, mode, active| { + Self::catch_panic_pad_function( + parent, + || Err(gst::loggable_error!(CAT, "sink_activate_mode panicked")), + |livesync| livesync.sink_activate_mode(pad, mode, active), + ) + }) + .event_function(|pad, parent, event| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.sink_event(pad, event), + ) + }) + .query_function(|pad, parent, query| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.sink_query(pad, query), + ) + }) + .chain_function(|pad, parent, buffer| { + Self::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |livesync| livesync.sink_chain(pad, buffer), + ) + }) + .flags( + gst::PadFlags::PROXY_CAPS + | gst::PadFlags::PROXY_ALLOCATION + | gst::PadFlags::PROXY_SCHEDULING, + ) + .build(); + + let srcpad = + gst::Pad::builder_with_template(&class.pad_template("src").unwrap(), Some("src")) + .activatemode_function(|pad, parent, mode, active| { + Self::catch_panic_pad_function( + parent, + || Err(gst::loggable_error!(CAT, "src_activate_mode panicked")), + |livesync| livesync.src_activate_mode(pad, mode, active), + ) + }) + .event_function(|pad, parent, event| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.src_event(pad, event), + ) + }) + .query_function(|pad, parent, query| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.src_query(pad, query), + ) + }) + .flags( + gst::PadFlags::PROXY_CAPS + | gst::PadFlags::PROXY_ALLOCATION + | gst::PadFlags::PROXY_SCHEDULING, + ) + .build(); + + Self { + state: Default::default(), + cond: Condvar::new(), + sinkpad, + srcpad, + } + } +} + +impl ObjectImpl for LiveSync { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy<[glib::ParamSpec; 7]> = Lazy::new(|| { + [ + glib::ParamSpecUInt64::builder(PROP_LATENCY) + .nick("Latency") + .blurb( + "Additional latency to allow upstream to take longer to \ + produce buffers for the current position (in nanoseconds)", + ) + .maximum(i64::MAX as u64) + .default_value(DEFAULT_LATENCY.into_glib()) + .mutable_playing() + .build(), + glib::ParamSpecUInt64::builder(PROP_LATE_THRESHOLD) + .nick("Late threshold") + .blurb( + "Maximum time spent (in nanoseconds) before \ + accepting one late buffer; -1 = never", + ) + .minimum(MINIMUM_LATE_THRESHOLD.into_glib()) + .default_value(DEFAULT_LATE_THRESHOLD.into_glib()) + .mutable_playing() + .build(), + glib::ParamSpecUInt64::builder(PROP_IN) + .nick("Frames input") + .blurb("Number of incoming frames accepted") + .read_only() + .build(), + glib::ParamSpecUInt64::builder(PROP_DROP) + .nick("Frames dropped") + .blurb("Number of incoming frames dropped") + .read_only() + .build(), + glib::ParamSpecUInt64::builder(PROP_OUT) + .nick("Frames output") + .blurb("Number of outgoing frames produced") + .read_only() + .build(), + glib::ParamSpecUInt64::builder(PROP_DUPLICATE) + .nick("Frames duplicated") + .blurb("Number of outgoing frames duplicated") + .read_only() + .build(), + glib::ParamSpecBoolean::builder(PROP_SINGLE_SEGMENT) + .nick("Single segment") + .blurb("Timestamp buffers and eat segments so as to appear as one segment") + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut state = self.state.lock(); + match pspec.name() { + PROP_LATENCY => { + state.latency = value.get().unwrap(); + state.update_fallback_duration(); + let _ = self.obj().post_message(gst::message::Latency::new()); + } + + PROP_LATE_THRESHOLD => { + state.late_threshold = value.get().unwrap(); + } + + PROP_SINGLE_SEGMENT => { + state.single_segment = value.get().unwrap(); + } + + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let state = self.state.lock(); + match pspec.name() { + PROP_LATENCY => state.latency.to_value(), + PROP_LATE_THRESHOLD => state.late_threshold.to_value(), + PROP_IN => state.num_in.to_value(), + PROP_DROP => state.num_drop.to_value(), + PROP_OUT => state.num_out.to_value(), + PROP_DUPLICATE => state.num_duplicate.to_value(), + PROP_SINGLE_SEGMENT => state.single_segment.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for LiveSync {} + +impl ElementImpl for LiveSync { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Live Synchronizer", + "Filter", + "Outputs livestream, inserting gap frames when input lags", + "Jan Alexander Steffens (heftig) ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<[gst::PadTemplate; 2]> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + + [ + gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(), + gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(), + ] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); + + if transition == gst::StateChange::PausedToPlaying { + let mut state = self.state.lock(); + state.playing = true; + self.cond.notify_all(); + } + + let success = self.parent_change_state(transition)?; + + if transition == gst::StateChange::PlayingToPaused { + let mut state = self.state.lock(); + state.playing = false; + } + + match (transition, success) { + ( + gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused, + gst::StateChangeSuccess::Success, + ) => Ok(gst::StateChangeSuccess::NoPreroll), + (_, s) => Ok(s), + } + } + + fn provide_clock(&self) -> Option { + Some(gst::SystemClock::obtain()) + } +} + +impl State { + /// Calculate the running time the buffer covers, including latency + fn ts_range(&self, buf: &gst::BufferRef) -> Option { + let mut timestamp_start = buf.dts_or_pts()?; + + if !self.single_segment { + timestamp_start = self + .segment + .as_ref() + .unwrap() + .to_running_time(timestamp_start) + .unwrap_or(gst::ClockTime::ZERO); + timestamp_start += self.latency + self.upstream_latency.unwrap(); + } else { + timestamp_start += self.upstream_latency.unwrap(); + timestamp_start = timestamp_start.saturating_sub(SEGMENT_OFFSET); + } + + Some(Timestamps { + start: timestamp_start, + end: timestamp_start + buf.duration().unwrap(), + }) + } + + fn update_fallback_duration(&mut self) { + self.fallback_duration = self + // First, try 1/framerate from the caps + .in_caps + .as_ref() + .and_then(|c| c.structure(0)) + .filter(|s| s.name().starts_with("video/")) + .and_then(|s| s.get::("framerate").ok()) + .and_then(|framerate| { + gst::ClockTime::SECOND + .mul_div_round(framerate.numer() as u64, framerate.denom() as u64) + }) + .filter(|&dur| dur > 8.mseconds() && dur < 10.seconds()) + // Otherwise, half the configured latency + .or_else(|| Some(self.latency / 2)) + // In any case, don't allow a zero duration + .filter(|&dur| dur > gst::ClockTime::ZERO) + // Safe default + .unwrap_or(DEFAULT_DURATION); + } +} + +impl LiveSync { + fn sink_activate_mode( + &self, + pad: &gst::Pad, + mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if mode != gst::PadMode::Push { + return Err(gst::loggable_error!(CAT, "Wrong scheduling mode")); + } + + if active { + let mut state = self.state.lock(); + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.eos = false; + state.in_timestamp = None; + state.num_in = 0; + state.num_drop = 0; + state.segment = None; + } else { + { + let mut state = self.state.lock(); + state.srcresult = Err(gst::FlowError::Flushing); + state.out_buffer = None; + state.out_audio_info = None; + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + self.cond.notify_all(); + } + + let lock = pad.stream_lock(); + { + let mut state = self.state.lock(); + state.in_caps = None; + state.in_audio_info = None; + state.in_item = None; + state.update_fallback_duration(); + } + drop(lock); + } + + Ok(()) + } + + fn src_activate_mode( + &self, + pad: &gst::Pad, + mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if mode != gst::PadMode::Push { + return Err(gst::loggable_error!(CAT, "Wrong scheduling mode")); + } + + if active { + let ret; + + { + let mut state = self.state.lock(); + + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.sent_segment = false; + state.out_timestamp = None; + state.num_out = 0; + state.num_duplicate = 0; + + ret = self.start_src_task().map_err(Into::into); + } + + ret + } else { + { + let mut state = self.state.lock(); + state.srcresult = Err(gst::FlowError::Flushing); + state.out_buffer = None; + state.out_audio_info = None; + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + self.cond.notify_all(); + } + + pad.stop_task().map_err(Into::into) + } + } + + fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { + { + let state = self.state.lock(); + if state.single_segment { + let event = event.make_mut(); + let latency = state.latency.nseconds() as i64; + event.set_running_time_offset(event.running_time_offset() + latency); + } + } + + match event.view() { + gst::EventView::FlushStart(_) => { + let ret = self.srcpad.push_event(event); + + { + let mut state = self.state.lock(); + state.srcresult = Err(gst::FlowError::Flushing); + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + self.cond.notify_all(); + } + + let _ = self.srcpad.pause_task(); + return ret; + } + + gst::EventView::FlushStop(_) => { + let ret = self.srcpad.push_event(event); + + let mut state = self.state.lock(); + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.eos = false; + state.sent_segment = false; + state.segment = None; + state.in_caps = None; + state.in_audio_info = None; + state.out_audio_info = None; + state.in_item = None; + state.out_buffer = None; + state.update_fallback_duration(); + + let _ = self.start_src_task(); + return ret; + } + + gst::EventView::StreamStart(_) => { + let mut state = self.state.lock(); + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.eos = false; + } + + gst::EventView::Segment(e) => { + let segment = match e.segment().downcast_ref() { + Some(s) => s, + None => { + gst::error!(CAT, imp: self, "Got non-TIME segment"); + return false; + } + }; + + let mut state = self.state.lock(); + state.segment = Some(segment.clone()); + state.sent_segment = false; + return true; + } + + gst::EventView::Gap(_) => { + gst::debug!(CAT, imp: self, "Got gap event"); + return true; + } + + gst::EventView::Eos(_) => { + let mut state = self.state.lock(); + + if let Err(err) = state.srcresult { + if matches!(err, gst::FlowError::Flushing | gst::FlowError::Eos) { + let err = state.srcresult.unwrap_err(); + gst::element_imp_error!( + self, + gst::StreamError::Failed, + ("Internal data flow error."), + ["streaming task paused, reason {} ({:?})", err, err] + ); + } + } + + state.eos = true; + } + + gst::EventView::Caps(c) => { + let caps = c.caps_owned(); + + let audio_info = match caps + .structure(0) + .unwrap() + .has_name("audio/x-raw") + .then(|| gst_audio::AudioInfo::from_caps(&caps)) + .transpose() + { + Ok(ai) => ai, + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse audio caps: {}", e); + return false; + } + }; + + let mut state = self.state.lock(); + state.in_caps = Some(caps); + state.in_audio_info = audio_info; + state.update_fallback_duration(); + return true; + } + + _ => {} + } + + if event.is_serialized() { + let mut state = self.state.lock(); + while state.srcresult.is_ok() && state.in_item.is_some() { + self.cond.wait(&mut state); + } + + if state.srcresult.is_err() { + return false; + } + + gst::trace!(CAT, imp: self, "Queueing {:?}", event); + state.in_item = Some(Item::Event(event)); + self.cond.notify_all(); + + true + } else { + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + } + + fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { + { + let state = self.state.lock(); + if state.single_segment { + let event = event.make_mut(); + let latency = state.latency.nseconds() as i64; + event.set_running_time_offset(event.running_time_offset() - latency); + } + } + + match event.view() { + gst::EventView::Reconfigure(_) => { + { + let mut state = self.state.lock(); + if state.srcresult == Err(gst::FlowError::NotLinked) { + state.srcresult = Ok(gst::FlowSuccess::Ok); + let _ = self.start_src_task(); + } + } + self.sinkpad.push_event(event) + } + + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } + + fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + if query.is_serialized() { + let (sender, receiver) = mpsc::sync_channel(1); + + let mut state = self.state.lock(); + while state.srcresult.is_ok() && state.in_item.is_some() { + self.cond.wait(&mut state); + } + + if state.srcresult.is_err() { + return false; + } + + gst::trace!(CAT, imp: self, "Queueing {:?}", query); + state.in_item = Some(Item::Query(std::ptr::NonNull::from(query), sender)); + self.cond.notify_all(); + drop(state); + + receiver.recv().unwrap_or(false) + } else { + gst::Pad::query_default(pad, Some(&*self.obj()), query) + } + } + + fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + match query.view_mut() { + gst::QueryViewMut::Latency(_) => { + if !gst::Pad::query_default(pad, Some(&*self.obj()), query) { + return false; + } + + let q = match query.view_mut() { + gst::QueryViewMut::Latency(q) => q, + _ => unreachable!(), + }; + + let mut state = self.state.lock(); + let latency = state.latency; + + let (_live, min, max) = q.result(); + q.set(true, min + latency, max.map(|max| max + latency)); + + state.upstream_latency = Some(min); + true + } + + _ => gst::Pad::query_default(pad, Some(&*self.obj()), query), + } + } + + fn sink_chain( + &self, + _pad: &gst::Pad, + mut buffer: gst::Buffer, + ) -> Result { + gst::trace!(CAT, imp: self, "incoming {:?}", buffer); + + let mut state = self.state.lock(); + + if state.upstream_latency.is_none() { + gst::debug!(CAT, imp: self, "Have no upstream latency yet, querying"); + let mut q = gst::query::Latency::new(); + if MutexGuard::unlocked(&mut state, || self.sinkpad.peer_query(&mut q)) { + let (live, min, max) = q.result(); + + gst::debug!( + CAT, + imp: self, + "Latency query response: live {} min {} max {}", + live, + min, + max.display() + ); + + state.upstream_latency = Some(min); + } else { + gst::warning!( + CAT, + imp: self, + "Can't query upstream latency -- assuming zero" + ); + } + } + + while state.srcresult.is_ok() && state.in_item.is_some() { + self.cond.wait(&mut state); + } + state.srcresult?; + + let buf_mut = buffer.make_mut(); + + if buf_mut.pts().is_none() { + gst::warning!(CAT, imp: self, "incoming buffer has no timestamps"); + } + + if let Some(audio_info) = &state.in_audio_info { + let buf_duration = buf_mut.duration().unwrap_or_default(); + if let Some(calc_duration) = audio_info + .convert::>(Some(gst::format::Bytes::from_usize( + buf_mut.size(), + ))) + .flatten() + { + let diff = if buf_duration < calc_duration { + calc_duration - buf_duration + } else { + buf_duration - calc_duration + }; + + if diff.nseconds() > 1 { + gst::warning!( + CAT, + imp: self, + "Correcting duration on audio buffer from {} to {}", + buf_duration, + calc_duration, + ); + buf_mut.set_duration(calc_duration); + } + } else { + gst::debug!( + CAT, + imp: self, + "Failed to calculate duration of {:?}", + buf_mut, + ); + } + } + + if state.single_segment { + // At this stage we should really really have a segment + let segment = state.segment.as_ref().ok_or(gst::FlowError::Error)?; + let dts = segment + .to_running_time_full(buf_mut.dts()) + .map(|r| r + SEGMENT_OFFSET) + .and_then(|r| r.positive()); + let pts = segment + .to_running_time_full(buf_mut.pts()) + .map(|r| r + SEGMENT_OFFSET) + .and_then(|r| r.positive()) + .or_else(|| { + self.obj() + .current_running_time() + .map(|r| r + SEGMENT_OFFSET) + }); + + buf_mut.set_dts(dts.map(|t| t + state.latency)); + buf_mut.set_pts(pts.map(|t| t + state.latency)); + } + + if buf_mut.duration().is_none() { + gst::debug!(CAT, imp: self, "incoming buffer without duration"); + buf_mut.set_duration(Some(state.fallback_duration)); + } + + if state + .out_buffer + .as_ref() + .map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP)) + { + // We are done bridging a gap, so mark it as DISCONT instead + buf_mut.unset_flags(gst::BufferFlags::GAP); + buf_mut.set_flags(gst::BufferFlags::DISCONT); + } + + let mut timestamp = state.ts_range(buf_mut); + let lateness = self.buffer_is_backwards(&state, timestamp); + match lateness { + BufferLateness::OnTime => {} + + BufferLateness::LateUnderThreshold => { + gst::debug!(CAT, imp: self, "discarding late {:?}", buf_mut); + state.num_drop += 1; + return Ok(gst::FlowSuccess::Ok); + } + + BufferLateness::LateOverThreshold => { + gst::debug!(CAT, imp: self, "accepting late {:?}", buf_mut); + + let prev = state.out_buffer.as_ref().unwrap(); + let prev_duration = prev.duration().unwrap(); + + if let Some(audio_info) = &state.in_audio_info { + let mut map_info = buf_mut.map_writable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map buffer: {}", e); + gst::FlowError::Error + })?; + + audio_info + .format_info() + .fill_silence(map_info.as_mut_slice()); + } else { + buf_mut.set_duration(Some(state.fallback_duration)); + } + + buf_mut.set_dts(prev.dts().map(|t| t + prev_duration)); + buf_mut.set_pts(prev.pts().map(|t| t + prev_duration)); + buf_mut.set_flags(gst::BufferFlags::GAP); + + timestamp = state.ts_range(buf_mut); + } + } + + gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness); + state.in_item = Some(Item::Buffer(buffer, lateness)); + state.in_timestamp = timestamp; + state.num_in += 1; + self.cond.notify_all(); + + Ok(gst::FlowSuccess::Ok) + } + + fn start_src_task(&self) -> Result<(), glib::BoolError> { + self.srcpad.start_task({ + let pad = self.srcpad.downgrade(); + move || { + let pad = pad.upgrade().unwrap(); + let parent = pad.parent_element().unwrap(); + let livesync = parent.downcast_ref::().unwrap(); + let ret = livesync.imp().src_loop(&pad); + + if !ret { + gst::log!(CAT, obj: &parent, "Loop stopping"); + let _ = pad.pause_task(); + } + } + }) + } + + fn src_loop(&self, pad: &gst::Pad) -> bool { + let mut err = match self.src_loop_inner() { + Ok(_) => return true, + Err(e) => e, + }; + let eos; + + { + let mut state = self.state.lock(); + + match state.srcresult { + // Can be set to Flushing by another thread + Err(e) => err = e, + + // Communicate our flow return + Ok(_) => state.srcresult = Err(err), + } + eos = state.eos; + state.clock_id = None; + + self.cond.notify_all(); + } + + if eos && !matches!(err, gst::FlowError::Flushing | gst::FlowError::Eos) { + gst::element_imp_error!( + self, + gst::StreamError::Failed, + ("Internal data flow error."), + ["streaming task paused, reason {} ({:?})", err, err] + ); + pad.push_event(gst::event::Eos::new()); + } + + false + } + + fn src_loop_inner(&self) -> Result { + let mut state = self.state.lock(); + while state.srcresult.is_ok() + && (!state.playing || (state.in_item.is_none() && state.out_buffer.is_none())) + { + self.cond.wait(&mut state); + } + state.srcresult?; + + gst::trace!(CAT, imp: self, "Unqueueing {:?}", state.in_item); + let in_buffer = match state.in_item.take() { + None => None, + + Some(Item::Buffer(buffer, lateness)) => { + if self.buffer_is_early(&state, state.in_timestamp) { + // Try this buffer again on the next iteration + state.in_item = Some(Item::Buffer(buffer, lateness)); + None + } else { + Some((buffer, lateness)) + } + } + + Some(Item::Event(event)) => { + self.cond.notify_all(); + drop(state); + + self.srcpad.push_event(event); + + return Ok(gst::FlowSuccess::Ok); + } + + Some(Item::Query(mut query, sender)) => { + self.cond.notify_all(); + drop(state); + + // SAFETY: The other thread is waiting for us to handle the query + let res = self.srcpad.peer_query(unsafe { query.as_mut() }); + sender.send(res).ok(); + + return Ok(gst::FlowSuccess::Ok); + } + }; + + let (duplicate, caps) = if let Some((buffer, lateness)) = in_buffer { + let caps = state.in_caps.take(); + + state.out_buffer = Some(buffer); + state.out_timestamp = state.in_timestamp; + + if caps.is_some() { + state.out_audio_info = state.in_audio_info.clone(); + } + + self.cond.notify_all(); + + (lateness != BufferLateness::OnTime, caps) + } else { + // Work around borrow checker + let State { + fallback_duration, + out_buffer: ref mut buffer, + out_audio_info: ref audio_info, + .. + } = *state; + gst::debug!(CAT, imp: self, "repeating {:?}", buffer); + + let buffer = buffer.as_mut().unwrap().make_mut(); + let prev_duration = buffer.duration().unwrap(); + + if let Some(audio_info) = audio_info { + if !buffer.flags().contains(gst::BufferFlags::GAP) { + let mut map_info = buffer.map_writable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map buffer: {}", e); + gst::FlowError::Error + })?; + + audio_info + .format_info() + .fill_silence(map_info.as_mut_slice()); + } + } else { + buffer.set_duration(Some(fallback_duration)); + } + + buffer.set_dts(buffer.dts().map(|t| t + prev_duration)); + buffer.set_pts(buffer.pts().map(|t| t + prev_duration)); + buffer.set_flags(gst::BufferFlags::GAP); + buffer.unset_flags(gst::BufferFlags::DISCONT); + + state.out_timestamp = state.ts_range(state.out_buffer.as_ref().unwrap()); + (true, None) + }; + + let buffer = state.out_buffer.clone().unwrap(); + let sync_ts = state + .out_timestamp + .map_or(gst::ClockTime::ZERO, |t| t.start); + + if let Some(caps) = caps { + gst::debug!(CAT, imp: self, "Sending new caps: {}", caps); + + let event = gst::event::Caps::new(&caps); + MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); + state.srcresult?; + } + + if !state.sent_segment { + let event = if state.single_segment { + // Create live segment + let mut segment = gst::FormattedSegment::::new(); + segment.set_start(sync_ts + SEGMENT_OFFSET); + segment.set_base(sync_ts); + segment.set_time(sync_ts); + segment.set_position(sync_ts + SEGMENT_OFFSET); + + gst::debug!(CAT, imp: self, "Sending new segment: {:?}", segment); + gst::event::Segment::new(&segment) + } else { + let segment = state.segment.as_ref().unwrap(); + + gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment); + gst::event::Segment::new(segment) + }; + + MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); + state.srcresult?; + state.sent_segment = true; + } + + { + let element = self.obj(); + + let base_time = element.base_time().ok_or_else(|| { + gst::error!(CAT, imp: self, "Missing base time"); + gst::FlowError::Flushing + })?; + + let clock = element.clock().ok_or_else(|| { + gst::error!(CAT, imp: self, "Missing clock"); + gst::FlowError::Flushing + })?; + + let clock_id = clock.new_single_shot_id(base_time + sync_ts); + state.clock_id = Some(clock_id.clone()); + + gst::trace!( + CAT, + imp: self, + "Waiting for clock to reach {}", + clock_id.time(), + ); + + let (res, _) = MutexGuard::unlocked(&mut state, || clock_id.wait()); + gst::trace!(CAT, imp: self, "Clock returned {res:?}",); + + if res == Err(gst::ClockError::Unscheduled) { + return Err(gst::FlowError::Flushing); + } + + state.srcresult?; + state.clock_id = None; + } + + state.num_out += 1; + if duplicate { + state.num_duplicate += 1; + } + + drop(state); + + gst::trace!(CAT, imp: self, "Pushing {buffer:?}"); + self.srcpad.push(buffer) + } + + fn buffer_is_backwards(&self, state: &State, timestamp: Option) -> BufferLateness { + let timestamp = match timestamp { + Some(t) => t, + None => return BufferLateness::OnTime, + }; + + let out_timestamp = match state.out_timestamp { + Some(t) => t, + None => return BufferLateness::OnTime, + }; + + if timestamp.end > out_timestamp.end { + return BufferLateness::OnTime; + } + + gst::debug!( + CAT, + imp: self, + "Timestamp regresses: buffer ends at {}, expected {}", + timestamp.end, + out_timestamp.end, + ); + + let late_threshold = match state.late_threshold { + Some(gst::ClockTime::ZERO) => return BufferLateness::LateOverThreshold, + Some(t) => t, + None => return BufferLateness::LateUnderThreshold, + }; + + let in_timestamp = match state.in_timestamp { + Some(t) => t, + None => return BufferLateness::LateUnderThreshold, + }; + + if timestamp.start > in_timestamp.end + late_threshold { + BufferLateness::LateOverThreshold + } else { + BufferLateness::LateUnderThreshold + } + } + + fn buffer_is_early(&self, state: &State, timestamp: Option) -> bool { + let timestamp = match timestamp { + Some(t) => t, + None => return false, + }; + + let out_timestamp = match state.out_timestamp { + Some(t) => t, + None => return false, + }; + + let slack = state + .out_buffer + .as_deref() + .map_or(gst::ClockTime::ZERO, |b| b.duration().unwrap()); + + if timestamp.start < out_timestamp.end + slack { + return false; + } + + gst::debug!( + CAT, + imp: self, + "Timestamp is too early: buffer starts at {}, expected {}", + timestamp.start, + out_timestamp.end, + ); + + true + } +} diff --git a/utils/livesync/src/livesync/mod.rs b/utils/livesync/src/livesync/mod.rs new file mode 100644 index 00000000..ab36d123 --- /dev/null +++ b/utils/livesync/src/livesync/mod.rs @@ -0,0 +1,28 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::{glib, prelude::*}; + +mod imp; + +glib::wrapper! { + pub struct LiveSync(ObjectSubclass) + @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "livesync", + gst::Rank::None, + LiveSync::static_type(), + )?; + + Ok(()) +} diff --git a/utils/livesync/tests/livesync.rs b/utils/livesync/tests/livesync.rs new file mode 100644 index 00000000..595f523b --- /dev/null +++ b/utils/livesync/tests/livesync.rs @@ -0,0 +1,198 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::*; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstlivesync::plugin_register_static().expect("Failed to register livesync plugin"); + }); +} + +const DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(100); +const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); +const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); + +fn crank_pull(harness: &mut gst_check::Harness) -> gst::Buffer { + harness.crank_single_clock_wait().unwrap(); + harness.pull().unwrap() +} + +#[track_caller] +fn assert_buf( + buf: &gst::BufferRef, + offset: u64, + pts: gst::ClockTime, + duration: gst::ClockTime, + flags: gst::BufferFlags, +) { + assert_eq!(buf.offset(), offset, "Bad offset"); + assert_eq!(buf.pts(), Some(pts), "Bad PTS"); + assert_eq!(buf.duration(), Some(duration), "Bad duration"); + assert_eq!( + buf.flags() - gst::BufferFlags::TAG_MEMORY, + flags, + "Bad flags", + ); +} + +#[track_caller] +fn assert_crank_pull( + harness: &mut gst_check::Harness, + offset_per_buffer: u64, + src_buffer_number: u64, + sink_buffer_number: u64, + flags: gst::BufferFlags, + singlesegment: bool, +) { + let pts = if singlesegment { + LATENCY + DURATION * sink_buffer_number + SEGMENT_OFFSET + } else { + DURATION * sink_buffer_number + }; + assert_buf( + &crank_pull(harness), + offset_per_buffer * src_buffer_number, + pts, + DURATION, + flags, + ); +} + +#[test] +fn test_video_singlesegment() { + test_video(true); +} + +#[test] +fn test_audio_singlesegment() { + test_audio(true); +} + +#[test] +fn test_video_nonsinglesegment() { + test_video(false); +} + +#[test] +fn test_audio_nonsinglesegment() { + test_audio(false); +} + +fn test_video(singlesegment: bool) { + init(); + + let mut h = gst_check::Harness::new("livesync"); + h.add_src_parse( + r"videotestsrc is-live=1 + ! capsfilter caps=video/x-raw,framerate=10/1 + ", + true, + ); + + let element = h.element().unwrap(); + element.set_property("latency", LATENCY); + element.set_property("single-segment", singlesegment); + + test_livesync(&mut h, 1, singlesegment); +} + +fn test_audio(singlesegment: bool) { + init(); + + let mut h = gst_check::Harness::new("livesync"); + h.add_src_parse( + r"audiotestsrc is-live=1 samplesperbuffer=4800 + ! capsfilter caps=audio/x-raw,rate=48000 + ", + true, + ); + + let element = h.element().unwrap(); + element.set_property("latency", LATENCY); + element.set_property("single-segment", singlesegment); + + test_livesync(&mut h, 4800, singlesegment); +} + +fn test_livesync(h: &mut gst_check::Harness, o: u64, singlesegment: bool) { + // Normal operation ------------------------------ + + // Push frames 0-1, pull frame 0 + h.push_from_src().unwrap(); + h.push_from_src().unwrap(); + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::StreamStart); + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::Caps); + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::Segment); + assert_crank_pull(h, o, 0, 0, gst::BufferFlags::DISCONT, singlesegment); + + // Push frames 2-10, pull frames 1-9 + for i in 1..=9 { + h.push_from_src().unwrap(); + assert_crank_pull(h, o, i, i, gst::BufferFlags::empty(), singlesegment); + } + + // Pull frame 10 + assert_crank_pull(h, o, 10, 10, gst::BufferFlags::empty(), singlesegment); + + // Bridging gap ---------------------------------- + + // Pull frames 11-19 + for i in 11..=19 { + assert_crank_pull(h, o, 10, i, gst::BufferFlags::GAP, singlesegment); + } + + // Push frames 11-19 + for _ in 11..=19 { + h.push_from_src().unwrap(); + } + + // Normal operation ------------------------------ + + // Push frames 20-21, pull frame 20 + for _ in 1..=2 { + let mut src_h = h.src_harness_mut().unwrap(); + src_h.crank_single_clock_wait().unwrap(); + let mut buf = src_h.pull().unwrap(); + let buf_mut = buf.make_mut(); + buf_mut.set_flags(gst::BufferFlags::MARKER); + h.push(buf).unwrap(); + } + assert_crank_pull(h, o, 10, 20, gst::BufferFlags::GAP, singlesegment); + + // Push frame 22, pull frame 21 + h.push_from_src().unwrap(); + assert_crank_pull( + h, + o, + 21, + 21, + gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER, + singlesegment, + ); + + // Push frames 23-30, pull frames 22-29 + for i in 22..=29 { + h.push_from_src().unwrap(); + assert_crank_pull(h, o, i, i, gst::BufferFlags::empty(), singlesegment); + } + + // EOS ------------------------------------------- + assert!(h.push_event(gst::event::Eos::new())); + + // Pull frame 30 + assert_crank_pull(h, o, 30, 30, gst::BufferFlags::empty(), singlesegment); + + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::Eos); + assert_eq!(h.try_pull(), None); +}