jitterbuffer: initial thread sharing support

This commit is contained in:
Mathieu Duponchelle 2019-08-23 01:04:14 +02:00 committed by Mathieu Duponchelle
parent 33cb599464
commit 5496067925
2 changed files with 216 additions and 37 deletions

View file

@ -28,7 +28,7 @@ use futures::future;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::sync::mpsc as futures_mpsc;
use futures::sync::oneshot;
use futures::{Future, Stream};
use futures::{Async, Future, Stream};
use tokio::reactor;
use tokio_current_thread;
use tokio_timer::timer;
@ -399,11 +399,13 @@ impl Ord for TimerEntry {
}
}
#[allow(unused)]
pub struct Interval {
receiver: futures_mpsc::UnboundedReceiver<()>,
}
impl Interval {
#[allow(unused)]
pub fn new(context: &IOContext, interval: time::Duration) -> Self {
let (sender, receiver) = futures_mpsc::unbounded();
@ -428,3 +430,39 @@ impl Stream for Interval {
self.receiver.poll()
}
}
pub struct Timeout {
receiver: futures_mpsc::UnboundedReceiver<()>,
}
impl Timeout {
pub fn new(context: &IOContext, timeout: time::Duration) -> Self {
let (sender, receiver) = futures_mpsc::unbounded();
let mut timers = context.0.timers.lock().unwrap();
let entry = TimerEntry {
time: time::Instant::now() + timeout,
id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed),
interval: None,
sender,
};
timers.push(entry);
Self { receiver }
}
}
impl Future for Timeout {
type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let res = self.receiver.poll()?;
match res {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => unreachable!(),
Async::Ready(Some(_)) => Ok(Async::Ready(())),
}
}
}

View file

