ts/standalone: add new Sinks

Contrary to the existing Task Sink, the Async and Sync Mutex Sinks
handle buffers in the `PadSinkHandler` directly. The Async Mutex
Sink uses an async Mutex for the `PadSinkHandlerInner` while the
Sync Mutex Sink uses... a sync Mutex.

All Sinks share the same settings and stats manager.

Use the `--sink` command line option to select the sink (default is
`sync-mutex` since it allows evaluating the framework with as little
overhead as possible.

Also apply various fixes:

- Only keep the segment start instead of the full `Segment`. This
  helps with cache locality (`Segment` is a plain struct with many
  fields) and avoids downcasting the generic `Segment` upon each
  buffer handling.
- Box the `Stat`s. This should improve cache locality a bit.
- Fix EOS handling which took ages for no benefits in this
  particular use case.
- Use a macro to raise log level in the main element.
- Move error handling during item processing in `handle_loop_error`.
  This function was precisely designed for this and it should reduce
  the `handle_item`'s Future size.
This commit is contained in:
François Laignel 2022-10-15 20:13:32 +02:00 committed by Sebastian Dröge
parent 4616f0a4a4
commit 9b96cfc452
16 changed files with 1557 additions and 951 deletions

View file

@ -0,0 +1,66 @@
use super::super::CAT;
use clap::Parser;
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
pub enum Sink {
/// Item handling in PadHandler with async Mutex
AsyncMutex,
/// Item handling in PadHandler with sync Mutex
SyncMutex,
/// Item handling in runtime::Task
Task,
}
impl Sink {
pub fn element_name(self) -> &'static str {
use super::super::sink;
use Sink::*;
match self {
AsyncMutex => sink::ASYNC_MUTEX_ELEMENT_NAME,
SyncMutex => sink::SYNC_MUTEX_ELEMENT_NAME,
Task => sink::TASK_ELEMENT_NAME,
}
}
}
#[derive(Parser, Debug)]
#[clap(version)]
#[clap(
about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats"
)]
pub struct Args {
/// Parallel streams to process.
#[clap(short, long, default_value_t = 5000)]
pub streams: u32,
/// Threadshare groups.
#[clap(short, long, default_value_t = 2)]
pub groups: u32,
/// Threadshare Context wait in ms (max throttling duration).
#[clap(short, long, default_value_t = 20)]
pub wait: u32,
/// Buffer push period in ms.
#[clap(short, long, default_value_t = 20)]
pub push_period: u32,
/// Number of buffers per stream to output before sending EOS (-1 = unlimited).
#[clap(short, long, default_value_t = 5000)]
pub num_buffers: i32,
/// The Sink variant to use.
#[clap(long, value_enum, default_value_t = Sink::SyncMutex)]
pub sink: Sink,
/// Disables statistics logging.
#[clap(short, long)]
pub disable_stats_log: bool,
}
pub fn args() -> Args {
let args = Args::parse();
gst::info!(CAT, "{:?}", args);
args
}

View file

@ -0,0 +1,47 @@
use super::super::CAT;
#[derive(Copy, Clone, Debug)]
pub struct SyncMutexSink;
impl SyncMutexSink {
pub fn element_name(self) -> &'static str {
super::super::sink::SYNC_MUTEX_ELEMENT_NAME
}
}
#[derive(Debug)]
pub struct Args {
pub streams: u32,
pub groups: u32,
pub wait: u32,
pub push_period: u32,
pub num_buffers: i32,
pub sink: SyncMutexSink,
pub disable_stats_log: bool,
}
impl Default for Args {
fn default() -> Self {
Args {
streams: 5000,
groups: 2,
wait: 20,
push_period: 20,
num_buffers: 5000,
sink: SyncMutexSink,
disable_stats_log: false,
}
}
}
pub fn args() -> Args {
if std::env::args().len() > 1 {
gst::warning!(CAT, "Ignoring command line arguments");
gst::warning!(CAT, "Build with `--features=clap`");
}
let args = Args::default();
gst::warning!(CAT, "{:?}", args);
args
}

View file

@ -0,0 +1,9 @@
#[cfg(not(feature = "clap"))]
mod default_args;
#[cfg(not(feature = "clap"))]
pub use default_args::*;
#[cfg(feature = "clap")]
mod clap_args;
#[cfg(feature = "clap")]
pub use clap_args::*;

View file

@ -0,0 +1,19 @@
macro_rules! debug_or_trace {
($cat:expr, $raise_log_level:expr, $qual:ident: $obj:expr, $rest:tt $(,)?) => {
if $raise_log_level {
gst::debug!($cat, $qual: $obj, $rest);
} else {
gst::trace!($cat, $qual: $obj, $rest);
}
};
}
macro_rules! log_or_trace {
($cat:expr, $raise_log_level:expr, $qual:ident: $obj:expr, $rest:tt $(,)?) => {
if $raise_log_level {
gst::log!($cat, $qual: $obj, $rest);
} else {
gst::trace!($cat, $qual: $obj, $rest);
}
};
}

View file

