diff --git a/gst-plugin-threadshare/src/inputselector.rs b/gst-plugin-threadshare/src/inputselector.rs index 305ed0a12..0f8e7e54a 100644 --- a/gst-plugin-threadshare/src/inputselector.rs +++ b/gst-plugin-threadshare/src/inputselector.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019 Mathieu Duponchelle +// Copyright (C) 2020 Mathieu Duponchelle // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public @@ -15,12 +15,8 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -use either::Either; - -use futures::executor::block_on; use futures::future::BoxFuture; use futures::future::{abortable, AbortHandle}; -use futures::lock::Mutex; use futures::prelude::*; use glib; @@ -37,24 +33,13 @@ use gst::{gst_debug, gst_error_msg, gst_log, gst_trace}; use lazy_static::lazy_static; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc}; - -fn get_current_running_time(element: &gst::Element) -> gst::ClockTime { - if let Some(clock) = element.get_clock() { - if clock.get_time() > element.get_base_time() { - clock.get_time() - element.get_base_time() - } else { - 0.into() - } - } else { - gst::CLOCK_TIME_NONE - } -} +use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::get_current_running_time; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; @@ -151,57 +136,60 @@ impl InputSelectorPadSinkHandler { ) -> Result { let inputselector = InputSelector::from_instance(element); - let mut state = inputselector.state.lock().await; - let mut inner = self.0.lock().await; + let (stickies, is_active, sync_future, switched_pad) = { + let mut state = inputselector.state.lock().unwrap(); + let mut inner = self.0.lock().unwrap(); + let mut stickies = vec![]; + let mut sync_future = None; + let switched_pad = state.switched_pad; - let mut sync_future = None; - if let Some(segment) = &inner.segment { - if let Some(segment) = segment.downcast_ref::() { - let rtime = segment.to_running_time(buffer.get_pts()); - let (sync_fut, abort_handle) = abortable(self.sync(&element, rtime)); - inner.abort_handle = Some(abort_handle); - sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing)); + if let Some(segment) = &inner.segment { + if let Some(segment) = segment.downcast_ref::() { + let rtime = segment.to_running_time(buffer.get_pts()); + let (sync_fut, abort_handle) = abortable(self.sync(&element, rtime)); + inner.abort_handle = Some(abort_handle); + sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing)); + } } + + let is_active = { + if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) { + if inner.send_sticky || state.switched_pad { + pad.gst_pad().sticky_events_foreach(|event| { + stickies.push(event.clone()); + Ok(Some(event)) + }); + + inner.send_sticky = false; + state.switched_pad = false; + } + true + } else { + false + } + }; + + (stickies, is_active, sync_future, switched_pad) + }; + + if let Some(sync_fut) = sync_future { + sync_fut.await?; } - if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) { + for event in stickies { + inputselector.src_pad.push_event(event).await; + } + + if is_active { gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); - if state.switched_pad && !buffer.get_flags().contains(gst::BufferFlags::DISCONT) { + if switched_pad && !buffer.get_flags().contains(gst::BufferFlags::DISCONT) { let buffer = buffer.make_mut(); buffer.set_flags(gst::BufferFlags::DISCONT); } - let mut stickies: Vec = vec![]; - if inner.send_sticky || state.switched_pad { - pad.gst_pad().sticky_events_foreach(|event| { - stickies.push(event.clone()); - Ok(Some(event)) - }); - - inner.send_sticky = false; - state.switched_pad = false; - } - drop(inner); - drop(state); - - if let Some(sync_fut) = sync_future { - sync_fut.await?; - } - - for event in &stickies { - inputselector.src_pad.push_event(event.clone()).await; - } - inputselector.src_pad.push(buffer).await } else { - drop(inner); - drop(state); - - if let Some(sync_fut) = sync_future { - sync_fut.await?; - } - gst_log!(CAT, obj: pad.gst_pad(), "Dropping {:?}", buffer); Ok(gst::FlowSuccess::Ok) } } @@ -250,59 +238,64 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { .boxed() } + fn sink_event_serialized( + &self, + _pad: &PadSinkRef, + _inputselector: &InputSelector, + _element: &gst::Element, + event: gst::Event, + ) -> BoxFuture<'static, bool> { + let this = self.clone(); + + async move { + let mut inner = this.0.lock().unwrap(); + + // Remember the segment for later use + match event.view() { + gst::EventView::Segment(e) => { + inner.segment = Some(e.get_segment().clone()); + } + _ => (), + } + + // We sent sticky events together with the next buffer once it becomes + // the active pad. + // + // TODO: Other serialized events for the active pad can also be forwarded + // here, and sticky events could be forwarded directly. Needs forwarding of + // all other sticky events first! + if event.is_sticky() { + inner.send_sticky = true; + true + } else { + true + } + } + .boxed() + } + fn sink_event( &self, _pad: &PadSinkRef, inputselector: &InputSelector, _element: &gst::Element, event: gst::Event, - ) -> Either> { - if event.is_serialized() { - let this = self.clone(); - Either::Right( - async move { - let mut inner = this.0.lock().await; + ) -> bool { + /* Drop all events for now */ + match event.view() { + gst::EventView::FlushStart(..) => { + /* Unblock downstream */ + inputselector.src_pad.gst_pad().push_event(event.clone()); - // Remember the segment for later use - match event.view() { - gst::EventView::Segment(e) => { - inner.segment = Some(e.get_segment().clone()); - } - _ => (), - } + let mut inner = self.0.lock().unwrap(); - // We sent sticky events together with the next buffer once it becomes - // the active pad. - // - // TODO: Other serialized events for the active pad can also be forwarded - // here, and sticky events could be forwarded directly. Needs forwarding of - // all other sticky events first! - if event.is_sticky() { - inner.send_sticky = true; - true - } else { - true - } + if let Some(abort_handle) = inner.abort_handle.take() { + abort_handle.abort(); } - .boxed(), - ) - } else { - /* Drop all events for now */ - match event.view() { - gst::EventView::FlushStart(..) => { - /* Unblock downstream */ - inputselector.src_pad.gst_pad().push_event(event.clone()); - - let mut inner = block_on(self.0.lock()); - - if let Some(abort_handle) = inner.abort_handle.take() { - abort_handle.abort(); - } - } - _ => (), } - Either::Left(true) + _ => (), } + true } fn sink_query( @@ -332,6 +325,58 @@ impl InputSelectorPadSrcHandler {} impl PadSrcHandler for InputSelectorPadSrcHandler { type ElementImpl = InputSelector; + + fn src_query( + &self, + pad: &PadSrcRef, + inputselector: &InputSelector, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + + match query.view_mut() { + QueryView::Latency(ref mut q) => { + let mut ret = true; + let mut min_latency: gst::ClockTime = 0.into(); + let mut max_latency: gst::ClockTime = 0.into(); + let sinkpad = { + let state = inputselector.state.lock().unwrap(); + state.active_sinkpad.clone() + }; + + if let Some(sinkpad) = sinkpad { + let mut peer_query = gst::query::Query::new_latency(); + + ret = sinkpad.peer_query(&mut peer_query); + + if ret { + let (_, min, max) = peer_query.get_result(); + min_latency = min; + max_latency = max; + } + } + + q.set(true, min_latency, max_latency); + + ret + } + _ => { + let sinkpad = { + let state = inputselector.state.lock().unwrap(); + state.active_sinkpad.clone() + }; + + if let Some(sinkpad) = sinkpad { + sinkpad.peer_query(query) + } else { + true + } + } + } + } } #[derive(Debug)] @@ -381,10 +426,10 @@ lazy_static! { } impl InputSelector { - async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { gst_debug!(CAT, obj: element, "Preparing"); - let settings = self.settings.lock().await; + let settings = self.settings.lock().unwrap(); let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { @@ -396,7 +441,6 @@ impl InputSelector { self.src_pad .prepare(context, &InputSelectorPadSrcHandler {}) - .await .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -409,11 +453,11 @@ impl InputSelector { Ok(()) } - async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { - let mut state = self.state.lock().await; + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { + let mut state = self.state.lock().unwrap(); gst_debug!(CAT, obj: element, "Unpreparing"); - let _ = self.src_pad.unprepare().await; + let _ = self.src_pad.unprepare(); *state = State::default(); @@ -421,20 +465,6 @@ impl InputSelector { Ok(()) } - - async fn start(&self, element: &gst::Element) -> Result<(), ()> { - gst_debug!(CAT, obj: element, "Starting"); - - gst_debug!(CAT, obj: element, "Started"); - - Ok(()) - } - - async fn stop(&self, element: &gst::Element) -> Result<(), ()> { - gst_debug!(CAT, obj: element, "Stopped"); - - Ok(()) - } } impl ObjectSubclass for InputSelector { @@ -497,20 +527,20 @@ impl ObjectImpl for InputSelector { match *prop { subclass::Property("context", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("active-pad", ..) => { let pad = value.get::().expect("type checked upstream"); - let mut state = block_on(self.state.lock()); - let pads = block_on(self.pads.lock()); + let mut state = self.state.lock().unwrap(); + let pads = self.pads.lock().unwrap(); if let Some(pad) = pad { if pads.sink_pads.get(&pad).is_some() { state.active_sinkpad = Some(pad); @@ -529,15 +559,15 @@ impl ObjectImpl for InputSelector { match *prop { subclass::Property("context", ..) => { - let settings = block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); Ok(settings.context_wait.to_value()) } subclass::Property("active-pad", ..) => { - let state = block_on(self.state.lock()); + let state = self.state.lock().unwrap(); let active_pad = state.active_sinkpad.clone(); Ok(active_pad.to_value()) } @@ -563,24 +593,27 @@ impl ElementImpl for InputSelector { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + self.prepare(element).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } - gst::StateChange::PausedToReady => { - block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?; - } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + self.unprepare(element).map_err(|_| gst::StateChangeError)?; } _ => (), } - let success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(element, transition)?; - if transition == gst::StateChange::ReadyToPaused { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + match transition { + gst::StateChange::ReadyToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PlayingToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + _ => (), } Ok(success) @@ -593,8 +626,8 @@ impl ElementImpl for InputSelector { _name: Option, _caps: Option<&gst::Caps>, ) -> Option { - let mut state = block_on(self.state.lock()); - let mut pads = block_on(self.pads.lock()); + let mut state = self.state.lock().unwrap(); + let mut pads = self.pads.lock().unwrap(); let sink_pad = gst::Pad::new_from_template(&templ, Some(format!("sink_{}", pads.pad_serial).as_str())); pads.pad_serial += 1; @@ -603,7 +636,7 @@ impl ElementImpl for InputSelector { let sink_pad = PadSink::new(sink_pad); let ret = sink_pad.gst_pad().clone(); - block_on(sink_pad.prepare(&InputSelectorPadSinkHandler::new())); + sink_pad.prepare(&InputSelectorPadSinkHandler::new()); if state.active_sinkpad.is_none() { state.active_sinkpad = Some(ret.clone()); @@ -616,9 +649,9 @@ impl ElementImpl for InputSelector { } fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { - let mut pads = block_on(self.pads.lock()); + let mut pads = self.pads.lock().unwrap(); let sink_pad = pads.sink_pads.remove(pad).unwrap(); - block_on(sink_pad.unprepare()); + sink_pad.unprepare(); element.remove_pad(pad).unwrap(); } } diff --git a/gst-plugin-threadshare/tests/inputselector.rs b/gst-plugin-threadshare/tests/inputselector.rs index d3e1a0a53..965c266c1 100644 --- a/gst-plugin-threadshare/tests/inputselector.rs +++ b/gst-plugin-threadshare/tests/inputselector.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2018 Sebastian Dröge +// Copyright (C) 2020 Mathieu Duponchelle // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public @@ -74,10 +74,8 @@ fn test_active_pad() { assert_eq!(h2.push(buf), Ok(gst::FlowSuccess::Ok)); assert_eq!(h1.buffers_received(), 1); - assert_eq!(h1.events_received(), 4); + assert_eq!(h1.events_received(), 3); - let event = h1.pull_event().unwrap(); - assert_eq!(event.get_type(), gst::EventType::CustomDownstreamSticky); let event = h1.pull_event().unwrap(); assert_eq!(event.get_type(), gst::EventType::StreamStart); let event = h1.pull_event().unwrap(); @@ -89,7 +87,7 @@ fn test_active_pad() { let buf = gst::Buffer::new(); assert_eq!(h2.push(buf), Ok(gst::FlowSuccess::Ok)); assert_eq!(h1.buffers_received(), 2); - assert_eq!(h1.events_received(), 4); + assert_eq!(h1.events_received(), 3); /* Switch the active pad and push a buffer, we should receive stream-start, segment and caps * again */ @@ -98,7 +96,7 @@ fn test_active_pad() { .unwrap(); assert_eq!(h1.push(buf), Ok(gst::FlowSuccess::Ok)); assert_eq!(h1.buffers_received(), 3); - assert_eq!(h1.events_received(), 7); + assert_eq!(h1.events_received(), 6); let event = h1.pull_event().unwrap(); assert_eq!(event.get_type(), gst::EventType::StreamStart); let event = h1.pull_event().unwrap();