From b3becb01c38292a0725c8408bd1700b3e569f955 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Fri, 3 Jan 2020 18:46:32 +0100 Subject: [PATCH] threadshare: New input selector element --- gst-plugin-threadshare/src/inputselector.rs | 605 ++++++++++++++++++ gst-plugin-threadshare/src/lib.rs | 2 + gst-plugin-threadshare/tests/inputselector.rs | 105 +++ 3 files changed, 712 insertions(+) create mode 100644 gst-plugin-threadshare/src/inputselector.rs create mode 100644 gst-plugin-threadshare/tests/inputselector.rs diff --git a/gst-plugin-threadshare/src/inputselector.rs b/gst-plugin-threadshare/src/inputselector.rs new file mode 100644 index 000000000..a6e724be3 --- /dev/null +++ b/gst-plugin-threadshare/src/inputselector.rs @@ -0,0 +1,605 @@ +// Copyright (C) 2019 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use either::Either; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::BoxFuture; +use futures::future::{abortable, AbortHandle}; +use futures::lock::{Mutex, MutexGuard}; +use futures::prelude::*; + +use glib; +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; +use glib::{glib_object_impl, glib_object_subclass}; + +use gst; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_error_msg, gst_log, gst_trace}; + +use lazy_static::lazy_static; + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use std::u32; + +use crate::runtime::prelude::*; +use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc}; + +fn get_current_running_time(element: &gst::Element) -> gst::ClockTime { + if let Some(clock) = element.get_clock() { + if clock.get_time() > element.get_base_time() { + clock.get_time() - element.get_base_time() + } else { + 0.into() + } + } else { + gst::CLOCK_TIME_NONE + } +} + +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: u32 = 0; + +#[derive(Debug, Clone)] +struct Settings { + context: String, + context_wait: u32, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, + } + } +} + +static PROPERTIES: [subclass::Property; 3] = [ + subclass::Property("context", |name| { + glib::ParamSpec::string( + name, + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", |name| { + glib::ParamSpec::uint( + name, + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("active-pad", |name| { + glib::ParamSpec::object( + name, + "Active Pad", + "Currently active pad", + gst::Pad::static_type(), + glib::ParamFlags::READWRITE, + ) + }), +]; + +#[derive(Debug)] +struct InputSelectorPadSinkHandlerInner { + segment: Option, + abort_handle: Option, +} + +impl Default for InputSelectorPadSinkHandlerInner { + fn default() -> Self { + InputSelectorPadSinkHandlerInner { + segment: None, + abort_handle: None, + } + } +} + +#[derive(Clone, Debug)] +struct InputSelectorPadSinkHandler(Arc>); + +impl InputSelectorPadSinkHandler { + fn new() -> Self { + InputSelectorPadSinkHandler(Arc::new(Mutex::new( + InputSelectorPadSinkHandlerInner::default(), + ))) + } + + #[inline] + async fn lock(&self) -> MutexGuard<'_, InputSelectorPadSinkHandlerInner> { + self.0.lock().await + } + + /* Wait until specified time */ + async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) { + let inputselector = InputSelector::from_instance(element); + let now = get_current_running_time(&element); + + if now.is_some() && now < running_time { + let pad_src_state = inputselector.src_pad.lock_state().await; + let context = pad_src_state.pad_context().unwrap(); + let (sender, receiver) = oneshot::channel(); + let delay = running_time - now; + let delay_for_fut = + context.delay_for(Duration::from_nanos(delay.nseconds().unwrap()), move || { + async { + let _ = sender.send(()); + } + }); + context.spawn(delay_for_fut); + let _ = receiver.await; + } + } + + async fn handle_item( + &self, + pad: PadSinkRef<'_>, + element: &gst::Element, + buffer: gst::Buffer, + ) -> Result { + let inputselector = InputSelector::from_instance(element); + let mut inner = self.lock().await; + + if let Some(segment) = &inner.segment { + if let Some(segment) = segment.downcast_ref::() { + let rtime = segment.to_running_time(buffer.get_pts()); + let (sync_fut, abort_handle) = abortable(self.sync(&element, rtime)); + inner.abort_handle = Some(abort_handle); + drop(inner); + let _ = sync_fut.await; + } + } + + let mut state = inputselector.state.lock().await; + + if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) { + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); + + if state.send_sticky { + let mut stickies: Vec = vec![]; + + pad.gst_pad().sticky_events_foreach(|event| { + let mut forward = true; + + if event.get_type() == gst::EventType::StreamStart { + forward = state.send_stream_start; + state.send_stream_start = false; + } + + if forward { + stickies.push(event.clone()); + } + + Ok(Some(event)) + }); + + for event in &stickies { + inputselector.src_pad.push_event(event.clone()).await; + } + + state.send_sticky = false; + } + + drop(state); + + inputselector.src_pad.push(buffer).await + } else { + gst_log!(CAT, obj: pad.gst_pad(), "Dropping {:?}", buffer); + Ok(gst::FlowSuccess::Ok) + } + } +} + +impl PadSinkHandler for InputSelectorPadSinkHandler { + type ElementImpl = InputSelector; + + fn sink_chain( + &self, + pad: PadSinkRef, + _inputselector: &InputSelector, + element: &gst::Element, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result> { + let this = self.clone(); + let element = element.clone(); + let pad_weak = pad.downgrade(); + async move { + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + this.handle_item(pad, &element, buffer).await + } + .boxed() + } + + fn sink_chain_list( + &self, + pad: PadSinkRef, + _inputselector: &InputSelector, + element: &gst::Element, + list: gst::BufferList, + ) -> BoxFuture<'static, Result> { + let element = element.clone(); + let pad_weak = pad.downgrade(); + async move { + let inputselector = InputSelector::from_instance(&element); + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + gst_log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); + inputselector.src_pad.push_list(list).await + } + .boxed() + } + + fn sink_event( + &self, + _pad: PadSinkRef, + inputselector: &InputSelector, + _element: &gst::Element, + event: gst::Event, + ) -> Either> { + if event.is_serialized() { + let this = self.clone(); + Either::Right( + async move { + match event.view() { + gst::EventView::Segment(e) => { + let mut inner = this.lock().await; + inner.segment = Some(e.get_segment().clone()); + } + _ => (), + } + true + } + .boxed(), + ) + } else { + /* Drop all events for now */ + match event.view() { + gst::EventView::FlushStart(..) => { + /* Unblock downstream */ + inputselector.src_pad.gst_pad().push_event(event.clone()); + + let mut inner = block_on(self.lock()); + + if let Some(abort_handle) = inner.abort_handle.take() { + abort_handle.abort(); + } + } + _ => (), + } + Either::Left(true) + } + } + + fn sink_query( + &self, + pad: PadSinkRef, + inputselector: &InputSelector, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { + gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + + if query.is_serialized() { + // FIXME: How can we do this (drops ALLOCATION and DRAIN)? + gst_log!(CAT, obj: pad.gst_pad(), "Dropping serialized query {:?}", query); + false + } else { + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); + inputselector.src_pad.gst_pad().peer_query(query) + } + } +} + +#[derive(Clone, Debug)] +struct InputSelectorPadSrcHandler; + +impl InputSelectorPadSrcHandler {} + +impl PadSrcHandler for InputSelectorPadSrcHandler { + type ElementImpl = InputSelector; +} + +#[derive(Debug)] +struct State { + active_sinkpad: Option, + send_sticky: bool, + send_stream_start: bool, + pad_serial: u32, + sink_pads: HashMap, +} + +impl Default for State { + fn default() -> State { + State { + active_sinkpad: None, + send_sticky: false, + send_stream_start: true, + pad_serial: 0, + sink_pads: HashMap::new(), + } + } +} + +#[derive(Debug)] +struct InputSelector { + src_pad: PadSrc, + state: Mutex, + settings: Mutex, +} + +lazy_static! { + static ref CAT: gst::DebugCategory = gst::DebugCategory::new( + "ts-input-selector", + gst::DebugColorFlags::empty(), + Some("Thread-sharing input selector"), + ); +} + +impl InputSelector { + async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Preparing"); + + let settings = self.settings.lock().await; + + let context = + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })?; + + self.src_pad + .prepare(context, &InputSelectorPadSrcHandler {}) + .await + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error joining Context: {:?}", err] + ) + })?; + + gst_debug!(CAT, obj: element, "Prepared"); + + Ok(()) + } + + async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { + let mut state = self.state.lock().await; + gst_debug!(CAT, obj: element, "Unpreparing"); + + let _ = self.src_pad.unprepare().await; + + *state = State::default(); + + gst_debug!(CAT, obj: element, "Unprepared"); + + Ok(()) + } + + async fn start(&self, element: &gst::Element) -> Result<(), ()> { + gst_debug!(CAT, obj: element, "Starting"); + + gst_debug!(CAT, obj: element, "Started"); + + Ok(()) + } + + async fn stop(&self, element: &gst::Element) -> Result<(), ()> { + gst_debug!(CAT, obj: element, "Stopped"); + + Ok(()) + } +} + +impl ObjectSubclass for InputSelector { + const NAME: &'static str = "RsTsInputSelector"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing input selector", + "Generic", + "Simple input selector element", + "Mathieu Duponchelle ", + ); + + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + + klass.install_properties(&PROPERTIES); + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("src").unwrap(); + let src_pad = PadSrc::new_from_template(&templ, Some("src")); + + Self { + src_pad, + state: Mutex::new(State::default()), + settings: Mutex::new(Settings::default()), + } + } +} + +impl ObjectImpl for InputSelector { + glib_object_impl!(); + + fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("context", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.context = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("context-wait", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.context_wait = value.get_some().expect("type checked upstream"); + } + subclass::Property("active-pad", ..) => { + let mut state = block_on(self.state.lock()); + state.active_sinkpad = value.get::().expect("type checked upstream"); + state.send_sticky = true; + } + _ => unimplemented!(), + } + } + + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("context", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.context.to_value()) + } + subclass::Property("context-wait", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.context_wait.to_value()) + } + subclass::Property("active-pad", ..) => { + let state = block_on(self.state.lock()); + let active_pad = state.active_sinkpad.clone(); + Ok(active_pad.to_value()) + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(self.src_pad.gst_pad()).unwrap(); + } +} + +impl ElementImpl for InputSelector { + fn change_state( + &self, + element: &gst::Element, + transition: gst::StateChange, + ) -> Result { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::NullToReady => { + block_on(self.prepare(element)).map_err(|err| { + element.post_error_message(&err); + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => { + block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + } + _ => (), + } + + let success = self.parent_change_state(element, transition)?; + + if transition == gst::StateChange::ReadyToPaused { + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + } + + Ok(success) + } + + fn request_new_pad( + &self, + element: &gst::Element, + templ: &gst::PadTemplate, + _name: Option, + _caps: Option<&gst::Caps>, + ) -> Option { + let mut state = block_on(self.state.lock()); + let sink_pad = gst::Pad::new_from_template( + &templ, + Some(format!("sink_{}", state.pad_serial).as_str()), + ); + state.pad_serial += 1; + sink_pad.set_active(true).unwrap(); + element.add_pad(&sink_pad).unwrap(); + let sink_pad = PadSink::new(sink_pad); + let ret = sink_pad.gst_pad().clone(); + + block_on(sink_pad.prepare(&InputSelectorPadSinkHandler::new())); + state.sink_pads.insert(ret.clone(), sink_pad); + + if state.active_sinkpad.is_none() { + state.active_sinkpad = Some(ret.clone()); + state.send_sticky = true; + } + + Some(ret) + } + + fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { + let mut state = block_on(self.state.lock()); + let sink_pad = state.sink_pads.remove(pad).unwrap(); + block_on(sink_pad.unprepare()); + element.remove_pad(pad).unwrap(); + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ts-input-selector", + gst::Rank::None, + InputSelector::get_type(), + ) +} diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index c8171fdf1..42ecc114d 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -36,6 +36,7 @@ mod udpsrc; mod appsrc; pub mod dataqueue; +mod inputselector; mod jitterbuffer; mod proxy; mod queue; @@ -56,6 +57,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { proxy::register(plugin)?; appsrc::register(plugin)?; jitterbuffer::jitterbuffer::register(plugin)?; + inputselector::register(plugin)?; Ok(()) } diff --git a/gst-plugin-threadshare/tests/inputselector.rs b/gst-plugin-threadshare/tests/inputselector.rs new file mode 100644 index 000000000..82dc53140 --- /dev/null +++ b/gst-plugin-threadshare/tests/inputselector.rs @@ -0,0 +1,105 @@ +// Copyright (C) 2018 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib; +use glib::prelude::*; + +use gst; +use gst::prelude::*; +use gst_check; + +use gstthreadshare; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstthreadshare::plugin_register_static().expect("gstthreadshare inputselector test"); + }); +} + +#[test] +fn test_active_pad() { + init(); + + let is = gst::ElementFactory::make("ts-input-selector", None).unwrap(); + + let mut h1 = gst_check::Harness::new_with_element(&is, Some("sink_%u"), Some("src")); + let mut h2 = gst_check::Harness::new_with_element(&is, Some("sink_%u"), None); + + let active_pad = is + .get_property("active-pad") + .unwrap() + .get::() + .unwrap(); + assert!(active_pad == h1.get_srcpad().unwrap().get_peer()); + + is.set_property("active-pad", &h2.get_srcpad().unwrap().get_peer()) + .unwrap(); + let active_pad = is + .get_property("active-pad") + .unwrap() + .get::() + .unwrap(); + assert!(active_pad == h2.get_srcpad().unwrap().get_peer()); + + h1.set_src_caps_str("foo/bar"); + h2.set_src_caps_str("foo/bar"); + + h1.play(); + + /* Push buffer on inactive pad, we should not receive anything */ + let buf = gst::Buffer::new(); + assert!(h1.push(buf) == Ok(gst::FlowSuccess::Ok)); + assert!(h1.buffers_received() == 0); + + /* Buffers pushed on the active pad should be received */ + let buf = gst::Buffer::new(); + assert!(h2.push(buf) == Ok(gst::FlowSuccess::Ok)); + assert!(h1.buffers_received() == 1); + + assert!(h1.events_received() == 4); + + let event = h1.pull_event().unwrap(); + assert!(event.get_type() == gst::EventType::CustomDownstreamSticky); + let event = h1.pull_event().unwrap(); + assert!(event.get_type() == gst::EventType::StreamStart); + let event = h1.pull_event().unwrap(); + assert!(event.get_type() == gst::EventType::Caps); + let event = h1.pull_event().unwrap(); + assert!(event.get_type() == gst::EventType::Segment); + + /* Push another buffer on the active pad, there should be no new events */ + let buf = gst::Buffer::new(); + assert!(h2.push(buf) == Ok(gst::FlowSuccess::Ok)); + assert!(h1.buffers_received() == 2); + assert!(h1.events_received() == 4); + + /* Switch the active pad and push a buffer, we should receive segment and caps again */ + let buf = gst::Buffer::new(); + is.set_property("active-pad", &h1.get_srcpad().unwrap().get_peer()) + .unwrap(); + assert!(h1.push(buf) == Ok(gst::FlowSuccess::Ok)); + assert!(h1.buffers_received() == 3); + assert!(h1.events_received() == 6); + let event = h1.pull_event().unwrap(); + assert!(event.get_type() == gst::EventType::Caps); + let event = h1.pull_event().unwrap(); + assert!(event.get_type() == gst::EventType::Segment); +}