WIP: thread-sharing jitterbuffer

Actual thread-sharing will follow!
This commit is contained in:
Mathieu Duponchelle 2019-08-05 15:19:06 +02:00 committed by Mathieu Duponchelle
parent 120481269b
commit b7e55836c1
9 changed files with 3827 additions and 4 deletions

View file

@ -18,6 +18,7 @@ gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", fea
gstreamer-app = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-net = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-rtp = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
tokio = "0.1"
tokio-reactor = "0.1"
@ -52,3 +53,5 @@ path = "examples/tcpclientsrc_benchmark_sender.rs"
[build-dependencies]
gst-plugin-version-helper = { path="../gst-plugin-version-helper" }
cc = "1.0.38"
pkg-config = "0.3.15"

View file

@ -1,5 +1,31 @@
extern crate cc;
extern crate gst_plugin_version_helper;
extern crate pkg_config;
fn main() {
let gstreamer = pkg_config::probe_library("gstreamer-1.0").unwrap();
let gstrtp = pkg_config::probe_library("gstreamer-rtp-1.0").unwrap();
let includes = [gstreamer.include_paths, gstrtp.include_paths];
let mut build = cc::Build::new();
for p in includes.iter().flat_map(|i| i) {
build.include(p);
}
build.file("src/rtpjitterbuffer.c");
build.file("src/rtpstats.c");
build.compile("libthreadshare-c.a");
println!("cargo:rustc-link-lib=dylib=gstrtp-1.0");
for path in gstrtp.link_paths.iter() {
println!(
"cargo:rustc-link-search=native={}",
path.to_str().expect("library path doesn't exist")
);
}
gst_plugin_version_helper::get_info()
}

File diff suppressed because it is too large Load diff

View file

