rtpbin2: implement a session configuration object

Currently only contains pt-map

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Matthew Waters 2024-01-17 21:20:35 +11:00
parent 48e7a2ed06
commit 06f40e72cb
5 changed files with 372 additions and 61 deletions

View file

@ -7311,7 +7311,20 @@
"writable": true "writable": true
} }
}, },
"rank": "none" "rank": "none",
"signals": {
"get-session": {
"action": true,
"args": [
{
"name": "arg0",
"type": "guint"
}
],
"return-type": "GstRtpBin2Session",
"when": "last"
}
}
}, },
"rtpgccbwe": { "rtpgccbwe": {
"author": "Thibault Saunier <tsaunier@igalia.com>", "author": "Thibault Saunier <tsaunier@igalia.com>",
@ -8131,6 +8144,49 @@
"filename": "gstrsrtp", "filename": "gstrsrtp",
"license": "MPL-2.0", "license": "MPL-2.0",
"other-types": { "other-types": {
"GstRtp2Session": {
"hierarchy": [
"GstRtp2Session",
"GObject"
],
"kind": "object",
"properties": {
"pt-map": {
"blurb": "Mapping of RTP payload type to caps",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "application/x-rtp2-pt-map;",
"mutable": "null",
"readable": true,
"type": "GstStructure",
"writable": true
}
},
"signals": {
"bye-ssrc": {
"args": [
{
"name": "arg0",
"type": "guint"
}
],
"return-type": "void",
"when": "last"
},
"new-ssrc": {
"args": [
{
"name": "arg0",
"type": "guint"
}
],
"return-type": "void",
"when": "last"
}
}
},
"GstRtpBaseAudioPay2": { "GstRtpBaseAudioPay2": {
"hierarchy": [ "hierarchy": [
"GstRtpBaseAudioPay2", "GstRtpBaseAudioPay2",

View file

@ -105,3 +105,14 @@ gst::plugin_define!(
env!("CARGO_PKG_REPOSITORY"), env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE") env!("BUILD_REL_DATE")
); );
#[cfg(test)]
pub(crate) fn test_init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
plugin_register_static().expect("rtp plugin test");
});
}

View file

@ -0,0 +1,189 @@
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::sync::{Mutex, Weak};
use crate::rtpbin2::imp::BinSessionInner;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtpbin2-config",
gst::DebugColorFlags::empty(),
Some("RtpBin2 config"),
)
});
glib::wrapper! {
pub struct RtpBin2Session(ObjectSubclass<imp::RtpBin2Session>);
}
impl RtpBin2Session {
pub(crate) fn new(weak_session: Weak<Mutex<BinSessionInner>>) -> Self {
let ret = glib::Object::new::<Self>();
let imp = ret.imp();
imp.set_session(weak_session);
ret
}
}
mod imp {
use std::sync::Arc;
use super::*;
#[derive(Debug, Default)]
struct State {
pub(super) weak_session: Option<Weak<Mutex<BinSessionInner>>>,
}
#[derive(Debug, Default)]
pub struct RtpBin2Session {
state: Mutex<State>,
}
impl RtpBin2Session {
pub(super) fn set_session(&self, weak_session: Weak<Mutex<BinSessionInner>>) {
let mut state = self.state.lock().unwrap();
state.weak_session = Some(weak_session);
}
fn session(&self) -> Option<Arc<Mutex<BinSessionInner>>> {
self.state
.lock()
.unwrap()
.weak_session
.as_ref()
.and_then(|sess| sess.upgrade())
}
pub fn set_pt_map(&self, pt_map: Option<gst::Structure>) {
let Some(session) = self.session() else {
return;
};
let mut session = session.lock().unwrap();
session.clear_pt_map();
let Some(pt_map) = pt_map else {
return;
};
for (key, value) in pt_map.iter() {
let Ok(pt) = key.parse::<u8>() else {
gst::warning!(CAT, "failed to parse key as a pt");
continue;
};
if let Ok(caps) = value.get::<gst::Caps>() {
session.add_caps(caps);
} else {
gst::warning!(CAT, "{pt} does not contain a caps value");
continue;
}
}
}
pub fn pt_map(&self) -> gst::Structure {
let mut ret = gst::Structure::builder("application/x-rtpbin2-pt-map");
let Some(session) = self.session() else {
return ret.build();
};
let session = session.lock().unwrap();
for (pt, caps) in session.pt_map() {
ret = ret.field(pt.to_string(), caps);
}
ret.build()
}
}
#[glib::object_subclass]
impl ObjectSubclass for RtpBin2Session {
const NAME: &'static str = "GstRtpBin2Session";
type Type = super::RtpBin2Session;
type ParentType = glib::Object;
}
impl ObjectImpl for RtpBin2Session {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecBoxed::builder::<gst::Structure>("pt-map")
.nick("RTP Payload Type Map")
.blurb("Mapping of RTP payload type to caps")
.build()]
});
PROPERTIES.as_ref()
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"pt-map" => self.pt_map().to_value(),
_ => unreachable!(),
}
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"pt-map" => self.set_pt_map(
value
.get::<Option<gst::Structure>>()
.expect("Type checked upstream"),
),
_ => unreachable!(),
}
}
}
}
#[cfg(test)]
mod tests {
use crate::test_init;
use super::*;
#[test]
fn pt_map_get_empty() {
test_init();
let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap();
let _pad = rtpbin2.request_pad_simple("rtp_send_sink_0").unwrap();
let session = rtpbin2.emit_by_name::<gst::glib::Object>("get-session", &[&0u32]);
let pt_map = session.property::<gst::Structure>("pt-map");
assert!(pt_map.has_name("application/x-rtpbin2-pt-map"));
assert_eq!(pt_map.fields().len(), 0);
}
#[test]
fn pt_map_set() {
test_init();
let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap();
let _pad = rtpbin2.request_pad_simple("rtp_send_sink_0").unwrap();
let session = rtpbin2.emit_by_name::<gst::glib::Object>("get-session", &[&0u32]);
let pt = 96i32;
let pt_caps = gst::Caps::builder("application/x-rtp")
.field("payload", pt)
.field("clock-rate", 90000i32)
.build();
let pt_map = gst::Structure::builder("application/x-rtpbin2-pt-map")
.field(pt.to_string(), pt_caps.clone())
.build();
session.set_property("pt-map", pt_map);
let prop = session.property::<gst::Structure>("pt-map");
assert!(prop.has_name("application/x-rtpbin2-pt-map"));
assert_eq!(prop.fields().len(), 1);
let caps = prop.get::<gst::Caps>(pt.to_string()).unwrap();
assert_eq!(pt_caps, caps);
}
#[test]
fn pt_map_set_none() {
test_init();
let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap();
let _pad = rtpbin2.request_pad_simple("rtp_send_sink_0").unwrap();
let session = rtpbin2.emit_by_name::<gst::glib::Object>("get-session", &[&0u32]);
session.set_property("pt-map", None::<gst::Structure>);
let prop = session.property::<gst::Structure>("pt-map");
assert!(prop.has_name("application/x-rtpbin2-pt-map"));
}
}

