diff --git a/gst-plugin-threadshare/src/iocontext.rs b/gst-plugin-threadshare/src/iocontext.rs index aee1545a8..41431375c 100644 --- a/gst-plugin-threadshare/src/iocontext.rs +++ b/gst-plugin-threadshare/src/iocontext.rs @@ -28,7 +28,7 @@ use futures::future; use futures::stream::futures_unordered::FuturesUnordered; use futures::sync::mpsc as futures_mpsc; use futures::sync::oneshot; -use futures::{Future, Stream}; +use futures::{Async, Future, Stream}; use tokio::reactor; use tokio_current_thread; use tokio_timer::timer; @@ -399,11 +399,13 @@ impl Ord for TimerEntry { } } +#[allow(unused)] pub struct Interval { receiver: futures_mpsc::UnboundedReceiver<()>, } impl Interval { + #[allow(unused)] pub fn new(context: &IOContext, interval: time::Duration) -> Self { let (sender, receiver) = futures_mpsc::unbounded(); @@ -428,3 +430,39 @@ impl Stream for Interval { self.receiver.poll() } } + +pub struct Timeout { + receiver: futures_mpsc::UnboundedReceiver<()>, +} + +impl Timeout { + pub fn new(context: &IOContext, timeout: time::Duration) -> Self { + let (sender, receiver) = futures_mpsc::unbounded(); + + let mut timers = context.0.timers.lock().unwrap(); + let entry = TimerEntry { + time: time::Instant::now() + timeout, + id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed), + interval: None, + sender, + }; + timers.push(entry); + + Self { receiver } + } +} + +impl Future for Timeout { + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + let res = self.receiver.poll()?; + + match res { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(None) => unreachable!(), + Async::Ready(Some(_)) => Ok(Async::Ready(())), + } + } +} diff --git a/gst-plugin-threadshare/src/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer.rs index a9364900f..407853710 100644 --- a/gst-plugin-threadshare/src/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer.rs @@ -27,6 +27,12 @@ use gst_rtp::RTPBuffer; use std::cmp::{max, min, Ordering}; use std::collections::BTreeSet; use std::sync::{Mutex, MutexGuard}; +use std::time; + +use futures::sync::oneshot; +use futures::Future; + +use iocontext::*; use RTPJitterBuffer; use RTPJitterBufferItem; @@ -36,6 +42,8 @@ const DEFAULT_LATENCY_MS: u32 = 200; const DEFAULT_DO_LOST: bool = false; const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000; const DEFAULT_MAX_MISORDER_TIME: u32 = 2000; +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: u32 = 20; #[derive(Debug, Clone)] struct Settings { @@ -43,6 +51,8 @@ struct Settings { do_lost: bool, max_dropout_time: u32, max_misorder_time: u32, + context: String, + context_wait: u32, } impl Default for Settings { @@ -52,11 +62,13 @@ impl Default for Settings { do_lost: DEFAULT_DO_LOST, max_dropout_time: DEFAULT_MAX_DROPOUT_TIME, max_misorder_time: DEFAULT_MAX_MISORDER_TIME, + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, } } } -static PROPERTIES: [subclass::Property; 5] = [ +static PROPERTIES: [subclass::Property; 7] = [ subclass::Property("latency", |name| { glib::ParamSpec::uint( name, @@ -108,6 +120,26 @@ static PROPERTIES: [subclass::Property; 5] = [ glib::ParamFlags::READABLE, ) }), + subclass::Property("context", |name| { + glib::ParamSpec::string( + name, + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", |name| { + glib::ParamSpec::uint( + name, + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), ]; #[derive(Eq)] @@ -149,6 +181,7 @@ impl PartialEq for GapPacket { } struct State { + io_context: Option, jbuf: glib::SendUniqueCell, packet_rate_ctx: RTPPacketRateCtx, clock_rate: i32, @@ -169,11 +202,14 @@ struct State { earliest_seqnum: u16, last_popped_pts: gst::ClockTime, discont: bool, + cancel: Option>, + last_res: Result, } impl Default for State { fn default() -> State { State { + io_context: None, jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(), packet_rate_ctx: RTPPacketRateCtx::new(), clock_rate: -1, @@ -194,6 +230,8 @@ impl Default for State { earliest_seqnum: 0, last_popped_pts: gst::CLOCK_TIME_NONE, discont: false, + cancel: None, + last_res: Ok(gst::FlowSuccess::Ok), } } } @@ -357,6 +395,8 @@ impl JitterBuffer { pad: &gst::Pad, element: &gst::Element, ) -> Result { + gst_info!(self.cat, obj: element, "Resetting"); + state.jbuf.borrow().flush(); state.jbuf.borrow().reset_skew(); state.discont = true; @@ -493,6 +533,7 @@ impl JitterBuffer { if gap <= 0 { state.num_late += 1; + gst_debug!(self.cat, obj: element, "Dropping late {}", seq); return Ok(gst::FlowSuccess::Ok); } } @@ -689,28 +730,18 @@ impl JitterBuffer { state.num_pushed += 1; - gst_debug!(self.cat, obj: &self.src_pad, "Pushing buffer {:?}", buffer); + gst_debug!(self.cat, obj: &self.src_pad, "Pushing buffer {:?} with seq {}", buffer, seq); self.src_pad.push(buffer.to_owned()) } - fn enqueue_item( - &self, - state: &mut MutexGuard, - pad: &gst::Pad, - element: &gst::Element, - buffer: Option, - ) -> Result { + fn schedule(&self, state: &mut MutexGuard, element: &gst::Element) { let settings = self.settings.lock().unwrap().clone(); let latency_ns = settings.latency_ms as u64 * gst::MSECOND; drop(settings); let now = self.get_current_running_time(element); - if let Some(buf) = buffer { - self.store(state, pad, element, buf)?; - } - gst_debug!( self.cat, obj: element, @@ -721,34 +752,94 @@ impl JitterBuffer { latency_ns ); - if now.is_some() - && state.earliest_pts.is_some() - && state.earliest_pts + latency_ns - state.packet_spacing < now - { - let mut cont = true; + if state.earliest_pts.is_some() { + let next_wakeup = state.earliest_pts + latency_ns - state.packet_spacing; - while cont { - let (head_pts, head_seq) = state.jbuf.borrow().peek(); - - if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum as u32 { - cont = false; + let timeout = { + if next_wakeup > now { + (next_wakeup - now).nseconds().unwrap() + } else { + 0 } + }; - self.pop_and_push(state, element)?; + if let Some(cancel) = state.cancel.take() { + let _ = cancel.send(()); + } - if !cont { - let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest(); - state.earliest_pts = earliest_pts; - state.earliest_seqnum = earliest_seqnum as u16; + let (cancel, cancel_handler) = oneshot::channel(); - if state.earliest_pts.is_some() { - cont = state.earliest_pts + latency_ns - state.packet_spacing < now; + let element_clone = element.clone(); + + gst_debug!(self.cat, obj: element, "Scheduling wakeup in {}", timeout); + + let timer = Timeout::new( + state.io_context.as_ref().unwrap(), + time::Duration::from_nanos(timeout), + ) + .map_err(|e| panic!("timer failed; err={:?}", e)) + .and_then(move |_| { + let jb = Self::from_instance(&element_clone); + let mut state = jb.state.lock().unwrap(); + /* Check earliest_pts after taking the lock again */ + let mut cont = state.earliest_pts.is_some(); + + gst_debug!( + jb.cat, + obj: &element_clone, + "Woke back up, earliest_pts {}", + state.earliest_pts + ); + + let _ = state.cancel.take(); + + while cont { + let (head_pts, head_seq) = state.jbuf.borrow().peek(); + + if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum as u32 { + cont = false; + } + + state.last_res = jb.pop_and_push(&mut state, &element_clone); + + if !cont { + let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest(); + state.earliest_pts = earliest_pts; + state.earliest_seqnum = earliest_seqnum as u16; + + if state.earliest_pts.is_some() { + cont = state.earliest_pts + latency_ns - state.packet_spacing < now; + } } } - } + + jb.schedule(&mut state, &element_clone); + + Ok(()) + }); + + let future = timer.select(cancel_handler).then(|_| Ok(())); + + state.cancel = Some(cancel); + + state.io_context.as_ref().unwrap().spawn(future); + } + } + + fn enqueue_item( + &self, + state: &mut MutexGuard, + pad: &gst::Pad, + element: &gst::Element, + buffer: Option, + ) -> Result { + if let Some(buf) = buffer { + self.store(state, pad, element, buf)?; } - Ok(gst::FlowSuccess::Ok) + self.schedule(state, element); + + state.last_res } fn drain(&self, state: &mut MutexGuard, element: &gst::Element) -> bool { @@ -770,10 +861,16 @@ impl JitterBuffer { ret } - fn flush(&self) { + fn flush(&self, element: &gst::Element) { let mut state = self.state.lock().unwrap(); + gst_info!(self.cat, obj: element, "Flushing"); + + let io_context = state.io_context.take(); + *state = State::default(); + + state.io_context = io_context; } fn sink_chain( @@ -794,7 +891,7 @@ impl JitterBuffer { match event.view() { EventView::FlushStop(..) => { - self.flush(); + self.flush(element); } EventView::Segment(e) => { let mut state = self.state.lock().unwrap(); @@ -822,7 +919,6 @@ impl JitterBuffer { query: &mut gst::QueryRef, ) -> bool { use gst::QueryView; - gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query); match query.view_mut() { @@ -1031,6 +1127,17 @@ impl ObjectImpl for JitterBuffer { let mut settings = self.settings.lock().unwrap(); settings.max_misorder_time = value.get_some().expect("type checked upstream"); } + subclass::Property("context", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.context = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("context-wait", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.context_wait = value.get_some().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -1067,6 +1174,14 @@ impl ObjectImpl for JitterBuffer { ); Ok(s.to_value()) } + subclass::Property("context", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.context.to_value()) + } + subclass::Property("context-wait", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.context_wait.to_value()) + } _ => unimplemented!(), } } @@ -1080,7 +1195,33 @@ impl ObjectImpl for JitterBuffer { } } -impl ElementImpl for JitterBuffer {} +impl ElementImpl for JitterBuffer { + fn change_state( + &self, + element: &gst::Element, + transition: gst::StateChange, + ) -> Result { + gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::NullToReady => { + let settings = self.settings.lock().unwrap().clone(); + let mut state = self.state.lock().unwrap(); + + state.io_context = + Some(IOContext::new(&settings.context, settings.context_wait).unwrap()); + } + gst::StateChange::ReadyToNull => { + let mut state = self.state.lock().unwrap(); + + let _ = state.io_context.take(); + } + _ => (), + } + + self.parent_change_state(element, transition) + } +} pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register(