@ -1,12 +1,21 @@
use gst::glib;
use once_cell::sync::Lazy;
mod args;
use args::*;
#[macro_use]
mod macros;
mod sink;
mod src;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ts-standalone-test-main",
"ts-standalone-main",
gst::DebugColorFlags::empty(),
Some("Thread-sharing standalone test main"),
)
@ -14,6 +23,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
src::register(plugin)?;
sink::async_mutex::register(plugin)?;
sink::sync_mutex::register(plugin)?;
sink::task::register(plugin)?;
Ok(())
@ -31,91 +42,6 @@ gst::plugin_define!(
env!("BUILD_REL_DATE")
);
#[cfg(feature = "clap")]
use clap::Parser;
#[cfg(feature = "clap")]
#[derive(Parser, Debug)]
#[clap(version)]
#[clap(
about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats"
)]
struct Args {
/// Parallel streams to process.
#[clap(short, long, default_value_t = 5000)]
streams: u32,
/// Threadshare groups.
#[clap(short, long, default_value_t = 2)]
groups: u32,
/// Threadshare Context wait in ms (max throttling duration).
#[clap(short, long, default_value_t = 20)]
wait: u32,
/// Buffer push period in ms.
#[clap(short, long, default_value_t = 20)]
push_period: u32,
/// Number of buffers per stream to output before sending EOS (-1 = unlimited).
#[clap(short, long, default_value_t = 5000)]
num_buffers: i32,
/// Disables statistics logging.
#[clap(short, long)]
disable_stats_log: bool,
}
#[cfg(not(feature = "clap"))]
#[derive(Debug)]
struct Args {
streams: u32,
groups: u32,
wait: u32,
push_period: u32,
num_buffers: i32,
disable_stats_log: bool,
}
#[cfg(not(feature = "clap"))]
impl Default for Args {
fn default() -> Self {
Args {
streams: 5000,
groups: 2,
wait: 20,
push_period: 20,
num_buffers: 5000,
disable_stats_log: false,
}
}
}
fn args() -> Args {
#[cfg(feature = "clap")]
let args = {
let args = Args::parse();
gst::info!(CAT, "{:?}", args);
args
};
#[cfg(not(feature = "clap"))]
let args = {
if std::env::args().len() > 1 {
gst::warning!(CAT, "Ignoring command line arguments");
gst::warning!(CAT, "Build with `--features=clap`");
}
let args = Args::default();
gst::warning!(CAT, "{:?}", args);
args
};
args
}
fn main() {
use gst::prelude::*;
use std::time::Instant;
@ -133,8 +59,8 @@ fn main() {
for i in 0..args.streams {
let ctx_name = format!("standalone {}", i % args.groups);
let src = gst::ElementFactory::make("ts-standalone-test-src")
.name(format!("src-{}", i).as_str())
let src = gst::ElementFactory::make(src::ELEMENT_NAME)
.name(format!("src-{i}").as_str())
.property("context", &ctx_name)
.property("context-wait", args.wait)
.property("push-period", args.push_period)
@ -142,16 +68,16 @@ fn main() {
.build()
.unwrap();
let sink = gst::ElementFactory::make("ts-standalone-test-sink")
.name(format!("sink-{}", i).as_str())
let sink = gst::ElementFactory::make(args.sink.element_name())
.name(format!("sink-{i}").as_str())
.property("context", &ctx_name)
.property("context-wait", args.wait)
.build()
.unwrap();
if i == 0 {
src.set_property("raise-log-level", true);
sink.set_property("raise-log-level", true);
src.set_property("main-elem", true);
sink.set_property("main-elem", true);
if !args.disable_stats_log {
// Don't use the last 5 secs in stats
@ -179,30 +105,46 @@ fn main() {
let l = glib::MainLoop::new(None, false);
let bus = pipeline.bus().unwrap();
let terminated_count = Arc::new(AtomicU32::new(0));
let pipeline_clone = pipeline.clone();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(_) => {
// Actually, we don't post EOS (see sinks impl).
gst::info!(CAT, "Received eos");
l_clone.quit();
glib::Continue(false)
}
MessageView::Error(err) => {
MessageView::Error(msg) => {
if let gst::MessageView::Error(msg) = msg.message().view() {
if msg.error().matches(gst::LibraryError::Shutdown) {
if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
gst::info!(CAT, "Received all shutdown requests");
l_clone.quit();
return glib::Continue(false);
} else {
return glib::Continue(true);
}
}
}
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l_clone.quit();
}
_ => (),
};
glib::Continue(true)
glib::Continue(false)
}
_ => glib::Continue(true),
}
})
.expect("Failed to add bus watch");

View file

@ -0,0 +1,334 @@
// Copyright (C) 2022 François Laignel <fengalin@free.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::EventView;
use once_cell::sync::Lazy;
use gstthreadshare::runtime::executor::block_on_or_add_sub_task;
use gstthreadshare::runtime::{prelude::*, PadSink};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use super::super::{Settings, Stats, CAT};
#[derive(Debug, Default)]
struct PadSinkHandlerInner {
is_flushing: bool,
is_main_elem: bool,
last_dts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
stats: Option<Box<Stats>>,
}
impl PadSinkHandlerInner {
fn handle_buffer(
&mut self,
elem: &super::AsyncMutexSink,
buffer: gst::Buffer,
) -> Result<(), gst::FlowError> {
if self.is_flushing {
log_or_trace!(
CAT,
self.is_main_elem,
obj: elem,
"Discarding {buffer:?} (flushing)"
);
return Err(gst::FlowError::Flushing);
}
debug_or_trace!(CAT, self.is_main_elem, obj: elem, "Received {buffer:?}");
let dts = buffer
.dts()
.expect("Buffer without dts")
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
if let Some(last_dts) = self.last_dts {
let cur_ts = elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
}
debug_or_trace!(CAT, self.is_main_elem, obj: elem, "o latency {latency:.2?}");
debug_or_trace!(
CAT,
self.is_main_elem,
obj: elem,
"o interval {interval:.2?}",
);
}
self.last_dts = Some(dts);
log_or_trace!(CAT, self.is_main_elem, obj: elem, "Buffer processed");
Ok(())
}
}
#[derive(Clone, Debug, Default)]
struct AsyncPadSinkHandler(Arc<futures::lock::Mutex<PadSinkHandlerInner>>);
impl PadSinkHandler for AsyncPadSinkHandler {
type ElementImpl = AsyncMutexSink;
fn sink_chain(
self,
_pad: gst::Pad,
elem: super::AsyncMutexSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
if self.0.lock().await.handle_buffer(&elem, buffer).is_err() {
return Err(gst::FlowError::Flushing);
}
Ok(gst::FlowSuccess::Ok)
}
.boxed()
}
fn sink_event_serialized(
self,
_pad: gst::Pad,
elem: super::AsyncMutexSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
match event.view() {
EventView::Eos(_) => {
{
let mut inner = self.0.lock().await;
debug_or_trace!(CAT, inner.is_main_elem, obj: elem, "EOS");
inner.is_flushing = true;
}
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ = elem
.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
EventView::FlushStop(_) => {
self.0.lock().await.is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().await.segment_start = time_seg.start();
}
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
}
true
}
.boxed()
}
fn sink_event(self, _pad: &gst::Pad, _imp: &AsyncMutexSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
block_on_or_add_sub_task(async move { self.0.lock().await.is_flushing = true });
}
true
}
}
impl AsyncPadSinkHandler {
fn prepare(&self, is_main_elem: bool, stats: Option<Stats>) {
futures::executor::block_on(async move {
let mut inner = self.0.lock().await;
inner.is_main_elem = is_main_elem;
inner.stats = stats.map(Box::new);
});
}
fn start(&self) {
futures::executor::block_on(async move {
let mut inner = self.0.lock().await;
inner.is_flushing = false;
inner.last_dts = None;
if let Some(stats) = inner.stats.as_mut() {
stats.start();
}
});
}
fn stop(&self) {
futures::executor::block_on(async move {
let mut inner = self.0.lock().await;
inner.is_flushing = true;
});
}
}
#[derive(Debug)]
pub struct AsyncMutexSink {
sink_pad: PadSink,
sink_pad_handler: AsyncPadSinkHandler,
settings: Mutex<Settings>,
}
impl AsyncMutexSink {
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing");
let stats = if settings.logs_stats {
Some(Stats::new(
settings.max_buffers,
settings.push_period + settings.context_wait / 2,
))
} else {
None
};
self.sink_pad_handler.prepare(settings.is_main_elem, stats);
debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared");
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
self.sink_pad_handler.stop();
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
self.sink_pad_handler.start();
debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
Ok(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for AsyncMutexSink {
const NAME: &'static str = "TsStandaloneAsyncMutexSink";
type Type = super::AsyncMutexSink;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let sink_pad_handler = AsyncPadSinkHandler::default();
Self {
sink_pad: PadSink::new(
gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
sink_pad_handler.clone(),
),
sink_pad_handler,
settings: Default::default(),
}
}
}
impl ObjectImpl for AsyncMutexSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(Settings::properties);
PROPERTIES.as_ref()
}
fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
self.settings.lock().unwrap().set_property(id, value, pspec);
}
fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value {
self.settings.lock().unwrap().property(id, pspec)
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::SINK);
}
}
impl GstObjectImpl for AsyncMutexSink {}
impl ElementImpl for AsyncMutexSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Thread-sharing standalone test async mutex sink",
"Sink/Test",
"Thread-sharing standalone test async mutex sink",
"François Laignel <fengalin@free.fr>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp: self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {
self.prepare().map_err(|err| {
self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
self.parent_change_state(transition)
}
}

View file

@ -0,0 +1,17 @@
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct AsyncMutexSink(ObjectSubclass<imp::AsyncMutexSink>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
super::ASYNC_MUTEX_ELEMENT_NAME,
gst::Rank::None,
AsyncMutexSink::static_type(),
)
}

View file

@ -1 +1,22 @@
pub mod async_mutex;
pub mod sync_mutex;
pub mod task;
mod settings;
pub use settings::Settings;
mod stats;
pub use stats::Stats;
pub const ASYNC_MUTEX_ELEMENT_NAME: &str = "ts-standalone-async-mutex-sink";
pub const SYNC_MUTEX_ELEMENT_NAME: &str = "ts-standalone-sync-mutex-sink";
pub const TASK_ELEMENT_NAME: &str = "ts-standalone-task-sink";
use once_cell::sync::Lazy;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ts-standalone-sink",
gst::DebugColorFlags::empty(),
Some("Thread-sharing standalone test sink"),
)
});

View file

