threadshare: add cur-level properties to DataQueue related elements

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2399>
This commit is contained in:
François Laignel 2025-07-29 15:21:13 +02:00
parent 42a72d034e
commit 2f10e6e23f
5 changed files with 318 additions and 37 deletions

View file

@ -16732,6 +16732,48 @@
"type": "guint", "type": "guint",
"writable": true "writable": true
}, },
"current-level-buffers": {
"blurb": "Current number of buffers in the queue",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": false
},
"current-level-bytes": {
"blurb": "Current amount of data in the queue (bytes)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": false
},
"current-level-time": {
"blurb": "Current amount of data in the queue (in ns)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": false
},
"inter-context": { "inter-context": {
"blurb": "Context name of the inter elements to share with", "blurb": "Context name of the inter elements to share with",
"conditionally-available": false, "conditionally-available": false,
@ -17008,6 +17050,48 @@
"type": "guint", "type": "guint",
"writable": true "writable": true
}, },
"current-level-buffers": {
"blurb": "Current number of buffers in the queue",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": false
},
"current-level-bytes": {
"blurb": "Current amount of data in the queue (bytes)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": false
},
"current-level-time": {
"blurb": "Current amount of data in the queue (in ns)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": false
},
"max-size-buffers": { "max-size-buffers": {
"blurb": "Maximum number of buffers to queue (0=unlimited)", "blurb": "Maximum number of buffers to queue (0=unlimited)",
"conditionally-available": false, "conditionally-available": false,
@ -17116,6 +17200,48 @@
"type": "guint", "type": "guint",
"writable": true "writable": true
}, },
"current-level-buffers": {
"blurb": "Current number of buffers in the queue",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": false
},
"current-level-bytes": {
"blurb": "Current amount of data in the queue (bytes)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": false
},
"current-level-time": {
"blurb": "Current amount of data in the queue (in ns)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": false
},
"max-size-buffers": { "max-size-buffers": {
"blurb": "Maximum number of buffers to queue (0=unlimited)", "blurb": "Maximum number of buffers to queue (0=unlimited)",
"conditionally-available": false, "conditionally-available": false,

View file

@ -80,8 +80,9 @@ struct DataQueueInner {
state: DataQueueState, state: DataQueueState,
queue: VecDeque<DataQueueItem>, queue: VecDeque<DataQueueItem>,
cur_size_buffers: u32, cur_level_buffers: u32,
cur_size_bytes: u32, cur_level_bytes: u32,
cur_level_time: gst::ClockTime,
max_size_buffers: Option<u32>, max_size_buffers: Option<u32>,
max_size_bytes: Option<u32>, max_size_bytes: Option<u32>,
max_size_time: Option<gst::ClockTime>, max_size_time: Option<gst::ClockTime>,
@ -110,8 +111,9 @@ impl DataQueue {
src_pad: src_pad.clone(), src_pad: src_pad.clone(),
state: DataQueueState::Stopped, state: DataQueueState::Stopped,
queue: VecDeque::new(), queue: VecDeque::new(),
cur_size_buffers: 0, cur_level_buffers: 0,
cur_size_bytes: 0, cur_level_bytes: 0,
cur_level_time: gst::ClockTime::ZERO,
max_size_buffers, max_size_buffers,
max_size_bytes, max_size_bytes,
max_size_time: max_size_time.into(), max_size_time: max_size_time.into(),
@ -123,6 +125,18 @@ impl DataQueue {
self.0.lock().unwrap().state self.0.lock().unwrap().state
} }
pub fn cur_level_buffers(&self) -> u32 {
self.0.lock().unwrap().cur_level_buffers
}
pub fn cur_level_bytes(&self) -> u32 {
self.0.lock().unwrap().cur_level_bytes
}
pub fn cur_level_time(&self) -> gst::ClockTime {
self.0.lock().unwrap().cur_level_time
}
pub fn start(&self) { pub fn start(&self) {
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
if inner.state == DataQueueState::Started { if inner.state == DataQueueState::Started {
@ -170,6 +184,10 @@ impl DataQueue {
} }
} }
inner.cur_level_buffers = 0;
inner.cur_level_bytes = 0;
inner.cur_level_time = gst::ClockTime::ZERO;
gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared"); gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared");
} }
@ -195,58 +213,63 @@ impl DataQueue {
); );
let (count, bytes) = item.size(); let (count, bytes) = item.size();
let queue_ts = inner.queue.iter().filter_map(|i| i.timestamp()).next(); let queue_ts = inner.queue.iter().find_map(|i| i.timestamp());
let ts = item.timestamp(); let ts = item.timestamp();
if let Some(max) = inner.max_size_buffers { if let Some(max) = inner.max_size_buffers {
if max <= inner.cur_size_buffers { if max <= inner.cur_level_buffers {
gst::debug!( gst::debug!(
DATA_QUEUE_CAT, DATA_QUEUE_CAT,
obj = inner.element, obj = inner.element,
"Queue is full (buffers): {} <= {}", "Queue is full (buffers): {} <= {}",
max, max,
inner.cur_size_buffers inner.cur_level_buffers
); );
return Err(item); return Err(item);
} }
} }
if let Some(max) = inner.max_size_bytes { if let Some(max) = inner.max_size_bytes {
if max <= inner.cur_size_bytes { if max <= inner.cur_level_bytes {
gst::debug!( gst::debug!(
DATA_QUEUE_CAT, DATA_QUEUE_CAT,
obj = inner.element, obj = inner.element,
"Queue is full (bytes): {} <= {}", "Queue is full (bytes): {} <= {}",
max, max,
inner.cur_size_bytes inner.cur_level_bytes
); );
return Err(item); return Err(item);
} }
} }
// FIXME: Use running time // FIXME: Use running time
if let (Some(max), Some(queue_ts), Some(ts)) = (inner.max_size_time, queue_ts, ts) { let level = if let (Some(queue_ts), Some(ts)) = (queue_ts, ts) {
let level = if queue_ts > ts { let level = if queue_ts > ts {
queue_ts - ts queue_ts - ts
} else { } else {
ts - queue_ts ts - queue_ts
}; };
if max <= level { if inner.max_size_time.opt_le(level).unwrap_or(false) {
gst::debug!( gst::debug!(
DATA_QUEUE_CAT, DATA_QUEUE_CAT,
obj = inner.element, obj = inner.element,
"Queue is full (time): {} <= {}", "Queue is full (time): {} <= {}",
max, inner.max_size_time.display(),
level level
); );
return Err(item); return Err(item);
} }
}
level
} else {
gst::ClockTime::ZERO
};
inner.queue.push_back(item); inner.queue.push_back(item);
inner.cur_size_buffers += count; inner.cur_level_buffers += count;
inner.cur_size_bytes += bytes; inner.cur_level_bytes += bytes;
inner.cur_level_time = level;
inner.wake(); inner.wake();
@ -273,8 +296,8 @@ impl DataQueue {
); );
let (count, bytes) = item.size(); let (count, bytes) = item.size();
inner.cur_size_buffers -= count; inner.cur_level_buffers -= count;
inner.cur_size_bytes -= bytes; inner.cur_level_bytes -= bytes;
return Some(item); return Some(item);
} }

