tracers: queue-levels: Add support for multiqueue and queue2

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/218
This commit is contained in:
Sebastian Dröge 2022-07-21 12:15:28 +03:00
parent 5ba1c98ae7
commit a45443251b

View file

@ -68,6 +68,28 @@ static QUEUE_TYPE: Lazy<glib::Type> = Lazy::new(|| {
} }
}); });
static QUEUE2_TYPE: Lazy<glib::Type> = Lazy::new(|| {
if let Ok(queue) = gst::ElementFactory::make("queue2", None) {
queue.type_()
} else {
gst::warning!(CAT, "Can't instantiate queue2 element");
glib::Type::INVALID
}
});
static MULTIQUEUE_TYPE: Lazy<glib::Type> = Lazy::new(|| {
if let Ok(queue) = gst::ElementFactory::make("multiqueue", None) {
queue.type_()
} else {
gst::warning!(CAT, "Can't instantiate multiqueue element");
glib::Type::INVALID
}
});
fn is_queue_type(type_: glib::Type) -> bool {
[*QUEUE_TYPE, *QUEUE2_TYPE, *MULTIQUEUE_TYPE].contains(&type_)
}
#[derive(Debug)] #[derive(Debug)]
struct Settings { struct Settings {
file: PathBuf, file: PathBuf,
@ -149,6 +171,7 @@ struct State {
struct LogLine { struct LogLine {
timestamp: u64, timestamp: u64,
name: Arc<glib::GString>, name: Arc<glib::GString>,
idx: Option<usize>,
ptr: usize, ptr: usize,
cur_level_bytes: u32, cur_level_bytes: u32,
cur_level_time: u64, cur_level_time: u64,
@ -180,6 +203,8 @@ impl ObjectImpl for QueueLevels {
} }
Lazy::force(&QUEUE_TYPE); Lazy::force(&QUEUE_TYPE);
Lazy::force(&QUEUE2_TYPE);
Lazy::force(&MULTIQUEUE_TYPE);
self.register_hook(TracerHook::ElementNew); self.register_hook(TracerHook::ElementNew);
self.register_hook(TracerHook::ObjectDestroyed); self.register_hook(TracerHook::ObjectDestroyed);
@ -224,6 +249,7 @@ impl ObjectImpl for QueueLevels {
for LogLine { for LogLine {
timestamp, timestamp,
name, name,
idx,
ptr, ptr,
cur_level_bytes, cur_level_bytes,
cur_level_time, cur_level_time,
@ -233,7 +259,12 @@ impl ObjectImpl for QueueLevels {
max_size_buffers, max_size_buffers,
} in &state.log } in &state.log
{ {
if let Err(err) = writeln!(&mut file, "{timestamp},{name},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}") { let res = if let Some(idx) = idx {
writeln!(&mut file, "{timestamp},{name}:{idx},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}")
} else {
writeln!(&mut file, "{timestamp},{name},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}")
};
if let Err(err) = res {
gst::error!(CAT, obj: obj, "Failed to write to file: {err}"); gst::error!(CAT, obj: obj, "Failed to write to file: {err}");
return; return;
} }
@ -245,7 +276,7 @@ impl GstObjectImpl for QueueLevels {}
impl TracerImpl for QueueLevels { impl TracerImpl for QueueLevels {
fn element_new(&self, _ts: u64, element: &gst::Element) { fn element_new(&self, _ts: u64, element: &gst::Element) {
if element.type_() != *QUEUE_TYPE { if !is_queue_type(element.type_()) {
return; return;
} }
@ -283,103 +314,65 @@ impl TracerImpl for QueueLevels {
} }
fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, _buffer: &gst::Buffer) { fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, _buffer: &gst::Buffer) {
let element =
if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) { if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
if parent.type_() == *QUEUE_TYPE { if is_queue_type(parent.type_()) {
parent self.log(&parent, Some(pad), ts);
} else { }
return;
} }
} else {
return;
};
self.log(&element, ts);
} }
fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, _list: &gst::BufferList) { fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, _list: &gst::BufferList) {
let element =
if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) { if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
if parent.type_() == *QUEUE_TYPE { if is_queue_type(parent.type_()) {
parent self.log(&parent, Some(pad), ts);
} else { }
return;
} }
} else {
return;
};
self.log(&element, ts);
} }
#[cfg(not(feature = "v1_22"))] #[cfg(not(feature = "v1_22"))]
fn pad_push_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { fn pad_push_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
let element = if let Some(parent) = pad if let Some(peer) = pad.peer() {
.peer() if let Some(parent) = peer
.and_then(|p| p.parent()) .parent()
.and_then(|p| p.downcast::<gst::Element>().ok()) .and_then(|p| p.downcast::<gst::Element>().ok())
{ {
if parent.type_() == *QUEUE_TYPE { if is_queue_type(parent.type_()) {
parent self.log(&parent, Some(&peer), ts);
} else { }
return; }
} }
} else {
return;
};
self.log(&element, ts);
} }
#[cfg(not(feature = "v1_22"))] #[cfg(not(feature = "v1_22"))]
fn pad_push_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { fn pad_push_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
let element = if let Some(parent) = pad if let Some(peer) = pad.peer() {
.peer() if let Some(parent) = peer
.and_then(|p| p.parent()) .parent()
.and_then(|p| p.downcast::<gst::Element>().ok()) .and_then(|p| p.downcast::<gst::Element>().ok())
{ {
if parent.type_() == *QUEUE_TYPE { if is_queue_type(parent.type_()) {
parent self.log(&parent, Some(&peer), ts);
} else { }
return; }
} }
} else {
return;
};
self.log(&element, ts);
} }
#[cfg(feature = "v1_22")] #[cfg(feature = "v1_22")]
fn pad_chain_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { fn pad_chain_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
let element =
if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) { if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
if parent.type_() == *QUEUE_TYPE { if is_queue_type(parent.type_()) {
parent self.log(&parent, Some(pad), ts);
} else { }
return;
} }
} else {
return;
};
self.log(&element, ts);
} }
#[cfg(feature = "v1_22")] #[cfg(feature = "v1_22")]
fn pad_chain_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { fn pad_chain_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) {
let element =
if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) { if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
if parent.type_() == *QUEUE_TYPE { if is_queue_type(parent.type_()) {
parent self.log(&parent, Some(pad), ts);
} else { }
return;
} }
} else {
return;
};
self.log(&element, ts);
} }
fn element_change_state_post( fn element_change_state_post(
@ -393,11 +386,11 @@ impl TracerImpl for QueueLevels {
return; return;
} }
if element.type_() != *QUEUE_TYPE { if !is_queue_type(element.type_()) {
return; return;
} }
self.log(element, ts); self.log(element, None, ts);
} }
fn pad_push_event_pre(&self, ts: u64, pad: &gst::Pad, ev: &gst::Event) { fn pad_push_event_pre(&self, ts: u64, pad: &gst::Pad, ev: &gst::Event) {
@ -406,15 +399,15 @@ impl TracerImpl for QueueLevels {
} }
if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) { if let Some(parent) = pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
if parent.type_() == *QUEUE_TYPE { if is_queue_type(parent.type_()) {
self.log(&parent, ts); self.log(&parent, Some(pad), ts);
} }
} }
} }
} }
impl QueueLevels { impl QueueLevels {
fn log(&self, element: &gst::Element, timestamp: u64) { fn log(&self, element: &gst::Element, pad: Option<&gst::Pad>, timestamp: u64) {
let ptr = element.as_ptr() as usize; let ptr = element.as_ptr() as usize;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
@ -423,15 +416,53 @@ impl QueueLevels {
None => return, None => return,
}; };
let cur_level_bytes = element.property::<u32>("current-level-bytes");
let cur_level_time = element.property::<u64>("current-level-time");
let cur_level_buffers = element.property::<u32>("current-level-buffers");
let max_size_bytes = element.property::<u32>("max-size-bytes"); let max_size_bytes = element.property::<u32>("max-size-bytes");
let max_size_time = element.property::<u64>("max-size-time"); let max_size_time = element.property::<u64>("max-size-time");
let max_size_buffers = element.property::<u32>("max-size-buffers"); let max_size_buffers = element.property::<u32>("max-size-buffers");
if element.type_() == *MULTIQUEUE_TYPE {
let get_pad_idx = |pad: &gst::Pad| {
// SAFETY: Names can't change while there's a strong reference to the object
unsafe {
let name_ptr = (*pad.as_ptr()).object.name;
let name = std::ffi::CStr::from_ptr(name_ptr as *const _)
.to_str()
.unwrap();
if let Some(idx) = name.strip_prefix("sink_") {
idx.parse::<usize>().unwrap()
} else if let Some(idx) = name.strip_prefix("src_") {
idx.parse::<usize>().unwrap()
} else {
unreachable!();
}
}
};
if let Some(pad) = pad {
let cur_level_bytes = pad.property::<u32>("current-level-bytes");
let cur_level_time = pad.property::<u64>("current-level-time");
let cur_level_buffers = pad.property::<u32>("current-level-buffers");
state.log.push(LogLine { state.log.push(LogLine {
timestamp, timestamp,
name, name,
idx: Some(get_pad_idx(pad)),
ptr,
cur_level_bytes,
cur_level_time,
cur_level_buffers,
max_size_bytes,
max_size_time,
max_size_buffers,
});
} else {
for pad in element.sink_pads() {
let cur_level_bytes = pad.property::<u32>("current-level-bytes");
let cur_level_time = pad.property::<u64>("current-level-time");
let cur_level_buffers = pad.property::<u32>("current-level-buffers");
state.log.push(LogLine {
timestamp,
name: name.clone(),
idx: Some(get_pad_idx(&pad)),
ptr, ptr,
cur_level_bytes, cur_level_bytes,
cur_level_time, cur_level_time,
@ -442,3 +473,22 @@ impl QueueLevels {
}); });
} }
} }
} else {
let cur_level_bytes = element.property::<u32>("current-level-bytes");
let cur_level_time = element.property::<u64>("current-level-time");
let cur_level_buffers = element.property::<u32>("current-level-buffers");
state.log.push(LogLine {
timestamp,
name,
idx: None,
ptr,
cur_level_bytes,
cur_level_time,
cur_level_buffers,
max_size_bytes,
max_size_time,
max_size_buffers,
});
}
}
}