gst-plugins-rs/net/rtp/src/basepay/imp.rs
Martin Nordholts a719cbfcc6 rtp: Change RtpBasePay2::ssrc_collision from AtomicU64 to Option<u32>
Rust targets without support for `AtomicU64` is still
somewhat common. Running

    git grep -i 'max_atomic_width: Some(32)' | wc -l

in the Rust compiler repo currently counts to 34 targets.

Change the `RtpBasePay2::ssrc_collision` from `AtomicU64` to
`Mutex<Option<u32>>`. This way we keep support for these
targets.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1562>
2024-05-10 14:23:41 +00:00

2118 lines
74 KiB
Rust

//
// Copyright (C) 2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use atomic_refcell::AtomicRefCell;
use gst::{glib, prelude::*, subclass::prelude::*};
use gst_rtp::prelude::*;
use once_cell::sync::Lazy;
use smallvec::SmallVec;
use std::{
collections::{BTreeMap, VecDeque},
num::Wrapping,
ops::{Bound, RangeBounds},
sync::Mutex,
};
use super::PacketToBufferRelation;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtpbasepay2",
gst::DebugColorFlags::empty(),
Some("RTP Base Payloader 2"),
)
});
#[derive(Clone, Debug)]
struct Settings {
mtu: u32,
pt: u8,
ssrc: Option<u32>,
timestamp_offset: Option<u32>,
seqnum_offset: Option<u16>,
onvif_no_rate_control: bool,
scale_rtptime: bool,
source_info: bool,
auto_header_extensions: bool,
}
impl Default for Settings {
fn default() -> Self {
Self {
// TODO: Should we use a different default here? libwebrtc uses 1200, for example
mtu: 1400,
pt: 96,
ssrc: None,
timestamp_offset: None,
seqnum_offset: None,
onvif_no_rate_control: false,
scale_rtptime: true,
source_info: false,
auto_header_extensions: true,
}
}
}
#[derive(Debug)]
struct Stream {
pt: u8,
ssrc: u32,
timestamp_offset: u32,
#[allow(dead_code)]
seqnum_offset: u16,
/// Set if onvif_no_rate_control || !scale_rtptime
use_stream_time: bool,
}
/// Metadata of a pending buffer for tracking purposes.
struct PendingBuffer {
/// Buffer id of this buffer.
id: u64,
buffer: gst::Buffer,
}
/// A pending packet that still has to be sent downstream.
struct PendingPacket {
/// If no PTS is set then this packet also has no timestamp yet
/// and must be pushed together with the next packet where
/// it is actually known, or at EOS with the last known one.
buffer: gst::Buffer,
}
struct State {
segment: Option<(gst::Seqnum, gst::FormattedSegment<gst::ClockTime>)>,
/// Set when a new segment event should be sent downstream before the next buffer.
pending_segment: bool,
sink_caps: Option<gst::Caps>,
src_caps: Option<gst::Caps>,
negotiated_src_caps: Option<gst::Caps>,
/// Set when the srcpad caps are known.
clock_rate: Option<u32>,
/// Stream configuration. Set in Ready->Paused.
stream: Option<Stream>,
/// Set when the next outgoing buffer should have the discont flag set.
discont_pending: bool,
/// Buffer id of the next buffer.
current_buffer_id: u64,
/// Last buffer id that was used for a packet.
last_used_buffer_id: u64,
/// Last seqnum that was put on a packet. This is initialized with stream.seqnum_offset.
last_seqnum: Wrapping<u16>,
/// Last RTP timestamp that was put on a packet. This is initialized with stream.timestamp_offset.
last_timestamp: Wrapping<u32>,
/// Last PTS that was put on an outgoing buffer.
last_pts: Option<gst::ClockTime>,
/// Last PTS / RTP time mapping.
///
/// This is reset when draining and used if the subclass provides RTP timestamp offsets
/// to get the base mapping.
last_pts_rtp_mapping: Option<(gst::ClockTime, u32)>,
/// Pending input buffers that were not completely used up for outgoing packets yet.
pending_buffers: VecDeque<PendingBuffer>,
/// Pending packets that have to be pushed downstream. Some of them might not have a PTS or RTP
/// timestamp yet.
pending_packets: VecDeque<PendingPacket>,
}
impl Default for State {
fn default() -> Self {
Self {
segment: None,
pending_segment: false,
sink_caps: None,
src_caps: None,
negotiated_src_caps: None,
clock_rate: None,
stream: None,
discont_pending: true,
current_buffer_id: 0,
last_used_buffer_id: 0,
last_seqnum: Wrapping(0),
last_timestamp: Wrapping(0),
last_pts: None,
last_pts_rtp_mapping: None,
pending_buffers: VecDeque::new(),
pending_packets: VecDeque::new(),
}
}
}
#[derive(Clone, Debug)]
struct Stats {
ssrc: u32,
pt: u8,
// Only known once the caps are known
clock_rate: Option<u32>,
running_time: Option<gst::ClockTime>,
seqnum: u16,
timestamp: u32,
seqnum_offset: u16,
timestamp_offset: u32,
}
pub struct RtpBasePay2 {
sink_pad: gst::Pad,
src_pad: gst::Pad,
state: AtomicRefCell<State>,
settings: Mutex<Settings>,
stats: Mutex<Option<Stats>>,
/// If Some then there was an SSRC collision and we should switch to the
/// provided SSRC here.
ssrc_collision: Mutex<Option<u32>>,
/// Currently configured header extensions
extensions: Mutex<BTreeMap<u8, gst_rtp::RTPHeaderExtension>>,
}
/// Wrappers for public methods and associated helper functions.
#[allow(dead_code)]
impl RtpBasePay2 {
pub(super) fn mtu(&self) -> u32 {
self.settings.lock().unwrap().mtu
}
pub(super) fn max_payload_size(&self) -> u32 {
let settings = self.settings.lock().unwrap();
// FIXME: This does not consider the space needed for header extensions. Doing so would
// require knowing the buffer id range so we can calculate the maximum here.
settings
.mtu
.saturating_sub(if settings.source_info {
rtp_types::RtpPacket::MAX_N_CSRCS as u32 * 4
} else {
0
})
.saturating_sub(rtp_types::RtpPacket::MIN_RTP_PACKET_LEN as u32)
}
pub(super) fn set_src_caps(&self, src_caps: &gst::Caps) {
gst::debug!(CAT, imp: self, "Setting src caps {src_caps:?}");
let s = src_caps.structure(0).unwrap();
assert!(
s.has_name("application/x-rtp"),
"Non-RTP caps {src_caps:?} not supported"
);
let mut state = self.state.borrow_mut();
state.src_caps = Some(src_caps.clone());
drop(state);
self.negotiate();
}
fn negotiate(&self) {
let state = self.state.borrow_mut();
let Some(ref src_caps) = state.src_caps else {
gst::debug!(CAT, imp: self, "No src caps set yet, can't negotiate");
return;
};
let mut src_caps = src_caps.clone();
drop(state);
gst::debug!(CAT, imp: self, "Configured src caps: {src_caps:?}");
let peer_caps = self.src_pad.peer_query_caps(Some(&src_caps));
if !peer_caps.is_empty() {
gst::debug!(CAT, imp: self, "Peer caps: {peer_caps:?}");
src_caps = peer_caps;
} else {
gst::debug!(CAT, imp: self, "Empty peer caps");
}
gst::debug!(CAT, imp: self, "Negotiating with caps {src_caps:?}");
src_caps.make_mut();
let obj = self.obj();
(obj.class().as_ref().negotiate)(&obj, src_caps);
}
pub(super) fn drop_buffers(&self, ids: impl RangeBounds<u64>) {
gst::trace!(CAT, imp: self, "Dropping buffers up to {:?}", ids.end_bound());
let mut state = self.state.borrow_mut();
let end = match ids.end_bound() {
Bound::Included(end) => *end,
Bound::Excluded(end) if *end == 0 => return,
Bound::Excluded(end) => *end - 1,
Bound::Unbounded => {
state.pending_buffers.clear();
return;
}
};
if let Some(back) = state.pending_buffers.back() {
if back.id <= end {
state.pending_buffers.clear();
return;
}
} else {
return;
}
while state.pending_buffers.front().map_or(false, |b| b.id <= end) {
let _ = state.pending_buffers.pop_front();
}
}
pub(super) fn queue_packet(
&self,
packet_to_buffer_relation: PacketToBufferRelation,
mut packet: rtp_types::RtpPacketBuilder<&[u8], &[u8]>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(
CAT,
imp: self,
"Queueing packet for {packet_to_buffer_relation:?}",
);
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.borrow_mut();
if state.negotiated_src_caps.is_none() {
gst::error!(CAT, imp: self, "No source pad caps negotiated yet");
return Err(gst::FlowError::NotNegotiated);
}
state.last_seqnum += 1;
let stream = state.stream.as_ref().unwrap();
let seqnum = state.last_seqnum.0;
gst::trace!(CAT, imp: self, "Using seqnum {seqnum}");
packet = packet
.payload_type(stream.pt)
.ssrc(stream.ssrc)
.sequence_number(seqnum);
let use_stream_time = stream.use_stream_time;
// Queue up packet for later if it doesn't have any associated buffers that could be used
// for figuring out the timestamp.
if matches!(packet_to_buffer_relation, PacketToBufferRelation::OutOfBand) {
gst::trace!(CAT, imp: self, "Keeping packet without associated buffer until next packet or EOS");
// FIXME: Use more optimal packet writing API once available
// https://github.com/ystreet/rtp-types/issues/4
// https://github.com/ystreet/rtp-types/issues/5
// TODO: Maybe use an MTU-sized buffer pool?
let packet_buffer = packet.write_vec().map_err(|err| {
gst::error!(CAT, imp: self, "Can't write packet: {err}");
gst::FlowError::Error
})?;
let packet_len = packet_buffer.len();
if (settings.mtu as usize) < packet_len {
gst::warning!(CAT, imp: self, "Generated packet has bigger size {packet_len} than MTU {}", settings.mtu);
}
gst::trace!(CAT, imp: self, "Queueing packet of size {packet_len}");
let marker = {
let packet = rtp_types::RtpPacket::parse(&packet_buffer).unwrap();
packet.marker_bit()
};
let mut buffer = gst::Buffer::from_mut_slice(packet_buffer);
{
let buffer = buffer.get_mut().unwrap();
if marker {
buffer.set_flags(gst::BufferFlags::MARKER);
}
if state.discont_pending {
buffer.set_flags(gst::BufferFlags::DISCONT);
state.discont_pending = false;
}
}
state.pending_packets.push_back(PendingPacket { buffer });
return Ok(gst::FlowSuccess::Ok);
};
let (ids, timestamp_offset) = match packet_to_buffer_relation {
PacketToBufferRelation::Ids(ids) => (ids.clone(), None),
PacketToBufferRelation::IdsWithOffset {
ids,
timestamp_offset,
} => (ids.clone(), Some(timestamp_offset)),
PacketToBufferRelation::OutOfBand => unreachable!(),
};
if ids.is_empty() {
gst::error!(CAT, imp: self, "Empty buffer id range provided");
return Err(gst::FlowError::Error);
}
let id_start = *ids.start();
let id_end = *ids.end();
// Drop all older buffers
while state
.pending_buffers
.front()
.map_or(false, |b| b.id < id_start)
{
let b = state.pending_buffers.pop_front().unwrap();
gst::trace!(CAT, imp: self, "Dropping buffer with id {}", b.id);
}
let Some(front) = state.pending_buffers.front() else {
gst::error!(CAT, imp: self, "Queueing packet for future buffer ids not allowed");
return Err(gst::FlowError::Error);
};
if id_end > state.pending_buffers.back().unwrap().id {
gst::error!(CAT, imp: self, "Queueing packet for future buffer ids not allowed");
return Err(gst::FlowError::Error);
}
// Set when caps are set
let clock_rate = state.clock_rate.unwrap();
// Clip PTS to the segment boundaries
let segment = state.segment.as_ref().unwrap().1.clone();
let pts = front.buffer.pts().unwrap();
let pts = {
let start = segment.start().expect("no segment start");
let stop = segment.stop();
if pts <= start {
start
} else if let Some(stop) = stop {
if pts >= stop {
stop
} else {
pts
}
} else {
pts
}
};
let rtptime_base = if use_stream_time {
segment.to_stream_time(pts).unwrap()
} else {
segment.to_running_time(pts).unwrap()
};
let packet_pts;
let packet_rtptime;
if let Some(timestamp_offset) = timestamp_offset {
match timestamp_offset {
crate::basepay::TimestampOffset::Pts(pts_diff) => {
if let Some(stop) = segment.stop() {
if pts + pts_diff > stop {
packet_pts = stop
} else {
packet_pts = pts + pts_diff
}
} else {
packet_pts = pts + pts_diff
}
let packet_rtptime_base = if use_stream_time {
segment.to_stream_time(packet_pts).unwrap()
} else {
segment.to_running_time(packet_pts).unwrap()
};
packet_rtptime = (*packet_rtptime_base
.mul_div_ceil(clock_rate as u64, *gst::ClockTime::SECOND)
.unwrap()
& 0xffff_ffff) as u32;
state.last_pts_rtp_mapping = Some((packet_pts, packet_rtptime));
}
crate::basepay::TimestampOffset::Rtp(rtp_diff) => {
let Some((base_pts, base_rtptime)) = state.last_pts_rtp_mapping else {
gst::error!(CAT, imp: self, "Have no base PTS / RTP time mapping");
return Err(gst::FlowError::Error);
};
packet_pts = base_pts
+ gst::ClockTime::from_nseconds(
rtp_diff
.mul_div_ceil(*gst::ClockTime::SECOND, clock_rate as u64)
.unwrap(),
);
let rate = if use_stream_time {
segment.applied_rate()
} else {
1.0 / segment.rate()
};
let rtp_diff = if rate != 0.0 {
(rtp_diff as f64 * rate) as u64
} else {
rtp_diff
};
packet_rtptime = base_rtptime + (rtp_diff & 0xffff_ffff) as u32;
}
}
} else {
packet_pts = pts;
packet_rtptime = (*rtptime_base
.mul_div_ceil(clock_rate as u64, *gst::ClockTime::SECOND)
.unwrap()
& 0xffff_ffff) as u32;
state.last_pts_rtp_mapping = Some((packet_pts, packet_rtptime));
}
// Check if any discont is pending here
let mut discont_pending = state.discont_pending
|| (state.last_used_buffer_id < id_end
&& state
.pending_buffers
.iter()
.skip_while(|b| b.id <= state.last_used_buffer_id)
.take_while(|b| b.id <= id_end)
.any(|b| b.buffer.flags().contains(gst::BufferFlags::DISCONT)));
state.discont_pending = false;
let stream = state.stream.as_ref().unwrap();
let packet_rtptime = stream.timestamp_offset.wrapping_add(packet_rtptime);
// update pts and rtp timestamp of all pending packets that have none yet
for packet in state
.pending_packets
.iter_mut()
.skip_while(|p| p.buffer.pts().is_some())
{
assert!(packet.buffer.pts().is_none());
let buffer = packet.buffer.get_mut().unwrap();
buffer.set_pts(pts);
if discont_pending {
buffer.set_flags(gst::BufferFlags::DISCONT);
discont_pending = false;
}
let mut map = buffer.map_writable().unwrap();
let mut packet = rtp_types::RtpPacketMut::parse(&mut map).unwrap();
packet.set_timestamp(packet_rtptime);
}
packet = packet.timestamp(packet_rtptime);
if settings.source_info
/* || header_extensions */
{
let mut csrc = [0u32; rtp_types::RtpPacket::MAX_N_CSRCS];
let mut n_csrc = 0;
// Copy over metas and other metadata from the packets that made up this buffer
for front in state.pending_buffers.iter().take_while(|b| b.id <= id_end) {
if n_csrc < rtp_types::RtpPacket::MAX_N_CSRCS {
if let Some(meta) = front.buffer.meta::<gst_rtp::RTPSourceMeta>() {
if let Some(ssrc) = meta.ssrc() {
if !csrc[..n_csrc].contains(&ssrc) {
csrc[n_csrc] = ssrc;
n_csrc += 1;
}
}
for c in meta.csrc() {
if n_csrc >= rtp_types::RtpPacket::MAX_N_CSRCS {
break;
}
if !csrc[..n_csrc].contains(c) {
csrc[n_csrc] = *c;
n_csrc += 1;
}
}
}
}
}
if n_csrc > 0 {
for c in &csrc[..n_csrc] {
packet = packet.add_csrc(*c);
}
}
}
// Calculate size and flags for header extensions
// FIXME: We're only considering the first buffer that makes up this packet for RTP header
// extensions.
let extension_input_buffer = state
.pending_buffers
.iter()
.take_while(|b| b.id <= id_end)
.next()
.expect("no input buffer for this packet");
let extensions = self.extensions.lock().unwrap();
let mut extension_flags =
gst_rtp::RTPHeaderExtensionFlags::ONE_BYTE | gst_rtp::RTPHeaderExtensionFlags::TWO_BYTE;
let mut extension_size = 0;
for extension in extensions.values() {
extension_flags &= extension.supported_flags();
let max_size = extension.max_size(&extension_input_buffer.buffer);
let ext_id = extension.id();
if max_size > 16 || ext_id > 14 {
extension_flags -= gst_rtp::RTPHeaderExtensionFlags::ONE_BYTE;
}
if max_size > 255 || ext_id > 255 {
extension_flags -= gst_rtp::RTPHeaderExtensionFlags::TWO_BYTE;
}
extension_size += max_size;
}
let extension_pattern =
if extension_flags.contains(gst_rtp::RTPHeaderExtensionFlags::ONE_BYTE) {
extension_flags = gst_rtp::RTPHeaderExtensionFlags::ONE_BYTE;
extension_size += extensions.len();
0xBEDE
} else if extension_flags.contains(gst_rtp::RTPHeaderExtensionFlags::TWO_BYTE) {
extension_flags = gst_rtp::RTPHeaderExtensionFlags::TWO_BYTE;
extension_size += 2 * extensions.len();
0x1000
} else {
extension_flags = gst_rtp::RTPHeaderExtensionFlags::empty();
extension_size = 0;
0
};
// Round up to a multiple of 4 bytes
extension_size = ((extension_size + 3) / 4) * 4;
// If there are extensions, write an empty extension area of the required size. If this is
// not filled then it would be considered as padding inside the extension because of the
// zeroes, which are not a valid extension ID.
let mut extension_data = SmallVec::<[u8; 256]>::with_capacity(extension_size);
extension_data.resize(extension_size, 0);
let mut packet = packet;
if !extension_data.is_empty() && extension_pattern != 0 {
packet = packet.extension(extension_pattern, extension_data.as_slice());
}
// Queue up packet and timestamp all others without timestamp that are before it.
// FIXME: Use more optimal packet writing API once available
// https://github.com/ystreet/rtp-types/issues/4
// https://github.com/ystreet/rtp-types/issues/5
// TODO: Maybe use an MTU-sized buffer pool?
let packet_buffer = packet.write_vec().map_err(|err| {
gst::error!(CAT, imp: self, "Can't write packet: {err}");
gst::FlowError::Error
})?;
let packet_len = packet_buffer.len();
// FIXME: See comment in `max_payload_size()`. We currently don't provide a way to the
// subclass to know how much space will be used up by extensions.
if (settings.mtu as usize) < packet_len - extension_size {
gst::warning!(CAT, imp: self, "Generated packet has bigger size {packet_len} than MTU {}", settings.mtu);
}
gst::trace!(
CAT,
imp: self,
"Queueing packet of size {packet_len} with RTP timestamp {packet_rtptime} and PTS {packet_pts}",
);
let marker = {
let packet = rtp_types::RtpPacket::parse(&packet_buffer).unwrap();
packet.marker_bit()
};
let mut buffer = gst::Buffer::from_mut_slice(packet_buffer);
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(packet_pts);
if marker {
buffer.set_flags(gst::BufferFlags::MARKER);
}
if discont_pending {
buffer.set_flags(gst::BufferFlags::DISCONT);
}
// Copy over metas and other metadata from the packets that made up this buffer
let obj = self.obj();
for front in state.pending_buffers.iter().take_while(|b| b.id <= id_end) {
use std::ops::ControlFlow::*;
front.buffer.foreach_meta(|meta| {
// Do not copy metas that are memory specific
if meta.has_tag::<gst::meta::tags::Memory>()
|| meta.has_tag::<gst::meta::tags::MemoryReference>()
{
return Continue(());
}
(obj.class().as_ref().transform_meta)(&obj, &front.buffer, &meta, buffer);
Continue(())
});
}
}
// Finally write extensions into the otherwise complete packet
if extension_size != 0 && extension_pattern != 0 {
let buffer = buffer.get_mut().unwrap();
// FIXME Get a mutable reference to the output buffer via a pointer to
// work around bug in the C API.
// See https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/375
let buffer_ptr = buffer.as_mut_ptr();
let mut map = buffer.map_writable().unwrap();
let mut packet = rtp_types::RtpPacketMut::parse(&mut map).unwrap();
let mut extension_data = packet.extension_mut().unwrap();
for (ext_id, extension) in extensions.iter() {
let offset = if extension_flags == gst_rtp::RTPHeaderExtensionFlags::ONE_BYTE {
1
} else {
2
};
if extension_data.len() < offset {
gst::error!(CAT, imp: self, "No space left for writing RTP header extension {ext_id}");
break;
}
if let Ok(written) = extension.write(
&extension_input_buffer.buffer,
extension_flags,
unsafe { gst::BufferRef::from_mut_ptr(buffer_ptr) },
&mut extension_data[offset..],
) {
// Nothing written, can just continue
if written == 0 {
continue;
}
if extension_flags == gst_rtp::RTPHeaderExtensionFlags::ONE_BYTE {
assert!(written <= 16);
extension_data[0] = (*ext_id << 4) | (written as u8 - 1);
} else {
assert!(written <= 255);
extension_data[0] = *ext_id;
extension_data[1] = written as u8;
}
extension_data = &mut extension_data[offset + written..];
} else {
gst::error!(CAT, imp: self, "Writing RTP header extension {ext_id} failed");
}
}
}
drop(extensions);
state.pending_packets.push_back(PendingPacket { buffer });
state.last_used_buffer_id = id_end;
state.last_timestamp = Wrapping(packet_rtptime);
state.last_pts = Some(pts);
// Update stats
if let Some(stats) = self.stats.lock().unwrap().as_mut() {
stats.running_time = segment.to_running_time(packet_pts);
stats.seqnum = seqnum;
stats.timestamp = packet_rtptime;
}
Ok(gst::FlowSuccess::Ok)
}
pub(super) fn finish_pending_packets(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
// As long as there are packets that can be finished, take all with the same PTS and put
// them into a buffer list.
while state
.pending_packets
.iter()
.any(|p| p.buffer.pts().is_some())
{
let pts = state.pending_packets.front().unwrap().buffer.pts().unwrap();
// First count number of pending packets so we can allocate a buffer list with the correct
// size instead of re-allocating for every added buffer, or simply push a single buffer
// downstream instead of allocating a buffer list just for one buffer.
let num_buffers = state
.pending_packets
.iter()
.take_while(|p| p.buffer.pts() == Some(pts))
.count();
assert_ne!(num_buffers, 0);
gst::trace!(CAT, imp: self, "Flushing {num_buffers} packets");
let segment_event = self.retrieve_pending_segment_event(&mut state);
enum BufferOrList {
Buffer(gst::Buffer),
List(gst::BufferList),
}
let packets = if num_buffers == 1 {
let buffer = state.pending_packets.pop_front().unwrap().buffer;
gst::trace!(CAT, imp: self, "Finishing buffer {buffer:?}");
BufferOrList::Buffer(buffer)
} else {
let mut list = gst::BufferList::new_sized(num_buffers);
{
let list = list.get_mut().unwrap();
while state
.pending_packets
.front()
.map_or(false, |p| p.buffer.pts() == Some(pts))
{
let buffer = state.pending_packets.pop_front().unwrap().buffer;
gst::trace!(CAT, imp: self, "Finishing buffer {buffer:?}");
list.add(buffer);
}
}
BufferOrList::List(list)
};
drop(state);
if let Some(segment_event) = segment_event {
let _ = self.src_pad.push_event(segment_event);
}
let res = match packets {
BufferOrList::Buffer(buffer) => self.src_pad.push(buffer),
BufferOrList::List(list) => self.src_pad.push_list(list),
};
if let Err(err) = res {
if ![gst::FlowError::Flushing, gst::FlowError::Eos].contains(&err) {
gst::warning!(CAT, imp: self, "Failed pushing packets: {err:?}");
} else {
gst::debug!(CAT, imp: self, "Failed pushing packets: {err:?}");
}
return Err(err);
}
state = self.state.borrow_mut();
}
Ok(gst::FlowSuccess::Ok)
}
pub(super) fn sink_pad(&self) -> &gst::Pad {
&self.sink_pad
}
pub(super) fn src_pad(&self) -> &gst::Pad {
&self.src_pad
}
/// Helper so we don't manually have to set ssrc_collision to None if there
/// is Some collision to handle.
fn take_ssrc_collision(&self) -> Option<u32> {
let mut ssrc_collision = self.ssrc_collision.lock().unwrap();
ssrc_collision.take()
}
}
/// Default virtual method implementations.
impl RtpBasePay2 {
fn start_default(&self) -> Result<(), gst::ErrorMessage> {
Ok(())
}
fn stop_default(&self) -> Result<(), gst::ErrorMessage> {
Ok(())
}
fn set_sink_caps_default(&self, _caps: &gst::Caps) -> bool {
true
}
fn negotiate_header_extensions(
&self,
state: &State,
src_caps: &mut gst::Caps,
) -> Result<(), gst::FlowError> {
let s = src_caps.structure(0).unwrap();
// Check which header extensions to enable/disable based on the caps.
let mut caps_extensions = BTreeMap::new();
for (k, v) in s.iter() {
let Some(ext_id) = k.strip_prefix("extmap-") else {
continue;
};
let Ok(ext_id) = ext_id.parse::<u8>() else {
gst::error!(CAT, imp: self, "Can't parse RTP header extension id from caps {src_caps:?}");
return Err(gst::FlowError::NotNegotiated);
};
let uri = if let Ok(uri) = v.get::<String>() {
uri
} else if let Ok(arr) = v.get::<gst::ArrayRef>() {
if let Some(uri) = arr.get(1).and_then(|v| v.get::<String>().ok()) {
uri
} else {
gst::error!(CAT, imp: self, "Couldn't get URI for RTP header extension id {ext_id} from caps {src_caps:?}");
return Err(gst::FlowError::NotNegotiated);
}
} else {
gst::error!(CAT, imp: self, "Couldn't get URI for RTP header extension id {ext_id} from caps {src_caps:?}");
return Err(gst::FlowError::NotNegotiated);
};
caps_extensions.insert(ext_id, uri);
}
let mut extensions = self.extensions.lock().unwrap();
let mut extensions_changed = false;
for (ext_id, uri) in caps_extensions {
if let Some(extension) = extensions.get(&ext_id) {
if extension.uri().as_deref() == Some(&uri) {
// Same extension, nothing to do here, we will update it with the new caps
// later
continue;
}
gst::debug!(
CAT,
imp: self,
"Extension ID {ext_id} changed from {:?} to {uri}",
extension.uri(),
);
extensions_changed |= true;
extensions.remove(&ext_id);
}
gst::debug!(CAT, imp: self, "Requesting extension {uri} for ID {ext_id}");
let ext = self
.obj()
.emit_by_name::<Option<gst_rtp::RTPHeaderExtension>>(
"request-extension",
&[&(ext_id as u32), &uri],
);
let Some(ext) = ext else {
gst::warning!(CAT, imp: self, "Couldn't create extension for {uri} with ID {ext_id}");
continue;
};
if ext.id() != ext_id as u32 {
gst::warning!(CAT, imp: self, "Created extension has wrong ID");
continue;
}
extensions.insert(ext_id, ext);
extensions_changed |= true;
}
let sink_caps = state.sink_caps.as_ref().unwrap();
let mut to_remove = vec![];
for (ext_id, ext) in extensions.iter() {
if !ext.set_attributes_from_caps(src_caps) {
gst::warning!(CAT, imp: self, "Failed to configure extension {ext_id} from caps {src_caps}");
to_remove.push(*ext_id);
}
if !ext.set_non_rtp_sink_caps(sink_caps) {
gst::warning!(CAT, imp: self, "Failed to configure extension {ext_id} from sink caps {sink_caps}");
to_remove.push(*ext_id);
}
}
for ext_id in to_remove {
extensions.remove(&ext_id);
extensions_changed |= true;
}
// First remove all header extension related fields from the caps,
// then add them again correctly from the actually selected extensions.
{
let caps = src_caps.make_mut();
let s = caps.structure_mut(0).unwrap();
s.filter_map_in_place(|field, value| {
if field.as_str().starts_with("extmap-") {
None
} else {
Some(value)
}
});
for (_, ext) in extensions.iter() {
ext.set_caps_from_attributes(caps);
}
}
drop(extensions);
if extensions_changed {
self.obj().notify("extensions");
}
Ok(())
}
fn negotiate_default(&self, mut src_caps: gst::Caps) {
gst::debug!(CAT, imp: self, "Negotiating caps {src_caps:?}");
// Fixate what is left to fixate.
src_caps.fixate();
let s = src_caps.structure(0).unwrap();
assert!(
s.has_name("application/x-rtp"),
"Non-RTP caps {src_caps:?} not supported"
);
let clock_rate = match s.get::<i32>("clock-rate") {
Ok(clock_rate) if clock_rate > 0 => clock_rate as u32,
_ => {
panic!("RTP caps {src_caps:?} without 'clock-rate'");
}
};
let mut state = self.state.borrow_mut();
if self
.negotiate_header_extensions(&state, &mut src_caps)
.is_err()
{
state.negotiated_src_caps = None;
return;
}
let stream = state.stream.as_ref().unwrap();
let ssrc = stream.ssrc;
let pt = stream.pt;
// Set SSRC and payload type. We don't negotiate these with downstream!
{
let caps = src_caps.make_mut();
caps.set("ssrc", ssrc);
caps.set("payload", pt as i32);
}
if Some(&src_caps) == state.negotiated_src_caps.as_ref() {
gst::debug!(CAT, imp: self, "Setting same caps {src_caps:?} again");
return;
}
let clock_rate_changed = state
.clock_rate
.map_or(false, |old_clock_rate| old_clock_rate != clock_rate);
state.negotiated_src_caps = Some(src_caps.clone());
state.clock_rate = Some(clock_rate);
self.stats.lock().unwrap().as_mut().unwrap().clock_rate = Some(clock_rate);
let seqnum = if let Some((seqnum, _)) = state.segment {
seqnum
} else {
gst::Seqnum::next()
};
let mut segment_event = self.retrieve_pending_segment_event(&mut state);
drop(state);
// FIXME: Drain also in other conditions?
if clock_rate_changed {
// First push the pending segment event downstream before draining
if let Some(segment_event) = segment_event.take() {
let _ = self.src_pad.push_event(segment_event);
}
if let Err(err) = self.drain() {
// Just continue here. The payloader is now flushed and can proceed below,
// and if there was a serious error downstream it will happen on the next
// buffer pushed downstream
gst::debug!(CAT, imp: self, "Draining failed: {err:?}");
}
}
// Rely on the actual flow return later when pushing a buffer downstream. The bool
// return from pushing the event does not allow to distinguish between different kinds of
// errors.
let _ = self
.src_pad
.push_event(gst::event::Caps::builder(&src_caps).seqnum(seqnum).build());
if let Some(segment_event) = segment_event {
let _ = self.src_pad.push_event(segment_event);
}
}
fn handle_buffer_default(
&self,
_buffer: &gst::Buffer,
_id: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unimplemented!()
}
fn drain_default(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
Ok(gst::FlowSuccess::Ok)
}
fn flush_default(&self) {}
fn sink_event_default(&self, event: gst::Event) -> Result<gst::FlowSuccess, gst::FlowError> {
match event.view() {
gst::EventView::Caps(event) => {
let caps = event.caps_owned();
return self.set_sink_caps(caps);
}
gst::EventView::Segment(event) => {
let segment = event.segment();
let seqnum = event.seqnum();
let state = self.state.borrow();
if state.sink_caps.is_none() {
gst::warning!(CAT, imp: self, "Received segment before caps");
return Err(gst::FlowError::NotNegotiated);
}
let same_segment = state
.segment
.as_ref()
.map(|(old_seqnum, old_segment)| {
(&seqnum, segment) == (old_seqnum, old_segment.upcast_ref())
})
.unwrap_or(false);
// Only drain if the segment changed and we actually had one before
let drain = !same_segment && state.segment.is_some();
drop(state);
if !same_segment {
gst::debug!(CAT, imp: self, "Received segment {segment:?}");
if drain {
if let Err(err) = self.drain() {
// Just continue here. The payloader is now flushed and can proceed below,
// and if there was a serious error downstream it will happen on the next
// buffer pushed downstream
gst::debug!(CAT, imp: self, "Draining failed: {err:?}");
}
}
let mut state = self.state.borrow_mut();
if let Some(segment) = segment.downcast_ref::<gst::ClockTime>() {
state.segment = Some((event.seqnum(), segment.clone()));
} else {
gst::error!(CAT, imp: self, "Segments in non-TIME format are not supported");
state.segment = None;
// FIXME: Forget everything here?
return Err(gst::FlowError::Error);
}
if state.negotiated_src_caps.is_none() {
state.pending_segment = true;
gst::debug!(CAT, imp: self, "Received segment before knowing src caps, delaying");
return Ok(gst::FlowSuccess::Ok);
}
drop(state);
}
}
gst::EventView::Eos(_) => {
if let Err(err) = self.drain() {
gst::debug!(CAT, imp: self, "Draining on EOS failed: {err:?}");
}
}
gst::EventView::FlushStop(_) => {
self.flush();
let mut state = self.state.borrow_mut();
state.segment = None;
state.pending_segment = false;
// RTP timestamp and seqnum are not reset here
state.last_pts = None;
drop(state);
*self.stats.lock().unwrap() = None;
}
_ => (),
}
gst::debug!(CAT, imp: self, "Forwarding event: {event:?}");
if self.src_pad.push_event(event) {
Ok(gst::FlowSuccess::Ok)
} else if self.src_pad.pad_flags().contains(gst::PadFlags::FLUSHING) {
Err(gst::FlowError::Flushing)
} else {
Err(gst::FlowError::Error)
}
}
fn handle_ssrc_collision_event(&self, s: &gst::StructureRef) {
let Ok(ssrc) = s.get::<u32>("ssrc") else {
return;
};
let stats = self.stats.lock().unwrap();
let Some(ref stats) = &*stats else {
return;
};
if stats.ssrc != ssrc {
return;
}
let new_ssrc = if let Ok(suggested_ssrc) = s.get::<u32>("suggested-ssrc") {
suggested_ssrc
} else {
use rand::prelude::*;
let mut rng = rand::thread_rng();
loop {
let new_ssrc = rng.gen::<u32>();
if new_ssrc != stats.ssrc {
break new_ssrc;
}
}
};
{
let mut ssrc_collision = self.ssrc_collision.lock().unwrap();
*ssrc_collision = Some(new_ssrc);
}
}
fn src_event_default(&self, event: gst::Event) -> Result<gst::FlowSuccess, gst::FlowError> {
if let gst::EventView::CustomUpstream(ev) = event.view() {
if let Some(s) = ev.structure() {
if s.name() == "GstRTPCollision" {
self.handle_ssrc_collision_event(s);
}
}
}
if gst::Pad::event_default(&self.src_pad, Some(&*self.obj()), event) {
Ok(gst::FlowSuccess::Ok)
} else if self.sink_pad.pad_flags().contains(gst::PadFlags::FLUSHING) {
Err(gst::FlowError::Flushing)
} else {
Err(gst::FlowError::Error)
}
}
fn sink_query_default(&self, query: &mut gst::QueryRef) -> bool {
gst::Pad::query_default(&self.sink_pad, Some(&*self.obj()), query)
}
fn src_query_default(&self, query: &mut gst::QueryRef) -> bool {
gst::Pad::query_default(&self.src_pad, Some(&*self.obj()), query)
}
fn transform_meta_default(
&self,
_in_buf: &gst::BufferRef,
meta: &gst::MetaRef<gst::Meta>,
out_buf: &mut gst::BufferRef,
) {
let tags = meta.tags();
if tags.len() > 1 {
gst::trace!(
CAT,
imp: self,
"Not copying meta {}: has multiple tags {tags:?}",
meta.api(),
);
return;
}
let allowed_tags = self.obj().class().as_ref().allowed_meta_tags;
if tags.len() == 1 {
let meta_tag = &tags[0];
if !allowed_tags.iter().copied().any(|tag| tag == *meta_tag) {
gst::trace!(
CAT,
imp: self,
"Not copying meta {}: tag '{meta_tag}' not allowed",
meta.api(),
);
return;
}
}
gst::trace!(CAT, imp: self, "Copying meta {}", meta.api());
if let Err(err) = meta.transform(out_buf, &gst::meta::MetaTransformCopy::new(false, ..)) {
gst::trace!(CAT, imp: self, "Could not copy meta {}: {err}", meta.api());
}
}
fn add_extension(&self, ext: &gst_rtp::RTPHeaderExtension) {
assert_ne!(ext.id(), 0);
let mut extensions = self.extensions.lock().unwrap();
extensions.insert(ext.id() as u8, ext.clone());
self.src_pad.mark_reconfigure();
drop(extensions);
self.obj().notify("extensions");
}
fn clear_extensions(&self) {
let mut extensions = self.extensions.lock().unwrap();
extensions.clear();
self.src_pad.mark_reconfigure();
drop(extensions);
self.obj().notify("extensions");
}
fn request_extension(&self, ext_id: u32, uri: &str) -> Option<gst_rtp::RTPHeaderExtension> {
let settings = self.settings.lock().unwrap();
if !settings.auto_header_extensions {
return None;
}
drop(settings);
let Some(ext) = gst_rtp::RTPHeaderExtension::create_from_uri(uri) else {
gst::debug!(
CAT,
imp: self,
"Didn't find any extension implementing URI {uri}",
);
return None;
};
gst::debug!(
CAT,
imp: self,
"Automatically enabling extension {} for URI {uri}",
ext.name(),
);
ext.set_id(ext_id);
Some(ext)
}
}
/// Pad functions and associated helper functions.
impl RtpBasePay2 {
fn sink_chain(
&self,
_pad: &gst::Pad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(CAT, imp: self, "Received buffer {buffer:?}");
let settings = self.settings.lock().unwrap().clone();
self.handle_buffer(&settings, buffer)
}
fn sink_chain_list(
&self,
_pad: &gst::Pad,
list: gst::BufferList,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(CAT, imp: self, "Received buffer list {list:?}");
let settings = self.settings.lock().unwrap().clone();
for buffer in list.iter_owned() {
self.handle_buffer(&settings, buffer)?;
}
Ok(gst::FlowSuccess::Ok)
}
fn sink_event(
&self,
pad: &gst::Pad,
event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::debug!(CAT, obj: pad, "Received event: {event:?}");
let obj = self.obj();
(obj.class().as_ref().sink_event)(&obj, event)
}
fn src_event(
&self,
pad: &gst::Pad,
event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::debug!(CAT, obj: pad, "Received event: {event:?}");
let obj = self.obj();
(obj.class().as_ref().src_event)(&obj, event)
}
fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
gst::trace!(CAT, obj: pad, "Received query: {query:?}");
let obj = self.obj();
(obj.class().as_ref().sink_query)(&obj, query)
}
fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
gst::trace!(CAT, obj: pad, "Received query: {query:?}");
let obj = self.obj();
(obj.class().as_ref().src_query)(&obj, query)
}
fn retrieve_pending_segment_event(&self, state: &mut State) -> Option<gst::Event> {
// If a segment event was pending, take it now and push it out later.
if !state.pending_segment {
return None;
}
state.negotiated_src_caps.as_ref()?;
let (seqnum, segment) = state.segment.as_ref().unwrap();
let segment_event = gst::event::Segment::builder(segment)
.seqnum(*seqnum)
.build();
state.pending_segment = false;
gst::debug!(CAT, imp: self, "Created segment event {segment:?} with seqnum {seqnum:?}");
Some(segment_event)
}
fn handle_buffer(
&self,
_settings: &Settings,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
// Check if renegotiation is needed.
if self.src_pad.check_reconfigure() {
self.negotiate();
}
let mut state = self.state.borrow_mut();
if state.sink_caps.is_none() {
gst::error!(CAT, imp: self, "No sink pad caps");
gst::element_imp_error!(
self,
gst::CoreError::Negotiation,
[
"No input format was negotiated, i.e. no caps event was received. \
Perhaps you need a parser or typefind element before the payloader",
]
);
return Err(gst::FlowError::NotNegotiated);
}
if buffer.flags().contains(gst::BufferFlags::HEADER)
&& self.obj().class().as_ref().drop_header_buffers
{
gst::trace!(CAT, imp: self, "Dropping buffer with HEADER flag");
return Ok(gst::FlowSuccess::Ok);
}
if buffer.pts().is_none() {
gst::error!(CAT, imp: self, "Buffers without PTS");
return Err(gst::FlowError::Error);
};
// TODO: Should we drain on DISCONT here, or leave it to the subclass?
// Check for too many pending packets
if let Some((front, back)) =
Option::zip(state.pending_buffers.front(), state.pending_buffers.back())
{
let pts_diff = back.buffer.pts().opt_saturating_sub(front.buffer.pts());
if let Some(pts_diff) = pts_diff {
if pts_diff > gst::ClockTime::SECOND {
gst::warning!(CAT, imp: self, "More than {pts_diff} of PTS time queued, probably a bug in the subclass");
}
}
}
if let Some(ssrc_collision) = self.take_ssrc_collision() {
let new_ssrc = ssrc_collision;
let stream = state.stream.as_mut().unwrap();
gst::debug!(
CAT,
imp: self,
"Switching from SSRC {} to {} because of SSRC collision",
stream.ssrc,
new_ssrc,
);
stream.ssrc = new_ssrc;
if let Some(ref src_caps) = state.negotiated_src_caps {
let mut src_caps = src_caps.copy();
{
let caps = src_caps.get_mut().unwrap();
caps.set("ssrc", new_ssrc);
}
state.negotiated_src_caps = Some(src_caps.clone());
let seqnum = if let Some((seqnum, _)) = state.segment {
seqnum
} else {
gst::Seqnum::next()
};
drop(state);
let _ = self
.src_pad
.push_event(gst::event::Caps::builder(&src_caps).seqnum(seqnum).build());
state = self.state.borrow_mut();
}
let mut stats_guard = self.stats.lock().unwrap();
let stats = stats_guard.as_mut().unwrap();
stats.ssrc = new_ssrc;
drop(stats_guard);
}
let id = state.current_buffer_id;
state.current_buffer_id += 1;
gst::trace!(CAT, imp: self, "Handling buffer {buffer:?} with id {id}");
state.pending_buffers.push_back(PendingBuffer {
id,
buffer: buffer.clone(),
});
drop(state);
let obj = self.obj();
let mut res = (obj.class().as_ref().handle_buffer)(&obj, &buffer, id);
if let Err(err) = res {
gst::error!(CAT, imp: self, "Failed handling buffer: {err:?}");
} else {
res = self.finish_pending_packets();
if let Err(err) = res {
gst::debug!(CAT, imp: self, "Failed finishing pending packets: {err:?}");
}
}
// Now drop all pending buffers that were used for producing packets
// except for the very last one as that might still be used.
let mut state = self.state.borrow_mut();
while state
.pending_buffers
.front()
.map_or(false, |b| b.id < state.last_used_buffer_id)
{
let _ = state.pending_buffers.pop_front();
}
res
}
}
/// Other methods.
impl RtpBasePay2 {
fn set_sink_caps(&self, caps: gst::Caps) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::debug!(CAT, imp: self, "Received caps {caps:?}");
let mut state = self.state.borrow_mut();
let same_caps = state
.sink_caps
.as_ref()
.map(|old_caps| &caps == old_caps)
.unwrap_or(false);
state.sink_caps = Some(caps.clone());
drop(state);
if !same_caps {
let obj = self.obj();
let set_sink_caps_res = if (obj.class().as_ref().set_sink_caps)(&obj, &caps) {
gst::debug!(CAT, imp: self, "Caps {caps:?} accepted");
Ok(gst::FlowSuccess::Ok)
} else {
gst::warning!(CAT, imp: self, "Caps {caps:?} not accepted");
Err(gst::FlowError::NotNegotiated)
};
let mut state = self.state.borrow_mut();
if let Err(err) = set_sink_caps_res {
state.sink_caps = None;
// FIXME: Forget everything here?
return Err(err);
}
drop(state);
}
Ok(gst::FlowSuccess::Ok)
}
fn drain(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let obj = self.obj();
if let Err(err) = (obj.class().as_ref().drain)(&obj) {
if ![gst::FlowError::Flushing, gst::FlowError::Eos].contains(&err) {
gst::warning!(CAT, imp: self, "Draining failed: {err:?}");
} else {
gst::debug!(CAT, imp: self, "Draining failed: {err:?}");
}
self.flush();
return Err(err);
}
let mut state = self.state.borrow_mut();
state.pending_buffers.clear();
if !state.pending_packets.is_empty() {
gst::debug!(CAT, imp: self, "Pushing all pending packets");
}
// Update PTS and RTP timestamp of all pending packets that have none yet
let pts = state.last_pts;
let rtptime = state.last_timestamp.0;
let mut discont_pending = state.discont_pending;
state.discont_pending = false;
for packet in state
.pending_packets
.iter_mut()
.skip_while(|p| p.buffer.pts().is_some())
{
assert!(packet.buffer.pts().is_none());
let buffer = packet.buffer.get_mut().unwrap();
buffer.set_pts(pts);
if discont_pending {
buffer.set_flags(gst::BufferFlags::DISCONT);
discont_pending = false;
}
let mut map = buffer.map_writable().unwrap();
let mut packet = rtp_types::RtpPacketMut::parse(&mut map).unwrap();
packet.set_timestamp(rtptime);
}
drop(state);
// Forward all packets
let res = self.finish_pending_packets();
self.flush();
res
}
fn flush(&self) {
let obj = self.obj();
(obj.class().as_ref().flush)(&obj);
let mut state = self.state.borrow_mut();
state.pending_buffers.clear();
// Drop any remaining pending packets, this can only really happen on errors
state.pending_packets.clear();
// Reset mapping and require the subclass to provide a new one next time
state.last_pts_rtp_mapping = None;
state.discont_pending = true;
}
fn create_stats(&self) -> gst::Structure {
let stats = self.stats.lock().unwrap().clone();
if let Some(stats) = stats {
gst::Structure::builder("application/x-rtp-payload-stats")
.field("ssrc", stats.ssrc)
.field("clock-rate", stats.clock_rate.unwrap_or_default())
.field("running-time", stats.running_time)
.field("seqnum", stats.seqnum as u32)
.field("timestamp", stats.timestamp)
.field("pt", stats.pt as u32)
.field("seqnum-offset", stats.seqnum_offset as u32)
.field("timestamp-offset", stats.timestamp_offset)
.build()
} else {
gst::Structure::builder("application/x-rtp-payload-stats").build()
}
}
}
#[glib::object_subclass]
impl ObjectSubclass for RtpBasePay2 {
const NAME: &'static str = "GstRtpBasePay2";
const ABSTRACT: bool = true;
type Type = super::RtpBasePay2;
type ParentType = gst::Element;
type Class = super::Class;
fn class_init(class: &mut Self::Class) {
class.start = |obj| obj.imp().start_default();
class.stop = |obj| obj.imp().stop_default();
class.set_sink_caps = |obj, caps| obj.imp().set_sink_caps_default(caps);
class.negotiate = |obj, caps| obj.imp().negotiate_default(caps);
class.handle_buffer = |obj, buffer, id| obj.imp().handle_buffer_default(buffer, id);
class.drain = |obj| obj.imp().drain_default();
class.flush = |obj| obj.imp().flush_default();
class.sink_event = |obj, event| obj.imp().sink_event_default(event);
class.src_event = |obj, event| obj.imp().src_event_default(event);
class.sink_query = |obj, query| obj.imp().sink_query_default(query);
class.src_query = |obj, query| obj.imp().src_query_default(query);
class.transform_meta =
|obj, in_buf, meta, out_buf| obj.imp().transform_meta_default(in_buf, meta, out_buf);
}
fn with_class(class: &Self::Class) -> Self {
let templ = class
.pad_template("sink")
.expect("Subclass did not provide a \"sink\" pad template");
let sink_pad = gst::Pad::builder_from_template(&templ)
.chain_function(|pad, parent, buffer| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|imp| imp.sink_chain(pad, buffer),
)
})
.chain_list_function(|pad, parent, list| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|imp| imp.sink_chain_list(pad, list),
)
})
.event_full_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|imp| imp.sink_event(pad, event),
)
})
.query_function(|pad, parent, query| {
Self::catch_panic_pad_function(parent, || false, |imp| imp.sink_query(pad, query))
})
.build();
let templ = class
.pad_template("src")
.expect("Subclass did not provide a \"src\" pad template");
let src_pad = gst::Pad::builder_from_template(&templ)
.event_full_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|imp| imp.src_event(pad, event),
)
})
.query_function(|pad, parent, query| {
Self::catch_panic_pad_function(parent, || false, |imp| imp.src_query(pad, query))
})
.flags(gst::PadFlags::FIXED_CAPS)
.build();
Self {
src_pad,
sink_pad,
state: AtomicRefCell::default(),
settings: Mutex::default(),
stats: Mutex::default(),
ssrc_collision: Mutex::new(None),
extensions: Mutex::default(),
}
}
}
impl ObjectImpl for RtpBasePay2 {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecUInt::builder("mtu")
.nick("MTU")
.blurb("Maximum size of one RTP packet")
.default_value(Settings::default().mtu)
.minimum(28)
.mutable_playing()
.build(),
glib::ParamSpecUInt::builder("pt")
.nick("Payload Type")
.blurb("Payload type of the packets")
.default_value(u32::from(Settings::default().pt))
.minimum(0)
.maximum(0x7f)
.mutable_ready()
.build(),
glib::ParamSpecInt64::builder("ssrc")
.nick("SSRC")
.blurb("SSRC of the packets (-1 == random)")
.default_value(Settings::default().ssrc.map(i64::from).unwrap_or(-1))
.minimum(-1)
.maximum(u32::MAX as i64)
.mutable_ready()
.build(),
glib::ParamSpecInt64::builder("timestamp-offset")
.nick("Timestamp Offset")
.blurb("Offset that is added to all RTP timestamps (-1 == random)")
.default_value(
Settings::default()
.timestamp_offset
.map(i64::from)
.unwrap_or(-1),
)
.minimum(-1)
.maximum(u32::MAX as i64)
.mutable_ready()
.build(),
glib::ParamSpecInt::builder("seqnum-offset")
.nick("Sequence Number Offset")
.blurb("Offset that is added to all RTP sequence numbers (-1 == random)")
.default_value(
Settings::default()
.seqnum_offset
.map(i32::from)
.unwrap_or(-1),
)
.minimum(-1)
.maximum(u16::MAX as i32)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("onvif-no-rate-control")
.nick("ONVIF No Rate Control")
.blurb("Enable ONVIF Rate-Control=no timestamping mode")
.default_value(Settings::default().onvif_no_rate_control)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("scale-rtptime")
.nick("Scale RTP Time")
.blurb("Whether the RTP timestamp should be scaled with the rate (speed)")
.default_value(Settings::default().scale_rtptime)
.mutable_ready()
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("stats")
.nick("Statistics")
.blurb("Various statistics")
.read_only()
.build(),
glib::ParamSpecUInt::builder("seqnum")
.nick("Sequence Number")
.blurb("RTP sequence number of the last packet")
.minimum(0)
.maximum(u16::MAX as u32)
.read_only()
.build(),
glib::ParamSpecUInt::builder("timestamp")
.nick("Timestamp")
.blurb("RTP timestamp of the last packet")
.minimum(0)
.maximum(u16::MAX as u32)
.read_only()
.build(),
glib::ParamSpecBoolean::builder("source-info")
.nick("RTP Source Info")
.blurb("Add RTP source information as buffer metadata")
.default_value(Settings::default().source_info)
.mutable_playing()
.build(),
glib::ParamSpecBoolean::builder("auto-header-extensions")
.nick("Automatic RTP Header Extensions")
.blurb("Whether RTP header extensions should be automatically enabled, if an implementation is available")
.default_value(Settings::default().auto_header_extensions)
.mutable_ready()
.build(),
gst::ParamSpecArray::builder("extensions")
.nick("RTP Header Extensions")
.blurb("List of enabled RTP header extensions")
.element_spec(&glib::ParamSpecObject::builder::<gst_rtp::RTPHeaderExtension>("extension")
.nick("RTP Header Extension")
.blurb("Enabled RTP header extension")
.read_only()
.build()
)
.read_only()
.build(),
]
});
PROPERTIES.as_ref()
}
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![
glib::subclass::Signal::builder("add-extension")
.action()
.param_types([gst_rtp::RTPHeaderExtension::static_type()])
.class_handler(|_token, args| {
let s = args[0].get::<super::RtpBasePay2>().unwrap();
let ext = args[1].get::<&gst_rtp::RTPHeaderExtension>().unwrap();
s.imp().add_extension(ext);
None
})
.build(),
glib::subclass::Signal::builder("request-extension")
.param_types([u32::static_type(), String::static_type()])
.return_type::<gst_rtp::RTPHeaderExtension>()
.accumulator(|_hint, acc, val| {
if matches!(val.get::<Option<glib::Object>>(), Ok(Some(_))) {
*acc = val.clone();
false
} else {
true
}
})
.class_handler(|_token, args| {
let s = args[0].get::<super::RtpBasePay2>().unwrap();
let ext_id = args[1].get::<u32>().unwrap();
let uri = args[2].get::<&str>().unwrap();
let ext = s.imp().request_extension(ext_id, uri);
Some(ext.to_value())
})
.build(),
glib::subclass::Signal::builder("clear-extensions")
.action()
.class_handler(|_token, args| {
let s = args[0].get::<super::RtpBasePay2>().unwrap();
s.imp().clear_extensions();
None
})
.build(),
]
});
SIGNALS.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"mtu" => {
self.settings.lock().unwrap().mtu = value.get().unwrap();
}
"pt" => {
self.settings.lock().unwrap().pt = value.get::<u32>().unwrap() as u8;
}
"ssrc" => {
let v = value.get::<i64>().unwrap();
self.settings.lock().unwrap().ssrc = (v != -1).then_some(v as u32);
}
"timestamp-offset" => {
let v = value.get::<i64>().unwrap();
self.settings.lock().unwrap().timestamp_offset = (v != -1).then_some(v as u32);
}
"seqnum-offset" => {
let v = value.get::<i32>().unwrap();
self.settings.lock().unwrap().seqnum_offset = (v != -1).then_some(v as u16);
}
"onvif-no-rate-control" => {
self.settings.lock().unwrap().onvif_no_rate_control = value.get().unwrap();
}
"scale-rtptime" => {
self.settings.lock().unwrap().scale_rtptime = value.get().unwrap();
}
"source-info" => {
self.settings.lock().unwrap().source_info = value.get().unwrap();
}
"auto-header-extensions" => {
self.settings.lock().unwrap().auto_header_extensions = value.get().unwrap();
}
_ => unimplemented!(),
};
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"mtu" => self.settings.lock().unwrap().mtu.to_value(),
"pt" => (u32::from(self.settings.lock().unwrap().pt)).to_value(),
"ssrc" => self
.settings
.lock()
.unwrap()
.ssrc
.map(i64::from)
.unwrap_or(-1)
.to_value(),
"timestamp-offset" => self
.settings
.lock()
.unwrap()
.timestamp_offset
.map(i64::from)
.unwrap_or(-1)
.to_value(),
"seqnum-offset" => self
.settings
.lock()
.unwrap()
.seqnum_offset
.map(i32::from)
.unwrap_or(-1)
.to_value(),
"onvif-no-rate-control" => self
.settings
.lock()
.unwrap()
.onvif_no_rate_control
.to_value(),
"scale-rtptime" => self.settings.lock().unwrap().scale_rtptime.to_value(),
"stats" => self.create_stats().to_value(),
"seqnum" => (self
.stats
.lock()
.unwrap()
.as_ref()
.map(|s| s.seqnum)
.unwrap_or(0) as u32)
.to_value(),
"timestamp" => self
.stats
.lock()
.unwrap()
.as_ref()
.map(|s| s.timestamp)
.unwrap_or(0)
.to_value(),
"source-info" => self.settings.lock().unwrap().source_info.to_value(),
"auto-header-extensions" => self
.settings
.lock()
.unwrap()
.auto_header_extensions
.to_value(),
"extensions" => gst::Array::new(self.extensions.lock().unwrap().values()).to_value(),
_ => unimplemented!(),
}
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.add_pad(&self.sink_pad).unwrap();
obj.add_pad(&self.src_pad).unwrap();
}
}
impl GstObjectImpl for RtpBasePay2 {}
impl ElementImpl for RtpBasePay2 {
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::debug!(CAT, imp: self, "Changing state: {transition}");
if transition == gst::StateChange::ReadyToPaused {
{
use rand::prelude::*;
let mut rng = rand::thread_rng();
let settings = self.settings.lock().unwrap();
let pt = settings.pt;
let ssrc = settings.ssrc.unwrap_or_else(|| rng.gen::<u32>());
let timestamp_offset = settings
.timestamp_offset
.unwrap_or_else(|| rng.gen::<u32>());
let seqnum_offset = settings.seqnum_offset.unwrap_or_else(|| rng.gen::<u16>());
// Need to check pt against template caps if it's a non-dynamic PT
if pt < 96 {
let mut pt_found = false;
let templ_caps = self.src_pad.pad_template_caps();
for s in templ_caps.iter() {
let Ok(allowed_pt) = s.value("payload") else {
continue;
};
if allowed_pt.can_intersect(&(pt as i32).to_value()) {
pt_found = true;
break;
}
}
if !pt_found {
gst::error!(
CAT,
imp: self,
"Unsupported static payload type {pt}, not found in template caps {templ_caps}",
);
// We would now return NotNegotiated the next time a buffer is tried to be sent
// downstream.
return Err(gst::StateChangeError);
}
}
let stream = Stream {
pt,
ssrc,
timestamp_offset,
seqnum_offset,
use_stream_time: settings.onvif_no_rate_control || !settings.scale_rtptime,
};
gst::info!(CAT, imp: self, "Configuring {stream:?}");
*self.state.borrow_mut() = State {
stream: Some(stream),
last_seqnum: Wrapping(seqnum_offset),
last_timestamp: Wrapping(timestamp_offset),
..State::default()
};
*self.stats.lock().unwrap() = Some(Stats {
ssrc,
pt,
clock_rate: None,
running_time: None,
seqnum: seqnum_offset,
timestamp: timestamp_offset,
seqnum_offset,
timestamp_offset,
});
}
let obj = self.obj();
(obj.class().as_ref().start)(&obj).map_err(|err_msg| {
self.post_error_message(err_msg);
gst::StateChangeError
})?;
}
let ret = self.parent_change_state(transition)?;
if transition == gst::StateChange::PausedToReady {
let obj = self.obj();
(obj.class().as_ref().stop)(&obj).map_err(|err_msg| {
self.post_error_message(err_msg);
gst::StateChangeError
})?;
*self.state.borrow_mut() = State::default();
*self.stats.lock().unwrap() = None;
}
Ok(ret)
}
}