rtspsrc2: Fix RTCP send/recv in the multicast case

Don't use connect(), since that is incompatible with multicast.
Instead, drop received packets that are from senders we do not want.

Also set multicast loopback = false so we don't receive RTCP RRs from
ourselves and interpret them as RTCP SRs.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1425>
This commit is contained in:
Nirbheek Chauhan 2024-02-01 01:47:57 +05:30
parent e59f3bbe58
commit 7a1cd675c2

View file

@ -849,6 +849,7 @@ impl RtspSrc {
if let Some(ttl) = ttl {
let _ = rtp_socket.set_multicast_ttl_v4(*ttl as u32);
}
let _ = rtp_socket.set_multicast_loop_v4(false);
if let Some(rtcp_socket) = &rtcp_socket {
if let Err(err) =
rtcp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED)
@ -857,20 +858,25 @@ impl RtspSrc {
CAT,
"Failed to join RTCP multicast address {addr}: {err:?}"
);
}
} else {
if let Some(ttl) = ttl {
let _ = rtcp_socket.set_multicast_ttl_v4(*ttl as u32);
}
let _ = rtcp_socket.set_multicast_loop_v4(false);
}
}
}
IpAddr::V6(addr) => {
rtp_socket.join_multicast_v6(addr, 0)?;
let _ = rtp_socket.set_multicast_loop_v6(false);
if let Some(rtcp_socket) = &rtcp_socket {
if let Err(err) = rtcp_socket.join_multicast_v6(addr, 0) {
gst::warning!(
CAT,
"Failed to join RTCP multicast address {addr}: {err:?}"
);
} else {
let _ = rtcp_socket.set_multicast_loop_v6(false);
}
}
}
@ -885,16 +891,18 @@ impl RtspSrc {
rtp_appsrc,
settings.timeout,
settings.receive_mtu,
None,
)
.await
}));
// Spawn RTCP udp send/recv task
if let Some(rtcp_socket) = rtcp_socket {
let rtcp_dest = rtcp_port.and_then(|p| Some(SocketAddr::new(*dest, p)));
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?;
state.handles.push(RUNTIME.spawn(async move {
udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await
udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rtcp_dest, true, rx).await
}));
}
}
@ -912,18 +920,18 @@ impl RtspSrc {
);
continue;
};
if let Some((server_rtp_port, server_rtcp_port)) = server_port {
let _ = rtp_socket
.connect(&format!(
"{}:{server_rtp_port}",
source.as_ref().expect("Must have source address")
))
.await;
if let (Some(source), Some(port), Some(s)) =
(source, server_rtcp_port, rtcp_socket.as_ref())
{
let _ = s.connect(&format!("{source}:{port}")).await;
let (rtp_sender_addr, rtcp_sender_addr) = match (source, server_port) {
(Some(ip), Some((rtp_port, Some(rtcp_port)))) => {
let ip = ip.parse().unwrap();
(
Some(SocketAddr::new(ip, *rtp_port)),
Some(SocketAddr::new(ip, *rtcp_port)),
)
}
(Some(ip), Some((rtp_port, None))) => {
(Some(SocketAddr::new(ip.parse().unwrap(), *rtp_port)), None)
}
_ => (None, None),
};
// Spawn RTP udp receive task
@ -935,6 +943,7 @@ impl RtspSrc {
rtp_appsrc,
settings.timeout,
settings.receive_mtu,
rtp_sender_addr,
)
.await
}));
@ -944,7 +953,8 @@ impl RtspSrc {
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?;
state.handles.push(RUNTIME.spawn(async move {
udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await
udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rtcp_sender_addr, false, rx)
.await
}));
}
}
@ -1860,19 +1870,17 @@ async fn udp_rtp_task(
appsrc: gst_app::AppSrc,
timeout: gst::ClockTime,
receive_mtu: u32,
sender_addr: Option<SocketAddr>,
) {
let t = Duration::from_secs(timeout.into());
let addr = match socket.peer_addr() {
Ok(addr) => addr,
// We would not be connected if the server didn't give us a Transport header or its
// Transport header didn't specify the server port, so we don't know the sender port from
// which we will get data till we get the first packet here.
Err(_) => {
let sender_addr = match sender_addr {
Some(addr) => addr,
// Server didn't give us a Transport header or its Transport header didn't specify the
// server port, so we don't know the sender port from which we will get data till we get
// the first packet here.
None => {
let ret = match time::timeout(t, socket.peek_sender()).await {
Ok(Ok(addr)) => {
let _ = socket.connect(addr).await;
Ok(addr)
}
Ok(Ok(addr)) => Ok(addr),
Ok(Err(_elapsed)) => Err(format!(
"No data after {} seconds, exiting",
timeout.seconds()
@ -1893,9 +1901,10 @@ async fn udp_rtp_task(
}
}
};
gst::info!(CAT, "Receiving from address {sender_addr:?}");
let gio_addr = {
let inet_addr: gio::InetAddress = addr.ip().into();
gio::InetSocketAddress::new(&inet_addr, addr.port())
let inet_addr: gio::InetAddress = sender_addr.ip().into();
gio::InetSocketAddress::new(&inet_addr, sender_addr.port())
};
let mut size = receive_mtu;
let caps = appsrc.caps();
@ -1911,8 +1920,12 @@ async fn udp_rtp_task(
let Ok(mut map) = buffer.into_mapped_buffer_writable() else {
break "Failed to map buffer writable".to_string();
};
match time::timeout(t, socket.recv(map.as_mut_slice())).await {
Ok(Ok(len)) => {
match time::timeout(t, socket.recv_from(map.as_mut_slice())).await {
Ok(Ok((len, addr))) => {
// Ignore packets from the wrong sender
if addr != sender_addr {
continue;
}
if size < UDP_PACKET_MAX_SIZE && len == size as usize {
gst::warning!(
CAT,
@ -1936,6 +1949,7 @@ async fn udp_rtp_task(
bufref.set_size(len);
bufref.set_dts(t);
gst_net::NetAddressMeta::add(bufref, &gio_addr);
gst::trace!(CAT, "received RTP packet from {addr:?}");
if let Err(err) = appsrc.push_buffer(buffer) {
break format!("UDP buffer push failed: {err:?}");
}
@ -1957,27 +1971,27 @@ async fn udp_rtp_task(
async fn udp_rtcp_task(
socket: &UdpSocket,
appsrc: gst_app::AppSrc,
mut sender_addr: Option<SocketAddr>,
is_multicast: bool,
mut rx: mpsc::Receiver<MappedBuffer<Readable>>,
) {
// The socket might not be connected if the server either didn't specify a server_port for
// RTCP, or if the server didn't send a Transport header in the SETUP response at all.
// In that case, we will connect when we get an RTCP packet.
let mut is_connected = socket.peer_addr().is_ok();
let mut buf = vec![0; UDP_PACKET_MAX_SIZE as usize];
let mut cache: LruCache<_, _> = LruCache::new(NonZeroUsize::new(RTCP_ADDR_CACHE_SIZE).unwrap());
let error = loop {
tokio::select! {
send_rtcp = rx.recv() => match send_rtcp {
Some(data) => match socket.send(data.as_ref()).await {
Ok(_) => gst::debug!(CAT, "Sent RTCP RR"),
// The server either didn't specify a server_port for RTCP, or if the server didn't
// send a Transport header in the SETUP response at all.
Some(data) => if let Some(addr) = sender_addr.as_ref() {
match socket.send_to(data.as_ref(), addr).await {
Ok(_) => gst::debug!(CAT, "Sent RTCP RR packet"),
Err(err) => {
if !is_connected {
gst::warning!(CAT, "Can't send RTCP yet: don't have dest addr");
} else {
rx.close();
break format!("RTCP send error: {err:?}, stopping task");
}
}
} else {
gst::warning!(CAT, "Can't send RTCP yet: don't have dest addr");
},
None => {
rx.close();
@ -1986,11 +2000,15 @@ async fn udp_rtcp_task(
},
recv_rtcp = socket.recv_from(&mut buf) => match recv_rtcp {
Ok((len, addr)) => {
gst::debug!(CAT, "Received RTCP SR");
if !is_connected {
gst::info!(CAT, "Delayed RTCP UDP connect to {addr:?}");
let _ = socket.connect(addr).await;
is_connected = true;
gst::debug!(CAT, "Received RTCP packet");
if let Some(sender_addr) = sender_addr {
// Ignore RTCP from the wrong sender
if !is_multicast && addr != sender_addr {
continue;
}
} else {
sender_addr.replace(addr);
gst::info!(CAT, "Delayed RTCP UDP send address: {addr:?}");
};
let t = appsrc.current_running_time();
let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned());