mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-10 20:31:10 +00:00
rtp: basepay: Negotiate SSRC and PT with downstream if not set via property
This makes the new payloaders closer to the old ones, and makes usage in webrtcbin easier. Also properly configure default PT of subclasses. Previously any PT that was set for these subclasses via g_object_new() would be overridden by the default one during construction. Additionally, do SSRC collision handling while queueing output packets. This is the more natural place as that's where the SSRC is actually used, it happens potentially earlier and also allows to drain any pending packets before the SSRC change in the caps. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/557 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1693>
This commit is contained in:
parent
914ffc8be9
commit
31e836f4d6
5 changed files with 134 additions and 160 deletions
|
@ -35,6 +35,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|||
struct Settings {
|
||||
mtu: u32,
|
||||
pt: u8,
|
||||
pt_set: bool, // If the pt was set after creation
|
||||
ssrc: Option<u32>,
|
||||
timestamp_offset: Option<u32>,
|
||||
seqnum_offset: Option<u16>,
|
||||
|
@ -50,6 +51,7 @@ impl Default for Settings {
|
|||
// TODO: Should we use a different default here? libwebrtc uses 1200, for example
|
||||
mtu: 1400,
|
||||
pt: 96,
|
||||
pt_set: false,
|
||||
ssrc: None,
|
||||
timestamp_offset: None,
|
||||
seqnum_offset: None,
|
||||
|
@ -70,6 +72,12 @@ struct Stream {
|
|||
seqnum_offset: u16,
|
||||
/// Set if onvif_no_rate_control || !scale_rtptime
|
||||
use_stream_time: bool,
|
||||
|
||||
/// Last seqnum that was put on a packet. This is initialized with stream.seqnum_offset.
|
||||
last_seqnum: Wrapping<u16>,
|
||||
|
||||
/// Last RTP timestamp that was put on a packet. This is initialized with stream.timestamp_offset.
|
||||
last_timestamp: Wrapping<u32>,
|
||||
}
|
||||
|
||||
/// Metadata of a pending buffer for tracking purposes.
|
||||
|
@ -109,11 +117,6 @@ struct State {
|
|||
/// Last buffer id that was used for a packet.
|
||||
last_used_buffer_id: u64,
|
||||
|
||||
/// Last seqnum that was put on a packet. This is initialized with stream.seqnum_offset.
|
||||
last_seqnum: Wrapping<u16>,
|
||||
|
||||
/// Last RTP timestamp that was put on a packet. This is initialized with stream.timestamp_offset.
|
||||
last_timestamp: Wrapping<u32>,
|
||||
/// Last PTS that was put on an outgoing buffer.
|
||||
last_pts: Option<gst::ClockTime>,
|
||||
|
||||
|
@ -148,9 +151,6 @@ impl Default for State {
|
|||
current_buffer_id: 0,
|
||||
last_used_buffer_id: 0,
|
||||
|
||||
last_seqnum: Wrapping(0),
|
||||
|
||||
last_timestamp: Wrapping(0),
|
||||
last_pts: None,
|
||||
|
||||
last_pts_rtp_mapping: None,
|
||||
|
@ -303,9 +303,9 @@ impl RtpBasePay2 {
|
|||
return Err(gst::FlowError::NotNegotiated);
|
||||
}
|
||||
|
||||
state.last_seqnum += 1;
|
||||
let stream = state.stream.as_ref().unwrap();
|
||||
let seqnum = state.last_seqnum.0;
|
||||
let stream = state.stream.as_mut().unwrap();
|
||||
stream.last_seqnum += 1;
|
||||
let seqnum = stream.last_seqnum.0;
|
||||
gst::trace!(CAT, imp = self, "Using seqnum {seqnum}");
|
||||
packet = packet
|
||||
.payload_type(stream.pt)
|
||||
|
@ -772,7 +772,7 @@ impl RtpBasePay2 {
|
|||
state.pending_packets.push_back(PendingPacket { buffer });
|
||||
|
||||
state.last_used_buffer_id = id_end;
|
||||
state.last_timestamp = Wrapping(packet_rtptime);
|
||||
state.stream.as_mut().unwrap().last_timestamp = Wrapping(packet_rtptime);
|
||||
state.last_pts = Some(pts);
|
||||
|
||||
// Update stats
|
||||
|
@ -872,13 +872,6 @@ impl RtpBasePay2 {
|
|||
pub(super) fn src_pad(&self) -> &gst::Pad {
|
||||
&self.src_pad
|
||||
}
|
||||
|
||||
/// Helper so we don't manually have to set ssrc_collision to None if there
|
||||
/// is Some collision to handle.
|
||||
fn take_ssrc_collision(&self) -> Option<u32> {
|
||||
let mut ssrc_collision = self.ssrc_collision.lock().unwrap();
|
||||
ssrc_collision.take()
|
||||
}
|
||||
}
|
||||
|
||||
/// Default virtual method implementations.
|
||||
|
@ -1063,6 +1056,17 @@ impl RtpBasePay2 {
|
|||
}
|
||||
};
|
||||
|
||||
let caps_ssrc = s.get::<u32>("ssrc").ok();
|
||||
let caps_pt = s
|
||||
.get::<i32>("payload")
|
||||
.ok()
|
||||
.filter(|pt| *pt > 0)
|
||||
.map(|pt| pt as u8);
|
||||
|
||||
// We're not negotiating seqnum-offset and timestamp-offset with downstream
|
||||
// and also don't set it in the caps. The old payloader base class does this
|
||||
// but this is not actually used anywhere.
|
||||
|
||||
let mut state = self.state.borrow_mut();
|
||||
if self
|
||||
.negotiate_header_extensions(&state, &mut src_caps)
|
||||
|
@ -1072,6 +1076,63 @@ impl RtpBasePay2 {
|
|||
return;
|
||||
}
|
||||
|
||||
let ssrc_collision = self.ssrc_collision.lock().unwrap().take();
|
||||
if state.stream.is_none() {
|
||||
use rand::prelude::*;
|
||||
let mut rng = rand::thread_rng();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
let pt = if settings.pt_set {
|
||||
settings.pt
|
||||
} else {
|
||||
caps_pt.unwrap_or(settings.pt)
|
||||
};
|
||||
let ssrc = ssrc_collision
|
||||
.or(settings.ssrc)
|
||||
.or(caps_ssrc)
|
||||
.unwrap_or_else(|| rng.gen::<u32>());
|
||||
let timestamp_offset = settings
|
||||
.timestamp_offset
|
||||
.unwrap_or_else(|| rng.gen::<u32>());
|
||||
let seqnum_offset = settings.seqnum_offset.unwrap_or_else(|| rng.gen::<u16>());
|
||||
let stream = Stream {
|
||||
pt,
|
||||
ssrc,
|
||||
timestamp_offset,
|
||||
seqnum_offset,
|
||||
use_stream_time: settings.onvif_no_rate_control || !settings.scale_rtptime,
|
||||
last_seqnum: Wrapping(seqnum_offset),
|
||||
last_timestamp: Wrapping(timestamp_offset),
|
||||
};
|
||||
|
||||
gst::info!(CAT, imp = self, "Configuring {stream:?}");
|
||||
|
||||
state.stream = Some(stream);
|
||||
*self.stats.lock().unwrap() = Some(Stats {
|
||||
ssrc,
|
||||
pt,
|
||||
clock_rate: None,
|
||||
running_time: None,
|
||||
seqnum: seqnum_offset,
|
||||
timestamp: timestamp_offset,
|
||||
seqnum_offset,
|
||||
timestamp_offset,
|
||||
});
|
||||
} else if let Some(ssrc_collision) = ssrc_collision {
|
||||
let stream = state.stream.as_mut().unwrap();
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Switching from SSRC {} to {} because of SSRC collision",
|
||||
stream.ssrc,
|
||||
ssrc_collision,
|
||||
);
|
||||
|
||||
stream.ssrc = ssrc_collision;
|
||||
self.stats.lock().unwrap().as_mut().unwrap().ssrc = ssrc_collision;
|
||||
}
|
||||
|
||||
let stream = state.stream.as_ref().unwrap();
|
||||
let ssrc = stream.ssrc;
|
||||
let pt = stream.pt;
|
||||
|
@ -1105,7 +1166,7 @@ impl RtpBasePay2 {
|
|||
drop(state);
|
||||
|
||||
// FIXME: Drain also in other conditions?
|
||||
if clock_rate_changed {
|
||||
if clock_rate_changed || ssrc_collision.is_some() {
|
||||
// First push the pending segment event downstream before draining
|
||||
if let Some(segment_event) = segment_event.take() {
|
||||
let _ = self.src_pad.push_event(segment_event);
|
||||
|
@ -1538,47 +1599,6 @@ impl RtpBasePay2 {
|
|||
}
|
||||
}
|
||||
|
||||
if let Some(ssrc_collision) = self.take_ssrc_collision() {
|
||||
let new_ssrc = ssrc_collision;
|
||||
let stream = state.stream.as_mut().unwrap();
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Switching from SSRC {} to {} because of SSRC collision",
|
||||
stream.ssrc,
|
||||
new_ssrc,
|
||||
);
|
||||
stream.ssrc = new_ssrc;
|
||||
|
||||
if let Some(ref src_caps) = state.negotiated_src_caps {
|
||||
let mut src_caps = src_caps.copy();
|
||||
|
||||
{
|
||||
let caps = src_caps.get_mut().unwrap();
|
||||
caps.set("ssrc", new_ssrc);
|
||||
}
|
||||
state.negotiated_src_caps = Some(src_caps.clone());
|
||||
|
||||
let seqnum = if let Some((seqnum, _)) = state.segment {
|
||||
seqnum
|
||||
} else {
|
||||
gst::Seqnum::next()
|
||||
};
|
||||
drop(state);
|
||||
|
||||
let _ = self
|
||||
.src_pad
|
||||
.push_event(gst::event::Caps::builder(&src_caps).seqnum(seqnum).build());
|
||||
|
||||
state = self.state.borrow_mut();
|
||||
}
|
||||
|
||||
let mut stats_guard = self.stats.lock().unwrap();
|
||||
let stats = stats_guard.as_mut().unwrap();
|
||||
stats.ssrc = new_ssrc;
|
||||
drop(stats_guard);
|
||||
}
|
||||
|
||||
let id = state.current_buffer_id;
|
||||
state.current_buffer_id += 1;
|
||||
|
||||
|
@ -1669,31 +1689,31 @@ impl RtpBasePay2 {
|
|||
|
||||
if !state.pending_packets.is_empty() {
|
||||
gst::debug!(CAT, imp = self, "Pushing all pending packets");
|
||||
}
|
||||
|
||||
// Update PTS and RTP timestamp of all pending packets that have none yet
|
||||
let pts = state.last_pts;
|
||||
let rtptime = state.last_timestamp.0;
|
||||
let mut discont_pending = state.discont_pending;
|
||||
state.discont_pending = false;
|
||||
// Update PTS and RTP timestamp of all pending packets that have none yet
|
||||
let pts = state.last_pts;
|
||||
let rtptime = state.stream.as_ref().unwrap().last_timestamp.0;
|
||||
let mut discont_pending = state.discont_pending;
|
||||
state.discont_pending = false;
|
||||
|
||||
for packet in state
|
||||
.pending_packets
|
||||
.iter_mut()
|
||||
.skip_while(|p| p.buffer.pts().is_some())
|
||||
{
|
||||
assert!(packet.buffer.pts().is_none());
|
||||
let buffer = packet.buffer.get_mut().unwrap();
|
||||
buffer.set_pts(pts);
|
||||
for packet in state
|
||||
.pending_packets
|
||||
.iter_mut()
|
||||
.skip_while(|p| p.buffer.pts().is_some())
|
||||
{
|
||||
assert!(packet.buffer.pts().is_none());
|
||||
let buffer = packet.buffer.get_mut().unwrap();
|
||||
buffer.set_pts(pts);
|
||||
|
||||
if discont_pending {
|
||||
buffer.set_flags(gst::BufferFlags::DISCONT);
|
||||
discont_pending = false;
|
||||
if discont_pending {
|
||||
buffer.set_flags(gst::BufferFlags::DISCONT);
|
||||
discont_pending = false;
|
||||
}
|
||||
|
||||
let mut map = buffer.map_writable().unwrap();
|
||||
let mut packet = rtp_types::RtpPacketMut::parse(&mut map).unwrap();
|
||||
packet.set_timestamp(rtptime);
|
||||
}
|
||||
|
||||
let mut map = buffer.map_writable().unwrap();
|
||||
let mut packet = rtp_types::RtpPacketMut::parse(&mut map).unwrap();
|
||||
packet.set_timestamp(rtptime);
|
||||
}
|
||||
|
||||
drop(state);
|
||||
|
@ -1765,6 +1785,7 @@ impl ObjectSubclass for RtpBasePay2 {
|
|||
|
||||
class.allowed_meta_tags = &[];
|
||||
class.drop_header_buffers = false;
|
||||
class.default_pt = 96;
|
||||
}
|
||||
|
||||
fn with_class(class: &Self::Class) -> Self {
|
||||
|
@ -1815,11 +1836,16 @@ impl ObjectSubclass for RtpBasePay2 {
|
|||
.flags(gst::PadFlags::FIXED_CAPS)
|
||||
.build();
|
||||
|
||||
let settings = Settings {
|
||||
pt: class.default_pt,
|
||||
..Settings::default()
|
||||
};
|
||||
|
||||
Self {
|
||||
src_pad,
|
||||
sink_pad,
|
||||
state: AtomicRefCell::default(),
|
||||
settings: Mutex::default(),
|
||||
settings: Mutex::new(settings),
|
||||
stats: Mutex::default(),
|
||||
ssrc_collision: Mutex::new(None),
|
||||
extensions: Mutex::default(),
|
||||
|
@ -1995,7 +2021,9 @@ impl ObjectImpl for RtpBasePay2 {
|
|||
self.settings.lock().unwrap().mtu = value.get().unwrap();
|
||||
}
|
||||
"pt" => {
|
||||
self.settings.lock().unwrap().pt = value.get::<u32>().unwrap() as u8;
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.pt = value.get::<u32>().unwrap() as u8;
|
||||
settings.pt_set = true;
|
||||
}
|
||||
"ssrc" => {
|
||||
let v = value.get::<i64>().unwrap();
|
||||
|
@ -2108,46 +2136,8 @@ impl ElementImpl for RtpBasePay2 {
|
|||
gst::debug!(CAT, imp = self, "Changing state: {transition}");
|
||||
|
||||
if transition == gst::StateChange::ReadyToPaused {
|
||||
{
|
||||
use rand::prelude::*;
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
let pt = settings.pt;
|
||||
let ssrc = settings.ssrc.unwrap_or_else(|| rng.gen::<u32>());
|
||||
let timestamp_offset = settings
|
||||
.timestamp_offset
|
||||
.unwrap_or_else(|| rng.gen::<u32>());
|
||||
let seqnum_offset = settings.seqnum_offset.unwrap_or_else(|| rng.gen::<u16>());
|
||||
let stream = Stream {
|
||||
pt,
|
||||
ssrc,
|
||||
timestamp_offset,
|
||||
seqnum_offset,
|
||||
use_stream_time: settings.onvif_no_rate_control || !settings.scale_rtptime,
|
||||
};
|
||||
|
||||
gst::info!(CAT, imp = self, "Configuring {stream:?}");
|
||||
|
||||
*self.state.borrow_mut() = State {
|
||||
stream: Some(stream),
|
||||
last_seqnum: Wrapping(seqnum_offset),
|
||||
last_timestamp: Wrapping(timestamp_offset),
|
||||
..State::default()
|
||||
};
|
||||
|
||||
*self.stats.lock().unwrap() = Some(Stats {
|
||||
ssrc,
|
||||
pt,
|
||||
clock_rate: None,
|
||||
running_time: None,
|
||||
seqnum: seqnum_offset,
|
||||
timestamp: timestamp_offset,
|
||||
seqnum_offset,
|
||||
timestamp_offset,
|
||||
});
|
||||
}
|
||||
*self.state.borrow_mut() = State::default();
|
||||
*self.stats.lock().unwrap() = None;
|
||||
|
||||
let obj = self.obj();
|
||||
(obj.class().as_ref().start)(&obj).map_err(|err_msg| {
|
||||
|
|
|
@ -131,6 +131,9 @@ pub trait RtpBasePay2Impl: ElementImpl {
|
|||
/// to be implemented.
|
||||
const ALLOWED_META_TAGS: &'static [&'static str] = &[];
|
||||
|
||||
/// Default payload type for this subclass.
|
||||
const DEFAULT_PT: u8 = 96;
|
||||
|
||||
/// Called when streaming starts (READY -> PAUSED state change)
|
||||
///
|
||||
/// Optional, can be used to initialise streaming state.
|
||||
|
@ -380,6 +383,7 @@ pub struct Class {
|
|||
|
||||
allowed_meta_tags: &'static [&'static str],
|
||||
drop_header_buffers: bool,
|
||||
default_pt: u8,
|
||||
}
|
||||
|
||||
unsafe impl ClassStruct for Class {
|
||||
|
@ -462,6 +466,7 @@ unsafe impl<T: RtpBasePay2Impl> IsSubclassable<T> for RtpBasePay2 {
|
|||
|
||||
class.allowed_meta_tags = T::ALLOWED_META_TAGS;
|
||||
class.drop_header_buffers = T::DROP_HEADER_BUFFERS;
|
||||
class.default_pt = T::DEFAULT_PT;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ use atomic_refcell::AtomicRefCell;
|
|||
*
|
||||
* Since: plugins-rs-0.13.0
|
||||
*/
|
||||
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||
use gst::{glib, subclass::prelude::*};
|
||||
use smallvec::SmallVec;
|
||||
use std::{cmp, io};
|
||||
|
||||
|
@ -64,14 +64,7 @@ impl ObjectSubclass for RtpJpegPay {
|
|||
type ParentType = crate::basepay::RtpBasePay2;
|
||||
}
|
||||
|
||||
impl ObjectImpl for RtpJpegPay {
|
||||
fn constructed(&self) {
|
||||
self.parent_constructed();
|
||||
|
||||
// Default static payload type
|
||||
self.obj().set_property("pt", 26u32);
|
||||
}
|
||||
}
|
||||
impl ObjectImpl for RtpJpegPay {}
|
||||
|
||||
impl GstObjectImpl for RtpJpegPay {}
|
||||
|
||||
|
@ -138,6 +131,7 @@ impl ElementImpl for RtpJpegPay {
|
|||
|
||||
impl crate::basepay::RtpBasePay2Impl for RtpJpegPay {
|
||||
const ALLOWED_META_TAGS: &'static [&'static str] = &["video"];
|
||||
const DEFAULT_PT: u8 = 26;
|
||||
|
||||
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
||||
let mut state = self.state.borrow_mut();
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
*/
|
||||
use atomic_refcell::AtomicRefCell;
|
||||
|
||||
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||
use gst::{glib, subclass::prelude::*};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
|
@ -34,7 +34,7 @@ use std::num::NonZeroUsize;
|
|||
|
||||
use crate::basepay::{PacketToBufferRelation, RtpBasePay2Ext};
|
||||
|
||||
const RTP_MP2T_DEFAULT_PT: u32 = 33;
|
||||
const RTP_MP2T_DEFAULT_PT: u8 = 33;
|
||||
|
||||
const RTP_MP2T_DEFAULT_PACKET_SIZE: usize = 188;
|
||||
|
||||
|
@ -113,13 +113,7 @@ impl ObjectSubclass for RtpMP2TPay {
|
|||
type ParentType = crate::basepay::RtpBasePay2;
|
||||
}
|
||||
|
||||
impl ObjectImpl for RtpMP2TPay {
|
||||
fn constructed(&self) {
|
||||
self.parent_constructed();
|
||||
|
||||
self.obj().set_property("pt", RTP_MP2T_DEFAULT_PT);
|
||||
}
|
||||
}
|
||||
impl ObjectImpl for RtpMP2TPay {}
|
||||
|
||||
impl GstObjectImpl for RtpMP2TPay {}
|
||||
|
||||
|
@ -182,6 +176,7 @@ impl ElementImpl for RtpMP2TPay {
|
|||
|
||||
impl crate::basepay::RtpBasePay2Impl for RtpMP2TPay {
|
||||
const ALLOWED_META_TAGS: &'static [&'static str] = &[];
|
||||
const DEFAULT_PT: u8 = RTP_MP2T_DEFAULT_PT;
|
||||
|
||||
fn set_sink_caps(&self, caps: &gst::Caps) -> bool {
|
||||
let s = caps.structure(0).unwrap();
|
||||
|
|
|
@ -115,14 +115,7 @@ impl ObjectSubclass for RtpPcmaPay {
|
|||
type ParentType = super::RtpPcmauPay;
|
||||
}
|
||||
|
||||
impl ObjectImpl for RtpPcmaPay {
|
||||
fn constructed(&self) {
|
||||
self.parent_constructed();
|
||||
|
||||
// Default to payload type 8
|
||||
self.obj().set_property("pt", 8u32);
|
||||
}
|
||||
}
|
||||
impl ObjectImpl for RtpPcmaPay {}
|
||||
|
||||
impl GstObjectImpl for RtpPcmaPay {}
|
||||
|
||||
|
@ -183,7 +176,9 @@ impl ElementImpl for RtpPcmaPay {
|
|||
}
|
||||
}
|
||||
|
||||
impl crate::basepay::RtpBasePay2Impl for RtpPcmaPay {}
|
||||
impl crate::basepay::RtpBasePay2Impl for RtpPcmaPay {
|
||||
const DEFAULT_PT: u8 = 8;
|
||||
}
|
||||
|
||||
impl crate::baseaudiopay::RtpBaseAudioPay2Impl for RtpPcmaPay {}
|
||||
|
||||
|
@ -217,14 +212,7 @@ impl ObjectSubclass for RtpPcmuPay {
|
|||
type ParentType = super::RtpPcmauPay;
|
||||
}
|
||||
|
||||
impl ObjectImpl for RtpPcmuPay {
|
||||
fn constructed(&self) {
|
||||
self.parent_constructed();
|
||||
|
||||
// Default to payload type 0
|
||||
self.obj().set_property("pt", 0u32);
|
||||
}
|
||||
}
|
||||
impl ObjectImpl for RtpPcmuPay {}
|
||||
|
||||
impl GstObjectImpl for RtpPcmuPay {}
|
||||
|
||||
|
@ -285,7 +273,9 @@ impl ElementImpl for RtpPcmuPay {
|
|||
}
|
||||
}
|
||||
|
||||
impl crate::basepay::RtpBasePay2Impl for RtpPcmuPay {}
|
||||
impl crate::basepay::RtpBasePay2Impl for RtpPcmuPay {
|
||||
const DEFAULT_PT: u8 = 0;
|
||||
}
|
||||
|
||||
impl crate::baseaudiopay::RtpBaseAudioPay2Impl for RtpPcmuPay {}
|
||||
|
||||
|
|
Loading…
Reference in a new issue