threadsharing: migrate to async/await and tokio 0.2.0-alpha.6

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/164
This commit is contained in:
François Laignel 2019-11-24 21:12:40 +01:00
parent a91d0d929c
commit 8eec141517
20 changed files with 1184 additions and 890 deletions

View file

@ -55,17 +55,32 @@ stages:
- cargo build --color=always --all --examples --all-features - cargo build --color=always --all --examples --all-features
- G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features - G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features
test 1.39: #test 1.39:
# 1.39 img # 1.39 img
# https://hub.docker.com/_/rust/ # https://hub.docker.com/_/rust/
image: "rust:1.39-slim-buster" # image: "rust:1.39-slim-buster"
extends: '.cargo test' # extends: '.cargo test'
test stable: #test stable:
# Stable img # Stable img
# https://hub.docker.com/_/rust/ # https://hub.docker.com/_/rust/
# image: "rust:slim-buster"
# extends: '.cargo test'
test beta:
# https://hub.docker.com/_/rust/
image: "rust:slim-buster" image: "rust:slim-buster"
extends: '.cargo test' extends: '.tarball_setup'
script:
- export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path
- rustup toolchain install beta
- export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home
- rustup override set beta
- rustc --version
- cargo build --color=always --all
- G_DEBUG=fatal_warnings cargo test --color=always --all
- cargo build --color=always --all --examples --all-features
- G_DEBUG=fatal_warnings cargo test --color=always --all --examples --all-features
test nightly: test nightly:
# Nightly # Nightly
@ -76,9 +91,15 @@ test nightly:
rustfmt: rustfmt:
image: "rust:slim-buster" image: "rust:slim-buster"
extends: '.tarball_setup'
stage: "lint" stage: "lint"
script: script:
- rustup component add rustfmt - export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path
- rustup toolchain install beta
- rustup component add rustfmt --toolchain beta
- export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home
- rustup override set beta
- rustc --version
- cargo fmt --version - cargo fmt --version
- cargo fmt -- --color=always --check - cargo fmt -- --color=always --check
@ -87,7 +108,12 @@ clippy:
image: "rust:slim-buster" image: "rust:slim-buster"
stage: 'extras' stage: 'extras'
script: script:
- rustup component add clippy-preview - export CARGO_HOME=/usr/local/cargo # will install a new toolchain, reset CARGO_HOME to its default path
- rustup toolchain install beta
- rustup component add clippy --toolchain beta
- export CARGO_HOME=${CI_PROJECT_DIR}/.cargo_home
- rustup override set beta
- rustc --version
- cargo clippy --color=always --all --all-features -- -A clippy::redundant_pattern_matching -A clippy::single_match -A clippy::cast_lossless - cargo clippy --color=always --all --all-features -- -A clippy::redundant_pattern_matching -A clippy::single_match -A clippy::cast_lossless
audit: audit:

View file

@ -5,27 +5,26 @@ authors = ["Sebastian Dröge <sebastian@centricular.com>"]
license = "LGPL-2.1+" license = "LGPL-2.1+"
description = "Threadshare Plugin" description = "Threadshare Plugin"
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
edition = "2018"
[dependencies] [dependencies]
libc = "0.2" libc = "0.2"
glib-sys = { git = "https://github.com/gtk-rs/sys" } glib-sys = { git = "https://github.com/gtk-rs/sys" }
gobject-sys = { git = "https://github.com/gtk-rs/sys" } gobject-sys = { git = "https://github.com/gtk-rs/sys" }
gio-sys = { git = "https://github.com/gtk-rs/sys" } gio-sys = { git = "https://github.com/gtk-rs/sys" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
glib = { git = "https://github.com/gtk-rs/glib" } glib = { git = "https://github.com/gtk-rs/glib" }
gio = { git = "https://github.com/gtk-rs/gio" } gio = { git = "https://github.com/gtk-rs/gio" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-app = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-net = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-rtp = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
tokio = "0.1" tokio = "=0.2.0-alpha.6"
tokio-reactor = "0.1" tokio-executor = { version = "=0.2.0-alpha.6", features = ["current-thread"] }
tokio-executor = "0.1" tokio-net = { version = "=0.2.0-alpha.6", features = ["tcp", "udp"] }
tokio-timer = "0.2" tokio-timer = "=0.3.0-alpha.6"
tokio-current-thread = "0.1" futures-preview = "0.3.0-alpha.19"
futures = "0.1"
lazy_static = "1.0" lazy_static = "1.0"
either = "1.0" either = "1.0"
rand = "0.7" rand = "0.7"

View file

@ -15,10 +15,10 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
extern crate glib as glib; use glib;
use glib::prelude::*; use glib::prelude::*;
extern crate gstreamer as gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use std::env; use std::env;

View file

@ -15,26 +15,30 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use either::Either;
use futures::channel::mpsc;
use futures::prelude::*;
use glib; use glib;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass; use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use glib::{glib_object_impl, glib_object_subclass};
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use lazy_static::lazy_static;
use rand;
use std::sync::Mutex; use std::sync::Mutex;
use std::u32; use std::u32;
use futures::future; use super::iocontext::*;
use futures::sync::{mpsc, oneshot};
use futures::{Future, Stream};
use either::Either;
use rand;
use iocontext::*;
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0; const DEFAULT_CONTEXT_WAIT: u32 = 0;
@ -119,7 +123,7 @@ struct State {
io_context: Option<IOContext>, io_context: Option<IOContext>,
pending_future_id: Option<PendingFutureId>, pending_future_id: Option<PendingFutureId>,
channel: Option<mpsc::Sender<Either<gst::Buffer, gst::Event>>>, channel: Option<mpsc::Sender<Either<gst::Buffer, gst::Event>>>,
pending_future_cancel: Option<oneshot::Sender<()>>, pending_future_abort_handle: Option<future::AbortHandle>,
need_initial_events: bool, need_initial_events: bool,
configured_caps: Option<gst::Caps>, configured_caps: Option<gst::Caps>,
} }
@ -130,7 +134,7 @@ impl Default for State {
io_context: None, io_context: None,
pending_future_id: None, pending_future_id: None,
channel: None, channel: None,
pending_future_cancel: None, pending_future_abort_handle: None,
need_initial_events: true, need_initial_events: true,
configured_caps: None, configured_caps: None,
} }
@ -294,101 +298,102 @@ impl AppSrc {
} }
} }
fn push_item( async fn push_item(
&self, element: gst::Element,
element: &gst::Element,
item: Either<gst::Buffer, gst::Event>, item: Either<gst::Buffer, gst::Event>,
) -> future::Either< ) -> Result<(), gst::FlowError> {
Box<dyn Future<Item = (), Error = ()> + Send + 'static>, let appsrc = Self::from_instance(&element);
future::FutureResult<(), ()>,
> {
let mut events = Vec::new(); let mut events = Vec::new();
let mut state = self.state.lock().unwrap();
if state.need_initial_events {
gst_debug!(CAT, obj: element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>()); {
events.push(gst::Event::new_stream_start(&stream_id).build()); let mut state = appsrc.state.lock().unwrap();
if let Some(ref caps) = self.settings.lock().unwrap().caps { if state.need_initial_events {
events.push(gst::Event::new_caps(&caps).build()); gst_debug!(CAT, obj: &element, "Pushing initial events");
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
if let Some(event) = Self::create_io_context_event(&state) { let stream_id =
events.push(event); format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(
gst::Event::new_stream_start(&stream_id)
.group_id(gst::util_group_id_next())
.build(),
);
if let Some(ref caps) = appsrc.settings.lock().unwrap().caps {
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
// Get rid of reconfigure flag if let Some(event) = Self::create_io_context_event(&state) {
self.src_pad.check_reconfigure(); events.push(event);
}
state.need_initial_events = false; // Get rid of reconfigure flag
} else if self.src_pad.check_reconfigure() { appsrc.src_pad.check_reconfigure();
if let Some(event) = Self::create_io_context_event(&state) { }
events.push(event); state.need_initial_events = false;
} else if appsrc.src_pad.check_reconfigure() {
if let Some(event) = Self::create_io_context_event(&state) {
events.push(event);
}
} }
} }
drop(state);
for event in events { for event in events {
self.src_pad.push_event(event); appsrc.src_pad.push_event(event);
} }
let res = match item { let res = match item {
Either::Left(buffer) => { Either::Left(buffer) => {
gst_log!(CAT, obj: element, "Forwarding buffer {:?}", buffer); gst_log!(CAT, obj: &element, "Forwarding buffer {:?}", buffer);
self.src_pad.push(buffer).map(|_| ()) appsrc.src_pad.push(buffer).map(|_| ())
} }
Either::Right(event) => { Either::Right(event) => {
gst_log!(CAT, obj: element, "Forwarding event {:?}", event); gst_log!(CAT, obj: &element, "Forwarding event {:?}", event);
self.src_pad.push_event(event); appsrc.src_pad.push_event(event);
Ok(()) Ok(())
} }
}; };
let res = match res {
Ok(_) => {
gst_log!(CAT, obj: element, "Successfully pushed item");
Ok(())
}
Err(gst::FlowError::Flushing) | Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: element, "EOS");
Err(())
}
Err(err) => {
gst_error!(CAT, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
Err(())
}
};
match res { match res {
Ok(()) => { Ok(()) => gst_log!(CAT, obj: &element, "Successfully pushed item"),
let mut state = self.state.lock().unwrap(); Err(gst::FlowError::Eos) => gst_debug!(CAT, obj: &element, "EOS"),
Err(gst::FlowError::Flushing) => gst_debug!(CAT, obj: &element, "Flushing"),
if let State { Err(err) => {
io_context: Some(ref io_context), gst_error!(CAT, obj: &element, "Got error {}", err);
pending_future_id: Some(ref pending_future_id), gst_element_error!(
ref mut pending_future_cancel, &element,
.. gst::StreamError::Failed,
} = *state ("Internal data stream error"),
{ ["streaming stopped, reason {}", err]
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id); );
*pending_future_cancel = cancel;
future
} else {
future::Either::B(future::ok(()))
}
} }
Err(_) => future::Either::B(future::err(())),
} }
res?;
let abortable_drain = {
let mut state = appsrc.state.lock().unwrap();
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_abort_handle,
..
} = *state
{
let (abort_handle, abortable_drain) =
io_context.drain_pending_futures(*pending_future_id);
*pending_future_abort_handle = abort_handle;
abortable_drain
} else {
return Ok(());
}
};
abortable_drain.await
} }
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
@ -466,10 +471,8 @@ impl AppSrc {
let (channel_sender, channel_receiver) = mpsc::channel(settings.max_buffers as usize); let (channel_sender, channel_receiver) = mpsc::channel(settings.max_buffers as usize);
let element_clone = element.clone(); let element_clone = element.clone();
let future = channel_receiver.for_each(move |item| { let future = channel_receiver
let appsrc = Self::from_instance(&element_clone); .for_each(move |item| Self::push_item(element_clone.clone(), item).map(|_| ()));
appsrc.push_item(&element_clone, item)
});
io_context.spawn(future); io_context.spawn(future);
*channel = Some(channel_sender); *channel = Some(channel_sender);
@ -483,7 +486,10 @@ impl AppSrc {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let _ = state.channel.take(); let _ = state.channel.take();
let _ = state.pending_future_cancel.take();
if let Some(abort_handle) = state.pending_future_abort_handle.take() {
abort_handle.abort();
}
gst_debug!(CAT, obj: element, "Stopped"); gst_debug!(CAT, obj: element, "Stopped");
@ -651,7 +657,7 @@ impl ObjectImpl for AppSrc {
let element = obj.downcast_ref::<gst::Element>().unwrap(); let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap(); element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE); super::set_element_flags(element, gst::ElementFlags::SOURCE);
} }
} }

View file

