From 975556c06bb709191dcd46be43aab2fee5da05f5 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Fri, 12 Jan 2024 01:01:51 +0530 Subject: [PATCH] rtspsrc2: Allow a SETUP response without a Transports header If we only send a single Transport in the Transports header, then the server is allowed to omit it in the response. This has some strange consequences for UDP transport: specifically, we have no idea what addr/port we will get the packets from. In those cases, we connect() on the socket when we receive the first packet, so we can send RTCP RRs, and also so we can ensure that we ignore data from other addresses. Part-of: --- net/rtsp/README.md | 1 - net/rtsp/src/rtspsrc/imp.rs | 222 +++++++++++++++++------------- net/rtsp/src/rtspsrc/transport.rs | 10 +- 3 files changed, 130 insertions(+), 103 deletions(-) diff --git a/net/rtsp/README.md b/net/rtsp/README.md index d0cedfad..26d72e52 100644 --- a/net/rtsp/README.md +++ b/net/rtsp/README.md @@ -41,7 +41,6 @@ Roughly in order of priority: - Or TCP reconnection if UDP has not timed out * Parse SDP rtcp-fb attributes * Parse SDP ssrc attributes -* Don't require Transport header in SETUP response, it is optional * Clock sync support, such as RFC7273 * PAUSE support with VOD * Seeking support with VOD diff --git a/net/rtsp/src/rtspsrc/imp.rs b/net/rtsp/src/rtspsrc/imp.rs index d312e38c..2933ceaa 100644 --- a/net/rtsp/src/rtspsrc/imp.rs +++ b/net/rtsp/src/rtspsrc/imp.rs @@ -16,7 +16,7 @@ use std::convert::TryFrom; use std::fmt; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use std::time::Duration; use anyhow::Result; @@ -51,6 +51,7 @@ const DEFAULT_LOCATION: Option = None; const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(2); const DEFAULT_PORT_START: u16 = 0; +// Priority list has multicast first, because we want to prefer multicast if it's available const DEFAULT_PROTOCOLS: &str = "udp-mcast,udp,tcp"; const MAX_MESSAGE_SIZE: usize = 1024 * 1024; const MAX_BIND_PORT_RETRY: u16 = 100; @@ -545,7 +546,7 @@ impl RtspSrc { Err(err) => { gst::element_imp_error!( task_src, - gst::CoreError::Failed, + gst::ResourceError::OpenRead, ["Failed to connect to RTSP server: {err:#?}"] ); return; @@ -852,30 +853,23 @@ impl RtspSrc { let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; p.rtp_appsrc = Some(rtp_appsrc.clone()); - // Spawn RTP udpsrc task + // Spawn RTP udp receive task state.handles.push(RUNTIME.spawn(async move { - udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await + udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await })); - // Spawn RTCP udpsrc task + // Spawn RTCP udp send/recv task if let Some(rtcp_socket) = rtcp_socket { let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; - let socket = Arc::new(rtcp_socket); - let sock = socket.clone(); - state.handles.push( - RUNTIME - .spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }), - ); - // Spawn RTCP RR udpsink task self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; - state - .handles - .push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await })); + state.handles.push(RUNTIME.spawn(async move { + udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await + })); } } RtspTransportInfo::Udp { source, - server_port: (server_rtp_port, server_rtcp_port), + server_port, client_port: _, sockets, } => { @@ -887,41 +881,34 @@ impl RtspSrc { ); continue; }; + if let Some((server_rtp_port, server_rtcp_port)) = server_port { + let _ = rtp_socket + .connect(&format!( + "{}:{server_rtp_port}", + source.as_ref().expect("Must have source address") + )) + .await; + if let (Some(source), Some(port), Some(s)) = + (source, server_rtcp_port, rtcp_socket.as_ref()) + { + let _ = s.connect(&format!("{source}:{port}")).await; + } + }; - let _ = rtp_socket - .connect(&format!( - "{}:{server_rtp_port}", - source.as_ref().expect("Must have source address") - )) - .await; - - if let (Some(source), Some(port), Some(s)) = - (source, server_rtcp_port, rtcp_socket.as_ref()) - { - let _ = s.connect(&format!("{source}:{port}")).await; - } - - // Spawn RTP udpsrc task + // Spawn RTP udp receive task let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; p.rtp_appsrc = Some(rtp_appsrc.clone()); state.handles.push(RUNTIME.spawn(async move { - udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await + udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await })); + // Spawn RTCP udp send/recv task if let Some(rtcp_socket) = rtcp_socket { - // Spawn RTCP SR udpsrc task let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; - let socket = Arc::new(rtcp_socket); - let sock = socket.clone(); - state.handles.push( - RUNTIME - .spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }), - ); - // Spawn RTCP RR udpsink task self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; - state - .handles - .push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await })); + state.handles.push(RUNTIME.spawn(async move { + udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await + })); } } RtspTransportInfo::Tcp { @@ -1415,7 +1402,7 @@ impl RtspTaskState { } fn parse_setup_transports( - transports: Transports, + transports: &Transports, s: &mut gst::Structure, protocols: &[RtspProtocol], mode: &TransportMode, @@ -1664,10 +1651,11 @@ impl RtspTaskState { } self.cseq += 1; + let transports: Transports = transports.as_slice().into(); let req = Request::builder(Method::Setup, self.version) .typed_header::(&self.cseq.into()) .header(USER_AGENT, DEFAULT_USER_AGENT) - .typed_header::(&transports.as_slice().into()) + .typed_header::(&transports) .request_uri(control_url.clone()); let req = if let Some(s) = session { req.typed_header::(s) @@ -1700,13 +1688,17 @@ impl RtspTaskState { // Manually strip timeout field: https://github.com/sdroege/rtsp-types/issues/24 session.replace(Session(new_session.0, None)); let mut parsed_transport = if let Some(transports) = rsp.typed_header::()? { - Self::parse_setup_transports(transports, &mut s, &protocols, &mode) + Self::parse_setup_transports(&transports, &mut s, &protocols, &mode) } else { - // FIXME: Transport header in response is optional + // Transport header in response is optional if only one transport was offered // https://datatracker.ietf.org/doc/html/rfc2326#section-12.39 - Err(RtspError::InvalidMessage( - "No transport header in SETUP response", - )) + if transports.len() == 1 { + Self::parse_setup_transports(&transports, &mut s, &protocols, &mode) + } else { + Err(RtspError::InvalidMessage( + "No transport header in SETUP response", + )) + } }?; match &mut parsed_transport { RtspTransportInfo::UdpMulticast { .. } => {} @@ -1953,11 +1945,33 @@ fn on_rtcp_tcp( } } -async fn udpsrc_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: Option) { +async fn udp_rtp_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: gst::ClockTime) { // TODO: this should allocate a buffer pool to avoid a copy let mut buf = vec![0; UDP_PACKET_MAX_SIZE]; - let t = Duration::from_secs(timeout.unwrap_or(gst::ClockTime::MAX).into()); - loop { + let t = Duration::from_secs(timeout.into()); + // We would not be connected if the server didn't give us a Transport header or its Transport + // header didn't specify the server port, so we don't know the sender port from which we will + // get data till we get the first packet here. + if !socket.peer_addr().is_ok() { + let ret = match time::timeout(t, socket.peek_sender()).await { + Ok(Ok(addr)) => { + let _ = socket.connect(addr).await; + Ok(()) + } + Ok(Err(_elapsed)) => Err(format!("No data after {DEFAULT_TIMEOUT} seconds, exiting")), + Err(err) => Err(format!("UDP socket was closed: {err:?}")), + }; + if let Err(s) = ret { + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("{}", s), + ["{:#?}", socket] + ); + return; + } + } + let error = loop { match time::timeout(t, socket.recv(&mut buf)).await { Ok(Ok(len)) => { let t = appsrc.current_running_time(); @@ -1965,56 +1979,76 @@ async fn udpsrc_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: Optio let bufref = buffer.make_mut(); bufref.set_dts(t); if let Err(err) = appsrc.push_buffer(buffer) { - gst::element_error!( - appsrc, - gst::ResourceError::Failed, - ("UDP buffer push failed: {:?}", err), - ["{:#?}", socket] - ); - break; + break format!("UDP buffer push failed: {err:?}"); } } - Ok(Err(_elapsed)) => { - gst::element_error!( - appsrc, - gst::ResourceError::Failed, - ["No data received after {DEFAULT_TIMEOUT} seconds, exiting"] - ); - break; - } - Err(err) => { - gst::element_error!( - appsrc, - gst::ResourceError::Close, - ("UDP socket was closed: {:?}", err), - ["{:#?}", socket] - ); - break; - } + Ok(Err(_elapsed)) => break format!("No data after {DEFAULT_TIMEOUT} seconds, exiting"), + Err(err) => break format!("UDP socket was closed: {err:?}"), }; - } + }; + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("{}", error), + ["{:#?}", socket] + ); } -async fn udpsink_task(socket: &UdpSocket, mut rx: mpsc::Receiver>) { - loop { - match rx.recv().await { - Some(data) => match socket.send(data.as_ref()).await { - Ok(_) => { - gst::debug!(CAT, "Sent RTCP RR"); - } - Err(err) => { - gst::error!(CAT, "UDP socket send error: {err:?}, quitting loop"); +async fn udp_rtcp_task( + socket: &UdpSocket, + appsrc: gst_app::AppSrc, + mut rx: mpsc::Receiver>, +) { + // The socket might not be connected if the server either didn't specify a server_port for + // RTCP, or if the server didn't send a Transport header in the SETUP response at all. + // In that case, we will connect when we get an RTCP packet. + let mut is_connected = socket.peer_addr().is_ok(); + let mut buf = vec![0; UDP_PACKET_MAX_SIZE]; + let error = loop { + tokio::select! { + send_rtcp = rx.recv() => match send_rtcp { + Some(data) => match socket.send(data.as_ref()).await { + Ok(_) => gst::debug!(CAT, "Sent RTCP RR"), + Err(err) => { + if !is_connected { + gst::warning!(CAT, "Can't send RTCP yet: don't have dest addr"); + } else { + rx.close(); + break format!("RTCP send error: {err:?}, stopping task"); + } + } + }, + None => { rx.close(); - break; + break format!("UDP socket {socket:?} closed, no more RTCP will be sent"); } }, - None => { - gst::info!(CAT, "UDP socket {socket:?} closed, quitting loop"); - rx.close(); - break; - } - }; - } + recv_rtcp = socket.recv_from(&mut buf) => match recv_rtcp { + Ok((len, addr)) => { + gst::debug!(CAT, "Received RTCP SR"); + if !is_connected { + gst::info!(CAT, "Delayed RTCP UDP connect to {addr:?}"); + let _ = socket.connect(addr).await; + is_connected = true; + }; + let t = appsrc.current_running_time(); + let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned()); + let bufref = buffer.make_mut(); + bufref.set_dts(t); + if let Err(err) = appsrc.push_buffer(buffer) { + break format!("UDP buffer push failed: {err:?}"); + } + } + Err(err) => break format!("UDP socket was closed: {err:?}"), + }, + } + }; + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("{}", error), + ["{:#?}", socket] + ); } #[glib::object_subclass] diff --git a/net/rtsp/src/rtspsrc/transport.rs b/net/rtsp/src/rtspsrc/transport.rs index f4573902..d1b84119 100644 --- a/net/rtsp/src/rtspsrc/transport.rs +++ b/net/rtsp/src/rtspsrc/transport.rs @@ -22,7 +22,7 @@ pub enum RtspTransportInfo { }, Udp { source: Option, - server_port: (u16, Option), + server_port: Option<(u16, Option)>, client_port: Option<(u16, Option)>, sockets: Option<(UdpSocket, Option)>, }, @@ -73,15 +73,9 @@ impl TryFrom<&RtpTransport> for RtspTransportInfo { ttl: t.params.ttl, }) } else { - let Some(server_port) = t.params.server_port else { - return Err(RtspError::Fatal(format!( - "Need server unicast UDP port(s): {:#?}", - t.params, - ))); - }; Ok(RtspTransportInfo::Udp { source: t.params.source.clone(), - server_port, + server_port: t.params.server_port, client_port: t.params.client_port, sockets: None, })