audio: migrate to new ClockTime design

This commit is contained in:
François Laignel 2021-05-25 16:37:48 +02:00
parent 8dfc872544
commit 17feaa8c71
9 changed files with 138 additions and 113 deletions

View file

@ -30,15 +30,15 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
use super::ring_buffer::RingBuffer;
const DEFAULT_MAX_DELAY: u64 = gst::SECOND_VAL;
const DEFAULT_DELAY: u64 = 500 * gst::MSECOND_VAL;
const DEFAULT_MAX_DELAY: gst::ClockTime = gst::ClockTime::SECOND;
const DEFAULT_DELAY: gst::ClockTime = gst::ClockTime::from_seconds(500);
const DEFAULT_INTENSITY: f64 = 0.5;
const DEFAULT_FEEDBACK: f64 = 0.0;
#[derive(Debug, Clone, Copy)]
struct Settings {
pub max_delay: u64,
pub delay: u64,
pub max_delay: gst::ClockTime,
pub delay: gst::ClockTime,
pub intensity: f64,
pub feedback: f64,
}
@ -71,10 +71,10 @@ impl AudioEcho {
state: &mut State,
settings: &Settings,
) {
let delay_frames = (settings.delay as usize)
* (state.info.channels() as usize)
* (state.info.rate() as usize)
/ (gst::SECOND_VAL as usize);
let delay_frames = (settings.delay
* (state.info.channels() as u64)
* (state.info.rate() as u64))
.seconds() as usize;
for (i, (o, e)) in data.iter_mut().zip(state.buffer.iter(delay_frames)) {
let inp = (*i).to_f64().unwrap();
@ -99,8 +99,8 @@ impl ObjectImpl for AudioEcho {
glib::ParamSpec::new_uint64("max-delay",
"Maximum Delay",
"Maximum delay of the echo in nanoseconds (can't be changed in PLAYING or PAUSED state)",
0, u64::MAX,
DEFAULT_MAX_DELAY,
0, u64::MAX - 1,
DEFAULT_MAX_DELAY.nseconds(),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_uint64(
@ -108,8 +108,8 @@ impl ObjectImpl for AudioEcho {
"Delay",
"Delay of the echo in nanoseconds",
0,
u64::MAX,
DEFAULT_DELAY,
u64::MAX - 1,
DEFAULT_DELAY.nseconds(),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_PLAYING,
),
glib::ParamSpec::new_double(
@ -147,12 +147,14 @@ impl ObjectImpl for AudioEcho {
"max-delay" => {
let mut settings = self.settings.lock().unwrap();
if self.state.lock().unwrap().is_none() {
settings.max_delay = value.get().expect("type checked upstream");
settings.max_delay =
gst::ClockTime::from_nseconds(value.get().expect("type checked upstream"));
}
}
"delay" => {
let mut settings = self.settings.lock().unwrap();
settings.delay = value.get().expect("type checked upstream");
settings.delay =
gst::ClockTime::from_nseconds(value.get().expect("type checked upstream"));
}
"intensity" => {
let mut settings = self.settings.lock().unwrap();
@ -292,12 +294,12 @@ impl BaseTransformImpl for AudioEcho {
let info = gst_audio::AudioInfo::from_caps(incaps)
.map_err(|_| gst::loggable_error!(CAT, "Failed to parse input caps"))?;
let max_delay = self.settings.lock().unwrap().max_delay;
let size = max_delay * (info.rate() as u64) / gst::SECOND_VAL;
let buffer_size = size * (info.channels() as u64);
let size = (max_delay * (info.rate() as u64)).seconds() as usize;
let buffer_size = size * (info.channels() as usize);
*self.state.lock().unwrap() = Some(State {
info,
buffer: RingBuffer::new(buffer_size as usize),
buffer: RingBuffer::new(buffer_size),
});
Ok(())

View file

@ -232,10 +232,10 @@ impl State {
{
let (pts, distance) = self.adapter.prev_pts();
let distance_samples = distance / self.info.bpf() as u64;
let pts = pts
+ gst::ClockTime::from(
distance_samples.mul_div_floor(gst::SECOND_VAL, self.info.rate() as u64),
);
let distance_ts = distance_samples
.mul_div_floor(*gst::ClockTime::SECOND, self.info.rate() as u64)
.map(gst::ClockTime::from_nseconds);
let pts = pts.zip(distance_ts).map(|(pts, dist)| pts + dist);
let inbuf = self
.adapter
@ -253,8 +253,11 @@ impl State {
outbuf.set_pts(pts);
outbuf.set_duration(
(outbuf.size() as u64)
.mul_div_floor(gst::SECOND_VAL, (self.info.bpf() * self.info.rate()) as u64)
.into(),
.mul_div_floor(
*gst::ClockTime::SECOND,
(self.info.bpf() * self.info.rate()) as u64,
)
.map(gst::ClockTime::from_nseconds),
);
}
@ -270,10 +273,10 @@ impl State {
let (pts, distance) = self.adapter.prev_pts();
let distance_samples = distance / self.info.bpf() as u64;
let pts = pts
+ gst::ClockTime::from(
distance_samples.mul_div_floor(gst::SECOND_VAL, self.info.rate() as u64),
);
let distance_ts = distance_samples
.mul_div_floor(*gst::ClockTime::SECOND, self.info.rate() as u64)
.map(gst::ClockTime::from_nseconds);
let pts = pts.zip(distance_ts).map(|(pts, dist)| pts + dist);
let mut _mapped_inbuf = None;
let src = if self.adapter.available() > 0 {
@ -310,8 +313,11 @@ impl State {
outbuf.set_pts(pts);
outbuf.set_duration(
(outbuf.size() as u64)
.mul_div_floor(gst::SECOND_VAL, (self.info.bpf() * self.info.rate()) as u64)
.into(),
.mul_div_floor(
*gst::ClockTime::SECOND,
(self.info.bpf() * self.info.rate()) as u64,
)
.map(gst::ClockTime::from_nseconds),
);
}
@ -370,8 +376,8 @@ impl State {
&mut self,
element: &super::AudioLoudNorm,
src: &[f64],
pts: gst::ClockTime,
) -> Result<(gst::Buffer, gst::ClockTime), gst::FlowError> {
pts: impl Into<Option<gst::ClockTime>>,
) -> Result<(gst::Buffer, Option<gst::ClockTime>), gst::FlowError> {
// Fill our whole buffer here with the initial input, i.e. 3000ms of samples.
self.buf.copy_from_slice(src);
@ -439,7 +445,7 @@ impl State {
// PTS is the input PTS for the first frame, we output the first 100ms of the input
// buffer here
Ok((outbuf, pts))
Ok((outbuf, pts.into()))
}
fn process_fill_inner_frame(&mut self, element: &super::AudioLoudNorm, src: &[f64]) {
@ -600,8 +606,8 @@ impl State {
&mut self,
element: &super::AudioLoudNorm,
src: &[f64],
pts: gst::ClockTime,
) -> Result<(gst::Buffer, gst::ClockTime), gst::FlowError> {
pts: impl Into<Option<gst::ClockTime>>,
) -> Result<(gst::Buffer, Option<gst::ClockTime>), gst::FlowError> {
// Fill in these 100ms and adjust its gain according to previous measurements, and
// at the same time copy 100ms over to the limiter_buf.
self.process_fill_inner_frame(element, src);
@ -632,7 +638,9 @@ impl State {
// PTS is 2.9s seconds before the input PTS as we buffer 3s of samples and just
// outputted here the first 100ms of that.
let pts = pts + 100 * gst::MSECOND - 3 * gst::SECOND;
let pts = pts
.into()
.map(|pts| pts + 100 * gst::ClockTime::MSECOND - 3 * gst::ClockTime::SECOND);
Ok((outbuf, pts))
}
@ -693,8 +701,8 @@ impl State {
&mut self,
element: &super::AudioLoudNorm,
src: &[f64],
pts: gst::ClockTime,
) -> Result<(gst::Buffer, gst::ClockTime), gst::FlowError> {
pts: impl Into<Option<gst::ClockTime>>,
) -> Result<(gst::Buffer, Option<gst::ClockTime>), gst::FlowError> {
let channels = self.info.channels() as usize;
let num_samples = src.len() / channels;
@ -769,7 +777,9 @@ impl State {
// PTS is 2.9s seconds before the input PTS as we buffer 3s of samples and just
// outputted here the first 100ms of that.
let pts = pts + 100 * gst::MSECOND - 3 * gst::SECOND;
let pts = pts
.into()
.map(|pts| pts + 100 * gst::ClockTime::MSECOND - 3 * gst::ClockTime::SECOND);
Ok((outbuf, pts))
}
@ -777,8 +787,8 @@ impl State {
&mut self,
element: &super::AudioLoudNorm,
src: &[f64],
pts: gst::ClockTime,
) -> Result<(gst::Buffer, gst::ClockTime), gst::FlowError> {
pts: impl Into<Option<gst::ClockTime>>,
) -> Result<(gst::Buffer, Option<gst::ClockTime>), gst::FlowError> {
// Apply a linear scale factor to the whole buffer
gst_debug!(
@ -807,15 +817,15 @@ impl State {
}
// PTS is input PTS as we just pass through the data without latency.
Ok((outbuf, pts))
Ok((outbuf, pts.into()))
}
fn process(
&mut self,
element: &super::AudioLoudNorm,
src: &[f64],
pts: gst::ClockTime,
) -> Result<(gst::Buffer, gst::ClockTime), gst::FlowError> {
pts: impl Into<Option<gst::ClockTime>>,
) -> Result<(gst::Buffer, Option<gst::ClockTime>), gst::FlowError> {
self.r128_in
.add_frames_f64(src)
.map_err(|_| gst::FlowError::Error)?;
@ -1690,8 +1700,8 @@ impl AudioLoudNorm {
let (live, min_latency, max_latency) = peer_query.result();
q.set(
live,
min_latency + 3 * gst::SECOND,
max_latency + 3 * gst::SECOND,
min_latency + 3 * gst::ClockTime::SECOND,
max_latency.map(|max| max + 3 * gst::ClockTime::SECOND),
);
true
} else {

View file

@ -52,16 +52,18 @@ pub struct AudioRNNoise {
impl State {
// The following three functions are copied from the csound filter.
fn buffer_duration(&self, buffer_size: u64) -> gst::ClockTime {
fn buffer_duration(&self, buffer_size: u64) -> Option<gst::ClockTime> {
let samples = buffer_size / self.in_info.bpf() as u64;
self.samples_to_time(samples)
}
fn samples_to_time(&self, samples: u64) -> gst::ClockTime {
gst::ClockTime(samples.mul_div_round(gst::SECOND_VAL, self.in_info.rate() as u64))
fn samples_to_time(&self, samples: u64) -> Option<gst::ClockTime> {
samples
.mul_div_round(*gst::ClockTime::SECOND, self.in_info.rate() as u64)
.map(gst::ClockTime::from_nseconds)
}
fn current_pts(&self) -> gst::ClockTime {
fn current_pts(&self) -> Option<gst::ClockTime> {
// get the last seen pts and the amount of bytes
// since then
let (prev_pts, distance) = self.adapter.prev_pts();
@ -71,7 +73,9 @@ impl State {
// can be added to the prev_pts to get the
// pts at the beginning of the adapter.
let samples = distance / self.in_info.bpf() as u64;
prev_pts + self.samples_to_time(samples)
prev_pts
.zip(self.samples_to_time(samples))
.map(|(prev_pts, time_offset)| prev_pts + time_offset)
}
fn needs_more_data(&self) -> bool {
@ -359,11 +363,12 @@ impl BaseTransformImpl for AudioRNNoise {
"Peer latency: live {} min {} max {}",
live,
min,
max
max.display(),
);
min += gst::ClockTime::from_seconds((FRAME_SIZE / 48000) as u64);
max += gst::ClockTime::from_seconds((FRAME_SIZE / 48000) as u64);
max = max
.map(|max| max + gst::ClockTime::from_seconds((FRAME_SIZE / 48000) as u64));
q.set(live, min, max);
return true;
}

View file

@ -81,13 +81,13 @@ impl From<Mode> for ebur128::Mode {
const DEFAULT_MODE: Mode = Mode::all();
const DEFAULT_POST_MESSAGES: bool = true;
const DEFAULT_INTERVAL: u64 = gst::SECOND_VAL;
const DEFAULT_INTERVAL: gst::ClockTime = gst::ClockTime::SECOND;
#[derive(Debug, Clone, Copy)]
struct Settings {
mode: Mode,
post_messages: bool,
interval: u64,
interval: gst::ClockTime,
}
impl Default for Settings {
@ -104,8 +104,8 @@ struct State {
info: gst_audio::AudioInfo,
ebur128: ebur128::EbuR128,
num_frames: u64,
interval_frames: u64,
interval_frames_remaining: u64,
interval_frames: gst::ClockTime,
interval_frames_remaining: gst::ClockTime,
}
#[derive(Default)]
@ -167,8 +167,8 @@ impl ObjectImpl for EbuR128Level {
"Interval",
"Interval in nanoseconds for posting messages",
0,
u64::MAX,
DEFAULT_INTERVAL,
u64::MAX - 1,
DEFAULT_INTERVAL.nseconds(),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
]
@ -209,13 +209,14 @@ impl ObjectImpl for EbuR128Level {
settings.post_messages = post_messages;
}
"interval" => {
let interval = value.get().expect("type checked upstream");
let interval =
gst::ClockTime::from_nseconds(value.get().expect("type checked upstream"));
gst_info!(
CAT,
obj: obj,
"Changing interval from {} to {}",
gst::ClockTime::from(settings.interval),
gst::ClockTime::from(interval)
settings.interval,
interval,
);
settings.interval = interval;
}
@ -396,7 +397,7 @@ impl BaseTransformImpl for EbuR128Level {
let interval_frames = settings
.interval
.mul_div_floor(info.rate() as u64, gst::SECOND_VAL)
.mul_div_floor(info.rate() as u64, *gst::ClockTime::SECOND)
.unwrap();
*self.state.borrow_mut() = Some(State {
@ -459,7 +460,10 @@ impl BaseTransformImpl for EbuR128Level {
state.num_frames = 0;
}
let to_process = u64::min(state.interval_frames_remaining, frames.num_frames() as u64);
let to_process = u64::min(
state.interval_frames_remaining.nseconds(),
frames.num_frames() as u64,
);
frames
.process(to_process, &mut state.ebur128)
@ -472,30 +476,25 @@ impl BaseTransformImpl for EbuR128Level {
gst::FlowError::Error
})?;
state.interval_frames_remaining -= to_process;
state.interval_frames_remaining -= gst::ClockTime::from_nseconds(to_process);
state.num_frames += to_process;
// The timestamp we report in messages is always the timestamp until which measurements
// are included, not the starting timestamp.
timestamp += gst::ClockTime::from(
to_process
.mul_div_floor(gst::SECOND_VAL, state.info.rate() as u64)
.unwrap(),
);
timestamp = timestamp.map(|ts| {
ts + to_process
.mul_div_floor(*gst::ClockTime::SECOND, state.info.rate() as u64)
.map(gst::ClockTime::from_nseconds)
.unwrap()
});
// Post a message whenever an interval is full
if state.interval_frames_remaining == 0 {
if state.interval_frames_remaining.is_zero() {
state.interval_frames_remaining = state.interval_frames;
if settings.post_messages {
let running_time = segment
.as_ref()
.map(|s| s.to_running_time(timestamp))
.unwrap_or(gst::CLOCK_TIME_NONE);
let stream_time = segment
.as_ref()
.map(|s| s.to_stream_time(timestamp))
.unwrap_or(gst::CLOCK_TIME_NONE);
let running_time = segment.as_ref().and_then(|s| s.to_running_time(timestamp));
let stream_time = segment.as_ref().and_then(|s| s.to_stream_time(timestamp));
let mut s = gst::Structure::builder("ebur128-level")
.field("timestamp", &timestamp)

View file

@ -54,7 +54,7 @@ fn run_test(
second_input = second_input,
num_buffers = num_buffers,
samples_per_buffer = samples_per_buffer,
output_buffer_duration = samples_per_buffer as u64 * gst::SECOND_VAL / 192_000,
output_buffer_duration = samples_per_buffer as u64 * *gst::ClockTime::SECOND / 192_000,
format = format,
))
} else {
@ -102,7 +102,7 @@ fn run_test(
let mut eos = false;
let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::CLOCK_TIME_NONE) {
while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
@ -127,17 +127,17 @@ fn run_test(
.unwrap();
let mut num_samples = 0;
let mut expected_ts = gst::ClockTime::from(0);
let mut expected_ts = gst::ClockTime::ZERO;
for sample in samples.iter() {
use std::cmp::Ordering;
let buf = sample.buffer().unwrap();
let ts = buf.pts();
let ts = buf.pts().expect("undefined pts");
match ts.cmp(&expected_ts) {
Ordering::Greater => {
assert!(
ts - expected_ts <= gst::ClockTime::from(1),
ts - expected_ts <= gst::ClockTime::NSECOND,
"TS is {} instead of {}",
ts,
expected_ts
@ -145,7 +145,7 @@ fn run_test(
}
Ordering::Less => {
assert!(
expected_ts - ts <= gst::ClockTime::from(1),
expected_ts - ts <= gst::ClockTime::NSECOND,
"TS is {} instead of {}",
ts,
expected_ts
@ -160,8 +160,7 @@ fn run_test(
num_samples += data.len() / channels as usize;
r128.add_frames_f64(data).unwrap();
expected_ts +=
gst::ClockTime::from((data.len() as u64 / channels as u64) * gst::SECOND_VAL / 192_000);
expected_ts += gst::ClockTime::from_seconds(data.len() as u64 / channels as u64) / 192_000;
}
assert_eq!(

View file

@ -125,9 +125,9 @@ fn run_test(layout: gst_audio::AudioLayout, format: gst_audio::AudioFormat) {
let timestamp = s.get::<u64>("timestamp").unwrap();
let running_time = s.get::<u64>("running-time").unwrap();
let stream_time = s.get::<u64>("stream-time").unwrap();
assert_eq!(timestamp, num_msgs * 500 * gst::MSECOND_VAL);
assert_eq!(running_time, num_msgs * 500 * gst::MSECOND_VAL);
assert_eq!(stream_time, num_msgs * 500 * gst::MSECOND_VAL);
assert_eq!(timestamp, num_msgs * 500 * *gst::ClockTime::MSECOND);
assert_eq!(running_time, num_msgs * 500 * *gst::ClockTime::MSECOND);
assert_eq!(stream_time, num_msgs * 500 * *gst::ClockTime::MSECOND);
// Check if all these exist
let _momentary_loudness = s.get::<f64>("momentary-loudness").unwrap();

View file

@ -106,7 +106,7 @@ fn main_loop(pipeline: gst::Pipeline) -> Result<(), Box<dyn Error>> {
.bus()
.expect("Pipeline without bus. Shouldn't happen!");
for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {

View file

@ -103,11 +103,13 @@ impl State {
self.adapter.available() < self.spin_capacity()
}
fn samples_to_time(&self, samples: u64) -> gst::ClockTime {
gst::ClockTime(samples.mul_div_round(gst::SECOND_VAL, self.in_info.rate() as u64))
fn samples_to_time(&self, samples: u64) -> Option<gst::ClockTime> {
samples
.mul_div_round(*gst::ClockTime::SECOND, self.in_info.rate() as u64)
.map(gst::ClockTime::from_nseconds)
}
fn current_pts(&self) -> gst::ClockTime {
fn current_pts(&self) -> Option<gst::ClockTime> {
// get the last seen pts and the amount of bytes
// since then
let (prev_pts, distance) = self.adapter.prev_pts();
@ -117,10 +119,12 @@ impl State {
// can be added to the prev_pts to get the
// pts at the beginning of the adapter.
let samples = distance / self.in_info.bpf() as u64;
prev_pts + self.samples_to_time(samples)
prev_pts
.zip(self.samples_to_time(samples))
.map(|(prev_pts, time_offset)| prev_pts + time_offset)
}
fn buffer_duration(&self, buffer_size: u64) -> gst::ClockTime {
fn buffer_duration(&self, buffer_size: u64) -> Option<gst::ClockTime> {
let samples = buffer_size / self.out_info.bpf() as u64;
self.samples_to_time(samples)
}
@ -265,8 +269,8 @@ impl CsoundFilter {
CAT,
obj: element,
"Generating output at: {} - duration: {}",
pts,
duration
pts.display(),
duration.display(),
);
// Get the required amount of bytes to be read from

View file

@ -73,8 +73,10 @@ fn build_harness(src_caps: gst::Caps, sink_caps: gst::Caps, csd: &str) -> gst_ch
h
}
fn duration_from_samples(num_samples: u64, rate: u64) -> gst::ClockTime {
gst::ClockTime(num_samples.mul_div_round(gst::SECOND_VAL, rate))
fn duration_from_samples(num_samples: u64, rate: u64) -> Option<gst::ClockTime> {
num_samples
.mul_div_round(*gst::ClockTime::SECOND, rate)
.map(gst::ClockTime::from_nseconds)
}
// This test verifies the well functioning of the EOS logic,
@ -116,15 +118,16 @@ fn csound_filter_eos() {
h.play();
// The input buffer pts and duration
let mut in_pts = gst::ClockTime::zero();
let in_duration = duration_from_samples(EOS_NUM_SAMPLES as _, sr as _);
let mut in_pts = gst::ClockTime::ZERO;
let in_duration = duration_from_samples(EOS_NUM_SAMPLES as _, sr as _)
.expect("duration defined because sr is > 0");
// The number of samples that were leftover during the previous iteration
let mut samples_offset = 0;
// Output samples and buffers counters
let mut num_samples: usize = 0;
let mut num_buffers = 0;
// The expected pts of output buffers
let mut expected_pts = gst::ClockTime::zero();
let mut expected_pts = gst::ClockTime::ZERO;
for _ in 0..EOS_NUM_BUFFERS {
let mut buffer =
@ -149,13 +152,14 @@ fn csound_filter_eos() {
buffer.as_ref().duration(),
duration_from_samples(in_process_samples, sr as _)
);
assert_eq!(buffer.as_ref().pts(), expected_pts);
assert_eq!(buffer.as_ref().pts(), Some(expected_pts));
// Get the number of samples that were not processed
samples_offset = in_samples % ksmps as u64;
// Calculates the next output buffer timestamp
expected_pts =
in_pts + duration_from_samples(EOS_NUM_SAMPLES as u64 - samples_offset, sr as _);
expected_pts = in_pts
+ duration_from_samples(EOS_NUM_SAMPLES as u64 - samples_offset, sr as _)
.expect("duration defined because sr is > 0");
// Calculates the next input buffer timestamp
in_pts += in_duration;
@ -177,7 +181,7 @@ fn csound_filter_eos() {
let samples_at_eos = (EOS_NUM_BUFFERS * EOS_NUM_SAMPLES) % ksmps;
assert_eq!(
buffer.as_ref().pts(),
in_pts - duration_from_samples(samples_at_eos as _, sr as _)
duration_from_samples(samples_at_eos as _, sr as _).map(|duration| in_pts - duration)
);
let map = buffer.into_mapped_buffer_readable().unwrap();
@ -226,8 +230,9 @@ fn csound_filter_underflow() {
h.play();
// Input buffers timestamp
let mut in_pts = gst::ClockTime::zero();
let in_samples_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as _, sr as _);
let mut in_pts = gst::ClockTime::ZERO;
let in_samples_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as _, sr as _)
.expect("duration defined because sr is > 0");
for _ in 0..UNDERFLOW_NUM_BUFFERS {
let mut buffer =
@ -247,21 +252,22 @@ fn csound_filter_underflow() {
let mut num_buffers = 0;
let mut num_samples = 0;
let expected_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as u64 * 2, sr as _);
let expected_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as u64 * 2, sr as _)
.expect("duration defined because sr is > 0");
let expected_buffers = UNDERFLOW_NUM_BUFFERS / 2;
let mut expected_pts = gst::ClockTime::zero();
let mut expected_pts = gst::ClockTime::ZERO;
for _ in 0..expected_buffers {
let buffer = h.pull().unwrap();
let samples = buffer.size() / std::mem::size_of::<f64>();
assert_eq!(buffer.as_ref().pts(), expected_pts);
assert_eq!(buffer.as_ref().duration(), expected_duration);
assert_eq!(buffer.as_ref().pts(), Some(expected_pts));
assert_eq!(buffer.as_ref().duration(), Some(expected_duration));
assert_eq!(samples, UNDERFLOW_NUM_SAMPLES * 2);
// Output data is produced after 2 input buffers
// so that, the next output buffer's PTS should be
// equal to the last PTS plus the duration of 2 input buffers
expected_pts += in_samples_duration * 2;
expected_pts += 2 * in_samples_duration;
num_buffers += 1;
num_samples += samples;