@ -15,18 +15,24 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use futures::channel::oneshot;
use futures::prelude::*;
use gst; use gst;
use gst::gst_debug;
use gst::prelude::*; use gst::prelude::*;
use lazy_static::lazy_static;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use std::{u32, u64}; use std::{u32, u64};
use futures::sync::oneshot; use tokio_executor::current_thread as tokio_current_thread;
use futures::task;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use iocontext::*; use super::iocontext::*;
lazy_static! { lazy_static! {
static ref DATA_QUEUE_CAT: gst::DebugCategory = gst::DebugCategory::new( static ref DATA_QUEUE_CAT: gst::DebugCategory = gst::DebugCategory::new(
@ -89,7 +95,7 @@ struct DataQueueInner {
max_size_bytes: Option<u32>, max_size_bytes: Option<u32>,
max_size_time: Option<u64>, max_size_time: Option<u64>,
current_task: Option<task::Task>, waker: Option<task::Waker>,
shutdown_receiver: Option<oneshot::Receiver<()>>, shutdown_receiver: Option<oneshot::Receiver<()>>,
} }
@ -109,16 +115,20 @@ impl DataQueue {
max_size_buffers, max_size_buffers,
max_size_bytes, max_size_bytes,
max_size_time, max_size_time,
current_task: None, waker: None,
shutdown_receiver: None, shutdown_receiver: None,
}))) })))
} }
pub fn schedule<U, F, G>(&self, io_context: &IOContext, func: F, err_func: G) -> Result<(), ()> pub fn schedule<F, G, Fut>(
&self,
io_context: &IOContext,
func: F,
err_func: G,
) -> Result<(), ()>
where where
F: Fn(DataQueueItem) -> U + Send + 'static, F: Fn(DataQueueItem) -> Fut + Send + 'static,
U: IntoFuture<Item = (), Error = gst::FlowError> + 'static, Fut: Future<Output = Result<(), gst::FlowError>> + Send + 'static,
<U as IntoFuture>::Future: Send + 'static,
G: FnOnce(gst::FlowError) + Send + 'static, G: FnOnce(gst::FlowError) + Send + 'static,
{ {
// Ready->Paused // Ready->Paused
@ -136,12 +146,12 @@ impl DataQueue {
assert_eq!(inner.state, DataQueueState::Unscheduled); assert_eq!(inner.state, DataQueueState::Unscheduled);
inner.state = DataQueueState::Scheduled; inner.state = DataQueueState::Scheduled;
let (sender, receiver) = oneshot::channel::<()>(); let (sender, receiver) = oneshot::channel();
inner.shutdown_receiver = Some(receiver); inner.shutdown_receiver = Some(receiver);
let queue_clone = self.clone(); let queue_clone = self.clone();
let element_clone = inner.element.clone(); let element_clone = inner.element.clone();
io_context.spawn(queue_clone.for_each(func).then(move |res| { io_context.spawn(queue_clone.try_for_each(func).then(move |res| {
gst_debug!( gst_debug!(
DATA_QUEUE_CAT, DATA_QUEUE_CAT,
obj: &element_clone, obj: &element_clone,
@ -155,8 +165,9 @@ impl DataQueue {
let _ = sender.send(()); let _ = sender.send(());
Ok(()) future::ready(())
})); }));
Ok(()) Ok(())
} }
@ -164,6 +175,7 @@ impl DataQueue {
// Paused->Playing // Paused->Playing
// //
// Change state to Running and signal task // Change state to Running and signal task
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Unpausing data queue"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Unpausing data queue");
if inner.state == DataQueueState::Running { if inner.state == DataQueueState::Running {
@ -174,8 +186,8 @@ impl DataQueue {
assert_eq!(inner.state, DataQueueState::Scheduled); assert_eq!(inner.state, DataQueueState::Scheduled);
inner.state = DataQueueState::Running; inner.state = DataQueueState::Running;
if let Some(task) = inner.current_task.take() { if let Some(waker) = inner.waker.take() {
task.notify(); waker.wake();
} }
} }
@ -194,8 +206,8 @@ impl DataQueue {
assert_eq!(inner.state, DataQueueState::Running); assert_eq!(inner.state, DataQueueState::Running);
inner.state = DataQueueState::Scheduled; inner.state = DataQueueState::Scheduled;
if let Some(task) = inner.current_task.take() { if let Some(waker) = inner.waker.take() {
task.notify(); waker.wake();
} }
} }
@ -215,15 +227,15 @@ impl DataQueue {
assert!(inner.state == DataQueueState::Scheduled || inner.state == DataQueueState::Running); assert!(inner.state == DataQueueState::Scheduled || inner.state == DataQueueState::Running);
inner.state = DataQueueState::Shutdown; inner.state = DataQueueState::Shutdown;
if let Some(task) = inner.current_task.take() { if let Some(waker) = inner.waker.take() {
task.notify(); waker.wake();
} }
let shutdown_receiver = inner.shutdown_receiver.take().unwrap(); let shutdown_receiver = inner.shutdown_receiver.take().unwrap();
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Waiting for data queue to shut down"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Waiting for data queue to shut down");
drop(inner); drop(inner);
shutdown_receiver.wait().expect("Already shut down"); tokio_current_thread::block_on_all(shutdown_receiver).expect("Already shut down");
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
inner.state = DataQueueState::Unscheduled; inner.state = DataQueueState::Unscheduled;
@ -287,8 +299,8 @@ impl DataQueue {
inner.cur_size_buffers += count; inner.cur_size_buffers += count;
inner.cur_size_bytes += bytes; inner.cur_size_bytes += bytes;
if let Some(task) = inner.current_task.take() { if let Some(waker) = inner.waker.take() {
task.notify(); waker.wake();
} }
Ok(()) Ok(())
@ -302,18 +314,17 @@ impl Drop for DataQueueInner {
} }
impl Stream for DataQueue { impl Stream for DataQueue {
type Item = DataQueueItem; type Item = Result<DataQueueItem, gst::FlowError>;
type Error = gst::FlowError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
if inner.state == DataQueueState::Shutdown { if inner.state == DataQueueState::Shutdown {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue shutting down"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue shutting down");
return Ok(Async::Ready(None)); return Poll::Ready(None);
} else if inner.state == DataQueueState::Scheduled { } else if inner.state == DataQueueState::Scheduled {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue not running"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue not running");
inner.current_task = Some(task::current()); inner.waker = Some(cx.waker().clone());
return Ok(Async::NotReady); return Poll::Pending;
} }
assert_eq!(inner.state, DataQueueState::Running); assert_eq!(inner.state, DataQueueState::Running);
@ -322,8 +333,8 @@ impl Stream for DataQueue {
match inner.queue.pop_front() { match inner.queue.pop_front() {
None => { None => {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue is empty"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue is empty");
inner.current_task = Some(task::current()); inner.waker = Some(cx.waker().clone());
Ok(Async::NotReady) Poll::Pending
} }
Some(item) => { Some(item) => {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Popped item {:?}", item); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Popped item {:?}", item);
@ -332,7 +343,7 @@ impl Stream for DataQueue {
inner.cur_size_buffers -= count; inner.cur_size_buffers -= count;
inner.cur_size_bytes -= bytes; inner.cur_size_bytes -= bytes;
Ok(Async::Ready(Some(item))) Poll::Ready(Some(Ok(item)))
} }
} }
} }

View file

@ -15,26 +15,32 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use futures::channel::{mpsc, oneshot};
use futures::future::{AbortHandle, Abortable, BoxFuture};
use futures::prelude::*;
use futures::ready;
use futures::stream::futures_unordered::FuturesUnordered;
use glib;
use glib::{glib_boxed_derive_traits, glib_boxed_type};
use gst;
use gst::{gst_debug, gst_log, gst_trace};
use lazy_static::lazy_static;
use std::cmp; use std::cmp;
use std::collections::{BinaryHeap, HashMap}; use std::collections::{BinaryHeap, HashMap};
use std::io; use std::io;
use std::mem; use std::mem;
use std::sync::{atomic, mpsc}; use std::pin::Pin;
use std::sync::atomic;
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::task::{self, Poll};
use std::thread; use std::thread;
use std::time; use std::time;
use futures::future; use tokio_executor::current_thread as tokio_current_thread;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::sync::mpsc as futures_mpsc;
use futures::sync::oneshot;
use futures::{Async, Future, Stream};
use tokio::reactor;
use tokio_current_thread;
use tokio_timer::timer;
use glib;
use gst;
lazy_static! { lazy_static! {
static ref CONTEXTS: Mutex<HashMap<String, Weak<IOContextInner>>> = Mutex::new(HashMap::new()); static ref CONTEXTS: Mutex<HashMap<String, Weak<IOContextInner>>> = Mutex::new(HashMap::new());
@ -58,7 +64,7 @@ impl IOContextRunner {
fn start( fn start(
name: &str, name: &str,
wait: u32, wait: u32,
reactor: reactor::Reactor, reactor: tokio_net::driver::Reactor,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>, timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
) -> (tokio_current_thread::Handle, IOContextShutdown) { ) -> (tokio_current_thread::Handle, IOContextShutdown) {
let handle = reactor.handle().clone(); let handle = reactor.handle().clone();
@ -71,7 +77,7 @@ impl IOContextRunner {
name: name_clone, name: name_clone,
}; };
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = oneshot::channel();
let join = thread::spawn(move || { let join = thread::spawn(move || {
runner.run(wait, reactor, sender, timers); runner.run(wait, reactor, sender, timers);
@ -84,7 +90,8 @@ impl IOContextRunner {
join: Some(join), join: Some(join),
}; };
let runtime_handle = receiver.recv().unwrap(); let runtime_handle =
tokio_current_thread::block_on_all(receiver).expect("Runtime init failed");
(runtime_handle, shutdown) (runtime_handle, shutdown)
} }
@ -92,127 +99,130 @@ impl IOContextRunner {
fn run( fn run(
&mut self, &mut self,
wait: u32, wait: u32,
reactor: reactor::Reactor, reactor: tokio_net::driver::Reactor,
sender: mpsc::Sender<tokio_current_thread::Handle>, sender: oneshot::Sender<tokio_current_thread::Handle>,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>, timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
) { ) {
let wait = time::Duration::from_millis(wait as u64); use std::time::{Duration, Instant};
gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name); gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name);
let wait = Duration::from_millis(wait as u64);
let handle = reactor.handle(); let handle = reactor.handle();
let mut enter = ::tokio_executor::enter().unwrap(); let timer = tokio_timer::Timer::new(reactor);
let timer = timer::Timer::new(reactor);
let timer_handle = timer.handle(); let timer_handle = timer.handle();
let mut current_thread = tokio_current_thread::CurrentThread::new_with_park(timer); let mut current_thread = tokio_current_thread::CurrentThread::new_with_park(timer);
let _ = sender.send(current_thread.handle()); sender
.send(current_thread.handle())
.expect("Couldn't send Runtime handle");
let mut now = time::Instant::now(); let _timer_guard = tokio_timer::set_default(&timer_handle);
let _reactor_guard = tokio_net::driver::set_default(&handle);
::tokio_timer::with_default(&timer_handle, &mut enter, |mut enter| { let mut now = Instant::now();
::tokio_reactor::with_default(&handle, &mut enter, |enter| loop {
if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING {
gst_debug!(CONTEXT_CAT, "Shutting down loop");
break;
}
gst_trace!(CONTEXT_CAT, "Elapsed {:?} since last loop", now.elapsed()); loop {
if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING {
break;
}
// Handle timers gst_trace!(CONTEXT_CAT, "Elapsed {:?} since last loop", now.elapsed());
// Handle timers
{
// Trigger all timers that would be expired before the middle of the loop wait
// time
let timer_threshold = now + wait / 2;
let mut timers = timers.lock().unwrap();
while timers
.peek()
.and_then(|entry| {
if entry.time < timer_threshold {
Some(())
} else {
None
}
})
.is_some()
{ {
// Trigger all timers that would be expired before the middle of the loop wait let TimerEntry {
// time time,
let timer_threshold = now + wait / 2; interval,
let mut timers = timers.lock().unwrap(); sender,
while timers ..
.peek() } = timers.pop().unwrap();
.and_then(|entry| {
if entry.time < timer_threshold { if sender.is_closed() {
Some(()) continue;
} else { }
None
} let _ = sender.unbounded_send(());
}) if let Some(interval) = interval {
.is_some() timers.push(TimerEntry {
{ time: time + interval,
let TimerEntry { id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed),
time, interval: Some(interval),
interval,
sender, sender,
.. });
} = timers.pop().unwrap();
if sender.is_closed() {
continue;
}
let _ = sender.unbounded_send(());
if let Some(interval) = interval {
timers.push(TimerEntry {
time: time + interval,
id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed),
interval: Some(interval),
sender,
});
}
} }
} }
}
gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name); gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name);
while current_thread while current_thread
.enter(enter) .turn(Some(time::Duration::from_millis(0)))
.turn(Some(time::Duration::from_millis(0))) .unwrap()
.unwrap() .has_polled()
.has_polled() {}
{} gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name);
gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name);
// We have to check again after turning in case we're supposed to shut down now // We have to check again after turning in case we're supposed to shut down now
// and already handled the unpark above // and already handled the unpark above
if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING { if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING {
gst_debug!(CONTEXT_CAT, "Shutting down loop"); gst_debug!(CONTEXT_CAT, "Shutting down loop");
break; break;
} }
let elapsed = now.elapsed(); let elapsed = now.elapsed();
gst_trace!(CONTEXT_CAT, "Elapsed {:?} after handling futures", elapsed); gst_trace!(CONTEXT_CAT, "Elapsed {:?} after handling futures", elapsed);
if wait == time::Duration::from_millis(0) { if wait == time::Duration::from_millis(0) {
let timers = timers.lock().unwrap(); let timers = timers.lock().unwrap();
let wait = match timers.peek().map(|entry| entry.time) { let wait = match timers.peek().map(|entry| entry.time) {
None => None, None => None,
Some(time) => Some({ Some(time) => Some({
let tmp = time::Instant::now(); let tmp = time::Instant::now();
if time < tmp { if time < tmp {
time::Duration::from_millis(0) time::Duration::from_millis(0)
} else { } else {
time.duration_since(tmp) time.duration_since(tmp)
} }
}), }),
}; };
drop(timers); drop(timers);
gst_trace!(CONTEXT_CAT, "Sleeping for up to {:?}", wait); gst_trace!(CONTEXT_CAT, "Sleeping for up to {:?}", wait);
current_thread.enter(enter).turn(wait).unwrap(); current_thread.turn(wait).unwrap();
gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed());
now = time::Instant::now();
} else {
if elapsed < wait {
gst_trace!(
CONTEXT_CAT,
"Waiting for {:?} before polling again",
wait - elapsed
);
thread::sleep(wait - elapsed);
gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed()); gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed());
now = time::Instant::now();
} else {
if elapsed < wait {
gst_trace!(
CONTEXT_CAT,
"Waiting for {:?} before polling again",
wait - elapsed
);
thread::sleep(wait - elapsed);
gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed());
}
now += wait;
} }
})
}); now += wait;
}
}
} }
} }
@ -225,7 +235,7 @@ impl Drop for IOContextRunner {
struct IOContextShutdown { struct IOContextShutdown {
name: String, name: String,
shutdown: Arc<atomic::AtomicUsize>, shutdown: Arc<atomic::AtomicUsize>,
handle: reactor::Handle, handle: tokio_net::driver::Handle,
join: Option<thread::JoinHandle<()>>, join: Option<thread::JoinHandle<()>>,
} }
@ -255,19 +265,17 @@ impl glib::subclass::boxed::BoxedType for IOContext {
glib_boxed_derive_traits!(IOContext); glib_boxed_derive_traits!(IOContext);
type PendingFutures = Mutex<( pub type PendingFuturesOutput = Result<(), gst::FlowError>;
u64, type PendingFutureQueue = FuturesUnordered<BoxFuture<'static, PendingFuturesOutput>>;
HashMap<u64, FuturesUnordered<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>>,
)>;
struct IOContextInner { struct IOContextInner {
name: String, name: String,
runtime_handle: Mutex<tokio_current_thread::Handle>, runtime_handle: Mutex<tokio_current_thread::Handle>,
reactor_handle: reactor::Handle, reactor_handle: tokio_net::driver::Handle,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>, timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
// Only used for dropping // Only used for dropping
_shutdown: IOContextShutdown, _shutdown: IOContextShutdown,
pending_futures: PendingFutures, pending_futures: Mutex<(u64, HashMap<u64, PendingFutureQueue>)>,
} }
impl Drop for IOContextInner { impl Drop for IOContextInner {
@ -288,7 +296,7 @@ impl IOContext {
} }
} }
let reactor = reactor::Reactor::new()?; let reactor = tokio_net::driver::Reactor::new()?;
let reactor_handle = reactor.handle().clone(); let reactor_handle = reactor.handle().clone();
let timers = Arc::new(Mutex::new(BinaryHeap::new())); let timers = Arc::new(Mutex::new(BinaryHeap::new()));
@ -310,14 +318,14 @@ impl IOContext {
Ok(IOContext(context)) Ok(IOContext(context))
} }
pub fn spawn<F>(&self, future: F) pub fn spawn<Fut>(&self, future: Fut)
where where
F: Future<Item = (), Error = ()> + Send + 'static, Fut: Future<Output = ()> + Send + 'static,
{ {
self.0.runtime_handle.lock().unwrap().spawn(future).unwrap(); self.0.runtime_handle.lock().unwrap().spawn(future).unwrap();
} }
pub fn reactor_handle(&self) -> &reactor::Handle { pub fn reactor_handle(&self) -> &tokio_net::driver::Handle {
&self.0.reactor_handle &self.0.reactor_handle
} }
@ -333,23 +341,29 @@ impl IOContext {
pub fn release_pending_future_id(&self, id: PendingFutureId) { pub fn release_pending_future_id(&self, id: PendingFutureId) {
let mut pending_futures = self.0.pending_futures.lock().unwrap(); let mut pending_futures = self.0.pending_futures.lock().unwrap();
if let Some(fs) = pending_futures.1.remove(&id.0) { if let Some(fs) = pending_futures.1.remove(&id.0) {
self.spawn(fs.for_each(|_| Ok(()))); self.spawn(fs.try_for_each(|_| future::ok(())).map(|_| ()));
} }
} }
pub fn add_pending_future<F>(&self, id: PendingFutureId, future: F) pub fn add_pending_future<F>(&self, id: PendingFutureId, future: F)
where where
F: Future<Item = (), Error = ()> + Send + 'static, F: Future<Output = PendingFuturesOutput> + Send + 'static,
{ {
let mut pending_futures = self.0.pending_futures.lock().unwrap(); let mut pending_futures = self.0.pending_futures.lock().unwrap();
let fs = pending_futures.1.get_mut(&id.0).unwrap(); let fs = pending_futures.1.get_mut(&id.0).unwrap();
fs.push(Box::new(future)) fs.push(future.boxed())
} }
pub fn drain_pending_futures<E: Send + 'static>( pub fn drain_pending_futures(
&self, &self,
id: PendingFutureId, id: PendingFutureId,
) -> (Option<oneshot::Sender<()>>, PendingFuturesFuture<E>) { ) -> (
Option<AbortHandle>,
future::Either<
BoxFuture<'static, PendingFuturesOutput>,
future::Ready<PendingFuturesOutput>,
>,
) {
let mut pending_futures = self.0.pending_futures.lock().unwrap(); let mut pending_futures = self.0.pending_futures.lock().unwrap();
let fs = pending_futures.1.get_mut(&id.0).unwrap(); let fs = pending_futures.1.get_mut(&id.0).unwrap();
@ -364,25 +378,29 @@ impl IOContext {
id, id,
); );
let (sender, receiver) = oneshot::channel(); let (abort_handle, abort_registration) = AbortHandle::new_pair();
let future = pending_futures let abortable = Abortable::new(
.for_each(|_| Ok(())) pending_futures.try_for_each(|_| future::ok(())),
.select(receiver.then(|_| Ok(()))) abort_registration,
.then(|_| Ok(())); )
.map(|res| {
res.unwrap_or_else(|_| {
gst_trace!(CONTEXT_CAT, "Aborting");
(Some(sender), future::Either::A(Box::new(future))) Err(gst::FlowError::Flushing)
})
})
.boxed()
.left_future();
(Some(abort_handle), abortable)
} else { } else {
(None, future::Either::B(future::ok(()))) (None, future::ok(()).right_future())
} }
} }
} }
pub type PendingFuturesFuture<E> = future::Either<
Box<dyn Future<Item = (), Error = E> + Send + 'static>,
future::FutureResult<(), E>,
>;
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)] #[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
pub struct PendingFutureId(u64); pub struct PendingFutureId(u64);
@ -401,7 +419,7 @@ pub struct TimerEntry {
time: time::Instant, time: time::Instant,
id: usize, // for producing a total order id: usize, // for producing a total order
interval: Option<time::Duration>, interval: Option<time::Duration>,
sender: futures_mpsc::UnboundedSender<()>, sender: mpsc::UnboundedSender<()>,
} }
impl PartialEq for TimerEntry { impl PartialEq for TimerEntry {
@ -429,7 +447,7 @@ impl Ord for TimerEntry {
#[allow(unused)] #[allow(unused)]
pub struct Interval { pub struct Interval {
receiver: futures_mpsc::UnboundedReceiver<()>, receiver: mpsc::UnboundedReceiver<()>,
} }
impl Interval { impl Interval {
@ -437,7 +455,7 @@ impl Interval {
pub fn new(context: &IOContext, interval: time::Duration) -> Self { pub fn new(context: &IOContext, interval: time::Duration) -> Self {
use tokio_executor::park::Unpark; use tokio_executor::park::Unpark;
let (sender, receiver) = futures_mpsc::unbounded(); let (sender, receiver) = mpsc::unbounded();
let mut timers = context.0.timers.lock().unwrap(); let mut timers = context.0.timers.lock().unwrap();
let entry = TimerEntry { let entry = TimerEntry {
@ -455,20 +473,19 @@ impl Interval {
impl Stream for Interval { impl Stream for Interval {
type Item = (); type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll() self.receiver.poll_next_unpin(cx)
} }
} }
pub struct Timeout { pub struct Timeout {
receiver: futures_mpsc::UnboundedReceiver<()>, receiver: mpsc::UnboundedReceiver<()>,
} }
impl Timeout { impl Timeout {
pub fn new(context: &IOContext, timeout: time::Duration) -> Self { pub fn new(context: &IOContext, timeout: time::Duration) -> Self {
let (sender, receiver) = futures_mpsc::unbounded(); let (sender, receiver) = mpsc::unbounded();
let mut timers = context.0.timers.lock().unwrap(); let mut timers = context.0.timers.lock().unwrap();
let entry = TimerEntry { let entry = TimerEntry {
@ -484,16 +501,12 @@ impl Timeout {
} }
impl Future for Timeout { impl Future for Timeout {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let res = self.receiver.poll()?; match ready!(self.receiver.poll_next_unpin(cx)) {
Some(_) => Poll::Ready(()),
match res { None => unreachable!(),
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => unreachable!(),
Async::Ready(Some(_)) => Ok(Async::Ready(())),
} }
} }
} }

View file

@ -19,24 +19,27 @@ use glib;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass; use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use glib::{glib_object_impl, glib_object_subclass};
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_info, gst_log, gst_trace};
use gst_rtp::RTPBuffer; use gst_rtp::RTPBuffer;
use lazy_static::lazy_static;
use std::cmp::{max, min, Ordering}; use std::cmp::{max, min, Ordering};
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::sync::{Mutex, MutexGuard}; use std::sync::{Mutex, MutexGuard};
use std::time; use std::time;
use futures::sync::oneshot; use futures::future::{AbortHandle, Abortable};
use futures::Future; use futures::prelude::*;
use iocontext::*; use crate::iocontext::*;
use RTPJitterBuffer; use crate::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
use RTPJitterBufferItem;
use RTPPacketRateCtx;
const DEFAULT_LATENCY_MS: u32 = 200; const DEFAULT_LATENCY_MS: u32 = 200;
const DEFAULT_DO_LOST: bool = false; const DEFAULT_DO_LOST: bool = false;
@ -202,10 +205,10 @@ struct State {
earliest_seqnum: u16, earliest_seqnum: u16,
last_popped_pts: gst::ClockTime, last_popped_pts: gst::ClockTime,
discont: bool, discont: bool,
cancel: Option<oneshot::Sender<()>>, abort_handle: Option<AbortHandle>,
last_res: Result<gst::FlowSuccess, gst::FlowError>, last_res: Result<gst::FlowSuccess, gst::FlowError>,
pending_future_id: Option<PendingFutureId>, pending_future_id: Option<PendingFutureId>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_abort_handle: Option<AbortHandle>,
} }
impl Default for State { impl Default for State {
@ -232,10 +235,10 @@ impl Default for State {
earliest_seqnum: 0, earliest_seqnum: 0,
last_popped_pts: gst::CLOCK_TIME_NONE, last_popped_pts: gst::CLOCK_TIME_NONE,
discont: false, discont: false,
cancel: None, abort_handle: None,
last_res: Ok(gst::FlowSuccess::Ok), last_res: Ok(gst::FlowSuccess::Ok),
pending_future_id: None, pending_future_id: None,
pending_future_cancel: None, pending_future_abort_handle: None,
} }
} }
} }
@ -811,11 +814,11 @@ impl JitterBuffer {
} }
}; };
if let Some(cancel) = state.cancel.take() { if let Some(abort_handle) = state.abort_handle.take() {
let _ = cancel.send(()); abort_handle.abort();
} }
let (cancel, cancel_handler) = oneshot::channel(); let (abort_handle, abort_registration) = AbortHandle::new_pair();
let element_clone = element.clone(); let element_clone = element.clone();
@ -825,13 +828,12 @@ impl JitterBuffer {
state.io_context.as_ref().unwrap(), state.io_context.as_ref().unwrap(),
time::Duration::from_nanos(timeout), time::Duration::from_nanos(timeout),
) )
.map_err(|e| panic!("timer failed; err={:?}", e)) .map(move |_| {
.and_then(move |_| {
let jb = Self::from_instance(&element_clone); let jb = Self::from_instance(&element_clone);
let mut state = jb.state.lock().unwrap(); let mut state = jb.state.lock().unwrap();
if state.io_context.is_none() { if state.io_context.is_none() {
return Ok(()); return;
} }
let now = jb.get_current_running_time(&element_clone); let now = jb.get_current_running_time(&element_clone);
@ -843,7 +845,9 @@ impl JitterBuffer {
state.earliest_pts state.earliest_pts
); );
let _ = state.cancel.take(); if let Some(abort_handle) = state.abort_handle.take() {
abort_handle.abort();
}
/* Check earliest PTS as we have just taken the lock */ /* Check earliest PTS as we have just taken the lock */
if state.earliest_pts.is_some() if state.earliest_pts.is_some()
@ -856,15 +860,19 @@ impl JitterBuffer {
state.last_res = jb.pop_and_push(&mut state, &element_clone); state.last_res = jb.pop_and_push(&mut state, &element_clone);
if let Some(pending_future_id) = state.pending_future_id { if let Some(pending_future_id) = state.pending_future_id {
let (cancel, future) = state let (abort_handle, abortable_drain) = state
.io_context .io_context
.as_ref() .as_ref()
.unwrap() .unwrap()
.drain_pending_futures(pending_future_id); .drain_pending_futures(pending_future_id);
state.pending_future_cancel = cancel; state.pending_future_abort_handle = abort_handle;
state.io_context.as_ref().unwrap().spawn(future); state
.io_context
.as_ref()
.unwrap()
.spawn(abortable_drain.map(|_| ()));
} }
if head_pts == state.earliest_pts if head_pts == state.earliest_pts
@ -876,7 +884,7 @@ impl JitterBuffer {
state.earliest_seqnum = earliest_seqnum as u16; state.earliest_seqnum = earliest_seqnum as u16;
} }
if state.pending_future_cancel.is_some() if state.pending_future_abort_handle.is_some()
|| state.earliest_pts.is_none() || state.earliest_pts.is_none()
|| state.earliest_pts + latency_ns || state.earliest_pts + latency_ns
- state.packet_spacing - state.packet_spacing
@ -889,15 +897,17 @@ impl JitterBuffer {
} }
jb.schedule(&mut state, &element_clone); jb.schedule(&mut state, &element_clone);
Ok(())
}); });
let future = timer.select(cancel_handler).then(|_| Ok(())); let abortable_timer = Abortable::new(timer, abort_registration);
state.cancel = Some(cancel); state.abort_handle = Some(abort_handle);
state.io_context.as_ref().unwrap().spawn(future); state
.io_context
.as_ref()
.unwrap()
.spawn(abortable_timer.map(|_| ()));
} }
} }
@ -1298,7 +1308,9 @@ impl ElementImpl for JitterBuffer {
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let _ = state.pending_future_cancel.take(); if let Some(abort_handle) = state.pending_future_abort_handle.take() {
abort_handle.abort();
}
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
@ -1312,7 +1324,9 @@ impl ElementImpl for JitterBuffer {
state.jbuf.borrow().flush(); state.jbuf.borrow().flush();
let _ = state.cancel.take(); if let Some(abort_handle) = state.abort_handle.take() {
abort_handle.abort();
}
let _ = state.io_context.take(); let _ = state.io_context.take();
} }
_ => (), _ => (),

