ts-audiotestsrc: rework

This element didn't comply with usual convetions.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2387>
This commit is contained in:
François Laignel 2025-07-17 17:46:28 +02:00
parent ff77fc0089
commit 3a53d857b8
4 changed files with 303 additions and 153 deletions

1
Cargo.lock generated
View file

@ -3410,6 +3410,7 @@ dependencies = [
"async-lock",
"async-task",
"bitflags 2.9.1",
"byte-slice-cast",
"cc",
"cfg-if",
"clap",

View file

@ -16387,7 +16387,7 @@
"klass": "Source/Test",
"pad-templates": {
"src": {
"caps": "audio/x-raw:\n rate: [ 8000, 2147483646 ]\n channels: [ 1, 2147483646 ]\n layout: interleaved\n format: S16LE\n",
"caps": "audio/x-raw:\n rate: [ 1, 2147483647 ]\n channels: [ 1, 2147483647 ]\n layout: interleaved\n format: S16LE\n",
"direction": "src",
"presence": "always"
}
@ -16445,14 +16445,40 @@
"type": "gboolean",
"writable": true
},
"freq": {
"blurb": "Frequency",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "440",
"max": "-1",
"min": "1",
"mutable": "playing",
"readable": true,
"type": "guint",
"writable": true
},
"is-live": {
"blurb": "Whether to act as a live source",
"blurb": "(Pseudo) live output",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
},
"mute": {
"blurb": "Mute",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "playing",
"readable": true,
"type": "gboolean",
"writable": true
@ -16470,6 +16496,34 @@
"readable": true,
"type": "gint",
"writable": true
},
"samples-per-buffer": {
"blurb": "Number of samples per output buffer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1024",
"max": "-1",
"min": "1",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
},
"volume": {
"blurb": "Output volume",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0.8",
"max": "10",
"min": "-1.79769e+308",
"mutable": "playing",
"readable": true,
"type": "gdouble",
"writable": true
}
},
"rank": "none"

View file

@ -11,6 +11,7 @@ rust-version.workspace = true
[dependencies]
async-task = "4.3.0"
async-lock = "3.4.0"
byte-slice-cast = "1.0"
cfg-if = "1"
concurrent-queue = "2.2.0"
futures = "0.3.28"

View file

