diff --git a/gst-plugin-threadshare/src/inputselector.rs b/gst-plugin-threadshare/src/inputselector.rs index c1540054..8e639b29 100644 --- a/gst-plugin-threadshare/src/inputselector.rs +++ b/gst-plugin-threadshare/src/inputselector.rs @@ -20,7 +20,7 @@ use either::Either; use futures::executor::block_on; use futures::future::BoxFuture; use futures::future::{abortable, AbortHandle}; -use futures::lock::{Mutex, MutexGuard}; +use futures::lock::Mutex; use futures::prelude::*; use glib; @@ -109,6 +109,7 @@ static PROPERTIES: [subclass::Property; 3] = [ #[derive(Debug)] struct InputSelectorPadSinkHandlerInner { segment: Option, + send_sticky: bool, abort_handle: Option, } @@ -116,6 +117,7 @@ impl Default for InputSelectorPadSinkHandlerInner { fn default() -> Self { InputSelectorPadSinkHandlerInner { segment: None, + send_sticky: true, abort_handle: None, } } @@ -131,11 +133,6 @@ impl InputSelectorPadSinkHandler { ))) } - #[inline] - async fn lock(&self) -> MutexGuard<'_, InputSelectorPadSinkHandlerInner> { - self.0.lock().await - } - /* Wait until specified time */ async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) { let now = get_current_running_time(&element); @@ -153,52 +150,52 @@ impl InputSelectorPadSinkHandler { buffer: gst::Buffer, ) -> Result { let inputselector = InputSelector::from_instance(element); - let mut inner = self.lock().await; + let mut state = inputselector.state.lock().await; + let mut inner = self.0.lock().await; + + let mut sync_future = None; if let Some(segment) = &inner.segment { if let Some(segment) = segment.downcast_ref::() { let rtime = segment.to_running_time(buffer.get_pts()); let (sync_fut, abort_handle) = abortable(self.sync(&element, rtime)); inner.abort_handle = Some(abort_handle); - drop(inner); - let _ = sync_fut.await; + sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing)); } } - let mut state = inputselector.state.lock().await; - if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) { gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); - if state.send_sticky { - let mut stickies: Vec = vec![]; - + let mut stickies: Vec = vec![]; + if inner.send_sticky || state.send_sticky { pad.gst_pad().sticky_events_foreach(|event| { - let mut forward = true; - - if event.get_type() == gst::EventType::StreamStart { - forward = state.send_stream_start; - state.send_stream_start = false; - } - - if forward { - stickies.push(event.clone()); - } - + stickies.push(event.clone()); Ok(Some(event)) }); - for event in &stickies { - inputselector.src_pad.push_event(event.clone()).await; - } - + inner.send_sticky = false; state.send_sticky = false; } - + drop(inner); drop(state); + if let Some(sync_fut) = sync_future { + sync_fut.await?; + } + + for event in &stickies { + inputselector.src_pad.push_event(event.clone()).await; + } + inputselector.src_pad.push(buffer).await } else { + drop(inner); + drop(state); + + if let Some(sync_fut) = sync_future { + sync_fut.await?; + } gst_log!(CAT, obj: pad.gst_pad(), "Dropping {:?}", buffer); Ok(gst::FlowSuccess::Ok) } @@ -259,14 +256,28 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { let this = self.clone(); Either::Right( async move { + let mut inner = this.0.lock().await; + + // Remember the segment for later use match event.view() { gst::EventView::Segment(e) => { - let mut inner = this.lock().await; inner.segment = Some(e.get_segment().clone()); } _ => (), } - true + + // We sent sticky events together with the next buffer once it becomes + // the active pad. + // + // TODO: Other serialized events for the active pad can also be forwarded + // here, and sticky events could be forwarded directly. Needs forwarding of + // all other sticky events first! + if event.is_sticky() { + inner.send_sticky = true; + true + } else { + true + } } .boxed(), ) @@ -277,7 +288,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { /* Unblock downstream */ inputselector.src_pad.gst_pad().push_event(event.clone()); - let mut inner = block_on(self.lock()); + let mut inner = block_on(self.0.lock()); if let Some(abort_handle) = inner.abort_handle.take() { abort_handle.abort(); @@ -322,15 +333,13 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { struct State { active_sinkpad: Option, send_sticky: bool, - send_stream_start: bool, } impl Default for State { fn default() -> State { State { active_sinkpad: None, - send_sticky: false, - send_stream_start: true, + send_sticky: true, } } } @@ -494,9 +503,17 @@ impl ObjectImpl for InputSelector { settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("active-pad", ..) => { + let pad = value.get::().expect("type checked upstream"); let mut state = block_on(self.state.lock()); - state.active_sinkpad = value.get::().expect("type checked upstream"); - state.send_sticky = true; + let pads = block_on(self.pads.lock()); + if let Some(pad) = pad { + if pads.sink_pads.get(&pad).is_some() { + state.active_sinkpad = Some(pad); + state.send_sticky = true; + } + } else { + state.active_sinkpad = None; + } } _ => unimplemented!(), } @@ -582,13 +599,14 @@ impl ElementImpl for InputSelector { let ret = sink_pad.gst_pad().clone(); block_on(sink_pad.prepare(&InputSelectorPadSinkHandler::new())); - pads.sink_pads.insert(ret.clone(), sink_pad); if state.active_sinkpad.is_none() { state.active_sinkpad = Some(ret.clone()); state.send_sticky = true; } + pads.sink_pads.insert(ret.clone(), sink_pad); + Some(ret) } diff --git a/gst-plugin-threadshare/tests/inputselector.rs b/gst-plugin-threadshare/tests/inputselector.rs index 1c0e7bc3..d3e1a0a5 100644 --- a/gst-plugin-threadshare/tests/inputselector.rs +++ b/gst-plugin-threadshare/tests/inputselector.rs @@ -48,7 +48,7 @@ fn test_active_pad() { .unwrap() .get::() .unwrap(); - assert!(active_pad == h1.get_srcpad().unwrap().get_peer()); + assert_eq!(active_pad, h1.get_srcpad().unwrap().get_peer()); is.set_property("active-pad", &h2.get_srcpad().unwrap().get_peer()) .unwrap(); @@ -57,7 +57,7 @@ fn test_active_pad() { .unwrap() .get::() .unwrap(); - assert!(active_pad == h2.get_srcpad().unwrap().get_peer()); + assert_eq!(active_pad, h2.get_srcpad().unwrap().get_peer()); h1.set_src_caps_str("foo/bar"); h2.set_src_caps_str("foo/bar"); @@ -66,42 +66,45 @@ fn test_active_pad() { /* Push buffer on inactive pad, we should not receive anything */ let buf = gst::Buffer::new(); - assert!(h1.push(buf) == Ok(gst::FlowSuccess::Ok)); - assert!(h1.buffers_received() == 0); + assert_eq!(h1.push(buf), Ok(gst::FlowSuccess::Ok)); + assert_eq!(h1.buffers_received(), 0); /* Buffers pushed on the active pad should be received */ let buf = gst::Buffer::new(); - assert!(h2.push(buf) == Ok(gst::FlowSuccess::Ok)); - assert!(h1.buffers_received() == 1); + assert_eq!(h2.push(buf), Ok(gst::FlowSuccess::Ok)); + assert_eq!(h1.buffers_received(), 1); - assert!(h1.events_received() == 4); + assert_eq!(h1.events_received(), 4); let event = h1.pull_event().unwrap(); - assert!(event.get_type() == gst::EventType::CustomDownstreamSticky); + assert_eq!(event.get_type(), gst::EventType::CustomDownstreamSticky); let event = h1.pull_event().unwrap(); - assert!(event.get_type() == gst::EventType::StreamStart); + assert_eq!(event.get_type(), gst::EventType::StreamStart); let event = h1.pull_event().unwrap(); - assert!(event.get_type() == gst::EventType::Caps); + assert_eq!(event.get_type(), gst::EventType::Caps); let event = h1.pull_event().unwrap(); - assert!(event.get_type() == gst::EventType::Segment); + assert_eq!(event.get_type(), gst::EventType::Segment); /* Push another buffer on the active pad, there should be no new events */ let buf = gst::Buffer::new(); - assert!(h2.push(buf) == Ok(gst::FlowSuccess::Ok)); - assert!(h1.buffers_received() == 2); - assert!(h1.events_received() == 4); + assert_eq!(h2.push(buf), Ok(gst::FlowSuccess::Ok)); + assert_eq!(h1.buffers_received(), 2); + assert_eq!(h1.events_received(), 4); - /* Switch the active pad and push a buffer, we should receive segment and caps again */ + /* Switch the active pad and push a buffer, we should receive stream-start, segment and caps + * again */ let buf = gst::Buffer::new(); is.set_property("active-pad", &h1.get_srcpad().unwrap().get_peer()) .unwrap(); - assert!(h1.push(buf) == Ok(gst::FlowSuccess::Ok)); - assert!(h1.buffers_received() == 3); - assert!(h1.events_received() == 6); + assert_eq!(h1.push(buf), Ok(gst::FlowSuccess::Ok)); + assert_eq!(h1.buffers_received(), 3); + assert_eq!(h1.events_received(), 7); let event = h1.pull_event().unwrap(); - assert!(event.get_type() == gst::EventType::Caps); + assert_eq!(event.get_type(), gst::EventType::StreamStart); let event = h1.pull_event().unwrap(); - assert!(event.get_type() == gst::EventType::Segment); + assert_eq!(event.get_type(), gst::EventType::Caps); + let event = h1.pull_event().unwrap(); + assert_eq!(event.get_type(), gst::EventType::Segment); let _ = is.set_state(gst::State::Null); }