View file

@ -17,40 +17,6 @@
#![crate_type = "cdylib"] #![crate_type = "cdylib"]
extern crate libc;
extern crate gio_sys as gio_ffi;
extern crate glib_sys as glib_ffi;
extern crate gobject_sys as gobject_ffi;
extern crate gstreamer_sys as gst_ffi;
extern crate gio;
#[macro_use]
extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_net as gst_net;
extern crate gstreamer_rtp as gst_rtp;
extern crate futures;
extern crate tokio;
extern crate tokio_current_thread;
extern crate tokio_executor;
extern crate tokio_reactor;
extern crate tokio_timer;
extern crate either;
extern crate rand;
#[macro_use]
extern crate lazy_static;
extern crate net2;
#[cfg(windows)]
extern crate winapi;
mod iocontext; mod iocontext;
mod socket; mod socket;
@ -63,10 +29,11 @@ mod jitterbuffer;
mod proxy; mod proxy;
mod queue; mod queue;
use glib::prelude::*; use glib_sys as glib_ffi;
use glib::translate::*;
use std::mem; use gst;
use std::ptr; use gst::gst_plugin_define;
use gstreamer_sys as gst_ffi;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
udpsrc::register(plugin)?; udpsrc::register(plugin)?;
@ -123,7 +90,10 @@ impl<'a> Drop for MutexGuard<'a> {
pub mod ffi { pub mod ffi {
use glib_ffi::{gboolean, gpointer, GList, GType}; use glib_ffi::{gboolean, gpointer, GList, GType};
use glib_sys as glib_ffi;
use gst_ffi::GstClockTime; use gst_ffi::GstClockTime;
use gstreamer_sys as gst_ffi;
use libc::{c_int, c_uint, c_ulonglong, c_ushort, c_void}; use libc::{c_int, c_uint, c_ulonglong, c_ushort, c_void};
#[repr(C)] #[repr(C)]
@ -214,6 +184,13 @@ pub mod ffi {
} }
} }
use glib::prelude::*;
use glib::translate::*;
use glib::{glib_object_wrapper, glib_wrapper};
use std::mem;
use std::ptr;
glib_wrapper! { glib_wrapper! {
pub struct RTPJitterBuffer(Object<ffi::RTPJitterBuffer, RTPJitterBufferClass>); pub struct RTPJitterBuffer(Object<ffi::RTPJitterBuffer, RTPJitterBufferClass>);
@ -335,9 +312,9 @@ unsafe impl Send for RTPPacketRateCtx {}
impl RTPPacketRateCtx { impl RTPPacketRateCtx {
pub fn new() -> RTPPacketRateCtx { pub fn new() -> RTPPacketRateCtx {
unsafe { unsafe {
let mut ptr: ffi::RTPPacketRateCtx = std::mem::uninitialized(); let mut ptr = std::mem::MaybeUninit::uninit();
ffi::gst_rtp_packet_rate_ctx_reset(&mut ptr, -1); ffi::gst_rtp_packet_rate_ctx_reset(ptr.as_mut_ptr(), -1);
RTPPacketRateCtx(Box::new(ptr)) RTPPacketRateCtx(Box::new(ptr.assume_init()))
} }
} }
@ -432,29 +409,35 @@ impl RTPJitterBuffer {
pub fn insert(&self, mut item: RTPJitterBufferItem) -> (bool, bool, i32) { pub fn insert(&self, mut item: RTPJitterBufferItem) -> (bool, bool, i32) {
unsafe { unsafe {
let mut head = mem::uninitialized(); let mut head = mem::MaybeUninit::uninit();
let mut percent = mem::uninitialized(); let mut percent = mem::MaybeUninit::uninit();
let box_ = item.0.take().expect("Invalid wrapper"); let box_ = item.0.take().expect("Invalid wrapper");
let ptr = Box::into_raw(box_); let ptr = Box::into_raw(box_);
let ret: bool = from_glib(ffi::rtp_jitter_buffer_insert( let ret: bool = from_glib(ffi::rtp_jitter_buffer_insert(
self.to_glib_none().0, self.to_glib_none().0,
ptr, ptr,
&mut head, head.as_mut_ptr(),
&mut percent, percent.as_mut_ptr(),
)); ));
if !ret { if !ret {
item.0 = Some(Box::from_raw(ptr)); item.0 = Some(Box::from_raw(ptr));
} }
(ret, from_glib(head), percent) (ret, from_glib(head.assume_init()), percent.assume_init())
} }
} }
pub fn find_earliest(&self) -> (gst::ClockTime, u32) { pub fn find_earliest(&self) -> (gst::ClockTime, u32) {
unsafe { unsafe {
let mut pts = mem::uninitialized(); let mut pts = mem::MaybeUninit::uninit();
let mut seqnum = mem::uninitialized(); let mut seqnum = mem::MaybeUninit::uninit();
ffi::rtp_jitter_buffer_find_earliest(self.to_glib_none().0, &mut pts, &mut seqnum); ffi::rtp_jitter_buffer_find_earliest(
self.to_glib_none().0,
pts.as_mut_ptr(),
seqnum.as_mut_ptr(),
);
let pts = pts.assume_init();
let seqnum = seqnum.assume_init();
if pts == gst_ffi::GST_CLOCK_TIME_NONE { if pts == gst_ffi::GST_CLOCK_TIME_NONE {
(gst::CLOCK_TIME_NONE, seqnum) (gst::CLOCK_TIME_NONE, seqnum)
@ -466,10 +449,13 @@ impl RTPJitterBuffer {
pub fn pop(&self) -> (RTPJitterBufferItem, i32) { pub fn pop(&self) -> (RTPJitterBufferItem, i32) {
unsafe { unsafe {
let mut percent = mem::uninitialized(); let mut percent = mem::MaybeUninit::uninit();
let item = ffi::rtp_jitter_buffer_pop(self.to_glib_none().0, &mut percent); let item = ffi::rtp_jitter_buffer_pop(self.to_glib_none().0, percent.as_mut_ptr());
(RTPJitterBufferItem(Some(Box::from_raw(item))), percent) (
RTPJitterBufferItem(Some(Box::from_raw(item))),
percent.assume_init(),
)
} }
} }

View file

@ -15,28 +15,29 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use futures::prelude::*;
use glib; use glib;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass; use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use glib::{glib_object_impl, glib_object_subclass};
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use std::collections::HashMap; use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::task::{self, Poll};
use std::{u32, u64}; use std::{u32, u64};
use futures; use tokio_executor::current_thread as tokio_current_thread;
use futures::future;
use futures::task;
use futures::{Async, Future};
use tokio::executor; use super::{dataqueue::*, iocontext::*};
use dataqueue::*;
use iocontext::*;
lazy_static! { lazy_static! {
static ref CONTEXTS: Mutex<HashMap<String, Weak<Mutex<SharedQueueInner>>>> = static ref CONTEXTS: Mutex<HashMap<String, Weak<Mutex<SharedQueueInner>>>> =
@ -200,7 +201,7 @@ impl SharedQueue {
queue: None, queue: None,
last_res: Err(gst::FlowError::Flushing), last_res: Err(gst::FlowError::Flushing),
pending_queue: None, pending_queue: None,
pending_future_cancel: None, pending_future_abort_handle: None,
have_sink: as_sink, have_sink: as_sink,
have_src: !as_sink, have_src: !as_sink,
})); }));
@ -220,15 +221,17 @@ impl Drop for SharedQueue {
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
assert!(inner.have_sink); assert!(inner.have_sink);
inner.have_sink = false; inner.have_sink = false;
if let Some((Some(task), _, _)) = inner.pending_queue.take() { if let Some((Some(waker), _, _)) = inner.pending_queue.take() {
task.notify(); waker.wake();
} }
} else { } else {
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
assert!(inner.have_src); assert!(inner.have_src);
inner.have_src = false; inner.have_src = false;
let _ = inner.queue.take(); let _ = inner.queue.take();
let _ = inner.pending_future_cancel.take(); if let Some(abort_handle) = inner.pending_future_abort_handle.take() {
abort_handle.abort();
}
} }
} }
} }
@ -237,8 +240,8 @@ struct SharedQueueInner {
name: String, name: String,
queue: Option<DataQueue>, queue: Option<DataQueue>,
last_res: Result<gst::FlowSuccess, gst::FlowError>, last_res: Result<gst::FlowSuccess, gst::FlowError>,
pending_queue: Option<(Option<task::Task>, bool, VecDeque<DataQueueItem>)>, pending_queue: Option<(Option<task::Waker>, bool, VecDeque<DataQueueItem>)>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_abort_handle: Option<future::AbortHandle>,
have_sink: bool, have_sink: bool,
have_src: bool, have_src: bool,
} }
@ -377,7 +380,7 @@ impl ProxySink {
queue.pending_queue.as_mut().unwrap().1 = true; queue.pending_queue.as_mut().unwrap().1 = true;
let element_clone = element.clone(); let element_clone = element.clone();
let future = future::poll_fn(move || { let future = future::poll_fn(move |cx| {
let sink = Self::from_instance(&element_clone); let sink = Self::from_instance(&element_clone);
let state = sink.state.lock().unwrap(); let state = sink.state.lock().unwrap();
@ -390,7 +393,7 @@ impl ProxySink {
let mut queue = match state.queue { let mut queue = match state.queue {
Some(ref queue) => queue.0.lock().unwrap(), Some(ref queue) => queue.0.lock().unwrap(),
None => { None => {
return Ok(Async::Ready(())); return Poll::Ready(Ok(()));
} }
}; };
@ -400,51 +403,51 @@ impl ProxySink {
.. ..
} = *queue; } = *queue;
let res = if let Some((ref mut task, _, ref mut items)) = *pending_queue let res =
{ if let Some((ref mut waker, _, ref mut items)) = *pending_queue {
if let Some(ref queue) = queue { if let Some(ref queue) = queue {
let mut failed_item = None; let mut failed_item = None;
while let Some(item) = items.pop_front() { while let Some(item) = items.pop_front() {
if let Err(item) = queue.push(item) { if let Err(item) = queue.push(item) {
failed_item = Some(item); failed_item = Some(item);
}
} }
}
if let Some(failed_item) = failed_item { if let Some(failed_item) = failed_item {
items.push_front(failed_item); items.push_front(failed_item);
*task = Some(task::current()); *waker = Some(cx.waker().clone());
gst_log!( gst_log!(
SINK_CAT, SINK_CAT,
obj: &element_clone, obj: &element_clone,
"Waiting for more queue space" "Waiting for more queue space"
); );
Ok(Async::NotReady) Poll::Pending
} else {
gst_log!(
SINK_CAT,
obj: &element_clone,
"Pending queue is empty now"
);
Poll::Ready(Ok(()))
}
} else { } else {
gst_log!( gst_log!(
SINK_CAT, SINK_CAT,
obj: &element_clone, obj: &element_clone,
"Pending queue is empty now" "Waiting for queue to be allocated"
); );
Ok(Async::Ready(())) Poll::Pending
} }
} else { } else {
gst_log!( gst_log!(
SINK_CAT, SINK_CAT,
obj: &element_clone, obj: &element_clone,
"Waiting for queue to be allocated" "Flushing, dropping pending queue"
); );
Ok(Async::NotReady) Poll::Ready(Ok(()))
} };
} else {
gst_log!(
SINK_CAT,
obj: &element_clone,
"Flushing, dropping pending queue"
);
Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) { if res == Poll::Ready(Ok(())) {
*pending_queue = None; *pending_queue = None;
} }
@ -476,7 +479,7 @@ impl ProxySink {
if let Some(wait_future) = wait_future { if let Some(wait_future) = wait_future {
gst_log!(SINK_CAT, obj: element, "Blocking until queue becomes empty"); gst_log!(SINK_CAT, obj: element, "Blocking until queue becomes empty");
executor::current_thread::block_on_all(wait_future).map_err(|_| { tokio_current_thread::block_on_all(wait_future).map_err(|_| {
gst_element_error!( gst_element_error!(
element, element,
gst::StreamError::Failed, gst::StreamError::Failed,
@ -537,12 +540,12 @@ impl ProxySink {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let io_context = s let io_context = s
.get::<&IOContext>("io-context") .get::<&IOContext>("io-context")
.expect("signal arg") .expect("event field")
.expect("missing signal arg"); .expect("missing event field");
let pending_future_id = s let pending_future_id = s
.get::<&PendingFutureId>("pending-future-id") .get::<&PendingFutureId>("pending-future-id")
.expect("signal arg") .expect("event field")
.expect("missing signal arg"); .expect("missing event field");
gst_debug!( gst_debug!(
SINK_CAT, SINK_CAT,
@ -627,8 +630,8 @@ impl ProxySink {
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some((Some(task), _, _)) = queue.pending_queue.take() { if let Some((Some(waker), _, _)) = queue.pending_queue.take() {
task.notify(); waker.wake();
} }
queue.last_res = Err(gst::FlowError::Flushing); queue.last_res = Err(gst::FlowError::Flushing);
@ -745,7 +748,7 @@ impl ObjectImpl for ProxySink {
let element = obj.downcast_ref::<gst::Element>().unwrap(); let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.sink_pad).unwrap(); element.add_pad(&self.sink_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SINK); super::set_element_flags(element, gst::ElementFlags::SINK);
} }
} }
@ -893,22 +896,17 @@ impl ProxySrc {
ret ret
} }
fn push_item( async fn push_item(element: gst::Element, item: DataQueueItem) -> Result<(), gst::FlowError> {
&self, let src = Self::from_instance(&element);
element: &gst::Element,
item: DataQueueItem,
) -> future::Either<
Box<dyn Future<Item = (), Error = gst::FlowError> + Send + 'static>,
future::FutureResult<(), gst::FlowError>,
> {
let event = { let event = {
let state = self.state.lock().unwrap(); let state = src.state.lock().unwrap();
let queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some((Some(ref task), _, _)) = queue.pending_queue { if let Some((Some(ref waker), _, _)) = queue.pending_queue {
task.notify(); waker.wake_by_ref();
} }
if self.src_pad.check_reconfigure() { if src.src_pad.check_reconfigure() {
Self::create_io_context_event(&state) Self::create_io_context_event(&state)
} else { } else {
None None
@ -916,17 +914,17 @@ impl ProxySrc {
}; };
if let Some(event) = event { if let Some(event) = event {
self.src_pad.push_event(event); src.src_pad.push_event(event);
} }
let res = match item { let res = match item {
DataQueueItem::Buffer(buffer) => { DataQueueItem::Buffer(buffer) => {
gst_log!(SRC_CAT, obj: element, "Forwarding buffer {:?}", buffer); gst_log!(SRC_CAT, obj: &element, "Forwarding buffer {:?}", buffer);
self.src_pad.push(buffer).map(|_| ()) src.src_pad.push(buffer).map(|_| ())
} }
DataQueueItem::BufferList(list) => { DataQueueItem::BufferList(list) => {
gst_log!(SRC_CAT, obj: element, "Forwarding buffer list {:?}", list); gst_log!(SRC_CAT, obj: &element, "Forwarding buffer list {:?}", list);
self.src_pad.push_list(list).map(|_| ()) src.src_pad.push_list(list).map(|_| ())
} }
DataQueueItem::Event(event) => { DataQueueItem::Event(event) => {
use gst::EventView; use gst::EventView;
@ -936,7 +934,7 @@ impl ProxySrc {
EventView::CustomDownstreamSticky(e) => { EventView::CustomDownstreamSticky(e) => {
let s = e.get_structure().unwrap(); let s = e.get_structure().unwrap();
if s.get_name() == "ts-io-context" { if s.get_name() == "ts-io-context" {
let state = self.state.lock().unwrap(); let state = src.state.lock().unwrap();
new_event = Self::create_io_context_event(&state); new_event = Self::create_io_context_event(&state);
} }
} }
@ -945,82 +943,79 @@ impl ProxySrc {
match new_event { match new_event {
Some(event) => { Some(event) => {
gst_log!(SRC_CAT, obj: element, "Forwarding new event {:?}", event); gst_log!(SRC_CAT, obj: &element, "Forwarding new event {:?}", event);
self.src_pad.push_event(event); src.src_pad.push_event(event);
} }
None => { None => {
gst_log!(SRC_CAT, obj: element, "Forwarding event {:?}", event); gst_log!(SRC_CAT, obj: &element, "Forwarding event {:?}", event);
self.src_pad.push_event(event); src.src_pad.push_event(event);
} }
} }
Ok(()) Ok(())
} }
}; };
let res = match res { match res {
Ok(_) => { Ok(_) => {
gst_log!(SRC_CAT, obj: element, "Successfully pushed item"); gst_log!(SRC_CAT, obj: &element, "Successfully pushed item");
let state = self.state.lock().unwrap(); let state = src.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
queue.last_res = Ok(gst::FlowSuccess::Ok); queue.last_res = Ok(gst::FlowSuccess::Ok);
Ok(())
} }
Err(gst::FlowError::Flushing) => { Err(gst::FlowError::Flushing) => {
gst_debug!(SRC_CAT, obj: element, "Flushing"); gst_debug!(SRC_CAT, obj: &element, "Flushing");
let state = self.state.lock().unwrap(); let state = src.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some(ref queue) = queue.queue { if let Some(ref queue) = queue.queue {
queue.pause(); queue.pause();
} }
queue.last_res = Err(gst::FlowError::Flushing); queue.last_res = Err(gst::FlowError::Flushing);
Ok(())
} }
Err(gst::FlowError::Eos) => { Err(gst::FlowError::Eos) => {
gst_debug!(SRC_CAT, obj: element, "EOS"); gst_debug!(SRC_CAT, obj: &element, "EOS");
let state = self.state.lock().unwrap(); let state = src.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some(ref queue) = queue.queue { if let Some(ref queue) = queue.queue {
queue.pause(); queue.pause();
} }
queue.last_res = Err(gst::FlowError::Eos); queue.last_res = Err(gst::FlowError::Eos);
Ok(())
} }
Err(err) => { Err(err) => {
gst_error!(SRC_CAT, obj: element, "Got error {}", err); gst_error!(SRC_CAT, obj: &element, "Got error {}", err);
gst_element_error!( gst_element_error!(
element, element,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["streaming stopped, reason {}", err] ["streaming stopped, reason {}", err]
); );
let state = self.state.lock().unwrap(); let state = src.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
queue.last_res = Err(err); queue.last_res = Err(err);
Err(gst::FlowError::CustomError) return Err(gst::FlowError::CustomError);
}
}
let abortable_drain = {
let state = src.state.lock().unwrap();
if let StateSrc {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
queue: Some(ref queue),
..
} = *state
{
let (abort_handle, abortable_drain) =
io_context.drain_pending_futures(*pending_future_id);
queue.0.lock().unwrap().pending_future_abort_handle = abort_handle;
abortable_drain
} else {
return Ok(());
} }
}; };
match res { abortable_drain.await
Ok(()) => {
let state = self.state.lock().unwrap();
if let StateSrc {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
queue: Some(ref queue),
..
} = *state
{
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id);
queue.0.lock().unwrap().pending_future_cancel = cancel;
future
} else {
future::Either::B(future::ok(()))
}
}
Err(err) => future::Either::B(future::err(err)),
}
} }
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
@ -1066,10 +1061,7 @@ impl ProxySrc {
dataqueue dataqueue
.schedule( .schedule(
&io_context, &io_context,
move |item| { move |item| Self::push_item(element_clone.clone(), item),
let src = Self::from_instance(&element_clone);
src.push_item(&element_clone, item)
},
move |err| { move |err| {
gst_error!(SRC_CAT, obj: &element_clone2, "Got error {}", err); gst_error!(SRC_CAT, obj: &element_clone2, "Got error {}", err);
match err { match err {
@ -1172,7 +1164,9 @@ impl ProxySrc {
queue.pause(); queue.pause();
queue.clear(&self.src_pad); queue.clear(&self.src_pad);
} }
let _ = queue.pending_future_cancel.take(); if let Some(abort_handle) = queue.pending_future_abort_handle.take() {
abort_handle.abort();
}
gst_debug!(SRC_CAT, obj: element, "Stopped"); gst_debug!(SRC_CAT, obj: element, "Stopped");
@ -1320,7 +1314,7 @@ impl ObjectImpl for ProxySrc {
let element = obj.downcast_ref::<gst::Element>().unwrap(); let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap(); element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE); super::set_element_flags(element, gst::ElementFlags::SOURCE);
} }
} }

View file

@ -15,27 +15,29 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use futures::prelude::*;
use glib; use glib;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass; use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use glib::{glib_object_impl, glib_object_subclass};
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use lazy_static::lazy_static;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{self, Poll};
use std::{u32, u64}; use std::{u32, u64};
use futures; use tokio_executor::current_thread as tokio_current_thread;
use futures::future;
use futures::task;
use futures::{Async, Future};
use tokio::executor; use super::{dataqueue::*, iocontext::*};
use dataqueue::*;
use iocontext::*;
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
@ -121,7 +123,7 @@ static PROPERTIES: [subclass::Property; 5] = [
]; ];
struct PendingQueue { struct PendingQueue {
task: Option<task::Task>, waker: Option<task::Waker>,
scheduled: bool, scheduled: bool,
items: VecDeque<DataQueueItem>, items: VecDeque<DataQueueItem>,
} }
@ -134,7 +136,7 @@ struct State {
queue: Option<DataQueue>, queue: Option<DataQueue>,
pending_queue: Option<PendingQueue>, pending_queue: Option<PendingQueue>,
last_res: Result<gst::FlowSuccess, gst::FlowError>, last_res: Result<gst::FlowSuccess, gst::FlowError>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_abort_handle: Option<future::AbortHandle>,
} }
impl Default for State { impl Default for State {
@ -147,7 +149,7 @@ impl Default for State {
queue: None, queue: None,
pending_queue: None, pending_queue: None,
last_res: Ok(gst::FlowSuccess::Ok), last_res: Ok(gst::FlowSuccess::Ok),
pending_future_cancel: None, pending_future_abort_handle: None,
} }
} }
} }
@ -228,7 +230,7 @@ impl Queue {
&self, &self,
element: &gst::Element, element: &gst::Element,
state: &mut State, state: &mut State,
) -> Option<impl Future<Item = (), Error = ()>> { ) -> Option<impl Future<Output = Result<(), gst::FlowError>>> {
gst_log!(CAT, obj: element, "Scheduling pending queue now"); gst_log!(CAT, obj: element, "Scheduling pending queue now");
let State { let State {
@ -241,7 +243,7 @@ impl Queue {
pending_queue.as_mut().unwrap().scheduled = true; pending_queue.as_mut().unwrap().scheduled = true;
let element_clone = element.clone(); let element_clone = element.clone();
let future = future::poll_fn(move || { let future = future::poll_fn(move |cx| {
let queue = Self::from_instance(&element_clone); let queue = Self::from_instance(&element_clone);
let mut state = queue.state.lock().unwrap(); let mut state = queue.state.lock().unwrap();
@ -252,13 +254,13 @@ impl Queue {
} = *state; } = *state;
if dq.is_none() { if dq.is_none() {
return Ok(Async::Ready(())); return Poll::Ready(Ok(()));
} }
gst_log!(CAT, obj: &element_clone, "Trying to empty pending queue"); gst_log!(CAT, obj: &element_clone, "Trying to empty pending queue");
let res = if let Some(PendingQueue { let res = if let Some(PendingQueue {
ref mut task, ref mut waker,
ref mut items, ref mut items,
.. ..
}) = *pending_queue }) = *pending_queue
@ -272,19 +274,19 @@ impl Queue {
if let Some(failed_item) = failed_item { if let Some(failed_item) = failed_item {
items.push_front(failed_item); items.push_front(failed_item);
*task = Some(task::current()); *waker = Some(cx.waker().clone());
gst_log!(CAT, obj: &element_clone, "Waiting for more queue space"); gst_log!(CAT, obj: &element_clone, "Waiting for more queue space");
Ok(Async::NotReady) Poll::Pending
} else { } else {
gst_log!(CAT, obj: &element_clone, "Pending queue is empty now"); gst_log!(CAT, obj: &element_clone, "Pending queue is empty now");
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
} else { } else {
gst_log!(CAT, obj: &element_clone, "Flushing, dropping pending queue"); gst_log!(CAT, obj: &element_clone, "Flushing, dropping pending queue");
Ok(Async::Ready(())) Poll::Ready(Ok(()))
}; };
if res == Ok(Async::Ready(())) { if res == Poll::Ready(Ok(())) {
*pending_queue = None; *pending_queue = None;
} }
@ -326,7 +328,7 @@ impl Queue {
{ {
if pending_queue.is_none() { if pending_queue.is_none() {
*pending_queue = Some(PendingQueue { *pending_queue = Some(PendingQueue {
task: None, waker: None,
scheduled: false, scheduled: false,
items: VecDeque::new(), items: VecDeque::new(),
}); });
@ -367,7 +369,7 @@ impl Queue {
if let Some(wait_future) = wait_future { if let Some(wait_future) = wait_future {
gst_log!(CAT, obj: element, "Blocking until queue has space again"); gst_log!(CAT, obj: element, "Blocking until queue has space again");
executor::current_thread::block_on_all(wait_future).map_err(|_| { tokio_current_thread::block_on_all(wait_future).map_err(|_| {
gst_element_error!( gst_element_error!(
element, element,
gst::StreamError::Failed, gst::StreamError::Failed,
@ -424,12 +426,12 @@ impl Queue {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let io_context = s let io_context = s
.get::<&IOContext>("io-context") .get::<&IOContext>("io-context")
.expect("signal arg") .expect("event field")
.expect("missing signal arg"); .expect("missing event field");
let pending_future_id = s let pending_future_id = s
.get::<&PendingFutureId>("pending-future-id") .get::<&PendingFutureId>("pending-future-id")
.expect("signal arg") .expect("event field")
.expect("missing signal arg"); .expect("missing event field");
gst_debug!( gst_debug!(
CAT, CAT,
@ -545,25 +547,20 @@ impl Queue {
self.sink_pad.peer_query(query) self.sink_pad.peer_query(query)
} }
fn push_item( async fn push_item(element: gst::Element, item: DataQueueItem) -> Result<(), gst::FlowError> {
&self, let queue = Self::from_instance(&element);
element: &gst::Element,
item: DataQueueItem,
) -> future::Either<
Box<dyn Future<Item = (), Error = gst::FlowError> + Send + 'static>,
future::FutureResult<(), gst::FlowError>,
> {
let event = { let event = {
let state = self.state.lock().unwrap(); let state = queue.state.lock().unwrap();
if let Some(PendingQueue { if let Some(PendingQueue {
task: Some(ref task), waker: Some(ref waker),
.. ..
}) = state.pending_queue }) = state.pending_queue
{ {
task.notify(); waker.wake_by_ref();
} }
if self.src_pad.check_reconfigure() { if queue.src_pad.check_reconfigure() {
Self::create_io_context_event(&state) Self::create_io_context_event(&state)
} else { } else {
None None
@ -571,85 +568,82 @@ impl Queue {
}; };
if let Some(event) = event { if let Some(event) = event {
self.src_pad.push_event(event); queue.src_pad.push_event(event);
} }
let res = match item { let res = match item {
DataQueueItem::Buffer(buffer) => { DataQueueItem::Buffer(buffer) => {
gst_log!(CAT, obj: element, "Forwarding buffer {:?}", buffer); gst_log!(CAT, obj: &element, "Forwarding buffer {:?}", buffer);
self.src_pad.push(buffer).map(|_| ()) queue.src_pad.push(buffer).map(|_| ())
} }
DataQueueItem::BufferList(list) => { DataQueueItem::BufferList(list) => {
gst_log!(CAT, obj: element, "Forwarding buffer list {:?}", list); gst_log!(CAT, obj: &element, "Forwarding buffer list {:?}", list);
self.src_pad.push_list(list).map(|_| ()) queue.src_pad.push_list(list).map(|_| ())
} }
DataQueueItem::Event(event) => { DataQueueItem::Event(event) => {
gst_log!(CAT, obj: element, "Forwarding event {:?}", event); gst_log!(CAT, obj: &element, "Forwarding event {:?}", event);
self.src_pad.push_event(event); queue.src_pad.push_event(event);
Ok(()) Ok(())
} }
}; };
let res = match res {
Ok(_) => {
gst_log!(CAT, obj: element, "Successfully pushed item");
let mut state = self.state.lock().unwrap();
state.last_res = Ok(gst::FlowSuccess::Ok);
Ok(())
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: element, "Flushing");
let mut state = self.state.lock().unwrap();
if let Some(ref queue) = state.queue {
queue.pause();
}
state.last_res = Err(gst::FlowError::Flushing);
Ok(())
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: element, "EOS");
let mut state = self.state.lock().unwrap();
if let Some(ref queue) = state.queue {
queue.pause();
}
state.last_res = Err(gst::FlowError::Eos);
Ok(())
}
Err(err) => {
gst_error!(CAT, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let mut state = self.state.lock().unwrap();
state.last_res = Err(err);
Err(gst::FlowError::CustomError)
}
};
match res { match res {
Ok(()) => { Ok(_) => {
let mut state = self.state.lock().unwrap(); gst_log!(CAT, obj: &element, "Successfully pushed item");
let mut state = queue.state.lock().unwrap();
if let State { state.last_res = Ok(gst::FlowSuccess::Ok);
io_context: Some(ref io_context), }
pending_future_id: Some(ref pending_future_id), Err(gst::FlowError::Flushing) => {
ref mut pending_future_cancel, gst_debug!(CAT, obj: &element, "Flushing");
.. let mut state = queue.state.lock().unwrap();
} = *state if let Some(ref queue) = state.queue {
{ queue.pause();
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id); }
*pending_future_cancel = cancel; state.last_res = Err(gst::FlowError::Flushing);
}
future Err(gst::FlowError::Eos) => {
} else { gst_debug!(CAT, obj: &element, "EOS");
future::Either::B(future::ok(())) let mut state = queue.state.lock().unwrap();
} if let Some(ref queue) = state.queue {
queue.pause();
}
state.last_res = Err(gst::FlowError::Eos);
}
Err(err) => {
gst_error!(CAT, obj: &element, "Got error {}", err);
gst_element_error!(
&element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let mut state = queue.state.lock().unwrap();
state.last_res = Err(err);
return Err(gst::FlowError::CustomError);
} }
Err(err) => future::Either::B(future::err(err)),
} }
let abortable_drain = {
let mut state = queue.state.lock().unwrap();
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_abort_handle,
..
} = *state
{
let (abort_handle, abortable_drain) =
io_context.drain_pending_futures(*pending_future_id);
*pending_future_abort_handle = abort_handle;
abortable_drain
} else {
return Ok(());
}
};
abortable_drain.await
} }
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
@ -691,10 +685,7 @@ impl Queue {
dataqueue dataqueue
.schedule( .schedule(
&io_context, &io_context,
move |item| { move |item| Self::push_item(element_clone.clone(), item),
let queue = Self::from_instance(&element_clone);
queue.push_item(&element_clone, item)
},
move |err| { move |err| {
gst_error!(CAT, obj: &element_clone2, "Got error {}", err); gst_error!(CAT, obj: &element_clone2, "Got error {}", err);
match err { match err {
@ -786,13 +777,18 @@ impl Queue {
queue.pause(); queue.pause();
queue.clear(&self.src_pad); queue.clear(&self.src_pad);
} }
if let Some(PendingQueue { if let Some(PendingQueue {
task: Some(task), .. waker: Some(waker), ..
}) = state.pending_queue.take() }) = state.pending_queue.take()
{ {
task.notify(); waker.wake();
} }
let _ = state.pending_future_cancel.take();
if let Some(abort_handle) = state.pending_future_abort_handle.take() {
abort_handle.abort();
}
state.last_res = Err(gst::FlowError::Flushing); state.last_res = Err(gst::FlowError::Flushing);
gst_debug!(CAT, obj: element, "Stopped"); gst_debug!(CAT, obj: element, "Stopped");

View file

@ -16,19 +16,23 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use std::io; use either::Either;
use std::sync::{Arc, Mutex}; use futures::{channel::oneshot, prelude::*};
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use gst::{gst_debug, gst_error};
use futures::sync::oneshot; use lazy_static::lazy_static;
use futures::task;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use either::Either; use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use iocontext::*; use tokio_executor::current_thread as tokio_current_thread;
use super::iocontext::*;
lazy_static! { lazy_static! {
static ref SOCKET_CAT: gst::DebugCategory = gst::DebugCategory::new( static ref SOCKET_CAT: gst::DebugCategory = gst::DebugCategory::new(
@ -48,21 +52,22 @@ enum SocketState {
Shutdown, Shutdown,
} }
pub trait SocketRead: Send { pub trait SocketRead: Send + Unpin {
const DO_TIMESTAMP: bool; const DO_TIMESTAMP: bool;
fn poll_read( fn poll_read(
&mut self, self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<(usize, Option<std::net::SocketAddr>), io::Error>; ) -> Poll<io::Result<(usize, Option<std::net::SocketAddr>)>>;
} }
struct SocketInner<T: SocketRead + 'static> { struct SocketInner<T: SocketRead + 'static> {
element: gst::Element, element: gst::Element,
state: SocketState, state: SocketState,
reader: T, reader: Pin<Box<T>>,
buffer_pool: gst::BufferPool, buffer_pool: gst::BufferPool,
current_task: Option<task::Task>, waker: Option<task::Waker>,
shutdown_receiver: Option<oneshot::Receiver<()>>, shutdown_receiver: Option<oneshot::Receiver<()>>,
clock: Option<gst::Clock>, clock: Option<gst::Clock>,
base_time: Option<gst::ClockTime>, base_time: Option<gst::ClockTime>,
@ -73,20 +78,24 @@ impl<T: SocketRead + 'static> Socket<T> {
Socket(Arc::new(Mutex::new(SocketInner::<T> { Socket(Arc::new(Mutex::new(SocketInner::<T> {
element: element.clone(), element: element.clone(),
state: SocketState::Unscheduled, state: SocketState::Unscheduled,
reader, reader: Pin::new(Box::new(reader)),
buffer_pool, buffer_pool,
current_task: None, waker: None,
shutdown_receiver: None, shutdown_receiver: None,
clock: None, clock: None,
base_time: None, base_time: None,
}))) })))
} }
pub fn schedule<U, F, G>(&self, io_context: &IOContext, func: F, err_func: G) -> Result<(), ()> pub fn schedule<F, G, Fut>(
&self,
io_context: &IOContext,
func: F,
err_func: G,
) -> Result<(), ()>
where where
F: Fn((gst::Buffer, Option<std::net::SocketAddr>)) -> U + Send + 'static, F: Fn((gst::Buffer, Option<std::net::SocketAddr>)) -> Fut + Send + 'static,
U: IntoFuture<Item = (), Error = gst::FlowError> + 'static, Fut: Future<Output = Result<(), gst::FlowError>> + Send + 'static,
<U as IntoFuture>::Future: Send + 'static,
G: FnOnce(Either<gst::FlowError, io::Error>) + Send + 'static, G: FnOnce(Either<gst::FlowError, io::Error>) + Send + 'static,
{ {
// Ready->Paused // Ready->Paused
@ -109,13 +118,13 @@ impl<T: SocketRead + 'static> Socket<T> {
return Err(()); return Err(());
} }
let (sender, receiver) = oneshot::channel::<()>(); let (sender, receiver) = oneshot::channel();
inner.shutdown_receiver = Some(receiver); inner.shutdown_receiver = Some(receiver);
let element_clone = inner.element.clone(); let element_clone = inner.element.clone();
io_context.spawn( io_context.spawn(
stream stream
.for_each(move |(buffer, saddr)| { .try_for_each(move |(buffer, saddr)| {
func((buffer, saddr)).into_future().map_err(Either::Left) func((buffer, saddr)).into_future().map_err(Either::Left)
}) })
.then(move |res| { .then(move |res| {
@ -132,7 +141,7 @@ impl<T: SocketRead + 'static> Socket<T> {
let _ = sender.send(()); let _ = sender.send(());
Ok(()) future::ready(())
}), }),
); );
Ok(()) Ok(())
@ -154,8 +163,8 @@ impl<T: SocketRead + 'static> Socket<T> {
inner.clock = clock; inner.clock = clock;
inner.base_time = base_time; inner.base_time = base_time;
if let Some(task) = inner.current_task.take() { if let Some(waker) = inner.waker.take() {
task.notify(); waker.wake();
} }
} }
@ -176,8 +185,8 @@ impl<T: SocketRead + 'static> Socket<T> {
inner.clock = None; inner.clock = None;
inner.base_time = None; inner.base_time = None;
if let Some(task) = inner.current_task.take() { if let Some(waker) = inner.waker.take() {
task.notify(); waker.wake();
} }
} }
@ -197,15 +206,15 @@ impl<T: SocketRead + 'static> Socket<T> {
assert!(inner.state == SocketState::Scheduled || inner.state == SocketState::Running); assert!(inner.state == SocketState::Scheduled || inner.state == SocketState::Running);
inner.state = SocketState::Shutdown; inner.state = SocketState::Shutdown;
if let Some(task) = inner.current_task.take() { if let Some(waker) = inner.waker.take() {
task.notify(); waker.wake();
} }
let shutdown_receiver = inner.shutdown_receiver.take().unwrap(); let shutdown_receiver = inner.shutdown_receiver.take().unwrap();
gst_debug!(SOCKET_CAT, obj: &inner.element, "Waiting for socket to shut down"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Waiting for socket to shut down");
drop(inner); drop(inner);
shutdown_receiver.wait().expect("Already shut down"); tokio_current_thread::block_on_all(shutdown_receiver).expect("Already shut down");
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
inner.state = SocketState::Unscheduled; inner.state = SocketState::Unscheduled;
@ -214,7 +223,7 @@ impl<T: SocketRead + 'static> Socket<T> {
} }
} }
impl<T: SocketRead + 'static> Clone for Socket<T> { impl<T: SocketRead + Unpin + 'static> Clone for Socket<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Socket::<T>(self.0.clone()) Socket::<T>(self.0.clone())
} }
@ -232,49 +241,56 @@ struct SocketStream<T: SocketRead + 'static>(
); );
impl<T: SocketRead + 'static> Stream for SocketStream<T> { impl<T: SocketRead + 'static> Stream for SocketStream<T> {
type Item = (gst::Buffer, Option<std::net::SocketAddr>); type Item =
type Error = Either<gst::FlowError, io::Error>; Result<(gst::Buffer, Option<std::net::SocketAddr>), Either<gst::FlowError, io::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// take the mapped_buffer before locking the socket so as to please the mighty borrow checker
let mut mapped_buffer = self.1.take();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut inner = (self.0).0.lock().unwrap(); let mut inner = (self.0).0.lock().unwrap();
if inner.state == SocketState::Shutdown { if inner.state == SocketState::Shutdown {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket shutting down"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket shutting down");
return Ok(Async::Ready(None)); return Poll::Ready(None);
} else if inner.state == SocketState::Scheduled { } else if inner.state == SocketState::Scheduled {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket not running"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket not running");
inner.current_task = Some(task::current()); inner.waker = Some(cx.waker().clone());
return Ok(Async::NotReady); drop(inner);
self.1 = mapped_buffer;
return Poll::Pending;
} }
assert_eq!(inner.state, SocketState::Running); assert_eq!(inner.state, SocketState::Running);
gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data");
let (len, saddr, time) = { let (len, saddr, time) = {
let buffer = match self.1 { let buffer = match mapped_buffer {
Some(ref mut buffer) => buffer, Some(ref mut buffer) => buffer,
None => match inner.buffer_pool.acquire_buffer(None) { None => match inner.buffer_pool.acquire_buffer(None) {
Ok(buffer) => { Ok(buffer) => {
self.1 = Some(buffer.into_mapped_buffer_writable().unwrap()); mapped_buffer = Some(buffer.into_mapped_buffer_writable().unwrap());
self.1.as_mut().unwrap() mapped_buffer.as_mut().unwrap()
} }
Err(err) => { Err(err) => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err); gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err);
return Err(Either::Left(err)); return Poll::Ready(Some(Err(Either::Left(err))));
} }
}, },
}; };
match inner.reader.poll_read(buffer.as_mut_slice()) { match inner.reader.as_mut().poll_read(cx, buffer.as_mut_slice()) {
Ok(Async::NotReady) => { Poll::Pending => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "No data available"); gst_debug!(SOCKET_CAT, obj: &inner.element, "No data available");
inner.current_task = Some(task::current()); inner.waker = Some(cx.waker().clone());
return Ok(Async::NotReady); drop(inner);
self.1 = mapped_buffer;
return Poll::Pending;
} }
Err(err) => { Poll::Ready(Err(err)) => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read error {:?}", err); gst_debug!(SOCKET_CAT, obj: &inner.element, "Read error {:?}", err);
return Err(Either::Right(err)); return Poll::Ready(Some(Err(Either::Right(err))));
} }
Ok(Async::Ready((len, saddr))) => { Poll::Ready(Ok((len, saddr))) => {
let dts = if T::DO_TIMESTAMP { let dts = if T::DO_TIMESTAMP {
let time = inner.clock.as_ref().unwrap().get_time(); let time = inner.clock.as_ref().unwrap().get_time();
let running_time = time - inner.base_time.unwrap(); let running_time = time - inner.base_time.unwrap();
@ -289,7 +305,7 @@ impl<T: SocketRead + 'static> Stream for SocketStream<T> {
} }
}; };
let mut buffer = self.1.take().unwrap().into_buffer(); let mut buffer = mapped_buffer.unwrap().into_buffer();
{ {
let buffer = buffer.get_mut().unwrap(); let buffer = buffer.get_mut().unwrap();
if len < buffer.get_size() { if len < buffer.get_size() {
@ -298,6 +314,6 @@ impl<T: SocketRead + 'static> Stream for SocketStream<T> {
buffer.set_dts(time); buffer.set_dts(time);
} }
Ok(Async::Ready(Some((buffer, saddr)))) Poll::Ready(Some(Ok((buffer, saddr))))
} }
} }

View file

@ -16,30 +16,35 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use either::Either;
use futures::ready;
use futures::{future::BoxFuture, prelude::*};
use glib; use glib;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass; use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use glib::{glib_object_impl, glib_object_subclass};
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use std::io; use lazy_static::lazy_static;
use std::sync::Mutex;
use std::u16;
use futures;
use futures::future;
use futures::{Async, Future, Poll};
use tokio::io::AsyncRead;
use tokio::net;
use either::Either;
use rand; use rand;
use iocontext::*; use std::io;
use socket::*; use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};
use std::u16;
use tokio::io::AsyncRead;
use super::iocontext::*;
use super::socket::*;
const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: u32 = 5000; const DEFAULT_PORT: u32 = 5000;
@ -135,14 +140,17 @@ static PROPERTIES: [subclass::Property; 6] = [
]; ];
pub struct TcpClientReader { pub struct TcpClientReader {
connect_future: net::tcp::ConnectFuture, connect_future: BoxFuture<'static, io::Result<tokio::net::TcpStream>>,
socket: Option<net::TcpStream>, socket: Option<tokio::net::TcpStream>,
} }
impl TcpClientReader { impl TcpClientReader {
pub fn new(connect_future: net::tcp::ConnectFuture) -> Self { pub fn new<Fut>(connect_future: Fut) -> Self
where
Fut: Future<Output = io::Result<tokio::net::TcpStream>> + Send + 'static,
{
Self { Self {
connect_future, connect_future: connect_future.boxed(),
socket: None, socket: None,
} }
} }
@ -152,27 +160,23 @@ impl SocketRead for TcpClientReader {
const DO_TIMESTAMP: bool = false; const DO_TIMESTAMP: bool = false;
fn poll_read( fn poll_read(
&mut self, mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<(usize, Option<std::net::SocketAddr>), io::Error> { ) -> Poll<io::Result<(usize, Option<std::net::SocketAddr>)>> {
let socket = match self.socket { let socket = match self.socket {
Some(ref mut socket) => socket, Some(ref mut socket) => socket,
None => match self.connect_future.poll() { None => {
Ok(Async::Ready(stream)) => { let stream = ready!(self.connect_future.as_mut().poll(cx))?;
self.socket = Some(stream); self.socket = Some(stream);
self.socket.as_mut().unwrap() self.socket.as_mut().unwrap()
} }
Err(err) => {
return Err(err);
}
_ => return Ok(Async::NotReady),
},
}; };
match socket.poll_read(buf) {
Ok(Async::Ready(result)) => Ok(Async::Ready((result, None))), Pin::new(socket)
Ok(Async::NotReady) => Ok(Async::NotReady), .as_mut()
Err(result) => Err(result), .poll_read(cx, buf)
} .map_ok(|read_size| (read_size, None))
} }
} }
@ -182,7 +186,7 @@ struct State {
socket: Option<Socket<TcpClientReader>>, socket: Option<Socket<TcpClientReader>>,
need_initial_events: bool, need_initial_events: bool,
configured_caps: Option<gst::Caps>, configured_caps: Option<gst::Caps>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_abort_handle: Option<future::AbortHandle>,
} }
impl Default for State { impl Default for State {
@ -193,7 +197,7 @@ impl Default for State {
socket: None, socket: None,
need_initial_events: true, need_initial_events: true,
configured_caps: None, configured_caps: None,
pending_future_cancel: None, pending_future_abort_handle: None,
} }
} }
} }
@ -308,106 +312,101 @@ impl TcpClientSrc {
} }
} }
fn push_buffer( async fn push_buffer(element: gst::Element, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
&self, let tcpclientsrc = Self::from_instance(&element);
element: &gst::Element,
buffer: gst::Buffer,
) -> future::Either<
Box<dyn Future<Item = (), Error = gst::FlowError> + Send + 'static>,
future::FutureResult<(), gst::FlowError>,
> {
let mut events = Vec::new(); let mut events = Vec::new();
let mut state = self.state.lock().unwrap(); {
if state.need_initial_events { let mut state = tcpclientsrc.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Pushing initial events"); if state.need_initial_events {
gst_debug!(CAT, obj: &element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>()); let stream_id =
events.push(gst::Event::new_stream_start(&stream_id).build()); format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
if let Some(ref caps) = self.settings.lock().unwrap().caps { events.push(
events.push(gst::Event::new_caps(&caps).build()); gst::Event::new_stream_start(&stream_id)
state.configured_caps = Some(caps.clone()); .group_id(gst::util_group_id_next())
.build(),
);
if let Some(ref caps) = tcpclientsrc.settings.lock().unwrap().caps {
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
if let Some(event) = Self::create_io_context_event(&state) {
events.push(event);
// Get rid of reconfigure flag
tcpclientsrc.src_pad.check_reconfigure();
}
state.need_initial_events = false;
} else if tcpclientsrc.src_pad.check_reconfigure() {
if let Some(event) = Self::create_io_context_event(&state) {
events.push(event);
}
} }
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
if let Some(event) = Self::create_io_context_event(&state) { if buffer.get_size() == 0 {
events.push(event); events.push(gst::Event::new_eos().build());
// Get rid of reconfigure flag
self.src_pad.check_reconfigure();
}
state.need_initial_events = false;
} else if self.src_pad.check_reconfigure() {
if let Some(event) = Self::create_io_context_event(&state) {
events.push(event);
} }
} }
if buffer.get_size() == 0 {
events.push(gst::Event::new_eos().build());
}
drop(state);
for event in events { for event in events {
self.src_pad.push_event(event); tcpclientsrc.src_pad.push_event(event);
} }
let res = match self.src_pad.push(buffer) { match tcpclientsrc.src_pad.push(buffer) {
Ok(_) => { Ok(_) => gst_log!(CAT, obj: &element, "Successfully pushed buffer"),
gst_log!(CAT, obj: element, "Successfully pushed buffer");
Ok(())
}
Err(gst::FlowError::Flushing) => { Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: element, "Flushing"); gst_debug!(CAT, obj: &element, "Flushing");
let state = self.state.lock().unwrap(); let state = tcpclientsrc.state.lock().unwrap();
if let Some(ref socket) = state.socket { if let Some(ref socket) = state.socket {
socket.pause(); socket.pause();
} }
Ok(())
} }
Err(gst::FlowError::Eos) => { Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: element, "EOS"); gst_debug!(CAT, obj: &element, "EOS");
let state = self.state.lock().unwrap(); let state = tcpclientsrc.state.lock().unwrap();
if let Some(ref socket) = state.socket { if let Some(ref socket) = state.socket {
socket.pause(); socket.pause();
} }
Ok(())
} }
Err(err) => { Err(err) => {
gst_error!(CAT, obj: element, "Got error {}", err); gst_error!(CAT, obj: &element, "Got error {}", err);
gst_element_error!( gst_element_error!(
element, element,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["streaming stopped, reason {}", err] ["streaming stopped, reason {}", err]
); );
Err(gst::FlowError::CustomError) return Err(gst::FlowError::CustomError);
}
}
let abortable_drain = {
let mut state = tcpclientsrc.state.lock().unwrap();
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_abort_handle,
..
} = *state
{
let (cancel, abortable_drain) =
io_context.drain_pending_futures(*pending_future_id);
*pending_future_abort_handle = cancel;
abortable_drain
} else {
return Ok(());
} }
}; };
match res { abortable_drain.await
Ok(()) => {
let mut state = self.state.lock().unwrap();
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_cancel,
..
} = *state
{
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id);
*pending_future_cancel = cancel;
future
} else {
future::Either::B(future::ok(()))
}
}
Err(err) => future::Either::B(future::err(err)),
}
} }
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
@ -448,7 +447,7 @@ impl TcpClientSrc {
let saddr = SocketAddr::new(addr, port as u16); let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(CAT, obj: element, "Connecting to {:?}", saddr); gst_debug!(CAT, obj: element, "Connecting to {:?}", saddr);
let socket = net::TcpStream::connect(&saddr); let socket = tokio::net::TcpStream::connect(saddr);
let buffer_pool = gst::BufferPool::new(); let buffer_pool = gst::BufferPool::new();
let mut config = buffer_pool.get_config(); let mut config = buffer_pool.get_config();
@ -471,10 +470,7 @@ impl TcpClientSrc {
socket socket
.schedule( .schedule(
&io_context, &io_context,
move |(buffer, _)| { move |(buffer, _)| Self::push_buffer(element_clone.clone(), buffer),
let tcpclientsrc = Self::from_instance(&element_clone);
tcpclientsrc.push_buffer(&element_clone, buffer)
},
move |err| { move |err| {
gst_error!(CAT, obj: &element_clone2, "Got error {}", err); gst_error!(CAT, obj: &element_clone2, "Got error {}", err);
match err { match err {
@ -568,7 +564,10 @@ impl TcpClientSrc {
if let Some(ref socket) = state.socket { if let Some(ref socket) = state.socket {
socket.pause(); socket.pause();
} }
let _ = state.pending_future_cancel.take();
if let Some(abort_handle) = state.pending_future_abort_handle.take() {
abort_handle.abort();
}
gst_debug!(CAT, obj: element, "Stopped"); gst_debug!(CAT, obj: element, "Stopped");
@ -708,7 +707,7 @@ impl ObjectImpl for TcpClientSrc {
let element = obj.downcast_ref::<gst::Element>().unwrap(); let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap(); element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE); super::set_element_flags(element, gst::ElementFlags::SOURCE);
} }
} }

View file

@ -15,34 +15,35 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use either::Either;
use futures::prelude::*;
use gio;
use gio_sys as gio_ffi;
use glib; use glib;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass; use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use glib::{glib_object_impl, glib_object_subclass};
use gobject_sys as gobject_ffi;
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace};
use gst_net::*; use gst_net::*;
use gio; use lazy_static::lazy_static;
use gio_ffi;
use gobject_ffi;
use std::io;
use std::sync::Mutex;
use std::u16;
use futures;
use futures::future;
use futures::{Async, Future, Poll};
use tokio::net;
use either::Either;
use rand; use rand;
use net2; use std::io;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};
use std::u16;
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
@ -50,8 +51,7 @@ use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)] #[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use iocontext::*; use super::{iocontext::*, socket::*};
use socket::*;
const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: u32 = 5000; const DEFAULT_PORT: u32 = 5000;
@ -287,11 +287,11 @@ static PROPERTIES: [subclass::Property; 10] = [
]; ];
pub struct UdpReader { pub struct UdpReader {
socket: net::UdpSocket, socket: tokio::net::udp::UdpSocket,
} }
impl UdpReader { impl UdpReader {
fn new(socket: net::UdpSocket) -> Self { fn new(socket: tokio::net::udp::UdpSocket) -> Self {
Self { socket } Self { socket }
} }
} }
@ -300,14 +300,14 @@ impl SocketRead for UdpReader {
const DO_TIMESTAMP: bool = true; const DO_TIMESTAMP: bool = true;
fn poll_read( fn poll_read(
&mut self, mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<(usize, Option<std::net::SocketAddr>), io::Error> { ) -> Poll<io::Result<(usize, Option<std::net::SocketAddr>)>> {
match self.socket.poll_recv_from(buf) { Pin::new(&mut self.socket.recv_from(buf).boxed())
Ok(Async::Ready(result)) => Ok(Async::Ready((result.0, Some(result.1)))), .as_mut()
Ok(Async::NotReady) => Ok(Async::NotReady), .poll(cx)
Err(result) => Err(result), .map(|res| res.map(|(read_size, saddr)| (read_size, Some(saddr))))
}
} }
} }
@ -317,7 +317,7 @@ struct State {
socket: Option<Socket<UdpReader>>, socket: Option<Socket<UdpReader>>,
need_initial_events: bool, need_initial_events: bool,
configured_caps: Option<gst::Caps>, configured_caps: Option<gst::Caps>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_abort_handle: Option<future::AbortHandle>,
} }
impl Default for State { impl Default for State {
@ -328,7 +328,7 @@ impl Default for State {
socket: None, socket: None,
need_initial_events: true, need_initial_events: true,
configured_caps: None, configured_caps: None,
pending_future_cancel: None, pending_future_abort_handle: None,
} }
} }
} }
@ -443,101 +443,99 @@ impl UdpSrc {
} }
} }
fn push_buffer( async fn push_buffer(element: gst::Element, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
&self, let udpsrc = Self::from_instance(&element);
element: &gst::Element,
buffer: gst::Buffer,
) -> future::Either<
Box<dyn Future<Item = (), Error = gst::FlowError> + Send + 'static>,
future::FutureResult<(), gst::FlowError>,
> {
let mut events = Vec::new(); let mut events = Vec::new();
let mut state = self.state.lock().unwrap(); {
if state.need_initial_events { let mut state = udpsrc.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Pushing initial events"); if state.need_initial_events {
gst_debug!(CAT, obj: &element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>()); let stream_id =
events.push(gst::Event::new_stream_start(&stream_id).build()); format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
if let Some(ref caps) = self.settings.lock().unwrap().caps { events.push(
events.push(gst::Event::new_caps(&caps).build()); gst::Event::new_stream_start(&stream_id)
state.configured_caps = Some(caps.clone()); .group_id(gst::util_group_id_next())
} .build(),
events.push( );
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(), if let Some(ref caps) = udpsrc.settings.lock().unwrap().caps {
); events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
if let Some(event) = Self::create_io_context_event(&state) { if let Some(event) = Self::create_io_context_event(&state) {
events.push(event); events.push(event);
// Get rid of reconfigure flag // Get rid of reconfigure flag
self.src_pad.check_reconfigure(); udpsrc.src_pad.check_reconfigure();
} }
state.need_initial_events = false; state.need_initial_events = false;
} else if self.src_pad.check_reconfigure() { } else if udpsrc.src_pad.check_reconfigure() {
if let Some(event) = Self::create_io_context_event(&state) { if let Some(event) = Self::create_io_context_event(&state) {
events.push(event); events.push(event);
}
} }
} }
drop(state);
for event in events { for event in events {
self.src_pad.push_event(event); udpsrc.src_pad.push_event(event);
} }
let res = match self.src_pad.push(buffer) { match udpsrc.src_pad.push(buffer) {
Ok(_) => { Ok(_) => {
gst_log!(CAT, obj: element, "Successfully pushed buffer"); gst_log!(CAT, obj: &element, "Successfully pushed buffer");
Ok(())
} }
Err(gst::FlowError::Flushing) => { Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: element, "Flushing"); gst_debug!(CAT, obj: &element, "Flushing");
let state = self.state.lock().unwrap(); let state = udpsrc.state.lock().unwrap();
if let Some(ref socket) = state.socket { if let Some(ref socket) = state.socket {
socket.pause(); socket.pause();
} }
Ok(())
} }
Err(gst::FlowError::Eos) => { Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: element, "EOS"); gst_debug!(CAT, obj: &element, "EOS");
let state = self.state.lock().unwrap(); let state = udpsrc.state.lock().unwrap();
if let Some(ref socket) = state.socket { if let Some(ref socket) = state.socket {
socket.pause(); socket.pause();
} }
Ok(())
} }
Err(err) => { Err(err) => {
gst_error!(CAT, obj: element, "Got error {}", err); gst_error!(CAT, obj: &element, "Got error {}", err);
gst_element_error!( gst_element_error!(
element, element,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["streaming stopped, reason {}", err] ["streaming stopped, reason {}", err]
); );
Err(gst::FlowError::CustomError) return Err(gst::FlowError::CustomError);
}
}
let abortable_drain = {
let mut state = udpsrc.state.lock().unwrap();
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_abort_handle,
..
} = *state
{
let (cancel, abortable_drain) =
io_context.drain_pending_futures(*pending_future_id);
*pending_future_abort_handle = cancel;
abortable_drain
} else {
return Ok(());
} }
}; };
match res { abortable_drain.await
Ok(()) => {
let mut state = self.state.lock().unwrap();
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_cancel,
..
} = *state
{
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id);
*pending_future_cancel = cancel;
future
} else {
future::Either::B(future::ok(()))
}
}
Err(err) => future::Either::B(future::err(err)),
}
} }
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
@ -571,8 +569,8 @@ impl UdpSrc {
socket = wrapped_socket.get() socket = wrapped_socket.get()
} }
let socket = let socket = tokio::net::UdpSocket::from_std(socket, io_context.reactor_handle())
net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
["Failed to setup socket for tokio: {}", err] ["Failed to setup socket for tokio: {}", err]
@ -666,8 +664,8 @@ impl UdpSrc {
) )
})?; })?;
let socket = let socket = tokio::net::UdpSocket::from_std(socket, io_context.reactor_handle())
net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
["Failed to setup socket for tokio: {}", err] ["Failed to setup socket for tokio: {}", err]
@ -679,7 +677,7 @@ impl UdpSrc {
match addr { match addr {
IpAddr::V4(addr) => { IpAddr::V4(addr) => {
socket socket
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) .join_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
@ -774,7 +772,6 @@ impl UdpSrc {
.schedule( .schedule(
&io_context, &io_context,
move |(mut buffer, saddr)| { move |(mut buffer, saddr)| {
let udpsrc = Self::from_instance(&element_clone);
if let Some(saddr) = saddr { if let Some(saddr) = saddr {
if retrieve_sender_address { if retrieve_sender_address {
let inet_addr = match saddr.ip() { let inet_addr = match saddr.ip() {
@ -791,7 +788,7 @@ impl UdpSrc {
} }
} }
udpsrc.push_buffer(&element_clone, buffer) Self::push_buffer(element_clone.clone(), buffer)
}, },
move |err| { move |err| {
gst_error!(CAT, obj: &element_clone2, "Got error {}", err); gst_error!(CAT, obj: &element_clone2, "Got error {}", err);
@ -891,7 +888,10 @@ impl UdpSrc {
if let Some(ref socket) = state.socket { if let Some(ref socket) = state.socket {
socket.pause(); socket.pause();
} }
let _ = state.pending_future_cancel.take();
if let Some(abort_handle) = state.pending_future_abort_handle.take() {
abort_handle.abort();
}
gst_debug!(CAT, obj: element, "Stopped"); gst_debug!(CAT, obj: element, "Stopped");
@ -1088,7 +1088,7 @@ impl ObjectImpl for UdpSrc {
let element = obj.downcast_ref::<gst::Element>().unwrap(); let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap(); element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE); super::set_element_flags(element, gst::ElementFlags::SOURCE);
} }
} }

View file

@ -15,13 +15,12 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
extern crate glib;
use glib::prelude::*; use glib::prelude::*;
extern crate gstreamer as gst; use gst;
extern crate gstreamer_check as gst_check; use gst_check;
extern crate gstthreadshare; use gstthreadshare;
fn init() { fn init() {
use std::sync::Once; use std::sync::Once;

View file

@ -0,0 +1,235 @@
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use gst;
use gstthreadshare;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstthreadshare::plugin_register_static().expect("gstthreadshare pipeline test");
});
}
#[test]
fn test_multiple_contexts() {
use gst::prelude::*;
use std::net;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
init();
const CONTEXT_NB: u32 = 2;
const SRC_NB: u16 = 4;
const CONTEXT_WAIT: u32 = 1;
const BUFFER_NB: u32 = 3;
let l = glib::MainLoop::new(None, false);
let pipeline = gst::Pipeline::new(None);
let mut src_list = Vec::<gst::Element>::new();
for i in 0..SRC_NB {
let src =
gst::ElementFactory::make("ts-udpsrc", Some(format!("src-{}", i).as_str())).unwrap();
src.set_property("context", &format!("context-{}", (i as u32) % CONTEXT_NB))
.unwrap();
src.set_property("context-wait", &CONTEXT_WAIT).unwrap();
src.set_property("port", &(40000u32 + (i as u32))).unwrap();
let queue =
gst::ElementFactory::make("ts-queue", Some(format!("queue-{}", i).as_str())).unwrap();
queue
.set_property("context", &format!("context-{}", (i as u32) % CONTEXT_NB))
.unwrap();
queue.set_property("context-wait", &CONTEXT_WAIT).unwrap();
let fakesink =
gst::ElementFactory::make("fakesink", Some(format!("sink-{}", i).as_str())).unwrap();
fakesink.set_property("sync", &false).unwrap();
fakesink.set_property("async", &false).unwrap();
pipeline.add_many(&[&src, &queue, &fakesink]).unwrap();
src.link(&queue).unwrap();
queue.link(&fakesink).unwrap();
src_list.push(src);
}
let bus = pipeline.get_bus().unwrap();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.get_src().map(|s| s.get_path_string()),
err.get_error(),
err.get_debug()
);
l_clone.quit();
}
_ => (),
};
glib::Continue(true)
});
let pipeline_clone = pipeline.clone();
let l_clone = l.clone();
std::thread::spawn(move || {
// Sleep to allow the pipeline to be ready
std::thread::sleep(std::time::Duration::from_millis(50));
let buffer = [0; 160];
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let destinations = (40000..(40000 + SRC_NB))
.map(|port| SocketAddr::new(ipaddr, port))
.collect::<Vec<_>>();
let wait = std::time::Duration::from_millis(CONTEXT_WAIT as u64);
for _ in 0..BUFFER_NB {
let now = std::time::Instant::now();
for dest in &destinations {
socket.send_to(&buffer, dest).unwrap();
}
let elapsed = now.elapsed();
if elapsed < wait {
std::thread::sleep(wait - elapsed);
}
}
std::thread::sleep(std::time::Duration::from_millis(50));
pipeline_clone.set_state(gst::State::Null).unwrap();
l_clone.quit();
});
pipeline.set_state(gst::State::Playing).unwrap();
println!("starting...");
l.run();
}
#[test]
fn test_premature_shutdown() {
use gst::prelude::*;
init();
const CONTEXT_NAME: &str = "pipeline-context";
const CONTEXT_WAIT: u32 = 1;
const QUEUE_BUFFER_CAPACITY: u32 = 1;
const BURST_NB: u32 = 2;
let l = glib::MainLoop::new(None, false);
let pipeline = gst::Pipeline::new(None);
let caps = gst::Caps::new_simple("foo/bar", &[]);
let appsrc = gst::ElementFactory::make("ts-appsrc", None).unwrap();
appsrc.set_property("caps", &caps).unwrap();
appsrc.set_property("do-timestamp", &true).unwrap();
appsrc.set_property("context", &CONTEXT_NAME).unwrap();
appsrc.set_property("context-wait", &CONTEXT_WAIT).unwrap();
let queue = gst::ElementFactory::make("ts-queue", None).unwrap();
queue.set_property("context", &CONTEXT_NAME).unwrap();
queue.set_property("context-wait", &CONTEXT_WAIT).unwrap();
queue
.set_property("max-size-buffers", &QUEUE_BUFFER_CAPACITY)
.unwrap();
let fakesink = gst::ElementFactory::make("fakesink", None).unwrap();
fakesink.set_property("sync", &false).unwrap();
fakesink.set_property("async", &false).unwrap();
pipeline.add_many(&[&appsrc, &queue, &fakesink]).unwrap();
appsrc.link(&queue).unwrap();
queue.link(&fakesink).unwrap();
let bus = pipeline.get_bus().unwrap();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.get_src().map(|s| s.get_path_string()),
err.get_error(),
err.get_debug()
);
l_clone.quit();
}
_ => (),
};
glib::Continue(true)
});
let pipeline_clone = pipeline.clone();
let l_clone = l.clone();
std::thread::spawn(move || {
// Sleep to allow the pipeline to be ready
std::thread::sleep(std::time::Duration::from_millis(10));
// Fill up the queue then pause a bit and push again
let mut burst_idx = 0;
loop {
let was_pushed = appsrc
.emit("push-buffer", &[&gst::Buffer::from_slice(vec![0; 1024])])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap();
if !was_pushed {
if burst_idx < BURST_NB {
burst_idx += 1;
// Sleep a bit to let a few buffers go through
std::thread::sleep(std::time::Duration::from_micros(500));
} else {
pipeline_clone.set_state(gst::State::Null).unwrap();
break;
}
}
}
l_clone.quit();
});
pipeline.set_state(gst::State::Playing).unwrap();
println!("starting...");
l.run();
}