@ -6,6 +6,7 @@
//
// SPDX-License-Identifier: MPL-2.0
use byte_slice_cast::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -31,36 +32,29 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(10);
const DEFAULT_DO_TIMESTAMP: bool = false;
const DEFAULT_SAMPLES_PER_BUFFER: u32 = 1024;
const DEFAULT_RATE: u32 = 44_100;
const DEFAULT_CHANNELS: usize = 1;
const DEFAULT_FREQ: u32 = 440;
const DEFAULT_VOLUME: f64 = 0.8;
const DEFAULT_MUTE: bool = false;
const DEFAULT_IS_LIVE: bool = false;
const DEFAULT_NUM_BUFFERS: i32 = -1;
const DEFAULT_CHANNELS: usize = 1;
const DEFAULT_FREQ: f32 = 440.0;
const DEFAULT_VOLUME: f32 = 0.8;
const DEFAULT_RATE: u32 = 44_100;
#[cfg(feature = "tuning")]
const RAMPUP_BUFFER_COUNT: u32 = 500;
#[cfg(feature = "tuning")]
const LOG_BUFFER_INTERVAL: u32 = 2000;
static DEFAULT_CAPS: LazyLock<gst::Caps> = LazyLock::new(|| {
gst_audio::AudioCapsBuilder::new_interleaved()
.format(gst_audio::AUDIO_FORMAT_S16)
.rate_range(8_000..i32::MAX)
.channels_range(1..i32::MAX)
.build()
});
#[derive(Debug, Clone)]
struct Settings {
context: String,
context_wait: Duration,
do_timestamp: bool,
samples_per_buffer: u32,
freq: u32,
volume: f64,
mute: bool,
is_live: bool,
buffer_duration: gst::ClockTime,
num_buffers: Option<u32>,
#[cfg(feature = "tuning")]
is_main_elem: bool,
@ -71,9 +65,11 @@ impl Default for Settings {
Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
do_timestamp: DEFAULT_DO_TIMESTAMP,
samples_per_buffer: DEFAULT_SAMPLES_PER_BUFFER,
freq: DEFAULT_FREQ,
volume: DEFAULT_VOLUME,
mute: DEFAULT_MUTE,
is_live: DEFAULT_IS_LIVE,
buffer_duration: DEFAULT_BUFFER_DURATION,
num_buffers: None,
#[cfg(feature = "tuning")]
is_main_elem: false,
@ -86,59 +82,68 @@ struct AudioTestSrcPadHandler;
impl PadSrcHandler for AudioTestSrcPadHandler {
type ElementImpl = AudioTestSrc;
fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool {
fn src_query(
self,
pad: &gst::Pad,
elem: &Self::ElementImpl,
query: &mut gst::QueryRef,
) -> bool {
gst::debug!(CAT, obj = pad, "Received {query:?}");
if query.is_serialized() {
// See comment in runtime::pad::PadSrcHandler
return false;
}
if let gst::QueryViewMut::Latency(q) = query.view_mut() {
let settings = imp.settings.lock().unwrap();
let min_latency = if settings.is_live {
settings.buffer_duration
} else {
gst::ClockTime::ZERO
let rate = {
let caps = elem.caps.lock().unwrap();
let Some(caps) = caps.as_ref() else {
gst::debug!(CAT, imp = elem, "No caps yet");
return false;
};
let s = caps.structure(0).unwrap();
s.get::<i32>("rate").expect("negotiated")
};
q.set(
settings.is_live,
min_latency,
min_latency
+ runtime::Context::current().map_or(gst::ClockTime::ZERO, |ctx| {
gst::ClockTime::try_from(ctx.wait_duration()).unwrap()
}),
);
let settings = elem.settings.lock().unwrap();
// timers can be up to 1/2 x context-wait late
let context_wait = gst::ClockTime::try_from(settings.context_wait).unwrap();
let latency = gst::ClockTime::SECOND
.mul_div_floor(settings.samples_per_buffer as u64, rate as u64)
.unwrap()
+ context_wait / 2;
gst::debug!(CAT, imp = elem, "Returning latency {latency}");
q.set(settings.is_live, latency, gst::ClockTime::NONE);
return true;
}
gst::Pad::query_default(pad, Some(&*imp.obj()), query)
}
}
#[derive(Debug, Copy, Clone)]
enum Negotiation {
Unchanged,
Changed,
}
impl Negotiation {
fn has_changed(self) -> bool {
matches!(self, Negotiation::Changed)
gst::Pad::query_default(pad, Some(&*elem.obj()), query)
}
}
#[derive(Debug)]
struct AudioTestSrcTask {
elem: super::AudioTestSrc,
buffer_pool: gst::BufferPool,
segment: gst::FormattedSegment<gst::format::Time>,
need_initial_events: bool,
volume: f64,
freq: f64,
rate: u32,
channels: usize,
do_timestamp: bool,
is_live: bool,
samples_per_buffer: u32,
bytes_per_buffer: usize,
buffer_duration: gst::ClockTime,
need_initial_events: bool,
step: f32,
accumulator: f32,
last_buffer_end: Option<gst::ClockTime>,
caps: gst::Caps,
sample_offset: u64,
sample_stop: Option<u64>,
step: f64,
accumulator: f64,
buffer_count: u32,
num_buffers: Option<u32>,
#[cfg(feature = "tuning")]
@ -153,17 +158,26 @@ impl AudioTestSrcTask {
fn new(elem: super::AudioTestSrc) -> Self {
AudioTestSrcTask {
elem,
buffer_pool: gst::BufferPool::new(),
segment: gst::FormattedSegment::<gst::format::Time>::new(),
need_initial_events: true,
volume: DEFAULT_VOLUME,
freq: DEFAULT_FREQ as f64,
rate: DEFAULT_RATE,
channels: DEFAULT_CHANNELS,
do_timestamp: DEFAULT_DO_TIMESTAMP,
is_live: DEFAULT_IS_LIVE,
buffer_duration: DEFAULT_BUFFER_DURATION,
need_initial_events: true,
bytes_per_buffer: (DEFAULT_SAMPLES_PER_BUFFER as usize)
* DEFAULT_CHANNELS
* size_of::<i16>(),
samples_per_buffer: DEFAULT_SAMPLES_PER_BUFFER,
buffer_duration: gst::ClockTime::SECOND
.mul_div_floor(DEFAULT_SAMPLES_PER_BUFFER as u64, DEFAULT_RATE as u64)
.unwrap(),
sample_offset: 0,
sample_stop: None,
step: 0.0,
accumulator: 0.0,
last_buffer_end: None,
caps: gst::Caps::new_empty(),
buffer_count: 0,
num_buffers: None,
#[cfg(feature = "tuning")]
@ -175,15 +189,17 @@ impl AudioTestSrcTask {
}
}
async fn negotiate(&mut self) -> Result<Negotiation, gst::ErrorMessage> {
async fn negotiate(&mut self) -> Result<(), gst::ErrorMessage> {
let imp = self.elem.imp();
let pad = imp.src_pad.gst_pad();
if !pad.check_reconfigure() {
return Ok(Negotiation::Unchanged);
return Ok(());
}
let mut caps = pad.peer_query_caps(Some(&DEFAULT_CAPS));
let pad_template = self.elem.pad_template("src").unwrap();
let default_caps = pad_template.caps();
let mut caps = pad.peer_query_caps(Some(default_caps));
gst::debug!(CAT, imp = imp, "Peer returned {caps:?}");
if caps.is_empty() {
@ -195,9 +211,22 @@ impl AudioTestSrcTask {
if caps.is_any() {
gst::debug!(CAT, imp = imp, "Using our own Caps");
caps = DEFAULT_CAPS.clone();
caps = gst_audio::AudioCapsBuilder::new_interleaved()
.format(gst_audio::AUDIO_FORMAT_S16)
.channels(DEFAULT_CHANNELS as i32)
.rate(DEFAULT_RATE as i32)
.build();
}
self.set_caps(caps).await
}
async fn set_caps(&mut self, mut caps: gst::Caps) -> Result<(), gst::ErrorMessage> {
use std::ops::Rem;
let imp = self.elem.imp();
gst::debug!(CAT, imp = imp, "Configuring for caps {caps}");
{
let caps = caps.make_mut();
let s = caps.structure_mut(0).ok_or_else(|| {
@ -206,9 +235,17 @@ impl AudioTestSrcTask {
err
})?;
let old_rate = self.rate as u64;
s.fixate_field_nearest_int("rate", DEFAULT_RATE as i32);
self.rate = s.get::<i32>("rate").unwrap() as u32;
self.step = 2.0 * std::f32::consts::PI * DEFAULT_FREQ / (self.rate as f32);
if self.rate != old_rate as u32 {
self.elem.call_async(|elem| {
let _ = elem.post_message(gst::message::Latency::new());
});
}
self.step = 2.0 * std::f64::consts::PI * self.freq / (self.rate as f64);
s.fixate_field_nearest_int("channels", DEFAULT_CHANNELS as i32);
self.channels = s.get::<i32>("channels").unwrap() as usize;
@ -221,6 +258,26 @@ impl AudioTestSrcTask {
)),
);
}
// Update sample offset and accumulator based on the previous values and the
// sample rate change, if any
let old_sample_offset = self.sample_offset;
let sample_offset = old_sample_offset
.mul_div_floor(self.rate as u64, old_rate)
.unwrap();
let old_sample_stop = self.sample_stop;
self.sample_stop =
old_sample_stop.map(|v| v.mul_div_floor(self.rate as u64, old_rate).unwrap());
self.accumulator = (sample_offset as f64).rem(self.step);
self.buffer_duration = gst::ClockTime::SECOND
.mul_div_floor(self.samples_per_buffer as u64, self.rate as u64)
.unwrap();
self.bytes_per_buffer =
(self.samples_per_buffer as usize) * self.channels * size_of::<i16>();
}
caps.fixate();
@ -228,9 +285,9 @@ impl AudioTestSrcTask {
imp.src_pad.push_event(gst::event::Caps::new(&caps)).await;
self.caps = caps;
*imp.caps.lock().unwrap() = Some(caps);
Ok(Negotiation::Changed)
Ok(())
}
}
@ -242,10 +299,10 @@ impl TaskImpl for AudioTestSrcTask {
let imp = self.elem.imp();
let settings = imp.settings.lock().unwrap();
self.do_timestamp = settings.do_timestamp;
self.is_live = settings.is_live;
self.buffer_duration = settings.buffer_duration;
self.samples_per_buffer = settings.samples_per_buffer;
self.num_buffers = settings.num_buffers;
self.freq = settings.freq as f64;
#[cfg(feature = "tuning")]
{
@ -268,26 +325,10 @@ impl TaskImpl for AudioTestSrcTask {
self.elem.imp().src_pad.push_event(stream_start_evt).await;
}
if self.negotiate().await?.has_changed() {
let bytes_per_buffer = (self.rate as u64)
* self.buffer_duration.mseconds()
* self.channels as u64
* size_of::<i16>() as u64
/ 1_000;
let mut pool_config = self.buffer_pool.config();
pool_config
.as_mut()
.set_params(Some(&self.caps), bytes_per_buffer as u32, 2, 6);
self.buffer_pool.set_config(pool_config).unwrap();
}
assert!(!self.caps.is_empty());
self.buffer_pool.set_active(true).unwrap();
self.negotiate().await?;
if self.need_initial_events {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
let segment_evt = gst::event::Segment::new(&self.segment);
self.elem.imp().src_pad.push_event(segment_evt).await;
self.need_initial_events = false;
@ -305,7 +346,6 @@ impl TaskImpl for AudioTestSrcTask {
async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Pausing Task");
self.buffer_pool.set_active(false).unwrap();
Ok(())
}
@ -314,69 +354,80 @@ impl TaskImpl for AudioTestSrcTask {
gst::log!(CAT, obj = self.elem, "Stopping Task");
self.need_initial_events = true;
self.sample_offset = 0;
self.sample_stop = None;
self.accumulator = 0.0;
self.last_buffer_end = None;
Ok(())
}
async fn try_next(&mut self) -> Result<gst::Buffer, gst::FlowError> {
let mut buffer = match self.buffer_pool.acquire_buffer(None) {
Ok(buffer) => buffer,
Err(err) => {
gst::error!(CAT, obj = self.elem, "Failed to acquire buffer {}", err);
return Err(err);
}
let Ok(mut buffer) = gst::Buffer::with_size(self.bytes_per_buffer) else {
gst::error!(CAT, obj = self.elem, "Failed to create buffer");
return Err(gst::FlowError::Flushing);
};
let buffer_mut = buffer.get_mut().unwrap();
let start = if self.is_live | self.do_timestamp {
self.last_buffer_end
.or_else(|| self.elem.current_running_time())
let n_samples = if let Some(sample_stop) = self.sample_stop {
if sample_stop <= self.sample_offset {
gst::log!(CAT, obj = self.elem, "At EOS");
return Err(gst::FlowError::Eos);
}
sample_stop - self.sample_offset
} else {
None
self.samples_per_buffer as u64
};
let pts = self
.sample_offset
.mul_div_floor(*gst::ClockTime::SECOND, self.rate as u64)
.map(gst::ClockTime::from_nseconds)
.unwrap();
let next_pts = (self.sample_offset + n_samples)
.mul_div_floor(*gst::ClockTime::SECOND, self.rate as u64)
.map(gst::ClockTime::from_nseconds)
.unwrap();
buffer_mut.set_pts(pts);
buffer_mut.set_duration(next_pts - pts);
{
use std::io::Write;
let mut mapped = buffer_mut.map_writable().unwrap();
let slice = mapped.as_mut_slice();
slice
.chunks_mut(self.channels * size_of::<i16>())
.for_each(|frame| {
let sample = ((self.accumulator.sin() * DEFAULT_VOLUME * (i16::MAX as f32))
as i16)
.to_ne_bytes();
let data = mapped.as_mut_slice_of::<i16>().unwrap();
for chunk in data.chunks_exact_mut(self.channels) {
let value = (self.accumulator.sin() * self.volume * (i16::MAX as f64)) as i16;
for sample in chunk {
*sample = value;
}
frame.chunks_mut(size_of::<i16>()).for_each(|mut channel| {
let _ = channel.write(&sample).unwrap();
});
self.accumulator += self.step;
if self.accumulator >= 2.0 * std::f32::consts::PI {
self.accumulator = -2.0 * std::f32::consts::PI;
}
});
self.accumulator += self.step;
if self.accumulator >= 2.0 * std::f64::consts::PI {
self.accumulator = -2.0 * std::f64::consts::PI;
}
}
}
if self.do_timestamp {
buffer_mut.set_pts(start);
buffer_mut.set_duration(self.buffer_duration);
}
self.last_buffer_end = start.opt_add(self.buffer_duration);
self.sample_offset += n_samples;
if self.is_live {
if let Some(delay) = self
.last_buffer_end
.unwrap()
.checked_sub(self.elem.current_running_time().unwrap())
{
// Wait for all samples to fit in last time slice
timer::delay_for_at_least(delay.into()).await;
}
let running_time = self
.segment
.to_running_time(buffer.pts().opt_add(buffer.duration()));
let Some(cur_rt) = self.elem.current_running_time() else {
// Let the scheduler share time with other tasks
runtime::executor::yield_now().await;
return Ok(buffer);
};
let Ok(Some(delay)) = running_time.opt_checked_sub(cur_rt) else {
// Let the scheduler share time with other tasks
runtime::executor::yield_now().await;
return Ok(buffer);
};
// Wait for all samples to fit in last time slice
timer::delay_for_at_least(delay.into()).await;
} else {
// Let the scheduler share time with other tasks
runtime::executor::yield_now().await;
@ -460,6 +511,7 @@ impl TaskImpl for AudioTestSrcTask {
pub struct AudioTestSrc {
src_pad: PadSrc,
task: Task,
caps: Mutex<Option<gst::Caps>>,
settings: Mutex<Settings>,
}
@ -522,6 +574,7 @@ impl ObjectSubclass for AudioTestSrc {
AudioTestSrcPadHandler,
),
task: Task::default(),
caps: Default::default(),
settings: Default::default(),
}
}
@ -542,18 +595,38 @@ impl ObjectImpl for AudioTestSrc {
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
glib::ParamSpecBoolean::builder("do-timestamp")
.nick("Do timestamp")
.blurb("Apply current stream time to buffers")
glib::ParamSpecUInt::builder("samples-per-buffer")
.nick("Samples Per Buffer")
.blurb("Number of samples per output buffer")
.minimum(1)
.default_value(DEFAULT_SAMPLES_PER_BUFFER)
.mutable_ready()
.build(),
glib::ParamSpecUInt::builder("freq")
.nick("Frequency")
.blurb("Frequency")
.minimum(1)
.default_value(DEFAULT_FREQ)
.mutable_playing()
.build(),
glib::ParamSpecDouble::builder("volume")
.nick("Volume")
.blurb("Output volume")
.maximum(10.0)
.default_value(DEFAULT_VOLUME)
.mutable_playing()
.build(),
glib::ParamSpecBoolean::builder("mute")
.nick("Mute")
.blurb("Mute")
.default_value(DEFAULT_MUTE)
.mutable_playing()
.build(),
glib::ParamSpecBoolean::builder("is-live")
.nick("Is live")
.blurb("Whether to act as a live source")
.build(),
glib::ParamSpecUInt::builder("buffer-duration")
.nick("Buffer duration")
.blurb("Buffer duration in ms")
.default_value(DEFAULT_BUFFER_DURATION.mseconds() as u32)
.nick("Is Live")
.blurb("(Pseudo) live output")
.default_value(DEFAULT_IS_LIVE)
.mutable_ready()
.build(),
glib::ParamSpecInt::builder("num-buffers")
.nick("Num Buffers")
@ -585,14 +658,26 @@ impl ObjectImpl for AudioTestSrc {
"context-wait" => {
settings.context_wait = Duration::from_millis(value.get::<u32>().unwrap().into());
}
"do-timestamp" => {
settings.do_timestamp = value.get::<bool>().unwrap();
"samples-per-buffer" => {
let mut settings = self.settings.lock().unwrap();
settings.samples_per_buffer = value.get().expect("type checked upstream");
drop(settings);
let _ = self
.obj()
.post_message(gst::message::Latency::builder().src(&*self.obj()).build());
}
"freq" => {
settings.freq = value.get().expect("type checked upstream");
}
"volume" => {
settings.volume = value.get().expect("type checked upstream");
}
"mute" => {
settings.mute = value.get().expect("type checked upstream");
}
"is-live" => {
settings.is_live = value.get::<bool>().unwrap();
}
"buffer-duration" => {
settings.buffer_duration = (value.get::<u32>().unwrap() as u64).mseconds();
settings.is_live = value.get().expect("type checked upstream");
}
"num-buffers" => {
let value = value.get::<i32>().unwrap();
@ -611,9 +696,11 @@ impl ObjectImpl for AudioTestSrc {
match pspec.name() {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"do-timestamp" => settings.do_timestamp.to_value(),
"samples-per-buffer" => settings.samples_per_buffer.to_value(),
"freq" => settings.freq.to_value(),
"volume" => settings.volume.to_value(),
"mute" => settings.mute.to_value(),
"is-live" => settings.is_live.to_value(),
"buffer-duration" => (settings.buffer_duration.mseconds() as u32).to_value(),
"num-buffers" => settings
.num_buffers
.and_then(|val| val.try_into().ok())
@ -652,11 +739,15 @@ impl ElementImpl for AudioTestSrc {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let caps = gst_audio::AudioCapsBuilder::new_interleaved()
.format(gst_audio::AUDIO_FORMAT_S16)
.build();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&DEFAULT_CAPS,
&caps,
)
.unwrap();
@ -691,6 +782,9 @@ impl ElementImpl for AudioTestSrc {
let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start().map_err(|_| gst::StateChangeError)?;
}