gst-plugins-rs/net/rtsp/src/rtspsrc/imp.rs
Nirbheek Chauhan 086ffd7aff New RTSP source plugin with live streaming support
GST_PLUGIN_FEATURE_RANK=rtspsrc2:1 gst-play-1.0 [URI]

Features:
* Live streaming N audio and N video
  - With RTCP-based A/V sync
* Lower transports: TCP, UDP, UDP-Multicast
* RTP, RTCP SR, RTCP RR
* OPTIONS DESCRIBE SETUP PLAY TEARDOWN
* Custom UDP socket management, does not use udpsrc/udpsink
* Supports both rtpbin and the rtpbin2 rust rewrite
  - Set USE_RTPBIN2=1 to use rtpbin2 (needs other MRs)
* Properties:
  - protocols selection and priority (NEW!)
  - location supports rtsp[ut]://
  - port-start instead of port-range

Co-Authored-by: Tim-Philipp Müller <tim@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1425>
2024-02-07 20:29:18 +05:30

2027 lines
74 KiB
Rust

// GStreamer RTSP Source 2
//
// Copyright (C) 2023 Tim-Philipp Müller <tim centricular com>
// Copyright (C) 2023-2024 Nirbheek Chauhan <nirbheek centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
//
// https://www.rfc-editor.org/rfc/rfc2326.html
use std::collections::{btree_set::BTreeSet, HashMap};
use std::convert::TryFrom;
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::Result;
use once_cell::sync::Lazy;
use futures::{Sink, SinkExt, Stream, StreamExt};
use socket2::Socket;
use tokio::net::{TcpStream, UdpSocket};
use tokio::runtime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time;
use rtsp_types::headers::{
CSeq, NptRange, NptTime, Public, Range, RtpInfos, RtpLowerTransport, RtpProfile, RtpTransport,
RtpTransportParameters, Session, Transport, TransportMode, Transports, ACCEPT, CONTENT_BASE,
CONTENT_LOCATION, USER_AGENT,
};
use rtsp_types::{Message, Method, Request, Response, StatusCode, Version};
use url::Url;
use gst::buffer::{MappedBuffer, Readable};
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use super::body::Body;
use super::transport::RtspTransportInfo;
const DEFAULT_LOCATION: Option<Url> = None;
const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(2);
const DEFAULT_PORT_START: u16 = 0;
const DEFAULT_PROTOCOLS: &str = "udp-mcast,udp,tcp";
const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
const MAX_BIND_PORT_RETRY: u16 = 100;
const UDP_PACKET_MAX_SIZE: usize = 65535 - 8;
static RTCP_CAPS: Lazy<gst::Caps> =
Lazy::new(|| gst::Caps::from(gst::Structure::new_empty("application/x-rtcp")));
// Hardcoded for now
const DEFAULT_USER_AGENT: &str = concat!(
"GStreamer rtspsrc2 ",
env!("CARGO_PKG_VERSION"),
"-",
env!("COMMIT_ID")
);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum RtspProtocol {
UdpMulticast,
Udp,
Tcp,
}
impl fmt::Display for RtspProtocol {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
RtspProtocol::Udp => write!(f, "udp"),
RtspProtocol::UdpMulticast => write!(f, "udp-mcast"),
RtspProtocol::Tcp => write!(f, "tcp"),
}
}
}
#[derive(Debug, Clone)]
struct Settings {
location: Option<Url>,
port_start: u16,
protocols: Vec<RtspProtocol>,
timeout: gst::ClockTime,
}
impl Default for Settings {
fn default() -> Self {
Settings {
location: DEFAULT_LOCATION,
port_start: DEFAULT_PORT_START,
timeout: DEFAULT_TIMEOUT,
protocols: parse_protocols_str(DEFAULT_PROTOCOLS).unwrap(),
}
}
}
#[derive(Debug)]
enum Commands {
Play,
//Pause,
Teardown(Option<oneshot::Sender<()>>),
Data(rtsp_types::Data<Body>),
}
#[derive(Debug, Default)]
pub struct RtspSrc {
settings: Mutex<Settings>,
task_handle: Mutex<Option<JoinHandle<()>>>,
command_queue: Mutex<Option<mpsc::Sender<Commands>>>,
}
#[derive(thiserror::Error, Debug)]
pub enum RtspError {
#[error("Generic I/O error")]
IOGeneric(#[from] std::io::Error),
#[error("Read I/O error")]
Read(#[from] super::tcp_message::ReadError),
#[error("RTSP header parse error")]
HeaderParser(#[from] rtsp_types::headers::HeaderParseError),
#[error("SDP parse error")]
SDPParser(#[from] sdp_types::ParserError),
#[error("Unexpected RTSP message: expected, received")]
UnexpectedMessage(&'static str, rtsp_types::Message<Body>),
#[error("Invalid RTSP message")]
InvalidMessage(&'static str),
#[error("Fatal error")]
Fatal(String),
}
pub(crate) static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtspsrc2",
gst::DebugColorFlags::empty(),
Some("RTSP source"),
)
});
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.build()
.unwrap()
});
fn parse_protocols_str(s: &str) -> Result<Vec<RtspProtocol>, glib::Error> {
let mut acc = Vec::new();
if s.is_empty() {
return Err(glib::Error::new(
gst::CoreError::Failed,
"Protocols list is empty",
));
}
for each in s.split(',') {
match each {
"udp-mcast" => acc.push(RtspProtocol::UdpMulticast),
"udp" => acc.push(RtspProtocol::Udp),
"tcp" => acc.push(RtspProtocol::Tcp),
_ => {
return Err(glib::Error::new(
gst::CoreError::Failed,
&format!("Unsupported RTSP protocol: {each}"),
))
}
}
}
Ok(acc)
}
impl RtspSrc {
fn set_location(&self, uri: Option<&str>) -> Result<(), glib::Error> {
if self.obj().current_state() > gst::State::Ready {
return Err(glib::Error::new(
gst::URIError::BadState,
"Changing the 'location' property on a started 'rtspsrc2' is not supported",
));
}
let mut settings = self.settings.lock().unwrap();
let Some(uri) = uri else {
settings.location = DEFAULT_LOCATION;
return Ok(());
};
let uri = Url::parse(uri).map_err(|err| {
glib::Error::new(
gst::URIError::BadUri,
&format!("Failed to parse URI '{uri}': {err:?}"),
)
})?;
if uri.password().is_some() || !uri.username().is_empty() {
// TODO
gst::fixme!(CAT, "URI credentials are currently ignored");
}
match (uri.host_str(), uri.port()) {
(Some(_), Some(_)) | (Some(_), None) => Ok(()),
_ => Err(glib::Error::new(gst::URIError::BadUri, "Invalid host")),
}?;
let protocols: &[RtspProtocol] = match uri.scheme() {
"rtspu" => &[RtspProtocol::UdpMulticast, RtspProtocol::Udp],
"rtspt" => &[RtspProtocol::Tcp],
"rtsp" => &settings.protocols,
scheme => {
return Err(glib::Error::new(
gst::URIError::UnsupportedProtocol,
&format!("Unsupported URI scheme '{}'", scheme),
));
}
};
if !settings.protocols.iter().any(|p| protocols.contains(p)) {
return Err(glib::Error::new(
gst::URIError::UnsupportedProtocol,
&format!(
"URI scheme '{}' does not match allowed protocols: {:?}",
uri.scheme(),
settings.protocols,
),
));
}
settings.protocols = protocols.to_vec();
settings.location = Some(uri);
Ok(())
}
fn set_protocols(&self, protocol_s: Option<&str>) -> Result<(), glib::Error> {
if self.obj().current_state() > gst::State::Ready {
return Err(glib::Error::new(
gst::CoreError::Failed,
"Changing the 'protocols' property on a started 'rtspsrc2' is not supported",
));
}
let mut settings = self.settings.lock().unwrap();
settings.protocols = match protocol_s {
Some(s) => parse_protocols_str(s)?,
None => parse_protocols_str(DEFAULT_PROTOCOLS).unwrap(),
};
Ok(())
}
}
impl ObjectImpl for RtspSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("location")
.nick("Location")
.blurb("RTSP server, credentials and media path, e.g. rtsp://user:p4ssw0rd@camera-5.local:8554/h264_1080p30")
.mutable_ready()
.build(),
// We purposely use port-start instead of port-range (like in rtspsrc), because
// there is no way for the user to know how many ports we actually need. It depends
// on how many streams the media contains, and whether the server wants RTCP or
// RTCP-mux, or no RTCP. This property can be used to specify the start of the
// valid range, and if the user wants to know how many ports were used, we can
// add API for that later.
glib::ParamSpecUInt::builder("port-start")
.nick("Port start")
.blurb("Port number to start allocating client ports for receiving RTP and RTCP data, eg. 3000 (0 = automatic selection)")
.default_value(DEFAULT_PORT_START.into())
.mutable_ready()
.build(),
glib::ParamSpecString::builder("protocols")
.nick("Protocols")
.blurb("Allowed lower transport protocols, in order of preference")
.default_value("udp-mcast,udp,tcp")
.mutable_ready()
.build(),
glib::ParamSpecUInt64::builder("timeout")
.nick("Timeout")
.blurb("Timeout for network activity, in nanoseconds")
.maximum(gst::ClockTime::MAX.into())
.default_value(DEFAULT_TIMEOUT.into())
.mutable_ready()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let res = match pspec.name() {
"location" => {
let location = value.get::<Option<&str>>().expect("type checked upstream");
self.set_location(location)
}
"port-start" => {
let mut settings = self.settings.lock().unwrap();
let start = value.get::<u32>().expect("type checked upstream");
match u16::try_from(start) {
Ok(start) => {
settings.port_start = start;
Ok(())
}
Err(err) => Err(glib::Error::new(
gst::CoreError::Failed,
&format!("Failed to set port start: {err:?}"),
)),
}
}
"protocols" => {
let protocols = value.get::<Option<&str>>().expect("type checked upstream");
self.set_protocols(protocols)
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().expect("type checked upstream");
settings.timeout = timeout;
Ok(())
}
name => unimplemented!("Property '{name}'"),
};
if let Err(err) = res {
gst::error!(
CAT,
imp: self,
"Failed to set property `{}`: {:?}",
pspec.name(),
err
);
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"location" => {
let settings = self.settings.lock().unwrap();
let location = settings.location.as_ref().map(Url::to_string);
location.to_value()
}
"port-start" => {
let settings = self.settings.lock().unwrap();
(settings.port_start as u32).to_value()
}
"protocols" => {
let settings = self.settings.lock().unwrap();
(settings
.protocols
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(","))
.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
name => unimplemented!("Property '{name}'"),
}
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
impl GstObjectImpl for RtspSrc {}
impl ElementImpl for RtspSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTSP Source",
"Source/Network",
"Receive audio or video from a network device via the Real Time Streaming Protocol (RTSP) (RFC 2326, 7826)",
"Nirbheek Chauhan <nirbheek centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let src_pad_template = gst::PadTemplate::new(
"stream_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_empty_simple("application/x-rtp"),
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::NullToReady => {
self.start().map_err(|err_msg| {
self.post_error_message(err_msg);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToPlaying => {
let cmd_queue = self.cmd_queue();
//self.async_start().map_err(|_| gst::StateChangeError)?;
RUNTIME.spawn(async move { cmd_queue.send(Commands::Play).await });
}
_ => {}
}
let mut ret = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
ret = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
match tokio::runtime::Handle::try_current() {
Ok(_) => {
// If the app does set_state(NULL) from a block_on() inside its own tokio
// runtime, calling block_on() on our own runtime will cause a panic
// because of nested blocking calls. So, shutdown the task from another
// thread.
// The app's usage is also incorrect since they are blocking the runtime
// on I/O, so emit a warning.
gst::warning!(
CAT,
"Blocking I/O: state change to NULL called from an async \
tokio context, redirecting to another thread to prevent \
the tokio panic, but you should refactor your code to \
make use of gst::Element::call_async and set the state \
to NULL from there, without blocking the runtime"
);
let (tx, rx) = std::sync::mpsc::channel();
self.obj().call_async(move |element| {
tx.send(element.imp().stop()).unwrap();
});
rx.recv().unwrap()
}
Err(_) => self.stop(),
}
.map_err(|err_msg| {
self.post_error_message(err_msg);
gst::StateChangeError
})?;
}
_ => (),
}
Ok(ret)
}
}
impl BinImpl for RtspSrc {}
impl URIHandlerImpl for RtspSrc {
const URI_TYPE: gst::URIType = gst::URIType::Src;
fn protocols() -> &'static [&'static str] {
&["rtsp", "rtspu", "rtspt"]
}
fn uri(&self) -> Option<String> {
let settings = self.settings.lock().unwrap();
settings.location.as_ref().map(Url::to_string)
}
fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
self.set_location(Some(uri))
}
}
type RtspStream =
Pin<Box<dyn Stream<Item = Result<Message<Body>, super::tcp_message::ReadError>> + Send>>;
type RtspSink = Pin<Box<dyn Sink<Message<Body>, Error = std::io::Error> + Send>>;
impl RtspSrc {
#[track_caller]
fn cmd_queue(&self) -> mpsc::Sender<Commands> {
self.command_queue.lock().unwrap().as_ref().unwrap().clone()
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let Some(url) = self.settings.lock().unwrap().location.clone() else {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["No location set"]
));
};
gst::info!(
CAT,
imp: self,
"Location: {url}",
);
gst::info!(
CAT,
imp: self,
"Starting RTSP connection thread.. "
);
let task_src = self.ref_counted();
let mut task_handle = self.task_handle.lock().unwrap();
let (tx, rx) = mpsc::channel(1);
{
let mut cmd_queue_opt = self.command_queue.lock().unwrap();
debug_assert!(cmd_queue_opt.is_none());
cmd_queue_opt.replace(tx);
}
let join_handle = RUNTIME.spawn(async move {
gst::info!(CAT, "Connecting to {url} ..");
let hostname_port =
format!("{}:{}", url.host_str().unwrap(), url.port().unwrap_or(554));
// TODO: Add TLS support
let s = match TcpStream::connect(hostname_port).await {
Ok(s) => s,
Err(err) => {
gst::element_imp_error!(
task_src,
gst::CoreError::Failed,
["Failed to connect to RTSP server: {err:#?}"]
);
return;
}
};
let _ = s.set_nodelay(true);
gst::info!(CAT, "Connected!");
let (read, write) = s.into_split();
let stream = Box::pin(super::tcp_message::async_read(read, MAX_MESSAGE_SIZE).fuse());
let sink = Box::pin(super::tcp_message::async_write(write));
let mut state = RtspTaskState::new(url, stream, sink);
let task_ret = task_src.rtsp_task(&mut state, rx).await;
gst::info!(CAT, "Exited rtsp_task");
// Cleanup after stopping
for h in &state.handles {
h.abort();
}
for h in state.handles {
let _ = h.await;
}
let obj = task_src.obj();
for e in obj.iterate_sorted() {
let Ok(e) = e else {
continue;
};
if let Err(err) = e.set_state(gst::State::Null) {
gst::warning!(CAT, "{} failed to go to Null state: {err:?}", e.name());
}
}
for pad in obj.src_pads() {
if let Err(err) = obj.remove_pad(&pad) {
gst::warning!(CAT, "Failed to remove pad {}: {err:?}", pad.name());
}
}
for e in obj.iterate_sorted() {
let Ok(e) = e else {
continue;
};
if let Err(err) = obj.remove(&e) {
gst::warning!(CAT, "Failed to remove element {}: {err:?}", e.name());
}
}
// Post the element error after cleanup
if let Err(err) = task_ret {
gst::element_imp_error!(
task_src,
gst::CoreError::Failed,
["RTSP task exited: {err:#?}"]
);
}
gst::info!(CAT, "Cleanup complete");
});
debug_assert!(task_handle.is_none());
task_handle.replace(join_handle);
gst::info!(CAT, imp: self, "Started");
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
gst::info!(CAT, "Stopping...");
let cmd_queue = self.cmd_queue();
let task_handle = { self.task_handle.lock().unwrap().take() };
RUNTIME.block_on(async {
let (tx, rx) = oneshot::channel();
if let Ok(()) = cmd_queue.send(Commands::Teardown(Some(tx))).await {
if let Err(_elapsed) = time::timeout(Duration::from_millis(500), rx).await {
gst::warning!(
CAT,
"Timeout waiting for Teardown, going to NULL asynchronously"
);
}
}
});
if let Some(join_handle) = task_handle {
gst::debug!(CAT, "Waiting for RTSP connection thread to shut down..");
let _ = RUNTIME.block_on(join_handle);
}
self.command_queue.lock().unwrap().take();
gst::info!(CAT, imp: self, "Stopped");
Ok(())
}
fn make_rtp_appsrc(
&self,
rtpsession_n: usize,
caps: &gst::Caps,
manager: &RtspManager,
) -> Result<gst_app::AppSrc> {
let callbacks = gst_app::AppSrcCallbacks::builder()
.enough_data(|appsrc| {
gst::warning!(CAT, "appsrc {} is overrunning: enough data!", appsrc.name());
})
.build();
let appsrc = gst_app::AppSrc::builder()
.name(format!("rtp_appsrc_{rtpsession_n}"))
.format(gst::Format::Time)
.handle_segment_change(true)
.caps(caps)
.stream_type(gst_app::AppStreamType::Stream)
.max_bytes(0)
.max_buffers(0)
.max_time(Some(gst::ClockTime::from_seconds(2)))
.leaky_type(gst_app::AppLeakyType::Downstream)
.callbacks(callbacks)
.is_live(true)
.build();
let obj = self.obj();
obj.add(&appsrc)?;
appsrc
.static_pad("src")
.unwrap()
.link(&manager.rtp_recv_sinkpad(rtpsession_n).unwrap())?;
let templ = obj.pad_template("stream_%u").unwrap();
let ghostpad = gst::GhostPad::builder_from_template(&templ)
.name(format!("stream_{}", rtpsession_n))
.build();
gst::info!(CAT, "Adding ghost srcpad {}", ghostpad.name());
obj.add_pad(&ghostpad)
.expect("Adding a ghostpad should never fail");
appsrc.sync_state_with_parent()?;
Ok(appsrc)
}
fn make_rtcp_appsrc(
&self,
rtpsession_n: usize,
manager: &RtspManager,
) -> Result<gst_app::AppSrc> {
let appsrc = gst_app::AppSrc::builder()
.name(format!("rtcp_appsrc_{rtpsession_n}"))
.format(gst::Format::Time)
.handle_segment_change(true)
.caps(&RTCP_CAPS)
.stream_type(gst_app::AppStreamType::Stream)
.is_live(true)
.build();
self.obj().add(&appsrc)?;
appsrc
.static_pad("src")
.unwrap()
.link(&manager.rtcp_recv_sinkpad(rtpsession_n).unwrap())?;
appsrc.sync_state_with_parent()?;
Ok(appsrc)
}
fn make_rtcp_appsink<
F: FnMut(&gst_app::AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
>(
&self,
rtpsession_n: usize,
manager: &RtspManager,
on_rtcp: F,
) -> Result<()> {
let cmd_tx_eos = self.cmd_queue();
let cbs = gst_app::app_sink::AppSinkCallbacks::builder()
.eos(move |_appsink| {
let cmd_tx = cmd_tx_eos.clone();
RUNTIME.spawn(async move {
let _ = cmd_tx.send(Commands::Teardown(None)).await;
});
})
.new_sample(on_rtcp)
.build();
let rtcp_appsink = gst_app::AppSink::builder()
.name(format!("rtcp_appsink_{rtpsession_n}"))
.sync(false)
.async_(false)
.callbacks(cbs)
.build();
self.obj().add(&rtcp_appsink)?;
manager
.rtcp_send_srcpad(rtpsession_n)
.unwrap()
.link(&rtcp_appsink.static_pad("sink").unwrap())?;
Ok(())
}
fn post_start(&self, code: &str, text: &str) {
let obj = self.obj();
let msg = gst::message::Progress::builder(gst::ProgressType::Start, code, text)
.src(&*obj)
.build();
let _ = obj.post_message(msg);
}
fn post_complete(&self, code: &str, text: &str) {
let obj = self.obj();
let msg = gst::message::Progress::builder(gst::ProgressType::Complete, code, text)
.src(&*obj)
.build();
let _ = obj.post_message(msg);
}
fn post_cancelled(&self, code: &str, text: &str) {
let obj = self.obj();
let msg = gst::message::Progress::builder(gst::ProgressType::Canceled, code, text)
.src(&*obj)
.build();
let _ = obj.post_message(msg);
}
async fn rtsp_task(
&self,
state: &mut RtspTaskState,
mut cmd_rx: mpsc::Receiver<Commands>,
) -> Result<()> {
let cmd_tx = self.cmd_queue();
let settings = { self.settings.lock().unwrap().clone() };
// OPTIONS
state.options().await?;
// DESCRIBE
state.describe().await?;
let mut session: Option<Session> = None;
// SETUP streams (TCP interleaved)
state.setup_params = {
state
.setup(
&mut session,
settings.port_start,
&settings.protocols,
TransportMode::Play,
)
.await?
};
let manager = RtspManager::new(std::env::var("USE_RTPBIN2").is_ok_and(|s| s == "1"));
let obj = self.obj();
obj.add(&manager.inner)
.expect("Adding the manager cannot fail");
manager.inner.sync_state_with_parent().unwrap();
let mut tcp_interleave_appsrcs = HashMap::new();
for (rtpsession_n, p) in state.setup_params.iter_mut().enumerate() {
let (tx, rx) = mpsc::channel(1);
let on_rtcp = move |appsink: &_| on_rtcp_udp(appsink, tx.clone());
match &mut p.transport {
RtspTransportInfo::UdpMulticast {
dest,
port: (rtp_port, rtcp_port),
ttl,
} => {
let rtp_socket = bind_port(*rtp_port, dest.is_ipv4())?;
let rtcp_socket = rtcp_port.and_then(|p| {
bind_port(p, dest.is_ipv4())
.map_err(|err| {
gst::warning!(CAT, "Could not bind to RTCP port: {err:?}");
err
})
.ok()
});
match &dest {
IpAddr::V4(addr) => {
rtp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED)?;
if let Some(ttl) = ttl {
let _ = rtp_socket.set_multicast_ttl_v4(*ttl as u32);
}
if let Some(rtcp_socket) = &rtcp_socket {
if let Err(err) =
rtcp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED)
{
gst::warning!(
CAT,
"Failed to join RTCP multicast address {addr}: {err:?}"
);
}
if let Some(ttl) = ttl {
let _ = rtcp_socket.set_multicast_ttl_v4(*ttl as u32);
}
}
}
IpAddr::V6(addr) => {
rtp_socket.join_multicast_v6(addr, 0)?;
if let Some(rtcp_socket) = &rtcp_socket {
if let Err(err) = rtcp_socket.join_multicast_v6(addr, 0) {
gst::warning!(
CAT,
"Failed to join RTCP multicast address {addr}: {err:?}"
);
}
}
}
};
let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?;
p.rtp_appsrc = Some(rtp_appsrc.clone());
// Spawn RTP udpsrc task
state.handles.push(RUNTIME.spawn(async move {
udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await
}));
// Spawn RTCP udpsrc task
if let Some(rtcp_socket) = rtcp_socket {
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
let socket = Arc::new(rtcp_socket);
let sock = socket.clone();
state.handles.push(
RUNTIME
.spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }),
);
// Spawn RTCP RR udpsink task
self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?;
state
.handles
.push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await }));
}
}
RtspTransportInfo::Udp {
source,
server_port: (server_rtp_port, server_rtcp_port),
client_port: _,
sockets,
} => {
let Some((rtp_socket, rtcp_socket)) = sockets.take() else {
gst::warning!(
CAT,
"Skipping: no UDP sockets for {rtpsession_n}: {:#?}",
p.transport
);
continue;
};
let _ = rtp_socket
.connect(&format!(
"{}:{server_rtp_port}",
source.as_ref().expect("Must have source address")
))
.await;
if let (Some(source), Some(port), Some(s)) =
(source, server_rtcp_port, rtcp_socket.as_ref())
{
let _ = s.connect(&format!("{source}:{port}")).await;
}
// Spawn RTP udpsrc task
let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?;
p.rtp_appsrc = Some(rtp_appsrc.clone());
state.handles.push(RUNTIME.spawn(async move {
udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await
}));
if let Some(rtcp_socket) = rtcp_socket {
// Spawn RTCP SR udpsrc task
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
let socket = Arc::new(rtcp_socket);
let sock = socket.clone();
state.handles.push(
RUNTIME
.spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }),
);
// Spawn RTCP RR udpsink task
self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?;
state
.handles
.push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await }));
}
}
RtspTransportInfo::Tcp {
channels: (rtp_channel, rtcp_channel),
} => {
let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?;
p.rtp_appsrc = Some(rtp_appsrc.clone());
tcp_interleave_appsrcs.insert(*rtp_channel, rtp_appsrc);
if let Some(rtcp_channel) = rtcp_channel {
// RTCP SR
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
tcp_interleave_appsrcs.insert(*rtcp_channel, rtcp_appsrc.clone());
// RTCP RR
let rtcp_channel = *rtcp_channel;
let cmd_tx = cmd_tx.clone();
self.make_rtcp_appsink(rtpsession_n, &manager, move |appsink| {
on_rtcp_tcp(appsink, cmd_tx.clone(), rtcp_channel)
})?;
}
}
}
}
obj.no_more_pads();
// Expose RTP srcpads
manager.inner.connect_pad_added(|manager, pad| {
if pad.direction() != gst::PadDirection::Src {
return;
}
let Some(obj) = manager
.parent()
.and_then(|o| o.downcast::<gst::Element>().ok())
else {
return;
};
let name = pad.name();
match *name.split('_').collect::<Vec<_>>() {
// rtpbin and rtpbin2
["recv", "rtp", "src", stream_id, ssrc, pt]
| ["rtp", "recv", "src", stream_id, ssrc, pt] => {
if stream_id.parse::<u32>().is_err() {
gst::info!(CAT, "Ignoring srcpad with invalid stream id: {name}");
return;
};
gst::info!(CAT, "Setting rtpbin pad {} as ghostpad target", name);
let srcpad = obj
.static_pad(&format!("stream_{}", stream_id))
.expect("ghostpad should've been available already");
let ghostpad = srcpad
.downcast::<gst::GhostPad>()
.expect("rtspsrc src pads are ghost pads");
if let Err(err) = ghostpad.set_target(Some(pad)) {
gst::element_error!(
obj,
gst::ResourceError::Failed,
(
"Failed to set ghostpad {} target {}: {err:?}",
ghostpad.name(),
name
),
["pt: {pt}, ssrc: {ssrc}"]
);
}
}
_ => {
gst::info!(CAT, "Ignoring unknown srcpad: {name}");
}
}
});
let mut expected_response: Option<(Method, u32)> = None;
loop {
tokio::select! {
msg = state.stream.next() => match msg {
Some(Ok(rtsp_types::Message::Data(data))) => {
let Some(appsrc) = tcp_interleave_appsrcs.get(&data.channel_id()) else {
gst::warning!(CAT,
"ignored data of size {}: unknown channel {}",
data.len(),
data.channel_id()
);
continue;
};
let t = appsrc.current_running_time();
let channel_id = data.channel_id();
gst::trace!(CAT, "Received data on channel {channel_id}");
// TODO: this should be from_mut_slice() after making the necessary
// modifications to Body
let mut buffer = gst::Buffer::from_slice(data.into_body());
let bufref = buffer.make_mut();
bufref.set_dts(t);
// TODO: Allow unlinked source pads
if let Err(err) = appsrc.push_buffer(buffer) {
gst::error!(CAT, "Failed to push buffer on pad {} for channel {}", appsrc.name(), channel_id);
return Err(err.into());
}
}
Some(Ok(rtsp_types::Message::Request(req))) => {
// TODO: implement incoming GET_PARAMETER requests
gst::debug!(CAT, "<-- {req:#?}");
}
Some(Ok(rtsp_types::Message::Response(rsp))) => {
gst::debug!(CAT, "<-- {rsp:#?}");
let Some((expected, cseq)) = &expected_response else {
continue;
};
let Some(s) = &session else {
return Err(RtspError::Fatal(format!("Can't handle {:?} response, no SETUP", expected)).into());
};
match expected {
Method::Play => {
state.play_response(&rsp, *cseq, s).await?;
self.post_complete("request", "PLAY response received");
}
Method::Teardown => state.teardown_response(&rsp, *cseq, s).await?,
m => unreachable!("BUG: unexpected response method: {m:?}"),
};
}
Some(Err(e)) => {
// TODO: reconnect or ignore if UDP sockets are still receiving data
gst::error!(CAT, "I/O error: {e:?}, quitting");
return Err(gst::FlowError::Error.into());
}
None => {
// TODO: reconnect or ignore if UDP sockets are still receiving data
gst::error!(CAT, "TCP connection EOF, quitting");
return Err(gst::FlowError::Eos.into());
}
},
Some(cmd) = cmd_rx.recv() => match cmd {
Commands::Play => {
let Some(s) = &session else {
return Err(RtspError::InvalidMessage("Can't PLAY, no SETUP").into());
};
self.post_start("request", "PLAY request sent");
let cseq = state.play(s).await.map_err(|err| {
self.post_cancelled("request", "PLAY request cancelled");
err
})?;
expected_response = Some((Method::Play, cseq));
},
Commands::Teardown(tx) => {
gst::info!(CAT, "Received Teardown command");
let Some(s) = &session else {
return Err(RtspError::InvalidMessage("Can't TEARDOWN, no SETUP").into());
};
let _ = state.teardown(s).await;
if let Some(tx) = tx {
let _ = tx.send(());
}
break;
}
Commands::Data(data) => {
// We currently only send RTCP RR as data messages, this will change when
// we support TCP ONVIF backchannels
state.sink.send(Message::Data(data)).await?;
gst::debug!(CAT, "Sent RTCP RR over TCP");
}
},
else => {
gst::error!(CAT, "No select statement matched, breaking loop");
break;
}
}
}
Ok(())
}
}
struct RtspManager {
inner: gst::Element,
using_rtpbin2: bool,
}
impl RtspManager {
fn new(rtpbin2: bool) -> Self {
let name = if rtpbin2 { "rtpbin2" } else { "rtpbin" };
RtspManager {
inner: gst::ElementFactory::make_with_name(name, None)
.unwrap_or_else(|_| panic!("{name} not found")),
using_rtpbin2: rtpbin2,
}
}
fn rtp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 {
format!("rtp_recv_sink_{}", rtpsession)
} else {
format!("recv_rtp_sink_{}", rtpsession)
};
self.inner.request_pad_simple(&name)
}
fn rtcp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 {
format!("rtcp_recv_sink_{}", rtpsession)
} else {
format!("recv_rtcp_sink_{}", rtpsession)
};
self.inner.request_pad_simple(&name)
}
fn rtcp_send_srcpad(&self, rtpsession: usize) -> Option<gst::Pad> {
let name = if self.using_rtpbin2 {
format!("rtcp_send_src_{}", rtpsession)
} else {
format!("send_rtcp_src_{}", rtpsession)
};
self.inner.request_pad_simple(&name)
}
}
struct RtspTaskState {
cseq: u32,
url: Url,
version: Version,
content_base_or_location: Option<String>,
aggregate_control: Option<Url>,
sdp: Option<sdp_types::Session>,
stream:
Pin<Box<dyn Stream<Item = Result<Message<Body>, super::tcp_message::ReadError>> + Send>>,
sink: Pin<Box<dyn Sink<Message<Body>, Error = std::io::Error> + Send>>,
setup_params: Vec<RtspSetupParams>,
handles: Vec<JoinHandle<()>>,
}
struct RtspSetupParams {
control_url: Url,
transport: RtspTransportInfo,
rtp_appsrc: Option<gst_app::AppSrc>,
caps: gst::Caps,
}
impl RtspTaskState {
fn new(url: Url, stream: RtspStream, sink: RtspSink) -> Self {
RtspTaskState {
cseq: 0u32,
url,
version: Version::V1_0,
content_base_or_location: None,
aggregate_control: None,
sdp: None,
stream,
sink,
setup_params: Vec::new(),
handles: Vec::new(),
}
}
fn check_response(
rsp: &Response<Body>,
cseq: u32,
req_name: Method,
session: Option<&Session>,
) -> Result<(), RtspError> {
if rsp.status() != StatusCode::Ok {
return Err(RtspError::Fatal(format!(
"{req_name:?} request failed: {}",
rsp.reason_phrase()
)));
}
match rsp.typed_header::<CSeq>() {
Ok(Some(v)) => {
if *v != cseq {
return Err(RtspError::InvalidMessage("cseq does not match"));
}
}
Ok(None) => {
gst::warning!(
CAT,
"No cseq in response, continuing... {:#?}",
rsp.headers().collect::<Vec<_>>()
);
}
Err(_) => {
gst::warning!(
CAT,
"Invalid cseq in response, continuing... {:#?}",
rsp.headers().collect::<Vec<_>>()
);
}
};
if let Some(s) = session {
if let Some(have_s) = rsp.typed_header::<Session>()? {
if s.0 != have_s.0 {
return Err(RtspError::Fatal(format!(
"Session in header {} does not match our session {}",
s.0, have_s.0
)));
}
} else {
gst::warning!(
CAT,
"No Session header in response, continuing... {:#?}",
rsp.headers().collect::<Vec<_>>()
);
}
}
Ok(())
}
async fn options(&mut self) -> Result<(), RtspError> {
self.cseq += 1;
let req = Request::builder(Method::Options, self.version)
.typed_header::<CSeq>(&self.cseq.into())
.request_uri(self.url.clone())
.header(USER_AGENT, DEFAULT_USER_AGENT)
.build(Body::default());
gst::debug!(CAT, "-->> {req:#?}");
self.sink.send(req.into()).await?;
let rsp = match self.stream.next().await {
Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp),
Some(Ok(m)) => Err(RtspError::UnexpectedMessage("OPTIONS response", m)),
Some(Err(e)) => Err(e.into()),
None => Err(
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "options response").into(),
),
}?;
gst::debug!(CAT, "<<-- {rsp:#?}");
Self::check_response(&rsp, self.cseq, Method::Options, None)?;
let Ok(Some(methods)) = rsp.typed_header::<Public>() else {
return Err(RtspError::InvalidMessage(
"OPTIONS response does not contain a valid Public header",
));
};
let needed = [
Method::Describe,
Method::Setup,
Method::Play,
Method::Teardown,
];
let mut unsupported = Vec::new();
for method in &needed {
if !methods.contains(method) {
unsupported.push(format!("{method:?}"));
}
}
if !unsupported.is_empty() {
Err(RtspError::Fatal(format!(
"Server doesn't support the required method{} {}",
if unsupported.len() == 1 { "" } else { "s:" },
unsupported.join(",")
)))
} else {
Ok(())
}
}
async fn describe(&mut self) -> Result<(), RtspError> {
self.cseq += 1;
let req = Request::builder(Method::Describe, self.version)
.typed_header::<CSeq>(&self.cseq.into())
.header(USER_AGENT, DEFAULT_USER_AGENT)
.header(ACCEPT, "application/sdp")
.request_uri(self.url.clone())
.build(Body::default());
gst::debug!(CAT, "-->> {req:#?}");
self.sink.send(req.into()).await?;
let rsp = match self.stream.next().await {
Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp),
Some(Ok(m)) => Err(RtspError::UnexpectedMessage("DESCRIBE response", m)),
Some(Err(e)) => Err(e.into()),
None => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"describe response",
)
.into()),
}?;
gst::debug!(
CAT,
"<<-- Response {:#?}",
rsp.headers().collect::<Vec<_>>()
);
Self::check_response(&rsp, self.cseq, Method::Describe, None)?;
self.content_base_or_location = rsp
.header(&CONTENT_BASE)
.or(rsp.header(&CONTENT_LOCATION))
.map(|v| v.to_string());
gst::info!(CAT, "{}", std::str::from_utf8(rsp.body()).unwrap());
// TODO: read range attribute from SDP for VOD use-cases
let sdp = sdp_types::Session::parse(rsp.body())?;
gst::debug!(CAT, "{sdp:#?}");
self.sdp.replace(sdp);
Ok(())
}
fn parse_fmtp(fmtp: &str, s: &mut gst::structure::Structure) {
// Non-compliant RTSP servers will incorrectly set these here, ignore them
let ignore_fields = [
"media",
"payload",
"clock-rate",
"encoding-name",
"encoding-params",
];
let encoding_name = s.get::<String>("encoding-name").unwrap();
let Some((_, fmtp)) = fmtp.split_once(' ') else {
gst::warning!(CAT, "Could not parse fmtp: {fmtp}");
return;
};
let iter = fmtp.split(';').map_while(|x| x.split_once('='));
for (k, v) in iter {
let k = k.trim().to_ascii_lowercase();
if ignore_fields.contains(&k.as_str()) {
continue;
}
if encoding_name == "H264" && k == "profile-level-id" {
let profile_idc = u8::from_str_radix(&v[0..2], 16);
let csf_idc = u8::from_str_radix(&v[2..4], 16);
let level_idc = u8::from_str_radix(&v[4..6], 16);
if let (Ok(p), Ok(c), Ok(l)) = (profile_idc, csf_idc, level_idc) {
let sps = &[p, c, l];
let profile = gst_pbutils::codec_utils_h264_get_profile(sps);
let level = gst_pbutils::codec_utils_h264_get_level(sps);
if let (Ok(profile), Ok(level)) = (profile, level) {
s.set("profile", profile);
s.set("level", level);
continue;
}
}
gst::warning!(CAT, "Failed to parse profile-level-id {v}, ignoring...");
continue;
}
s.set(k, v);
}
}
fn parse_rtpmap(rtpmap: &str, s: &mut gst::structure::Structure) -> Result<(), RtspError> {
let Some((_, rtpmap)) = rtpmap.split_once(' ') else {
return Err(RtspError::InvalidMessage(
"Could not parse rtpmap: {rtpmap}",
));
};
let mut iter = rtpmap.split('/');
let Some(encoding_name) = iter.next() else {
return Err(RtspError::InvalidMessage(
"Could not parse encoding-name from rtpmap: {rtpmap}",
));
};
s.set("encoding-name", encoding_name);
let Some(v) = iter.next() else {
return Err(RtspError::InvalidMessage(
"Could not parse clock-rate from rtpmap: {rtpmap}",
));
};
let Ok(clock_rate) = v.parse::<i32>() else {
return Err(RtspError::InvalidMessage(
"Could not parse clock-rate from rtpmap: {rtpmap}",
));
};
s.set("clock-rate", clock_rate);
if let Some(v) = iter.next() {
s.set("encoding-params", v);
}
debug_assert!(iter.next().is_none());
Ok(())
}
// https://datatracker.ietf.org/doc/html/rfc2326#appendix-C.1.1
fn parse_control_path(path: &str, base: &Url) -> Option<Url> {
match Url::parse(path) {
Ok(v) => Some(v),
Err(url::ParseError::RelativeUrlWithoutBase) => {
if path == "*" {
Some(base.clone())
} else {
base.join(path).ok()
}
}
Err(_) => None,
}
}
fn parse_setup_transports(
transports: Transports,
s: &mut gst::Structure,
protocols: &[RtspProtocol],
mode: &TransportMode,
) -> Result<RtspTransportInfo, RtspError> {
let mut last_error =
RtspError::Fatal("No matching transport found matching selected protocols".to_string());
let mut parsed_transports = Vec::new();
for transport in transports.iter() {
let Transport::Rtp(t) = transport else {
last_error =
RtspError::Fatal(format!("Expected RTP transport, got {:#?}", transports));
continue;
};
// RTSP 2 specifies that we can have multiple SSRCs in the response
// Transport header, but it's not clear why, so we don't support it
if let Some(ssrc) = t.params.ssrc.first() {
s.set("ssrc", ssrc)
}
if !t.params.mode.is_empty() && !t.params.mode.contains(mode) {
last_error = RtspError::Fatal(format!(
"Requested mode {:?} doesn't match server modes: {:?}",
mode, t.params.mode
));
continue;
}
let parsed = match RtspTransportInfo::try_from(t) {
Ok(p) => p,
Err(err) => {
last_error = err;
continue;
}
};
parsed_transports.push(parsed);
}
for protocol in protocols {
for n in 0..parsed_transports.len() {
if parsed_transports[n].to_protocol() == *protocol {
let t = parsed_transports.swap_remove(n);
return Ok(t);
}
}
}
Err(last_error)
}
async fn setup(
&mut self,
session: &mut Option<Session>,
port_start: u16,
protocols: &[RtspProtocol],
mode: TransportMode,
) -> Result<Vec<RtspSetupParams>, RtspError> {
let sdp = self.sdp.as_ref().expect("Must have SDP by now");
let base = self
.content_base_or_location
.as_ref()
.and_then(|s| Url::parse(s).ok())
.unwrap_or_else(|| self.url.clone());
self.aggregate_control = sdp
.get_first_attribute_value("control")
// No attribute and no value have the same meaning for us
.ok()
.flatten()
.and_then(|v| Self::parse_control_path(v, &base));
let mut b = gst::Structure::builder("application/x-rtp");
let skip_attrs = ["control", "range"];
for sdp_types::Attribute { attribute, value } in &sdp.attributes {
if skip_attrs.contains(&attribute.as_str()) {
continue;
}
b = b.field(format!("a-{attribute}"), value);
}
let message_structure = b.build();
let conn_source = sdp
.connection
.as_ref()
.map(|c| c.connection_address.as_str())
.filter(|c| !c.is_empty())
.unwrap_or_else(|| base.host_str().unwrap());
let mut port_next = port_start;
let mut stream_num = 0;
let mut setup_params: Vec<RtspSetupParams> = Vec::new();
let skip_attrs = ["control", "rtpmap", "fmtp"];
for m in &sdp.medias {
if !["audio", "video"].contains(&m.media.as_str()) {
gst::info!(CAT, "Ignoring unsupported media {}", m.media);
continue;
}
let media_control = m
.get_first_attribute_value("control")
// No attribute and no value have the same meaning for us
.ok()
.flatten()
.and_then(|v| Self::parse_control_path(v, &base));
let Some(control_url) = media_control.as_ref().or(self.aggregate_control.as_ref())
else {
gst::warning!(
CAT,
"No session control or media control for {} fmt {}, ignoring",
m.media,
m.fmt
);
continue;
};
// RTP caps
// FIXME: move SDP -> Caps parsing to a separate file
debug_assert_eq!(m.port, 0); // TCP
let Ok(pt) = m.fmt.parse::<i32>() else {
gst::error!(CAT, "Could not parse pt: {}, ignoring media", m.fmt);
continue;
};
let mut s = message_structure.clone();
s.set("media", &m.media);
s.set("payload", pt);
if let Ok(Some(rtpmap)) = m.get_first_attribute_value("rtpmap") {
Self::parse_rtpmap(rtpmap, &mut s)?;
} else {
gst::warning!(CAT, "No rtpmap for {} {}, skipping", m.media, m.fmt);
continue;
}
if let Ok(Some(fmtp)) = m.get_first_attribute_value("fmtp") {
Self::parse_fmtp(fmtp, &mut s);
}
for sdp_types::Attribute { attribute, value } in &m.attributes {
if skip_attrs.contains(&attribute.as_str()) {
continue;
}
// https://github.com/sdroege/sdp-types/issues/17
if attribute == "ssrc" {
continue;
}
s.set(format!("a-{attribute}"), value);
}
// TODO: rtcp-fb: fields
if s.get_optional("encoding-name") == Ok(Some("H264")) {
if s.get_optional("level-asymmetry-allowed") != Ok(Some("0"))
&& s.has_field("level")
{
s.remove_field("level");
}
if s.has_field("level-asymmetry-allowed") {
s.remove_field("level-asymmetry-allowed");
};
}
// SETUP
let mut rtp_socket: Option<UdpSocket> = None;
let mut rtcp_socket: Option<UdpSocket> = None;
let mut transports = Vec::new();
let mut is_ipv4 = true;
let mut conn_protocols = BTreeSet::new();
for conn in &m.connections {
if conn.nettype != "IN" {
continue;
}
// XXX: For now, assume that all connections use the same addrtype
match conn.addrtype.as_str() {
"IP4" => is_ipv4 = true,
"IP6" => is_ipv4 = false,
_ => continue,
};
// Strip subnet mask, if any
let addr = if let Some((first, _)) = conn.connection_address.split_once('/') {
first
} else {
conn.connection_address.as_str()
};
let Ok(addr) = addr.parse::<IpAddr>() else {
continue;
};
// If this is an instance of gst-rtsp-server that only supports
// udp-multicast, it will put the multicast address in the media
// connections field.
if addr.is_multicast() {
conn_protocols.insert(RtspProtocol::UdpMulticast);
} else {
conn_protocols.insert(RtspProtocol::Tcp);
conn_protocols.insert(RtspProtocol::Udp);
}
}
let protocols = if !conn_protocols.is_empty() {
let p = protocols.iter().cloned().collect::<BTreeSet<_>>();
p.intersection(&conn_protocols).cloned().collect::<Vec<_>>()
} else {
protocols.to_owned()
};
if protocols.is_empty() {
gst::error!(CAT, "No available protocols left, skipping media");
continue;
}
if protocols.contains(&RtspProtocol::UdpMulticast) {
let params = RtpTransportParameters {
mode: vec![mode.clone()],
multicast: true,
..Default::default()
};
transports.push(Transport::Rtp(RtpTransport {
profile: RtpProfile::Avp,
lower_transport: Some(RtpLowerTransport::Udp),
params,
}));
}
if protocols.contains(&RtspProtocol::Udp) {
let (sock1, rtp_port) = bind_start_port(port_next, is_ipv4).await;
// Get the actual port that was successfully bound
port_next = rtp_port;
let (sock2, rtcp_port) = bind_start_port(rtp_port + 1, is_ipv4).await;
rtp_socket = Some(sock1);
rtcp_socket = Some(sock2);
let params = RtpTransportParameters {
mode: vec![mode.clone()],
unicast: true,
client_port: Some((rtp_port, Some(rtcp_port))),
..Default::default()
};
transports.push(Transport::Rtp(RtpTransport {
profile: RtpProfile::Avp,
lower_transport: Some(RtpLowerTransport::Udp),
params,
}));
}
if protocols.contains(&RtspProtocol::Tcp) {
let params = RtpTransportParameters {
mode: vec![mode.clone()],
interleaved: Some((stream_num, Some(stream_num + 1))),
..Default::default()
};
transports.push(Transport::Rtp(RtpTransport {
// RTSP 2.0 adds AVPF and more
profile: RtpProfile::Avp,
lower_transport: Some(RtpLowerTransport::Tcp),
params,
}));
}
self.cseq += 1;
let req = Request::builder(Method::Setup, self.version)
.typed_header::<CSeq>(&self.cseq.into())
.header(USER_AGENT, DEFAULT_USER_AGENT)
.typed_header::<Transports>(&transports.as_slice().into())
.request_uri(control_url.clone());
let req = if let Some(s) = session {
req.typed_header::<Session>(s)
} else {
req
};
let req = req.build(Body::default());
let cseq = self.cseq;
gst::debug!(CAT, "-->> {req:#?}");
self.sink.send(req.into()).await?;
// RTSP 2 supports pipelining of SETUP requests, so this ping-pong would have to be
// reworked if we want to support it.
let rsp = match self.stream.next().await {
Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp),
Some(Ok(m)) => Err(RtspError::UnexpectedMessage("SETUP response", m)),
Some(Err(e)) => Err(e.into()),
None => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"setup response",
)
.into()),
}?;
gst::debug!(CAT, "<<-- {rsp:#?}");
Self::check_response(&rsp, cseq, Method::Setup, session.as_ref())?;
let new_session = rsp
.typed_header::<Session>()?
.ok_or(RtspError::InvalidMessage("No session in SETUP response"))?;
// Manually strip timeout field: https://github.com/sdroege/rtsp-types/issues/24
session.replace(Session(new_session.0, None));
let mut parsed_transport = if let Some(transports) = rsp.typed_header::<Transports>()? {
Self::parse_setup_transports(transports, &mut s, &protocols, &mode)
} else {
// FIXME: Transport header in response is optional
// https://datatracker.ietf.org/doc/html/rfc2326#section-12.39
Err(RtspError::InvalidMessage(
"No transport header in SETUP response",
))
}?;
match &mut parsed_transport {
RtspTransportInfo::UdpMulticast { .. } => {}
RtspTransportInfo::Udp {
source,
server_port: _,
client_port,
sockets,
} => {
if source.is_none() {
*source = Some(conn_source.to_string());
}
if let Some((rtp_port, rtcp_port)) = client_port {
// There is no reason for the server to reject the client ports WE
// selected, so if it does, just ignore it.
if *rtp_port != port_next {
gst::warning!(
CAT,
"RTP port changed: {port_next} -> {rtp_port}, ignoring"
);
*rtp_port = port_next;
}
port_next += 1;
*sockets = if let Some(rtcp_port) = rtcp_port {
if *rtcp_port != port_next {
gst::warning!(
CAT,
"RTCP port changed: {port_next} -> {rtcp_port}, ignoring"
);
*rtcp_port = port_next;
}
port_next += 1;
Some((rtp_socket.unwrap(), rtcp_socket))
} else {
Some((rtp_socket.unwrap(), None))
}
};
}
RtspTransportInfo::Tcp {
channels: (rtp_ch, rtcp_ch),
} => {
if *rtp_ch != stream_num {
gst::info!(CAT, "RTP channel changed: {stream_num} -> {rtp_ch}");
}
stream_num += 1;
if let Some(rtcp_ch) = rtcp_ch {
if *rtcp_ch != stream_num {
gst::info!(CAT, "RTCP channel changed: {stream_num} -> {rtcp_ch}");
}
stream_num += 1;
}
}
};
let caps = gst::Caps::from(s);
setup_params.push(RtspSetupParams {
control_url: control_url.clone(),
transport: parsed_transport,
rtp_appsrc: None,
caps,
});
}
Ok(setup_params)
}
async fn play(&mut self, session: &Session) -> Result<u32, RtspError> {
self.cseq += 1;
let request_uri = self.aggregate_control.as_ref().unwrap_or(&self.url).clone();
let req = Request::builder(Method::Play, self.version)
.typed_header::<CSeq>(&self.cseq.into())
.typed_header::<Range>(&Range::Npt(NptRange::From(NptTime::Now)))
.header(USER_AGENT, DEFAULT_USER_AGENT)
.request_uri(request_uri)
.typed_header::<Session>(session);
let req = req.build(Body::default());
gst::debug!(CAT, "-->> {req:#?}");
self.sink.send(req.into()).await?;
Ok(self.cseq)
}
async fn play_response(
&mut self,
rsp: &Response<Body>,
cseq: u32,
session: &Session,
) -> Result<(), RtspError> {
Self::check_response(rsp, cseq, Method::Play, Some(session))?;
if let Some(RtpInfos::V1(rtpinfos)) = rsp.typed_header::<RtpInfos>()? {
for rtpinfo in rtpinfos {
for params in self.setup_params.iter_mut() {
if params.control_url == rtpinfo.uri {
let mut changed = false;
let mut caps = params.rtp_appsrc.as_ref().unwrap().caps().unwrap();
let capsref = caps.make_mut();
if let Some(v) = rtpinfo.seq {
capsref.set("seqnum-base", v as u32);
changed = true;
}
if let Some(v) = rtpinfo.rtptime {
capsref.set("clock-base", v);
changed = true;
}
if changed {
params.rtp_appsrc.as_ref().unwrap().set_caps(Some(&caps));
}
}
}
}
} else {
gst::warning!(CAT, "No RTPInfos V1 header in PLAY response");
};
Ok(())
}
async fn teardown(&mut self, session: &Session) -> Result<u32, RtspError> {
self.cseq += 1;
let request_uri = self.aggregate_control.as_ref().unwrap_or(&self.url).clone();
let req = Request::builder(Method::Teardown, self.version)
.typed_header::<CSeq>(&self.cseq.into())
.header(USER_AGENT, DEFAULT_USER_AGENT)
.request_uri(request_uri)
.typed_header::<Session>(session);
let req = req.build(Body::default());
gst::debug!(CAT, "-->> {req:#?}");
self.sink.send(req.into()).await?;
Ok(self.cseq)
}
async fn teardown_response(
&mut self,
rsp: &Response<Body>,
cseq: u32,
session: &Session,
) -> Result<(), RtspError> {
Self::check_response(rsp, cseq, Method::Teardown, Some(session))?;
Ok(())
}
}
fn bind_port(port: u16, is_ipv4: bool) -> Result<UdpSocket, std::io::Error> {
let domain = if is_ipv4 {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let sock = Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?;
let _ = sock.set_reuse_address(true);
#[cfg(unix)]
let _ = sock.set_reuse_port(true);
sock.set_nonblocking(true)?;
let addr: SocketAddr = if is_ipv4 {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port))
} else {
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0))
};
sock.bind(&addr.into())?;
let bound_port = if is_ipv4 {
sock.local_addr()?.as_socket_ipv4().unwrap().port()
} else {
sock.local_addr()?.as_socket_ipv6().unwrap().port()
};
gst::debug!(CAT, "Bound to UDP port {bound_port}");
UdpSocket::from_std(sock.into())
}
async fn bind_start_port(port: u16, is_ipv4: bool) -> (UdpSocket, u16) {
let mut next_port = port;
loop {
match bind_port(next_port, is_ipv4) {
Ok(socket) => {
if next_port != 0 {
return (socket, next_port);
}
let addr = socket
.local_addr()
.expect("Newly-bound port should not fail");
return (socket, addr.port());
}
Err(err) => {
gst::debug!(CAT, "Failed to bind to {next_port}: {err:?}, trying next");
next_port += 1;
// If we fail too much, panic instead of forever doing a hot-loop
if (next_port - MAX_BIND_PORT_RETRY) > port {
panic!("Failed to allocate any ports from {port} to {next_port}");
}
}
};
}
}
fn on_rtcp_udp(
appsink: &gst_app::AppSink,
tx: mpsc::Sender<MappedBuffer<Readable>>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let Ok(sample) = appsink.pull_sample() else {
return Err(gst::FlowError::Error);
};
let Some(buffer) = sample.buffer_owned() else {
return Ok(gst::FlowSuccess::Ok);
};
let map = buffer.into_mapped_buffer_readable();
match map {
Ok(map) => match tx.try_send(map) {
Ok(_) => Ok(gst::FlowSuccess::Ok),
Err(mpsc::error::TrySendError::Full(_)) => {
gst::error!(CAT, "Could not send RTCP, channel is full");
Err(gst::FlowError::Error)
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(gst::FlowError::Eos),
},
Err(err) => {
gst::error!(CAT, "Failed to map buffer: {err:?}");
Err(gst::FlowError::Error)
}
}
}
fn on_rtcp_tcp(
appsink: &gst_app::AppSink,
cmd_tx: mpsc::Sender<Commands>,
rtcp_channel: u8,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let Ok(sample) = appsink.pull_sample() else {
return Err(gst::FlowError::Error);
};
let Some(buffer) = sample.buffer_owned() else {
return Ok(gst::FlowSuccess::Ok);
};
let map = buffer.into_mapped_buffer_readable();
match map {
Ok(map) => {
let data: rtsp_types::Data<Body> =
rtsp_types::Data::new(rtcp_channel, Body::mapped(map));
let cmd_tx = cmd_tx.clone();
RUNTIME.spawn(async move { cmd_tx.send(Commands::Data(data)).await });
Ok(gst::FlowSuccess::Ok)
}
Err(err) => {
gst::error!(CAT, "Failed to map buffer: {err:?}");
Err(gst::FlowError::Error)
}
}
}
async fn udpsrc_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: Option<gst::ClockTime>) {
// TODO: this should allocate a buffer pool to avoid a copy
let mut buf = vec![0; UDP_PACKET_MAX_SIZE];
let t = Duration::from_secs(timeout.unwrap_or(gst::ClockTime::MAX).into());
loop {
match time::timeout(t, socket.recv(&mut buf)).await {
Ok(Ok(len)) => {
let t = appsrc.current_running_time();
let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned());
let bufref = buffer.make_mut();
bufref.set_dts(t);
if let Err(err) = appsrc.push_buffer(buffer) {
gst::element_error!(
appsrc,
gst::ResourceError::Failed,
("UDP buffer push failed: {:?}", err),
["{:#?}", socket]
);
break;
}
}
Ok(Err(_elapsed)) => {
gst::element_error!(
appsrc,
gst::ResourceError::Failed,
["No data received after {DEFAULT_TIMEOUT} seconds, exiting"]
);
break;
}
Err(err) => {
gst::element_error!(
appsrc,
gst::ResourceError::Close,
("UDP socket was closed: {:?}", err),
["{:#?}", socket]
);
break;
}
};
}
}
async fn udpsink_task(socket: &UdpSocket, mut rx: mpsc::Receiver<MappedBuffer<Readable>>) {
loop {
match rx.recv().await {
Some(data) => match socket.send(data.as_ref()).await {
Ok(_) => {
gst::debug!(CAT, "Sent RTCP RR");
}
Err(err) => {
gst::error!(CAT, "UDP socket send error: {err:?}, quitting loop");
rx.close();
break;
}
},
None => {
gst::info!(CAT, "UDP socket {socket:?} closed, quitting loop");
rx.close();
break;
}
};
}
}
#[glib::object_subclass]
impl ObjectSubclass for RtspSrc {
const NAME: &'static str = "GstRtspSrc2";
type Type = super::RtspSrc;
type ParentType = gst::Bin;
type Interfaces = (gst::URIHandler,);
}