View file

@ -15,17 +15,16 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
extern crate glib;
use glib::prelude::*; use glib::prelude::*;
extern crate gstreamer as gst; use gst;
use gst::prelude::*; use gst::prelude::*;
extern crate gstreamer_app as gst_app; use gst_app;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
extern crate gstthreadshare; use gstthreadshare;
fn init() { fn init() {
use std::sync::Once; use std::sync::Once;

View file

@ -15,17 +15,17 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
extern crate glib; use glib;
use glib::prelude::*; use glib::prelude::*;
extern crate gstreamer as gst; use gst;
use gst::prelude::*; use gst::prelude::*;
extern crate gstreamer_app as gst_app; use gst_app;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
extern crate gstthreadshare; use gstthreadshare;
fn init() { fn init() {
use std::sync::Once; use std::sync::Once;

View file

@ -16,19 +16,19 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
extern crate glib; use glib;
use glib::prelude::*; use glib::prelude::*;
extern crate gstreamer as gst; use gst;
use gst::prelude::*; use gst::prelude::*;
extern crate gstreamer_app as gst_app; use gst_app;
use std::io::Write; use std::io::Write;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::{thread, time}; use std::{thread, time};
extern crate gstthreadshare; use gstthreadshare;
fn init() { fn init() {
use std::sync::Once; use std::sync::Once;

View file

@ -15,17 +15,17 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
extern crate glib; use glib;
use glib::prelude::*; use glib::prelude::*;
extern crate gio; use gio;
extern crate gstreamer as gst; use gst;
extern crate gstreamer_check as gst_check; use gst_check;
use std::thread; use std::thread;
extern crate gstthreadshare; use gstthreadshare;
fn init() { fn init() {
use std::sync::Once; use std::sync::Once;

1
rust-toolchain Normal file
View file

@ -0,0 +1 @@
beta