rtpbin2: implement jitterbuffer

The jitterbuffer implements both reordering and duplicate packet
handling.

Co-Authored-By: Sebastian Dröge <sebastian@centricular.com>
Co-Authored-By: Matthew Waters <matthew@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Mathieu Duponchelle 2023-11-27 14:29:07 +01:00 committed by Matthew Waters
parent 2b4ec75bc5
commit 1865899621
9 changed files with 1107 additions and 71 deletions

View file

@ -7241,7 +7241,7 @@
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"default": "200",
"max": "-1",
"min": "0",
"mutable": "ready",

View file

@ -33,7 +33,7 @@ smallvec = { version = "1.11", features = ["union", "write", "const_generics", "
thiserror = "1"
time = { version = "0.3", default-features = false, features = ["std"] }
# TODO: experiment with other async executors (mio, async-std, etc)
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "time"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "time", "sync"] }
[dev-dependencies]
gst-check = { workspace = true, features = ["v1_20"] }

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
@ -11,7 +11,9 @@ use futures::future::{AbortHandle, Abortable};
use futures::StreamExt;
use gst::{glib, prelude::*, subclass::prelude::*};
use once_cell::sync::Lazy;
use tokio::sync::mpsc;
use super::jitterbuffer::{self, JitterBuffer};
use super::session::{
KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtpProfile, SendReply,
Session, RTCP_MIN_REPORT_INTERVAL,
@ -20,7 +22,7 @@ use super::source::{ReceivedRb, SourceState};
use crate::rtpbin2::RUNTIME;
const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(0);
const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);
const DEFAULT_MIN_RTCP_INTERVAL: Duration = RTCP_MIN_REPORT_INTERVAL;
const DEFAULT_REDUCED_SIZE_RTCP: bool = false;
@ -136,18 +138,131 @@ impl futures::stream::Stream for RtcpSendStream {
}
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
struct JitterBufferStream {
session: Arc<Mutex<BinSessionInner>>,
sleep: Pin<Box<tokio::time::Sleep>>,
}
impl JitterBufferStream {
fn new(session: Arc<Mutex<BinSessionInner>>) -> Self {
Self {
session,
sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
}
}
}
impl futures::stream::Stream for JitterBufferStream {
type Item = (gst::Buffer, mpsc::Sender<gst::Buffer>);
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let now = Instant::now();
let mut lowest_wait = None;
let mut session = self.session.lock().unwrap();
for pad in session.rtp_recv_srcpads.iter_mut() {
let mut jitterbuffer_store = pad.jitter_buffer_store.lock().unwrap();
match jitterbuffer_store.jitterbuffer.poll(now) {
jitterbuffer::PollResult::Forward { id, discont } => {
if let Some(ref tx) = pad.tx {
let mut packet = jitterbuffer_store
.store
.remove(&id)
.unwrap_or_else(|| panic!("Buffer with id {id} not in store!"));
if discont {
gst::debug!(CAT, obj: pad.pad, "Forwarding discont buffer");
let packet_mut = packet.make_mut();
packet_mut.set_flags(gst::BufferFlags::DISCONT);
}
return Poll::Ready(Some((packet, tx.clone())));
}
}
jitterbuffer::PollResult::Timeout(timeout) => {
if lowest_wait.map_or(true, |lowest_wait| timeout < lowest_wait) {
lowest_wait = Some(timeout);
}
}
jitterbuffer::PollResult::Empty => {
continue;
}
}
}
session.jitterbuffer_waker = Some(cx.waker().clone());
drop(session);
if let Some(timeout) = lowest_wait {
let this = self.get_mut();
this.sleep.as_mut().reset(timeout.into());
if !std::future::Future::poll(this.sleep.as_mut(), cx).is_pending() {
cx.waker().wake_by_ref();
}
}
Poll::Pending
}
}
#[derive(Debug)]
struct JitterBufferStore {
store: BTreeMap<usize, gst::Buffer>,
jitterbuffer: JitterBuffer,
}
#[derive(Debug, Clone)]
struct RtpRecvSrcPad {
pt: u8,
ssrc: u32,
pad: gst::Pad,
tx: Option<mpsc::Sender<gst::Buffer>>,
jitter_buffer_store: Arc<Mutex<JitterBufferStore>>,
}
impl PartialEq for RtpRecvSrcPad {
fn eq(&self, other: &Self) -> bool {
self.pt == other.pt && self.ssrc == other.ssrc && self.pad == other.pad
}
}
impl Eq for RtpRecvSrcPad {}
impl RtpRecvSrcPad {
fn activate(&mut self, session: &BinSession) {
let session_inner = session.inner.lock().unwrap();
let seqnum = session_inner.rtp_recv_sink_seqnum.unwrap();
let stream_id = format!("{}/{}", self.pt, self.ssrc);
let stream_start = gst::event::StreamStart::builder(&stream_id)
.group_id(session_inner.rtp_recv_sink_group_id.unwrap())
.seqnum(seqnum)
.build();
let caps = session_inner.caps_from_pt_ssrc(self.pt, self.ssrc);
let caps = gst::event::Caps::builder(&caps).seqnum(seqnum).build();
let segment =
gst::event::Segment::builder(session_inner.rtp_recv_sink_segment.as_ref().unwrap())
.seqnum(seqnum)
.build();
drop(session_inner);
self.pad.set_active(true).unwrap();
let _ = self.pad.store_sticky_event(&stream_start);
let _ = self.pad.store_sticky_event(&caps);
let _ = self.pad.store_sticky_event(&segment);
}
}
#[derive(Debug)]
struct HeldRecvBuffer {
hold_id: Option<usize>,
buffer: gst::Buffer,
srcpad: gst::Pad,
pad: RtpRecvSrcPad,
new_pad: bool,
}
@ -189,6 +304,9 @@ struct BinSessionInner {
caps_map: HashMap<u8, HashMap<u32, gst::Caps>>,
recv_store: Vec<HeldRecvBuffer>,
jitterbuffer_task: Option<JitterBufferTask>,
jitterbuffer_waker: Option<Waker>,
rtp_recv_srcpads: Vec<RtpRecvSrcPad>,
recv_flow_combiner: Arc<Mutex<gst_base::UniqueFlowCombiner>>,
@ -215,6 +333,9 @@ impl BinSessionInner {
caps_map: HashMap::default(),
recv_store: vec![],
jitterbuffer_task: None,
jitterbuffer_waker: None,
rtp_recv_srcpads: vec![],
recv_flow_combiner: Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())),
@ -238,18 +359,72 @@ impl BinSessionInner {
)
}
fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
gst::debug!(CAT, obj: pad, "Starting rtp recv src task");
let (tx, mut rx) = mpsc::channel(1);
let recv_pad = self
.rtp_recv_srcpads
.iter_mut()
.find(|recv| &recv.pad == pad)
.unwrap();
recv_pad.tx = Some(tx);
let pad_weak = pad.downgrade();
let recv_flow_combiner = self.recv_flow_combiner.clone();
// A task per received ssrc may be a bit excessive.
// Other options are:
// - Single task per received input stream rather than per output ssrc/pt
// - somehow pool multiple recv tasks together (thread pool)
pad.start_task(move || {
let Some(pad) = pad_weak.upgrade() else {
return;
};
let Some(buffer) = rx.blocking_recv() else {
gst::debug!(CAT, obj: pad, "Pad channel was closed, pausing");
let _ = pad.pause_task();
return;
};
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, pad.push(buffer));
// TODO: store flow, return only on session pads?
})?;
gst::debug!(CAT, obj: pad, "Task started");
Ok(())
}
fn stop_rtp_recv_task(&mut self, pad: &gst::Pad) {
let recv_pad = self
.rtp_recv_srcpads
.iter_mut()
.find(|recv| &recv.pad == pad)
.unwrap();
// Drop the sender so the task is unblocked and we don't deadlock below
drop(recv_pad.tx.take());
gst::debug!(CAT, obj: pad, "Stopping task");
let _ = pad.stop_task();
}
fn get_or_create_rtp_recv_src(
&mut self,
rtpbin: &RtpBin2,
pt: u8,
ssrc: u32,
) -> (gst::Pad, bool) {
) -> (RtpRecvSrcPad, bool) {
if let Some(pad) = self
.rtp_recv_srcpads
.iter()
.find(|&r| r.ssrc == ssrc && r.pt == pt)
{
(pad.pad.clone(), false)
(pad.clone(), false)
} else {
let src_templ = rtpbin.obj().pad_template("rtp_recv_src_%u_%u_%u").unwrap();
let id = self.id;
@ -275,56 +450,42 @@ impl BinSessionInner {
|this| this.rtp_recv_src_event(pad, event, id, pt, ssrc),
)
})
.activatemode_function({
let this = rtpbin.downgrade();
move |pad, _parent, mode, active| {
let Some(this) = this.upgrade() else {
return Err(gst::LoggableError::new(
*CAT,
glib::bool_error!("rtpbin does not exist anymore"),
));
};
this.rtp_recv_src_activatemode(pad, mode, active, id)
}
})
.name(format!("rtp_recv_src_{}_{}_{}", self.id, pt, ssrc))
.build();
srcpad.set_active(true).unwrap();
srcpad.use_fixed_caps();
let settings = rtpbin.settings.lock().unwrap();
let recv_pad = RtpRecvSrcPad {
pt,
ssrc,
pad: srcpad.clone(),
tx: None,
jitter_buffer_store: Arc::new(Mutex::new(JitterBufferStore {
store: BTreeMap::new(),
jitterbuffer: JitterBuffer::new(settings.latency.into()),
})),
};
let stream_id = format!("{pt}/{ssrc}");
let mut stream_start = gst::event::StreamStart::builder(&stream_id);
if let Some(group_id) = self
.rtp_recv_sinkpad
.as_ref()
.unwrap()
.sticky_event::<gst::event::StreamStart>(0)
.and_then(|ss| ss.group_id())
{
stream_start = stream_start.group_id(group_id);
}
let stream_start = stream_start.build();
let seqnum = stream_start.seqnum();
let _ = srcpad.store_sticky_event(&stream_start);
let caps = self.caps_from_pt_ssrc(pt, ssrc);
let caps = gst::event::Caps::builder(&caps).seqnum(seqnum).build();
let _ = srcpad.store_sticky_event(&caps);
let segment = if let Some(segment) = self
.rtp_recv_sinkpad
.as_ref()
.unwrap()
.sticky_event::<gst::event::Segment>(0)
.map(|s| s.segment().clone())
{
segment
} else {
let mut segment = gst::Segment::new();
segment.set_format(gst::Format::Time);
segment
};
let segment = gst::event::Segment::new(&segment);
let _ = srcpad.store_sticky_event(&segment);
self.recv_flow_combiner
.lock()
.unwrap()
.add_pad(&recv_pad.pad);
self.rtp_recv_srcpads.push(recv_pad);
(srcpad, true)
self.rtp_recv_srcpads.push(recv_pad.clone());
(recv_pad, true)
}
}
}
@ -393,6 +554,7 @@ impl State {
source_stats = source_stats.field("report-blocks", rbs);
}
}
// TODO: add jitter, packets-lost
session_stats =
session_stats.field(ls.ssrc().to_string(), source_stats.build());
@ -488,6 +650,16 @@ impl State {
session_stats = session_stats.field(rr.ssrc().to_string(), source_stats);
}
}
let jb_stats = gst::List::new(session.rtp_recv_srcpads.iter().map(|pad| {
let mut jb_stats = pad.jitter_buffer_store.lock().unwrap().jitterbuffer.stats();
jb_stats.set_value("ssrc", (pad.ssrc as i32).to_send_value());
jb_stats.set_value("pt", (pad.pt as i32).to_send_value());
jb_stats
}));
session_stats = session_stats.field("jitterbuffer-stats", jb_stats);
ret = ret.field(sess_id.to_string(), session_stats.build());
}
ret.build()
@ -504,7 +676,48 @@ struct RtcpTask {
abort_handle: AbortHandle,
}
#[derive(Debug)]
struct JitterBufferTask {
abort_handle: AbortHandle,
}
impl RtpBin2 {
fn rtp_recv_src_activatemode(
&self,
pad: &gst::Pad,
mode: gst::PadMode,
active: bool,
id: usize,
) -> Result<(), gst::LoggableError> {
if let gst::PadMode::Push = mode {
let state = self.state.lock().unwrap();
let Some(session) = state.session_by_id(id) else {
if active {
return Err(gst::LoggableError::new(
*CAT,
glib::bool_error!("Can't activate pad of unknown session {id}"),
));
} else {
return Ok(());
}
};
let mut session = session.inner.lock().unwrap();
if active {
session.start_rtp_recv_task(pad)?;
} else {
session.stop_rtp_recv_task(pad);
}
Ok(())
} else {
Err(gst::LoggableError::new(
*CAT,
glib::bool_error!("Unsupported pad mode {mode:?}"),
))
}
}
fn start_rtcp_task(&self) {
let mut rtcp_task = self.rtcp_task.lock().unwrap();
@ -551,6 +764,32 @@ impl RtpBin2 {
}
}
fn start_jitterbuffer_task(&self, session: &BinSession, inner: &mut BinSessionInner) {
if inner.jitterbuffer_task.is_some() {
return;
}
// run the runtime from another task to prevent the "start a runtime from within a runtime" panic
// when the plugin is statically linked.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
RUNTIME.spawn({
let inner = session.inner.clone();
async move {
let future = Abortable::new(Self::jitterbuffer_task(inner), abort_registration);
future.await
}
});
inner.jitterbuffer_task = Some(JitterBufferTask { abort_handle });
}
async fn jitterbuffer_task(state: Arc<Mutex<BinSessionInner>>) {
let mut stream = JitterBufferStream::new(state);
while let Some((buffer, tx)) = stream.next().await {
let _ = tx.send(buffer).await;
}
}
pub fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling query {query:?}");
@ -589,6 +828,8 @@ impl RtpBin2 {
let pads = session
.rtp_recv_srcpads
.iter()
// Only include pads that are already part of the element
.filter(|r| state.pads_session_id_map.contains_key(&r.pad))
.map(|r| r.pad.clone())
.collect();
return gst::Iterator::from_vec(pads);
@ -615,13 +856,24 @@ impl RtpBin2 {
&self,
_pad: &gst::Pad,
id: usize,
buffer: gst::Buffer,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let state = self.state.lock().unwrap();
let Some(session) = state.session_by_id(id) else {
return Err(gst::FlowError::Error);
};
// TODO: this is different from the old C implementation, where we
// simply used the RTP timestamps as they were instead of doing any
// sort of skew calculations.
//
// Check if this makes sense or if this leads to issue with eg interleaved
// TCP.
if buffer.dts().is_none() {
let buf_mut = buffer.make_mut();
buf_mut.set_dts(self.obj().current_running_time());
}
let addr: Option<SocketAddr> =
buffer
.meta::<gst_net::NetAddressMeta>()
@ -654,44 +906,47 @@ impl RtpBin2 {
};
let session = session.clone();
let mut session = session.inner.lock().unwrap();
let mut session_inner = session.inner.lock().unwrap();
drop(state);
// Start jitterbuffer task now if not started yet
self.start_jitterbuffer_task(&session, &mut session_inner);
let now = Instant::now();
let mut buffers_to_push = vec![];
loop {
match session.session.handle_recv(&rtp, addr, now) {
match session_inner.session.handle_recv(&rtp, addr, now) {
RecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision
RecvReply::NewSsrc(_ssrc, _pt) => (), // TODO: signal new ssrc externally
RecvReply::Hold(hold_id) => {
let pt = rtp.payload_type();
let ssrc = rtp.ssrc();
drop(mapped);
let (srcpad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc);
session.recv_store.push(HeldRecvBuffer {
let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc);
session_inner.recv_store.push(HeldRecvBuffer {
hold_id: Some(hold_id),
buffer,
srcpad,
pad,
new_pad,
});
break;
}
RecvReply::Drop(hold_id) => {
if let Some(pos) = session
if let Some(pos) = session_inner
.recv_store
.iter()
.position(|b| b.hold_id.unwrap() == hold_id)
{
session.recv_store.remove(pos);
session_inner.recv_store.remove(pos);
}
}
RecvReply::Forward(hold_id) => {
if let Some(pos) = session
if let Some(pos) = session_inner
.recv_store
.iter()
.position(|b| b.hold_id.unwrap() == hold_id)
{
buffers_to_push.push(session.recv_store.remove(pos));
buffers_to_push.push(session_inner.recv_store.remove(pos));
} else {
unreachable!();
}
@ -701,31 +956,71 @@ impl RtpBin2 {
let pt = rtp.payload_type();
let ssrc = rtp.ssrc();
drop(mapped);
let (srcpad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc);
let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc);
buffers_to_push.push(HeldRecvBuffer {
hold_id: None,
buffer,
srcpad,
pad,
new_pad,
});
break;
}
}
}
let recv_flow_combiner = session.recv_flow_combiner.clone();
drop(session);
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
for held in buffers_to_push {
drop(session_inner);
for mut held in buffers_to_push {
// TODO: handle other processing
if held.new_pad {
held.pad.activate(&session);
self.obj().add_pad(&held.pad.pad).unwrap();
let mut state = self.state.lock().unwrap();
state.pads_session_id_map.insert(held.srcpad.clone(), id);
state.pads_session_id_map.insert(held.pad.pad.clone(), id);
drop(state);
self.obj().add_pad(&held.srcpad).unwrap();
}
recv_flow_combiner.update_pad_flow(&held.srcpad, held.srcpad.push(held.buffer))?;
let mapped = held.buffer.map_readable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}");
gst::FlowError::Error
})?;
let rtp = match rtp_types::RtpPacket::parse(&mapped) {
Ok(rtp) => rtp,
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}");
return Ok(gst::FlowSuccess::Ok);
}
};
// FIXME: Should block if too many packets are stored here because the source pad task
// is blocked
let mut jitterbuffer_store = held.pad.jitter_buffer_store.lock().unwrap();
match jitterbuffer_store.jitterbuffer.queue(
&rtp,
held.buffer.dts().unwrap().nseconds(),
now,
) {
jitterbuffer::QueueResult::Queued(id) => {
drop(mapped);
jitterbuffer_store.store.insert(id, held.buffer);
}
jitterbuffer::QueueResult::Late => {
gst::warning!(CAT, "Late buffer was dropped");
}
jitterbuffer::QueueResult::Duplicate => {
gst::warning!(CAT, "Duplicate buffer was dropped");
}
}
}
let session_inner = session.inner.lock().unwrap();
if let Some(ref waker) = session_inner.jitterbuffer_waker {
waker.wake_by_ref();
}
drop(session_inner);
Ok(gst::FlowSuccess::Ok)
}
@ -907,18 +1202,67 @@ impl RtpBin2 {
}
}
fn rtp_recv_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool {
fn rtp_recv_sink_event(&self, pad: &gst::Pad, mut event: gst::Event, id: usize) -> bool {
match event.view() {
gst::EventView::StreamStart(stream_start) => {
let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) {
let mut session = session.inner.lock().unwrap();
let group_id = stream_start.group_id();
session.rtp_recv_sink_group_id =
Some(group_id.unwrap_or_else(gst::GroupId::next));
}
true
}
gst::EventView::Caps(caps) => {
if let Some((pt, clock_rate)) = Self::pt_clock_rate_from_caps(caps.caps()) {
let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) {
let mut session = session.inner.lock().unwrap();
let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) {
let mut session = session.inner.lock().unwrap();
let caps = caps.caps_owned();
if let Some((pt, clock_rate)) = Self::pt_clock_rate_from_caps(&caps) {
session.session.set_pt_clock_rate(pt, clock_rate);
}
session.rtp_recv_sink_caps = Some(caps);
}
true
}
gst::EventView::Segment(segment) => {
let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) {
let mut session = session.inner.lock().unwrap();
let segment = segment.segment();
let segment = match segment.downcast_ref::<gst::ClockTime>() {
Some(segment) => segment.clone(),
None => {
gst::warning!(CAT, obj: pad, "Only TIME segments are supported");
let segment = gst::FormattedSegment::new();
let seqnum = event.seqnum();
event = gst::event::Segment::builder(&segment)
.seqnum(seqnum)
.build();
segment
}
};
session.rtp_recv_sink_segment = Some(segment);
session.rtp_recv_sink_seqnum = Some(event.seqnum());
}
drop(state);
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
gst::EventView::Eos(_eos) => {
let now = Instant::now();
let mut state = self.state.lock().unwrap();
@ -1444,6 +1788,11 @@ impl ElementImpl for RtpBin2 {
session.recv_flow_combiner.lock().unwrap().clear();
session.rtp_recv_srcpads.clear();
session.recv_store.clear();
if let Some(jitterbuffer_task) = session.jitterbuffer_task.take() {
jitterbuffer_task.abort_handle.abort();
}
session.jitterbuffer_waker = None;
}
if Some(pad) == session.rtp_send_sinkpad.as_ref() {
@ -1521,6 +1870,11 @@ impl ElementImpl for RtpBin2 {
session.rtp_recv_sink_group_id = None;
session.caps_map.clear();
if let Some(jitterbuffer_task) = session.jitterbuffer_task.take() {
jitterbuffer_task.abort_handle.abort();
}
session.jitterbuffer_waker = None;
}
for pad in removed_pads.iter() {
state.pads_session_id_map.remove(pad);

View file

@ -0,0 +1,528 @@
use crate::utils::ExtendedSeqnum;
use rtp_types::RtpPacket;
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy)]
struct Stats {
num_late: u64,
num_lost: u64,
num_duplicates: u64,
num_pushed: u64,
}
impl From<Stats> for gst::Structure {
fn from(stats: Stats) -> gst::Structure {
gst::Structure::builder("application/x-rtp-jitterbuffer-stats")
.field("num-late", stats.num_late)
.field("num-duplicates", stats.num_duplicates)
.field("num-lost", stats.num_lost)
.field("num-pushed", stats.num_pushed)
.build()
}
}
#[derive(Debug)]
pub struct JitterBuffer {
packet_counter: usize,
// A set of extended seqnums that we've already seen through,
// intentionally trimmed separately from the items list so that
// we can detect duplicates after the first copy has exited the
// queue
seqnums: BTreeSet<u64>,
items: BTreeSet<Item>,
latency: Duration,
// Arrival time, PTS
base_times: Option<(Instant, u64)>,
last_output_seqnum: Option<u64>,
extended_seqnum: ExtendedSeqnum,
last_input_ts: Option<u64>,
stats: Stats,
}
#[derive(Debug, PartialEq, Eq)]
pub enum PollResult {
Forward { id: usize, discont: bool },
Timeout(Instant),
Empty,
}
#[derive(Debug, PartialEq, Eq)]
pub enum QueueResult {
Queued(usize),
Late,
Duplicate,
}
#[derive(Eq, Debug)]
struct Item {
id: usize,
// If not set, this is an event / query
pts: Option<u64>,
seqnum: u64,
}
impl Ord for Item {
fn cmp(&self, other: &Self) -> Ordering {
self.seqnum.cmp(&other.seqnum)
}
}
impl PartialOrd for Item {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for Item {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl JitterBuffer {
pub fn new(latency: Duration) -> Self {
Self {
packet_counter: 0,
seqnums: BTreeSet::new(),
items: BTreeSet::new(),
latency,
base_times: None,
last_input_ts: None,
last_output_seqnum: None,
extended_seqnum: ExtendedSeqnum::default(),
stats: Stats {
num_late: 0,
num_lost: 0,
num_duplicates: 0,
num_pushed: 0,
},
}
}
pub fn queue(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult {
// From this point on we always work with extended sequence numbers
let seqnum = self.extended_seqnum.next(rtp.sequence_number());
if let Some(ts) = self.last_input_ts {
pts = pts.max(ts);
}
self.last_input_ts = Some(pts);
self.base_times.get_or_insert_with(|| {
debug!("Selected base times {:?} {}", now, pts);
(now, pts)
});
// Maintain (and trim) our seqnum list for duplicate detection
while self.seqnums.len() >= std::u16::MAX as usize {
debug!("Trimming");
self.seqnums.pop_first();
}
if self.seqnums.contains(&seqnum) {
trace!(
"Duplicated packet {} (extended {})",
rtp.sequence_number(),
seqnum,
);
self.stats.num_duplicates += 1;
return QueueResult::Duplicate;
}
self.seqnums.insert(seqnum);
if let Some(last_output_seqnum) = self.last_output_seqnum {
if last_output_seqnum >= seqnum {
debug!(
"Late packet {} (extended {})",
rtp.sequence_number(),
seqnum
);
self.stats.num_late += 1;
return QueueResult::Late;
}
}
let id = self.packet_counter;
self.packet_counter += 1;
let item = Item {
id,
pts: Some(pts),
seqnum,
};
if !self.items.insert(item) {
unreachable!()
}
trace!("Queued RTP packet with ts {pts}, assigned ID {id}");
QueueResult::Queued(id)
}
pub fn poll(&mut self, now: Instant) -> PollResult {
trace!("Polling at {:?}", now);
let Some((base_instant, base_ts)) = self.base_times else {
return PollResult::Empty;
};
let duration_since_base_instant = now - base_instant;
trace!(
"Duration since base instant {:?}",
duration_since_base_instant
);
let Some(item) = self.items.first() else {
return PollResult::Empty;
};
// If an event / query is at the top of the queue, it can be forwarded immediately
let Some(pts) = item.pts else {
let item = self.items.pop_first().unwrap();
return PollResult::Forward {
id: item.id,
discont: false,
};
};
let ts = pts.checked_sub(base_ts).unwrap();
let deadline = Duration::from_nanos(ts) + self.latency;
trace!(
"Considering packet {} with ts {ts}, deadline is {deadline:?}",
item.id
);
if deadline <= duration_since_base_instant {
debug!("Packet with id {} is ready", item.id);
let discont = match self.last_output_seqnum {
None => true,
Some(last_output_seq_ext) => {
let gap = item.seqnum - last_output_seq_ext;
self.stats.num_lost += gap - 1;
gap != 1
}
};
self.last_output_seqnum = Some(item.seqnum);
// Safe unwrap, we know the queue isn't empty at this point
let packet = self.items.pop_first().unwrap();
self.stats.num_pushed += 1;
PollResult::Forward {
id: packet.id,
discont,
}
} else {
trace!("Packet with id {} is not ready", item.id);
PollResult::Timeout(base_instant + deadline)
}
}
pub fn stats(&self) -> gst::Structure {
self.stats.into()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::rtpbin2::session::tests::generate_rtp_packet;
#[test]
fn empty() {
let mut jb = JitterBuffer::new(Duration::from_secs(1));
let now = Instant::now();
assert_eq!(jb.poll(now), PollResult::Empty);
}
#[test]
fn receive_one_packet_no_latency() {
let mut jb = JitterBuffer::new(Duration::from_secs(0));
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let now = Instant::now();
let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else {
unreachable!()
};
assert_eq!(jb.poll(now), PollResult::Forward { id, discont: true });
}
#[test]
fn receive_one_packet_with_latency() {
let mut jb = JitterBuffer::new(Duration::from_secs(1));
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let mut now = Instant::now();
let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Timeout(now + Duration::from_secs(1))
);
now += Duration::from_secs(1);
now -= Duration::from_nanos(1);
assert_eq!(
jb.poll(now),
PollResult::Timeout(now + Duration::from_nanos(1))
);
now += Duration::from_nanos(1);
assert_eq!(jb.poll(now), PollResult::Forward { id, discont: true });
}
#[test]
fn ordered_packets_no_latency() {
let mut jb = JitterBuffer::new(Duration::from_secs(0));
let now = Instant::now();
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_first) = jb.queue(&packet, 0, now) else {
unreachable!()
};
let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_second) = jb.queue(&packet, 0, now) else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_first,
discont: true
}
);
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_second,
discont: false
}
);
}
#[test]
fn ordered_packets_no_latency_with_gap() {
let mut jb = JitterBuffer::new(Duration::from_secs(0));
let now = Instant::now();
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_first) = jb.queue(&packet, 0, now) else {
unreachable!()
};
let rtp_data = generate_rtp_packet(0x12345678, 2, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_second) = jb.queue(&packet, 0, now) else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_first,
discont: true
}
);
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_second,
discont: true
}
);
}
#[test]
fn misordered_packets_no_latency() {
let mut jb = JitterBuffer::new(Duration::from_secs(0));
let now = Instant::now();
let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else {
unreachable!()
};
assert_eq!(jb.poll(now), PollResult::Forward { id, discont: true });
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(jb.queue(&packet, 0, now), QueueResult::Late);
// Try and push a duplicate
let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(jb.queue(&packet, 0, now), QueueResult::Duplicate);
// We do accept future sequence numbers up to a distance of at least std::i16::MAX
let rtp_data = generate_rtp_packet(0x12345678, std::i16::MAX as u16 + 1, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else {
unreachable!()
};
assert_eq!(jb.poll(now), PollResult::Forward { id, discont: true });
// But no further
let rtp_data = generate_rtp_packet(0x12345678, 2, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(jb.queue(&packet, 0, now), QueueResult::Late);
}
#[test]
fn ordered_packets_with_latency() {
let mut jb = JitterBuffer::new(Duration::from_secs(1));
let mut now = Instant::now();
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_first) = jb.queue(&packet, 0, now) else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Timeout(now + Duration::from_secs(1))
);
let rtp_data = generate_rtp_packet(0x12345678, 1, 180000, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
let QueueResult::Queued(id_second) = jb.queue(&packet, 2_000_000_000, now) else {
unreachable!()
};
assert_eq!(
jb.poll(now),
PollResult::Timeout(now + Duration::from_secs(1))
);
now += Duration::from_secs(1);
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_first,
discont: true
}
);
assert_eq!(
jb.poll(now),
PollResult::Timeout(now + Duration::from_secs(2))
);
now += Duration::from_secs(2);
assert_eq!(
jb.poll(now),
PollResult::Forward {
id: id_second,
discont: false
}
);
}
fn assert_stats(
jb: &JitterBuffer,
num_late: u64,
num_lost: u64,
num_duplicates: u64,
num_pushed: u64,
) {
let stats = jb.stats();
assert_eq!(stats.get::<u64>("num-late").unwrap(), num_late);
assert_eq!(stats.get::<u64>("num-lost").unwrap(), num_lost);
assert_eq!(stats.get::<u64>("num-duplicates").unwrap(), num_duplicates);
assert_eq!(stats.get::<u64>("num-pushed").unwrap(), num_pushed);
}
#[test]
fn stats() {
let mut jb = JitterBuffer::new(Duration::from_secs(1));
let mut now = Instant::now();
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
jb.queue(&packet, 0, now);
assert_stats(&jb, 0, 0, 0, 0);
// At this point pushing the same packet in before it gets output
// results in an increment of the duplicate stat
jb.queue(&packet, 0, now);
assert_stats(&jb, 0, 0, 1, 0);
now += Duration::from_secs(1);
let _ = jb.poll(now);
assert_stats(&jb, 0, 0, 1, 1);
// Pushing it after the first version got output also results in
// an increment of the duplicate stat
jb.queue(&packet, 0, now);
assert_stats(&jb, 0, 0, 2, 1);
// Then after a packet with seqnum 2 goes through, the lost
// stat must be incremented by 1 (as packet with seqnum 1 went missing)
let rtp_data = generate_rtp_packet(0x12345678, 2, 9000, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
jb.queue(&packet, 100_000_000, now);
now += Duration::from_millis(100);
let _ = jb.poll(now);
assert_stats(&jb, 0, 1, 2, 2);
// If the packet with seqnum 1 does arrive after that, it should be
// considered both late and lost
let rtp_data = generate_rtp_packet(0x12345678, 1, 4500, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
jb.queue(&packet, 50_000_000, now);
let _ = jb.poll(now);
assert_stats(&jb, 1, 1, 2, 2);
// Finally if it arrives again it should be considered a duplicate,
// and will have achieved the dubious honor of simultaneously being
// lost, late and duplicated
jb.queue(&packet, 50_000_000, now);
let _ = jb.poll(now);
assert_stats(&jb, 1, 1, 3, 2);
}
}

View file

@ -4,6 +4,7 @@ use gst::glib;
use gst::prelude::*;
use once_cell::sync::Lazy;
mod imp;
mod jitterbuffer;
mod session;
mod source;
mod time;

View file

@ -1630,7 +1630,7 @@ pub(crate) mod tests {
}
}
fn generate_rtp_packet(ssrc: u32, seq_no: u16, rtp_ts: u32, payload_len: usize) -> Vec<u8> {
pub fn generate_rtp_packet(ssrc: u32, seq_no: u16, rtp_ts: u32, payload_len: usize) -> Vec<u8> {
init_logs();
let mut rtp_data = [0; 1200];
let payload = vec![1; payload_len];

View file

@ -653,6 +653,19 @@ impl RemoteSendSource {
SourceRecvReply::Passthrough
} else if diff < -(DEFAULT_MAX_MISORDER as i32) || diff >= DEFAULT_MAX_DROPOUT as i32 {
debug!("non-consecutive packet outside of configured limits, dropping");
// TODO: we will want to perform a few tasks here that the C jitterbuffer
// used to be taking care of:
//
// - We probably want to operate in the time domain rather than the sequence domain,
// that means implementing a utility similar to RTPPacketRateCtx
// - We should update our late / lost stats when a packet does get ignored
// - We should perform the equivalent of the big gap handling in the C jitterbuffer,
// possibly holding gap packets for a while before deciding that we indeed have an
// actual gap, then propagating a new "resync" receive reply before releasing the
// gap packets in order to let other components (eg jitterbuffer) reset themselves
// when needed.
// FIXME: should be a harder error?
return SourceRecvReply::Ignore;
} else {

View file

@ -306,6 +306,55 @@ macro_rules! define_wrapping_comparable_u32_with_display {
};
}
/// Stores information necessary to compute a series of extended seqnums
#[derive(Default, Debug)]
pub(crate) struct ExtendedSeqnum {
last_ext: Option<u64>,
}
impl ExtendedSeqnum {
/// Produces the next extended timestamp from a new RTP timestamp
pub(crate) fn next(&mut self, rtp_seqnum: u16) -> u64 {
let ext = match self.last_ext {
None => (1u64 << 16) + rtp_seqnum as u64,
Some(last_ext) => {
// pick wraparound counter from previous timestamp and add to new timestamp
let mut ext = rtp_seqnum as u64 + (last_ext & !0xffff);
// check for timestamp wraparound
if ext < last_ext {
let diff = last_ext - ext;
if diff > std::i16::MAX as u64 {
// timestamp went backwards more than allowed, we wrap around and get
// updated extended timestamp.
ext += 1u64 << 16;
}
} else {
let diff = ext - last_ext;
if diff > std::i16::MAX as u64 {
if ext < 1u64 << 16 {
// We can't ever get to such a case as our counter is opaque
unreachable!()
} else {
ext -= 1u64 << 16;
// We don't want the extended timestamp storage to go back, ever
return ext;
}
}
}
ext
}
};
self.last_ext = Some(ext);
ext
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -536,4 +585,75 @@ mod tests {
assert_eq!(try_cmp(0x8000_0000, 0), Err(ComparisonLimit));
assert_eq!(try_cmp(0, 0x8000_0000), Err(ComparisonLimit));
}
#[test]
fn extended_seqnum_basic() {
let mut ext_seq = ExtendedSeqnum::default();
// No wraparound when seqnums are increasing
assert_eq!(ext_seq.next(0), (1 << 16));
assert_eq!(ext_seq.next(10), (1 << 16) + 10);
assert_eq!(ext_seq.next(10), (1 << 16) + 10);
assert_eq!(
ext_seq.next(1 + std::i16::MAX as u16),
(1 << 16) + 1 + std::i16::MAX as u64
);
// Even big bumps under MAXINT16 don't result in wrap-around
ext_seq = ExtendedSeqnum::default();
assert_eq!(ext_seq.next(27500), (1 << 16) + 27500);
assert_eq!(ext_seq.next(24), (1 << 16) + 24);
}
#[test]
fn extended_seqnum_wraparound() {
let mut ext_seq = ExtendedSeqnum::default();
assert_eq!(
ext_seq.next(std::u16::MAX - 9000 + 1),
(1 << 16) + std::u16::MAX as u64 - 9000 + 1
);
assert_eq!(ext_seq.next(0), (1 << 16) + std::u16::MAX as u64 + 1);
assert_eq!(
ext_seq.next(9000),
(1 << 16) + std::u16::MAX as u64 + 1 + 9000
);
}
#[test]
fn extended_seqnum_wraparound_disordered() {
let mut ext_seq = ExtendedSeqnum::default();
assert_eq!(
ext_seq.next(std::u16::MAX - 9000 + 1),
(1 << 16) + std::u16::MAX as u64 - 9000 + 1
);
assert_eq!(ext_seq.next(0), (1 << 16) + std::u16::MAX as u64 + 1);
// Unwrapping around
assert_eq!(
ext_seq.next(std::u16::MAX - 9000 + 1),
(1 << 16) + std::u16::MAX as u64 - 9000 + 1
);
assert_eq!(
ext_seq.next(9000),
(1 << 16) + std::u16::MAX as u64 + 1 + 9000
);
}
#[test]
fn extended_seqnum_wraparound_disordered_backwards() {
let mut ext_seq = ExtendedSeqnum::default();
assert_eq!(ext_seq.next(9000), (1 << 16) + 9000);
// Wraps backwards
assert_eq!(
ext_seq.next(std::u16::MAX - 9000 + 1),
std::u16::MAX as u64 - 9000 + 1
);
// Wraps again forwards
assert_eq!(ext_seq.next(9000), (1 << 16) + 9000);
}
}

View file

@ -70,6 +70,7 @@ fn test_send() {
assert_eq!(rtp.sequence_number(), 501);
let stats = h.element().unwrap().property::<gst::Structure>("stats");
let session_stats = stats.get::<gst::Structure>("0").unwrap();
let source_stats = session_stats
.get::<gst::Structure>(TEST_SSRC.to_string())
@ -143,10 +144,20 @@ fn test_receive() {
assert_eq!(rtp.sequence_number(), 501);
let stats = inner.element().unwrap().property::<gst::Structure>("stats");
let session_stats = stats.get::<gst::Structure>("0").unwrap();
let source_stats = session_stats
.get::<gst::Structure>(TEST_SSRC.to_string())
.unwrap();
let jitterbuffers_stats = session_stats
.get::<gst::List>("jitterbuffer-stats")
.unwrap();
assert_eq!(jitterbuffers_stats.len(), 1);
let jitterbuffer_stats = jitterbuffers_stats
.first()
.unwrap()
.get::<gst::Structure>()
.unwrap();
assert_eq!(source_stats.get::<u32>("ssrc").unwrap(), TEST_SSRC);
assert_eq!(
source_stats.get::<u32>("clock-rate").unwrap(),
@ -156,4 +167,13 @@ fn test_receive() {
assert!(!source_stats.get::<bool>("local").unwrap());
assert_eq!(source_stats.get::<u64>("packets-received").unwrap(), 2);
assert_eq!(source_stats.get::<u64>("octets-received").unwrap(), 20);
assert_eq!(jitterbuffer_stats.get::<u64>("num-late").unwrap(), 0);
assert_eq!(jitterbuffer_stats.get::<u64>("num-lost").unwrap(), 0);
assert_eq!(jitterbuffer_stats.get::<u64>("num-duplicates").unwrap(), 0);
assert_eq!(jitterbuffer_stats.get::<u64>("num-pushed").unwrap(), 2);
assert_eq!(jitterbuffer_stats.get::<i32>("pt").unwrap(), TEST_PT as i32);
assert_eq!(
jitterbuffer_stats.get::<i32>("ssrc").unwrap(),
TEST_SSRC as i32
);
}