@ -0,0 +1,115 @@
use gst::glib;
use gst::prelude::*;
use std::time::Duration;
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20);
const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25);
#[derive(Debug, Clone)]
pub struct Settings {
pub context: String,
pub context_wait: Duration,
pub is_main_elem: bool,
pub logs_stats: bool,
pub push_period: Duration,
pub max_buffers: Option<u32>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
is_main_elem: false,
logs_stats: false,
push_period: DEFAULT_PUSH_PERIOD,
max_buffers: Some(DEFAULT_MAX_BUFFERS as u32),
}
}
}
impl Settings {
pub fn properties() -> Vec<glib::ParamSpec> {
vec![
glib::ParamSpecString::builder("context")
.nick("Context")
.blurb("Context name to share threads with")
.default_value(Some(DEFAULT_CONTEXT))
.build(),
glib::ParamSpecUInt::builder("context-wait")
.nick("Context Wait")
.blurb("Throttle poll loop to run at most once every this many ms")
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
glib::ParamSpecBoolean::builder("main-elem")
.nick("Main Element")
.blurb("Declare this element as the main one")
.write_only()
.build(),
glib::ParamSpecBoolean::builder("logs-stats")
.nick("Logs Stats")
.blurb("Whether statistics should be logged")
.write_only()
.build(),
glib::ParamSpecUInt::builder("push-period")
.nick("Src buffer Push Period")
.blurb("Push period used by `src` element (used for stats warnings)")
.default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32)
.build(),
glib::ParamSpecInt::builder("max-buffers")
.nick("Max Buffers")
.blurb("Number of buffers to count before stopping stats (-1 = unlimited)")
.minimum(-1i32)
.default_value(DEFAULT_MAX_BUFFERS)
.build(),
]
}
pub fn set_property(&mut self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"context" => {
self.context = value
.get::<Option<String>>()
.unwrap()
.unwrap_or_else(|| DEFAULT_CONTEXT.into());
}
"context-wait" => {
self.context_wait = Duration::from_millis(value.get::<u32>().unwrap().into());
}
"main-elem" => {
self.is_main_elem = value.get::<bool>().unwrap();
}
"logs-stats" => {
let logs_stats = value.get().unwrap();
self.logs_stats = logs_stats;
}
"push-period" => {
self.push_period = Duration::from_millis(value.get::<u32>().unwrap().into());
}
"max-buffers" => {
let value = value.get::<i32>().unwrap();
self.max_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
}
}
pub fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"context" => self.context.to_value(),
"context-wait" => (self.context_wait.as_millis() as u32).to_value(),
"main-elem" => self.is_main_elem.to_value(),
"push-period" => (self.push_period.as_millis() as u32).to_value(),
"max-buffers" => self
.max_buffers
.and_then(|val| val.try_into().ok())
.unwrap_or(-1i32)
.to_value(),
_ => unimplemented!(),
}
}
}

View file

@ -0,0 +1,270 @@
use gst::prelude::*;
use std::time::{Duration, Instant};
#[cfg(feature = "tuning")]
use gstthreadshare::runtime::Context;
use super::CAT;
const LOG_PERIOD: Duration = Duration::from_secs(20);
#[derive(Debug, Default)]
pub struct Stats {
ramp_up_instant: Option<Instant>,
log_start_instant: Option<Instant>,
last_delta_instant: Option<Instant>,
max_buffers: Option<f32>,
buffer_count: f32,
buffer_count_delta: f32,
latency_sum: f32,
latency_square_sum: f32,
latency_sum_delta: f32,
latency_square_sum_delta: f32,
latency_min: Duration,
latency_min_delta: Duration,
latency_max: Duration,
latency_max_delta: Duration,
interval_sum: f32,
interval_square_sum: f32,
interval_sum_delta: f32,
interval_square_sum_delta: f32,
interval_min: Duration,
interval_min_delta: Duration,
interval_max: Duration,
interval_max_delta: Duration,
interval_late_warn: Duration,
interval_late_count: f32,
interval_late_count_delta: f32,
#[cfg(feature = "tuning")]
parked_duration_init: Duration,
}
impl Stats {
pub fn new(max_buffers: Option<u32>, interval_late_warn: Duration) -> Self {
Stats {
max_buffers: max_buffers.map(|max_buffers| max_buffers as f32),
interval_late_warn,
..Default::default()
}
}
pub fn start(&mut self) {
self.buffer_count = 0.0;
self.buffer_count_delta = 0.0;
self.latency_sum = 0.0;
self.latency_square_sum = 0.0;
self.latency_sum_delta = 0.0;
self.latency_square_sum_delta = 0.0;
self.latency_min = Duration::MAX;
self.latency_min_delta = Duration::MAX;
self.latency_max = Duration::ZERO;
self.latency_max_delta = Duration::ZERO;
self.interval_sum = 0.0;
self.interval_square_sum = 0.0;
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min = Duration::MAX;
self.interval_min_delta = Duration::MAX;
self.interval_max = Duration::ZERO;
self.interval_max_delta = Duration::ZERO;
self.interval_late_count = 0.0;
self.interval_late_count_delta = 0.0;
self.last_delta_instant = None;
self.log_start_instant = None;
self.ramp_up_instant = Some(Instant::now());
gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD);
}
pub fn is_active(&mut self) -> bool {
if let Some(ramp_up_instant) = self.ramp_up_instant {
if ramp_up_instant.elapsed() < LOG_PERIOD {
return false;
}
self.ramp_up_instant = None;
gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
self.log_start_instant = Some(Instant::now());
self.last_delta_instant = self.log_start_instant;
#[cfg(feature = "tuning")]
{
self.parked_duration_init = Context::current().unwrap().parked_duration();
}
}
use std::cmp::Ordering::*;
match self.max_buffers.opt_cmp(self.buffer_count) {
Some(Equal) => {
self.log_global();
self.buffer_count += 1.0;
false
}
Some(Less) => false,
_ => true,
}
}
pub fn add_buffer(&mut self, latency: Duration, interval: Duration) {
if !self.is_active() {
return;
}
self.buffer_count += 1.0;
self.buffer_count_delta += 1.0;
// Latency
let latency_f32 = latency.as_nanos() as f32;
let latency_square = latency_f32.powi(2);
self.latency_sum += latency_f32;
self.latency_square_sum += latency_square;
self.latency_min = self.latency_min.min(latency);
self.latency_max = self.latency_max.max(latency);
self.latency_sum_delta += latency_f32;
self.latency_square_sum_delta += latency_square;
self.latency_min_delta = self.latency_min_delta.min(latency);
self.latency_max_delta = self.latency_max_delta.max(latency);
// Interval
let interval_f32 = interval.as_nanos() as f32;
let interval_square = interval_f32.powi(2);
self.interval_sum += interval_f32;
self.interval_square_sum += interval_square;
self.interval_min = self.interval_min.min(interval);
self.interval_max = self.interval_max.max(interval);
self.interval_sum_delta += interval_f32;
self.interval_square_sum_delta += interval_square;
self.interval_min_delta = self.interval_min_delta.min(interval);
self.interval_max_delta = self.interval_max_delta.max(interval);
if interval > self.interval_late_warn {
self.interval_late_count += 1.0;
self.interval_late_count_delta += 1.0;
}
let delta_duration = match self.last_delta_instant {
Some(last_delta) => last_delta.elapsed(),
None => return,
};
if delta_duration < LOG_PERIOD {
return;
}
self.last_delta_instant = Some(Instant::now());
gst::info!(CAT, "Delta stats:");
let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
let interval_std_dev = f32::sqrt(
self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
);
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min_delta,
self.interval_max_delta,
);
if self.interval_late_count_delta > f32::EPSILON {
gst::warning!(
CAT,
"o {:5.2}% late buffers",
100f32 * self.interval_late_count_delta / self.buffer_count_delta
);
}
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min_delta = Duration::MAX;
self.interval_max_delta = Duration::ZERO;
self.interval_late_count_delta = 0.0;
let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
let latency_std_dev = f32::sqrt(
self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
);
gst::info!(
CAT,
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(latency_mean as u64),
Duration::from_nanos(latency_std_dev as u64),
self.latency_min_delta,
self.latency_max_delta,
);
self.latency_sum_delta = 0.0;
self.latency_square_sum_delta = 0.0;
self.latency_min_delta = Duration::MAX;
self.latency_max_delta = Duration::ZERO;
self.buffer_count_delta = 0.0;
}
pub fn log_global(&mut self) {
if self.buffer_count < 1.0 {
return;
}
let _log_start = if let Some(log_start) = self.log_start_instant {
log_start
} else {
return;
};
gst::info!(CAT, "Global stats:");
#[cfg(feature = "tuning")]
{
let duration = _log_start.elapsed();
let parked_duration =
Context::current().unwrap().parked_duration() - self.parked_duration_init;
gst::info!(
CAT,
"o parked: {parked_duration:4.2?} ({:5.2?}%)",
(parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
);
}
let interval_mean = self.interval_sum / self.buffer_count;
let interval_std_dev =
f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min,
self.interval_max,
);
if self.interval_late_count > f32::EPSILON {
gst::warning!(
CAT,
"o {:5.2}% late buffers",
100f32 * self.interval_late_count / self.buffer_count
);
}
let latency_mean = self.latency_sum / self.buffer_count;
let latency_std_dev =
f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
gst::info!(
CAT,
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(latency_mean as u64),
Duration::from_nanos(latency_std_dev as u64),
self.latency_min,
self.latency_max,
);
}
}

