diff --git a/Cargo.toml b/Cargo.toml index a8cf771d..96e090c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ members = [ "text/wrap", "utils/fallbackswitch", + "utils/livesync", "utils/togglerecord", "utils/tracers", "utils/uriplaylistbin", @@ -84,6 +85,7 @@ default-members = [ "text/wrap", "utils/fallbackswitch", + "utils/livesync", "utils/togglerecord", "utils/tracers", "utils/uriplaylistbin", diff --git a/README.md b/README.md index 575dbd5d..2b50eb4d 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,9 @@ You will find the following plugins in this repository: configuring a fallback audio/video if there are problems with the main source. + - `livesync`: Element to maintain a continuous live stream from a + potentially unstable source. + - `togglerecord`: Element to enable starting and stopping multiple streams together. - `tracers`: Plugin with multiple tracers: diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 42779a54..1a05277b 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -2497,6 +2497,142 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "livesync": { + "description": "Livesync Plugin", + "elements": { + "livesync": { + "author": "Jan Alexander Steffens (heftig) ", + "description": "Outputs livestream, inserting gap frames when input lags", + "hierarchy": [ + "GstLiveSync", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Filter", + "long-name": "Live Synchronizer", + "pad-templates": { + "sink": { + "caps": "ANY", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "ANY", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "drop": { + "blurb": "Number of incoming frames dropped", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "duplicate": { + "blurb": "Number of outgoing frames duplicated", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "in": { + "blurb": "Number of incoming frames accepted", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "late-threshold": { + "blurb": "Maximum time spent (in nanoseconds) before accepting one late buffer; -1 = never", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "2000000000", + "max": "18446744073709551615", + "min": "1000000000", + "mutable": "playing", + "readable": true, + "type": "guint64", + "writable": true + }, + "latency": { + "blurb": "Additional latency to allow upstream to take longer to produce buffers for the current position (in nanoseconds)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "9223372036854775807", + "min": "0", + "mutable": "playing", + "readable": true, + "type": "guint64", + "writable": true + }, + "out": { + "blurb": "Number of outgoing frames produced", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, + "single-segment": { + "blurb": "Timestamp buffers and eat segments so as to appear as one segment", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstlivesync", + "license": "MPL", + "other-types": {}, + "package": "gst-plugin-livesync", + "source": "gst-plugin-livesync", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "mp4": { "description": "GStreamer Rust MP4 Plugin", "elements": { diff --git a/meson.build b/meson.build index a800e3cd..37d62c17 100644 --- a/meson.build +++ b/meson.build @@ -67,6 +67,7 @@ plugins = { 'gst-plugin-textwrap': 'libgsttextwrap', 'gst-plugin-fallbackswitch': 'libgstfallbackswitch', + 'gst-plugin-livesync': 'libgstlivesync', 'gst-plugin-togglerecord': 'libgsttogglerecord', 'gst-plugin-tracers': 'libgstrstracers', 'gst-plugin-uriplaylistbin': 'libgsturiplaylistbin', diff --git a/utils/livesync/Cargo.toml b/utils/livesync/Cargo.toml new file mode 100644 index 00000000..ccd6e206 --- /dev/null +++ b/utils/livesync/Cargo.toml @@ -0,0 +1,58 @@ +[package] +name = "gst-plugin-livesync" +version = "0.9.0-alpha.1" +authors = ["Jan Alexander Steffens (heftig) "] +license = "MPL-2.0" +description = "Livesync Plugin" +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +edition = "2021" +rust-version = "1.63" + +[dependencies] +gio = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true } +gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs", optional = true } +muldiv = "1.0" +num-rational = { version = "0.4", default-features = false, features = [] } +once_cell = "1.0" +parking_lot = "0.12" + +[dev-dependencies] +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } + +[lib] +name = "gstlivesync" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[[example]] +name = "gtk-livesync" +path = "examples/gtk_livesync.rs" +required-features = ["gtk", "gio", "gst-plugin-gtk4"] + +[[test]] +name = "livesync" +path = "tests/livesync.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +static = [] +capi = [] +doc = ["gst/v1_18"] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-audio-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/utils/livesync/LICENSE-MPL-2.0 b/utils/livesync/LICENSE-MPL-2.0 new file mode 120000 index 00000000..eb5d24fe --- /dev/null +++ b/utils/livesync/LICENSE-MPL-2.0 @@ -0,0 +1 @@ +../../LICENSE-MPL-2.0 \ No newline at end of file diff --git a/utils/livesync/build.rs b/utils/livesync/build.rs new file mode 100644 index 00000000..cda12e57 --- /dev/null +++ b/utils/livesync/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/utils/livesync/examples/gtk_livesync.rs b/utils/livesync/examples/gtk_livesync.rs new file mode 100644 index 00000000..0a24acdf --- /dev/null +++ b/utils/livesync/examples/gtk_livesync.rs @@ -0,0 +1,175 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gio::prelude::*; +use gst::{glib, prelude::*}; +use gtk::prelude::*; +use std::cell::Cell; + +struct DroppingProbe(glib::WeakRef, Option); + +impl DroppingProbe { + fn install(pad: &gst::Pad) -> Self { + let probe_id = pad + .add_probe(gst::PadProbeType::BUFFER, |_, _| gst::PadProbeReturn::Drop) + .unwrap(); + Self(pad.downgrade(), Some(probe_id)) + } +} + +impl Drop for DroppingProbe { + fn drop(&mut self) { + if let Some((pad, probe_id)) = self.0.upgrade().zip(self.1.take()) { + pad.remove_probe(probe_id); + } + } +} + +fn create_pipeline() -> gst::Pipeline { + gst::parse_launch( + r#"videotestsrc name=vsrc is-live=1 + ! video/x-raw,framerate=60/1,width=800,height=600 + ! timeoverlay text="Pre:" + ! queue + ! livesync latency=50000000 + ! videorate + ! timeoverlay text="Post:" halignment=right + ! queue + ! gtk4paintablesink name=vsink + audiotestsrc name=asrc is-live=1 + ! audio/x-raw,channels=2 + ! queue + ! livesync latency=50000000 + ! audiorate + ! queue + ! autoaudiosink + "#, + ) + .expect("Failed to create pipeline") + .downcast() + .unwrap() +} + +fn create_window(app: >k::Application) { + let pipeline = create_pipeline(); + let video_src_pad = pipeline.by_name("vsrc").unwrap().static_pad("src").unwrap(); + let audio_src_pad = pipeline.by_name("asrc").unwrap().static_pad("src").unwrap(); + + let window = gtk::ApplicationWindow::new(app); + window.set_default_size(800, 684); + + let vbox = gtk::Box::new(gtk::Orientation::Vertical, 0); + + let picture = gtk::Picture::new(); + let paintable = pipeline + .by_name("vsink") + .unwrap() + .property::("paintable"); + picture.set_paintable(Some(&paintable)); + vbox.append(&picture); + + let action_bar = gtk::ActionBar::new(); + vbox.append(&action_bar); + + let offset_spin = gtk::SpinButton::with_range(0.0, 500.0, 100.0); + action_bar.pack_start(&offset_spin); + + { + let video_src_pad = video_src_pad.clone(); + let audio_src_pad = audio_src_pad.clone(); + offset_spin.connect_value_notify(move |offset_spin| { + const MSECOND: f64 = gst::ClockTime::MSECOND.nseconds() as _; + + let offset = (offset_spin.value() * -MSECOND) as i64; + video_src_pad.set_offset(offset); + audio_src_pad.set_offset(offset); + }); + } + + let drop_button = gtk::ToggleButton::with_label("Drop Signal"); + action_bar.pack_end(&drop_button); + + let drop_ids = Cell::new(None); + drop_button.connect_toggled(move |drop_button| { + if drop_button.is_active() { + let video_probe = DroppingProbe::install(&video_src_pad); + let audio_probe = DroppingProbe::install(&audio_src_pad); + drop_ids.set(Some((video_probe, audio_probe))); + } else { + drop_ids.set(None); + } + }); + + { + let bus = pipeline.bus().unwrap(); + let window = window.downgrade(); + bus.add_watch_local(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + if let Some(window) = window.upgrade() { + window.close(); + } + } + + MessageView::Error(err) => { + eprintln!( + "Error from {}: {} ({:?})", + msg.src().map(|s| s.path_string()).as_deref().unwrap_or(""), + err.error(), + err.debug(), + ); + if let Some(window) = window.upgrade() { + window.close(); + } + } + + _ => (), + }; + + glib::Continue(true) + }) + .unwrap(); + } + + { + let pipeline = pipeline.clone(); + window.connect_realize(move |_| { + pipeline + .set_state(gst::State::Playing) + .expect("Failed to start pipeline"); + }); + } + + window.connect_unrealize(move |_| { + pipeline + .set_state(gst::State::Null) + .expect("Failed to stop pipeline"); + }); + + window.set_child(Some(&vbox)); + window.show(); +} + +fn main() { + let app = gtk::Application::new( + Some("gtk-plugins-rs.gtk-livesync"), + gio::ApplicationFlags::FLAGS_NONE, + ); + + app.connect_startup(move |_app| { + gst::init().expect("Failed to initialize GStreamer"); + gstlivesync::plugin_register_static().expect("Failed to register livesync plugin"); + gstgtk4::plugin_register_static().expect("Failed to register gstgtk4 plugin"); + }); + + app.connect_activate(create_window); + app.run(); +} diff --git a/utils/livesync/src/lib.rs b/utils/livesync/src/lib.rs new file mode 100644 index 00000000..9d646c36 --- /dev/null +++ b/utils/livesync/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)] + +/** + * plugin-livesync: + * + * Since: plugins-rs-0.9.0 + */ +use gst::glib; + +mod livesync; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + livesync::register(plugin) +} + +gst::plugin_define!( + livesync, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known) + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs new file mode 100644 index 00000000..63c15678 --- /dev/null +++ b/utils/livesync/src/livesync/imp.rs @@ -0,0 +1,1202 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::{ + glib::{self, translate::IntoGlib}, + prelude::*, + subclass::prelude::*, +}; +use once_cell::sync::Lazy; +use parking_lot::{Condvar, Mutex, MutexGuard}; +use std::sync::mpsc; + +/// Offset for the segment in single-segment mode, to handle negative DTS +const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "livesync", + gst::DebugColorFlags::empty(), + Some("debug category for the livesync element"), + ) +}); + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BufferLateness { + OnTime, + LateUnderThreshold, + LateOverThreshold, +} + +#[derive(Debug)] +enum Item { + Buffer(gst::Buffer, BufferLateness), + Event(gst::Event), + // SAFETY: Item needs to wait until the query and the receiver has returned + Query(std::ptr::NonNull, mpsc::SyncSender), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct Timestamps { + start: gst::ClockTime, + end: gst::ClockTime, +} + +// SAFETY: Need to be able to pass *mut gst::QueryRef +unsafe impl Send for Item {} + +#[derive(Debug)] +pub struct LiveSync { + state: Mutex, + cond: Condvar, + sinkpad: gst::Pad, + srcpad: gst::Pad, +} + +#[derive(Debug)] +struct State { + latency: gst::ClockTime, + late_threshold: Option, + + upstream_latency: Option, + fallback_duration: gst::ClockTime, + + eos: bool, + segment: Option>, + + srcresult: Result, + playing: bool, + sent_segment: bool, + clock_id: Option, + + in_caps: Option, + in_audio_info: Option, + out_audio_info: Option, + + in_item: Option, + out_buffer: Option, + + in_timestamp: Option, + out_timestamp: Option, + + num_in: u64, + num_drop: u64, + num_out: u64, + num_duplicate: u64, + single_segment: bool, +} + +const PROP_LATENCY: &str = "latency"; +const PROP_LATE_THRESHOLD: &str = "late-threshold"; + +const PROP_IN: &str = "in"; +const PROP_DROP: &str = "drop"; +const PROP_OUT: &str = "out"; +const PROP_DUPLICATE: &str = "duplicate"; +const PROP_SINGLE_SEGMENT: &str = "single-segment"; + +const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::ZERO; +const DEFAULT_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(100); +const MINIMUM_LATE_THRESHOLD: gst::ClockTime = gst::ClockTime::from_seconds(1); +const DEFAULT_LATE_THRESHOLD: Option = Some(gst::ClockTime::from_seconds(2)); + +impl Default for State { + fn default() -> Self { + Self { + latency: DEFAULT_LATENCY, + late_threshold: DEFAULT_LATE_THRESHOLD, + upstream_latency: None, + fallback_duration: DEFAULT_DURATION, + eos: false, + segment: None, + srcresult: Err(gst::FlowError::Flushing), + playing: false, + sent_segment: false, + clock_id: None, + in_caps: None, + in_audio_info: None, + out_audio_info: None, + in_item: None, + out_buffer: None, + in_timestamp: None, + out_timestamp: None, + num_in: 0, + num_drop: 0, + num_out: 0, + num_duplicate: 0, + single_segment: false, + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for LiveSync { + const NAME: &'static str = "GstLiveSync"; + type Type = super::LiveSync; + type ParentType = gst::Element; + + fn with_class(class: &Self::Class) -> Self { + let sinkpad = + gst::Pad::builder_with_template(&class.pad_template("sink").unwrap(), Some("sink")) + .activatemode_function(|pad, parent, mode, active| { + Self::catch_panic_pad_function( + parent, + || Err(gst::loggable_error!(CAT, "sink_activate_mode panicked")), + |livesync| livesync.sink_activate_mode(pad, mode, active), + ) + }) + .event_function(|pad, parent, event| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.sink_event(pad, event), + ) + }) + .query_function(|pad, parent, query| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.sink_query(pad, query), + ) + }) + .chain_function(|pad, parent, buffer| { + Self::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |livesync| livesync.sink_chain(pad, buffer), + ) + }) + .flags( + gst::PadFlags::PROXY_CAPS + | gst::PadFlags::PROXY_ALLOCATION + | gst::PadFlags::PROXY_SCHEDULING, + ) + .build(); + + let srcpad = + gst::Pad::builder_with_template(&class.pad_template("src").unwrap(), Some("src")) + .activatemode_function(|pad, parent, mode, active| { + Self::catch_panic_pad_function( + parent, + || Err(gst::loggable_error!(CAT, "src_activate_mode panicked")), + |livesync| livesync.src_activate_mode(pad, mode, active), + ) + }) + .event_function(|pad, parent, event| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.src_event(pad, event), + ) + }) + .query_function(|pad, parent, query| { + Self::catch_panic_pad_function( + parent, + || false, + |livesync| livesync.src_query(pad, query), + ) + }) + .flags( + gst::PadFlags::PROXY_CAPS + | gst::PadFlags::PROXY_ALLOCATION + | gst::PadFlags::PROXY_SCHEDULING, + ) + .build(); + + Self { + state: Default::default(), + cond: Condvar::new(), + sinkpad, + srcpad, + } + } +} + +impl ObjectImpl for LiveSync { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy<[glib::ParamSpec; 7]> = Lazy::new(|| { + [ + glib::ParamSpecUInt64::builder(PROP_LATENCY) + .nick("Latency") + .blurb( + "Additional latency to allow upstream to take longer to \ + produce buffers for the current position (in nanoseconds)", + ) + .maximum(i64::MAX as u64) + .default_value(DEFAULT_LATENCY.into_glib()) + .mutable_playing() + .build(), + glib::ParamSpecUInt64::builder(PROP_LATE_THRESHOLD) + .nick("Late threshold") + .blurb( + "Maximum time spent (in nanoseconds) before \ + accepting one late buffer; -1 = never", + ) + .minimum(MINIMUM_LATE_THRESHOLD.into_glib()) + .default_value(DEFAULT_LATE_THRESHOLD.into_glib()) + .mutable_playing() + .build(), + glib::ParamSpecUInt64::builder(PROP_IN) + .nick("Frames input") + .blurb("Number of incoming frames accepted") + .read_only() + .build(), + glib::ParamSpecUInt64::builder(PROP_DROP) + .nick("Frames dropped") + .blurb("Number of incoming frames dropped") + .read_only() + .build(), + glib::ParamSpecUInt64::builder(PROP_OUT) + .nick("Frames output") + .blurb("Number of outgoing frames produced") + .read_only() + .build(), + glib::ParamSpecUInt64::builder(PROP_DUPLICATE) + .nick("Frames duplicated") + .blurb("Number of outgoing frames duplicated") + .read_only() + .build(), + glib::ParamSpecBoolean::builder(PROP_SINGLE_SEGMENT) + .nick("Single segment") + .blurb("Timestamp buffers and eat segments so as to appear as one segment") + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut state = self.state.lock(); + match pspec.name() { + PROP_LATENCY => { + state.latency = value.get().unwrap(); + state.update_fallback_duration(); + let _ = self.obj().post_message(gst::message::Latency::new()); + } + + PROP_LATE_THRESHOLD => { + state.late_threshold = value.get().unwrap(); + } + + PROP_SINGLE_SEGMENT => { + state.single_segment = value.get().unwrap(); + } + + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let state = self.state.lock(); + match pspec.name() { + PROP_LATENCY => state.latency.to_value(), + PROP_LATE_THRESHOLD => state.late_threshold.to_value(), + PROP_IN => state.num_in.to_value(), + PROP_DROP => state.num_drop.to_value(), + PROP_OUT => state.num_out.to_value(), + PROP_DUPLICATE => state.num_duplicate.to_value(), + PROP_SINGLE_SEGMENT => state.single_segment.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for LiveSync {} + +impl ElementImpl for LiveSync { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Live Synchronizer", + "Filter", + "Outputs livestream, inserting gap frames when input lags", + "Jan Alexander Steffens (heftig) ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy<[gst::PadTemplate; 2]> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + + [ + gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(), + gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(), + ] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); + + if transition == gst::StateChange::PausedToPlaying { + let mut state = self.state.lock(); + state.playing = true; + self.cond.notify_all(); + } + + let success = self.parent_change_state(transition)?; + + if transition == gst::StateChange::PlayingToPaused { + let mut state = self.state.lock(); + state.playing = false; + } + + match (transition, success) { + ( + gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused, + gst::StateChangeSuccess::Success, + ) => Ok(gst::StateChangeSuccess::NoPreroll), + (_, s) => Ok(s), + } + } + + fn provide_clock(&self) -> Option { + Some(gst::SystemClock::obtain()) + } +} + +impl State { + /// Calculate the running time the buffer covers, including latency + fn ts_range(&self, buf: &gst::BufferRef) -> Option { + let mut timestamp_start = buf.dts_or_pts()?; + + if !self.single_segment { + timestamp_start = self + .segment + .as_ref() + .unwrap() + .to_running_time(timestamp_start) + .unwrap_or(gst::ClockTime::ZERO); + timestamp_start += self.latency + self.upstream_latency.unwrap(); + } else { + timestamp_start += self.upstream_latency.unwrap(); + timestamp_start = timestamp_start.saturating_sub(SEGMENT_OFFSET); + } + + Some(Timestamps { + start: timestamp_start, + end: timestamp_start + buf.duration().unwrap(), + }) + } + + fn update_fallback_duration(&mut self) { + self.fallback_duration = self + // First, try 1/framerate from the caps + .in_caps + .as_ref() + .and_then(|c| c.structure(0)) + .filter(|s| s.name().starts_with("video/")) + .and_then(|s| s.get::("framerate").ok()) + .and_then(|framerate| { + gst::ClockTime::SECOND + .mul_div_round(framerate.numer() as u64, framerate.denom() as u64) + }) + .filter(|&dur| dur > 8.mseconds() && dur < 10.seconds()) + // Otherwise, half the configured latency + .or_else(|| Some(self.latency / 2)) + // In any case, don't allow a zero duration + .filter(|&dur| dur > gst::ClockTime::ZERO) + // Safe default + .unwrap_or(DEFAULT_DURATION); + } +} + +impl LiveSync { + fn sink_activate_mode( + &self, + pad: &gst::Pad, + mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if mode != gst::PadMode::Push { + return Err(gst::loggable_error!(CAT, "Wrong scheduling mode")); + } + + if active { + let mut state = self.state.lock(); + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.eos = false; + state.in_timestamp = None; + state.num_in = 0; + state.num_drop = 0; + state.segment = None; + } else { + { + let mut state = self.state.lock(); + state.srcresult = Err(gst::FlowError::Flushing); + state.out_buffer = None; + state.out_audio_info = None; + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + self.cond.notify_all(); + } + + let lock = pad.stream_lock(); + { + let mut state = self.state.lock(); + state.in_caps = None; + state.in_audio_info = None; + state.in_item = None; + state.update_fallback_duration(); + } + drop(lock); + } + + Ok(()) + } + + fn src_activate_mode( + &self, + pad: &gst::Pad, + mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if mode != gst::PadMode::Push { + return Err(gst::loggable_error!(CAT, "Wrong scheduling mode")); + } + + if active { + let ret; + + { + let mut state = self.state.lock(); + + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.sent_segment = false; + state.out_timestamp = None; + state.num_out = 0; + state.num_duplicate = 0; + + ret = self.start_src_task().map_err(Into::into); + } + + ret + } else { + { + let mut state = self.state.lock(); + state.srcresult = Err(gst::FlowError::Flushing); + state.out_buffer = None; + state.out_audio_info = None; + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + self.cond.notify_all(); + } + + pad.stop_task().map_err(Into::into) + } + } + + fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { + { + let state = self.state.lock(); + if state.single_segment { + let event = event.make_mut(); + let latency = state.latency.nseconds() as i64; + event.set_running_time_offset(event.running_time_offset() + latency); + } + } + + match event.view() { + gst::EventView::FlushStart(_) => { + let ret = self.srcpad.push_event(event); + + { + let mut state = self.state.lock(); + state.srcresult = Err(gst::FlowError::Flushing); + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } + self.cond.notify_all(); + } + + let _ = self.srcpad.pause_task(); + return ret; + } + + gst::EventView::FlushStop(_) => { + let ret = self.srcpad.push_event(event); + + let mut state = self.state.lock(); + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.eos = false; + state.sent_segment = false; + state.segment = None; + state.in_caps = None; + state.in_audio_info = None; + state.out_audio_info = None; + state.in_item = None; + state.out_buffer = None; + state.update_fallback_duration(); + + let _ = self.start_src_task(); + return ret; + } + + gst::EventView::StreamStart(_) => { + let mut state = self.state.lock(); + state.srcresult = Ok(gst::FlowSuccess::Ok); + state.eos = false; + } + + gst::EventView::Segment(e) => { + let segment = match e.segment().downcast_ref() { + Some(s) => s, + None => { + gst::error!(CAT, imp: self, "Got non-TIME segment"); + return false; + } + }; + + let mut state = self.state.lock(); + state.segment = Some(segment.clone()); + state.sent_segment = false; + return true; + } + + gst::EventView::Gap(_) => { + gst::debug!(CAT, imp: self, "Got gap event"); + return true; + } + + gst::EventView::Eos(_) => { + let mut state = self.state.lock(); + + if let Err(err) = state.srcresult { + if matches!(err, gst::FlowError::Flushing | gst::FlowError::Eos) { + let err = state.srcresult.unwrap_err(); + gst::element_imp_error!( + self, + gst::StreamError::Failed, + ("Internal data flow error."), + ["streaming task paused, reason {} ({:?})", err, err] + ); + } + } + + state.eos = true; + } + + gst::EventView::Caps(c) => { + let caps = c.caps_owned(); + + let audio_info = match caps + .structure(0) + .unwrap() + .has_name("audio/x-raw") + .then(|| gst_audio::AudioInfo::from_caps(&caps)) + .transpose() + { + Ok(ai) => ai, + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse audio caps: {}", e); + return false; + } + }; + + let mut state = self.state.lock(); + state.in_caps = Some(caps); + state.in_audio_info = audio_info; + state.update_fallback_duration(); + return true; + } + + _ => {} + } + + if event.is_serialized() { + let mut state = self.state.lock(); + while state.srcresult.is_ok() && state.in_item.is_some() { + self.cond.wait(&mut state); + } + + if state.srcresult.is_err() { + return false; + } + + gst::trace!(CAT, imp: self, "Queueing {:?}", event); + state.in_item = Some(Item::Event(event)); + self.cond.notify_all(); + + true + } else { + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + } + + fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { + { + let state = self.state.lock(); + if state.single_segment { + let event = event.make_mut(); + let latency = state.latency.nseconds() as i64; + event.set_running_time_offset(event.running_time_offset() - latency); + } + } + + match event.view() { + gst::EventView::Reconfigure(_) => { + { + let mut state = self.state.lock(); + if state.srcresult == Err(gst::FlowError::NotLinked) { + state.srcresult = Ok(gst::FlowSuccess::Ok); + let _ = self.start_src_task(); + } + } + self.sinkpad.push_event(event) + } + + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } + + fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + if query.is_serialized() { + let (sender, receiver) = mpsc::sync_channel(1); + + let mut state = self.state.lock(); + while state.srcresult.is_ok() && state.in_item.is_some() { + self.cond.wait(&mut state); + } + + if state.srcresult.is_err() { + return false; + } + + gst::trace!(CAT, imp: self, "Queueing {:?}", query); + state.in_item = Some(Item::Query(std::ptr::NonNull::from(query), sender)); + self.cond.notify_all(); + drop(state); + + receiver.recv().unwrap_or(false) + } else { + gst::Pad::query_default(pad, Some(&*self.obj()), query) + } + } + + fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + match query.view_mut() { + gst::QueryViewMut::Latency(_) => { + if !gst::Pad::query_default(pad, Some(&*self.obj()), query) { + return false; + } + + let q = match query.view_mut() { + gst::QueryViewMut::Latency(q) => q, + _ => unreachable!(), + }; + + let mut state = self.state.lock(); + let latency = state.latency; + + let (_live, min, max) = q.result(); + q.set(true, min + latency, max.map(|max| max + latency)); + + state.upstream_latency = Some(min); + true + } + + _ => gst::Pad::query_default(pad, Some(&*self.obj()), query), + } + } + + fn sink_chain( + &self, + _pad: &gst::Pad, + mut buffer: gst::Buffer, + ) -> Result { + gst::trace!(CAT, imp: self, "incoming {:?}", buffer); + + let mut state = self.state.lock(); + + if state.upstream_latency.is_none() { + gst::debug!(CAT, imp: self, "Have no upstream latency yet, querying"); + let mut q = gst::query::Latency::new(); + if MutexGuard::unlocked(&mut state, || self.sinkpad.peer_query(&mut q)) { + let (live, min, max) = q.result(); + + gst::debug!( + CAT, + imp: self, + "Latency query response: live {} min {} max {}", + live, + min, + max.display() + ); + + state.upstream_latency = Some(min); + } else { + gst::warning!( + CAT, + imp: self, + "Can't query upstream latency -- assuming zero" + ); + } + } + + while state.srcresult.is_ok() && state.in_item.is_some() { + self.cond.wait(&mut state); + } + state.srcresult?; + + let buf_mut = buffer.make_mut(); + + if buf_mut.pts().is_none() { + gst::warning!(CAT, imp: self, "incoming buffer has no timestamps"); + } + + if let Some(audio_info) = &state.in_audio_info { + let buf_duration = buf_mut.duration().unwrap_or_default(); + if let Some(calc_duration) = audio_info + .convert::>(Some(gst::format::Bytes::from_usize( + buf_mut.size(), + ))) + .flatten() + { + let diff = if buf_duration < calc_duration { + calc_duration - buf_duration + } else { + buf_duration - calc_duration + }; + + if diff.nseconds() > 1 { + gst::warning!( + CAT, + imp: self, + "Correcting duration on audio buffer from {} to {}", + buf_duration, + calc_duration, + ); + buf_mut.set_duration(calc_duration); + } + } else { + gst::debug!( + CAT, + imp: self, + "Failed to calculate duration of {:?}", + buf_mut, + ); + } + } + + if state.single_segment { + // At this stage we should really really have a segment + let segment = state.segment.as_ref().ok_or(gst::FlowError::Error)?; + let dts = segment + .to_running_time_full(buf_mut.dts()) + .map(|r| r + SEGMENT_OFFSET) + .and_then(|r| r.positive()); + let pts = segment + .to_running_time_full(buf_mut.pts()) + .map(|r| r + SEGMENT_OFFSET) + .and_then(|r| r.positive()) + .or_else(|| { + self.obj() + .current_running_time() + .map(|r| r + SEGMENT_OFFSET) + }); + + buf_mut.set_dts(dts.map(|t| t + state.latency)); + buf_mut.set_pts(pts.map(|t| t + state.latency)); + } + + if buf_mut.duration().is_none() { + gst::debug!(CAT, imp: self, "incoming buffer without duration"); + buf_mut.set_duration(Some(state.fallback_duration)); + } + + if state + .out_buffer + .as_ref() + .map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP)) + { + // We are done bridging a gap, so mark it as DISCONT instead + buf_mut.unset_flags(gst::BufferFlags::GAP); + buf_mut.set_flags(gst::BufferFlags::DISCONT); + } + + let mut timestamp = state.ts_range(buf_mut); + let lateness = self.buffer_is_backwards(&state, timestamp); + match lateness { + BufferLateness::OnTime => {} + + BufferLateness::LateUnderThreshold => { + gst::debug!(CAT, imp: self, "discarding late {:?}", buf_mut); + state.num_drop += 1; + return Ok(gst::FlowSuccess::Ok); + } + + BufferLateness::LateOverThreshold => { + gst::debug!(CAT, imp: self, "accepting late {:?}", buf_mut); + + let prev = state.out_buffer.as_ref().unwrap(); + let prev_duration = prev.duration().unwrap(); + + if let Some(audio_info) = &state.in_audio_info { + let mut map_info = buf_mut.map_writable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map buffer: {}", e); + gst::FlowError::Error + })?; + + audio_info + .format_info() + .fill_silence(map_info.as_mut_slice()); + } else { + buf_mut.set_duration(Some(state.fallback_duration)); + } + + buf_mut.set_dts(prev.dts().map(|t| t + prev_duration)); + buf_mut.set_pts(prev.pts().map(|t| t + prev_duration)); + buf_mut.set_flags(gst::BufferFlags::GAP); + + timestamp = state.ts_range(buf_mut); + } + } + + gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness); + state.in_item = Some(Item::Buffer(buffer, lateness)); + state.in_timestamp = timestamp; + state.num_in += 1; + self.cond.notify_all(); + + Ok(gst::FlowSuccess::Ok) + } + + fn start_src_task(&self) -> Result<(), glib::BoolError> { + self.srcpad.start_task({ + let pad = self.srcpad.downgrade(); + move || { + let pad = pad.upgrade().unwrap(); + let parent = pad.parent_element().unwrap(); + let livesync = parent.downcast_ref::().unwrap(); + let ret = livesync.imp().src_loop(&pad); + + if !ret { + gst::log!(CAT, obj: &parent, "Loop stopping"); + let _ = pad.pause_task(); + } + } + }) + } + + fn src_loop(&self, pad: &gst::Pad) -> bool { + let mut err = match self.src_loop_inner() { + Ok(_) => return true, + Err(e) => e, + }; + let eos; + + { + let mut state = self.state.lock(); + + match state.srcresult { + // Can be set to Flushing by another thread + Err(e) => err = e, + + // Communicate our flow return + Ok(_) => state.srcresult = Err(err), + } + eos = state.eos; + state.clock_id = None; + + self.cond.notify_all(); + } + + if eos && !matches!(err, gst::FlowError::Flushing | gst::FlowError::Eos) { + gst::element_imp_error!( + self, + gst::StreamError::Failed, + ("Internal data flow error."), + ["streaming task paused, reason {} ({:?})", err, err] + ); + pad.push_event(gst::event::Eos::new()); + } + + false + } + + fn src_loop_inner(&self) -> Result { + let mut state = self.state.lock(); + while state.srcresult.is_ok() + && (!state.playing || (state.in_item.is_none() && state.out_buffer.is_none())) + { + self.cond.wait(&mut state); + } + state.srcresult?; + + gst::trace!(CAT, imp: self, "Unqueueing {:?}", state.in_item); + let in_buffer = match state.in_item.take() { + None => None, + + Some(Item::Buffer(buffer, lateness)) => { + if self.buffer_is_early(&state, state.in_timestamp) { + // Try this buffer again on the next iteration + state.in_item = Some(Item::Buffer(buffer, lateness)); + None + } else { + Some((buffer, lateness)) + } + } + + Some(Item::Event(event)) => { + self.cond.notify_all(); + drop(state); + + self.srcpad.push_event(event); + + return Ok(gst::FlowSuccess::Ok); + } + + Some(Item::Query(mut query, sender)) => { + self.cond.notify_all(); + drop(state); + + // SAFETY: The other thread is waiting for us to handle the query + let res = self.srcpad.peer_query(unsafe { query.as_mut() }); + sender.send(res).ok(); + + return Ok(gst::FlowSuccess::Ok); + } + }; + + let (duplicate, caps) = if let Some((buffer, lateness)) = in_buffer { + let caps = state.in_caps.take(); + + state.out_buffer = Some(buffer); + state.out_timestamp = state.in_timestamp; + + if caps.is_some() { + state.out_audio_info = state.in_audio_info.clone(); + } + + self.cond.notify_all(); + + (lateness != BufferLateness::OnTime, caps) + } else { + // Work around borrow checker + let State { + fallback_duration, + out_buffer: ref mut buffer, + out_audio_info: ref audio_info, + .. + } = *state; + gst::debug!(CAT, imp: self, "repeating {:?}", buffer); + + let buffer = buffer.as_mut().unwrap().make_mut(); + let prev_duration = buffer.duration().unwrap(); + + if let Some(audio_info) = audio_info { + if !buffer.flags().contains(gst::BufferFlags::GAP) { + let mut map_info = buffer.map_writable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map buffer: {}", e); + gst::FlowError::Error + })?; + + audio_info + .format_info() + .fill_silence(map_info.as_mut_slice()); + } + } else { + buffer.set_duration(Some(fallback_duration)); + } + + buffer.set_dts(buffer.dts().map(|t| t + prev_duration)); + buffer.set_pts(buffer.pts().map(|t| t + prev_duration)); + buffer.set_flags(gst::BufferFlags::GAP); + buffer.unset_flags(gst::BufferFlags::DISCONT); + + state.out_timestamp = state.ts_range(state.out_buffer.as_ref().unwrap()); + (true, None) + }; + + let buffer = state.out_buffer.clone().unwrap(); + let sync_ts = state + .out_timestamp + .map_or(gst::ClockTime::ZERO, |t| t.start); + + if let Some(caps) = caps { + gst::debug!(CAT, imp: self, "Sending new caps: {}", caps); + + let event = gst::event::Caps::new(&caps); + MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); + state.srcresult?; + } + + if !state.sent_segment { + let event = if state.single_segment { + // Create live segment + let mut segment = gst::FormattedSegment::::new(); + segment.set_start(sync_ts + SEGMENT_OFFSET); + segment.set_base(sync_ts); + segment.set_time(sync_ts); + segment.set_position(sync_ts + SEGMENT_OFFSET); + + gst::debug!(CAT, imp: self, "Sending new segment: {:?}", segment); + gst::event::Segment::new(&segment) + } else { + let segment = state.segment.as_ref().unwrap(); + + gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment); + gst::event::Segment::new(segment) + }; + + MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event)); + state.srcresult?; + state.sent_segment = true; + } + + { + let element = self.obj(); + + let base_time = element.base_time().ok_or_else(|| { + gst::error!(CAT, imp: self, "Missing base time"); + gst::FlowError::Flushing + })?; + + let clock = element.clock().ok_or_else(|| { + gst::error!(CAT, imp: self, "Missing clock"); + gst::FlowError::Flushing + })?; + + let clock_id = clock.new_single_shot_id(base_time + sync_ts); + state.clock_id = Some(clock_id.clone()); + + gst::trace!( + CAT, + imp: self, + "Waiting for clock to reach {}", + clock_id.time(), + ); + + let (res, _) = MutexGuard::unlocked(&mut state, || clock_id.wait()); + gst::trace!(CAT, imp: self, "Clock returned {res:?}",); + + if res == Err(gst::ClockError::Unscheduled) { + return Err(gst::FlowError::Flushing); + } + + state.srcresult?; + state.clock_id = None; + } + + state.num_out += 1; + if duplicate { + state.num_duplicate += 1; + } + + drop(state); + + gst::trace!(CAT, imp: self, "Pushing {buffer:?}"); + self.srcpad.push(buffer) + } + + fn buffer_is_backwards(&self, state: &State, timestamp: Option) -> BufferLateness { + let timestamp = match timestamp { + Some(t) => t, + None => return BufferLateness::OnTime, + }; + + let out_timestamp = match state.out_timestamp { + Some(t) => t, + None => return BufferLateness::OnTime, + }; + + if timestamp.end > out_timestamp.end { + return BufferLateness::OnTime; + } + + gst::debug!( + CAT, + imp: self, + "Timestamp regresses: buffer ends at {}, expected {}", + timestamp.end, + out_timestamp.end, + ); + + let late_threshold = match state.late_threshold { + Some(gst::ClockTime::ZERO) => return BufferLateness::LateOverThreshold, + Some(t) => t, + None => return BufferLateness::LateUnderThreshold, + }; + + let in_timestamp = match state.in_timestamp { + Some(t) => t, + None => return BufferLateness::LateUnderThreshold, + }; + + if timestamp.start > in_timestamp.end + late_threshold { + BufferLateness::LateOverThreshold + } else { + BufferLateness::LateUnderThreshold + } + } + + fn buffer_is_early(&self, state: &State, timestamp: Option) -> bool { + let timestamp = match timestamp { + Some(t) => t, + None => return false, + }; + + let out_timestamp = match state.out_timestamp { + Some(t) => t, + None => return false, + }; + + let slack = state + .out_buffer + .as_deref() + .map_or(gst::ClockTime::ZERO, |b| b.duration().unwrap()); + + if timestamp.start < out_timestamp.end + slack { + return false; + } + + gst::debug!( + CAT, + imp: self, + "Timestamp is too early: buffer starts at {}, expected {}", + timestamp.start, + out_timestamp.end, + ); + + true + } +} diff --git a/utils/livesync/src/livesync/mod.rs b/utils/livesync/src/livesync/mod.rs new file mode 100644 index 00000000..ab36d123 --- /dev/null +++ b/utils/livesync/src/livesync/mod.rs @@ -0,0 +1,28 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::{glib, prelude::*}; + +mod imp; + +glib::wrapper! { + pub struct LiveSync(ObjectSubclass) + @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "livesync", + gst::Rank::None, + LiveSync::static_type(), + )?; + + Ok(()) +} diff --git a/utils/livesync/tests/livesync.rs b/utils/livesync/tests/livesync.rs new file mode 100644 index 00000000..595f523b --- /dev/null +++ b/utils/livesync/tests/livesync.rs @@ -0,0 +1,198 @@ +// Copyright (C) 2022 LTN Global Communications, Inc. +// Contact: Jan Alexander Steffens (heftig) +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::*; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstlivesync::plugin_register_static().expect("Failed to register livesync plugin"); + }); +} + +const DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(100); +const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); +const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); + +fn crank_pull(harness: &mut gst_check::Harness) -> gst::Buffer { + harness.crank_single_clock_wait().unwrap(); + harness.pull().unwrap() +} + +#[track_caller] +fn assert_buf( + buf: &gst::BufferRef, + offset: u64, + pts: gst::ClockTime, + duration: gst::ClockTime, + flags: gst::BufferFlags, +) { + assert_eq!(buf.offset(), offset, "Bad offset"); + assert_eq!(buf.pts(), Some(pts), "Bad PTS"); + assert_eq!(buf.duration(), Some(duration), "Bad duration"); + assert_eq!( + buf.flags() - gst::BufferFlags::TAG_MEMORY, + flags, + "Bad flags", + ); +} + +#[track_caller] +fn assert_crank_pull( + harness: &mut gst_check::Harness, + offset_per_buffer: u64, + src_buffer_number: u64, + sink_buffer_number: u64, + flags: gst::BufferFlags, + singlesegment: bool, +) { + let pts = if singlesegment { + LATENCY + DURATION * sink_buffer_number + SEGMENT_OFFSET + } else { + DURATION * sink_buffer_number + }; + assert_buf( + &crank_pull(harness), + offset_per_buffer * src_buffer_number, + pts, + DURATION, + flags, + ); +} + +#[test] +fn test_video_singlesegment() { + test_video(true); +} + +#[test] +fn test_audio_singlesegment() { + test_audio(true); +} + +#[test] +fn test_video_nonsinglesegment() { + test_video(false); +} + +#[test] +fn test_audio_nonsinglesegment() { + test_audio(false); +} + +fn test_video(singlesegment: bool) { + init(); + + let mut h = gst_check::Harness::new("livesync"); + h.add_src_parse( + r"videotestsrc is-live=1 + ! capsfilter caps=video/x-raw,framerate=10/1 + ", + true, + ); + + let element = h.element().unwrap(); + element.set_property("latency", LATENCY); + element.set_property("single-segment", singlesegment); + + test_livesync(&mut h, 1, singlesegment); +} + +fn test_audio(singlesegment: bool) { + init(); + + let mut h = gst_check::Harness::new("livesync"); + h.add_src_parse( + r"audiotestsrc is-live=1 samplesperbuffer=4800 + ! capsfilter caps=audio/x-raw,rate=48000 + ", + true, + ); + + let element = h.element().unwrap(); + element.set_property("latency", LATENCY); + element.set_property("single-segment", singlesegment); + + test_livesync(&mut h, 4800, singlesegment); +} + +fn test_livesync(h: &mut gst_check::Harness, o: u64, singlesegment: bool) { + // Normal operation ------------------------------ + + // Push frames 0-1, pull frame 0 + h.push_from_src().unwrap(); + h.push_from_src().unwrap(); + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::StreamStart); + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::Caps); + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::Segment); + assert_crank_pull(h, o, 0, 0, gst::BufferFlags::DISCONT, singlesegment); + + // Push frames 2-10, pull frames 1-9 + for i in 1..=9 { + h.push_from_src().unwrap(); + assert_crank_pull(h, o, i, i, gst::BufferFlags::empty(), singlesegment); + } + + // Pull frame 10 + assert_crank_pull(h, o, 10, 10, gst::BufferFlags::empty(), singlesegment); + + // Bridging gap ---------------------------------- + + // Pull frames 11-19 + for i in 11..=19 { + assert_crank_pull(h, o, 10, i, gst::BufferFlags::GAP, singlesegment); + } + + // Push frames 11-19 + for _ in 11..=19 { + h.push_from_src().unwrap(); + } + + // Normal operation ------------------------------ + + // Push frames 20-21, pull frame 20 + for _ in 1..=2 { + let mut src_h = h.src_harness_mut().unwrap(); + src_h.crank_single_clock_wait().unwrap(); + let mut buf = src_h.pull().unwrap(); + let buf_mut = buf.make_mut(); + buf_mut.set_flags(gst::BufferFlags::MARKER); + h.push(buf).unwrap(); + } + assert_crank_pull(h, o, 10, 20, gst::BufferFlags::GAP, singlesegment); + + // Push frame 22, pull frame 21 + h.push_from_src().unwrap(); + assert_crank_pull( + h, + o, + 21, + 21, + gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER, + singlesegment, + ); + + // Push frames 23-30, pull frames 22-29 + for i in 22..=29 { + h.push_from_src().unwrap(); + assert_crank_pull(h, o, i, i, gst::BufferFlags::empty(), singlesegment); + } + + // EOS ------------------------------------------- + assert!(h.push_event(gst::event::Eos::new())); + + // Pull frame 30 + assert_crank_pull(h, o, 30, 30, gst::BufferFlags::empty(), singlesegment); + + assert_eq!(h.pull_event().unwrap().type_(), gst::EventType::Eos); + assert_eq!(h.try_pull(), None); +}