diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 6bc654983..7c8a27f7e 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -16732,6 +16732,48 @@ "type": "guint", "writable": true }, + "current-level-buffers": { + "blurb": "Current number of buffers in the queue", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "current-level-bytes": { + "blurb": "Current amount of data in the queue (bytes)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "current-level-time": { + "blurb": "Current amount of data in the queue (in ns)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, "inter-context": { "blurb": "Context name of the inter elements to share with", "conditionally-available": false, @@ -17008,6 +17050,48 @@ "type": "guint", "writable": true }, + "current-level-buffers": { + "blurb": "Current number of buffers in the queue", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "current-level-bytes": { + "blurb": "Current amount of data in the queue (bytes)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "current-level-time": { + "blurb": "Current amount of data in the queue (in ns)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, "max-size-buffers": { "blurb": "Maximum number of buffers to queue (0=unlimited)", "conditionally-available": false, @@ -17116,6 +17200,48 @@ "type": "guint", "writable": true }, + "current-level-buffers": { + "blurb": "Current number of buffers in the queue", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "current-level-bytes": { + "blurb": "Current amount of data in the queue (bytes)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "current-level-time": { + "blurb": "Current amount of data in the queue (in ns)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": false + }, "max-size-buffers": { "blurb": "Maximum number of buffers to queue (0=unlimited)", "conditionally-available": false, diff --git a/generic/threadshare/src/dataqueue.rs b/generic/threadshare/src/dataqueue.rs index 50fb10564..41081ad39 100644 --- a/generic/threadshare/src/dataqueue.rs +++ b/generic/threadshare/src/dataqueue.rs @@ -80,8 +80,9 @@ struct DataQueueInner { state: DataQueueState, queue: VecDeque, - cur_size_buffers: u32, - cur_size_bytes: u32, + cur_level_buffers: u32, + cur_level_bytes: u32, + cur_level_time: gst::ClockTime, max_size_buffers: Option, max_size_bytes: Option, max_size_time: Option, @@ -110,8 +111,9 @@ impl DataQueue { src_pad: src_pad.clone(), state: DataQueueState::Stopped, queue: VecDeque::new(), - cur_size_buffers: 0, - cur_size_bytes: 0, + cur_level_buffers: 0, + cur_level_bytes: 0, + cur_level_time: gst::ClockTime::ZERO, max_size_buffers, max_size_bytes, max_size_time: max_size_time.into(), @@ -123,6 +125,18 @@ impl DataQueue { self.0.lock().unwrap().state } + pub fn cur_level_buffers(&self) -> u32 { + self.0.lock().unwrap().cur_level_buffers + } + + pub fn cur_level_bytes(&self) -> u32 { + self.0.lock().unwrap().cur_level_bytes + } + + pub fn cur_level_time(&self) -> gst::ClockTime { + self.0.lock().unwrap().cur_level_time + } + pub fn start(&self) { let mut inner = self.0.lock().unwrap(); if inner.state == DataQueueState::Started { @@ -170,6 +184,10 @@ impl DataQueue { } } + inner.cur_level_buffers = 0; + inner.cur_level_bytes = 0; + inner.cur_level_time = gst::ClockTime::ZERO; + gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared"); } @@ -195,58 +213,63 @@ impl DataQueue { ); let (count, bytes) = item.size(); - let queue_ts = inner.queue.iter().filter_map(|i| i.timestamp()).next(); + let queue_ts = inner.queue.iter().find_map(|i| i.timestamp()); let ts = item.timestamp(); if let Some(max) = inner.max_size_buffers { - if max <= inner.cur_size_buffers { + if max <= inner.cur_level_buffers { gst::debug!( DATA_QUEUE_CAT, obj = inner.element, "Queue is full (buffers): {} <= {}", max, - inner.cur_size_buffers + inner.cur_level_buffers ); return Err(item); } } if let Some(max) = inner.max_size_bytes { - if max <= inner.cur_size_bytes { + if max <= inner.cur_level_bytes { gst::debug!( DATA_QUEUE_CAT, obj = inner.element, "Queue is full (bytes): {} <= {}", max, - inner.cur_size_bytes + inner.cur_level_bytes ); return Err(item); } } // FIXME: Use running time - if let (Some(max), Some(queue_ts), Some(ts)) = (inner.max_size_time, queue_ts, ts) { + let level = if let (Some(queue_ts), Some(ts)) = (queue_ts, ts) { let level = if queue_ts > ts { queue_ts - ts } else { ts - queue_ts }; - if max <= level { + if inner.max_size_time.opt_le(level).unwrap_or(false) { gst::debug!( DATA_QUEUE_CAT, obj = inner.element, "Queue is full (time): {} <= {}", - max, + inner.max_size_time.display(), level ); return Err(item); } - } + + level + } else { + gst::ClockTime::ZERO + }; inner.queue.push_back(item); - inner.cur_size_buffers += count; - inner.cur_size_bytes += bytes; + inner.cur_level_buffers += count; + inner.cur_level_bytes += bytes; + inner.cur_level_time = level; inner.wake(); @@ -273,8 +296,8 @@ impl DataQueue { ); let (count, bytes) = item.size(); - inner.cur_size_buffers -= count; - inner.cur_size_bytes -= bytes; + inner.cur_level_buffers -= count; + inner.cur_level_bytes -= bytes; return Some(item); } diff --git a/generic/threadshare/src/inter/src/imp.rs b/generic/threadshare/src/inter/src/imp.rs index 29c26c0cd..e3a451eef 100644 --- a/generic/threadshare/src/inter/src/imp.rs +++ b/generic/threadshare/src/inter/src/imp.rs @@ -852,6 +852,21 @@ impl ObjectImpl for InterSrc { .readwrite() .construct_only() .build(), + glib::ParamSpecUInt::builder("current-level-buffers") + .nick("Current Level Buffers") + .blurb("Current number of buffers in the queue") + .read_only() + .build(), + glib::ParamSpecUInt::builder("current-level-bytes") + .nick("Current Level Bytes") + .blurb("Current amount of data in the queue (bytes)") + .read_only() + .build(), + glib::ParamSpecUInt64::builder("current-level-time") + .nick("Current Level Time") + .blurb("Current amount of data in the queue (in ns)") + .read_only() + .build(), ] }); @@ -904,14 +919,43 @@ impl ObjectImpl for InterSrc { } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); match pspec.name() { - "max-size-buffers" => settings.max_size_buffers.to_value(), - "max-size-bytes" => settings.max_size_bytes.to_value(), - "max-size-time" => settings.max_size_time.nseconds().to_value(), - "context" => settings.context.to_value(), - "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), - "inter-context" => settings.inter_context.to_value(), + "max-size-buffers" => self.settings.lock().unwrap().max_size_buffers.to_value(), + "max-size-bytes" => self.settings.lock().unwrap().max_size_bytes.to_value(), + "max-size-time" => self + .settings + .lock() + .unwrap() + .max_size_time + .nseconds() + .to_value(), + "current-level-buffers" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(0, |d| d.cur_level_buffers()) + .to_value(), + "current-level-bytes" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(0, |d| d.cur_level_bytes()) + .to_value(), + "current-level-time" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(gst::ClockTime::ZERO, |d| d.cur_level_time()) + .nseconds() + .to_value(), + "context" => self.settings.lock().unwrap().context.to_value(), + "context-wait" => { + (self.settings.lock().unwrap().context_wait.as_millis() as u32).to_value() + } + "inter-context" => self.settings.lock().unwrap().inter_context.to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 9f8d1ff69..cbf88adeb 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -1058,6 +1058,21 @@ impl ObjectImpl for ProxySrc { .maximum(u64::MAX - 1) .default_value(DEFAULT_MAX_SIZE_TIME.nseconds()) .build(), + glib::ParamSpecUInt::builder("current-level-buffers") + .nick("Current Level Buffers") + .blurb("Current number of buffers in the queue") + .read_only() + .build(), + glib::ParamSpecUInt::builder("current-level-bytes") + .nick("Current Level Bytes") + .blurb("Current amount of data in the queue (bytes)") + .read_only() + .build(), + glib::ParamSpecUInt64::builder("current-level-time") + .nick("Current Level Time") + .blurb("Current amount of data in the queue (in ns)") + .read_only() + .build(), ] }); @@ -1098,14 +1113,43 @@ impl ObjectImpl for ProxySrc { } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); match pspec.name() { - "max-size-buffers" => settings.max_size_buffers.to_value(), - "max-size-bytes" => settings.max_size_bytes.to_value(), - "max-size-time" => settings.max_size_time.nseconds().to_value(), - "context" => settings.context.to_value(), - "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), - "proxy-context" => settings.proxy_context.to_value(), + "max-size-buffers" => self.settings.lock().unwrap().max_size_buffers.to_value(), + "max-size-bytes" => self.settings.lock().unwrap().max_size_bytes.to_value(), + "max-size-time" => self + .settings + .lock() + .unwrap() + .max_size_time + .nseconds() + .to_value(), + "current-level-buffers" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(0, |d| d.cur_level_buffers()) + .to_value(), + "current-level-bytes" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(0, |d| d.cur_level_bytes()) + .to_value(), + "current-level-time" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(gst::ClockTime::ZERO, |d| d.cur_level_time()) + .nseconds() + .to_value(), + "context" => self.settings.lock().unwrap().context.to_value(), + "context-wait" => { + (self.settings.lock().unwrap().context_wait.as_millis() as u32).to_value() + } + "proxy-context" => self.settings.lock().unwrap().proxy_context.to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index b744dcd7f..dccde1325 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -648,6 +648,21 @@ impl ObjectImpl for Queue { .maximum(u64::MAX - 1) .default_value(DEFAULT_MAX_SIZE_TIME.nseconds()) .build(), + glib::ParamSpecUInt::builder("current-level-buffers") + .nick("Current Level Buffers") + .blurb("Current number of buffers in the queue") + .read_only() + .build(), + glib::ParamSpecUInt::builder("current-level-bytes") + .nick("Current Level Bytes") + .blurb("Current amount of data in the queue (bytes)") + .read_only() + .build(), + glib::ParamSpecUInt64::builder("current-level-time") + .nick("Current Level Time") + .blurb("Current amount of data in the queue (in ns)") + .read_only() + .build(), ] }); @@ -682,13 +697,42 @@ impl ObjectImpl for Queue { } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); match pspec.name() { - "max-size-buffers" => settings.max_size_buffers.to_value(), - "max-size-bytes" => settings.max_size_bytes.to_value(), - "max-size-time" => settings.max_size_time.nseconds().to_value(), - "context" => settings.context.to_value(), - "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), + "max-size-buffers" => self.settings.lock().unwrap().max_size_buffers.to_value(), + "max-size-bytes" => self.settings.lock().unwrap().max_size_bytes.to_value(), + "max-size-time" => self + .settings + .lock() + .unwrap() + .max_size_time + .nseconds() + .to_value(), + "current-level-buffers" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(0, |d| d.cur_level_buffers()) + .to_value(), + "current-level-bytes" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(0, |d| d.cur_level_bytes()) + .to_value(), + "current-level-time" => self + .dataqueue + .lock() + .unwrap() + .as_ref() + .map_or(gst::ClockTime::ZERO, |d| d.cur_level_time()) + .nseconds() + .to_value(), + "context" => self.settings.lock().unwrap().context.to_value(), + "context-wait" => { + (self.settings.lock().unwrap().context_wait.as_millis() as u32).to_value() + } _ => unimplemented!(), } }