diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index a346f288..2ea34eb7 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -21,7 +21,7 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" } pin-project = "0.4" -tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/0.2.5-throttling", features = ["io-util", "macros", "rt-core", "sync", "time", "tcp", "udp"] } +tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/throttling_20200105", features = ["io-util", "macros", "rt-core", "sync", "stream", "time", "tcp", "udp"] } futures = "0.3" lazy_static = "1.0" either = "1.0" diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index fb6df302..8bb007fd 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -18,7 +18,6 @@ use either::Either; use futures::channel::mpsc; -use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -44,7 +43,7 @@ use std::sync::Arc; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; @@ -280,7 +279,7 @@ impl PadSrcHandler for AppSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on(app_src.pause(element)); + let _ = runtime::executor::block_on(app_src.pause(element)); true } EventView::FlushStop(..) => { @@ -288,7 +287,7 @@ impl PadSrcHandler for AppSrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on(app_src.start(element)); + let _ = runtime::executor::block_on(app_src.start(element)); } true } @@ -325,7 +324,7 @@ impl PadSrcHandler for AppSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = block_on(self.lock()); + let inner = runtime::executor::block_on(self.lock()); let caps = if let Some(ref caps) = inner.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) @@ -419,15 +418,15 @@ impl AppSrc { let _state = self.state.lock().await; gst_debug!(CAT, obj: element, "Preparing"); - let settings = self.settings.lock().await; - - let context = + let context = { + let settings = self.settings.lock().await; Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, ["Failed to acquire Context: {}", err] ) - })?; + })? + }; self.src_pad .prepare(context, &self.src_pad_handler) @@ -540,7 +539,7 @@ impl ObjectSubclass for AppSrc { .expect("missing signal arg"); let appsrc = Self::from_instance(&element); - Some(block_on(appsrc.push_buffer(&element, buffer)).to_value()) + Some(runtime::executor::block_on(appsrc.push_buffer(&element, buffer)).to_value()) }, ); @@ -555,7 +554,7 @@ impl ObjectSubclass for AppSrc { .expect("signal arg") .expect("missing signal arg"); let appsrc = Self::from_instance(&element); - Some(block_on(appsrc.end_of_stream(&element)).to_value()) + Some(runtime::executor::block_on(appsrc.end_of_stream(&element)).to_value()) }, ); } @@ -579,28 +578,24 @@ impl ObjectImpl for AppSrc { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; + let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("context", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("caps", ..) => { - let mut settings = block_on(self.settings.lock()); settings.caps = value.get().expect("type checked upstream"); } subclass::Property("max-buffers", ..) => { - let mut settings = block_on(self.settings.lock()); settings.max_buffers = value.get_some().expect("type checked upstream"); } subclass::Property("do-timestamp", ..) => { - let mut settings = block_on(self.settings.lock()); settings.do_timestamp = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -610,27 +605,13 @@ impl ObjectImpl for AppSrc { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { let prop = &PROPERTIES[id]; + let settings = runtime::executor::block_on(self.settings.lock()); match *prop { - subclass::Property("context", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context.to_value()) - } - subclass::Property("context-wait", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context_wait.to_value()) - } - subclass::Property("caps", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.caps.to_value()) - } - subclass::Property("max-buffers", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.max_buffers.to_value()) - } - subclass::Property("do-timestamp", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.do_timestamp.to_value()) - } + subclass::Property("context", ..) => Ok(settings.context.to_value()), + subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()), + subclass::Property("caps", ..) => Ok(settings.caps.to_value()), + subclass::Property("max-buffers", ..) => Ok(settings.max_buffers.to_value()), + subclass::Property("do-timestamp", ..) => Ok(settings.do_timestamp.to_value()), _ => unimplemented!(), } } @@ -655,16 +636,18 @@ impl ElementImpl for AppSrc { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + runtime::executor::block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.pause(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -676,10 +659,11 @@ impl ElementImpl for AppSrc { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.start(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - block_on(async { + runtime::executor::block_on(async { self.src_pad_handler.lock().await.need_initial_events = true; }); } diff --git a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs index e2ddda40..33f6e290 100644 --- a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs @@ -17,7 +17,6 @@ use either::Either; -use futures::executor::block_on; use futures::future::BoxFuture; use futures::future::{abortable, AbortHandle, Aborted}; use futures::lock::{Mutex, MutexGuard}; @@ -43,7 +42,9 @@ use std::collections::{BTreeSet, VecDeque}; use std::time::Duration; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadContext, PadContextWeak, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{ + self, Context, JoinHandle, PadContext, PadContextRef, PadSink, PadSinkRef, PadSrc, PadSrcRef, +}; use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; @@ -226,7 +227,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { }.boxed()) } else { if let EventView::FlushStop(..) = event.view() { - block_on(jitterbuffer.flush(element)); + runtime::executor::block_on(jitterbuffer.flush(element)); } gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event); @@ -247,7 +248,8 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { match query.view_mut() { QueryView::Drain(..) => { gst_info!(CAT, obj: pad.gst_pad(), "Draining"); - block_on(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)).is_ok() + runtime::executor::block_on(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)) + .is_ok() } _ => jitterbuffer.src_pad.gst_pad().peer_query(query), } @@ -277,8 +279,9 @@ impl PadSrcHandler for JitterBufferPadSrcHandler { if ret { let (_, mut min_latency, _) = peer_query.get_result(); - let our_latency = - block_on(jitterbuffer.settings.lock()).latency_ms as u64 * gst::MSECOND; + let our_latency = runtime::executor::block_on(jitterbuffer.settings.lock()) + .latency_ms as u64 + * gst::MSECOND; min_latency += our_latency; let max_latency = gst::CLOCK_TIME_NONE; @@ -292,7 +295,11 @@ impl PadSrcHandler for JitterBufferPadSrcHandler { if q.get_format() != gst::Format::Time { jitterbuffer.sink_pad.gst_pad().peer_query(query) } else { - q.set(block_on(jitterbuffer.state.lock()).segment.get_position()); + q.set( + runtime::executor::block_on(jitterbuffer.state.lock()) + .segment + .get_position(), + ); true } } @@ -355,7 +362,7 @@ struct State { last_res: Result, task_queue_abort_handle: Option, wakeup_abort_handle: Option, - wakeup_join_handle: Option>>, + wakeup_join_handle: Option>>, } impl Default for State { @@ -952,24 +959,29 @@ impl JitterBuffer { gst_debug!(CAT, obj: element, "Scheduling wakeup in {}", delay); - let element = element.clone(); - let pad_ctx_weak = pad_ctx.downgrade(); - let wakeup_fut = pad_ctx.delay_for(Duration::from_nanos(delay), move || { - Self::wakeup_fut(latency_ns, context_wait_ns, element, pad_ctx_weak) - }); - - let (wakeup_fut, abort_handle) = abortable(wakeup_fut); + let (wakeup_fut, abort_handle) = abortable(Self::wakeup_fut( + Duration::from_nanos(delay), + latency_ns, + context_wait_ns, + &element, + &pad_ctx, + )); state.wakeup_join_handle = Some(pad_ctx.spawn(wakeup_fut)); state.wakeup_abort_handle = Some(abort_handle); } fn wakeup_fut( + delay: Duration, latency_ns: gst::ClockTime, context_wait_ns: gst::ClockTime, - element: gst::Element, - pad_ctx_weak: PadContextWeak, + element: &gst::Element, + pad_ctx: &PadContextRef, ) -> BoxFuture<'static, ()> { + let element = element.clone(); + let pad_ctx_weak = pad_ctx.downgrade(); async move { + runtime::time::delay_for(delay).await; + let jb = Self::from_instance(&element); let mut state = jb.state.lock().await; @@ -1140,7 +1152,7 @@ impl ObjectSubclass for JitterBuffer { .expect("signal arg") .expect("missing signal arg"); let jitterbuffer = Self::from_instance(&element); - block_on(jitterbuffer.clear_pt_map(&element)); + runtime::executor::block_on(jitterbuffer.clear_pt_map(&element)); None }, ); @@ -1178,12 +1190,12 @@ impl ObjectImpl for JitterBuffer { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; + let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("latency", ..) => { - let mut settings = block_on(self.settings.lock()); settings.latency_ms = value.get_some().expect("type checked upstream"); - block_on(self.state.lock()) + runtime::executor::block_on(self.state.lock()) .jbuf .borrow() .set_delay(settings.latency_ms as u64 * gst::MSECOND); @@ -1191,26 +1203,21 @@ impl ObjectImpl for JitterBuffer { /* TODO: post message */ } subclass::Property("do-lost", ..) => { - let mut settings = block_on(self.settings.lock()); settings.do_lost = value.get_some().expect("type checked upstream"); } subclass::Property("max-dropout-time", ..) => { - let mut settings = block_on(self.settings.lock()); settings.max_dropout_time = value.get_some().expect("type checked upstream"); } subclass::Property("max-misorder-time", ..) => { - let mut settings = block_on(self.settings.lock()); settings.max_misorder_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -1222,23 +1229,23 @@ impl ObjectImpl for JitterBuffer { match *prop { subclass::Property("latency", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.latency_ms.to_value()) } subclass::Property("do-lost", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.do_lost.to_value()) } subclass::Property("max-dropout-time", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.max_dropout_time.to_value()) } subclass::Property("max-misorder-time", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.max_misorder_time.to_value()) } subclass::Property("stats", ..) => { - let state = block_on(self.state.lock()); + let state = runtime::executor::block_on(self.state.lock()); let s = gst::Structure::new( "application/x-rtp-jitterbuffer-stats", &[ @@ -1250,11 +1257,11 @@ impl ObjectImpl for JitterBuffer { Ok(s.to_value()) } subclass::Property("context", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), @@ -1279,14 +1286,16 @@ impl ElementImpl for JitterBuffer { gst_trace!(CAT, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => block_on(async { + gst::StateChange::NullToReady => runtime::executor::block_on(async { let _state = self.state.lock().await; - let settings = self.settings.lock().await; - let context = Context::acquire(&settings.context, settings.context_wait).unwrap(); + let context = { + let settings = self.settings.lock().await; + Context::acquire(&settings.context, settings.context_wait).unwrap() + }; let _ = self .src_pad - .prepare(context, &JitterBufferPadSrcHandler {}) + .prepare(context, &JitterBufferPadSrcHandler) .await .map_err(|err| { gst_error_msg!( @@ -1296,9 +1305,9 @@ impl ElementImpl for JitterBuffer { gst::StateChangeError }); - self.sink_pad.prepare(&JitterBufferPadSinkHandler {}).await; + self.sink_pad.prepare(&JitterBufferPadSinkHandler).await; }), - gst::StateChange::PausedToReady => block_on(async { + gst::StateChange::PausedToReady => runtime::executor::block_on(async { let mut state = self.state.lock().await; if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { @@ -1309,7 +1318,7 @@ impl ElementImpl for JitterBuffer { abort_handle.abort(); } }), - gst::StateChange::ReadyToNull => block_on(async { + gst::StateChange::ReadyToNull => runtime::executor::block_on(async { let mut state = self.state.lock().await; self.sink_pad.unprepare().await; diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index 2b3169ec..43cdf2ff 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -18,7 +18,6 @@ use either::Either; use futures::channel::oneshot; -use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -42,7 +41,7 @@ use std::sync::{Arc, Weak}; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use super::dataqueue::{DataQueue, DataQueueItem}; @@ -364,7 +363,7 @@ impl PadSinkHandler for ProxySinkPadHandler { } else { match event.view() { EventView::FlushStart(..) => { - let _ = block_on(proxysink.stop(element)); + let _ = runtime::executor::block_on(proxysink.stop(element)); } EventView::FlushStop(..) => { let (res, state, pending) = element.get_state(0.into()); @@ -372,7 +371,7 @@ impl PadSinkHandler for ProxySinkPadHandler { || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Paused { - let _ = block_on(proxysink.start(&element)); + let _ = runtime::executor::block_on(proxysink.start(&element)); } } _ => (), @@ -380,7 +379,9 @@ impl PadSinkHandler for ProxySinkPadHandler { gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); // FIXME proxysink can't forward directly to the src_pad of the proxysrc - let _ = block_on(proxysink.enqueue_item(&element, DataQueueItem::Event(event))); + let _ = runtime::executor::block_on( + proxysink.enqueue_item(&element, DataQueueItem::Event(event)), + ); Either::Left(true) } @@ -599,8 +600,8 @@ impl ProxySink { let mut state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Preparing"); - let settings = self.settings.lock().await; - state.queue = match SharedQueue::get(&settings.proxy_context, true).await { + state.queue = match SharedQueue::get(&self.settings.lock().await.proxy_context, true).await + { Some(queue) => Some(queue), None => { return Err(gst_error_msg!( @@ -706,7 +707,7 @@ impl ObjectImpl for ProxySink { match *prop { subclass::Property("proxy-context", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.proxy_context = value .get() .expect("type checked upstream") @@ -721,7 +722,7 @@ impl ObjectImpl for ProxySink { match *prop { subclass::Property("proxy-context", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.proxy_context.to_value()) } _ => unimplemented!(), @@ -748,16 +749,18 @@ impl ElementImpl for ProxySink { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + runtime::executor::block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.stop(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -765,7 +768,7 @@ impl ElementImpl for ProxySink { let success = self.parent_change_state(element, transition)?; if transition == gst::StateChange::ReadyToPaused { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } Ok(success) @@ -881,7 +884,7 @@ impl PadSrcHandler for ProxySrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on(proxysrc.pause(element)); + let _ = runtime::executor::block_on(proxysrc.pause(element)); true } EventView::FlushStop(..) => { @@ -889,7 +892,7 @@ impl PadSrcHandler for ProxySrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on(proxysrc.start(element)); + let _ = runtime::executor::block_on(proxysrc.start(element)); } true } @@ -1032,7 +1035,7 @@ impl ProxySrc { })?; self.src_pad - .prepare(context, &ProxySrcPadHandler {}) + .prepare(context, &ProxySrcPadHandler) .await .map_err(|err| { gst_error_msg!( @@ -1155,32 +1158,27 @@ impl ObjectImpl for ProxySrc { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES_SRC[id]; + let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("max-size-buffers", ..) => { - let mut settings = block_on(self.settings.lock()); settings.max_size_buffers = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-bytes", ..) => { - let mut settings = block_on(self.settings.lock()); settings.max_size_bytes = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-time", ..) => { - let mut settings = block_on(self.settings.lock()); settings.max_size_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("proxy-context", ..) => { - let mut settings = block_on(self.settings.lock()); settings.proxy_context = value .get() .expect("type checked upstream") @@ -1193,31 +1191,14 @@ impl ObjectImpl for ProxySrc { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { let prop = &PROPERTIES_SRC[id]; + let settings = runtime::executor::block_on(self.settings.lock()); match *prop { - subclass::Property("max-size-buffers", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.max_size_buffers.to_value()) - } - subclass::Property("max-size-bytes", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.max_size_bytes.to_value()) - } - subclass::Property("max-size-time", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.max_size_time.to_value()) - } - subclass::Property("context", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context.to_value()) - } - subclass::Property("context-wait", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context_wait.to_value()) - } - subclass::Property("proxy-context", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.proxy_context.to_value()) - } + subclass::Property("max-size-buffers", ..) => Ok(settings.max_size_buffers.to_value()), + subclass::Property("max-size-bytes", ..) => Ok(settings.max_size_bytes.to_value()), + subclass::Property("max-size-time", ..) => Ok(settings.max_size_time.to_value()), + subclass::Property("context", ..) => Ok(settings.context.to_value()), + subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()), + subclass::Property("proxy-context", ..) => Ok(settings.proxy_context.to_value()), _ => unimplemented!(), } } @@ -1242,16 +1223,18 @@ impl ElementImpl for ProxySrc { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + runtime::executor::block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.pause(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -1263,7 +1246,8 @@ impl ElementImpl for ProxySrc { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.start(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/gst-plugin-threadshare/src/queue.rs b/gst-plugin-threadshare/src/queue.rs index 9ab6c72a..7523bab1 100644 --- a/gst-plugin-threadshare/src/queue.rs +++ b/gst-plugin-threadshare/src/queue.rs @@ -18,7 +18,6 @@ use either::Either; use futures::channel::oneshot; -use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::Mutex; use futures::prelude::*; @@ -41,7 +40,7 @@ use std::collections::VecDeque; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use super::dataqueue::{DataQueue, DataQueueItem}; @@ -372,7 +371,7 @@ impl PadSrcHandler for QueuePadSrcHandler { } else { match event.view() { EventView::FlushStart(..) => { - let _ = block_on(queue.stop(element)); + let _ = runtime::executor::block_on(queue.stop(element)); } EventView::FlushStop(..) => { let (res, state, pending) = element.get_state(0.into()); @@ -380,7 +379,7 @@ impl PadSrcHandler for QueuePadSrcHandler { || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on(queue.start(element)); + let _ = runtime::executor::block_on(queue.start(element)); } } _ => (), @@ -671,7 +670,7 @@ impl Queue { })?; self.src_pad - .prepare(context, &QueuePadSrcHandler {}) + .prepare(context, &QueuePadSrcHandler) .await .map_err(|err| { gst_error_msg!( @@ -679,7 +678,7 @@ impl Queue { ["Error joining Context: {:?}", err] ) })?; - self.sink_pad.prepare(&QueuePadSinkHandler {}).await; + self.sink_pad.prepare(&QueuePadSinkHandler).await; gst_debug!(CAT, obj: element, "Prepared"); @@ -812,26 +811,26 @@ impl ObjectImpl for Queue { match *prop { subclass::Property("max-size-buffers", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_size_buffers = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-bytes", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_size_bytes = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-time", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_size_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on(self.settings.lock()); + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -843,23 +842,23 @@ impl ObjectImpl for Queue { match *prop { subclass::Property("max-size-buffers", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.max_size_buffers.to_value()) } subclass::Property("max-size-bytes", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.max_size_bytes.to_value()) } subclass::Property("max-size-time", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.max_size_time.to_value()) } subclass::Property("context", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on(self.settings.lock()); + let settings = runtime::executor::block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), @@ -885,16 +884,18 @@ impl ElementImpl for Queue { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + runtime::executor::block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.stop(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -902,7 +903,7 @@ impl ElementImpl for Queue { let success = self.parent_change_state(element, transition)?; if transition == gst::StateChange::ReadyToPaused { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } Ok(success) diff --git a/gst-plugin-threadshare/src/runtime/executor.rs b/gst-plugin-threadshare/src/runtime/executor.rs index 2ac4d330..25446ca4 100644 --- a/gst-plugin-threadshare/src/runtime/executor.rs +++ b/gst-plugin-threadshare/src/runtime/executor.rs @@ -1,4 +1,5 @@ -// Copyright (C) 2018 Sebastian Dröge +// Copyright (C) 2018-2019 Sebastian Dröge +// Copyright (C) 2019-2020 François Laignel // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public @@ -49,11 +50,15 @@ use gst::{gst_debug, gst_log, gst_trace}; use lazy_static::lazy_static; +use std::cell::RefCell; use std::collections::HashMap; +use std::fmt; use std::io; use std::mem; +use std::pin::Pin; use std::sync::mpsc as sync_mpsc; use std::sync::{Arc, Mutex, Weak}; +use std::task::Poll; use std::thread; use std::time::Duration; @@ -73,52 +78,78 @@ lazy_static! { static ref CONTEXTS: Mutex>> = Mutex::new(HashMap::new()); } +thread_local!(static CURRENT_THREAD_CONTEXT: RefCell> = RefCell::new(None)); + +/// Blocks on `future`. +/// +/// This function must NOT be called within a [`Context`] thread. +/// +/// The reason is this would prevent any task operating on the +/// [`Context`] from making progress. +/// +/// # Panics +/// +/// This function panics if called within a [`Context`] thread. +/// +/// [`Context`]: struct.Context.html +pub fn block_on(future: Fut) -> Fut::Output { + if Context::is_context_thread() { + panic!("Attempt to `block_on` within a `Context` thread"); + } + + // Not running in a Context thread so we can block + futures::executor::block_on(future) +} + struct ContextThread { name: String, } impl ContextThread { - fn start(name: &str, wait: u32) -> (tokio::runtime::Handle, ContextShutdown) { - let name_clone = name.into(); - - let mut context_thread = ContextThread { name: name_clone }; - - let (handle_sender, handle_receiver) = sync_mpsc::channel(); - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - + fn start(name: &str, wait: u32) -> Context { + let context_thread = ContextThread { name: name.into() }; + let (context_sender, context_receiver) = sync_mpsc::channel(); let join = thread::spawn(move || { - context_thread.spawn(wait, handle_sender, shutdown_receiver); + context_thread.spawn(wait, context_sender); }); - let handle = handle_receiver.recv().expect("Context thread init failed"); + let context = context_receiver.recv().expect("Context thread init failed"); + *context.0.shutdown.join.lock().unwrap() = Some(join); - let shutdown = ContextShutdown { - name: name.into(), - shutdown: Some(shutdown_sender), - join: Some(join), - }; - - (handle, shutdown) + context } - fn spawn( - &mut self, - wait: u32, - handle_sender: sync_mpsc::Sender, - shutdown_receiver: oneshot::Receiver<()>, - ) { + fn spawn(&self, wait: u32, context_sender: sync_mpsc::Sender) { gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name); let mut runtime = tokio::runtime::Builder::new() .basic_scheduler() + .thread_name(self.name.clone()) .enable_all() .max_throttling(Duration::from_millis(wait as u64)) .build() .expect("Couldn't build the runtime"); - handle_sender - .send(runtime.handle().clone()) - .expect("Couldn't send context thread handle"); + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + + let shutdown = ContextShutdown { + name: self.name.clone(), + shutdown: Some(shutdown_sender), + join: Mutex::new(None), + }; + + let context = Context(Arc::new(ContextInner { + name: self.name.clone(), + handle: Mutex::new(runtime.handle().clone()), + shutdown, + task_queues: Mutex::new((0, HashMap::new())), + })); + + CURRENT_THREAD_CONTEXT.with(|cur_ctx| { + *cur_ctx.borrow_mut() = Some(context.downgrade()); + }); + + context_sender.send(context).unwrap(); let _ = runtime.block_on(shutdown_receiver); } @@ -134,7 +165,7 @@ impl Drop for ContextThread { struct ContextShutdown { name: String, shutdown: Option>, - join: Option>, + join: Mutex>>, } impl Drop for ContextShutdown { @@ -151,7 +182,8 @@ impl Drop for ContextShutdown { "Waiting for context thread '{}' to shutdown", self.name ); - let _ = self.join.take().unwrap().join(); + let join_handle = self.join.lock().unwrap().take().unwrap(); + let _ = join_handle.join(); } } @@ -169,12 +201,65 @@ glib_boxed_derive_traits!(TaskQueueId); pub type TaskOutput = Result<(), gst::FlowError>; type TaskQueue = FuturesUnordered>; +pub struct JoinError(tokio::task::JoinError); + +impl fmt::Display for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, fmt) + } +} + +impl fmt::Debug for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.0, fmt) + } +} + +impl std::error::Error for JoinError {} + +impl From for JoinError { + fn from(src: tokio::task::JoinError) -> Self { + JoinError(src) + } +} + +/// Wrapper for the underlying runtime JoinHandle implementation. +pub struct JoinHandle(tokio::task::JoinHandle); + +unsafe impl Send for JoinHandle {} +unsafe impl Sync for JoinHandle {} + +impl From> for JoinHandle { + fn from(src: tokio::task::JoinHandle) -> Self { + JoinHandle(src) + } +} + +impl Unpin for JoinHandle {} + +impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + self.as_mut().0.poll_unpin(cx).map_err(JoinError::from) + } +} + +impl fmt::Debug for JoinHandle +where + T: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("JoinHandle").finish() + } +} + #[derive(Debug)] struct ContextInner { name: String, handle: Mutex, // Only used for dropping - _shutdown: ContextShutdown, + shutdown: ContextShutdown, task_queues: Mutex<(u64, HashMap)>, } @@ -221,14 +306,7 @@ impl Context { } } - let (handle, shutdown) = ContextThread::start(context_name, wait); - - let context = Context(Arc::new(ContextInner { - name: context_name.into(), - handle: Mutex::new(handle), - _shutdown: shutdown, - task_queues: Mutex::new((0, HashMap::new())), - })); + let context = ContextThread::start(context_name, wait); contexts.insert(context_name.into(), Arc::downgrade(&context.0)); gst_debug!(RUNTIME_CAT, "New Context '{}'", context.0.name); @@ -252,12 +330,34 @@ impl Context { self.0.name.as_str() } - pub fn spawn(&self, future: Fut) -> tokio::task::JoinHandle + /// Returns `true` if a `Context` is running on current thread. + pub fn is_context_thread() -> bool { + CURRENT_THREAD_CONTEXT.with(|cur_ctx| cur_ctx.borrow().is_some()) + } + + /// Returns the `Context` running on current thread, if any. + pub fn current() -> Option { + CURRENT_THREAD_CONTEXT.with(|cur_ctx| { + cur_ctx + .borrow() + .as_ref() + .and_then(|ctx_weak| ctx_weak.upgrade()) + }) + } + + pub fn enter(&self, f: F) -> R + where + F: FnOnce() -> R, + { + self.0.handle.lock().unwrap().enter(f) + } + + pub fn spawn(&self, future: Fut) -> JoinHandle where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - self.0.handle.lock().unwrap().spawn(future) + self.0.handle.lock().unwrap().spawn(future).into() } pub fn release_task_queue(&self, id: TaskQueueId) -> Option { @@ -308,64 +408,35 @@ impl Context { None } } - - /// Builds a `Future` to execute an `action` at [`Interval`]s. - /// - /// [`Interval`]: struct.Interval.html - pub fn interval(&self, interval: Duration, f: F) -> impl Future - where - F: Fn() -> Fut + Send + Sync + 'static, - E: Send + 'static, - Fut: Future> + Send + 'static, - { - async move { - let mut interval = tokio::time::interval(interval); - loop { - interval.tick().await; - if let Err(err) = f().await { - break Err(err); - } - } - } - } - - /// Builds a `Future` to execute an action after the given `delay` has elapsed. - pub fn delay_for(&self, delay: Duration, f: F) -> impl Future - where - F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - async move { - tokio::time::delay_for(delay).await; - f().await - } - } } #[cfg(test)] mod tests { + use futures; use futures::channel::mpsc; - use futures::future::abortable; use futures::lock::Mutex; + use futures::prelude::*; use gst; + use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::Arc; - use std::time::Instant; + use std::time::{Duration, Instant}; - use super::*; + use super::Context; type Item = i32; - const SLEEP_DURATION: u32 = 2; - const INTERVAL: Duration = std::time::Duration::from_millis(100 * SLEEP_DURATION as u64); + const SLEEP_DURATION_MS: u32 = 2; + const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64); + const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64 * 10); #[tokio::test] async fn user_drain_pending_tasks() { // Setup gst::init().unwrap(); - let context = Context::acquire("user_drain_task_queue", SLEEP_DURATION).unwrap(); + let context = Context::acquire("user_drain_task_queue", SLEEP_DURATION_MS).unwrap(); let queue_id = context.acquire_task_queue_id(); let (sender, mut receiver) = mpsc::channel(1); @@ -404,105 +475,120 @@ mod tests { } #[tokio::test] - async fn delay_for() { + async fn block_on_within_tokio() { + let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION_MS).unwrap(); + + let bytes_sent = crate::runtime::executor::block_on(context.spawn(async { + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000); + let socket = UdpSocket::bind(saddr).unwrap(); + let mut socket = tokio::net::UdpSocket::from_std(socket).unwrap(); + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000); + socket.send_to(&[0; 10], saddr).await.unwrap() + })) + .unwrap(); + assert_eq!(bytes_sent, 10); + + let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let now = Instant::now(); + crate::runtime::time::delay_for(DELAY).await; + now.elapsed() + })) + .unwrap(); + // Due to throttling, `Delay` may be fired earlier + assert!(elapsed + SLEEP_DURATION / 2 >= DELAY); + } + + #[test] + fn block_on_from_sync() { + let context = Context::acquire("block_on_from_sync", SLEEP_DURATION_MS).unwrap(); + + let bytes_sent = crate::runtime::executor::block_on(context.spawn(async { + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5001); + let socket = UdpSocket::bind(saddr).unwrap(); + let mut socket = tokio::net::UdpSocket::from_std(socket).unwrap(); + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000); + socket.send_to(&[0; 10], saddr).await.unwrap() + })) + .unwrap(); + assert_eq!(bytes_sent, 10); + + let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let now = Instant::now(); + crate::runtime::time::delay_for(DELAY).await; + now.elapsed() + })) + .unwrap(); + // Due to throttling, `Delay` may be fired earlier + assert!(elapsed + SLEEP_DURATION / 2 >= DELAY); + } + + #[test] + fn block_on_from_context() { gst::init().unwrap(); - let context = Context::acquire("delay_for", SLEEP_DURATION).unwrap(); - - let (sender, receiver) = oneshot::channel(); - - let start = Instant::now(); - let delayed_by_fut = context.delay_for(INTERVAL, move || { - async { - sender.send(42).unwrap(); - } + let context = Context::acquire("block_on_from_context", SLEEP_DURATION_MS).unwrap(); + let join_handle = context.spawn(async { + crate::runtime::executor::block_on(async { + crate::runtime::time::delay_for(DELAY).await; + }); }); - context.spawn(delayed_by_fut); - - let _ = receiver.await.unwrap(); - let delta = Instant::now() - start; - assert!(delta >= INTERVAL); - assert!(delta < INTERVAL * 2); + // Panic: attempt to `runtime::executor::block_on` within a `Context` thread + futures::executor::block_on(join_handle).unwrap_err(); } #[tokio::test] - async fn interval_ok() { + async fn enter_context_from_tokio() { gst::init().unwrap(); - let context = Context::acquire("interval_ok", SLEEP_DURATION).unwrap(); + let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION_MS).unwrap(); + let mut socket = context + .enter(|| { + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5002); + let socket = UdpSocket::bind(saddr).unwrap(); + tokio::net::UdpSocket::from_std(socket) + }) + .unwrap(); - let (sender, mut receiver) = mpsc::channel(1); - let sender: Arc>> = Arc::new(Mutex::new(sender)); + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000); + let bytes_sent = socket.send_to(&[0; 10], saddr).await.unwrap(); + assert_eq!(bytes_sent, 10); - let (interval_fut, handle) = abortable(context.interval(INTERVAL, move || { - let sender = Arc::clone(&sender); - async move { - let instant = Instant::now(); - sender.lock().await.send(instant).await.map_err(drop) - } - })); - context.spawn(interval_fut.map(drop)); - - let mut idx: u32 = 0; - let mut first = Instant::now(); - while let Some(instant) = receiver.next().await { - if idx > 0 { - let delta = instant - first; - assert!(delta > INTERVAL * (idx - 1)); - assert!(delta < INTERVAL * (idx + 1)); - } else { - first = instant; - } - if idx == 3 { - handle.abort(); - break; - } - - idx += 1; - } + let elapsed = context.enter(|| { + futures::executor::block_on(async { + let now = Instant::now(); + crate::runtime::time::delay_for(DELAY).await; + now.elapsed() + }) + }); + // Due to throttling, `Delay` may be fired earlier + assert!(elapsed + SLEEP_DURATION / 2 >= DELAY); } - #[tokio::test] - async fn interval_err() { + #[test] + fn enter_context_from_sync() { gst::init().unwrap(); - let context = Context::acquire("interval_err", SLEEP_DURATION).unwrap(); + let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION_MS).unwrap(); + let mut socket = context + .enter(|| { + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5003); + let socket = UdpSocket::bind(saddr).unwrap(); + tokio::net::UdpSocket::from_std(socket) + }) + .unwrap(); - let (sender, mut receiver) = mpsc::channel(1); - let sender: Arc>> = Arc::new(Mutex::new(sender)); - let interval_idx: Arc> = Arc::new(Mutex::new(0)); + let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000); + let bytes_sent = futures::executor::block_on(socket.send_to(&[0; 10], saddr)).unwrap(); + assert_eq!(bytes_sent, 10); - let interval_fut = context.interval(INTERVAL, move || { - let sender = Arc::clone(&sender); - let interval_idx = Arc::clone(&interval_idx); - async move { - let instant = Instant::now(); - let mut idx = interval_idx.lock().await; - sender.lock().await.send(instant).await.unwrap(); - *idx += 1; - if *idx < 3 { - Ok(()) - } else { - Err(()) - } - } + let elapsed = context.enter(|| { + futures::executor::block_on(async { + let now = Instant::now(); + crate::runtime::time::delay_for(DELAY).await; + now.elapsed() + }) }); - context.spawn(interval_fut.map(drop)); - - let mut idx: u32 = 0; - let mut first = Instant::now(); - while let Some(instant) = receiver.next().await { - if idx > 0 { - let delta = instant - first; - assert!(delta > INTERVAL * (idx - 1)); - assert!(delta < INTERVAL * (idx + 1)); - } else { - first = instant; - } - - idx += 1; - } - - assert_eq!(idx, 3); + // Due to throttling, `Delay` may be fired earlier + assert!(elapsed + SLEEP_DURATION / 2 >= DELAY); } } diff --git a/gst-plugin-threadshare/src/runtime/mod.rs b/gst-plugin-threadshare/src/runtime/mod.rs index 881dfd0a..cd835a10 100644 --- a/gst-plugin-threadshare/src/runtime/mod.rs +++ b/gst-plugin-threadshare/src/runtime/mod.rs @@ -44,13 +44,13 @@ //! [`PadSink`]: pad/struct.PadSink.html pub mod executor; -pub use executor::{Context, TaskOutput}; +pub use executor::{Context, JoinHandle, TaskOutput}; pub mod pad; pub use pad::{PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak}; pub mod pad_context; -pub use pad_context::{PadContext, PadContextWeak}; +pub use pad_context::{PadContext, PadContextRef, PadContextWeak}; pub mod prelude { pub use super::pad::{PadSinkHandler, PadSrcHandler}; @@ -58,6 +58,8 @@ pub mod prelude { pub mod task; +pub mod time; + use gst; use lazy_static::lazy_static; diff --git a/gst-plugin-threadshare/src/runtime/pad.rs b/gst-plugin-threadshare/src/runtime/pad.rs index 9c5c67f0..c6ec95a7 100644 --- a/gst-plugin-threadshare/src/runtime/pad.rs +++ b/gst-plugin-threadshare/src/runtime/pad.rs @@ -65,7 +65,6 @@ use either::Either; -use futures::executor::block_on; use futures::future; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; @@ -74,7 +73,7 @@ use futures::prelude::*; use gst; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_error, gst_log, gst_loggable_error}; +use gst::{gst_debug, gst_error, gst_fixme, gst_log, gst_loggable_error}; use gst::{FlowError, FlowSuccess}; use std::fmt; @@ -82,7 +81,7 @@ use std::marker::PhantomData; use std::sync; use std::sync::{Arc, Weak}; -use super::executor::Context; +use super::executor::{self, Context}; use super::pad_context::{PadContext, PadContextRef, PadContextWeak}; use super::task::Task; use super::RUNTIME_CAT; @@ -399,7 +398,7 @@ impl<'a> PadSrcRef<'a> { } if !active { - block_on(async { + executor::block_on(async { self.strong.lock_state().await.is_initialized = false; }); } @@ -597,7 +596,7 @@ impl PadSrc { self.0.lock_state().await } - fn init_pad_functions(&self, handler: &H, context: Context) { + fn init_pad_functions(&self, handler: &H) { let handler_clone = handler.clone(); let this_weak = self.downgrade(); self.gst_pad() @@ -649,7 +648,6 @@ impl PadSrc { .set_event_full_function(move |_gst_pad, parent, event| { let handler = handler_clone.clone(); let this_weak = this_weak.clone(); - let context = context.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), @@ -657,13 +655,11 @@ impl PadSrc { let this_ref = this_weak.upgrade().expect("PadSrc no longer exists"); match handler.src_event_full(this_ref, imp, &element, event) { Either::Left(res) => res, - Either::Right(fut) => { - // FIXME if we could check whether current thread is already - // associated to an executor or not, we could `.await` here. - // But I couldn't find a solution for this with current version - // of `tokio` - context.spawn(fut.map(drop)); - Ok(FlowSuccess::Ok) + Either::Right(_fut) => { + // See these threads: + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/240#note_378446 + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/240#note_378454 + unimplemented!("Future handling in src_event*"); } } }, @@ -681,7 +677,12 @@ impl PadSrc { || false, move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSrc no longer exists"); - handler.src_query(this_ref, imp, &element, query) + if !query.is_serialized() { + handler.src_query(this_ref, imp, &element, query) + } else { + gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); + false + } }, ) }); @@ -706,9 +707,9 @@ impl PadSrc { .await .map_err(|_| PadContextError::ActiveTask)?; - state.pad_context = Some(PadContext::new(context.clone())); + state.pad_context = Some(PadContext::new(context)); - self.init_pad_functions(handler, context); + self.init_pad_functions(handler); Ok(()) } @@ -1181,7 +1182,12 @@ impl PadSink { || false, move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSink no longer exists"); - handler.sink_query(this_ref, imp, &element, query) + if !query.is_serialized() { + handler.sink_query(this_ref, imp, &element, query) + } else { + gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); + false + } }, ) }); @@ -1191,15 +1197,24 @@ impl PadSink { pad_ctx: Arc>>, fut: impl Future> + Send + 'static, ) -> Result { - match *pad_ctx.lock().unwrap() { - Some(ref pad_ctx_weak) => match pad_ctx_weak.upgrade() { - Some(pad_ctx) => { - pad_ctx.add_pending_task(fut.map(|res| res.map(drop))); + match pad_ctx + .lock() + .unwrap() + .as_ref() + .and_then(|pad_ctx_weak| pad_ctx_weak.upgrade()) + { + Some(pad_ctx) => { + pad_ctx.add_pending_task(fut.map(|res| res.map(drop))); + Ok(FlowSuccess::Ok) + } + None => match Context::current() { + None => executor::block_on(fut), + Some(context) => { + // Don't block the Context thread + context.spawn(fut); Ok(FlowSuccess::Ok) } - None => block_on(fut), }, - None => block_on(fut), } } diff --git a/gst-plugin-threadshare/src/runtime/pad_context.rs b/gst-plugin-threadshare/src/runtime/pad_context.rs index 87133304..26522297 100644 --- a/gst-plugin-threadshare/src/runtime/pad_context.rs +++ b/gst-plugin-threadshare/src/runtime/pad_context.rs @@ -27,9 +27,8 @@ use glib; use glib::{glib_boxed_derive_traits, glib_boxed_type}; use std::marker::PhantomData; -use std::time::Duration; -use super::executor::{Context, ContextWeak, TaskOutput, TaskQueueId}; +use super::executor::{Context, ContextWeak, JoinHandle, TaskOutput, TaskQueueId}; #[derive(Clone)] pub struct PadContextWeak { @@ -91,7 +90,7 @@ impl<'a> PadContextRef<'a> { self.strong.downgrade() } - pub fn spawn(&self, future: Fut) -> tokio::task::JoinHandle + pub fn spawn(&self, future: Fut) -> JoinHandle where Fut: Future + Send + 'static, Fut::Output: Send + 'static, @@ -117,27 +116,6 @@ impl<'a> PadContextRef<'a> { pub fn context(&self) -> &Context { &self.strong.context } - - /// Builds a `Future` to execute an `action` at [`Interval`]s. - /// - /// [`Interval`]: struct.Interval.html - pub fn interval(&self, interval: Duration, f: F) -> impl Future - where - F: Fn() -> Fut + Send + Sync + 'static, - E: Send + 'static, - Fut: Future> + Send + 'static, - { - self.strong.interval(interval, f) - } - - /// Builds a `Future` to execute an action after the given `delay` has elapsed. - pub fn delay_for(&self, delay: Duration, f: F) -> impl Future - where - F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - self.strong.delay_for(delay, f) - } } impl std::fmt::Display for PadContextRef<'_> { @@ -180,25 +158,6 @@ impl PadContextStrong { fn clear_pending_tasks(&self) { self.context.clear_task_queue(self.queue_id); } - - #[inline] - fn interval(&self, interval: Duration, f: F) -> impl Future - where - F: Fn() -> Fut + Send + Sync + 'static, - E: Send + 'static, - Fut: Future> + Send + 'static, - { - self.context.interval(interval, f) - } - - #[inline] - pub fn delay_for(&self, delay: Duration, f: F) -> impl Future - where - F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - self.context.delay_for(delay, f) - } } impl std::fmt::Display for PadContextStrong { @@ -231,7 +190,7 @@ impl PadContext { PadContextRef::new(self.0.context.clone(), self.0.queue_id) } - pub fn spawn(&self, future: Fut) -> tokio::task::JoinHandle + pub fn spawn(&self, future: Fut) -> JoinHandle where Fut: Future + Send + 'static, Fut::Output: Send + 'static, @@ -247,27 +206,6 @@ impl PadContext { self.0.clear_pending_tasks(); } - /// Builds a `Future` to execute an `action` at [`Interval`]s. - /// - /// [`Interval`]: struct.Interval.html - pub fn interval(&self, interval: Duration, f: F) -> impl Future - where - F: Fn() -> Fut + Send + Sync + 'static, - E: Send + 'static, - Fut: Future> + Send + 'static, - { - self.0.interval(interval, f) - } - - /// Builds a `Future` to execute an action after the given `delay` has elapsed. - pub fn delay_for(&self, delay: Duration, f: F) -> impl Future - where - F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - self.0.delay_for(delay, f) - } - pub(super) fn new_sticky_event(&self) -> gst::Event { let s = gst::Structure::new("ts-pad-context", &[("pad-context", &self.downgrade())]); gst::Event::new_custom_downstream_sticky(s).build() diff --git a/gst-plugin-threadshare/src/runtime/task.rs b/gst-plugin-threadshare/src/runtime/task.rs index 7297d6ac..dcee39fa 100644 --- a/gst-plugin-threadshare/src/runtime/task.rs +++ b/gst-plugin-threadshare/src/runtime/task.rs @@ -29,7 +29,7 @@ use gst::{gst_debug, gst_log, gst_trace, gst_warning}; use std::fmt; use std::sync::Arc; -use super::{Context, RUNTIME_CAT}; +use super::{Context, JoinHandle, RUNTIME_CAT}; #[derive(Clone, Debug, Eq, PartialEq)] pub enum TaskError { @@ -51,7 +51,7 @@ struct TaskInner { context: Option, state: TaskState, abort_handle: Option, - loop_handle: Option>>, + loop_handle: Option>>, } impl Default for TaskInner { diff --git a/gst-plugin-threadshare/src/runtime/time.rs b/gst-plugin-threadshare/src/runtime/time.rs new file mode 100644 index 00000000..0ea857b0 --- /dev/null +++ b/gst-plugin-threadshare/src/runtime/time.rs @@ -0,0 +1,37 @@ +// Copyright (C) 2020 François Laignel +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +//! Wrappers for the underlying runtime specific time related Futures. + +use futures::prelude::*; +use futures::stream::StreamExt; + +use std::time::Duration; + +/// Wait until the given `delay` has elapsed. +/// +/// This must be called from within the target runtime environment. +pub async fn delay_for(delay: Duration) { + tokio::time::delay_for(delay).map(drop).await; +} + +/// Builds a `Stream` that yields at `interval. +/// +/// This must be called from within the target runtime environment. +pub fn interval(interval: Duration) -> impl Stream { + tokio::time::interval(interval).map(drop) +} diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index 52eed502..fea4aef4 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -17,7 +17,6 @@ // Boston, MA 02110-1335, USA. use either::Either; -use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -44,10 +43,9 @@ use std::sync::Arc; use std::u16; use tokio::io::AsyncReadExt; -use tokio::task::JoinHandle; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef}; use super::socket::{Socket, SocketRead, SocketStream}; @@ -340,7 +338,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on(tcpclientsrc.pause(element)); + let _ = runtime::executor::block_on(tcpclientsrc.pause(element)); true } EventView::FlushStop(..) => { @@ -348,7 +346,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on(tcpclientsrc.start(element)); + let _ = runtime::executor::block_on(tcpclientsrc.start(element)); } true } @@ -385,7 +383,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = block_on(self.lock()); + let inner = runtime::executor::block_on(self.lock()); let caps = if let Some(ref caps) = inner.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) @@ -675,32 +673,27 @@ impl ObjectImpl for TcpClientSrc { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; + let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("address", ..) => { - let mut settings = block_on(self.settings.lock()); settings.address = value.get().expect("type checked upstream"); } subclass::Property("port", ..) => { - let mut settings = block_on(self.settings.lock()); settings.port = value.get_some().expect("type checked upstream"); } subclass::Property("caps", ..) => { - let mut settings = block_on(self.settings.lock()); settings.caps = value.get().expect("type checked upstream"); } subclass::Property("chunk-size", ..) => { - let mut settings = block_on(self.settings.lock()); settings.chunk_size = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -710,31 +703,14 @@ impl ObjectImpl for TcpClientSrc { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { let prop = &PROPERTIES[id]; + let settings = runtime::executor::block_on(self.settings.lock()); match *prop { - subclass::Property("address", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.address.to_value()) - } - subclass::Property("port", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.port.to_value()) - } - subclass::Property("caps", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.caps.to_value()) - } - subclass::Property("chunk-size", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.chunk_size.to_value()) - } - subclass::Property("context", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context.to_value()) - } - subclass::Property("context-wait", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context_wait.to_value()) - } + subclass::Property("address", ..) => Ok(settings.address.to_value()), + subclass::Property("port", ..) => Ok(settings.port.to_value()), + subclass::Property("caps", ..) => Ok(settings.caps.to_value()), + subclass::Property("chunk-size", ..) => Ok(settings.chunk_size.to_value()), + subclass::Property("context", ..) => Ok(settings.context.to_value()), + subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()), _ => unimplemented!(), } } @@ -759,22 +735,24 @@ impl ElementImpl for TcpClientSrc { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + runtime::executor::block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::ReadyToPaused => { - block_on(self.complete_preparation(element)).map_err(|err| { + runtime::executor::block_on(self.complete_preparation(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.pause(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -786,10 +764,11 @@ impl ElementImpl for TcpClientSrc { success = gst::StateChangeSuccess::Success; } gst::StateChange::PausedToPlaying => { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.start(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - let mut src_pad_handler = block_on(self.src_pad_handler.lock()); + let mut src_pad_handler = runtime::executor::block_on(self.src_pad_handler.lock()); src_pad_handler.need_initial_events = true; } _ => (), diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 2ef672f8..c96bfb02 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -17,7 +17,6 @@ use either::Either; -use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -55,10 +54,8 @@ use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; #[cfg(windows)] use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; -use tokio::task::JoinHandle; - use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef}; use super::socket::{Socket, SocketRead, SocketStream}; @@ -508,7 +505,7 @@ impl PadSrcHandler for UdpSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on(udpsrc.pause(element)); + let _ = runtime::executor::block_on(udpsrc.pause(element)); true } EventView::FlushStop(..) => { @@ -516,7 +513,7 @@ impl PadSrcHandler for UdpSrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on(udpsrc.start(element)); + let _ = runtime::executor::block_on(udpsrc.start(element)); } true } @@ -554,7 +551,7 @@ impl PadSrcHandler for UdpSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = block_on(self.lock()); + let inner = runtime::executor::block_on(self.lock()); let caps = if let Some(ref caps) = inner.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) @@ -1032,29 +1029,24 @@ impl ObjectImpl for UdpSrc { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; + let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("address", ..) => { - let mut settings = block_on(self.settings.lock()); settings.address = value.get().expect("type checked upstream"); } subclass::Property("port", ..) => { - let mut settings = block_on(self.settings.lock()); settings.port = value.get_some().expect("type checked upstream"); } subclass::Property("reuse", ..) => { - let mut settings = block_on(self.settings.lock()); settings.reuse = value.get_some().expect("type checked upstream"); } subclass::Property("caps", ..) => { - let mut settings = block_on(self.settings.lock()); settings.caps = value.get().expect("type checked upstream"); } subclass::Property("mtu", ..) => { - let mut settings = block_on(self.settings.lock()); settings.mtu = value.get_some().expect("type checked upstream"); } subclass::Property("socket", ..) => { - let mut settings = block_on(self.settings.lock()); settings.socket = value .get::() .expect("type checked upstream") @@ -1064,18 +1056,15 @@ impl ObjectImpl for UdpSrc { unreachable!(); } subclass::Property("context", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("retrieve-sender-address", ..) => { - let mut settings = block_on(self.settings.lock()); settings.retrieve_sender_address = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -1085,53 +1074,26 @@ impl ObjectImpl for UdpSrc { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { let prop = &PROPERTIES[id]; + let settings = runtime::executor::block_on(self.settings.lock()); match *prop { - subclass::Property("address", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.address.to_value()) - } - subclass::Property("port", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.port.to_value()) - } - subclass::Property("reuse", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.reuse.to_value()) - } - subclass::Property("caps", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.caps.to_value()) - } - subclass::Property("mtu", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.mtu.to_value()) - } - subclass::Property("socket", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings - .socket - .as_ref() - .map(GioSocketWrapper::as_socket) - .to_value()) - } - subclass::Property("used-socket", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings - .used_socket - .as_ref() - .map(GioSocketWrapper::as_socket) - .to_value()) - } - subclass::Property("context", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context.to_value()) - } - subclass::Property("context-wait", ..) => { - let settings = block_on(self.settings.lock()); - Ok(settings.context_wait.to_value()) - } + subclass::Property("address", ..) => Ok(settings.address.to_value()), + subclass::Property("port", ..) => Ok(settings.port.to_value()), + subclass::Property("reuse", ..) => Ok(settings.reuse.to_value()), + subclass::Property("caps", ..) => Ok(settings.caps.to_value()), + subclass::Property("mtu", ..) => Ok(settings.mtu.to_value()), + subclass::Property("socket", ..) => Ok(settings + .socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()), + subclass::Property("used-socket", ..) => Ok(settings + .used_socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()), + subclass::Property("context", ..) => Ok(settings.context.to_value()), + subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()), subclass::Property("retrieve-sender-address", ..) => { - let settings = block_on(self.settings.lock()); Ok(settings.retrieve_sender_address.to_value()) } _ => unimplemented!(), @@ -1157,22 +1119,24 @@ impl ElementImpl for UdpSrc { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + runtime::executor::block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::ReadyToPaused => { - block_on(self.complete_preparation(element)).map_err(|err| { + runtime::executor::block_on(self.complete_preparation(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.pause(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -1184,10 +1148,11 @@ impl ElementImpl for UdpSrc { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.start(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - block_on(async { + runtime::executor::block_on(async { self.src_pad_handler.lock().await.need_initial_events = true; }); } diff --git a/gst-plugin-threadshare/tests/pad.rs b/gst-plugin-threadshare/tests/pad.rs index 6b799872..0113e261 100644 --- a/gst-plugin-threadshare/tests/pad.rs +++ b/gst-plugin-threadshare/tests/pad.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019 François Laignel +// Copyright (C) 2019-2020 François Laignel // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public @@ -18,7 +18,6 @@ use either::Either; use futures::channel::mpsc; -use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::Mutex; use futures::prelude::*; @@ -38,7 +37,7 @@ use std::boxed::Box; use std::sync::Arc; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use gstthreadshare::runtime::{self, Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; const SLEEP_DURATION: u32 = 2; @@ -151,7 +150,7 @@ impl PadSrcHandler for PadSrcHandlerTest { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on(elem_src_test.pause(element)); + let _ = runtime::executor::block_on(elem_src_test.pause(element)); true } EventView::FlushStop(..) => { @@ -159,7 +158,7 @@ impl PadSrcHandler for PadSrcHandlerTest { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on(elem_src_test.start(element)); + let _ = runtime::executor::block_on(elem_src_test.start(element)); } true } @@ -209,14 +208,13 @@ impl ElementSrcTest { let _state = self.state.lock().await; gst_debug!(SRC_CAT, obj: element, "Preparing"); - let settings = self.settings.lock().await; - - let context = Context::acquire(&settings.context, SLEEP_DURATION).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to acquire Context: {}", err] - ) - })?; + let context = Context::acquire(&self.settings.lock().await.context, SLEEP_DURATION) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })?; self.src_pad .prepare(context, &self.src_pad_handler) @@ -320,7 +318,7 @@ impl ObjectSubclass for ElementSrcTest { ElementSrcTest { src_pad, - src_pad_handler: PadSrcHandlerTest {}, + src_pad_handler: PadSrcHandlerTest, state: Mutex::new(ElementSrcState::default()), settings: Mutex::new(settings), } @@ -340,7 +338,7 @@ impl ObjectImpl for ElementSrcTest { .expect("type checked upstream") .unwrap_or_else(|| "".into()); - block_on(self.settings.lock()).context = context; + runtime::executor::block_on(self.settings.lock()).context = context; } _ => unimplemented!(), } @@ -364,16 +362,18 @@ impl ElementImpl for ElementSrcTest { match transition { gst::StateChange::NullToReady => { - block_on(self.prepare(element)).map_err(|err| { + runtime::executor::block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.pause(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -382,7 +382,8 @@ impl ElementImpl for ElementSrcTest { match transition { gst::StateChange::PausedToPlaying => { - block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + runtime::executor::block_on(self.start(element)) + .map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; @@ -516,7 +517,7 @@ impl PadSinkHandler for PadSinkHandlerTest { } else { gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); Either::Left( - block_on(elem_sink_test.sender.lock()) + runtime::executor::block_on(elem_sink_test.sender.lock()) .as_mut() .expect("ItemSender not set") .try_send(Item::Event(event)) @@ -633,7 +634,7 @@ impl ObjectImpl for ElementSinkTest { .expect("type checked upstream") .expect("ItemSender not found") .clone(); - *block_on(self.sender.lock()) = Some(sender); + *runtime::executor::block_on(self.sender.lock()) = Some(sender); } _ => unimplemented!(), } @@ -657,10 +658,10 @@ impl ElementImpl for ElementSinkTest { match transition { gst::StateChange::NullToReady => { - block_on(self.sink_pad.prepare(&PadSinkHandlerTest {})); + runtime::executor::block_on(self.sink_pad.prepare(&PadSinkHandlerTest)); } gst::StateChange::ReadyToNull => { - block_on(self.sink_pad.unprepare()); + runtime::executor::block_on(self.sink_pad.unprepare()); } _ => (), } @@ -714,7 +715,7 @@ fn task() { pipeline.set_state(gst::State::Playing).unwrap(); // Initial events - block_on( + runtime::executor::block_on( elem_src_test.try_push(Item::Event( gst::Event::new_stream_start("stream_id_task_test") .group_id(gst::util_group_id_next()) @@ -723,7 +724,7 @@ fn task() { ) .unwrap(); - match block_on(receiver.next()).unwrap() { + match runtime::executor::block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::CustomDownstreamSticky(e) => { assert!(PadContext::is_pad_context_sticky_event(&e)) @@ -733,7 +734,7 @@ fn task() { other => panic!("Unexpected item {:?}", other), } - match block_on(receiver.next()).unwrap() { + match runtime::executor::block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::StreamStart(_) => (), other => panic!("Unexpected event {:?}", other), @@ -741,12 +742,12 @@ fn task() { other => panic!("Unexpected item {:?}", other), } - block_on(elem_src_test.try_push(Item::Event( + runtime::executor::block_on(elem_src_test.try_push(Item::Event( gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), ))) .unwrap(); - match block_on(receiver.next()).unwrap() { + match runtime::executor::block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::Segment(_) => (), other => panic!("Unexpected event {:?}", other), @@ -755,10 +756,12 @@ fn task() { } // Buffer - block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) - .unwrap(); + runtime::executor::block_on( + elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))), + ) + .unwrap(); - match block_on(receiver.next()).unwrap() { + match runtime::executor::block_on(receiver.next()).unwrap() { Item::Buffer(buffer) => { let data = buffer.map_readable().unwrap(); assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice()); @@ -771,9 +774,9 @@ fn task() { list.get_mut() .unwrap() .add(gst::Buffer::from_slice(vec![1, 2, 3, 4])); - block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap(); + runtime::executor::block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap(); - match block_on(receiver.next()).unwrap() { + match runtime::executor::block_on(receiver.next()).unwrap() { Item::BufferList(_) => (), other => panic!("Unexpected item {:?}", other), } @@ -782,8 +785,10 @@ fn task() { pipeline.set_state(gst::State::Paused).unwrap(); // Items not longer accepted - block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) - .unwrap_err(); + runtime::executor::block_on( + elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))), + ) + .unwrap_err(); // Nothing forwarded receiver.try_next().unwrap_err(); @@ -810,8 +815,9 @@ fn task() { .unwrap()); // EOS - block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap(); - match block_on(receiver.next()).unwrap() { + runtime::executor::block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))) + .unwrap(); + match runtime::executor::block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::Eos(_) => (), other => panic!("Unexpected event {:?}", other), @@ -823,7 +829,7 @@ fn task() { pipeline.set_state(gst::State::Ready).unwrap(); // Receiver was dropped when stopping => can't send anymore - block_on( + runtime::executor::block_on( elem_src_test.try_push(Item::Event( gst::Event::new_stream_start("stream_id_task_test_past_stop") .group_id(gst::util_group_id_next())