rtpav1pay: Track last known upstream PTS/DTS in case not all OBUs are properly timestamped

This commit is contained in:
Sebastian Dröge 2022-10-19 15:42:48 +03:00
parent 36861edf9a
commit 9ce8e93c63

View file

@ -10,6 +10,7 @@
use gst::{glib, subclass::prelude::*}; use gst::{glib, subclass::prelude::*};
use gst_rtp::{prelude::*, subclass::prelude::*}; use gst_rtp::{prelude::*, subclass::prelude::*};
use std::{ use std::{
cmp,
collections::VecDeque, collections::VecDeque,
io::{Cursor, Read, Seek, SeekFrom, Write}, io::{Cursor, Read, Seek, SeekFrom, Write},
sync::Mutex, sync::Mutex,
@ -76,6 +77,11 @@ struct State {
/// Indicates the next constructed packet will be the first in its sequence /// Indicates the next constructed packet will be the first in its sequence
/// (Corresponds to `N` field in the aggregation header) /// (Corresponds to `N` field in the aggregation header)
first_packet_in_seq: bool, first_packet_in_seq: bool,
/// The last observed DTS if upstream does not provide DTS for each OBU
last_dts: Option<gst::ClockTime>,
/// The last observed PTS if upstream does not provide PTS for each OBU
last_pts: Option<gst::ClockTime>,
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -89,6 +95,8 @@ impl Default for State {
obus: VecDeque::new(), obus: VecDeque::new(),
open_obu_fragment: false, open_obu_fragment: false,
first_packet_in_seq: true, first_packet_in_seq: true,
last_dts: None,
last_pts: None,
} }
} }
} }
@ -342,11 +350,26 @@ impl RTPAv1Pay {
{ {
// this block enforces that outbuf_mut is dropped before pushing outbuf // this block enforces that outbuf_mut is dropped before pushing outbuf
let first_obu = state.obus.front().unwrap(); let first_obu = state.obus.front().unwrap();
if let Some(dts) = first_obu.dts {
state.last_dts = Some(
state
.last_dts
.map_or(dts, |last_dts| cmp::max(last_dts, dts)),
);
}
if let Some(pts) = first_obu.pts {
state.last_pts = Some(
state
.last_pts
.map_or(pts, |last_pts| cmp::max(last_pts, pts)),
);
}
let outbuf_mut = outbuf let outbuf_mut = outbuf
.get_mut() .get_mut()
.expect("Failed to get mutable reference to outbuf"); .expect("Failed to get mutable reference to outbuf");
outbuf_mut.set_dts(first_obu.dts); outbuf_mut.set_dts(state.last_dts);
outbuf_mut.set_pts(first_obu.pts); outbuf_mut.set_pts(state.last_pts);
let mut rtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut) let mut rtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut)
.expect("Failed to create RTPBuffer"); .expect("Failed to create RTPBuffer");
@ -383,6 +406,22 @@ impl RTPAv1Pay {
for _ in 1..packet.obu_count { for _ in 1..packet.obu_count {
let obu = loop { let obu = loop {
let obu = state.obus.pop_front().unwrap(); let obu = state.obus.pop_front().unwrap();
if let Some(dts) = obu.dts {
state.last_dts = Some(
state
.last_dts
.map_or(dts, |last_dts| cmp::max(last_dts, dts)),
);
}
if let Some(pts) = obu.pts {
state.last_pts = Some(
state
.last_pts
.map_or(pts, |last_pts| cmp::max(last_pts, pts)),
);
}
// Drop temporal delimiter from here // Drop temporal delimiter from here
if obu.info.obu_type != ObuType::TemporalDelimiter { if obu.info.obu_type != ObuType::TemporalDelimiter {
break obu; break obu;
@ -403,6 +442,22 @@ impl RTPAv1Pay {
{ {
let last_obu = loop { let last_obu = loop {
let obu = state.obus.front_mut().unwrap(); let obu = state.obus.front_mut().unwrap();
if let Some(dts) = obu.dts {
state.last_dts = Some(
state
.last_dts
.map_or(dts, |last_dts| cmp::max(last_dts, dts)),
);
}
if let Some(pts) = obu.pts {
state.last_pts = Some(
state
.last_pts
.map_or(pts, |last_pts| cmp::max(last_pts, pts)),
);
}
// Drop temporal delimiter from here // Drop temporal delimiter from here
if obu.info.obu_type != ObuType::TemporalDelimiter { if obu.info.obu_type != ObuType::TemporalDelimiter {
break obu; break obu;