View file

@ -0,0 +1,327 @@
// Copyright (C) 2022 François Laignel <fengalin@free.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::EventView;
use once_cell::sync::Lazy;
use gstthreadshare::runtime::{prelude::*, PadSink};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use super::super::{Settings, Stats, CAT};
#[derive(Debug, Default)]
struct PadSinkHandlerInner {
is_flushing: bool,
is_main_elem: bool,
last_dts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
stats: Option<Box<Stats>>,
}
impl PadSinkHandlerInner {
fn handle_buffer(
&mut self,
elem: &super::DirectSink,
buffer: gst::Buffer,
) -> Result<(), gst::FlowError> {
if self.is_flushing {
log_or_trace!(
CAT,
self.is_main_elem,
obj: elem,
"Discarding {buffer:?} (flushing)"
);
return Err(gst::FlowError::Flushing);
}
debug_or_trace!(CAT, self.is_main_elem, obj: elem, "Received {buffer:?}");
let dts = buffer
.dts()
.expect("Buffer without dts")
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
if let Some(last_dts) = self.last_dts {
let cur_ts = elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
}
debug_or_trace!(CAT, self.is_main_elem, obj: elem, "o latency {latency:.2?}");
debug_or_trace!(
CAT,
self.is_main_elem,
obj: elem,
"o interval {interval:.2?}",
);
}
self.last_dts = Some(dts);
log_or_trace!(CAT, self.is_main_elem, obj: elem, "Buffer processed");
Ok(())
}
}
#[derive(Clone, Debug, Default)]
struct SyncPadSinkHandler(Arc<Mutex<PadSinkHandlerInner>>);
impl PadSinkHandler for SyncPadSinkHandler {
type ElementImpl = DirectSink;
fn sink_chain(
self,
_pad: gst::Pad,
elem: super::DirectSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
if self.0.lock().unwrap().handle_buffer(&elem, buffer).is_err() {
return Err(gst::FlowError::Flushing);
}
Ok(gst::FlowSuccess::Ok)
}
.boxed()
}
fn sink_event_serialized(
self,
_pad: gst::Pad,
elem: super::DirectSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
match event.view() {
EventView::Eos(_) => {
{
let mut inner = self.0.lock().unwrap();
debug_or_trace!(CAT, inner.is_main_elem, obj: elem, "EOS");
inner.is_flushing = true;
}
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ = elem
.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
EventView::FlushStop(_) => {
self.0.lock().unwrap().is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().unwrap().segment_start = time_seg.start();
}
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
}
true
}
.boxed()
}
fn sink_event(self, _pad: &gst::Pad, _imp: &DirectSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
self.0.lock().unwrap().is_flushing = true;
}
true
}
}
impl SyncPadSinkHandler {
fn prepare(&self, is_main_elem: bool, stats: Option<Stats>) {
let mut inner = self.0.lock().unwrap();
inner.is_main_elem = is_main_elem;
inner.stats = stats.map(Box::new);
}
fn start(&self) {
let mut inner = self.0.lock().unwrap();
inner.is_flushing = false;
inner.last_dts = None;
if let Some(stats) = inner.stats.as_mut() {
stats.start();
}
}
fn stop(&self) {
let mut inner = self.0.lock().unwrap();
inner.is_flushing = true;
}
}
#[derive(Debug)]
pub struct DirectSink {
sink_pad: PadSink,
sink_pad_handler: SyncPadSinkHandler,
settings: Mutex<Settings>,
}
impl DirectSink {
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing");
let stats = if settings.logs_stats {
Some(Stats::new(
settings.max_buffers,
settings.push_period + settings.context_wait / 2,
))
} else {
None
};
self.sink_pad_handler.prepare(settings.is_main_elem, stats);
debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared");
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
self.sink_pad_handler.stop();
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
self.sink_pad_handler.start();
debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
Ok(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for DirectSink {
const NAME: &'static str = "TsStandaloneDirectSink";
type Type = super::DirectSink;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let sink_pad_handler = SyncPadSinkHandler::default();
Self {
sink_pad: PadSink::new(
gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
sink_pad_handler.clone(),
),
sink_pad_handler,
settings: Default::default(),
}
}
}
impl ObjectImpl for DirectSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(Settings::properties);
PROPERTIES.as_ref()
}
fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
self.settings.lock().unwrap().set_property(id, value, pspec);
}
fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value {
self.settings.lock().unwrap().property(id, pspec)
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.add_pad(self.sink_pad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::SINK);
}
}
impl GstObjectImpl for DirectSink {}
impl ElementImpl for DirectSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Thread-sharing standalone test direct sink",
"Sink/Test",
"Thread-sharing standalone test direct sink",
"François Laignel <fengalin@free.fr>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp: self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {
self.prepare().map_err(|err| {
self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
self.parent_change_state(transition)
}
}

View file

@ -0,0 +1,17 @@
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct DirectSink(ObjectSubclass<imp::DirectSink>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
super::SYNC_MUTEX_ELEMENT_NAME,
gst::Rank::None,
DirectSink::static_type(),
)
}

View file

