proxy: fix blocking upwards state change

ProxySink previously blocked on receiving the source pad
of ProxySrc in its ReadyToPaused transition, which meant
ProxySrc had to transition to Ready at the same time.

The usual use case is for the source and sink to reside in
two separate pipelines, and such an arrangement easily led
to deadlocks, as examplified by the new test case.

Instead we now maintain two more global hash maps holding
per-context sink pads and src pads weak references, and
forward events to those when needed.

As ProxySink may not have a source pad context to run
a future on when receiving FlushStart, gst::Element::call_async
is used instead, with a simple oneshot channel used to synchronize
flush start and flush stop handling.
This commit is contained in:
Mathieu Duponchelle 2020-01-28 22:08:58 +01:00
parent 4abb389269
commit 53e948b8da
3 changed files with 216 additions and 191 deletions

View file

@ -14,7 +14,7 @@ gobject-sys = { git = "https://github.com/gtk-rs/sys" }
gio-sys = { git = "https://github.com/gtk-rs/sys" } gio-sys = { git = "https://github.com/gtk-rs/sys" }
glib = { git = "https://github.com/gtk-rs/glib" } glib = { git = "https://github.com/gtk-rs/glib" }
gio = { git = "https://github.com/gtk-rs/gio" } gio = { git = "https://github.com/gtk-rs/gio" }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features=["v1_10"] }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }

View file