View file

@ -20,6 +20,7 @@ use super::session::{
use super::source::{ReceivedRb, SourceState}; use super::source::{ReceivedRb, SourceState};
use super::sync; use super::sync;
use crate::rtpbin2::config::RtpBin2Session;
use crate::rtpbin2::RUNTIME; use crate::rtpbin2::RUNTIME;
const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);
@ -264,7 +265,7 @@ impl RtpRecvSrcPad {
.seqnum(seqnum) .seqnum(seqnum)
.build(); .build();
let caps = session_inner.caps_from_pt_ssrc(self.pt, self.ssrc); let caps = session_inner.caps_from_pt(self.pt);
let caps = gst::event::Caps::builder(&caps).seqnum(seqnum).build(); let caps = gst::event::Caps::builder(&caps).seqnum(seqnum).build();
let segment = let segment =
@ -290,9 +291,10 @@ struct HeldRecvBuffer {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct BinSession { pub struct BinSession {
id: usize, id: usize,
inner: Arc<Mutex<BinSessionInner>>, inner: Arc<Mutex<BinSessionInner>>,
config: RtpBin2Session,
} }
impl BinSession { impl BinSession {
@ -305,15 +307,18 @@ impl BinSession {
inner inner
.session .session
.set_reduced_size_rtcp(settings.reduced_size_rtcp); .set_reduced_size_rtcp(settings.reduced_size_rtcp);
let inner = Arc::new(Mutex::new(inner));
let weak_inner = Arc::downgrade(&inner);
Self { Self {
id, id,
inner: Arc::new(Mutex::new(inner)), inner,
config: RtpBin2Session::new(weak_inner),
} }
} }
} }
#[derive(Debug)] #[derive(Debug)]
struct BinSessionInner { pub(crate) struct BinSessionInner {
id: usize, id: usize,
session: Session, session: Session,
@ -325,7 +330,7 @@ struct BinSessionInner {
rtp_recv_sink_segment: Option<gst::FormattedSegment<gst::ClockTime>>, rtp_recv_sink_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
rtp_recv_sink_seqnum: Option<gst::Seqnum>, rtp_recv_sink_seqnum: Option<gst::Seqnum>,
caps_map: HashMap<u8, HashMap<u32, gst::Caps>>, pt_map: HashMap<u8, gst::Caps>,
recv_store: Vec<HeldRecvBuffer>, recv_store: Vec<HeldRecvBuffer>,
rtp_recv_srcpads: Vec<RtpRecvSrcPad>, rtp_recv_srcpads: Vec<RtpRecvSrcPad>,
@ -352,7 +357,7 @@ impl BinSessionInner {
rtp_recv_sink_segment: None, rtp_recv_sink_segment: None,
rtp_recv_sink_seqnum: None, rtp_recv_sink_seqnum: None,
caps_map: HashMap::default(), pt_map: HashMap::default(),
recv_store: vec![], recv_store: vec![],
rtp_recv_srcpads: vec![], rtp_recv_srcpads: vec![],
@ -366,16 +371,32 @@ impl BinSessionInner {
} }
} }
fn caps_from_pt_ssrc(&self, pt: u8, ssrc: u32) -> gst::Caps { pub fn clear_pt_map(&mut self) {
self.caps_map self.pt_map.clear();
.get(&pt) }
.and_then(|ssrc_map| ssrc_map.get(&ssrc))
.cloned() pub fn add_caps(&mut self, caps: gst::Caps) {
.unwrap_or( let Some((pt, clock_rate)) = pt_clock_rate_from_caps(&caps) else {
gst::Caps::builder("application/x-rtp") return;
.field("payload", pt as i32) };
.build(), let caps_clone = caps.clone();
) self.pt_map
.entry(pt)
.and_modify(move |entry| *entry = caps)
.or_insert_with(move || caps_clone);
self.session.set_pt_clock_rate(pt, clock_rate);
}
fn caps_from_pt(&self, pt: u8) -> gst::Caps {
self.pt_map.get(&pt).cloned().unwrap_or(
gst::Caps::builder("application/x-rtp")
.field("payload", pt as i32)
.build(),
)
}
pub fn pt_map(&self) -> impl Iterator<Item = (u8, &gst::Caps)> + '_ {
self.pt_map.iter().map(|(&k, v)| (k, v))
} }
fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
@ -948,26 +969,23 @@ impl RtpBin2 {
let mut session_inner = session.inner.lock().unwrap(); let mut session_inner = session.inner.lock().unwrap();
let current_caps = session_inner.rtp_recv_sink_caps.clone(); let current_caps = session_inner.rtp_recv_sink_caps.clone();
let ssrc_map = session_inner if let std::collections::hash_map::Entry::Vacant(e) =
.caps_map session_inner.pt_map.entry(rtp.payload_type())
.entry(rtp.payload_type()) {
.or_default(); if let Some(mut caps) = current_caps.filter(|caps| clock_rate_from_caps(caps).is_some())
if ssrc_map.get(&rtp.ssrc()).is_none() {
if let Some(mut caps) =
current_caps.filter(|caps| Self::clock_rate_from_caps(caps).is_some())
{ {
state state
.sync_context .sync_context
.as_mut() .as_mut()
.unwrap() .unwrap()
.set_clock_rate(rtp.ssrc(), Self::clock_rate_from_caps(&caps).unwrap()); .set_clock_rate(rtp.ssrc(), clock_rate_from_caps(&caps).unwrap());
{ {
// Ensure the caps we send out hold a payload field // Ensure the caps we send out hold a payload field
let caps = caps.make_mut(); let caps = caps.make_mut();
let s = caps.structure_mut(0).unwrap(); let s = caps.structure_mut(0).unwrap();
s.set("payload", rtp.payload_type() as i32); s.set("payload", rtp.payload_type() as i32);
} }
ssrc_map.insert(rtp.ssrc(), caps); e.insert(caps);
} }
} }
@ -1243,12 +1261,14 @@ impl RtpBin2 {
fn rtp_send_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool { fn rtp_send_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool {
match event.view() { match event.view() {
gst::EventView::Caps(caps) => { gst::EventView::Caps(caps) => {
if let Some((pt, clock_rate)) = Self::pt_clock_rate_from_caps(caps.caps()) { if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(caps.caps()) {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) { if let Some(session) = state.session_by_id(id) {
let mut session = session.inner.lock().unwrap(); let mut session = session.inner.lock().unwrap();
session.session.set_pt_clock_rate(pt, clock_rate); session.session.set_pt_clock_rate(pt, clock_rate);
} }
} else {
gst::warning!(CAT, obj: pad, "input caps are missing payload or clock-rate fields");
} }
gst::Pad::event_default(pad, Some(&*self.obj()), event) gst::Pad::event_default(pad, Some(&*self.obj()), event)
} }
@ -1431,8 +1451,10 @@ impl RtpBin2 {
let mut session = session.inner.lock().unwrap(); let mut session = session.inner.lock().unwrap();
let caps = caps.caps_owned(); let caps = caps.caps_owned();
if let Some((pt, clock_rate)) = Self::pt_clock_rate_from_caps(&caps) { if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(&caps) {
session.session.set_pt_clock_rate(pt, clock_rate); session.session.set_pt_clock_rate(pt, clock_rate);
} else {
gst::warning!(CAT, obj: pad, "input caps are missing payload or clock-rate fields");
} }
session.rtp_recv_sink_caps = Some(caps); session.rtp_recv_sink_caps = Some(caps);
@ -1556,37 +1578,6 @@ impl RtpBin2 {
} }
} }
fn clock_rate_from_caps(caps: &gst::CapsRef) -> Option<u32> {
let Some(s) = caps.structure(0) else {
return None;
};
let Some(clock_rate) = s.get::<i32>("clock-rate").ok() else {
return None;
};
if clock_rate > 0 {
Some(clock_rate as u32)
} else {
None
}
}
fn pt_clock_rate_from_caps(caps: &gst::CapsRef) -> Option<(u8, u32)> {
let Some(s) = caps.structure(0) else {
return None;
};
let Some((clock_rate, pt)) = Option::zip(
s.get::<i32>("clock-rate").ok(),
s.get::<i32>("payload").ok(),
) else {
return None;
};
if (0..=127).contains(&pt) && clock_rate > 0 {
Some((pt as u8, clock_rate as u32))
} else {
None
}
}
fn rtp_recv_src_event( fn rtp_recv_src_event(
&self, &self,
pad: &gst::Pad, pad: &gst::Pad,
@ -1605,7 +1596,7 @@ impl RtpBin2 {
if let Some(session) = state.session_by_id(id) { if let Some(session) = state.session_by_id(id) {
let now = Instant::now(); let now = Instant::now();
let mut session = session.inner.lock().unwrap(); let mut session = session.inner.lock().unwrap();
let caps = session.caps_from_pt_ssrc(pt, ssrc); let caps = session.caps_from_pt(pt);
let s = caps.structure(0).unwrap(); let s = caps.structure(0).unwrap();
let pli = s.has_field("rtcp-fb-nack-pli"); let pli = s.has_field("rtcp-fb-nack-pli");
@ -1776,6 +1767,27 @@ impl ObjectImpl for RtpBin2 {
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![glib::subclass::Signal::builder("get-session")
.param_types([u32::static_type()])
.return_type::<crate::rtpbin2::config::RtpBin2Session>()
.action()
.class_handler(|_token, args| {
let element = args[0].get::<super::RtpBin2>().expect("signal arg");
let id = args[1].get::<u32>().expect("signal arg");
let bin = element.imp();
let state = bin.state.lock().unwrap();
state
.session_by_id(id as usize)
.map(|sess| sess.config.to_value())
})
.build()]
});
SIGNALS.as_ref()
}
} }
impl GstObjectImpl for RtpBin2 {} impl GstObjectImpl for RtpBin2 {}
@ -2214,7 +2226,7 @@ impl ElementImpl for RtpBin2 {
session.rtp_recv_sink_seqnum = None; session.rtp_recv_sink_seqnum = None;
session.rtp_recv_sink_group_id = None; session.rtp_recv_sink_group_id = None;
session.caps_map.clear(); session.pt_map.clear();
} }
state.sync_context = None; state.sync_context = None;
drop(state); drop(state);
@ -2240,6 +2252,46 @@ impl ElementImpl for RtpBin2 {
} }
} }
pub fn pt_clock_rate_from_caps(caps: &gst::CapsRef) -> Option<(u8, u32)> {
let Some(s) = caps.structure(0) else {
gst::debug!(CAT, "no structure!");
return None;
};
let Some((clock_rate, pt)) = Option::zip(
s.get::<i32>("clock-rate").ok(),
s.get::<i32>("payload").ok(),
) else {
gst::debug!(
CAT,
"could not retrieve clock-rate and/or payload from structure"
);
return None;
};
if (0..=127).contains(&pt) && clock_rate > 0 {
Some((pt as u8, clock_rate as u32))
} else {
gst::debug!(
CAT,
"payload value {pt} out of bounds or clock-rate {clock_rate} out of bounds"
);
None
}
}
fn clock_rate_from_caps(caps: &gst::CapsRef) -> Option<u32> {
let Some(s) = caps.structure(0) else {
return None;
};
let Some(clock_rate) = s.get::<i32>("clock-rate").ok() else {
return None;
};
if clock_rate > 0 {
Some(clock_rate as u32)
} else {
None
}
}
static RUST_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static RUST_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new( gst::DebugCategory::new(
"rust-log", "rust-log",

View file

@ -3,6 +3,7 @@
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
mod config;
mod imp; mod imp;
mod jitterbuffer; mod jitterbuffer;
mod session; mod session;
@ -19,6 +20,8 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
{ {
crate::rtpbin2::sync::TimestampingMode::static_type() crate::rtpbin2::sync::TimestampingMode::static_type()
.mark_as_plugin_api(gst::PluginAPIFlags::empty()); .mark_as_plugin_api(gst::PluginAPIFlags::empty());
crate::rtpbin2::config::Rtp2Session::static_type()
.mark_as_plugin_api(gst::PluginAPIFlags::empty());
} }
gst::Element::register( gst::Element::register(
Some(plugin), Some(plugin),