app: Add an AppSrc/AppSink builder

These allows to construct these elements explicitly together with all the
properties instead of going via gst::ElementFactory::make().
This commit is contained in:
Sebastian Dröge 2022-10-22 10:14:33 +03:00
parent 0e5a4f05f8
commit 2a4e1069a1
8 changed files with 491 additions and 91 deletions

View file

@ -37,25 +37,21 @@ fn create_pipeline() -> Result<gst::Pipeline, Error> {
let pipeline = gst::Pipeline::new(None);
let src = gst::ElementFactory::make("audiotestsrc").build()?;
let sink = gst::ElementFactory::make("appsink").build()?;
pipeline.add_many(&[&src, &sink])?;
src.link(&sink)?;
let appsink = sink
.dynamic_cast::<gst_app::AppSink>()
.expect("Sink element is expected to be an appsink!");
let appsink = gst_app::AppSink::builder()
// Tell the appsink what format we want. It will then be the audiotestsrc's job to
// provide the format we request.
// This can be set after linking the two objects, because format negotiation between
// both elements will happen during pre-rolling of the pipeline.
appsink.set_caps(Some(
.caps(
&gst_audio::AudioCapsBuilder::new_interleaved()
.format(gst_audio::AUDIO_FORMAT_S16)
.channels(1)
.build(),
));
)
.build();
pipeline.add_many(&[&src, appsink.upcast_ref()])?;
src.link(&appsink)?;
// Getting data out of the appsink is done by setting callbacks on it.
// The appsink will then call those handlers, as soon as data is available.

View file

@ -34,16 +34,6 @@ fn create_pipeline() -> Result<gst::Pipeline, Error> {
gst::init()?;
let pipeline = gst::Pipeline::new(None);
let src = gst::ElementFactory::make("appsrc").build()?;
let videoconvert = gst::ElementFactory::make("videoconvert").build()?;
let sink = gst::ElementFactory::make("autovideosink").build()?;
pipeline.add_many(&[&src, &videoconvert, &sink])?;
gst::Element::link_many(&[&src, &videoconvert, &sink])?;
let appsrc = src
.dynamic_cast::<gst_app::AppSrc>()
.expect("Source element is expected to be an appsrc!");
// Specify the format we want to provide as application into the pipeline
// by creating a video info with the given format and creating caps from it for the appsrc element.
@ -53,8 +43,16 @@ fn create_pipeline() -> Result<gst::Pipeline, Error> {
.build()
.expect("Failed to create video info");
appsrc.set_caps(Some(&video_info.to_caps().unwrap()));
appsrc.set_format(gst::Format::Time);
let appsrc = gst_app::AppSrc::builder()
.caps(&video_info.to_caps().unwrap())
.format(gst::Format::Time)
.build();
let videoconvert = gst::ElementFactory::make("videoconvert").build()?;
let sink = gst::ElementFactory::make("autovideosink").build()?;
pipeline.add_many(&[appsrc.upcast_ref(), &videoconvert, &sink])?;
gst::Element::link_many(&[appsrc.upcast_ref(), &videoconvert, &sink])?;
// Our frame counter, that is stored in the mutable environment
// of the closure of the need-data callback

View file

@ -182,16 +182,8 @@ fn example_main() {
// This creates a pipeline with appsrc and appsink.
let pipeline = gst::Pipeline::new(None);
let appsrc = gst::ElementFactory::make("appsrc")
.build()
.unwrap()
.downcast::<gst_app::AppSrc>()
.unwrap();
let appsink = gst::ElementFactory::make("appsink")
.build()
.unwrap()
.downcast::<gst_app::AppSink>()
.unwrap();
let appsrc = gst_app::AppSrc::builder().build();
let appsink = gst_app::AppSink::builder().build();
pipeline.add(&appsrc).unwrap();
pipeline.add(&appsink).unwrap();

View file

@ -47,31 +47,24 @@ fn create_receiver_pipeline(
let caps = video_info.to_caps()?;
let pipeline = gst::Pipeline::new(None);
let src = gst::ElementFactory::make("appsrc").build()?;
let src = gst_app::AppSrc::builder()
.caps(&caps)
.do_timestamp(true)
.is_live(true)
.build();
let filter = video_filter::FdMemoryFadeInVideoFilter::default().upcast::<gst::Element>();
let convert = gst::ElementFactory::make("videoconvert").build()?;
let queue = gst::ElementFactory::make("queue").build()?;
let sink = gst::ElementFactory::make("autovideosink").build()?;
src.downcast_ref::<gst_app::AppSrc>()
.ok_or_else(|| anyhow::anyhow!("is not a appsrc"))?
.set_caps(Some(&caps));
pipeline.add_many(&[&src, &filter, &convert, &queue, &sink])?;
gst::Element::link_many(&[&src, &filter, &convert, &queue, &sink])?;
let appsrc = src
.downcast::<gst_app::AppSrc>()
.map_err(|_| anyhow::anyhow!("is not a appsrc"))?;
appsrc.set_do_timestamp(true);
appsrc.set_is_live(true);
pipeline.add_many(&[src.upcast_ref(), &filter, &convert, &queue, &sink])?;
gst::Element::link_many(&[src.upcast_ref(), &filter, &convert, &queue, &sink])?;
let fd_allocator = gst_allocators::FdAllocator::new();
let video_info = video_info.clone();
let mut fd_buf = [-1; 253];
appsrc.set_callbacks(
src.set_callbacks(
gst_app::AppSrcCallbacks::builder()
.need_data(move |appsrc, _| {
// Read the next fds from the socket, if 0

View file

@ -533,19 +533,17 @@ impl App {
let pipeline = gst::Pipeline::new(None);
let src = gst::ElementFactory::make("videotestsrc").build()?;
let appsink = gst::ElementFactory::make("appsink")
.build()?
.dynamic_cast::<gst_app::AppSink>()
.expect("Sink element is expected to be an appsink!");
appsink.set_enable_last_sample(true);
appsink.set_max_buffers(1);
let caps = gst_video::VideoCapsBuilder::new()
.features(&[&gst_gl::CAPS_FEATURE_MEMORY_GL_MEMORY])
.format(gst_video::VideoFormat::Rgba)
.field("texture-target", "2D")
.build();
appsink.set_caps(Some(&caps));
let appsink = gst_app::AppSink::builder()
.enable_last_sample(true)
.max_buffers(1)
.caps(&caps)
.build();
if let Some(gl_element) = gl_element {
let glupload = gst::ElementFactory::make("glupload").build()?;

View file

@ -268,6 +268,14 @@ unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
}
impl AppSink {
// rustdoc-stripper-ignore-next
/// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
///
/// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
pub fn builder() -> AppSinkBuilder {
AppSinkBuilder::default()
}
#[doc(alias = "gst_app_sink_set_callbacks")]
pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
use once_cell::sync::Lazy;
@ -914,6 +922,205 @@ impl AppSink {
}
}
#[derive(Default)]
// rustdoc-stripper-ignore-next
/// A [builder-pattern] type to construct [`AppSink`] objects.
///
/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
#[must_use = "The builder must be built to be used"]
pub struct AppSinkBuilder {
async_: Option<bool>,
buffer_list: Option<bool>,
callbacks: Option<AppSinkCallbacks>,
caps: Option<gst::Caps>,
drop: Option<bool>,
drop_out_of_segment: Option<bool>,
enable_last_sample: Option<bool>,
max_bitrate: Option<u64>,
max_buffers: Option<u32>,
max_lateness: Option<i64>,
#[cfg(any(feature = "v1_16", feature = "dox"))]
processing_deadline: Option<i64>,
qos: Option<bool>,
render_delay: Option<Option<gst::ClockTime>>,
sync: Option<bool>,
throttle_time: Option<u64>,
ts_offset: Option<gst::ClockTimeDiff>,
wait_on_eos: Option<bool>,
name: Option<String>,
}
impl AppSinkBuilder {
// rustdoc-stripper-ignore-next
/// Create a new [`AppSinkBuilder`].
pub fn new() -> Self {
Self::default()
}
// rustdoc-stripper-ignore-next
/// Build the [`AppSink`].
#[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
pub fn build(self) -> AppSink {
let mut properties: Vec<(&str, &dyn ToValue)> = vec![];
if let Some(ref async_) = self.async_ {
properties.push(("async", async_));
}
if let Some(ref buffer_list) = self.buffer_list {
properties.push(("buffer-list", buffer_list));
}
if let Some(ref caps) = self.caps {
properties.push(("caps", caps));
}
if let Some(ref drop) = self.drop {
properties.push(("drop", drop));
}
if let Some(ref enable_last_sample) = self.enable_last_sample {
properties.push(("enable-last-sample", enable_last_sample));
}
if let Some(ref max_bitrate) = self.max_bitrate {
properties.push(("max-bitrate", max_bitrate));
}
if let Some(ref max_buffers) = self.max_buffers {
properties.push(("max-buffers", max_buffers));
}
if let Some(ref max_lateness) = self.max_lateness {
properties.push(("max-lateness", max_lateness));
}
#[cfg(any(feature = "v1_16", feature = "dox"))]
if let Some(ref processing_deadline) = self.processing_deadline {
properties.push(("processing-deadline", processing_deadline));
}
if let Some(ref qos) = self.qos {
properties.push(("qos", qos));
}
if let Some(ref render_delay) = self.render_delay {
properties.push(("render-delay", render_delay));
}
if let Some(ref sync) = self.sync {
properties.push(("sync", sync));
}
if let Some(ref throttle_time) = self.throttle_time {
properties.push(("throttle-time", throttle_time));
}
if let Some(ref ts_offset) = self.ts_offset {
properties.push(("ts-offset", ts_offset));
}
if let Some(ref qos) = self.qos {
properties.push(("qos", qos));
}
if let Some(ref wait_on_eos) = self.wait_on_eos {
properties.push(("wait-on-eos", wait_on_eos));
}
if let Some(ref name) = self.name {
properties.push(("name", name));
}
let appsink = glib::Object::new::<AppSink>(&properties);
if let Some(callbacks) = self.callbacks {
appsink.set_callbacks(callbacks);
}
if let Some(drop_out_of_segment) = self.drop_out_of_segment {
appsink.set_drop_out_of_segment(drop_out_of_segment);
}
appsink
}
pub fn async_(mut self, async_: bool) -> Self {
self.async_ = Some(async_);
self
}
pub fn buffer_list(mut self, buffer_list: bool) -> Self {
self.buffer_list = Some(buffer_list);
self
}
pub fn callbacks(mut self, callbacks: AppSinkCallbacks) -> Self {
self.callbacks = Some(callbacks);
self
}
pub fn caps(mut self, caps: &gst::Caps) -> Self {
self.caps = Some(caps.clone());
self
}
pub fn drop(mut self, drop: bool) -> Self {
self.drop = Some(drop);
self
}
pub fn drop_out_of_segment(mut self, drop_out_of_segment: bool) -> Self {
self.drop_out_of_segment = Some(drop_out_of_segment);
self
}
pub fn enable_last_sample(mut self, enable_last_sample: bool) -> Self {
self.enable_last_sample = Some(enable_last_sample);
self
}
pub fn max_bitrate(mut self, max_bitrate: u64) -> Self {
self.max_bitrate = Some(max_bitrate);
self
}
pub fn max_buffers(mut self, max_buffers: u32) -> Self {
self.max_buffers = Some(max_buffers);
self
}
pub fn max_lateness(mut self, max_lateness: i64) -> Self {
self.max_lateness = Some(max_lateness);
self
}
#[cfg(any(feature = "v1_16", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_16")))]
pub fn processing_deadline(mut self, processing_deadline: i64) -> Self {
self.processing_deadline = Some(processing_deadline);
self
}
pub fn qos(mut self, qos: bool) -> Self {
self.qos = Some(qos);
self
}
pub fn render_delay(mut self, render_delay: Option<gst::ClockTime>) -> Self {
self.render_delay = Some(render_delay);
self
}
pub fn sync(mut self, sync: bool) -> Self {
self.sync = Some(sync);
self
}
pub fn throttle_time(mut self, throttle_time: u64) -> Self {
self.throttle_time = Some(throttle_time);
self
}
pub fn ts_offset(mut self, ts_offset: gst::ClockTimeDiff) -> Self {
self.ts_offset = Some(ts_offset);
self
}
pub fn wait_on_eos(mut self, wait_on_eos: bool) -> Self {
self.wait_on_eos = Some(wait_on_eos);
self
}
pub fn name(mut self, name: &str) -> Self {
self.name = Some(name.to_string());
self
}
}
#[derive(Debug)]
pub struct AppSinkStream {
app_sink: glib::WeakRef<AppSink>,

View file

@ -205,6 +205,14 @@ unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
}
impl AppSrc {
// rustdoc-stripper-ignore-next
/// Creates a new builder-pattern struct instance to construct [`AppSrc`] objects.
///
/// This method returns an instance of [`AppSrcBuilder`](crate::builders::AppSrcBuilder) which can be used to create [`AppSrc`] objects.
pub fn builder() -> AppSrcBuilder {
AppSrcBuilder::default()
}
#[doc(alias = "gst_app_src_push_buffer")]
pub fn push_buffer(&self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
@ -352,6 +360,227 @@ impl AppSrc {
}
}
#[derive(Default)]
// rustdoc-stripper-ignore-next
/// A [builder-pattern] type to construct [`AppSrc`] objects.
///
/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
#[must_use = "The builder must be built to be used"]
pub struct AppSrcBuilder {
automatic_eos: Option<bool>,
block: Option<bool>,
callbacks: Option<AppSrcCallbacks>,
caps: Option<gst::Caps>,
do_timestamp: Option<bool>,
duration: Option<u64>,
format: Option<gst::Format>,
#[cfg(any(feature = "v1_18", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
handle_segment_change: Option<bool>,
is_live: Option<bool>,
#[cfg(any(feature = "v1_20", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
leaky_type: Option<crate::AppLeakyType>,
#[cfg(any(feature = "v1_20", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
max_buffers: Option<u64>,
max_bytes: Option<u64>,
max_latency: Option<i64>,
#[cfg(any(feature = "v1_20", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
max_time: Option<Option<gst::ClockTime>>,
min_latency: Option<i64>,
min_percent: Option<u32>,
size: Option<i64>,
stream_type: Option<crate::AppStreamType>,
name: Option<String>,
}
impl AppSrcBuilder {
// rustdoc-stripper-ignore-next
/// Create a new [`AppSrcBuilder`].
pub fn new() -> Self {
Self::default()
}
// rustdoc-stripper-ignore-next
/// Build the [`AppSrc`].
#[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
pub fn build(self) -> AppSrc {
let mut properties: Vec<(&str, &dyn ToValue)> = vec![];
if let Some(ref block) = self.block {
properties.push(("block", block));
}
if let Some(ref caps) = self.caps {
properties.push(("caps", caps));
}
if let Some(ref do_timestamp) = self.do_timestamp {
properties.push(("do-timestamp", do_timestamp));
}
if let Some(ref duration) = self.duration {
properties.push(("duration", duration));
}
if let Some(ref format) = self.format {
properties.push(("format", format));
}
#[cfg(any(feature = "v1_18", feature = "dox"))]
if let Some(ref handle_segment_change) = self.handle_segment_change {
properties.push(("handle-segment-change", handle_segment_change));
}
if let Some(ref is_live) = self.is_live {
properties.push(("is-live", is_live));
}
#[cfg(any(feature = "v1_20", feature = "dox"))]
if let Some(ref leaky_type) = self.leaky_type {
properties.push(("leaky-type", leaky_type));
}
#[cfg(any(feature = "v1_20", feature = "dox"))]
if let Some(ref max_buffers) = self.max_buffers {
properties.push(("max-buffers", max_buffers));
}
if let Some(ref max_bytes) = self.max_bytes {
properties.push(("max-bytes", max_bytes));
}
if let Some(ref max_latency) = self.max_latency {
properties.push(("max-latency", max_latency));
}
#[cfg(any(feature = "v1_20", feature = "dox"))]
if let Some(ref max_time) = self.max_time {
properties.push(("max-time", max_time));
}
if let Some(ref min_latency) = self.min_latency {
properties.push(("min-latency", min_latency));
}
if let Some(ref min_percent) = self.min_percent {
properties.push(("min-percent", min_percent));
}
if let Some(ref size) = self.size {
properties.push(("size", size));
}
if let Some(ref stream_type) = self.stream_type {
properties.push(("stream-type", stream_type));
}
if let Some(ref name) = self.name {
properties.push(("name", name));
}
let appsrc = glib::Object::new::<AppSrc>(&properties);
if let Some(callbacks) = self.callbacks {
appsrc.set_callbacks(callbacks);
}
if let Some(automatic_eos) = self.automatic_eos {
appsrc.set_automatic_eos(automatic_eos);
}
appsrc
}
pub fn automatic_eos(mut self, automatic_eos: bool) -> Self {
self.automatic_eos = Some(automatic_eos);
self
}
pub fn block(mut self, block: bool) -> Self {
self.block = Some(block);
self
}
pub fn callbacks(mut self, callbacks: AppSrcCallbacks) -> Self {
self.callbacks = Some(callbacks);
self
}
pub fn caps(mut self, caps: &gst::Caps) -> Self {
self.caps = Some(caps.clone());
self
}
pub fn do_timestamp(mut self, do_timestamp: bool) -> Self {
self.do_timestamp = Some(do_timestamp);
self
}
pub fn duration(mut self, duration: u64) -> Self {
self.duration = Some(duration);
self
}
pub fn format(mut self, format: gst::Format) -> Self {
self.format = Some(format);
self
}
#[cfg(any(feature = "v1_18", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
pub fn handle_segment_change(mut self, handle_segment_change: bool) -> Self {
self.handle_segment_change = Some(handle_segment_change);
self
}
pub fn is_live(mut self, is_live: bool) -> Self {
self.is_live = Some(is_live);
self
}
#[cfg(any(feature = "v1_20", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
pub fn leaky_type(mut self, leaky_type: crate::AppLeakyType) -> Self {
self.leaky_type = Some(leaky_type);
self
}
#[cfg(any(feature = "v1_20", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
pub fn max_buffers(mut self, max_buffers: u64) -> Self {
self.max_buffers = Some(max_buffers);
self
}
pub fn max_bytes(mut self, max_bytes: u64) -> Self {
self.max_bytes = Some(max_bytes);
self
}
pub fn max_latency(mut self, max_latency: i64) -> Self {
self.max_latency = Some(max_latency);
self
}
#[cfg(any(feature = "v1_20", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
pub fn max_time(mut self, max_time: Option<gst::ClockTime>) -> Self {
self.max_time = Some(max_time);
self
}
pub fn min_latency(mut self, min_latency: i64) -> Self {
self.min_latency = Some(min_latency);
self
}
pub fn min_percent(mut self, min_percent: u32) -> Self {
self.min_percent = Some(min_percent);
self
}
pub fn size(mut self, size: i64) -> Self {
self.size = Some(size);
self
}
pub fn stream_type(mut self, stream_type: crate::AppStreamType) -> Self {
self.stream_type = Some(stream_type);
self
}
pub fn name(mut self, name: &str) -> Self {
self.name = Some(name.to_string());
self
}
}
#[derive(Debug)]
pub struct AppSrcSink {
app_src: glib::WeakRef<AppSrc>,

View file

@ -47,10 +47,16 @@ fn main() {
return;
}
let appsrc = gst::ElementFactory::make("appsrc")
.name("audio_source")
let info = AudioInfo::builder(gst_audio::AudioFormat::S16le, SAMPLE_RATE, 1)
.build()
.unwrap();
let audio_caps = info.to_caps().unwrap();
let appsrc = gst_app::AppSrc::builder()
.name("audio_source")
.caps(&audio_caps)
.format(gst::Format::Time)
.build();
let tee = gst::ElementFactory::make("tee")
.name("tee")
.build()
@ -97,16 +103,16 @@ fn main() {
.name("app_queue")
.build()
.unwrap();
let appsink = gst::ElementFactory::make("appsink")
let appsink = gst_app::AppSink::builder()
.caps(&audio_caps)
.name("app_sink")
.build()
.unwrap();
.build();
let pipeline = gst::Pipeline::new(Some("test-pipeline"));
pipeline
.add_many(&[
&appsrc,
appsrc.upcast_ref(),
&tee,
&audio_queue,
&audio_convert1,
@ -118,11 +124,11 @@ fn main() {
&video_convert,
&video_sink,
&app_queue,
&appsink,
appsink.upcast_ref(),
])
.unwrap();
gst::Element::link_many(&[&appsrc, &tee]).unwrap();
gst::Element::link_many(&[appsrc.upcast_ref(), &tee]).unwrap();
gst::Element::link_many(&[&audio_queue, &audio_convert1, &audio_resample, &audio_sink])
.unwrap();
gst::Element::link_many(&[
@ -133,7 +139,7 @@ fn main() {
&video_sink,
])
.unwrap();
gst::Element::link_many(&[&app_queue, &appsink]).unwrap();
gst::Element::link_many(&[&app_queue, appsink.upcast_ref()]).unwrap();
let tee_audio_pad = tee.request_pad_simple("src_%u").unwrap();
println!(
@ -154,22 +160,6 @@ fn main() {
let queue_app_pad = app_queue.static_pad("sink").unwrap();
tee_app_pad.link(&queue_app_pad).unwrap();
// configure appsrc
let info = AudioInfo::builder(gst_audio::AudioFormat::S16le, SAMPLE_RATE, 1)
.build()
.unwrap();
let audio_caps = info.to_caps().unwrap();
let appsrc = appsrc
.dynamic_cast::<AppSrc>()
.expect("Source element is expected to be an appsrc!");
appsrc.set_caps(Some(&audio_caps));
appsrc.set_format(gst::Format::Time);
let appsink = appsink
.dynamic_cast::<AppSink>()
.expect("Sink element is expected to be an appsink!");
let data: Arc<Mutex<CustomData>> = Arc::new(Mutex::new(CustomData::new(&appsrc, &appsink)));
let data_weak = Arc::downgrade(&data);
@ -250,9 +240,6 @@ fn main() {
.build(),
);
// configure appsink
appsink.set_caps(Some(&audio_caps));
let data_weak = Arc::downgrade(&data);
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()