rtp: Initial rtpbin2 element

Can receive and recevie one or more RTP sessions containing multiple
pt/ssrc combinations.

Demultiplexing happens internally instead of relying on separate
elements.

Co-Authored-By: François Laignel <francois@centricular.com>
Co-Authored-By: Mathieu Duponchelle <mathieu@centricular.com>
Co-Authored-By: Sebastian Dröge <sebastian@centricular.com>
This commit is contained in:
Matthew Waters 2023-10-12 17:06:48 +11:00
parent 8f997ea4e3
commit 513c4bb9e1
10 changed files with 4945 additions and 3 deletions

15
Cargo.lock generated
View file

@ -2643,15 +2643,22 @@ dependencies = [
"atomic_refcell",
"bitstream-io",
"chrono",
"futures",
"gio",
"gst-plugin-version-helper",
"gstreamer",
"gstreamer-app",
"gstreamer-base",
"gstreamer-check",
"gstreamer-net",
"gstreamer-rtp",
"log",
"once_cell",
"rand",
"rtcp-types",
"rtp-types",
"smallvec",
"tokio",
]
[[package]]
@ -5397,6 +5404,14 @@ dependencies = [
"winapi",
]
[[package]]
name = "rtcp-types"
version = "0.0.1"
source = "git+https://github.com/ystreet/rtcp-types#f7fddfb87e9d7f4fed0b967fedc34995dd81ca86"
dependencies = [
"thiserror",
]
[[package]]
name = "rtp-types"
version = "0.1.1"

View file

@ -6304,6 +6304,92 @@
},
"rank": "marginal"
},
"rtpbin2": {
"author": "Matthew Waters <matthew@centricular.com>",
"description": "RTP sessions management",
"hierarchy": [
"GstRtpBin2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Network/RTP/Filter",
"pad-templates": {
"rtcp_recv_sink_%%u": {
"caps": "application/x-rtcp:\n",
"direction": "sink",
"presence": "request"
},
"rtcp_send_src_%%u": {
"caps": "application/x-rtcp:\n",
"direction": "src",
"presence": "request"
},
"rtp_recv_sink_%%u": {
"caps": "application/x-rtp:\n",
"direction": "sink",
"presence": "request"
},
"rtp_recv_src_%%u_%%u_%%u": {
"caps": "application/x-rtp:\n",
"direction": "src",
"presence": "sometimes"
},
"rtp_send_sink_%%u": {
"caps": "application/x-rtp:\n",
"direction": "sink",
"presence": "request"
},
"rtp_send_src_%%u": {
"caps": "application/x-rtp:\n",
"direction": "src",
"presence": "sometimes"
}
},
"properties": {
"latency": {
"blurb": "Amount of ms to buffer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
},
"min-rtcp-interval": {
"blurb": "Minimum time (in ms) between RTCP reports",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "5000",
"max": "-1",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
},
"stats": {
"blurb": "Statistics about the session",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "null",
"readable": true,
"type": "guint",
"writable": false
}
},
"rank": "none"
},
"rtpgccbwe": {
"author": "Thibault Saunier <tsaunier@igalia.com>",
"description": "Estimates current network bandwidth using the Google Congestion Control algorithm notifying about it through the 'bitrate' property",

View file

@ -12,12 +12,20 @@ rust-version.workspace = true
atomic_refcell = "0.1"
bitstream-io = "2.1"
chrono = { version = "0.4", default-features = false }
gst = { workspace = true, features = ["v1_20"] }
gst-rtp = { workspace = true, features = ["v1_20"] }
futures = "0.3"
gio.workspace = true
gst = { workspace = true, features = ["v1_20"] }
gst-base = { workspace = true, features = ["v1_20"] }
gst-net = { workspace = true, features = ["v1_20"] }
gst-rtp = { workspace = true, features = ["v1_20"] }
log = "0.4"
once_cell.workspace = true
rand = { version = "0.8", default-features = false, features = ["std", "std_rng" ] }
rtp-types = { version = "0.1" }
rtcp-types = { git = "https://github.com/ystreet/rtcp-types", version = "0.0" }
smallvec = { version = "1.11", features = ["union", "write", "const_generics", "const_new"] }
# TODO: experiment with other async executors (mio, async-std, etc)
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "time"] }
[dev-dependencies]
gst-check = { workspace = true, features = ["v1_20"] }
@ -48,4 +56,4 @@ versioning = false
import_library = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gstreamer-net-1.0, gobject-2.0, glib-2.0, gmodule-2.0, gio-2.0"

View file

