rtspsrc2: Update rtpbin2 support to use rtprecv and rtpsend

USE_RTPBIN2 is now USE_RTP2 because there is no "rtpbin2" now.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Nirbheek Chauhan 2024-03-14 12:55:34 +05:30 committed by Matthew Waters
parent 1600d3b055
commit 9485265769
2 changed files with 68 additions and 26 deletions

View file

@ -8733,6 +8733,21 @@
], ],
"kind": "object" "kind": "object"
}, },
"GstRtpSendProfile": {
"kind": "enum",
"values": [
{
"desc": "AVP profile as specified in RFC 3550",
"name": "avp",
"value": "0"
},
{
"desc": "AVPF profile as specified in RFC 4585",
"name": "avpf",
"value": "1"
}
]
},
"GstRtpVp8Pay2FragmentationMode": { "GstRtpVp8Pay2FragmentationMode": {
"kind": "enum", "kind": "enum",
"values": [ "values": [

View file

@ -816,12 +816,12 @@ impl RtspSrc {
) )
.await? .await?
}; };
let manager = RtspManager::new(std::env::var("USE_RTPBIN2").is_ok_and(|s| s == "1")); let manager = RtspManager::new(std::env::var("USE_RTP2").is_ok_and(|s| s == "1"));
let obj = self.obj(); let obj = self.obj();
obj.add(&manager.inner) manager
.add_to(obj.upcast_ref::<gst::Bin>())
.expect("Adding the manager cannot fail"); .expect("Adding the manager cannot fail");
manager.inner.sync_state_with_parent().unwrap();
let mut tcp_interleave_appsrcs = HashMap::new(); let mut tcp_interleave_appsrcs = HashMap::new();
for (rtpsession_n, p) in state.setup_params.iter_mut().enumerate() { for (rtpsession_n, p) in state.setup_params.iter_mut().enumerate() {
@ -983,7 +983,7 @@ impl RtspSrc {
obj.no_more_pads(); obj.no_more_pads();
// Expose RTP srcpads // Expose RTP srcpads
manager.inner.connect_pad_added(|manager, pad| { manager.recv.connect_pad_added(|manager, pad| {
if pad.direction() != gst::PadDirection::Src { if pad.direction() != gst::PadDirection::Src {
return; return;
} }
@ -995,9 +995,9 @@ impl RtspSrc {
}; };
let name = pad.name(); let name = pad.name();
match *name.split('_').collect::<Vec<_>>() { match *name.split('_').collect::<Vec<_>>() {
// rtpbin and rtpbin2 // rtpbin and rtp2
["recv", "rtp", "src", stream_id, ssrc, pt] ["recv", "rtp", "src", stream_id, ssrc, pt]
| ["rtp", "recv", "src", stream_id, ssrc, pt] => { | ["rtp", "src", stream_id, ssrc, pt] => {
if stream_id.parse::<u32>().is_err() { if stream_id.parse::<u32>().is_err() {
gst::info!(CAT, "Ignoring srcpad with invalid stream id: {name}"); gst::info!(CAT, "Ignoring srcpad with invalid stream id: {name}");
return; return;
@ -1128,16 +1128,27 @@ impl RtspSrc {
} }
struct RtspManager { struct RtspManager {
inner: gst::Element, recv: gst::Element,
using_rtpbin2: bool, send: gst::Element,
using_rtp2: bool,
} }
impl RtspManager { impl RtspManager {
fn new(rtpbin2: bool) -> Self { fn new(rtp2: bool) -> Self {
let name = if rtpbin2 { "rtpbin2" } else { "rtpbin" }; let (recv, send) = if rtp2 {
let manager = gst::ElementFactory::make_with_name(name, None) let recv = gst::ElementFactory::make_with_name("rtprecv", None)
.unwrap_or_else(|_| panic!("{name} not found")); .unwrap_or_else(|_| panic!("rtprecv not found"));
if !rtpbin2 { let send = gst::ElementFactory::make("rtpsend")
.property("rtp-id", recv.property::<String>("rtp-id"))
.build()
.unwrap_or_else(|_| panic!("rtpsend not found"));
(recv, send)
} else {
let e = gst::ElementFactory::make_with_name("rtpbin", None)
.unwrap_or_else(|_| panic!("rtpbin not found"));
(e.clone(), e)
};
if !rtp2 {
let on_bye = |args: &[glib::Value]| { let on_bye = |args: &[glib::Value]| {
let m = args[0].get::<gst::Element>().unwrap(); let m = args[0].get::<gst::Element>().unwrap();
let obj = m.parent()?; let obj = m.parent()?;
@ -1145,46 +1156,62 @@ impl RtspManager {
bin.send_event(gst::event::Eos::new()); bin.send_event(gst::event::Eos::new());
None None
}; };
manager.connect("on-bye-ssrc", true, move |args| { recv.connect("on-bye-ssrc", true, move |args| {
gst::info!(CAT, "Received BYE packet"); gst::info!(CAT, "Received BYE packet");
on_bye(args) on_bye(args)
}); });
manager.connect("on-bye-timeout", true, move |args| { recv.connect("on-bye-timeout", true, move |args| {
gst::info!(CAT, "BYE due to timeout"); gst::info!(CAT, "BYE due to timeout");
on_bye(args) on_bye(args)
}); });
} }
RtspManager { RtspManager {
inner: manager, recv,
using_rtpbin2: rtpbin2, send,
using_rtp2: rtp2,
} }
} }
fn rtp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> { fn rtp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 { let name = if self.using_rtp2 {
format!("rtp_recv_sink_{}", rtpsession) format!("rtp_sink_{}", rtpsession)
} else { } else {
format!("recv_rtp_sink_{}", rtpsession) format!("recv_rtp_sink_{}", rtpsession)
}; };
self.inner.request_pad_simple(&name) gst::info!(CAT, "requesting {name} for receiving RTP");
self.recv.request_pad_simple(&name)
} }
fn rtcp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> { fn rtcp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 { let name = if self.using_rtp2 {
format!("rtcp_recv_sink_{}", rtpsession) format!("rtcp_sink_{}", rtpsession)
} else { } else {
format!("recv_rtcp_sink_{}", rtpsession) format!("recv_rtcp_sink_{}", rtpsession)
}; };
self.inner.request_pad_simple(&name) gst::info!(CAT, "requesting {name} for receiving RTCP");
self.recv.request_pad_simple(&name)
} }
fn rtcp_send_srcpad(&self, rtpsession: usize) -> Option<gst::Pad> { fn rtcp_send_srcpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 { let name = if self.using_rtp2 {
format!("rtcp_send_src_{}", rtpsession) format!("rtcp_src_{}", rtpsession)
} else { } else {
format!("send_rtcp_src_{}", rtpsession) format!("send_rtcp_src_{}", rtpsession)
}; };
self.inner.request_pad_simple(&name) gst::info!(CAT, "requesting {name} for sending RTCP");
self.send.request_pad_simple(&name)
}
fn add_to<T: IsA<gst::Bin>>(&self, bin: &T) -> Result<(), glib::BoolError> {
if self.using_rtp2 {
bin.add_many([&self.recv, &self.send])?;
self.recv.sync_state_with_parent()?;
self.send.sync_state_with_parent()?;
} else {
bin.add_many([&self.recv])?;
self.recv.sync_state_with_parent()?;
}
Ok(())
} }
} }