diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index a8bc51f7..4824fac5 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7999,6 +7999,32 @@ "presence": "always" } }, + "properties": { + "request-keyframe": { + "blurb": "Request new keyframe when packet loss is detected", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "wait-for-keyframe": { + "blurb": "Wait for the next keyframe after packet loss", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + } + }, "rank": "marginal" }, "rtpav1pay": { diff --git a/net/rtp/src/av1/depay/imp.rs b/net/rtp/src/av1/depay/imp.rs index 11da141b..0cd54dee 100644 --- a/net/rtp/src/av1/depay/imp.rs +++ b/net/rtp/src/av1/depay/imp.rs @@ -13,6 +13,7 @@ use std::{ cmp::Ordering, io::{Cursor, Read, Seek, SeekFrom}, ops::RangeInclusive, + sync::Mutex, }; use bitstream_io::{BitReader, BitWriter}; @@ -28,6 +29,12 @@ use crate::{ use crate::basedepay::RtpBaseDepay2Ext; +#[derive(Clone, Default)] +struct Settings { + request_keyframe: bool, + wait_for_keyframe: bool, +} + struct PendingFragment { ext_seqnum: u64, bytes: Vec, @@ -43,6 +50,8 @@ struct State { found_valid_obu: bool, /// holds data for a fragment obu_fragment: Option, + /// if we saw a keyframe since the last discont + seen_keyframe: bool, } impl Default for State { @@ -53,6 +62,7 @@ impl Default for State { needs_discont: true, found_valid_obu: false, obu_fragment: None, + seen_keyframe: false, } } } @@ -60,6 +70,7 @@ impl Default for State { #[derive(Default)] pub struct RTPAv1Depay { state: AtomicRefCell, + settings: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -87,7 +98,48 @@ impl ObjectSubclass for RTPAv1Depay { type ParentType = crate::basedepay::RtpBaseDepay2; } -impl ObjectImpl for RTPAv1Depay {} +impl ObjectImpl for RTPAv1Depay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoolean::builder("request-keyframe") + .nick("Request Keyframe") + .blurb("Request new keyframe when packet loss is detected") + .default_value(Settings::default().request_keyframe) + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("wait-for-keyframe") + .nick("Wait For Keyframe") + .blurb("Wait for the next keyframe after packet loss") + .default_value(Settings::default().wait_for_keyframe) + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "request-keyframe" => { + self.settings.lock().unwrap().request_keyframe = value.get().unwrap(); + } + "wait-for-keyframe" => { + self.settings.lock().unwrap().wait_for_keyframe = value.get().unwrap(); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "request-keyframe" => self.settings.lock().unwrap().request_keyframe.to_value(), + "wait-for-keyframe" => self.settings.lock().unwrap().wait_for_keyframe.to_value(), + _ => unimplemented!(), + } + } +} impl GstObjectImpl for RTPAv1Depay {} @@ -193,6 +245,7 @@ impl RTPAv1Depay { ) -> Result, gst::Buffer)>, gst::FlowError> { gst::trace!(CAT, imp = self, "Processing RTP packet {packet:?}",); + let settings = self.settings.lock().unwrap().clone(); let mut state = self.state.borrow_mut(); let mut reader = Cursor::new(packet.payload()); @@ -226,6 +279,29 @@ impl RTPAv1Depay { self.obj().drop_packets(..packet.ext_seqnum()); } + if aggr_header.start_of_seq { + state.seen_keyframe = true; + } + + // If this is a new temporal unit and we never saw a keyframe so far, + // handle this according to the request-keyframe / wait-for-keyframe properties. + if !state.seen_keyframe { + if settings.request_keyframe { + gst::debug!(CAT, imp = self, "Requesting keyframe from upstream"); + let event = gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(); + let _ = self.obj().sink_pad().push_event(event); + } + + if settings.wait_for_keyframe { + gst::trace!(CAT, imp = self, "Waiting for keyframe"); + self.reset(&mut state); + self.obj().drop_packets(..=packet.ext_seqnum()); + return Ok(None); + } + } + // the next temporal unit starts with a temporal delimiter OBU ready_obus.extend_from_slice(&TEMPORAL_DELIMITER); }