@ -49,6 +49,10 @@ use super::dataqueue::{DataQueue, DataQueueItem};
lazy_static! { lazy_static! {
static ref PROXY_CONTEXTS: Mutex<HashMap<String, Weak<Mutex<ProxyContextInner>>>> = static ref PROXY_CONTEXTS: Mutex<HashMap<String, Weak<Mutex<ProxyContextInner>>>> =
Mutex::new(HashMap::new()); Mutex::new(HashMap::new());
static ref PROXY_SRC_PADS: std::sync::Mutex<HashMap<String, PadSrcWeak>> =
std::sync::Mutex::new(HashMap::new());
static ref PROXY_SINK_PADS: std::sync::Mutex<HashMap<String, PadSinkWeak>> =
std::sync::Mutex::new(HashMap::new());
} }
const DEFAULT_PROXY_CONTEXT: &str = ""; const DEFAULT_PROXY_CONTEXT: &str = "";
@ -185,18 +189,6 @@ impl PendingQueue {
} }
} }
/// ProxySrc fields which are necessary for ProxySink
#[derive(Debug)]
struct SharedSrc {
src_pad: PadSrcWeak,
}
/// ProxySink fields which are necessary for ProxySrc
#[derive(Debug)]
struct SharedSink {
sink_pad: PadSinkWeak,
}
#[derive(Debug)] #[derive(Debug)]
struct ProxyContextInner { struct ProxyContextInner {
name: String, name: String,
@ -205,10 +197,6 @@ struct ProxyContextInner {
pending_queue: Option<PendingQueue>, pending_queue: Option<PendingQueue>,
have_sink: bool, have_sink: bool,
have_src: bool, have_src: bool,
shared_src_tx: Option<oneshot::Sender<SharedSrc>>,
shared_src_rx: Option<oneshot::Receiver<SharedSrc>>,
shared_sink_tx: Option<oneshot::Sender<SharedSink>>,
shared_sink_rx: Option<oneshot::Receiver<SharedSink>>,
} }
impl ProxyContextInner { impl ProxyContextInner {
@ -232,6 +220,7 @@ impl Drop for ProxyContextInner {
struct ProxyContext { struct ProxyContext {
shared: Arc<Mutex<ProxyContextInner>>, shared: Arc<Mutex<ProxyContextInner>>,
as_sink: bool, as_sink: bool,
name: String,
} }
impl ProxyContext { impl ProxyContext {
@ -254,7 +243,11 @@ impl ProxyContext {
} }
proxy_ctx = Some({ proxy_ctx = Some({
let proxy_ctx = ProxyContext { shared, as_sink }; let proxy_ctx = ProxyContext {
shared,
as_sink,
name: name.into(),
};
{ {
let mut shared = proxy_ctx.lock_shared().await; let mut shared = proxy_ctx.lock_shared().await;
if as_sink { if as_sink {
@ -270,8 +263,6 @@ impl ProxyContext {
} }
if proxy_ctx.is_none() { if proxy_ctx.is_none() {
let (shared_src_tx, shared_src_rx) = oneshot::channel();
let (shared_sink_tx, shared_sink_rx) = oneshot::channel();
let shared = Arc::new(Mutex::new(ProxyContextInner { let shared = Arc::new(Mutex::new(ProxyContextInner {
name: name.into(), name: name.into(),
dataqueue: None, dataqueue: None,
@ -279,15 +270,15 @@ impl ProxyContext {
pending_queue: None, pending_queue: None,
have_sink: as_sink, have_sink: as_sink,
have_src: !as_sink, have_src: !as_sink,
shared_src_tx: Some(shared_src_tx),
shared_src_rx: Some(shared_src_rx),
shared_sink_tx: Some(shared_sink_tx),
shared_sink_rx: Some(shared_sink_rx),
})); }));
proxy_ctxs.insert(name.into(), Arc::downgrade(&shared)); proxy_ctxs.insert(name.into(), Arc::downgrade(&shared));
proxy_ctx = Some(ProxyContext { shared, as_sink }); proxy_ctx = Some(ProxyContext {
shared,
as_sink,
name: name.into(),
});
} }
proxy_ctx proxy_ctx
@ -310,20 +301,24 @@ impl ProxyContext {
#[derive(Debug)] #[derive(Debug)]
struct ProxySinkPadHandlerInner { struct ProxySinkPadHandlerInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>, proxy_ctx: ProxyContext,
src_pad: PadSrcWeak, flush_rx: sync::Mutex<Option<oneshot::Receiver<()>>>,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ProxySinkPadHandler(Arc<ProxySinkPadHandlerInner>); struct ProxySinkPadHandler(Arc<ProxySinkPadHandlerInner>);
impl ProxySinkPadHandler { impl ProxySinkPadHandler {
fn new(src_pad: PadSrcWeak) -> Self { fn new(proxy_ctx: ProxyContext) -> Self {
ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner { ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner {
flush_join_handle: sync::Mutex::new(None), proxy_ctx: proxy_ctx,
src_pad, flush_rx: sync::Mutex::new(None),
})) }))
} }
fn proxy_ctx(&self) -> &ProxyContext {
&self.0.proxy_ctx
}
} }
impl PadSinkHandler for ProxySinkPadHandler { impl PadSinkHandler for ProxySinkPadHandler {
@ -399,16 +394,11 @@ impl PadSinkHandler for ProxySinkPadHandler {
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
let inner = inner_weak.upgrade().unwrap(); let inner = inner_weak.upgrade().unwrap();
let flush_join_handle = inner.flush_join_handle.lock().unwrap().take(); let flush_rx = inner.flush_rx.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete"); if let Some(flush_rx) = flush_rx {
if let Ok(Ok(())) = flush_join_handle.await { let _ = flush_rx.await;
let _ = proxysink.start(&element).await; let _ = proxysink.start(&element).await;
} else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete");
}
} else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
} }
} }
_ => (), _ => (),
@ -424,46 +414,58 @@ impl PadSinkHandler for ProxySinkPadHandler {
.boxed(), .boxed(),
) )
} else { } else {
let src_pad = self let src_pad = PROXY_SRC_PADS
.0 .lock()
.src_pad .unwrap()
.upgrade() .get(&self.proxy_ctx().name)
.expect("PadSrc no longer available"); .and_then(|src_pad| src_pad.upgrade())
.map(|src_pad| src_pad.gst_pad().clone());
// The usual behaviour on flush start is to spawn on a pad context,
// proxy elements are a special case as we don't necessarily have
// a pad context to spawn on.
if let EventView::FlushStart(..) = event.view() { if let EventView::FlushStart(..) = event.view() {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); let (flush_tx, flush_rx) = oneshot::channel();
if flush_join_handle.is_none() { *self.0.flush_rx.lock().unwrap() = Some(flush_rx);
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let element = element.clone(); element.call_async(move |element| {
*flush_join_handle = Some(src_pad.spawn(async move { let _ = runtime::executor::block_on(
ProxySink::from_instance(&element).stop(&element).await ProxySink::from_instance(element).stop(element),
})); );
} else { let _ = flush_tx.send(());
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); });
}
} }
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized {:?}", event); if let Some(src_pad) = src_pad {
Either::Left(src_pad.gst_pad().push_event(event)) Either::Left(src_pad.push_event(event))
} else {
gst_error!(SINK_CAT, obj: pad.gst_pad(), "No src pad to forward non-serialized event to");
Either::Left(true)
}
} }
} }
} }
#[derive(Debug)] #[derive(Debug)]
struct StateSink { struct StateSink {
proxy_ctx: Option<ProxyContext>, sink_pad_handler: Option<ProxySinkPadHandler>,
} }
impl StateSink { impl StateSink {
#[inline] #[inline]
fn proxy_ctx(&self) -> &ProxyContext { fn proxy_ctx(&self) -> Option<&ProxyContext> {
self.proxy_ctx.as_ref().unwrap() match self.sink_pad_handler.as_ref() {
Some(handler) => Some(handler.proxy_ctx()),
None => None,
}
} }
} }
impl Default for StateSink { impl Default for StateSink {
fn default() -> Self { fn default() -> Self {
StateSink { proxy_ctx: None } StateSink {
sink_pad_handler: None,
}
} }
} }
@ -500,10 +502,12 @@ impl ProxySink {
let more_queue_space_receiver = { let more_queue_space_receiver = {
let state = sink.state.lock().await; let state = sink.state.lock().await;
let proxy_ctx = state.proxy_ctx.as_ref(); let proxy_ctx = state.proxy_ctx();
if proxy_ctx.is_none() { if proxy_ctx.is_none() {
return; return;
} }
let mut shared_ctx = proxy_ctx.unwrap().lock_shared().await; let mut shared_ctx = proxy_ctx.unwrap().lock_shared().await;
gst_log!(SINK_CAT, obj: &element, "Trying to empty pending queue"); gst_log!(SINK_CAT, obj: &element, "Trying to empty pending queue");
@ -561,9 +565,13 @@ impl ProxySink {
let wait_fut = { let wait_fut = {
let state = self.state.lock().await; let state = self.state.lock().await;
let proxy_ctx = state.proxy_ctx.as_ref().ok_or(gst::FlowError::Error)?; let proxy_ctx = state.proxy_ctx().ok_or(gst::FlowError::Error)?;
let mut shared_ctx = proxy_ctx.lock_shared().await; let mut shared_ctx = proxy_ctx.lock_shared().await;
/* We've taken the lock again, make sure not to recreate
* a pending queue if tearing down */
shared_ctx.last_res?;
let item = { let item = {
let ProxyContextInner { let ProxyContextInner {
ref mut pending_queue, ref mut pending_queue,
@ -658,7 +666,7 @@ impl ProxySink {
} }
let state = self.state.lock().await; let state = self.state.lock().await;
let shared_ctx = state.proxy_ctx().lock_shared().await; let shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
shared_ctx.last_res shared_ctx.last_res
} }
@ -666,7 +674,9 @@ impl ProxySink {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
gst_debug!(SINK_CAT, obj: element, "Preparing"); gst_debug!(SINK_CAT, obj: element, "Preparing");
let proxy_ctx = ProxyContext::get(&self.settings.lock().await.proxy_context, true) let proxy_context = self.settings.lock().await.proxy_context.to_string();
let proxy_ctx = ProxyContext::get(&proxy_context, true)
.await .await
.ok_or_else(|| { .ok_or_else(|| {
gst_error_msg!( gst_error_msg!(
@ -676,62 +686,27 @@ impl ProxySink {
})?; })?;
{ {
let mut shared_ctx = proxy_ctx.lock_shared().await; let mut proxy_sink_pads = PROXY_SINK_PADS.lock().unwrap();
assert!(shared_ctx.shared_src_rx.is_some()); assert!(!proxy_sink_pads.contains_key(&proxy_context));
shared_ctx proxy_sink_pads.insert(proxy_context, self.sink_pad.downgrade());
.shared_sink_tx
.take()
.unwrap()
.send(SharedSink {
sink_pad: self.sink_pad.downgrade(),
})
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to send SharedSink: {:?}", err]
)
})?;
} }
state.proxy_ctx = Some(proxy_ctx); let handler = ProxySinkPadHandler::new(proxy_ctx);
self.sink_pad.prepare(&handler).await;
state.sink_pad_handler = Some(handler);
gst_debug!(SINK_CAT, obj: element, "Prepared"); gst_debug!(SINK_CAT, obj: element, "Prepared");
Ok(()) Ok(())
} }
async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let state = self.state.lock().await;
let shared_src_rx = state.proxy_ctx().lock_shared().await.shared_src_rx.take();
if shared_src_rx.is_none() {
gst_log!(SINK_CAT, obj: element, "Preparation already completed");
return Ok(());
}
gst_debug!(SINK_CAT, obj: element, "Completing preparation");
let SharedSrc { src_pad } = shared_src_rx.unwrap().await.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to receive SharedSrc: {:?}", err]
)
})?;
self.sink_pad
.prepare(&ProxySinkPadHandler::new(src_pad))
.await;
gst_debug!(SINK_CAT, obj: element, "Preparation completed");
Ok(())
}
async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
gst_debug!(SINK_CAT, obj: element, "Unpreparing"); gst_debug!(SINK_CAT, obj: element, "Unpreparing");
let proxy_ctx = state.proxy_ctx.take().unwrap(); let proxy_ctx = state.proxy_ctx().unwrap();
proxy_ctx.unprepare().await; proxy_ctx.unprepare().await;
self.sink_pad.unprepare().await; self.sink_pad.unprepare().await;
@ -746,7 +721,13 @@ impl ProxySink {
let state = self.state.lock().await; let state = self.state.lock().await;
gst_debug!(SINK_CAT, obj: element, "Starting"); gst_debug!(SINK_CAT, obj: element, "Starting");
let mut shared_ctx = state.proxy_ctx().lock_shared().await; {
let settings = self.settings.lock().await;
let mut proxy_sink_pads = PROXY_SINK_PADS.lock().unwrap();
proxy_sink_pads.remove(&settings.proxy_context);
}
let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
gst_debug!(SINK_CAT, obj: element, "Started"); gst_debug!(SINK_CAT, obj: element, "Started");
@ -758,7 +739,7 @@ impl ProxySink {
let state = self.state.lock().await; let state = self.state.lock().await;
gst_debug!(SINK_CAT, obj: element, "Stopping"); gst_debug!(SINK_CAT, obj: element, "Stopping");
let mut shared_ctx = state.proxy_ctx().lock_shared().await; let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
let _ = shared_ctx.pending_queue.take(); let _ = shared_ctx.pending_queue.take();
shared_ctx.last_res = Err(gst::FlowError::Flushing); shared_ctx.last_res = Err(gst::FlowError::Flushing);
@ -863,10 +844,6 @@ impl ElementImpl for ProxySink {
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::ReadyToPaused => {
runtime::executor::block_on(self.complete_preparation(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
runtime::executor::block_on(self.stop(element)) runtime::executor::block_on(self.stop(element))
.map_err(|_| gst::StateChangeError)?; .map_err(|_| gst::StateChangeError)?;
@ -891,20 +868,24 @@ impl ElementImpl for ProxySink {
#[derive(Debug)] #[derive(Debug)]
struct ProxySrcPadHandlerInner { struct ProxySrcPadHandlerInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>, flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
sink_pad: PadSinkWeak, proxy_ctx: ProxyContext,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ProxySrcPadHandler(Arc<ProxySrcPadHandlerInner>); struct ProxySrcPadHandler(Arc<ProxySrcPadHandlerInner>);
impl ProxySrcPadHandler { impl ProxySrcPadHandler {
fn new(sink_pad: PadSinkWeak) -> Self { fn new(proxy_ctx: ProxyContext) -> Self {
ProxySrcPadHandler(Arc::new(ProxySrcPadHandlerInner { ProxySrcPadHandler(Arc::new(ProxySrcPadHandlerInner {
flush_join_handle: sync::Mutex::new(None), flush_join_handle: sync::Mutex::new(None),
sink_pad, proxy_ctx,
})) }))
} }
fn proxy_ctx(&self) -> &ProxyContext {
&self.0.proxy_ctx
}
async fn start_task(pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) { async fn start_task(pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) {
let pad_weak = pad.downgrade(); let pad_weak = pad.downgrade();
let element = element.clone(); let element = element.clone();
@ -937,7 +918,7 @@ impl ProxySrcPadHandler {
{ {
let state = proxysrc.state.lock().await; let state = proxysrc.state.lock().await;
let mut shared_ctx = state.proxy_ctx().lock_shared().await; let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
if let Some(ref mut pending_queue) = shared_ctx.pending_queue { if let Some(ref mut pending_queue) = shared_ctx.pending_queue {
pending_queue.notify_more_queue_space(); pending_queue.notify_more_queue_space();
} }
@ -963,21 +944,21 @@ impl ProxySrcPadHandler {
Ok(_) => { Ok(_) => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item"); gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item");
let state = proxysrc.state.lock().await; let state = proxysrc.state.lock().await;
let mut shared_ctx = state.proxy_ctx().lock_shared().await; let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
} }
Err(gst::FlowError::Flushing) => { Err(gst::FlowError::Flushing) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing");
let state = proxysrc.state.lock().await; let state = proxysrc.state.lock().await;
pad.pause_task().await; pad.pause_task().await;
let mut shared_ctx = state.proxy_ctx().lock_shared().await; let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
shared_ctx.last_res = Err(gst::FlowError::Flushing); shared_ctx.last_res = Err(gst::FlowError::Flushing);
} }
Err(gst::FlowError::Eos) => { Err(gst::FlowError::Eos) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS"); gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS");
let state = proxysrc.state.lock().await; let state = proxysrc.state.lock().await;
pad.pause_task().await; pad.pause_task().await;
let mut shared_ctx = state.proxy_ctx().lock_shared().await; let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
shared_ctx.last_res = Err(gst::FlowError::Eos); shared_ctx.last_res = Err(gst::FlowError::Eos);
} }
Err(err) => { Err(err) => {
@ -989,7 +970,7 @@ impl ProxySrcPadHandler {
["streaming stopped, reason {}", err] ["streaming stopped, reason {}", err]
); );
let state = proxysrc.state.lock().await; let state = proxysrc.state.lock().await;
let mut shared_ctx = state.proxy_ctx().lock_shared().await; let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await;
shared_ctx.last_res = Err(err); shared_ctx.last_res = Err(err);
} }
} }
@ -1010,11 +991,17 @@ impl PadSrcHandler for ProxySrcPadHandler {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let sink_pad = PROXY_SINK_PADS
.lock()
.unwrap()
.get(&self.proxy_ctx().name)
.and_then(|sink_pad| sink_pad.upgrade())
.map(|sink_pad| sink_pad.gst_pad().clone());
if event.is_serialized() { if event.is_serialized() {
let element = element.clone(); let element = element.clone();
let inner_weak = Arc::downgrade(&self.0); let inner_weak = Arc::downgrade(&self.0);
let src_pad_weak = pad.downgrade(); let src_pad_weak = pad.downgrade();
let sink_pad_weak = self.0.sink_pad.clone();
Either::Right( Either::Right(
async move { async move {
@ -1047,7 +1034,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
if ret { if ret {
let src_pad = src_pad_weak.upgrade().expect("PadSrc no longer exists"); let src_pad = src_pad_weak.upgrade().expect("PadSrc no longer exists");
gst_log!(SRC_CAT, obj: src_pad.gst_pad(), "Forwarding serialized {:?}", event); gst_log!(SRC_CAT, obj: src_pad.gst_pad(), "Forwarding serialized {:?}", event);
sink_pad_weak.upgrade().expect("PadSink no longer available").gst_pad().push_event(event) sink_pad.unwrap().push_event(event)
} else { } else {
false false
} }
@ -1078,14 +1065,12 @@ impl PadSrcHandler for ProxySrcPadHandler {
} }
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
Either::Left( if let Some(sink_pad) = sink_pad {
self.0 Either::Left(sink_pad.push_event(event))
.sink_pad } else {
.upgrade() gst_error!(SRC_CAT, obj: pad.gst_pad(), "No sink pad to forward non-serialized event to");
.expect("PadSink no longer available") Either::Left(false)
.gst_pad() }
.push_event(event),
)
} }
} }
@ -1139,21 +1124,23 @@ impl PadSrcHandler for ProxySrcPadHandler {
#[derive(Debug)] #[derive(Debug)]
struct StateSrc { struct StateSrc {
proxy_ctx: Option<ProxyContext>, src_pad_handler: Option<ProxySrcPadHandler>,
ts_ctx: Option<Context>, ts_ctx: Option<Context>,
} }
impl StateSrc { impl StateSrc {
#[inline] fn proxy_ctx(&self) -> Option<&ProxyContext> {
fn proxy_ctx(&self) -> &ProxyContext { match self.src_pad_handler.as_ref() {
self.proxy_ctx.as_ref().unwrap() Some(handler) => Some(handler.proxy_ctx()),
None => None,
}
} }
} }
impl Default for StateSrc { impl Default for StateSrc {
fn default() -> Self { fn default() -> Self {
StateSrc { StateSrc {
proxy_ctx: None, src_pad_handler: None,
ts_ctx: None, ts_ctx: None,
} }
} }
@ -1219,55 +1206,19 @@ impl ProxySrc {
{ {
let mut shared_ctx = proxy_ctx.lock_shared().await; let mut shared_ctx = proxy_ctx.lock_shared().await;
assert!(shared_ctx.shared_sink_rx.is_some());
shared_ctx
.shared_src_tx
.take()
.unwrap()
.send(SharedSrc {
src_pad: self.src_pad.downgrade(),
})
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to send SharedSrc: {:?}", err]
)
})?;
shared_ctx.dataqueue = Some(dataqueue); shared_ctx.dataqueue = Some(dataqueue);
let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap();
assert!(!proxy_src_pads.contains_key(&settings.proxy_context));
proxy_src_pads.insert(settings.proxy_context.to_string(), self.src_pad.downgrade());
} }
state.ts_ctx = Some(ts_ctx); state.ts_ctx = Some(ts_ctx);
state.proxy_ctx = Some(proxy_ctx);
gst_debug!(SRC_CAT, obj: element, "Prepared"); let handler = ProxySrcPadHandler::new(proxy_ctx);
Ok(())
}
async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().await;
let shared_sink_rx = state.proxy_ctx().lock_shared().await.shared_sink_rx.take();
if shared_sink_rx.is_none() {
gst_log!(SRC_CAT, obj: element, "Preparation already completed");
return Ok(());
}
gst_debug!(SRC_CAT, obj: element, "Completing preparation");
let SharedSink { sink_pad } = shared_sink_rx.unwrap().await.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to receive SharedSink: {:?}", err]
)
})?;
self.src_pad self.src_pad
.prepare( .prepare(state.ts_ctx.take().unwrap(), &handler)
state.ts_ctx.take().unwrap(),
&ProxySrcPadHandler::new(sink_pad),
)
.await .await
.map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(
@ -1276,7 +1227,9 @@ impl ProxySrc {
) )
})?; })?;
gst_debug!(SRC_CAT, obj: element, "Preparation completed"); state.src_pad_handler = Some(handler);
gst_debug!(SRC_CAT, obj: element, "Prepared");
Ok(()) Ok(())
} }
@ -1285,12 +1238,17 @@ impl ProxySrc {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
gst_debug!(SRC_CAT, obj: element, "Unpreparing"); gst_debug!(SRC_CAT, obj: element, "Unpreparing");
{
let settings = self.settings.lock().await;
let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap();
proxy_src_pads.remove(&settings.proxy_context);
}
self.src_pad.stop_task().await; self.src_pad.stop_task().await;
let _ = self.src_pad.unprepare().await; let _ = self.src_pad.unprepare().await;
if let Some(proxy_ctx) = state.proxy_ctx.take() { let proxy_ctx = state.proxy_ctx().unwrap();
proxy_ctx.unprepare().await; proxy_ctx.unprepare().await;
}
*state = StateSrc::default(); *state = StateSrc::default();
@ -1305,6 +1263,7 @@ impl ProxySrc {
let dataqueue = state let dataqueue = state
.proxy_ctx() .proxy_ctx()
.unwrap()
.lock_shared() .lock_shared()
.await .await
.dataqueue .dataqueue
@ -1327,7 +1286,14 @@ impl ProxySrc {
let pause_completion = self.src_pad.pause_task().await; let pause_completion = self.src_pad.pause_task().await;
if let Some(dataqueue) = state.proxy_ctx().lock_shared().await.dataqueue.as_ref() { if let Some(dataqueue) = state
.proxy_ctx()
.unwrap()
.lock_shared()
.await
.dataqueue
.as_ref()
{
dataqueue.pause().await; dataqueue.pause().await;
dataqueue.clear().await; dataqueue.clear().await;
dataqueue.stop().await; dataqueue.stop().await;
@ -1467,10 +1433,6 @@ impl ElementImpl for ProxySrc {
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::ReadyToPaused => {
runtime::executor::block_on(self.complete_preparation(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
runtime::executor::block_on(self.pause(element)) runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?; .map_err(|_| gst::StateChangeError)?;

View file

@ -53,8 +53,8 @@ fn test_push() {
proxysrc.link(&appsink).unwrap(); proxysrc.link(&appsink).unwrap();
fakesrc.set_property("num-buffers", &3i32).unwrap(); fakesrc.set_property("num-buffers", &3i32).unwrap();
proxysink.set_property("proxy-context", &"test").unwrap(); proxysink.set_property("proxy-context", &"test1").unwrap();
proxysrc.set_property("proxy-context", &"test").unwrap(); proxysrc.set_property("proxy-context", &"test1").unwrap();
appsink.set_property("emit-signals", &true).unwrap(); appsink.set_property("emit-signals", &true).unwrap();
@ -102,3 +102,66 @@ fn test_push() {
pipeline.set_state(gst::State::Null).unwrap(); pipeline.set_state(gst::State::Null).unwrap();
} }
#[test]
fn test_from_pipeline_to_pipeline() {
init();
let pipe_1 = gst::Pipeline::new(None);
let fakesrc = gst::ElementFactory::make("fakesrc", None).unwrap();
let pxsink = gst::ElementFactory::make("ts-proxysink", None).unwrap();
let pipe_2 = gst::Pipeline::new(None);
let pxsrc = gst::ElementFactory::make("ts-proxysrc", None).unwrap();
let fakesink = gst::ElementFactory::make("fakesink", None).unwrap();
pipe_1.add_many(&[&fakesrc, &pxsink]).unwrap();
fakesrc.link(&pxsink).unwrap();
pipe_2.add_many(&[&pxsrc, &fakesink]).unwrap();
pxsrc.link(&fakesink).unwrap();
pxsink.set_property("proxy-context", &"test2").unwrap();
pxsrc.set_property("proxy-context", &"test2").unwrap();
pipe_1.set_state(gst::State::Paused).unwrap();
pipe_2.set_state(gst::State::Paused).unwrap();
let _ = pipe_1.get_state(gst::CLOCK_TIME_NONE);
let _ = pipe_2.get_state(gst::CLOCK_TIME_NONE);
pipe_1.set_state(gst::State::Null).unwrap();
pipe_2.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_from_pipeline_to_pipeline_and_back() {
init();
let pipe_1 = gst::Pipeline::new(None);
let pxsrc_1 = gst::ElementFactory::make("ts-proxysrc", None).unwrap();
let pxsink_1 = gst::ElementFactory::make("ts-proxysink", None).unwrap();
let pipe_2 = gst::Pipeline::new(None);
let pxsrc_2 = gst::ElementFactory::make("ts-proxysrc", None).unwrap();
let pxsink_2 = gst::ElementFactory::make("ts-proxysink", None).unwrap();
pipe_1.add_many(&[&pxsrc_1, &pxsink_1]).unwrap();
pxsrc_1.link(&pxsink_1).unwrap();
pipe_2.add_many(&[&pxsrc_2, &pxsink_2]).unwrap();
pxsrc_2.link(&pxsink_2).unwrap();
pxsrc_1.set_property("proxy-context", &"test3").unwrap();
pxsink_2.set_property("proxy-context", &"test3").unwrap();
pxsrc_2.set_property("proxy-context", &"test4").unwrap();
pxsink_1.set_property("proxy-context", &"test4").unwrap();
pipe_1.set_state(gst::State::Paused).unwrap();
pipe_2.set_state(gst::State::Paused).unwrap();
pipe_1.set_state(gst::State::Null).unwrap();
pipe_2.set_state(gst::State::Null).unwrap();
}