@ -8,7 +8,6 @@
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::Peekable;
use gst::error_msg;
use gst::glib;
@ -22,46 +21,9 @@ use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, Task};
use std::sync::Mutex;
use std::task::Poll;
use std::time::{Duration, Instant};
use std::time::Duration;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ts-standalone-test-sink",
gst::DebugColorFlags::empty(),
Some("Thread-sharing standalone test sink"),
)
});
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20);
const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25);
const LOG_PERIOD: Duration = Duration::from_secs(20);
#[derive(Debug, Clone)]
struct Settings {
context: String,
context_wait: Duration,
raise_log_level: bool,
logs_stats: bool,
push_period: Duration,
max_buffers: Option<u32>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
raise_log_level: false,
logs_stats: false,
push_period: DEFAULT_PUSH_PERIOD,
max_buffers: Some(DEFAULT_MAX_BUFFERS as u32),
}
}
}
use super::super::{Settings, Stats, CAT};
#[derive(Debug)]
enum StreamItem {
@ -70,21 +32,20 @@ enum StreamItem {
}
#[derive(Clone, Debug)]
struct TestSinkPadHandler;
struct TaskPadSinkHandler;
impl PadSinkHandler for TestSinkPadHandler {
type ElementImpl = TestSink;
impl PadSinkHandler for TaskPadSinkHandler {
type ElementImpl = TaskSink;
fn sink_chain(
self,
_pad: gst::Pad,
elem: super::TestSink,
elem: super::TaskSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = elem.imp().clone_item_sender();
async move {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: elem, "Flushing");
return Err(gst::FlowError::Flushing);
}
@ -93,39 +54,37 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed()
}
fn sink_chain_list(
self,
_pad: gst::Pad,
elem: super::TestSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = elem.imp().clone_item_sender();
async move {
for buffer in list.iter_owned() {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: elem, "Flushing");
return Err(gst::FlowError::Flushing);
}
}
Ok(gst::FlowSuccess::Ok)
}
.boxed()
}
fn sink_event_serialized(
self,
_pad: gst::Pad,
elem: super::TestSink,
elem: super::TaskSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
let sender = elem.imp().clone_item_sender();
async move {
if let EventView::FlushStop(_) = event.view() {
let imp = elem.imp();
return imp.task.flush_stop().await_maybe_on_context().is_ok();
} else if sender.send_async(StreamItem::Event(event)).await.is_err() {
gst::debug!(CAT, obj: elem, "Flushing");
match event.view() {
EventView::Segment(_) => {
let _ = sender.send_async(StreamItem::Event(event)).await;
}
EventView::Eos(_) => {
let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, obj: elem, "EOS");
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ = elem
.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
EventView::FlushStop(_) => {
let imp = elem.imp();
return imp.task.flush_stop().await_maybe_on_context().is_ok();
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
}
true
@ -133,7 +92,7 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed()
}
fn sink_event(self, _pad: &gst::Pad, imp: &TestSink, event: gst::Event) -> bool {
fn sink_event(self, _pad: &gst::Pad, imp: &TaskSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
return imp.task.flush_start().await_maybe_on_context().is_ok();
}
@ -142,329 +101,54 @@ impl PadSinkHandler for TestSinkPadHandler {
}
}
#[derive(Default)]
struct Stats {
must_log: bool,
ramp_up_instant: Option<Instant>,
log_start_instant: Option<Instant>,
last_delta_instant: Option<Instant>,
max_buffers: Option<f32>,
buffer_count: f32,
buffer_count_delta: f32,
latency_sum: f32,
latency_square_sum: f32,
latency_sum_delta: f32,
latency_square_sum_delta: f32,
latency_min: Duration,
latency_min_delta: Duration,
latency_max: Duration,
latency_max_delta: Duration,
interval_sum: f32,
interval_square_sum: f32,
interval_sum_delta: f32,
interval_square_sum_delta: f32,
interval_min: Duration,
interval_min_delta: Duration,
interval_max: Duration,
interval_max_delta: Duration,
interval_late_warn: Duration,
interval_late_count: f32,
interval_late_count_delta: f32,
#[cfg(feature = "tuning")]
parked_duration_init: Duration,
}
impl Stats {
fn start(&mut self) {
if !self.must_log {
return;
}
self.buffer_count = 0.0;
self.buffer_count_delta = 0.0;
self.latency_sum = 0.0;
self.latency_square_sum = 0.0;
self.latency_sum_delta = 0.0;
self.latency_square_sum_delta = 0.0;
self.latency_min = Duration::MAX;
self.latency_min_delta = Duration::MAX;
self.latency_max = Duration::ZERO;
self.latency_max_delta = Duration::ZERO;
self.interval_sum = 0.0;
self.interval_square_sum = 0.0;
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min = Duration::MAX;
self.interval_min_delta = Duration::MAX;
self.interval_max = Duration::ZERO;
self.interval_max_delta = Duration::ZERO;
self.interval_late_count = 0.0;
self.interval_late_count_delta = 0.0;
self.last_delta_instant = None;
self.log_start_instant = None;
self.ramp_up_instant = Some(Instant::now());
gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD);
}
fn is_active(&mut self) -> bool {
if !self.must_log {
return false;
}
if let Some(ramp_up_instant) = self.ramp_up_instant {
if ramp_up_instant.elapsed() < LOG_PERIOD {
return false;
}
self.ramp_up_instant = None;
gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
self.log_start_instant = Some(Instant::now());
self.last_delta_instant = self.log_start_instant;
#[cfg(feature = "tuning")]
{
self.parked_duration_init = Context::current().unwrap().parked_duration();
}
}
use std::cmp::Ordering::*;
match self.max_buffers.opt_cmp(self.buffer_count) {
Some(Equal) => {
self.log_global();
self.buffer_count += 1.0;
false
}
Some(Less) => false,
_ => true,
}
}
fn add_buffer(&mut self, latency: Duration, interval: Duration) {
if !self.is_active() {
return;
}
self.buffer_count += 1.0;
self.buffer_count_delta += 1.0;
// Latency
let latency_f32 = latency.as_nanos() as f32;
let latency_square = latency_f32.powi(2);
self.latency_sum += latency_f32;
self.latency_square_sum += latency_square;
self.latency_min = self.latency_min.min(latency);
self.latency_max = self.latency_max.max(latency);
self.latency_sum_delta += latency_f32;
self.latency_square_sum_delta += latency_square;
self.latency_min_delta = self.latency_min_delta.min(latency);
self.latency_max_delta = self.latency_max_delta.max(latency);
// Interval
let interval_f32 = interval.as_nanos() as f32;
let interval_square = interval_f32.powi(2);
self.interval_sum += interval_f32;
self.interval_square_sum += interval_square;
self.interval_min = self.interval_min.min(interval);
self.interval_max = self.interval_max.max(interval);
self.interval_sum_delta += interval_f32;
self.interval_square_sum_delta += interval_square;
self.interval_min_delta = self.interval_min_delta.min(interval);
self.interval_max_delta = self.interval_max_delta.max(interval);
if interval > self.interval_late_warn {
self.interval_late_count += 1.0;
self.interval_late_count_delta += 1.0;
}
let delta_duration = match self.last_delta_instant {
Some(last_delta) => last_delta.elapsed(),
None => return,
};
if delta_duration < LOG_PERIOD {
return;
}
self.last_delta_instant = Some(Instant::now());
gst::info!(CAT, "Delta stats:");
let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
let interval_std_dev = f32::sqrt(
self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
);
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min_delta,
self.interval_max_delta,
);
if self.interval_late_count_delta > f32::EPSILON {
gst::warning!(
CAT,
"o {:5.2}% late buffers",
100f32 * self.interval_late_count_delta / self.buffer_count_delta
);
}
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min_delta = Duration::MAX;
self.interval_max_delta = Duration::ZERO;
self.interval_late_count_delta = 0.0;
let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
let latency_std_dev = f32::sqrt(
self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
);
gst::info!(
CAT,
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(latency_mean as u64),
Duration::from_nanos(latency_std_dev as u64),
self.latency_min_delta,
self.latency_max_delta,
);
self.latency_sum_delta = 0.0;
self.latency_square_sum_delta = 0.0;
self.latency_min_delta = Duration::MAX;
self.latency_max_delta = Duration::ZERO;
self.buffer_count_delta = 0.0;
}
fn log_global(&mut self) {
if self.buffer_count < 1.0 {
return;
}
let _log_start = if let Some(log_start) = self.log_start_instant {
log_start
} else {
return;
};
gst::info!(CAT, "Global stats:");
#[cfg(feature = "tuning")]
{
let duration = _log_start.elapsed();
let parked_duration =
Context::current().unwrap().parked_duration() - self.parked_duration_init;
gst::info!(
CAT,
"o parked: {parked_duration:4.2?} ({:5.2?}%)",
(parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
);
}
let interval_mean = self.interval_sum / self.buffer_count;
let interval_std_dev =
f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min,
self.interval_max,
);
if self.interval_late_count > f32::EPSILON {
gst::warning!(
CAT,
"o {:5.2}% late buffers",
100f32 * self.interval_late_count / self.buffer_count
);
}
let latency_mean = self.latency_sum / self.buffer_count;
let latency_std_dev =
f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
gst::info!(
CAT,
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(latency_mean as u64),
Duration::from_nanos(latency_std_dev as u64),
self.latency_min,
self.latency_max,
);
}
}
struct TestSinkTask {
element: super::TestSink,
raise_log_level: bool,
struct TaskSinkTask {
elem: super::TaskSink,
item_receiver: flume::Receiver<StreamItem>,
is_main_elem: bool,
last_dts: Option<gst::ClockTime>,
item_receiver: Peekable<flume::r#async::RecvStream<'static, StreamItem>>,
stats: Stats,
segment: Option<gst::Segment>,
segment_start: Option<gst::ClockTime>,
stats: Option<Box<Stats>>,
}
impl TestSinkTask {
fn new(element: &super::TestSink, item_receiver: flume::Receiver<StreamItem>) -> Self {
TestSinkTask {
element: element.clone(),
raise_log_level: false,
impl TaskSinkTask {
fn new(
elem: &super::TaskSink,
item_receiver: flume::Receiver<StreamItem>,
is_main_elem: bool,
stats: Option<Box<Stats>>,
) -> Self {
TaskSinkTask {
elem: elem.clone(),
item_receiver,
is_main_elem,
last_dts: None,
item_receiver: item_receiver.into_stream().peekable(),
stats: Stats::default(),
segment: None,
stats,
segment_start: None,
}
}
async fn flush(&mut self) {
fn flush(&mut self) {
// Purge the channel
while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {}
while !self.item_receiver.is_empty() {}
}
}
impl TaskImpl for TestSinkTask {
impl TaskImpl for TaskSinkTask {
type Item = StreamItem;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
let sink = self.element.imp();
let settings = sink.settings.lock().unwrap();
self.raise_log_level = settings.raise_log_level;
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Preparing Task");
} else {
gst::trace!(CAT, obj: self.element, "Preparing Task");
}
self.stats.must_log = settings.logs_stats;
self.stats.max_buffers = settings.max_buffers.map(|max_buffers| max_buffers as f32);
self.stats.interval_late_warn = settings.push_period + settings.context_wait / 2;
Ok(())
}
.boxed()
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Preparing Task");
future::ok(()).boxed()
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Starting Task");
} else {
gst::trace!(CAT, obj: self.element, "Starting Task");
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Starting Task");
self.last_dts = None;
if let Some(stats) = self.stats.as_mut() {
stats.start();
}
self.last_dts = None;
self.stats.start();
Ok(())
}
.boxed()
@ -472,101 +156,66 @@ impl TaskImpl for TestSinkTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Stopping Task");
} else {
gst::trace!(CAT, obj: self.element, "Stopping Task");
}
self.flush().await;
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Stopping Task");
self.flush();
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
async move {
let item = self.item_receiver.next().await.unwrap();
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Popped item");
} else {
gst::trace!(CAT, obj: self.element, "Popped item");
}
Ok(item)
}
.boxed()
self.item_receiver
.recv_async()
.map(|opt_item| Ok(opt_item.unwrap()))
.boxed()
}
fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "Received {:?}", item);
} else {
gst::trace!(CAT, obj: self.element, "Received {:?}", item);
}
debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Received {item:?}");
match item {
StreamItem::Buffer(buffer) => {
let dts = self
.segment
.as_ref()
.and_then(|segment| {
segment
.downcast_ref::<gst::format::Time>()
.and_then(|segment| segment.to_running_time(buffer.dts()))
})
.unwrap();
let dts = buffer
.dts()
.expect("Buffer without dts")
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
if let Some(last_dts) = self.last_dts {
let cur_ts = self.element.current_running_time().unwrap();
let cur_ts = self.elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
self.stats.add_buffer(latency, interval);
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "o latency {:.2?}", latency);
gst::debug!(CAT, obj: self.element, "o interval {:.2?}", interval);
} else {
gst::trace!(CAT, obj: self.element, "o latency {:.2?}", latency);
gst::trace!(CAT, obj: self.element, "o interval {:.2?}", interval);
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
}
debug_or_trace!(
CAT,
self.is_main_elem,
obj: self.elem,
"o latency {latency:.2?}",
);
debug_or_trace!(
CAT,
self.is_main_elem,
obj: self.elem,
"o interval {interval:.2?}",
);
}
self.last_dts = Some(dts);
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Buffer processed");
} else {
gst::trace!(CAT, obj: self.element, "Buffer processed");
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Buffer processed");
}
StreamItem::Event(evt) => {
if let EventView::Segment(evt) = evt.view() {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.segment_start = time_seg.start();
}
}
}
StreamItem::Event(event) => match event.view() {
EventView::Eos(_) => {
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "EOS");
} else {
gst::trace!(CAT, obj: self.element, "EOS");
}
let elem = self.element.clone();
self.element.call_async(move |_| {
let _ =
elem.post_message(gst::message::Eos::builder().src(&elem).build());
});
return Err(gst::FlowError::Eos);
}
EventView::Segment(e) => {
self.segment = Some(e.segment().clone());
}
EventView::SinkMessage(e) => {
let _ = self.element.post_message(e.message());
}
_ => (),
},
}
Ok(())
@ -576,121 +225,88 @@ impl TaskImpl for TestSinkTask {
}
#[derive(Debug)]
pub struct TestSink {
pub struct TaskSink {
sink_pad: PadSink,
task: Task,
item_sender: Mutex<Option<flume::Sender<StreamItem>>>,
settings: Mutex<Settings>,
}
impl TestSink {
impl TaskSink {
#[track_caller]
fn clone_item_sender(&self) -> flume::Sender<StreamItem> {
self.item_sender.lock().unwrap().as_ref().unwrap().clone()
}
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Preparing");
let settings = self.settings.lock().unwrap();
let stats = if settings.logs_stats {
Some(Box::new(Stats::new(
settings.max_buffers,
settings.push_period + settings.context_wait / 2,
)))
} else {
gst::trace!(CAT, imp: self, "Preparing");
}
let context = {
let settings = self.settings.lock().unwrap();
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
["Failed to acquire Context: {}", err]
)
})?
None
};
debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Preparing");
let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
["Failed to acquire Context: {}", err]
)
})?;
// Enable backpressure for items
let (item_sender, item_receiver) = flume::bounded(0);
let task_impl = TestSinkTask::new(&self.obj(), item_receiver);
self.task.prepare(task_impl, context).block_on()?;
let task_impl = TaskSinkTask::new(&self.obj(), item_receiver, settings.is_main_elem, stats);
self.task.prepare(task_impl, ts_ctx).block_on()?;
*self.item_sender.lock().unwrap() = Some(item_sender);
if raise_log_level {
gst::debug!(CAT, imp: self, "Prepared");
} else {
gst::trace!(CAT, imp: self, "Prepared");
}
debug_or_trace!(CAT, settings.is_main_elem, imp: self, "Prepared");
Ok(())
}
fn unprepare(&self) {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Unpreparing");
} else {
gst::trace!(CAT, imp: self, "Unpreparing");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
if raise_log_level {
gst::debug!(CAT, imp: self, "Unprepared");
} else {
gst::trace!(CAT, imp: self, "Unprepared");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Unprepared");
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Stopping");
} else {
gst::trace!(CAT, imp: self, "Stopping");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
self.task.stop().block_on()?;
if raise_log_level {
gst::debug!(CAT, imp: self, "Stopped");
} else {
gst::trace!(CAT, imp: self, "Stopped");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Starting");
} else {
gst::trace!(CAT, imp: self, "Starting");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
self.task.start().block_on()?;
if raise_log_level {
gst::debug!(CAT, imp: self, "Started");
} else {
gst::trace!(CAT, imp: self, "Started");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
Ok(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for TestSink {
const NAME: &'static str = "StandaloneTestSink";
type Type = super::TestSink;
impl ObjectSubclass for TaskSink {
const NAME: &'static str = "TsStandaloneTaskSink";
type Type = super::TaskSink;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
Self {
sink_pad: PadSink::new(
gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
TestSinkPadHandler,
TaskPadSinkHandler,
),
task: Task::default(),
item_sender: Default::default(),
@ -699,96 +315,18 @@ impl ObjectSubclass for TestSink {
}
}
impl ObjectImpl for TestSink {
impl ObjectImpl for TaskSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("context")
.nick("Context")
.blurb("Context name to share threads with")
.default_value(Some(DEFAULT_CONTEXT))
.build(),
glib::ParamSpecUInt::builder("context-wait")
.nick("Context Wait")
.blurb("Throttle poll loop to run at most once every this many ms")
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
glib::ParamSpecBoolean::builder("raise-log-level")
.nick("Raise log level")
.blurb("Raises the log level so that this element stands out")
.write_only()
.build(),
glib::ParamSpecBoolean::builder("logs-stats")
.nick("Logs Stats")
.blurb("Whether statistics should be logged")
.write_only()
.build(),
glib::ParamSpecUInt::builder("push-period")
.nick("Src buffer Push Period")
.blurb("Push period used by `src` element (used for stats warnings)")
.default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32)
.build(),
glib::ParamSpecInt::builder("max-buffers")
.nick("Max Buffers")
.blurb("Number of buffers to count before stopping stats (-1 = unlimited)")
.minimum(-1i32)
.default_value(DEFAULT_MAX_BUFFERS)
.build(),
]
});
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(Settings::properties);
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => {
settings.context = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_CONTEXT.into());
}
"context-wait" => {
settings.context_wait = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"raise-log-level" => {
settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
}
"logs-stats" => {
let logs_stats = value.get().expect("type checked upstream");
settings.logs_stats = logs_stats;
}
"push-period" => {
settings.push_period = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"max-buffers" => {
let value = value.get::<i32>().expect("type checked upstream");
settings.max_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
}
fn set_property(&self, id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
self.settings.lock().unwrap().set_property(id, value, pspec);
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"raise-log-level" => settings.raise_log_level.to_value(),
"push-period" => (settings.push_period.as_millis() as u32).to_value(),
"max-buffers" => settings
.max_buffers
.and_then(|val| val.try_into().ok())
.unwrap_or(-1i32)
.to_value(),
_ => unimplemented!(),
}
fn property(&self, id: usize, pspec: &glib::ParamSpec) -> glib::Value {
self.settings.lock().unwrap().property(id, pspec)
}
fn constructed(&self) {
@ -800,15 +338,15 @@ impl ObjectImpl for TestSink {
}
}
impl GstObjectImpl for TestSink {}
impl GstObjectImpl for TaskSink {}
impl ElementImpl for TestSink {
impl ElementImpl for TaskSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Thread-sharing standalone test sink",
"Thread-sharing standalone test task sink",
"Sink/Test",
"Thread-sharing standalone test sink",
"Thread-sharing standalone test task sink",
"François Laignel <fengalin@free.fr>",
)
});
@ -838,7 +376,7 @@ impl ElementImpl for TestSink {
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
gst::trace!(CAT, imp: self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {

View file

@ -4,14 +4,14 @@ use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct TestSink(ObjectSubclass<imp::TestSink>) @extends gst::Element, gst::Object;
pub struct TaskSink(ObjectSubclass<imp::TaskSink>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ts-standalone-test-sink",
super::TASK_ELEMENT_NAME,
gst::Rank::None,
TestSink::static_type(),
TaskSink::static_type(),
)
}

View file

@ -19,11 +19,11 @@ use std::sync::Mutex;
use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{timer, Context, PadSrc, Task};
use gstthreadshare::runtime::{task, timer, Context, PadSrc, Task};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ts-standalone-test-src",
super::ELEMENT_NAME,
gst::DebugColorFlags::empty(),
Some("Thread-sharing standalone test src"),
)
@ -39,7 +39,7 @@ struct Settings {
context: String,
context_wait: Duration,
push_period: gst::ClockTime,
raise_log_level: bool,
is_main_elem: bool,
num_buffers: Option<u32>,
}
@ -49,7 +49,7 @@ impl Default for Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
push_period: DEFAULT_PUSH_PERIOD,
raise_log_level: false,
is_main_elem: false,
num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
}
}
@ -63,19 +63,18 @@ impl PadSrcHandler for TestSrcPadHandler {
#[derive(Debug)]
struct SrcTask {
element: super::TestSrc,
elem: super::TestSrc,
buffer_pool: gst::BufferPool,
timer: Option<timer::Interval>,
raise_log_level: bool,
is_main_elem: bool,
push_period: gst::ClockTime,
need_initial_events: bool,
need_segment: bool,
num_buffers: Option<u32>,
buffer_count: u32,
}
impl SrcTask {
fn new(element: super::TestSrc) -> Self {
fn new(elem: super::TestSrc) -> Self {
let buffer_pool = gst::BufferPool::new();
let mut pool_config = buffer_pool.config();
pool_config
@ -84,13 +83,12 @@ impl SrcTask {
buffer_pool.set_config(pool_config).unwrap();
SrcTask {
element,
elem,
buffer_pool,
timer: None,
raise_log_level: false,
is_main_elem: false,
push_period: gst::ClockTime::ZERO,
need_initial_events: true,
need_segment: true,
num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
buffer_count: 0,
}
@ -98,34 +96,48 @@ impl SrcTask {
}
impl TaskImpl for SrcTask {
type Item = gst::Buffer;
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
let src = self.element.imp();
let settings = src.settings.lock().unwrap();
self.raise_log_level = settings.raise_log_level;
let imp = self.elem.imp();
let settings = imp.settings.lock().unwrap();
self.is_main_elem = settings.is_main_elem;
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Preparing Task");
} else {
gst::trace!(CAT, obj: self.element, "Preparing Task");
}
log_or_trace!(CAT, self.is_main_elem, imp: imp, "Preparing Task");
self.push_period = settings.push_period;
self.num_buffers = settings.num_buffers;
self.push_period = settings.push_period;
self.num_buffers = settings.num_buffers;
Ok(())
}
.boxed()
future::ok(()).boxed()
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Starting Task");
} else {
gst::trace!(CAT, obj: self.element, "Starting Task");
async move {
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Starting Task");
if self.need_initial_events {
let imp = self.elem.imp();
debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Pushing initial events");
let stream_id =
format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
imp.src_pad.push_event(stream_start_evt).await;
imp.src_pad
.push_event(gst::event::Caps::new(
&gst::Caps::builder("foo/bar").build(),
))
.await;
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
imp.src_pad.push_event(segment_evt).await;
self.need_initial_events = false;
}
self.timer = Some(
@ -138,178 +150,100 @@ impl TaskImpl for SrcTask {
);
self.buffer_count = 0;
self.buffer_pool.set_active(true).unwrap();
Ok(())
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Stopping Task");
} else {
gst::trace!(CAT, obj: self.element, "Stopping Task");
}
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Stopping Task");
self.buffer_pool.set_active(false).unwrap();
self.timer = None;
self.need_initial_events = true;
self.buffer_pool.set_active(false).unwrap();
self.timer = None;
self.need_initial_events = true;
self.need_segment = true;
future::ok(()).boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Awaiting timer");
self.timer.as_mut().unwrap().next().await;
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Timer ticked");
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
fn handle_item(&mut self, _: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Awaiting timer");
} else {
gst::trace!(CAT, obj: self.element, "Awaiting timer");
}
self.timer.as_mut().unwrap().next().await;
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Timer ticked");
} else {
gst::trace!(CAT, obj: self.element, "Timer ticked");
}
self.buffer_pool
let buffer = self
.buffer_pool
.acquire_buffer(None)
.map(|mut buffer| {
{
let buffer = buffer.get_mut().unwrap();
let rtime = self.element.current_running_time().unwrap();
let rtime = self.elem.current_running_time().unwrap();
buffer.set_dts(rtime);
}
buffer
})
.map_err(|err| {
gst::error!(CAT, obj: self.element, "Failed to acquire buffer {}", err);
gst::error!(CAT, obj: self.elem, "Failed to acquire buffer {err}");
err
})
})?;
debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Forwarding buffer");
self.elem.imp().src_pad.push(buffer).await?;
log_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Successfully pushed buffer");
self.buffer_count += 1;
if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
return Err(gst::FlowError::Eos);
}
Ok(())
}
.boxed()
}
fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
async move {
let res = self.push(buffer).await;
match res {
Ok(_) => {
if self.raise_log_level {
gst::log!(CAT, obj: self.element, "Successfully pushed buffer");
} else {
gst::trace!(CAT, obj: self.element, "Successfully pushed buffer");
}
}
Err(gst::FlowError::Eos) => {
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "EOS");
} else {
gst::trace!(CAT, obj: self.element, "EOS");
}
let test_src = self.element.imp();
test_src.src_pad.push_event(gst::event::Eos::new()).await;
match err {
gst::FlowError::Eos => {
debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Pushing EOS");
return Err(gst::FlowError::Eos);
}
Err(gst::FlowError::Flushing) => {
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "Flushing");
} else {
gst::trace!(CAT, obj: self.element, "Flushing");
let imp = self.elem.imp();
if !imp.src_pad.push_event(gst::event::Eos::new()).await {
gst::error!(CAT, imp: imp, "Error pushing EOS");
}
task::Trigger::Stop
}
Err(err) => {
gst::error!(CAT, obj: self.element, "Got error {}", err);
gst::FlowError::Flushing => {
debug_or_trace!(CAT, self.is_main_elem, obj: self.elem, "Flushing");
task::Trigger::FlushStart
}
err => {
gst::error!(CAT, obj: self.elem, "Got error {err}");
gst::element_error!(
&self.element,
&self.elem,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
task::Trigger::Error
}
}
res.map(drop)
}
.boxed()
}
}
impl SrcTask {
async fn push(&mut self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "Pushing {:?}", buffer);
} else {
gst::trace!(CAT, obj: self.element, "Pushing {:?}", buffer);
}
let test_src = self.element.imp();
if self.need_initial_events {
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "Pushing initial events");
} else {
gst::trace!(CAT, obj: self.element, "Pushing initial events");
}
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
test_src.src_pad.push_event(stream_start_evt).await;
test_src
.src_pad
.push_event(gst::event::Caps::new(
&gst::Caps::builder("foo/bar").build(),
))
.await;
self.need_initial_events = false;
}
if self.need_segment {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
test_src.src_pad.push_event(segment_evt).await;
self.need_segment = false;
}
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "Forwarding buffer");
} else {
gst::trace!(CAT, obj: self.element, "Forwarding buffer");
}
let ok = test_src.src_pad.push(buffer).await?;
self.buffer_count += 1;
if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
if self.raise_log_level {
gst::debug!(CAT, obj: self.element, "Pushing EOS");
} else {
gst::trace!(CAT, obj: self.element, "Pushing EOS");
}
let test_src = self.element.imp();
if !test_src.src_pad.push_event(gst::event::Eos::new()).await {
gst::error!(CAT, obj: self.element, "Error pushing EOS");
}
return Err(gst::FlowError::Eos);
}
Ok(ok)
}
}
#[derive(Debug)]
pub struct TestSrc {
src_pad: PadSrc,
@ -319,106 +253,57 @@ pub struct TestSrc {
impl TestSrc {
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Preparing");
} else {
gst::trace!(CAT, imp: self, "Preparing");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Preparing");
let settings = self.settings.lock().unwrap();
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
drop(settings);
self.task
.prepare(SrcTask::new(self.obj().clone()), context)
.prepare(SrcTask::new(self.instance().clone()), ts_ctx)
.block_on()?;
if raise_log_level {
gst::debug!(CAT, imp: self, "Prepared");
} else {
gst::trace!(CAT, imp: self, "Prepared");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Prepared");
Ok(())
}
fn unprepare(&self) {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Unpreparing");
} else {
gst::trace!(CAT, imp: self, "Unpreparing");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
if raise_log_level {
gst::debug!(CAT, imp: self, "Unprepared");
} else {
gst::trace!(CAT, imp: self, "Unprepared");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Unprepared");
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Stopping");
} else {
gst::trace!(CAT, imp: self, "Stopping");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopping");
self.task.stop().block_on()?;
if raise_log_level {
gst::debug!(CAT, imp: self, "Stopped");
} else {
gst::trace!(CAT, imp: self, "Stopped");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Starting");
} else {
gst::trace!(CAT, imp: self, "Starting");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Starting");
self.task.start().block_on()?;
if raise_log_level {
gst::debug!(CAT, imp: self, "Started");
} else {
gst::trace!(CAT, imp: self, "Started");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Started");
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, imp: self, "Pausing");
} else {
gst::trace!(CAT, imp: self, "Pausing");
}
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp: self, "Pausing");
self.task.pause().block_on()?;
if raise_log_level {
gst::debug!(CAT, imp: self, "Paused");
} else {
gst::trace!(CAT, imp: self, "Paused");
}
debug_or_trace!(CAT, is_main_elem, imp: self, "Paused");
Ok(())
}
@ -462,9 +347,9 @@ impl ObjectImpl for TestSrc {
.blurb("Push a new buffer every this many ms")
.default_value(DEFAULT_PUSH_PERIOD.mseconds() as u32)
.build(),
glib::ParamSpecBoolean::builder("raise-log-level")
.nick("Raise log level")
.blurb("Raises the log level so that this element stands out")
glib::ParamSpecBoolean::builder("main-elem")
.nick("Main Element")
.blurb("Declare this element as the main one")
.write_only()
.build(),
glib::ParamSpecInt::builder("num-buffers")
@ -485,24 +370,21 @@ impl ObjectImpl for TestSrc {
"context" => {
settings.context = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap()
.unwrap_or_else(|| DEFAULT_CONTEXT.into());
}
"context-wait" => {
settings.context_wait = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
settings.context_wait = Duration::from_millis(value.get::<u32>().unwrap().into());
}
"push-period" => {
settings.push_period = gst::ClockTime::from_mseconds(
value.get::<u32>().expect("type checked upstream").into(),
);
let value: u64 = value.get::<u32>().unwrap().into();
settings.push_period = value.mseconds();
}
"raise-log-level" => {
settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
"main-elem" => {
settings.is_main_elem = value.get::<bool>().unwrap();
}
"num-buffers" => {
let value = value.get::<i32>().expect("type checked upstream");
let value = value.get::<i32>().unwrap();
settings.num_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
@ -515,7 +397,7 @@ impl ObjectImpl for TestSrc {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"push-period" => (settings.push_period.mseconds() as u32).to_value(),
"raise-log-level" => settings.raise_log_level.to_value(),
"main-elem" => settings.is_main_elem.to_value(),
"num-buffers" => settings
.num_buffers
.and_then(|val| val.try_into().ok())
@ -571,7 +453,7 @@ impl ElementImpl for TestSrc {
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
gst::trace!(CAT, imp: self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {

View file

@ -3,6 +3,8 @@ use gst::prelude::*;
mod imp;
pub const ELEMENT_NAME: &str = "ts-standalone-src";
glib::wrapper! {
pub struct TestSrc(ObjectSubclass<imp::TestSrc>) @extends gst::Element, gst::Object;
}
@ -10,7 +12,7 @@ glib::wrapper! {
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ts-standalone-test-src",
"ts-standalone-src",
gst::Rank::None,
TestSrc::static_type(),
)