threadshare/inputselector: port to new API

This commit is contained in:
Mathieu Duponchelle 2020-03-17 16:19:01 +01:00 committed by Sebastian Dröge
parent ad4597a40e
commit c01ef0e774
2 changed files with 177 additions and 146 deletions

View file

@ -1,4 +1,4 @@
// Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com>
// Copyright (C) 2020 Mathieu Duponchelle <mathieu@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -15,12 +15,8 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use either::Either;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::future::{abortable, AbortHandle};
use futures::lock::Mutex;
use futures::prelude::*;
use glib;
@ -37,24 +33,13 @@ use gst::{gst_debug, gst_error_msg, gst_log, gst_trace};
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc};
fn get_current_running_time(element: &gst::Element) -> gst::ClockTime {
if let Some(clock) = element.get_clock() {
if clock.get_time() > element.get_base_time() {
clock.get_time() - element.get_base_time()
} else {
0.into()
}
} else {
gst::CLOCK_TIME_NONE
}
}
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::get_current_running_time;
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
@ -151,57 +136,60 @@ impl InputSelectorPadSinkHandler {
) -> Result<gst::FlowSuccess, gst::FlowError> {
let inputselector = InputSelector::from_instance(element);
let mut state = inputselector.state.lock().await;
let mut inner = self.0.lock().await;
let (stickies, is_active, sync_future, switched_pad) = {
let mut state = inputselector.state.lock().unwrap();
let mut inner = self.0.lock().unwrap();
let mut stickies = vec![];
let mut sync_future = None;
let switched_pad = state.switched_pad;
let mut sync_future = None;
if let Some(segment) = &inner.segment {
if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
let rtime = segment.to_running_time(buffer.get_pts());
let (sync_fut, abort_handle) = abortable(self.sync(&element, rtime));
inner.abort_handle = Some(abort_handle);
sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing));
if let Some(segment) = &inner.segment {
if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
let rtime = segment.to_running_time(buffer.get_pts());
let (sync_fut, abort_handle) = abortable(self.sync(&element, rtime));
inner.abort_handle = Some(abort_handle);
sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing));
}
}
let is_active = {
if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) {
if inner.send_sticky || state.switched_pad {
pad.gst_pad().sticky_events_foreach(|event| {
stickies.push(event.clone());
Ok(Some(event))
});
inner.send_sticky = false;
state.switched_pad = false;
}
true
} else {
false
}
};
(stickies, is_active, sync_future, switched_pad)
};
if let Some(sync_fut) = sync_future {
sync_fut.await?;
}
if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) {
for event in stickies {
inputselector.src_pad.push_event(event).await;
}
if is_active {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
if state.switched_pad && !buffer.get_flags().contains(gst::BufferFlags::DISCONT) {
if switched_pad && !buffer.get_flags().contains(gst::BufferFlags::DISCONT) {
let buffer = buffer.make_mut();
buffer.set_flags(gst::BufferFlags::DISCONT);
}
let mut stickies: Vec<gst::Event> = vec![];
if inner.send_sticky || state.switched_pad {
pad.gst_pad().sticky_events_foreach(|event| {
stickies.push(event.clone());
Ok(Some(event))
});
inner.send_sticky = false;
state.switched_pad = false;
}
drop(inner);
drop(state);
if let Some(sync_fut) = sync_future {
sync_fut.await?;
}
for event in &stickies {
inputselector.src_pad.push_event(event.clone()).await;
}
inputselector.src_pad.push(buffer).await
} else {
drop(inner);
drop(state);
if let Some(sync_fut) = sync_future {
sync_fut.await?;
}
gst_log!(CAT, obj: pad.gst_pad(), "Dropping {:?}", buffer);
Ok(gst::FlowSuccess::Ok)
}
}
@ -250,59 +238,64 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
.boxed()
}
fn sink_event_serialized(
&self,
_pad: &PadSinkRef,
_inputselector: &InputSelector,
_element: &gst::Element,
event: gst::Event,
) -> BoxFuture<'static, bool> {
let this = self.clone();
async move {
let mut inner = this.0.lock().unwrap();
// Remember the segment for later use
match event.view() {
gst::EventView::Segment(e) => {
inner.segment = Some(e.get_segment().clone());
}
_ => (),
}
// We sent sticky events together with the next buffer once it becomes
// the active pad.
//
// TODO: Other serialized events for the active pad can also be forwarded
// here, and sticky events could be forwarded directly. Needs forwarding of
// all other sticky events first!
if event.is_sticky() {
inner.send_sticky = true;
true
} else {
true
}
}
.boxed()
}
fn sink_event(
&self,
_pad: &PadSinkRef,
inputselector: &InputSelector,
_element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
if event.is_serialized() {
let this = self.clone();
Either::Right(
async move {
let mut inner = this.0.lock().await;
) -> bool {
/* Drop all events for now */
match event.view() {
gst::EventView::FlushStart(..) => {
/* Unblock downstream */
inputselector.src_pad.gst_pad().push_event(event.clone());
// Remember the segment for later use
match event.view() {
gst::EventView::Segment(e) => {
inner.segment = Some(e.get_segment().clone());
}
_ => (),
}
let mut inner = self.0.lock().unwrap();
// We sent sticky events together with the next buffer once it becomes
// the active pad.
//
// TODO: Other serialized events for the active pad can also be forwarded
// here, and sticky events could be forwarded directly. Needs forwarding of
// all other sticky events first!
if event.is_sticky() {
inner.send_sticky = true;
true
} else {
true
}
if let Some(abort_handle) = inner.abort_handle.take() {
abort_handle.abort();
}
.boxed(),
)
} else {
/* Drop all events for now */
match event.view() {
gst::EventView::FlushStart(..) => {
/* Unblock downstream */
inputselector.src_pad.gst_pad().push_event(event.clone());
let mut inner = block_on(self.0.lock());
if let Some(abort_handle) = inner.abort_handle.take() {
abort_handle.abort();
}
}
_ => (),
}
Either::Left(true)
_ => (),
}
true
}
fn sink_query(
@ -332,6 +325,58 @@ impl InputSelectorPadSrcHandler {}
impl PadSrcHandler for InputSelectorPadSrcHandler {
type ElementImpl = InputSelector;
fn src_query(
&self,
pad: &PadSrcRef,
inputselector: &InputSelector,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
match query.view_mut() {
QueryView::Latency(ref mut q) => {
let mut ret = true;
let mut min_latency: gst::ClockTime = 0.into();
let mut max_latency: gst::ClockTime = 0.into();
let sinkpad = {
let state = inputselector.state.lock().unwrap();
state.active_sinkpad.clone()
};
if let Some(sinkpad) = sinkpad {
let mut peer_query = gst::query::Query::new_latency();
ret = sinkpad.peer_query(&mut peer_query);
if ret {
let (_, min, max) = peer_query.get_result();
min_latency = min;
max_latency = max;
}
}
q.set(true, min_latency, max_latency);
ret
}
_ => {
let sinkpad = {
let state = inputselector.state.lock().unwrap();
state.active_sinkpad.clone()
};
if let Some(sinkpad) = sinkpad {
sinkpad.peer_query(query)
} else {
true
}
}
}
}
}
#[derive(Debug)]
@ -381,10 +426,10 @@ lazy_static! {
}
impl InputSelector {
async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().await;
let settings = self.settings.lock().unwrap();
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
@ -396,7 +441,6 @@ impl InputSelector {
self.src_pad
.prepare(context, &InputSelectorPadSrcHandler {})
.await
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
@ -409,11 +453,11 @@ impl InputSelector {
Ok(())
}
async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().await;
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Unpreparing");
let _ = self.src_pad.unprepare().await;
let _ = self.src_pad.unprepare();
*state = State::default();
@ -421,20 +465,6 @@ impl InputSelector {
Ok(())
}
async fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Starting");
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
async fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
}
impl ObjectSubclass for InputSelector {
@ -497,20 +527,20 @@ impl ObjectImpl for InputSelector {
match *prop {
subclass::Property("context", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = self.settings.lock().unwrap();
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get_some().expect("type checked upstream");
}
subclass::Property("active-pad", ..) => {
let pad = value.get::<gst::Pad>().expect("type checked upstream");
let mut state = block_on(self.state.lock());
let pads = block_on(self.pads.lock());
let mut state = self.state.lock().unwrap();
let pads = self.pads.lock().unwrap();
if let Some(pad) = pad {
if pads.sink_pads.get(&pad).is_some() {
state.active_sinkpad = Some(pad);
@ -529,15 +559,15 @@ impl ObjectImpl for InputSelector {
match *prop {
subclass::Property("context", ..) => {
let settings = block_on(self.settings.lock());
let settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = block_on(self.settings.lock());
let settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
subclass::Property("active-pad", ..) => {
let state = block_on(self.state.lock());
let state = self.state.lock().unwrap();
let active_pad = state.active_sinkpad.clone();
Ok(active_pad.to_value())
}
@ -563,24 +593,27 @@ impl ElementImpl for InputSelector {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
self.prepare(element).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToReady => {
block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
let success = self.parent_change_state(element, transition)?;
let mut success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
_ => (),
}
Ok(success)
@ -593,8 +626,8 @@ impl ElementImpl for InputSelector {
_name: Option<String>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let mut state = block_on(self.state.lock());
let mut pads = block_on(self.pads.lock());
let mut state = self.state.lock().unwrap();
let mut pads = self.pads.lock().unwrap();
let sink_pad =
gst::Pad::new_from_template(&templ, Some(format!("sink_{}", pads.pad_serial).as_str()));
pads.pad_serial += 1;
@ -603,7 +636,7 @@ impl ElementImpl for InputSelector {
let sink_pad = PadSink::new(sink_pad);
let ret = sink_pad.gst_pad().clone();
block_on(sink_pad.prepare(&InputSelectorPadSinkHandler::new()));
sink_pad.prepare(&InputSelectorPadSinkHandler::new());
if state.active_sinkpad.is_none() {
state.active_sinkpad = Some(ret.clone());
@ -616,9 +649,9 @@ impl ElementImpl for InputSelector {
}
fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
let mut pads = block_on(self.pads.lock());
let mut pads = self.pads.lock().unwrap();
let sink_pad = pads.sink_pads.remove(pad).unwrap();
block_on(sink_pad.unprepare());
sink_pad.unprepare();
element.remove_pad(pad).unwrap();
}
}

View file

@ -1,4 +1,4 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2020 Mathieu Duponchelle <mathieu@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -74,10 +74,8 @@ fn test_active_pad() {
assert_eq!(h2.push(buf), Ok(gst::FlowSuccess::Ok));
assert_eq!(h1.buffers_received(), 1);
assert_eq!(h1.events_received(), 4);
assert_eq!(h1.events_received(), 3);
let event = h1.pull_event().unwrap();
assert_eq!(event.get_type(), gst::EventType::CustomDownstreamSticky);
let event = h1.pull_event().unwrap();
assert_eq!(event.get_type(), gst::EventType::StreamStart);
let event = h1.pull_event().unwrap();
@ -89,7 +87,7 @@ fn test_active_pad() {
let buf = gst::Buffer::new();
assert_eq!(h2.push(buf), Ok(gst::FlowSuccess::Ok));
assert_eq!(h1.buffers_received(), 2);
assert_eq!(h1.events_received(), 4);
assert_eq!(h1.events_received(), 3);
/* Switch the active pad and push a buffer, we should receive stream-start, segment and caps
* again */
@ -98,7 +96,7 @@ fn test_active_pad() {
.unwrap();
assert_eq!(h1.push(buf), Ok(gst::FlowSuccess::Ok));
assert_eq!(h1.buffers_received(), 3);
assert_eq!(h1.events_received(), 7);
assert_eq!(h1.events_received(), 6);
let event = h1.pull_event().unwrap();
assert_eq!(event.get_type(), gst::EventType::StreamStart);
let event = h1.pull_event().unwrap();