diff --git a/net/rtpav1/src/pay/imp.rs b/net/rtpav1/src/pay/imp.rs index bfd7ef76a..fdd5ae261 100644 --- a/net/rtpav1/src/pay/imp.rs +++ b/net/rtpav1/src/pay/imp.rs @@ -10,6 +10,7 @@ use gst::{glib, subclass::prelude::*}; use gst_rtp::{prelude::*, subclass::prelude::*}; use std::{ + cmp, collections::VecDeque, io::{Cursor, Read, Seek, SeekFrom, Write}, sync::Mutex, @@ -76,6 +77,11 @@ struct State { /// Indicates the next constructed packet will be the first in its sequence /// (Corresponds to `N` field in the aggregation header) first_packet_in_seq: bool, + + /// The last observed DTS if upstream does not provide DTS for each OBU + last_dts: Option, + /// The last observed PTS if upstream does not provide PTS for each OBU + last_pts: Option, } #[derive(Debug, Default)] @@ -89,6 +95,8 @@ impl Default for State { obus: VecDeque::new(), open_obu_fragment: false, first_packet_in_seq: true, + last_dts: None, + last_pts: None, } } } @@ -342,11 +350,26 @@ impl RTPAv1Pay { { // this block enforces that outbuf_mut is dropped before pushing outbuf let first_obu = state.obus.front().unwrap(); + if let Some(dts) = first_obu.dts { + state.last_dts = Some( + state + .last_dts + .map_or(dts, |last_dts| cmp::max(last_dts, dts)), + ); + } + if let Some(pts) = first_obu.pts { + state.last_pts = Some( + state + .last_pts + .map_or(pts, |last_pts| cmp::max(last_pts, pts)), + ); + } + let outbuf_mut = outbuf .get_mut() .expect("Failed to get mutable reference to outbuf"); - outbuf_mut.set_dts(first_obu.dts); - outbuf_mut.set_pts(first_obu.pts); + outbuf_mut.set_dts(state.last_dts); + outbuf_mut.set_pts(state.last_pts); let mut rtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut) .expect("Failed to create RTPBuffer"); @@ -383,6 +406,22 @@ impl RTPAv1Pay { for _ in 1..packet.obu_count { let obu = loop { let obu = state.obus.pop_front().unwrap(); + + if let Some(dts) = obu.dts { + state.last_dts = Some( + state + .last_dts + .map_or(dts, |last_dts| cmp::max(last_dts, dts)), + ); + } + if let Some(pts) = obu.pts { + state.last_pts = Some( + state + .last_pts + .map_or(pts, |last_pts| cmp::max(last_pts, pts)), + ); + } + // Drop temporal delimiter from here if obu.info.obu_type != ObuType::TemporalDelimiter { break obu; @@ -403,6 +442,22 @@ impl RTPAv1Pay { { let last_obu = loop { let obu = state.obus.front_mut().unwrap(); + + if let Some(dts) = obu.dts { + state.last_dts = Some( + state + .last_dts + .map_or(dts, |last_dts| cmp::max(last_dts, dts)), + ); + } + if let Some(pts) = obu.pts { + state.last_pts = Some( + state + .last_pts + .map_or(pts, |last_pts| cmp::max(last_pts, pts)), + ); + } + // Drop temporal delimiter from here if obu.info.obu_type != ObuType::TemporalDelimiter { break obu;