@ -14,9 +14,14 @@
*
* Since: plugins-rs-0.9.0
*/
#[macro_use]
extern crate log;
use gst::glib;
mod gcc;
mod rtpbin2;
mod audio_discont;
mod baseaudiopay;
@ -31,6 +36,7 @@ mod tests;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gcc::register(plugin)?;
rtpbin2::register(plugin)?;
#[cfg(feature = "doc")]
{

1460
net/rtp/src/rtpbin2/imp.rs Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,30 @@
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
use once_cell::sync::Lazy;
mod imp;
mod session;
mod source;
mod time;
glib::wrapper! {
pub struct RtpBin2(ObjectSubclass<imp::RtpBin2>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpbin2",
gst::Rank::NONE,
RtpBin2::static_type(),
)
}
pub static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_time()
.worker_threads(1)
.build()
.unwrap()
});

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,58 @@
// SPDX-License-Identifier: MPL-2.0
use std::{
ops::{Add, Sub},
time::{Duration, SystemTime},
};
// time between the NTP time at 1900-01-01 and the unix EPOCH (1970-01-01)
const NTP_OFFSET: Duration = Duration::from_secs((365 * 70 + 17) * 24 * 60 * 60);
// 2^32
const F32: f64 = 4_294_967_296.0;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct NtpTime(u64);
impl NtpTime {
pub fn from_duration(dur: Duration) -> Self {
Self((dur.as_secs_f64() * F32) as u64)
}
pub fn as_u32(self) -> u32 {
((self.0 >> 16) & 0xffffffff) as u32
}
pub fn as_u64(self) -> u64 {
self.0
}
}
impl Sub for NtpTime {
type Output = NtpTime;
fn sub(self, rhs: Self) -> Self::Output {
NtpTime(self.0 - rhs.0)
}
}
impl Add for NtpTime {
type Output = NtpTime;
fn add(self, rhs: Self) -> Self::Output {
NtpTime(self.0 + rhs.0)
}
}
pub fn system_time_to_ntp_time_u64(time: SystemTime) -> NtpTime {
let dur = time
.duration_since(SystemTime::UNIX_EPOCH)
.expect("time is before unix epoch?!")
+ NTP_OFFSET;
NtpTime::from_duration(dur)
}
impl From<u64> for NtpTime {
fn from(value: u64) -> Self {
NtpTime(value)
}
}

159
net/rtp/tests/rtpbin2.rs Normal file
View file

@ -0,0 +1,159 @@
//
// Copyright (C) 2023 Matthew Waters <matthew@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use std::sync::{Arc, Mutex};
use gst::{prelude::*, Caps};
use gst_check::Harness;
use rtp_types::*;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstrsrtp::plugin_register_static().expect("rtpbin2 test");
});
}
const TEST_SSRC: u32 = 0x12345678;
const TEST_PT: u8 = 96;
const TEST_CLOCK_RATE: u32 = 48000;
fn generate_rtp_buffer(seqno: u16, rtpts: u32, payload_len: usize) -> gst::Buffer {
let payload = vec![4; payload_len];
let packet = RtpPacketBuilder::new()
.ssrc(TEST_SSRC)
.payload_type(TEST_PT)
.sequence_number(seqno)
.timestamp(rtpts)
.payload(&payload);
let size = packet.calculate_size().unwrap();
let mut data = vec![0; size];
packet.write_into(&mut data).unwrap();
gst::Buffer::from_mut_slice(data)
}
#[test]
fn test_send() {
init();
let mut h = Harness::with_padnames("rtpbin2", Some("rtp_send_sink_0"), Some("rtp_send_src_0"));
h.play();
let caps = Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", TEST_PT as i32)
.field("clock-rate", TEST_CLOCK_RATE as i32)
.field("encoding-name", "custom-test")
.build();
h.set_src_caps(caps);
h.push(generate_rtp_buffer(500, 20, 9)).unwrap();
h.push(generate_rtp_buffer(501, 30, 11)).unwrap();
let buffer = h.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 500);
let buffer = h.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 501);
let stats = h.element().unwrap().property::<gst::Structure>("stats");
let session_stats = stats.get::<gst::Structure>("0").unwrap();
let source_stats = session_stats
.get::<gst::Structure>(TEST_SSRC.to_string())
.unwrap();
assert_eq!(source_stats.get::<u32>("ssrc").unwrap(), TEST_SSRC);
assert_eq!(
source_stats.get::<u32>("clock-rate").unwrap(),
TEST_CLOCK_RATE
);
assert!(source_stats.get::<bool>("sender").unwrap());
assert!(source_stats.get::<bool>("local").unwrap());
assert_eq!(source_stats.get::<u64>("packets-sent").unwrap(), 2);
assert_eq!(source_stats.get::<u64>("octets-sent").unwrap(), 20);
}
#[test]
fn test_receive() {
init();
let h = Arc::new(Mutex::new(Harness::with_padnames(
"rtpbin2",
Some("rtp_recv_sink_0"),
None,
)));
let weak_h = Arc::downgrade(&h);
let mut inner = h.lock().unwrap();
inner
.element()
.unwrap()
.connect_pad_added(move |_elem, pad| {
weak_h
.upgrade()
.unwrap()
.lock()
.unwrap()
.add_element_src_pad(pad)
});
inner.play();
let caps = Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", TEST_PT as i32)
.field("clock-rate", TEST_CLOCK_RATE as i32)
.field("encoding-name", "custom-test")
.build();
inner.set_src_caps(caps);
// Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to
// the harness and needs to also take the harness lock. Workaround by pushing from the
// internal harness pad directly.
let push_pad = inner
.element()
.unwrap()
.static_pad("rtp_recv_sink_0")
.unwrap()
.peer()
.unwrap();
drop(inner);
push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap();
push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap();
let mut inner = h.lock().unwrap();
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 500);
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 501);
let stats = inner.element().unwrap().property::<gst::Structure>("stats");
let session_stats = stats.get::<gst::Structure>("0").unwrap();
let source_stats = session_stats
.get::<gst::Structure>(TEST_SSRC.to_string())
.unwrap();
assert_eq!(source_stats.get::<u32>("ssrc").unwrap(), TEST_SSRC);
assert_eq!(
source_stats.get::<u32>("clock-rate").unwrap(),
TEST_CLOCK_RATE
);
assert!(source_stats.get::<bool>("sender").unwrap());
assert!(!source_stats.get::<bool>("local").unwrap());
assert_eq!(source_stats.get::<u64>("packets-received").unwrap(), 2);
assert_eq!(source_stats.get::<u64>("octets-received").unwrap(), 20);
}