diff --git a/generic/threadshare/examples/standalone/args/clap_args.rs b/generic/threadshare/examples/standalone/args/clap_args.rs new file mode 100644 index 00000000..60291f27 --- /dev/null +++ b/generic/threadshare/examples/standalone/args/clap_args.rs @@ -0,0 +1,66 @@ +use super::super::CAT; +use clap::Parser; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)] +pub enum Sink { + /// Item handling in PadHandler with async Mutex + AsyncMutex, + /// Item handling in PadHandler with sync Mutex + SyncMutex, + /// Item handling in runtime::Task + Task, +} + +impl Sink { + pub fn element_name(self) -> &'static str { + use super::super::sink; + use Sink::*; + match self { + AsyncMutex => sink::ASYNC_MUTEX_ELEMENT_NAME, + SyncMutex => sink::SYNC_MUTEX_ELEMENT_NAME, + Task => sink::TASK_ELEMENT_NAME, + } + } +} + +#[derive(Parser, Debug)] +#[clap(version)] +#[clap( + about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats" +)] +pub struct Args { + /// Parallel streams to process. + #[clap(short, long, default_value_t = 5000)] + pub streams: u32, + + /// Threadshare groups. + #[clap(short, long, default_value_t = 2)] + pub groups: u32, + + /// Threadshare Context wait in ms (max throttling duration). + #[clap(short, long, default_value_t = 20)] + pub wait: u32, + + /// Buffer push period in ms. + #[clap(short, long, default_value_t = 20)] + pub push_period: u32, + + /// Number of buffers per stream to output before sending EOS (-1 = unlimited). + #[clap(short, long, default_value_t = 5000)] + pub num_buffers: i32, + + /// The Sink variant to use. + #[clap(long, value_enum, default_value_t = Sink::SyncMutex)] + pub sink: Sink, + + /// Disables statistics logging. + #[clap(short, long)] + pub disable_stats_log: bool, +} + +pub fn args() -> Args { + let args = Args::parse(); + gst::info!(CAT, "{:?}", args); + + args +} diff --git a/generic/threadshare/examples/standalone/args/default_args.rs b/generic/threadshare/examples/standalone/args/default_args.rs new file mode 100644 index 00000000..9f84aaf4 --- /dev/null +++ b/generic/threadshare/examples/standalone/args/default_args.rs @@ -0,0 +1,47 @@ +use super::super::CAT; + +#[derive(Copy, Clone, Debug)] +pub struct SyncMutexSink; + +impl SyncMutexSink { + pub fn element_name(self) -> &'static str { + super::super::sink::SYNC_MUTEX_ELEMENT_NAME + } +} + +#[derive(Debug)] +pub struct Args { + pub streams: u32, + pub groups: u32, + pub wait: u32, + pub push_period: u32, + pub num_buffers: i32, + pub sink: SyncMutexSink, + pub disable_stats_log: bool, +} + +impl Default for Args { + fn default() -> Self { + Args { + streams: 5000, + groups: 2, + wait: 20, + push_period: 20, + num_buffers: 5000, + sink: SyncMutexSink, + disable_stats_log: false, + } + } +} + +pub fn args() -> Args { + if std::env::args().len() > 1 { + gst::warning!(CAT, "Ignoring command line arguments"); + gst::warning!(CAT, "Build with `--features=clap`"); + } + + let args = Args::default(); + gst::warning!(CAT, "{:?}", args); + + args +} diff --git a/generic/threadshare/examples/standalone/args/mod.rs b/generic/threadshare/examples/standalone/args/mod.rs new file mode 100644 index 00000000..483b0a26 --- /dev/null +++ b/generic/threadshare/examples/standalone/args/mod.rs @@ -0,0 +1,9 @@ +#[cfg(not(feature = "clap"))] +mod default_args; +#[cfg(not(feature = "clap"))] +pub use default_args::*; + +#[cfg(feature = "clap")] +mod clap_args; +#[cfg(feature = "clap")] +pub use clap_args::*; diff --git a/generic/threadshare/examples/standalone/macros.rs b/generic/threadshare/examples/standalone/macros.rs new file mode 100644 index 00000000..dc6043ef --- /dev/null +++ b/generic/threadshare/examples/standalone/macros.rs @@ -0,0 +1,19 @@ +macro_rules! debug_or_trace { + ($cat:expr, $raise_log_level:expr, $qual:ident: $obj:expr, $rest:tt $(,)?) => { + if $raise_log_level { + gst::debug!($cat, $qual: $obj, $rest); + } else { + gst::trace!($cat, $qual: $obj, $rest); + } + }; +} + +macro_rules! log_or_trace { + ($cat:expr, $raise_log_level:expr, $qual:ident: $obj:expr, $rest:tt $(,)?) => { + if $raise_log_level { + gst::log!($cat, $qual: $obj, $rest); + } else { + gst::trace!($cat, $qual: $obj, $rest); + } + }; +} diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index 28064331..ff1edc41 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -1,12 +1,21 @@ use gst::glib; use once_cell::sync::Lazy; +mod args; +use args::*; + +#[macro_use] +mod macros; + mod sink; mod src; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( - "ts-standalone-test-main", + "ts-standalone-main", gst::DebugColorFlags::empty(), Some("Thread-sharing standalone test main"), ) @@ -14,6 +23,8 @@ static CAT: Lazy = Lazy::new(|| { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { src::register(plugin)?; + sink::async_mutex::register(plugin)?; + sink::sync_mutex::register(plugin)?; sink::task::register(plugin)?; Ok(()) @@ -31,91 +42,6 @@ gst::plugin_define!( env!("BUILD_REL_DATE") ); -#[cfg(feature = "clap")] -use clap::Parser; - -#[cfg(feature = "clap")] -#[derive(Parser, Debug)] -#[clap(version)] -#[clap( - about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats" -)] -struct Args { - /// Parallel streams to process. - #[clap(short, long, default_value_t = 5000)] - streams: u32, - - /// Threadshare groups. - #[clap(short, long, default_value_t = 2)] - groups: u32, - - /// Threadshare Context wait in ms (max throttling duration). - #[clap(short, long, default_value_t = 20)] - wait: u32, - - /// Buffer push period in ms. - #[clap(short, long, default_value_t = 20)] - push_period: u32, - - /// Number of buffers per stream to output before sending EOS (-1 = unlimited). - #[clap(short, long, default_value_t = 5000)] - num_buffers: i32, - - /// Disables statistics logging. - #[clap(short, long)] - disable_stats_log: bool, -} - -#[cfg(not(feature = "clap"))] -#[derive(Debug)] -struct Args { - streams: u32, - groups: u32, - wait: u32, - push_period: u32, - num_buffers: i32, - disable_stats_log: bool, -} - -#[cfg(not(feature = "clap"))] -impl Default for Args { - fn default() -> Self { - Args { - streams: 5000, - groups: 2, - wait: 20, - push_period: 20, - num_buffers: 5000, - disable_stats_log: false, - } - } -} - -fn args() -> Args { - #[cfg(feature = "clap")] - let args = { - let args = Args::parse(); - gst::info!(CAT, "{:?}", args); - - args - }; - - #[cfg(not(feature = "clap"))] - let args = { - if std::env::args().len() > 1 { - gst::warning!(CAT, "Ignoring command line arguments"); - gst::warning!(CAT, "Build with `--features=clap`"); - } - - let args = Args::default(); - gst::warning!(CAT, "{:?}", args); - - args - }; - - args -} - fn main() { use gst::prelude::*; use std::time::Instant; @@ -133,8 +59,8 @@ fn main() { for i in 0..args.streams { let ctx_name = format!("standalone {}", i % args.groups); - let src = gst::ElementFactory::make("ts-standalone-test-src") - .name(format!("src-{}", i).as_str()) + let src = gst::ElementFactory::make(src::ELEMENT_NAME) + .name(format!("src-{i}").as_str()) .property("context", &ctx_name) .property("context-wait", args.wait) .property("push-period", args.push_period) @@ -142,16 +68,16 @@ fn main() { .build() .unwrap(); - let sink = gst::ElementFactory::make("ts-standalone-test-sink") - .name(format!("sink-{}", i).as_str()) + let sink = gst::ElementFactory::make(args.sink.element_name()) + .name(format!("sink-{i}").as_str()) .property("context", &ctx_name) .property("context-wait", args.wait) .build() .unwrap(); if i == 0 { - src.set_property("raise-log-level", true); - sink.set_property("raise-log-level", true); + src.set_property("main-elem", true); + sink.set_property("main-elem", true); if !args.disable_stats_log { // Don't use the last 5 secs in stats @@ -179,30 +105,46 @@ fn main() { let l = glib::MainLoop::new(None, false); let bus = pipeline.bus().unwrap(); + let terminated_count = Arc::new(AtomicU32::new(0)); let pipeline_clone = pipeline.clone(); let l_clone = l.clone(); bus.add_watch(move |_, msg| { use gst::MessageView; - match msg.view() { MessageView::Eos(_) => { + // Actually, we don't post EOS (see sinks impl). gst::info!(CAT, "Received eos"); l_clone.quit(); + + glib::Continue(false) } - MessageView::Error(err) => { + MessageView::Error(msg) => { + if let gst::MessageView::Error(msg) = msg.message().view() { + if msg.error().matches(gst::LibraryError::Shutdown) { + if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 { + gst::info!(CAT, "Received all shutdown requests"); + l_clone.quit(); + + return glib::Continue(false); + } else { + return glib::Continue(true); + } + } + } + gst::error!( CAT, "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() + msg.src().map(|s| s.path_string()), + msg.error(), + msg.debug() ); l_clone.quit(); - } - _ => (), - }; - glib::Continue(true) + glib::Continue(false) + } + _ => glib::Continue(true), + } }) .expect("Failed to add bus watch"); diff --git a/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs new file mode 100644 index 00000000..b909ab48 --- /dev/null +++ b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs @@ -0,0 +1,334 @@ +// Copyright (C) 2022 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use futures::future::BoxFuture; +use futures::prelude::*; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::EventView; + +use once_cell::sync::Lazy; + +use gstthreadshare::runtime::executor::block_on_or_add_sub_task; +use gstthreadshare::runtime::{prelude::*, PadSink}; + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use super::super::{Settings, Stats, CAT}; + +#[derive(Debug, Default)] +struct PadSinkHandlerInner { + is_flushing: bool, + is_main_elem: bool, + last_dts: Option, + segment_start: Option, + stats: Option>, +} + +impl PadSinkHandlerInner { + fn handle_buffer( + &mut self, + elem: &super::AsyncMutexSink, + buffer: gst::Buffer, + ) -> Result<(), gst::FlowError> { + if self.is_flushing { + log_or_trace!( + CAT, + self.is_main_elem, + obj: elem, + "Discarding {buffer:?} (flushing)" + ); + + return Err(gst::FlowError::Flushing); + } + + debug_or_trace!(CAT, self.is_main_elem, obj: elem, "Received {buffer:?}"); + + let dts = buffer + .dts() + .expect("Buffer without dts") + .checked_sub(self.segment_start.expect("Buffer without Time Segment")) + .expect("dts before Segment start"); + + if let Some(last_dts) = self.last_dts { + let cur_ts = elem.current_running_time().unwrap(); + let latency: Duration = (cur_ts - dts).into(); + let interval: Duration = (dts - last_dts).into(); + + if let Some(stats) = self.stats.as_mut() { + stats.add_buffer(latency, interval); + } + + debug_or_trace!(CAT, self.is_main_elem, obj: elem, "o latency {latency:.2?}"); + debug_or_trace!( + CAT, + self.is_main_elem, + obj: elem, + "o interval {interval:.2?}", + ); + } + + self.last_dts = Some(dts); + + log_or_trace!(CAT, self.is_main_elem, obj: elem, "Buffer processed"); + + Ok(()) + } +} + +#[derive(Clone, Debug, Default)] +struct AsyncPadSinkHandler(Arc>); + +impl PadSinkHandler for AsyncPadSinkHandler { + type ElementImpl = AsyncMutexSink; + + fn sink_chain( + self, + _pad: gst::Pad, + elem: super::AsyncMutexSink, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result> { + async move { + if self.0.lock().await.handle_buffer(&elem, buffer).is_err() { + return Err(gst::FlowError::Flushing); + } + + Ok(gst::FlowSuccess::Ok) + } + .boxed() + } + + fn sink_event_serialized( + self, + _pad: gst::Pad, + elem: super::AsyncMutexSink, + event: gst::Event, + ) -> BoxFuture<'static, bool> { + async move { + match event.view() { + EventView::Eos(_) => { + { + let mut inner = self.0.lock().await; + debug_or_trace!(CAT, inner.is_main_elem, obj: elem, "EOS"); + inner.is_flushing = true; + } + + // When each element sends its own EOS message, + // it takes ages for the pipeline to process all of them. + // Let's just post an error message and let main shuts down + // after all streams have posted this message. + let _ = elem + .post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS")); + } + EventView::FlushStop(_) => { + self.0.lock().await.is_flushing = false; + } + EventView::Segment(evt) => { + if let Some(time_seg) = evt.segment().downcast_ref::() { + self.0.lock().await.segment_start = time_seg.start(); + } + } + EventView::SinkMessage(evt) => { + let _ = elem.post_message(evt.message()); + } + _ => (), + } + + true + } + .boxed() + } + + fn sink_event(self, _pad: &gst::Pad, _imp: &AsyncMutexSink, event: gst::Event) -> bool { + if let EventView::FlushStart(..) = event.view() { + block_on_or_add_sub_task(async move { self.0.lock().await.is_flushing = true }); + } + + true + } +} + +impl AsyncPadSinkHandler { + fn prepare(&self, is_main_elem: bool, stats: Option) { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + inner.is_main_elem = is_main_elem; + inner.stats = stats.map(Box::new); + }); + } + + fn start(&self) { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + + inner.is_flushing = false; + inner.last_dts = None; + + if let Some(stats) = inner.stats.as_mut() { + stats.start(); + } + }); + } + + fn stop(&self) { + futures::executor::block_on(async move { + let mut inner = self.0.lock().await; + inner.is_flushing = true; + }); + } +} + +#[derive(Debug)] +pub struct AsyncMutexSink { + sink_pad: PadSink, + sink_pad_handler: AsyncPadSinkHandler, + settings: Mutex, +} + +impl AsyncMutexSink { + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().unwrap(); + debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing"); + let stats = if settings.logs_stats { + Some(Stats::new( + settings.max_buffers, + settings.push_period + settings.context_wait / 2, + )) + } else { + None + }; + + self.sink_pad_handler.prepare(settings.is_main_elem, stats); + debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared"); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping"); + self.sink_pad_handler.stop(); + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped"); + + Ok(()) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Starting"); + self.sink_pad_handler.start(); + debug_or_trace!(CAT, is_main_elem, imp: self, "Started"); + + Ok(()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for AsyncMutexSink { + const NAME: &'static str = "TsStandaloneAsyncMutexSink"; + type Type = super::AsyncMutexSink; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let sink_pad_handler = AsyncPadSinkHandler::default(); + Self { + sink_pad: PadSink::new( + gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), + sink_pad_handler.clone(), + ), + sink_pad_handler, + settings: Default::default(), + } + } +} + +impl ObjectImpl for AsyncMutexSink { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(Settings::properties); + PROPERTIES.as_ref() + } + + fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + self.settings.lock().unwrap().set_property(id, value, pspec); + } + + fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value { + self.settings.lock().unwrap().property(id, pspec) + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(self.sink_pad.gst_pad()).unwrap(); + obj.set_element_flags(gst::ElementFlags::SINK); + } +} + +impl GstObjectImpl for AsyncMutexSink {} + +impl ElementImpl for AsyncMutexSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Thread-sharing standalone test async mutex sink", + "Sink/Test", + "Thread-sharing standalone test async mutex sink", + "François Laignel ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {transition:?}"); + + match transition { + gst::StateChange::NullToReady => { + self.prepare().map_err(|err| { + self.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::ReadyToPaused => { + self.start().map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; + } + _ => (), + } + + self.parent_change_state(transition) + } +} diff --git a/generic/threadshare/examples/standalone/sink/async_mutex/mod.rs b/generic/threadshare/examples/standalone/sink/async_mutex/mod.rs new file mode 100644 index 00000000..8c711d33 --- /dev/null +++ b/generic/threadshare/examples/standalone/sink/async_mutex/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct AsyncMutexSink(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + super::ASYNC_MUTEX_ELEMENT_NAME, + gst::Rank::None, + AsyncMutexSink::static_type(), + ) +} diff --git a/generic/threadshare/examples/standalone/sink/mod.rs b/generic/threadshare/examples/standalone/sink/mod.rs index cdafe4ad..bf9b7aae 100644 --- a/generic/threadshare/examples/standalone/sink/mod.rs +++ b/generic/threadshare/examples/standalone/sink/mod.rs @@ -1 +1,22 @@ +pub mod async_mutex; +pub mod sync_mutex; pub mod task; + +mod settings; +pub use settings::Settings; + +mod stats; +pub use stats::Stats; + +pub const ASYNC_MUTEX_ELEMENT_NAME: &str = "ts-standalone-async-mutex-sink"; +pub const SYNC_MUTEX_ELEMENT_NAME: &str = "ts-standalone-sync-mutex-sink"; +pub const TASK_ELEMENT_NAME: &str = "ts-standalone-task-sink"; + +use once_cell::sync::Lazy; +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "ts-standalone-sink", + gst::DebugColorFlags::empty(), + Some("Thread-sharing standalone test sink"), + ) +}); diff --git a/generic/threadshare/examples/standalone/sink/settings.rs b/generic/threadshare/examples/standalone/sink/settings.rs new file mode 100644 index 00000000..cc04187d --- /dev/null +++ b/generic/threadshare/examples/standalone/sink/settings.rs @@ -0,0 +1,115 @@ +use gst::glib; +use gst::prelude::*; + +use std::time::Duration; + +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20); +const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20); +const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25); + +#[derive(Debug, Clone)] +pub struct Settings { + pub context: String, + pub context_wait: Duration, + pub is_main_elem: bool, + pub logs_stats: bool, + pub push_period: Duration, + pub max_buffers: Option, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, + is_main_elem: false, + logs_stats: false, + push_period: DEFAULT_PUSH_PERIOD, + max_buffers: Some(DEFAULT_MAX_BUFFERS as u32), + } + } +} + +impl Settings { + pub fn properties() -> Vec { + vec![ + glib::ParamSpecString::builder("context") + .nick("Context") + .blurb("Context name to share threads with") + .default_value(Some(DEFAULT_CONTEXT)) + .build(), + glib::ParamSpecUInt::builder("context-wait") + .nick("Context Wait") + .blurb("Throttle poll loop to run at most once every this many ms") + .maximum(1000) + .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32) + .build(), + glib::ParamSpecBoolean::builder("main-elem") + .nick("Main Element") + .blurb("Declare this element as the main one") + .write_only() + .build(), + glib::ParamSpecBoolean::builder("logs-stats") + .nick("Logs Stats") + .blurb("Whether statistics should be logged") + .write_only() + .build(), + glib::ParamSpecUInt::builder("push-period") + .nick("Src buffer Push Period") + .blurb("Push period used by `src` element (used for stats warnings)") + .default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32) + .build(), + glib::ParamSpecInt::builder("max-buffers") + .nick("Max Buffers") + .blurb("Number of buffers to count before stopping stats (-1 = unlimited)") + .minimum(-1i32) + .default_value(DEFAULT_MAX_BUFFERS) + .build(), + ] + } + + pub fn set_property(&mut self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "context" => { + self.context = value + .get::>() + .unwrap() + .unwrap_or_else(|| DEFAULT_CONTEXT.into()); + } + "context-wait" => { + self.context_wait = Duration::from_millis(value.get::().unwrap().into()); + } + "main-elem" => { + self.is_main_elem = value.get::().unwrap(); + } + "logs-stats" => { + let logs_stats = value.get().unwrap(); + self.logs_stats = logs_stats; + } + "push-period" => { + self.push_period = Duration::from_millis(value.get::().unwrap().into()); + } + "max-buffers" => { + let value = value.get::().unwrap(); + self.max_buffers = if value > 0 { Some(value as u32) } else { None }; + } + _ => unimplemented!(), + } + } + + pub fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "context" => self.context.to_value(), + "context-wait" => (self.context_wait.as_millis() as u32).to_value(), + "main-elem" => self.is_main_elem.to_value(), + "push-period" => (self.push_period.as_millis() as u32).to_value(), + "max-buffers" => self + .max_buffers + .and_then(|val| val.try_into().ok()) + .unwrap_or(-1i32) + .to_value(), + _ => unimplemented!(), + } + } +} diff --git a/generic/threadshare/examples/standalone/sink/stats.rs b/generic/threadshare/examples/standalone/sink/stats.rs new file mode 100644 index 00000000..69370d9c --- /dev/null +++ b/generic/threadshare/examples/standalone/sink/stats.rs @@ -0,0 +1,270 @@ +use gst::prelude::*; +use std::time::{Duration, Instant}; + +#[cfg(feature = "tuning")] +use gstthreadshare::runtime::Context; + +use super::CAT; + +const LOG_PERIOD: Duration = Duration::from_secs(20); + +#[derive(Debug, Default)] +pub struct Stats { + ramp_up_instant: Option, + log_start_instant: Option, + last_delta_instant: Option, + max_buffers: Option, + buffer_count: f32, + buffer_count_delta: f32, + latency_sum: f32, + latency_square_sum: f32, + latency_sum_delta: f32, + latency_square_sum_delta: f32, + latency_min: Duration, + latency_min_delta: Duration, + latency_max: Duration, + latency_max_delta: Duration, + interval_sum: f32, + interval_square_sum: f32, + interval_sum_delta: f32, + interval_square_sum_delta: f32, + interval_min: Duration, + interval_min_delta: Duration, + interval_max: Duration, + interval_max_delta: Duration, + interval_late_warn: Duration, + interval_late_count: f32, + interval_late_count_delta: f32, + #[cfg(feature = "tuning")] + parked_duration_init: Duration, +} + +impl Stats { + pub fn new(max_buffers: Option, interval_late_warn: Duration) -> Self { + Stats { + max_buffers: max_buffers.map(|max_buffers| max_buffers as f32), + interval_late_warn, + ..Default::default() + } + } + + pub fn start(&mut self) { + self.buffer_count = 0.0; + self.buffer_count_delta = 0.0; + self.latency_sum = 0.0; + self.latency_square_sum = 0.0; + self.latency_sum_delta = 0.0; + self.latency_square_sum_delta = 0.0; + self.latency_min = Duration::MAX; + self.latency_min_delta = Duration::MAX; + self.latency_max = Duration::ZERO; + self.latency_max_delta = Duration::ZERO; + self.interval_sum = 0.0; + self.interval_square_sum = 0.0; + self.interval_sum_delta = 0.0; + self.interval_square_sum_delta = 0.0; + self.interval_min = Duration::MAX; + self.interval_min_delta = Duration::MAX; + self.interval_max = Duration::ZERO; + self.interval_max_delta = Duration::ZERO; + self.interval_late_count = 0.0; + self.interval_late_count_delta = 0.0; + self.last_delta_instant = None; + self.log_start_instant = None; + + self.ramp_up_instant = Some(Instant::now()); + gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD); + } + + pub fn is_active(&mut self) -> bool { + if let Some(ramp_up_instant) = self.ramp_up_instant { + if ramp_up_instant.elapsed() < LOG_PERIOD { + return false; + } + + self.ramp_up_instant = None; + gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD); + self.log_start_instant = Some(Instant::now()); + self.last_delta_instant = self.log_start_instant; + + #[cfg(feature = "tuning")] + { + self.parked_duration_init = Context::current().unwrap().parked_duration(); + } + } + + use std::cmp::Ordering::*; + match self.max_buffers.opt_cmp(self.buffer_count) { + Some(Equal) => { + self.log_global(); + self.buffer_count += 1.0; + false + } + Some(Less) => false, + _ => true, + } + } + + pub fn add_buffer(&mut self, latency: Duration, interval: Duration) { + if !self.is_active() { + return; + } + + self.buffer_count += 1.0; + self.buffer_count_delta += 1.0; + + // Latency + let latency_f32 = latency.as_nanos() as f32; + let latency_square = latency_f32.powi(2); + + self.latency_sum += latency_f32; + self.latency_square_sum += latency_square; + self.latency_min = self.latency_min.min(latency); + self.latency_max = self.latency_max.max(latency); + + self.latency_sum_delta += latency_f32; + self.latency_square_sum_delta += latency_square; + self.latency_min_delta = self.latency_min_delta.min(latency); + self.latency_max_delta = self.latency_max_delta.max(latency); + + // Interval + let interval_f32 = interval.as_nanos() as f32; + let interval_square = interval_f32.powi(2); + + self.interval_sum += interval_f32; + self.interval_square_sum += interval_square; + self.interval_min = self.interval_min.min(interval); + self.interval_max = self.interval_max.max(interval); + + self.interval_sum_delta += interval_f32; + self.interval_square_sum_delta += interval_square; + self.interval_min_delta = self.interval_min_delta.min(interval); + self.interval_max_delta = self.interval_max_delta.max(interval); + + if interval > self.interval_late_warn { + self.interval_late_count += 1.0; + self.interval_late_count_delta += 1.0; + } + + let delta_duration = match self.last_delta_instant { + Some(last_delta) => last_delta.elapsed(), + None => return, + }; + + if delta_duration < LOG_PERIOD { + return; + } + + self.last_delta_instant = Some(Instant::now()); + + gst::info!(CAT, "Delta stats:"); + let interval_mean = self.interval_sum_delta / self.buffer_count_delta; + let interval_std_dev = f32::sqrt( + self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2), + ); + + gst::info!( + CAT, + "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(interval_mean as u64), + Duration::from_nanos(interval_std_dev as u64), + self.interval_min_delta, + self.interval_max_delta, + ); + + if self.interval_late_count_delta > f32::EPSILON { + gst::warning!( + CAT, + "o {:5.2}% late buffers", + 100f32 * self.interval_late_count_delta / self.buffer_count_delta + ); + } + + self.interval_sum_delta = 0.0; + self.interval_square_sum_delta = 0.0; + self.interval_min_delta = Duration::MAX; + self.interval_max_delta = Duration::ZERO; + self.interval_late_count_delta = 0.0; + + let latency_mean = self.latency_sum_delta / self.buffer_count_delta; + let latency_std_dev = f32::sqrt( + self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2), + ); + + gst::info!( + CAT, + "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(latency_mean as u64), + Duration::from_nanos(latency_std_dev as u64), + self.latency_min_delta, + self.latency_max_delta, + ); + + self.latency_sum_delta = 0.0; + self.latency_square_sum_delta = 0.0; + self.latency_min_delta = Duration::MAX; + self.latency_max_delta = Duration::ZERO; + + self.buffer_count_delta = 0.0; + } + + pub fn log_global(&mut self) { + if self.buffer_count < 1.0 { + return; + } + + let _log_start = if let Some(log_start) = self.log_start_instant { + log_start + } else { + return; + }; + + gst::info!(CAT, "Global stats:"); + + #[cfg(feature = "tuning")] + { + let duration = _log_start.elapsed(); + let parked_duration = + Context::current().unwrap().parked_duration() - self.parked_duration_init; + gst::info!( + CAT, + "o parked: {parked_duration:4.2?} ({:5.2?}%)", + (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32) + ); + } + + let interval_mean = self.interval_sum / self.buffer_count; + let interval_std_dev = + f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2)); + + gst::info!( + CAT, + "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(interval_mean as u64), + Duration::from_nanos(interval_std_dev as u64), + self.interval_min, + self.interval_max, + ); + + if self.interval_late_count > f32::EPSILON { + gst::warning!( + CAT, + "o {:5.2}% late buffers", + 100f32 * self.interval_late_count / self.buffer_count + ); + } + + let latency_mean = self.latency_sum / self.buffer_count; + let latency_std_dev = + f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2)); + + gst::info!( + CAT, + "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(latency_mean as u64), + Duration::from_nanos(latency_std_dev as u64), + self.latency_min, + self.latency_max, + ); + } +} diff --git a/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs new file mode 100644 index 00000000..bef18db1 --- /dev/null +++ b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs @@ -0,0 +1,327 @@ +// Copyright (C) 2022 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use futures::future::BoxFuture; +use futures::prelude::*; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::EventView; + +use once_cell::sync::Lazy; + +use gstthreadshare::runtime::{prelude::*, PadSink}; + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use super::super::{Settings, Stats, CAT}; + +#[derive(Debug, Default)] +struct PadSinkHandlerInner { + is_flushing: bool, + is_main_elem: bool, + last_dts: Option, + segment_start: Option, + stats: Option>, +} + +impl PadSinkHandlerInner { + fn handle_buffer( + &mut self, + elem: &super::DirectSink, + buffer: gst::Buffer, + ) -> Result<(), gst::FlowError> { + if self.is_flushing { + log_or_trace!( + CAT, + self.is_main_elem, + obj: elem, + "Discarding {buffer:?} (flushing)" + ); + + return Err(gst::FlowError::Flushing); + } + + debug_or_trace!(CAT, self.is_main_elem, obj: elem, "Received {buffer:?}"); + + let dts = buffer + .dts() + .expect("Buffer without dts") + .checked_sub(self.segment_start.expect("Buffer without Time Segment")) + .expect("dts before Segment start"); + + if let Some(last_dts) = self.last_dts { + let cur_ts = elem.current_running_time().unwrap(); + let latency: Duration = (cur_ts - dts).into(); + let interval: Duration = (dts - last_dts).into(); + + if let Some(stats) = self.stats.as_mut() { + stats.add_buffer(latency, interval); + } + + debug_or_trace!(CAT, self.is_main_elem, obj: elem, "o latency {latency:.2?}"); + debug_or_trace!( + CAT, + self.is_main_elem, + obj: elem, + "o interval {interval:.2?}", + ); + } + + self.last_dts = Some(dts); + + log_or_trace!(CAT, self.is_main_elem, obj: elem, "Buffer processed"); + + Ok(()) + } +} + +#[derive(Clone, Debug, Default)] +struct SyncPadSinkHandler(Arc>); + +impl PadSinkHandler for SyncPadSinkHandler { + type ElementImpl = DirectSink; + + fn sink_chain( + self, + _pad: gst::Pad, + elem: super::DirectSink, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result> { + async move { + if self.0.lock().unwrap().handle_buffer(&elem, buffer).is_err() { + return Err(gst::FlowError::Flushing); + } + + Ok(gst::FlowSuccess::Ok) + } + .boxed() + } + + fn sink_event_serialized( + self, + _pad: gst::Pad, + elem: super::DirectSink, + event: gst::Event, + ) -> BoxFuture<'static, bool> { + async move { + match event.view() { + EventView::Eos(_) => { + { + let mut inner = self.0.lock().unwrap(); + debug_or_trace!(CAT, inner.is_main_elem, obj: elem, "EOS"); + inner.is_flushing = true; + } + + // When each element sends its own EOS message, + // it takes ages for the pipeline to process all of them. + // Let's just post an error message and let main shuts down + // after all streams have posted this message. + let _ = elem + .post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS")); + } + EventView::FlushStop(_) => { + self.0.lock().unwrap().is_flushing = false; + } + EventView::Segment(evt) => { + if let Some(time_seg) = evt.segment().downcast_ref::() { + self.0.lock().unwrap().segment_start = time_seg.start(); + } + } + EventView::SinkMessage(evt) => { + let _ = elem.post_message(evt.message()); + } + _ => (), + } + + true + } + .boxed() + } + + fn sink_event(self, _pad: &gst::Pad, _imp: &DirectSink, event: gst::Event) -> bool { + if let EventView::FlushStart(..) = event.view() { + self.0.lock().unwrap().is_flushing = true; + } + + true + } +} + +impl SyncPadSinkHandler { + fn prepare(&self, is_main_elem: bool, stats: Option) { + let mut inner = self.0.lock().unwrap(); + inner.is_main_elem = is_main_elem; + inner.stats = stats.map(Box::new); + } + + fn start(&self) { + let mut inner = self.0.lock().unwrap(); + + inner.is_flushing = false; + inner.last_dts = None; + + if let Some(stats) = inner.stats.as_mut() { + stats.start(); + } + } + + fn stop(&self) { + let mut inner = self.0.lock().unwrap(); + inner.is_flushing = true; + } +} + +#[derive(Debug)] +pub struct DirectSink { + sink_pad: PadSink, + sink_pad_handler: SyncPadSinkHandler, + settings: Mutex, +} + +impl DirectSink { + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().unwrap(); + debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing"); + let stats = if settings.logs_stats { + Some(Stats::new( + settings.max_buffers, + settings.push_period + settings.context_wait / 2, + )) + } else { + None + }; + + self.sink_pad_handler.prepare(settings.is_main_elem, stats); + debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared"); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping"); + self.sink_pad_handler.stop(); + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped"); + + Ok(()) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Starting"); + self.sink_pad_handler.start(); + debug_or_trace!(CAT, is_main_elem, imp: self, "Started"); + + Ok(()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for DirectSink { + const NAME: &'static str = "TsStandaloneDirectSink"; + type Type = super::DirectSink; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let sink_pad_handler = SyncPadSinkHandler::default(); + Self { + sink_pad: PadSink::new( + gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), + sink_pad_handler.clone(), + ), + sink_pad_handler, + settings: Default::default(), + } + } +} + +impl ObjectImpl for DirectSink { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(Settings::properties); + PROPERTIES.as_ref() + } + + fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + self.settings.lock().unwrap().set_property(id, value, pspec); + } + + fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value { + self.settings.lock().unwrap().property(id, pspec) + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(self.sink_pad.gst_pad()).unwrap(); + obj.set_element_flags(gst::ElementFlags::SINK); + } +} + +impl GstObjectImpl for DirectSink {} + +impl ElementImpl for DirectSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Thread-sharing standalone test direct sink", + "Sink/Test", + "Thread-sharing standalone test direct sink", + "François Laignel ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {transition:?}"); + + match transition { + gst::StateChange::NullToReady => { + self.prepare().map_err(|err| { + self.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::ReadyToPaused => { + self.start().map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; + } + _ => (), + } + + self.parent_change_state(transition) + } +} diff --git a/generic/threadshare/examples/standalone/sink/sync_mutex/mod.rs b/generic/threadshare/examples/standalone/sink/sync_mutex/mod.rs new file mode 100644 index 00000000..c3bfb4a0 --- /dev/null +++ b/generic/threadshare/examples/standalone/sink/sync_mutex/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct DirectSink(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + super::SYNC_MUTEX_ELEMENT_NAME, + gst::Rank::None, + DirectSink::static_type(), + ) +} diff --git a/generic/threadshare/examples/standalone/sink/task/imp.rs b/generic/threadshare/examples/standalone/sink/task/imp.rs index dae30931..3979a0e3 100644 --- a/generic/threadshare/examples/standalone/sink/task/imp.rs +++ b/generic/threadshare/examples/standalone/sink/task/imp.rs @@ -8,7 +8,6 @@ use futures::future::BoxFuture; use futures::prelude::*; -use futures::stream::Peekable; use gst::error_msg; use gst::glib; @@ -22,46 +21,9 @@ use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::{Context, PadSink, Task}; use std::sync::Mutex; -use std::task::Poll; -use std::time::{Duration, Instant}; +use std::time::Duration; -static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - "ts-standalone-test-sink", - gst::DebugColorFlags::empty(), - Some("Thread-sharing standalone test sink"), - ) -}); - -const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20); -const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20); -const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25); - -const LOG_PERIOD: Duration = Duration::from_secs(20); - -#[derive(Debug, Clone)] -struct Settings { - context: String, - context_wait: Duration, - raise_log_level: bool, - logs_stats: bool, - push_period: Duration, - max_buffers: Option, -} - -impl Default for Settings { - fn default() -> Self { - Settings { - context: DEFAULT_CONTEXT.into(), - context_wait: DEFAULT_CONTEXT_WAIT, - raise_log_level: false, - logs_stats: false, - push_period: DEFAULT_PUSH_PERIOD, - max_buffers: Some(DEFAULT_MAX_BUFFERS as u32), - } - } -} +use super::super::{Settings, Stats, CAT}; #[derive(Debug)] enum StreamItem { @@ -70,21 +32,20 @@ enum StreamItem { } #[derive(Clone, Debug)] -struct TestSinkPadHandler; +struct TaskPadSinkHandler; -impl PadSinkHandler for TestSinkPadHandler { - type ElementImpl = TestSink; +impl PadSinkHandler for TaskPadSinkHandler { + type ElementImpl = TaskSink; fn sink_chain( self, _pad: gst::Pad, - elem: super::TestSink, + elem: super::TaskSink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { let sender = elem.imp().clone_item_sender(); async move { if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: elem, "Flushing"); return Err(gst::FlowError::Flushing); } @@ -93,39 +54,37 @@ impl PadSinkHandler for TestSinkPadHandler { .boxed() } - fn sink_chain_list( - self, - _pad: gst::Pad, - elem: super::TestSink, - list: gst::BufferList, - ) -> BoxFuture<'static, Result> { - let sender = elem.imp().clone_item_sender(); - async move { - for buffer in list.iter_owned() { - if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: elem, "Flushing"); - return Err(gst::FlowError::Flushing); - } - } - - Ok(gst::FlowSuccess::Ok) - } - .boxed() - } - fn sink_event_serialized( self, _pad: gst::Pad, - elem: super::TestSink, + elem: super::TaskSink, event: gst::Event, ) -> BoxFuture<'static, bool> { let sender = elem.imp().clone_item_sender(); async move { - if let EventView::FlushStop(_) = event.view() { - let imp = elem.imp(); - return imp.task.flush_stop().await_maybe_on_context().is_ok(); - } else if sender.send_async(StreamItem::Event(event)).await.is_err() { - gst::debug!(CAT, obj: elem, "Flushing"); + match event.view() { + EventView::Segment(_) => { + let _ = sender.send_async(StreamItem::Event(event)).await; + } + EventView::Eos(_) => { + let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, obj: elem, "EOS"); + + // When each element sends its own EOS message, + // it takes ages for the pipeline to process all of them. + // Let's just post an error message and let main shuts down + // after all streams have posted this message. + let _ = elem + .post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS")); + } + EventView::FlushStop(_) => { + let imp = elem.imp(); + return imp.task.flush_stop().await_maybe_on_context().is_ok(); + } + EventView::SinkMessage(evt) => { + let _ = elem.post_message(evt.message()); + } + _ => (), } true @@ -133,7 +92,7 @@ impl PadSinkHandler for TestSinkPadHandler { .boxed() } - fn sink_event(self, _pad: &gst::Pad, imp: &TestSink, event: gst::Event) -> bool { + fn sink_event(self, _pad: &gst::Pad, imp: &TaskSink, event: gst::Event) -> bool { if let EventView::FlushStart(..) = event.view() { return imp.task.flush_start().await_maybe_on_context().is_ok(); } @@ -142,329 +101,54 @@ impl PadSinkHandler for TestSinkPadHandler { } } -#[derive(Default)] -struct Stats { - must_log: bool, - ramp_up_instant: Option, - log_start_instant: Option, - last_delta_instant: Option, - max_buffers: Option, - buffer_count: f32, - buffer_count_delta: f32, - latency_sum: f32, - latency_square_sum: f32, - latency_sum_delta: f32, - latency_square_sum_delta: f32, - latency_min: Duration, - latency_min_delta: Duration, - latency_max: Duration, - latency_max_delta: Duration, - interval_sum: f32, - interval_square_sum: f32, - interval_sum_delta: f32, - interval_square_sum_delta: f32, - interval_min: Duration, - interval_min_delta: Duration, - interval_max: Duration, - interval_max_delta: Duration, - interval_late_warn: Duration, - interval_late_count: f32, - interval_late_count_delta: f32, - #[cfg(feature = "tuning")] - parked_duration_init: Duration, -} - -impl Stats { - fn start(&mut self) { - if !self.must_log { - return; - } - - self.buffer_count = 0.0; - self.buffer_count_delta = 0.0; - self.latency_sum = 0.0; - self.latency_square_sum = 0.0; - self.latency_sum_delta = 0.0; - self.latency_square_sum_delta = 0.0; - self.latency_min = Duration::MAX; - self.latency_min_delta = Duration::MAX; - self.latency_max = Duration::ZERO; - self.latency_max_delta = Duration::ZERO; - self.interval_sum = 0.0; - self.interval_square_sum = 0.0; - self.interval_sum_delta = 0.0; - self.interval_square_sum_delta = 0.0; - self.interval_min = Duration::MAX; - self.interval_min_delta = Duration::MAX; - self.interval_max = Duration::ZERO; - self.interval_max_delta = Duration::ZERO; - self.interval_late_count = 0.0; - self.interval_late_count_delta = 0.0; - self.last_delta_instant = None; - self.log_start_instant = None; - - self.ramp_up_instant = Some(Instant::now()); - gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD); - } - - fn is_active(&mut self) -> bool { - if !self.must_log { - return false; - } - - if let Some(ramp_up_instant) = self.ramp_up_instant { - if ramp_up_instant.elapsed() < LOG_PERIOD { - return false; - } - - self.ramp_up_instant = None; - gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD); - self.log_start_instant = Some(Instant::now()); - self.last_delta_instant = self.log_start_instant; - - #[cfg(feature = "tuning")] - { - self.parked_duration_init = Context::current().unwrap().parked_duration(); - } - } - - use std::cmp::Ordering::*; - match self.max_buffers.opt_cmp(self.buffer_count) { - Some(Equal) => { - self.log_global(); - self.buffer_count += 1.0; - false - } - Some(Less) => false, - _ => true, - } - } - - fn add_buffer(&mut self, latency: Duration, interval: Duration) { - if !self.is_active() { - return; - } - - self.buffer_count += 1.0; - self.buffer_count_delta += 1.0; - - // Latency - let latency_f32 = latency.as_nanos() as f32; - let latency_square = latency_f32.powi(2); - - self.latency_sum += latency_f32; - self.latency_square_sum += latency_square; - self.latency_min = self.latency_min.min(latency); - self.latency_max = self.latency_max.max(latency); - - self.latency_sum_delta += latency_f32; - self.latency_square_sum_delta += latency_square; - self.latency_min_delta = self.latency_min_delta.min(latency); - self.latency_max_delta = self.latency_max_delta.max(latency); - - // Interval - let interval_f32 = interval.as_nanos() as f32; - let interval_square = interval_f32.powi(2); - - self.interval_sum += interval_f32; - self.interval_square_sum += interval_square; - self.interval_min = self.interval_min.min(interval); - self.interval_max = self.interval_max.max(interval); - - self.interval_sum_delta += interval_f32; - self.interval_square_sum_delta += interval_square; - self.interval_min_delta = self.interval_min_delta.min(interval); - self.interval_max_delta = self.interval_max_delta.max(interval); - - if interval > self.interval_late_warn { - self.interval_late_count += 1.0; - self.interval_late_count_delta += 1.0; - } - - let delta_duration = match self.last_delta_instant { - Some(last_delta) => last_delta.elapsed(), - None => return, - }; - - if delta_duration < LOG_PERIOD { - return; - } - - self.last_delta_instant = Some(Instant::now()); - - gst::info!(CAT, "Delta stats:"); - let interval_mean = self.interval_sum_delta / self.buffer_count_delta; - let interval_std_dev = f32::sqrt( - self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2), - ); - - gst::info!( - CAT, - "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", - Duration::from_nanos(interval_mean as u64), - Duration::from_nanos(interval_std_dev as u64), - self.interval_min_delta, - self.interval_max_delta, - ); - - if self.interval_late_count_delta > f32::EPSILON { - gst::warning!( - CAT, - "o {:5.2}% late buffers", - 100f32 * self.interval_late_count_delta / self.buffer_count_delta - ); - } - - self.interval_sum_delta = 0.0; - self.interval_square_sum_delta = 0.0; - self.interval_min_delta = Duration::MAX; - self.interval_max_delta = Duration::ZERO; - self.interval_late_count_delta = 0.0; - - let latency_mean = self.latency_sum_delta / self.buffer_count_delta; - let latency_std_dev = f32::sqrt( - self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2), - ); - - gst::info!( - CAT, - "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", - Duration::from_nanos(latency_mean as u64), - Duration::from_nanos(latency_std_dev as u64), - self.latency_min_delta, - self.latency_max_delta, - ); - - self.latency_sum_delta = 0.0; - self.latency_square_sum_delta = 0.0; - self.latency_min_delta = Duration::MAX; - self.latency_max_delta = Duration::ZERO; - - self.buffer_count_delta = 0.0; - } - - fn log_global(&mut self) { - if self.buffer_count < 1.0 { - return; - } - - let _log_start = if let Some(log_start) = self.log_start_instant { - log_start - } else { - return; - }; - - gst::info!(CAT, "Global stats:"); - - #[cfg(feature = "tuning")] - { - let duration = _log_start.elapsed(); - let parked_duration = - Context::current().unwrap().parked_duration() - self.parked_duration_init; - gst::info!( - CAT, - "o parked: {parked_duration:4.2?} ({:5.2?}%)", - (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32) - ); - } - - let interval_mean = self.interval_sum / self.buffer_count; - let interval_std_dev = - f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2)); - - gst::info!( - CAT, - "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", - Duration::from_nanos(interval_mean as u64), - Duration::from_nanos(interval_std_dev as u64), - self.interval_min, - self.interval_max, - ); - - if self.interval_late_count > f32::EPSILON { - gst::warning!( - CAT, - "o {:5.2}% late buffers", - 100f32 * self.interval_late_count / self.buffer_count - ); - } - - let latency_mean = self.latency_sum / self.buffer_count; - let latency_std_dev = - f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2)); - - gst::info!( - CAT, - "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", - Duration::from_nanos(latency_mean as u64), - Duration::from_nanos(latency_std_dev as u64), - self.latency_min, - self.latency_max, - ); - } -} - -struct TestSinkTask { - element: super::TestSink, - raise_log_level: bool, +struct TaskSinkTask { + elem: super::TaskSink, + item_receiver: flume::Receiver, + is_main_elem: bool, last_dts: Option, - item_receiver: Peekable>, - stats: Stats, - segment: Option, + segment_start: Option, + stats: Option>, } -impl TestSinkTask { - fn new(element: &super::TestSink, item_receiver: flume::Receiver) -> Self { - TestSinkTask { - element: element.clone(), - raise_log_level: false, +impl TaskSinkTask { + fn new( + elem: &super::TaskSink, + item_receiver: flume::Receiver, + is_main_elem: bool, + stats: Option>, + ) -> Self { + TaskSinkTask { + elem: elem.clone(), + item_receiver, + is_main_elem, last_dts: None, - item_receiver: item_receiver.into_stream().peekable(), - stats: Stats::default(), - segment: None, + stats, + segment_start: None, } } - async fn flush(&mut self) { + fn flush(&mut self) { // Purge the channel - while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {} + while !self.item_receiver.is_empty() {} } } -impl TaskImpl for TestSinkTask { +impl TaskImpl for TaskSinkTask { type Item = StreamItem; fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - let sink = self.element.imp(); - let settings = sink.settings.lock().unwrap(); - self.raise_log_level = settings.raise_log_level; - - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Preparing Task"); - } else { - gst::trace!(CAT, obj: self.element, "Preparing Task"); - } - - self.stats.must_log = settings.logs_stats; - self.stats.max_buffers = settings.max_buffers.map(|max_buffers| max_buffers as f32); - self.stats.interval_late_warn = settings.push_period + settings.context_wait / 2; - - Ok(()) - } - .boxed() + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Preparing Task"); + future::ok(()).boxed() } fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async { - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Starting Task"); - } else { - gst::trace!(CAT, obj: self.element, "Starting Task"); + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Starting Task"); + self.last_dts = None; + if let Some(stats) = self.stats.as_mut() { + stats.start(); } - self.last_dts = None; - self.stats.start(); Ok(()) } .boxed() @@ -472,101 +156,66 @@ impl TaskImpl for TestSinkTask { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async { - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Stopping Task"); - } else { - gst::trace!(CAT, obj: self.element, "Stopping Task"); - } - - self.flush().await; + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Stopping Task"); + self.flush(); Ok(()) } .boxed() } fn try_next(&mut self) -> BoxFuture<'_, Result> { - async move { - let item = self.item_receiver.next().await.unwrap(); - - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Popped item"); - } else { - gst::trace!(CAT, obj: self.element, "Popped item"); - } - - Ok(item) - } - .boxed() + self.item_receiver + .recv_async() + .map(|opt_item| Ok(opt_item.unwrap())) + .boxed() } fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "Received {:?}", item); - } else { - gst::trace!(CAT, obj: self.element, "Received {:?}", item); - } + debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Received {item:?}"); match item { StreamItem::Buffer(buffer) => { - let dts = self - .segment - .as_ref() - .and_then(|segment| { - segment - .downcast_ref::() - .and_then(|segment| segment.to_running_time(buffer.dts())) - }) - .unwrap(); + let dts = buffer + .dts() + .expect("Buffer without dts") + .checked_sub(self.segment_start.expect("Buffer without Time Segment")) + .expect("dts before Segment start"); if let Some(last_dts) = self.last_dts { - let cur_ts = self.element.current_running_time().unwrap(); + let cur_ts = self.elem.current_running_time().unwrap(); let latency: Duration = (cur_ts - dts).into(); let interval: Duration = (dts - last_dts).into(); - self.stats.add_buffer(latency, interval); - - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "o latency {:.2?}", latency); - gst::debug!(CAT, obj: self.element, "o interval {:.2?}", interval); - } else { - gst::trace!(CAT, obj: self.element, "o latency {:.2?}", latency); - gst::trace!(CAT, obj: self.element, "o interval {:.2?}", interval); + if let Some(stats) = self.stats.as_mut() { + stats.add_buffer(latency, interval); } + + debug_or_trace!( + CAT, + self.is_main_elem, + obj: self.elem, + "o latency {latency:.2?}", + ); + debug_or_trace!( + CAT, + self.is_main_elem, + obj: self.elem, + "o interval {interval:.2?}", + ); } self.last_dts = Some(dts); - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Buffer processed"); - } else { - gst::trace!(CAT, obj: self.element, "Buffer processed"); + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Buffer processed"); + } + StreamItem::Event(evt) => { + if let EventView::Segment(evt) = evt.view() { + if let Some(time_seg) = evt.segment().downcast_ref::() { + self.segment_start = time_seg.start(); + } } } - StreamItem::Event(event) => match event.view() { - EventView::Eos(_) => { - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "EOS"); - } else { - gst::trace!(CAT, obj: self.element, "EOS"); - } - - let elem = self.element.clone(); - self.element.call_async(move |_| { - let _ = - elem.post_message(gst::message::Eos::builder().src(&elem).build()); - }); - - return Err(gst::FlowError::Eos); - } - EventView::Segment(e) => { - self.segment = Some(e.segment().clone()); - } - EventView::SinkMessage(e) => { - let _ = self.element.post_message(e.message()); - } - _ => (), - }, } Ok(()) @@ -576,121 +225,88 @@ impl TaskImpl for TestSinkTask { } #[derive(Debug)] -pub struct TestSink { +pub struct TaskSink { sink_pad: PadSink, task: Task, item_sender: Mutex>>, settings: Mutex, } -impl TestSink { +impl TaskSink { #[track_caller] fn clone_item_sender(&self) -> flume::Sender { self.item_sender.lock().unwrap().as_ref().unwrap().clone() } fn prepare(&self) -> Result<(), gst::ErrorMessage> { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Preparing"); + let settings = self.settings.lock().unwrap(); + let stats = if settings.logs_stats { + Some(Box::new(Stats::new( + settings.max_buffers, + settings.push_period + settings.context_wait / 2, + ))) } else { - gst::trace!(CAT, imp: self, "Preparing"); - } - - let context = { - let settings = self.settings.lock().unwrap(); - - Context::acquire(&settings.context, settings.context_wait).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to acquire Context: {}", err] - ) - })? + None }; + debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing"); + + let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to acquire Context: {}", err] + ) + })?; + // Enable backpressure for items let (item_sender, item_receiver) = flume::bounded(0); - let task_impl = TestSinkTask::new(&self.obj(), item_receiver); - self.task.prepare(task_impl, context).block_on()?; + let task_impl = TaskSinkTask::new(&self.obj(), item_receiver, settings.is_main_elem, stats); + self.task.prepare(task_impl, ts_ctx).block_on()?; *self.item_sender.lock().unwrap() = Some(item_sender); - if raise_log_level { - gst::debug!(CAT, imp: self, "Prepared"); - } else { - gst::trace!(CAT, imp: self, "Prepared"); - } + debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared"); Ok(()) } fn unprepare(&self) { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Unpreparing"); - } else { - gst::trace!(CAT, imp: self, "Unpreparing"); - } - + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Unpreparing"); self.task.unprepare().block_on().unwrap(); - - if raise_log_level { - gst::debug!(CAT, imp: self, "Unprepared"); - } else { - gst::trace!(CAT, imp: self, "Unprepared"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Unprepared"); } fn stop(&self) -> Result<(), gst::ErrorMessage> { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Stopping"); - } else { - gst::trace!(CAT, imp: self, "Stopping"); - } - + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping"); self.task.stop().block_on()?; - - if raise_log_level { - gst::debug!(CAT, imp: self, "Stopped"); - } else { - gst::trace!(CAT, imp: self, "Stopped"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped"); Ok(()) } fn start(&self) -> Result<(), gst::ErrorMessage> { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Starting"); - } else { - gst::trace!(CAT, imp: self, "Starting"); - } - + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Starting"); self.task.start().block_on()?; - - if raise_log_level { - gst::debug!(CAT, imp: self, "Started"); - } else { - gst::trace!(CAT, imp: self, "Started"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Started"); Ok(()) } } #[glib::object_subclass] -impl ObjectSubclass for TestSink { - const NAME: &'static str = "StandaloneTestSink"; - type Type = super::TestSink; +impl ObjectSubclass for TaskSink { + const NAME: &'static str = "TsStandaloneTaskSink"; + type Type = super::TaskSink; type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { Self { sink_pad: PadSink::new( gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), - TestSinkPadHandler, + TaskPadSinkHandler, ), task: Task::default(), item_sender: Default::default(), @@ -699,96 +315,18 @@ impl ObjectSubclass for TestSink { } } -impl ObjectImpl for TestSink { +impl ObjectImpl for TaskSink { fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy> = Lazy::new(|| { - vec![ - glib::ParamSpecString::builder("context") - .nick("Context") - .blurb("Context name to share threads with") - .default_value(Some(DEFAULT_CONTEXT)) - .build(), - glib::ParamSpecUInt::builder("context-wait") - .nick("Context Wait") - .blurb("Throttle poll loop to run at most once every this many ms") - .maximum(1000) - .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32) - .build(), - glib::ParamSpecBoolean::builder("raise-log-level") - .nick("Raise log level") - .blurb("Raises the log level so that this element stands out") - .write_only() - .build(), - glib::ParamSpecBoolean::builder("logs-stats") - .nick("Logs Stats") - .blurb("Whether statistics should be logged") - .write_only() - .build(), - glib::ParamSpecUInt::builder("push-period") - .nick("Src buffer Push Period") - .blurb("Push period used by `src` element (used for stats warnings)") - .default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32) - .build(), - glib::ParamSpecInt::builder("max-buffers") - .nick("Max Buffers") - .blurb("Number of buffers to count before stopping stats (-1 = unlimited)") - .minimum(-1i32) - .default_value(DEFAULT_MAX_BUFFERS) - .build(), - ] - }); - + static PROPERTIES: Lazy> = Lazy::new(Settings::properties); PROPERTIES.as_ref() } - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - let mut settings = self.settings.lock().unwrap(); - match pspec.name() { - "context" => { - settings.context = value - .get::>() - .expect("type checked upstream") - .unwrap_or_else(|| DEFAULT_CONTEXT.into()); - } - "context-wait" => { - settings.context_wait = Duration::from_millis( - value.get::().expect("type checked upstream").into(), - ); - } - "raise-log-level" => { - settings.raise_log_level = value.get::().expect("type checked upstream"); - } - "logs-stats" => { - let logs_stats = value.get().expect("type checked upstream"); - settings.logs_stats = logs_stats; - } - "push-period" => { - settings.push_period = Duration::from_millis( - value.get::().expect("type checked upstream").into(), - ); - } - "max-buffers" => { - let value = value.get::().expect("type checked upstream"); - settings.max_buffers = if value > 0 { Some(value as u32) } else { None }; - } - _ => unimplemented!(), - } + fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + self.settings.lock().unwrap().set_property(id, value, pspec); } - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); - match pspec.name() { - "context" => settings.context.to_value(), - "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), - "raise-log-level" => settings.raise_log_level.to_value(), - "push-period" => (settings.push_period.as_millis() as u32).to_value(), - "max-buffers" => settings - .max_buffers - .and_then(|val| val.try_into().ok()) - .unwrap_or(-1i32) - .to_value(), - _ => unimplemented!(), - } + fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value { + self.settings.lock().unwrap().property(id, pspec) } fn constructed(&self) { @@ -800,15 +338,15 @@ impl ObjectImpl for TestSink { } } -impl GstObjectImpl for TestSink {} +impl GstObjectImpl for TaskSink {} -impl ElementImpl for TestSink { +impl ElementImpl for TaskSink { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( - "Thread-sharing standalone test sink", + "Thread-sharing standalone test task sink", "Sink/Test", - "Thread-sharing standalone test sink", + "Thread-sharing standalone test task sink", "François Laignel ", ) }); @@ -838,7 +376,7 @@ impl ElementImpl for TestSink { &self, transition: gst::StateChange, ) -> Result { - gst::trace!(CAT, imp: self, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {transition:?}"); match transition { gst::StateChange::NullToReady => { diff --git a/generic/threadshare/examples/standalone/sink/task/mod.rs b/generic/threadshare/examples/standalone/sink/task/mod.rs index 6b8ce952..1dea85a1 100644 --- a/generic/threadshare/examples/standalone/sink/task/mod.rs +++ b/generic/threadshare/examples/standalone/sink/task/mod.rs @@ -4,14 +4,14 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct TestSink(ObjectSubclass) @extends gst::Element, gst::Object; + pub struct TaskSink(ObjectSubclass) @extends gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register( Some(plugin), - "ts-standalone-test-sink", + super::TASK_ELEMENT_NAME, gst::Rank::None, - TestSink::static_type(), + TaskSink::static_type(), ) } diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs index 521d63c1..c6f9c82b 100644 --- a/generic/threadshare/examples/standalone/src/imp.rs +++ b/generic/threadshare/examples/standalone/src/imp.rs @@ -19,11 +19,11 @@ use std::sync::Mutex; use std::time::Duration; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{timer, Context, PadSrc, Task}; +use gstthreadshare::runtime::{task, timer, Context, PadSrc, Task}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( - "ts-standalone-test-src", + super::ELEMENT_NAME, gst::DebugColorFlags::empty(), Some("Thread-sharing standalone test src"), ) @@ -39,7 +39,7 @@ struct Settings { context: String, context_wait: Duration, push_period: gst::ClockTime, - raise_log_level: bool, + is_main_elem: bool, num_buffers: Option, } @@ -49,7 +49,7 @@ impl Default for Settings { context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, push_period: DEFAULT_PUSH_PERIOD, - raise_log_level: false, + is_main_elem: false, num_buffers: Some(DEFAULT_NUM_BUFFERS as u32), } } @@ -63,19 +63,18 @@ impl PadSrcHandler for TestSrcPadHandler { #[derive(Debug)] struct SrcTask { - element: super::TestSrc, + elem: super::TestSrc, buffer_pool: gst::BufferPool, timer: Option, - raise_log_level: bool, + is_main_elem: bool, push_period: gst::ClockTime, need_initial_events: bool, - need_segment: bool, num_buffers: Option, buffer_count: u32, } impl SrcTask { - fn new(element: super::TestSrc) -> Self { + fn new(elem: super::TestSrc) -> Self { let buffer_pool = gst::BufferPool::new(); let mut pool_config = buffer_pool.config(); pool_config @@ -84,13 +83,12 @@ impl SrcTask { buffer_pool.set_config(pool_config).unwrap(); SrcTask { - element, + elem, buffer_pool, timer: None, - raise_log_level: false, + is_main_elem: false, push_period: gst::ClockTime::ZERO, need_initial_events: true, - need_segment: true, num_buffers: Some(DEFAULT_NUM_BUFFERS as u32), buffer_count: 0, } @@ -98,34 +96,48 @@ impl SrcTask { } impl TaskImpl for SrcTask { - type Item = gst::Buffer; + type Item = (); fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - let src = self.element.imp(); - let settings = src.settings.lock().unwrap(); - self.raise_log_level = settings.raise_log_level; + let imp = self.elem.imp(); + let settings = imp.settings.lock().unwrap(); + self.is_main_elem = settings.is_main_elem; - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Preparing Task"); - } else { - gst::trace!(CAT, obj: self.element, "Preparing Task"); - } + log_or_trace!(CAT, self.is_main_elem, imp: imp, "Preparing Task"); - self.push_period = settings.push_period; - self.num_buffers = settings.num_buffers; + self.push_period = settings.push_period; + self.num_buffers = settings.num_buffers; - Ok(()) - } - .boxed() + future::ok(()).boxed() } fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async { - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Starting Task"); - } else { - gst::trace!(CAT, obj: self.element, "Starting Task"); + async move { + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Starting Task"); + + if self.need_initial_events { + let imp = self.elem.imp(); + + debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Pushing initial events"); + + let stream_id = + format!("{:08x}{:08x}", rand::random::(), rand::random::()); + let stream_start_evt = gst::event::StreamStart::builder(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + imp.src_pad.push_event(stream_start_evt).await; + + imp.src_pad + .push_event(gst::event::Caps::new( + &gst::Caps::builder("foo/bar").build(), + )) + .await; + + let segment_evt = + gst::event::Segment::new(&gst::FormattedSegment::::new()); + imp.src_pad.push_event(segment_evt).await; + + self.need_initial_events = false; } self.timer = Some( @@ -138,178 +150,100 @@ impl TaskImpl for SrcTask { ); self.buffer_count = 0; self.buffer_pool.set_active(true).unwrap(); + Ok(()) } .boxed() } fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Stopping Task"); - } else { - gst::trace!(CAT, obj: self.element, "Stopping Task"); - } + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Stopping Task"); + self.buffer_pool.set_active(false).unwrap(); + self.timer = None; + self.need_initial_events = true; - self.buffer_pool.set_active(false).unwrap(); - self.timer = None; - self.need_initial_events = true; - self.need_segment = true; + future::ok(()).boxed() + } + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Awaiting timer"); + self.timer.as_mut().unwrap().next().await; + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Timer ticked"); Ok(()) } .boxed() } - fn try_next(&mut self) -> BoxFuture<'_, Result> { + fn handle_item(&mut self, _: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Awaiting timer"); - } else { - gst::trace!(CAT, obj: self.element, "Awaiting timer"); - } - - self.timer.as_mut().unwrap().next().await; - - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Timer ticked"); - } else { - gst::trace!(CAT, obj: self.element, "Timer ticked"); - } - - self.buffer_pool + let buffer = self + .buffer_pool .acquire_buffer(None) .map(|mut buffer| { { let buffer = buffer.get_mut().unwrap(); - let rtime = self.element.current_running_time().unwrap(); + let rtime = self.elem.current_running_time().unwrap(); buffer.set_dts(rtime); } buffer }) .map_err(|err| { - gst::error!(CAT, obj: self.element, "Failed to acquire buffer {}", err); + gst::error!(CAT, obj: self.elem, "Failed to acquire buffer {err}"); err - }) + })?; + + debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Forwarding buffer"); + self.elem.imp().src_pad.push(buffer).await?; + log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Successfully pushed buffer"); + + self.buffer_count += 1; + + if self.num_buffers.opt_eq(self.buffer_count) == Some(true) { + return Err(gst::FlowError::Eos); + } + + Ok(()) } .boxed() } - fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> { async move { - let res = self.push(buffer).await; - match res { - Ok(_) => { - if self.raise_log_level { - gst::log!(CAT, obj: self.element, "Successfully pushed buffer"); - } else { - gst::trace!(CAT, obj: self.element, "Successfully pushed buffer"); - } - } - Err(gst::FlowError::Eos) => { - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "EOS"); - } else { - gst::trace!(CAT, obj: self.element, "EOS"); - } - let test_src = self.element.imp(); - test_src.src_pad.push_event(gst::event::Eos::new()).await; + match err { + gst::FlowError::Eos => { + debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Pushing EOS"); - return Err(gst::FlowError::Eos); - } - Err(gst::FlowError::Flushing) => { - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "Flushing"); - } else { - gst::trace!(CAT, obj: self.element, "Flushing"); + let imp = self.elem.imp(); + if !imp.src_pad.push_event(gst::event::Eos::new()).await { + gst::error!(CAT, imp: imp, "Error pushing EOS"); } + + task::Trigger::Stop } - Err(err) => { - gst::error!(CAT, obj: self.element, "Got error {}", err); + gst::FlowError::Flushing => { + debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Flushing"); + + task::Trigger::FlushStart + } + err => { + gst::error!(CAT, obj: self.elem, "Got error {err}"); gst::element_error!( - &self.element, + &self.elem, gst::StreamError::Failed, ("Internal data stream error"), ["streaming stopped, reason {}", err] ); + + task::Trigger::Error } } - - res.map(drop) } .boxed() } } -impl SrcTask { - async fn push(&mut self, buffer: gst::Buffer) -> Result { - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "Pushing {:?}", buffer); - } else { - gst::trace!(CAT, obj: self.element, "Pushing {:?}", buffer); - } - - let test_src = self.element.imp(); - - if self.need_initial_events { - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "Pushing initial events"); - } else { - gst::trace!(CAT, obj: self.element, "Pushing initial events"); - } - - let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - let stream_start_evt = gst::event::StreamStart::builder(&stream_id) - .group_id(gst::GroupId::next()) - .build(); - test_src.src_pad.push_event(stream_start_evt).await; - - test_src - .src_pad - .push_event(gst::event::Caps::new( - &gst::Caps::builder("foo/bar").build(), - )) - .await; - - self.need_initial_events = false; - } - - if self.need_segment { - let segment_evt = - gst::event::Segment::new(&gst::FormattedSegment::::new()); - test_src.src_pad.push_event(segment_evt).await; - - self.need_segment = false; - } - - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "Forwarding buffer"); - } else { - gst::trace!(CAT, obj: self.element, "Forwarding buffer"); - } - - let ok = test_src.src_pad.push(buffer).await?; - - self.buffer_count += 1; - - if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) { - if self.raise_log_level { - gst::debug!(CAT, obj: self.element, "Pushing EOS"); - } else { - gst::trace!(CAT, obj: self.element, "Pushing EOS"); - } - - let test_src = self.element.imp(); - if !test_src.src_pad.push_event(gst::event::Eos::new()).await { - gst::error!(CAT, obj: self.element, "Error pushing EOS"); - } - return Err(gst::FlowError::Eos); - } - - Ok(ok) - } -} - #[derive(Debug)] pub struct TestSrc { src_pad: PadSrc, @@ -319,106 +253,57 @@ pub struct TestSrc { impl TestSrc { fn prepare(&self) -> Result<(), gst::ErrorMessage> { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Preparing"); - } else { - gst::trace!(CAT, imp: self, "Preparing"); - } + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Preparing"); let settings = self.settings.lock().unwrap(); - let context = - Context::acquire(&settings.context, settings.context_wait).map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to acquire Context: {}", err] - ) - })?; + let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })?; drop(settings); self.task - .prepare(SrcTask::new(self.obj().clone()), context) + .prepare(SrcTask::new(self.instance().clone()), ts_ctx) .block_on()?; - if raise_log_level { - gst::debug!(CAT, imp: self, "Prepared"); - } else { - gst::trace!(CAT, imp: self, "Prepared"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Prepared"); Ok(()) } fn unprepare(&self) { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Unpreparing"); - } else { - gst::trace!(CAT, imp: self, "Unpreparing"); - } - + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Unpreparing"); self.task.unprepare().block_on().unwrap(); - - if raise_log_level { - gst::debug!(CAT, imp: self, "Unprepared"); - } else { - gst::trace!(CAT, imp: self, "Unprepared"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Unprepared"); } fn stop(&self) -> Result<(), gst::ErrorMessage> { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Stopping"); - } else { - gst::trace!(CAT, imp: self, "Stopping"); - } - + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping"); self.task.stop().block_on()?; - - if raise_log_level { - gst::debug!(CAT, imp: self, "Stopped"); - } else { - gst::trace!(CAT, imp: self, "Stopped"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped"); Ok(()) } fn start(&self) -> Result<(), gst::ErrorMessage> { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Starting"); - } else { - gst::trace!(CAT, imp: self, "Starting"); - } - + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Starting"); self.task.start().block_on()?; - - if raise_log_level { - gst::debug!(CAT, imp: self, "Started"); - } else { - gst::trace!(CAT, imp: self, "Started"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Started"); Ok(()) } fn pause(&self) -> Result<(), gst::ErrorMessage> { - let raise_log_level = self.settings.lock().unwrap().raise_log_level; - if raise_log_level { - gst::debug!(CAT, imp: self, "Pausing"); - } else { - gst::trace!(CAT, imp: self, "Pausing"); - } - + let is_main_elem = self.settings.lock().unwrap().is_main_elem; + debug_or_trace!(CAT, is_main_elem, imp: self, "Pausing"); self.task.pause().block_on()?; - - if raise_log_level { - gst::debug!(CAT, imp: self, "Paused"); - } else { - gst::trace!(CAT, imp: self, "Paused"); - } + debug_or_trace!(CAT, is_main_elem, imp: self, "Paused"); Ok(()) } @@ -462,9 +347,9 @@ impl ObjectImpl for TestSrc { .blurb("Push a new buffer every this many ms") .default_value(DEFAULT_PUSH_PERIOD.mseconds() as u32) .build(), - glib::ParamSpecBoolean::builder("raise-log-level") - .nick("Raise log level") - .blurb("Raises the log level so that this element stands out") + glib::ParamSpecBoolean::builder("main-elem") + .nick("Main Element") + .blurb("Declare this element as the main one") .write_only() .build(), glib::ParamSpecInt::builder("num-buffers") @@ -485,24 +370,21 @@ impl ObjectImpl for TestSrc { "context" => { settings.context = value .get::>() - .expect("type checked upstream") + .unwrap() .unwrap_or_else(|| DEFAULT_CONTEXT.into()); } "context-wait" => { - settings.context_wait = Duration::from_millis( - value.get::().expect("type checked upstream").into(), - ); + settings.context_wait = Duration::from_millis(value.get::().unwrap().into()); } "push-period" => { - settings.push_period = gst::ClockTime::from_mseconds( - value.get::().expect("type checked upstream").into(), - ); + let value: u64 = value.get::().unwrap().into(); + settings.push_period = value.mseconds(); } - "raise-log-level" => { - settings.raise_log_level = value.get::().expect("type checked upstream"); + "main-elem" => { + settings.is_main_elem = value.get::().unwrap(); } "num-buffers" => { - let value = value.get::().expect("type checked upstream"); + let value = value.get::().unwrap(); settings.num_buffers = if value > 0 { Some(value as u32) } else { None }; } _ => unimplemented!(), @@ -515,7 +397,7 @@ impl ObjectImpl for TestSrc { "context" => settings.context.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), "push-period" => (settings.push_period.mseconds() as u32).to_value(), - "raise-log-level" => settings.raise_log_level.to_value(), + "main-elem" => settings.is_main_elem.to_value(), "num-buffers" => settings .num_buffers .and_then(|val| val.try_into().ok()) @@ -571,7 +453,7 @@ impl ElementImpl for TestSrc { &self, transition: gst::StateChange, ) -> Result { - gst::trace!(CAT, imp: self, "Changing state {:?}", transition); + gst::trace!(CAT, imp: self, "Changing state {transition:?}"); match transition { gst::StateChange::NullToReady => { diff --git a/generic/threadshare/examples/standalone/src/mod.rs b/generic/threadshare/examples/standalone/src/mod.rs index c0fa2777..0a2cbcf8 100644 --- a/generic/threadshare/examples/standalone/src/mod.rs +++ b/generic/threadshare/examples/standalone/src/mod.rs @@ -3,6 +3,8 @@ use gst::prelude::*; mod imp; +pub const ELEMENT_NAME: &str = "ts-standalone-src"; + glib::wrapper! { pub struct TestSrc(ObjectSubclass) @extends gst::Element, gst::Object; } @@ -10,7 +12,7 @@ glib::wrapper! { pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register( Some(plugin), - "ts-standalone-test-src", + "ts-standalone-src", gst::Rank::None, TestSrc::static_type(), )