ts: queue & proxy: spawn on src_pad in PadSinkHandlers...

... instead of copying the src_pad's Context
This commit is contained in:
François Laignel 2020-01-17 15:37:54 +01:00
parent d98e76529d
commit 676ae87aed
2 changed files with 16 additions and 34 deletions

View file

@ -188,7 +188,6 @@ impl PendingQueue {
/// ProxySrc fields which are necessary for ProxySink /// ProxySrc fields which are necessary for ProxySink
#[derive(Debug)] #[derive(Debug)]
struct SharedSrc { struct SharedSrc {
ts_ctx: Context,
src_pad: PadSrcWeak, src_pad: PadSrcWeak,
} }
@ -312,7 +311,6 @@ impl ProxyContext {
#[derive(Debug)] #[derive(Debug)]
struct ProxySinkPadHandlerInner { struct ProxySinkPadHandlerInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>, flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
ts_ctx: Context,
src_pad: PadSrcWeak, src_pad: PadSrcWeak,
} }
@ -320,10 +318,9 @@ struct ProxySinkPadHandlerInner {
struct ProxySinkPadHandler(Arc<ProxySinkPadHandlerInner>); struct ProxySinkPadHandler(Arc<ProxySinkPadHandlerInner>);
impl ProxySinkPadHandler { impl ProxySinkPadHandler {
fn new(ts_ctx: Context, src_pad: PadSrcWeak) -> Self { fn new(src_pad: PadSrcWeak) -> Self {
ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner { ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner {
flush_join_handle: sync::Mutex::new(None), flush_join_handle: sync::Mutex::new(None),
ts_ctx,
src_pad, src_pad,
})) }))
} }
@ -427,12 +424,18 @@ impl PadSinkHandler for ProxySinkPadHandler {
.boxed(), .boxed(),
) )
} else { } else {
let src_pad = self
.0
.src_pad
.upgrade()
.expect("PadSrc no longer available");
if let EventView::FlushStart(..) = event.view() { if let EventView::FlushStart(..) = event.view() {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() { if flush_join_handle.is_none() {
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let element = element.clone(); let element = element.clone();
*flush_join_handle = Some(self.0.ts_ctx.spawn(async move { *flush_join_handle = Some(src_pad.spawn(async move {
ProxySink::from_instance(&element).stop(&element).await ProxySink::from_instance(&element).stop(&element).await
})); }));
} else { } else {
@ -441,15 +444,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
} }
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized {:?}", event); gst_log!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized {:?}", event);
Either::Left(src_pad.gst_pad().push_event(event))
Either::Left(
self.0
.src_pad
.upgrade()
.expect("PadSrc no longer available")
.gst_pad()
.push_event(event),
)
} }
} }
} }
@ -716,7 +711,7 @@ impl ProxySink {
gst_debug!(SINK_CAT, obj: element, "Completing preparation"); gst_debug!(SINK_CAT, obj: element, "Completing preparation");
let SharedSrc { ts_ctx, src_pad } = shared_src_rx.unwrap().await.map_err(|err| { let SharedSrc { src_pad } = shared_src_rx.unwrap().await.map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
["Failed to receive SharedSrc: {:?}", err] ["Failed to receive SharedSrc: {:?}", err]
@ -724,7 +719,7 @@ impl ProxySink {
})?; })?;
self.sink_pad self.sink_pad
.prepare(&ProxySinkPadHandler::new(ts_ctx, src_pad)) .prepare(&ProxySinkPadHandler::new(src_pad))
.await; .await;
gst_debug!(SINK_CAT, obj: element, "Preparation completed"); gst_debug!(SINK_CAT, obj: element, "Preparation completed");
@ -1230,7 +1225,6 @@ impl ProxySrc {
.take() .take()
.unwrap() .unwrap()
.send(SharedSrc { .send(SharedSrc {
ts_ctx: ts_ctx.clone(),
src_pad: self.src_pad.downgrade(), src_pad: self.src_pad.downgrade(),
}) })
.map_err(|err| { .map_err(|err| {

View file

@ -140,24 +140,14 @@ impl PendingQueue {
} }
} }
#[derive(Debug)] #[derive(Debug, Default)]
struct QueuePadSinkHandlerInner { struct QueuePadSinkHandlerInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>, flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
context: Context,
} }
#[derive(Clone, Debug)] #[derive(Clone, Default)]
struct QueuePadSinkHandler(Arc<QueuePadSinkHandlerInner>); struct QueuePadSinkHandler(Arc<QueuePadSinkHandlerInner>);
impl QueuePadSinkHandler {
fn new(context: Context) -> Self {
QueuePadSinkHandler(Arc::new(QueuePadSinkHandlerInner {
flush_join_handle: sync::Mutex::new(None),
context,
}))
}
}
impl PadSinkHandler for QueuePadSinkHandler { impl PadSinkHandler for QueuePadSinkHandler {
type ElementImpl = Queue; type ElementImpl = Queue;
@ -253,7 +243,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let element = element.clone(); let element = element.clone();
*flush_join_handle = *flush_join_handle =
Some(self.0.context.spawn(async move { Some(queue.src_pad.spawn(async move {
Queue::from_instance(&element).stop(&element).await Queue::from_instance(&element).stop(&element).await
})); }));
} else { } else {
@ -748,7 +738,7 @@ impl Queue {
})?; })?;
self.src_pad self.src_pad
.prepare(context.clone(), &QueuePadSrcHandler::default()) .prepare(context, &QueuePadSrcHandler::default())
.await .await
.map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(
@ -756,9 +746,7 @@ impl Queue {
["Error joining Context: {:?}", err] ["Error joining Context: {:?}", err]
) )
})?; })?;
self.sink_pad self.sink_pad.prepare(&QueuePadSinkHandler::default()).await;
.prepare(&QueuePadSinkHandler::new(context))
.await;
gst_debug!(CAT, obj: element, "Prepared"); gst_debug!(CAT, obj: element, "Prepared");