Update to tokio release 0.2.5 + throttling

This commit is contained in:
François Laignel 2019-11-30 19:51:31 +01:00
parent e8f5191ee7
commit 0221524a10
22 changed files with 597 additions and 1334 deletions

View file

@ -55,32 +55,17 @@ stages:
- cargo build --color=always --all --examples --all-features - cargo build --color=always --all --examples --all-features
- G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features - G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features
#test 1.39: test 1.39:
# 1.39 img # 1.39 img
# https://hub.docker.com/_/rust/ # https://hub.docker.com/_/rust/
# image: "rust:1.39-slim-buster" image: "rust:1.39-slim-buster"
# extends: '.cargo test' extends: '.cargo test'
#test stable: test stable:
# Stable img # Stable img
# https://hub.docker.com/_/rust/ # https://hub.docker.com/_/rust/
# image: "rust:slim-buster"
# extends: '.cargo test'
test beta:
# https://hub.docker.com/_/rust/
image: "rust:slim-buster" image: "rust:slim-buster"
extends: '.tarball_setup' extends: '.cargo test'
script:
- export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path
- rustup toolchain install beta
- export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home
- rustup override set beta
- rustc --version
- cargo build --color=always --all
- G_DEBUG=fatal_warnings cargo test --color=always --all
- cargo build --color=always --all --examples --all-features
- G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features
test nightly: test nightly:
# Nightly # Nightly
@ -91,15 +76,9 @@ test nightly:
rustfmt: rustfmt:
image: "rust:slim-buster" image: "rust:slim-buster"
extends: '.tarball_setup'
stage: "lint" stage: "lint"
script: script:
- export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path - rustup component add rustfmt
- rustup toolchain install beta
- rustup component add rustfmt --toolchain beta
- export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home
- rustup override set beta
- rustc --version
- cargo fmt --version - cargo fmt --version
- cargo fmt -- --color=always --check - cargo fmt -- --color=always --check
@ -108,12 +87,7 @@ clippy:
image: "rust:slim-buster" image: "rust:slim-buster"
stage: 'extras' stage: 'extras'
script: script:
- export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path - rustup component add clippy-preview
- rustup toolchain install beta
- rustup component add clippy --toolchain beta
- export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home
- rustup override set beta
- rustc --version
- cargo clippy --color=always --all --all-features -- -A clippy::redundant_pattern_matching -A clippy::single_match -A clippy::cast_lossless - cargo clippy --color=always --all --all-features -- -A clippy::redundant_pattern_matching -A clippy::single_match -A clippy::cast_lossless
audit: audit:

View file

@ -21,10 +21,7 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" } gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
pin-project = "0.4" pin-project = "0.4"
tokio = "=0.2.0-alpha.6" tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/0.2.5-throttling", features = ["io-util", "macros", "rt-core", "sync", "time", "tcp", "udp"] }
tokio-executor = { version = "=0.2.0-alpha.6", features = ["current-thread"] }
tokio-net = { version = "=0.2.0-alpha.6", features = ["tcp", "udp"] }
tokio-timer = "=0.3.0-alpha.6"
futures = "0.3" futures = "0.3"
lazy_static = "1.0" lazy_static = "1.0"
either = "1.0" either = "1.0"

View file

