diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 2ea34eb7..fcde86e9 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -14,7 +14,7 @@ gobject-sys = { git = "https://github.com/gtk-rs/sys" } gio-sys = { git = "https://github.com/gtk-rs/sys" } glib = { git = "https://github.com/gtk-rs/glib" } gio = { git = "https://github.com/gtk-rs/gio" } -gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features=["v1_10"] } gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index b55d9561..17ed249e 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -49,6 +49,10 @@ use super::dataqueue::{DataQueue, DataQueueItem}; lazy_static! { static ref PROXY_CONTEXTS: Mutex>>> = Mutex::new(HashMap::new()); + static ref PROXY_SRC_PADS: std::sync::Mutex> = + std::sync::Mutex::new(HashMap::new()); + static ref PROXY_SINK_PADS: std::sync::Mutex> = + std::sync::Mutex::new(HashMap::new()); } const DEFAULT_PROXY_CONTEXT: &str = ""; @@ -185,18 +189,6 @@ impl PendingQueue { } } -/// ProxySrc fields which are necessary for ProxySink -#[derive(Debug)] -struct SharedSrc { - src_pad: PadSrcWeak, -} - -/// ProxySink fields which are necessary for ProxySrc -#[derive(Debug)] -struct SharedSink { - sink_pad: PadSinkWeak, -} - #[derive(Debug)] struct ProxyContextInner { name: String, @@ -205,10 +197,6 @@ struct ProxyContextInner { pending_queue: Option, have_sink: bool, have_src: bool, - shared_src_tx: Option>, - shared_src_rx: Option>, - shared_sink_tx: Option>, - shared_sink_rx: Option>, } impl ProxyContextInner { @@ -232,6 +220,7 @@ impl Drop for ProxyContextInner { struct ProxyContext { shared: Arc>, as_sink: bool, + name: String, } impl ProxyContext { @@ -254,7 +243,11 @@ impl ProxyContext { } proxy_ctx = Some({ - let proxy_ctx = ProxyContext { shared, as_sink }; + let proxy_ctx = ProxyContext { + shared, + as_sink, + name: name.into(), + }; { let mut shared = proxy_ctx.lock_shared().await; if as_sink { @@ -270,8 +263,6 @@ impl ProxyContext { } if proxy_ctx.is_none() { - let (shared_src_tx, shared_src_rx) = oneshot::channel(); - let (shared_sink_tx, shared_sink_rx) = oneshot::channel(); let shared = Arc::new(Mutex::new(ProxyContextInner { name: name.into(), dataqueue: None, @@ -279,15 +270,15 @@ impl ProxyContext { pending_queue: None, have_sink: as_sink, have_src: !as_sink, - shared_src_tx: Some(shared_src_tx), - shared_src_rx: Some(shared_src_rx), - shared_sink_tx: Some(shared_sink_tx), - shared_sink_rx: Some(shared_sink_rx), })); proxy_ctxs.insert(name.into(), Arc::downgrade(&shared)); - proxy_ctx = Some(ProxyContext { shared, as_sink }); + proxy_ctx = Some(ProxyContext { + shared, + as_sink, + name: name.into(), + }); } proxy_ctx @@ -310,20 +301,24 @@ impl ProxyContext { #[derive(Debug)] struct ProxySinkPadHandlerInner { - flush_join_handle: sync::Mutex>>>, - src_pad: PadSrcWeak, + proxy_ctx: ProxyContext, + flush_rx: sync::Mutex>>, } #[derive(Clone, Debug)] struct ProxySinkPadHandler(Arc); impl ProxySinkPadHandler { - fn new(src_pad: PadSrcWeak) -> Self { + fn new(proxy_ctx: ProxyContext) -> Self { ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner { - flush_join_handle: sync::Mutex::new(None), - src_pad, + proxy_ctx: proxy_ctx, + flush_rx: sync::Mutex::new(None), })) } + + fn proxy_ctx(&self) -> &ProxyContext { + &self.0.proxy_ctx + } } impl PadSinkHandler for ProxySinkPadHandler { @@ -399,16 +394,11 @@ impl PadSinkHandler for ProxySinkPadHandler { EventView::FlushStop(..) => { let inner = inner_weak.upgrade().unwrap(); - let flush_join_handle = inner.flush_join_handle.lock().unwrap().take(); - if let Some(flush_join_handle) = flush_join_handle { - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete"); - if let Ok(Ok(())) = flush_join_handle.await { - let _ = proxysink.start(&element).await; - } else { - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete"); - } - } else { - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + let flush_rx = inner.flush_rx.lock().unwrap().take(); + + if let Some(flush_rx) = flush_rx { + let _ = flush_rx.await; + let _ = proxysink.start(&element).await; } } _ => (), @@ -424,46 +414,58 @@ impl PadSinkHandler for ProxySinkPadHandler { .boxed(), ) } else { - let src_pad = self - .0 - .src_pad - .upgrade() - .expect("PadSrc no longer available"); + let src_pad = PROXY_SRC_PADS + .lock() + .unwrap() + .get(&self.proxy_ctx().name) + .and_then(|src_pad| src_pad.upgrade()) + .map(|src_pad| src_pad.gst_pad().clone()); + // The usual behaviour on flush start is to spawn on a pad context, + // proxy elements are a special case as we don't necessarily have + // a pad context to spawn on. if let EventView::FlushStart(..) = event.view() { - let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); - if flush_join_handle.is_none() { - gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event); - let element = element.clone(); - *flush_join_handle = Some(src_pad.spawn(async move { - ProxySink::from_instance(&element).stop(&element).await - })); - } else { - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); - } + let (flush_tx, flush_rx) = oneshot::channel(); + *self.0.flush_rx.lock().unwrap() = Some(flush_rx); + + element.call_async(move |element| { + let _ = runtime::executor::block_on( + ProxySink::from_instance(element).stop(element), + ); + let _ = flush_tx.send(()); + }); } - gst_log!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized {:?}", event); - Either::Left(src_pad.gst_pad().push_event(event)) + if let Some(src_pad) = src_pad { + Either::Left(src_pad.push_event(event)) + } else { + gst_error!(SINK_CAT, obj: pad.gst_pad(), "No src pad to forward non-serialized event to"); + Either::Left(true) + } } } } #[derive(Debug)] struct StateSink { - proxy_ctx: Option, + sink_pad_handler: Option, } impl StateSink { #[inline] - fn proxy_ctx(&self) -> &ProxyContext { - self.proxy_ctx.as_ref().unwrap() + fn proxy_ctx(&self) -> Option<&ProxyContext> { + match self.sink_pad_handler.as_ref() { + Some(handler) => Some(handler.proxy_ctx()), + None => None, + } } } impl Default for StateSink { fn default() -> Self { - StateSink { proxy_ctx: None } + StateSink { + sink_pad_handler: None, + } } } @@ -500,10 +502,12 @@ impl ProxySink { let more_queue_space_receiver = { let state = sink.state.lock().await; - let proxy_ctx = state.proxy_ctx.as_ref(); + let proxy_ctx = state.proxy_ctx(); + if proxy_ctx.is_none() { return; } + let mut shared_ctx = proxy_ctx.unwrap().lock_shared().await; gst_log!(SINK_CAT, obj: &element, "Trying to empty pending queue"); @@ -561,9 +565,13 @@ impl ProxySink { let wait_fut = { let state = self.state.lock().await; - let proxy_ctx = state.proxy_ctx.as_ref().ok_or(gst::FlowError::Error)?; + let proxy_ctx = state.proxy_ctx().ok_or(gst::FlowError::Error)?; let mut shared_ctx = proxy_ctx.lock_shared().await; + /* We've taken the lock again, make sure not to recreate + * a pending queue if tearing down */ + shared_ctx.last_res?; + let item = { let ProxyContextInner { ref mut pending_queue, @@ -658,7 +666,7 @@ impl ProxySink { } let state = self.state.lock().await; - let shared_ctx = state.proxy_ctx().lock_shared().await; + let shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; shared_ctx.last_res } @@ -666,7 +674,9 @@ impl ProxySink { let mut state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Preparing"); - let proxy_ctx = ProxyContext::get(&self.settings.lock().await.proxy_context, true) + let proxy_context = self.settings.lock().await.proxy_context.to_string(); + + let proxy_ctx = ProxyContext::get(&proxy_context, true) .await .ok_or_else(|| { gst_error_msg!( @@ -676,62 +686,27 @@ impl ProxySink { })?; { - let mut shared_ctx = proxy_ctx.lock_shared().await; - assert!(shared_ctx.shared_src_rx.is_some()); - shared_ctx - .shared_sink_tx - .take() - .unwrap() - .send(SharedSink { - sink_pad: self.sink_pad.downgrade(), - }) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to send SharedSink: {:?}", err] - ) - })?; + let mut proxy_sink_pads = PROXY_SINK_PADS.lock().unwrap(); + assert!(!proxy_sink_pads.contains_key(&proxy_context)); + proxy_sink_pads.insert(proxy_context, self.sink_pad.downgrade()); } - state.proxy_ctx = Some(proxy_ctx); + let handler = ProxySinkPadHandler::new(proxy_ctx); + + self.sink_pad.prepare(&handler).await; + + state.sink_pad_handler = Some(handler); gst_debug!(SINK_CAT, obj: element, "Prepared"); Ok(()) } - async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let state = self.state.lock().await; - - let shared_src_rx = state.proxy_ctx().lock_shared().await.shared_src_rx.take(); - if shared_src_rx.is_none() { - gst_log!(SINK_CAT, obj: element, "Preparation already completed"); - return Ok(()); - } - - gst_debug!(SINK_CAT, obj: element, "Completing preparation"); - - let SharedSrc { src_pad } = shared_src_rx.unwrap().await.map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to receive SharedSrc: {:?}", err] - ) - })?; - - self.sink_pad - .prepare(&ProxySinkPadHandler::new(src_pad)) - .await; - - gst_debug!(SINK_CAT, obj: element, "Preparation completed"); - - Ok(()) - } - async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { let mut state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Unpreparing"); - let proxy_ctx = state.proxy_ctx.take().unwrap(); + let proxy_ctx = state.proxy_ctx().unwrap(); proxy_ctx.unprepare().await; self.sink_pad.unprepare().await; @@ -746,7 +721,13 @@ impl ProxySink { let state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Starting"); - let mut shared_ctx = state.proxy_ctx().lock_shared().await; + { + let settings = self.settings.lock().await; + let mut proxy_sink_pads = PROXY_SINK_PADS.lock().unwrap(); + proxy_sink_pads.remove(&settings.proxy_context); + } + + let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); gst_debug!(SINK_CAT, obj: element, "Started"); @@ -758,7 +739,7 @@ impl ProxySink { let state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Stopping"); - let mut shared_ctx = state.proxy_ctx().lock_shared().await; + let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; let _ = shared_ctx.pending_queue.take(); shared_ctx.last_res = Err(gst::FlowError::Flushing); @@ -863,10 +844,6 @@ impl ElementImpl for ProxySink { gst::StateChangeError })?; } - gst::StateChange::ReadyToPaused => { - runtime::executor::block_on(self.complete_preparation(element)) - .map_err(|_| gst::StateChangeError)?; - } gst::StateChange::PausedToReady => { runtime::executor::block_on(self.stop(element)) .map_err(|_| gst::StateChangeError)?; @@ -891,20 +868,24 @@ impl ElementImpl for ProxySink { #[derive(Debug)] struct ProxySrcPadHandlerInner { flush_join_handle: sync::Mutex>>>, - sink_pad: PadSinkWeak, + proxy_ctx: ProxyContext, } #[derive(Clone, Debug)] struct ProxySrcPadHandler(Arc); impl ProxySrcPadHandler { - fn new(sink_pad: PadSinkWeak) -> Self { + fn new(proxy_ctx: ProxyContext) -> Self { ProxySrcPadHandler(Arc::new(ProxySrcPadHandlerInner { flush_join_handle: sync::Mutex::new(None), - sink_pad, + proxy_ctx, })) } + fn proxy_ctx(&self) -> &ProxyContext { + &self.0.proxy_ctx + } + async fn start_task(pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) { let pad_weak = pad.downgrade(); let element = element.clone(); @@ -937,7 +918,7 @@ impl ProxySrcPadHandler { { let state = proxysrc.state.lock().await; - let mut shared_ctx = state.proxy_ctx().lock_shared().await; + let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; if let Some(ref mut pending_queue) = shared_ctx.pending_queue { pending_queue.notify_more_queue_space(); } @@ -963,21 +944,21 @@ impl ProxySrcPadHandler { Ok(_) => { gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item"); let state = proxysrc.state.lock().await; - let mut shared_ctx = state.proxy_ctx().lock_shared().await; + let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); } Err(gst::FlowError::Flushing) => { gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); let state = proxysrc.state.lock().await; pad.pause_task().await; - let mut shared_ctx = state.proxy_ctx().lock_shared().await; + let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; shared_ctx.last_res = Err(gst::FlowError::Flushing); } Err(gst::FlowError::Eos) => { gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS"); let state = proxysrc.state.lock().await; pad.pause_task().await; - let mut shared_ctx = state.proxy_ctx().lock_shared().await; + let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; shared_ctx.last_res = Err(gst::FlowError::Eos); } Err(err) => { @@ -989,7 +970,7 @@ impl ProxySrcPadHandler { ["streaming stopped, reason {}", err] ); let state = proxysrc.state.lock().await; - let mut shared_ctx = state.proxy_ctx().lock_shared().await; + let mut shared_ctx = state.proxy_ctx().unwrap().lock_shared().await; shared_ctx.last_res = Err(err); } } @@ -1010,11 +991,17 @@ impl PadSrcHandler for ProxySrcPadHandler { gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + let sink_pad = PROXY_SINK_PADS + .lock() + .unwrap() + .get(&self.proxy_ctx().name) + .and_then(|sink_pad| sink_pad.upgrade()) + .map(|sink_pad| sink_pad.gst_pad().clone()); + if event.is_serialized() { let element = element.clone(); let inner_weak = Arc::downgrade(&self.0); let src_pad_weak = pad.downgrade(); - let sink_pad_weak = self.0.sink_pad.clone(); Either::Right( async move { @@ -1047,7 +1034,7 @@ impl PadSrcHandler for ProxySrcPadHandler { if ret { let src_pad = src_pad_weak.upgrade().expect("PadSrc no longer exists"); gst_log!(SRC_CAT, obj: src_pad.gst_pad(), "Forwarding serialized {:?}", event); - sink_pad_weak.upgrade().expect("PadSink no longer available").gst_pad().push_event(event) + sink_pad.unwrap().push_event(event) } else { false } @@ -1078,14 +1065,12 @@ impl PadSrcHandler for ProxySrcPadHandler { } gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); - Either::Left( - self.0 - .sink_pad - .upgrade() - .expect("PadSink no longer available") - .gst_pad() - .push_event(event), - ) + if let Some(sink_pad) = sink_pad { + Either::Left(sink_pad.push_event(event)) + } else { + gst_error!(SRC_CAT, obj: pad.gst_pad(), "No sink pad to forward non-serialized event to"); + Either::Left(false) + } } } @@ -1139,21 +1124,23 @@ impl PadSrcHandler for ProxySrcPadHandler { #[derive(Debug)] struct StateSrc { - proxy_ctx: Option, + src_pad_handler: Option, ts_ctx: Option, } impl StateSrc { - #[inline] - fn proxy_ctx(&self) -> &ProxyContext { - self.proxy_ctx.as_ref().unwrap() + fn proxy_ctx(&self) -> Option<&ProxyContext> { + match self.src_pad_handler.as_ref() { + Some(handler) => Some(handler.proxy_ctx()), + None => None, + } } } impl Default for StateSrc { fn default() -> Self { StateSrc { - proxy_ctx: None, + src_pad_handler: None, ts_ctx: None, } } @@ -1219,55 +1206,19 @@ impl ProxySrc { { let mut shared_ctx = proxy_ctx.lock_shared().await; - assert!(shared_ctx.shared_sink_rx.is_some()); - shared_ctx - .shared_src_tx - .take() - .unwrap() - .send(SharedSrc { - src_pad: self.src_pad.downgrade(), - }) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to send SharedSrc: {:?}", err] - ) - })?; - shared_ctx.dataqueue = Some(dataqueue); + + let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap(); + assert!(!proxy_src_pads.contains_key(&settings.proxy_context)); + proxy_src_pads.insert(settings.proxy_context.to_string(), self.src_pad.downgrade()); } state.ts_ctx = Some(ts_ctx); - state.proxy_ctx = Some(proxy_ctx); - gst_debug!(SRC_CAT, obj: element, "Prepared"); - - Ok(()) - } - - async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().await; - - let shared_sink_rx = state.proxy_ctx().lock_shared().await.shared_sink_rx.take(); - if shared_sink_rx.is_none() { - gst_log!(SRC_CAT, obj: element, "Preparation already completed"); - return Ok(()); - } - - gst_debug!(SRC_CAT, obj: element, "Completing preparation"); - - let SharedSink { sink_pad } = shared_sink_rx.unwrap().await.map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to receive SharedSink: {:?}", err] - ) - })?; + let handler = ProxySrcPadHandler::new(proxy_ctx); self.src_pad - .prepare( - state.ts_ctx.take().unwrap(), - &ProxySrcPadHandler::new(sink_pad), - ) + .prepare(state.ts_ctx.take().unwrap(), &handler) .await .map_err(|err| { gst_error_msg!( @@ -1276,7 +1227,9 @@ impl ProxySrc { ) })?; - gst_debug!(SRC_CAT, obj: element, "Preparation completed"); + state.src_pad_handler = Some(handler); + + gst_debug!(SRC_CAT, obj: element, "Prepared"); Ok(()) } @@ -1285,12 +1238,17 @@ impl ProxySrc { let mut state = self.state.lock().await; gst_debug!(SRC_CAT, obj: element, "Unpreparing"); + { + let settings = self.settings.lock().await; + let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap(); + proxy_src_pads.remove(&settings.proxy_context); + } + self.src_pad.stop_task().await; let _ = self.src_pad.unprepare().await; - if let Some(proxy_ctx) = state.proxy_ctx.take() { - proxy_ctx.unprepare().await; - } + let proxy_ctx = state.proxy_ctx().unwrap(); + proxy_ctx.unprepare().await; *state = StateSrc::default(); @@ -1305,6 +1263,7 @@ impl ProxySrc { let dataqueue = state .proxy_ctx() + .unwrap() .lock_shared() .await .dataqueue @@ -1327,7 +1286,14 @@ impl ProxySrc { let pause_completion = self.src_pad.pause_task().await; - if let Some(dataqueue) = state.proxy_ctx().lock_shared().await.dataqueue.as_ref() { + if let Some(dataqueue) = state + .proxy_ctx() + .unwrap() + .lock_shared() + .await + .dataqueue + .as_ref() + { dataqueue.pause().await; dataqueue.clear().await; dataqueue.stop().await; @@ -1467,10 +1433,6 @@ impl ElementImpl for ProxySrc { gst::StateChangeError })?; } - gst::StateChange::ReadyToPaused => { - runtime::executor::block_on(self.complete_preparation(element)) - .map_err(|_| gst::StateChangeError)?; - } gst::StateChange::PlayingToPaused => { runtime::executor::block_on(self.pause(element)) .map_err(|_| gst::StateChangeError)?; diff --git a/gst-plugin-threadshare/tests/proxy.rs b/gst-plugin-threadshare/tests/proxy.rs index 07ed3735..b593b16e 100644 --- a/gst-plugin-threadshare/tests/proxy.rs +++ b/gst-plugin-threadshare/tests/proxy.rs @@ -53,8 +53,8 @@ fn test_push() { proxysrc.link(&appsink).unwrap(); fakesrc.set_property("num-buffers", &3i32).unwrap(); - proxysink.set_property("proxy-context", &"test").unwrap(); - proxysrc.set_property("proxy-context", &"test").unwrap(); + proxysink.set_property("proxy-context", &"test1").unwrap(); + proxysrc.set_property("proxy-context", &"test1").unwrap(); appsink.set_property("emit-signals", &true).unwrap(); @@ -102,3 +102,66 @@ fn test_push() { pipeline.set_state(gst::State::Null).unwrap(); } + +#[test] +fn test_from_pipeline_to_pipeline() { + init(); + + let pipe_1 = gst::Pipeline::new(None); + let fakesrc = gst::ElementFactory::make("fakesrc", None).unwrap(); + let pxsink = gst::ElementFactory::make("ts-proxysink", None).unwrap(); + + let pipe_2 = gst::Pipeline::new(None); + let pxsrc = gst::ElementFactory::make("ts-proxysrc", None).unwrap(); + let fakesink = gst::ElementFactory::make("fakesink", None).unwrap(); + + pipe_1.add_many(&[&fakesrc, &pxsink]).unwrap(); + fakesrc.link(&pxsink).unwrap(); + + pipe_2.add_many(&[&pxsrc, &fakesink]).unwrap(); + pxsrc.link(&fakesink).unwrap(); + + pxsink.set_property("proxy-context", &"test2").unwrap(); + pxsrc.set_property("proxy-context", &"test2").unwrap(); + + pipe_1.set_state(gst::State::Paused).unwrap(); + pipe_2.set_state(gst::State::Paused).unwrap(); + + let _ = pipe_1.get_state(gst::CLOCK_TIME_NONE); + let _ = pipe_2.get_state(gst::CLOCK_TIME_NONE); + + pipe_1.set_state(gst::State::Null).unwrap(); + + pipe_2.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_from_pipeline_to_pipeline_and_back() { + init(); + + let pipe_1 = gst::Pipeline::new(None); + let pxsrc_1 = gst::ElementFactory::make("ts-proxysrc", None).unwrap(); + let pxsink_1 = gst::ElementFactory::make("ts-proxysink", None).unwrap(); + + let pipe_2 = gst::Pipeline::new(None); + let pxsrc_2 = gst::ElementFactory::make("ts-proxysrc", None).unwrap(); + let pxsink_2 = gst::ElementFactory::make("ts-proxysink", None).unwrap(); + + pipe_1.add_many(&[&pxsrc_1, &pxsink_1]).unwrap(); + pxsrc_1.link(&pxsink_1).unwrap(); + + pipe_2.add_many(&[&pxsrc_2, &pxsink_2]).unwrap(); + pxsrc_2.link(&pxsink_2).unwrap(); + + pxsrc_1.set_property("proxy-context", &"test3").unwrap(); + pxsink_2.set_property("proxy-context", &"test3").unwrap(); + + pxsrc_2.set_property("proxy-context", &"test4").unwrap(); + pxsink_1.set_property("proxy-context", &"test4").unwrap(); + + pipe_1.set_state(gst::State::Paused).unwrap(); + pipe_2.set_state(gst::State::Paused).unwrap(); + + pipe_1.set_state(gst::State::Null).unwrap(); + pipe_2.set_state(gst::State::Null).unwrap(); +}