From 092ae1fec83b2625b48cf90a21e58ccfffa62902 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Thu, 20 Apr 2023 18:38:46 +0200 Subject: [PATCH] net/webrtc: sink: add signal "request-encoded-filter" The new "request-encoded-filter" signal is emitted when the encoder and related elements are added to the pipeline. When defined, the element returned by the signal is inserted between the encoder and the payloader. The transformation can be reverted using the [insertable streams API] on the receiver side. [insertable streams API]: https://developer.mozilla.org/en-US/docs/Web/API/Insertable_Streams_for_MediaStreamTrack_API Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 73 ++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 9 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index d8d5414e..c3fbda7c 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -494,6 +494,7 @@ fn setup_encoding( input_caps: &gst::Caps, output_caps: &gst::Caps, codec: &Codec, + mut encoded_filter: Option, ssrc: Option, twcc: Option, ) -> Result<(gst::Element, gst::Element, gst::Element), Error> { @@ -540,6 +541,16 @@ fn setup_encoding( let codec_name = codec.caps.structure(0).unwrap().name(); + let enc_last = if let Some(encoded_filter) = encoded_filter.take() { + pipeline.add(&encoded_filter).unwrap(); + enc.link(&encoded_filter) + .with_context(|| "Linking encoded filter")?; + + encoded_filter + } else { + enc.clone() + }; + if let Some(parser) = if codec_name == "video/x-h264" { Some(make_element("h264parse", None)?) } else if codec_name == "video/x-h265" { @@ -548,10 +559,10 @@ fn setup_encoding( None } { pipeline.add(&parser).unwrap(); - gst::Element::link_many([&enc, &parser, &parse_filter]) + gst::Element::link_many([&enc_last, &parser, &parse_filter]) .with_context(|| "Linking encoding elements")?; } else { - gst::Element::link_many([&enc, &parse_filter]) + gst::Element::link_many([&enc_last, &parse_filter]) .with_context(|| "Linking encoding elements")?; } @@ -917,6 +928,11 @@ impl Session { .clone() .unwrap_or_else(gst::Caps::new_any); + let encoded_filter = element.emit_by_name::>( + "request-encoded-filter", + &[&Some(&self.peer_id), &stream_name, &codec.caps], + ); + let (enc, raw_filter, encoding_sink) = setup_encoding( element, &self.pipeline, @@ -924,6 +940,7 @@ impl Session { &webrtc_pad.in_caps, &output_caps, &codec, + encoded_filter, Some(webrtc_pad.ssrc), None, )?; @@ -1203,6 +1220,7 @@ impl BaseWebRTCSink { &payloaders, media, &stream.in_caps.as_ref().unwrap().clone(), + &stream.sink_pad.name(), settings, ) .await; @@ -1680,6 +1698,7 @@ impl BaseWebRTCSink { payloaders: &gst::glib::List, media: &gst_sdp::SDPMediaRef, in_caps: &gst::Caps, + stream_name: &str, settings: &Settings, ) -> Option { let user_caps = match media.media() { @@ -1763,13 +1782,20 @@ impl BaseWebRTCSink { .iter() .flat_map(|(_, codecs_and_caps)| codecs_and_caps) .map(|(codec, caps)| async move { - BaseWebRTCSink::run_discovery_pipeline(element, codec, in_caps, caps, twcc_idx) - .await - .map(|s| { - let mut codec = codec.clone(); - codec.output_filter = Some([s].into_iter().collect()); - codec - }) + BaseWebRTCSink::run_discovery_pipeline( + element, + stream_name, + codec, + in_caps, + caps, + twcc_idx, + ) + .await + .map(|s| { + let mut codec = codec.clone(); + codec.output_filter = Some([s].into_iter().collect()); + codec + }) }); /* Run sequentially to avoid NVENC collisions */ @@ -2704,6 +2730,7 @@ impl BaseWebRTCSink { async fn run_discovery_pipeline( element: &super::BaseWebRTCSink, + stream_name: &str, codec: &Codec, caps: &gst::Caps, output_caps: &gst::Caps, @@ -2735,6 +2762,11 @@ impl BaseWebRTCSink { gst::Element::link_many(elements_slice) .with_context(|| format!("Running discovery pipeline for caps {caps}"))?; + let encoded_filter = element.emit_by_name::>( + "request-encoded-filter", + &[&Option::::None, &stream_name, &codec.caps], + ); + let (_, _, encoding_sink) = setup_encoding( element, &pipe.0, @@ -2742,6 +2774,7 @@ impl BaseWebRTCSink { caps, output_caps, codec, + encoded_filter, None, twcc, )?; @@ -2847,6 +2880,7 @@ impl BaseWebRTCSink { .map(|(_, codec)| { BaseWebRTCSink::run_discovery_pipeline( element, + &name, codec, &sink_caps, &output_caps, @@ -3358,6 +3392,27 @@ impl ObjectImpl for BaseWebRTCSink { Some(false.to_value()) }) .build(), + /** + * RsWebRTCSink::request-encoded-filter: + * @consumer_id: Identifier of the consumer + * @pad_name: The name of the corresponding input pad + * @encoded_caps: The Caps of the encoded stream + * + * This signal can be used to insert a filter + * element between the encoder and the payloader. + * + * When called during Caps discovery, the `consumer_id` is `None`. + * + * Returns: the element to insert. + */ + glib::subclass::Signal::builder("request-encoded-filter") + .param_types([ + Option::::static_type(), + String::static_type(), + gst::Caps::static_type(), + ]) + .return_type::() + .build(), ] });