From 0221524a1091cfea688f95805e37ad71b7f4778d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Sat, 30 Nov 2019 19:51:31 +0100 Subject: [PATCH] Update to tokio release 0.2.5 + throttling --- .gitlab-ci.yml | 40 +- gst-plugin-threadshare/Cargo.toml | 5 +- gst-plugin-threadshare/examples/benchmark.rs | 12 +- gst-plugin-threadshare/src/appsrc.rs | 42 +- gst-plugin-threadshare/src/jitterbuffer.rs | 60 +- gst-plugin-threadshare/src/lib.rs | 2 +- gst-plugin-threadshare/src/proxy.rs | 56 +- gst-plugin-threadshare/src/queue.rs | 34 +- .../src/runtime/executor.rs | 517 ++++-------------- .../src/runtime/future/abortable_waitable.rs | 198 ------- .../src/runtime/future/mod.rs | 24 - .../src/runtime/future/waitable.rs | 224 -------- gst-plugin-threadshare/src/runtime/macros.rs | 25 - gst-plugin-threadshare/src/runtime/mod.rs | 11 +- gst-plugin-threadshare/src/runtime/pad.rs | 9 +- .../src/runtime/pad_context.rs | 63 +-- gst-plugin-threadshare/src/runtime/task.rs | 34 +- gst-plugin-threadshare/src/tcpclientsrc.rs | 194 ++++--- gst-plugin-threadshare/src/udpsrc.rs | 184 ++++--- gst-plugin-threadshare/tests/pad.rs | 70 +-- gst-plugin-threadshare/tests/pipeline.rs | 126 +++-- rust-toolchain | 1 - 22 files changed, 597 insertions(+), 1334 deletions(-) delete mode 100644 gst-plugin-threadshare/src/runtime/future/abortable_waitable.rs delete mode 100644 gst-plugin-threadshare/src/runtime/future/mod.rs delete mode 100644 gst-plugin-threadshare/src/runtime/future/waitable.rs delete mode 100644 gst-plugin-threadshare/src/runtime/macros.rs delete mode 100644 rust-toolchain diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d0d0202c..0dbe6a20 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -55,32 +55,17 @@ stages: - cargo build --color=always --all --examples --all-features - G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features -#test 1.39: +test 1.39: # 1.39 img # https://hub.docker.com/_/rust/ -# image: "rust:1.39-slim-buster" -# extends: '.cargo test' + image: "rust:1.39-slim-buster" + extends: '.cargo test' -#test stable: +test stable: # Stable img # https://hub.docker.com/_/rust/ -# image: "rust:slim-buster" -# extends: '.cargo test' - -test beta: - # https://hub.docker.com/_/rust/ image: "rust:slim-buster" - extends: '.tarball_setup' - script: - - export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path - - rustup toolchain install beta - - export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home - - rustup override set beta - - rustc --version - - cargo build --color=always --all - - G_DEBUG=fatal_warnings cargo test --color=always --all - - cargo build --color=always --all --examples --all-features - - G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features + extends: '.cargo test' test nightly: # Nightly @@ -91,15 +76,9 @@ test nightly: rustfmt: image: "rust:slim-buster" - extends: '.tarball_setup' stage: "lint" script: - - export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path - - rustup toolchain install beta - - rustup component add rustfmt --toolchain beta - - export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home - - rustup override set beta - - rustc --version + - rustup component add rustfmt - cargo fmt --version - cargo fmt -- --color=always --check @@ -108,12 +87,7 @@ clippy: image: "rust:slim-buster" stage: 'extras' script: - - export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path - - rustup toolchain install beta - - rustup component add clippy --toolchain beta - - export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home - - rustup override set beta - - rustc --version + - rustup component add clippy-preview - cargo clippy --color=always --all --all-features -- -A clippy::redundant_pattern_matching -A clippy::single_match -A clippy::cast_lossless audit: diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 1a448d66..a346f288 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -21,10 +21,7 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" } pin-project = "0.4" -tokio = "=0.2.0-alpha.6" -tokio-executor = { version = "=0.2.0-alpha.6", features = ["current-thread"] } -tokio-net = { version = "=0.2.0-alpha.6", features = ["tcp", "udp"] } -tokio-timer = "=0.3.0-alpha.6" +tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/0.2.5-throttling", features = ["io-util", "macros", "rt-core", "sync", "time", "tcp", "udp"] } futures = "0.3" lazy_static = "1.0" either = "1.0" diff --git a/gst-plugin-threadshare/examples/benchmark.rs b/gst-plugin-threadshare/examples/benchmark.rs index a5d88353..f7d164ed 100644 --- a/gst-plugin-threadshare/examples/benchmark.rs +++ b/gst-plugin-threadshare/examples/benchmark.rs @@ -60,10 +60,10 @@ fn main() { let pipeline = gst::Pipeline::new(None); for i in 0..n_streams { - let fakesink = + let sink = gst::ElementFactory::make("fakesink", Some(format!("sink-{}", i).as_str())).unwrap(); - fakesink.set_property("sync", &false).unwrap(); - fakesink.set_property("async", &false).unwrap(); + sink.set_property("sync", &false).unwrap(); + sink.set_property("async", &false).unwrap(); let source = match source.as_str() { "udpsrc" => { @@ -117,7 +117,7 @@ fn main() { .set_property("samplesperbuffer", &((wait as i32) * 8000 / 1000)) .unwrap(); - fakesink.set_property("sync", &true).unwrap(); + sink.set_property("sync", &true).unwrap(); source } @@ -138,8 +138,8 @@ fn main() { _ => unimplemented!(), }; - pipeline.add_many(&[&source, &fakesink]).unwrap(); - source.link(&fakesink).unwrap(); + pipeline.add_many(&[&source, &sink]).unwrap(); + source.link(&sink).unwrap(); } let bus = pipeline.get_bus().unwrap(); diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index 7c97ae52..fb6df302 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -18,6 +18,7 @@ use either::Either; use futures::channel::mpsc; +use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -42,7 +43,6 @@ use std::convert::TryInto; use std::sync::Arc; use std::u32; -use crate::block_on; use crate::runtime::prelude::*; use crate::runtime::{Context, PadSrc, PadSrcRef}; @@ -280,7 +280,7 @@ impl PadSrcHandler for AppSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on!(app_src.pause(element)); + let _ = block_on(app_src.pause(element)); true } EventView::FlushStop(..) => { @@ -288,7 +288,7 @@ impl PadSrcHandler for AppSrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on!(app_src.start(element)); + let _ = block_on(app_src.start(element)); } true } @@ -325,7 +325,7 @@ impl PadSrcHandler for AppSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = block_on!(self.lock()); + let inner = block_on(self.lock()); let caps = if let Some(ref caps) = inner.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) @@ -540,7 +540,7 @@ impl ObjectSubclass for AppSrc { .expect("missing signal arg"); let appsrc = Self::from_instance(&element); - Some(block_on!(appsrc.push_buffer(&element, buffer)).to_value()) + Some(block_on(appsrc.push_buffer(&element, buffer)).to_value()) }, ); @@ -555,7 +555,7 @@ impl ObjectSubclass for AppSrc { .expect("signal arg") .expect("missing signal arg"); let appsrc = Self::from_instance(&element); - Some(block_on!(appsrc.end_of_stream(&element)).to_value()) + Some(block_on(appsrc.end_of_stream(&element)).to_value()) }, ); } @@ -581,26 +581,26 @@ impl ObjectImpl for AppSrc { match *prop { subclass::Property("context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("caps", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.caps = value.get().expect("type checked upstream"); } subclass::Property("max-buffers", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_buffers = value.get_some().expect("type checked upstream"); } subclass::Property("do-timestamp", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.do_timestamp = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -612,23 +612,23 @@ impl ObjectImpl for AppSrc { match *prop { subclass::Property("context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } subclass::Property("caps", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.caps.to_value()) } subclass::Property("max-buffers", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_buffers.to_value()) } subclass::Property("do-timestamp", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.do_timestamp.to_value()) } _ => unimplemented!(), @@ -655,16 +655,16 @@ impl ElementImpl for AppSrc { match transition { gst::StateChange::NullToReady => { - block_on!(self.prepare(element)).map_err(|err| { + block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -676,10 +676,10 @@ impl ElementImpl for AppSrc { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - block_on!(async { + block_on(async { self.src_pad_handler.lock().await.need_initial_events = true; }); } diff --git a/gst-plugin-threadshare/src/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer.rs index 5d7efdd6..d5b274c9 100644 --- a/gst-plugin-threadshare/src/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer.rs @@ -17,8 +17,9 @@ use either::Either; +use futures::executor::block_on; use futures::future::BoxFuture; -use futures::future::{abortable, AbortHandle}; +use futures::future::{abortable, AbortHandle, Aborted}; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -41,7 +42,6 @@ use std::cmp::{max, min, Ordering}; use std::collections::{BTreeSet, VecDeque}; use std::time::Duration; -use crate::runtime::future::{abortable_waitable, AbortWaitHandle}; use crate::runtime::prelude::*; use crate::runtime::{Context, PadContext, PadContextWeak, PadSink, PadSinkRef, PadSrc, PadSrcRef}; @@ -226,7 +226,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { }.boxed()) } else { if let EventView::FlushStop(..) = event.view() { - block_on!(jitterbuffer.flush(element)); + block_on(jitterbuffer.flush(element)); } gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event); @@ -247,7 +247,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { match query.view_mut() { QueryView::Drain(..) => { gst_info!(CAT, obj: pad.gst_pad(), "Draining"); - block_on!(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)).is_ok() + block_on(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)).is_ok() } _ => jitterbuffer.src_pad.gst_pad().peer_query(query), } @@ -278,7 +278,7 @@ impl PadSrcHandler for JitterBufferPadSrcHandler { if ret { let (_, mut min_latency, _) = peer_query.get_result(); let our_latency = - block_on!(jitterbuffer.settings.lock()).latency_ms as u64 * gst::MSECOND; + block_on(jitterbuffer.settings.lock()).latency_ms as u64 * gst::MSECOND; min_latency += our_latency; let max_latency = gst::CLOCK_TIME_NONE; @@ -292,7 +292,7 @@ impl PadSrcHandler for JitterBufferPadSrcHandler { if q.get_format() != gst::Format::Time { jitterbuffer.sink_pad.gst_pad().peer_query(query) } else { - q.set(block_on!(jitterbuffer.state.lock()).segment.get_position()); + q.set(block_on(jitterbuffer.state.lock()).segment.get_position()); true } } @@ -354,7 +354,8 @@ struct State { discont: bool, last_res: Result, task_queue_abort_handle: Option, - wakeup_abort_handle: Option, + wakeup_abort_handle: Option, + wakeup_join_handle: Option>>, } impl Default for State { @@ -383,6 +384,7 @@ impl Default for State { last_res: Ok(gst::FlowSuccess::Ok), task_queue_abort_handle: None, wakeup_abort_handle: None, + wakeup_join_handle: None, } } } @@ -941,6 +943,9 @@ impl JitterBuffer { if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { wakeup_abort_handle.abort(); } + if let Some(wakeup_join_handle) = state.wakeup_join_handle.take() { + let _ = wakeup_join_handle.await; + } let pad_src_state = self.src_pad.lock_state().await; let pad_ctx = pad_src_state.pad_context().unwrap(); @@ -953,9 +958,8 @@ impl JitterBuffer { Self::wakeup_fut(latency_ns, context_wait_ns, element, pad_ctx_weak) }); - let (wakeup_fut, abort_handle) = abortable_waitable(wakeup_fut); - pad_ctx.spawn(wakeup_fut.map(drop)); - + let (wakeup_fut, abort_handle) = abortable(wakeup_fut); + state.wakeup_join_handle = Some(pad_ctx.spawn(wakeup_fut)); state.wakeup_abort_handle = Some(abort_handle); } @@ -1136,7 +1140,7 @@ impl ObjectSubclass for JitterBuffer { .expect("signal arg") .expect("missing signal arg"); let jitterbuffer = Self::from_instance(&element); - block_on!(jitterbuffer.clear_pt_map(&element)); + block_on(jitterbuffer.clear_pt_map(&element)); None }, ); @@ -1176,10 +1180,10 @@ impl ObjectImpl for JitterBuffer { match *prop { subclass::Property("latency", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.latency_ms = value.get_some().expect("type checked upstream"); - block_on!(self.state.lock()) + block_on(self.state.lock()) .jbuf .borrow() .set_delay(settings.latency_ms as u64 * gst::MSECOND); @@ -1187,26 +1191,26 @@ impl ObjectImpl for JitterBuffer { /* TODO: post message */ } subclass::Property("do-lost", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.do_lost = value.get_some().expect("type checked upstream"); } subclass::Property("max-dropout-time", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_dropout_time = value.get_some().expect("type checked upstream"); } subclass::Property("max-misorder-time", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_misorder_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -1218,23 +1222,23 @@ impl ObjectImpl for JitterBuffer { match *prop { subclass::Property("latency", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.latency_ms.to_value()) } subclass::Property("do-lost", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.do_lost.to_value()) } subclass::Property("max-dropout-time", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_dropout_time.to_value()) } subclass::Property("max-misorder-time", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_misorder_time.to_value()) } subclass::Property("stats", ..) => { - let state = block_on!(self.state.lock()); + let state = block_on(self.state.lock()); let s = gst::Structure::new( "application/x-rtp-jitterbuffer-stats", &[ @@ -1246,11 +1250,11 @@ impl ObjectImpl for JitterBuffer { Ok(s.to_value()) } subclass::Property("context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), @@ -1275,7 +1279,7 @@ impl ElementImpl for JitterBuffer { gst_trace!(CAT, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => block_on!(async { + gst::StateChange::NullToReady => block_on(async { let _state = self.state.lock().await; let settings = self.settings.lock().await; @@ -1294,7 +1298,7 @@ impl ElementImpl for JitterBuffer { self.sink_pad.prepare(&JitterBufferPadSinkHandler {}).await; }), - gst::StateChange::PausedToReady => block_on!(async { + gst::StateChange::PausedToReady => block_on(async { let mut state = self.state.lock().await; if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { @@ -1305,7 +1309,7 @@ impl ElementImpl for JitterBuffer { abort_handle.abort(); } }), - gst::StateChange::ReadyToNull => block_on!(async { + gst::StateChange::ReadyToNull => block_on(async { let mut state = self.state.lock().await; self.sink_pad.unprepare().await; diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index 89254f8d..0748fcde 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -24,7 +24,7 @@ #![recursion_limit = "1024"] #![crate_type = "cdylib"] -pub use tokio_executor; +pub use tokio; #[macro_use] pub mod runtime; diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index c6cd75be..2b3169ec 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -18,6 +18,7 @@ use either::Either; use futures::channel::oneshot; +use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -40,7 +41,6 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Weak}; use std::{u32, u64}; -use crate::block_on; use crate::runtime::prelude::*; use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; @@ -364,7 +364,7 @@ impl PadSinkHandler for ProxySinkPadHandler { } else { match event.view() { EventView::FlushStart(..) => { - let _ = block_on!(proxysink.stop(element)); + let _ = block_on(proxysink.stop(element)); } EventView::FlushStop(..) => { let (res, state, pending) = element.get_state(0.into()); @@ -372,7 +372,7 @@ impl PadSinkHandler for ProxySinkPadHandler { || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Paused { - let _ = block_on!(proxysink.start(&element)); + let _ = block_on(proxysink.start(&element)); } } _ => (), @@ -380,7 +380,7 @@ impl PadSinkHandler for ProxySinkPadHandler { gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); // FIXME proxysink can't forward directly to the src_pad of the proxysrc - let _ = block_on!(proxysink.enqueue_item(&element, DataQueueItem::Event(event))); + let _ = block_on(proxysink.enqueue_item(&element, DataQueueItem::Event(event))); Either::Left(true) } @@ -706,7 +706,7 @@ impl ObjectImpl for ProxySink { match *prop { subclass::Property("proxy-context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.proxy_context = value .get() .expect("type checked upstream") @@ -721,7 +721,7 @@ impl ObjectImpl for ProxySink { match *prop { subclass::Property("proxy-context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.proxy_context.to_value()) } _ => unimplemented!(), @@ -748,16 +748,16 @@ impl ElementImpl for ProxySink { match transition { gst::StateChange::NullToReady => { - block_on!(self.prepare(element)).map_err(|err| { + block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - block_on!(self.stop(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -765,7 +765,7 @@ impl ElementImpl for ProxySink { let success = self.parent_change_state(element, transition)?; if transition == gst::StateChange::ReadyToPaused { - block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } Ok(success) @@ -881,7 +881,7 @@ impl PadSrcHandler for ProxySrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on!(proxysrc.pause(element)); + let _ = block_on(proxysrc.pause(element)); true } EventView::FlushStop(..) => { @@ -889,7 +889,7 @@ impl PadSrcHandler for ProxySrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on!(proxysrc.start(element)); + let _ = block_on(proxysrc.start(element)); } true } @@ -1157,30 +1157,30 @@ impl ObjectImpl for ProxySrc { match *prop { subclass::Property("max-size-buffers", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_size_buffers = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-bytes", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_size_bytes = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-time", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_size_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("proxy-context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.proxy_context = value .get() .expect("type checked upstream") @@ -1195,27 +1195,27 @@ impl ObjectImpl for ProxySrc { match *prop { subclass::Property("max-size-buffers", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_size_buffers.to_value()) } subclass::Property("max-size-bytes", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_size_bytes.to_value()) } subclass::Property("max-size-time", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_size_time.to_value()) } subclass::Property("context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } subclass::Property("proxy-context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.proxy_context.to_value()) } _ => unimplemented!(), @@ -1242,16 +1242,16 @@ impl ElementImpl for ProxySrc { match transition { gst::StateChange::NullToReady => { - block_on!(self.prepare(element)).map_err(|err| { + block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -1263,7 +1263,7 @@ impl ElementImpl for ProxySrc { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/gst-plugin-threadshare/src/queue.rs b/gst-plugin-threadshare/src/queue.rs index 2fd777c4..9ab6c72a 100644 --- a/gst-plugin-threadshare/src/queue.rs +++ b/gst-plugin-threadshare/src/queue.rs @@ -18,6 +18,7 @@ use either::Either; use futures::channel::oneshot; +use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::Mutex; use futures::prelude::*; @@ -39,7 +40,6 @@ use lazy_static::lazy_static; use std::collections::VecDeque; use std::{u32, u64}; -use crate::block_on; use crate::runtime::prelude::*; use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; @@ -372,7 +372,7 @@ impl PadSrcHandler for QueuePadSrcHandler { } else { match event.view() { EventView::FlushStart(..) => { - let _ = block_on!(queue.stop(element)); + let _ = block_on(queue.stop(element)); } EventView::FlushStop(..) => { let (res, state, pending) = element.get_state(0.into()); @@ -380,7 +380,7 @@ impl PadSrcHandler for QueuePadSrcHandler { || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on!(queue.start(element)); + let _ = block_on(queue.start(element)); } } _ => (), @@ -812,26 +812,26 @@ impl ObjectImpl for Queue { match *prop { subclass::Property("max-size-buffers", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_size_buffers = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-bytes", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_size_bytes = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-time", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.max_size_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -843,23 +843,23 @@ impl ObjectImpl for Queue { match *prop { subclass::Property("max-size-buffers", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_size_buffers.to_value()) } subclass::Property("max-size-bytes", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_size_bytes.to_value()) } subclass::Property("max-size-time", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.max_size_time.to_value()) } subclass::Property("context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), @@ -885,16 +885,16 @@ impl ElementImpl for Queue { match transition { gst::StateChange::NullToReady => { - block_on!(self.prepare(element)).map_err(|err| { + block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PausedToReady => { - block_on!(self.stop(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -902,7 +902,7 @@ impl ElementImpl for Queue { let success = self.parent_change_state(element, transition)?; if transition == gst::StateChange::ReadyToPaused { - block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } Ok(success) diff --git a/gst-plugin-threadshare/src/runtime/executor.rs b/gst-plugin-threadshare/src/runtime/executor.rs index 2f24fef5..bab65546 100644 --- a/gst-plugin-threadshare/src/runtime/executor.rs +++ b/gst-plugin-threadshare/src/runtime/executor.rs @@ -36,10 +36,9 @@ //! [`PadSrc`]: struct.PadSrc.html //! [`PadSink`]: struct.PadSink.html -use futures::channel::mpsc as future_mpsc; +use futures::channel::oneshot; use futures::future::BoxFuture; use futures::prelude::*; -use futures::ready; use futures::stream::futures_unordered::FuturesUnordered; use glib; @@ -50,19 +49,13 @@ use gst::{gst_debug, gst_log, gst_trace}; use lazy_static::lazy_static; -use std::cmp; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::HashMap; use std::io; use std::mem; -use std::pin::Pin; use std::sync::mpsc as sync_mpsc; -use std::sync::{atomic, Arc, Mutex, Weak}; -use std::task::Poll; +use std::sync::{Arc, Mutex, Weak}; use std::thread; -use std::time::{Duration, Instant}; - -use tokio_executor::current_thread as tokio_current_thread; -use tokio_executor::park::Unpark; +use std::time::Duration; use super::RUNTIME_CAT; @@ -82,170 +75,52 @@ lazy_static! { struct ContextThread { name: String, - shutdown: Arc, } impl ContextThread { - fn start( - name: &str, - wait: u32, - reactor: tokio_net::driver::Reactor, - timers: Arc>>, - ) -> (tokio_current_thread::Handle, ContextShutdown) { - let handle = reactor.handle(); - let shutdown = Arc::new(atomic::AtomicBool::new(false)); - let shutdown_clone = shutdown.clone(); + fn start(name: &str, wait: u32) -> (tokio::runtime::Handle, ContextShutdown) { let name_clone = name.into(); - let mut context_thread = ContextThread { - shutdown: shutdown_clone, - name: name_clone, - }; + let mut context_thread = ContextThread { name: name_clone }; - let (sender, receiver) = sync_mpsc::channel(); + let (handle_sender, handle_receiver) = sync_mpsc::channel(); + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); let join = thread::spawn(move || { - context_thread.spawn(wait, reactor, sender, timers); + context_thread.spawn(wait, handle_sender, shutdown_receiver); }); + let handle = handle_receiver.recv().expect("Context thread init failed"); + let shutdown = ContextShutdown { name: name.into(), - shutdown, - handle, + shutdown: Some(shutdown_sender), join: Some(join), }; - let thread_handle = receiver.recv().expect("Context thread init failed"); - - (thread_handle, shutdown) + (handle, shutdown) } fn spawn( &mut self, wait: u32, - reactor: tokio_net::driver::Reactor, - sender: sync_mpsc::Sender, - timers: Arc>>, + handle_sender: sync_mpsc::Sender, + shutdown_receiver: oneshot::Receiver<()>, ) { gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name); - let wait = Duration::from_millis(wait as u64); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .max_throttling(Duration::from_millis(wait as u64)) + .build() + .expect("Couldn't build the runtime"); - let handle = reactor.handle(); - let timer = tokio_timer::Timer::new(reactor); - let timer_handle = timer.handle(); - - let mut current_thread = tokio_current_thread::CurrentThread::new_with_park(timer); - - sender - .send(current_thread.handle()) + handle_sender + .send(runtime.handle().clone()) .expect("Couldn't send context thread handle"); - let _timer_guard = tokio_timer::set_default(&timer_handle); - let _reactor_guard = tokio_net::driver::set_default(&handle); - - let mut now = Instant::now(); - - loop { - if self.shutdown.load(atomic::Ordering::SeqCst) { - gst_debug!(RUNTIME_CAT, "Shutting down loop"); - break; - } - - gst_trace!(RUNTIME_CAT, "Elapsed {:?} since last loop", now.elapsed()); - - // Handle timers - { - // Trigger all timers that would be expired before the middle of the loop wait - // time - let timer_threshold = now + wait / 2; - let mut timers = timers.lock().unwrap(); - while timers - .peek() - .and_then(|entry| { - if entry.time < timer_threshold { - Some(()) - } else { - None - } - }) - .is_some() - { - let TimerEntry { - time, - interval, - sender, - .. - } = timers.pop().unwrap(); - - if sender.is_closed() { - continue; - } - - let _ = sender.unbounded_send(()); - if let Some(interval) = interval { - timers.push(TimerEntry { - time: time + interval, - id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed), - interval: Some(interval), - sender, - }); - } - } - } - - gst_trace!(RUNTIME_CAT, "Turning thread '{}'", self.name); - while current_thread - .turn(Some(Duration::from_millis(0))) - .unwrap() - .has_polled() - {} - gst_trace!(RUNTIME_CAT, "Turned thread '{}'", self.name); - - // We have to check again after turning in case we're supposed to shut down now - // and already handled the unpark above - if self.shutdown.load(atomic::Ordering::SeqCst) { - gst_debug!(RUNTIME_CAT, "Shutting down loop"); - break; - } - - let elapsed = now.elapsed(); - gst_trace!(RUNTIME_CAT, "Elapsed {:?} after handling futures", elapsed); - - if wait == Duration::from_millis(0) { - let timers = timers.lock().unwrap(); - let wait = match timers.peek().map(|entry| entry.time) { - None => None, - Some(time) => Some({ - let tmp = Instant::now(); - - if time < tmp { - Duration::from_millis(0) - } else { - time.duration_since(tmp) - } - }), - }; - drop(timers); - - gst_trace!(RUNTIME_CAT, "Sleeping for up to {:?}", wait); - current_thread.turn(wait).unwrap(); - gst_trace!(RUNTIME_CAT, "Slept for {:?}", now.elapsed()); - now = Instant::now(); - } else { - if elapsed < wait { - gst_trace!( - RUNTIME_CAT, - "Waiting for {:?} before polling again", - wait - elapsed - ); - thread::sleep(wait - elapsed); - gst_trace!(RUNTIME_CAT, "Slept for {:?}", now.elapsed()); - } - - now += wait; - } - } + let _ = runtime.block_on(shutdown_receiver); } } @@ -258,8 +133,7 @@ impl Drop for ContextThread { #[derive(Debug)] struct ContextShutdown { name: String, - shutdown: Arc, - handle: tokio_net::driver::Handle, + shutdown: Option>, join: Option>, } @@ -270,16 +144,13 @@ impl Drop for ContextShutdown { "Shutting down context thread thread '{}'", self.name ); - self.shutdown.store(true, atomic::Ordering::SeqCst); + self.shutdown.take().unwrap(); + gst_trace!( RUNTIME_CAT, "Waiting for context thread '{}' to shutdown", self.name ); - // After being unparked, the next turn() is guaranteed to finish immediately, - // as such there is no race condition between checking for shutdown and setting - // shutdown. - self.handle.unpark(); let _ = self.join.take().unwrap().join(); } } @@ -301,9 +172,7 @@ type TaskQueue = FuturesUnordered>; #[derive(Debug)] struct ContextInner { name: String, - thread_handle: Mutex, - reactor_handle: tokio_net::driver::Handle, - timers: Arc>>, + handle: Mutex, // Only used for dropping _shutdown: ContextShutdown, task_queues: Mutex<(u64, HashMap)>, @@ -352,19 +221,11 @@ impl Context { } } - let reactor = tokio_net::driver::Reactor::new()?; - let reactor_handle = reactor.handle(); - - let timers = Arc::new(Mutex::new(BinaryHeap::new())); - - let (thread_handle, shutdown) = - ContextThread::start(context_name, wait, reactor, timers.clone()); + let (handle, shutdown) = ContextThread::start(context_name, wait); let context = Context(Arc::new(ContextInner { name: context_name.into(), - thread_handle: Mutex::new(thread_handle), - reactor_handle, - timers, + handle: Mutex::new(handle), _shutdown: shutdown, task_queues: Mutex::new((0, HashMap::new())), })); @@ -391,15 +252,12 @@ impl Context { self.0.name.as_str() } - pub fn reactor_handle(&self) -> &tokio_net::driver::Handle { - &self.0.reactor_handle - } - - pub fn spawn(&self, future: Fut) + pub fn spawn(&self, future: Fut) -> tokio::task::JoinHandle where - Fut: Future + Send + 'static, + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, { - self.0.thread_handle.lock().unwrap().spawn(future).unwrap(); + self.0.handle.lock().unwrap().spawn(future) } pub fn release_task_queue(&self, id: TaskQueueId) -> Option { @@ -451,169 +309,59 @@ impl Context { } } - pub fn add_timer( - &self, - time: Instant, - interval: Option, - ) -> future_mpsc::UnboundedReceiver<()> { - let (sender, receiver) = future_mpsc::unbounded(); - - let mut timers = self.0.timers.lock().unwrap(); - let entry = TimerEntry { - time, - id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed), - interval, - sender, - }; - - timers.push(entry); - self.0.reactor_handle.unpark(); - - receiver - } - - pub fn new_interval(&self, interval: Duration) -> Interval { - Interval::new(&self, interval) - } - /// Builds a `Future` to execute an `action` at [`Interval`]s. /// /// [`Interval`]: struct.Interval.html - pub fn interval(&self, interval: Duration, f: F) -> impl Future + pub fn interval(&self, interval: Duration, f: F) -> impl Future where F: Fn() -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, + E: Send + 'static, + Fut: Future> + Send + 'static, { - let f = Arc::new(f); - self.new_interval(interval).try_for_each(move |_| { - let f = Arc::clone(&f); - f() - }) - } - - pub fn new_timeout(&self, timeout: Duration) -> Timeout { - Timeout::new(&self, timeout) + async move { + let mut interval = tokio::time::interval(interval); + loop { + interval.tick().await; + if let Err(err) = f().await { + break Err(err); + } + } + } } /// Builds a `Future` to execute an action after the given `delay` has elapsed. pub fn delay_for(&self, delay: Duration, f: F) -> impl Future where F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, + Fut: Future + Send + 'static, { - self.new_timeout(delay).then(move |_| f()) - } -} - -static TIMER_ENTRY_ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0); - -// Ad-hoc interval timer implementation for our throttled event loop above -#[derive(Debug)] -struct TimerEntry { - time: Instant, - id: usize, // for producing a total order - interval: Option, - sender: future_mpsc::UnboundedSender<()>, -} - -impl PartialEq for TimerEntry { - fn eq(&self, other: &Self) -> bool { - self.time.eq(&other.time) && self.id.eq(&other.id) - } -} - -impl Eq for TimerEntry {} - -impl PartialOrd for TimerEntry { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(&other)) - } -} - -impl Ord for TimerEntry { - fn cmp(&self, other: &Self) -> cmp::Ordering { - other - .time - .cmp(&self.time) - .then_with(|| other.id.cmp(&self.id)) - } -} - -/// A `Stream` that yields a tick at `interval`s. -#[derive(Debug)] -pub struct Interval { - receiver: future_mpsc::UnboundedReceiver<()>, -} - -impl Interval { - fn new(context: &Context, interval: Duration) -> Self { - Self { - receiver: context.add_timer(Instant::now(), Some(interval)), - } - } -} - -impl Stream for Interval { - type Item = Result<(), ()>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context, - ) -> Poll> { - self.receiver - .poll_next_unpin(cx) - .map(|item_opt| item_opt.map(Ok)) - } -} - -/// A `Future` that completes after a `timeout` is elapsed. -#[derive(Debug)] -pub struct Timeout { - receiver: future_mpsc::UnboundedReceiver<()>, -} - -impl Timeout { - fn new(context: &Context, timeout: Duration) -> Self { - Self { - receiver: context.add_timer(Instant::now() + timeout, None), - } - } -} - -impl Future for Timeout { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { - match ready!(self.receiver.poll_next_unpin(cx)) { - Some(_) => Poll::Ready(()), - None => unreachable!(), + async move { + tokio::time::delay_for(delay).await; + f().await } } } #[cfg(test)] mod tests { - use futures::channel::{mpsc, oneshot}; - use futures::future::Aborted; + use futures::channel::mpsc; + use futures::future::abortable; use futures::lock::Mutex; use gst; use std::sync::Arc; - use std::time::{Duration, Instant}; - - use crate::block_on; - use crate::runtime::future::abortable_waitable; + use std::time::Instant; use super::*; type Item = i32; const SLEEP_DURATION: u32 = 2; - const INTERVAL: Duration = Duration::from_millis(100 * SLEEP_DURATION as u64); + const INTERVAL: Duration = std::time::Duration::from_millis(100 * SLEEP_DURATION as u64); - #[test] - fn user_drain_pending_tasks() { + #[tokio::test] + async fn user_drain_pending_tasks() { // Setup gst::init().unwrap(); @@ -649,15 +397,15 @@ mod tests { // User triggered drain receiver.try_next().unwrap_err(); - block_on!(drain).unwrap(); + drain.await.unwrap(); assert_eq!(receiver.try_next().unwrap(), Some(0)); add_task(1).unwrap(); receiver.try_next().unwrap_err(); } - #[test] - fn delay_for() { + #[tokio::test] + async fn delay_for() { gst::init().unwrap(); let context = Context::acquire("delay_for", SLEEP_DURATION).unwrap(); @@ -665,41 +413,21 @@ mod tests { let (sender, receiver) = oneshot::channel(); let start = Instant::now(); - let delayed_by_fut = context.delay_for(INTERVAL, move || async { - sender.send(42).unwrap(); + let delayed_by_fut = context.delay_for(INTERVAL, move || { + async { + sender.send(42).unwrap(); + } }); context.spawn(delayed_by_fut); - let _ = block_on!(receiver).unwrap(); + let _ = receiver.await.unwrap(); let delta = Instant::now() - start; assert!(delta >= INTERVAL); assert!(delta < INTERVAL * 2); } - #[test] - fn delay_for_abort() { - gst::init().unwrap(); - - let context = Context::acquire("delay_for_abort", SLEEP_DURATION).unwrap(); - - let (sender, receiver) = oneshot::channel(); - - let delay_for_fut = context.delay_for(INTERVAL, move || async { - sender.send(42).unwrap(); - }); - let (abortable_delay_for, abort_handle) = abortable_waitable(delay_for_fut); - context.spawn(abortable_delay_for.map(move |res| { - if let Err(Aborted) = res { - gst_debug!(RUNTIME_CAT, "Aborted delay_for"); - } - })); - - block_on!(abort_handle.abort_and_wait()).unwrap(); - block_on!(receiver).unwrap_err(); - } - - #[test] - fn interval_ok() { + #[tokio::test] + async fn interval_ok() { gst::init().unwrap(); let context = Context::acquire("interval_ok", SLEEP_DURATION).unwrap(); @@ -707,37 +435,36 @@ mod tests { let (sender, mut receiver) = mpsc::channel(1); let sender: Arc>> = Arc::new(Mutex::new(sender)); - let interval_fut = context.interval(INTERVAL, move || { + let (interval_fut, handle) = abortable(context.interval(INTERVAL, move || { let sender = Arc::clone(&sender); async move { let instant = Instant::now(); sender.lock().await.send(instant).await.map_err(drop) } - }); + })); context.spawn(interval_fut.map(drop)); - block_on!(async { - let mut idx: u32 = 0; - let mut first = Instant::now(); - while let Some(instant) = receiver.next().await { - if idx > 0 { - let delta = instant - first; - assert!(delta > INTERVAL * (idx - 1)); - assert!(delta < INTERVAL * (idx + 1)); - } else { - first = instant; - } - if idx == 3 { - break; - } - - idx += 1; + let mut idx: u32 = 0; + let mut first = Instant::now(); + while let Some(instant) = receiver.next().await { + if idx > 0 { + let delta = instant - first; + assert!(delta > INTERVAL * (idx - 1)); + assert!(delta < INTERVAL * (idx + 1)); + } else { + first = instant; } - }); + if idx == 3 { + handle.abort(); + break; + } + + idx += 1; + } } - #[test] - fn interval_err() { + #[tokio::test] + async fn interval_err() { gst::init().unwrap(); let context = Context::acquire("interval_err", SLEEP_DURATION).unwrap(); @@ -763,68 +490,20 @@ mod tests { }); context.spawn(interval_fut.map(drop)); - block_on!(async { - let mut idx: u32 = 0; - let mut first = Instant::now(); - while let Some(instant) = receiver.next().await { - if idx > 0 { - let delta = instant - first; - assert!(delta > INTERVAL * (idx - 1)); - assert!(delta < INTERVAL * (idx + 1)); - } else { - first = instant; - } - - idx += 1; + let mut idx: u32 = 0; + let mut first = Instant::now(); + while let Some(instant) = receiver.next().await { + if idx > 0 { + let delta = instant - first; + assert!(delta > INTERVAL * (idx - 1)); + assert!(delta < INTERVAL * (idx + 1)); + } else { + first = instant; } - assert_eq!(idx, 3); - }); - } + idx += 1; + } - #[test] - fn interval_abort() { - gst::init().unwrap(); - - let context = Context::acquire("interval_abort", SLEEP_DURATION).unwrap(); - - let (sender, mut receiver) = mpsc::channel(1); - let sender: Arc>> = Arc::new(Mutex::new(sender)); - - let interval_fut = context.interval(INTERVAL, move || { - let sender = Arc::clone(&sender); - async move { - let instant = Instant::now(); - sender.lock().await.send(instant).await.map_err(drop) - } - }); - let (abortable_interval, abort_handle) = abortable_waitable(interval_fut); - context.spawn(abortable_interval.map(move |res| { - if let Err(Aborted) = res { - gst_debug!(RUNTIME_CAT, "Aborted timeout"); - } - })); - - block_on!(async { - let mut idx: u32 = 0; - let mut first = Instant::now(); - while let Some(instant) = receiver.next().await { - if idx > 0 { - let delta = instant - first; - assert!(delta > INTERVAL * (idx - 1)); - assert!(delta < INTERVAL * (idx + 1)); - } else { - first = instant; - } - if idx == 3 { - abort_handle.abort_and_wait().await.unwrap(); - break; - } - - idx += 1; - } - - assert_eq!(receiver.next().await, None); - }); + assert_eq!(idx, 3); } } diff --git a/gst-plugin-threadshare/src/runtime/future/abortable_waitable.rs b/gst-plugin-threadshare/src/runtime/future/abortable_waitable.rs deleted file mode 100644 index b20205a0..00000000 --- a/gst-plugin-threadshare/src/runtime/future/abortable_waitable.rs +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright (C) 2019 François Laignel -// -// This library is free software; you can redistribute it and/or -// modify it under the terms of the GNU Library General Public -// License as published by the Free Software Foundation; either -// version 2 of the License, or (at your option) any later version. -// -// This library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Library General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this library; if not, write to the -// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, -// Boston, MA 02110-1335, USA. - -use futures::future::{self, AbortHandle, Abortable}; -use futures::prelude::*; - -use super::{waitable, WaitError, WaitHandle, Waitable}; - -pub type AbortableWaitable = Waitable>; - -/// Builds an [`Abortable`] and [`Waitable`] `Future` from the provided `Future`. -/// -/// See [`AbortWaitHandle`]. -/// -/// [`Abortable`]: https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.19/futures/future/struct.Abortable.html -/// [`Waitable`]: struct.Waitable.html -/// [`AbortWaitHandle`]: struct.AbortWaitHandle.html -pub fn abortable_waitable(future: Fut) -> (AbortableWaitable, AbortWaitHandle) { - let (abortable, abort_handle) = future::abortable(future); - let (abortable_waitable, wait_handle) = waitable(abortable); - - ( - abortable_waitable, - AbortWaitHandle::new(abort_handle, wait_handle), - ) -} - -/// A handle to an [`Abortable`] and [`Waitable`] `Future`. -/// -/// The handle allows checking for the `Future` state, canceling the `Future` and waiting until -/// the `Future` completes. -/// -/// [`Abortable`]: https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.19/futures/future/struct.Abortable.html -/// [`Waitable`]: struct.Waitable.html -#[derive(Debug)] -pub struct AbortWaitHandle { - abort_handle: AbortHandle, - wait_handle: WaitHandle, -} - -impl AbortWaitHandle { - fn new(abort_handle: AbortHandle, wait_handle: WaitHandle) -> Self { - AbortWaitHandle { - abort_handle, - wait_handle, - } - } - - pub fn is_terminated(&mut self) -> bool { - self.wait_handle.is_terminated() - } - - pub fn is_cancelled(&mut self) -> bool { - self.wait_handle.is_cancelled() - } - - pub async fn wait(self) -> Result<(), WaitError> { - self.wait_handle.wait().await - } - - pub fn abort(&self) { - self.abort_handle.abort(); - } - - pub async fn abort_and_wait(mut self) -> Result<(), WaitError> { - if self.wait_handle.is_terminated() { - if self.wait_handle.is_cancelled() { - return Err(WaitError::Cancelled); - } - return Ok(()); - } - - self.abort_handle.abort(); - self.wait_handle.wait().await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures::channel::{mpsc, oneshot}; - - #[derive(Debug, PartialEq)] - enum State { - Released, - Terminated, - Triggered, - } - - #[tokio::test] - async fn abort_wait_async_non_blocking_task() { - let (trigger_sender, trigger_receiver) = oneshot::channel::<()>(); - let (_release_sender, release_receiver) = oneshot::channel::<()>(); - - let (mut state_sender_abrt, mut state_receiver_abrt) = mpsc::channel(1); - let (shared, mut handle) = abortable_waitable(async move { - let _ = trigger_receiver.await; - state_sender_abrt.send(State::Triggered).await.unwrap(); - - let _ = release_receiver.await; - state_sender_abrt.send(State::Released).await.unwrap(); - }); - - let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1); - tokio::spawn(async move { - let _ = shared.await; - state_sender_spawn.send(State::Terminated).await.unwrap(); - }); - - drop(trigger_sender); - - assert_eq!(state_receiver_abrt.next().await, Some(State::Triggered)); - assert!(!handle.is_terminated()); - - assert_eq!(handle.abort_and_wait().await, Ok(())); - - assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated)); - assert_eq!(state_receiver_abrt.next().await, None); - } - - #[tokio::test] - async fn abort_wait_blocking_task() { - let (trigger_sender, trigger_receiver) = oneshot::channel::<()>(); - let (release_sender, release_receiver) = oneshot::channel::<()>(); - - let (mut state_sender_abrt, mut state_receiver_abrt) = mpsc::channel(1); - let (shared, mut handle) = abortable_waitable(async move { - let _ = trigger_receiver.await; - state_sender_abrt.send(State::Triggered).await.unwrap(); - - let _ = release_receiver.await; - state_sender_abrt.send(State::Released).await.unwrap(); - }); - - let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1); - tokio::spawn(async move { - let _ = shared.await; - state_sender_spawn.send(State::Terminated).await.unwrap(); - }); - - drop(trigger_sender); - assert_eq!(state_receiver_abrt.next().await, Some(State::Triggered)); - assert!(!handle.is_terminated()); - - drop(release_sender); - assert_eq!(state_receiver_abrt.next().await, Some(State::Released)); - - assert_eq!(handle.abort_and_wait().await, Ok(())); - - assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated)); - assert_eq!(state_receiver_abrt.next().await, None); - } - - #[tokio::test] - async fn abort_only() { - let (trigger_sender, trigger_receiver) = oneshot::channel::<()>(); - let (_release_sender, release_receiver) = oneshot::channel::<()>(); - - let (mut state_sender_abrt, mut state_receiver_abrt) = mpsc::channel(1); - let (shared, mut handle) = abortable_waitable(async move { - let _ = trigger_receiver.await; - state_sender_abrt.send(State::Triggered).await.unwrap(); - - let _ = release_receiver.await; - state_sender_abrt.send(State::Released).await.unwrap(); - }); - - let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1); - tokio::spawn(async move { - let _ = shared.await; - state_sender_spawn.send(State::Terminated).await.unwrap(); - }); - - assert!(!handle.is_terminated()); - drop(trigger_sender); - - assert_eq!(state_receiver_abrt.next().await, Some(State::Triggered)); - assert!(!handle.is_terminated()); - - handle.abort(); - - assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated)); - } -} diff --git a/gst-plugin-threadshare/src/runtime/future/mod.rs b/gst-plugin-threadshare/src/runtime/future/mod.rs deleted file mode 100644 index dd4b2bce..00000000 --- a/gst-plugin-threadshare/src/runtime/future/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (C) 2019 François Laignel -// -// This library is free software; you can redistribute it and/or -// modify it under the terms of the GNU Library General Public -// License as published by the Free Software Foundation; either -// version 2 of the License, or (at your option) any later version. -// -// This library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Library General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this library; if not, write to the -// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, -// Boston, MA 02110-1335, USA. - -//! `Future`s combinators which help implementing statefull asynchronous `Processor`s. - -mod abortable_waitable; -pub use abortable_waitable::{abortable_waitable, AbortWaitHandle, AbortableWaitable}; - -mod waitable; -pub use waitable::{waitable, WaitError, WaitHandle, Waitable}; diff --git a/gst-plugin-threadshare/src/runtime/future/waitable.rs b/gst-plugin-threadshare/src/runtime/future/waitable.rs deleted file mode 100644 index b22fc1e3..00000000 --- a/gst-plugin-threadshare/src/runtime/future/waitable.rs +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright (C) 2019 François Laignel -// -// This library is free software; you can redistribute it and/or -// modify it under the terms of the GNU Library General Public -// License as published by the Free Software Foundation; either -// version 2 of the License, or (at your option) any later version. -// -// This library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Library General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this library; if not, write to the -// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, -// Boston, MA 02110-1335, USA. - -use futures::channel::oneshot; -use futures::prelude::*; -use futures::ready; - -use pin_project::pin_project; - -use std::pin::Pin; -use std::task::{self, Poll}; - -/// Builds a [`Waitable`] `Future` from the provided `Future`. -/// -/// See [`WaitHandle`]. -/// -/// [`Waitable`]: struct.Waitable.html -/// [`WaitHandle`]: struct.WaitHandle.html -pub fn waitable(future: Fut) -> (Waitable, WaitHandle) { - Waitable::new(future) -} - -/// A `Waitable` `Future`. -/// -/// See [`WaitHandle`]. -/// -/// [`WaitHandle`]: struct.WaitHandle.html -#[pin_project] -#[derive(Debug)] -pub struct Waitable { - #[pin] - inner: Fut, - sender: Option>, -} - -impl Waitable { - pub fn new(inner: Fut) -> (Waitable, WaitHandle) { - let (sender, receiver) = oneshot::channel(); - - ( - Waitable { - inner, - sender: Some(sender), - }, - WaitHandle::new(receiver), - ) - } -} - -impl Future for Waitable { - type Output = Fut::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll { - let this = self.project(); - let output = ready!(this.inner.poll(cx)); - if let Some(sender) = this.sender.take() { - let _ = sender.send(()); - } - Poll::Ready(output) - } -} - -#[derive(Debug, Eq, PartialEq)] -pub enum WaitError { - Cancelled, -} - -/// A handle to a [`Waitable`] `Future`. -/// -/// The handle allows checking for the `Future` completion and waiting until the `Future` -/// completes. -/// -/// [`Waitable`]: struct.Waitable.html -#[derive(Debug)] -pub struct WaitHandle { - receiver: oneshot::Receiver<()>, - is_terminated: bool, - is_cancelled: bool, -} - -impl WaitHandle { - fn new(receiver: oneshot::Receiver<()>) -> WaitHandle { - WaitHandle { - receiver, - is_terminated: false, - is_cancelled: false, - } - } - - pub fn is_terminated(&mut self) -> bool { - self.check_state(); - self.is_terminated - } - - pub fn is_cancelled(&mut self) -> bool { - self.check_state(); - self.is_cancelled - } - - pub async fn wait(self) -> Result<(), WaitError> { - if self.is_terminated { - if self.is_cancelled { - return Err(WaitError::Cancelled); - } - return Ok(()); - } - - self.receiver.await.map_err(|_| WaitError::Cancelled) - } - - fn check_state(&mut self) { - if self.is_terminated { - // no need to check state - return; - } - - let res = self.receiver.try_recv(); - - match res { - Ok(None) => (), - Ok(Some(())) => self.is_terminated = true, - Err(_) => { - self.is_terminated = true; - self.is_cancelled = true; - } - } - } -} - -#[cfg(test)] -mod tests { - use futures::channel::{mpsc, oneshot}; - use futures::future; - - use std::time::Duration; - - use tokio::future::FutureExt; - - use super::*; - - #[derive(Debug, PartialEq)] - enum State { - Released, - Terminated, - Triggered, - } - - #[tokio::test] - async fn wait() { - let (trigger_sender, trigger_receiver) = oneshot::channel::<()>(); - - let (mut state_sender_wait, mut state_receiver_wait) = mpsc::channel(1); - let (shared, mut handle) = waitable(async move { - let _ = trigger_receiver.await; - state_sender_wait.send(State::Triggered).await.unwrap(); - let _ = future::pending::<()>() - .timeout(Duration::from_secs(1)) - .await; - state_sender_wait.send(State::Released).await.unwrap(); - }); - - let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1); - tokio::spawn(async move { - let _ = shared.await; - state_sender_spawn.send(State::Terminated).await.unwrap(); - }); - - drop(trigger_sender); - - assert_eq!(state_receiver_wait.next().await, Some(State::Triggered)); - assert!(!handle.is_terminated()); - - assert_eq!(handle.wait().await, Ok(())); - assert_eq!(state_receiver_wait.next().await, Some(State::Released)); - - assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated)); - assert_eq!(state_receiver_wait.next().await, None); - } - - #[tokio::test] - async fn no_wait() { - let (trigger_sender, trigger_receiver) = oneshot::channel::<()>(); - - let (mut state_sender_wait, mut state_receiver_wait) = mpsc::channel(1); - let (shared, mut handle) = waitable(async move { - let _ = trigger_receiver.await; - state_sender_wait.send(State::Triggered).await.unwrap(); - state_sender_wait.send(State::Released).await.unwrap(); - }); - - let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1); - tokio::spawn(async move { - let _ = shared.await; - state_sender_spawn.send(State::Terminated).await.unwrap(); - }); - - drop(trigger_sender); - - assert_eq!(state_receiver_wait.next().await, Some(State::Triggered)); - assert!(!handle.is_terminated()); - - assert_eq!(state_receiver_wait.next().await, Some(State::Released)); - - assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated)); - assert_eq!(state_receiver_wait.next().await, None); - - assert!(handle.is_terminated()); - assert!(!handle.is_cancelled()); - } -} diff --git a/gst-plugin-threadshare/src/runtime/macros.rs b/gst-plugin-threadshare/src/runtime/macros.rs deleted file mode 100644 index c1fb1c7d..00000000 --- a/gst-plugin-threadshare/src/runtime/macros.rs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (C) 2019 François Laignel -// -// This library is free software; you can redistribute it and/or -// modify it under the terms of the GNU Library General Public -// License as published by the Free Software Foundation; either -// version 2 of the License, or (at your option) any later version. -// -// This library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Library General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this library; if not, write to the -// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, -// Boston, MA 02110-1335, USA. - -//! A set of `macro`s to ease the implementation of asynchronous processings. - -#[macro_export] -macro_rules! block_on { - ($future:expr) => { - $crate::tokio_executor::current_thread::CurrentThread::new().block_on($future) - }; -} diff --git a/gst-plugin-threadshare/src/runtime/mod.rs b/gst-plugin-threadshare/src/runtime/mod.rs index d01e6db3..881dfd0a 100644 --- a/gst-plugin-threadshare/src/runtime/mod.rs +++ b/gst-plugin-threadshare/src/runtime/mod.rs @@ -28,7 +28,8 @@ //! The `threadshare` `runtime` is a framework to build `Element`s for such applications. It //! uses light-weight threading to allow multiple `Element`s share a reduced number of OS `thread`s. //! -//! See this [talk] ([slides]) for a presentation of the motivations and principles. +//! See this [talk] ([slides]) for a presentation of the motivations and principles, +//! and this [blog post]. //! //! Current implementation uses the crate [`tokio`]. //! @@ -37,17 +38,13 @@ //! //! [talk]: https://gstconf.ubicast.tv/videos/when-adding-more-threads-adds-more-problems-thread-sharing-between-elements-in-gstreamer/ //! [slides]: https://gstreamer.freedesktop.org/data/events/gstreamer-conference/2018/Sebastian%20Dr%C3%B6ge%20-%20When%20adding%20more%20threads%20adds%20more%20problems:%20Thread-sharing%20between%20elements%20in%20GStreamer.pdf +//! [blog post]: https://coaxion.net/blog/2018/04/improving-gstreamer-performance-on-a-high-number-of-network-streams-by-sharing-threads-between-elements-with-rusts-tokio-crate //! [`tokio`]: https://crates.io/crates/tokio //! [`PadSrc`]: pad/struct.PadSrc.html //! [`PadSink`]: pad/struct.PadSink.html pub mod executor; -pub use executor::{Context, Interval, TaskOutput, Timeout}; - -pub mod future; - -#[macro_use] -pub mod macros; +pub use executor::{Context, TaskOutput}; pub mod pad; pub use pad::{PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak}; diff --git a/gst-plugin-threadshare/src/runtime/pad.rs b/gst-plugin-threadshare/src/runtime/pad.rs index 86203ebe..9c5c67f0 100644 --- a/gst-plugin-threadshare/src/runtime/pad.rs +++ b/gst-plugin-threadshare/src/runtime/pad.rs @@ -65,6 +65,7 @@ use either::Either; +use futures::executor::block_on; use futures::future; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; @@ -81,8 +82,6 @@ use std::marker::PhantomData; use std::sync; use std::sync::{Arc, Weak}; -use crate::block_on; - use super::executor::Context; use super::pad_context::{PadContext, PadContextRef, PadContextWeak}; use super::task::Task; @@ -400,7 +399,7 @@ impl<'a> PadSrcRef<'a> { } if !active { - block_on!(async { + block_on(async { self.strong.lock_state().await.is_initialized = false; }); } @@ -1198,9 +1197,9 @@ impl PadSink { pad_ctx.add_pending_task(fut.map(|res| res.map(drop))); Ok(FlowSuccess::Ok) } - None => block_on!(fut), + None => block_on(fut), }, - None => block_on!(fut), + None => block_on(fut), } } diff --git a/gst-plugin-threadshare/src/runtime/pad_context.rs b/gst-plugin-threadshare/src/runtime/pad_context.rs index a71fa559..87133304 100644 --- a/gst-plugin-threadshare/src/runtime/pad_context.rs +++ b/gst-plugin-threadshare/src/runtime/pad_context.rs @@ -29,7 +29,7 @@ use glib::{glib_boxed_derive_traits, glib_boxed_type}; use std::marker::PhantomData; use std::time::Duration; -use super::executor::{Context, ContextWeak, Interval, TaskOutput, TaskQueueId, Timeout}; +use super::executor::{Context, ContextWeak, TaskOutput, TaskQueueId}; #[derive(Clone)] pub struct PadContextWeak { @@ -91,11 +91,12 @@ impl<'a> PadContextRef<'a> { self.strong.downgrade() } - pub fn spawn(&self, future: Fut) + pub fn spawn(&self, future: Fut) -> tokio::task::JoinHandle where - Fut: Future + Send + 'static, + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, { - self.strong.context.spawn(future); + self.strong.context.spawn(future) } pub fn add_pending_task(&self, task: T) @@ -117,30 +118,23 @@ impl<'a> PadContextRef<'a> { &self.strong.context } - pub fn new_interval(&self, interval: Duration) -> Interval { - self.strong.new_interval(interval) - } - /// Builds a `Future` to execute an `action` at [`Interval`]s. /// /// [`Interval`]: struct.Interval.html - pub fn interval(&self, interval: Duration, f: F) -> impl Future + pub fn interval(&self, interval: Duration, f: F) -> impl Future where F: Fn() -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, + E: Send + 'static, + Fut: Future> + Send + 'static, { self.strong.interval(interval, f) } - pub fn new_timeout(&self, timeout: Duration) -> Timeout { - self.strong.new_timeout(timeout) - } - /// Builds a `Future` to execute an action after the given `delay` has elapsed. pub fn delay_for(&self, delay: Duration, f: F) -> impl Future where F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, + Fut: Future + Send + 'static, { self.strong.delay_for(delay, f) } @@ -188,29 +182,20 @@ impl PadContextStrong { } #[inline] - fn new_interval(&self, interval: Duration) -> Interval { - self.context.new_interval(interval) - } - - #[inline] - fn interval(&self, interval: Duration, f: F) -> impl Future + fn interval(&self, interval: Duration, f: F) -> impl Future where F: Fn() -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, + E: Send + 'static, + Fut: Future> + Send + 'static, { self.context.interval(interval, f) } - #[inline] - fn new_timeout(&self, timeout: Duration) -> Timeout { - self.context.new_timeout(timeout) - } - #[inline] pub fn delay_for(&self, delay: Duration, f: F) -> impl Future where F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, + Fut: Future + Send + 'static, { self.context.delay_for(delay, f) } @@ -246,11 +231,12 @@ impl PadContext { PadContextRef::new(self.0.context.clone(), self.0.queue_id) } - pub fn spawn(&self, future: Fut) + pub fn spawn(&self, future: Fut) -> tokio::task::JoinHandle where - Fut: Future + Send + 'static, + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, { - self.0.context.spawn(future); + self.0.context.spawn(future) } pub fn drain_pending_tasks(&self) -> Option> { @@ -261,30 +247,23 @@ impl PadContext { self.0.clear_pending_tasks(); } - pub fn new_interval(&self, interval: Duration) -> Interval { - self.0.new_interval(interval) - } - /// Builds a `Future` to execute an `action` at [`Interval`]s. /// /// [`Interval`]: struct.Interval.html - pub fn interval(&self, interval: Duration, f: F) -> impl Future + pub fn interval(&self, interval: Duration, f: F) -> impl Future where F: Fn() -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, + E: Send + 'static, + Fut: Future> + Send + 'static, { self.0.interval(interval, f) } - pub fn new_timeout(&self, timeout: Duration) -> Timeout { - self.0.new_timeout(timeout) - } - /// Builds a `Future` to execute an action after the given `delay` has elapsed. pub fn delay_for(&self, delay: Duration, f: F) -> impl Future where F: FnOnce() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, + Fut: Future + Send + 'static, { self.0.delay_for(delay, f) } diff --git a/gst-plugin-threadshare/src/runtime/task.rs b/gst-plugin-threadshare/src/runtime/task.rs index 5cd9fadf..7297d6ac 100644 --- a/gst-plugin-threadshare/src/runtime/task.rs +++ b/gst-plugin-threadshare/src/runtime/task.rs @@ -19,8 +19,7 @@ //! //! [`Context`]: ../executor/struct.Context.html -use futures::channel::oneshot; -use futures::future::{self, BoxFuture}; +use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture}; use futures::lock::Mutex; use futures::prelude::*; @@ -30,7 +29,6 @@ use gst::{gst_debug, gst_log, gst_trace, gst_warning}; use std::fmt; use std::sync::Arc; -use super::future::{abortable_waitable, AbortWaitHandle}; use super::{Context, RUNTIME_CAT}; #[derive(Clone, Debug, Eq, PartialEq)] @@ -52,8 +50,8 @@ impl std::error::Error for TaskError {} struct TaskInner { context: Option, state: TaskState, - loop_end_sender: Option>, - loop_handle: Option, + abort_handle: Option, + loop_handle: Option>>, } impl Default for TaskInner { @@ -61,7 +59,7 @@ impl Default for TaskInner { TaskInner { context: None, state: TaskState::Stopped, - loop_end_sender: None, + abort_handle: None, loop_handle: None, } } @@ -136,17 +134,13 @@ impl Task { gst_debug!(RUNTIME_CAT, "Starting Task"); - let (loop_fut, loop_handle) = abortable_waitable(async move { + let (loop_fut, abort_handle) = abortable(async move { loop { func().await; - let mut inner = inner_clone.lock().await; - match inner.state { + match inner_clone.lock().await.state { TaskState::Started => (), TaskState::Paused | TaskState::Stopped => { - inner.loop_handle = None; - inner.loop_end_sender.take(); - break; } other => unreachable!("Unexpected Task state {:?}", other), @@ -154,12 +148,13 @@ impl Task { } }); - inner + let loop_handle = inner .context .as_ref() .expect("Context not set") - .spawn(loop_fut.map(drop)); + .spawn(loop_fut); + inner.abort_handle = Some(abort_handle); inner.loop_handle = Some(loop_handle); inner.state = TaskState::Started; @@ -175,11 +170,10 @@ impl Task { inner.state = TaskState::Paused; - let (sender, receiver) = oneshot::channel(); - inner.loop_end_sender = Some(sender); + let loop_handle = inner.loop_handle.take().unwrap(); async move { - let _ = receiver.await; + let _ = loop_handle.await; gst_log!(RUNTIME_CAT, "Task Paused"); } .boxed() @@ -206,8 +200,12 @@ impl Task { gst_debug!(RUNTIME_CAT, "Stopping Task"); + if let Some(abort_handle) = inner.abort_handle.take() { + abort_handle.abort(); + } + if let Some(loop_handle) = inner.loop_handle.take() { - let _ = loop_handle.abort_and_wait().await; + let _ = loop_handle.await; } inner.state = TaskState::Stopped; diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index 7f78b0cf..52eed502 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -17,6 +17,7 @@ // Boston, MA 02110-1335, USA. use either::Either; +use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -43,8 +44,8 @@ use std::sync::Arc; use std::u16; use tokio::io::AsyncReadExt; +use tokio::task::JoinHandle; -use crate::block_on; use crate::runtime::prelude::*; use crate::runtime::{Context, PadSrc, PadSrcRef}; @@ -144,32 +145,14 @@ static PROPERTIES: [subclass::Property; 6] = [ ]; struct TcpClientReaderInner { - connect_future: Option>>, - socket: Option, -} - -impl TcpClientReaderInner { - fn new(connect_future: Fut) -> Self - where - Fut: Future> + Send + 'static, - { - Self { - connect_future: Some(connect_future.boxed()), - socket: None, - } - } + socket: tokio::net::TcpStream, } pub struct TcpClientReader(Arc>); impl TcpClientReader { - pub fn new(connect_future: Fut) -> Self - where - Fut: Future> + Send + 'static, - { - TcpClientReader(Arc::new(Mutex::new(TcpClientReaderInner::new( - connect_future, - )))) + pub fn new(socket: tokio::net::TcpStream) -> Self { + TcpClientReader(Arc::new(Mutex::new(TcpClientReaderInner { socket }))) } } @@ -183,18 +166,12 @@ impl SocketRead for TcpClientReader { let this = Arc::clone(&self.0); async move { - let mut this = this.lock().await; - - let socket = match this.socket { - Some(ref mut socket) => socket, - None => { - let stream = this.connect_future.take().unwrap().await?; - this.socket = Some(stream); - this.socket.as_mut().unwrap() - } - }; - - socket.read(buffer).await.map(|read_size| (read_size, None)) + this.lock() + .await + .socket + .read(buffer) + .await + .map(|read_size| (read_size, None)) } .boxed() } @@ -363,7 +340,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on!(tcpclientsrc.pause(element)); + let _ = block_on(tcpclientsrc.pause(element)); true } EventView::FlushStop(..) => { @@ -371,7 +348,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on!(tcpclientsrc.start(element)); + let _ = block_on(tcpclientsrc.start(element)); } true } @@ -408,7 +385,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = block_on!(self.lock()); + let inner = block_on(self.lock()); let caps = if let Some(ref caps) = inner.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) @@ -446,11 +423,18 @@ impl Default for State { } } +#[derive(Debug)] +struct PreparationSet { + join_handle: JoinHandle>, + context: Context, +} + struct TcpClientSrc { src_pad: PadSrc, src_pad_handler: TcpClientSrcPadHandler, state: Mutex, settings: Mutex, + preparation_set: Mutex>, } lazy_static! { @@ -463,10 +447,35 @@ lazy_static! { impl TcpClientSrc { async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().await; + let _state = self.state.lock().await; gst_debug!(CAT, obj: element, "Preparing"); - let settings = self.settings.lock().await; + let context = { + let settings = self.settings.lock().await; + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })? + }; + + // TcpStream needs to be instantiated in the thread of its I/O reactor + *self.preparation_set.lock().await = Some(PreparationSet { + join_handle: context.spawn(Self::prepare_socket(element.clone())), + context, + }); + + gst_debug!(CAT, obj: element, "Prepared"); + + Ok(()) + } + + async fn prepare_socket(element: gst::Element) -> Result<(), gst::ErrorMessage> { + let this = Self::from_instance(&element); + + let settings = this.settings.lock().await.clone(); + gst_debug!(CAT, obj: &element, "Preparing Socket"); let addr: IpAddr = match settings.address { None => { @@ -487,17 +496,14 @@ impl TcpClientSrc { }; let port = settings.port; - let context = - Context::acquire(&settings.context, settings.context_wait).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to acquire Context: {}", err] - ) - })?; - let saddr = SocketAddr::new(addr, port as u16); - gst_debug!(CAT, obj: element, "Connecting to {:?}", saddr); - let socket = tokio::net::TcpStream::connect(saddr); + gst_debug!(CAT, obj: &element, "Connecting to {:?}", saddr); + let socket = tokio::net::TcpStream::connect(saddr).await.map_err(|err| { + gst_error_msg!( + gst::ResourceError::Settings, + ["Failed to connect to {:?} {:?}", saddr, err] + ) + })?; let buffer_pool = gst::BufferPool::new(); let mut config = buffer_pool.get_config(); @@ -515,11 +521,39 @@ impl TcpClientSrc { buffer_pool, ); - let socket_stream = socket.prepare().await.map_err(|_| { - gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to prepare socket"]) + let socket_stream = socket.prepare().await.map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to prepare socket {:?}", err] + ) })?; - self.src_pad_handler.lock().await.socket_stream = Some(socket_stream); + this.src_pad_handler.lock().await.socket_stream = Some(socket_stream); + + this.state.lock().await.socket = Some(socket); + + gst_debug!(CAT, obj: &element, "Socket Prepared"); + + Ok(()) + } + + async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Completing preparation"); + + let PreparationSet { + join_handle, + context, + } = self + .preparation_set + .lock() + .await + .take() + .expect("preparation_set already taken"); + + join_handle + .await + .expect("The socket preparation has panicked")?; + self.src_pad .prepare(context, &self.src_pad_handler) .await @@ -530,9 +564,7 @@ impl TcpClientSrc { ) })?; - state.socket = Some(socket); - - gst_debug!(CAT, obj: element, "Prepared"); + gst_debug!(CAT, obj: element, "Preparation completed"); Ok(()) } @@ -632,6 +664,7 @@ impl ObjectSubclass for TcpClientSrc { src_pad_handler: TcpClientSrcPadHandler::new(), state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), + preparation_set: Mutex::new(None), } } } @@ -644,30 +677,30 @@ impl ObjectImpl for TcpClientSrc { match *prop { subclass::Property("address", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.address = value.get().expect("type checked upstream"); } subclass::Property("port", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.port = value.get_some().expect("type checked upstream"); } subclass::Property("caps", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.caps = value.get().expect("type checked upstream"); } subclass::Property("chunk-size", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.chunk_size = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -679,27 +712,27 @@ impl ObjectImpl for TcpClientSrc { match *prop { subclass::Property("address", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.address.to_value()) } subclass::Property("port", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.port.to_value()) } subclass::Property("caps", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.caps.to_value()) } subclass::Property("chunk-size", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.chunk_size.to_value()) } subclass::Property("context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } _ => unimplemented!(), @@ -726,20 +759,22 @@ impl ElementImpl for TcpClientSrc { match transition { gst::StateChange::NullToReady => { - block_on!(self.prepare(element)) - .map_err(|err| { - element.post_error_message(&err); - gst::StateChangeError - }) - .and_then(|_| { - block_on!(self.start(element)).map_err(|_| gst::StateChangeError) - })?; + block_on(self.prepare(element)).map_err(|err| { + element.post_error_message(&err); + gst::StateChangeError + })?; + } + gst::StateChange::ReadyToPaused => { + block_on(self.complete_preparation(element)).map_err(|err| { + element.post_error_message(&err); + gst::StateChangeError + })?; } gst::StateChange::PlayingToPaused => { - block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -750,8 +785,11 @@ impl ElementImpl for TcpClientSrc { gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::Success; } + gst::StateChange::PausedToPlaying => { + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; + } gst::StateChange::PausedToReady => { - let mut src_pad_handler = block_on!(self.src_pad_handler.lock()); + let mut src_pad_handler = block_on(self.src_pad_handler.lock()); src_pad_handler.need_initial_events = true; } _ => (), diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index fbc37cf9..2ef672f8 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -17,6 +17,7 @@ use either::Either; +use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::{Mutex, MutexGuard}; use futures::prelude::*; @@ -54,7 +55,8 @@ use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; #[cfg(windows)] use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; -use crate::block_on; +use tokio::task::JoinHandle; + use crate::runtime::prelude::*; use crate::runtime::{Context, PadSrc, PadSrcRef}; @@ -295,14 +297,14 @@ static PROPERTIES: [subclass::Property; 10] = [ #[derive(Debug)] struct UdpReaderInner { - socket: tokio::net::udp::UdpSocket, + socket: tokio::net::UdpSocket, } #[derive(Debug)] pub struct UdpReader(Arc>); impl UdpReader { - fn new(socket: tokio::net::udp::UdpSocket) -> Self { + fn new(socket: tokio::net::UdpSocket) -> Self { UdpReader(Arc::new(Mutex::new(UdpReaderInner { socket }))) } } @@ -506,7 +508,7 @@ impl PadSrcHandler for UdpSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on!(udpsrc.pause(element)); + let _ = block_on(udpsrc.pause(element)); true } EventView::FlushStop(..) => { @@ -514,7 +516,7 @@ impl PadSrcHandler for UdpSrcPadHandler { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on!(udpsrc.start(element)); + let _ = block_on(udpsrc.start(element)); } true } @@ -552,7 +554,7 @@ impl PadSrcHandler for UdpSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = block_on!(self.lock()); + let inner = block_on(self.lock()); let caps = if let Some(ref caps) = inner.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) @@ -591,12 +593,19 @@ impl Default for State { } } +#[derive(Debug)] +struct PreparationSet { + join_handle: JoinHandle>, + context: Context, +} + #[derive(Debug)] struct UdpSrc { src_pad: PadSrc, src_pad_handler: UdpSrcPadHandler, state: Mutex, settings: Mutex, + preparation_set: Mutex>, } lazy_static! { @@ -609,18 +618,36 @@ lazy_static! { impl UdpSrc { async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().await; + let _state = self.state.lock().await; gst_debug!(CAT, obj: element, "Preparing"); - let mut settings = self.settings.lock().await.clone(); + let context = { + let settings = self.settings.lock().await.clone(); - let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, ["Failed to acquire Context: {}", err] ) - })?; + })? + }; + + // UdpSocket needs to be instantiated in the thread of its I/O reactor + *self.preparation_set.lock().await = Some(PreparationSet { + join_handle: context.spawn(Self::prepare_socket(element.clone())), + context, + }); + + gst_debug!(CAT, obj: element, "Prepared"); + + Ok(()) + } + + async fn prepare_socket(element: gst::Element) -> Result<(), gst::ErrorMessage> { + let this = Self::from_instance(&element); + + let mut settings = this.settings.lock().await.clone(); + gst_debug!(CAT, obj: &element, "Preparing Socket"); let socket = if let Some(ref wrapped_socket) = settings.socket { use std::net::UdpSocket; @@ -636,13 +663,12 @@ impl UdpSrc { socket = wrapped_socket.get() } - let socket = tokio::net::UdpSocket::from_std(socket, context.reactor_handle()) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to setup socket for tokio: {}", err] - ) - })?; + let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup socket for tokio: {}", err] + ) + })?; settings.used_socket = Some(wrapped_socket.clone()); @@ -669,17 +695,16 @@ impl UdpSrc { // TODO: TTL, multicast loopback, etc let saddr = if addr.is_multicast() { - // TODO: Use ::unspecified() constructor once stable let bind_addr = if addr.is_ipv4() { - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)) + IpAddr::V4(Ipv4Addr::UNSPECIFIED) } else { - IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)) + IpAddr::V6(Ipv6Addr::UNSPECIFIED) }; let saddr = SocketAddr::new(bind_addr, port as u16); gst_debug!( CAT, - obj: element, + obj: &element, "Binding to {:?} for multicast group {:?}", saddr, addr @@ -688,7 +713,7 @@ impl UdpSrc { saddr } else { let saddr = SocketAddr::new(addr, port as u16); - gst_debug!(CAT, obj: element, "Binding to {:?}", saddr); + gst_debug!(CAT, obj: &element, "Binding to {:?}", saddr); saddr }; @@ -731,13 +756,12 @@ impl UdpSrc { ) })?; - let socket = tokio::net::UdpSocket::from_std(socket, context.reactor_handle()) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to setup socket for tokio: {}", err] - ) - })?; + let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup socket for tokio: {}", err] + ) + })?; if addr.is_multicast() { // TODO: Multicast interface configuration, going to be tricky @@ -821,24 +845,54 @@ impl UdpSrc { let buffer_pool = gst::BufferPool::new(); let mut config = buffer_pool.get_config(); config.set_params(None, settings.mtu, 0, 0); - buffer_pool.set_config(config).map_err(|_| { + buffer_pool.set_config(config).map_err(|err| { gst_error_msg!( gst::ResourceError::Settings, - ["Failed to configure buffer pool"] + ["Failed to configure buffer pool {:?}", err] ) })?; let socket = Socket::new(element.upcast_ref(), UdpReader::new(socket), buffer_pool); - let socket_stream = socket.prepare().await.map_err(|_| { - gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to prepare socket"]) + let socket_stream = socket.prepare().await.map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to prepare socket {:?}", err] + ) })?; { - let mut src_pad_handler = self.src_pad_handler.lock().await; + let mut src_pad_handler = this.src_pad_handler.lock().await; src_pad_handler.retrieve_sender_address = settings.retrieve_sender_address; src_pad_handler.socket_stream = Some(socket_stream); } + this.state.lock().await.socket = Some(socket); + + gst_debug!(CAT, obj: &element, "Socket Prepared"); + drop(settings); + + element.notify("used-socket"); + + Ok(()) + } + + async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Completing preparation"); + + let PreparationSet { + join_handle, + context, + } = self + .preparation_set + .lock() + .await + .take() + .expect("preparation_set already taken"); + + join_handle + .await + .expect("The socket preparation has panicked")?; + self.src_pad .prepare(context, &self.src_pad_handler) .await @@ -849,12 +903,7 @@ impl UdpSrc { ) })?; - state.socket = Some(socket); - - gst_debug!(CAT, obj: element, "Prepared"); - drop(state); - - element.notify("used-socket"); + gst_debug!(CAT, obj: element, "Preparation completed"); Ok(()) } @@ -972,6 +1021,7 @@ impl ObjectSubclass for UdpSrc { src_pad_handler: UdpSrcPadHandler::new(), state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), + preparation_set: Mutex::new(None), } } } @@ -984,27 +1034,27 @@ impl ObjectImpl for UdpSrc { match *prop { subclass::Property("address", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.address = value.get().expect("type checked upstream"); } subclass::Property("port", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.port = value.get_some().expect("type checked upstream"); } subclass::Property("reuse", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.reuse = value.get_some().expect("type checked upstream"); } subclass::Property("caps", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.caps = value.get().expect("type checked upstream"); } subclass::Property("mtu", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.mtu = value.get_some().expect("type checked upstream"); } subclass::Property("socket", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.socket = value .get::() .expect("type checked upstream") @@ -1014,18 +1064,18 @@ impl ObjectImpl for UdpSrc { unreachable!(); } subclass::Property("context", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } subclass::Property("retrieve-sender-address", ..) => { - let mut settings = block_on!(self.settings.lock()); + let mut settings = block_on(self.settings.lock()); settings.retrieve_sender_address = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -1037,27 +1087,27 @@ impl ObjectImpl for UdpSrc { match *prop { subclass::Property("address", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.address.to_value()) } subclass::Property("port", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.port.to_value()) } subclass::Property("reuse", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.reuse.to_value()) } subclass::Property("caps", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.caps.to_value()) } subclass::Property("mtu", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.mtu.to_value()) } subclass::Property("socket", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings .socket .as_ref() @@ -1065,7 +1115,7 @@ impl ObjectImpl for UdpSrc { .to_value()) } subclass::Property("used-socket", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings .used_socket .as_ref() @@ -1073,15 +1123,15 @@ impl ObjectImpl for UdpSrc { .to_value()) } subclass::Property("context", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context.to_value()) } subclass::Property("context-wait", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.context_wait.to_value()) } subclass::Property("retrieve-sender-address", ..) => { - let settings = block_on!(self.settings.lock()); + let settings = block_on(self.settings.lock()); Ok(settings.retrieve_sender_address.to_value()) } _ => unimplemented!(), @@ -1107,16 +1157,22 @@ impl ElementImpl for UdpSrc { match transition { gst::StateChange::NullToReady => { - block_on!(self.prepare(element)).map_err(|err| { + block_on(self.prepare(element)).map_err(|err| { + element.post_error_message(&err); + gst::StateChangeError + })?; + } + gst::StateChange::ReadyToPaused => { + block_on(self.complete_preparation(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -1128,10 +1184,10 @@ impl ElementImpl for UdpSrc { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - block_on!(async { + block_on(async { self.src_pad_handler.lock().await.need_initial_events = true; }); } diff --git a/gst-plugin-threadshare/tests/pad.rs b/gst-plugin-threadshare/tests/pad.rs index b583ebdd..6b799872 100644 --- a/gst-plugin-threadshare/tests/pad.rs +++ b/gst-plugin-threadshare/tests/pad.rs @@ -18,6 +18,7 @@ use either::Either; use futures::channel::mpsc; +use futures::executor::block_on; use futures::future::BoxFuture; use futures::lock::Mutex; use futures::prelude::*; @@ -36,7 +37,6 @@ use lazy_static::lazy_static; use std::boxed::Box; use std::sync::Arc; -use gstthreadshare::block_on; use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::{Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef}; @@ -151,7 +151,7 @@ impl PadSrcHandler for PadSrcHandlerTest { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = block_on!(elem_src_test.pause(element)); + let _ = block_on(elem_src_test.pause(element)); true } EventView::FlushStop(..) => { @@ -159,7 +159,7 @@ impl PadSrcHandler for PadSrcHandlerTest { if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { - let _ = block_on!(elem_src_test.start(element)); + let _ = block_on(elem_src_test.start(element)); } true } @@ -340,7 +340,7 @@ impl ObjectImpl for ElementSrcTest { .expect("type checked upstream") .unwrap_or_else(|| "".into()); - block_on!(self.settings.lock()).context = context; + block_on(self.settings.lock()).context = context; } _ => unimplemented!(), } @@ -364,16 +364,16 @@ impl ElementImpl for ElementSrcTest { match transition { gst::StateChange::NullToReady => { - block_on!(self.prepare(element)).map_err(|err| { + block_on(self.prepare(element)).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -382,7 +382,7 @@ impl ElementImpl for ElementSrcTest { match transition { gst::StateChange::PausedToPlaying => { - block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; + block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToPaused => { success = gst::StateChangeSuccess::NoPreroll; @@ -516,7 +516,7 @@ impl PadSinkHandler for PadSinkHandlerTest { } else { gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); Either::Left( - block_on!(elem_sink_test.sender.lock()) + block_on(elem_sink_test.sender.lock()) .as_mut() .expect("ItemSender not set") .try_send(Item::Event(event)) @@ -633,7 +633,7 @@ impl ObjectImpl for ElementSinkTest { .expect("type checked upstream") .expect("ItemSender not found") .clone(); - *block_on!(self.sender.lock()) = Some(sender); + *block_on(self.sender.lock()) = Some(sender); } _ => unimplemented!(), } @@ -657,10 +657,10 @@ impl ElementImpl for ElementSinkTest { match transition { gst::StateChange::NullToReady => { - block_on!(self.sink_pad.prepare(&PadSinkHandlerTest {})); + block_on(self.sink_pad.prepare(&PadSinkHandlerTest {})); } gst::StateChange::ReadyToNull => { - block_on!(self.sink_pad.unprepare()); + block_on(self.sink_pad.unprepare()); } _ => (), } @@ -714,14 +714,16 @@ fn task() { pipeline.set_state(gst::State::Playing).unwrap(); // Initial events - block_on!(elem_src_test.try_push(Item::Event( - gst::Event::new_stream_start("stream_id_task_test") - .group_id(gst::util_group_id_next()) - .build(), - ))) + block_on( + elem_src_test.try_push(Item::Event( + gst::Event::new_stream_start("stream_id_task_test") + .group_id(gst::util_group_id_next()) + .build(), + )), + ) .unwrap(); - match block_on!(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::CustomDownstreamSticky(e) => { assert!(PadContext::is_pad_context_sticky_event(&e)) @@ -731,7 +733,7 @@ fn task() { other => panic!("Unexpected item {:?}", other), } - match block_on!(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::StreamStart(_) => (), other => panic!("Unexpected event {:?}", other), @@ -739,12 +741,12 @@ fn task() { other => panic!("Unexpected item {:?}", other), } - block_on!(elem_src_test.try_push(Item::Event( + block_on(elem_src_test.try_push(Item::Event( gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), ))) .unwrap(); - match block_on!(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::Segment(_) => (), other => panic!("Unexpected event {:?}", other), @@ -753,10 +755,10 @@ fn task() { } // Buffer - block_on!(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) + block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) .unwrap(); - match block_on!(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Buffer(buffer) => { let data = buffer.map_readable().unwrap(); assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice()); @@ -769,9 +771,9 @@ fn task() { list.get_mut() .unwrap() .add(gst::Buffer::from_slice(vec![1, 2, 3, 4])); - block_on!(elem_src_test.try_push(Item::BufferList(list))).unwrap(); + block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap(); - match block_on!(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::BufferList(_) => (), other => panic!("Unexpected item {:?}", other), } @@ -780,7 +782,7 @@ fn task() { pipeline.set_state(gst::State::Paused).unwrap(); // Items not longer accepted - block_on!(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) + block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) .unwrap_err(); // Nothing forwarded @@ -808,8 +810,8 @@ fn task() { .unwrap()); // EOS - block_on!(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap(); - match block_on!(receiver.next()).unwrap() { + block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap(); + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { gst::EventView::Eos(_) => (), other => panic!("Unexpected event {:?}", other), @@ -821,10 +823,12 @@ fn task() { pipeline.set_state(gst::State::Ready).unwrap(); // Receiver was dropped when stopping => can't send anymore - block_on!(elem_src_test.try_push(Item::Event( - gst::Event::new_stream_start("stream_id_task_test_past_stop") - .group_id(gst::util_group_id_next()) - .build(), - ))) + block_on( + elem_src_test.try_push(Item::Event( + gst::Event::new_stream_start("stream_id_task_test_past_stop") + .group_id(gst::util_group_id_next()) + .build(), + )), + ) .unwrap_err(); } diff --git a/gst-plugin-threadshare/tests/pipeline.rs b/gst-plugin-threadshare/tests/pipeline.rs index 76eaf2e9..f04c96cb 100644 --- a/gst-plugin-threadshare/tests/pipeline.rs +++ b/gst-plugin-threadshare/tests/pipeline.rs @@ -167,7 +167,8 @@ fn multiple_contexts_queue() { }; glib::Continue(true) - }); + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); @@ -326,7 +327,8 @@ fn multiple_contexts_proxy() { }; glib::Continue(true) - }); + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); @@ -421,39 +423,43 @@ fn eos() { }); let l_clone = l.clone(); - pipeline.get_bus().unwrap().add_watch(move |_, msg| { - use gst::MessageView; + pipeline + .get_bus() + .unwrap() + .add_watch(move |_, msg| { + use gst::MessageView; - match msg.view() { - MessageView::StateChanged(state_changed) => { - if let Some(source) = state_changed.get_src() { - if source.get_type() != gst::Pipeline::static_type() { - return glib::Continue(true); - } - if state_changed.get_old() == gst::State::Paused - && state_changed.get_current() == gst::State::Playing - { - if let Some(scenario) = scenario.take() { - std::thread::spawn(scenario); + match msg.view() { + MessageView::StateChanged(state_changed) => { + if let Some(source) = state_changed.get_src() { + if source.get_type() != gst::Pipeline::static_type() { + return glib::Continue(true); + } + if state_changed.get_old() == gst::State::Paused + && state_changed.get_current() == gst::State::Playing + { + if let Some(scenario) = scenario.take() { + std::thread::spawn(scenario); + } } } } - } - MessageView::Error(err) => { - gst_error!( - CAT, - "eos: Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - l_clone.quit(); - } - _ => (), - }; + MessageView::Error(err) => { + gst_error!( + CAT, + "eos: Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }); + glib::Continue(true) + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); @@ -569,39 +575,43 @@ fn premature_shutdown() { }); let l_clone = l.clone(); - pipeline.get_bus().unwrap().add_watch(move |_, msg| { - use gst::MessageView; + pipeline + .get_bus() + .unwrap() + .add_watch(move |_, msg| { + use gst::MessageView; - match msg.view() { - MessageView::StateChanged(state_changed) => { - if let Some(source) = state_changed.get_src() { - if source.get_type() != gst::Pipeline::static_type() { - return glib::Continue(true); - } - if state_changed.get_old() == gst::State::Paused - && state_changed.get_current() == gst::State::Playing - { - if let Some(scenario) = scenario.take() { - std::thread::spawn(scenario); + match msg.view() { + MessageView::StateChanged(state_changed) => { + if let Some(source) = state_changed.get_src() { + if source.get_type() != gst::Pipeline::static_type() { + return glib::Continue(true); + } + if state_changed.get_old() == gst::State::Paused + && state_changed.get_current() == gst::State::Playing + { + if let Some(scenario) = scenario.take() { + std::thread::spawn(scenario); + } } } } - } - MessageView::Error(err) => { - gst_error!( - CAT, - "premature_shutdown: Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - l_clone.quit(); - } - _ => (), - }; + MessageView::Error(err) => { + gst_error!( + CAT, + "premature_shutdown: Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }); + glib::Continue(true) + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); diff --git a/rust-toolchain b/rust-toolchain deleted file mode 100644 index 65b2df87..00000000 --- a/rust-toolchain +++ /dev/null @@ -1 +0,0 @@ -beta