From 29a490f6dc7b792df7ab45f6a79cbfbee694d332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Thu, 3 Nov 2022 12:33:39 +0100 Subject: [PATCH] ts: introduce ts-audiotestsrc This makes it easy to generate "listenable" signals and to evaluate discontinuities. When the `tuning` feature is activated and the `main-elem` property is set, the element can log the parked duration in %, which is an image of the CPU usage for the ts-context. This commit adds a test mode to `udpsrc-benchmark-sender` which generates default audio buffers from `ts-audiotestsrc`. The `rtp` mode is modified so that it uses `ts-audiotestsrc`. --- docs/plugins/gst_plugins_cache.json | 100 +++ generic/threadshare/Cargo.toml | 1 + generic/threadshare/examples/benchmark.rs | 27 +- .../examples/udpsrc_benchmark_sender.rs | 157 +++- generic/threadshare/src/audiotestsrc/imp.rs | 735 ++++++++++++++++++ generic/threadshare/src/audiotestsrc/mod.rs | 17 + generic/threadshare/src/lib.rs | 23 +- 7 files changed, 989 insertions(+), 71 deletions(-) create mode 100644 generic/threadshare/src/audiotestsrc/imp.rs create mode 100644 generic/threadshare/src/audiotestsrc/mod.rs diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 19bbbdbd..a13879a3 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6467,6 +6467,106 @@ } } }, + "ts-audiotestsrc": { + "author": "François Laignel ", + "description": "Thread-sharing audio test source", + "hierarchy": [ + "TsAudioTestSrc", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Source/Test", + "pad-templates": { + "src": { + "caps": "audio/x-raw:\n rate: [ 8000, 2147483646 ]\n channels: [ 1, 2147483646 ]\n layout: interleaved\n format: S16LE\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "buffer-duration": { + "blurb": "Buffer duration in ms", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "10", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "context": { + "blurb": "Context name to share threads with", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "context-wait": { + "blurb": "Throttle poll loop to run at most once every this many ms", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "1000", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "do-timestamp": { + "blurb": "Apply current stream time to buffers", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + }, + "is-live": { + "blurb": "Whether to act as a live source", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + }, + "num-buffers": { + "blurb": "Number of buffers to output before sending EOS (-1 = unlimited)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "2147483647", + "min": "-1", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": true + } + }, + "rank": "none" + }, "ts-input-selector": { "author": "Mathieu Duponchelle ", "description": "Simple input selector element", diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index a1affdfb..0f73e6b5 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -16,6 +16,7 @@ futures = "0.3.21" libc = "0.2" gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } once_cell = "1" diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs index 2080b777..c7e98af7 100644 --- a/generic/threadshare/examples/benchmark.rs +++ b/generic/threadshare/examples/benchmark.rs @@ -63,7 +63,7 @@ fn main() { }; let is_rtp = args.len() > 6 && (args[6] == "rtp"); - let rtp_caps = gst::Caps::builder("audio/x-rtp") + let rtp_caps = gst::Caps::builder("application/x-rtp") .field("media", "audio") .field("payload", 8i32) .field("clock-rate", 8000) @@ -97,7 +97,7 @@ fn main() { "udpsrc" => { let source = gst::ElementFactory::make("udpsrc") .name(format!("source-{}", i).as_str()) - .property("port", 40000i32 + i as i32) + .property("port", 5004i32 + i as i32) .property("retrieve-sender-address", false) .build() .unwrap(); @@ -108,7 +108,7 @@ fn main() { let context = build_context(); let source = gst::ElementFactory::make("ts-udpsrc") .name(format!("source-{}", i).as_str()) - .property("port", 40000i32 + i as i32) + .property("port", 5004i32 + i as i32) .property("context", &context) .property("context-wait", wait) .build() @@ -184,22 +184,7 @@ fn main() { pipeline.add_many(elements).unwrap(); gst::Element::link_many(elements).unwrap(); } else { - let queue = if let Some(context) = context { - let queue = gst::ElementFactory::make("ts-queue") - .name(format!("queue-{}", i).as_str()) - .property("context", &context) - .property("context-wait", wait) - .build() - .unwrap(); - queue - } else { - gst::ElementFactory::make("queue") - .name(format!("queue-{}", i).as_str()) - .build() - .unwrap() - }; - - let elements = &[&source, &queue, &sink]; + let elements = &[&source, &sink]; pipeline.add_many(elements).unwrap(); gst::Element::link_many(elements).unwrap(); } @@ -268,14 +253,14 @@ fn main() { let elapsed = init.elapsed(); gst::info!( CAT, - "{:>6.2} / s / stream", + "Thrpt: {:>6.2}", total_count * 1_000.0 / elapsed.as_millis() as f32 ); #[cfg(feature = "tuning")] gst::info!( CAT, - "{:>6.2}% parked", + "Parked: {:>6.2}%", (ctx_0.parked_duration() - parked_init).as_nanos() as f32 * 100.0 / elapsed.as_nanos() as f32 ); diff --git a/generic/threadshare/examples/udpsrc_benchmark_sender.rs b/generic/threadshare/examples/udpsrc_benchmark_sender.rs index 196662ac..2ba9e24f 100644 --- a/generic/threadshare/examples/udpsrc_benchmark_sender.rs +++ b/generic/threadshare/examples/udpsrc_benchmark_sender.rs @@ -17,19 +17,45 @@ // // SPDX-License-Identifier: LGPL-2.1-or-later +use gst::glib; +use gst::prelude::*; + +use once_cell::sync::Lazy; + use std::net; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::{env, thread, time}; +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "ts-udpsrc-benchmark-sender", + gst::DebugColorFlags::empty(), + Some("Thread-sharing UDP src benchmark sender"), + ) +}); + fn main() { + gst::init().unwrap(); + gstthreadshare::plugin_register_static().unwrap(); + let args = env::args().collect::>(); assert!(args.len() > 1); let n_streams: u16 = args[1].parse().unwrap(); - if args.len() > 2 && args[2] == "rtp" { - send_rtp_buffers(n_streams); + let num_buffers: Option = if args.len() > 3 { + args[3].parse().ok() } else { - send_raw_buffers(n_streams); + None + }; + + if args.len() > 2 { + match args[2].as_str() { + "raw" => send_raw_buffers(n_streams), + "rtp" => send_rtp_buffers(n_streams, num_buffers), + _ => send_test_buffers(n_streams, num_buffers), + } + } else { + send_test_buffers(n_streams, num_buffers); } } @@ -38,7 +64,7 @@ fn send_raw_buffers(n_streams: u16) { let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap(); let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); - let destinations = (40000..(40000 + n_streams)) + let destinations = (5004..(5004 + n_streams)) .map(|port| SocketAddr::new(ipaddr, port)) .collect::>(); @@ -60,43 +86,60 @@ fn send_raw_buffers(n_streams: u16) { } } -fn send_rtp_buffers(n_streams: u16) { - use gst::glib; - use gst::prelude::*; - - gst::init().unwrap(); - - #[cfg(debug_assertions)] - { - use std::path::Path; - - let mut path = Path::new("target/debug"); - if !path.exists() { - path = Path::new("../../target/debug"); - } - - gst::Registry::get().scan_path(path); - } - #[cfg(not(debug_assertions))] - { - use std::path::Path; - - let mut path = Path::new("target/release"); - if !path.exists() { - path = Path::new("../../target/release"); - } - - gst::Registry::get().scan_path(path); - } - - let l = glib::MainLoop::new(None, false); +fn send_test_buffers(n_streams: u16, num_buffers: Option) { let pipeline = gst::Pipeline::default(); for i in 0..n_streams { - let src = gst::ElementFactory::make("audiotestsrc") - .name(format!("audiotestsrc-{}", i).as_str()) + let src = gst::ElementFactory::make("ts-audiotestsrc") + .name(format!("ts-audiotestsrc-{}", i).as_str()) + .property("context-wait", 20u32) + .property("is-live", true) + .property("do-timestamp", true) .build() .unwrap(); - src.set_property("is-live", true); + + if let Some(num_buffers) = num_buffers { + src.set_property("num-buffers", num_buffers); + } + + #[cfg(feature = "tuning")] + if i == 0 { + src.set_property("main-elem", true); + } + + let sink = gst::ElementFactory::make("ts-udpsink") + .name(format!("udpsink-{}", i).as_str()) + .property("clients", format!("127.0.0.1:{}", i + 5004)) + .property("context-wait", 20u32) + .build() + .unwrap(); + + let elements = &[&src, &sink]; + pipeline.add_many(elements).unwrap(); + gst::Element::link_many(elements).unwrap(); + } + + run(pipeline); +} + +fn send_rtp_buffers(n_streams: u16, num_buffers: Option) { + let pipeline = gst::Pipeline::default(); + for i in 0..n_streams { + let src = gst::ElementFactory::make("ts-audiotestsrc") + .name(format!("ts-audiotestsrc-{}", i).as_str()) + .property("context-wait", 20u32) + .property("is-live", true) + .property("do-timestamp", true) + .build() + .unwrap(); + + if let Some(num_buffers) = num_buffers { + src.set_property("num-buffers", num_buffers); + } + + #[cfg(feature = "tuning")] + if i == 0 { + src.set_property("main-elem", true); + } let enc = gst::ElementFactory::make("alawenc") .name(format!("alawenc-{}", i).as_str()) @@ -106,11 +149,11 @@ fn send_rtp_buffers(n_streams: u16) { .name(format!("rtppcmapay-{}", i).as_str()) .build() .unwrap(); + let sink = gst::ElementFactory::make("ts-udpsink") .name(format!("udpsink-{}", i).as_str()) - .property("clients", format!("127.0.0.1:{}", i + 40000)) - .property("context", "context-udpsink") .property("context-wait", 20u32) + .property("clients", format!("127.0.0.1:{}", i + 5004)) .build() .unwrap(); @@ -119,6 +162,42 @@ fn send_rtp_buffers(n_streams: u16) { gst::Element::link_many(elements).unwrap(); } + run(pipeline); +} + +fn run(pipeline: gst::Pipeline) { + let l = glib::MainLoop::new(None, false); + + let bus = pipeline.bus().unwrap(); + let l_clone = l.clone(); + bus.add_watch(move |_, msg| { + use gst::MessageView; + match msg.view() { + MessageView::Eos(_) => { + gst::info!(CAT, "Received eos"); + l_clone.quit(); + + glib::Continue(false) + } + MessageView::Error(msg) => { + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + msg.error(), + msg.debug() + ); + l_clone.quit(); + + glib::Continue(false) + } + _ => glib::Continue(true), + } + }) + .expect("Failed to add bus watch"); + pipeline.set_state(gst::State::Playing).unwrap(); l.run(); + + pipeline.set_state(gst::State::Null).unwrap(); } diff --git a/generic/threadshare/src/audiotestsrc/imp.rs b/generic/threadshare/src/audiotestsrc/imp.rs new file mode 100644 index 00000000..9ae51d0b --- /dev/null +++ b/generic/threadshare/src/audiotestsrc/imp.rs @@ -0,0 +1,735 @@ +// Copyright (C) 2022 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use futures::future::BoxFuture; +use futures::prelude::*; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use once_cell::sync::Lazy; + +use std::mem::size_of; +use std::sync::Mutex; +use std::time::Duration; +#[cfg(feature = "tuning")] +use std::time::Instant; + +use crate::runtime::prelude::*; +use crate::runtime::{self, task, timer, PadSrc, Task}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "ts-audiotestsrc", + gst::DebugColorFlags::empty(), + Some("Thread-sharing audio test src"), + ) +}); + +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; +const DEFAULT_BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(10); +const DEFAULT_DO_TIMESTAMP: bool = false; +const DEFAULT_IS_LIVE: bool = false; +const DEFAULT_NUM_BUFFERS: i32 = -1; + +const DEFAULT_CHANNELS: usize = 1; +const DEFAULT_FREQ: f32 = 440.0; +const DEFAULT_VOLUME: f32 = 0.8; +const DEFAULT_RATE: u32 = 44_100; + +#[cfg(feature = "tuning")] +const RAMPUP_BUFFER_COUNT: u32 = 500; +#[cfg(feature = "tuning")] +const LOG_BUFFER_INTERVAL: u32 = 2000; + +static DEFAULT_CAPS: Lazy = Lazy::new(|| { + gst_audio::AudioCapsBuilder::new_interleaved() + .format(gst_audio::AUDIO_FORMAT_S16) + .rate_range(8_000..i32::MAX) + .channels_range(1..i32::MAX) + .build() +}); + +#[derive(Debug, Clone)] +struct Settings { + context: String, + context_wait: Duration, + do_timestamp: bool, + is_live: bool, + buffer_duration: gst::ClockTime, + num_buffers: Option, + #[cfg(feature = "tuning")] + is_main_elem: bool, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, + do_timestamp: DEFAULT_DO_TIMESTAMP, + is_live: DEFAULT_IS_LIVE, + buffer_duration: DEFAULT_BUFFER_DURATION, + num_buffers: None, + #[cfg(feature = "tuning")] + is_main_elem: false, + } + } +} + +#[derive(Clone, Debug)] +struct AudioTestSrcPadHandler; +impl PadSrcHandler for AudioTestSrcPadHandler { + type ElementImpl = AudioTestSrc; + + fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool { + gst::debug!(CAT, obj: pad, "Received {query:?}"); + + if let gst::QueryViewMut::Latency(q) = query.view_mut() { + let settings = imp.settings.lock().unwrap(); + let min_latency = if settings.is_live { + settings.buffer_duration + } else { + gst::ClockTime::ZERO + }; + + q.set( + settings.is_live, + min_latency, + min_latency + + runtime::Context::current().map_or(gst::ClockTime::ZERO, |ctx| { + gst::ClockTime::try_from(ctx.wait_duration()).unwrap() + }), + ); + + return true; + } + + gst::Pad::query_default(pad, Some(&*imp.obj()), query) + } +} + +#[derive(Debug, Copy, Clone)] +enum Negotiation { + Unchanged, + Changed, +} + +impl Negotiation { + fn has_changed(self) -> bool { + matches!(self, Negotiation::Changed) + } +} + +#[derive(Debug)] +struct AudioTestSrcTask { + elem: super::AudioTestSrc, + buffer_pool: gst::BufferPool, + rate: u32, + channels: usize, + do_timestamp: bool, + is_live: bool, + buffer_duration: gst::ClockTime, + need_initial_events: bool, + step: f32, + accumulator: f32, + last_buffer_end: Option, + caps: gst::Caps, + buffer_count: u32, + num_buffers: Option, + #[cfg(feature = "tuning")] + is_main_elem: bool, + #[cfg(feature = "tuning")] + parked_duration_init: Option, + #[cfg(feature = "tuning")] + log_start: Instant, +} + +impl AudioTestSrcTask { + fn new(elem: super::AudioTestSrc) -> Self { + AudioTestSrcTask { + elem, + buffer_pool: gst::BufferPool::new(), + rate: DEFAULT_RATE, + channels: DEFAULT_CHANNELS, + do_timestamp: DEFAULT_DO_TIMESTAMP, + is_live: DEFAULT_IS_LIVE, + buffer_duration: DEFAULT_BUFFER_DURATION, + need_initial_events: true, + step: 0.0, + accumulator: 0.0, + last_buffer_end: None, + caps: gst::Caps::new_empty(), + buffer_count: 0, + num_buffers: None, + #[cfg(feature = "tuning")] + is_main_elem: false, + #[cfg(feature = "tuning")] + parked_duration_init: None, + #[cfg(feature = "tuning")] + log_start: Instant::now(), + } + } + + async fn negotiate(&mut self) -> Result { + let imp = self.elem.imp(); + let pad = imp.src_pad.gst_pad(); + + if !pad.check_reconfigure() { + return Ok(Negotiation::Unchanged); + } + + let mut caps = pad.peer_query_caps(Some(&DEFAULT_CAPS)); + gst::debug!(CAT, imp: imp, "Peer returned {caps:?}"); + + if caps.is_empty() { + pad.mark_reconfigure(); + let err = gst::error_msg!(gst::CoreError::Pad, ["No common Caps"]); + gst::error!(CAT, imp: imp, "{err}"); + return Err(err); + } + + if caps.is_any() { + gst::debug!(CAT, imp: imp, "Using our own Caps"); + caps = DEFAULT_CAPS.clone(); + } + + { + let caps = caps.make_mut(); + let s = caps.structure_mut(0).ok_or_else(|| { + let err = gst::error_msg!(gst::CoreError::Pad, ["Invalid peer Caps structure"]); + gst::error!(CAT, imp: imp, "{err}"); + err + })?; + + s.fixate_field_nearest_int("rate", DEFAULT_RATE as i32); + self.rate = s.get::("rate").unwrap() as u32; + self.step = 2.0 * std::f32::consts::PI * DEFAULT_FREQ / (self.rate as f32); + + s.fixate_field_nearest_int("channels", DEFAULT_CHANNELS as i32); + self.channels = s.get::("channels").unwrap() as usize; + + if self.channels > 2 { + s.set::( + "channel-mask", + gst_audio::AudioChannelPosition::fallback_mask(self.channels as u32).into(), + ); + } + } + + caps.fixate(); + gst::debug!(CAT, imp: imp, "fixated to {caps:?}"); + + imp.src_pad.push_event(gst::event::Caps::new(&caps)).await; + + self.caps = caps; + + Ok(Negotiation::Changed) + } +} + +impl TaskImpl for AudioTestSrcTask { + type Item = gst::Buffer; + + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + gst::log!(CAT, obj: self.elem, "Preparing Task"); + + let imp = self.elem.imp(); + let settings = imp.settings.lock().unwrap(); + self.do_timestamp = settings.do_timestamp; + self.is_live = settings.is_live; + self.buffer_duration = settings.buffer_duration; + self.num_buffers = settings.num_buffers; + + #[cfg(feature = "tuning")] + { + self.is_main_elem = settings.is_main_elem; + } + + future::ok(()).boxed() + } + + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst::log!(CAT, obj: self.elem, "Starting Task"); + + if self.need_initial_events { + gst::debug!(CAT, obj: self.elem, "Pushing initial events"); + + let stream_id = + format!("{:08x}{:08x}", rand::random::(), rand::random::()); + let stream_start_evt = gst::event::StreamStart::builder(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + self.elem.imp().src_pad.push_event(stream_start_evt).await; + } + + if self.negotiate().await?.has_changed() { + let bytes_per_buffer = (self.rate as u64) + * self.buffer_duration.mseconds() + * self.channels as u64 + * size_of::() as u64 + / 1_000; + + let mut pool_config = self.buffer_pool.config(); + pool_config + .as_mut() + .set_params(Some(&self.caps), bytes_per_buffer as u32, 2, 6); + self.buffer_pool.set_config(pool_config).unwrap(); + } + + assert!(!self.caps.is_empty()); + self.buffer_pool.set_active(true).unwrap(); + + if self.need_initial_events { + let segment_evt = + gst::event::Segment::new(&gst::FormattedSegment::::new()); + self.elem.imp().src_pad.push_event(segment_evt).await; + + self.need_initial_events = false; + } + + self.buffer_count = 0; + + #[cfg(feature = "tuning")] + if self.is_main_elem { + self.parked_duration_init = None; + } + + Ok(()) + } + .boxed() + } + + fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + gst::log!(CAT, obj: self.elem, "Pausing Task"); + self.buffer_pool.set_active(false).unwrap(); + + future::ok(()).boxed() + } + + fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + gst::log!(CAT, obj: self.elem, "Stopping Task"); + + self.need_initial_events = true; + self.accumulator = 0.0; + self.last_buffer_end = None; + + future::ok(()).boxed() + } + + fn try_next(&mut self) -> BoxFuture<'_, Result> { + let mut buffer = match self.buffer_pool.acquire_buffer(None) { + Ok(buffer) => buffer, + Err(err) => { + gst::error!(CAT, obj: self.elem, "Failed to acquire buffer {}", err); + return future::err(err).boxed(); + } + }; + + let buffer_mut = buffer.get_mut().unwrap(); + + let start = if self.is_live | self.do_timestamp { + self.last_buffer_end + .or_else(|| self.elem.current_running_time()) + } else { + None + }; + + { + use std::io::Write; + + let mut mapped = buffer_mut.map_writable().unwrap(); + let slice = mapped.as_mut_slice(); + slice + .chunks_mut(self.channels * size_of::()) + .for_each(|frame| { + let sample = ((self.accumulator.sin() * DEFAULT_VOLUME * (i16::MAX as f32)) + as i16) + .to_ne_bytes(); + + frame.chunks_mut(size_of::()).for_each(|mut channel| { + let _ = channel.write(&sample).unwrap(); + }); + + self.accumulator += self.step; + if self.accumulator >= 2.0 * std::f32::consts::PI { + self.accumulator = -2.0 * std::f32::consts::PI; + } + }); + } + + if self.do_timestamp { + buffer_mut.set_dts(start); + buffer_mut.set_duration(self.buffer_duration); + } + + self.last_buffer_end = start.opt_add(self.buffer_duration); + + async move { + if self.is_live { + if let Some(delay) = self + .last_buffer_end + .unwrap() + .checked_sub(self.elem.current_running_time().unwrap()) + { + // Wait for all samples to fit in last time slice + timer::delay_for_at_least(delay.into()).await; + } + } else { + // Let the scheduler share time with other tasks + runtime::executor::yield_now().await; + } + + Ok(buffer) + } + .boxed() + } + + fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + let imp = self.elem.imp(); + + gst::debug!(CAT, imp: imp, "Pushing {buffer:?}"); + imp.src_pad.push(buffer).await?; + gst::log!(CAT, imp: imp, "Successfully pushed buffer"); + + self.buffer_count += 1; + + #[cfg(feature = "tuning")] + if self.is_main_elem { + if let Some(parked_duration_init) = self.parked_duration_init { + if self.buffer_count % LOG_BUFFER_INTERVAL == 0 { + let parked_duration = + runtime::Context::current().unwrap().parked_duration() + - parked_duration_init; + + gst::info!( + CAT, + "Parked: {:5.2?}%", + parked_duration.as_nanos() as f32 * 100.0 + / self.log_start.elapsed().as_nanos() as f32, + ); + } + } else if self.buffer_count == RAMPUP_BUFFER_COUNT { + self.parked_duration_init = + Some(runtime::Context::current().unwrap().parked_duration()); + self.log_start = Instant::now(); + + gst::info!(CAT, "Ramp up complete"); + } + } + + if self.num_buffers.opt_eq(self.buffer_count) == Some(true) { + return Err(gst::FlowError::Eos); + } + + Ok(()) + } + .boxed() + } + + fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> { + async move { + match err { + gst::FlowError::Flushing => { + gst::debug!(CAT, obj: self.elem, "Flushing"); + + task::Trigger::FlushStart + } + gst::FlowError::Eos => { + gst::debug!(CAT, obj: self.elem, "EOS"); + self.elem + .imp() + .src_pad + .push_event(gst::event::Eos::new()) + .await; + + task::Trigger::Stop + } + err => { + gst::error!(CAT, obj: self.elem, "Got error {err}"); + gst::element_error!( + &self.elem, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + + task::Trigger::Error + } + } + } + .boxed() + } +} + +#[derive(Debug)] +pub struct AudioTestSrc { + src_pad: PadSrc, + task: Task, + settings: Mutex, +} + +impl AudioTestSrc { + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Preparing"); + + let settings = self.settings.lock().unwrap(); + let context = + runtime::Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })?; + drop(settings); + + self.task + .prepare(AudioTestSrcTask::new(self.obj().clone()), context) + .block_on()?; + + gst::debug!(CAT, imp: self, "Prepared"); + + Ok(()) + } + + fn unprepare(&self) { + gst::debug!(CAT, imp: self, "Unpreparing"); + self.task.unprepare().block_on().unwrap(); + gst::debug!(CAT, imp: self, "Unprepared"); + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Stopping"); + self.task.stop().block_on()?; + gst::debug!(CAT, imp: self, "Stopped"); + + Ok(()) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Starting"); + self.task.start().block_on()?; + gst::debug!(CAT, imp: self, "Started"); + + Ok(()) + } + + fn pause(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp: self, "Pausing"); + self.task.pause().block_on()?; + gst::debug!(CAT, imp: self, "Paused"); + + Ok(()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for AudioTestSrc { + const NAME: &'static str = "TsAudioTestSrc"; + type Type = super::AudioTestSrc; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + Self { + src_pad: PadSrc::new( + gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")), + AudioTestSrcPadHandler, + ), + task: Task::default(), + settings: Default::default(), + } + } +} + +impl ObjectImpl for AudioTestSrc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("context") + .nick("Context") + .blurb("Context name to share threads with") + .default_value(Some(DEFAULT_CONTEXT)) + .build(), + glib::ParamSpecUInt::builder("context-wait") + .nick("Context Wait") + .blurb("Throttle poll loop to run at most once every this many ms") + .maximum(1000) + .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32) + .build(), + glib::ParamSpecBoolean::builder("do-timestamp") + .nick("Do timestamp") + .blurb("Apply current stream time to buffers") + .build(), + glib::ParamSpecBoolean::builder("is-live") + .nick("Is live") + .blurb("Whether to act as a live source") + .build(), + glib::ParamSpecUInt::builder("buffer-duration") + .nick("Buffer duration") + .blurb("Buffer duration in ms") + .default_value(DEFAULT_BUFFER_DURATION.mseconds() as u32) + .build(), + glib::ParamSpecInt::builder("num-buffers") + .nick("Num Buffers") + .blurb("Number of buffers to output before sending EOS (-1 = unlimited)") + .minimum(-1i32) + .default_value(DEFAULT_NUM_BUFFERS) + .build(), + #[cfg(feature = "tuning")] + glib::ParamSpecBoolean::builder("main-elem") + .nick("Main Element") + .blurb("Declare this element as the main one") + .write_only() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { + "context" => { + settings.context = value + .get::>() + .unwrap() + .unwrap_or_else(|| DEFAULT_CONTEXT.into()); + } + "context-wait" => { + settings.context_wait = Duration::from_millis(value.get::().unwrap().into()); + } + "do-timestamp" => { + settings.do_timestamp = value.get::().unwrap(); + } + "is-live" => { + settings.is_live = value.get::().unwrap(); + } + "buffer-duration" => { + settings.buffer_duration = (value.get::().unwrap() as u64).mseconds(); + } + "num-buffers" => { + let value = value.get::().unwrap(); + settings.num_buffers = if value > 0 { Some(value as u32) } else { None }; + } + #[cfg(feature = "tuning")] + "main-elem" => { + settings.is_main_elem = value.get::().unwrap(); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "context" => settings.context.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), + "do-timestamp" => settings.do_timestamp.to_value(), + "is-live" => settings.is_live.to_value(), + "buffer-duration" => (settings.buffer_duration.mseconds() as u32).to_value(), + "num-buffers" => settings + .num_buffers + .and_then(|val| val.try_into().ok()) + .unwrap_or(-1i32) + .to_value(), + #[cfg(feature = "tuning")] + "main-elem" => settings.is_main_elem.to_value(), + _ => unimplemented!(), + } + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(self.src_pad.gst_pad()).unwrap(); + obj.set_element_flags(gst::ElementFlags::SOURCE); + } +} + +impl GstObjectImpl for AudioTestSrc {} + +impl ElementImpl for AudioTestSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Thread-sharing audio test source", + "Source/Test", + "Thread-sharing audio test source", + "François Laignel ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &DEFAULT_CAPS, + ) + .unwrap(); + + vec![src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {transition:?}"); + + match transition { + gst::StateChange::NullToReady => { + self.prepare().map_err(|err| { + self.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::PlayingToPaused => { + self.pause().map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(); + } + _ => (), + } + + let mut success = self.parent_change_state(transition)?; + + match transition { + gst::StateChange::ReadyToPaused => { + self.pause().map_err(|_| gst::StateChangeError)?; + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToPlaying => { + self.start().map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::PlayingToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; + } + _ => (), + } + + Ok(success) + } +} diff --git a/generic/threadshare/src/audiotestsrc/mod.rs b/generic/threadshare/src/audiotestsrc/mod.rs new file mode 100644 index 00000000..fbd16f4f --- /dev/null +++ b/generic/threadshare/src/audiotestsrc/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct AudioTestSrc(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ts-audiotestsrc", + gst::Rank::None, + AudioTestSrc::static_type(), + ) +} diff --git a/generic/threadshare/src/lib.rs b/generic/threadshare/src/lib.rs index 94671379..35f2e5a8 100644 --- a/generic/threadshare/src/lib.rs +++ b/generic/threadshare/src/lib.rs @@ -16,29 +16,30 @@ #[macro_use] pub mod runtime; -pub mod socket; -mod tcpclientsrc; -mod udpsink; -mod udpsrc; - mod appsrc; +mod audiotestsrc; pub mod dataqueue; mod inputselector; mod jitterbuffer; mod proxy; mod queue; +pub mod socket; +mod tcpclientsrc; +mod udpsink; +mod udpsrc; use gst::glib; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - udpsrc::register(plugin)?; - udpsink::register(plugin)?; - tcpclientsrc::register(plugin)?; - queue::register(plugin)?; - proxy::register(plugin)?; appsrc::register(plugin)?; - jitterbuffer::register(plugin)?; + audiotestsrc::register(plugin)?; inputselector::register(plugin)?; + jitterbuffer::register(plugin)?; + proxy::register(plugin)?; + queue::register(plugin)?; + tcpclientsrc::register(plugin)?; + udpsink::register(plugin)?; + udpsrc::register(plugin)?; Ok(()) }