ts: have block_on panic if running on a Context thread

This commit is contained in:
François Laignel 2020-01-02 22:32:52 +01:00
parent a6adc81eb4
commit 3eed2f69d9
14 changed files with 557 additions and 551 deletions

View file

@ -21,7 +21,7 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
pin-project = "0.4"
tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/0.2.5-throttling", features = ["io-util", "macros", "rt-core", "sync", "time", "tcp", "udp"] }
tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/throttling_20200105", features = ["io-util", "macros", "rt-core", "sync", "stream", "time", "tcp", "udp"] }
futures = "0.3"
lazy_static = "1.0"
either = "1.0"

View file

@ -18,7 +18,6 @@
use either::Either;
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*;
@ -44,7 +43,7 @@ use std::sync::Arc;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
@ -280,7 +279,7 @@ impl PadSrcHandler for AppSrcPadHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = block_on(app_src.pause(element));
let _ = runtime::executor::block_on(app_src.pause(element));
true
}
EventView::FlushStop(..) => {
@ -288,7 +287,7 @@ impl PadSrcHandler for AppSrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = block_on(app_src.start(element));
let _ = runtime::executor::block_on(app_src.start(element));
}
true
}
@ -325,7 +324,7 @@ impl PadSrcHandler for AppSrcPadHandler {
true
}
QueryView::Caps(ref mut q) => {
let inner = block_on(self.lock());
let inner = runtime::executor::block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps {
q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
@ -419,15 +418,15 @@ impl AppSrc {
let _state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().await;
let context =
let context = {
let settings = self.settings.lock().await;
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
})?
};
self.src_pad
.prepare(context, &self.src_pad_handler)
@ -540,7 +539,7 @@ impl ObjectSubclass for AppSrc {
.expect("missing signal arg");
let appsrc = Self::from_instance(&element);
Some(block_on(appsrc.push_buffer(&element, buffer)).to_value())
Some(runtime::executor::block_on(appsrc.push_buffer(&element, buffer)).to_value())
},
);
@ -555,7 +554,7 @@ impl ObjectSubclass for AppSrc {
.expect("signal arg")
.expect("missing signal arg");
let appsrc = Self::from_instance(&element);
Some(block_on(appsrc.end_of_stream(&element)).to_value())
Some(runtime::executor::block_on(appsrc.end_of_stream(&element)).to_value())
},
);
}
@ -579,28 +578,24 @@ impl ObjectImpl for AppSrc {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("context", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
subclass::Property("caps", ..) => {
let mut settings = block_on(self.settings.lock());
settings.caps = value.get().expect("type checked upstream");
}
subclass::Property("max-buffers", ..) => {
let mut settings = block_on(self.settings.lock());
settings.max_buffers = value.get_some().expect("type checked upstream");
}
subclass::Property("do-timestamp", ..) => {
let mut settings = block_on(self.settings.lock());
settings.do_timestamp = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),
@ -610,27 +605,13 @@ impl ObjectImpl for AppSrc {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("context", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value())
}
subclass::Property("caps", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.caps.to_value())
}
subclass::Property("max-buffers", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.max_buffers.to_value())
}
subclass::Property("do-timestamp", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.do_timestamp.to_value())
}
subclass::Property("context", ..) => Ok(settings.context.to_value()),
subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()),
subclass::Property("caps", ..) => Ok(settings.caps.to_value()),
subclass::Property("max-buffers", ..) => Ok(settings.max_buffers.to_value()),
subclass::Property("do-timestamp", ..) => Ok(settings.do_timestamp.to_value()),
_ => unimplemented!(),
}
}
@ -655,16 +636,18 @@ impl ElementImpl for AppSrc {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -676,10 +659,11 @@ impl ElementImpl for AppSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.start(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
block_on(async {
runtime::executor::block_on(async {
self.src_pad_handler.lock().await.need_initial_events = true;
});
}

View file

@ -17,7 +17,6 @@
use either::Either;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::future::{abortable, AbortHandle, Aborted};
use futures::lock::{Mutex, MutexGuard};
@ -43,7 +42,9 @@ use std::collections::{BTreeSet, VecDeque};
use std::time::Duration;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadContext, PadContextWeak, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::runtime::{
self, Context, JoinHandle, PadContext, PadContextRef, PadSink, PadSinkRef, PadSrc, PadSrcRef,
};
use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
@ -226,7 +227,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
}.boxed())
} else {
if let EventView::FlushStop(..) = event.view() {
block_on(jitterbuffer.flush(element));
runtime::executor::block_on(jitterbuffer.flush(element));
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event);
@ -247,7 +248,8 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
match query.view_mut() {
QueryView::Drain(..) => {
gst_info!(CAT, obj: pad.gst_pad(), "Draining");
block_on(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)).is_ok()
runtime::executor::block_on(jitterbuffer.enqueue_item(pad.gst_pad(), element, None))
.is_ok()
}
_ => jitterbuffer.src_pad.gst_pad().peer_query(query),
}
@ -277,8 +279,9 @@ impl PadSrcHandler for JitterBufferPadSrcHandler {
if ret {
let (_, mut min_latency, _) = peer_query.get_result();
let our_latency =
block_on(jitterbuffer.settings.lock()).latency_ms as u64 * gst::MSECOND;
let our_latency = runtime::executor::block_on(jitterbuffer.settings.lock())
.latency_ms as u64
* gst::MSECOND;
min_latency += our_latency;
let max_latency = gst::CLOCK_TIME_NONE;
@ -292,7 +295,11 @@ impl PadSrcHandler for JitterBufferPadSrcHandler {
if q.get_format() != gst::Format::Time {
jitterbuffer.sink_pad.gst_pad().peer_query(query)
} else {
q.set(block_on(jitterbuffer.state.lock()).segment.get_position());
q.set(
runtime::executor::block_on(jitterbuffer.state.lock())
.segment
.get_position(),
);
true
}
}
@ -355,7 +362,7 @@ struct State {
last_res: Result<gst::FlowSuccess, gst::FlowError>,
task_queue_abort_handle: Option<AbortHandle>,
wakeup_abort_handle: Option<AbortHandle>,
wakeup_join_handle: Option<tokio::task::JoinHandle<Result<(), Aborted>>>,
wakeup_join_handle: Option<JoinHandle<Result<(), Aborted>>>,
}
impl Default for State {
@ -952,24 +959,29 @@ impl JitterBuffer {
gst_debug!(CAT, obj: element, "Scheduling wakeup in {}", delay);
let element = element.clone();
let pad_ctx_weak = pad_ctx.downgrade();
let wakeup_fut = pad_ctx.delay_for(Duration::from_nanos(delay), move || {
Self::wakeup_fut(latency_ns, context_wait_ns, element, pad_ctx_weak)
});
let (wakeup_fut, abort_handle) = abortable(wakeup_fut);
let (wakeup_fut, abort_handle) = abortable(Self::wakeup_fut(
Duration::from_nanos(delay),
latency_ns,
context_wait_ns,
&element,
&pad_ctx,
));
state.wakeup_join_handle = Some(pad_ctx.spawn(wakeup_fut));
state.wakeup_abort_handle = Some(abort_handle);
}
fn wakeup_fut(
delay: Duration,
latency_ns: gst::ClockTime,
context_wait_ns: gst::ClockTime,
element: gst::Element,
pad_ctx_weak: PadContextWeak,
element: &gst::Element,
pad_ctx: &PadContextRef,
) -> BoxFuture<'static, ()> {
let element = element.clone();
let pad_ctx_weak = pad_ctx.downgrade();
async move {
runtime::time::delay_for(delay).await;
let jb = Self::from_instance(&element);
let mut state = jb.state.lock().await;
@ -1140,7 +1152,7 @@ impl ObjectSubclass for JitterBuffer {
.expect("signal arg")
.expect("missing signal arg");
let jitterbuffer = Self::from_instance(&element);
block_on(jitterbuffer.clear_pt_map(&element));
runtime::executor::block_on(jitterbuffer.clear_pt_map(&element));
None
},
);
@ -1178,12 +1190,12 @@ impl ObjectImpl for JitterBuffer {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("latency", ..) => {
let mut settings = block_on(self.settings.lock());
settings.latency_ms = value.get_some().expect("type checked upstream");
block_on(self.state.lock())
runtime::executor::block_on(self.state.lock())
.jbuf
.borrow()
.set_delay(settings.latency_ms as u64 * gst::MSECOND);
@ -1191,26 +1203,21 @@ impl ObjectImpl for JitterBuffer {
/* TODO: post message */
}
subclass::Property("do-lost", ..) => {
let mut settings = block_on(self.settings.lock());
settings.do_lost = value.get_some().expect("type checked upstream");
}
subclass::Property("max-dropout-time", ..) => {
let mut settings = block_on(self.settings.lock());
settings.max_dropout_time = value.get_some().expect("type checked upstream");
}
subclass::Property("max-misorder-time", ..) => {
let mut settings = block_on(self.settings.lock());
settings.max_misorder_time = value.get_some().expect("type checked upstream");
}
subclass::Property("context", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),
@ -1222,23 +1229,23 @@ impl ObjectImpl for JitterBuffer {
match *prop {
subclass::Property("latency", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.latency_ms.to_value())
}
subclass::Property("do-lost", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.do_lost.to_value())
}
subclass::Property("max-dropout-time", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_dropout_time.to_value())
}
subclass::Property("max-misorder-time", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_misorder_time.to_value())
}
subclass::Property("stats", ..) => {
let state = block_on(self.state.lock());
let state = runtime::executor::block_on(self.state.lock());
let s = gst::Structure::new(
"application/x-rtp-jitterbuffer-stats",
&[
@ -1250,11 +1257,11 @@ impl ObjectImpl for JitterBuffer {
Ok(s.to_value())
}
subclass::Property("context", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
@ -1279,14 +1286,16 @@ impl ElementImpl for JitterBuffer {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => block_on(async {
gst::StateChange::NullToReady => runtime::executor::block_on(async {
let _state = self.state.lock().await;
let settings = self.settings.lock().await;
let context = Context::acquire(&settings.context, settings.context_wait).unwrap();
let context = {
let settings = self.settings.lock().await;
Context::acquire(&settings.context, settings.context_wait).unwrap()
};
let _ = self
.src_pad
.prepare(context, &JitterBufferPadSrcHandler {})
.prepare(context, &JitterBufferPadSrcHandler)
.await
.map_err(|err| {
gst_error_msg!(
@ -1296,9 +1305,9 @@ impl ElementImpl for JitterBuffer {
gst::StateChangeError
});
self.sink_pad.prepare(&JitterBufferPadSinkHandler {}).await;
self.sink_pad.prepare(&JitterBufferPadSinkHandler).await;
}),
gst::StateChange::PausedToReady => block_on(async {
gst::StateChange::PausedToReady => runtime::executor::block_on(async {
let mut state = self.state.lock().await;
if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() {
@ -1309,7 +1318,7 @@ impl ElementImpl for JitterBuffer {
abort_handle.abort();
}
}),
gst::StateChange::ReadyToNull => block_on(async {
gst::StateChange::ReadyToNull => runtime::executor::block_on(async {
let mut state = self.state.lock().await;
self.sink_pad.unprepare().await;

View file

@ -18,7 +18,6 @@
use either::Either;
use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*;
@ -42,7 +41,7 @@ use std::sync::{Arc, Weak};
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use super::dataqueue::{DataQueue, DataQueueItem};
@ -364,7 +363,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
} else {
match event.view() {
EventView::FlushStart(..) => {
let _ = block_on(proxysink.stop(element));
let _ = runtime::executor::block_on(proxysink.stop(element));
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
@ -372,7 +371,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
|| res == Ok(gst::StateChangeSuccess::Async)
&& pending == gst::State::Paused
{
let _ = block_on(proxysink.start(&element));
let _ = runtime::executor::block_on(proxysink.start(&element));
}
}
_ => (),
@ -380,7 +379,9 @@ impl PadSinkHandler for ProxySinkPadHandler {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event);
// FIXME proxysink can't forward directly to the src_pad of the proxysrc
let _ = block_on(proxysink.enqueue_item(&element, DataQueueItem::Event(event)));
let _ = runtime::executor::block_on(
proxysink.enqueue_item(&element, DataQueueItem::Event(event)),
);
Either::Left(true)
}
@ -599,8 +600,8 @@ impl ProxySink {
let mut state = self.state.lock().await;
gst_debug!(SINK_CAT, obj: element, "Preparing");
let settings = self.settings.lock().await;
state.queue = match SharedQueue::get(&settings.proxy_context, true).await {
state.queue = match SharedQueue::get(&self.settings.lock().await.proxy_context, true).await
{
Some(queue) => Some(queue),
None => {
return Err(gst_error_msg!(
@ -706,7 +707,7 @@ impl ObjectImpl for ProxySink {
match *prop {
subclass::Property("proxy-context", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.proxy_context = value
.get()
.expect("type checked upstream")
@ -721,7 +722,7 @@ impl ObjectImpl for ProxySink {
match *prop {
subclass::Property("proxy-context", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.proxy_context.to_value())
}
_ => unimplemented!(),
@ -748,16 +749,18 @@ impl ElementImpl for ProxySink {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToReady => {
block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.stop(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -765,7 +768,7 @@ impl ElementImpl for ProxySink {
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
}
Ok(success)
@ -881,7 +884,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = block_on(proxysrc.pause(element));
let _ = runtime::executor::block_on(proxysrc.pause(element));
true
}
EventView::FlushStop(..) => {
@ -889,7 +892,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = block_on(proxysrc.start(element));
let _ = runtime::executor::block_on(proxysrc.start(element));
}
true
}
@ -1032,7 +1035,7 @@ impl ProxySrc {
})?;
self.src_pad
.prepare(context, &ProxySrcPadHandler {})
.prepare(context, &ProxySrcPadHandler)
.await
.map_err(|err| {
gst_error_msg!(
@ -1155,32 +1158,27 @@ impl ObjectImpl for ProxySrc {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES_SRC[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("max-size-buffers", ..) => {
let mut settings = block_on(self.settings.lock());
settings.max_size_buffers = value.get_some().expect("type checked upstream");
}
subclass::Property("max-size-bytes", ..) => {
let mut settings = block_on(self.settings.lock());
settings.max_size_bytes = value.get_some().expect("type checked upstream");
}
subclass::Property("max-size-time", ..) => {
let mut settings = block_on(self.settings.lock());
settings.max_size_time = value.get_some().expect("type checked upstream");
}
subclass::Property("context", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
subclass::Property("proxy-context", ..) => {
let mut settings = block_on(self.settings.lock());
settings.proxy_context = value
.get()
.expect("type checked upstream")
@ -1193,31 +1191,14 @@ impl ObjectImpl for ProxySrc {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES_SRC[id];
let settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("max-size-buffers", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.max_size_buffers.to_value())
}
subclass::Property("max-size-bytes", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.max_size_bytes.to_value())
}
subclass::Property("max-size-time", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.max_size_time.to_value())
}
subclass::Property("context", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value())
}
subclass::Property("proxy-context", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.proxy_context.to_value())
}
subclass::Property("max-size-buffers", ..) => Ok(settings.max_size_buffers.to_value()),
subclass::Property("max-size-bytes", ..) => Ok(settings.max_size_bytes.to_value()),
subclass::Property("max-size-time", ..) => Ok(settings.max_size_time.to_value()),
subclass::Property("context", ..) => Ok(settings.context.to_value()),
subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()),
subclass::Property("proxy-context", ..) => Ok(settings.proxy_context.to_value()),
_ => unimplemented!(),
}
}
@ -1242,16 +1223,18 @@ impl ElementImpl for ProxySrc {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -1263,7 +1246,8 @@ impl ElementImpl for ProxySrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.start(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -18,7 +18,6 @@
use either::Either;
use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::prelude::*;
@ -41,7 +40,7 @@ use std::collections::VecDeque;
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use super::dataqueue::{DataQueue, DataQueueItem};
@ -372,7 +371,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
} else {
match event.view() {
EventView::FlushStart(..) => {
let _ = block_on(queue.stop(element));
let _ = runtime::executor::block_on(queue.stop(element));
}
EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into());
@ -380,7 +379,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
|| res == Ok(gst::StateChangeSuccess::Async)
&& pending == gst::State::Playing
{
let _ = block_on(queue.start(element));
let _ = runtime::executor::block_on(queue.start(element));
}
}
_ => (),
@ -671,7 +670,7 @@ impl Queue {
})?;
self.src_pad
.prepare(context, &QueuePadSrcHandler {})
.prepare(context, &QueuePadSrcHandler)
.await
.map_err(|err| {
gst_error_msg!(
@ -679,7 +678,7 @@ impl Queue {
["Error joining Context: {:?}", err]
)
})?;
self.sink_pad.prepare(&QueuePadSinkHandler {}).await;
self.sink_pad.prepare(&QueuePadSinkHandler).await;
gst_debug!(CAT, obj: element, "Prepared");
@ -812,26 +811,26 @@ impl ObjectImpl for Queue {
match *prop {
subclass::Property("max-size-buffers", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_size_buffers = value.get_some().expect("type checked upstream");
}
subclass::Property("max-size-bytes", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_size_bytes = value.get_some().expect("type checked upstream");
}
subclass::Property("max-size-time", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.max_size_time = value.get_some().expect("type checked upstream");
}
subclass::Property("context", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = block_on(self.settings.lock());
let mut settings = runtime::executor::block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),
@ -843,23 +842,23 @@ impl ObjectImpl for Queue {
match *prop {
subclass::Property("max-size-buffers", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_size_buffers.to_value())
}
subclass::Property("max-size-bytes", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_size_bytes.to_value())
}
subclass::Property("max-size-time", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.max_size_time.to_value())
}
subclass::Property("context", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = block_on(self.settings.lock());
let settings = runtime::executor::block_on(self.settings.lock());
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
@ -885,16 +884,18 @@ impl ElementImpl for Queue {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToReady => {
block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.stop(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -902,7 +903,7 @@ impl ElementImpl for Queue {
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
}
Ok(success)

View file

@ -1,4 +1,5 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2018-2019 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -49,11 +50,15 @@ use gst::{gst_debug, gst_log, gst_trace};
use lazy_static::lazy_static;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::io;
use std::mem;
use std::pin::Pin;
use std::sync::mpsc as sync_mpsc;
use std::sync::{Arc, Mutex, Weak};
use std::task::Poll;
use std::thread;
use std::time::Duration;
@ -73,52 +78,78 @@ lazy_static! {
static ref CONTEXTS: Mutex<HashMap<String, Weak<ContextInner>>> = Mutex::new(HashMap::new());
}
thread_local!(static CURRENT_THREAD_CONTEXT: RefCell<Option<ContextWeak>> = RefCell::new(None));
/// Blocks on `future`.
///
/// This function must NOT be called within a [`Context`] thread.
///
/// The reason is this would prevent any task operating on the
/// [`Context`] from making progress.
///
/// # Panics
///
/// This function panics if called within a [`Context`] thread.
///
/// [`Context`]: struct.Context.html
pub fn block_on<Fut: Future>(future: Fut) -> Fut::Output {
if Context::is_context_thread() {
panic!("Attempt to `block_on` within a `Context` thread");
}
// Not running in a Context thread so we can block
futures::executor::block_on(future)
}
struct ContextThread {
name: String,
}
impl ContextThread {
fn start(name: &str, wait: u32) -> (tokio::runtime::Handle, ContextShutdown) {
let name_clone = name.into();
let mut context_thread = ContextThread { name: name_clone };
let (handle_sender, handle_receiver) = sync_mpsc::channel();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
fn start(name: &str, wait: u32) -> Context {
let context_thread = ContextThread { name: name.into() };
let (context_sender, context_receiver) = sync_mpsc::channel();
let join = thread::spawn(move || {
context_thread.spawn(wait, handle_sender, shutdown_receiver);
context_thread.spawn(wait, context_sender);
});
let handle = handle_receiver.recv().expect("Context thread init failed");
let context = context_receiver.recv().expect("Context thread init failed");
*context.0.shutdown.join.lock().unwrap() = Some(join);
let shutdown = ContextShutdown {
name: name.into(),
shutdown: Some(shutdown_sender),
join: Some(join),
};
(handle, shutdown)
context
}
fn spawn(
&mut self,
wait: u32,
handle_sender: sync_mpsc::Sender<tokio::runtime::Handle>,
shutdown_receiver: oneshot::Receiver<()>,
) {
fn spawn(&self, wait: u32, context_sender: sync_mpsc::Sender<Context>) {
gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name);
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.thread_name(self.name.clone())
.enable_all()
.max_throttling(Duration::from_millis(wait as u64))
.build()
.expect("Couldn't build the runtime");
handle_sender
.send(runtime.handle().clone())
.expect("Couldn't send context thread handle");
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let shutdown = ContextShutdown {
name: self.name.clone(),
shutdown: Some(shutdown_sender),
join: Mutex::new(None),
};
let context = Context(Arc::new(ContextInner {
name: self.name.clone(),
handle: Mutex::new(runtime.handle().clone()),
shutdown,
task_queues: Mutex::new((0, HashMap::new())),
}));
CURRENT_THREAD_CONTEXT.with(|cur_ctx| {
*cur_ctx.borrow_mut() = Some(context.downgrade());
});
context_sender.send(context).unwrap();
let _ = runtime.block_on(shutdown_receiver);
}
@ -134,7 +165,7 @@ impl Drop for ContextThread {
struct ContextShutdown {
name: String,
shutdown: Option<oneshot::Sender<()>>,
join: Option<thread::JoinHandle<()>>,
join: Mutex<Option<thread::JoinHandle<()>>>,
}
impl Drop for ContextShutdown {
@ -151,7 +182,8 @@ impl Drop for ContextShutdown {
"Waiting for context thread '{}' to shutdown",
self.name
);
let _ = self.join.take().unwrap().join();
let join_handle = self.join.lock().unwrap().take().unwrap();
let _ = join_handle.join();
}
}
@ -169,12 +201,65 @@ glib_boxed_derive_traits!(TaskQueueId);
pub type TaskOutput = Result<(), gst::FlowError>;
type TaskQueue = FuturesUnordered<BoxFuture<'static, TaskOutput>>;
pub struct JoinError(tokio::task::JoinError);
impl fmt::Display for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, fmt)
}
}
impl fmt::Debug for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.0, fmt)
}
}
impl std::error::Error for JoinError {}
impl From<tokio::task::JoinError> for JoinError {
fn from(src: tokio::task::JoinError) -> Self {
JoinError(src)
}
}
/// Wrapper for the underlying runtime JoinHandle implementation.
pub struct JoinHandle<T>(tokio::task::JoinHandle<T>);
unsafe impl<T: Send> Send for JoinHandle<T> {}
unsafe impl<T: Send> Sync for JoinHandle<T> {}
impl<T> From<tokio::task::JoinHandle<T>> for JoinHandle<T> {
fn from(src: tokio::task::JoinHandle<T>) -> Self {
JoinHandle(src)
}
}
impl<T> Unpin for JoinHandle<T> {}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.as_mut().0.poll_unpin(cx).map_err(JoinError::from)
}
}
impl<T> fmt::Debug for JoinHandle<T>
where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("JoinHandle").finish()
}
}
#[derive(Debug)]
struct ContextInner {
name: String,
handle: Mutex<tokio::runtime::Handle>,
// Only used for dropping
_shutdown: ContextShutdown,
shutdown: ContextShutdown,
task_queues: Mutex<(u64, HashMap<u64, TaskQueue>)>,
}
@ -221,14 +306,7 @@ impl Context {
}
}
let (handle, shutdown) = ContextThread::start(context_name, wait);
let context = Context(Arc::new(ContextInner {
name: context_name.into(),
handle: Mutex::new(handle),
_shutdown: shutdown,
task_queues: Mutex::new((0, HashMap::new())),
}));
let context = ContextThread::start(context_name, wait);
contexts.insert(context_name.into(), Arc::downgrade(&context.0));
gst_debug!(RUNTIME_CAT, "New Context '{}'", context.0.name);
@ -252,12 +330,34 @@ impl Context {
self.0.name.as_str()
}
pub fn spawn<Fut>(&self, future: Fut) -> tokio::task::JoinHandle<Fut::Output>
/// Returns `true` if a `Context` is running on current thread.
pub fn is_context_thread() -> bool {
CURRENT_THREAD_CONTEXT.with(|cur_ctx| cur_ctx.borrow().is_some())
}
/// Returns the `Context` running on current thread, if any.
pub fn current() -> Option<Context> {
CURRENT_THREAD_CONTEXT.with(|cur_ctx| {
cur_ctx
.borrow()
.as_ref()
.and_then(|ctx_weak| ctx_weak.upgrade())
})
}
pub fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
self.0.handle.lock().unwrap().enter(f)
}
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.0.handle.lock().unwrap().spawn(future)
self.0.handle.lock().unwrap().spawn(future).into()
}
pub fn release_task_queue(&self, id: TaskQueueId) -> Option<TaskQueue> {
@ -308,64 +408,35 @@ impl Context {
None
}
}
/// Builds a `Future` to execute an `action` at [`Interval`]s.
///
/// [`Interval`]: struct.Interval.html
pub fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: Fn() -> Fut + Send + Sync + 'static,
E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{
async move {
let mut interval = tokio::time::interval(interval);
loop {
interval.tick().await;
if let Err(err) = f().await {
break Err(err);
}
}
}
}
/// Builds a `Future` to execute an action after the given `delay` has elapsed.
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future + Send + 'static,
{
async move {
tokio::time::delay_for(delay).await;
f().await
}
}
}
#[cfg(test)]
mod tests {
use futures;
use futures::channel::mpsc;
use futures::future::abortable;
use futures::lock::Mutex;
use futures::prelude::*;
use gst;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use super::*;
use super::Context;
type Item = i32;
const SLEEP_DURATION: u32 = 2;
const INTERVAL: Duration = std::time::Duration::from_millis(100 * SLEEP_DURATION as u64);
const SLEEP_DURATION_MS: u32 = 2;
const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64);
const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64 * 10);
#[tokio::test]
async fn user_drain_pending_tasks() {
// Setup
gst::init().unwrap();
let context = Context::acquire("user_drain_task_queue", SLEEP_DURATION).unwrap();
let context = Context::acquire("user_drain_task_queue", SLEEP_DURATION_MS).unwrap();
let queue_id = context.acquire_task_queue_id();
let (sender, mut receiver) = mpsc::channel(1);
@ -404,105 +475,120 @@ mod tests {
}
#[tokio::test]
async fn delay_for() {
async fn block_on_within_tokio() {
let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION_MS).unwrap();
let bytes_sent = crate::runtime::executor::block_on(context.spawn(async {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
let socket = UdpSocket::bind(saddr).unwrap();
let mut socket = tokio::net::UdpSocket::from_std(socket).unwrap();
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000);
socket.send_to(&[0; 10], saddr).await.unwrap()
}))
.unwrap();
assert_eq!(bytes_sent, 10);
let elapsed = crate::runtime::executor::block_on(context.spawn(async {
let now = Instant::now();
crate::runtime::time::delay_for(DELAY).await;
now.elapsed()
}))
.unwrap();
// Due to throttling, `Delay` may be fired earlier
assert!(elapsed + SLEEP_DURATION / 2 >= DELAY);
}
#[test]
fn block_on_from_sync() {
let context = Context::acquire("block_on_from_sync", SLEEP_DURATION_MS).unwrap();
let bytes_sent = crate::runtime::executor::block_on(context.spawn(async {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5001);
let socket = UdpSocket::bind(saddr).unwrap();
let mut socket = tokio::net::UdpSocket::from_std(socket).unwrap();
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000);
socket.send_to(&[0; 10], saddr).await.unwrap()
}))
.unwrap();
assert_eq!(bytes_sent, 10);
let elapsed = crate::runtime::executor::block_on(context.spawn(async {
let now = Instant::now();
crate::runtime::time::delay_for(DELAY).await;
now.elapsed()
}))
.unwrap();
// Due to throttling, `Delay` may be fired earlier
assert!(elapsed + SLEEP_DURATION / 2 >= DELAY);
}
#[test]
fn block_on_from_context() {
gst::init().unwrap();
let context = Context::acquire("delay_for", SLEEP_DURATION).unwrap();
let (sender, receiver) = oneshot::channel();
let start = Instant::now();
let delayed_by_fut = context.delay_for(INTERVAL, move || {
async {
sender.send(42).unwrap();
}
let context = Context::acquire("block_on_from_context", SLEEP_DURATION_MS).unwrap();
let join_handle = context.spawn(async {
crate::runtime::executor::block_on(async {
crate::runtime::time::delay_for(DELAY).await;
});
});
context.spawn(delayed_by_fut);
let _ = receiver.await.unwrap();
let delta = Instant::now() - start;
assert!(delta >= INTERVAL);
assert!(delta < INTERVAL * 2);
// Panic: attempt to `runtime::executor::block_on` within a `Context` thread
futures::executor::block_on(join_handle).unwrap_err();
}
#[tokio::test]
async fn interval_ok() {
async fn enter_context_from_tokio() {
gst::init().unwrap();
let context = Context::acquire("interval_ok", SLEEP_DURATION).unwrap();
let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION_MS).unwrap();
let mut socket = context
.enter(|| {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5002);
let socket = UdpSocket::bind(saddr).unwrap();
tokio::net::UdpSocket::from_std(socket)
})
.unwrap();
let (sender, mut receiver) = mpsc::channel(1);
let sender: Arc<Mutex<mpsc::Sender<Instant>>> = Arc::new(Mutex::new(sender));
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000);
let bytes_sent = socket.send_to(&[0; 10], saddr).await.unwrap();
assert_eq!(bytes_sent, 10);
let (interval_fut, handle) = abortable(context.interval(INTERVAL, move || {
let sender = Arc::clone(&sender);
async move {
let instant = Instant::now();
sender.lock().await.send(instant).await.map_err(drop)
}
}));
context.spawn(interval_fut.map(drop));
let mut idx: u32 = 0;
let mut first = Instant::now();
while let Some(instant) = receiver.next().await {
if idx > 0 {
let delta = instant - first;
assert!(delta > INTERVAL * (idx - 1));
assert!(delta < INTERVAL * (idx + 1));
} else {
first = instant;
}
if idx == 3 {
handle.abort();
break;
}
idx += 1;
}
let elapsed = context.enter(|| {
futures::executor::block_on(async {
let now = Instant::now();
crate::runtime::time::delay_for(DELAY).await;
now.elapsed()
})
});
// Due to throttling, `Delay` may be fired earlier
assert!(elapsed + SLEEP_DURATION / 2 >= DELAY);
}
#[tokio::test]
async fn interval_err() {
#[test]
fn enter_context_from_sync() {
gst::init().unwrap();
let context = Context::acquire("interval_err", SLEEP_DURATION).unwrap();
let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION_MS).unwrap();
let mut socket = context
.enter(|| {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5003);
let socket = UdpSocket::bind(saddr).unwrap();
tokio::net::UdpSocket::from_std(socket)
})
.unwrap();
let (sender, mut receiver) = mpsc::channel(1);
let sender: Arc<Mutex<mpsc::Sender<Instant>>> = Arc::new(Mutex::new(sender));
let interval_idx: Arc<Mutex<Item>> = Arc::new(Mutex::new(0));
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000);
let bytes_sent = futures::executor::block_on(socket.send_to(&[0; 10], saddr)).unwrap();
assert_eq!(bytes_sent, 10);
let interval_fut = context.interval(INTERVAL, move || {
let sender = Arc::clone(&sender);
let interval_idx = Arc::clone(&interval_idx);
async move {
let instant = Instant::now();
let mut idx = interval_idx.lock().await;
sender.lock().await.send(instant).await.unwrap();
*idx += 1;
if *idx < 3 {
Ok(())
} else {
Err(())
}
}
let elapsed = context.enter(|| {
futures::executor::block_on(async {
let now = Instant::now();
crate::runtime::time::delay_for(DELAY).await;
now.elapsed()
})
});
context.spawn(interval_fut.map(drop));
let mut idx: u32 = 0;
let mut first = Instant::now();
while let Some(instant) = receiver.next().await {
if idx > 0 {
let delta = instant - first;
assert!(delta > INTERVAL * (idx - 1));
assert!(delta < INTERVAL * (idx + 1));
} else {
first = instant;
}
idx += 1;
}
assert_eq!(idx, 3);
// Due to throttling, `Delay` may be fired earlier
assert!(elapsed + SLEEP_DURATION / 2 >= DELAY);
}
}

View file

@ -44,13 +44,13 @@
//! [`PadSink`]: pad/struct.PadSink.html
pub mod executor;
pub use executor::{Context, TaskOutput};
pub use executor::{Context, JoinHandle, TaskOutput};
pub mod pad;
pub use pad::{PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak};
pub mod pad_context;
pub use pad_context::{PadContext, PadContextWeak};
pub use pad_context::{PadContext, PadContextRef, PadContextWeak};
pub mod prelude {
pub use super::pad::{PadSinkHandler, PadSrcHandler};
@ -58,6 +58,8 @@ pub mod prelude {
pub mod task;
pub mod time;
use gst;
use lazy_static::lazy_static;

View file

@ -65,7 +65,6 @@
use either::Either;
use futures::executor::block_on;
use futures::future;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
@ -74,7 +73,7 @@ use futures::prelude::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_log, gst_loggable_error};
use gst::{gst_debug, gst_error, gst_fixme, gst_log, gst_loggable_error};
use gst::{FlowError, FlowSuccess};
use std::fmt;
@ -82,7 +81,7 @@ use std::marker::PhantomData;
use std::sync;
use std::sync::{Arc, Weak};
use super::executor::Context;
use super::executor::{self, Context};
use super::pad_context::{PadContext, PadContextRef, PadContextWeak};
use super::task::Task;
use super::RUNTIME_CAT;
@ -399,7 +398,7 @@ impl<'a> PadSrcRef<'a> {
}
if !active {
block_on(async {
executor::block_on(async {
self.strong.lock_state().await.is_initialized = false;
});
}
@ -597,7 +596,7 @@ impl PadSrc {
self.0.lock_state().await
}
fn init_pad_functions<H: PadSrcHandler>(&self, handler: &H, context: Context) {
fn init_pad_functions<H: PadSrcHandler>(&self, handler: &H) {
let handler_clone = handler.clone();
let this_weak = self.downgrade();
self.gst_pad()
@ -649,7 +648,6 @@ impl PadSrc {
.set_event_full_function(move |_gst_pad, parent, event| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let context = context.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
@ -657,13 +655,11 @@ impl PadSrc {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
match handler.src_event_full(this_ref, imp, &element, event) {
Either::Left(res) => res,
Either::Right(fut) => {
// FIXME if we could check whether current thread is already
// associated to an executor or not, we could `.await` here.
// But I couldn't find a solution for this with current version
// of `tokio`
context.spawn(fut.map(drop));
Ok(FlowSuccess::Ok)
Either::Right(_fut) => {
// See these threads:
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/240#note_378446
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/240#note_378454
unimplemented!("Future handling in src_event*");
}
}
},
@ -681,7 +677,12 @@ impl PadSrc {
|| false,
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
handler.src_query(this_ref, imp, &element, query)
if !query.is_serialized() {
handler.src_query(this_ref, imp, &element, query)
} else {
gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
false
}
},
)
});
@ -706,9 +707,9 @@ impl PadSrc {
.await
.map_err(|_| PadContextError::ActiveTask)?;
state.pad_context = Some(PadContext::new(context.clone()));
state.pad_context = Some(PadContext::new(context));
self.init_pad_functions(handler, context);
self.init_pad_functions(handler);
Ok(())
}
@ -1181,7 +1182,12 @@ impl PadSink {
|| false,
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
handler.sink_query(this_ref, imp, &element, query)
if !query.is_serialized() {
handler.sink_query(this_ref, imp, &element, query)
} else {
gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
false
}
},
)
});
@ -1191,15 +1197,24 @@ impl PadSink {
pad_ctx: Arc<sync::Mutex<Option<PadContextWeak>>>,
fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
) -> Result<FlowSuccess, FlowError> {
match *pad_ctx.lock().unwrap() {
Some(ref pad_ctx_weak) => match pad_ctx_weak.upgrade() {
Some(pad_ctx) => {
pad_ctx.add_pending_task(fut.map(|res| res.map(drop)));
match pad_ctx
.lock()
.unwrap()
.as_ref()
.and_then(|pad_ctx_weak| pad_ctx_weak.upgrade())
{
Some(pad_ctx) => {
pad_ctx.add_pending_task(fut.map(|res| res.map(drop)));
Ok(FlowSuccess::Ok)
}
None => match Context::current() {
None => executor::block_on(fut),
Some(context) => {
// Don't block the Context thread
context.spawn(fut);
Ok(FlowSuccess::Ok)
}
None => block_on(fut),
},
None => block_on(fut),
}
}

View file

@ -27,9 +27,8 @@ use glib;
use glib::{glib_boxed_derive_traits, glib_boxed_type};
use std::marker::PhantomData;
use std::time::Duration;
use super::executor::{Context, ContextWeak, TaskOutput, TaskQueueId};
use super::executor::{Context, ContextWeak, JoinHandle, TaskOutput, TaskQueueId};
#[derive(Clone)]
pub struct PadContextWeak {
@ -91,7 +90,7 @@ impl<'a> PadContextRef<'a> {
self.strong.downgrade()
}
pub fn spawn<Fut>(&self, future: Fut) -> tokio::task::JoinHandle<Fut::Output>
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
@ -117,27 +116,6 @@ impl<'a> PadContextRef<'a> {
pub fn context(&self) -> &Context {
&self.strong.context
}
/// Builds a `Future` to execute an `action` at [`Interval`]s.
///
/// [`Interval`]: struct.Interval.html
pub fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: Fn() -> Fut + Send + Sync + 'static,
E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{
self.strong.interval(interval, f)
}
/// Builds a `Future` to execute an action after the given `delay` has elapsed.
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future + Send + 'static,
{
self.strong.delay_for(delay, f)
}
}
impl std::fmt::Display for PadContextRef<'_> {
@ -180,25 +158,6 @@ impl PadContextStrong {
fn clear_pending_tasks(&self) {
self.context.clear_task_queue(self.queue_id);
}
#[inline]
fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: Fn() -> Fut + Send + Sync + 'static,
E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{
self.context.interval(interval, f)
}
#[inline]
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future + Send + 'static,
{
self.context.delay_for(delay, f)
}
}
impl std::fmt::Display for PadContextStrong {
@ -231,7 +190,7 @@ impl PadContext {
PadContextRef::new(self.0.context.clone(), self.0.queue_id)
}
pub fn spawn<Fut>(&self, future: Fut) -> tokio::task::JoinHandle<Fut::Output>
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
@ -247,27 +206,6 @@ impl PadContext {
self.0.clear_pending_tasks();
}
/// Builds a `Future` to execute an `action` at [`Interval`]s.
///
/// [`Interval`]: struct.Interval.html
pub fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: Fn() -> Fut + Send + Sync + 'static,
E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{
self.0.interval(interval, f)
}
/// Builds a `Future` to execute an action after the given `delay` has elapsed.
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where
F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future + Send + 'static,
{
self.0.delay_for(delay, f)
}
pub(super) fn new_sticky_event(&self) -> gst::Event {
let s = gst::Structure::new("ts-pad-context", &[("pad-context", &self.downgrade())]);
gst::Event::new_custom_downstream_sticky(s).build()

View file

@ -29,7 +29,7 @@ use gst::{gst_debug, gst_log, gst_trace, gst_warning};
use std::fmt;
use std::sync::Arc;
use super::{Context, RUNTIME_CAT};
use super::{Context, JoinHandle, RUNTIME_CAT};
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TaskError {
@ -51,7 +51,7 @@ struct TaskInner {
context: Option<Context>,
state: TaskState,
abort_handle: Option<AbortHandle>,
loop_handle: Option<tokio::task::JoinHandle<Result<(), Aborted>>>,
loop_handle: Option<JoinHandle<Result<(), Aborted>>>,
}
impl Default for TaskInner {

View file

@ -0,0 +1,37 @@
// Copyright (C) 2020 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//! Wrappers for the underlying runtime specific time related Futures.
use futures::prelude::*;
use futures::stream::StreamExt;
use std::time::Duration;
/// Wait until the given `delay` has elapsed.
///
/// This must be called from within the target runtime environment.
pub async fn delay_for(delay: Duration) {
tokio::time::delay_for(delay).map(drop).await;
}
/// Builds a `Stream` that yields at `interval.
///
/// This must be called from within the target runtime environment.
pub fn interval(interval: Duration) -> impl Stream<Item = ()> {
tokio::time::interval(interval).map(drop)
}

View file

@ -17,7 +17,6 @@
// Boston, MA 02110-1335, USA.
use either::Either;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*;
@ -44,10 +43,9 @@ use std::sync::Arc;
use std::u16;
use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef};
use super::socket::{Socket, SocketRead, SocketStream};
@ -340,7 +338,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = block_on(tcpclientsrc.pause(element));
let _ = runtime::executor::block_on(tcpclientsrc.pause(element));
true
}
EventView::FlushStop(..) => {
@ -348,7 +346,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = block_on(tcpclientsrc.start(element));
let _ = runtime::executor::block_on(tcpclientsrc.start(element));
}
true
}
@ -385,7 +383,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
true
}
QueryView::Caps(ref mut q) => {
let inner = block_on(self.lock());
let inner = runtime::executor::block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps {
q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
@ -675,32 +673,27 @@ impl ObjectImpl for TcpClientSrc {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("address", ..) => {
let mut settings = block_on(self.settings.lock());
settings.address = value.get().expect("type checked upstream");
}
subclass::Property("port", ..) => {
let mut settings = block_on(self.settings.lock());
settings.port = value.get_some().expect("type checked upstream");
}
subclass::Property("caps", ..) => {
let mut settings = block_on(self.settings.lock());
settings.caps = value.get().expect("type checked upstream");
}
subclass::Property("chunk-size", ..) => {
let mut settings = block_on(self.settings.lock());
settings.chunk_size = value.get_some().expect("type checked upstream");
}
subclass::Property("context", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),
@ -710,31 +703,14 @@ impl ObjectImpl for TcpClientSrc {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("address", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.address.to_value())
}
subclass::Property("port", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.port.to_value())
}
subclass::Property("caps", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.caps.to_value())
}
subclass::Property("chunk-size", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.chunk_size.to_value())
}
subclass::Property("context", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value())
}
subclass::Property("address", ..) => Ok(settings.address.to_value()),
subclass::Property("port", ..) => Ok(settings.port.to_value()),
subclass::Property("caps", ..) => Ok(settings.caps.to_value()),
subclass::Property("chunk-size", ..) => Ok(settings.chunk_size.to_value()),
subclass::Property("context", ..) => Ok(settings.context.to_value()),
subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()),
_ => unimplemented!(),
}
}
@ -759,22 +735,24 @@ impl ElementImpl for TcpClientSrc {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
block_on(self.complete_preparation(element)).map_err(|err| {
runtime::executor::block_on(self.complete_preparation(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -786,10 +764,11 @@ impl ElementImpl for TcpClientSrc {
success = gst::StateChangeSuccess::Success;
}
gst::StateChange::PausedToPlaying => {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.start(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
let mut src_pad_handler = block_on(self.src_pad_handler.lock());
let mut src_pad_handler = runtime::executor::block_on(self.src_pad_handler.lock());
src_pad_handler.need_initial_events = true;
}
_ => (),

View file

@ -17,7 +17,6 @@
use either::Either;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*;
@ -55,10 +54,8 @@ use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use tokio::task::JoinHandle;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef};
use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef};
use super::socket::{Socket, SocketRead, SocketStream};
@ -508,7 +505,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = block_on(udpsrc.pause(element));
let _ = runtime::executor::block_on(udpsrc.pause(element));
true
}
EventView::FlushStop(..) => {
@ -516,7 +513,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = block_on(udpsrc.start(element));
let _ = runtime::executor::block_on(udpsrc.start(element));
}
true
}
@ -554,7 +551,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
true
}
QueryView::Caps(ref mut q) => {
let inner = block_on(self.lock());
let inner = runtime::executor::block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps {
q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
@ -1032,29 +1029,24 @@ impl ObjectImpl for UdpSrc {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("address", ..) => {
let mut settings = block_on(self.settings.lock());
settings.address = value.get().expect("type checked upstream");
}
subclass::Property("port", ..) => {
let mut settings = block_on(self.settings.lock());
settings.port = value.get_some().expect("type checked upstream");
}
subclass::Property("reuse", ..) => {
let mut settings = block_on(self.settings.lock());
settings.reuse = value.get_some().expect("type checked upstream");
}
subclass::Property("caps", ..) => {
let mut settings = block_on(self.settings.lock());
settings.caps = value.get().expect("type checked upstream");
}
subclass::Property("mtu", ..) => {
let mut settings = block_on(self.settings.lock());
settings.mtu = value.get_some().expect("type checked upstream");
}
subclass::Property("socket", ..) => {
let mut settings = block_on(self.settings.lock());
settings.socket = value
.get::<gio::Socket>()
.expect("type checked upstream")
@ -1064,18 +1056,15 @@ impl ObjectImpl for UdpSrc {
unreachable!();
}
subclass::Property("context", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream");
}
subclass::Property("retrieve-sender-address", ..) => {
let mut settings = block_on(self.settings.lock());
settings.retrieve_sender_address = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),
@ -1085,53 +1074,26 @@ impl ObjectImpl for UdpSrc {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = runtime::executor::block_on(self.settings.lock());
match *prop {
subclass::Property("address", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.address.to_value())
}
subclass::Property("port", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.port.to_value())
}
subclass::Property("reuse", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.reuse.to_value())
}
subclass::Property("caps", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.caps.to_value())
}
subclass::Property("mtu", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.mtu.to_value())
}
subclass::Property("socket", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings
.socket
.as_ref()
.map(GioSocketWrapper::as_socket)
.to_value())
}
subclass::Property("used-socket", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings
.used_socket
.as_ref()
.map(GioSocketWrapper::as_socket)
.to_value())
}
subclass::Property("context", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value())
}
subclass::Property("address", ..) => Ok(settings.address.to_value()),
subclass::Property("port", ..) => Ok(settings.port.to_value()),
subclass::Property("reuse", ..) => Ok(settings.reuse.to_value()),
subclass::Property("caps", ..) => Ok(settings.caps.to_value()),
subclass::Property("mtu", ..) => Ok(settings.mtu.to_value()),
subclass::Property("socket", ..) => Ok(settings
.socket
.as_ref()
.map(GioSocketWrapper::as_socket)
.to_value()),
subclass::Property("used-socket", ..) => Ok(settings
.used_socket
.as_ref()
.map(GioSocketWrapper::as_socket)
.to_value()),
subclass::Property("context", ..) => Ok(settings.context.to_value()),
subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()),
subclass::Property("retrieve-sender-address", ..) => {
let settings = block_on(self.settings.lock());
Ok(settings.retrieve_sender_address.to_value())
}
_ => unimplemented!(),
@ -1157,22 +1119,24 @@ impl ElementImpl for UdpSrc {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
block_on(self.complete_preparation(element)).map_err(|err| {
runtime::executor::block_on(self.complete_preparation(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -1184,10 +1148,11 @@ impl ElementImpl for UdpSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.start(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
block_on(async {
runtime::executor::block_on(async {
self.src_pad_handler.lock().await.need_initial_events = true;
});
}

View file

@ -1,4 +1,4 @@
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -18,7 +18,6 @@
use either::Either;
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::prelude::*;
@ -38,7 +37,7 @@ use std::boxed::Box;
use std::sync::Arc;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use gstthreadshare::runtime::{self, Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = "";
const SLEEP_DURATION: u32 = 2;
@ -151,7 +150,7 @@ impl PadSrcHandler for PadSrcHandlerTest {
let ret = match event.view() {
EventView::FlushStart(..) => {
let _ = block_on(elem_src_test.pause(element));
let _ = runtime::executor::block_on(elem_src_test.pause(element));
true
}
EventView::FlushStop(..) => {
@ -159,7 +158,7 @@ impl PadSrcHandler for PadSrcHandlerTest {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{
let _ = block_on(elem_src_test.start(element));
let _ = runtime::executor::block_on(elem_src_test.start(element));
}
true
}
@ -209,14 +208,13 @@ impl ElementSrcTest {
let _state = self.state.lock().await;
gst_debug!(SRC_CAT, obj: element, "Preparing");
let settings = self.settings.lock().await;
let context = Context::acquire(&settings.context, SLEEP_DURATION).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
let context = Context::acquire(&self.settings.lock().await.context, SLEEP_DURATION)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
self.src_pad
.prepare(context, &self.src_pad_handler)
@ -320,7 +318,7 @@ impl ObjectSubclass for ElementSrcTest {
ElementSrcTest {
src_pad,
src_pad_handler: PadSrcHandlerTest {},
src_pad_handler: PadSrcHandlerTest,
state: Mutex::new(ElementSrcState::default()),
settings: Mutex::new(settings),
}
@ -340,7 +338,7 @@ impl ObjectImpl for ElementSrcTest {
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
block_on(self.settings.lock()).context = context;
runtime::executor::block_on(self.settings.lock()).context = context;
}
_ => unimplemented!(),
}
@ -364,16 +362,18 @@ impl ElementImpl for ElementSrcTest {
match transition {
gst::StateChange::NullToReady => {
block_on(self.prepare(element)).map_err(|err| {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -382,7 +382,8 @@ impl ElementImpl for ElementSrcTest {
match transition {
gst::StateChange::PausedToPlaying => {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
runtime::executor::block_on(self.start(element))
.map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
@ -516,7 +517,7 @@ impl PadSinkHandler for PadSinkHandlerTest {
} else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event);
Either::Left(
block_on(elem_sink_test.sender.lock())
runtime::executor::block_on(elem_sink_test.sender.lock())
.as_mut()
.expect("ItemSender not set")
.try_send(Item::Event(event))
@ -633,7 +634,7 @@ impl ObjectImpl for ElementSinkTest {
.expect("type checked upstream")
.expect("ItemSender not found")
.clone();
*block_on(self.sender.lock()) = Some(sender);
*runtime::executor::block_on(self.sender.lock()) = Some(sender);
}
_ => unimplemented!(),
}
@ -657,10 +658,10 @@ impl ElementImpl for ElementSinkTest {
match transition {
gst::StateChange::NullToReady => {
block_on(self.sink_pad.prepare(&PadSinkHandlerTest {}));
runtime::executor::block_on(self.sink_pad.prepare(&PadSinkHandlerTest));
}
gst::StateChange::ReadyToNull => {
block_on(self.sink_pad.unprepare());
runtime::executor::block_on(self.sink_pad.unprepare());
}
_ => (),
}
@ -714,7 +715,7 @@ fn task() {
pipeline.set_state(gst::State::Playing).unwrap();
// Initial events
block_on(
runtime::executor::block_on(
elem_src_test.try_push(Item::Event(
gst::Event::new_stream_start("stream_id_task_test")
.group_id(gst::util_group_id_next())
@ -723,7 +724,7 @@ fn task() {
)
.unwrap();
match block_on(receiver.next()).unwrap() {
match runtime::executor::block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::CustomDownstreamSticky(e) => {
assert!(PadContext::is_pad_context_sticky_event(&e))
@ -733,7 +734,7 @@ fn task() {
other => panic!("Unexpected item {:?}", other),
}
match block_on(receiver.next()).unwrap() {
match runtime::executor::block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::StreamStart(_) => (),
other => panic!("Unexpected event {:?}", other),
@ -741,12 +742,12 @@ fn task() {
other => panic!("Unexpected item {:?}", other),
}
block_on(elem_src_test.try_push(Item::Event(
runtime::executor::block_on(elem_src_test.try_push(Item::Event(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
)))
.unwrap();
match block_on(receiver.next()).unwrap() {
match runtime::executor::block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::Segment(_) => (),
other => panic!("Unexpected event {:?}", other),
@ -755,10 +756,12 @@ fn task() {
}
// Buffer
block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))))
.unwrap();
runtime::executor::block_on(
elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))),
)
.unwrap();
match block_on(receiver.next()).unwrap() {
match runtime::executor::block_on(receiver.next()).unwrap() {
Item::Buffer(buffer) => {
let data = buffer.map_readable().unwrap();
assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice());
@ -771,9 +774,9 @@ fn task() {
list.get_mut()
.unwrap()
.add(gst::Buffer::from_slice(vec![1, 2, 3, 4]));
block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap();
runtime::executor::block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap();
match block_on(receiver.next()).unwrap() {
match runtime::executor::block_on(receiver.next()).unwrap() {
Item::BufferList(_) => (),
other => panic!("Unexpected item {:?}", other),
}
@ -782,8 +785,10 @@ fn task() {
pipeline.set_state(gst::State::Paused).unwrap();
// Items not longer accepted
block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))))
.unwrap_err();
runtime::executor::block_on(
elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))),
)
.unwrap_err();
// Nothing forwarded
receiver.try_next().unwrap_err();
@ -810,8 +815,9 @@ fn task() {
.unwrap());
// EOS
block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap();
match block_on(receiver.next()).unwrap() {
runtime::executor::block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build())))
.unwrap();
match runtime::executor::block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() {
gst::EventView::Eos(_) => (),
other => panic!("Unexpected event {:?}", other),
@ -823,7 +829,7 @@ fn task() {
pipeline.set_state(gst::State::Ready).unwrap();
// Receiver was dropped when stopping => can't send anymore
block_on(
runtime::executor::block_on(
elem_src_test.try_push(Item::Event(
gst::Event::new_stream_start("stream_id_task_test_past_stop")
.group_id(gst::util_group_id_next())