@ -30,6 +30,7 @@ extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_net as gst_net;
extern crate gstreamer_rtp as gst_rtp;
extern crate futures;
extern crate tokio;
@ -58,15 +59,23 @@ mod udpsrc;
mod appsrc;
mod dataqueue;
mod jitterbuffer;
mod proxy;
mod queue;
use glib::translate::*;
use glib::ObjectExt;
use gst::MiniObject;
use std::mem;
use std::ptr;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
udpsrc::register(plugin)?;
tcpclientsrc::register(plugin)?;
queue::register(plugin)?;
proxy::register(plugin)?;
appsrc::register(plugin)?;
jitterbuffer::register(plugin)?;
Ok(())
}
@ -88,8 +97,6 @@ pub fn set_element_flags<T: glib::IsA<gst::Object> + glib::IsA<gst::Element>>(
flags: gst::ElementFlags,
) {
unsafe {
use glib::translate::ToGlib;
let ptr: *mut gst_ffi::GstObject = element.as_ptr() as *mut _;
let _guard = MutexGuard::lock(&(*ptr).lock);
(*ptr).flags |= flags.to_glib();
@ -100,7 +107,6 @@ struct MutexGuard<'a>(&'a glib_ffi::GMutex);
impl<'a> MutexGuard<'a> {
pub fn lock(mutex: &'a glib_ffi::GMutex) -> Self {
use glib::translate::mut_override;
unsafe {
glib_ffi::g_mutex_lock(mut_override(mutex));
}
@ -110,9 +116,389 @@ impl<'a> MutexGuard<'a> {
impl<'a> Drop for MutexGuard<'a> {
fn drop(&mut self) {
use glib::translate::mut_override;
unsafe {
glib_ffi::g_mutex_unlock(mut_override(self.0));
}
}
}
pub mod ffi {
use glib_ffi::{gboolean, gpointer, GList, GType};
use gst_ffi::GstClockTime;
use libc::{c_int, c_uint, c_ulonglong, c_ushort, c_void};
#[repr(C)]
#[derive(Copy, Clone)]
pub struct RTPJitterBufferItem {
pub data: gpointer,
pub next: *mut GList,
pub prev: *mut GList,
pub r#type: c_uint,
pub dts: GstClockTime,
pub pts: GstClockTime,
pub seqnum: c_uint,
pub count: c_uint,
pub rtptime: c_uint,
}
#[repr(C)]
pub struct RTPJitterBuffer(c_void);
#[repr(C)]
#[derive(Copy, Clone)]
pub struct RTPPacketRateCtx {
probed: gboolean,
clock_rate: c_int,
last_seqnum: c_ushort,
last_ts: c_ulonglong,
avg_packet_rate: c_uint,
}
pub type RTPJitterBufferMode = c_int;
pub const RTP_JITTER_BUFFER_MODE_NONE: RTPJitterBufferMode = 0;
pub const RTP_JITTER_BUFFER_MODE_SLAVE: RTPJitterBufferMode = 1;
pub const RTP_JITTER_BUFFER_MODE_BUFFER: RTPJitterBufferMode = 2;
pub const RTP_JITTER_BUFFER_MODE_SYNCED: RTPJitterBufferMode = 4;
extern "C" {
pub fn rtp_jitter_buffer_new() -> *mut RTPJitterBuffer;
pub fn rtp_jitter_buffer_get_type() -> GType;
pub fn rtp_jitter_buffer_get_mode(jbuf: *mut RTPJitterBuffer) -> RTPJitterBufferMode;
pub fn rtp_jitter_buffer_set_mode(jbuf: *mut RTPJitterBuffer, mode: RTPJitterBufferMode);
pub fn rtp_jitter_buffer_get_delay(jbuf: *mut RTPJitterBuffer) -> GstClockTime;
pub fn rtp_jitter_buffer_set_delay(jbuf: *mut RTPJitterBuffer, delay: GstClockTime);
pub fn rtp_jitter_buffer_set_clock_rate(jbuf: *mut RTPJitterBuffer, clock_rate: c_uint);
pub fn rtp_jitter_buffer_get_clock_rate(jbuf: *mut RTPJitterBuffer) -> c_uint;
pub fn rtp_jitter_buffer_reset_skew(jbuf: *mut RTPJitterBuffer);
pub fn rtp_jitter_buffer_flush(jbuf: *mut RTPJitterBuffer, free_func: glib_ffi::GFunc);
pub fn rtp_jitter_buffer_find_earliest(
jbuf: *mut RTPJitterBuffer,
pts: *mut GstClockTime,
seqnum: *mut c_uint,
);
pub fn rtp_jitter_buffer_calculate_pts(
jbuf: *mut RTPJitterBuffer,
dts: GstClockTime,
estimated_dts: gboolean,
rtptime: c_uint,
base_time: GstClockTime,
gap: c_int,
is_rtx: gboolean,
) -> GstClockTime;
pub fn rtp_jitter_buffer_insert(
jbuf: *mut RTPJitterBuffer,
item: *mut RTPJitterBufferItem,
head: *mut gboolean,
percent: *mut c_int,
) -> gboolean;
pub fn rtp_jitter_buffer_pop(
jbuf: *mut RTPJitterBuffer,
percent: *mut c_int,
) -> *mut RTPJitterBufferItem;
pub fn rtp_jitter_buffer_peek(jbuf: *mut RTPJitterBuffer) -> *mut RTPJitterBufferItem;
pub fn gst_rtp_packet_rate_ctx_reset(ctx: *mut RTPPacketRateCtx, clock_rate: c_int);
pub fn gst_rtp_packet_rate_ctx_update(
ctx: *mut RTPPacketRateCtx,
seqnum: c_ushort,
ts: c_uint,
) -> c_uint;
pub fn gst_rtp_packet_rate_ctx_get_max_dropout(
ctx: *mut RTPPacketRateCtx,
time_ms: c_int,
) -> c_uint;
pub fn gst_rtp_packet_rate_ctx_get_max_disorder(
ctx: *mut RTPPacketRateCtx,
time_ms: c_int,
) -> c_uint;
}
}
glib_wrapper! {
pub struct RTPJitterBuffer(Object<ffi::RTPJitterBuffer, RTPJitterBufferClass>);
match fn {
get_type => || ffi::rtp_jitter_buffer_get_type(),
}
}
unsafe impl glib::SendUnique for RTPJitterBuffer {
fn is_unique(&self) -> bool {
self.ref_count() == 1
}
}
impl ToGlib for RTPJitterBufferMode {
type GlibType = ffi::RTPJitterBufferMode;
fn to_glib(&self) -> ffi::RTPJitterBufferMode {
match *self {
RTPJitterBufferMode::None => ffi::RTP_JITTER_BUFFER_MODE_NONE,
RTPJitterBufferMode::Slave => ffi::RTP_JITTER_BUFFER_MODE_SLAVE,
RTPJitterBufferMode::Buffer => ffi::RTP_JITTER_BUFFER_MODE_BUFFER,
RTPJitterBufferMode::Synced => ffi::RTP_JITTER_BUFFER_MODE_SYNCED,
RTPJitterBufferMode::__Unknown(value) => value,
}
}
}
impl FromGlib<ffi::RTPJitterBufferMode> for RTPJitterBufferMode {
fn from_glib(value: ffi::RTPJitterBufferMode) -> Self {
match value {
0 => RTPJitterBufferMode::None,
1 => RTPJitterBufferMode::Slave,
2 => RTPJitterBufferMode::Buffer,
4 => RTPJitterBufferMode::Synced,
value => RTPJitterBufferMode::__Unknown(value),
}
}
}
pub struct RTPJitterBufferItem(Option<Box<ffi::RTPJitterBufferItem>>);
unsafe impl Send for RTPJitterBufferItem {}
impl RTPJitterBufferItem {
pub fn new(
buffer: gst::Buffer,
dts: gst::ClockTime,
pts: gst::ClockTime,
seqnum: u32,
rtptime: u32,
) -> RTPJitterBufferItem {
unsafe {
RTPJitterBufferItem(Some(Box::new(ffi::RTPJitterBufferItem {
data: buffer.into_ptr() as *mut _,
next: ptr::null_mut(),
prev: ptr::null_mut(),
r#type: 0,
dts: dts.to_glib(),
pts: pts.to_glib(),
seqnum: seqnum,
count: 1,
rtptime: rtptime,
})))
}
}
pub fn get_buffer(&self) -> &mut gst::BufferRef {
unsafe {
let item = self.0.as_ref().expect("Invalid wrapper");
let buf = item.data as *mut gst_ffi::GstBuffer;
gst::BufferRef::from_mut_ptr(buf)
}
}
pub fn take_buffer(&mut self) -> gst::Buffer {
unsafe {
let item = self.0.take().expect("Invalid wrapper");
let buf = item.data as *mut gst_ffi::GstBuffer;
from_glib_none(buf)
}
}
pub fn get_dts(&self) -> gst::ClockTime {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.dts == gst_ffi::GST_CLOCK_TIME_NONE {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime(Some(item.dts))
}
}
pub fn get_pts(&self) -> gst::ClockTime {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.pts == gst_ffi::GST_CLOCK_TIME_NONE {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime(Some(item.pts))
}
}
pub fn get_seqnum(&self) -> u32 {
let item = self.0.as_ref().expect("Invalid wrapper");
item.seqnum
}
pub fn get_rtptime(&self) -> u32 {
let item = self.0.as_ref().expect("Invalid wrapper");
item.rtptime
}
}
impl Drop for RTPJitterBufferItem {
fn drop(&mut self) {
unsafe {
if let Some(ref item) = self.0 {
eprintln!("Unreffing buffer!");
gst_ffi::gst_mini_object_unref(item.data as *mut _)
}
}
}
}
pub struct RTPPacketRateCtx(Box<ffi::RTPPacketRateCtx>);
unsafe impl Send for RTPPacketRateCtx {}
impl RTPPacketRateCtx {
pub fn new() -> RTPPacketRateCtx {
unsafe {
let mut ptr: ffi::RTPPacketRateCtx = std::mem::uninitialized();
ffi::gst_rtp_packet_rate_ctx_reset(&mut ptr, -1);
RTPPacketRateCtx(Box::new(ptr))
}
}
pub fn reset(&mut self, clock_rate: i32) {
unsafe { ffi::gst_rtp_packet_rate_ctx_reset(&mut *self.0, clock_rate) }
}
pub fn update(&mut self, seqnum: u16, ts: u32) -> u32 {
unsafe { ffi::gst_rtp_packet_rate_ctx_update(&mut *self.0, seqnum, ts) }
}
pub fn get_max_dropout(&mut self, time_ms: i32) -> u32 {
unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_dropout(&mut *self.0, time_ms) }
}
pub fn get_max_disorder(&mut self, time_ms: i32) -> u32 {
unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_disorder(&mut *self.0, time_ms) }
}
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum RTPJitterBufferMode {
r#None,
Slave,
Buffer,
Synced,
__Unknown(i32),
}
impl RTPJitterBuffer {
pub fn new() -> RTPJitterBuffer {
unsafe { from_glib_full(ffi::rtp_jitter_buffer_new()) }
}
pub fn get_mode(&self) -> RTPJitterBufferMode {
unsafe { from_glib(ffi::rtp_jitter_buffer_get_mode(self.to_glib_none().0)) }
}
pub fn set_mode(&self, mode: RTPJitterBufferMode) {
unsafe { ffi::rtp_jitter_buffer_set_mode(self.to_glib_none().0, mode.to_glib()) }
}
pub fn get_delay(&self) -> gst::ClockTime {
unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) }
}
pub fn set_delay(&self, delay: gst::ClockTime) {
unsafe { ffi::rtp_jitter_buffer_set_delay(self.to_glib_none().0, delay.to_glib()) }
}
pub fn set_clock_rate(&self, clock_rate: u32) {
unsafe { ffi::rtp_jitter_buffer_set_clock_rate(self.to_glib_none().0, clock_rate) }
}
pub fn get_clock_rate(&self) -> u32 {
unsafe { ffi::rtp_jitter_buffer_get_clock_rate(self.to_glib_none().0) }
}
pub fn calculate_pts(
&self,
dts: gst::ClockTime,
estimated_dts: bool,
rtptime: u32,
base_time: gst::ClockTime,
gap: i32,
is_rtx: bool,
) -> gst::ClockTime {
unsafe {
let pts = ffi::rtp_jitter_buffer_calculate_pts(
self.to_glib_none().0,
dts.to_glib(),
estimated_dts.to_glib(),
rtptime,
base_time.to_glib(),
gap,
is_rtx.to_glib(),
);
if pts == gst_ffi::GST_CLOCK_TIME_NONE {
gst::CLOCK_TIME_NONE
} else {
pts.into()
}
}
}
pub fn insert(&self, mut item: RTPJitterBufferItem) -> (bool, bool, i32) {
unsafe {
let mut head = mem::uninitialized();
let mut percent = mem::uninitialized();
let box_ = item.0.take().expect("Invalid wrapper");
let ptr = Box::into_raw(box_);
let ret: bool = from_glib(ffi::rtp_jitter_buffer_insert(
self.to_glib_none().0,
ptr,
&mut head,
&mut percent,
));
if !ret {
item.0 = Some(Box::from_raw(ptr));
}
(ret, from_glib(head), percent)
}
}
pub fn find_earliest(&self) -> (gst::ClockTime, u32) {
unsafe {
let mut pts = mem::uninitialized();
let mut seqnum = mem::uninitialized();
ffi::rtp_jitter_buffer_find_earliest(self.to_glib_none().0, &mut pts, &mut seqnum);
if pts == gst_ffi::GST_CLOCK_TIME_NONE {
(gst::CLOCK_TIME_NONE, seqnum)
} else {
(pts.into(), seqnum)
}
}
}
pub fn pop(&self) -> (RTPJitterBufferItem, i32) {
unsafe {
let mut percent = mem::uninitialized();
let item = ffi::rtp_jitter_buffer_pop(self.to_glib_none().0, &mut percent);
(RTPJitterBufferItem(Some(Box::from_raw(item))), percent)
}
}
pub fn peek(&self) -> (gst::ClockTime, u32) {
unsafe {
let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0);
if item.is_null() {
(gst::CLOCK_TIME_NONE, std::u32::MAX)
} else {
((*item).pts.into(), (*item).seqnum)
}
}
}
pub fn flush(&self) {
unsafe extern "C" fn free_item(item: glib_ffi::gpointer, _: glib_ffi::gpointer) {
let _ = RTPJitterBufferItem(Some(Box::from_raw(item as *mut _)));
}
unsafe {
ffi::rtp_jitter_buffer_flush(self.to_glib_none().0, Some(free_item));
}
}
pub fn reset_skew(&self) {
unsafe { ffi::rtp_jitter_buffer_reset_skew(self.to_glib_none().0) }
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,201 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __RTP_JITTER_BUFFER_H__
#define __RTP_JITTER_BUFFER_H__
#include <gst/gst.h>
#include <gst/rtp/gstrtcpbuffer.h>
typedef struct _RTPJitterBuffer RTPJitterBuffer;
typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
typedef struct _RTPJitterBufferItem RTPJitterBufferItem;
#define RTP_TYPE_JITTER_BUFFER (rtp_jitter_buffer_get_type())
#define RTP_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_JITTER_BUFFER,RTPJitterBuffer))
#define RTP_JITTER_BUFFER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_JITTER_BUFFER,RTPJitterBufferClass))
#define RTP_IS_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_JITTER_BUFFER))
#define RTP_IS_JITTER_BUFFER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_JITTER_BUFFER))
#define RTP_JITTER_BUFFER_CAST(src) ((RTPJitterBuffer *)(src))
/**
* RTPJitterBufferMode:
* @RTP_JITTER_BUFFER_MODE_NONE: don't do any skew correction, outgoing
* timestamps are calculated directly from the RTP timestamps. This mode is
* good for recording but not for real-time applications.
* @RTP_JITTER_BUFFER_MODE_SLAVE: calculate the skew between sender and receiver
* and produce smoothed adjusted outgoing timestamps. This mode is good for
* low latency communications.
* @RTP_JITTER_BUFFER_MODE_BUFFER: buffer packets between low/high watermarks.
* This mode is good for streaming communication.
* @RTP_JITTER_BUFFER_MODE_SYNCED: sender and receiver clocks are synchronized,
* like #RTP_JITTER_BUFFER_MODE_SLAVE but skew is assumed to be 0. Good for
* low latency communication when sender and receiver clocks are
* synchronized and there is thus no clock skew.
* @RTP_JITTER_BUFFER_MODE_LAST: last buffer mode.
*
* The different buffer modes for a jitterbuffer.
*/
typedef enum {
RTP_JITTER_BUFFER_MODE_NONE = 0,
RTP_JITTER_BUFFER_MODE_SLAVE = 1,
RTP_JITTER_BUFFER_MODE_BUFFER = 2,
/* FIXME 3 is missing because it was used for 'auto' in jitterbuffer */
RTP_JITTER_BUFFER_MODE_SYNCED = 4,
RTP_JITTER_BUFFER_MODE_LAST
} RTPJitterBufferMode;
#define RTP_TYPE_JITTER_BUFFER_MODE (rtp_jitter_buffer_mode_get_type())
GType rtp_jitter_buffer_mode_get_type (void);
#define RTP_JITTER_BUFFER_MAX_WINDOW 512
/**
* RTPJitterBuffer:
*
* A JitterBuffer in the #RTPSession
*/
struct _RTPJitterBuffer {
GObject object;
GQueue *packets;
RTPJitterBufferMode mode;
GstClockTime delay;
/* for buffering */
gboolean buffering;
guint64 low_level;
guint64 high_level;
/* for calculating skew */
gboolean need_resync;
GstClockTime base_time;
GstClockTime base_rtptime;
GstClockTime media_clock_base_time;
guint32 clock_rate;
GstClockTime base_extrtp;
GstClockTime prev_out_time;
guint64 ext_rtptime;
guint64 last_rtptime;
gint64 window[RTP_JITTER_BUFFER_MAX_WINDOW];
guint window_pos;
guint window_size;
gboolean window_filling;
gint64 window_min;
gint64 skew;
gint64 prev_send_diff;
gboolean buffering_disabled;
GMutex clock_lock;
GstClock *pipeline_clock;
GstClock *media_clock;
gulong media_clock_synced_id;
guint64 media_clock_offset;
gboolean rfc7273_sync;
};
struct _RTPJitterBufferClass {
GObjectClass parent_class;
};
/**
* RTPJitterBufferItem:
* @data: the data of the item
* @next: pointer to next item
* @prev: pointer to previous item
* @type: the type of @data, used freely by caller
* @dts: input DTS
* @pts: output PTS
* @seqnum: seqnum, the seqnum is used to insert the item in the
* right position in the jitterbuffer and detect duplicates. Use -1 to
* append.
* @count: amount of seqnum in this item
* @rtptime: rtp timestamp
*
* An object containing an RTP packet or event.
*/
struct _RTPJitterBufferItem {
gpointer data;
GList *next;
GList *prev;
guint type;
GstClockTime dts;
GstClockTime pts;
guint seqnum;
guint count;
guint rtptime;
};
GType rtp_jitter_buffer_get_type (void);
/* managing lifetime */
RTPJitterBuffer* rtp_jitter_buffer_new (void);
RTPJitterBufferMode rtp_jitter_buffer_get_mode (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_mode (RTPJitterBuffer *jbuf, RTPJitterBufferMode mode);
GstClockTime rtp_jitter_buffer_get_delay (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_delay (RTPJitterBuffer *jbuf, GstClockTime delay);
void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, guint32 clock_rate);
guint32 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_media_clock (RTPJitterBuffer *jbuf, GstClock * clock, guint64 clock_offset);
void rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer *jbuf, GstClock * clock);
gboolean rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer *jbuf, gboolean rfc7273_sync);
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf,
RTPJitterBufferItem *item,
gboolean *head, gint *percent);
void rtp_jitter_buffer_disable_buffering (RTPJitterBuffer *jbuf, gboolean disabled);
RTPJitterBufferItem * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
RTPJitterBufferItem * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent);
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf,
GFunc free_func, gpointer user_data);
gboolean rtp_jitter_buffer_is_buffering (RTPJitterBuffer * jbuf);
void rtp_jitter_buffer_set_buffering (RTPJitterBuffer * jbuf, gboolean buffering);
gint rtp_jitter_buffer_get_percent (RTPJitterBuffer * jbuf);
guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf);
guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_get_sync (RTPJitterBuffer *jbuf, guint64 *rtptime,
guint64 *timestamp, guint32 *clock_rate,
guint64 *last_rtptime);
GstClockTime rtp_jitter_buffer_calculate_pts (RTPJitterBuffer * jbuf, GstClockTime dts, gboolean estimated_dts,
guint32 rtptime, GstClockTime base_time, gint gap,
gboolean is_rtx);
gboolean rtp_jitter_buffer_can_fast_start (RTPJitterBuffer * jbuf, gint num_packet);
gboolean rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf);
void rtp_jitter_buffer_find_earliest (RTPJitterBuffer * jbuf, GstClockTime *pts, guint * seqnum);
#endif /* __RTP_JITTER_BUFFER_H__ */

