threadshare: WIP

This commit is contained in:
Sebastian Dröge 2018-03-09 17:32:21 +02:00
parent 0fe6fbc859
commit 299c69185e

View file

@ -23,20 +23,18 @@ use gst::prelude::*;
use gst_plugin::properties::*; use gst_plugin::properties::*;
use gst_plugin::object::*; use gst_plugin::object::*;
use gst_plugin::element::*; use gst_plugin::element::*;
use gst_plugin::bytes::*;
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::{cmp, mem, ops, i32, u16}; use std::u16;
use std::io::Write;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, IntoFuture, Poll, Stream};
use futures::{future, task}; use futures::{future, task};
use futures::sync::oneshot; use futures::sync::oneshot;
use tokio::executor::thread_pool; use tokio::executor::thread_pool;
use tokio::reactor; use tokio::reactor;
use tokio::net; use tokio::net;
use std::thread;
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData;
lazy_static!{ lazy_static!{
static ref CONTEXTS: Mutex<HashMap<String, Weak<IOContextInner>>> = Mutex::new(HashMap::new()); static ref CONTEXTS: Mutex<HashMap<String, Weak<IOContextInner>>> = Mutex::new(HashMap::new());
@ -114,6 +112,30 @@ impl IOContext {
} }
} }
/* Workaround for https://github.com/tokio-rs/tokio/issues/207 */
struct YieldOnce<E>(Option<()>, PhantomData<E>);
impl<E> YieldOnce<E> {
fn new() -> YieldOnce<E> {
YieldOnce(None, PhantomData)
}
}
impl<E> Future for YieldOnce<E> {
type Item = ();
type Error = E;
fn poll(&mut self) -> Poll<(), E> {
if let Some(_) = self.0.take() {
Ok(Async::Ready(()))
} else {
self.0 = Some(());
task::current().notify();
Ok(Async::NotReady)
}
}
}
#[derive(Clone)] #[derive(Clone)]
struct Socket(Arc<Mutex<SocketInner>>); struct Socket(Arc<Mutex<SocketInner>>);
@ -150,7 +172,7 @@ impl Socket {
}))) })))
} }
fn schedule<F: FnMut(gst::Buffer) -> Result<(), gst::FlowError> + Send + 'static>( fn schedule<F: Fn(gst::Buffer) -> Result<(), gst::FlowError> + Send + 'static>(
&self, &self,
io_context: &IOContext, io_context: &IOContext,
func: F, func: F,
@ -176,13 +198,23 @@ impl Socket {
inner.shutdown_receiver = Some(receiver); inner.shutdown_receiver = Some(receiver);
let element_clone = inner.element.clone(); let element_clone = inner.element.clone();
io_context.spawn(stream.for_each(func).then(move |res| { io_context.spawn(
gst_debug!(SOCKET_CAT, obj: &element_clone, "Socket finished {:?}", res); stream
// TODO: Do something with errors here? .for_each(move |buffer| {
let _ = sender.send(()); let res = func(buffer);
match res {
Ok(()) => future::Either::A(YieldOnce::new()),
Err(err) => future::Either::B(Err(err).into_future()),
}
})
.then(move |res| {
gst_debug!(SOCKET_CAT, obj: &element_clone, "Socket finished {:?}", res);
// TODO: Do something with errors here?
let _ = sender.send(());
Ok(()) Ok(())
})); }),
);
} }
fn unpause(&self, clock: gst::Clock, base_time: gst::ClockTime) { fn unpause(&self, clock: gst::Clock, base_time: gst::ClockTime) {
@ -550,7 +582,7 @@ impl UdpSrc {
let addr: IpAddr = match settings.address { let addr: IpAddr = match settings.address {
None => return Err(()), None => return Err(()),
Some(addr) => match addr.parse() { Some(ref addr) => match addr.parse() {
Err(_) => return Err(()), Err(_) => return Err(()),
Ok(addr) => addr, Ok(addr) => addr,
}, },
@ -617,22 +649,15 @@ impl UdpSrc {
udpsrc udpsrc
.src_pad .src_pad
.push_event(gst::Event::new_stream_start("meh").build()); .push_event(gst::Event::new_stream_start("meh").build());
udpsrc.src_pad.push_event( if let Some(ref caps) = settings.caps {
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
if let Some(caps) = udpsrc
.settings
.lock()
.unwrap()
.caps
.as_ref()
.map(|c| c.clone())
{
udpsrc udpsrc
.src_pad .src_pad
.push_event(gst::Event::new_caps(&caps).build()); .push_event(gst::Event::new_caps(&caps).build());
} }
udpsrc.src_pad.push_event(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
state.need_initial_events = false; state.need_initial_events = false;
} }