Update for gstreamer bindings API changes

This commit is contained in:
Sebastian Dröge 2020-03-30 00:29:40 +03:00
parent 9cdf09d106
commit bbf800f17f
6 changed files with 267 additions and 258 deletions

View file

@ -28,8 +28,7 @@ use gst::{
};
use gst_audio;
use gst_base;
use gst_base::subclass::base_transform::BaseTransformImplExt;
use gst_base::subclass::base_transform::GeneratedOutput;
use gst_base::subclass::base_transform::GenerateOutputSuccess;
use gst_base::subclass::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};
@ -300,7 +299,7 @@ impl CsoundFilter {
&self,
element: &gst_base::BaseTransform,
state: &mut State,
) -> Result<GeneratedOutput, gst::FlowError> {
) -> Result<GenerateOutputSuccess, gst::FlowError> {
let output_size = state.max_output_size(state.adapter.available());
let mut output = gst::Buffer::with_size(output_size).map_err(|_| gst::FlowError::Error)?;
@ -360,7 +359,7 @@ impl CsoundFilter {
}
}
Ok(GeneratedOutput::Buffer(output))
Ok(GenerateOutputSuccess::Buffer(output))
}
}
@ -658,7 +657,7 @@ impl BaseTransformImpl for CsoundFilter {
fn generate_output(
&self,
element: &gst_base::BaseTransform,
) -> Result<GeneratedOutput, gst::FlowError> {
) -> Result<GenerateOutputSuccess, gst::FlowError> {
// Check if there are enough data in the queued buffer and adapter,
// if it is not the case, just notify the parent class to not generate
// an output
@ -683,7 +682,7 @@ impl BaseTransformImpl for CsoundFilter {
}
}
gst_log!(CAT, "No enough data to generate output");
Ok(GeneratedOutput::NoOutput)
Ok(GenerateOutputSuccess::NoOutput)
}
}

View file

