analytics splitter/combiner: Remove the separate fields to events and buffer

Just put all of the serialized mini objects in the same queue.
This commit is contained in:
Olivier Crête 2025-08-15 15:15:29 -04:00
parent 83d16a1cca
commit e49ca55ed8
3 changed files with 387 additions and 410 deletions

View file

@ -7,7 +7,7 @@ use gst_base::{prelude::*, subclass::prelude::*};
use std::{
cmp,
collections::{BTreeMap, VecDeque},
mem, ptr,
mem,
sync::{LazyLock, Mutex},
};
@ -222,10 +222,7 @@ impl AggregatorImpl for AnalyticsCombiner {
for pad in &state_guard.sinkpads {
let pad_imp = pad.imp();
let mut pad_state = pad_imp.state.lock().unwrap();
pad_state.sticky_events.clear();
pad_state.pending_serialized_events.clear();
pad_state.pending_buffers.clear();
pad_state.previous_buffer = None;
*pad_state = Default::default();
}
*state_guard = State::default();
@ -290,7 +287,9 @@ impl AggregatorImpl for AnalyticsCombiner {
end_position,
)?;
let buffer = self.drain(&state_guard, &settings, start_position, end_position);
drop(state_guard);
// Renegotiate if caps where processed
self.negotiate(state_guard);
gst::trace!(CAT, imp = self, "Finishing buffer {buffer:?}",);
@ -302,8 +301,6 @@ impl AggregatorImpl for AnalyticsCombiner {
}
fn sink_event(&self, pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj = pad, "Handling event {event:?}");
let pad = pad
@ -311,84 +308,9 @@ impl AggregatorImpl for AnalyticsCombiner {
.unwrap();
let pad_imp = pad.imp();
match event.view() {
EventView::Caps(caps) => {
let caps = caps.caps_owned();
gst::debug!(CAT, obj = pad, "Received new caps {caps:?}");
let mut pad_state = pad_imp.state.lock().unwrap();
pad_state.current_caps = Some(caps);
// Renegotiate before the next aggregate call
self.obj().src_pad().mark_reconfigure();
}
EventView::StreamStart(ev) => {
let mut pad_state = pad_imp.state.lock().unwrap();
let stream_id_changed = pad_state
.sticky_events
.get(&(OrderedEventType(gst::EventType::StreamStart), None))
.is_none_or(|old_ev| {
let new_stream_id = ev.stream_id();
let old_stream_id = {
let gst::EventView::StreamStart(old_ev) = old_ev.view() else {
unreachable!();
};
old_ev.stream_id()
};
new_stream_id != old_stream_id
});
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::Eos), None));
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None));
if stream_id_changed {
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::Tag), None));
}
}
EventView::FlushStop(_) => {
let mut pad_state = pad_imp.state.lock().unwrap();
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::Eos), None));
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None));
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::Segment), None));
pad_state.pending_serialized_events.clear();
pad_state.pending_buffers.clear();
pad_state.previous_buffer = None;
}
_ => (),
}
// Collect all events to be sent as part of the next buffer
if event.is_sticky() {
let mut pad_state = pad_imp.state.lock().unwrap();
let key = if event.is_sticky_multi() {
(
OrderedEventType(event.type_()),
event.structure().map(|s| s.name().to_string()),
)
} else {
(OrderedEventType(event.type_()), None)
};
pad_state.sticky_events.insert(key, event.clone());
} else if event.is_serialized() {
if event.is_serialized() {
let mut pad_state = pad_imp.state.lock().unwrap();
pad_state.pending_serialized_events.push_back(event.clone());
}
@ -517,42 +439,6 @@ impl AggregatorImpl for AnalyticsCombiner {
}
fn negotiate(&self) -> bool {
let state_guard = self.state.lock().unwrap();
let mut streams = gst::Array::default();
for pad in &state_guard.sinkpads {
let pad_imp = pad.imp();
let pad_state = pad_imp.state.lock().unwrap();
let caps = pad_state
.sticky_events
.values()
.find_map(|ev| {
let gst::EventView::Caps(caps) = ev.view() else {
return None;
};
let caps = caps.caps_owned();
Some(Some(caps))
})
.unwrap_or_else(|| {
gst::warning!(CAT, obj = pad, "No caps for pad, using NULL caps for now");
None::<gst::Caps>
});
streams.append(caps);
}
let caps = gst::Caps::builder("multistream/x-analytics-batch")
.features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META])
.field("streams", streams)
.build();
gst::debug!(CAT, imp = self, "Configuring caps {caps:?}");
drop(state_guard);
self.obj().set_src_caps(&caps);
true
}
}
@ -671,7 +557,6 @@ impl AnalyticsCombiner {
);
let buffer = BatchBuffer {
running_time: None,
sticky_events: pad_state.sticky_events.values().cloned().collect(),
serialized_events: pad_state.pending_serialized_events.drain(..).collect(),
buffer: Some(buffer),
};
@ -711,7 +596,6 @@ impl AnalyticsCombiner {
let buffer = BatchBuffer {
running_time: Some(running_time),
sticky_events: pad_state.sticky_events.values().cloned().collect(),
serialized_events: pad_state.pending_serialized_events.drain(..).collect(),
buffer: Some(buffer),
};
@ -760,7 +644,7 @@ impl AnalyticsCombiner {
meta.n_streams = state.sinkpads.len();
}
for (idx, pad) in state.sinkpads.iter().enumerate() {
for pad in state.sinkpads.iter() {
let pad_imp = pad.imp();
let mut pad_state = pad_imp.state.lock().unwrap();
let pad_settings = pad_imp.settings.lock().unwrap().clone();
@ -908,12 +792,27 @@ impl AnalyticsCombiner {
if pad_state.pending_buffers.is_empty() {
let buffer = BatchBuffer {
running_time: None,
sticky_events: pad_state.sticky_events.values().cloned().collect(),
serialized_events: pad_state.pending_serialized_events.drain(..).collect(),
buffer: None,
};
pad_state.pending_buffers.push_back(buffer);
} else if let Some(mut b) = pad_state.pending_buffers.pop_front() {
while !b.serialized_events.is_empty() {
let event = &mut b.serialized_events[0];
if !event.is_sticky() {
break;
}
if pad_state.store_sticky_event(event) {
self.obj().src_pad().mark_reconfigure();
}
b.serialized_events.remove(0);
}
pad_state.pending_buffers.push_front(b);
}
}
for (idx, pad) in state.sinkpads.iter().enumerate() {
let mut pad_state = pad.imp().state.lock().unwrap();
// And finally fill the meta
unsafe {
@ -923,19 +822,47 @@ impl AnalyticsCombiner {
let stream = &mut *meta.streams.add(idx);
stream.index = idx as u32;
stream.buffers = glib::ffi::g_malloc0_n(
pad_state.pending_buffers.len(),
mem::size_of::<gst_analytics::ffi::GstAnalyticsBatchBuffer>(),
)
as *mut gst_analytics::ffi::GstAnalyticsBatchBuffer;
stream.n_buffers = pad_state.pending_buffers.len();
let sticky_events: Vec<_> = pad_state.sticky_events.values().collect();
for (buffer_idx, mut buffer) in pad_state.pending_buffers.drain(..).enumerate()
stream.n_sticky_events = sticky_events.len();
stream.sticky_events = glib::ffi::g_malloc0_n(
stream.n_sticky_events,
mem::size_of::<*mut gst::ffi::GstEvent>(),
) as *mut *mut gst::ffi::GstEvent;
for (event_id, event) in sticky_events.into_iter().enumerate() {
*stream.sticky_events.add(event_id) = event.clone().into_glib_ptr();
}
stream.n_objects = pad_state
.pending_buffers
.iter()
.map(|b| b.serialized_events.len() + b.buffer.is_some() as usize)
.sum();
stream.objects = glib::ffi::g_malloc0_n(
stream.n_objects,
mem::size_of::<*mut gst::ffi::GstMiniObject>(),
) as *mut *mut gst::ffi::GstMiniObject;
let mut count = 0;
for buffer in pad_state
.pending_buffers
.drain(..)
.collect::<Vec<_>>()
.into_iter()
{
let buffer_storage = &mut *stream.buffers.add(buffer_idx);
for ev in buffer.serialized_events.into_iter() {
if ev.is_sticky() && pad_state.store_sticky_event(&ev) {
self.obj().src_pad().mark_reconfigure();
}
*stream.objects.add(count) = ev.upcast().into_glib_ptr();
count += 1;
}
// Replace GAP buffers with a GAP event
if let Some(ref b) = buffer.buffer {
if let Some(b) = buffer.buffer {
if b.flags().contains(gst::BufferFlags::GAP)
&& b.flags().contains(gst::BufferFlags::DROPPABLE)
&& b.size() == 0
@ -943,40 +870,12 @@ impl AnalyticsCombiner {
let ev = gst::event::Gap::builder(b.pts().unwrap())
.duration(b.duration())
.build();
buffer.buffer = None;
buffer.serialized_events.push(ev);
*stream.objects.add(count) = ev.upcast().into_glib_ptr();
} else {
*stream.objects.add(count) = b.upcast().into_glib_ptr();
}
count += 1;
}
buffer_storage.sticky_events = glib::ffi::g_malloc0_n(
buffer.sticky_events.len(),
mem::size_of::<*mut gst::ffi::GstEvent>(),
)
as *mut *mut gst::ffi::GstEvent;
buffer_storage.n_sticky_events = buffer.sticky_events.len();
for (event_idx, event) in buffer.sticky_events.into_iter().enumerate() {
*buffer_storage.sticky_events.add(event_idx) = event.into_glib_ptr();
}
if !buffer.serialized_events.is_empty() {
buffer_storage.serialized_events = glib::ffi::g_malloc0_n(
buffer.serialized_events.len(),
mem::size_of::<*mut gst::ffi::GstEvent>(),
)
as *mut *mut gst::ffi::GstEvent;
buffer_storage.n_serialized_events = buffer.serialized_events.len();
for (event_idx, event) in
buffer.serialized_events.into_iter().enumerate()
{
*buffer_storage.serialized_events.add(event_idx) =
event.into_glib_ptr();
}
} else {
buffer_storage.serialized_events = ptr::null_mut();
buffer_storage.n_serialized_events = 0;
}
buffer_storage.buffer = buffer.buffer.into_glib_ptr();
}
}
}
@ -984,6 +883,36 @@ impl AnalyticsCombiner {
buffer
}
fn negotiate(&self, state_guard: std::sync::MutexGuard<State>) {
if !self.obj().src_pad().check_reconfigure() {
return;
}
let mut streams = gst::Array::default();
for pad in &state_guard.sinkpads {
let pad_imp = pad.imp();
let pad_state = pad_imp.state.lock().unwrap();
let caps = pad_state.caps.clone().unwrap_or_else(|| {
gst::warning!(CAT, obj = pad, "No caps for pad, using empty caps for now");
gst::Caps::new_empty()
});
streams.append(caps);
}
let caps = gst::Caps::builder("multistream/x-analytics-batch")
.features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META])
.field("streams", streams)
.build();
gst::debug!(CAT, imp = self, "Configuring caps {caps:?}");
drop(state_guard);
self.obj().set_src_caps(&caps);
}
}
#[derive(Debug, Clone, Copy)]
@ -1015,18 +944,86 @@ impl cmp::Ord for OrderedEventType {
struct BatchBuffer {
running_time: Option<gst::Signed<gst::ClockTime>>,
sticky_events: Vec<gst::Event>,
serialized_events: Vec<gst::Event>,
buffer: Option<gst::Buffer>,
}
#[derive(Default)]
struct PadState {
pending_serialized_events: VecDeque<gst::Event>,
sticky_events: BTreeMap<(OrderedEventType, Option<String>), gst::Event>,
pending_serialized_events: VecDeque<gst::Event>,
pending_buffers: VecDeque<BatchBuffer>,
current_caps: Option<gst::Caps>,
previous_buffer: Option<BatchBuffer>,
caps: Option<gst::Caps>,
}
impl PadState {
fn store_sticky_event(&mut self, event: &gst::Event) -> bool {
use gst::EventView;
let mut ret = false;
match event.view() {
EventView::Caps(caps) => {
let caps = caps.caps_owned();
gst::debug!(CAT, "Received new caps {caps:?}");
self.caps = Some(caps);
// Renegotiate before the next aggregate call
ret = true;
}
EventView::StreamStart(ev) => {
let stream_id_changed = self
.sticky_events
.get(&(OrderedEventType(gst::EventType::StreamStart), None))
.is_none_or(|old_ev| {
let new_stream_id = ev.stream_id();
let old_stream_id = {
let gst::EventView::StreamStart(old_ev) = old_ev.view() else {
unreachable!();
};
old_ev.stream_id()
};
new_stream_id != old_stream_id
});
self.sticky_events
.remove(&(OrderedEventType(gst::EventType::Eos), None));
self.sticky_events
.remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None));
if stream_id_changed {
self.sticky_events
.remove(&(OrderedEventType(gst::EventType::Tag), None));
}
}
EventView::FlushStop(_) => {
self.sticky_events
.remove(&(OrderedEventType(gst::EventType::Eos), None));
self.sticky_events
.remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None));
self.sticky_events
.remove(&(OrderedEventType(gst::EventType::Segment), None));
self.pending_buffers.clear();
self.previous_buffer = None;
}
_ => (),
}
let key = if event.is_sticky_multi() {
(
OrderedEventType(event.type_()),
event.structure().map(|s| s.name().to_string()),
)
} else {
(OrderedEventType(event.type_()), None)
};
self.sticky_events.insert(key, event.clone());
ret
}
}
#[derive(Default, Clone)]
@ -1100,24 +1097,4 @@ impl GstObjectImpl for AnalyticsCombinerSinkPad {}
impl PadImpl for AnalyticsCombinerSinkPad {}
impl AggregatorPadImpl for AnalyticsCombinerSinkPad {
fn flush(&self, aggregator: &gst_base::Aggregator) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut pad_state = self.state.lock().unwrap();
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::Eos), None));
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None));
pad_state
.sticky_events
.remove(&(OrderedEventType(gst::EventType::Segment), None));
pad_state.pending_serialized_events.clear();
pad_state.pending_buffers.clear();
pad_state.previous_buffer = None;
self.parent_flush(aggregator)
}
}
impl AggregatorPadImpl for AnalyticsCombinerSinkPad {}

View file

@ -189,25 +189,22 @@ impl AnalyticsSplitter {
let mut res = Ok(gst::FlowSuccess::Ok);
'next_stream: for (pad, stream) in Iterator::zip(pads.into_iter(), meta.streams().iter()) {
for buffer in stream.buffers() {
for event in buffer.sticky_events() {
gst::trace!(CAT, obj = pad, "Storing sticky event {event:?}");
let _ = pad.store_sticky_event(event);
}
for event in stream.sticky_events() {
gst::trace!(CAT, obj = pad, "Storing sticky event {event:?}");
let _ = pad.store_sticky_event(event);
}
for event in buffer.serialized_events() {
for object in stream.objects() {
if let Some(event) = object.downcast_ref::<gst::Event>() {
gst::trace!(CAT, obj = pad, "Pushing serialized event {event:?}");
let _ = pad.push_event(event.clone());
}
if let Some(buffer) = buffer.buffer_owned() {
pad.push_event(event.clone());
} else if let Some(buffer) = object.downcast_ref::<gst::Buffer>() {
gst::trace!(CAT, obj = pad, "Pushing buffer {buffer:?}");
let pad_res = pad.push(buffer);
let pad_res = pad.push(buffer.clone());
res = combiner.update_pad_flow(&pad, pad_res);
}
if let Some(buffer_list) = buffer.buffer_list_owned() {
} else if let Some(buffer_list) = object.downcast_ref::<gst::BufferList>() {
gst::trace!(CAT, obj = pad, "Pushing buffer list {buffer_list:?}");
let pad_res = pad.push_list(buffer_list);
let pad_res = pad.push_list(buffer_list.clone());
res = combiner.update_pad_flow(&pad, pad_res);
}

View file

@ -86,37 +86,39 @@ fn test_combine_multi() {
assert_eq!(streams.len(), 2);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 10);
for (idx, buffer) in buffers.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h0_caps));
let b = buffer.buffer().unwrap();
assert_eq!(
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(stream.caps().as_ref(), Some(&h0_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 10);
for (idx, object) in objects.iter().enumerate() {
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(idx as u64 * 20.mseconds()));
assert_eq!(b.duration(), Some(20.mseconds()));
}
let stream = &streams[1];
assert_eq!(stream.index(), 1);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 5);
for (idx, buffer) in buffers.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
Some(sink_1.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h1_caps));
let b = buffer.buffer().unwrap();
assert_eq!(
stream.stream_id(),
Some(sink_1.stream_id().unwrap().as_gstr())
);
assert_eq!(
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(stream.caps().as_ref(), Some(&h1_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 5);
for (idx, object) in objects.iter().enumerate() {
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(idx as u64 * 40.mseconds()));
assert_eq!(b.duration(), Some(40.mseconds()));
}
@ -133,37 +135,39 @@ fn test_combine_multi() {
assert_eq!(streams.len(), 2);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 2);
for (idx, buffer) in buffers.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h0_caps));
let b = buffer.buffer().unwrap();
assert_eq!(
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(stream.caps().as_ref(), Some(&h0_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 2);
for (idx, object) in objects.iter().enumerate() {
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 20.mseconds()));
assert_eq!(b.duration(), Some(20.mseconds()));
}
let stream = &streams[1];
assert_eq!(stream.index(), 1);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
for (idx, buffer) in buffers.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
Some(sink_1.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h1_caps));
let b = buffer.buffer().unwrap();
assert_eq!(
stream.stream_id(),
Some(sink_1.stream_id().unwrap().as_gstr())
);
assert_eq!(
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(stream.caps().as_ref(), Some(&h1_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
for (idx, object) in objects.iter().enumerate() {
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 40.mseconds()));
assert_eq!(b.duration(), Some(40.mseconds()));
}
@ -240,19 +244,19 @@ fn test_strategy_all() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 3);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 3);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(ptss[idx])));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
}
@ -277,19 +281,19 @@ fn test_strategy_all() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 2);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 2);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(ptss[idx])));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
}
@ -314,19 +318,19 @@ fn test_strategy_all() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 4);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 4);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(ptss[idx])));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
}
@ -341,19 +345,20 @@ fn test_strategy_all() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(300)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -428,19 +433,19 @@ fn test_strategy_first() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(0)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -463,19 +468,19 @@ fn test_strategy_first() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(100)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -498,19 +503,19 @@ fn test_strategy_first() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(200)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -524,19 +529,19 @@ fn test_strategy_first() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(300)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -611,19 +616,19 @@ fn test_strategy_first_with_overlap() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(0)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -646,19 +651,19 @@ fn test_strategy_first_with_overlap() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(100)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -681,19 +686,19 @@ fn test_strategy_first_with_overlap() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(199)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -707,19 +712,19 @@ fn test_strategy_first_with_overlap() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(301)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -794,19 +799,19 @@ fn test_strategy_last() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(66)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -829,19 +834,19 @@ fn test_strategy_last() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(133)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -864,19 +869,19 @@ fn test_strategy_last() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(266)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -890,19 +895,19 @@ fn test_strategy_last() {
assert_eq!(streams.len(), 1);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
let objects = stream.objects();
assert_eq!(objects.len(), 1);
let object = &objects[0];
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(300)));
assert_eq!(b.duration(), Some(33_333_333.nseconds()));
@ -991,32 +996,30 @@ fn test_combine_multi_initial_gap() {
assert_eq!(streams.len(), 2);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 10);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 10);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h0_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h0_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(idx as u64 * 20.mseconds()));
assert_eq!(b.duration(), Some(20.mseconds()));
}
let stream = &streams[1];
assert_eq!(stream.index(), 1);
let buffers = stream.buffers();
let objects = stream.objects();
// Only an empty buffer with no events or anything for the second stream
assert_eq!(buffers.len(), 1);
let buffer = &buffers[0];
assert_eq!(buffer.stream_id(), None);
assert_eq!(buffer.segment(), None);
assert_eq!(buffer.caps().as_ref(), None);
assert_eq!(buffer.buffer(), None);
assert_eq!(objects.len(), 0);
assert_eq!(stream.stream_id(), None);
assert_eq!(stream.segment(), None);
assert_eq!(stream.caps().as_ref(), None);
// Now start the second stream
h1.set_src_caps(h1_caps.clone());
@ -1054,37 +1057,37 @@ fn test_combine_multi_initial_gap() {
assert_eq!(streams.len(), 2);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 10);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 10);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h0_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h0_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 20.mseconds()));
assert_eq!(b.duration(), Some(20.mseconds()));
}
let stream = &streams[1];
assert_eq!(stream.index(), 1);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 5);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 5);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_1.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h1_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h1_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 40.mseconds()));
assert_eq!(b.duration(), Some(40.mseconds()));
}
@ -1101,37 +1104,37 @@ fn test_combine_multi_initial_gap() {
assert_eq!(streams.len(), 2);
let stream = &streams[0];
assert_eq!(stream.index(), 0);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 1);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_0.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h0_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h0_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(400.mseconds() + idx as u64 * 20.mseconds()));
assert_eq!(b.duration(), Some(20.mseconds()));
}
let stream = &streams[1];
assert_eq!(stream.index(), 1);
let buffers = stream.buffers();
assert_eq!(buffers.len(), 1);
for (idx, buffer) in buffers.iter().enumerate() {
let objects = stream.objects();
assert_eq!(objects.len(), 1);
for (idx, object) in objects.iter().enumerate() {
assert_eq!(
buffer.stream_id(),
stream.stream_id(),
Some(sink_1.stream_id().unwrap().as_gstr())
);
assert_eq!(
buffer.segment(),
stream.segment(),
Some(gst::FormattedSegment::<gst::ClockTime>::new().upcast())
);
assert_eq!(buffer.caps().as_ref(), Some(&h1_caps));
let b = buffer.buffer().unwrap();
assert_eq!(stream.caps().as_ref(), Some(&h1_caps));
let b = object.downcast_ref::<gst::Buffer>().unwrap();
assert_eq!(b.pts(), Some(400.mseconds() + idx as u64 * 40.mseconds()));
assert_eq!(b.duration(), Some(40.mseconds()));
}