utils: streamproducer: wrap atomic operations

In order to support more platforms, especially MIPS,
add a small wrapper for necessary atomic(u64) instructions.
It exposes the necessary functions, and falls back to
mutex if needed.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1505>
This commit is contained in:
Albert Sjolund 2024-06-26 14:36:52 +02:00 committed by GStreamer Marge Bot
parent 5ae5f14532
commit dc9be3b60e

View file

@ -8,6 +8,57 @@ use gst::{glib, prelude::*};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use thiserror::Error; use thiserror::Error;
// Small wrapper around AtomicU64 and a Mutex, to allow it to run regular AtomicU64
// operations where supported, and fallback to a mutex where it is not. The wrapper methods
// are the ones that are needed, and not all are exposed.
#[derive(Debug)]
struct WrappedAtomicU64 {
#[cfg(not(target_has_atomic = "64"))]
atomic: Mutex<u64>,
#[cfg(target_has_atomic = "64")]
atomic: atomic::AtomicU64,
}
#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
fn new(value: u64) -> WrappedAtomicU64 {
WrappedAtomicU64 {
atomic: atomic::AtomicU64::new(value),
}
}
fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
self.atomic.fetch_add(value, order)
}
fn store(&self, value: u64, order: atomic::Ordering) {
self.atomic.store(value, order);
}
fn load(&self, order: atomic::Ordering) -> u64 {
self.atomic.load(order)
}
}
#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
fn new(value: u64) -> WrappedAtomicU64 {
WrappedAtomicU64 {
atomic: Mutex::new(value),
}
}
fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
let mut guard = self.atomic.lock().unwrap();
let old = *guard;
*guard += value;
old
}
fn store(&self, value: u64, _order: atomic::Ordering) {
*self.atomic.lock().unwrap() = value;
}
fn load(&self, _order: atomic::Ordering) -> u64 {
*self.atomic.lock().unwrap()
}
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new( gst::DebugCategory::new(
"utilsrs-stream-producer", "utilsrs-stream-producer",
@ -45,9 +96,9 @@ pub struct ConsumptionLink {
consumer: gst_app::AppSrc, consumer: gst_app::AppSrc,
producer: Option<StreamProducer>, producer: Option<StreamProducer>,
/// number of buffers dropped because `consumer` internal queue was full /// number of buffers dropped because `consumer` internal queue was full
dropped: Arc<atomic::AtomicU64>, dropped: Arc<WrappedAtomicU64>,
/// number of buffers pushed through `consumer` /// number of buffers pushed through `consumer`
pushed: Arc<atomic::AtomicU64>, pushed: Arc<WrappedAtomicU64>,
/// if buffers should not be pushed to the `consumer` right now /// if buffers should not be pushed to the `consumer` right now
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
} }
@ -58,8 +109,8 @@ impl ConsumptionLink {
ConsumptionLink { ConsumptionLink {
consumer, consumer,
producer: None, producer: None,
dropped: Arc::new(atomic::AtomicU64::new(0)), dropped: Arc::new(WrappedAtomicU64::new(0)),
pushed: Arc::new(atomic::AtomicU64::new(0)), pushed: Arc::new(WrappedAtomicU64::new(0)),
discard: Arc::new(atomic::AtomicBool::new(false)), discard: Arc::new(atomic::AtomicBool::new(false)),
} }
} }
@ -157,8 +208,8 @@ impl StreamProducer {
&self, &self,
consumer: &gst_app::AppSrc, consumer: &gst_app::AppSrc,
) -> Result<ConsumptionLink, AddConsumerError> { ) -> Result<ConsumptionLink, AddConsumerError> {
let dropped = Arc::new(atomic::AtomicU64::new(0)); let dropped = Arc::new(WrappedAtomicU64::new(0));
let pushed = Arc::new(atomic::AtomicU64::new(0)); let pushed = Arc::new(WrappedAtomicU64::new(0));
let discard = Arc::new(atomic::AtomicBool::new(false)); let discard = Arc::new(atomic::AtomicBool::new(false));
self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?; self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?;
@ -175,8 +226,8 @@ impl StreamProducer {
fn add_consumer_internal( fn add_consumer_internal(
&self, &self,
consumer: &gst_app::AppSrc, consumer: &gst_app::AppSrc,
dropped: Arc<atomic::AtomicU64>, dropped: Arc<WrappedAtomicU64>,
pushed: Arc<atomic::AtomicU64>, pushed: Arc<WrappedAtomicU64>,
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
) -> Result<(), AddConsumerError> { ) -> Result<(), AddConsumerError> {
let mut consumers = self.consumers.lock().unwrap(); let mut consumers = self.consumers.lock().unwrap();
@ -622,9 +673,9 @@ struct StreamConsumer {
/// streams. /// streams.
needs_keyframe: Arc<atomic::AtomicBool>, needs_keyframe: Arc<atomic::AtomicBool>,
/// number of buffers dropped because `appsrc` internal queue was full /// number of buffers dropped because `appsrc` internal queue was full
dropped: Arc<atomic::AtomicU64>, dropped: Arc<WrappedAtomicU64>,
/// number of buffers pushed through `appsrc` /// number of buffers pushed through `appsrc`
pushed: Arc<atomic::AtomicU64>, pushed: Arc<WrappedAtomicU64>,
/// if buffers should not be pushed to the `appsrc` right now /// if buffers should not be pushed to the `appsrc` right now
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
} }
@ -634,8 +685,8 @@ impl StreamConsumer {
fn new( fn new(
appsrc: &gst_app::AppSrc, appsrc: &gst_app::AppSrc,
fku_probe_id: gst::PadProbeId, fku_probe_id: gst::PadProbeId,
dropped: Arc<atomic::AtomicU64>, dropped: Arc<WrappedAtomicU64>,
pushed: Arc<atomic::AtomicU64>, pushed: Arc<WrappedAtomicU64>,
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
) -> Self { ) -> Self {
let needs_keyframe = Arc::new(atomic::AtomicBool::new(true)); let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));