diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index a99f4c66..a0097042 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -8733,6 +8733,21 @@ ], "kind": "object" }, + "GstRtpSendProfile": { + "kind": "enum", + "values": [ + { + "desc": "AVP profile as specified in RFC 3550", + "name": "avp", + "value": "0" + }, + { + "desc": "AVPF profile as specified in RFC 4585", + "name": "avpf", + "value": "1" + } + ] + }, "GstRtpVp8Pay2FragmentationMode": { "kind": "enum", "values": [ diff --git a/net/rtsp/src/rtspsrc/imp.rs b/net/rtsp/src/rtspsrc/imp.rs index 5ac905e3..b83e08f2 100644 --- a/net/rtsp/src/rtspsrc/imp.rs +++ b/net/rtsp/src/rtspsrc/imp.rs @@ -816,12 +816,12 @@ impl RtspSrc { ) .await? }; - let manager = RtspManager::new(std::env::var("USE_RTPBIN2").is_ok_and(|s| s == "1")); + let manager = RtspManager::new(std::env::var("USE_RTP2").is_ok_and(|s| s == "1")); let obj = self.obj(); - obj.add(&manager.inner) + manager + .add_to(obj.upcast_ref::()) .expect("Adding the manager cannot fail"); - manager.inner.sync_state_with_parent().unwrap(); let mut tcp_interleave_appsrcs = HashMap::new(); for (rtpsession_n, p) in state.setup_params.iter_mut().enumerate() { @@ -983,7 +983,7 @@ impl RtspSrc { obj.no_more_pads(); // Expose RTP srcpads - manager.inner.connect_pad_added(|manager, pad| { + manager.recv.connect_pad_added(|manager, pad| { if pad.direction() != gst::PadDirection::Src { return; } @@ -995,9 +995,9 @@ impl RtspSrc { }; let name = pad.name(); match *name.split('_').collect::>() { - // rtpbin and rtpbin2 + // rtpbin and rtp2 ["recv", "rtp", "src", stream_id, ssrc, pt] - | ["rtp", "recv", "src", stream_id, ssrc, pt] => { + | ["rtp", "src", stream_id, ssrc, pt] => { if stream_id.parse::().is_err() { gst::info!(CAT, "Ignoring srcpad with invalid stream id: {name}"); return; @@ -1128,16 +1128,27 @@ impl RtspSrc { } struct RtspManager { - inner: gst::Element, - using_rtpbin2: bool, + recv: gst::Element, + send: gst::Element, + using_rtp2: bool, } impl RtspManager { - fn new(rtpbin2: bool) -> Self { - let name = if rtpbin2 { "rtpbin2" } else { "rtpbin" }; - let manager = gst::ElementFactory::make_with_name(name, None) - .unwrap_or_else(|_| panic!("{name} not found")); - if !rtpbin2 { + fn new(rtp2: bool) -> Self { + let (recv, send) = if rtp2 { + let recv = gst::ElementFactory::make_with_name("rtprecv", None) + .unwrap_or_else(|_| panic!("rtprecv not found")); + let send = gst::ElementFactory::make("rtpsend") + .property("rtp-id", recv.property::("rtp-id")) + .build() + .unwrap_or_else(|_| panic!("rtpsend not found")); + (recv, send) + } else { + let e = gst::ElementFactory::make_with_name("rtpbin", None) + .unwrap_or_else(|_| panic!("rtpbin not found")); + (e.clone(), e) + }; + if !rtp2 { let on_bye = |args: &[glib::Value]| { let m = args[0].get::().unwrap(); let obj = m.parent()?; @@ -1145,46 +1156,62 @@ impl RtspManager { bin.send_event(gst::event::Eos::new()); None }; - manager.connect("on-bye-ssrc", true, move |args| { + recv.connect("on-bye-ssrc", true, move |args| { gst::info!(CAT, "Received BYE packet"); on_bye(args) }); - manager.connect("on-bye-timeout", true, move |args| { + recv.connect("on-bye-timeout", true, move |args| { gst::info!(CAT, "BYE due to timeout"); on_bye(args) }); } RtspManager { - inner: manager, - using_rtpbin2: rtpbin2, + recv, + send, + using_rtp2: rtp2, } } fn rtp_recv_sinkpad(&self, rtpsession: usize) -> Option { - let name = if self.using_rtpbin2 { - format!("rtp_recv_sink_{}", rtpsession) + let name = if self.using_rtp2 { + format!("rtp_sink_{}", rtpsession) } else { format!("recv_rtp_sink_{}", rtpsession) }; - self.inner.request_pad_simple(&name) + gst::info!(CAT, "requesting {name} for receiving RTP"); + self.recv.request_pad_simple(&name) } fn rtcp_recv_sinkpad(&self, rtpsession: usize) -> Option { - let name = if self.using_rtpbin2 { - format!("rtcp_recv_sink_{}", rtpsession) + let name = if self.using_rtp2 { + format!("rtcp_sink_{}", rtpsession) } else { format!("recv_rtcp_sink_{}", rtpsession) }; - self.inner.request_pad_simple(&name) + gst::info!(CAT, "requesting {name} for receiving RTCP"); + self.recv.request_pad_simple(&name) } fn rtcp_send_srcpad(&self, rtpsession: usize) -> Option { - let name = if self.using_rtpbin2 { - format!("rtcp_send_src_{}", rtpsession) + let name = if self.using_rtp2 { + format!("rtcp_src_{}", rtpsession) } else { format!("send_rtcp_src_{}", rtpsession) }; - self.inner.request_pad_simple(&name) + gst::info!(CAT, "requesting {name} for sending RTCP"); + self.send.request_pad_simple(&name) + } + + fn add_to>(&self, bin: &T) -> Result<(), glib::BoolError> { + if self.using_rtp2 { + bin.add_many([&self.recv, &self.send])?; + self.recv.sync_state_with_parent()?; + self.send.sync_state_with_parent()?; + } else { + bin.add_many([&self.recv])?; + self.recv.sync_state_with_parent()?; + } + Ok(()) } }