@ -60,10 +60,10 @@ fn main() {
let pipeline = gst::Pipeline::new(None); let pipeline = gst::Pipeline::new(None);
for i in 0..n_streams { for i in 0..n_streams {
let fakesink = let sink =
gst::ElementFactory::make("fakesink", Some(format!("sink-{}", i).as_str())).unwrap(); gst::ElementFactory::make("fakesink", Some(format!("sink-{}", i).as_str())).unwrap();
fakesink.set_property("sync", &false).unwrap(); sink.set_property("sync", &false).unwrap();
fakesink.set_property("async", &false).unwrap(); sink.set_property("async", &false).unwrap();
let source = match source.as_str() { let source = match source.as_str() {
"udpsrc" => { "udpsrc" => {
@ -117,7 +117,7 @@ fn main() {
.set_property("samplesperbuffer", &((wait as i32) * 8000 / 1000)) .set_property("samplesperbuffer", &((wait as i32) * 8000 / 1000))
.unwrap(); .unwrap();
fakesink.set_property("sync", &true).unwrap(); sink.set_property("sync", &true).unwrap();
source source
} }
@ -138,8 +138,8 @@ fn main() {
_ => unimplemented!(), _ => unimplemented!(),
}; };
pipeline.add_many(&[&source, &fakesink]).unwrap(); pipeline.add_many(&[&source, &sink]).unwrap();
source.link(&fakesink).unwrap(); source.link(&sink).unwrap();
} }
let bus = pipeline.get_bus().unwrap(); let bus = pipeline.get_bus().unwrap();

View file

@ -18,6 +18,7 @@
use either::Either; use either::Either;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard}; use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*; use futures::prelude::*;
@ -42,7 +43,6 @@ use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use std::u32; use std::u32;
use crate::block_on;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadSrc, PadSrcRef};
@ -280,7 +280,7 @@ impl PadSrcHandler for AppSrcPadHandler {
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let _ = block_on!(app_src.pause(element)); let _ = block_on(app_src.pause(element));
true true
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
@ -288,7 +288,7 @@ impl PadSrcHandler for AppSrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{ {
let _ = block_on!(app_src.start(element)); let _ = block_on(app_src.start(element));
} }
true true
} }
@ -325,7 +325,7 @@ impl PadSrcHandler for AppSrcPadHandler {
true true
} }
QueryView::Caps(ref mut q) => { QueryView::Caps(ref mut q) => {
let inner = block_on!(self.lock()); let inner = block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps { let caps = if let Some(ref caps) = inner.configured_caps {
q.get_filter() q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
@ -540,7 +540,7 @@ impl ObjectSubclass for AppSrc {
.expect("missing signal arg"); .expect("missing signal arg");
let appsrc = Self::from_instance(&element); let appsrc = Self::from_instance(&element);
Some(block_on!(appsrc.push_buffer(&element, buffer)).to_value()) Some(block_on(appsrc.push_buffer(&element, buffer)).to_value())
}, },
); );
@ -555,7 +555,7 @@ impl ObjectSubclass for AppSrc {
.expect("signal arg") .expect("signal arg")
.expect("missing signal arg"); .expect("missing signal arg");
let appsrc = Self::from_instance(&element); let appsrc = Self::from_instance(&element);
Some(block_on!(appsrc.end_of_stream(&element)).to_value()) Some(block_on(appsrc.end_of_stream(&element)).to_value())
}, },
); );
} }
@ -581,26 +581,26 @@ impl ObjectImpl for AppSrc {
match *prop { match *prop {
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context = value settings.context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream"); settings.context_wait = value.get_some().expect("type checked upstream");
} }
subclass::Property("caps", ..) => { subclass::Property("caps", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.caps = value.get().expect("type checked upstream"); settings.caps = value.get().expect("type checked upstream");
} }
subclass::Property("max-buffers", ..) => { subclass::Property("max-buffers", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_buffers = value.get_some().expect("type checked upstream"); settings.max_buffers = value.get_some().expect("type checked upstream");
} }
subclass::Property("do-timestamp", ..) => { subclass::Property("do-timestamp", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.do_timestamp = value.get_some().expect("type checked upstream"); settings.do_timestamp = value.get_some().expect("type checked upstream");
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -612,23 +612,23 @@ impl ObjectImpl for AppSrc {
match *prop { match *prop {
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context.to_value()) Ok(settings.context.to_value())
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value()) Ok(settings.context_wait.to_value())
} }
subclass::Property("caps", ..) => { subclass::Property("caps", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.caps.to_value()) Ok(settings.caps.to_value())
} }
subclass::Property("max-buffers", ..) => { subclass::Property("max-buffers", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_buffers.to_value()) Ok(settings.max_buffers.to_value())
} }
subclass::Property("do-timestamp", ..) => { subclass::Property("do-timestamp", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.do_timestamp.to_value()) Ok(settings.do_timestamp.to_value())
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -655,16 +655,16 @@ impl ElementImpl for AppSrc {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.prepare(element)).map_err(|err| { block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -676,10 +676,10 @@ impl ElementImpl for AppSrc {
success = gst::StateChangeSuccess::NoPreroll; success = gst::StateChangeSuccess::NoPreroll;
} }
gst::StateChange::PausedToPlaying => { gst::StateChange::PausedToPlaying => {
block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
block_on!(async { block_on(async {
self.src_pad_handler.lock().await.need_initial_events = true; self.src_pad_handler.lock().await.need_initial_events = true;
}); });
} }

View file

@ -17,8 +17,9 @@
use either::Either; use either::Either;
use futures::executor::block_on;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::future::{abortable, AbortHandle}; use futures::future::{abortable, AbortHandle, Aborted};
use futures::lock::{Mutex, MutexGuard}; use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*; use futures::prelude::*;
@ -41,7 +42,6 @@ use std::cmp::{max, min, Ordering};
use std::collections::{BTreeSet, VecDeque}; use std::collections::{BTreeSet, VecDeque};
use std::time::Duration; use std::time::Duration;
use crate::runtime::future::{abortable_waitable, AbortWaitHandle};
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadContext, PadContextWeak, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadContext, PadContextWeak, PadSink, PadSinkRef, PadSrc, PadSrcRef};
@ -226,7 +226,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
}.boxed()) }.boxed())
} else { } else {
if let EventView::FlushStop(..) = event.view() { if let EventView::FlushStop(..) = event.view() {
block_on!(jitterbuffer.flush(element)); block_on(jitterbuffer.flush(element));
} }
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event); gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event);
@ -247,7 +247,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
match query.view_mut() { match query.view_mut() {
QueryView::Drain(..) => { QueryView::Drain(..) => {
gst_info!(CAT, obj: pad.gst_pad(), "Draining"); gst_info!(CAT, obj: pad.gst_pad(), "Draining");
block_on!(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)).is_ok() block_on(jitterbuffer.enqueue_item(pad.gst_pad(), element, None)).is_ok()
} }
_ => jitterbuffer.src_pad.gst_pad().peer_query(query), _ => jitterbuffer.src_pad.gst_pad().peer_query(query),
} }
@ -278,7 +278,7 @@ impl PadSrcHandler for JitterBufferPadSrcHandler {
if ret { if ret {
let (_, mut min_latency, _) = peer_query.get_result(); let (_, mut min_latency, _) = peer_query.get_result();
let our_latency = let our_latency =
block_on!(jitterbuffer.settings.lock()).latency_ms as u64 * gst::MSECOND; block_on(jitterbuffer.settings.lock()).latency_ms as u64 * gst::MSECOND;
min_latency += our_latency; min_latency += our_latency;
let max_latency = gst::CLOCK_TIME_NONE; let max_latency = gst::CLOCK_TIME_NONE;
@ -292,7 +292,7 @@ impl PadSrcHandler for JitterBufferPadSrcHandler {
if q.get_format() != gst::Format::Time { if q.get_format() != gst::Format::Time {
jitterbuffer.sink_pad.gst_pad().peer_query(query) jitterbuffer.sink_pad.gst_pad().peer_query(query)
} else { } else {
q.set(block_on!(jitterbuffer.state.lock()).segment.get_position()); q.set(block_on(jitterbuffer.state.lock()).segment.get_position());
true true
} }
} }
@ -354,7 +354,8 @@ struct State {
discont: bool, discont: bool,
last_res: Result<gst::FlowSuccess, gst::FlowError>, last_res: Result<gst::FlowSuccess, gst::FlowError>,
task_queue_abort_handle: Option<AbortHandle>, task_queue_abort_handle: Option<AbortHandle>,
wakeup_abort_handle: Option<AbortWaitHandle>, wakeup_abort_handle: Option<AbortHandle>,
wakeup_join_handle: Option<tokio::task::JoinHandle<Result<(), Aborted>>>,
} }
impl Default for State { impl Default for State {
@ -383,6 +384,7 @@ impl Default for State {
last_res: Ok(gst::FlowSuccess::Ok), last_res: Ok(gst::FlowSuccess::Ok),
task_queue_abort_handle: None, task_queue_abort_handle: None,
wakeup_abort_handle: None, wakeup_abort_handle: None,
wakeup_join_handle: None,
} }
} }
} }
@ -941,6 +943,9 @@ impl JitterBuffer {
if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() {
wakeup_abort_handle.abort(); wakeup_abort_handle.abort();
} }
if let Some(wakeup_join_handle) = state.wakeup_join_handle.take() {
let _ = wakeup_join_handle.await;
}
let pad_src_state = self.src_pad.lock_state().await; let pad_src_state = self.src_pad.lock_state().await;
let pad_ctx = pad_src_state.pad_context().unwrap(); let pad_ctx = pad_src_state.pad_context().unwrap();
@ -953,9 +958,8 @@ impl JitterBuffer {
Self::wakeup_fut(latency_ns, context_wait_ns, element, pad_ctx_weak) Self::wakeup_fut(latency_ns, context_wait_ns, element, pad_ctx_weak)
}); });
let (wakeup_fut, abort_handle) = abortable_waitable(wakeup_fut); let (wakeup_fut, abort_handle) = abortable(wakeup_fut);
pad_ctx.spawn(wakeup_fut.map(drop)); state.wakeup_join_handle = Some(pad_ctx.spawn(wakeup_fut));
state.wakeup_abort_handle = Some(abort_handle); state.wakeup_abort_handle = Some(abort_handle);
} }
@ -1136,7 +1140,7 @@ impl ObjectSubclass for JitterBuffer {
.expect("signal arg") .expect("signal arg")
.expect("missing signal arg"); .expect("missing signal arg");
let jitterbuffer = Self::from_instance(&element); let jitterbuffer = Self::from_instance(&element);
block_on!(jitterbuffer.clear_pt_map(&element)); block_on(jitterbuffer.clear_pt_map(&element));
None None
}, },
); );
@ -1176,10 +1180,10 @@ impl ObjectImpl for JitterBuffer {
match *prop { match *prop {
subclass::Property("latency", ..) => { subclass::Property("latency", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.latency_ms = value.get_some().expect("type checked upstream"); settings.latency_ms = value.get_some().expect("type checked upstream");
block_on!(self.state.lock()) block_on(self.state.lock())
.jbuf .jbuf
.borrow() .borrow()
.set_delay(settings.latency_ms as u64 * gst::MSECOND); .set_delay(settings.latency_ms as u64 * gst::MSECOND);
@ -1187,26 +1191,26 @@ impl ObjectImpl for JitterBuffer {
/* TODO: post message */ /* TODO: post message */
} }
subclass::Property("do-lost", ..) => { subclass::Property("do-lost", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.do_lost = value.get_some().expect("type checked upstream"); settings.do_lost = value.get_some().expect("type checked upstream");
} }
subclass::Property("max-dropout-time", ..) => { subclass::Property("max-dropout-time", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_dropout_time = value.get_some().expect("type checked upstream"); settings.max_dropout_time = value.get_some().expect("type checked upstream");
} }
subclass::Property("max-misorder-time", ..) => { subclass::Property("max-misorder-time", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_misorder_time = value.get_some().expect("type checked upstream"); settings.max_misorder_time = value.get_some().expect("type checked upstream");
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context = value settings.context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream"); settings.context_wait = value.get_some().expect("type checked upstream");
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -1218,23 +1222,23 @@ impl ObjectImpl for JitterBuffer {
match *prop { match *prop {
subclass::Property("latency", ..) => { subclass::Property("latency", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.latency_ms.to_value()) Ok(settings.latency_ms.to_value())
} }
subclass::Property("do-lost", ..) => { subclass::Property("do-lost", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.do_lost.to_value()) Ok(settings.do_lost.to_value())
} }
subclass::Property("max-dropout-time", ..) => { subclass::Property("max-dropout-time", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_dropout_time.to_value()) Ok(settings.max_dropout_time.to_value())
} }
subclass::Property("max-misorder-time", ..) => { subclass::Property("max-misorder-time", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_misorder_time.to_value()) Ok(settings.max_misorder_time.to_value())
} }
subclass::Property("stats", ..) => { subclass::Property("stats", ..) => {
let state = block_on!(self.state.lock()); let state = block_on(self.state.lock());
let s = gst::Structure::new( let s = gst::Structure::new(
"application/x-rtp-jitterbuffer-stats", "application/x-rtp-jitterbuffer-stats",
&[ &[
@ -1246,11 +1250,11 @@ impl ObjectImpl for JitterBuffer {
Ok(s.to_value()) Ok(s.to_value())
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context.to_value()) Ok(settings.context.to_value())
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value()) Ok(settings.context_wait.to_value())
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -1275,7 +1279,7 @@ impl ElementImpl for JitterBuffer {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition); gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition { match transition {
gst::StateChange::NullToReady => block_on!(async { gst::StateChange::NullToReady => block_on(async {
let _state = self.state.lock().await; let _state = self.state.lock().await;
let settings = self.settings.lock().await; let settings = self.settings.lock().await;
@ -1294,7 +1298,7 @@ impl ElementImpl for JitterBuffer {
self.sink_pad.prepare(&JitterBufferPadSinkHandler {}).await; self.sink_pad.prepare(&JitterBufferPadSinkHandler {}).await;
}), }),
gst::StateChange::PausedToReady => block_on!(async { gst::StateChange::PausedToReady => block_on(async {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() { if let Some(wakeup_abort_handle) = state.wakeup_abort_handle.take() {
@ -1305,7 +1309,7 @@ impl ElementImpl for JitterBuffer {
abort_handle.abort(); abort_handle.abort();
} }
}), }),
gst::StateChange::ReadyToNull => block_on!(async { gst::StateChange::ReadyToNull => block_on(async {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
self.sink_pad.unprepare().await; self.sink_pad.unprepare().await;

View file

@ -24,7 +24,7 @@
#![recursion_limit = "1024"] #![recursion_limit = "1024"]
#![crate_type = "cdylib"] #![crate_type = "cdylib"]
pub use tokio_executor; pub use tokio;
#[macro_use] #[macro_use]
pub mod runtime; pub mod runtime;

View file

@ -18,6 +18,7 @@
use either::Either; use either::Either;
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard}; use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*; use futures::prelude::*;
@ -40,7 +41,6 @@ use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::{u32, u64}; use std::{u32, u64};
use crate::block_on;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
@ -364,7 +364,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
} else { } else {
match event.view() { match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let _ = block_on!(proxysink.stop(element)); let _ = block_on(proxysink.stop(element));
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into()); let (res, state, pending) = element.get_state(0.into());
@ -372,7 +372,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
|| res == Ok(gst::StateChangeSuccess::Async) || res == Ok(gst::StateChangeSuccess::Async)
&& pending == gst::State::Paused && pending == gst::State::Paused
{ {
let _ = block_on!(proxysink.start(&element)); let _ = block_on(proxysink.start(&element));
} }
} }
_ => (), _ => (),
@ -380,7 +380,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event);
// FIXME proxysink can't forward directly to the src_pad of the proxysrc // FIXME proxysink can't forward directly to the src_pad of the proxysrc
let _ = block_on!(proxysink.enqueue_item(&element, DataQueueItem::Event(event))); let _ = block_on(proxysink.enqueue_item(&element, DataQueueItem::Event(event)));
Either::Left(true) Either::Left(true)
} }
@ -706,7 +706,7 @@ impl ObjectImpl for ProxySink {
match *prop { match *prop {
subclass::Property("proxy-context", ..) => { subclass::Property("proxy-context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.proxy_context = value settings.proxy_context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
@ -721,7 +721,7 @@ impl ObjectImpl for ProxySink {
match *prop { match *prop {
subclass::Property("proxy-context", ..) => { subclass::Property("proxy-context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.proxy_context.to_value()) Ok(settings.proxy_context.to_value())
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -748,16 +748,16 @@ impl ElementImpl for ProxySink {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.prepare(element)).map_err(|err| { block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
block_on!(self.stop(element)).map_err(|_| gst::StateChangeError)?; block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -765,7 +765,7 @@ impl ElementImpl for ProxySink {
let success = self.parent_change_state(element, transition)?; let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused { if transition == gst::StateChange::ReadyToPaused {
block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
} }
Ok(success) Ok(success)
@ -881,7 +881,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let _ = block_on!(proxysrc.pause(element)); let _ = block_on(proxysrc.pause(element));
true true
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
@ -889,7 +889,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{ {
let _ = block_on!(proxysrc.start(element)); let _ = block_on(proxysrc.start(element));
} }
true true
} }
@ -1157,30 +1157,30 @@ impl ObjectImpl for ProxySrc {
match *prop { match *prop {
subclass::Property("max-size-buffers", ..) => { subclass::Property("max-size-buffers", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_size_buffers = value.get_some().expect("type checked upstream"); settings.max_size_buffers = value.get_some().expect("type checked upstream");
} }
subclass::Property("max-size-bytes", ..) => { subclass::Property("max-size-bytes", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_size_bytes = value.get_some().expect("type checked upstream"); settings.max_size_bytes = value.get_some().expect("type checked upstream");
} }
subclass::Property("max-size-time", ..) => { subclass::Property("max-size-time", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_size_time = value.get_some().expect("type checked upstream"); settings.max_size_time = value.get_some().expect("type checked upstream");
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context = value settings.context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream"); settings.context_wait = value.get_some().expect("type checked upstream");
} }
subclass::Property("proxy-context", ..) => { subclass::Property("proxy-context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.proxy_context = value settings.proxy_context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
@ -1195,27 +1195,27 @@ impl ObjectImpl for ProxySrc {
match *prop { match *prop {
subclass::Property("max-size-buffers", ..) => { subclass::Property("max-size-buffers", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_size_buffers.to_value()) Ok(settings.max_size_buffers.to_value())
} }
subclass::Property("max-size-bytes", ..) => { subclass::Property("max-size-bytes", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_size_bytes.to_value()) Ok(settings.max_size_bytes.to_value())
} }
subclass::Property("max-size-time", ..) => { subclass::Property("max-size-time", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_size_time.to_value()) Ok(settings.max_size_time.to_value())
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context.to_value()) Ok(settings.context.to_value())
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value()) Ok(settings.context_wait.to_value())
} }
subclass::Property("proxy-context", ..) => { subclass::Property("proxy-context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.proxy_context.to_value()) Ok(settings.proxy_context.to_value())
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -1242,16 +1242,16 @@ impl ElementImpl for ProxySrc {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.prepare(element)).map_err(|err| { block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -1263,7 +1263,7 @@ impl ElementImpl for ProxySrc {
success = gst::StateChangeSuccess::NoPreroll; success = gst::StateChangeSuccess::NoPreroll;
} }
gst::StateChange::PausedToPlaying => { gst::StateChange::PausedToPlaying => {
block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }

View file

@ -18,6 +18,7 @@
use either::Either; use either::Either;
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::Mutex; use futures::lock::Mutex;
use futures::prelude::*; use futures::prelude::*;
@ -39,7 +40,6 @@ use lazy_static::lazy_static;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::{u32, u64}; use std::{u32, u64};
use crate::block_on;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
@ -372,7 +372,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
} else { } else {
match event.view() { match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let _ = block_on!(queue.stop(element)); let _ = block_on(queue.stop(element));
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
let (res, state, pending) = element.get_state(0.into()); let (res, state, pending) = element.get_state(0.into());
@ -380,7 +380,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
|| res == Ok(gst::StateChangeSuccess::Async) || res == Ok(gst::StateChangeSuccess::Async)
&& pending == gst::State::Playing && pending == gst::State::Playing
{ {
let _ = block_on!(queue.start(element)); let _ = block_on(queue.start(element));
} }
} }
_ => (), _ => (),
@ -812,26 +812,26 @@ impl ObjectImpl for Queue {
match *prop { match *prop {
subclass::Property("max-size-buffers", ..) => { subclass::Property("max-size-buffers", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_size_buffers = value.get_some().expect("type checked upstream"); settings.max_size_buffers = value.get_some().expect("type checked upstream");
} }
subclass::Property("max-size-bytes", ..) => { subclass::Property("max-size-bytes", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_size_bytes = value.get_some().expect("type checked upstream"); settings.max_size_bytes = value.get_some().expect("type checked upstream");
} }
subclass::Property("max-size-time", ..) => { subclass::Property("max-size-time", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.max_size_time = value.get_some().expect("type checked upstream"); settings.max_size_time = value.get_some().expect("type checked upstream");
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context = value settings.context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream"); settings.context_wait = value.get_some().expect("type checked upstream");
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -843,23 +843,23 @@ impl ObjectImpl for Queue {
match *prop { match *prop {
subclass::Property("max-size-buffers", ..) => { subclass::Property("max-size-buffers", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_size_buffers.to_value()) Ok(settings.max_size_buffers.to_value())
} }
subclass::Property("max-size-bytes", ..) => { subclass::Property("max-size-bytes", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_size_bytes.to_value()) Ok(settings.max_size_bytes.to_value())
} }
subclass::Property("max-size-time", ..) => { subclass::Property("max-size-time", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.max_size_time.to_value()) Ok(settings.max_size_time.to_value())
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context.to_value()) Ok(settings.context.to_value())
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value()) Ok(settings.context_wait.to_value())
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -885,16 +885,16 @@ impl ElementImpl for Queue {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.prepare(element)).map_err(|err| { block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
block_on!(self.stop(element)).map_err(|_| gst::StateChangeError)?; block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -902,7 +902,7 @@ impl ElementImpl for Queue {
let success = self.parent_change_state(element, transition)?; let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused { if transition == gst::StateChange::ReadyToPaused {
block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
} }
Ok(success) Ok(success)

View file

@ -36,10 +36,9 @@
//! [`PadSrc`]: struct.PadSrc.html //! [`PadSrc`]: struct.PadSrc.html
//! [`PadSink`]: struct.PadSink.html //! [`PadSink`]: struct.PadSink.html
use futures::channel::mpsc as future_mpsc; use futures::channel::oneshot;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::prelude::*; use futures::prelude::*;
use futures::ready;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use glib; use glib;
@ -50,19 +49,13 @@ use gst::{gst_debug, gst_log, gst_trace};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::cmp; use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap};
use std::io; use std::io;
use std::mem; use std::mem;
use std::pin::Pin;
use std::sync::mpsc as sync_mpsc; use std::sync::mpsc as sync_mpsc;
use std::sync::{atomic, Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::task::Poll;
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio_executor::current_thread as tokio_current_thread;
use tokio_executor::park::Unpark;
use super::RUNTIME_CAT; use super::RUNTIME_CAT;
@ -82,170 +75,52 @@ lazy_static! {
struct ContextThread { struct ContextThread {
name: String, name: String,
shutdown: Arc<atomic::AtomicBool>,
} }
impl ContextThread { impl ContextThread {
fn start( fn start(name: &str, wait: u32) -> (tokio::runtime::Handle, ContextShutdown) {
name: &str,
wait: u32,
reactor: tokio_net::driver::Reactor,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
) -> (tokio_current_thread::Handle, ContextShutdown) {
let handle = reactor.handle();
let shutdown = Arc::new(atomic::AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
let name_clone = name.into(); let name_clone = name.into();
let mut context_thread = ContextThread { let mut context_thread = ContextThread { name: name_clone };
shutdown: shutdown_clone,
name: name_clone,
};
let (sender, receiver) = sync_mpsc::channel(); let (handle_sender, handle_receiver) = sync_mpsc::channel();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let join = thread::spawn(move || { let join = thread::spawn(move || {
context_thread.spawn(wait, reactor, sender, timers); context_thread.spawn(wait, handle_sender, shutdown_receiver);
}); });
let handle = handle_receiver.recv().expect("Context thread init failed");
let shutdown = ContextShutdown { let shutdown = ContextShutdown {
name: name.into(), name: name.into(),
shutdown, shutdown: Some(shutdown_sender),
handle,
join: Some(join), join: Some(join),
}; };
let thread_handle = receiver.recv().expect("Context thread init failed"); (handle, shutdown)
(thread_handle, shutdown)
} }
fn spawn( fn spawn(
&mut self, &mut self,
wait: u32, wait: u32,
reactor: tokio_net::driver::Reactor, handle_sender: sync_mpsc::Sender<tokio::runtime::Handle>,
sender: sync_mpsc::Sender<tokio_current_thread::Handle>, shutdown_receiver: oneshot::Receiver<()>,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
) { ) {
gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name); gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name);
let wait = Duration::from_millis(wait as u64); let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.max_throttling(Duration::from_millis(wait as u64))
.build()
.expect("Couldn't build the runtime");
let handle = reactor.handle(); handle_sender
let timer = tokio_timer::Timer::new(reactor); .send(runtime.handle().clone())
let timer_handle = timer.handle();
let mut current_thread = tokio_current_thread::CurrentThread::new_with_park(timer);
sender
.send(current_thread.handle())
.expect("Couldn't send context thread handle"); .expect("Couldn't send context thread handle");
let _timer_guard = tokio_timer::set_default(&timer_handle); let _ = runtime.block_on(shutdown_receiver);
let _reactor_guard = tokio_net::driver::set_default(&handle);
let mut now = Instant::now();
loop {
if self.shutdown.load(atomic::Ordering::SeqCst) {
gst_debug!(RUNTIME_CAT, "Shutting down loop");
break;
}
gst_trace!(RUNTIME_CAT, "Elapsed {:?} since last loop", now.elapsed());
// Handle timers
{
// Trigger all timers that would be expired before the middle of the loop wait
// time
let timer_threshold = now + wait / 2;
let mut timers = timers.lock().unwrap();
while timers
.peek()
.and_then(|entry| {
if entry.time < timer_threshold {
Some(())
} else {
None
}
})
.is_some()
{
let TimerEntry {
time,
interval,
sender,
..
} = timers.pop().unwrap();
if sender.is_closed() {
continue;
}
let _ = sender.unbounded_send(());
if let Some(interval) = interval {
timers.push(TimerEntry {
time: time + interval,
id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed),
interval: Some(interval),
sender,
});
}
}
}
gst_trace!(RUNTIME_CAT, "Turning thread '{}'", self.name);
while current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap()
.has_polled()
{}
gst_trace!(RUNTIME_CAT, "Turned thread '{}'", self.name);
// We have to check again after turning in case we're supposed to shut down now
// and already handled the unpark above
if self.shutdown.load(atomic::Ordering::SeqCst) {
gst_debug!(RUNTIME_CAT, "Shutting down loop");
break;
}
let elapsed = now.elapsed();
gst_trace!(RUNTIME_CAT, "Elapsed {:?} after handling futures", elapsed);
if wait == Duration::from_millis(0) {
let timers = timers.lock().unwrap();
let wait = match timers.peek().map(|entry| entry.time) {
None => None,
Some(time) => Some({
let tmp = Instant::now();
if time < tmp {
Duration::from_millis(0)
} else {
time.duration_since(tmp)
}
}),
};
drop(timers);
gst_trace!(RUNTIME_CAT, "Sleeping for up to {:?}", wait);
current_thread.turn(wait).unwrap();
gst_trace!(RUNTIME_CAT, "Slept for {:?}", now.elapsed());
now = Instant::now();
} else {
if elapsed < wait {
gst_trace!(
RUNTIME_CAT,
"Waiting for {:?} before polling again",
wait - elapsed
);
thread::sleep(wait - elapsed);
gst_trace!(RUNTIME_CAT, "Slept for {:?}", now.elapsed());
}
now += wait;
}
}
} }
} }
@ -258,8 +133,7 @@ impl Drop for ContextThread {
#[derive(Debug)] #[derive(Debug)]
struct ContextShutdown { struct ContextShutdown {
name: String, name: String,
shutdown: Arc<atomic::AtomicBool>, shutdown: Option<oneshot::Sender<()>>,
handle: tokio_net::driver::Handle,
join: Option<thread::JoinHandle<()>>, join: Option<thread::JoinHandle<()>>,
} }
@ -270,16 +144,13 @@ impl Drop for ContextShutdown {
"Shutting down context thread thread '{}'", "Shutting down context thread thread '{}'",
self.name self.name
); );
self.shutdown.store(true, atomic::Ordering::SeqCst); self.shutdown.take().unwrap();
gst_trace!( gst_trace!(
RUNTIME_CAT, RUNTIME_CAT,
"Waiting for context thread '{}' to shutdown", "Waiting for context thread '{}' to shutdown",
self.name self.name
); );
// After being unparked, the next turn() is guaranteed to finish immediately,
// as such there is no race condition between checking for shutdown and setting
// shutdown.
self.handle.unpark();
let _ = self.join.take().unwrap().join(); let _ = self.join.take().unwrap().join();
} }
} }
@ -301,9 +172,7 @@ type TaskQueue = FuturesUnordered<BoxFuture<'static, TaskOutput>>;
#[derive(Debug)] #[derive(Debug)]
struct ContextInner { struct ContextInner {
name: String, name: String,
thread_handle: Mutex<tokio_current_thread::Handle>, handle: Mutex<tokio::runtime::Handle>,
reactor_handle: tokio_net::driver::Handle,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
// Only used for dropping // Only used for dropping
_shutdown: ContextShutdown, _shutdown: ContextShutdown,
task_queues: Mutex<(u64, HashMap<u64, TaskQueue>)>, task_queues: Mutex<(u64, HashMap<u64, TaskQueue>)>,
@ -352,19 +221,11 @@ impl Context {
} }
} }
let reactor = tokio_net::driver::Reactor::new()?; let (handle, shutdown) = ContextThread::start(context_name, wait);
let reactor_handle = reactor.handle();
let timers = Arc::new(Mutex::new(BinaryHeap::new()));
let (thread_handle, shutdown) =
ContextThread::start(context_name, wait, reactor, timers.clone());
let context = Context(Arc::new(ContextInner { let context = Context(Arc::new(ContextInner {
name: context_name.into(), name: context_name.into(),
thread_handle: Mutex::new(thread_handle), handle: Mutex::new(handle),
reactor_handle,
timers,
_shutdown: shutdown, _shutdown: shutdown,
task_queues: Mutex::new((0, HashMap::new())), task_queues: Mutex::new((0, HashMap::new())),
})); }));
@ -391,15 +252,12 @@ impl Context {
self.0.name.as_str() self.0.name.as_str()
} }
pub fn reactor_handle(&self) -> &tokio_net::driver::Handle { pub fn spawn<Fut>(&self, future: Fut) -> tokio::task::JoinHandle<Fut::Output>
&self.0.reactor_handle
}
pub fn spawn<Fut>(&self, future: Fut)
where where
Fut: Future<Output = ()> + Send + 'static, Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{ {
self.0.thread_handle.lock().unwrap().spawn(future).unwrap(); self.0.handle.lock().unwrap().spawn(future)
} }
pub fn release_task_queue(&self, id: TaskQueueId) -> Option<TaskQueue> { pub fn release_task_queue(&self, id: TaskQueueId) -> Option<TaskQueue> {
@ -451,169 +309,59 @@ impl Context {
} }
} }
pub fn add_timer(
&self,
time: Instant,
interval: Option<Duration>,
) -> future_mpsc::UnboundedReceiver<()> {
let (sender, receiver) = future_mpsc::unbounded();
let mut timers = self.0.timers.lock().unwrap();
let entry = TimerEntry {
time,
id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed),
interval,
sender,
};
timers.push(entry);
self.0.reactor_handle.unpark();
receiver
}
pub fn new_interval(&self, interval: Duration) -> Interval {
Interval::new(&self, interval)
}
/// Builds a `Future` to execute an `action` at [`Interval`]s. /// Builds a `Future` to execute an `action` at [`Interval`]s.
/// ///
/// [`Interval`]: struct.Interval.html /// [`Interval`]: struct.Interval.html
pub fn interval<F, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output> pub fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: Fn() -> Fut + Send + Sync + 'static, F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), ()>> + Send + 'static, E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{ {
let f = Arc::new(f); async move {
self.new_interval(interval).try_for_each(move |_| { let mut interval = tokio::time::interval(interval);
let f = Arc::clone(&f); loop {
f() interval.tick().await;
}) if let Err(err) = f().await {
} break Err(err);
}
pub fn new_timeout(&self, timeout: Duration) -> Timeout { }
Timeout::new(&self, timeout) }
} }
/// Builds a `Future` to execute an action after the given `delay` has elapsed. /// Builds a `Future` to execute an action after the given `delay` has elapsed.
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output> pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: FnOnce() -> Fut + Send + Sync + 'static, F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static, Fut: Future + Send + 'static,
{ {
self.new_timeout(delay).then(move |_| f()) async move {
} tokio::time::delay_for(delay).await;
} f().await
static TIMER_ENTRY_ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
// Ad-hoc interval timer implementation for our throttled event loop above
#[derive(Debug)]
struct TimerEntry {
time: Instant,
id: usize, // for producing a total order
interval: Option<Duration>,
sender: future_mpsc::UnboundedSender<()>,
}
impl PartialEq for TimerEntry {
fn eq(&self, other: &Self) -> bool {
self.time.eq(&other.time) && self.id.eq(&other.id)
}
}
impl Eq for TimerEntry {}
impl PartialOrd for TimerEntry {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(&other))
}
}
impl Ord for TimerEntry {
fn cmp(&self, other: &Self) -> cmp::Ordering {
other
.time
.cmp(&self.time)
.then_with(|| other.id.cmp(&self.id))
}
}
/// A `Stream` that yields a tick at `interval`s.
#[derive(Debug)]
pub struct Interval {
receiver: future_mpsc::UnboundedReceiver<()>,
}
impl Interval {
fn new(context: &Context, interval: Duration) -> Self {
Self {
receiver: context.add_timer(Instant::now(), Some(interval)),
}
}
}
impl Stream for Interval {
type Item = Result<(), ()>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context,
) -> Poll<Option<Self::Item>> {
self.receiver
.poll_next_unpin(cx)
.map(|item_opt| item_opt.map(Ok))
}
}
/// A `Future` that completes after a `timeout` is elapsed.
#[derive(Debug)]
pub struct Timeout {
receiver: future_mpsc::UnboundedReceiver<()>,
}
impl Timeout {
fn new(context: &Context, timeout: Duration) -> Self {
Self {
receiver: context.add_timer(Instant::now() + timeout, None),
}
}
}
impl Future for Timeout {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
match ready!(self.receiver.poll_next_unpin(cx)) {
Some(_) => Poll::Ready(()),
None => unreachable!(),
} }
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::channel::{mpsc, oneshot}; use futures::channel::mpsc;
use futures::future::Aborted; use futures::future::abortable;
use futures::lock::Mutex; use futures::lock::Mutex;
use gst; use gst;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::Instant;
use crate::block_on;
use crate::runtime::future::abortable_waitable;
use super::*; use super::*;
type Item = i32; type Item = i32;
const SLEEP_DURATION: u32 = 2; const SLEEP_DURATION: u32 = 2;
const INTERVAL: Duration = Duration::from_millis(100 * SLEEP_DURATION as u64); const INTERVAL: Duration = std::time::Duration::from_millis(100 * SLEEP_DURATION as u64);
#[test] #[tokio::test]
fn user_drain_pending_tasks() { async fn user_drain_pending_tasks() {
// Setup // Setup
gst::init().unwrap(); gst::init().unwrap();
@ -649,15 +397,15 @@ mod tests {
// User triggered drain // User triggered drain
receiver.try_next().unwrap_err(); receiver.try_next().unwrap_err();
block_on!(drain).unwrap(); drain.await.unwrap();
assert_eq!(receiver.try_next().unwrap(), Some(0)); assert_eq!(receiver.try_next().unwrap(), Some(0));
add_task(1).unwrap(); add_task(1).unwrap();
receiver.try_next().unwrap_err(); receiver.try_next().unwrap_err();
} }
#[test] #[tokio::test]
fn delay_for() { async fn delay_for() {
gst::init().unwrap(); gst::init().unwrap();
let context = Context::acquire("delay_for", SLEEP_DURATION).unwrap(); let context = Context::acquire("delay_for", SLEEP_DURATION).unwrap();
@ -665,41 +413,21 @@ mod tests {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let start = Instant::now(); let start = Instant::now();
let delayed_by_fut = context.delay_for(INTERVAL, move || async { let delayed_by_fut = context.delay_for(INTERVAL, move || {
sender.send(42).unwrap(); async {
sender.send(42).unwrap();
}
}); });
context.spawn(delayed_by_fut); context.spawn(delayed_by_fut);
let _ = block_on!(receiver).unwrap(); let _ = receiver.await.unwrap();
let delta = Instant::now() - start; let delta = Instant::now() - start;
assert!(delta >= INTERVAL); assert!(delta >= INTERVAL);
assert!(delta < INTERVAL * 2); assert!(delta < INTERVAL * 2);
} }
#[test] #[tokio::test]
fn delay_for_abort() { async fn interval_ok() {
gst::init().unwrap();
let context = Context::acquire("delay_for_abort", SLEEP_DURATION).unwrap();
let (sender, receiver) = oneshot::channel();
let delay_for_fut = context.delay_for(INTERVAL, move || async {
sender.send(42).unwrap();
});
let (abortable_delay_for, abort_handle) = abortable_waitable(delay_for_fut);
context.spawn(abortable_delay_for.map(move |res| {
if let Err(Aborted) = res {
gst_debug!(RUNTIME_CAT, "Aborted delay_for");
}
}));
block_on!(abort_handle.abort_and_wait()).unwrap();
block_on!(receiver).unwrap_err();
}
#[test]
fn interval_ok() {
gst::init().unwrap(); gst::init().unwrap();
let context = Context::acquire("interval_ok", SLEEP_DURATION).unwrap(); let context = Context::acquire("interval_ok", SLEEP_DURATION).unwrap();
@ -707,37 +435,36 @@ mod tests {
let (sender, mut receiver) = mpsc::channel(1); let (sender, mut receiver) = mpsc::channel(1);
let sender: Arc<Mutex<mpsc::Sender<Instant>>> = Arc::new(Mutex::new(sender)); let sender: Arc<Mutex<mpsc::Sender<Instant>>> = Arc::new(Mutex::new(sender));
let interval_fut = context.interval(INTERVAL, move || { let (interval_fut, handle) = abortable(context.interval(INTERVAL, move || {
let sender = Arc::clone(&sender); let sender = Arc::clone(&sender);
async move { async move {
let instant = Instant::now(); let instant = Instant::now();
sender.lock().await.send(instant).await.map_err(drop) sender.lock().await.send(instant).await.map_err(drop)
} }
}); }));
context.spawn(interval_fut.map(drop)); context.spawn(interval_fut.map(drop));
block_on!(async { let mut idx: u32 = 0;
let mut idx: u32 = 0; let mut first = Instant::now();
let mut first = Instant::now(); while let Some(instant) = receiver.next().await {
while let Some(instant) = receiver.next().await { if idx > 0 {
if idx > 0 { let delta = instant - first;
let delta = instant - first; assert!(delta > INTERVAL * (idx - 1));
assert!(delta > INTERVAL * (idx - 1)); assert!(delta < INTERVAL * (idx + 1));
assert!(delta < INTERVAL * (idx + 1)); } else {
} else { first = instant;
first = instant;
}
if idx == 3 {
break;
}
idx += 1;
} }
}); if idx == 3 {
handle.abort();
break;
}
idx += 1;
}
} }
#[test] #[tokio::test]
fn interval_err() { async fn interval_err() {
gst::init().unwrap(); gst::init().unwrap();
let context = Context::acquire("interval_err", SLEEP_DURATION).unwrap(); let context = Context::acquire("interval_err", SLEEP_DURATION).unwrap();
@ -763,68 +490,20 @@ mod tests {
}); });
context.spawn(interval_fut.map(drop)); context.spawn(interval_fut.map(drop));
block_on!(async { let mut idx: u32 = 0;
let mut idx: u32 = 0; let mut first = Instant::now();
let mut first = Instant::now(); while let Some(instant) = receiver.next().await {
while let Some(instant) = receiver.next().await { if idx > 0 {
if idx > 0 { let delta = instant - first;
let delta = instant - first; assert!(delta > INTERVAL * (idx - 1));
assert!(delta > INTERVAL * (idx - 1)); assert!(delta < INTERVAL * (idx + 1));
assert!(delta < INTERVAL * (idx + 1)); } else {
} else { first = instant;
first = instant;
}
idx += 1;
} }
assert_eq!(idx, 3); idx += 1;
}); }
}
#[test] assert_eq!(idx, 3);
fn interval_abort() {
gst::init().unwrap();
let context = Context::acquire("interval_abort", SLEEP_DURATION).unwrap();
let (sender, mut receiver) = mpsc::channel(1);
let sender: Arc<Mutex<mpsc::Sender<Instant>>> = Arc::new(Mutex::new(sender));
let interval_fut = context.interval(INTERVAL, move || {
let sender = Arc::clone(&sender);
async move {
let instant = Instant::now();
sender.lock().await.send(instant).await.map_err(drop)
}
});
let (abortable_interval, abort_handle) = abortable_waitable(interval_fut);
context.spawn(abortable_interval.map(move |res| {
if let Err(Aborted) = res {
gst_debug!(RUNTIME_CAT, "Aborted timeout");
}
}));
block_on!(async {
let mut idx: u32 = 0;
let mut first = Instant::now();
while let Some(instant) = receiver.next().await {
if idx > 0 {
let delta = instant - first;
assert!(delta > INTERVAL * (idx - 1));
assert!(delta < INTERVAL * (idx + 1));
} else {
first = instant;
}
if idx == 3 {
abort_handle.abort_and_wait().await.unwrap();
break;
}
idx += 1;
}
assert_eq!(receiver.next().await, None);
});
} }
} }

View file

@ -1,198 +0,0 @@
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use futures::future::{self, AbortHandle, Abortable};
use futures::prelude::*;
use super::{waitable, WaitError, WaitHandle, Waitable};
pub type AbortableWaitable<Fut> = Waitable<Abortable<Fut>>;
/// Builds an [`Abortable`] and [`Waitable`] `Future` from the provided `Future`.
///
/// See [`AbortWaitHandle`].
///
/// [`Abortable`]: https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.19/futures/future/struct.Abortable.html
/// [`Waitable`]: struct.Waitable.html
/// [`AbortWaitHandle`]: struct.AbortWaitHandle.html
pub fn abortable_waitable<Fut: Future>(future: Fut) -> (AbortableWaitable<Fut>, AbortWaitHandle) {
let (abortable, abort_handle) = future::abortable(future);
let (abortable_waitable, wait_handle) = waitable(abortable);
(
abortable_waitable,
AbortWaitHandle::new(abort_handle, wait_handle),
)
}
/// A handle to an [`Abortable`] and [`Waitable`] `Future`.
///
/// The handle allows checking for the `Future` state, canceling the `Future` and waiting until
/// the `Future` completes.
///
/// [`Abortable`]: https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.19/futures/future/struct.Abortable.html
/// [`Waitable`]: struct.Waitable.html
#[derive(Debug)]
pub struct AbortWaitHandle {
abort_handle: AbortHandle,
wait_handle: WaitHandle,
}
impl AbortWaitHandle {
fn new(abort_handle: AbortHandle, wait_handle: WaitHandle) -> Self {
AbortWaitHandle {
abort_handle,
wait_handle,
}
}
pub fn is_terminated(&mut self) -> bool {
self.wait_handle.is_terminated()
}
pub fn is_cancelled(&mut self) -> bool {
self.wait_handle.is_cancelled()
}
pub async fn wait(self) -> Result<(), WaitError> {
self.wait_handle.wait().await
}
pub fn abort(&self) {
self.abort_handle.abort();
}
pub async fn abort_and_wait(mut self) -> Result<(), WaitError> {
if self.wait_handle.is_terminated() {
if self.wait_handle.is_cancelled() {
return Err(WaitError::Cancelled);
}
return Ok(());
}
self.abort_handle.abort();
self.wait_handle.wait().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::channel::{mpsc, oneshot};
#[derive(Debug, PartialEq)]
enum State {
Released,
Terminated,
Triggered,
}
#[tokio::test]
async fn abort_wait_async_non_blocking_task() {
let (trigger_sender, trigger_receiver) = oneshot::channel::<()>();
let (_release_sender, release_receiver) = oneshot::channel::<()>();
let (mut state_sender_abrt, mut state_receiver_abrt) = mpsc::channel(1);
let (shared, mut handle) = abortable_waitable(async move {
let _ = trigger_receiver.await;
state_sender_abrt.send(State::Triggered).await.unwrap();
let _ = release_receiver.await;
state_sender_abrt.send(State::Released).await.unwrap();
});
let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1);
tokio::spawn(async move {
let _ = shared.await;
state_sender_spawn.send(State::Terminated).await.unwrap();
});
drop(trigger_sender);
assert_eq!(state_receiver_abrt.next().await, Some(State::Triggered));
assert!(!handle.is_terminated());
assert_eq!(handle.abort_and_wait().await, Ok(()));
assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated));
assert_eq!(state_receiver_abrt.next().await, None);
}
#[tokio::test]
async fn abort_wait_blocking_task() {
let (trigger_sender, trigger_receiver) = oneshot::channel::<()>();
let (release_sender, release_receiver) = oneshot::channel::<()>();
let (mut state_sender_abrt, mut state_receiver_abrt) = mpsc::channel(1);
let (shared, mut handle) = abortable_waitable(async move {
let _ = trigger_receiver.await;
state_sender_abrt.send(State::Triggered).await.unwrap();
let _ = release_receiver.await;
state_sender_abrt.send(State::Released).await.unwrap();
});
let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1);
tokio::spawn(async move {
let _ = shared.await;
state_sender_spawn.send(State::Terminated).await.unwrap();
});
drop(trigger_sender);
assert_eq!(state_receiver_abrt.next().await, Some(State::Triggered));
assert!(!handle.is_terminated());
drop(release_sender);
assert_eq!(state_receiver_abrt.next().await, Some(State::Released));
assert_eq!(handle.abort_and_wait().await, Ok(()));
assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated));
assert_eq!(state_receiver_abrt.next().await, None);
}
#[tokio::test]
async fn abort_only() {
let (trigger_sender, trigger_receiver) = oneshot::channel::<()>();
let (_release_sender, release_receiver) = oneshot::channel::<()>();
let (mut state_sender_abrt, mut state_receiver_abrt) = mpsc::channel(1);
let (shared, mut handle) = abortable_waitable(async move {
let _ = trigger_receiver.await;
state_sender_abrt.send(State::Triggered).await.unwrap();
let _ = release_receiver.await;
state_sender_abrt.send(State::Released).await.unwrap();
});
let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1);
tokio::spawn(async move {
let _ = shared.await;
state_sender_spawn.send(State::Terminated).await.unwrap();
});
assert!(!handle.is_terminated());
drop(trigger_sender);
assert_eq!(state_receiver_abrt.next().await, Some(State::Triggered));
assert!(!handle.is_terminated());
handle.abort();
assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated));
}
}

View file

@ -1,24 +0,0 @@
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//! `Future`s combinators which help implementing statefull asynchronous `Processor`s.
mod abortable_waitable;
pub use abortable_waitable::{abortable_waitable, AbortWaitHandle, AbortableWaitable};
mod waitable;
pub use waitable::{waitable, WaitError, WaitHandle, Waitable};

View file

@ -1,224 +0,0 @@
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use futures::channel::oneshot;
use futures::prelude::*;
use futures::ready;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{self, Poll};
/// Builds a [`Waitable`] `Future` from the provided `Future`.
///
/// See [`WaitHandle`].
///
/// [`Waitable`]: struct.Waitable.html
/// [`WaitHandle`]: struct.WaitHandle.html
pub fn waitable<Fut: Future>(future: Fut) -> (Waitable<Fut>, WaitHandle) {
Waitable::new(future)
}
/// A `Waitable` `Future`.
///
/// See [`WaitHandle`].
///
/// [`WaitHandle`]: struct.WaitHandle.html
#[pin_project]
#[derive(Debug)]
pub struct Waitable<Fut> {
#[pin]
inner: Fut,
sender: Option<oneshot::Sender<()>>,
}
impl<Fut> Waitable<Fut> {
pub fn new(inner: Fut) -> (Waitable<Fut>, WaitHandle) {
let (sender, receiver) = oneshot::channel();
(
Waitable {
inner,
sender: Some(sender),
},
WaitHandle::new(receiver),
)
}
}
impl<Fut: Future> Future for Waitable<Fut> {
type Output = Fut::Output;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
let this = self.project();
let output = ready!(this.inner.poll(cx));
if let Some(sender) = this.sender.take() {
let _ = sender.send(());
}
Poll::Ready(output)
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum WaitError {
Cancelled,
}
/// A handle to a [`Waitable`] `Future`.
///
/// The handle allows checking for the `Future` completion and waiting until the `Future`
/// completes.
///
/// [`Waitable`]: struct.Waitable.html
#[derive(Debug)]
pub struct WaitHandle {
receiver: oneshot::Receiver<()>,
is_terminated: bool,
is_cancelled: bool,
}
impl WaitHandle {
fn new(receiver: oneshot::Receiver<()>) -> WaitHandle {
WaitHandle {
receiver,
is_terminated: false,
is_cancelled: false,
}
}
pub fn is_terminated(&mut self) -> bool {
self.check_state();
self.is_terminated
}
pub fn is_cancelled(&mut self) -> bool {
self.check_state();
self.is_cancelled
}
pub async fn wait(self) -> Result<(), WaitError> {
if self.is_terminated {
if self.is_cancelled {
return Err(WaitError::Cancelled);
}
return Ok(());
}
self.receiver.await.map_err(|_| WaitError::Cancelled)
}
fn check_state(&mut self) {
if self.is_terminated {
// no need to check state
return;
}
let res = self.receiver.try_recv();
match res {
Ok(None) => (),
Ok(Some(())) => self.is_terminated = true,
Err(_) => {
self.is_terminated = true;
self.is_cancelled = true;
}
}
}
}
#[cfg(test)]
mod tests {
use futures::channel::{mpsc, oneshot};
use futures::future;
use std::time::Duration;
use tokio::future::FutureExt;
use super::*;
#[derive(Debug, PartialEq)]
enum State {
Released,
Terminated,
Triggered,
}
#[tokio::test]
async fn wait() {
let (trigger_sender, trigger_receiver) = oneshot::channel::<()>();
let (mut state_sender_wait, mut state_receiver_wait) = mpsc::channel(1);
let (shared, mut handle) = waitable(async move {
let _ = trigger_receiver.await;
state_sender_wait.send(State::Triggered).await.unwrap();
let _ = future::pending::<()>()
.timeout(Duration::from_secs(1))
.await;
state_sender_wait.send(State::Released).await.unwrap();
});
let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1);
tokio::spawn(async move {
let _ = shared.await;
state_sender_spawn.send(State::Terminated).await.unwrap();
});
drop(trigger_sender);
assert_eq!(state_receiver_wait.next().await, Some(State::Triggered));
assert!(!handle.is_terminated());
assert_eq!(handle.wait().await, Ok(()));
assert_eq!(state_receiver_wait.next().await, Some(State::Released));
assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated));
assert_eq!(state_receiver_wait.next().await, None);
}
#[tokio::test]
async fn no_wait() {
let (trigger_sender, trigger_receiver) = oneshot::channel::<()>();
let (mut state_sender_wait, mut state_receiver_wait) = mpsc::channel(1);
let (shared, mut handle) = waitable(async move {
let _ = trigger_receiver.await;
state_sender_wait.send(State::Triggered).await.unwrap();
state_sender_wait.send(State::Released).await.unwrap();
});
let (mut state_sender_spawn, mut state_receiver_spawn) = mpsc::channel(1);
tokio::spawn(async move {
let _ = shared.await;
state_sender_spawn.send(State::Terminated).await.unwrap();
});
drop(trigger_sender);
assert_eq!(state_receiver_wait.next().await, Some(State::Triggered));
assert!(!handle.is_terminated());
assert_eq!(state_receiver_wait.next().await, Some(State::Released));
assert_eq!(state_receiver_spawn.next().await, Some(State::Terminated));
assert_eq!(state_receiver_wait.next().await, None);
assert!(handle.is_terminated());
assert!(!handle.is_cancelled());
}
}

View file

@ -1,25 +0,0 @@
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//! A set of `macro`s to ease the implementation of asynchronous processings.
#[macro_export]
macro_rules! block_on {
($future:expr) => {
$crate::tokio_executor::current_thread::CurrentThread::new().block_on($future)
};
}

View file

@ -28,7 +28,8 @@
//! The `threadshare` `runtime` is a framework to build `Element`s for such applications. It //! The `threadshare` `runtime` is a framework to build `Element`s for such applications. It
//! uses light-weight threading to allow multiple `Element`s share a reduced number of OS `thread`s. //! uses light-weight threading to allow multiple `Element`s share a reduced number of OS `thread`s.
//! //!
//! See this [talk] ([slides]) for a presentation of the motivations and principles. //! See this [talk] ([slides]) for a presentation of the motivations and principles,
//! and this [blog post].
//! //!
//! Current implementation uses the crate [`tokio`]. //! Current implementation uses the crate [`tokio`].
//! //!
@ -37,17 +38,13 @@
//! //!
//! [talk]: https://gstconf.ubicast.tv/videos/when-adding-more-threads-adds-more-problems-thread-sharing-between-elements-in-gstreamer/ //! [talk]: https://gstconf.ubicast.tv/videos/when-adding-more-threads-adds-more-problems-thread-sharing-between-elements-in-gstreamer/
//! [slides]: https://gstreamer.freedesktop.org/data/events/gstreamer-conference/2018/Sebastian%20Dr%C3%B6ge%20-%20When%20adding%20more%20threads%20adds%20more%20problems:%20Thread-sharing%20between%20elements%20in%20GStreamer.pdf //! [slides]: https://gstreamer.freedesktop.org/data/events/gstreamer-conference/2018/Sebastian%20Dr%C3%B6ge%20-%20When%20adding%20more%20threads%20adds%20more%20problems:%20Thread-sharing%20between%20elements%20in%20GStreamer.pdf
//! [blog post]: https://coaxion.net/blog/2018/04/improving-gstreamer-performance-on-a-high-number-of-network-streams-by-sharing-threads-between-elements-with-rusts-tokio-crate
//! [`tokio`]: https://crates.io/crates/tokio //! [`tokio`]: https://crates.io/crates/tokio
//! [`PadSrc`]: pad/struct.PadSrc.html //! [`PadSrc`]: pad/struct.PadSrc.html
//! [`PadSink`]: pad/struct.PadSink.html //! [`PadSink`]: pad/struct.PadSink.html
pub mod executor; pub mod executor;
pub use executor::{Context, Interval, TaskOutput, Timeout}; pub use executor::{Context, TaskOutput};
pub mod future;
#[macro_use]
pub mod macros;
pub mod pad; pub mod pad;
pub use pad::{PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak}; pub use pad::{PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak};

View file

@ -65,6 +65,7 @@
use either::Either; use either::Either;
use futures::executor::block_on;
use futures::future; use futures::future;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard}; use futures::lock::{Mutex, MutexGuard};
@ -81,8 +82,6 @@ use std::marker::PhantomData;
use std::sync; use std::sync;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use crate::block_on;
use super::executor::Context; use super::executor::Context;
use super::pad_context::{PadContext, PadContextRef, PadContextWeak}; use super::pad_context::{PadContext, PadContextRef, PadContextWeak};
use super::task::Task; use super::task::Task;
@ -400,7 +399,7 @@ impl<'a> PadSrcRef<'a> {
} }
if !active { if !active {
block_on!(async { block_on(async {
self.strong.lock_state().await.is_initialized = false; self.strong.lock_state().await.is_initialized = false;
}); });
} }
@ -1198,9 +1197,9 @@ impl PadSink {
pad_ctx.add_pending_task(fut.map(|res| res.map(drop))); pad_ctx.add_pending_task(fut.map(|res| res.map(drop)));
Ok(FlowSuccess::Ok) Ok(FlowSuccess::Ok)
} }
None => block_on!(fut), None => block_on(fut),
}, },
None => block_on!(fut), None => block_on(fut),
} }
} }

View file

@ -29,7 +29,7 @@ use glib::{glib_boxed_derive_traits, glib_boxed_type};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::time::Duration; use std::time::Duration;
use super::executor::{Context, ContextWeak, Interval, TaskOutput, TaskQueueId, Timeout}; use super::executor::{Context, ContextWeak, TaskOutput, TaskQueueId};
#[derive(Clone)] #[derive(Clone)]
pub struct PadContextWeak { pub struct PadContextWeak {
@ -91,11 +91,12 @@ impl<'a> PadContextRef<'a> {
self.strong.downgrade() self.strong.downgrade()
} }
pub fn spawn<Fut>(&self, future: Fut) pub fn spawn<Fut>(&self, future: Fut) -> tokio::task::JoinHandle<Fut::Output>
where where
Fut: Future<Output = ()> + Send + 'static, Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{ {
self.strong.context.spawn(future); self.strong.context.spawn(future)
} }
pub fn add_pending_task<T>(&self, task: T) pub fn add_pending_task<T>(&self, task: T)
@ -117,30 +118,23 @@ impl<'a> PadContextRef<'a> {
&self.strong.context &self.strong.context
} }
pub fn new_interval(&self, interval: Duration) -> Interval {
self.strong.new_interval(interval)
}
/// Builds a `Future` to execute an `action` at [`Interval`]s. /// Builds a `Future` to execute an `action` at [`Interval`]s.
/// ///
/// [`Interval`]: struct.Interval.html /// [`Interval`]: struct.Interval.html
pub fn interval<F, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output> pub fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: Fn() -> Fut + Send + Sync + 'static, F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), ()>> + Send + 'static, E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{ {
self.strong.interval(interval, f) self.strong.interval(interval, f)
} }
pub fn new_timeout(&self, timeout: Duration) -> Timeout {
self.strong.new_timeout(timeout)
}
/// Builds a `Future` to execute an action after the given `delay` has elapsed. /// Builds a `Future` to execute an action after the given `delay` has elapsed.
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output> pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: FnOnce() -> Fut + Send + Sync + 'static, F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static, Fut: Future + Send + 'static,
{ {
self.strong.delay_for(delay, f) self.strong.delay_for(delay, f)
} }
@ -188,29 +182,20 @@ impl PadContextStrong {
} }
#[inline] #[inline]
fn new_interval(&self, interval: Duration) -> Interval { fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
self.context.new_interval(interval)
}
#[inline]
fn interval<F, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: Fn() -> Fut + Send + Sync + 'static, F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), ()>> + Send + 'static, E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{ {
self.context.interval(interval, f) self.context.interval(interval, f)
} }
#[inline]
fn new_timeout(&self, timeout: Duration) -> Timeout {
self.context.new_timeout(timeout)
}
#[inline] #[inline]
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output> pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: FnOnce() -> Fut + Send + Sync + 'static, F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static, Fut: Future + Send + 'static,
{ {
self.context.delay_for(delay, f) self.context.delay_for(delay, f)
} }
@ -246,11 +231,12 @@ impl PadContext {
PadContextRef::new(self.0.context.clone(), self.0.queue_id) PadContextRef::new(self.0.context.clone(), self.0.queue_id)
} }
pub fn spawn<Fut>(&self, future: Fut) pub fn spawn<Fut>(&self, future: Fut) -> tokio::task::JoinHandle<Fut::Output>
where where
Fut: Future<Output = ()> + Send + 'static, Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{ {
self.0.context.spawn(future); self.0.context.spawn(future)
} }
pub fn drain_pending_tasks(&self) -> Option<impl Future<Output = TaskOutput>> { pub fn drain_pending_tasks(&self) -> Option<impl Future<Output = TaskOutput>> {
@ -261,30 +247,23 @@ impl PadContext {
self.0.clear_pending_tasks(); self.0.clear_pending_tasks();
} }
pub fn new_interval(&self, interval: Duration) -> Interval {
self.0.new_interval(interval)
}
/// Builds a `Future` to execute an `action` at [`Interval`]s. /// Builds a `Future` to execute an `action` at [`Interval`]s.
/// ///
/// [`Interval`]: struct.Interval.html /// [`Interval`]: struct.Interval.html
pub fn interval<F, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output> pub fn interval<F, E, Fut>(&self, interval: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: Fn() -> Fut + Send + Sync + 'static, F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), ()>> + Send + 'static, E: Send + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
{ {
self.0.interval(interval, f) self.0.interval(interval, f)
} }
pub fn new_timeout(&self, timeout: Duration) -> Timeout {
self.0.new_timeout(timeout)
}
/// Builds a `Future` to execute an action after the given `delay` has elapsed. /// Builds a `Future` to execute an action after the given `delay` has elapsed.
pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output> pub fn delay_for<F, Fut>(&self, delay: Duration, f: F) -> impl Future<Output = Fut::Output>
where where
F: FnOnce() -> Fut + Send + Sync + 'static, F: FnOnce() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static, Fut: Future + Send + 'static,
{ {
self.0.delay_for(delay, f) self.0.delay_for(delay, f)
} }

View file

@ -19,8 +19,7 @@
//! //!
//! [`Context`]: ../executor/struct.Context.html //! [`Context`]: ../executor/struct.Context.html
use futures::channel::oneshot; use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture};
use futures::future::{self, BoxFuture};
use futures::lock::Mutex; use futures::lock::Mutex;
use futures::prelude::*; use futures::prelude::*;
@ -30,7 +29,6 @@ use gst::{gst_debug, gst_log, gst_trace, gst_warning};
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use super::future::{abortable_waitable, AbortWaitHandle};
use super::{Context, RUNTIME_CAT}; use super::{Context, RUNTIME_CAT};
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
@ -52,8 +50,8 @@ impl std::error::Error for TaskError {}
struct TaskInner { struct TaskInner {
context: Option<Context>, context: Option<Context>,
state: TaskState, state: TaskState,
loop_end_sender: Option<oneshot::Sender<()>>, abort_handle: Option<AbortHandle>,
loop_handle: Option<AbortWaitHandle>, loop_handle: Option<tokio::task::JoinHandle<Result<(), Aborted>>>,
} }
impl Default for TaskInner { impl Default for TaskInner {
@ -61,7 +59,7 @@ impl Default for TaskInner {
TaskInner { TaskInner {
context: None, context: None,
state: TaskState::Stopped, state: TaskState::Stopped,
loop_end_sender: None, abort_handle: None,
loop_handle: None, loop_handle: None,
} }
} }
@ -136,17 +134,13 @@ impl Task {
gst_debug!(RUNTIME_CAT, "Starting Task"); gst_debug!(RUNTIME_CAT, "Starting Task");
let (loop_fut, loop_handle) = abortable_waitable(async move { let (loop_fut, abort_handle) = abortable(async move {
loop { loop {
func().await; func().await;
let mut inner = inner_clone.lock().await; match inner_clone.lock().await.state {
match inner.state {
TaskState::Started => (), TaskState::Started => (),
TaskState::Paused | TaskState::Stopped => { TaskState::Paused | TaskState::Stopped => {
inner.loop_handle = None;
inner.loop_end_sender.take();
break; break;
} }
other => unreachable!("Unexpected Task state {:?}", other), other => unreachable!("Unexpected Task state {:?}", other),
@ -154,12 +148,13 @@ impl Task {
} }
}); });
inner let loop_handle = inner
.context .context
.as_ref() .as_ref()
.expect("Context not set") .expect("Context not set")
.spawn(loop_fut.map(drop)); .spawn(loop_fut);
inner.abort_handle = Some(abort_handle);
inner.loop_handle = Some(loop_handle); inner.loop_handle = Some(loop_handle);
inner.state = TaskState::Started; inner.state = TaskState::Started;
@ -175,11 +170,10 @@ impl Task {
inner.state = TaskState::Paused; inner.state = TaskState::Paused;
let (sender, receiver) = oneshot::channel(); let loop_handle = inner.loop_handle.take().unwrap();
inner.loop_end_sender = Some(sender);
async move { async move {
let _ = receiver.await; let _ = loop_handle.await;
gst_log!(RUNTIME_CAT, "Task Paused"); gst_log!(RUNTIME_CAT, "Task Paused");
} }
.boxed() .boxed()
@ -206,8 +200,12 @@ impl Task {
gst_debug!(RUNTIME_CAT, "Stopping Task"); gst_debug!(RUNTIME_CAT, "Stopping Task");
if let Some(abort_handle) = inner.abort_handle.take() {
abort_handle.abort();
}
if let Some(loop_handle) = inner.loop_handle.take() { if let Some(loop_handle) = inner.loop_handle.take() {
let _ = loop_handle.abort_and_wait().await; let _ = loop_handle.await;
} }
inner.state = TaskState::Stopped; inner.state = TaskState::Stopped;

View file

@ -17,6 +17,7 @@
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use either::Either; use either::Either;
use futures::executor::block_on;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard}; use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*; use futures::prelude::*;
@ -43,8 +44,8 @@ use std::sync::Arc;
use std::u16; use std::u16;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;
use crate::block_on;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadSrc, PadSrcRef};
@ -144,32 +145,14 @@ static PROPERTIES: [subclass::Property; 6] = [
]; ];
struct TcpClientReaderInner { struct TcpClientReaderInner {
connect_future: Option<BoxFuture<'static, io::Result<tokio::net::TcpStream>>>, socket: tokio::net::TcpStream,
socket: Option<tokio::net::TcpStream>,
}
impl TcpClientReaderInner {
fn new<Fut>(connect_future: Fut) -> Self
where
Fut: Future<Output = io::Result<tokio::net::TcpStream>> + Send + 'static,
{
Self {
connect_future: Some(connect_future.boxed()),
socket: None,
}
}
} }
pub struct TcpClientReader(Arc<Mutex<TcpClientReaderInner>>); pub struct TcpClientReader(Arc<Mutex<TcpClientReaderInner>>);
impl TcpClientReader { impl TcpClientReader {
pub fn new<Fut>(connect_future: Fut) -> Self pub fn new(socket: tokio::net::TcpStream) -> Self {
where TcpClientReader(Arc::new(Mutex::new(TcpClientReaderInner { socket })))
Fut: Future<Output = io::Result<tokio::net::TcpStream>> + Send + 'static,
{
TcpClientReader(Arc::new(Mutex::new(TcpClientReaderInner::new(
connect_future,
))))
} }
} }
@ -183,18 +166,12 @@ impl SocketRead for TcpClientReader {
let this = Arc::clone(&self.0); let this = Arc::clone(&self.0);
async move { async move {
let mut this = this.lock().await; this.lock()
.await
let socket = match this.socket { .socket
Some(ref mut socket) => socket, .read(buffer)
None => { .await
let stream = this.connect_future.take().unwrap().await?; .map(|read_size| (read_size, None))
this.socket = Some(stream);
this.socket.as_mut().unwrap()
}
};
socket.read(buffer).await.map(|read_size| (read_size, None))
} }
.boxed() .boxed()
} }
@ -363,7 +340,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let _ = block_on!(tcpclientsrc.pause(element)); let _ = block_on(tcpclientsrc.pause(element));
true true
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
@ -371,7 +348,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{ {
let _ = block_on!(tcpclientsrc.start(element)); let _ = block_on(tcpclientsrc.start(element));
} }
true true
} }
@ -408,7 +385,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
true true
} }
QueryView::Caps(ref mut q) => { QueryView::Caps(ref mut q) => {
let inner = block_on!(self.lock()); let inner = block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps { let caps = if let Some(ref caps) = inner.configured_caps {
q.get_filter() q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
@ -446,11 +423,18 @@ impl Default for State {
} }
} }
#[derive(Debug)]
struct PreparationSet {
join_handle: JoinHandle<Result<(), gst::ErrorMessage>>,
context: Context,
}
struct TcpClientSrc { struct TcpClientSrc {
src_pad: PadSrc, src_pad: PadSrc,
src_pad_handler: TcpClientSrcPadHandler, src_pad_handler: TcpClientSrcPadHandler,
state: Mutex<State>, state: Mutex<State>,
settings: Mutex<Settings>, settings: Mutex<Settings>,
preparation_set: Mutex<Option<PreparationSet>>,
} }
lazy_static! { lazy_static! {
@ -463,10 +447,35 @@ lazy_static! {
impl TcpClientSrc { impl TcpClientSrc {
async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().await; let _state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Preparing"); gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().await; let context = {
let settings = self.settings.lock().await;
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?
};
// TcpStream needs to be instantiated in the thread of its I/O reactor
*self.preparation_set.lock().await = Some(PreparationSet {
join_handle: context.spawn(Self::prepare_socket(element.clone())),
context,
});
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
async fn prepare_socket(element: gst::Element) -> Result<(), gst::ErrorMessage> {
let this = Self::from_instance(&element);
let settings = this.settings.lock().await.clone();
gst_debug!(CAT, obj: &element, "Preparing Socket");
let addr: IpAddr = match settings.address { let addr: IpAddr = match settings.address {
None => { None => {
@ -487,17 +496,14 @@ impl TcpClientSrc {
}; };
let port = settings.port; let port = settings.port;
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?;
let saddr = SocketAddr::new(addr, port as u16); let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(CAT, obj: element, "Connecting to {:?}", saddr); gst_debug!(CAT, obj: &element, "Connecting to {:?}", saddr);
let socket = tokio::net::TcpStream::connect(saddr); let socket = tokio::net::TcpStream::connect(saddr).await.map_err(|err| {
gst_error_msg!(
gst::ResourceError::Settings,
["Failed to connect to {:?} {:?}", saddr, err]
)
})?;
let buffer_pool = gst::BufferPool::new(); let buffer_pool = gst::BufferPool::new();
let mut config = buffer_pool.get_config(); let mut config = buffer_pool.get_config();
@ -515,11 +521,39 @@ impl TcpClientSrc {
buffer_pool, buffer_pool,
); );
let socket_stream = socket.prepare().await.map_err(|_| { let socket_stream = socket.prepare().await.map_err(|err| {
gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to prepare socket"]) gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?; })?;
self.src_pad_handler.lock().await.socket_stream = Some(socket_stream); this.src_pad_handler.lock().await.socket_stream = Some(socket_stream);
this.state.lock().await.socket = Some(socket);
gst_debug!(CAT, obj: &element, "Socket Prepared");
Ok(())
}
async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Completing preparation");
let PreparationSet {
join_handle,
context,
} = self
.preparation_set
.lock()
.await
.take()
.expect("preparation_set already taken");
join_handle
.await
.expect("The socket preparation has panicked")?;
self.src_pad self.src_pad
.prepare(context, &self.src_pad_handler) .prepare(context, &self.src_pad_handler)
.await .await
@ -530,9 +564,7 @@ impl TcpClientSrc {
) )
})?; })?;
state.socket = Some(socket); gst_debug!(CAT, obj: element, "Preparation completed");
gst_debug!(CAT, obj: element, "Prepared");
Ok(()) Ok(())
} }
@ -632,6 +664,7 @@ impl ObjectSubclass for TcpClientSrc {
src_pad_handler: TcpClientSrcPadHandler::new(), src_pad_handler: TcpClientSrcPadHandler::new(),
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
preparation_set: Mutex::new(None),
} }
} }
} }
@ -644,30 +677,30 @@ impl ObjectImpl for TcpClientSrc {
match *prop { match *prop {
subclass::Property("address", ..) => { subclass::Property("address", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.address = value.get().expect("type checked upstream"); settings.address = value.get().expect("type checked upstream");
} }
subclass::Property("port", ..) => { subclass::Property("port", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.port = value.get_some().expect("type checked upstream"); settings.port = value.get_some().expect("type checked upstream");
} }
subclass::Property("caps", ..) => { subclass::Property("caps", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.caps = value.get().expect("type checked upstream"); settings.caps = value.get().expect("type checked upstream");
} }
subclass::Property("chunk-size", ..) => { subclass::Property("chunk-size", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.chunk_size = value.get_some().expect("type checked upstream"); settings.chunk_size = value.get_some().expect("type checked upstream");
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context = value settings.context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream"); settings.context_wait = value.get_some().expect("type checked upstream");
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -679,27 +712,27 @@ impl ObjectImpl for TcpClientSrc {
match *prop { match *prop {
subclass::Property("address", ..) => { subclass::Property("address", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.address.to_value()) Ok(settings.address.to_value())
} }
subclass::Property("port", ..) => { subclass::Property("port", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.port.to_value()) Ok(settings.port.to_value())
} }
subclass::Property("caps", ..) => { subclass::Property("caps", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.caps.to_value()) Ok(settings.caps.to_value())
} }
subclass::Property("chunk-size", ..) => { subclass::Property("chunk-size", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.chunk_size.to_value()) Ok(settings.chunk_size.to_value())
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context.to_value()) Ok(settings.context.to_value())
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value()) Ok(settings.context_wait.to_value())
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -726,20 +759,22 @@ impl ElementImpl for TcpClientSrc {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.prepare(element)) block_on(self.prepare(element)).map_err(|err| {
.map_err(|err| { element.post_error_message(&err);
element.post_error_message(&err); gst::StateChangeError
gst::StateChangeError })?;
}) }
.and_then(|_| { gst::StateChange::ReadyToPaused => {
block_on!(self.start(element)).map_err(|_| gst::StateChangeError) block_on(self.complete_preparation(element)).map_err(|err| {
})?; element.post_error_message(&err);
gst::StateChangeError
})?;
} }
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -750,8 +785,11 @@ impl ElementImpl for TcpClientSrc {
gst::StateChange::ReadyToPaused => { gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::Success; success = gst::StateChangeSuccess::Success;
} }
gst::StateChange::PausedToPlaying => {
block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
let mut src_pad_handler = block_on!(self.src_pad_handler.lock()); let mut src_pad_handler = block_on(self.src_pad_handler.lock());
src_pad_handler.need_initial_events = true; src_pad_handler.need_initial_events = true;
} }
_ => (), _ => (),

View file

@ -17,6 +17,7 @@
use either::Either; use either::Either;
use futures::executor::block_on;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::{Mutex, MutexGuard}; use futures::lock::{Mutex, MutexGuard};
use futures::prelude::*; use futures::prelude::*;
@ -54,7 +55,8 @@ use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)] #[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use crate::block_on; use tokio::task::JoinHandle;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadSrc, PadSrcRef};
@ -295,14 +297,14 @@ static PROPERTIES: [subclass::Property; 10] = [
#[derive(Debug)] #[derive(Debug)]
struct UdpReaderInner { struct UdpReaderInner {
socket: tokio::net::udp::UdpSocket, socket: tokio::net::UdpSocket,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct UdpReader(Arc<Mutex<UdpReaderInner>>); pub struct UdpReader(Arc<Mutex<UdpReaderInner>>);
impl UdpReader { impl UdpReader {
fn new(socket: tokio::net::udp::UdpSocket) -> Self { fn new(socket: tokio::net::UdpSocket) -> Self {
UdpReader(Arc::new(Mutex::new(UdpReaderInner { socket }))) UdpReader(Arc::new(Mutex::new(UdpReaderInner { socket })))
} }
} }
@ -506,7 +508,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let _ = block_on!(udpsrc.pause(element)); let _ = block_on(udpsrc.pause(element));
true true
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
@ -514,7 +516,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{ {
let _ = block_on!(udpsrc.start(element)); let _ = block_on(udpsrc.start(element));
} }
true true
} }
@ -552,7 +554,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
true true
} }
QueryView::Caps(ref mut q) => { QueryView::Caps(ref mut q) => {
let inner = block_on!(self.lock()); let inner = block_on(self.lock());
let caps = if let Some(ref caps) = inner.configured_caps { let caps = if let Some(ref caps) = inner.configured_caps {
q.get_filter() q.get_filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
@ -591,12 +593,19 @@ impl Default for State {
} }
} }
#[derive(Debug)]
struct PreparationSet {
join_handle: JoinHandle<Result<(), gst::ErrorMessage>>,
context: Context,
}
#[derive(Debug)] #[derive(Debug)]
struct UdpSrc { struct UdpSrc {
src_pad: PadSrc, src_pad: PadSrc,
src_pad_handler: UdpSrcPadHandler, src_pad_handler: UdpSrcPadHandler,
state: Mutex<State>, state: Mutex<State>,
settings: Mutex<Settings>, settings: Mutex<Settings>,
preparation_set: Mutex<Option<PreparationSet>>,
} }
lazy_static! { lazy_static! {
@ -609,18 +618,36 @@ lazy_static! {
impl UdpSrc { impl UdpSrc {
async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().await; let _state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Preparing"); gst_debug!(CAT, obj: element, "Preparing");
let mut settings = self.settings.lock().await.clone(); let context = {
let settings = self.settings.lock().await.clone();
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| { Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err] ["Failed to acquire Context: {}", err]
) )
})?; })?
};
// UdpSocket needs to be instantiated in the thread of its I/O reactor
*self.preparation_set.lock().await = Some(PreparationSet {
join_handle: context.spawn(Self::prepare_socket(element.clone())),
context,
});
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
async fn prepare_socket(element: gst::Element) -> Result<(), gst::ErrorMessage> {
let this = Self::from_instance(&element);
let mut settings = this.settings.lock().await.clone();
gst_debug!(CAT, obj: &element, "Preparing Socket");
let socket = if let Some(ref wrapped_socket) = settings.socket { let socket = if let Some(ref wrapped_socket) = settings.socket {
use std::net::UdpSocket; use std::net::UdpSocket;
@ -636,13 +663,12 @@ impl UdpSrc {
socket = wrapped_socket.get() socket = wrapped_socket.get()
} }
let socket = tokio::net::UdpSocket::from_std(socket, context.reactor_handle()) let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| {
.map_err(|err| { gst_error_msg!(
gst_error_msg!( gst::ResourceError::OpenRead,
gst::ResourceError::OpenRead, ["Failed to setup socket for tokio: {}", err]
["Failed to setup socket for tokio: {}", err] )
) })?;
})?;
settings.used_socket = Some(wrapped_socket.clone()); settings.used_socket = Some(wrapped_socket.clone());
@ -669,17 +695,16 @@ impl UdpSrc {
// TODO: TTL, multicast loopback, etc // TODO: TTL, multicast loopback, etc
let saddr = if addr.is_multicast() { let saddr = if addr.is_multicast() {
// TODO: Use ::unspecified() constructor once stable
let bind_addr = if addr.is_ipv4() { let bind_addr = if addr.is_ipv4() {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)) IpAddr::V4(Ipv4Addr::UNSPECIFIED)
} else { } else {
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)) IpAddr::V6(Ipv6Addr::UNSPECIFIED)
}; };
let saddr = SocketAddr::new(bind_addr, port as u16); let saddr = SocketAddr::new(bind_addr, port as u16);
gst_debug!( gst_debug!(
CAT, CAT,
obj: element, obj: &element,
"Binding to {:?} for multicast group {:?}", "Binding to {:?} for multicast group {:?}",
saddr, saddr,
addr addr
@ -688,7 +713,7 @@ impl UdpSrc {
saddr saddr
} else { } else {
let saddr = SocketAddr::new(addr, port as u16); let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(CAT, obj: element, "Binding to {:?}", saddr); gst_debug!(CAT, obj: &element, "Binding to {:?}", saddr);
saddr saddr
}; };
@ -731,13 +756,12 @@ impl UdpSrc {
) )
})?; })?;
let socket = tokio::net::UdpSocket::from_std(socket, context.reactor_handle()) let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| {
.map_err(|err| { gst_error_msg!(
gst_error_msg!( gst::ResourceError::OpenRead,
gst::ResourceError::OpenRead, ["Failed to setup socket for tokio: {}", err]
["Failed to setup socket for tokio: {}", err] )
) })?;
})?;
if addr.is_multicast() { if addr.is_multicast() {
// TODO: Multicast interface configuration, going to be tricky // TODO: Multicast interface configuration, going to be tricky
@ -821,24 +845,54 @@ impl UdpSrc {
let buffer_pool = gst::BufferPool::new(); let buffer_pool = gst::BufferPool::new();
let mut config = buffer_pool.get_config(); let mut config = buffer_pool.get_config();
config.set_params(None, settings.mtu, 0, 0); config.set_params(None, settings.mtu, 0, 0);
buffer_pool.set_config(config).map_err(|_| { buffer_pool.set_config(config).map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::Settings, gst::ResourceError::Settings,
["Failed to configure buffer pool"] ["Failed to configure buffer pool {:?}", err]
) )
})?; })?;
let socket = Socket::new(element.upcast_ref(), UdpReader::new(socket), buffer_pool); let socket = Socket::new(element.upcast_ref(), UdpReader::new(socket), buffer_pool);
let socket_stream = socket.prepare().await.map_err(|_| { let socket_stream = socket.prepare().await.map_err(|err| {
gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to prepare socket"]) gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?; })?;
{ {
let mut src_pad_handler = self.src_pad_handler.lock().await; let mut src_pad_handler = this.src_pad_handler.lock().await;
src_pad_handler.retrieve_sender_address = settings.retrieve_sender_address; src_pad_handler.retrieve_sender_address = settings.retrieve_sender_address;
src_pad_handler.socket_stream = Some(socket_stream); src_pad_handler.socket_stream = Some(socket_stream);
} }
this.state.lock().await.socket = Some(socket);
gst_debug!(CAT, obj: &element, "Socket Prepared");
drop(settings);
element.notify("used-socket");
Ok(())
}
async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Completing preparation");
let PreparationSet {
join_handle,
context,
} = self
.preparation_set
.lock()
.await
.take()
.expect("preparation_set already taken");
join_handle
.await
.expect("The socket preparation has panicked")?;
self.src_pad self.src_pad
.prepare(context, &self.src_pad_handler) .prepare(context, &self.src_pad_handler)
.await .await
@ -849,12 +903,7 @@ impl UdpSrc {
) )
})?; })?;
state.socket = Some(socket); gst_debug!(CAT, obj: element, "Preparation completed");
gst_debug!(CAT, obj: element, "Prepared");
drop(state);
element.notify("used-socket");
Ok(()) Ok(())
} }
@ -972,6 +1021,7 @@ impl ObjectSubclass for UdpSrc {
src_pad_handler: UdpSrcPadHandler::new(), src_pad_handler: UdpSrcPadHandler::new(),
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
preparation_set: Mutex::new(None),
} }
} }
} }
@ -984,27 +1034,27 @@ impl ObjectImpl for UdpSrc {
match *prop { match *prop {
subclass::Property("address", ..) => { subclass::Property("address", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.address = value.get().expect("type checked upstream"); settings.address = value.get().expect("type checked upstream");
} }
subclass::Property("port", ..) => { subclass::Property("port", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.port = value.get_some().expect("type checked upstream"); settings.port = value.get_some().expect("type checked upstream");
} }
subclass::Property("reuse", ..) => { subclass::Property("reuse", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.reuse = value.get_some().expect("type checked upstream"); settings.reuse = value.get_some().expect("type checked upstream");
} }
subclass::Property("caps", ..) => { subclass::Property("caps", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.caps = value.get().expect("type checked upstream"); settings.caps = value.get().expect("type checked upstream");
} }
subclass::Property("mtu", ..) => { subclass::Property("mtu", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.mtu = value.get_some().expect("type checked upstream"); settings.mtu = value.get_some().expect("type checked upstream");
} }
subclass::Property("socket", ..) => { subclass::Property("socket", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.socket = value settings.socket = value
.get::<gio::Socket>() .get::<gio::Socket>()
.expect("type checked upstream") .expect("type checked upstream")
@ -1014,18 +1064,18 @@ impl ObjectImpl for UdpSrc {
unreachable!(); unreachable!();
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context = value settings.context = value
.get() .get()
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.context_wait = value.get_some().expect("type checked upstream"); settings.context_wait = value.get_some().expect("type checked upstream");
} }
subclass::Property("retrieve-sender-address", ..) => { subclass::Property("retrieve-sender-address", ..) => {
let mut settings = block_on!(self.settings.lock()); let mut settings = block_on(self.settings.lock());
settings.retrieve_sender_address = value.get_some().expect("type checked upstream"); settings.retrieve_sender_address = value.get_some().expect("type checked upstream");
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -1037,27 +1087,27 @@ impl ObjectImpl for UdpSrc {
match *prop { match *prop {
subclass::Property("address", ..) => { subclass::Property("address", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.address.to_value()) Ok(settings.address.to_value())
} }
subclass::Property("port", ..) => { subclass::Property("port", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.port.to_value()) Ok(settings.port.to_value())
} }
subclass::Property("reuse", ..) => { subclass::Property("reuse", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.reuse.to_value()) Ok(settings.reuse.to_value())
} }
subclass::Property("caps", ..) => { subclass::Property("caps", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.caps.to_value()) Ok(settings.caps.to_value())
} }
subclass::Property("mtu", ..) => { subclass::Property("mtu", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.mtu.to_value()) Ok(settings.mtu.to_value())
} }
subclass::Property("socket", ..) => { subclass::Property("socket", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings Ok(settings
.socket .socket
.as_ref() .as_ref()
@ -1065,7 +1115,7 @@ impl ObjectImpl for UdpSrc {
.to_value()) .to_value())
} }
subclass::Property("used-socket", ..) => { subclass::Property("used-socket", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings Ok(settings
.used_socket .used_socket
.as_ref() .as_ref()
@ -1073,15 +1123,15 @@ impl ObjectImpl for UdpSrc {
.to_value()) .to_value())
} }
subclass::Property("context", ..) => { subclass::Property("context", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context.to_value()) Ok(settings.context.to_value())
} }
subclass::Property("context-wait", ..) => { subclass::Property("context-wait", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.context_wait.to_value()) Ok(settings.context_wait.to_value())
} }
subclass::Property("retrieve-sender-address", ..) => { subclass::Property("retrieve-sender-address", ..) => {
let settings = block_on!(self.settings.lock()); let settings = block_on(self.settings.lock());
Ok(settings.retrieve_sender_address.to_value()) Ok(settings.retrieve_sender_address.to_value())
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -1107,16 +1157,22 @@ impl ElementImpl for UdpSrc {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.prepare(element)).map_err(|err| { block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
block_on(self.complete_preparation(element)).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -1128,10 +1184,10 @@ impl ElementImpl for UdpSrc {
success = gst::StateChangeSuccess::NoPreroll; success = gst::StateChangeSuccess::NoPreroll;
} }
gst::StateChange::PausedToPlaying => { gst::StateChange::PausedToPlaying => {
block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
block_on!(async { block_on(async {
self.src_pad_handler.lock().await.need_initial_events = true; self.src_pad_handler.lock().await.need_initial_events = true;
}); });
} }

View file

@ -18,6 +18,7 @@
use either::Either; use either::Either;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::Mutex; use futures::lock::Mutex;
use futures::prelude::*; use futures::prelude::*;
@ -36,7 +37,6 @@ use lazy_static::lazy_static;
use std::boxed::Box; use std::boxed::Box;
use std::sync::Arc; use std::sync::Arc;
use gstthreadshare::block_on;
use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use gstthreadshare::runtime::{Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef};
@ -151,7 +151,7 @@ impl PadSrcHandler for PadSrcHandlerTest {
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let _ = block_on!(elem_src_test.pause(element)); let _ = block_on(elem_src_test.pause(element));
true true
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
@ -159,7 +159,7 @@ impl PadSrcHandler for PadSrcHandlerTest {
if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing
|| res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing
{ {
let _ = block_on!(elem_src_test.start(element)); let _ = block_on(elem_src_test.start(element));
} }
true true
} }
@ -340,7 +340,7 @@ impl ObjectImpl for ElementSrcTest {
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
block_on!(self.settings.lock()).context = context; block_on(self.settings.lock()).context = context;
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -364,16 +364,16 @@ impl ElementImpl for ElementSrcTest {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.prepare(element)).map_err(|err| { block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
block_on!(self.pause(element)).map_err(|_| gst::StateChangeError)?; block_on(self.pause(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -382,7 +382,7 @@ impl ElementImpl for ElementSrcTest {
match transition { match transition {
gst::StateChange::PausedToPlaying => { gst::StateChange::PausedToPlaying => {
block_on!(self.start(element)).map_err(|_| gst::StateChangeError)?; block_on(self.start(element)).map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToPaused => { gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll; success = gst::StateChangeSuccess::NoPreroll;
@ -516,7 +516,7 @@ impl PadSinkHandler for PadSinkHandlerTest {
} else { } else {
gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event);
Either::Left( Either::Left(
block_on!(elem_sink_test.sender.lock()) block_on(elem_sink_test.sender.lock())
.as_mut() .as_mut()
.expect("ItemSender not set") .expect("ItemSender not set")
.try_send(Item::Event(event)) .try_send(Item::Event(event))
@ -633,7 +633,7 @@ impl ObjectImpl for ElementSinkTest {
.expect("type checked upstream") .expect("type checked upstream")
.expect("ItemSender not found") .expect("ItemSender not found")
.clone(); .clone();
*block_on!(self.sender.lock()) = Some(sender); *block_on(self.sender.lock()) = Some(sender);
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -657,10 +657,10 @@ impl ElementImpl for ElementSinkTest {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
block_on!(self.sink_pad.prepare(&PadSinkHandlerTest {})); block_on(self.sink_pad.prepare(&PadSinkHandlerTest {}));
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
block_on!(self.sink_pad.unprepare()); block_on(self.sink_pad.unprepare());
} }
_ => (), _ => (),
} }
@ -714,14 +714,16 @@ fn task() {
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();
// Initial events // Initial events
block_on!(elem_src_test.try_push(Item::Event( block_on(
gst::Event::new_stream_start("stream_id_task_test") elem_src_test.try_push(Item::Event(
.group_id(gst::util_group_id_next()) gst::Event::new_stream_start("stream_id_task_test")
.build(), .group_id(gst::util_group_id_next())
))) .build(),
)),
)
.unwrap(); .unwrap();
match block_on!(receiver.next()).unwrap() { match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() { Item::Event(event) => match event.view() {
gst::EventView::CustomDownstreamSticky(e) => { gst::EventView::CustomDownstreamSticky(e) => {
assert!(PadContext::is_pad_context_sticky_event(&e)) assert!(PadContext::is_pad_context_sticky_event(&e))
@ -731,7 +733,7 @@ fn task() {
other => panic!("Unexpected item {:?}", other), other => panic!("Unexpected item {:?}", other),
} }
match block_on!(receiver.next()).unwrap() { match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() { Item::Event(event) => match event.view() {
gst::EventView::StreamStart(_) => (), gst::EventView::StreamStart(_) => (),
other => panic!("Unexpected event {:?}", other), other => panic!("Unexpected event {:?}", other),
@ -739,12 +741,12 @@ fn task() {
other => panic!("Unexpected item {:?}", other), other => panic!("Unexpected item {:?}", other),
} }
block_on!(elem_src_test.try_push(Item::Event( block_on(elem_src_test.try_push(Item::Event(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(), gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
))) )))
.unwrap(); .unwrap();
match block_on!(receiver.next()).unwrap() { match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() { Item::Event(event) => match event.view() {
gst::EventView::Segment(_) => (), gst::EventView::Segment(_) => (),
other => panic!("Unexpected event {:?}", other), other => panic!("Unexpected event {:?}", other),
@ -753,10 +755,10 @@ fn task() {
} }
// Buffer // Buffer
block_on!(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))))
.unwrap(); .unwrap();
match block_on!(receiver.next()).unwrap() { match block_on(receiver.next()).unwrap() {
Item::Buffer(buffer) => { Item::Buffer(buffer) => {
let data = buffer.map_readable().unwrap(); let data = buffer.map_readable().unwrap();
assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice()); assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice());
@ -769,9 +771,9 @@ fn task() {
list.get_mut() list.get_mut()
.unwrap() .unwrap()
.add(gst::Buffer::from_slice(vec![1, 2, 3, 4])); .add(gst::Buffer::from_slice(vec![1, 2, 3, 4]));
block_on!(elem_src_test.try_push(Item::BufferList(list))).unwrap(); block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap();
match block_on!(receiver.next()).unwrap() { match block_on(receiver.next()).unwrap() {
Item::BufferList(_) => (), Item::BufferList(_) => (),
other => panic!("Unexpected item {:?}", other), other => panic!("Unexpected item {:?}", other),
} }
@ -780,7 +782,7 @@ fn task() {
pipeline.set_state(gst::State::Paused).unwrap(); pipeline.set_state(gst::State::Paused).unwrap();
// Items not longer accepted // Items not longer accepted
block_on!(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))))
.unwrap_err(); .unwrap_err();
// Nothing forwarded // Nothing forwarded
@ -808,8 +810,8 @@ fn task() {
.unwrap()); .unwrap());
// EOS // EOS
block_on!(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap(); block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap();
match block_on!(receiver.next()).unwrap() { match block_on(receiver.next()).unwrap() {
Item::Event(event) => match event.view() { Item::Event(event) => match event.view() {
gst::EventView::Eos(_) => (), gst::EventView::Eos(_) => (),
other => panic!("Unexpected event {:?}", other), other => panic!("Unexpected event {:?}", other),
@ -821,10 +823,12 @@ fn task() {
pipeline.set_state(gst::State::Ready).unwrap(); pipeline.set_state(gst::State::Ready).unwrap();
// Receiver was dropped when stopping => can't send anymore // Receiver was dropped when stopping => can't send anymore
block_on!(elem_src_test.try_push(Item::Event( block_on(
gst::Event::new_stream_start("stream_id_task_test_past_stop") elem_src_test.try_push(Item::Event(
.group_id(gst::util_group_id_next()) gst::Event::new_stream_start("stream_id_task_test_past_stop")
.build(), .group_id(gst::util_group_id_next())
))) .build(),
)),
)
.unwrap_err(); .unwrap_err();
} }

View file

@ -167,7 +167,8 @@ fn multiple_contexts_queue() {
}; };
glib::Continue(true) glib::Continue(true)
}); })
.unwrap();
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();
@ -326,7 +327,8 @@ fn multiple_contexts_proxy() {
}; };
glib::Continue(true) glib::Continue(true)
}); })
.unwrap();
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();
@ -421,39 +423,43 @@ fn eos() {
}); });
let l_clone = l.clone(); let l_clone = l.clone();
pipeline.get_bus().unwrap().add_watch(move |_, msg| { pipeline
use gst::MessageView; .get_bus()
.unwrap()
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() { match msg.view() {
MessageView::StateChanged(state_changed) => { MessageView::StateChanged(state_changed) => {
if let Some(source) = state_changed.get_src() { if let Some(source) = state_changed.get_src() {
if source.get_type() != gst::Pipeline::static_type() { if source.get_type() != gst::Pipeline::static_type() {
return glib::Continue(true); return glib::Continue(true);
} }
if state_changed.get_old() == gst::State::Paused if state_changed.get_old() == gst::State::Paused
&& state_changed.get_current() == gst::State::Playing && state_changed.get_current() == gst::State::Playing
{ {
if let Some(scenario) = scenario.take() { if let Some(scenario) = scenario.take() {
std::thread::spawn(scenario); std::thread::spawn(scenario);
}
} }
} }
} }
} MessageView::Error(err) => {
MessageView::Error(err) => { gst_error!(
gst_error!( CAT,
CAT, "eos: Error from {:?}: {} ({:?})",
"eos: Error from {:?}: {} ({:?})", err.get_src().map(|s| s.get_path_string()),
err.get_src().map(|s| s.get_path_string()), err.get_error(),
err.get_error(), err.get_debug()
err.get_debug() );
); l_clone.quit();
l_clone.quit(); }
} _ => (),
_ => (), };
};
glib::Continue(true) glib::Continue(true)
}); })
.unwrap();
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();
@ -569,39 +575,43 @@ fn premature_shutdown() {
}); });
let l_clone = l.clone(); let l_clone = l.clone();
pipeline.get_bus().unwrap().add_watch(move |_, msg| { pipeline
use gst::MessageView; .get_bus()
.unwrap()
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() { match msg.view() {
MessageView::StateChanged(state_changed) => { MessageView::StateChanged(state_changed) => {
if let Some(source) = state_changed.get_src() { if let Some(source) = state_changed.get_src() {
if source.get_type() != gst::Pipeline::static_type() { if source.get_type() != gst::Pipeline::static_type() {
return glib::Continue(true); return glib::Continue(true);
} }
if state_changed.get_old() == gst::State::Paused if state_changed.get_old() == gst::State::Paused
&& state_changed.get_current() == gst::State::Playing && state_changed.get_current() == gst::State::Playing
{ {
if let Some(scenario) = scenario.take() { if let Some(scenario) = scenario.take() {
std::thread::spawn(scenario); std::thread::spawn(scenario);
}
} }
} }
} }
} MessageView::Error(err) => {
MessageView::Error(err) => { gst_error!(
gst_error!( CAT,
CAT, "premature_shutdown: Error from {:?}: {} ({:?})",
"premature_shutdown: Error from {:?}: {} ({:?})", err.get_src().map(|s| s.get_path_string()),
err.get_src().map(|s| s.get_path_string()), err.get_error(),
err.get_error(), err.get_debug()
err.get_debug() );
); l_clone.quit();
l_clone.quit(); }
} _ => (),
_ => (), };
};
glib::Continue(true) glib::Continue(true)
}); })
.unwrap();
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();

View file

@ -1 +0,0 @@
beta