diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 0fe6810a..175a9b0d 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -10496,6 +10496,18 @@ "type": "gboolean", "writable": true }, + "forward-metas": { + "blurb": "Comma-separated list of buffer meta names to forward over the control data channel. Currently supported names are: timecode", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "", + "mutable": "playing", + "readable": true, + "type": "gchararray", + "writable": true + }, "ice-transport-policy": { "blurb": "The policy to apply for ICE transport", "conditionally-available": false, diff --git a/net/webrtc/README.md b/net/webrtc/README.md index 1bc40045..f27e987c 100644 --- a/net/webrtc/README.md +++ b/net/webrtc/README.md @@ -257,6 +257,48 @@ Code in [gstwebrtc-api](gstwebrtc-api) is also licensed under the [Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0 +## Control data channel + +When the control data channel is enabled using the `enable-control-data-channel` +property, `webrtcsink` can optionally forward serialized metas and events to the +consumer. + +Currently only timecode metas are supported, but other metas and events can easily +be added in a backward-compatible manner. + +Here is an example for forwarding timecodes: + +``` +gst-launch-1.0 videotestsrc ! timecodestamper ! webrtcsink forward-metas="timecode" name=ws enable-control-data-channel=true run-signalling-server=true run-web-server=true +``` + +Point your web browser to `http://127.0.0.1:8080`, open the console and click +on the producer ID, you should see such messages in the console once the video +is displayed: + +``` +{ + "type": "InfoMessage", + "mid": "video0", + "info": { + "meta": { + "type": "timeCode", + "hours": 0, + "minutes": 0, + "seconds": 5, + "frames": 17, + "field_count": 0, + "fps": [ + 30, + 1 + ], + "flags": "none", + "latest_daily_jam": "2024-08-23T17:51:42+0200" + } + } +} +``` + ## Using the AWS KVS signaller * Setup AWS Kinesis Video Streams diff --git a/net/webrtc/gstwebrtc-api/index.html b/net/webrtc/gstwebrtc-api/index.html index 858696c9..2c3ddf12 100644 --- a/net/webrtc/gstwebrtc-api/index.html +++ b/net/webrtc/gstwebrtc-api/index.html @@ -418,6 +418,9 @@ entryElement.classList.add("has-remote-control"); submitRequestButtonElement.disabled = false; remoteController.attachVideoElement(videoElement); + remoteController.addEventListener("info", (e) => { + console.log("Received info message from producer: ", e.detail); + }); } else { entryElement.classList.remove("has-remote-control"); submitRequestButtonElement.disabled = true; diff --git a/net/webrtc/gstwebrtc-api/src/remote-controller.js b/net/webrtc/gstwebrtc-api/src/remote-controller.js index a32fde80..6af8c484 100644 --- a/net/webrtc/gstwebrtc-api/src/remote-controller.js +++ b/net/webrtc/gstwebrtc-api/src/remote-controller.js @@ -64,6 +64,23 @@ function getModifiers(event) { return modifiers.join("+"); } +/** + * Event name: "info".
+ * Triggered when a remote peer sends an information message over the control data channel. + * @event GstWebRTCAPI#InfoEvent + * @type {external:CustomEvent} + * @property {object} detail - The info message + * @see GstWebRTCAPI.RemoteController + */ +/** + * Event name: "controlResponse".
+ * Triggered when a remote peer sends a response after a control request. + * @event GstWebRTCAPI#ControlResponseEvent + * @type {external:CustomEvent} + * @property {object} detail - The response message + * @see GstWebRTCAPI.RemoteController + */ + /** * @class GstWebRTCAPI.RemoteController * @hideconstructor @@ -78,6 +95,8 @@ function getModifiers(event) { * @extends {external:EventTarget} * @fires {@link GstWebRTCAPI#event:ErrorEvent} * @fires {@link GstWebRTCAPI#event:ClosedEvent} + * @fires {@link GstWebRTCAPI#event:InfoEvent} + * @fires {@link GstWebRTCAPI#event:ControlResponseEvent} * @see GstWebRTCAPI.ConsumerSession#remoteController * @see GstWebRTCAPI.RemoteController#attachVideoElement * @see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc/gstwebrtc-api#produce-a-gstreamer-interactive-webrtc-stream-with-remote-control @@ -117,6 +136,8 @@ export default class RemoteController extends EventTarget { if (msg.type === "ControlResponseMessage") { this.dispatchEvent(new CustomEvent("controlResponse", { detail: msg })); + } else if (msg.type === "InfoMessage") { + this.dispatchEvent(new CustomEvent("info", { detail: msg })); } } catch (ex) { this.dispatchEvent(new ErrorEvent("error", { diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index a04350d6..0bd0fae9 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -1213,6 +1213,92 @@ pub fn deserialize_serde_object( Ok(ret.build()) } +#[derive(Debug, PartialEq, Eq)] +pub struct VideoTimeCodeFlags(gst_video::VideoTimeCodeFlags); + +impl serde::Serialize for VideoTimeCodeFlags { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if self.0.bits() == 0 { + serializer.serialize_str("none") + } else { + let mut ret = String::new(); + if self.0.contains(gst_video::VideoTimeCodeFlags::DROP_FRAME) { + ret.push_str("drop-frame"); + if self.0.contains(gst_video::VideoTimeCodeFlags::INTERLACED) { + ret.push_str("+interlaced"); + } + } else if self.0.contains(gst_video::VideoTimeCodeFlags::INTERLACED) { + ret.push_str("interlaced"); + } + serializer.serialize_str(&ret) + } + } +} + +#[derive(Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub enum Meta { + TimeCode { + hours: u32, + minutes: u32, + seconds: u32, + frames: u32, + field_count: u32, + fps: gst::Fraction, + flags: VideoTimeCodeFlags, + latest_daily_jam: Option, + }, +} + +#[derive(Serialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum Info { + Meta(Meta), +} + +#[derive(Serialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub struct InfoMessage { + pub mid: String, + pub info: Info, +} + +pub fn serialize_meta(buffer: &gst::BufferRef, forward_metas: &HashSet) -> Vec { + let mut ret = vec![]; + + buffer.foreach_meta(|meta| { + if forward_metas.contains("timecode") { + if let Some(tc_meta) = meta.downcast_ref::() { + let tc = tc_meta.tc(); + ret.push(Meta::TimeCode { + hours: tc.hours(), + minutes: tc.minutes(), + seconds: tc.seconds(), + frames: tc.frames(), + field_count: tc.field_count(), + fps: tc.fps(), + flags: VideoTimeCodeFlags(tc.flags()), + latest_daily_jam: tc + .latest_daily_jam() + .and_then(|dt| { + let gst_dt: gst::DateTime = dt.into(); + gst_dt.to_iso8601_string().ok() + }) + .map(|s| s.to_string()), + }); + } + } + std::ops::ControlFlow::Continue(()) + }); + + ret +} + #[cfg(test)] mod tests { use super::*; @@ -1278,6 +1364,43 @@ mod tests { Ok(()) } + #[test] + fn test_serialize_meta() -> Result<(), String> { + gst::init().unwrap(); + + let mut buffer = gst::Buffer::new(); + let time_code = gst_video::VideoTimeCode::new( + gst::Fraction::new(30, 1), + None, + gst_video::VideoTimeCodeFlags::empty(), + 10, + 53, + 17, + 0, + 0, + ); + gst_video::VideoTimeCodeMeta::add( + buffer.get_mut().unwrap(), + &time_code.try_into().unwrap(), + ); + + assert_eq!( + serialize_meta(&buffer, &[String::from("timecode")].into()), + vec![Meta::TimeCode { + hours: 10, + minutes: 53, + seconds: 17, + frames: 0, + field_count: 0, + fps: gst::Fraction::new(30, 1), + flags: VideoTimeCodeFlags(gst_video::VideoTimeCodeFlags::empty()), + latest_daily_jam: None, + }] + ); + + Ok(()) + } + fn test_find_smallest_available_ext_id_case( ids: impl IntoIterator, expected: u32, diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index e11a6d40..83e2d719 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -85,6 +85,7 @@ const DEFAULT_WEB_SERVER_PATH: Option<&str> = None; const DEFAULT_WEB_SERVER_DIRECTORY: &str = "gstwebrtc-api/dist"; #[cfg(feature = "web_server")] const DEFAULT_WEB_SERVER_HOST_ADDR: &str = "http://127.0.0.1:8080"; +const DEFAULT_FORWARD_METAS: &str = ""; /* Start adding some FEC when the bitrate > 2Mbps as we found experimentally * that it is not worth it below that threshold */ #[cfg(feature = "v1_22")] @@ -126,6 +127,7 @@ struct Settings { web_server_directory: String, #[cfg(feature = "web_server")] web_server_host_addr: url::Url, + forward_metas: HashSet, } #[derive(Debug, Clone)] @@ -512,6 +514,7 @@ struct State { #[cfg(feature = "web_server")] web_join_handle: Option>, session_mids: HashMap>, + session_stream_names: HashMap>, } fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str, session_id: &str) { @@ -687,6 +690,7 @@ impl Default for Settings { web_server_directory: String::from(DEFAULT_WEB_SERVER_DIRECTORY), #[cfg(feature = "web_server")] web_server_host_addr: url::Url::parse(DEFAULT_WEB_SERVER_HOST_ADDR).unwrap(), + forward_metas: HashSet::new(), } } } @@ -713,6 +717,7 @@ impl Default for State { #[cfg(feature = "web_server")] web_join_handle: None, session_mids: HashMap::new(), + session_stream_names: HashMap::new(), } } } @@ -2859,6 +2864,45 @@ impl BaseWebRTCSink { signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None) } + fn send_buffer_metas( + &self, + state: &State, + settings: &Settings, + session_id: &str, + buffer: &gst::BufferRef, + ) { + // This should probably never happen as the transform + // function for the dye meta always copies, but we can't + // rely on the behavior of all encoders / payloaders. + let Ok(dye_meta) = gst::meta::CustomMeta::from_buffer(buffer, "webrtcsink-dye") else { + return; + }; + let stream_name = dye_meta.structure().get::("stream-name").unwrap(); + let Some(mid) = state + .session_stream_names + .get(session_id) + .and_then(|names| names.get(&stream_name)) + else { + return; + }; + + if let Some(ref handler) = state.control_events_handler { + for meta in utils::serialize_meta(buffer, &settings.forward_metas) { + match serde_json::to_string(&utils::InfoMessage { + mid: mid.to_owned(), + info: utils::Info::Meta(meta), + }) { + Ok(msg) => { + handler.0 .1.send_string(Some(msg.as_str())); + } + Err(err) => { + gst::warning!(CAT, imp = self, "Failed to serialize info message: {err:?}",); + } + } + } + } + } + /// Called by the signaller to add a new session fn start_session( &self, @@ -2996,6 +3040,64 @@ impl BaseWebRTCSink { _ => None, }; + webrtcbin.connect_closure( + "deep-element-added", + false, + glib::closure!( + #[strong] + session_id, + #[watch] + element, + move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| { + if e.factory().map_or(false, |f| f.name() == "nicesink") { + let sinkpad = e.static_pad("sink").unwrap(); + + let session_id = session_id.clone(); + let element_clone = element.downgrade(); + sinkpad.add_probe( + gst::PadProbeType::BUFFER + | gst::PadProbeType::BUFFER_LIST + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, info| { + let Some(element) = element_clone.upgrade() else { + return gst::PadProbeReturn::Remove; + }; + let this = element.imp(); + let settings = this.settings.lock().unwrap(); + if settings.forward_metas.is_empty() { + return gst::PadProbeReturn::Ok; + } + + let state = this.state.lock().unwrap(); + match info.data { + Some(gst::PadProbeData::Buffer(ref buffer)) => { + this.send_buffer_metas( + &state, + &settings, + &session_id, + buffer, + ); + } + Some(gst::PadProbeData::BufferList(ref list)) => { + for buffer in list.iter() { + this.send_buffer_metas( + &state, + &settings, + &session_id, + buffer, + ); + } + } + _ => (), + } + gst::PadProbeReturn::Ok + }, + ); + } + } + ), + ); + pipeline.add(&webrtcbin).unwrap(); webrtcbin.connect_closure( @@ -3560,6 +3662,11 @@ impl BaseWebRTCSink { .entry(session_id.clone()) .or_default() .insert(mid.to_string(), stream_name.clone()); + state + .session_stream_names + .entry(session_id.clone()) + .or_default() + .insert(stream_name.clone(), mid.to_string()); } if let Some(producer) = state @@ -4239,11 +4346,27 @@ impl BaseWebRTCSink { )); } + fn dye_buffer(&self, mut buffer: gst::Buffer, stream_name: &str) -> Result { + let buf_mut = buffer.make_mut(); + + let mut m = gst::meta::CustomMeta::add(buf_mut, "webrtcsink-dye")?; + + m.mut_structure().set("stream-name", stream_name); + + Ok(buffer) + } + fn chain( &self, pad: &gst::GhostPad, - buffer: gst::Buffer, + mut buffer: gst::Buffer, ) -> Result { + buffer = self + .dye_buffer(buffer, pad.name().as_str()) + .map_err(|err| { + gst::error!(CAT, obj = pad, "Failed to dye buffer: {err}"); + gst::FlowError::Error + })?; self.start_stream_discovery_if_needed(pad.name().as_str()); self.feed_discoveries(pad.name().as_str(), &buffer); @@ -4259,8 +4382,30 @@ impl ObjectSubclass for BaseWebRTCSink { type Interfaces = (gst::ChildProxy, gst_video::Navigation); } +fn register_dye_meta() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::meta::CustomMeta::register_with_transform( + "webrtcsink-dye", + &[], + move |outbuf, meta, _inbuf, _type| { + let stream_name = meta.structure().get::("stream-name").unwrap(); + if let Ok(mut m) = gst::meta::CustomMeta::add(outbuf, "webrtcsink-dye") { + m.mut_structure().set("stream-name", stream_name); + true + } else { + false + } + }, + ); + }); +} + unsafe impl IsSubclassable for super::BaseWebRTCSink { fn class_init(class: &mut glib::Class) { + register_dye_meta(); Self::parent_class_init::(class); } } @@ -4484,6 +4629,24 @@ impl ObjectImpl for BaseWebRTCSink { .blurb("Address the web server should listen on") .default_value(DEFAULT_WEB_SERVER_HOST_ADDR) .build(), + /** + * GstBaseWebRTCSink:forward-metas: + * + * Comma-separated list of buffer metas to forward over the + * control data channel, if any. + * + * Currently supported names are: + * + * - timecode + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecString::builder("forward-metas") + .nick("Forward metas") + .blurb("Comma-separated list of buffer meta names to forward over the control data channel. Currently supported names are: timecode") + .default_value(DEFAULT_FORWARD_METAS) + .mutable_playing() + .build(), ] }); @@ -4616,6 +4779,15 @@ impl ObjectImpl for BaseWebRTCSink { }; settings.web_server_host_addr = host_addr; } + "forward-metas" => { + let mut settings = self.settings.lock().unwrap(); + settings.forward_metas = value + .get::() + .expect("type checked upstream") + .split(",") + .map(String::from) + .collect(); + } _ => unimplemented!(), } } @@ -4714,6 +4886,10 @@ impl ObjectImpl for BaseWebRTCSink { let settings = self.settings.lock().unwrap(); settings.web_server_host_addr.to_string().to_value() } + "forward-metas" => { + let settings = self.settings.lock().unwrap(); + settings.forward_metas.iter().join(",").to_value() + } _ => unimplemented!(), } }