diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index ea4914a8a..e0a9d8142 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -8,6 +8,57 @@ use gst::{glib, prelude::*}; use once_cell::sync::Lazy; use thiserror::Error; +// Small wrapper around AtomicU64 and a Mutex, to allow it to run regular AtomicU64 +// operations where supported, and fallback to a mutex where it is not. The wrapper methods +// are the ones that are needed, and not all are exposed. +#[derive(Debug)] +struct WrappedAtomicU64 { + #[cfg(not(target_has_atomic = "64"))] + atomic: Mutex, + #[cfg(target_has_atomic = "64")] + atomic: atomic::AtomicU64, +} + +#[cfg(target_has_atomic = "64")] +impl WrappedAtomicU64 { + fn new(value: u64) -> WrappedAtomicU64 { + WrappedAtomicU64 { + atomic: atomic::AtomicU64::new(value), + } + } + fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 { + self.atomic.fetch_add(value, order) + } + fn store(&self, value: u64, order: atomic::Ordering) { + self.atomic.store(value, order); + } + + fn load(&self, order: atomic::Ordering) -> u64 { + self.atomic.load(order) + } +} + +#[cfg(not(target_has_atomic = "64"))] +impl WrappedAtomicU64 { + fn new(value: u64) -> WrappedAtomicU64 { + WrappedAtomicU64 { + atomic: Mutex::new(value), + } + } + fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 { + let mut guard = self.atomic.lock().unwrap(); + let old = *guard; + *guard += value; + old + } + fn store(&self, value: u64, _order: atomic::Ordering) { + *self.atomic.lock().unwrap() = value; + } + fn load(&self, _order: atomic::Ordering) -> u64 { + *self.atomic.lock().unwrap() + } +} + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "utilsrs-stream-producer", @@ -45,9 +96,9 @@ pub struct ConsumptionLink { consumer: gst_app::AppSrc, producer: Option, /// number of buffers dropped because `consumer` internal queue was full - dropped: Arc, + dropped: Arc, /// number of buffers pushed through `consumer` - pushed: Arc, + pushed: Arc, /// if buffers should not be pushed to the `consumer` right now discard: Arc, } @@ -58,8 +109,8 @@ impl ConsumptionLink { ConsumptionLink { consumer, producer: None, - dropped: Arc::new(atomic::AtomicU64::new(0)), - pushed: Arc::new(atomic::AtomicU64::new(0)), + dropped: Arc::new(WrappedAtomicU64::new(0)), + pushed: Arc::new(WrappedAtomicU64::new(0)), discard: Arc::new(atomic::AtomicBool::new(false)), } } @@ -157,8 +208,8 @@ impl StreamProducer { &self, consumer: &gst_app::AppSrc, ) -> Result { - let dropped = Arc::new(atomic::AtomicU64::new(0)); - let pushed = Arc::new(atomic::AtomicU64::new(0)); + let dropped = Arc::new(WrappedAtomicU64::new(0)); + let pushed = Arc::new(WrappedAtomicU64::new(0)); let discard = Arc::new(atomic::AtomicBool::new(false)); self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?; @@ -175,8 +226,8 @@ impl StreamProducer { fn add_consumer_internal( &self, consumer: &gst_app::AppSrc, - dropped: Arc, - pushed: Arc, + dropped: Arc, + pushed: Arc, discard: Arc, ) -> Result<(), AddConsumerError> { let mut consumers = self.consumers.lock().unwrap(); @@ -622,9 +673,9 @@ struct StreamConsumer { /// streams. needs_keyframe: Arc, /// number of buffers dropped because `appsrc` internal queue was full - dropped: Arc, + dropped: Arc, /// number of buffers pushed through `appsrc` - pushed: Arc, + pushed: Arc, /// if buffers should not be pushed to the `appsrc` right now discard: Arc, } @@ -634,8 +685,8 @@ impl StreamConsumer { fn new( appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId, - dropped: Arc, - pushed: Arc, + dropped: Arc, + pushed: Arc, discard: Arc, ) -> Self { let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));