View file

@ -852,6 +852,21 @@ impl ObjectImpl for InterSrc {
.readwrite() .readwrite()
.construct_only() .construct_only()
.build(), .build(),
glib::ParamSpecUInt::builder("current-level-buffers")
.nick("Current Level Buffers")
.blurb("Current number of buffers in the queue")
.read_only()
.build(),
glib::ParamSpecUInt::builder("current-level-bytes")
.nick("Current Level Bytes")
.blurb("Current amount of data in the queue (bytes)")
.read_only()
.build(),
glib::ParamSpecUInt64::builder("current-level-time")
.nick("Current Level Time")
.blurb("Current amount of data in the queue (in ns)")
.read_only()
.build(),
] ]
}); });
@ -904,14 +919,43 @@ impl ObjectImpl for InterSrc {
} }
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(), "max-size-buffers" => self.settings.lock().unwrap().max_size_buffers.to_value(),
"max-size-bytes" => settings.max_size_bytes.to_value(), "max-size-bytes" => self.settings.lock().unwrap().max_size_bytes.to_value(),
"max-size-time" => settings.max_size_time.nseconds().to_value(), "max-size-time" => self
"context" => settings.context.to_value(), .settings
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(), .lock()
"inter-context" => settings.inter_context.to_value(), .unwrap()
.max_size_time
.nseconds()
.to_value(),
"current-level-buffers" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(0, |d| d.cur_level_buffers())
.to_value(),
"current-level-bytes" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(0, |d| d.cur_level_bytes())
.to_value(),
"current-level-time" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(gst::ClockTime::ZERO, |d| d.cur_level_time())
.nseconds()
.to_value(),
"context" => self.settings.lock().unwrap().context.to_value(),
"context-wait" => {
(self.settings.lock().unwrap().context_wait.as_millis() as u32).to_value()
}
"inter-context" => self.settings.lock().unwrap().inter_context.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -1058,6 +1058,21 @@ impl ObjectImpl for ProxySrc {
.maximum(u64::MAX - 1) .maximum(u64::MAX - 1)
.default_value(DEFAULT_MAX_SIZE_TIME.nseconds()) .default_value(DEFAULT_MAX_SIZE_TIME.nseconds())
.build(), .build(),
glib::ParamSpecUInt::builder("current-level-buffers")
.nick("Current Level Buffers")
.blurb("Current number of buffers in the queue")
.read_only()
.build(),
glib::ParamSpecUInt::builder("current-level-bytes")
.nick("Current Level Bytes")
.blurb("Current amount of data in the queue (bytes)")
.read_only()
.build(),
glib::ParamSpecUInt64::builder("current-level-time")
.nick("Current Level Time")
.blurb("Current amount of data in the queue (in ns)")
.read_only()
.build(),
] ]
}); });
@ -1098,14 +1113,43 @@ impl ObjectImpl for ProxySrc {
} }
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(), "max-size-buffers" => self.settings.lock().unwrap().max_size_buffers.to_value(),
"max-size-bytes" => settings.max_size_bytes.to_value(), "max-size-bytes" => self.settings.lock().unwrap().max_size_bytes.to_value(),
"max-size-time" => settings.max_size_time.nseconds().to_value(), "max-size-time" => self
"context" => settings.context.to_value(), .settings
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(), .lock()
"proxy-context" => settings.proxy_context.to_value(), .unwrap()
.max_size_time
.nseconds()
.to_value(),
"current-level-buffers" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(0, |d| d.cur_level_buffers())
.to_value(),
"current-level-bytes" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(0, |d| d.cur_level_bytes())
.to_value(),
"current-level-time" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(gst::ClockTime::ZERO, |d| d.cur_level_time())
.nseconds()
.to_value(),
"context" => self.settings.lock().unwrap().context.to_value(),
"context-wait" => {
(self.settings.lock().unwrap().context_wait.as_millis() as u32).to_value()
}
"proxy-context" => self.settings.lock().unwrap().proxy_context.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -648,6 +648,21 @@ impl ObjectImpl for Queue {
.maximum(u64::MAX - 1) .maximum(u64::MAX - 1)
.default_value(DEFAULT_MAX_SIZE_TIME.nseconds()) .default_value(DEFAULT_MAX_SIZE_TIME.nseconds())
.build(), .build(),
glib::ParamSpecUInt::builder("current-level-buffers")
.nick("Current Level Buffers")
.blurb("Current number of buffers in the queue")
.read_only()
.build(),
glib::ParamSpecUInt::builder("current-level-bytes")
.nick("Current Level Bytes")
.blurb("Current amount of data in the queue (bytes)")
.read_only()
.build(),
glib::ParamSpecUInt64::builder("current-level-time")
.nick("Current Level Time")
.blurb("Current amount of data in the queue (in ns)")
.read_only()
.build(),
] ]
}); });
@ -682,13 +697,42 @@ impl ObjectImpl for Queue {
} }
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(), "max-size-buffers" => self.settings.lock().unwrap().max_size_buffers.to_value(),
"max-size-bytes" => settings.max_size_bytes.to_value(), "max-size-bytes" => self.settings.lock().unwrap().max_size_bytes.to_value(),
"max-size-time" => settings.max_size_time.nseconds().to_value(), "max-size-time" => self
"context" => settings.context.to_value(), .settings
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(), .lock()
.unwrap()
.max_size_time
.nseconds()
.to_value(),
"current-level-buffers" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(0, |d| d.cur_level_buffers())
.to_value(),
"current-level-bytes" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(0, |d| d.cur_level_bytes())
.to_value(),
"current-level-time" => self
.dataqueue
.lock()
.unwrap()
.as_ref()
.map_or(gst::ClockTime::ZERO, |d| d.cur_level_time())
.nseconds()
.to_value(),
"context" => self.settings.lock().unwrap().context.to_value(),
"context-wait" => {
(self.settings.lock().unwrap().context_wait.as_millis() as u32).to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }