diff --git a/Cargo.lock b/Cargo.lock index 271700293..71265526a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3410,6 +3410,7 @@ dependencies = [ "async-lock", "async-task", "bitflags 2.9.1", + "byte-slice-cast", "cc", "cfg-if", "clap", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 943129681..ba8688feb 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -16387,7 +16387,7 @@ "klass": "Source/Test", "pad-templates": { "src": { - "caps": "audio/x-raw:\n rate: [ 8000, 2147483646 ]\n channels: [ 1, 2147483646 ]\n layout: interleaved\n format: S16LE\n", + "caps": "audio/x-raw:\n rate: [ 1, 2147483647 ]\n channels: [ 1, 2147483647 ]\n layout: interleaved\n format: S16LE\n", "direction": "src", "presence": "always" } @@ -16445,14 +16445,40 @@ "type": "gboolean", "writable": true }, + "freq": { + "blurb": "Frequency", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "440", + "max": "-1", + "min": "1", + "mutable": "playing", + "readable": true, + "type": "guint", + "writable": true + }, "is-live": { - "blurb": "Whether to act as a live source", + "blurb": "(Pseudo) live output", "conditionally-available": false, "construct": false, "construct-only": false, "controllable": false, "default": "false", - "mutable": "null", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "mute": { + "blurb": "Mute", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "playing", "readable": true, "type": "gboolean", "writable": true @@ -16470,6 +16496,34 @@ "readable": true, "type": "gint", "writable": true + }, + "samples-per-buffer": { + "blurb": "Number of samples per output buffer", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1024", + "max": "-1", + "min": "1", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, + "volume": { + "blurb": "Output volume", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0.8", + "max": "10", + "min": "-1.79769e+308", + "mutable": "playing", + "readable": true, + "type": "gdouble", + "writable": true } }, "rank": "none" diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index a61bb1ebc..fcdc400b3 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] async-task = "4.3.0" async-lock = "3.4.0" +byte-slice-cast = "1.0" cfg-if = "1" concurrent-queue = "2.2.0" futures = "0.3.28" diff --git a/generic/threadshare/src/audiotestsrc/imp.rs b/generic/threadshare/src/audiotestsrc/imp.rs index 7b2de0dfe..bf55a1ae4 100644 --- a/generic/threadshare/src/audiotestsrc/imp.rs +++ b/generic/threadshare/src/audiotestsrc/imp.rs @@ -6,6 +6,7 @@ // // SPDX-License-Identifier: MPL-2.0 +use byte_slice_cast::*; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; @@ -31,36 +32,29 @@ static CAT: LazyLock = LazyLock::new(|| { const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; -const DEFAULT_BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(10); -const DEFAULT_DO_TIMESTAMP: bool = false; +const DEFAULT_SAMPLES_PER_BUFFER: u32 = 1024; +const DEFAULT_RATE: u32 = 44_100; +const DEFAULT_CHANNELS: usize = 1; +const DEFAULT_FREQ: u32 = 440; +const DEFAULT_VOLUME: f64 = 0.8; +const DEFAULT_MUTE: bool = false; const DEFAULT_IS_LIVE: bool = false; const DEFAULT_NUM_BUFFERS: i32 = -1; -const DEFAULT_CHANNELS: usize = 1; -const DEFAULT_FREQ: f32 = 440.0; -const DEFAULT_VOLUME: f32 = 0.8; -const DEFAULT_RATE: u32 = 44_100; - #[cfg(feature = "tuning")] const RAMPUP_BUFFER_COUNT: u32 = 500; #[cfg(feature = "tuning")] const LOG_BUFFER_INTERVAL: u32 = 2000; -static DEFAULT_CAPS: LazyLock = LazyLock::new(|| { - gst_audio::AudioCapsBuilder::new_interleaved() - .format(gst_audio::AUDIO_FORMAT_S16) - .rate_range(8_000..i32::MAX) - .channels_range(1..i32::MAX) - .build() -}); - #[derive(Debug, Clone)] struct Settings { context: String, context_wait: Duration, - do_timestamp: bool, + samples_per_buffer: u32, + freq: u32, + volume: f64, + mute: bool, is_live: bool, - buffer_duration: gst::ClockTime, num_buffers: Option, #[cfg(feature = "tuning")] is_main_elem: bool, @@ -71,9 +65,11 @@ impl Default for Settings { Settings { context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, - do_timestamp: DEFAULT_DO_TIMESTAMP, + samples_per_buffer: DEFAULT_SAMPLES_PER_BUFFER, + freq: DEFAULT_FREQ, + volume: DEFAULT_VOLUME, + mute: DEFAULT_MUTE, is_live: DEFAULT_IS_LIVE, - buffer_duration: DEFAULT_BUFFER_DURATION, num_buffers: None, #[cfg(feature = "tuning")] is_main_elem: false, @@ -86,59 +82,68 @@ struct AudioTestSrcPadHandler; impl PadSrcHandler for AudioTestSrcPadHandler { type ElementImpl = AudioTestSrc; - fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool { + fn src_query( + self, + pad: &gst::Pad, + elem: &Self::ElementImpl, + query: &mut gst::QueryRef, + ) -> bool { gst::debug!(CAT, obj = pad, "Received {query:?}"); + if query.is_serialized() { + // See comment in runtime::pad::PadSrcHandler + return false; + } + if let gst::QueryViewMut::Latency(q) = query.view_mut() { - let settings = imp.settings.lock().unwrap(); - let min_latency = if settings.is_live { - settings.buffer_duration - } else { - gst::ClockTime::ZERO + let rate = { + let caps = elem.caps.lock().unwrap(); + let Some(caps) = caps.as_ref() else { + gst::debug!(CAT, imp = elem, "No caps yet"); + return false; + }; + + let s = caps.structure(0).unwrap(); + s.get::("rate").expect("negotiated") }; - q.set( - settings.is_live, - min_latency, - min_latency - + runtime::Context::current().map_or(gst::ClockTime::ZERO, |ctx| { - gst::ClockTime::try_from(ctx.wait_duration()).unwrap() - }), - ); + let settings = elem.settings.lock().unwrap(); + // timers can be up to 1/2 x context-wait late + let context_wait = gst::ClockTime::try_from(settings.context_wait).unwrap(); + let latency = gst::ClockTime::SECOND + .mul_div_floor(settings.samples_per_buffer as u64, rate as u64) + .unwrap() + + context_wait / 2; + + gst::debug!(CAT, imp = elem, "Returning latency {latency}"); + q.set(settings.is_live, latency, gst::ClockTime::NONE); return true; } - gst::Pad::query_default(pad, Some(&*imp.obj()), query) - } -} - -#[derive(Debug, Copy, Clone)] -enum Negotiation { - Unchanged, - Changed, -} - -impl Negotiation { - fn has_changed(self) -> bool { - matches!(self, Negotiation::Changed) + gst::Pad::query_default(pad, Some(&*elem.obj()), query) } } #[derive(Debug)] struct AudioTestSrcTask { elem: super::AudioTestSrc, - buffer_pool: gst::BufferPool, + segment: gst::FormattedSegment, + need_initial_events: bool, + + volume: f64, + freq: f64, rate: u32, channels: usize, - do_timestamp: bool, is_live: bool, + samples_per_buffer: u32, + bytes_per_buffer: usize, buffer_duration: gst::ClockTime, - need_initial_events: bool, - step: f32, - accumulator: f32, - last_buffer_end: Option, - caps: gst::Caps, + sample_offset: u64, + sample_stop: Option, + step: f64, + accumulator: f64, + buffer_count: u32, num_buffers: Option, #[cfg(feature = "tuning")] @@ -153,17 +158,26 @@ impl AudioTestSrcTask { fn new(elem: super::AudioTestSrc) -> Self { AudioTestSrcTask { elem, - buffer_pool: gst::BufferPool::new(), + segment: gst::FormattedSegment::::new(), + need_initial_events: true, + + volume: DEFAULT_VOLUME, + freq: DEFAULT_FREQ as f64, rate: DEFAULT_RATE, channels: DEFAULT_CHANNELS, - do_timestamp: DEFAULT_DO_TIMESTAMP, is_live: DEFAULT_IS_LIVE, - buffer_duration: DEFAULT_BUFFER_DURATION, - need_initial_events: true, + bytes_per_buffer: (DEFAULT_SAMPLES_PER_BUFFER as usize) + * DEFAULT_CHANNELS + * size_of::(), + samples_per_buffer: DEFAULT_SAMPLES_PER_BUFFER, + buffer_duration: gst::ClockTime::SECOND + .mul_div_floor(DEFAULT_SAMPLES_PER_BUFFER as u64, DEFAULT_RATE as u64) + .unwrap(), + sample_offset: 0, + sample_stop: None, step: 0.0, accumulator: 0.0, - last_buffer_end: None, - caps: gst::Caps::new_empty(), + buffer_count: 0, num_buffers: None, #[cfg(feature = "tuning")] @@ -175,15 +189,17 @@ impl AudioTestSrcTask { } } - async fn negotiate(&mut self) -> Result { + async fn negotiate(&mut self) -> Result<(), gst::ErrorMessage> { let imp = self.elem.imp(); let pad = imp.src_pad.gst_pad(); if !pad.check_reconfigure() { - return Ok(Negotiation::Unchanged); + return Ok(()); } - let mut caps = pad.peer_query_caps(Some(&DEFAULT_CAPS)); + let pad_template = self.elem.pad_template("src").unwrap(); + let default_caps = pad_template.caps(); + let mut caps = pad.peer_query_caps(Some(default_caps)); gst::debug!(CAT, imp = imp, "Peer returned {caps:?}"); if caps.is_empty() { @@ -195,9 +211,22 @@ impl AudioTestSrcTask { if caps.is_any() { gst::debug!(CAT, imp = imp, "Using our own Caps"); - caps = DEFAULT_CAPS.clone(); + caps = gst_audio::AudioCapsBuilder::new_interleaved() + .format(gst_audio::AUDIO_FORMAT_S16) + .channels(DEFAULT_CHANNELS as i32) + .rate(DEFAULT_RATE as i32) + .build(); } + self.set_caps(caps).await + } + + async fn set_caps(&mut self, mut caps: gst::Caps) -> Result<(), gst::ErrorMessage> { + use std::ops::Rem; + + let imp = self.elem.imp(); + gst::debug!(CAT, imp = imp, "Configuring for caps {caps}"); + { let caps = caps.make_mut(); let s = caps.structure_mut(0).ok_or_else(|| { @@ -206,9 +235,17 @@ impl AudioTestSrcTask { err })?; + let old_rate = self.rate as u64; s.fixate_field_nearest_int("rate", DEFAULT_RATE as i32); self.rate = s.get::("rate").unwrap() as u32; - self.step = 2.0 * std::f32::consts::PI * DEFAULT_FREQ / (self.rate as f32); + + if self.rate != old_rate as u32 { + self.elem.call_async(|elem| { + let _ = elem.post_message(gst::message::Latency::new()); + }); + } + + self.step = 2.0 * std::f64::consts::PI * self.freq / (self.rate as f64); s.fixate_field_nearest_int("channels", DEFAULT_CHANNELS as i32); self.channels = s.get::("channels").unwrap() as usize; @@ -221,6 +258,26 @@ impl AudioTestSrcTask { )), ); } + + // Update sample offset and accumulator based on the previous values and the + // sample rate change, if any + let old_sample_offset = self.sample_offset; + let sample_offset = old_sample_offset + .mul_div_floor(self.rate as u64, old_rate) + .unwrap(); + + let old_sample_stop = self.sample_stop; + self.sample_stop = + old_sample_stop.map(|v| v.mul_div_floor(self.rate as u64, old_rate).unwrap()); + + self.accumulator = (sample_offset as f64).rem(self.step); + + self.buffer_duration = gst::ClockTime::SECOND + .mul_div_floor(self.samples_per_buffer as u64, self.rate as u64) + .unwrap(); + + self.bytes_per_buffer = + (self.samples_per_buffer as usize) * self.channels * size_of::(); } caps.fixate(); @@ -228,9 +285,9 @@ impl AudioTestSrcTask { imp.src_pad.push_event(gst::event::Caps::new(&caps)).await; - self.caps = caps; + *imp.caps.lock().unwrap() = Some(caps); - Ok(Negotiation::Changed) + Ok(()) } } @@ -242,10 +299,10 @@ impl TaskImpl for AudioTestSrcTask { let imp = self.elem.imp(); let settings = imp.settings.lock().unwrap(); - self.do_timestamp = settings.do_timestamp; self.is_live = settings.is_live; - self.buffer_duration = settings.buffer_duration; + self.samples_per_buffer = settings.samples_per_buffer; self.num_buffers = settings.num_buffers; + self.freq = settings.freq as f64; #[cfg(feature = "tuning")] { @@ -268,26 +325,10 @@ impl TaskImpl for AudioTestSrcTask { self.elem.imp().src_pad.push_event(stream_start_evt).await; } - if self.negotiate().await?.has_changed() { - let bytes_per_buffer = (self.rate as u64) - * self.buffer_duration.mseconds() - * self.channels as u64 - * size_of::() as u64 - / 1_000; - - let mut pool_config = self.buffer_pool.config(); - pool_config - .as_mut() - .set_params(Some(&self.caps), bytes_per_buffer as u32, 2, 6); - self.buffer_pool.set_config(pool_config).unwrap(); - } - - assert!(!self.caps.is_empty()); - self.buffer_pool.set_active(true).unwrap(); + self.negotiate().await?; if self.need_initial_events { - let segment_evt = - gst::event::Segment::new(&gst::FormattedSegment::::new()); + let segment_evt = gst::event::Segment::new(&self.segment); self.elem.imp().src_pad.push_event(segment_evt).await; self.need_initial_events = false; @@ -305,7 +346,6 @@ impl TaskImpl for AudioTestSrcTask { async fn pause(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(CAT, obj = self.elem, "Pausing Task"); - self.buffer_pool.set_active(false).unwrap(); Ok(()) } @@ -314,69 +354,80 @@ impl TaskImpl for AudioTestSrcTask { gst::log!(CAT, obj = self.elem, "Stopping Task"); self.need_initial_events = true; + self.sample_offset = 0; + self.sample_stop = None; self.accumulator = 0.0; - self.last_buffer_end = None; Ok(()) } async fn try_next(&mut self) -> Result { - let mut buffer = match self.buffer_pool.acquire_buffer(None) { - Ok(buffer) => buffer, - Err(err) => { - gst::error!(CAT, obj = self.elem, "Failed to acquire buffer {}", err); - return Err(err); - } + let Ok(mut buffer) = gst::Buffer::with_size(self.bytes_per_buffer) else { + gst::error!(CAT, obj = self.elem, "Failed to create buffer"); + return Err(gst::FlowError::Flushing); }; - let buffer_mut = buffer.get_mut().unwrap(); - let start = if self.is_live | self.do_timestamp { - self.last_buffer_end - .or_else(|| self.elem.current_running_time()) + let n_samples = if let Some(sample_stop) = self.sample_stop { + if sample_stop <= self.sample_offset { + gst::log!(CAT, obj = self.elem, "At EOS"); + return Err(gst::FlowError::Eos); + } + + sample_stop - self.sample_offset } else { - None + self.samples_per_buffer as u64 }; + let pts = self + .sample_offset + .mul_div_floor(*gst::ClockTime::SECOND, self.rate as u64) + .map(gst::ClockTime::from_nseconds) + .unwrap(); + let next_pts = (self.sample_offset + n_samples) + .mul_div_floor(*gst::ClockTime::SECOND, self.rate as u64) + .map(gst::ClockTime::from_nseconds) + .unwrap(); + buffer_mut.set_pts(pts); + buffer_mut.set_duration(next_pts - pts); + { - use std::io::Write; - let mut mapped = buffer_mut.map_writable().unwrap(); - let slice = mapped.as_mut_slice(); - slice - .chunks_mut(self.channels * size_of::()) - .for_each(|frame| { - let sample = ((self.accumulator.sin() * DEFAULT_VOLUME * (i16::MAX as f32)) - as i16) - .to_ne_bytes(); + let data = mapped.as_mut_slice_of::().unwrap(); + for chunk in data.chunks_exact_mut(self.channels) { + let value = (self.accumulator.sin() * self.volume * (i16::MAX as f64)) as i16; + for sample in chunk { + *sample = value; + } - frame.chunks_mut(size_of::()).for_each(|mut channel| { - let _ = channel.write(&sample).unwrap(); - }); - - self.accumulator += self.step; - if self.accumulator >= 2.0 * std::f32::consts::PI { - self.accumulator = -2.0 * std::f32::consts::PI; - } - }); + self.accumulator += self.step; + if self.accumulator >= 2.0 * std::f64::consts::PI { + self.accumulator = -2.0 * std::f64::consts::PI; + } + } } - if self.do_timestamp { - buffer_mut.set_pts(start); - buffer_mut.set_duration(self.buffer_duration); - } - - self.last_buffer_end = start.opt_add(self.buffer_duration); + self.sample_offset += n_samples; if self.is_live { - if let Some(delay) = self - .last_buffer_end - .unwrap() - .checked_sub(self.elem.current_running_time().unwrap()) - { - // Wait for all samples to fit in last time slice - timer::delay_for_at_least(delay.into()).await; - } + let running_time = self + .segment + .to_running_time(buffer.pts().opt_add(buffer.duration())); + + let Some(cur_rt) = self.elem.current_running_time() else { + // Let the scheduler share time with other tasks + runtime::executor::yield_now().await; + return Ok(buffer); + }; + + let Ok(Some(delay)) = running_time.opt_checked_sub(cur_rt) else { + // Let the scheduler share time with other tasks + runtime::executor::yield_now().await; + return Ok(buffer); + }; + + // Wait for all samples to fit in last time slice + timer::delay_for_at_least(delay.into()).await; } else { // Let the scheduler share time with other tasks runtime::executor::yield_now().await; @@ -460,6 +511,7 @@ impl TaskImpl for AudioTestSrcTask { pub struct AudioTestSrc { src_pad: PadSrc, task: Task, + caps: Mutex>, settings: Mutex, } @@ -522,6 +574,7 @@ impl ObjectSubclass for AudioTestSrc { AudioTestSrcPadHandler, ), task: Task::default(), + caps: Default::default(), settings: Default::default(), } } @@ -542,18 +595,38 @@ impl ObjectImpl for AudioTestSrc { .maximum(1000) .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32) .build(), - glib::ParamSpecBoolean::builder("do-timestamp") - .nick("Do timestamp") - .blurb("Apply current stream time to buffers") + glib::ParamSpecUInt::builder("samples-per-buffer") + .nick("Samples Per Buffer") + .blurb("Number of samples per output buffer") + .minimum(1) + .default_value(DEFAULT_SAMPLES_PER_BUFFER) + .mutable_ready() + .build(), + glib::ParamSpecUInt::builder("freq") + .nick("Frequency") + .blurb("Frequency") + .minimum(1) + .default_value(DEFAULT_FREQ) + .mutable_playing() + .build(), + glib::ParamSpecDouble::builder("volume") + .nick("Volume") + .blurb("Output volume") + .maximum(10.0) + .default_value(DEFAULT_VOLUME) + .mutable_playing() + .build(), + glib::ParamSpecBoolean::builder("mute") + .nick("Mute") + .blurb("Mute") + .default_value(DEFAULT_MUTE) + .mutable_playing() .build(), glib::ParamSpecBoolean::builder("is-live") - .nick("Is live") - .blurb("Whether to act as a live source") - .build(), - glib::ParamSpecUInt::builder("buffer-duration") - .nick("Buffer duration") - .blurb("Buffer duration in ms") - .default_value(DEFAULT_BUFFER_DURATION.mseconds() as u32) + .nick("Is Live") + .blurb("(Pseudo) live output") + .default_value(DEFAULT_IS_LIVE) + .mutable_ready() .build(), glib::ParamSpecInt::builder("num-buffers") .nick("Num Buffers") @@ -585,14 +658,26 @@ impl ObjectImpl for AudioTestSrc { "context-wait" => { settings.context_wait = Duration::from_millis(value.get::().unwrap().into()); } - "do-timestamp" => { - settings.do_timestamp = value.get::().unwrap(); + "samples-per-buffer" => { + let mut settings = self.settings.lock().unwrap(); + settings.samples_per_buffer = value.get().expect("type checked upstream"); + drop(settings); + + let _ = self + .obj() + .post_message(gst::message::Latency::builder().src(&*self.obj()).build()); + } + "freq" => { + settings.freq = value.get().expect("type checked upstream"); + } + "volume" => { + settings.volume = value.get().expect("type checked upstream"); + } + "mute" => { + settings.mute = value.get().expect("type checked upstream"); } "is-live" => { - settings.is_live = value.get::().unwrap(); - } - "buffer-duration" => { - settings.buffer_duration = (value.get::().unwrap() as u64).mseconds(); + settings.is_live = value.get().expect("type checked upstream"); } "num-buffers" => { let value = value.get::().unwrap(); @@ -611,9 +696,11 @@ impl ObjectImpl for AudioTestSrc { match pspec.name() { "context" => settings.context.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), - "do-timestamp" => settings.do_timestamp.to_value(), + "samples-per-buffer" => settings.samples_per_buffer.to_value(), + "freq" => settings.freq.to_value(), + "volume" => settings.volume.to_value(), + "mute" => settings.mute.to_value(), "is-live" => settings.is_live.to_value(), - "buffer-duration" => (settings.buffer_duration.mseconds() as u32).to_value(), "num-buffers" => settings .num_buffers .and_then(|val| val.try_into().ok()) @@ -652,11 +739,15 @@ impl ElementImpl for AudioTestSrc { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let caps = gst_audio::AudioCapsBuilder::new_interleaved() + .format(gst_audio::AUDIO_FORMAT_S16) + .build(); + let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, gst::PadPresence::Always, - &DEFAULT_CAPS, + &caps, ) .unwrap(); @@ -691,6 +782,9 @@ impl ElementImpl for AudioTestSrc { let mut success = self.parent_change_state(transition)?; match transition { + gst::StateChange::ReadyToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } gst::StateChange::PausedToPlaying => { self.start().map_err(|_| gst::StateChangeError)?; }