@ -27,6 +27,12 @@ use gst_rtp::RTPBuffer;
use std::cmp::{max, min, Ordering};
use std::collections::BTreeSet;
use std::sync::{Mutex, MutexGuard};
use std::time;
use futures::sync::oneshot;
use futures::Future;
use iocontext::*;
use RTPJitterBuffer;
use RTPJitterBufferItem;
@ -36,6 +42,8 @@ const DEFAULT_LATENCY_MS: u32 = 200;
const DEFAULT_DO_LOST: bool = false;
const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000;
const DEFAULT_MAX_MISORDER_TIME: u32 = 2000;
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 20;
#[derive(Debug, Clone)]
struct Settings {
@ -43,6 +51,8 @@ struct Settings {
do_lost: bool,
max_dropout_time: u32,
max_misorder_time: u32,
context: String,
context_wait: u32,
}
impl Default for Settings {
@ -52,11 +62,13 @@ impl Default for Settings {
do_lost: DEFAULT_DO_LOST,
max_dropout_time: DEFAULT_MAX_DROPOUT_TIME,
max_misorder_time: DEFAULT_MAX_MISORDER_TIME,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
}
}
}
static PROPERTIES: [subclass::Property; 5] = [
static PROPERTIES: [subclass::Property; 7] = [
subclass::Property("latency", |name| {
glib::ParamSpec::uint(
name,
@ -108,6 +120,26 @@ static PROPERTIES: [subclass::Property; 5] = [
glib::ParamFlags::READABLE,
)
}),
subclass::Property("context", |name| {
glib::ParamSpec::string(
name,
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context-wait", |name| {
glib::ParamSpec::uint(
name,
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
0,
1000,
DEFAULT_CONTEXT_WAIT,
glib::ParamFlags::READWRITE,
)
}),
];
#[derive(Eq)]
@ -149,6 +181,7 @@ impl PartialEq for GapPacket {
}
struct State {
io_context: Option<IOContext>,
jbuf: glib::SendUniqueCell<RTPJitterBuffer>,
packet_rate_ctx: RTPPacketRateCtx,
clock_rate: i32,
@ -169,11 +202,14 @@ struct State {
earliest_seqnum: u16,
last_popped_pts: gst::ClockTime,
discont: bool,
cancel: Option<oneshot::Sender<()>>,
last_res: Result<gst::FlowSuccess, gst::FlowError>,
}
impl Default for State {
fn default() -> State {
State {
io_context: None,
jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(),
packet_rate_ctx: RTPPacketRateCtx::new(),
clock_rate: -1,
@ -194,6 +230,8 @@ impl Default for State {
earliest_seqnum: 0,
last_popped_pts: gst::CLOCK_TIME_NONE,
discont: false,
cancel: None,
last_res: Ok(gst::FlowSuccess::Ok),
}
}
}
@ -357,6 +395,8 @@ impl JitterBuffer {
pad: &gst::Pad,
element: &gst::Element,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_info!(self.cat, obj: element, "Resetting");
state.jbuf.borrow().flush();
state.jbuf.borrow().reset_skew();
state.discont = true;
@ -493,6 +533,7 @@ impl JitterBuffer {
if gap <= 0 {
state.num_late += 1;
gst_debug!(self.cat, obj: element, "Dropping late {}", seq);
return Ok(gst::FlowSuccess::Ok);
}
}
@ -689,28 +730,18 @@ impl JitterBuffer {
state.num_pushed += 1;
gst_debug!(self.cat, obj: &self.src_pad, "Pushing buffer {:?}", buffer);
gst_debug!(self.cat, obj: &self.src_pad, "Pushing buffer {:?} with seq {}", buffer, seq);
self.src_pad.push(buffer.to_owned())
}
fn enqueue_item(
&self,
state: &mut MutexGuard<State>,
pad: &gst::Pad,
element: &gst::Element,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
fn schedule(&self, state: &mut MutexGuard<State>, element: &gst::Element) {
let settings = self.settings.lock().unwrap().clone();
let latency_ns = settings.latency_ms as u64 * gst::MSECOND;
drop(settings);
let now = self.get_current_running_time(element);
if let Some(buf) = buffer {
self.store(state, pad, element, buf)?;
}
gst_debug!(
self.cat,
obj: element,
@ -721,34 +752,94 @@ impl JitterBuffer {
latency_ns
);
if now.is_some()
&& state.earliest_pts.is_some()
&& state.earliest_pts + latency_ns - state.packet_spacing < now
{
let mut cont = true;
if state.earliest_pts.is_some() {
let next_wakeup = state.earliest_pts + latency_ns - state.packet_spacing;
while cont {
let (head_pts, head_seq) = state.jbuf.borrow().peek();
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum as u32 {
cont = false;
let timeout = {
if next_wakeup > now {
(next_wakeup - now).nseconds().unwrap()
} else {
0
}
};
self.pop_and_push(state, element)?;
if let Some(cancel) = state.cancel.take() {
let _ = cancel.send(());
}
if !cont {
let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum as u16;
let (cancel, cancel_handler) = oneshot::channel();
if state.earliest_pts.is_some() {
cont = state.earliest_pts + latency_ns - state.packet_spacing < now;
let element_clone = element.clone();
gst_debug!(self.cat, obj: element, "Scheduling wakeup in {}", timeout);
let timer = Timeout::new(
state.io_context.as_ref().unwrap(),
time::Duration::from_nanos(timeout),
)
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(move |_| {
let jb = Self::from_instance(&element_clone);
let mut state = jb.state.lock().unwrap();
/* Check earliest_pts after taking the lock again */
let mut cont = state.earliest_pts.is_some();
gst_debug!(
jb.cat,
obj: &element_clone,
"Woke back up, earliest_pts {}",
state.earliest_pts
);
let _ = state.cancel.take();
while cont {
let (head_pts, head_seq) = state.jbuf.borrow().peek();
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum as u32 {
cont = false;
}
state.last_res = jb.pop_and_push(&mut state, &element_clone);
if !cont {
let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum as u16;
if state.earliest_pts.is_some() {
cont = state.earliest_pts + latency_ns - state.packet_spacing < now;
}
}
}
}
jb.schedule(&mut state, &element_clone);
Ok(())
});
let future = timer.select(cancel_handler).then(|_| Ok(()));
state.cancel = Some(cancel);
state.io_context.as_ref().unwrap().spawn(future);
}
}
fn enqueue_item(
&self,
state: &mut MutexGuard<State>,
pad: &gst::Pad,
element: &gst::Element,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if let Some(buf) = buffer {
self.store(state, pad, element, buf)?;
}
Ok(gst::FlowSuccess::Ok)
self.schedule(state, element);
state.last_res
}
fn drain(&self, state: &mut MutexGuard<State>, element: &gst::Element) -> bool {
@ -770,10 +861,16 @@ impl JitterBuffer {
ret
}
fn flush(&self) {
fn flush(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
gst_info!(self.cat, obj: element, "Flushing");
let io_context = state.io_context.take();
*state = State::default();
state.io_context = io_context;
}
fn sink_chain(
@ -794,7 +891,7 @@ impl JitterBuffer {
match event.view() {
EventView::FlushStop(..) => {
self.flush();
self.flush(element);
}
EventView::Segment(e) => {
let mut state = self.state.lock().unwrap();
@ -822,7 +919,6 @@ impl JitterBuffer {
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query);
match query.view_mut() {
@ -1031,6 +1127,17 @@ impl ObjectImpl for JitterBuffer {
let mut settings = self.settings.lock().unwrap();
settings.max_misorder_time = value.get_some().expect("type checked upstream");
}
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value
.get()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get_some().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -1067,6 +1174,14 @@ impl ObjectImpl for JitterBuffer {
);
Ok(s.to_value())
}
subclass::Property("context", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
subclass::Property("context-wait", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
}
}
@ -1080,7 +1195,33 @@ impl ObjectImpl for JitterBuffer {
}
}
impl ElementImpl for JitterBuffer {}
impl ElementImpl for JitterBuffer {
fn change_state(
&self,
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.lock().unwrap();
state.io_context =
Some(IOContext::new(&settings.context, settings.context_wait).unwrap());
}
gst::StateChange::ReadyToNull => {
let mut state = self.state.lock().unwrap();
let _ = state.io_context.take();
}
_ => (),
}
self.parent_change_state(element, transition)
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(