rtspsrc2: Update rtpbin2 support to use rtprecv and rtpsend

USE_RTPBIN2 is now USE_RTP2 because there is no "rtpbin2" now.
This commit is contained in:
Nirbheek Chauhan 2024-03-14 12:55:34 +05:30
parent e3458bb58b
commit 0c033f65ce

View file

@ -816,12 +816,11 @@ impl RtspSrc {
)
.await?
};
let manager = RtspManager::new(std::env::var("USE_RTPBIN2").is_ok_and(|s| s == "1"));
let manager = RtspManager::new(std::env::var("USE_RTP2").is_ok_and(|s| s == "1"));
let obj = self.obj();
obj.add(&manager.inner)
manager.add_to(obj.upcast_ref::<gst::Bin>())
.expect("Adding the manager cannot fail");
manager.inner.sync_state_with_parent().unwrap();
let mut tcp_interleave_appsrcs = HashMap::new();
for (rtpsession_n, p) in state.setup_params.iter_mut().enumerate() {
@ -983,7 +982,7 @@ impl RtspSrc {
obj.no_more_pads();
// Expose RTP srcpads
manager.inner.connect_pad_added(|manager, pad| {
manager.recv.connect_pad_added(|manager, pad| {
if pad.direction() != gst::PadDirection::Src {
return;
}
@ -995,9 +994,9 @@ impl RtspSrc {
};
let name = pad.name();
match *name.split('_').collect::<Vec<_>>() {
// rtpbin and rtpbin2
// rtpbin and rtp2
["recv", "rtp", "src", stream_id, ssrc, pt]
| ["rtp", "recv", "src", stream_id, ssrc, pt] => {
| ["rtp", "src", stream_id, ssrc, pt] => {
if stream_id.parse::<u32>().is_err() {
gst::info!(CAT, "Ignoring srcpad with invalid stream id: {name}");
return;
@ -1128,16 +1127,27 @@ impl RtspSrc {
}
struct RtspManager {
inner: gst::Element,
using_rtpbin2: bool,
recv: gst::Element,
send: gst::Element,
using_rtp2: bool,
}
impl RtspManager {
fn new(rtpbin2: bool) -> Self {
let name = if rtpbin2 { "rtpbin2" } else { "rtpbin" };
let manager = gst::ElementFactory::make_with_name(name, None)
.unwrap_or_else(|_| panic!("{name} not found"));
if !rtpbin2 {
fn new(rtp2: bool) -> Self {
let (recv, send) = if rtp2 {
let recv = gst::ElementFactory::make_with_name("rtprecv")
.unwrap_or_else(|_| panic!("rtprecv not found"));
let send = gst::ElementFactory::make("rtpsend")
.property("rtp-id", recv.property::<String>("rtp-id"))
.build()
.unwrap_or_else(|_| panic!("rtpsend not found"));
(recv, send)
} else {
let e = gst::ElementFactory::make_with_name("rtpbin", None)
.unwrap_or_else(|_| panic!("rtpbin not found"));
(e.clone(), e)
};
if !rtp2 {
let on_bye = |args: &[glib::Value]| {
let m = args[0].get::<gst::Element>().unwrap();
let Some(obj) = m.parent() else {
@ -1147,46 +1157,62 @@ impl RtspManager {
bin.send_event(gst::event::Eos::new());
None
};
manager.connect("on-bye-ssrc", true, move |args| {
recv.connect("on-bye-ssrc", true, move |args| {
gst::info!(CAT, "Received BYE packet");
on_bye(args)
});
manager.connect("on-bye-timeout", true, move |args| {
recv.connect("on-bye-timeout", true, move |args| {
gst::info!(CAT, "BYE due to timeout");
on_bye(args)
});
}
RtspManager {
inner: manager,
using_rtpbin2: rtpbin2,
recv,
send,
using_rtp2: rtp2,
}
}
fn rtp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 {
format!("rtp_recv_sink_{}", rtpsession)
let name = if self.using_rtp2 {
format!("rtp_sink_{}", rtpsession)
} else {
format!("recv_rtp_sink_{}", rtpsession)
};
self.inner.request_pad_simple(&name)
gst::info!(CAT, "requesting {name} for receiving RTP");
self.recv.request_pad_simple(&name)
}
fn rtcp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 {
format!("rtcp_recv_sink_{}", rtpsession)
let name = if self.using_rtp2 {
format!("rtcp_sink_{}", rtpsession)
} else {
format!("recv_rtcp_sink_{}", rtpsession)
};
self.inner.request_pad_simple(&name)
gst::info!(CAT, "requesting {name} for receiving RTCP");
self.recv.request_pad_simple(&name)
}
fn rtcp_send_srcpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 {
format!("rtcp_send_src_{}", rtpsession)
let name = if self.using_rtp2 {
format!("rtcp_src_{}", rtpsession)
} else {
format!("send_rtcp_src_{}", rtpsession)
};
self.inner.request_pad_simple(&name)
gst::info!(CAT, "requesting {name} for sending RTCP");
self.send.request_pad_simple(&name)
}
fn add_to<T: IsA<gst::Bin>>(&self, bin: &T) -> Result<(), glib::BoolError> {
if self.using_rtp2 {
bin.add_many(&[&self.recv, &self.send])?;
self.recv.sync_state_with_parent()?;
self.send.sync_state_with_parent()?;
} else {
bin.add_many(&[&self.recv])?;
self.recv.sync_state_with_parent()?;
}
Ok(())
}
}