mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-11 03:35:26 +00:00
ts/tests/pad: minor ckeanups
This commit is contained in:
parent
d6a9106ffa
commit
907d89c998
1 changed files with 32 additions and 50 deletions
|
@ -21,7 +21,6 @@
|
|||
|
||||
use futures::channel::mpsc;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::lock::Mutex as FutMutex;
|
||||
use futures::prelude::*;
|
||||
|
||||
use gst::glib;
|
||||
|
@ -33,13 +32,11 @@ use gst::EventView;
|
|||
use once_cell::sync::Lazy;
|
||||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use gstthreadshare::runtime::prelude::*;
|
||||
use gstthreadshare::runtime::{
|
||||
Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState,
|
||||
};
|
||||
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task, TaskState};
|
||||
|
||||
const DEFAULT_CONTEXT: &str = "";
|
||||
const THROTTLING_DURATION: Duration = Duration::from_millis(2);
|
||||
|
@ -87,25 +84,6 @@ mod imp_src {
|
|||
#[derive(Clone, Debug)]
|
||||
struct PadSrcTestHandler;
|
||||
|
||||
impl PadSrcTestHandler {
|
||||
async fn push_item(
|
||||
pad: &PadSrcRef<'_>,
|
||||
item: Item,
|
||||
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
gst::debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
|
||||
|
||||
match item {
|
||||
Item::Event(event) => {
|
||||
pad.push_event(event).await;
|
||||
|
||||
Ok(gst::FlowSuccess::Ok)
|
||||
}
|
||||
Item::Buffer(buffer) => pad.push(buffer).await,
|
||||
Item::BufferList(list) => pad.push_list(list).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PadSrcHandler for PadSrcTestHandler {
|
||||
type ElementImpl = ElementSrcTest;
|
||||
|
||||
|
@ -144,21 +122,12 @@ mod imp_src {
|
|||
#[derive(Debug)]
|
||||
struct ElementSrcTestTask {
|
||||
element: super::ElementSrcTest,
|
||||
src_pad: PadSrcWeak,
|
||||
receiver: mpsc::Receiver<Item>,
|
||||
}
|
||||
|
||||
impl ElementSrcTestTask {
|
||||
fn new(
|
||||
element: &super::ElementSrcTest,
|
||||
src_pad: &PadSrc,
|
||||
receiver: mpsc::Receiver<Item>,
|
||||
) -> Self {
|
||||
ElementSrcTestTask {
|
||||
element: element.clone(),
|
||||
src_pad: src_pad.downgrade(),
|
||||
receiver,
|
||||
}
|
||||
fn new(element: super::ElementSrcTest, receiver: mpsc::Receiver<Item>) -> Self {
|
||||
ElementSrcTestTask { element, receiver }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -167,6 +136,20 @@ mod imp_src {
|
|||
// Purge the channel
|
||||
while let Ok(Some(_item)) = self.receiver.try_next() {}
|
||||
}
|
||||
async fn push_item(&self, item: Item) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
gst::debug!(SRC_CAT, obj: &self.element, "Handling {:?}", item);
|
||||
|
||||
let elementsrctest = self.element.imp();
|
||||
match item {
|
||||
Item::Event(event) => {
|
||||
elementsrctest.src_pad.push_event(event).await;
|
||||
|
||||
Ok(gst::FlowSuccess::Ok)
|
||||
}
|
||||
Item::Buffer(buffer) => elementsrctest.src_pad.push(buffer).await,
|
||||
Item::BufferList(list) => elementsrctest.src_pad.push_list(list).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskImpl for ElementSrcTestTask {
|
||||
|
@ -182,8 +165,7 @@ mod imp_src {
|
|||
}
|
||||
};
|
||||
|
||||
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
|
||||
let res = PadSrcTestHandler::push_item(&pad, item).await;
|
||||
let res = self.push_item(item).await;
|
||||
match res {
|
||||
Ok(_) => gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
|
@ -222,8 +204,8 @@ mod imp_src {
|
|||
pub struct ElementSrcTest {
|
||||
src_pad: PadSrc,
|
||||
task: Task,
|
||||
sender: StdMutex<Option<mpsc::Sender<Item>>>,
|
||||
settings: StdMutex<Settings>,
|
||||
sender: Mutex<Option<mpsc::Sender<Item>>>,
|
||||
settings: Mutex<Settings>,
|
||||
}
|
||||
|
||||
impl ElementSrcTest {
|
||||
|
@ -260,10 +242,7 @@ mod imp_src {
|
|||
*self.sender.lock().unwrap() = Some(sender);
|
||||
|
||||
self.task
|
||||
.prepare(
|
||||
ElementSrcTestTask::new(element, &self.src_pad, receiver),
|
||||
context,
|
||||
)
|
||||
.prepare(ElementSrcTestTask::new(element.clone(), receiver), context)
|
||||
.map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
|
@ -317,8 +296,8 @@ mod imp_src {
|
|||
PadSrcTestHandler,
|
||||
),
|
||||
task: Task::default(),
|
||||
sender: StdMutex::new(None),
|
||||
settings: StdMutex::new(Settings::default()),
|
||||
sender: Mutex::new(None),
|
||||
settings: Mutex::new(Settings::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -565,7 +544,7 @@ mod imp_sink {
|
|||
pub struct ElementSinkTest {
|
||||
sink_pad: PadSink,
|
||||
flushing: AtomicBool,
|
||||
sender: FutMutex<Option<mpsc::Sender<Item>>>,
|
||||
sender: Mutex<Option<mpsc::Sender<Item>>>,
|
||||
}
|
||||
|
||||
impl ElementSinkTest {
|
||||
|
@ -576,11 +555,14 @@ mod imp_sink {
|
|||
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
if !self.flushing.load(Ordering::SeqCst) {
|
||||
gst::debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
|
||||
self.sender
|
||||
let mut sender = self
|
||||
.sender
|
||||
.lock()
|
||||
.await
|
||||
.unwrap()
|
||||
.as_mut()
|
||||
.expect("Item Sender not set")
|
||||
.clone();
|
||||
sender
|
||||
.send(item)
|
||||
.await
|
||||
.map(|_| gst::FlowSuccess::Ok)
|
||||
|
@ -648,7 +630,7 @@ mod imp_sink {
|
|||
PadSinkTestHandler,
|
||||
),
|
||||
flushing: AtomicBool::new(true),
|
||||
sender: FutMutex::new(None),
|
||||
sender: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -681,7 +663,7 @@ mod imp_sink {
|
|||
.get::<&ItemSender>()
|
||||
.expect("type checked upstream")
|
||||
.clone();
|
||||
*futures::executor::block_on(self.sender.lock()) = Some(sender);
|
||||
*self.sender.lock().unwrap() = Some(sender);
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue