diff --git a/utils/tracers/src/queue_levels/imp.rs b/utils/tracers/src/queue_levels/imp.rs index 730f3b9d..145e00fb 100644 --- a/utils/tracers/src/queue_levels/imp.rs +++ b/utils/tracers/src/queue_levels/imp.rs @@ -68,6 +68,28 @@ static QUEUE_TYPE: Lazy = Lazy::new(|| { } }); +static QUEUE2_TYPE: Lazy = Lazy::new(|| { + if let Ok(queue) = gst::ElementFactory::make("queue2", None) { + queue.type_() + } else { + gst::warning!(CAT, "Can't instantiate queue2 element"); + glib::Type::INVALID + } +}); + +static MULTIQUEUE_TYPE: Lazy = Lazy::new(|| { + if let Ok(queue) = gst::ElementFactory::make("multiqueue", None) { + queue.type_() + } else { + gst::warning!(CAT, "Can't instantiate multiqueue element"); + glib::Type::INVALID + } +}); + +fn is_queue_type(type_: glib::Type) -> bool { + [*QUEUE_TYPE, *QUEUE2_TYPE, *MULTIQUEUE_TYPE].contains(&type_) +} + #[derive(Debug)] struct Settings { file: PathBuf, @@ -149,6 +171,7 @@ struct State { struct LogLine { timestamp: u64, name: Arc, + idx: Option, ptr: usize, cur_level_bytes: u32, cur_level_time: u64, @@ -180,6 +203,8 @@ impl ObjectImpl for QueueLevels { } Lazy::force(&QUEUE_TYPE); + Lazy::force(&QUEUE2_TYPE); + Lazy::force(&MULTIQUEUE_TYPE); self.register_hook(TracerHook::ElementNew); self.register_hook(TracerHook::ObjectDestroyed); @@ -224,6 +249,7 @@ impl ObjectImpl for QueueLevels { for LogLine { timestamp, name, + idx, ptr, cur_level_bytes, cur_level_time, @@ -233,7 +259,12 @@ impl ObjectImpl for QueueLevels { max_size_buffers, } in &state.log { - if let Err(err) = writeln!(&mut file, "{timestamp},{name},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}") { + let res = if let Some(idx) = idx { + writeln!(&mut file, "{timestamp},{name}:{idx},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}") + } else { + writeln!(&mut file, "{timestamp},{name},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}") + }; + if let Err(err) = res { gst::error!(CAT, obj: obj, "Failed to write to file: {err}"); return; } @@ -245,7 +276,7 @@ impl GstObjectImpl for QueueLevels {} impl TracerImpl for QueueLevels { fn element_new(&self, _ts: u64, element: &gst::Element) { - if element.type_() != *QUEUE_TYPE { + if !is_queue_type(element.type_()) { return; } @@ -283,103 +314,65 @@ impl TracerImpl for QueueLevels { } fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, _buffer: &gst::Buffer) { - let element = - if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { - if parent.type_() == *QUEUE_TYPE { - parent - } else { - return; - } - } else { - return; - }; - - self.log(&element, ts); + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if is_queue_type(parent.type_()) { + self.log(&parent, Some(pad), ts); + } + } } fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, _list: &gst::BufferList) { - let element = - if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { - if parent.type_() == *QUEUE_TYPE { - parent - } else { - return; - } - } else { - return; - }; - - self.log(&element, ts); + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if is_queue_type(parent.type_()) { + self.log(&parent, Some(pad), ts); + } + } } #[cfg(not(feature = "v1_22"))] fn pad_push_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { - let element = if let Some(parent) = pad - .peer() - .and_then(|p| p.parent()) - .and_then(|p| p.downcast::().ok()) - { - if parent.type_() == *QUEUE_TYPE { - parent - } else { - return; + if let Some(peer) = pad.peer() { + if let Some(parent) = peer + .parent() + .and_then(|p| p.downcast::().ok()) + { + if is_queue_type(parent.type_()) { + self.log(&parent, Some(&peer), ts); + } } - } else { - return; - }; - - self.log(&element, ts); + } } #[cfg(not(feature = "v1_22"))] fn pad_push_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { - let element = if let Some(parent) = pad - .peer() - .and_then(|p| p.parent()) - .and_then(|p| p.downcast::().ok()) - { - if parent.type_() == *QUEUE_TYPE { - parent - } else { - return; + if let Some(peer) = pad.peer() { + if let Some(parent) = peer + .parent() + .and_then(|p| p.downcast::().ok()) + { + if is_queue_type(parent.type_()) { + self.log(&parent, Some(&peer), ts); + } } - } else { - return; - }; - - self.log(&element, ts); + } } #[cfg(feature = "v1_22")] fn pad_chain_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { - let element = - if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { - if parent.type_() == *QUEUE_TYPE { - parent - } else { - return; - } - } else { - return; - }; - - self.log(&element, ts); + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if is_queue_type(parent.type_()) { + self.log(&parent, Some(pad), ts); + } + } } #[cfg(feature = "v1_22")] fn pad_chain_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { - let element = - if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { - if parent.type_() == *QUEUE_TYPE { - parent - } else { - return; - } - } else { - return; - }; - - self.log(&element, ts); + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if is_queue_type(parent.type_()) { + self.log(&parent, Some(pad), ts); + } + } } fn element_change_state_post( @@ -393,11 +386,11 @@ impl TracerImpl for QueueLevels { return; } - if element.type_() != *QUEUE_TYPE { + if !is_queue_type(element.type_()) { return; } - self.log(element, ts); + self.log(element, None, ts); } fn pad_push_event_pre(&self, ts: u64, pad: &gst::Pad, ev: &gst::Event) { @@ -406,15 +399,15 @@ impl TracerImpl for QueueLevels { } if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { - if parent.type_() == *QUEUE_TYPE { - self.log(&parent, ts); + if is_queue_type(parent.type_()) { + self.log(&parent, Some(pad), ts); } } } } impl QueueLevels { - fn log(&self, element: &gst::Element, timestamp: u64) { + fn log(&self, element: &gst::Element, pad: Option<&gst::Pad>, timestamp: u64) { let ptr = element.as_ptr() as usize; let mut state = self.state.lock().unwrap(); @@ -423,22 +416,79 @@ impl QueueLevels { None => return, }; - let cur_level_bytes = element.property::("current-level-bytes"); - let cur_level_time = element.property::("current-level-time"); - let cur_level_buffers = element.property::("current-level-buffers"); let max_size_bytes = element.property::("max-size-bytes"); let max_size_time = element.property::("max-size-time"); let max_size_buffers = element.property::("max-size-buffers"); - state.log.push(LogLine { - timestamp, - name, - ptr, - cur_level_bytes, - cur_level_time, - cur_level_buffers, - max_size_bytes, - max_size_time, - max_size_buffers, - }); + + if element.type_() == *MULTIQUEUE_TYPE { + let get_pad_idx = |pad: &gst::Pad| { + // SAFETY: Names can't change while there's a strong reference to the object + unsafe { + let name_ptr = (*pad.as_ptr()).object.name; + let name = std::ffi::CStr::from_ptr(name_ptr as *const _) + .to_str() + .unwrap(); + if let Some(idx) = name.strip_prefix("sink_") { + idx.parse::().unwrap() + } else if let Some(idx) = name.strip_prefix("src_") { + idx.parse::().unwrap() + } else { + unreachable!(); + } + } + }; + + if let Some(pad) = pad { + let cur_level_bytes = pad.property::("current-level-bytes"); + let cur_level_time = pad.property::("current-level-time"); + let cur_level_buffers = pad.property::("current-level-buffers"); + state.log.push(LogLine { + timestamp, + name, + idx: Some(get_pad_idx(pad)), + ptr, + cur_level_bytes, + cur_level_time, + cur_level_buffers, + max_size_bytes, + max_size_time, + max_size_buffers, + }); + } else { + for pad in element.sink_pads() { + let cur_level_bytes = pad.property::("current-level-bytes"); + let cur_level_time = pad.property::("current-level-time"); + let cur_level_buffers = pad.property::("current-level-buffers"); + state.log.push(LogLine { + timestamp, + name: name.clone(), + idx: Some(get_pad_idx(&pad)), + ptr, + cur_level_bytes, + cur_level_time, + cur_level_buffers, + max_size_bytes, + max_size_time, + max_size_buffers, + }); + } + } + } else { + let cur_level_bytes = element.property::("current-level-bytes"); + let cur_level_time = element.property::("current-level-time"); + let cur_level_buffers = element.property::("current-level-buffers"); + state.log.push(LogLine { + timestamp, + name, + idx: None, + ptr, + cur_level_bytes, + cur_level_time, + cur_level_buffers, + max_size_bytes, + max_size_time, + max_size_buffers, + }); + } } }