ts: Queue & Proxy: minor cleanups

This commit is contained in:
François Laignel 2022-06-26 16:47:43 +02:00 committed by Sebastian Dröge
parent 5720faa808
commit 7e826385c7
2 changed files with 89 additions and 111 deletions

View file

@ -28,9 +28,8 @@ use gst::subclass::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::sync::Mutex as StdMutex;
use std::sync::MutexGuard as StdMutexGuard;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::sync::{Mutex, MutexGuard};
use std::time::Duration; use std::time::Duration;
use std::{u32, u64}; use std::{u32, u64};
@ -41,12 +40,12 @@ use crate::runtime::{
use crate::dataqueue::{DataQueue, DataQueueItem}; use crate::dataqueue::{DataQueue, DataQueueItem};
static PROXY_CONTEXTS: Lazy<StdMutex<HashMap<String, Weak<StdMutex<ProxyContextInner>>>>> = static PROXY_CONTEXTS: Lazy<Mutex<HashMap<String, Weak<Mutex<ProxyContextInner>>>>> =
Lazy::new(|| StdMutex::new(HashMap::new())); Lazy::new(|| Mutex::new(HashMap::new()));
static PROXY_SRC_PADS: Lazy<StdMutex<HashMap<String, PadSrcWeak>>> = static PROXY_SRC_PADS: Lazy<Mutex<HashMap<String, PadSrcWeak>>> =
Lazy::new(|| StdMutex::new(HashMap::new())); Lazy::new(|| Mutex::new(HashMap::new()));
static PROXY_SINK_PADS: Lazy<StdMutex<HashMap<String, PadSinkWeak>>> = static PROXY_SINK_PADS: Lazy<Mutex<HashMap<String, PadSinkWeak>>> =
Lazy::new(|| StdMutex::new(HashMap::new())); Lazy::new(|| Mutex::new(HashMap::new()));
const DEFAULT_PROXY_CONTEXT: &str = ""; const DEFAULT_PROXY_CONTEXT: &str = "";
@ -126,14 +125,14 @@ impl Drop for ProxyContextInner {
#[derive(Debug)] #[derive(Debug)]
struct ProxyContext { struct ProxyContext {
shared: Arc<StdMutex<ProxyContextInner>>, shared: Arc<Mutex<ProxyContextInner>>,
as_sink: bool, as_sink: bool,
name: String, name: String,
} }
impl ProxyContext { impl ProxyContext {
#[inline] #[inline]
fn lock_shared(&self) -> StdMutexGuard<'_, ProxyContextInner> { fn lock_shared(&self) -> MutexGuard<'_, ProxyContextInner> {
self.shared.lock().unwrap() self.shared.lock().unwrap()
} }
@ -171,7 +170,7 @@ impl ProxyContext {
} }
if proxy_ctx.is_none() { if proxy_ctx.is_none() {
let shared = Arc::new(StdMutex::new(ProxyContextInner { let shared = Arc::new(Mutex::new(ProxyContextInner {
name: name.into(), name: name.into(),
dataqueue: None, dataqueue: None,
last_res: Err(gst::FlowError::Flushing), last_res: Err(gst::FlowError::Flushing),
@ -329,8 +328,8 @@ impl PadSinkHandler for ProxySinkPadHandler {
#[derive(Debug)] #[derive(Debug)]
pub struct ProxySink { pub struct ProxySink {
sink_pad: PadSink, sink_pad: PadSink,
proxy_ctx: StdMutex<Option<ProxyContext>>, proxy_ctx: Mutex<Option<ProxyContext>>,
settings: StdMutex<SettingsSink>, settings: Mutex<SettingsSink>,
} }
static SINK_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static SINK_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -581,8 +580,8 @@ impl ObjectSubclass for ProxySink {
gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
ProxySinkPadHandler, ProxySinkPadHandler,
), ),
proxy_ctx: StdMutex::new(None), proxy_ctx: Mutex::new(None),
settings: StdMutex::new(SettingsSink::default()), settings: Mutex::new(SettingsSink::default()),
} }
} }
} }
@ -708,38 +707,6 @@ impl ElementImpl for ProxySink {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ProxySrcPadHandler; struct ProxySrcPadHandler;
impl ProxySrcPadHandler {
async fn push_item(
pad: &PadSrcRef<'_>,
proxysrc: &ProxySrc,
item: DataQueueItem,
) -> Result<(), gst::FlowError> {
{
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
}
match item {
DataQueueItem::Buffer(buffer) => {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
pad.push(buffer).await.map(drop)
}
DataQueueItem::BufferList(list) => {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", list);
pad.push_list(list).await.map(drop)
}
DataQueueItem::Event(event) => {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
pad.push_event(event).await;
Ok(())
}
}
}
}
impl PadSrcHandler for ProxySrcPadHandler { impl PadSrcHandler for ProxySrcPadHandler {
type ElementImpl = ProxySrc; type ElementImpl = ProxySrc;
@ -853,16 +820,39 @@ impl PadSrcHandler for ProxySrcPadHandler {
#[derive(Debug)] #[derive(Debug)]
struct ProxySrcTask { struct ProxySrcTask {
element: super::ProxySrc, element: super::ProxySrc,
src_pad: PadSrcWeak,
dataqueue: DataQueue, dataqueue: DataQueue,
} }
impl ProxySrcTask { impl ProxySrcTask {
fn new(element: &super::ProxySrc, src_pad: &PadSrc, dataqueue: DataQueue) -> Self { fn new(element: super::ProxySrc, dataqueue: DataQueue) -> Self {
ProxySrcTask { ProxySrcTask { element, dataqueue }
element: element.clone(), }
src_pad: src_pad.downgrade(),
dataqueue, async fn push_item(&self, item: DataQueueItem) -> Result<(), gst::FlowError> {
let proxysrc = self.element.imp();
{
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
}
match item {
DataQueueItem::Buffer(buffer) => {
gst::log!(SRC_CAT, obj: &self.element, "Forwarding {:?}", buffer);
proxysrc.src_pad.push(buffer).await.map(drop)
}
DataQueueItem::BufferList(list) => {
gst::log!(SRC_CAT, obj: &self.element, "Forwarding {:?}", list);
proxysrc.src_pad.push_list(list).await.map(drop)
}
DataQueueItem::Event(event) => {
gst::log!(SRC_CAT, obj: &self.element, "Forwarding {:?}", event);
proxysrc.src_pad.push_event(event).await;
Ok(())
}
} }
} }
} }
@ -902,9 +892,8 @@ impl TaskImpl for ProxySrcTask {
} }
}; };
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); let res = self.push_item(item).await;
let proxysrc = self.element.imp(); let proxysrc = self.element.imp();
let res = ProxySrcPadHandler::push_item(&pad, proxysrc, item).await;
match res { match res {
Ok(()) => { Ok(()) => {
gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item"); gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item");
@ -989,9 +978,9 @@ impl TaskImpl for ProxySrcTask {
pub struct ProxySrc { pub struct ProxySrc {
src_pad: PadSrc, src_pad: PadSrc,
task: Task, task: Task,
proxy_ctx: StdMutex<Option<ProxyContext>>, proxy_ctx: Mutex<Option<ProxyContext>>,
dataqueue: StdMutex<Option<DataQueue>>, dataqueue: Mutex<Option<DataQueue>>,
settings: StdMutex<SettingsSrc>, settings: Mutex<SettingsSrc>,
} }
static SRC_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static SRC_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -1052,11 +1041,10 @@ impl ProxySrc {
} }
*self.proxy_ctx.lock().unwrap() = Some(proxy_ctx); *self.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone()); *self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
self.task self.task
.prepare(ProxySrcTask::new(element, &self.src_pad, dataqueue), ts_ctx) .prepare(ProxySrcTask::new(element.clone(), dataqueue), ts_ctx)
.map_err(|err| { .map_err(|err| {
gst::error_msg!( gst::error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
@ -1121,9 +1109,9 @@ impl ObjectSubclass for ProxySrc {
ProxySrcPadHandler, ProxySrcPadHandler,
), ),
task: Task::default(), task: Task::default(),
proxy_ctx: StdMutex::new(None), proxy_ctx: Mutex::new(None),
dataqueue: StdMutex::new(None), dataqueue: Mutex::new(None),
settings: StdMutex::new(SettingsSrc::default()), settings: Mutex::new(SettingsSrc::default()),
} }
} }
} }

View file

@ -28,12 +28,12 @@ use gst::subclass::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::Mutex as StdMutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use std::{u32, u64}; use std::{u32, u64};
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, Task}; use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
use crate::dataqueue::{DataQueue, DataQueueItem}; use crate::dataqueue::{DataQueue, DataQueueItem};
@ -213,34 +213,6 @@ impl PadSinkHandler for QueuePadSinkHandler {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct QueuePadSrcHandler; struct QueuePadSrcHandler;
impl QueuePadSrcHandler {
async fn push_item(
pad: &PadSrcRef<'_>,
queue: &Queue,
item: DataQueueItem,
) -> Result<(), gst::FlowError> {
if let Some(pending_queue) = queue.pending_queue.lock().unwrap().as_mut() {
pending_queue.notify_more_queue_space();
}
match item {
DataQueueItem::Buffer(buffer) => {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
pad.push(buffer).await.map(drop)
}
DataQueueItem::BufferList(list) => {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", list);
pad.push_list(list).await.map(drop)
}
DataQueueItem::Event(event) => {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
pad.push_event(event).await;
Ok(())
}
}
}
}
impl PadSrcHandler for QueuePadSrcHandler { impl PadSrcHandler for QueuePadSrcHandler {
type ElementImpl = Queue; type ElementImpl = Queue;
@ -322,16 +294,35 @@ impl PadSrcHandler for QueuePadSrcHandler {
#[derive(Debug)] #[derive(Debug)]
struct QueueTask { struct QueueTask {
element: super::Queue, element: super::Queue,
src_pad: PadSrcWeak,
dataqueue: DataQueue, dataqueue: DataQueue,
} }
impl QueueTask { impl QueueTask {
fn new(element: &super::Queue, src_pad: &PadSrc, dataqueue: DataQueue) -> Self { fn new(element: super::Queue, dataqueue: DataQueue) -> Self {
QueueTask { QueueTask { element, dataqueue }
element: element.clone(), }
src_pad: src_pad.downgrade(),
dataqueue, async fn push_item(&self, item: DataQueueItem) -> Result<(), gst::FlowError> {
let queue = self.element.imp();
if let Some(pending_queue) = queue.pending_queue.lock().unwrap().as_mut() {
pending_queue.notify_more_queue_space();
}
match item {
DataQueueItem::Buffer(buffer) => {
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
queue.src_pad.push(buffer).await.map(drop)
}
DataQueueItem::BufferList(list) => {
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", list);
queue.src_pad.push_list(list).await.map(drop)
}
DataQueueItem::Event(event) => {
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", event);
queue.src_pad.push_event(event).await;
Ok(())
}
} }
} }
} }
@ -366,9 +357,8 @@ impl TaskImpl for QueueTask {
} }
}; };
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); let res = self.push_item(item).await;
let queue = self.element.imp(); let queue = self.element.imp();
let res = QueuePadSrcHandler::push_item(&pad, queue, item).await;
match res { match res {
Ok(()) => { Ok(()) => {
gst::log!(CAT, obj: &self.element, "Successfully pushed item"); gst::log!(CAT, obj: &self.element, "Successfully pushed item");
@ -381,7 +371,7 @@ impl TaskImpl for QueueTask {
Err(gst::FlowError::Eos) => { Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj: &self.element, "EOS"); gst::debug!(CAT, obj: &self.element, "EOS");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos); *queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos);
pad.push_event(gst::event::Eos::new()).await; queue.src_pad.push_event(gst::event::Eos::new()).await;
} }
Err(err) => { Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err); gst::error!(CAT, obj: &self.element, "Got error {}", err);
@ -449,10 +439,10 @@ pub struct Queue {
sink_pad: PadSink, sink_pad: PadSink,
src_pad: PadSrc, src_pad: PadSrc,
task: Task, task: Task,
dataqueue: StdMutex<Option<DataQueue>>, dataqueue: Mutex<Option<DataQueue>>,
pending_queue: StdMutex<Option<PendingQueue>>, pending_queue: Mutex<Option<PendingQueue>>,
last_res: StdMutex<Result<gst::FlowSuccess, gst::FlowError>>, last_res: Mutex<Result<gst::FlowSuccess, gst::FlowError>>,
settings: StdMutex<Settings>, settings: Mutex<Settings>,
} }
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -648,7 +638,7 @@ impl Queue {
})?; })?;
self.task self.task
.prepare(QueueTask::new(element, &self.src_pad, dataqueue), context) .prepare(QueueTask::new(element.clone(), dataqueue), context)
.map_err(|err| { .map_err(|err| {
gst::error_msg!( gst::error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
@ -706,10 +696,10 @@ impl ObjectSubclass for Queue {
QueuePadSrcHandler, QueuePadSrcHandler,
), ),
task: Task::default(), task: Task::default(),
dataqueue: StdMutex::new(None), dataqueue: Mutex::new(None),
pending_queue: StdMutex::new(None), pending_queue: Mutex::new(None),
last_res: StdMutex::new(Ok(gst::FlowSuccess::Ok)), last_res: Mutex::new(Ok(gst::FlowSuccess::Ok)),
settings: StdMutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
} }
} }
} }