@ -957,13 +957,10 @@ impl BaseSrcImpl for ReqwestHttpSrc {
Err(None) => false,
}
}
}
fn create(
&self,
src: &gst_base::BaseSrc,
offset: u64,
_length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
impl PushSrcImpl for ReqwestHttpSrc {
fn create(&self, src: &gst_base::PushSrc) -> Result<gst::Buffer, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let (response, position, caps, tags) = match *state {
@ -981,15 +978,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
};
if *position != offset {
gst_element_error!(
src,
gst::ResourceError::Seek,
["Got unexpected offset {}, expected {}", offset, position]
);
return Err(gst::FlowError::Error);
}
let offset = *position;
let mut current_response = match response.take() {
Some(response) => response,
@ -1116,7 +1105,7 @@ impl URIHandlerImpl for ReqwestHttpSrc {
impl ObjectSubclass for ReqwestHttpSrc {
const NAME: &'static str = "ReqwestHttpSrc";
type ParentType = gst_base::BaseSrc;
type ParentType = gst_base::PushSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;

View file

@ -23,6 +23,7 @@ use gst::subclass::prelude::*;
use gst_base;
use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*;
use crate::s3url::*;
@ -412,14 +413,24 @@ impl BaseSrcImpl for S3Src {
&self,
src: &gst_base::BaseSrc,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
) -> Result<CreateSuccess, gst::FlowError> {
// FIXME: sanity check on offset and length
let data = self.get(src, offset, u64::from(length));
match data {
/* Got data */
Ok(bytes) => Ok(gst::Buffer::from_slice(bytes)),
Ok(bytes) => {
if let Some(buffer) = buffer {
if let Err(copied_bytes) = buffer.copy_from_slice(0, bytes.as_ref()) {
buffer.set_size(copied_bytes);
}
Ok(CreateSuccess::FilledBuffer)
} else {
Ok(CreateSuccess::NewBuffer(gst::Buffer::from_slice(bytes)))
}
}
/* Interrupted */
Err(None) => Err(gst::FlowError::Flushing),
/* Actual Error */

View file

@ -166,9 +166,10 @@ impl State {
fn get_requested_buffer(
&mut self,
pad: &gst::Pad,
buffer: Option<&mut gst::BufferRef>,
requested_size: u32,
adapter_offset: usize,
) -> Result<gst::Buffer, gst::FlowError> {
) -> Result<gst::PadGetRangeSuccess, gst::FlowError> {
let avail = self.adapter.available();
gst_debug!(CAT, obj: pad, "Avail: {}", avail);
gst_debug!(CAT, obj: pad, "Adapter offset: {}", adapter_offset);
@ -179,7 +180,7 @@ impl State {
.checked_sub(adapter_offset)
.ok_or(gst::FlowError::Eos)?;
// if the available buffer size is smaller than the requested, its a short
// if the available buffer size is smaller than the requested, it's a short
// read and return that. Else return the requested size
let available_size = if available_buffer <= requested_size as usize {
available_buffer
@ -190,10 +191,15 @@ impl State {
if available_size == 0 {
self.adapter.clear();
// if the requested buffer was 0 sized, retunr an
// if the requested buffer was 0 sized, return an
// empty buffer
if requested_size == 0 {
return Ok(gst::Buffer::new());
if let Some(buffer) = buffer {
buffer.set_size(0);
return Ok(gst::PadGetRangeSuccess::FilledBuffer);
} else {
return Ok(gst::PadGetRangeSuccess::NewBuffer(gst::Buffer::new()));
}
}
return Err(gst::FlowError::Eos);
@ -204,15 +210,32 @@ impl State {
self.adapter.flush(adapter_offset);
assert!(self.adapter.available() >= available_size);
let buffer = self
.adapter
.take_buffer(available_size)
.expect("Failed to get buffer from adapter");
let res = if let Some(buffer) = buffer {
let mut map = match buffer.map_writable() {
Ok(map) => map,
Err(_) => {
gst_error!(CAT, obj: pad, "Failed to map provided buffer writable");
return Err(gst::FlowError::Error);
}
};
self.adapter.copy(0, &mut map[..available_size]);
if map.len() != available_size {
drop(map);
buffer.set_size(available_size);
}
gst::PadGetRangeSuccess::FilledBuffer
} else {
let buffer = self
.adapter
.take_buffer(available_size)
.expect("Failed to get buffer from adapter");
gst::PadGetRangeSuccess::NewBuffer(buffer)
};
// Cleanup the adapter
self.adapter.clear();
Ok(buffer)
Ok(res)
}
}
@ -247,11 +270,11 @@ struct Decrypter {
impl Decrypter {
fn set_pad_functions(_sinkpad: &gst::Pad, srcpad: &gst::Pad) {
srcpad.set_getrange_function(|pad, parent, offset, size| {
srcpad.set_getrange_function(|pad, parent, offset, buffer, size| {
Decrypter::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|decrypter, element| decrypter.get_range(pad, element, offset, size),
|decrypter, element| decrypter.get_range(pad, element, offset, buffer, size),
)
});
@ -520,8 +543,9 @@ impl Decrypter {
pad: &gst::Pad,
element: &gst::Element,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
requested_size: u32,
) -> Result<gst::Buffer, gst::FlowError> {
) -> Result<gst::PadGetRangeSuccess, gst::FlowError> {
let block_size = {
let mut mutex_state = self.state.lock().unwrap();
// This will only be run after READY state,
@ -542,7 +566,7 @@ impl Decrypter {
assert!(pull_offset <= std::u32::MAX as u64);
let pull_offset = pull_offset as u32;
let buffer = self.pull_requested_buffer(
let pulled_buffer = self.pull_requested_buffer(
pad,
element,
requested_size + pull_offset,
@ -555,10 +579,10 @@ impl Decrypter {
// and will be guaranted to be initialized
let state = state.as_mut().unwrap();
state.decrypt_into_adapter(element, &self.srcpad, &buffer, chunk_index)?;
state.decrypt_into_adapter(element, &self.srcpad, &pulled_buffer, chunk_index)?;
let adapter_offset = pull_offset as usize;
state.get_requested_buffer(&self.srcpad, requested_size, adapter_offset)
state.get_requested_buffer(&self.srcpad, buffer, requested_size, adapter_offset)
}
}

View file

@ -202,7 +202,7 @@ impl SineSrc {
// up the class data
impl ObjectSubclass for SineSrc {
const NAME: &'static str = "RsSineSrc";
type ParentType = gst_base::BaseSrc;
type ParentType = gst_base::PushSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@ -518,13 +518,6 @@ impl BaseSrcImpl for SineSrc {
use gst::QueryView;
match query.view_mut() {
// We only work in Push mode. In Pull mode, create() could be called with
// arbitrary offsets and we would have to produce for that specific offset
QueryView::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
true
}
// In Live mode we will have a latency equal to the number of samples in each buffer.
// We can't output samples before they were produced, and the last sample of a buffer
// is produced that much after the beginning, leading to this latency calculation
@ -547,157 +540,6 @@ impl BaseSrcImpl for SineSrc {
}
}
// Creates the audio buffers
fn create(
&self,
element: &gst_base::BaseSrc,
_offset: u64,
_length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
// Keep a local copy of the values of all our properties at this very moment. This
// ensures that the mutex is never locked for long and the application wouldn't
// have to block until this function returns when getting/setting property values
let settings = *self.settings.lock().unwrap();
// Get a locked reference to our state, i.e. the input and output AudioInfo
let mut state = self.state.lock().unwrap();
let info = match state.info {
None => {
gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
return Err(gst::FlowError::NotNegotiated);
}
Some(ref info) => info.clone(),
};
// If a stop position is set (from a seek), only produce samples up to that
// point but at most samples_per_buffer samples per buffer
let n_samples = if let Some(sample_stop) = state.sample_stop {
if sample_stop <= state.sample_offset {
gst_log!(CAT, obj: element, "At EOS");
return Err(gst::FlowError::Eos);
}
sample_stop - state.sample_offset
} else {
settings.samples_per_buffer as u64
};
// Allocate a new buffer of the required size, update the metadata with the
// current timestamp and duration and then fill it according to the current
// caps
let mut buffer =
gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap();
{
let buffer = buffer.get_mut().unwrap();
// Calculate the current timestamp (PTS) and the next one,
// and calculate the duration from the difference instead of
// simply the number of samples to prevent rounding errors
let pts = state
.sample_offset
.mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
.unwrap()
.into();
let next_pts: gst::ClockTime = (state.sample_offset + n_samples)
.mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
.unwrap()
.into();
buffer.set_pts(pts);
buffer.set_duration(next_pts - pts);
// Map the buffer writable and create the actual samples
let mut map = buffer.map_writable().unwrap();
let data = map.as_mut_slice();
if info.format() == gst_audio::AUDIO_FORMAT_F32 {
Self::process::<f32>(
data,
&mut state.accumulator,
settings.freq,
info.rate(),
info.channels(),
settings.volume,
);
} else {
Self::process::<f64>(
data,
&mut state.accumulator,
settings.freq,
info.rate(),
info.channels(),
settings.volume,
);
}
}
state.sample_offset += n_samples;
drop(state);
// If we're live, we are waiting until the time of the last sample in our buffer has
// arrived. This is the very reason why we have to report that much latency.
// A real live-source would of course only allow us to have the data available after
// that latency, e.g. when capturing from a microphone, and no waiting from our side
// would be necessary..
//
// Waiting happens based on the pipeline clock, which means that a real live source
// with its own clock would require various translations between the two clocks.
// This is out of scope for the tutorial though.
if element.is_live() {
let clock = match element.get_clock() {
None => return Ok(buffer),
Some(clock) => clock,
};
let segment = element
.get_segment()
.downcast::<gst::format::Time>()
.unwrap();
let base_time = element.get_base_time();
let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration());
// The last sample's clock time is the base time of the element plus the
// running time of the last sample
let wait_until = running_time + base_time;
if wait_until.is_none() {
return Ok(buffer);
}
// Store the clock ID in our struct unless we're flushing anyway.
// This allows to asynchronously cancel the waiting from unlock()
// so that we immediately stop waiting on e.g. shutdown.
let mut clock_wait = self.clock_wait.lock().unwrap();
if clock_wait.flushing {
gst_debug!(CAT, obj: element, "Flushing");
return Err(gst::FlowError::Flushing);
}
let id = clock.new_single_shot_id(wait_until).unwrap();
clock_wait.clock_id = Some(id.clone());
drop(clock_wait);
gst_log!(
CAT,
obj: element,
"Waiting until {}, now {}",
wait_until,
clock.get_time()
);
let (res, jitter) = id.wait();
gst_log!(CAT, obj: element, "Waited res {:?} jitter {}", res, jitter);
self.clock_wait.lock().unwrap().clock_id.take();
// If the clock ID was unscheduled, unlock() was called
// and we should return Flushing immediately.
if res == Err(gst::ClockError::Unscheduled) {
gst_debug!(CAT, obj: element, "Flushing");
return Err(gst::FlowError::Flushing);
}
}
gst_debug!(CAT, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
}
fn fixate(&self, element: &gst_base::BaseSrc, mut caps: gst::Caps) -> gst::Caps {
// Fixate the caps. BaseSrc will do some fixation for us, but
// as we allow any rate between 1 and MAX it would fixate to 1. 1Hz
@ -853,6 +695,154 @@ impl BaseSrcImpl for SineSrc {
}
}
impl PushSrcImpl for SineSrc {
// Creates the audio buffers
fn create(&self, element: &gst_base::PushSrc) -> Result<gst::Buffer, gst::FlowError> {
// Keep a local copy of the values of all our properties at this very moment. This
// ensures that the mutex is never locked for long and the application wouldn't
// have to block until this function returns when getting/setting property values
let settings = *self.settings.lock().unwrap();
// Get a locked reference to our state, i.e. the input and output AudioInfo
let mut state = self.state.lock().unwrap();
let info = match state.info {
None => {
gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
return Err(gst::FlowError::NotNegotiated);
}
Some(ref info) => info.clone(),
};
// If a stop position is set (from a seek), only produce samples up to that
// point but at most samples_per_buffer samples per buffer
let n_samples = if let Some(sample_stop) = state.sample_stop {
if sample_stop <= state.sample_offset {
gst_log!(CAT, obj: element, "At EOS");
return Err(gst::FlowError::Eos);
}
sample_stop - state.sample_offset
} else {
settings.samples_per_buffer as u64
};
// Allocate a new buffer of the required size, update the metadata with the
// current timestamp and duration and then fill it according to the current
// caps
let mut buffer =
gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap();
{
let buffer = buffer.get_mut().unwrap();
// Calculate the current timestamp (PTS) and the next one,
// and calculate the duration from the difference instead of
// simply the number of samples to prevent rounding errors
let pts = state
.sample_offset
.mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
.unwrap()
.into();
let next_pts: gst::ClockTime = (state.sample_offset + n_samples)
.mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
.unwrap()
.into();
buffer.set_pts(pts);
buffer.set_duration(next_pts - pts);
// Map the buffer writable and create the actual samples
let mut map = buffer.map_writable().unwrap();
let data = map.as_mut_slice();
if info.format() == gst_audio::AUDIO_FORMAT_F32 {
Self::process::<f32>(
data,
&mut state.accumulator,
settings.freq,
info.rate(),
info.channels(),
settings.volume,
);
} else {
Self::process::<f64>(
data,
&mut state.accumulator,
settings.freq,
info.rate(),
info.channels(),
settings.volume,
);
}
}
state.sample_offset += n_samples;
drop(state);
// If we're live, we are waiting until the time of the last sample in our buffer has
// arrived. This is the very reason why we have to report that much latency.
// A real live-source would of course only allow us to have the data available after
// that latency, e.g. when capturing from a microphone, and no waiting from our side
// would be necessary..
//
// Waiting happens based on the pipeline clock, which means that a real live source
// with its own clock would require various translations between the two clocks.
// This is out of scope for the tutorial though.
if element.is_live() {
let clock = match element.get_clock() {
None => return Ok(buffer),
Some(clock) => clock,
};
let segment = element
.get_segment()
.downcast::<gst::format::Time>()
.unwrap();
let base_time = element.get_base_time();
let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration());
// The last sample's clock time is the base time of the element plus the
// running time of the last sample
let wait_until = running_time + base_time;
if wait_until.is_none() {
return Ok(buffer);
}
// Store the clock ID in our struct unless we're flushing anyway.
// This allows to asynchronously cancel the waiting from unlock()
// so that we immediately stop waiting on e.g. shutdown.
let mut clock_wait = self.clock_wait.lock().unwrap();
if clock_wait.flushing {
gst_debug!(CAT, obj: element, "Flushing");
return Err(gst::FlowError::Flushing);
}
let id = clock.new_single_shot_id(wait_until).unwrap();
clock_wait.clock_id = Some(id.clone());
drop(clock_wait);
gst_log!(
CAT,
obj: element,
"Waiting until {}, now {}",
wait_until,
clock.get_time()
);
let (res, jitter) = id.wait();
gst_log!(CAT, obj: element, "Waited res {:?} jitter {}", res, jitter);
self.clock_wait.lock().unwrap().clock_id.take();
// If the clock ID was unscheduled, unlock() was called
// and we should return Flushing immediately.
if res == Err(gst::ClockError::Unscheduled) {
gst_debug!(CAT, obj: element, "Flushing");
return Err(gst::FlowError::Flushing);
}
}
gst_debug!(CAT, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
}
}
// Registers the type for our element, and then registers in GStreamer under
// the name "sinesrc" for being able to instantiate it via e.g.
// gst::ElementFactory::make().

View file

@ -6,11 +6,10 @@ In this part, a raw audio sine wave source element is going to be written. The f
1. [Boilerplate](#boilerplate)
2. [Caps Negotiation](#caps-negotiation)
3. [Query Handling](#query-handling)
4. [Buffer Creation](#buffer-creation)
5. [(Pseudo) Live Mode](#pseudo-live-mode)
6. [Unlocking](#unlocking)
7. [Seeking](#seeking)
3. [Buffer Creation](#buffer-creation)
4. [(Pseudo) Live Mode](#pseudo-live-mode)
5. [Unlocking](#unlocking)
6. [Seeking](#seeking)
### Boilerplate
@ -18,7 +17,7 @@ The first part here will be all the boilerplate required to set up the element.
Our sine wave element is going to produce raw audio, with a number of channels and any possible sample rate with both 32 bit and 64 bit floating point samples. It will produce a simple sine wave with a configurable frequency, volume/mute and number of samples per audio buffer. In addition it will be possible to configure the element in (pseudo) live mode, meaning that it will only produce data in real-time according to the pipeline clock. And it will be possible to seek to any time/sample position on our source element. It will basically be a simpler version of the [`audiotestsrc`](https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-base-plugins/html/gst-plugins-base-plugins-audiotestsrc.html) element from gst-plugins-base.
So let's get started with all the boilerplate. This time our element will be based on the [`BaseSrc`](https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-libs/html/GstBaseSrc.html) base class instead of [`BaseTransform`](https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-libs/html/GstBaseTransform.html).
So let's get started with all the boilerplate. This time our element will be based on the [`PushSrc`](https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-libs/html/GstPushSrc.html) base class instead of [`BaseTransform`](https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-libs/html/GstBaseTransform.html). `PushSrc` is a subclass of the [`BaseSrc`](https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-libs/html/GstBaseSrc.html) base class that only works in push mode, i.e. creates buffers as they arrive instead of allowing downstream elements to explicitly pull them.
```rust
use glib;
@ -148,18 +147,26 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
impl SineSrc {
// Called when a new instance is to be created
fn new(element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
element.set_live(DEFAULT_IS_LIVE);
element.set_format(gst::Format::Time);
impl ObjectSubclass for SineSrc {
const NAME: &'static str = "RsSineSrc";
type ParentType = gst_base::PushSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
Box::new(Self {
// This macro provides some boilerplate.
glib_object_subclass!();
// Called when a new instance is to be created. We need to return an instance
// of our struct here.
fn new() -> Self {
Self {
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
})
clock_wait: Mutex::new(ClockWait {
clock_id: None,
flushing: true,
}),
}
}
// Called exactly once when registering the type. Used for
@ -172,7 +179,11 @@ impl SineSrc {
// will automatically instantiate pads for them.
//
// Our element here can output f32 and f64
fn class_init(klass: &mut BaseSrcClass) {
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
// Set the element specific metadata. This information is what
// is visible from gst-inspect-1.0 and can also be programatically
// retrieved from the gst::Registry after initial registration
// without having to load the plugin in memory.
klass.set_metadata(
"Sine Wave Source",
"Source/Audio",
@ -180,6 +191,11 @@ impl SineSrc {
"Sebastian Dröge <sebastian@centricular.com>",
);
// Create and add pad templates for our sink and source pad. These
// are later used for actually creating the pads and beforehand
// already provide information to GStreamer about all possible
// pads that could exist for this type.
// On the src pad, we can produce F32/F64 with any sample rate
// and any number of channels
let caps = gst::Caps::new_simple(
@ -204,7 +220,8 @@ impl SineSrc {
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
)
.unwrap();
klass.add_pad_template(src_pad_template);
// Install all our properties
@ -212,7 +229,22 @@ impl SineSrc {
}
}
impl ObjectImpl<BaseSrc> for SineSrc {
impl ObjectImpl for SineSrc {
// This macro provides some boilerplate.
glib_object_impl!();
// Called right after construction of a new instance
fn constructed(&self, obj: &glib::Object) {
// Call the parent class' ::constructed() implementation first
self.parent_constructed(obj);
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
basesrc.set_live(DEFAULT_IS_LIVE);
basesrc.set_format(gst::Format::Time);
}
// Called whenever a value of a property is changed. It can be called
// at any time from any thread.
fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
@ -320,9 +352,9 @@ impl ObjectImpl<BaseSrc> for SineSrc {
}
// Virtual methods of gst::Element. We override none
impl ElementImpl<BaseSrc> for SineSrc { }
impl ElementImpl for SineSrc { }
impl BaseSrcImpl<BaseSrc> for SineSrc {
impl BaseSrcImpl for SineSrc {
// Called when starting, so we can initialize all stream-related state to its defaults
fn start(&self, element: &BaseSrc) -> bool {
// Reset state
@ -467,35 +499,6 @@ Here we take the caps that are passed in, truncate them (i.e. remove all but the
For good measure, we also fixate the number of channels to the closest value to 1, but this would already be the default behaviour anyway. And then chain up to the parent class' implementation of `fixate`, which for now basically does the same as `caps.fixate()`. After this, the caps are fixated, i.e. there is only a single `Structure` left and all fields have concrete values (no ranges or sets).
### Query Handling
As our source element will work by generating a new audio buffer from a specific offset, and especially works in `Time` format, we want to notify downstream elements that we don't want to run in `Pull` mode, only in `Push` mode. In addition we would prefer sequential reading. However we still allow seeking later. For a source that does not know about `Time`, e.g. a file source, the format would be configured as `Bytes`. Other values than `Time` and `Bytes` generally don't make any sense.
The main difference here is that otherwise the base class would ask us to produce data for arbitrary `Byte` offsets, and we would have to produce data for that. While possible in our case, it's a bit annoying and for other audio sources it's not easily possible at all.
Downstream elements will try to query this very information from us, so we now have to override the default query handling of `BaseSrc` and handle the `SCHEDULING` query differently. Later we will also handle other queries differently.
```rust
fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
// We only work in Push mode. In Pull mode, create() could be called with
// arbitrary offsets and we would have to produce for that specific offset
QueryView::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
true
}
_ => BaseSrcImplExt::parent_query(self, element, query),
}
}
```
To handle the `SCHEDULING` query specifically, we first have to match on a view (mutable because we want to modify the view) of the query and check the type of the query. If it indeed is a scheduling query, we can set the `SEQUENTIAL` flag and specify that we handle only `Push` mode, then return `true` directly as we handled the query already.
In all other cases we fall back to the parent class' implementation of the `query` virtual method.
### Buffer Creation
Now we have everything in place for a working element, apart from the virtual method to actually generate the raw audio buffers with the sine wave. From a high-level `BaseSrc` works by calling the `create` virtual method over and over again to let the subclass produce a buffer until it returns an error or signals the end of the stream.
@ -557,14 +560,12 @@ Now we convert all the parameters to the types we will use later, and store them
The sine wave itself is calculated by `val = volume * sin(2 * PI * frequency * (i + accumulator) / rate)`, but we actually calculate it by simply increasing the accumulator by `2 * PI * frequency / rate` for every sample instead of doing the multiplication for each sample. We also make sure that the accumulator always stays between `0` and `2 * PI` to prevent any inaccuracies from floating point numbers from affecting our produced samples.
Now that this is done, we need to implement the `BaseSrc::create` virtual method for actually allocating the buffer, setting timestamps and other metadata and it and calling our above function.
Now that this is done, we need to implement the `PushSrc::create` virtual method for actually allocating the buffer, setting timestamps and other metadata on it and calling our above function.
```rust
fn create(
&self,
element: &BaseSrc,
_offset: u64,
_length: u32,
element: &PushSrc,
) -> Result<gst::Buffer, gst::FlowReturn> {
// Keep a local copy of the values of all our properties at this very moment. This
// ensures that the mutex is never locked for long and the application wouldn't
@ -742,7 +743,7 @@ Next we wait and then return the buffer as before.
Now we also have to tell the base class that we're running in live mode now. This is done by calling `set_live(true)` on the base class before changing the element state from `Ready` to `Paused`. For this we override the `Element::change_state` virtual method.
```rust
impl ElementImpl<BaseSrc> for SineSrc {
impl ElementImpl for SineSrc {
fn change_state(
&self,
element: &BaseSrc,
@ -763,18 +764,13 @@ impl ElementImpl<BaseSrc> for SineSrc {
And as a last step, we also need to notify downstream elements about our [latency](https://gstreamer.freedesktop.org/documentation/application-development/advanced/clocks.html#latency). Live elements always have to report their latency so that synchronization can work correctly. As the clock time of each buffer is equal to the time when it was created, all buffers would otherwise arrive late in the sinks (they would appear as if they should've been played already at the time when they were created). So all the sinks will have to compensate for the latency that it took from capturing to the sink, and they have to do that in a coordinated way (otherwise audio and video would be out of sync if both have different latencies). For this the pipeline is querying each sink for the latency on its own branch, and then configures a global latency on all sinks according to that.
This querying is done with the `LATENCY` query, which we will now also have to handle.
This querying is done with the `LATENCY` query, which we will have to handle in the `BaseSrc::query()` function.
```rust
fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
// We only work in Push mode. In Pull mode, create() could be called with
// arbitrary offsets and we would have to produce for that specific offset
QueryView::Scheduling(ref mut q) => {
[...]
}
// In Live mode we will have a latency equal to the number of samples in each buffer.
// We can't output samples before they were produced, and the last sample of a buffer
// is produced that much after the beginning, leading to this latency calculation