mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer-rs.git
synced 2025-01-23 15:38:19 +00:00
gstreamer: Add accessors for PadProbeData
s on PadProbeInfo
And make use of it in examples and other code. This allows to simplify usage a bit in most cases. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1337>
This commit is contained in:
parent
43096963de
commit
c2b5341b8f
6 changed files with 144 additions and 77 deletions
|
@ -75,31 +75,33 @@ fn example_main() {
|
|||
// Add a pad probe on the sink pad and catch the custom event we sent, then send
|
||||
// an EOS event on the pipeline.
|
||||
sinkpad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_, probe_info| {
|
||||
match probe_info.data {
|
||||
Some(gst::PadProbeData::Event(ref ev))
|
||||
if ev.type_() == gst::EventType::CustomDownstream =>
|
||||
{
|
||||
if let Some(custom_event) = ExampleCustomEvent::parse(ev) {
|
||||
if let Some(pipeline) = pipeline_weak.upgrade() {
|
||||
if custom_event.send_eos {
|
||||
/* Send EOS event to shut down the pipeline, but from an async callback, as we're
|
||||
* in a pad probe blocking the stream thread here... */
|
||||
println!("Got custom event with send_eos=true. Sending EOS");
|
||||
let ev = gst::event::Eos::new();
|
||||
let pipeline_weak = pipeline_weak.clone();
|
||||
pipeline.call_async(move |_| {
|
||||
if let Some(pipeline) = pipeline_weak.upgrade() {
|
||||
pipeline.send_event(ev);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
println!("Got custom event, with send_eos=false. Ignoring");
|
||||
}
|
||||
}
|
||||
let Some(event) = probe_info.event() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
let Some(custom_event) = ExampleCustomEvent::parse(event) else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
let Some(pipeline) = pipeline_weak.upgrade() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
if custom_event.send_eos {
|
||||
/* Send EOS event to shut down the pipeline, but from an async callback, as we're
|
||||
* in a pad probe blocking the stream thread here... */
|
||||
println!("Got custom event with send_eos=true. Sending EOS");
|
||||
let ev = gst::event::Eos::new();
|
||||
let pipeline_weak = pipeline_weak.clone();
|
||||
pipeline.call_async(move |_| {
|
||||
if let Some(pipeline) = pipeline_weak.upgrade() {
|
||||
pipeline.send_event(ev);
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
});
|
||||
} else {
|
||||
println!("Got custom event, with send_eos=false. Ignoring");
|
||||
}
|
||||
|
||||
gst::PadProbeReturn::Ok
|
||||
});
|
||||
|
||||
|
|
|
@ -305,19 +305,18 @@ fn main() -> Result<()> {
|
|||
let sinkpad = videosink.static_pad("sink").unwrap();
|
||||
let overlay_context_weak = Arc::downgrade(&overlay_context);
|
||||
sinkpad.add_probe(gst::PadProbeType::BUFFER, move |_, probe_info| {
|
||||
if let Some(gst::PadProbeData::Buffer(_)) = probe_info.data {
|
||||
let overlay_context = overlay_context_weak.upgrade().unwrap();
|
||||
let mut context = overlay_context.lock().unwrap();
|
||||
context.timestamp_queue.push_back(SystemTime::now());
|
||||
// Updates framerate per 10 frames
|
||||
if context.timestamp_queue.len() >= 10 {
|
||||
let now = context.timestamp_queue.back().unwrap();
|
||||
let front = context.timestamp_queue.front().unwrap();
|
||||
let duration = now.duration_since(*front).unwrap().as_millis() as f32;
|
||||
context.avg_fps = 1000f32 * (context.timestamp_queue.len() - 1) as f32 / duration;
|
||||
context.timestamp_queue.clear();
|
||||
}
|
||||
let overlay_context = overlay_context_weak.upgrade().unwrap();
|
||||
let mut context = overlay_context.lock().unwrap();
|
||||
context.timestamp_queue.push_back(SystemTime::now());
|
||||
// Updates framerate per 10 frames
|
||||
if context.timestamp_queue.len() >= 10 {
|
||||
let now = context.timestamp_queue.back().unwrap();
|
||||
let front = context.timestamp_queue.front().unwrap();
|
||||
let duration = now.duration_since(*front).unwrap().as_millis() as f32;
|
||||
context.avg_fps = 1000f32 * (context.timestamp_queue.len() - 1) as f32 / duration;
|
||||
context.timestamp_queue.clear();
|
||||
}
|
||||
|
||||
gst::PadProbeReturn::Ok
|
||||
});
|
||||
|
||||
|
|
|
@ -38,36 +38,38 @@ fn example_main() {
|
|||
// This handler gets called for every buffer that passes the pad we probe.
|
||||
src_pad.add_probe(gst::PadProbeType::BUFFER, |_, probe_info| {
|
||||
// Interpret the data sent over the pad as one buffer
|
||||
if let Some(gst::PadProbeData::Buffer(ref buffer)) = probe_info.data {
|
||||
// At this point, buffer is only a reference to an existing memory region somewhere.
|
||||
// When we want to access its content, we have to map it while requesting the required
|
||||
// mode of access (read, read/write).
|
||||
// This type of abstraction is necessary, because the buffer in question might not be
|
||||
// on the machine's main memory itself, but rather in the GPU's memory.
|
||||
// So mapping the buffer makes the underlying memory region accessible to us.
|
||||
// See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html
|
||||
let map = buffer.map_readable().unwrap();
|
||||
let Some(buffer) = probe_info.buffer() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
// We know what format the data in the memory region has, since we requested
|
||||
// it by setting the appsink's caps. So what we do here is interpret the
|
||||
// memory region we mapped as an array of signed 16 bit integers.
|
||||
let samples = if let Ok(samples) = map.as_slice_of::<i16>() {
|
||||
samples
|
||||
} else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
// At this point, buffer is only a reference to an existing memory region somewhere.
|
||||
// When we want to access its content, we have to map it while requesting the required
|
||||
// mode of access (read, read/write).
|
||||
// This type of abstraction is necessary, because the buffer in question might not be
|
||||
// on the machine's main memory itself, but rather in the GPU's memory.
|
||||
// So mapping the buffer makes the underlying memory region accessible to us.
|
||||
// See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html
|
||||
let map = buffer.map_readable().unwrap();
|
||||
|
||||
// For buffer (= chunk of samples), we calculate the root mean square:
|
||||
let sum: f64 = samples
|
||||
.iter()
|
||||
.map(|sample| {
|
||||
let f = f64::from(*sample) / f64::from(i16::MAX);
|
||||
f * f
|
||||
})
|
||||
.sum();
|
||||
let rms = (sum / (samples.len() as f64)).sqrt();
|
||||
println!("rms: {rms}");
|
||||
}
|
||||
// We know what format the data in the memory region has, since we requested
|
||||
// it by setting the appsink's caps. So what we do here is interpret the
|
||||
// memory region we mapped as an array of signed 16 bit integers.
|
||||
let samples = if let Ok(samples) = map.as_slice_of::<i16>() {
|
||||
samples
|
||||
} else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
// For buffer (= chunk of samples), we calculate the root mean square:
|
||||
let sum: f64 = samples
|
||||
.iter()
|
||||
.map(|sample| {
|
||||
let f = f64::from(*sample) / f64::from(i16::MAX);
|
||||
f * f
|
||||
})
|
||||
.sum();
|
||||
let rms = (sum / (samples.len() as f64)).sqrt();
|
||||
println!("rms: {rms}");
|
||||
|
||||
gst::PadProbeReturn::Ok
|
||||
});
|
||||
|
|
|
@ -96,7 +96,7 @@ fn example_main() {
|
|||
mixer_src_pad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |_, probe_info| {
|
||||
let mixer_sink_pad = mixer_sink_pad_weak.upgrade().unwrap();
|
||||
|
||||
let Some(gst::PadProbeData::Event(ref ev)) = probe_info.data else {
|
||||
let Some(ev) = probe_info.event() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
|
|
|
@ -196,11 +196,13 @@ impl StreamProducer {
|
|||
.add_probe(
|
||||
gst::PadProbeType::EVENT_UPSTREAM,
|
||||
glib::clone!(@weak appsink, @weak consumer => @default-panic, move |_pad, info| {
|
||||
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
|
||||
if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() {
|
||||
gst::debug!(CAT, obj: &appsink, "Requesting keyframe");
|
||||
let _ = appsink.send_event(ev.clone());
|
||||
}
|
||||
let Some(event) = info.event() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
|
||||
gst::debug!(CAT, obj: &appsink, "Requesting keyframe");
|
||||
let _ = appsink.send_event(event.clone());
|
||||
}
|
||||
|
||||
gst::PadProbeReturn::Ok
|
||||
|
@ -486,14 +488,18 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
|
|||
sinkpad.add_probe(
|
||||
gst::PadProbeType::EVENT_UPSTREAM,
|
||||
glib::clone!(@strong consumers => move |_pad, info| {
|
||||
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
|
||||
if let gst::EventView::Latency(ev) = ev.view() {
|
||||
let latency = ev.latency();
|
||||
let mut consumers = consumers.lock().unwrap();
|
||||
consumers.current_latency = Some(latency);
|
||||
consumers.latency_updated = true;
|
||||
}
|
||||
}
|
||||
let Some(event) = info.event() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
let gst::EventView::Latency(event) = event.view() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
let latency = event.latency();
|
||||
let mut consumers = consumers.lock().unwrap();
|
||||
consumers.current_latency = Some(latency);
|
||||
|
||||
gst::PadProbeReturn::Ok
|
||||
}),
|
||||
);
|
||||
|
|
|
@ -56,6 +56,64 @@ pub struct PadProbeInfo<'a> {
|
|||
pub flow_res: Result<FlowSuccess, FlowError>,
|
||||
}
|
||||
|
||||
impl<'a> PadProbeInfo<'a> {
|
||||
pub fn buffer(&self) -> Option<&Buffer> {
|
||||
match self.data {
|
||||
Some(PadProbeData::Buffer(ref buffer)) => Some(buffer),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn buffer_mut(&mut self) -> Option<&mut Buffer> {
|
||||
match self.data {
|
||||
Some(PadProbeData::Buffer(ref mut buffer)) => Some(buffer),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn buffer_list(&self) -> Option<&BufferList> {
|
||||
match self.data {
|
||||
Some(PadProbeData::BufferList(ref buffer_list)) => Some(buffer_list),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn buffer_list_mut(&mut self) -> Option<&mut BufferList> {
|
||||
match self.data {
|
||||
Some(PadProbeData::BufferList(ref mut buffer_list)) => Some(buffer_list),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query(&self) -> Option<&QueryRef> {
|
||||
match self.data {
|
||||
Some(PadProbeData::Query(ref query)) => Some(*query),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query_mut(&mut self) -> Option<&mut QueryRef> {
|
||||
match self.data {
|
||||
Some(PadProbeData::Query(ref mut query)) => Some(*query),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn event(&self) -> Option<&Event> {
|
||||
match self.data {
|
||||
Some(PadProbeData::Event(ref event)) => Some(event),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn event_mut(&mut self) -> Option<&mut Event> {
|
||||
match self.data {
|
||||
Some(PadProbeData::Event(ref mut event)) => Some(event),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum PadProbeData<'a> {
|
||||
Buffer(Buffer),
|
||||
|
|
Loading…
Reference in a new issue