ts: fix FlushStart / FlushStop events handling

This commit is contained in:
François Laignel 2020-01-06 13:19:11 +01:00
parent 3eed2f69d9
commit a15d60105b
12 changed files with 1604 additions and 784 deletions

View file

@ -19,7 +19,7 @@ use either::Either;
use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::lock::Mutex;
use futures::prelude::*;
use glib;
@ -32,18 +32,17 @@ use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use gst::{EventView, QueryView};
use lazy_static::lazy_static;
use rand;
use std::convert::TryInto;
use std::sync::Arc;
use std::sync::{self, Arc};
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
@ -139,33 +138,32 @@ enum StreamItem {
}
#[derive(Debug)]
struct AppSrcPadHandlerInner {
struct AppSrcPadHandlerState {
need_initial_events: bool,
caps: Option<gst::Caps>,
configured_caps: Option<gst::Caps>,
}
impl Default for AppSrcPadHandlerInner {
impl Default for AppSrcPadHandlerState {
fn default() -> Self {
AppSrcPadHandlerInner {
AppSrcPadHandlerState {
need_initial_events: true,
caps: None,
configured_caps: None,
}
}
}
#[derive(Clone, Debug)]
struct AppSrcPadHandler(Arc<Mutex<AppSrcPadHandlerInner>>);
#[derive(Debug, Default)]
struct AppSrcPadHandlerInner {
state: sync::RwLock<AppSrcPadHandlerState>,
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
}
#[derive(Clone, Debug, Default)]
struct AppSrcPadHandler(Arc<AppSrcPadHandlerInner>);
impl AppSrcPadHandler {
fn new() -> Self {
AppSrcPadHandler(Arc::new(Mutex::new(AppSrcPadHandlerInner::default())))
}
#[inline]
async fn lock(&self) -> MutexGuard<'_, AppSrcPadHandlerInner> {
self.0.lock().await
}
async fn start_task(
&self,
pad: PadSrcRef<'_>,
@ -206,8 +204,13 @@ impl AppSrcPadHandler {
let mut events = Vec::new();
{
let mut inner = self.lock().await;
if inner.need_initial_events {
// Only `read` the state in the hot path
if self.0.state.read().unwrap().need_initial_events {
// We will need to `write` and we also want to prevent
// any changes on the state while we are handling initial events
let mut state = self.0.state.write().unwrap();
assert!(state.need_initial_events);
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
let stream_id =
@ -217,17 +220,17 @@ impl AppSrcPadHandler {
.group_id(gst::util_group_id_next())
.build(),
);
let appsrc = AppSrc::from_instance(element);
if let Some(ref caps) = appsrc.settings.lock().await.caps {
if let Some(ref caps) = state.caps {
events.push(gst::Event::new_caps(&caps).build());
inner.configured_caps = Some(caps.clone());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
inner.need_initial_events = false;
state.need_initial_events = false;
}
}
@ -238,11 +241,11 @@ impl AppSrcPadHandler {
let res = match item {
StreamItem::Buffer(buffer) => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding buffer {:?}", buffer);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
pad.push(buffer).await
}
StreamItem::Event(event) => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
pad.push_event(event).await;
Ok(gst::FlowSuccess::Ok)
}
@ -270,26 +273,69 @@ impl PadSrcHandler for AppSrcPadHandler {
fn src_event(
&self,
pad: PadSrcRef,
app_src: &AppSrc,
pad: &PadSrcRef,
_app_src: &AppSrc,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
use gst::EventView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = runtime::executor::block_on(app_src.pause(element));
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
*flush_join_handle = Some(pad.spawn(async move {
let res = AppSrc::from_instance(&element).pause(&element).await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
true
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = runtime::executor::block_on(app_src.start(element));
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
let pad_weak = pad.downgrade();
let fut = async move {
let mut ret = false;
let pad = pad_weak.upgrade().unwrap();
let inner_weak = inner_weak.upgrade().unwrap();
let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
if let Ok(Ok(())) = flush_join_handle.await {
ret = AppSrc::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
}
true
.boxed();
return Either::Right(fut);
}
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
@ -297,9 +343,9 @@ impl PadSrcHandler for AppSrcPadHandler {
};
if ret {
gst_log!(CAT, obj: pad.gst_pad(), "Handled event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
}
Either::Left(ret)
@ -307,12 +353,14 @@ impl PadSrcHandler for AppSrcPadHandler {
fn src_query(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
_app_src: &AppSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
use gst::QueryView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
q.set(true, 0.into(), 0.into());
@ -324,8 +372,8 @@ impl PadSrcHandler for AppSrcPadHandler {
true
}
QueryView::Caps(ref mut q) => {
let inner = runtime::executor::block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps {
let state = self.0.state.read().unwrap();
let caps = if let Some(ref caps) = state.configured_caps {
q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@ -343,9 +391,9 @@ impl PadSrcHandler for AppSrcPadHandler {
};
if ret {
gst_log!(CAT, obj: pad.gst_pad(), "Handled query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
}
ret
}
@ -420,6 +468,9 @@ impl AppSrc {
let context = {
let settings = self.settings.lock().await;
self.src_pad_handler.0.state.write().unwrap().caps = settings.caps.clone();
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
@ -449,7 +500,12 @@ impl AppSrc {
self.src_pad.stop_task().await;
let _ = self.src_pad.unprepare().await;
self.src_pad_handler.lock().await.configured_caps = None;
self.src_pad_handler
.0
.state
.write()
.unwrap()
.configured_caps = None;
gst_debug!(CAT, obj: element, "Unprepared");
@ -565,7 +621,7 @@ impl ObjectSubclass for AppSrc {
Self {
src_pad,
src_pad_handler: AppSrcPadHandler::new(),
src_pad_handler: AppSrcPadHandler::default(),
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
}
@ -663,9 +719,12 @@ impl ElementImpl for AppSrc {
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
runtime::executor::block_on(async {
self.src_pad_handler.lock().await.need_initial_events = true;
});
self.src_pad_handler
.0
.state
.write()
.unwrap()
.need_initial_events = true;
}
_ => (),
}

View file

@ -32,7 +32,6 @@ use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error_msg, gst_info, gst_log, gst_trace};
use gst::{EventView, QueryView};
use gst_rtp::RTPBuffer;
use lazy_static::lazy_static;
@ -43,7 +42,7 @@ use std::time::Duration;
use crate::runtime::prelude::*;
use crate::runtime::{
self, Context, JoinHandle, PadContext, PadContextRef, PadSink, PadSinkRef, PadSrc, PadSrcRef,
self, Context, JoinHandle, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak,
};
use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
@ -160,7 +159,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
fn sink_chain(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
_jitterbuffer: &JitterBuffer,
element: &gst::Element,
buffer: gst::Buffer,
@ -170,7 +169,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_debug!(CAT, obj: pad.gst_pad(), "Handling buffer {:?}", buffer);
gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
let jitterbuffer = JitterBuffer::from_instance(&element);
jitterbuffer
.enqueue_item(pad.gst_pad(), &element, Some(buffer))
@ -181,69 +180,74 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
fn sink_event(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
jitterbuffer: &JitterBuffer,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
use gst::EventView;
if event.is_serialized() {
let pad_weak = pad.downgrade();
let element = element.clone();
Either::Right(async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
Either::Right(
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
let mut forward = true;
let mut forward = true;
gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let jitterbuffer = JitterBuffer::from_instance(&element);
match event.view() {
EventView::Segment(e) => {
let mut state = jitterbuffer.state.lock().await;
state.segment = e
.get_segment()
.clone()
.downcast::<gst::format::Time>()
.unwrap();
}
EventView::Eos(..) => {
let mut state = jitterbuffer.state.lock().await;
jitterbuffer.drain(&mut state, &element).await;
}
EventView::CustomDownstreamSticky(e) => {
if PadContext::is_pad_context_sticky_event(&e) {
forward = false;
let jitterbuffer = JitterBuffer::from_instance(&element);
match event.view() {
EventView::FlushStop(..) => {
jitterbuffer.flush(&element).await;
}
EventView::Segment(e) => {
let mut state = jitterbuffer.state.lock().await;
state.segment = e
.get_segment()
.clone()
.downcast::<gst::format::Time>()
.unwrap();
}
EventView::Eos(..) => {
let mut state = jitterbuffer.state.lock().await;
jitterbuffer.drain(&mut state, &element).await;
}
EventView::CustomDownstreamSticky(e) => {
if PadContext::is_pad_context_sticky_event(&e) {
forward = false;
}
}
_ => (),
};
if forward {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
jitterbuffer.src_pad.push_event(event).await
} else {
true
}
_ => (),
};
if forward {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized event {:?}", event);
jitterbuffer.src_pad.push_event(event).await
} else {
true
}
}.boxed())
.boxed(),
)
} else {
if let EventView::FlushStop(..) = event.view() {
runtime::executor::block_on(jitterbuffer.flush(element));
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
Either::Left(jitterbuffer.src_pad.gst_pad().push_event(event))
}
}
fn sink_query(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
jitterbuffer: &JitterBuffer,
element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
use gst::QueryView;
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
match query.view_mut() {
QueryView::Drain(..) => {
@ -264,12 +268,14 @@ impl PadSrcHandler for JitterBufferPadSrcHandler {
fn src_query(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
jitterbuffer: &JitterBuffer,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
use gst::QueryView;
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
match query.view_mut() {
QueryView::Latency(ref mut q) => {
@ -433,7 +439,7 @@ impl JitterBuffer {
) -> Result<gst::FlowSuccess, gst::FlowError> {
let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?;
gst_info!(CAT, obj: element, "Parsing caps: {:?}", caps);
gst_info!(CAT, obj: element, "Parsing {:?}", caps);
let payload = s
.get_some::<i32>("payload")
@ -907,7 +913,7 @@ impl JitterBuffer {
state.num_pushed += 1;
gst_debug!(CAT, obj: self.src_pad.gst_pad(), "Pushing buffer {:?} with seq {}", buffer, seq);
gst_debug!(CAT, obj: self.src_pad.gst_pad(), "Pushing {:?} with seq {}", buffer, seq);
self.src_pad.push(buffer.to_owned()).await
}
@ -954,9 +960,6 @@ impl JitterBuffer {
let _ = wakeup_join_handle.await;
}
let pad_src_state = self.src_pad.lock_state().await;
let pad_ctx = pad_src_state.pad_context().unwrap();
gst_debug!(CAT, obj: element, "Scheduling wakeup in {}", delay);
let (wakeup_fut, abort_handle) = abortable(Self::wakeup_fut(
@ -964,9 +967,9 @@ impl JitterBuffer {
latency_ns,
context_wait_ns,
&element,
&pad_ctx,
self.src_pad.downgrade(),
));
state.wakeup_join_handle = Some(pad_ctx.spawn(wakeup_fut));
state.wakeup_join_handle = Some(self.src_pad.spawn(wakeup_fut));
state.wakeup_abort_handle = Some(abort_handle);
}
@ -975,17 +978,22 @@ impl JitterBuffer {
latency_ns: gst::ClockTime,
context_wait_ns: gst::ClockTime,
element: &gst::Element,
pad_ctx: &PadContextRef,
pad_src_weak: PadSrcWeak,
) -> BoxFuture<'static, ()> {
let element = element.clone();
let pad_ctx_weak = pad_ctx.downgrade();
async move {
runtime::time::delay_for(delay).await;
let jb = Self::from_instance(&element);
let mut state = jb.state.lock().await;
let pad_ctx = match pad_ctx_weak.upgrade() {
let pad_src = match pad_src_weak.upgrade() {
Some(pad_src) => pad_src,
None => return,
};
let pad_ctx = pad_src.pad_context();
let pad_ctx = match pad_ctx.upgrade() {
Some(pad_ctx) => pad_ctx,
None => return,
};
@ -1013,7 +1021,7 @@ impl JitterBuffer {
let (abortable_drain, abort_handle) = abortable(drain_fut);
state.task_queue_abort_handle = Some(abort_handle);
pad_ctx.spawn(abortable_drain.map(drop));
pad_src.spawn(abortable_drain.map(drop));
} else {
state.task_queue_abort_handle = None;
}
@ -1190,34 +1198,42 @@ impl ObjectImpl for JitterBuffer {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("latency", ..) => {
settings.latency_ms = value.get_some().expect("type checked upstream");
let latency_ms = {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.latency_ms = value.get_some().expect("type checked upstream");
settings.latency_ms as u64
};
runtime::executor::block_on(self.state.lock())
.jbuf
.borrow()
.set_delay(settings.latency_ms as u64 * gst::MSECOND);
.set_delay(latency_ms * gst::MSECOND);
/* TODO: post message */
}
subclass::Property("do-lost", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.do_lost = value.get_some().expect("type checked upstream");
}
subclass::Property("max-dropout-time", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_dropout_time = value.get_some().expect("type checked upstream");
}
subclass::Property("max-misorder-time", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_misorder_time = value.get_some().expect("type checked upstream");
}
subclass::Property("context", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),

File diff suppressed because it is too large Load diff

View file

@ -32,15 +32,15 @@ use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use gst::{EventView, QueryView};
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::sync::{self, Arc};
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, JoinHandle, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use super::dataqueue::{DataQueue, DataQueueItem};
@ -140,15 +140,30 @@ impl PendingQueue {
}
}
#[derive(Debug)]
struct QueuePadSinkHandlerInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
context: Context,
}
#[derive(Clone, Debug)]
struct QueuePadSinkHandler;
struct QueuePadSinkHandler(Arc<QueuePadSinkHandlerInner>);
impl QueuePadSinkHandler {
fn new(context: Context) -> Self {
QueuePadSinkHandler(Arc::new(QueuePadSinkHandlerInner {
flush_join_handle: sync::Mutex::new(None),
context,
}))
}
}
impl PadSinkHandler for QueuePadSinkHandler {
type ElementImpl = Queue;
fn sink_chain(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
_queue: &Queue,
element: &gst::Element,
buffer: gst::Buffer,
@ -157,7 +172,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
let element = element.clone();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Handling buffer {:?}", buffer);
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
let queue = Queue::from_instance(&element);
queue
.enqueue_item(&element, DataQueueItem::Buffer(buffer))
@ -168,7 +183,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
fn sink_chain_list(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
_queue: &Queue,
element: &gst::Element,
list: gst::BufferList,
@ -177,7 +192,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
let element = element.clone();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list);
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list);
let queue = Queue::from_instance(&element);
queue
.enqueue_item(&element, DataQueueItem::BufferList(list))
@ -188,39 +203,42 @@ impl PadSinkHandler for QueuePadSinkHandler {
fn sink_event(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
queue: &Queue,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
use gst::EventView;
if event.is_serialized() {
let pad_weak = pad.downgrade();
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
Either::Right(
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let queue = Queue::from_instance(&element);
match event.view() {
EventView::FlushStart(..) => {
let _ = queue.stop(&element).await;
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
if res == Ok(gst::StateChangeSuccess::Success)
&& state == gst::State::Paused
|| res == Ok(gst::StateChangeSuccess::Async)
&& pending == gst::State::Paused
{
if let EventView::FlushStop(..) = event.view() {
let inner = inner_weak.upgrade().unwrap();
let flush_join_handle = inner.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
gst_debug!(CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete");
if let Ok(Ok(())) = flush_join_handle.await {
let _ = queue.start(&element).await;
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
_ => (),
}
gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
queue
.enqueue_item(&element, DataQueueItem::Event(event))
.await
@ -229,7 +247,21 @@ impl PadSinkHandler for QueuePadSinkHandler {
.boxed(),
)
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event);
if let EventView::FlushStart(..) = event.view() {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let element = element.clone();
*flush_join_handle =
Some(self.0.context.spawn(async move {
Queue::from_instance(&element).stop(&element).await
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
Either::Left(queue.src_pad.gst_pad().push_event(event))
}
@ -237,26 +269,31 @@ impl PadSinkHandler for QueuePadSinkHandler {
fn sink_query(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
queue: &Queue,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
if query.is_serialized() {
// FIXME: How can we do this?
gst_log!(CAT, obj: pad.gst_pad(), "Dropping serialized query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Dropping serialized {:?}", query);
false
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
queue.src_pad.gst_pad().peer_query(query)
}
}
}
#[derive(Clone, Debug)]
struct QueuePadSrcHandler;
#[derive(Debug, Default)]
struct QueuePadSrcHandlerInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
}
#[derive(Clone, Debug, Default)]
struct QueuePadSrcHandler(Arc<QueuePadSrcHandlerInner>);
impl QueuePadSrcHandler {
async fn start_task(pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) {
@ -295,15 +332,15 @@ impl QueuePadSrcHandler {
let res = match item {
DataQueueItem::Buffer(buffer) => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding buffer {:?}", buffer);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
pad.push(buffer).await.map(drop)
}
DataQueueItem::BufferList(list) => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding buffer list {:?}", list);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", list);
pad.push_list(list).await.map(drop)
}
DataQueueItem::Event(event) => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
pad.push_event(event).await;
Ok(())
}
@ -347,57 +384,98 @@ impl PadSrcHandler for QueuePadSrcHandler {
fn src_event(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
queue: &Queue,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
use gst::EventView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
if event.is_serialized() {
let pad_weak = pad.downgrade();
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
let pad_weak = pad.downgrade();
Either::Right(
async move {
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized event {:?}", event);
let ret = if let EventView::FlushStop(..) = event.view() {
let mut ret = false;
let queue = Queue::from_instance(&element);
queue.sink_pad.gst_pad().push_event(event)
let pad = pad_weak.upgrade().unwrap();
let inner_weak = inner_weak.upgrade().unwrap();
let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
gst_debug!(CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete");
if let Ok(Ok(())) = flush_join_handle.await {
ret = Queue::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_log!(CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
} else {
true
};
if ret {
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
let queue = Queue::from_instance(&element);
queue.sink_pad.gst_pad().push_event(event)
} else {
false
}
}
.boxed(),
)
} else {
match event.view() {
EventView::FlushStart(..) => {
let _ = runtime::executor::block_on(queue.stop(element));
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async)
&& pending == gst::State::Playing
{
let _ = runtime::executor::block_on(queue.start(element));
}
}
_ => (),
};
if let EventView::FlushStart(..) = event.view() {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event);
*flush_join_handle = Some(pad.spawn(async move {
let res = Queue::from_instance(&element).stop(&element).await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
Either::Left(queue.sink_pad.gst_pad().push_event(event))
}
}
fn src_query(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
queue: &Queue,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
use gst::QueryView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
if let QueryView::Scheduling(ref mut q) = query.view_mut() {
let mut new_query = gst::Query::new_scheduling();
@ -422,7 +500,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
return true;
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
queue.sink_pad.gst_pad().peer_query(query)
}
}
@ -670,7 +748,7 @@ impl Queue {
})?;
self.src_pad
.prepare(context, &QueuePadSrcHandler)
.prepare(context.clone(), &QueuePadSrcHandler::default())
.await
.map_err(|err| {
gst_error_msg!(
@ -678,7 +756,9 @@ impl Queue {
["Error joining Context: {:?}", err]
)
})?;
self.sink_pad.prepare(&QueuePadSinkHandler).await;
self.sink_pad
.prepare(&QueuePadSinkHandler::new(context))
.await;
gst_debug!(CAT, obj: element, "Prepared");
@ -809,28 +889,24 @@ impl ObjectImpl for Queue {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("max-size-buffers", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_size_buffers = value.get_some().expect("type checked upstream");
}
subclass::Property("max-size-bytes", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_size_bytes = value.get_some().expect("type checked upstream");
}
subclass::Property("max-size-time", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_size_time = value.get_some().expect("type checked upstream");
}
subclass::Property("context", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),
@ -840,27 +916,13 @@ impl ObjectImpl for Queue {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("max-size-buffers", ..) => {
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_size_buffers.to_value())
}
subclass::Property("max-size-bytes", ..) => {
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_size_bytes.to_value())
}
subclass::Property("max-size-time", ..) => {
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_size_time.to_value())
}
subclass::Property("context", ..) => {
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.context_wait.to_value())
}
subclass::Property("max-size-buffers", ..) => Ok(settings.max_size_buffers.to_value()),
subclass::Property("max-size-bytes", ..) => Ok(settings.max_size_bytes.to_value()),
subclass::Property("max-size-time", ..) => Ok(settings.max_size_time.to_value()),
subclass::Property("context", ..) => Ok(settings.context.to_value()),
subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()),
_ => unimplemented!(),
}
}

View file

@ -25,17 +25,14 @@
//!
//! * Waiting for an incoming packet on a Socket.
//! * Waiting for an asynchronous `Mutex` `lock` to succeed.
//! * Waiting for a `Timeout` to be elapsed.
//!
//! [`Context`]s instantiators define the minimum time between two iterations of the [`Context`]
//! loop, which acts as a throttle, saving CPU usage when no operations are to be executed.
//! * Waiting for a time related `Future`.
//!
//! `Element` implementations should use [`PadSrc`] & [`PadSink`] which provides high-level features.
//!
//! [`threadshare`]: ../index.html
//! [`threadshare`]: ../../index.html
//! [`Context`]: struct.Context.html
//! [`PadSrc`]: struct.PadSrc.html
//! [`PadSink`]: struct.PadSink.html
//! [`PadSrc`]: ../pad/struct.PadSrc.html
//! [`PadSink`]: ../pad/struct.PadSink.html
use futures::channel::oneshot;
use futures::future::BoxFuture;
@ -82,8 +79,10 @@ thread_local!(static CURRENT_THREAD_CONTEXT: RefCell<Option<ContextWeak>> = RefC
/// Blocks on `future`.
///
/// This function must NOT be called within a [`Context`] thread.
/// IO & time related `Future`s must be handled within their own [`Context`].
/// Wait for the result using a [`JoinHandle`] or a `channel`.
///
/// This function must NOT be called within a [`Context`] thread.
/// The reason is this would prevent any task operating on the
/// [`Context`] from making progress.
///
@ -92,6 +91,7 @@ thread_local!(static CURRENT_THREAD_CONTEXT: RefCell<Option<ContextWeak>> = RefC
/// This function panics if called within a [`Context`] thread.
///
/// [`Context`]: struct.Context.html
/// [`JoinHandle`]: enum.JoinHandle.html
pub fn block_on<Fut: Future>(future: Fut) -> Fut::Output {
if Context::is_context_thread() {
panic!("Attempt to `block_on` within a `Context` thread");
@ -290,8 +290,8 @@ impl ContextWeak {
///
/// See the [module-level documentation](index.html) for more.
///
/// [`PadSrc`]: ../struct.PadSrc.html
/// [`PadSink`]: ../struct.PadSink.html
/// [`PadSrc`]: ../pad/struct.PadSrc.html
/// [`PadSink`]: ../pad/struct.PadSink.html
#[derive(Clone, Debug)]
pub struct Context(Arc<ContextInner>);

View file

@ -47,7 +47,7 @@ pub mod executor;
pub use executor::{Context, JoinHandle, TaskOutput};
pub mod pad;
pub use pad::{PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak};
pub use pad::{PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak};
pub mod pad_context;
pub use pad_context::{PadContext, PadContextRef, PadContextWeak};

View file

@ -1,4 +1,4 @@
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -81,8 +81,8 @@ use std::marker::PhantomData;
use std::sync;
use std::sync::{Arc, Weak};
use super::executor::{self, Context};
use super::pad_context::{PadContext, PadContextRef, PadContextWeak};
use super::executor::{self, Context, JoinHandle, TaskOutput};
use super::pad_context::{PadContext, PadContextWeak};
use super::task::Task;
use super::RUNTIME_CAT;
@ -144,7 +144,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
fn src_activate(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
_imp: &Self::ElementImpl,
_element: &gst::Element,
) -> Result<(), gst::LoggableError> {
@ -174,7 +174,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
fn src_activatemode(
&self,
_pad: PadSrcRef,
_pad: &PadSrcRef,
_imp: &Self::ElementImpl,
_element: &gst::Element,
_mode: gst::PadMode,
@ -185,7 +185,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
fn src_event(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
_imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event,
@ -197,21 +197,21 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
Either::Right(
async move {
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
pad.gst_pad().event_default(Some(&element), event)
}
.boxed(),
)
} else {
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
Either::Left(pad.gst_pad().event_default(Some(element), event))
}
}
fn src_event_full(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event,
@ -225,12 +225,12 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
fn src_query(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
_imp: &Self::ElementImpl,
element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
if query.is_serialized() {
// FIXME serialized queries should be handled with the dataflow
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime
@ -244,37 +244,13 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
#[derive(Default, Debug)]
pub struct PadSrcState {
is_initialized: bool,
pad_context: Option<PadContext>,
}
impl PadSrcState {
fn pad_context_priv(&self) -> &PadContext {
self.pad_context
.as_ref()
.expect("PadContext not initialized")
}
pub fn pad_context(&self) -> Option<PadContextRef<'_>> {
self.pad_context
.as_ref()
.map(|pad_context| pad_context.as_ref())
}
}
impl Drop for PadSrcState {
fn drop(&mut self) {
// Check invariant which can't be held automatically in `PadSrc`
// because `drop` can't be `async`
if self.pad_context.is_some() {
panic!("Missing call to `PadSrc::unprepare`");
}
}
}
#[derive(Debug)]
struct PadSrcInner {
state: Mutex<PadSrcState>,
gst_pad: gst::Pad,
pad_context: sync::RwLock<Option<PadContext>>,
task: Task,
}
@ -287,9 +263,24 @@ impl PadSrcInner {
PadSrcInner {
state: Mutex::new(PadSrcState::default()),
gst_pad,
pad_context: sync::RwLock::new(None),
task: Task::default(),
}
}
fn has_pad_context(&self) -> bool {
self.pad_context.read().unwrap().as_ref().is_some()
}
}
impl Drop for PadSrcInner {
fn drop(&mut self) {
// Check invariant which can't be held automatically in `PadSrc`
// because `drop` can't be `async`
if self.has_pad_context() {
panic!("Missing call to `PadSrc::unprepare`");
}
}
}
/// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s.
@ -343,6 +334,25 @@ impl<'a> PadSrcRef<'a> {
self.strong.lock_state().await
}
pub fn pad_context(&self) -> PadContextWeak {
self.strong.pad_context()
}
/// Spawns `future` using current [`PadContext`].
///
/// # Panics
///
/// This function panics if the `PadSrc` is not prepared.
///
/// [`PadContext`]: ../struct.PadContext.html
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.strong.spawn(future)
}
pub fn downgrade(&self) -> PadSrcWeak {
self.strong.downgrade()
}
@ -421,45 +431,96 @@ impl PadSrcStrong {
}
#[inline]
pub async fn lock_state(&self) -> MutexGuard<'_, PadSrcState> {
async fn lock_state(&self) -> MutexGuard<'_, PadSrcState> {
self.0.state.lock().await
}
#[inline]
fn pad_context_priv(&self) -> sync::RwLockReadGuard<'_, Option<PadContext>> {
self.0.pad_context.read().unwrap()
}
#[inline]
fn pad_context(&self) -> PadContextWeak {
self.pad_context_priv()
.as_ref()
.expect("PadContext not initialized")
.downgrade()
}
#[inline]
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let pad_ctx = self.pad_context_priv();
pad_ctx
.as_ref()
.expect("PadContext not initialized")
.spawn(future)
}
#[inline]
fn downgrade(&self) -> PadSrcWeak {
PadSrcWeak(Arc::downgrade(&self.0))
}
fn push_prelude(&self, state: &mut MutexGuard<'_, PadSrcState>) {
let must_send_task_context = if state.is_initialized {
self.gst_pad().check_reconfigure()
} else {
// Get rid of reconfigure flag
self.gst_pad().check_reconfigure();
state.is_initialized = true;
fn push_prelude(
&self,
state: &mut MutexGuard<'_, PadSrcState>,
) -> Result<FlowSuccess, FlowError> {
if !state.is_initialized || self.gst_pad().check_reconfigure() {
if !self.push_pad_context_event() {
return Err(FlowError::Error);
}
true
};
if must_send_task_context {
let pad_ctx = state.pad_context_priv();
gst_log!(
RUNTIME_CAT,
obj: self.gst_pad(),
"Pushing PadContext Event {}",
pad_ctx,
);
self.gst_pad().push_event(pad_ctx.new_sticky_event());
if !state.is_initialized {
// Get rid of reconfigure flag
self.gst_pad().check_reconfigure();
state.is_initialized = true;
}
}
Ok(FlowSuccess::Ok)
}
#[inline]
fn push_pad_context_event(&self) -> bool {
let pad_ctx = self.pad_context_priv();
let pad_ctx = pad_ctx.as_ref().unwrap();
gst_log!(
RUNTIME_CAT,
obj: self.gst_pad(),
"Pushing PadContext Event {}",
pad_ctx,
);
let ret = self.gst_pad().push_event(pad_ctx.new_sticky_event());
if !ret {
gst_error!(RUNTIME_CAT,
obj: self.gst_pad(),
"Failed to push PadContext sticky event to PadSrc",
);
}
ret
}
fn drain_pending_tasks(&self) -> Option<impl Future<Output = TaskOutput>> {
self.pad_context_priv()
.as_ref()
.unwrap()
.drain_pending_tasks()
}
#[inline]
async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
let mut state = self.lock_state().await;
self.push_prelude(&mut state);
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
self.push_prelude(&mut state)?;
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing buffer");
let success = self.gst_pad().push(buffer).map_err(|err| {
gst_error!(RUNTIME_CAT,
obj: self.gst_pad(),
@ -469,7 +530,7 @@ impl PadSrcStrong {
err
})?;
if let Some(pending_tasks) = state.pad_context_priv().drain_pending_tasks() {
if let Some(pending_tasks) = self.drain_pending_tasks() {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing pending tasks (push)");
pending_tasks.await?;
}
@ -480,21 +541,22 @@ impl PadSrcStrong {
#[inline]
async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
let mut state = self.lock_state().await;
self.push_prelude(&mut state);
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
self.push_prelude(&mut state)?;
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing buffer_list");
let success = self.gst_pad().push_list(list).map_err(|err| {
gst_error!(
RUNTIME_CAT,
obj: self.gst_pad(),
"Failed to push BufferList to PadSrc: {:?} ({})",
err,
state.pad_context_priv(),
self.pad_context_priv().as_ref().unwrap(),
);
err
})?;
if let Some(pending_tasks) = state.pad_context_priv().drain_pending_tasks() {
if let Some(pending_tasks) = self.drain_pending_tasks() {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing pending tasks (push_list)");
pending_tasks.await?;
}
@ -505,12 +567,28 @@ impl PadSrcStrong {
#[inline]
async fn push_event(&self, event: gst::Event) -> bool {
let mut state = self.lock_state().await;
self.push_prelude(&mut state);
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing event");
let was_handled = self.gst_pad().push_event(event);
let was_handled = if PadContext::is_pad_context_event(&event) {
// Push our own PadContext
if !self.push_pad_context_event() {
return false;
}
if let Some(pending_tasks) = state.pad_context_priv().drain_pending_tasks() {
// Get rid of reconfigure flag
self.gst_pad().check_reconfigure();
state.is_initialized = true;
true
} else {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", event);
if self.push_prelude(&mut state).is_err() {
return false;
}
self.gst_pad().push_event(event)
};
if let Some(pending_tasks) = self.drain_pending_tasks() {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing pending tasks (push_event)");
if pending_tasks.await.is_err() {
return false;
@ -596,6 +674,25 @@ impl PadSrc {
self.0.lock_state().await
}
pub fn pad_context(&self) -> PadContextWeak {
self.0.pad_context()
}
/// Spawns `future` using current [`PadContext`].
///
/// # Panics
///
/// This function panics if the `PadSrc` is not prepared.
///
/// [`PadContext`]: ../struct.PadContext.html
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.0.spawn(future)
}
fn init_pad_functions<H: PadSrcHandler>(&self, handler: &H) {
let handler_clone = handler.clone();
let this_weak = self.downgrade();
@ -611,7 +708,7 @@ impl PadSrc {
},
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
handler.src_activate(this_ref, imp, element)
handler.src_activate(&this_ref, imp, element)
},
)
});
@ -634,8 +731,7 @@ impl PadSrc {
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
this_ref.activate_mode_hook(mode, active)?;
handler.src_activatemode(this_ref, imp, element, mode, active)
handler.src_activatemode(&this_ref, imp, element, mode, active)
},
)
});
@ -653,7 +749,7 @@ impl PadSrc {
|| Err(FlowError::Error),
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
match handler.src_event_full(this_ref, imp, &element, event) {
match handler.src_event_full(&this_ref, imp, &element, event) {
Either::Left(res) => res,
Either::Right(_fut) => {
// See these threads:
@ -678,7 +774,7 @@ impl PadSrc {
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
if !query.is_serialized() {
handler.src_query(this_ref, imp, &element, query)
handler.src_query(&this_ref, imp, &element, query)
} else {
gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
false
@ -693,10 +789,10 @@ impl PadSrc {
context: Context,
handler: &H,
) -> Result<(), PadContextError> {
let mut state = self.lock_state().await;
let _state = self.lock_state().await;
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing");
if state.pad_context.is_some() {
if (self.0).0.has_pad_context() {
return Err(PadContextError::ActiveContext);
}
@ -707,7 +803,7 @@ impl PadSrc {
.await
.map_err(|_| PadContextError::ActiveTask)?;
state.pad_context = Some(PadContext::new(context));
*(self.0).0.pad_context.write().unwrap() = Some(PadContext::new(context.clone()));
self.init_pad_functions(handler);
@ -716,7 +812,7 @@ impl PadSrc {
/// Releases the resources held by this `PadSrc`.
pub async fn unprepare(&self) -> Result<(), PadContextError> {
let mut state = self.lock_state().await;
let _state = self.lock_state().await;
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Unpreparing");
(self.0)
@ -737,7 +833,7 @@ impl PadSrc {
self.gst_pad()
.set_query_function(move |_gst_pad, _parent, _query| false);
state.pad_context = None;
*(self.0).0.pad_context.write().unwrap() = None;
Ok(())
}
@ -787,7 +883,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_activate(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
_imp: &Self::ElementImpl,
_element: &gst::Element,
) -> Result<(), gst::LoggableError> {
@ -817,7 +913,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_activatemode(
&self,
_pad: PadSinkRef,
_pad: &PadSinkRef,
_imp: &Self::ElementImpl,
_element: &gst::Element,
_mode: gst::PadMode,
@ -828,7 +924,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_chain(
&self,
_pad: PadSinkRef,
_pad: &PadSinkRef,
_imp: &Self::ElementImpl,
_element: &gst::Element,
_buffer: gst::Buffer,
@ -838,7 +934,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_chain_list(
&self,
_pad: PadSinkRef,
_pad: &PadSinkRef,
_imp: &Self::ElementImpl,
_element: &gst::Element,
_buffer_list: gst::BufferList,
@ -848,7 +944,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_event(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
_imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event,
@ -860,21 +956,21 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
Either::Right(
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
pad.gst_pad().event_default(Some(&element), event)
}
.boxed(),
)
} else {
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
Either::Left(pad.gst_pad().event_default(Some(element), event))
}
}
fn sink_event_full(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event,
@ -888,12 +984,12 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_query(
&self,
pad: PadSinkRef,
pad: &PadSinkRef,
_imp: &Self::ElementImpl,
element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
if query.is_serialized() {
// FIXME serialized queries should be handled with the dataflow
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime
@ -907,6 +1003,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
#[derive(Debug)]
struct PadSinkInner {
gst_pad: gst::Pad,
pad_context: sync::RwLock<Option<PadContextWeak>>,
}
impl PadSinkInner {
@ -915,7 +1012,10 @@ impl PadSinkInner {
panic!("Wrong pad direction for PadSink");
}
PadSinkInner { gst_pad }
PadSinkInner {
gst_pad,
pad_context: sync::RwLock::new(None),
}
}
}
@ -965,6 +1065,10 @@ impl<'a> PadSinkRef<'a> {
self.strong.gst_pad()
}
pub fn pad_context(&self) -> Option<PadContextWeak> {
self.strong.pad_context()
}
pub fn downgrade(&self) -> PadSinkWeak {
self.strong.downgrade()
}
@ -988,6 +1092,33 @@ impl<'a> PadSinkRef<'a> {
Ok(())
}
fn handle_future(
&self,
fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
) -> Result<FlowSuccess, FlowError> {
if Context::is_context_thread() {
self.pad_context()
.as_ref()
.and_then(|pad_ctx_weak| pad_ctx_weak.upgrade())
.expect("Operating on a Context without a valid PadContext")
.add_pending_task(fut.map(|res| res.map(drop)));
Ok(FlowSuccess::Ok)
} else {
// Not on a context thread: execute the Future immediately.
//
// - If there is no PadContext, we don't have any other options.
// - If there is a PadContext, it means that we received it from
// an upstream element, but there is at least one non-ts element
// operating on another thread in between, so we can't take
// advantage of the task queue.
//
// Note: we don't use `crate::runtime::executor::block_on` here
// because `Context::is_context_thread()` is checked in the `if`
// statement above.
futures::executor::block_on(fut)
}
}
}
#[derive(Debug)]
@ -1002,6 +1133,10 @@ impl PadSinkStrong {
&self.0.gst_pad
}
fn pad_context(&self) -> Option<PadContextWeak> {
self.0.pad_context.read().unwrap().clone()
}
fn downgrade(&self) -> PadSinkWeak {
PadSinkWeak(Arc::downgrade(&self.0))
}
@ -1047,6 +1182,10 @@ impl PadSink {
self.0.gst_pad()
}
pub fn pad_context(&self) -> Option<PadContextWeak> {
self.0.pad_context()
}
pub fn downgrade(&self) -> PadSinkWeak {
self.0.downgrade()
}
@ -1069,7 +1208,7 @@ impl PadSink {
},
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
handler.sink_activate(this_ref, imp, element)
handler.sink_activate(&this_ref, imp, element)
},
)
});
@ -1093,48 +1232,42 @@ impl PadSink {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
this_ref.activate_mode_hook(mode, active)?;
handler.sink_activatemode(this_ref, imp, element, mode, active)
handler.sink_activatemode(&this_ref, imp, element, mode, active)
},
)
});
// Functions depending on the `PadContext`
let pad_ctx = Arc::new(sync::Mutex::new(None));
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let pad_ctx_clone = pad_ctx.clone();
self.gst_pad()
.set_chain_function(move |_gst_pad, parent, buffer| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let pad_ctx = pad_ctx_clone.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
let chain_fut = handler.sink_chain(this_ref, imp, &element, buffer);
Self::handle_future(pad_ctx, chain_fut)
let chain_fut = handler.sink_chain(&this_ref, imp, &element, buffer);
this_ref.handle_future(chain_fut)
},
)
});
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let pad_ctx_clone = pad_ctx.clone();
self.gst_pad()
.set_chain_list_function(move |_gst_pad, parent, list| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let pad_ctx = pad_ctx_clone.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
let chain_list_fut = handler.sink_chain_list(this_ref, imp, &element, list);
Self::handle_future(pad_ctx, chain_list_fut)
let chain_list_fut =
handler.sink_chain_list(&this_ref, imp, &element, list);
this_ref.handle_future(chain_list_fut)
},
)
});
@ -1147,25 +1280,19 @@ impl PadSink {
.set_event_full_function(move |gst_pad, parent, event| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let pad_ctx = pad_ctx.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
if let Some(received_pc) = PadContext::check_pad_context_event(&event) {
gst_log!(
RUNTIME_CAT,
obj: gst_pad,
"Received PadContext Event {:?}",
received_pc
);
*pad_ctx.lock().unwrap() = Some(received_pc);
gst_log!(RUNTIME_CAT, obj: gst_pad, "Received {:?}", received_pc);
*this_ref.strong.0.pad_context.write().unwrap() = Some(received_pc);
}
match handler.sink_event_full(this_ref, imp, &element, event) {
match handler.sink_event_full(&this_ref, imp, &element, event) {
Either::Left(ret) => ret,
Either::Right(fut) => Self::handle_future(pad_ctx, fut),
Either::Right(fut) => this_ref.handle_future(fut),
}
},
)
@ -1183,7 +1310,7 @@ impl PadSink {
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
if !query.is_serialized() {
handler.sink_query(this_ref, imp, &element, query)
handler.sink_query(&this_ref, imp, &element, query)
} else {
gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
false
@ -1193,31 +1320,6 @@ impl PadSink {
});
}
fn handle_future(
pad_ctx: Arc<sync::Mutex<Option<PadContextWeak>>>,
fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
) -> Result<FlowSuccess, FlowError> {
match pad_ctx
.lock()
.unwrap()
.as_ref()
.and_then(|pad_ctx_weak| pad_ctx_weak.upgrade())
{
Some(pad_ctx) => {
pad_ctx.add_pending_task(fut.map(|res| res.map(drop)));
Ok(FlowSuccess::Ok)
}
None => match Context::current() {
None => executor::block_on(fut),
Some(context) => {
// Don't block the Context thread
context.spawn(fut);
Ok(FlowSuccess::Ok)
}
},
}
}
pub async fn prepare<H: PadSinkHandler>(&self, handler: &H) {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing");
self.init_pad_functions(handler);

View file

@ -15,11 +15,7 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//! A wrapper on a [`Context`] with additional features for [`PadSrc`] & [`PadSink`].
//!
//! [`Context`]: ../executor/struct.Context.html
//! [`PadSrc`]: ../pad/struct.PadSrc.html
//! [`PadSink`]: ../pad/struct.PadSink.html
//! Types that allow `Pad`s to operate within the threadshare runtime.
use futures::prelude::*;
@ -216,6 +212,15 @@ impl PadContext {
event.get_structure().unwrap().get_name() == "ts-pad-context"
}
#[inline]
pub fn is_pad_context_event(event: &gst::Event) -> bool {
if let gst::EventView::CustomDownstreamSticky(e) = event.view() {
return Self::is_pad_context_sticky_event(&e);
}
false
}
pub fn check_pad_context_event(event: &gst::Event) -> Option<PadContextWeak> {
if let gst::EventView::CustomDownstreamSticky(e) = event.view() {
if Self::is_pad_context_sticky_event(&e) {

View file

@ -15,9 +15,7 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//! An execution loop to run asynchronous processing on a [`Context`].
//!
//! [`Context`]: ../executor/struct.Context.html
//! An execution loop to run asynchronous processing.
use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture};
use futures::lock::Mutex;
@ -77,7 +75,7 @@ impl Drop for TaskInner {
/// A `Task` operating on a `threadshare` [`Context`].
///
/// [`Context`]: struct.Context.html
/// [`Context`]: ../executor/struct.Context.html
#[derive(Debug)]
pub struct Task(Arc<Mutex<TaskInner>>);

View file

@ -18,7 +18,7 @@
use either::Either;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::lock::Mutex;
use futures::prelude::*;
use glib;
@ -31,7 +31,6 @@ use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use gst::{EventView, QueryView};
use lazy_static::lazy_static;
@ -39,7 +38,7 @@ use rand;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::{self, Arc};
use std::u16;
use tokio::io::AsyncReadExt;
@ -175,35 +174,34 @@ impl SocketRead for TcpClientReader {
}
}
struct TcpClientSrcPadHandlerInner {
socket_stream: Option<SocketStream<TcpClientReader>>,
#[derive(Debug)]
struct TcpClientSrcPadHandlerState {
need_initial_events: bool,
caps: Option<gst::Caps>,
configured_caps: Option<gst::Caps>,
}
impl Default for TcpClientSrcPadHandlerInner {
impl Default for TcpClientSrcPadHandlerState {
fn default() -> Self {
TcpClientSrcPadHandlerInner {
socket_stream: None,
TcpClientSrcPadHandlerState {
need_initial_events: true,
caps: None,
configured_caps: None,
}
}
}
#[derive(Clone)]
struct TcpClientSrcPadHandler(Arc<Mutex<TcpClientSrcPadHandlerInner>>);
#[derive(Debug, Default)]
struct TcpClientSrcPadHandlerInner {
state: sync::RwLock<TcpClientSrcPadHandlerState>,
socket_stream: Mutex<Option<SocketStream<TcpClientReader>>>,
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
}
#[derive(Clone, Debug, Default)]
struct TcpClientSrcPadHandler(Arc<TcpClientSrcPadHandlerInner>);
impl TcpClientSrcPadHandler {
fn new() -> Self {
TcpClientSrcPadHandler(Arc::new(Mutex::new(TcpClientSrcPadHandlerInner::default())))
}
#[inline]
async fn lock(&self) -> MutexGuard<'_, TcpClientSrcPadHandlerInner> {
self.0.lock().await
}
async fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) {
let this = self.clone();
let pad_weak = pad.downgrade();
@ -214,9 +212,10 @@ impl TcpClientSrcPadHandler {
let element = element.clone();
async move {
let item = this
.0
.socket_stream
.lock()
.await
.socket_stream
.as_mut()
.expect("Missing SocketStream")
.next()
@ -265,8 +264,13 @@ impl TcpClientSrcPadHandler {
{
let mut events = Vec::new();
{
let mut inner = self.lock().await;
if inner.need_initial_events {
// Only `read` the state in the hot path
if self.0.state.read().unwrap().need_initial_events {
// We will need to `write` and we also want to prevent
// any changes on the state while we are handling initial events
let mut state = self.0.state.write().unwrap();
assert!(state.need_initial_events);
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
let stream_id =
@ -276,17 +280,17 @@ impl TcpClientSrcPadHandler {
.group_id(gst::util_group_id_next())
.build(),
);
let tcpclientsrc = TcpClientSrc::from_instance(element);
if let Some(ref caps) = tcpclientsrc.settings.lock().await.caps {
if let Some(ref caps) = state.caps {
events.push(gst::Event::new_caps(&caps).build());
inner.configured_caps = Some(caps.clone());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
inner.need_initial_events = false;
state.need_initial_events = false;
}
if buffer.get_size() == 0 {
@ -329,26 +333,69 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
fn src_event(
&self,
pad: PadSrcRef,
tcpclientsrc: &TcpClientSrc,
pad: &PadSrcRef,
_tcpclientsrc: &TcpClientSrc,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
use gst::EventView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = runtime::executor::block_on(tcpclientsrc.pause(element));
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
*flush_join_handle = Some(pad.spawn(async move {
let res = TcpClientSrc::from_instance(&element).pause(&element).await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
true
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = runtime::executor::block_on(tcpclientsrc.start(element));
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
let pad_weak = pad.downgrade();
let fut = async move {
let mut ret = false;
let pad = pad_weak.upgrade().unwrap();
let inner_weak = inner_weak.upgrade().unwrap();
let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
if let Ok(Ok(())) = flush_join_handle.await {
ret = TcpClientSrc::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
}
true
.boxed();
return Either::Right(fut);
}
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
@ -356,9 +403,9 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
};
if ret {
gst_log!(CAT, obj: pad.gst_pad(), "Handled event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
}
Either::Left(ret)
@ -366,12 +413,14 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
fn src_query(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
_tcpclientsrc: &TcpClientSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
use gst::QueryView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
q.set(false, 0.into(), 0.into());
@ -383,8 +432,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
true
}
QueryView::Caps(ref mut q) => {
let inner = runtime::executor::block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps {
let state = self.0.state.read().unwrap();
let caps = if let Some(ref caps) = state.configured_caps {
q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@ -402,9 +451,9 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
};
if ret {
gst_log!(CAT, obj: pad.gst_pad(), "Handled query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
}
ret
@ -450,6 +499,9 @@ impl TcpClientSrc {
let context = {
let settings = self.settings.lock().await;
self.src_pad_handler.0.state.write().unwrap().caps = settings.caps.clone();
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
@ -526,7 +578,7 @@ impl TcpClientSrc {
)
})?;
this.src_pad_handler.lock().await.socket_stream = Some(socket_stream);
*this.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream);
this.state.lock().await.socket = Some(socket);
@ -536,17 +588,18 @@ impl TcpClientSrc {
}
async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let preparation_set = self.preparation_set.lock().await.take();
if preparation_set.is_none() {
gst_log!(CAT, obj: element, "Preparation already completed");
return Ok(());
}
gst_debug!(CAT, obj: element, "Completing preparation");
let PreparationSet {
join_handle,
context,
} = self
.preparation_set
.lock()
.await
.take()
.expect("preparation_set already taken");
} = preparation_set.unwrap();
join_handle
.await
@ -579,7 +632,12 @@ impl TcpClientSrc {
}
let _ = self.src_pad.unprepare().await;
self.src_pad_handler.lock().await.configured_caps = None;
self.src_pad_handler
.0
.state
.write()
.unwrap()
.configured_caps = None;
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
@ -659,7 +717,7 @@ impl ObjectSubclass for TcpClientSrc {
Self {
src_pad,
src_pad_handler: TcpClientSrcPadHandler::new(),
src_pad_handler: TcpClientSrcPadHandler::default(),
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
preparation_set: Mutex::new(None),
@ -768,8 +826,12 @@ impl ElementImpl for TcpClientSrc {
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
let mut src_pad_handler = runtime::executor::block_on(self.src_pad_handler.lock());
src_pad_handler.need_initial_events = true;
self.src_pad_handler
.0
.state
.write()
.unwrap()
.need_initial_events = true;
}
_ => (),
}

View file

@ -18,7 +18,7 @@
use either::Either;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::lock::Mutex;
use futures::prelude::*;
use gio;
@ -36,7 +36,6 @@ use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use gst::{EventView, QueryView};
use gst_net::*;
use lazy_static::lazy_static;
@ -45,7 +44,7 @@ use rand;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::{self, Arc};
use std::u16;
#[cfg(unix)]
@ -328,37 +327,35 @@ impl SocketRead for UdpReader {
}
#[derive(Debug)]
struct UdpSrcPadHandlerInner {
struct UdpSrcPadHandlerState {
retrieve_sender_address: bool,
socket_stream: Option<SocketStream<UdpReader>>,
need_initial_events: bool,
caps: Option<gst::Caps>,
configured_caps: Option<gst::Caps>,
}
impl Default for UdpSrcPadHandlerInner {
impl Default for UdpSrcPadHandlerState {
fn default() -> Self {
UdpSrcPadHandlerInner {
UdpSrcPadHandlerState {
retrieve_sender_address: true,
socket_stream: None,
need_initial_events: true,
caps: None,
configured_caps: None,
}
}
}
#[derive(Clone, Debug)]
struct UdpSrcPadHandler(Arc<Mutex<UdpSrcPadHandlerInner>>);
#[derive(Debug, Default)]
struct UdpSrcPadHandlerInner {
state: sync::RwLock<UdpSrcPadHandlerState>,
socket_stream: Mutex<Option<SocketStream<UdpReader>>>,
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
}
#[derive(Clone, Debug, Default)]
struct UdpSrcPadHandler(Arc<UdpSrcPadHandlerInner>);
impl UdpSrcPadHandler {
fn new() -> Self {
UdpSrcPadHandler(Arc::new(Mutex::new(UdpSrcPadHandlerInner::default())))
}
#[inline]
async fn lock(&self) -> MutexGuard<'_, UdpSrcPadHandlerInner> {
self.0.lock().await
}
async fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) {
let this = self.clone();
let pad_weak = pad.downgrade();
@ -369,9 +366,10 @@ impl UdpSrcPadHandler {
let element = element.clone();
async move {
let item = this
.0
.socket_stream
.lock()
.await
.socket_stream
.as_mut()
.expect("Missing SocketStream")
.next()
@ -411,7 +409,7 @@ impl UdpSrcPadHandler {
};
if let Some(saddr) = saddr {
if this.lock().await.retrieve_sender_address {
if this.0.state.read().unwrap().retrieve_sender_address {
let inet_addr = match saddr.ip() {
IpAddr::V4(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V4(&ip.octets()),
@ -436,8 +434,13 @@ impl UdpSrcPadHandler {
{
let mut events = Vec::new();
{
let mut inner = self.lock().await;
if inner.need_initial_events {
// Only `read` the state in the hot path
if self.0.state.read().unwrap().need_initial_events {
// We will need to `write` and we also want to prevent
// any changes on the state while we are handling initial events
let mut state = self.0.state.write().unwrap();
assert!(state.need_initial_events);
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
let stream_id =
@ -447,17 +450,17 @@ impl UdpSrcPadHandler {
.group_id(gst::util_group_id_next())
.build(),
);
let udpsrc = UdpSrc::from_instance(element);
if let Some(ref caps) = udpsrc.settings.lock().await.caps {
if let Some(ref caps) = state.caps {
events.push(gst::Event::new_caps(&caps).build());
inner.configured_caps = Some(caps.clone());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
inner.need_initial_events = false;
state.need_initial_events = false;
}
}
@ -496,26 +499,69 @@ impl PadSrcHandler for UdpSrcPadHandler {
fn src_event(
&self,
pad: PadSrcRef,
udpsrc: &UdpSrc,
pad: &PadSrcRef,
_udpsrc: &UdpSrc,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
use gst::EventView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = runtime::executor::block_on(udpsrc.pause(element));
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
*flush_join_handle = Some(pad.spawn(async move {
let res = UdpSrc::from_instance(&element).pause(&element).await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
true
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = runtime::executor::block_on(udpsrc.start(element));
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
let pad_weak = pad.downgrade();
let fut = async move {
let mut ret = false;
let pad = pad_weak.upgrade().unwrap();
let inner_weak = inner_weak.upgrade().unwrap();
let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
if let Ok(Ok(())) = flush_join_handle.await {
ret = UdpSrc::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
}
true
.boxed();
return Either::Right(fut);
}
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
@ -523,9 +569,9 @@ impl PadSrcHandler for UdpSrcPadHandler {
};
if ret {
gst_log!(CAT, obj: pad.gst_pad(), "Handled event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event);
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
}
Either::Left(ret)
@ -533,12 +579,14 @@ impl PadSrcHandler for UdpSrcPadHandler {
fn src_query(
&self,
pad: PadSrcRef,
pad: &PadSrcRef,
_udpsrc: &UdpSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
use gst::QueryView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
@ -551,8 +599,8 @@ impl PadSrcHandler for UdpSrcPadHandler {
true
}
QueryView::Caps(ref mut q) => {
let inner = runtime::executor::block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps {
let state = self.0.state.read().unwrap();
let caps = if let Some(ref caps) = state.configured_caps {
q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@ -570,9 +618,9 @@ impl PadSrcHandler for UdpSrcPadHandler {
};
if ret {
gst_log!(CAT, obj: pad.gst_pad(), "Handled query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
} else {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle query {:?}", query);
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
}
ret
@ -621,6 +669,12 @@ impl UdpSrc {
let context = {
let settings = self.settings.lock().await.clone();
{
let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap();
src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address;
src_pad_handler_state.caps = settings.caps.clone();
}
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
@ -857,11 +911,7 @@ impl UdpSrc {
)
})?;
{
let mut src_pad_handler = this.src_pad_handler.lock().await;
src_pad_handler.retrieve_sender_address = settings.retrieve_sender_address;
src_pad_handler.socket_stream = Some(socket_stream);
}
*this.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream);
this.state.lock().await.socket = Some(socket);
@ -919,7 +969,12 @@ impl UdpSrc {
}
let _ = self.src_pad.unprepare().await;
self.src_pad_handler.lock().await.configured_caps = None;
self.src_pad_handler
.0
.state
.write()
.unwrap()
.configured_caps = None;
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
@ -1015,7 +1070,7 @@ impl ObjectSubclass for UdpSrc {
Self {
src_pad,
src_pad_handler: UdpSrcPadHandler::new(),
src_pad_handler: UdpSrcPadHandler::default(),
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
preparation_set: Mutex::new(None),
@ -1152,9 +1207,12 @@ impl ElementImpl for UdpSrc {
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
runtime::executor::block_on(async {
self.src_pad_handler.lock().await.need_initial_events = true;
});
self.src_pad_handler
.0
.state
.write()
.unwrap()
.need_initial_events = true;
}
_ => (),
}

View file

@ -34,13 +34,16 @@ use gst::{gst_debug, gst_error_msg, gst_log};
use lazy_static::lazy_static;
use std::boxed::Box;
use std::sync::Arc;
use std::sync::{self, Arc};
use gstthreadshare::runtime::executor::block_on;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{self, Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use gstthreadshare::runtime::{
self, Context, JoinHandle, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef,
};
const DEFAULT_CONTEXT: &str = "";
const SLEEP_DURATION: u32 = 2;
const THROTTLING_DURATION: u32 = 2;
fn init() {
use std::sync::Once;
@ -78,8 +81,10 @@ lazy_static! {
);
}
#[derive(Clone, Debug)]
struct PadSrcHandlerTest;
#[derive(Clone, Debug, Default)]
struct PadSrcHandlerTest {
flush_join_handle: Arc<sync::Mutex<Option<JoinHandle<Result<(), ()>>>>>,
}
impl PadSrcHandlerTest {
async fn start_task(&self, pad: PadSrcRef<'_>, receiver: mpsc::Receiver<Item>) {
@ -128,7 +133,7 @@ impl PadSrcHandler for PadSrcHandlerTest {
fn src_activatemode(
&self,
_pad: PadSrcRef,
_pad: &PadSrcRef,
_elem_src_test: &ElementSrcTest,
_element: &gst::Element,
mode: gst::PadMode,
@ -141,34 +146,79 @@ impl PadSrcHandler for PadSrcHandlerTest {
fn src_event(
&self,
pad: PadSrcRef,
elem_src_test: &ElementSrcTest,
pad: &PadSrcRef,
_elem_src_test: &ElementSrcTest,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling event {:?}", event);
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = runtime::executor::block_on(elem_src_test.pause(element));
let mut flush_join_handle = self.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
*flush_join_handle = Some(pad.spawn(async move {
let res = ElementSrcTest::from_instance(&element)
.pause(&element)
.await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
true
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = runtime::executor::block_on(elem_src_test.start(element));
let element = element.clone();
let flush_join_handle_weak = Arc::downgrade(&self.flush_join_handle);
let pad_weak = pad.downgrade();
let fut = async move {
let mut ret = false;
let pad = pad_weak.upgrade().unwrap();
let flush_join_handle = flush_join_handle_weak.upgrade().unwrap();
let flush_join_handle = flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete");
if let Ok(Ok(())) = flush_join_handle.await {
ret = ElementSrcTest::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
}
true
.boxed();
return Either::Right(fut);
}
_ => false,
};
if ret {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled event {:?}", event);
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event);
} else {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event);
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
}
Either::Left(ret)
@ -208,7 +258,7 @@ impl ElementSrcTest {
let _state = self.state.lock().await;
gst_debug!(SRC_CAT, obj: element, "Preparing");
let context = Context::acquire(&self.settings.lock().await.context, SLEEP_DURATION)
let context = Context::acquire(&self.settings.lock().await.context, THROTTLING_DURATION)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
@ -280,7 +330,7 @@ impl ElementSrcTest {
}
impl ObjectSubclass for ElementSrcTest {
const NAME: &'static str = "RsTsElementSrcTest";
const NAME: &'static str = "TsElementSrcTest";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = glib::subclass::simple::ClassStruct<Self>;
@ -318,7 +368,7 @@ impl ObjectSubclass for ElementSrcTest {
ElementSrcTest {
src_pad,
src_pad_handler: PadSrcHandlerTest,
src_pad_handler: PadSrcHandlerTest::default(),
state: Mutex::new(ElementSrcState::default()),
settings: Mutex::new(settings),
}
@ -426,111 +476,180 @@ static SINK_PROPERTIES: [glib::subclass::Property; 1] =
)
})];
#[derive(Debug)]
struct PadSinkHandlerTestInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<()>>>,
context: Context,
}
#[derive(Clone, Debug)]
struct PadSinkHandlerTest;
struct PadSinkHandlerTest(Arc<PadSinkHandlerTestInner>);
impl Default for PadSinkHandlerTest {
fn default() -> Self {
PadSinkHandlerTest(Arc::new(PadSinkHandlerTestInner {
flush_join_handle: sync::Mutex::new(None),
context: Context::acquire("PadSinkHandlerTest", THROTTLING_DURATION).unwrap(),
}))
}
}
impl PadSinkHandler for PadSinkHandlerTest {
type ElementImpl = ElementSinkTest;
fn sink_chain(
&self,
pad: PadSinkRef,
elem_sink_test: &ElementSinkTest,
_element: &gst::Element,
_pad: &PadSinkRef,
_elem_sink_test: &ElementSinkTest,
element: &gst::Element,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
let sender = Arc::clone(&elem_sink_test.sender);
let element = element.clone();
async move {
let pad = pad_weak
.upgrade()
.expect("PadSink no longer exists in sink_chain");
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding {:?}", buffer);
sender
.lock()
let elem_sink_test = ElementSinkTest::from_instance(&element);
elem_sink_test
.forward_item(&element, Item::Buffer(buffer))
.await
.as_mut()
.expect("ItemSender not set")
.send(Item::Buffer(buffer))
.await
.map(|_| gst::FlowSuccess::Ok)
.map_err(|_| gst::FlowError::CustomError)
}
.boxed()
}
fn sink_chain_list(
&self,
pad: PadSinkRef,
elem_sink_test: &ElementSinkTest,
_element: &gst::Element,
_pad: &PadSinkRef,
_elem_sink_test: &ElementSinkTest,
element: &gst::Element,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
let sender = Arc::clone(&elem_sink_test.sender);
let element = element.clone();
async move {
let pad = pad_weak
.upgrade()
.expect("PadSink no longer exists in sink_chain_list");
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding {:?}", list);
sender
.lock()
let elem_sink_test = ElementSinkTest::from_instance(&element);
elem_sink_test
.forward_item(&element, Item::BufferList(list))
.await
.as_mut()
.expect("ItemSender not set")
.send(Item::BufferList(list))
.await
.map(|_| gst::FlowSuccess::Ok)
.map_err(|_| gst::FlowError::CustomError)
}
.boxed()
}
fn sink_event(
&self,
pad: PadSinkRef,
elem_sink_test: &ElementSinkTest,
_element: &gst::Element,
pad: &PadSinkRef,
_elem_sink_test: &ElementSinkTest,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
if event.is_serialized() {
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let pad_weak = pad.downgrade();
let sender = Arc::clone(&elem_sink_test.sender);
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
Either::Right(async move {
let pad = pad_weak
.upgrade()
.expect("PadSink no longer exists in sink_event");
let elem_sink_test = ElementSinkTest::from_instance(&element);
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding serialized event {:?}", event);
sender
.lock()
.await
.as_mut()
.expect("ItemSender not set")
.send(Item::Event(event))
.await
.is_ok()
if let EventView::FlushStop(..) = event.view() {
let pad = pad_weak
.upgrade()
.expect("PadSink no longer exists in sink_event");
let inner = inner_weak.upgrade().unwrap();
let flush_join_handle = inner.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete");
if let Ok(()) = flush_join_handle.await {
elem_sink_test.start(&element).await;
} else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete");
}
} else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
}
elem_sink_test.forward_item(&element, Item::Event(event)).await.is_ok()
}.boxed())
} else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event);
Either::Left(
runtime::executor::block_on(elem_sink_test.sender.lock())
.as_mut()
.expect("ItemSender not set")
.try_send(Item::Event(event))
.is_ok(),
)
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let element = element.clone();
*flush_join_handle = Some(self.0.context.spawn(async move {
ElementSinkTest::from_instance(&element)
.stop(&element)
.await;
}));
} else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
true
}
_ => false,
};
// Should forward item here
Either::Left(ret)
}
}
}
#[derive(Debug, Default)]
struct ElementSinkState {
flushing: bool,
sender: Option<mpsc::Sender<Item>>,
}
#[derive(Debug)]
struct ElementSinkTest {
sink_pad: PadSink,
sender: Arc<Mutex<Option<mpsc::Sender<Item>>>>,
state: Mutex<ElementSinkState>,
}
impl ElementSinkTest {
async fn forward_item(
&self,
element: &gst::Element,
item: Item,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().await;
if !state.flushing {
gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
state
.sender
.as_mut()
.expect("Item Sender not set")
.send(item)
.await
.map(|_| gst::FlowSuccess::Ok)
.map_err(|_| gst::FlowError::CustomError)
} else {
gst_debug!(
SINK_CAT,
obj: element,
"Not fowarding {:?} due to flushing",
item
);
Err(gst::FlowError::Flushing)
}
}
async fn start(&self, element: &gst::Element) {
gst_debug!(SINK_CAT, obj: element, "Starting");
let mut state = self.state.lock().await;
state.flushing = false;
gst_debug!(SINK_CAT, obj: element, "Started");
}
async fn stop(&self, element: &gst::Element) {
gst_debug!(SINK_CAT, obj: element, "Stopping");
self.state.lock().await.flushing = true;
gst_debug!(SINK_CAT, obj: element, "Stopped");
}
}
lazy_static! {
@ -542,7 +661,7 @@ lazy_static! {
}
impl ObjectSubclass for ElementSinkTest {
const NAME: &'static str = "RsTsElementSinkTest";
const NAME: &'static str = "TsElementSinkTest";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = glib::subclass::simple::ClassStruct<Self>;
@ -580,6 +699,7 @@ impl ObjectSubclass for ElementSinkTest {
.expect("signal arg")
.expect("missing signal arg");
let this = Self::from_instance(&element);
gst_debug!(SINK_CAT, obj: &element, "Pushing FlushStart");
Some(
this.sink_pad
.gst_pad()
@ -600,6 +720,7 @@ impl ObjectSubclass for ElementSinkTest {
.expect("signal arg")
.expect("missing signal arg");
let this = Self::from_instance(&element);
gst_debug!(SINK_CAT, obj: &element, "Pushing FlushStop");
Some(
this.sink_pad
.gst_pad()
@ -616,7 +737,7 @@ impl ObjectSubclass for ElementSinkTest {
ElementSinkTest {
sink_pad,
sender: Arc::new(Mutex::new(None)),
state: Mutex::new(ElementSinkState::default()),
}
}
}
@ -634,7 +755,7 @@ impl ObjectImpl for ElementSinkTest {
.expect("type checked upstream")
.expect("ItemSender not found")
.clone();
*runtime::executor::block_on(self.sender.lock()) = Some(sender);
runtime::executor::block_on(self.state.lock()).sender = Some(sender);
}
_ => unimplemented!(),
}
@ -658,7 +779,10 @@ impl ElementImpl for ElementSinkTest {
match transition {
gst::StateChange::NullToReady => {
runtime::executor::block_on(self.sink_pad.prepare(&PadSinkHandlerTest));
runtime::executor::block_on(self.sink_pad.prepare(&PadSinkHandlerTest::default()));
}
gst::StateChange::PausedToReady => {
runtime::executor::block_on(self.stop(element));
}
gst::StateChange::ReadyToNull => {
runtime::executor::block_on(self.sink_pad.unprepare());
@ -666,12 +790,20 @@ impl ElementImpl for ElementSinkTest {
_ => (),
}
self.parent_change_state(element, transition)
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
runtime::executor::block_on(self.start(element));
}
Ok(success)
}
}
fn setup(
context_name: &str,
mut middle_element_1: Option<gst::Element>,
mut middle_element_2: Option<gst::Element>,
) -> (
gst::Pipeline,
gst::Element,
@ -680,53 +812,69 @@ fn setup(
) {
init();
let pipeline = gst::Pipeline::new(None);
// Src
let src_element = glib::Object::new(ElementSrcTest::get_type(), &[])
.unwrap()
.downcast::<gst::Element>()
.unwrap();
src_element.set_property("context", &context_name).unwrap();
pipeline.add(&src_element).unwrap();
let mut last_element = src_element.clone();
if let Some(middle_element) = middle_element_1.take() {
pipeline.add(&middle_element).unwrap();
last_element.link(&middle_element).unwrap();
last_element = middle_element;
}
if let Some(middle_element) = middle_element_2.take() {
// Don't link the 2 middle elements: this is used for ts-proxy
pipeline.add(&middle_element).unwrap();
last_element = middle_element;
}
// Sink
let sink_element = glib::Object::new(ElementSinkTest::get_type(), &[])
.unwrap()
.downcast::<gst::Element>()
.unwrap();
pipeline.add(&sink_element).unwrap();
last_element.link(&sink_element).unwrap();
let (sender, receiver) = mpsc::channel::<Item>(10);
sink_element
.set_property("sender", &ItemSender { sender })
.unwrap();
let pipeline = gst::Pipeline::new(None);
pipeline.add_many(&[&src_element, &sink_element]).unwrap();
src_element.link(&sink_element).unwrap();
(pipeline, src_element, sink_element, receiver)
}
#[test]
fn task() {
let (pipeline, src_element, sink_element, mut receiver) = setup("task");
fn nominal_scenario(
scenario_name: &str,
pipeline: gst::Pipeline,
src_element: gst::Element,
mut receiver: mpsc::Receiver<Item>,
) {
let elem_src_test = ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
// Initial events
runtime::executor::block_on(
block_on(
elem_src_test.try_push(Item::Event(
gst::Event::new_stream_start("stream_id_task_test")
gst::Event::new_stream_start(scenario_name)
.group_id(gst::util_group_id_next())
.build(),
)),
)
.unwrap();
match runtime::executor::block_on(receiver.next()).unwrap() {
match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::CustomDownstreamSticky(e) => {
EventView::CustomDownstreamSticky(e) => {
assert!(PadContext::is_pad_context_sticky_event(&e))
}
other => panic!("Unexpected event {:?}", other),
@ -734,34 +882,32 @@ fn task() {
other => panic!("Unexpected item {:?}", other),
}
match runtime::executor::block_on(receiver.next()).unwrap() {
match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::StreamStart(_) => (),
EventView::StreamStart(_) => (),
other => panic!("Unexpected event {:?}", other),
},
other => panic!("Unexpected item {:?}", other),
}
runtime::executor::block_on(elem_src_test.try_push(Item::Event(
block_on(elem_src_test.try_push(Item::Event(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
)))
.unwrap();
match runtime::executor::block_on(receiver.next()).unwrap() {
match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::Segment(_) => (),
EventView::Segment(_) => (),
other => panic!("Unexpected event {:?}", other),
},
other => panic!("Unexpected item {:?}", other),
}
// Buffer
runtime::executor::block_on(
elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))),
)
.unwrap();
block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))))
.unwrap();
match runtime::executor::block_on(receiver.next()).unwrap() {
match block_on(receiver.next()).unwrap() {
Item::Buffer(buffer) => {
let data = buffer.map_readable().unwrap();
assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice());
@ -774,9 +920,9 @@ fn task() {
list.get_mut()
.unwrap()
.add(gst::Buffer::from_slice(vec![1, 2, 3, 4]));
runtime::executor::block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap();
block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap();
match runtime::executor::block_on(receiver.next()).unwrap() {
match block_on(receiver.next()).unwrap() {
Item::BufferList(_) => (),
other => panic!("Unexpected item {:?}", other),
}
@ -785,10 +931,8 @@ fn task() {
pipeline.set_state(gst::State::Paused).unwrap();
// Items not longer accepted
runtime::executor::block_on(
elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))),
)
.unwrap_err();
block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))))
.unwrap_err();
// Nothing forwarded
receiver.try_next().unwrap_err();
@ -800,41 +944,106 @@ fn task() {
receiver.try_next().unwrap_err();
// Flush
assert!(sink_element
.emit("flush-start", &[])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
assert!(sink_element
.emit("flush-stop", &[])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
// EOS
runtime::executor::block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build())))
block_on(elem_src_test.try_push(Item::Event(gst::Event::new_flush_start().build()))).unwrap();
block_on(elem_src_test.try_push(Item::Event(gst::Event::new_flush_stop(false).build())))
.unwrap();
match runtime::executor::block_on(receiver.next()).unwrap() {
match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::Eos(_) => (),
EventView::FlushStop(_) => (),
other => panic!("Unexpected event {:?}", other),
},
other => panic!("Unexpected item {:?}", other),
}
// EOS
block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap();
match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
EventView::Eos(_) => (),
other => panic!("Unexpected event {:?}", other),
},
other => panic!("Unexpected item {:?}", other),
}
// Stop the Pad task
pipeline.set_state(gst::State::Ready).unwrap();
// Receiver was dropped when stopping => can't send anymore
runtime::executor::block_on(
block_on(
elem_src_test.try_push(Item::Event(
gst::Event::new_stream_start("stream_id_task_test_past_stop")
gst::Event::new_stream_start(&format!("{}_past_stop", scenario_name))
.group_id(gst::util_group_id_next())
.build(),
)),
)
.unwrap_err();
}
#[test]
fn src_sink_nominal() {
let name = "src_sink_nominal";
let (pipeline, src_element, _sink_element, receiver) = setup(&name, None, None);
nominal_scenario(&name, pipeline, src_element, receiver);
}
#[test]
fn src_tsqueue_sink_nominal() {
init();
let name = "src_tsqueue_sink";
let ts_queue = gst::ElementFactory::make("ts-queue", Some("ts-queue")).unwrap();
ts_queue
.set_property("context", &format!("{}_queue", name))
.unwrap();
ts_queue
.set_property("context-wait", &THROTTLING_DURATION)
.unwrap();
let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None);
nominal_scenario(&name, pipeline, src_element, receiver);
}
#[test]
fn src_queue_sink_nominal() {
init();
let name = "src_queue_sink";
let queue = gst::ElementFactory::make("queue", Some("queue")).unwrap();
let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(queue), None);
nominal_scenario(&name, pipeline, src_element, receiver);
}
#[test]
fn src_tsproxy_sink_nominal() {
init();
let name = "src_tsproxy_sink";
let ts_proxy_sink = gst::ElementFactory::make("ts-proxysink", Some("ts-proxysink")).unwrap();
ts_proxy_sink
.set_property("proxy-context", &format!("{}_proxy_context", name))
.unwrap();
let ts_proxy_src = gst::ElementFactory::make("ts-proxysrc", Some("ts-proxysrc")).unwrap();
ts_proxy_src
.set_property("proxy-context", &format!("{}_proxy_context", name))
.unwrap();
ts_proxy_src
.set_property("context", &format!("{}_context", name))
.unwrap();
ts_proxy_src
.set_property("context-wait", &THROTTLING_DURATION)
.unwrap();
let (pipeline, src_element, _sink_element, receiver) =
setup(name, Some(ts_proxy_sink), Some(ts_proxy_src));
nominal_scenario(&name, pipeline, src_element, receiver);
}