View file

@ -0,0 +1,429 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
* Copyright (C) 2015 Kurento (http://kurento.org/)
* @author: Miguel París <mparisdiaz@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "rtpstats.h"
void
gst_rtp_packet_rate_ctx_reset (RTPPacketRateCtx * ctx, gint32 clock_rate)
{
ctx->clock_rate = clock_rate;
ctx->probed = FALSE;
ctx->avg_packet_rate = -1;
ctx->last_ts = -1;
}
guint32
gst_rtp_packet_rate_ctx_update (RTPPacketRateCtx * ctx, guint16 seqnum,
guint32 ts)
{
guint64 new_ts, diff_ts;
gint diff_seqnum;
gint32 new_packet_rate;
if (ctx->clock_rate <= 0) {
return ctx->avg_packet_rate;
}
new_ts = ctx->last_ts;
gst_rtp_buffer_ext_timestamp (&new_ts, ts);
if (!ctx->probed) {
ctx->probed = TRUE;
goto done;
}
diff_seqnum = gst_rtp_buffer_compare_seqnum (ctx->last_seqnum, seqnum);
if (diff_seqnum <= 0 || new_ts <= ctx->last_ts || diff_seqnum > 1) {
goto done;
}
diff_ts = new_ts - ctx->last_ts;
diff_ts = gst_util_uint64_scale_int (diff_ts, GST_SECOND, ctx->clock_rate);
new_packet_rate = gst_util_uint64_scale (diff_seqnum, GST_SECOND, diff_ts);
/* The goal is that higher packet rates "win".
* If there's a sudden burst, the average will go up fast,
* but it will go down again slowly.
* This is useful for bursty cases, where a lot of packets are close
* to each other and should allow a higher reorder/dropout there.
* Round up the new average.
*/
if ((gint32) ctx->avg_packet_rate > new_packet_rate) {
ctx->avg_packet_rate = (7 * ctx->avg_packet_rate + new_packet_rate + 7) / 8;
} else {
ctx->avg_packet_rate = (ctx->avg_packet_rate + new_packet_rate + 1) / 2;
}
done:
ctx->last_seqnum = seqnum;
ctx->last_ts = new_ts;
return ctx->avg_packet_rate;
}
guint32
gst_rtp_packet_rate_ctx_get (RTPPacketRateCtx * ctx)
{
return ctx->avg_packet_rate;
}
guint32
gst_rtp_packet_rate_ctx_get_max_dropout (RTPPacketRateCtx * ctx, gint32 time_ms)
{
if (time_ms <= 0 || !ctx->probed || ctx->avg_packet_rate == G_MAXUINT32) {
return RTP_DEF_DROPOUT;
}
return MAX (RTP_MIN_DROPOUT, ctx->avg_packet_rate * time_ms / 1000);
}
guint32
gst_rtp_packet_rate_ctx_get_max_misorder (RTPPacketRateCtx * ctx,
gint32 time_ms)
{
if (time_ms <= 0 || !ctx->probed || ctx->avg_packet_rate == G_MAXUINT32) {
return RTP_DEF_MISORDER;
}
return MAX (RTP_MIN_MISORDER, ctx->avg_packet_rate * time_ms / 1000);
}
/**
* rtp_stats_init_defaults:
* @stats: an #RTPSessionStats struct
*
* Initialize @stats with its default values.
*/
void
rtp_stats_init_defaults (RTPSessionStats * stats)
{
rtp_stats_set_bandwidths (stats, -1, -1, -1, -1);
stats->min_interval = RTP_STATS_MIN_INTERVAL;
stats->bye_timeout = RTP_STATS_BYE_TIMEOUT;
stats->nacks_dropped = 0;
stats->nacks_sent = 0;
stats->nacks_received = 0;
}
/**
* rtp_stats_set_bandwidths:
* @stats: an #RTPSessionStats struct
* @rtp_bw: RTP bandwidth
* @rtcp_bw: RTCP bandwidth
* @rs: sender RTCP bandwidth
* @rr: receiver RTCP bandwidth
*
* Configure the bandwidth parameters in the stats. When an input variable is
* set to -1, it will be calculated from the other input variables and from the
* defaults.
*/
void
rtp_stats_set_bandwidths (RTPSessionStats * stats, guint rtp_bw,
gdouble rtcp_bw, guint rs, guint rr)
{
GST_DEBUG ("recalc bandwidths: RTP %u, RTCP %f, RS %u, RR %u", rtp_bw,
rtcp_bw, rs, rr);
/* when given, sender and receive bandwidth add up to the total
* rtcp bandwidth */
if (rs != G_MAXUINT && rr != G_MAXUINT)
rtcp_bw = rs + rr;
/* If rtcp_bw is between 0 and 1, it is a fraction of rtp_bw */
if (rtcp_bw > 0.0 && rtcp_bw < 1.0) {
if (rtp_bw > 0.0)
rtcp_bw = rtp_bw * rtcp_bw;
else
rtcp_bw = -1.0;
}
/* RTCP is 5% of the RTP bandwidth */
if (rtp_bw == G_MAXUINT && rtcp_bw > 1.0)
rtp_bw = rtcp_bw * 20;
else if (rtp_bw != G_MAXUINT && rtcp_bw < 0.0)
rtcp_bw = rtp_bw / 20;
else if (rtp_bw == G_MAXUINT && rtcp_bw < 0.0) {
/* nothing given, take defaults */
rtp_bw = RTP_STATS_BANDWIDTH;
rtcp_bw = rtp_bw * RTP_STATS_RTCP_FRACTION;
}
stats->bandwidth = rtp_bw;
stats->rtcp_bandwidth = rtcp_bw;
/* now figure out the fractions */
if (rs == G_MAXUINT) {
/* rs unknown */
if (rr == G_MAXUINT) {
/* both not given, use defaults */
rs = stats->rtcp_bandwidth * RTP_STATS_SENDER_FRACTION;
rr = stats->rtcp_bandwidth * RTP_STATS_RECEIVER_FRACTION;
} else {
/* rr known, calculate rs */
if (stats->rtcp_bandwidth > rr)
rs = stats->rtcp_bandwidth - rr;
else
rs = 0;
}
} else if (rr == G_MAXUINT) {
/* rs known, calculate rr */
if (stats->rtcp_bandwidth > rs)
rr = stats->rtcp_bandwidth - rs;
else
rr = 0;
}
if (stats->rtcp_bandwidth > 0) {
stats->sender_fraction = ((gdouble) rs) / ((gdouble) stats->rtcp_bandwidth);
stats->receiver_fraction = 1.0 - stats->sender_fraction;
} else {
/* no RTCP bandwidth, set dummy values */
stats->sender_fraction = 0.0;
stats->receiver_fraction = 0.0;
}
GST_DEBUG ("bandwidths: RTP %u, RTCP %u, RS %f, RR %f", stats->bandwidth,
stats->rtcp_bandwidth, stats->sender_fraction, stats->receiver_fraction);
}
/**
* rtp_stats_calculate_rtcp_interval:
* @stats: an #RTPSessionStats struct
* @sender: if we are a sender
* @profile: RTP profile of this session
* @ptp: if this session is a point-to-point session
* @first: if this is the first time
*
* Calculate the RTCP interval. The result of this function is the amount of
* time to wait (in nanoseconds) before sending a new RTCP message.
*
* Returns: the RTCP interval.
*/
GstClockTime
rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean we_send,
GstRTPProfile profile, gboolean ptp, gboolean first)
{
gdouble members, senders, n;
gdouble avg_rtcp_size, rtcp_bw;
gdouble interval;
gdouble rtcp_min_time;
if (profile == GST_RTP_PROFILE_AVPF || profile == GST_RTP_PROFILE_SAVPF) {
/* RFC 4585 3.4d), 3.5.1 */
if (first && !ptp)
rtcp_min_time = 1.0;
else
rtcp_min_time = 0.0;
} else {
/* Very first call at application start-up uses half the min
* delay for quicker notification while still allowing some time
* before reporting for randomization and to learn about other
* sources so the report interval will converge to the correct
* interval more quickly.
*/
rtcp_min_time = stats->min_interval;
if (first)
rtcp_min_time /= 2.0;
}
/* Dedicate a fraction of the RTCP bandwidth to senders unless
* the number of senders is large enough that their share is
* more than that fraction.
*/
n = members = stats->active_sources;
senders = (gdouble) stats->sender_sources;
rtcp_bw = stats->rtcp_bandwidth;
if (senders <= members * stats->sender_fraction) {
if (we_send) {
rtcp_bw *= stats->sender_fraction;
n = senders;
} else {
rtcp_bw *= stats->receiver_fraction;
n -= senders;
}
}
/* no bandwidth for RTCP, return NONE to signal that we don't want to send
* RTCP packets */
if (rtcp_bw <= 0.0001)
return GST_CLOCK_TIME_NONE;
avg_rtcp_size = 8.0 * stats->avg_rtcp_packet_size;
/*
* The effective number of sites times the average packet size is
* the total number of octets sent when each site sends a report.
* Dividing this by the effective bandwidth gives the time
* interval over which those packets must be sent in order to
* meet the bandwidth target, with a minimum enforced. In that
* time interval we send one report so this time is also our
* average time between reports.
*/
GST_DEBUG ("avg size %f, n %f, rtcp_bw %f", avg_rtcp_size, n, rtcp_bw);
interval = avg_rtcp_size * n / rtcp_bw;
if (interval < rtcp_min_time)
interval = rtcp_min_time;
return interval * GST_SECOND;
}
/**
* rtp_stats_add_rtcp_jitter:
* @stats: an #RTPSessionStats struct
* @interval: an RTCP interval
*
* Apply a random jitter to the @interval. @interval is typically obtained with
* rtp_stats_calculate_rtcp_interval().
*
* Returns: the new RTCP interval.
*/
GstClockTime
rtp_stats_add_rtcp_jitter (RTPSessionStats * stats G_GNUC_UNUSED, GstClockTime interval)
{
gdouble temp;
/* see RFC 3550 p 30
* To compensate for "unconditional reconsideration" converging to a
* value below the intended average.
*/
#define COMPENSATION (2.71828 - 1.5);
temp = (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION;
return (GstClockTime) temp;
}
/**
* rtp_stats_calculate_bye_interval:
* @stats: an #RTPSessionStats struct
*
* Calculate the BYE interval. The result of this function is the amount of
* time to wait (in nanoseconds) before sending a BYE message.
*
* Returns: the BYE interval.
*/
GstClockTime
rtp_stats_calculate_bye_interval (RTPSessionStats * stats)
{
gdouble members;
gdouble avg_rtcp_size, rtcp_bw;
gdouble interval;
gdouble rtcp_min_time;
/* no interval when we have less than 50 members */
if (stats->active_sources < 50)
return 0;
rtcp_min_time = (stats->min_interval) / 2.0;
/* Dedicate a fraction of the RTCP bandwidth to senders unless
* the number of senders is large enough that their share is
* more than that fraction.
*/
members = stats->bye_members;
rtcp_bw = stats->rtcp_bandwidth * stats->receiver_fraction;
/* no bandwidth for RTCP, return NONE to signal that we don't want to send
* RTCP packets */
if (rtcp_bw <= 0.0001)
return GST_CLOCK_TIME_NONE;
avg_rtcp_size = 8.0 * stats->avg_rtcp_packet_size;
/*
* The effective number of sites times the average packet size is
* the total number of octets sent when each site sends a report.
* Dividing this by the effective bandwidth gives the time
* interval over which those packets must be sent in order to
* meet the bandwidth target, with a minimum enforced. In that
* time interval we send one report so this time is also our
* average time between reports.
*/
interval = avg_rtcp_size * members / rtcp_bw;
if (interval < rtcp_min_time)
interval = rtcp_min_time;
return interval * GST_SECOND;
}
/**
* rtp_stats_get_packets_lost:
* @stats: an #RTPSourceStats struct
*
* Calculate the total number of RTP packets lost since beginning of
* reception. Packets that arrive late are not considered lost, and
* duplicates are not taken into account. Hence, the loss may be negative
* if there are duplicates.
*
* Returns: total RTP packets lost.
*/
gint64
rtp_stats_get_packets_lost (const RTPSourceStats * stats)
{
gint64 lost;
guint64 extended_max, expected;
extended_max = stats->cycles + stats->max_seq;
expected = extended_max - stats->base_seq + 1;
lost = expected - stats->packets_received;
return lost;
}
void
rtp_stats_set_min_interval (RTPSessionStats * stats, gdouble min_interval)
{
stats->min_interval = min_interval;
}
gboolean
__g_socket_address_equal (GSocketAddress * a, GSocketAddress * b)
{
GInetSocketAddress *ia, *ib;
GInetAddress *iaa, *iab;
ia = G_INET_SOCKET_ADDRESS (a);
ib = G_INET_SOCKET_ADDRESS (b);
if (g_inet_socket_address_get_port (ia) !=
g_inet_socket_address_get_port (ib))
return FALSE;
iaa = g_inet_socket_address_get_address (ia);
iab = g_inet_socket_address_get_address (ib);
return g_inet_address_equal (iaa, iab);
}
gchar *
__g_socket_address_to_string (GSocketAddress * addr)
{
GInetSocketAddress *ia;
gchar *ret, *tmp;
ia = G_INET_SOCKET_ADDRESS (addr);
tmp = g_inet_address_to_string (g_inet_socket_address_get_address (ia));
ret = g_strdup_printf ("%s:%u", tmp, g_inet_socket_address_get_port (ia));
g_free (tmp);
return ret;
}

View file

@ -0,0 +1,267 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
* Copyright (C) 2015 Kurento (http://kurento.org/)
* @author: Miguel París <mparisdiaz@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __RTP_STATS_H__
#define __RTP_STATS_H__
#include <gst/gst.h>
#include <gst/net/gstnetaddressmeta.h>
#include <gst/rtp/rtp.h>
#include <gio/gio.h>
/* UDP/IP is assumed for bandwidth calculation */
#define UDP_IP_HEADER_OVERHEAD 28
/**
* RTPSenderReport:
*
* A sender report structure.
*/
typedef struct {
gboolean is_valid;
guint64 ntptime;
guint32 rtptime;
guint32 packet_count;
guint32 octet_count;
GstClockTime time;
} RTPSenderReport;
/**
* RTPReceiverReport:
*
* A receiver report structure.
*/
typedef struct {
gboolean is_valid;
guint32 ssrc; /* who the report is from */
guint8 fractionlost;
guint32 packetslost;
guint32 exthighestseq;
guint32 jitter;
guint32 lsr;
guint32 dlsr;
guint32 round_trip;
} RTPReceiverReport;
/**
* RTPPacketInfo:
* @send: if this is a packet for sending
* @rtp: if this info is about an RTP packet
* @is_list: if this is a bufferlist
* @data: a #GstBuffer or #GstBufferList
* @address: address of the sender of the packet
* @current_time: current time according to the system clock
* @running_time: time of a packet as buffer running_time
* @ntpnstime: time of a packet NTP time in nanoseconds
* @header_len: number of overhead bytes per packet
* @bytes: bytes of the packet including lowlevel overhead
* @payload_len: bytes of the RTP payload
* @seqnum: the seqnum of the packet
* @pt: the payload type of the packet
* @rtptime: the RTP time of the packet
*
* Structure holding information about the packet.
*/
typedef struct {
gboolean send;
gboolean rtp;
gboolean is_list;
gpointer data;
GSocketAddress *address;
GstClockTime current_time;
GstClockTime running_time;
guint64 ntpnstime;
guint header_len;
guint bytes;
guint packets;
guint payload_len;
guint32 ssrc;
guint16 seqnum;
guint8 pt;
guint32 rtptime;
guint32 csrc_count;
guint32 csrcs[16];
} RTPPacketInfo;
/**
* RTPSourceStats:
* @packets_received: number of received packets in total
* @prev_received: number of packets received in previous reporting
* interval
* @octets_received: number of payload bytes received
* @bytes_received: number of total bytes received including headers and lower
* protocol level overhead
* @max_seqnr: highest sequence number received
* @transit: previous transit time used for calculating @jitter
* @jitter: current jitter (in clock rate units scaled by 16 for precision)
* @prev_rtptime: previous time when an RTP packet was received
* @prev_rtcptime: previous time when an RTCP packet was received
* @last_rtptime: time when last RTP packet received
* @last_rtcptime: time when last RTCP packet received
* @curr_rr: index of current @rr block
* @rr: previous and current receiver report block
* @curr_sr: index of current @sr block
* @sr: previous and current sender report block
*
* Stats about a source.
*/
typedef struct {
guint64 packets_received;
guint64 octets_received;
guint64 bytes_received;
guint32 prev_expected;
guint32 prev_received;
guint16 max_seq;
guint64 cycles;
guint32 base_seq;
guint32 bad_seq;
guint32 transit;
guint32 jitter;
guint64 packets_sent;
guint64 octets_sent;
guint sent_pli_count;
guint recv_pli_count;
guint sent_fir_count;
guint recv_fir_count;
guint sent_nack_count;
guint recv_nack_count;
/* when we received stuff */
GstClockTime prev_rtptime;
GstClockTime prev_rtcptime;
GstClockTime last_rtptime;
GstClockTime last_rtcptime;
/* sender and receiver reports */
gint curr_rr;
RTPReceiverReport rr[2];
gint curr_sr;
RTPSenderReport sr[2];
} RTPSourceStats;
#define RTP_STATS_BANDWIDTH 64000
#define RTP_STATS_RTCP_FRACTION 0.05
/*
* Minimum average time between RTCP packets from this site (in
* seconds). This time prevents the reports from `clumping' when
* sessions are small and the law of large numbers isn't helping
* to smooth out the traffic. It also keeps the report interval
* from becoming ridiculously small during transient outages like
* a network partition.
*/
#define RTP_STATS_MIN_INTERVAL 5.0
/*
* Fraction of the RTCP bandwidth to be shared among active
* senders. (This fraction was chosen so that in a typical
* session with one or two active senders, the computed report
* time would be roughly equal to the minimum report time so that
* we don't unnecessarily slow down receiver reports.) The
* receiver fraction must be 1 - the sender fraction.
*/
#define RTP_STATS_SENDER_FRACTION (0.25)
#define RTP_STATS_RECEIVER_FRACTION (1.0 - RTP_STATS_SENDER_FRACTION)
/*
* When receiving a BYE from a source, remove the source from the database
* after this timeout.
*/
#define RTP_STATS_BYE_TIMEOUT (2 * GST_SECOND)
/*
* The default and minimum values of the maximum number of missing packets we tolerate.
* These are packets with asequence number bigger than the last seen packet.
*/
#define RTP_DEF_DROPOUT 3000
#define RTP_MIN_DROPOUT 30
/*
* The default and minimum values of the maximum number of misordered packets we tolerate.
* These are packets with a sequence number smaller than the last seen packet.
*/
#define RTP_DEF_MISORDER 100
#define RTP_MIN_MISORDER 10
/**
* RTPPacketRateCtx:
*
* Context to calculate the pseudo-average packet rate.
*/
typedef struct {
gboolean probed;
gint32 clock_rate;
guint16 last_seqnum;
guint64 last_ts;
guint32 avg_packet_rate;
} RTPPacketRateCtx;
void gst_rtp_packet_rate_ctx_reset (RTPPacketRateCtx * ctx, gint32 clock_rate);
guint32 gst_rtp_packet_rate_ctx_update (RTPPacketRateCtx *ctx, guint16 seqnum, guint32 ts);
guint32 gst_rtp_packet_rate_ctx_get (RTPPacketRateCtx *ctx);
guint32 gst_rtp_packet_rate_ctx_get_max_dropout (RTPPacketRateCtx *ctx, gint32 time_ms);
guint32 gst_rtp_packet_rate_ctx_get_max_misorder (RTPPacketRateCtx *ctx, gint32 time_ms);
/**
* RTPSessionStats:
*
* Stats kept for a session and used to produce RTCP packet timeouts.
*/
typedef struct {
guint bandwidth;
guint rtcp_bandwidth;
gdouble sender_fraction;
gdouble receiver_fraction;
gdouble min_interval;
GstClockTime bye_timeout;
guint internal_sources;
guint sender_sources;
guint internal_sender_sources;
guint active_sources;
guint avg_rtcp_packet_size;
guint bye_members;
guint nacks_dropped;
guint nacks_sent;
guint nacks_received;
} RTPSessionStats;
void rtp_stats_init_defaults (RTPSessionStats *stats);
void rtp_stats_set_bandwidths (RTPSessionStats *stats,
guint rtp_bw,
gdouble rtcp_bw,
guint rs, guint rr);
GstClockTime rtp_stats_calculate_rtcp_interval (RTPSessionStats *stats, gboolean sender, GstRTPProfile profile, gboolean ptp, gboolean first);
GstClockTime rtp_stats_add_rtcp_jitter (RTPSessionStats *stats, GstClockTime interval);
GstClockTime rtp_stats_calculate_bye_interval (RTPSessionStats *stats);
gint64 rtp_stats_get_packets_lost (const RTPSourceStats *stats);
void rtp_stats_set_min_interval (RTPSessionStats *stats,
gdouble min_interval);
gboolean __g_socket_address_equal (GSocketAddress *a, GSocketAddress *b);
gchar * __g_socket_address_to_string (GSocketAddress * addr);
#endif /* __RTP_STATS_H__ */

View file

@ -23,6 +23,8 @@ extern crate gstreamer_check as gst_check;
extern crate gstthreadshare;
use gstthreadshare::{RTPJitterBuffer, RTPJitterBufferItem};
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
@ -37,6 +39,22 @@ fn init() {
fn test_push() {
init();
let jb = RTPJitterBuffer::new();
let item = RTPJitterBufferItem::new(
gst::Buffer::new(),
gst::ClockTime(Some(57)),
gst::ClockTime(Some(72)),
135,
142,
);
jb.insert(item);
let (item, percent) = jb.pop();
let buffer = item.get_buffer();
eprintln!("Buffer: {:?}", buffer);
eprintln!("Percent, pts: {} , {}", percent, item.get_pts());
let mut h = gst_check::Harness::new("ts-appsrc");
let caps = gst::Caps::new_simple("foo/bar", &[]);