gstreamer: Add accessors for PadProbeDatas on PadProbeInfo

And make use of it in examples and other code.

This allows to simplify usage a bit in most cases.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1324>
This commit is contained in:
Sebastian Dröge 2023-10-16 11:28:18 +03:00
parent 4c8d16d09e
commit 62f58620b7
6 changed files with 144 additions and 77 deletions

View file

@ -75,31 +75,33 @@ fn example_main() {
// Add a pad probe on the sink pad and catch the custom event we sent, then send
// an EOS event on the pipeline.
sinkpad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_, probe_info| {
match probe_info.data {
Some(gst::PadProbeData::Event(ref ev))
if ev.type_() == gst::EventType::CustomDownstream =>
{
if let Some(custom_event) = ExampleCustomEvent::parse(ev) {
if let Some(pipeline) = pipeline_weak.upgrade() {
if custom_event.send_eos {
/* Send EOS event to shut down the pipeline, but from an async callback, as we're
* in a pad probe blocking the stream thread here... */
println!("Got custom event with send_eos=true. Sending EOS");
let ev = gst::event::Eos::new();
let pipeline_weak = pipeline_weak.clone();
pipeline.call_async(move |_| {
if let Some(pipeline) = pipeline_weak.upgrade() {
pipeline.send_event(ev);
}
});
} else {
println!("Got custom event, with send_eos=false. Ignoring");
}
}
let Some(event) = probe_info.event() else {
return gst::PadProbeReturn::Ok;
};
let Some(custom_event) = ExampleCustomEvent::parse(event) else {
return gst::PadProbeReturn::Ok;
};
let Some(pipeline) = pipeline_weak.upgrade() else {
return gst::PadProbeReturn::Ok;
};
if custom_event.send_eos {
/* Send EOS event to shut down the pipeline, but from an async callback, as we're
* in a pad probe blocking the stream thread here... */
println!("Got custom event with send_eos=true. Sending EOS");
let ev = gst::event::Eos::new();
let pipeline_weak = pipeline_weak.clone();
pipeline.call_async(move |_| {
if let Some(pipeline) = pipeline_weak.upgrade() {
pipeline.send_event(ev);
}
}
_ => (),
});
} else {
println!("Got custom event, with send_eos=false. Ignoring");
}
gst::PadProbeReturn::Ok
});

View file

@ -305,19 +305,18 @@ fn main() -> Result<()> {
let sinkpad = videosink.static_pad("sink").unwrap();
let overlay_context_weak = Arc::downgrade(&overlay_context);
sinkpad.add_probe(gst::PadProbeType::BUFFER, move |_, probe_info| {
if let Some(gst::PadProbeData::Buffer(_)) = probe_info.data {
let overlay_context = overlay_context_weak.upgrade().unwrap();
let mut context = overlay_context.lock().unwrap();
context.timestamp_queue.push_back(SystemTime::now());
// Updates framerate per 10 frames
if context.timestamp_queue.len() >= 10 {
let now = context.timestamp_queue.back().unwrap();
let front = context.timestamp_queue.front().unwrap();
let duration = now.duration_since(*front).unwrap().as_millis() as f32;
context.avg_fps = 1000f32 * (context.timestamp_queue.len() - 1) as f32 / duration;
context.timestamp_queue.clear();
}
let overlay_context = overlay_context_weak.upgrade().unwrap();
let mut context = overlay_context.lock().unwrap();
context.timestamp_queue.push_back(SystemTime::now());
// Updates framerate per 10 frames
if context.timestamp_queue.len() >= 10 {
let now = context.timestamp_queue.back().unwrap();
let front = context.timestamp_queue.front().unwrap();
let duration = now.duration_since(*front).unwrap().as_millis() as f32;
context.avg_fps = 1000f32 * (context.timestamp_queue.len() - 1) as f32 / duration;
context.timestamp_queue.clear();
}
gst::PadProbeReturn::Ok
});

View file

@ -38,36 +38,38 @@ fn example_main() {
// This handler gets called for every buffer that passes the pad we probe.
src_pad.add_probe(gst::PadProbeType::BUFFER, |_, probe_info| {
// Interpret the data sent over the pad as one buffer
if let Some(gst::PadProbeData::Buffer(ref buffer)) = probe_info.data {
// At this point, buffer is only a reference to an existing memory region somewhere.
// When we want to access its content, we have to map it while requesting the required
// mode of access (read, read/write).
// This type of abstraction is necessary, because the buffer in question might not be
// on the machine's main memory itself, but rather in the GPU's memory.
// So mapping the buffer makes the underlying memory region accessible to us.
// See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html
let map = buffer.map_readable().unwrap();
let Some(buffer) = probe_info.buffer() else {
return gst::PadProbeReturn::Ok;
};
// We know what format the data in the memory region has, since we requested
// it by setting the appsink's caps. So what we do here is interpret the
// memory region we mapped as an array of signed 16 bit integers.
let samples = if let Ok(samples) = map.as_slice_of::<i16>() {
samples
} else {
return gst::PadProbeReturn::Ok;
};
// At this point, buffer is only a reference to an existing memory region somewhere.
// When we want to access its content, we have to map it while requesting the required
// mode of access (read, read/write).
// This type of abstraction is necessary, because the buffer in question might not be
// on the machine's main memory itself, but rather in the GPU's memory.
// So mapping the buffer makes the underlying memory region accessible to us.
// See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html
let map = buffer.map_readable().unwrap();
// For buffer (= chunk of samples), we calculate the root mean square:
let sum: f64 = samples
.iter()
.map(|sample| {
let f = f64::from(*sample) / f64::from(i16::MAX);
f * f
})
.sum();
let rms = (sum / (samples.len() as f64)).sqrt();
println!("rms: {rms}");
}
// We know what format the data in the memory region has, since we requested
// it by setting the appsink's caps. So what we do here is interpret the
// memory region we mapped as an array of signed 16 bit integers.
let samples = if let Ok(samples) = map.as_slice_of::<i16>() {
samples
} else {
return gst::PadProbeReturn::Ok;
};
// For buffer (= chunk of samples), we calculate the root mean square:
let sum: f64 = samples
.iter()
.map(|sample| {
let f = f64::from(*sample) / f64::from(i16::MAX);
f * f
})
.sum();
let rms = (sum / (samples.len() as f64)).sqrt();
println!("rms: {rms}");
gst::PadProbeReturn::Ok
});

View file

@ -96,7 +96,7 @@ fn example_main() {
mixer_src_pad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |_, probe_info| {
let mixer_sink_pad = mixer_sink_pad_weak.upgrade().unwrap();
let Some(gst::PadProbeData::Event(ref ev)) = probe_info.data else {
let Some(ev) = probe_info.event() else {
return gst::PadProbeReturn::Ok;
};

View file

@ -196,11 +196,13 @@ impl StreamProducer {
.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(@weak appsink, @weak consumer => @default-panic, move |_pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() {
gst::debug!(CAT, obj: &appsink, "Requesting keyframe");
let _ = appsink.send_event(ev.clone());
}
let Some(event) = info.event() else {
return gst::PadProbeReturn::Ok;
};
if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
gst::debug!(CAT, obj: &appsink, "Requesting keyframe");
let _ = appsink.send_event(event.clone());
}
gst::PadProbeReturn::Ok
@ -486,14 +488,18 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
sinkpad.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(@strong consumers => move |_pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if let gst::EventView::Latency(ev) = ev.view() {
let latency = ev.latency();
let mut consumers = consumers.lock().unwrap();
consumers.current_latency = Some(latency);
consumers.latency_updated = true;
}
}
let Some(event) = info.event() else {
return gst::PadProbeReturn::Ok;
};
let gst::EventView::Latency(event) = event.view() else {
return gst::PadProbeReturn::Ok;
};
let latency = event.latency();
let mut consumers = consumers.lock().unwrap();
consumers.current_latency = Some(latency);
gst::PadProbeReturn::Ok
}),
);

View file

@ -56,6 +56,64 @@ pub struct PadProbeInfo<'a> {
pub flow_res: Result<FlowSuccess, FlowError>,
}
impl<'a> PadProbeInfo<'a> {
pub fn buffer(&self) -> Option<&Buffer> {
match self.data {
Some(PadProbeData::Buffer(ref buffer)) => Some(buffer),
_ => None,
}
}
pub fn buffer_mut(&mut self) -> Option<&mut Buffer> {
match self.data {
Some(PadProbeData::Buffer(ref mut buffer)) => Some(buffer),
_ => None,
}
}
pub fn buffer_list(&self) -> Option<&BufferList> {
match self.data {
Some(PadProbeData::BufferList(ref buffer_list)) => Some(buffer_list),
_ => None,
}
}
pub fn buffer_list_mut(&mut self) -> Option<&mut BufferList> {
match self.data {
Some(PadProbeData::BufferList(ref mut buffer_list)) => Some(buffer_list),
_ => None,
}
}
pub fn query(&self) -> Option<&QueryRef> {
match self.data {
Some(PadProbeData::Query(ref query)) => Some(*query),
_ => None,
}
}
pub fn query_mut(&mut self) -> Option<&mut QueryRef> {
match self.data {
Some(PadProbeData::Query(ref mut query)) => Some(*query),
_ => None,
}
}
pub fn event(&self) -> Option<&Event> {
match self.data {
Some(PadProbeData::Event(ref event)) => Some(event),
_ => None,
}
}
pub fn event_mut(&mut self) -> Option<&mut Event> {
match self.data {
Some(PadProbeData::Event(ref mut event)) => Some(event),
_ => None,
}
}
}
#[derive(Debug)]
pub enum PadProbeData<'a> {
Buffer(Buffer),