diff --git a/Cargo.lock b/Cargo.lock index 8d5e8349b..dbbb75618 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2630,6 +2630,7 @@ dependencies = [ "gstreamer", "gstreamer-check", "gstreamer-rtp", + "gstreamer-video", "once_cell", "smallvec", "time", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index fc9cea5e0..c03e2134d 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6276,6 +6276,32 @@ "presence": "always" } }, + "properties": { + "request-keyframe": { + "blurb": "Request new keyframe when packet loss is detected", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "wait-for-keyframe": { + "blurb": "Wait for the next keyframe after packet loss", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + } + }, "rank": "marginal" }, "rtpav1pay": { diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index 4c275d4c7..db677b008 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -10,8 +10,9 @@ rust-version.workspace = true [dependencies] bitstream-io = "2.0" -gst = { workspace = true, features = ["v1_20"] } -gst-rtp = { workspace = true, features = ["v1_20"]} +gst = { workspace = true, features = ["v1_20"] } +gst-rtp = { workspace = true, features = ["v1_20"]} +gst-video = { workspace = true, features = ["v1_20"]} time = { version = "0.3", default-features = false, features = ["std"] } once_cell.workspace = true smallvec = { version = "1.11", features = ["union", "write", "const_generics", "const_new"] } diff --git a/net/rtp/src/av1/depay/imp.rs b/net/rtp/src/av1/depay/imp.rs index 77e8f99aa..1dd23d204 100644 --- a/net/rtp/src/av1/depay/imp.rs +++ b/net/rtp/src/av1/depay/imp.rs @@ -24,6 +24,12 @@ use crate::av1::common::{ UnsizedObu, CLOCK_RATE, ENDIANNESS, }; +#[derive(Clone, Default)] +struct Settings { + request_keyframe: bool, + wait_for_keyframe: bool, +} + #[derive(Debug)] struct State { last_timestamp: Option, @@ -35,6 +41,8 @@ struct State { found_valid_obu: bool, /// holds data for a fragment obu_fragment: Option>, + /// if we saw a keyframe since the last discont + seen_keyframe: bool, } impl Default for State { @@ -45,13 +53,15 @@ impl Default for State { needs_discont: true, found_valid_obu: false, obu_fragment: None, + seen_keyframe: false, } } } -#[derive(Debug, Default)] +#[derive(Default)] pub struct RTPAv1Depay { state: Mutex, + settings: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -79,7 +89,48 @@ impl ObjectSubclass for RTPAv1Depay { type ParentType = gst_rtp::RTPBaseDepayload; } -impl ObjectImpl for RTPAv1Depay {} +impl ObjectImpl for RTPAv1Depay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoolean::builder("request-keyframe") + .nick("Request Keyframe") + .blurb("Request new keyframe when packet loss is detected") + .default_value(Settings::default().request_keyframe) + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("wait-for-keyframe") + .nick("Wait For Keyframe") + .blurb("Wait for the next keyframe after packet loss") + .default_value(Settings::default().wait_for_keyframe) + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "request-keyframe" => { + self.settings.lock().unwrap().request_keyframe = value.get().unwrap(); + } + "wait-for-keyframe" => { + self.settings.lock().unwrap().wait_for_keyframe = value.get().unwrap(); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "request-keyframe" => self.settings.lock().unwrap().request_keyframe.to_value(), + "wait-for-keyframe" => self.settings.lock().unwrap().wait_for_keyframe.to_value(), + _ => unimplemented!(), + } + } +} impl GstObjectImpl for RTPAv1Depay {} @@ -200,6 +251,7 @@ impl RTPAv1Depay { ); let payload = rtp.payload().map_err(err_flow!(self, payload_buf))?; + let settings = self.settings.lock().unwrap().clone(); let mut state = self.state.lock().unwrap(); @@ -238,6 +290,28 @@ impl RTPAv1Depay { self.reset(&mut state); } + if aggr_header.start_of_seq { + state.seen_keyframe = true; + } + + // If this is a new temporal unit and we never saw a keyframe so far, + // handle this according to the request-keyframe / wait-for-keyframe properties. + if !state.seen_keyframe { + if settings.request_keyframe { + gst::debug!(CAT, imp: self, "Requesting keyframe from upstream"); + let event = gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(); + let _ = self.obj().sink_pad().push_event(event); + } + + if settings.wait_for_keyframe { + gst::trace!(CAT, imp: self, "Waiting for keyframe"); + self.reset(&mut state); + return Ok(()); + } + } + // the next temporal unit starts with a temporal delimiter OBU ready_obus.extend_from_slice(&TEMPORAL_DELIMITER); }