diff --git a/net/rtp/src/rtpbin2/config.rs b/net/rtp/src/rtpbin2/config.rs index 5f9c1130..f8579bba 100644 --- a/net/rtp/src/rtpbin2/config.rs +++ b/net/rtp/src/rtpbin2/config.rs @@ -6,22 +6,22 @@ use gst::subclass::prelude::*; use once_cell::sync::Lazy; use std::sync::{Mutex, Weak}; -use crate::rtpbin2::imp::BinSessionInner; +use crate::rtpbin2::internal::SharedSessionInner; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( - "rtpbin2-config", + "rtp2-config", gst::DebugColorFlags::empty(), - Some("RtpBin2 config"), + Some("Rtp2 config"), ) }); glib::wrapper! { - pub struct RtpBin2Session(ObjectSubclass); + pub struct Rtp2Session(ObjectSubclass); } -impl RtpBin2Session { - pub(crate) fn new(weak_session: Weak>) -> Self { +impl Rtp2Session { + pub(crate) fn new(weak_session: Weak>) -> Self { let ret = glib::Object::new::(); let imp = ret.imp(); imp.set_session(weak_session); @@ -36,21 +36,21 @@ mod imp { #[derive(Debug, Default)] struct State { - pub(super) weak_session: Option>>, + pub(super) weak_session: Option>>, } #[derive(Debug, Default)] - pub struct RtpBin2Session { + pub struct Rtp2Session { state: Mutex, } - impl RtpBin2Session { - pub(super) fn set_session(&self, weak_session: Weak>) { + impl Rtp2Session { + pub(super) fn set_session(&self, weak_session: Weak>) { let mut state = self.state.lock().unwrap(); state.weak_session = Some(weak_session); } - fn session(&self) -> Option>> { + fn session(&self) -> Option>> { self.state .lock() .unwrap() @@ -85,7 +85,7 @@ mod imp { } pub fn pt_map(&self) -> gst::Structure { - let mut ret = gst::Structure::builder("application/x-rtpbin2-pt-map"); + let mut ret = gst::Structure::builder("application/x-rtp2-pt-map"); let Some(session) = self.session() else { return ret.build(); }; @@ -108,13 +108,13 @@ mod imp { } #[glib::object_subclass] - impl ObjectSubclass for RtpBin2Session { - const NAME: &'static str = "GstRtpBin2Session"; - type Type = super::RtpBin2Session; + impl ObjectSubclass for Rtp2Session { + const NAME: &'static str = "GstRtp2Session"; + type Type = super::Rtp2Session; type ParentType = glib::Object; } - impl ObjectImpl for RtpBin2Session { + impl ObjectImpl for Rtp2Session { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![glib::ParamSpecBoxed::builder::("pt-map") @@ -164,40 +164,57 @@ mod imp { #[cfg(test)] mod tests { - use std::sync::{atomic::AtomicBool, Arc}; + use std::sync::{ + atomic::{AtomicBool, AtomicUsize}, + Arc, + }; use crate::{rtpbin2::session::tests::generate_rtp_packet, test_init}; use super::*; + static ELEMENT_COUNTER: AtomicUsize = AtomicUsize::new(0); + + fn next_element_counter() -> usize { + ELEMENT_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + #[test] fn pt_map_get_empty() { test_init(); - let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap(); - let _pad = rtpbin2.request_pad_simple("rtp_send_sink_0").unwrap(); + let id = next_element_counter(); + let rtpbin2 = gst::ElementFactory::make("rtpsend") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let _pad = rtpbin2.request_pad_simple("rtp_sink_0").unwrap(); let session = rtpbin2.emit_by_name::("get-session", &[&0u32]); let pt_map = session.property::("pt-map"); - assert!(pt_map.has_name("application/x-rtpbin2-pt-map")); + assert!(pt_map.has_name("application/x-rtp2-pt-map")); assert_eq!(pt_map.fields().len(), 0); } #[test] fn pt_map_set() { test_init(); - let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap(); - let _pad = rtpbin2.request_pad_simple("rtp_send_sink_0").unwrap(); + let id = next_element_counter(); + let rtpbin2 = gst::ElementFactory::make("rtpsend") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let _pad = rtpbin2.request_pad_simple("rtp_sink_0").unwrap(); let session = rtpbin2.emit_by_name::("get-session", &[&0u32]); let pt = 96i32; let pt_caps = gst::Caps::builder("application/x-rtp") .field("payload", pt) .field("clock-rate", 90000i32) .build(); - let pt_map = gst::Structure::builder("application/x-rtpbin2-pt-map") + let pt_map = gst::Structure::builder("application/x-rtp2-pt-map") .field(pt.to_string(), pt_caps.clone()) .build(); session.set_property("pt-map", pt_map); let prop = session.property::("pt-map"); - assert!(prop.has_name("application/x-rtpbin2-pt-map")); + assert!(prop.has_name("application/x-rtp2-pt-map")); assert_eq!(prop.fields().len(), 1); let caps = prop.get::(pt.to_string()).unwrap(); assert_eq!(pt_caps, caps); @@ -206,12 +223,16 @@ mod tests { #[test] fn pt_map_set_none() { test_init(); - let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap(); - let _pad = rtpbin2.request_pad_simple("rtp_send_sink_0").unwrap(); + let id = next_element_counter(); + let rtpbin2 = gst::ElementFactory::make("rtpsend") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let _pad = rtpbin2.request_pad_simple("rtp_sink_0").unwrap(); let session = rtpbin2.emit_by_name::("get-session", &[&0u32]); session.set_property("pt-map", None::); let prop = session.property::("pt-map"); - assert!(prop.has_name("application/x-rtpbin2-pt-map")); + assert!(prop.has_name("application/x-rtp2-pt-map")); } #[test] @@ -219,12 +240,13 @@ mod tests { test_init(); let ssrc = 0x12345678; let new_ssrc_hit = Arc::new(AtomicBool::new(false)); - let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap(); - let mut h = gst_check::Harness::with_element( - &rtpbin2, - Some("rtp_send_sink_0"), - Some("rtp_send_src_0"), - ); + let id = next_element_counter(); + let rtpbin2 = gst::ElementFactory::make("rtpsend") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let mut h = + gst_check::Harness::with_element(&rtpbin2, Some("rtp_sink_0"), Some("rtp_src_0")); let session = h .element() .unwrap() @@ -257,13 +279,14 @@ mod tests { test_init(); let ssrc = 0x12345678; let (bye_ssrc_sender, bye_ssrc_receiver) = std::sync::mpsc::sync_channel(16); - let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap(); - let mut h = gst_check::Harness::with_element( - &rtpbin2, - Some("rtp_send_sink_0"), - Some("rtp_send_src_0"), - ); - let mut h_rtcp = gst_check::Harness::with_element(&rtpbin2, None, Some("rtcp_send_src_0")); + let id = next_element_counter(); + let rtpbin2 = gst::ElementFactory::make("rtpsend") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let mut h = + gst_check::Harness::with_element(&rtpbin2, Some("rtp_sink_0"), Some("rtp_src_0")); + let mut h_rtcp = gst_check::Harness::with_element(&rtpbin2, None, Some("rtcp_src_0")); let session = h .element() .unwrap() diff --git a/net/rtp/src/rtpbin2/internal.rs b/net/rtp/src/rtpbin2/internal.rs new file mode 100644 index 00000000..033311a8 --- /dev/null +++ b/net/rtp/src/rtpbin2/internal.rs @@ -0,0 +1,451 @@ +// SPDX-License-Identifier: MPL-2.0 + +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + task::Waker, + time::Duration, +}; + +use gst::glib; +use once_cell::sync::{Lazy, OnceCell}; + +use super::config::Rtp2Session; +use super::session::{RtpProfile, Session}; +use super::source::ReceivedRb; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpinternalsession", + gst::DebugColorFlags::empty(), + Some("RTP Session (internal)"), + ) +}); + +static SHARED_RTP_STATE: OnceCell>> = OnceCell::new(); + +#[derive(Debug, Clone)] +pub struct SharedRtpState { + name: String, + inner: Arc>, +} + +#[derive(Debug)] +struct SharedRtpStateInner { + sessions: HashMap, + send_outstanding: bool, + recv_outstanding: bool, +} + +impl SharedRtpState { + pub fn recv_get_or_init(name: String) -> Self { + SHARED_RTP_STATE + .get_or_init(|| Mutex::new(HashMap::new())) + .lock() + .unwrap() + .entry(name) + .and_modify(|v| { + v.inner.lock().unwrap().recv_outstanding = true; + }) + .or_insert_with_key(|name| SharedRtpState { + name: name.to_owned(), + inner: Arc::new(Mutex::new(SharedRtpStateInner { + sessions: HashMap::new(), + send_outstanding: false, + recv_outstanding: true, + })), + }) + .clone() + } + + pub fn send_get_or_init(name: String) -> Self { + SHARED_RTP_STATE + .get_or_init(|| Mutex::new(HashMap::new())) + .lock() + .unwrap() + .entry(name) + .and_modify(|v| { + v.inner.lock().unwrap().send_outstanding = true; + }) + .or_insert_with_key(|name| SharedRtpState { + name: name.to_owned(), + inner: Arc::new(Mutex::new(SharedRtpStateInner { + sessions: HashMap::new(), + send_outstanding: true, + recv_outstanding: false, + })), + }) + .clone() + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn unmark_send_outstanding(&self) { + let mut inner = self.inner.lock().unwrap(); + inner.send_outstanding = false; + if !inner.recv_outstanding { + Self::remove_from_global(&self.name); + } + } + + pub fn unmark_recv_outstanding(&self) { + let mut inner = self.inner.lock().unwrap(); + inner.recv_outstanding = false; + if !inner.send_outstanding { + Self::remove_from_global(&self.name); + } + } + + fn remove_from_global(name: &str) { + let _shared = SHARED_RTP_STATE.get().unwrap().lock().unwrap().remove(name); + } + + pub fn session_get_or_init(&self, id: usize, f: F) -> SharedSession + where + F: FnOnce() -> SharedSession, + { + self.inner + .lock() + .unwrap() + .sessions + .entry(id) + .or_insert_with(f) + .clone() + } +} + +#[derive(Debug, Clone)] +pub struct SharedSession { + pub(crate) id: usize, + pub(crate) inner: Arc>, + pub(crate) config: Rtp2Session, +} + +impl SharedSession { + pub fn new( + id: usize, + profile: RtpProfile, + min_rtcp_interval: Duration, + reduced_size_rtcp: bool, + ) -> Self { + let mut inner = SharedSessionInner::new(id); + inner.session.set_min_rtcp_interval(min_rtcp_interval); + inner.session.set_profile(profile); + inner.session.set_reduced_size_rtcp(reduced_size_rtcp); + let inner = Arc::new(Mutex::new(inner)); + let weak_inner = Arc::downgrade(&inner); + Self { + id, + inner, + config: Rtp2Session::new(weak_inner), + } + } +} + +#[derive(Debug)] +pub(crate) struct SharedSessionInner { + id: usize, + + pub(crate) session: Session, + + pub(crate) pt_map: HashMap, + + pub(crate) rtcp_waker: Option, + pub(crate) rtp_send_sinkpad: Option, +} + +impl SharedSessionInner { + fn new(id: usize) -> Self { + Self { + id, + + session: Session::new(), + + pt_map: HashMap::default(), + rtcp_waker: None, + rtp_send_sinkpad: None, + } + } + + pub fn clear_pt_map(&mut self) { + self.pt_map.clear(); + } + + pub fn add_caps(&mut self, caps: gst::Caps) { + let Some((pt, clock_rate)) = pt_clock_rate_from_caps(&caps) else { + return; + }; + let caps_clone = caps.clone(); + self.pt_map + .entry(pt) + .and_modify(move |entry| *entry = caps) + .or_insert_with(move || caps_clone); + self.session.set_pt_clock_rate(pt, clock_rate); + } + + pub(crate) fn caps_from_pt(&self, pt: u8) -> gst::Caps { + self.pt_map.get(&pt).cloned().unwrap_or( + gst::Caps::builder("application/x-rtp") + .field("payload", pt as i32) + .build(), + ) + } + + pub fn pt_map(&self) -> impl Iterator + '_ { + self.pt_map.iter().map(|(&k, v)| (k, v)) + } + + pub fn stats(&self) -> gst::Structure { + let mut session_stats = gst::Structure::builder("application/x-rtpbin2-session-stats") + .field("id", self.id as u64); + for ssrc in self.session.ssrcs() { + if let Some(ls) = self.session.local_send_source_by_ssrc(ssrc) { + let mut source_stats = + gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", ls.ssrc()) + .field("sender", true) + .field("local", true) + .field("packets-sent", ls.packet_count()) + .field("octets-sent", ls.octet_count()) + .field("bitrate", ls.bitrate() as u64); + if let Some(pt) = ls.payload_type() { + if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { + source_stats = source_stats.field("clock-rate", clock_rate); + } + } + if let Some(sr) = ls.last_sent_sr() { + source_stats = source_stats + .field("sr-ntptime", sr.ntp_timestamp().as_u64()) + .field("sr-rtptime", sr.rtp_timestamp()) + .field("sr-octet-count", sr.octet_count()) + .field("sr-packet-count", sr.packet_count()); + } + let rbs = gst::List::new(ls.received_report_blocks().map( + |(sender_ssrc, ReceivedRb { rb, .. })| { + gst::Structure::builder("application/x-rtcp-report-block") + .field("sender-ssrc", sender_ssrc) + .field("rb-fraction-lost", rb.fraction_lost()) + .field("rb-packets-lost", rb.cumulative_lost()) + .field("rb-extended_sequence_number", rb.extended_sequence_number()) + .field("rb-jitter", rb.jitter()) + .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) + .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) + .build() + }, + )); + match rbs.len() { + 0 => (), + 1 => { + source_stats = + source_stats.field("report-blocks", rbs.first().unwrap().clone()); + } + _ => { + source_stats = source_stats.field("report-blocks", rbs); + } + } + + // TODO: add jitter, packets-lost + session_stats = session_stats.field(ls.ssrc().to_string(), source_stats.build()); + } else if let Some(lr) = self.session.local_receive_source_by_ssrc(ssrc) { + let mut source_stats = + gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", lr.ssrc()) + .field("sender", false) + .field("local", true); + if let Some(pt) = lr.payload_type() { + if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { + source_stats = source_stats.field("clock-rate", clock_rate); + } + } + // TODO: add rb stats + session_stats = session_stats.field(lr.ssrc().to_string(), source_stats.build()); + } else if let Some(rs) = self.session.remote_send_source_by_ssrc(ssrc) { + let mut source_stats = + gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", rs.ssrc()) + .field("sender", true) + .field("local", false) + .field("octets-received", rs.octet_count()) + .field("packets-received", rs.packet_count()) + .field("bitrate", rs.bitrate() as u64) + .field("jitter", rs.jitter()) + .field("packets-lost", rs.packets_lost()); + if let Some(pt) = rs.payload_type() { + if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { + source_stats = source_stats.field("clock-rate", clock_rate); + } + } + if let Some(rtp_from) = rs.rtp_from() { + source_stats = source_stats.field("rtp-from", rtp_from.to_string()); + } + if let Some(rtcp_from) = rs.rtcp_from() { + source_stats = source_stats.field("rtcp-from", rtcp_from.to_string()); + } + if let Some(sr) = rs.last_received_sr() { + source_stats = source_stats + .field("sr-ntptime", sr.ntp_timestamp().as_u64()) + .field("sr-rtptime", sr.rtp_timestamp()) + .field("sr-octet-count", sr.octet_count()) + .field("sr-packet-count", sr.packet_count()); + } + if let Some(rb) = rs.last_sent_rb() { + source_stats = source_stats + .field("sent-rb-fraction-lost", rb.fraction_lost()) + .field("sent-rb-packets-lost", rb.cumulative_lost()) + .field( + "sent-rb-extended-sequence-number", + rb.extended_sequence_number(), + ) + .field("sent-rb-jitter", rb.jitter()) + .field("sent-rb-last-sr-ntp-time", rb.last_sr_ntp_time()) + .field( + "sent-rb-delay-since-last-sr-ntp-time", + rb.delay_since_last_sr(), + ); + } + let rbs = gst::List::new(rs.received_report_blocks().map( + |(sender_ssrc, ReceivedRb { rb, .. })| { + gst::Structure::builder("application/x-rtcp-report-block") + .field("sender-ssrc", sender_ssrc) + .field("rb-fraction-lost", rb.fraction_lost()) + .field("rb-packets-lost", rb.cumulative_lost()) + .field("rb-extended_sequence_number", rb.extended_sequence_number()) + .field("rb-jitter", rb.jitter()) + .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) + .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) + .build() + }, + )); + match rbs.len() { + 0 => (), + 1 => { + source_stats = + source_stats.field("report-blocks", rbs.first().unwrap().clone()); + } + _ => { + source_stats = source_stats.field("report-blocks", rbs); + } + } + session_stats = session_stats.field(rs.ssrc().to_string(), source_stats.build()); + } else if let Some(rr) = self.session.remote_receive_source_by_ssrc(ssrc) { + let source_stats = gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", rr.ssrc()) + .field("sender", false) + .field("local", false) + .build(); + session_stats = session_stats.field(rr.ssrc().to_string(), source_stats); + } + } + + session_stats.build() + } +} + +pub fn pt_clock_rate_from_caps(caps: &gst::CapsRef) -> Option<(u8, u32)> { + let Some(s) = caps.structure(0) else { + gst::debug!(CAT, "no structure!"); + return None; + }; + let Some((clock_rate, pt)) = Option::zip( + s.get::("clock-rate").ok(), + s.get::("payload").ok(), + ) else { + gst::debug!( + CAT, + "could not retrieve clock-rate and/or payload from structure" + ); + return None; + }; + if (0..=127).contains(&pt) && clock_rate > 0 { + Some((pt as u8, clock_rate as u32)) + } else { + gst::debug!( + CAT, + "payload value {pt} out of bounds or clock-rate {clock_rate} out of bounds" + ); + None + } +} + +static RUST_CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rust-log", + gst::DebugColorFlags::empty(), + Some("Logs from rust crates"), + ) +}); + +static GST_RUST_LOGGER_ONCE: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); +static GST_RUST_LOGGER: GstRustLogger = GstRustLogger {}; + +pub(crate) struct GstRustLogger {} + +impl GstRustLogger { + pub fn install() { + GST_RUST_LOGGER_ONCE.get_or_init(|| { + if log::set_logger(&GST_RUST_LOGGER).is_err() { + gst::warning!( + RUST_CAT, + "Cannot install log->gst logger, already installed?" + ); + } else { + log::set_max_level(GstRustLogger::debug_level_to_log_level_filter( + RUST_CAT.threshold(), + )); + gst::info!(RUST_CAT, "installed log->gst logger"); + } + }); + } + + fn debug_level_to_log_level_filter(level: gst::DebugLevel) -> log::LevelFilter { + match level { + gst::DebugLevel::None => log::LevelFilter::Off, + gst::DebugLevel::Error => log::LevelFilter::Error, + gst::DebugLevel::Warning => log::LevelFilter::Warn, + gst::DebugLevel::Fixme | gst::DebugLevel::Info => log::LevelFilter::Info, + gst::DebugLevel::Debug => log::LevelFilter::Debug, + gst::DebugLevel::Log | gst::DebugLevel::Trace | gst::DebugLevel::Memdump => { + log::LevelFilter::Trace + } + _ => log::LevelFilter::Trace, + } + } + + fn log_level_to_debug_level(level: log::Level) -> gst::DebugLevel { + match level { + log::Level::Error => gst::DebugLevel::Error, + log::Level::Warn => gst::DebugLevel::Warning, + log::Level::Info => gst::DebugLevel::Info, + log::Level::Debug => gst::DebugLevel::Debug, + log::Level::Trace => gst::DebugLevel::Trace, + } + } +} + +impl log::Log for GstRustLogger { + fn enabled(&self, metadata: &log::Metadata) -> bool { + RUST_CAT.above_threshold(GstRustLogger::log_level_to_debug_level(metadata.level())) + } + + fn log(&self, record: &log::Record) { + let gst_level = GstRustLogger::log_level_to_debug_level(record.metadata().level()); + let file = record + .file() + .map(glib::GString::from) + .unwrap_or_else(|| glib::GString::from("rust-log")); + let function = record.target(); + let line = record.line().unwrap_or(0); + RUST_CAT.log( + None::<&glib::Object>, + gst_level, + file.as_gstr(), + function, + line, + *record.args(), + ); + } + + fn flush(&self) {} +} diff --git a/net/rtp/src/rtpbin2/mod.rs b/net/rtp/src/rtpbin2/mod.rs index 214658f6..2dc81ca1 100644 --- a/net/rtp/src/rtpbin2/mod.rs +++ b/net/rtp/src/rtpbin2/mod.rs @@ -4,23 +4,34 @@ use gst::glib; use gst::prelude::*; use once_cell::sync::Lazy; mod config; -mod imp; +mod internal; mod jitterbuffer; +mod rtprecv; +mod rtpsend; mod session; mod source; mod sync; mod time; glib::wrapper! { - pub struct RtpBin2(ObjectSubclass) @extends gst::Element, gst::Object; + pub struct RtpSend(ObjectSubclass) @extends gst::Element, gst::Object; +} +glib::wrapper! { + pub struct RtpRecv(ObjectSubclass) @extends gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register( Some(plugin), - "rtpbin2", + "rtpsend", gst::Rank::NONE, - RtpBin2::static_type(), + RtpSend::static_type(), + )?; + gst::Element::register( + Some(plugin), + "rtprecv", + gst::Rank::NONE, + RtpRecv::static_type(), ) } diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/rtprecv.rs similarity index 51% rename from net/rtp/src/rtpbin2/imp.rs rename to net/rtp/src/rtpbin2/rtprecv.rs index bec89741..5f96018a 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -3,144 +3,52 @@ use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Poll, Waker}; use std::time::{Duration, Instant, SystemTime}; -use futures::future::{AbortHandle, Abortable}; use futures::StreamExt; use gst::{glib, prelude::*, subclass::prelude::*}; use once_cell::sync::Lazy; +use super::internal::{pt_clock_rate_from_caps, GstRustLogger, SharedRtpState, SharedSession}; use super::jitterbuffer::{self, JitterBuffer}; use super::session::{ - KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtcpSendReply, - RtpProfile, SendReply, Session, RTCP_MIN_REPORT_INTERVAL, + KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtpProfile, + RTCP_MIN_REPORT_INTERVAL, }; -use super::source::{ReceivedRb, SourceState}; +use super::source::SourceState; use super::sync; -use crate::rtpbin2::config::RtpBin2Session; use crate::rtpbin2::RUNTIME; const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); -const DEFAULT_MIN_RTCP_INTERVAL: Duration = RTCP_MIN_REPORT_INTERVAL; -const DEFAULT_REDUCED_SIZE_RTCP: bool = false; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( - "rtpbin2", + "rtprecv", gst::DebugColorFlags::empty(), - Some("RTP management bin"), + Some("RTP session receiver"), ) }); -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, glib::Enum)] -#[repr(u32)] -#[enum_type(name = "GstRtpBin2Profile")] -enum Profile { - #[default] - #[enum_value(name = "AVP profile as specified in RFC 3550", nick = "avp")] - Avp, - #[enum_value(name = "AVPF profile as specified in RFC 4585", nick = "avpf")] - Avpf, -} - -impl From for Profile { - fn from(value: RtpProfile) -> Self { - match value { - RtpProfile::Avp => Self::Avp, - RtpProfile::Avpf => Self::Avpf, - } - } -} - -impl From for RtpProfile { - fn from(value: Profile) -> Self { - match value { - Profile::Avp => Self::Avp, - Profile::Avpf => Self::Avpf, - } - } -} - #[derive(Debug, Clone)] struct Settings { + rtp_id: String, latency: gst::ClockTime, - min_rtcp_interval: Duration, - profile: Profile, - reduced_size_rtcp: bool, timestamping_mode: sync::TimestampingMode, } impl Default for Settings { fn default() -> Self { Settings { + rtp_id: String::from("rtp-id"), latency: DEFAULT_LATENCY, - min_rtcp_interval: DEFAULT_MIN_RTCP_INTERVAL, - profile: Profile::default(), - reduced_size_rtcp: DEFAULT_REDUCED_SIZE_RTCP, timestamping_mode: sync::TimestampingMode::default(), } } } -#[derive(Debug)] -#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] -struct RtcpSendStream { - state: Arc>, - sleep: Pin>, -} - -impl RtcpSendStream { - fn new(state: Arc>) -> Self { - Self { - state, - sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))), - } - } -} - -impl futures::stream::Stream for RtcpSendStream { - type Item = (usize, RtcpSendReply); - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut state = self.state.lock().unwrap(); - let now = Instant::now(); - let ntp_now = SystemTime::now(); - let mut lowest_wait = None; - for session in state.sessions.iter_mut() { - let mut session = session.inner.lock().unwrap(); - if let Some(reply) = session.session.poll_rtcp_send(now, ntp_now) { - return Poll::Ready(Some((session.id, reply))); - } - if let Some(wait) = session.session.poll_rtcp_send_timeout(now) { - if lowest_wait.map_or(true, |lowest_wait| wait < lowest_wait) { - lowest_wait = Some(wait); - } - } - } - state.rtcp_waker = Some(cx.waker().clone()); - drop(state); - - // default to the minimum initial rtcp delay so we don't busy loop if there are no sessions or no - // timeouts available - let lowest_wait = - lowest_wait.unwrap_or(now + crate::rtpbin2::session::RTCP_MIN_REPORT_INTERVAL / 2); - let this = self.get_mut(); - this.sleep.as_mut().reset(lowest_wait.into()); - if !std::future::Future::poll(this.sleep.as_mut(), cx).is_pending() { - // wake us again if the delay is not pending for another go at finding the next timeout - // value - cx.waker().wake_by_ref(); - } - Poll::Pending - } -} - #[derive(Debug)] #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] struct JitterBufferStream { @@ -256,25 +164,25 @@ impl PartialEq for RtpRecvSrcPad { impl Eq for RtpRecvSrcPad {} impl RtpRecvSrcPad { - fn activate(&mut self, session: &BinSession) { - let session_inner = session.inner.lock().unwrap(); - let seqnum = session_inner.rtp_recv_sink_seqnum.unwrap(); + fn activate(&mut self, state: MutexGuard, session_id: usize) { + let session = state.session_by_id(session_id).unwrap(); + let seqnum = session.rtp_recv_sink_seqnum.unwrap(); let stream_id = format!("{}/{}", self.pt, self.ssrc); let stream_start = gst::event::StreamStart::builder(&stream_id) - .group_id(session_inner.rtp_recv_sink_group_id.unwrap()) + .group_id(session.rtp_recv_sink_group_id.unwrap()) .seqnum(seqnum) .build(); + let session_inner = session.internal_session.inner.lock().unwrap(); let caps = session_inner.caps_from_pt(self.pt); let caps = gst::event::Caps::builder(&caps).seqnum(seqnum).build(); - - let segment = - gst::event::Segment::builder(session_inner.rtp_recv_sink_segment.as_ref().unwrap()) - .seqnum(seqnum) - .build(); - drop(session_inner); + let segment = gst::event::Segment::builder(session.rtp_recv_sink_segment.as_ref().unwrap()) + .seqnum(seqnum) + .build(); + drop(state); + self.pad.set_active(true).unwrap(); let _ = self.pad.store_sticky_event(&stream_start); let _ = self.pad.store_sticky_event(&caps); @@ -290,38 +198,9 @@ struct HeldRecvBuffer { new_pad: bool, } -#[derive(Debug, Clone)] -pub struct BinSession { - id: usize, - inner: Arc>, - config: RtpBin2Session, -} - -impl BinSession { - fn new(id: usize, settings: &Settings) -> Self { - let mut inner = BinSessionInner::new(id); - inner - .session - .set_min_rtcp_interval(settings.min_rtcp_interval); - inner.session.set_profile(settings.profile.into()); - inner - .session - .set_reduced_size_rtcp(settings.reduced_size_rtcp); - let inner = Arc::new(Mutex::new(inner)); - let weak_inner = Arc::downgrade(&inner); - Self { - id, - inner, - config: RtpBin2Session::new(weak_inner), - } - } -} - #[derive(Debug)] -pub(crate) struct BinSessionInner { - id: usize, - - session: Session, +struct RecvSession { + internal_session: SharedSession, // State for received RTP streams rtp_recv_sinkpad: Option, @@ -330,229 +209,36 @@ pub(crate) struct BinSessionInner { rtp_recv_sink_segment: Option>, rtp_recv_sink_seqnum: Option, - pt_map: HashMap, recv_store: Vec, rtp_recv_srcpads: Vec, recv_flow_combiner: Arc>, - // State for sending RTP streams - rtp_send_sinkpad: Option, - rtp_send_srcpad: Option, - rtcp_recv_sinkpad: Option, - rtcp_send_srcpad: Option, } -impl BinSessionInner { - fn new(id: usize) -> Self { +impl RecvSession { + fn new(shared_state: &SharedRtpState, id: usize) -> Self { + let internal_session = shared_state.session_get_or_init(id, || { + SharedSession::new(id, RtpProfile::Avp, RTCP_MIN_REPORT_INTERVAL, false) + }); Self { - id, - - session: Session::new(), - + internal_session, rtp_recv_sinkpad: None, rtp_recv_sink_group_id: None, rtp_recv_sink_caps: None, rtp_recv_sink_segment: None, rtp_recv_sink_seqnum: None, - pt_map: HashMap::default(), recv_store: vec![], rtp_recv_srcpads: vec![], recv_flow_combiner: Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())), - rtp_send_sinkpad: None, - rtp_send_srcpad: None, - rtcp_recv_sinkpad: None, - rtcp_send_srcpad: None, } } - pub fn clear_pt_map(&mut self) { - self.pt_map.clear(); - } - - pub fn add_caps(&mut self, caps: gst::Caps) { - let Some((pt, clock_rate)) = pt_clock_rate_from_caps(&caps) else { - return; - }; - let caps_clone = caps.clone(); - self.pt_map - .entry(pt) - .and_modify(move |entry| *entry = caps) - .or_insert_with(move || caps_clone); - self.session.set_pt_clock_rate(pt, clock_rate); - } - - fn caps_from_pt(&self, pt: u8) -> gst::Caps { - self.pt_map.get(&pt).cloned().unwrap_or( - gst::Caps::builder("application/x-rtp") - .field("payload", pt as i32) - .build(), - ) - } - - pub fn pt_map(&self) -> impl Iterator + '_ { - self.pt_map.iter().map(|(&k, v)| (k, v)) - } - - pub fn stats(&self) -> gst::Structure { - let mut session_stats = gst::Structure::builder("application/x-rtpbin2-session-stats") - .field("id", self.id as u64); - for ssrc in self.session.ssrcs() { - if let Some(ls) = self.session.local_send_source_by_ssrc(ssrc) { - let mut source_stats = - gst::Structure::builder("application/x-rtpbin2-source-stats") - .field("ssrc", ls.ssrc()) - .field("sender", true) - .field("local", true) - .field("packets-sent", ls.packet_count()) - .field("octets-sent", ls.octet_count()) - .field("bitrate", ls.bitrate() as u64); - if let Some(pt) = ls.payload_type() { - if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { - source_stats = source_stats.field("clock-rate", clock_rate); - } - } - if let Some(sr) = ls.last_sent_sr() { - source_stats = source_stats - .field("sr-ntptime", sr.ntp_timestamp().as_u64()) - .field("sr-rtptime", sr.rtp_timestamp()) - .field("sr-octet-count", sr.octet_count()) - .field("sr-packet-count", sr.packet_count()); - } - let rbs = gst::List::new(ls.received_report_blocks().map( - |(sender_ssrc, ReceivedRb { rb, .. })| { - gst::Structure::builder("application/x-rtcp-report-block") - .field("sender-ssrc", sender_ssrc) - .field("rb-fraction-lost", rb.fraction_lost()) - .field("rb-packets-lost", rb.cumulative_lost()) - .field("rb-extended_sequence_number", rb.extended_sequence_number()) - .field("rb-jitter", rb.jitter()) - .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) - .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) - .build() - }, - )); - match rbs.len() { - 0 => (), - 1 => { - source_stats = - source_stats.field("report-blocks", rbs.first().unwrap().clone()); - } - _ => { - source_stats = source_stats.field("report-blocks", rbs); - } - } - - // TODO: add jitter, packets-lost - session_stats = session_stats.field(ls.ssrc().to_string(), source_stats.build()); - } else if let Some(lr) = self.session.local_receive_source_by_ssrc(ssrc) { - let mut source_stats = - gst::Structure::builder("application/x-rtpbin2-source-stats") - .field("ssrc", lr.ssrc()) - .field("sender", false) - .field("local", true); - if let Some(pt) = lr.payload_type() { - if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { - source_stats = source_stats.field("clock-rate", clock_rate); - } - } - // TODO: add rb stats - session_stats = session_stats.field(lr.ssrc().to_string(), source_stats.build()); - } else if let Some(rs) = self.session.remote_send_source_by_ssrc(ssrc) { - let mut source_stats = - gst::Structure::builder("application/x-rtpbin2-source-stats") - .field("ssrc", rs.ssrc()) - .field("sender", true) - .field("local", false) - .field("octets-received", rs.octet_count()) - .field("packets-received", rs.packet_count()) - .field("bitrate", rs.bitrate() as u64) - .field("jitter", rs.jitter()) - .field("packets-lost", rs.packets_lost()); - if let Some(pt) = rs.payload_type() { - if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { - source_stats = source_stats.field("clock-rate", clock_rate); - } - } - if let Some(rtp_from) = rs.rtp_from() { - source_stats = source_stats.field("rtp-from", rtp_from.to_string()); - } - if let Some(rtcp_from) = rs.rtcp_from() { - source_stats = source_stats.field("rtcp-from", rtcp_from.to_string()); - } - if let Some(sr) = rs.last_received_sr() { - source_stats = source_stats - .field("sr-ntptime", sr.ntp_timestamp().as_u64()) - .field("sr-rtptime", sr.rtp_timestamp()) - .field("sr-octet-count", sr.octet_count()) - .field("sr-packet-count", sr.packet_count()); - } - if let Some(rb) = rs.last_sent_rb() { - source_stats = source_stats - .field("sent-rb-fraction-lost", rb.fraction_lost()) - .field("sent-rb-packets-lost", rb.cumulative_lost()) - .field( - "sent-rb-extended-sequence-number", - rb.extended_sequence_number(), - ) - .field("sent-rb-jitter", rb.jitter()) - .field("sent-rb-last-sr-ntp-time", rb.last_sr_ntp_time()) - .field( - "sent-rb-delay-since-last-sr-ntp-time", - rb.delay_since_last_sr(), - ); - } - let rbs = gst::List::new(rs.received_report_blocks().map( - |(sender_ssrc, ReceivedRb { rb, .. })| { - gst::Structure::builder("application/x-rtcp-report-block") - .field("sender-ssrc", sender_ssrc) - .field("rb-fraction-lost", rb.fraction_lost()) - .field("rb-packets-lost", rb.cumulative_lost()) - .field("rb-extended_sequence_number", rb.extended_sequence_number()) - .field("rb-jitter", rb.jitter()) - .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) - .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) - .build() - }, - )); - match rbs.len() { - 0 => (), - 1 => { - source_stats = - source_stats.field("report-blocks", rbs.first().unwrap().clone()); - } - _ => { - source_stats = source_stats.field("report-blocks", rbs); - } - } - session_stats = session_stats.field(rs.ssrc().to_string(), source_stats.build()); - } else if let Some(rr) = self.session.remote_receive_source_by_ssrc(ssrc) { - let source_stats = gst::Structure::builder("application/x-rtpbin2-source-stats") - .field("ssrc", rr.ssrc()) - .field("sender", false) - .field("local", false) - .build(); - session_stats = session_stats.field(rr.ssrc().to_string(), source_stats); - } - } - - let jb_stats = gst::List::new(self.rtp_recv_srcpads.iter().map(|pad| { - let mut jb_stats = pad.jitter_buffer_store.lock().unwrap().jitterbuffer.stats(); - jb_stats.set_value("ssrc", (pad.ssrc as i32).to_send_value()); - jb_stats.set_value("pt", (pad.pt as i32).to_send_value()); - jb_stats - })); - - session_stats = session_stats.field("jitterbuffer-stats", jb_stats); - - session_stats.build() - } - fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { gst::debug!(CAT, obj: pad, "Starting rtp recv src task"); @@ -598,7 +284,7 @@ impl BinSessionInner { JitterBufferItem::Query(mut query, tx) => { // This is safe because the thread holding the original reference is waiting // for us exclusively - let res = pad.query(unsafe { query.as_mut() }); + let res = pad.peer_query(unsafe { query.as_mut() }); let _ = tx.send(res); } } @@ -630,7 +316,7 @@ impl BinSessionInner { fn get_or_create_rtp_recv_src( &mut self, - rtpbin: &RtpBin2, + rtpbin: &RtpRecv, pt: u8, ssrc: u32, ) -> (RtpRecvSrcPad, bool) { @@ -641,25 +327,25 @@ impl BinSessionInner { { (pad.clone(), false) } else { - let src_templ = rtpbin.obj().pad_template("rtp_recv_src_%u_%u_%u").unwrap(); - let id = self.id; + let src_templ = rtpbin.obj().pad_template("rtp_src_%u_%u_%u").unwrap(); + let id = self.internal_session.id; let srcpad = gst::Pad::builder_from_template(&src_templ) .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function( + RtpRecv::catch_panic_pad_function( parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad), ) }) .query_function(|pad, parent, query| { - RtpBin2::catch_panic_pad_function( + RtpRecv::catch_panic_pad_function( parent, || false, |this| this.src_query(pad, query), ) }) .event_function(move |pad, parent, event| { - RtpBin2::catch_panic_pad_function( + RtpRecv::catch_panic_pad_function( parent, || false, |this| this.rtp_recv_src_event(pad, event, id, pt, ssrc), @@ -671,13 +357,13 @@ impl BinSessionInner { let Some(this) = this.upgrade() else { return Err(gst::LoggableError::new( *CAT, - glib::bool_error!("rtpbin does not exist anymore"), + glib::bool_error!("rtprecv does not exist anymore"), )); }; this.rtp_recv_src_activatemode(pad, mode, active, id) } }) - .name(format!("rtp_recv_src_{}_{}_{}", self.id, pt, ssrc)) + .name(format!("rtp_src_{}_{}_{}", id, pt, ssrc)) .build(); srcpad.use_fixed_caps(); @@ -707,41 +393,53 @@ impl BinSessionInner { #[derive(Debug, Default)] struct State { - sessions: Vec, - rtcp_waker: Option, + shared_state: Option, + sessions: Vec, max_session_id: usize, pads_session_id_map: HashMap, sync_context: Option, } impl State { - fn session_by_id(&self, id: usize) -> Option<&BinSession> { - self.sessions.iter().find(|session| session.id == id) + fn session_by_id(&self, id: usize) -> Option<&RecvSession> { + self.sessions + .iter() + .find(|session| session.internal_session.id == id) + } + + fn mut_session_by_id(&mut self, id: usize) -> Option<&mut RecvSession> { + self.sessions + .iter_mut() + .find(|session| session.internal_session.id == id) } fn stats(&self) -> gst::Structure { - let mut ret = gst::Structure::builder("application/x-rtpbin2-stats"); + let mut ret = gst::Structure::builder("application/x-rtp2-stats"); for session in self.sessions.iter() { - let sess_id = session.id; - let session = session.inner.lock().unwrap(); + let sess_id = session.internal_session.id; + let session_inner = session.internal_session.inner.lock().unwrap(); - ret = ret.field(sess_id.to_string(), session.stats()); + let mut session_stats = session_inner.stats(); + let jb_stats = gst::List::new(session.rtp_recv_srcpads.iter().map(|pad| { + let mut jb_stats = pad.jitter_buffer_store.lock().unwrap().jitterbuffer.stats(); + jb_stats.set_value("ssrc", (pad.ssrc as i32).to_send_value()); + jb_stats.set_value("pt", (pad.pt as i32).to_send_value()); + jb_stats + })); + + session_stats.set("jitterbuffer-stats", jb_stats); + ret = ret.field(sess_id.to_string(), session_stats); } ret.build() } } -pub struct RtpBin2 { +pub struct RtpRecv { settings: Mutex, state: Arc>, - rtcp_task: Mutex>, } -struct RtcpTask { - abort_handle: AbortHandle, -} - -impl RtpBin2 { +impl RtpRecv { fn rtp_recv_src_activatemode( &self, pad: &gst::Pad, @@ -750,8 +448,8 @@ impl RtpBin2 { id: usize, ) -> Result<(), gst::LoggableError> { if let gst::PadMode::Push = mode { - let state = self.state.lock().unwrap(); - let Some(session) = state.session_by_id(id) else { + let mut state = self.state.lock().unwrap(); + let Some(session) = state.mut_session_by_id(id) else { if active { return Err(gst::LoggableError::new( *CAT, @@ -762,12 +460,10 @@ impl RtpBin2 { } }; - let mut session = session.inner.lock().unwrap(); if active { session.start_rtp_recv_task(pad)?; } else { session.stop_rtp_recv_task(pad)?; - drop(session); gst::debug!(CAT, obj: pad, "Stopping task"); @@ -783,60 +479,6 @@ impl RtpBin2 { } } - fn start_rtcp_task(&self) { - let mut rtcp_task = self.rtcp_task.lock().unwrap(); - - if rtcp_task.is_some() { - return; - } - - // run the runtime from another task to prevent the "start a runtime from within a runtime" panic - // when the plugin is statically linked. - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let state = self.state.clone(); - RUNTIME.spawn(async move { - let future = Abortable::new(Self::rtcp_task(state), abort_registration); - future.await - }); - - rtcp_task.replace(RtcpTask { abort_handle }); - } - - async fn rtcp_task(state: Arc>) { - let mut stream = RtcpSendStream::new(state.clone()); - while let Some((session_id, reply)) = stream.next().await { - let state = state.lock().unwrap(); - let Some(session) = state.session_by_id(session_id) else { - continue; - }; - match reply { - RtcpSendReply::Data(data) => { - let Some(rtcp_srcpad) = session.inner.lock().unwrap().rtcp_send_srcpad.clone() - else { - continue; - }; - RUNTIME.spawn_blocking(move || { - let buffer = gst::Buffer::from_mut_slice(data); - if let Err(e) = rtcp_srcpad.push(buffer) { - gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}"); - } - }); - } - RtcpSendReply::SsrcBye(ssrc) => { - session.config.emit_by_name::<()>("bye-ssrc", &[&ssrc]) - } - } - } - } - - fn stop_rtcp_task(&self) { - let mut rtcp_task = self.rtcp_task.lock().unwrap(); - - if let Some(rtcp) = rtcp_task.take() { - rtcp.abort_handle.abort(); - } - } - pub fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad, "Handling query {query:?}"); @@ -869,7 +511,6 @@ impl RtpBin2 { let state = self.state.lock().unwrap(); if let Some(&id) = state.pads_session_id_map.get(pad) { if let Some(session) = state.session_by_id(id) { - let session = session.inner.lock().unwrap(); if let Some(ref sinkpad) = session.rtp_recv_sinkpad { if sinkpad == pad { let pads = session @@ -884,15 +525,6 @@ impl RtpBin2 { return gst::Iterator::from_vec(vec![sinkpad.clone()]); } } - if let Some(ref sinkpad) = session.rtp_send_sinkpad { - if let Some(ref srcpad) = session.rtp_send_srcpad { - if sinkpad == pad { - return gst::Iterator::from_vec(vec![srcpad.clone()]); - } else if srcpad == pad { - return gst::Iterator::from_vec(vec![sinkpad.clone()]); - } - } - } // nothing to do for rtcp pads } } @@ -906,7 +538,7 @@ impl RtpBin2 { mut buffer: gst::Buffer, ) -> Result { let mut state = self.state.lock().unwrap(); - let Some(session) = state.session_by_id(id) else { + let Some(session) = state.mut_session_by_id(id) else { return Err(gst::FlowError::Error); }; @@ -918,8 +550,7 @@ impl RtpBin2 { // TCP. let arrival_time = match buffer.dts() { Some(dts) => { - let session_inner = session.inner.lock().unwrap(); - let segment = session_inner.rtp_recv_sink_segment.as_ref().unwrap(); + let segment = session.rtp_recv_sink_segment.as_ref().unwrap(); // TODO: use running_time_full if we care to support that match segment.to_running_time(dts) { Some(time) => time, @@ -971,29 +602,26 @@ impl RtpBin2 { } }; - let session = session.clone(); + let internal_session = session.internal_session.clone(); - let mut session_inner = session.inner.lock().unwrap(); + let mut session_inner = internal_session.inner.lock().unwrap(); - let current_caps = session_inner.rtp_recv_sink_caps.clone(); - if let std::collections::hash_map::Entry::Vacant(e) = - session_inner.pt_map.entry(rtp.payload_type()) + if state + .sync_context + .as_ref() + .unwrap() + .clock_rate(rtp.ssrc()) + .is_none() { - if let Some(mut caps) = current_caps.filter(|caps| clock_rate_from_caps(caps).is_some()) - { - state - .sync_context - .as_mut() - .unwrap() - .set_clock_rate(rtp.ssrc(), clock_rate_from_caps(&caps).unwrap()); - { - // Ensure the caps we send out hold a payload field - let caps = caps.make_mut(); - let s = caps.structure_mut(0).unwrap(); - s.set("payload", rtp.payload_type() as i32); - } - e.insert(caps); - } + let clock_rate = session_inner + .session + .clock_rate_from_pt(rtp.payload_type()) + .unwrap(); + state + .sync_context + .as_mut() + .unwrap() + .set_clock_rate(rtp.ssrc(), clock_rate); } // TODO: Put NTP time as `gst::ReferenceTimeStampMeta` on the buffers if selected via property @@ -1002,14 +630,13 @@ impl RtpBin2 { rtp.timestamp(), arrival_time.nseconds(), ); - let segment = session_inner.rtp_recv_sink_segment.as_ref().unwrap(); + let session = state.mut_session_by_id(id).unwrap(); + let segment = session.rtp_recv_sink_segment.as_ref().unwrap(); let pts = segment .position_from_running_time(gst::ClockTime::from_nseconds(pts)) .unwrap(); gst::debug!(CAT, "Calculated PTS: {}", pts); - drop(state); - let now = Instant::now(); let mut buffers_to_push = vec![]; loop { @@ -1017,8 +644,10 @@ impl RtpBin2 { RecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision RecvReply::NewSsrc(ssrc, _pt) => { drop(session_inner); - session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]); - session_inner = session.inner.lock().unwrap(); + internal_session + .config + .emit_by_name::<()>("new-ssrc", &[&ssrc]); + session_inner = internal_session.inner.lock().unwrap(); } RecvReply::Hold(hold_id) => { let pt = rtp.payload_type(); @@ -1028,8 +657,8 @@ impl RtpBin2 { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc); - session_inner.recv_store.push(HeldRecvBuffer { + let (pad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc); + session.recv_store.push(HeldRecvBuffer { hold_id: Some(hold_id), buffer, pad, @@ -1038,21 +667,21 @@ impl RtpBin2 { break; } RecvReply::Drop(hold_id) => { - if let Some(pos) = session_inner + if let Some(pos) = session .recv_store .iter() .position(|b| b.hold_id.unwrap() == hold_id) { - session_inner.recv_store.remove(pos); + session.recv_store.remove(pos); } } RecvReply::Forward(hold_id) => { - if let Some(pos) = session_inner + if let Some(pos) = session .recv_store .iter() .position(|b| b.hold_id.unwrap() == hold_id) { - buffers_to_push.push(session_inner.recv_store.remove(pos)); + buffers_to_push.push(session.recv_store.remove(pos)); } else { unreachable!(); } @@ -1066,7 +695,7 @@ impl RtpBin2 { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc); + let (pad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc); buffers_to_push.push(HeldRecvBuffer { hold_id: None, buffer, @@ -1083,11 +712,11 @@ impl RtpBin2 { for mut held in buffers_to_push { // TODO: handle other processing if held.new_pad { - held.pad.activate(&session); - self.obj().add_pad(&held.pad.pad).unwrap(); - let mut state = self.state.lock().unwrap(); state.pads_session_id_map.insert(held.pad.pad.clone(), id); - drop(state); + // drops the state lock + held.pad.activate(state, id); + self.obj().add_pad(&held.pad.pad).unwrap(); + state = self.state.lock().unwrap(); } let mapped = held.buffer.map_readable().map_err(|e| { @@ -1136,53 +765,6 @@ impl RtpBin2 { Ok(gst::FlowSuccess::Ok) } - fn rtp_send_sink_chain( - &self, - id: usize, - buffer: gst::Buffer, - ) -> Result { - let state = self.state.lock().unwrap(); - let Some(session) = state.session_by_id(id) else { - gst::error!(CAT, "No session?"); - return Err(gst::FlowError::Error); - }; - - let mapped = buffer.map_readable().map_err(|e| { - gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}"); - gst::FlowError::Error - })?; - let rtp = match rtp_types::RtpPacket::parse(&mapped) { - Ok(rtp) => rtp, - Err(e) => { - gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}"); - return Ok(gst::FlowSuccess::Ok); - } - }; - - let session = session.clone(); - let mut session_inner = session.inner.lock().unwrap(); - drop(state); - - let now = Instant::now(); - loop { - match session_inner.session.handle_send(&rtp, now) { - SendReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision - SendReply::NewSsrc(ssrc, _pt) => { - drop(session_inner); - session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]); - session_inner = session.inner.lock().unwrap(); - } - SendReply::Passthrough => break, - SendReply::Drop => return Ok(gst::FlowSuccess::Ok), - } - } - // TODO: handle other processing - drop(mapped); - let srcpad = session_inner.rtp_send_srcpad.clone().unwrap(); - drop(session_inner); - srcpad.push(buffer) - } - fn rtcp_recv_sink_chain( &self, id: usize, @@ -1215,10 +797,8 @@ impl RtpBin2 { } }; - let session = session.clone(); - let mut session_inner = session.inner.lock().unwrap(); - let waker = state.rtcp_waker.clone(); - drop(state); + let internal_session = session.internal_session.clone(); + let mut session_inner = internal_session.inner.lock().unwrap(); let now = Instant::now(); let ntp_now = SystemTime::now(); @@ -1228,17 +808,23 @@ impl RtpBin2 { .handle_rtcp_recv(rtcp, mapped.len(), addr, now, ntp_now); let rtp_send_sinkpad = session_inner.rtp_send_sinkpad.clone(); drop(session_inner); + drop(state); for reply in replies { match reply { RtcpRecvReply::NewSsrc(ssrc) => { - session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]); + internal_session + .config + .emit_by_name::<()>("new-ssrc", &[&ssrc]); } RtcpRecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision RtcpRecvReply::TimerReconsideration => { - if let Some(ref waker) = waker { + let state = self.state.lock().unwrap(); + let session = state.session_by_id(id).unwrap(); + let mut session_inner = session.internal_session.inner.lock().unwrap(); + if let Some(waker) = session_inner.rtcp_waker.take() { // reconsider timers means that we wake the rtcp task to get a new timeout - waker.wake_by_ref(); + waker.wake(); } } RtcpRecvReply::RequestKeyUnit { ssrcs, fir } => { @@ -1269,9 +855,9 @@ impl RtpBin2 { .unwrap() .add_sender_report(ssrc, rtp, ntp); } - RtcpRecvReply::SsrcBye(ssrc) => { - session.config.emit_by_name::<()>("bye-ssrc", &[&ssrc]) - } + RtcpRecvReply::SsrcBye(ssrc) => internal_session + .config + .emit_by_name::<()>("bye-ssrc", &[&ssrc]), } } drop(mapped); @@ -1279,67 +865,6 @@ impl RtpBin2 { Ok(gst::FlowSuccess::Ok) } - fn rtp_send_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool { - match event.view() { - gst::EventView::Caps(caps) => { - if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(caps.caps()) { - let state = self.state.lock().unwrap(); - if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); - session.session.set_pt_clock_rate(pt, clock_rate); - } - } else { - gst::warning!(CAT, obj: pad, "input caps are missing payload or clock-rate fields"); - } - gst::Pad::event_default(pad, Some(&*self.obj()), event) - } - gst::EventView::Eos(_eos) => { - let now = Instant::now(); - let mut state = self.state.lock().unwrap(); - if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); - let ssrcs = session.session.ssrcs().collect::>(); - // We want to bye all relevant ssrc's here. - // Relevant means they will not be used by something else which means that any - // local send ssrc that is not being used for Sr/Rr reports (internal_ssrc) can - // have the Bye state applied. - let mut all_local = true; - let internal_ssrc = session.session.internal_ssrc(); - for ssrc in ssrcs { - let Some(local_send) = session.session.mut_local_send_source_by_ssrc(ssrc) - else { - if let Some(local_recv) = - session.session.local_receive_source_by_ssrc(ssrc) - { - if local_recv.state() != SourceState::Bye - && Some(ssrc) != internal_ssrc - { - all_local = false; - } - } - continue; - }; - if Some(ssrc) != internal_ssrc { - local_send.mark_bye("End of Stream") - } - } - if all_local { - // if there are no non-local send ssrc's, then we can Bye the entire - // session. - session.session.schedule_bye("End of Stream", now); - } - drop(session); - if let Some(waker) = state.rtcp_waker.take() { - waker.wake(); - } - } - drop(state); - gst::Pad::event_default(pad, Some(&*self.obj()), event) - } - _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), - } - } - pub fn rtp_recv_sink_query( &self, pad: &gst::Pad, @@ -1353,8 +878,6 @@ impl RtpBin2 { let mut ret = true; if let Some(session) = state.session_by_id(id) { - let session = session.inner.lock().unwrap(); - let jb_stores: Vec>> = session .rtp_recv_srcpads .iter() @@ -1362,7 +885,7 @@ impl RtpBin2 { .map(|p| p.jitter_buffer_store.clone()) .collect(); - drop(session); + drop(state); let query = std::ptr::NonNull::from(query); @@ -1395,6 +918,8 @@ impl RtpBin2 { .store .insert(id, JitterBufferItem::Query(query, query_tx)); + drop(jitterbuffer_store); + // Now block until the jitterbuffer has processed the query match query_rx.recv() { Ok(res) => { @@ -1424,7 +949,6 @@ impl RtpBin2 { fn rtp_recv_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool { let state = self.state.lock().unwrap(); if let Some(session) = state.session_by_id(id) { - let session = session.inner.lock().unwrap(); for srcpad in session .rtp_recv_srcpads .iter() @@ -1453,11 +977,9 @@ impl RtpBin2 { fn rtp_recv_sink_event(&self, pad: &gst::Pad, mut event: gst::Event, id: usize) -> bool { match event.view() { gst::EventView::StreamStart(stream_start) => { - let state = self.state.lock().unwrap(); - - if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + if let Some(session) = state.mut_session_by_id(id) { let group_id = stream_start.group_id(); session.rtp_recv_sink_group_id = Some(group_id.unwrap_or_else(gst::GroupId::next)); @@ -1466,28 +988,26 @@ impl RtpBin2 { true } gst::EventView::Caps(caps) => { - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); - if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); - let caps = caps.caps_owned(); + if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(caps.caps()) { + if let Some(session) = state.mut_session_by_id(id) { + let caps = caps.caps_owned(); + session.rtp_recv_sink_caps = Some(caps.clone()); - if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(&caps) { - session.session.set_pt_clock_rate(pt, clock_rate); - } else { - gst::warning!(CAT, obj: pad, "input caps are missing payload or clock-rate fields"); + let mut session_inner = session.internal_session.inner.lock().unwrap(); + session_inner.session.set_pt_clock_rate(pt, clock_rate); + session_inner.add_caps(caps); } - - session.rtp_recv_sink_caps = Some(caps); + } else { + gst::warning!(CAT, obj: pad, "input caps are missing payload or clock-rate fields"); } true } gst::EventView::Segment(segment) => { - let state = self.state.lock().unwrap(); - - if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + if let Some(session) = state.mut_session_by_id(id) { let segment = segment.segment(); let segment = match segment.downcast_ref::() { Some(segment) => segment.clone(), @@ -1515,9 +1035,9 @@ impl RtpBin2 { } gst::EventView::Eos(_eos) => { let now = Instant::now(); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); + let mut session = session.internal_session.inner.lock().unwrap(); let ssrcs = session.session.ssrcs().collect::>(); // we can only Bye the entire session if we do not have any local send sources // currently sending data @@ -1543,9 +1063,6 @@ impl RtpBin2 { session.session.schedule_bye("End of stream", now); } drop(session); - if let Some(waker) = state.rtcp_waker.take() { - waker.wake(); - } } drop(state); // FIXME: may need to delay sending eos under some circumstances @@ -1556,7 +1073,6 @@ impl RtpBin2 { let state = self.state.lock().unwrap(); let mut pause_tasks = vec![]; if let Some(session) = state.session_by_id(id) { - let session = session.inner.lock().unwrap(); for recv_pad in session.rtp_recv_srcpads.iter() { let mut store = recv_pad.jitter_buffer_store.lock().unwrap(); store.jitterbuffer.set_flushing(true); @@ -1573,9 +1089,8 @@ impl RtpBin2 { gst::Pad::event_default(pad, Some(&*self.obj()), event) } gst::EventView::FlushStop(_fs) => { - let state = self.state.lock().unwrap(); - if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + if let Some(session) = state.mut_session_by_id(id) { let pads = session .rtp_recv_srcpads .iter() @@ -1616,7 +1131,7 @@ impl RtpBin2 { let state = self.state.lock().unwrap(); if let Some(session) = state.session_by_id(id) { let now = Instant::now(); - let mut session = session.inner.lock().unwrap(); + let mut session = session.internal_session.inner.lock().unwrap(); let caps = session.caps_from_pt(pt); let s = caps.structure(0).unwrap(); @@ -1632,16 +1147,12 @@ impl RtpBin2 { if pli || fir { let replies = session.session.request_remote_key_unit(now, typ, ssrc); - let waker = state.rtcp_waker.clone(); - drop(session); - drop(state); - for reply in replies { match reply { RequestRemoteKeyUnitReply::TimerReconsideration => { - if let Some(ref waker) = waker { + if let Some(waker) = session.rtcp_waker.take() { // reconsider timers means that we wake the rtcp task to get a new timeout - waker.wake_by_ref(); + waker.wake(); } } } @@ -1660,9 +1171,9 @@ impl RtpBin2 { } #[glib::object_subclass] -impl ObjectSubclass for RtpBin2 { - const NAME: &'static str = "GstRtpBin2"; - type Type = super::RtpBin2; +impl ObjectSubclass for RtpRecv { + const NAME: &'static str = "GstRtpRecv"; + type Type = super::RtpRecv; type ParentType = gst::Element; fn new() -> Self { @@ -1670,44 +1181,30 @@ impl ObjectSubclass for RtpBin2 { Self { settings: Default::default(), state: Default::default(), - rtcp_task: Mutex::new(None), } } } -impl ObjectImpl for RtpBin2 { +impl ObjectImpl for RtpRecv { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ + glib::ParamSpecString::builder("rtp-id") + .nick("The RTP Connection ID") + .blurb("A connection ID shared with a rtpsend element for implementing both sending and receiving using the same RTP context") + .default_value("rtp-id") + .build(), glib::ParamSpecUInt::builder("latency") .nick("Buffer latency in ms") .blurb("Amount of ms to buffer") .default_value(DEFAULT_LATENCY.mseconds() as u32) .mutable_ready() .build(), - glib::ParamSpecUInt::builder("min-rtcp-interval") - .nick("Minimum RTCP interval in ms") - .blurb("Minimum time (in ms) between RTCP reports") - .default_value(DEFAULT_MIN_RTCP_INTERVAL.as_millis() as u32) - .mutable_ready() - .build(), glib::ParamSpecUInt::builder("stats") .nick("Statistics") .blurb("Statistics about the session") .read_only() .build(), - glib::ParamSpecEnum::builder::("rtp-profile") - .nick("RTP Profile") - .blurb("RTP Profile to use") - .default_value(Profile::default()) - .mutable_ready() - .build(), - glib::ParamSpecBoolean::builder("reduced-size-rtcp") - .nick("Reduced Size RTCP") - .blurb("Use reduced size RTCP. Only has an effect if rtp-profile=avpf") - .default_value(DEFAULT_REDUCED_SIZE_RTCP) - .mutable_ready() - .build(), glib::ParamSpecEnum::builder::("timestamping-mode") .nick("Timestamping Mode") .blurb("Govern how to pick presentation timestamps for packets") @@ -1722,6 +1219,10 @@ impl ObjectImpl for RtpBin2 { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { + "rtp-id" => { + let mut settings = self.settings.lock().unwrap(); + settings.rtp_id = value.get::().expect("type checked upstream"); + } "latency" => { let _latency = { let mut settings = self.settings.lock().unwrap(); @@ -1735,20 +1236,6 @@ impl ObjectImpl for RtpBin2 { .obj() .post_message(gst::message::Latency::builder().src(&*self.obj()).build()); } - "min-rtcp-interval" => { - let mut settings = self.settings.lock().unwrap(); - settings.min_rtcp_interval = Duration::from_millis( - value.get::().expect("type checked upstream").into(), - ); - } - "rtp-profile" => { - let mut settings = self.settings.lock().unwrap(); - settings.profile = value.get::().expect("Type checked upstream"); - } - "reduced-size-rtcp" => { - let mut settings = self.settings.lock().unwrap(); - settings.reduced_size_rtcp = value.get::().expect("Type checked upstream"); - } "timestamping-mode" => { let mut settings = self.settings.lock().unwrap(); settings.timestamping_mode = value @@ -1761,26 +1248,18 @@ impl ObjectImpl for RtpBin2 { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { + "rtp-id" => { + let settings = self.settings.lock().unwrap(); + settings.rtp_id.to_value() + } "latency" => { let settings = self.settings.lock().unwrap(); (settings.latency.mseconds() as u32).to_value() } - "min-rtcp-interval" => { - let settings = self.settings.lock().unwrap(); - (settings.min_rtcp_interval.as_millis() as u32).to_value() - } "stats" => { let state = self.state.lock().unwrap(); state.stats().to_value() } - "rtp-profile" => { - let settings = self.settings.lock().unwrap(); - settings.profile.to_value() - } - "reduced-size-rtcp" => { - let settings = self.settings.lock().unwrap(); - settings.reduced_size_rtcp.to_value() - } "timestamping-mode" => { let settings = self.settings.lock().unwrap(); settings.timestamping_mode.to_value() @@ -1793,16 +1272,16 @@ impl ObjectImpl for RtpBin2 { static SIGNALS: Lazy> = Lazy::new(|| { vec![glib::subclass::Signal::builder("get-session") .param_types([u32::static_type()]) - .return_type::() + .return_type::() .action() .class_handler(|_token, args| { - let element = args[0].get::().expect("signal arg"); + let element = args[0].get::().expect("signal arg"); let id = args[1].get::().expect("signal arg"); let bin = element.imp(); let state = bin.state.lock().unwrap(); state .session_by_id(id as usize) - .map(|sess| sess.config.to_value()) + .map(|sess| sess.internal_session.config.to_value()) }) .build()] }); @@ -1811,15 +1290,15 @@ impl ObjectImpl for RtpBin2 { } } -impl GstObjectImpl for RtpBin2 {} +impl GstObjectImpl for RtpRecv {} -impl ElementImpl for RtpBin2 { +impl ElementImpl for RtpRecv { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( - "RTP Bin", + "RTP Session receiver", "Network/RTP/Filter", - "RTP sessions management", + "RTP sessions management (receiver)", "Matthew Waters ", ) }); @@ -1838,47 +1317,26 @@ impl ElementImpl for RtpBin2 { vec![ gst::PadTemplate::new( - "rtp_recv_sink_%u", + "rtp_sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &rtp_caps, ) .unwrap(), gst::PadTemplate::new( - "rtcp_recv_sink_%u", + "rtcp_sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &rtcp_caps, ) .unwrap(), gst::PadTemplate::new( - "rtp_recv_src_%u_%u_%u", + "rtp_src_%u_%u_%u", gst::PadDirection::Src, gst::PadPresence::Sometimes, &rtp_caps, ) .unwrap(), - gst::PadTemplate::new( - "rtp_send_sink_%u", - gst::PadDirection::Sink, - gst::PadPresence::Request, - &rtp_caps, - ) - .unwrap(), - gst::PadTemplate::new( - "rtp_send_src_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &rtp_caps, - ) - .unwrap(), - gst::PadTemplate::new( - "rtcp_send_src_%u", - gst::PadDirection::Src, - gst::PadPresence::Request, - &rtcp_caps, - ) - .unwrap(), ] }); @@ -1892,6 +1350,7 @@ impl ElementImpl for RtpBin2 { _caps: Option<&gst::Caps>, // XXX: do something with caps? ) -> Option { let settings = self.settings.lock().unwrap().clone(); + let rtp_id = settings.rtp_id.clone(); let mut state = self.state.lock().unwrap(); let max_session_id = state.max_session_id; @@ -1911,200 +1370,110 @@ impl ElementImpl for RtpBin2 { }; match templ.name_template() { - "rtp_send_sink_%u" => { - sess_parse(name, "rtp_send_sink_", max_session_id).and_then(|id| { - let new_pad = move |session: &mut BinSessionInner| -> Option<( - gst::Pad, - Option, - usize, - Vec, - )> { - let sinkpad = gst::Pad::builder_from_template(templ) - .chain_function(move |_pad, parent, buffer| { - RtpBin2::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |this| this.rtp_send_sink_chain(id, buffer), - ) - }) - .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function( - parent, - || gst::Iterator::from_vec(vec![]), - |this| this.iterate_internal_links(pad), - ) - }) - .event_function(move |pad, parent, event| { - RtpBin2::catch_panic_pad_function( - parent, - || false, - |this| this.rtp_send_sink_event(pad, event, id), - ) - }) - .flags(gst::PadFlags::PROXY_CAPS) - .name(format!("rtp_send_sink_{}", id)) - .build(); - let src_templ = self.obj().pad_template("rtp_send_src_%u").unwrap(); - let srcpad = gst::Pad::builder_from_template(&src_templ) - .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function( - parent, - || gst::Iterator::from_vec(vec![]), - |this| this.iterate_internal_links(pad), - ) - }) - .name(format!("rtp_send_src_{}", id)) - .build(); - session.rtp_send_sinkpad = Some(sinkpad.clone()); - session.rtp_send_srcpad = Some(srcpad.clone()); - Some((sinkpad, Some(srcpad), id, vec![])) - }; + "rtp_sink_%u" => sess_parse(name, "rtp_sink_", max_session_id).and_then(|id| { + let new_pad = move |session: &mut RecvSession| -> Option<( + gst::Pad, + Option, + usize, + Vec, + )> { + let sinkpad = gst::Pad::builder_from_template(templ) + .chain_function(move |pad, parent, buffer| { + RtpRecv::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.rtp_recv_sink_chain(pad, id, buffer), + ) + }) + .iterate_internal_links_function(|pad, parent| { + RtpRecv::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) + }) + .event_function(move |pad, parent, event| { + RtpRecv::catch_panic_pad_function( + parent, + || false, + |this| this.rtp_recv_sink_event(pad, event, id), + ) + }) + .query_function(move |pad, parent, query| { + RtpRecv::catch_panic_pad_function( + parent, + || false, + |this| this.rtp_recv_sink_query(pad, query, id), + ) + }) + .name(format!("rtp_sink_{}", id)) + .build(); + session.rtp_recv_sinkpad = Some(sinkpad.clone()); + Some((sinkpad, None, id, vec![])) + }; - let session = state.session_by_id(id); - if let Some(session) = session { - let mut session = session.inner.lock().unwrap(); - if session.rtp_send_sinkpad.is_some() { - None - } else { - new_pad(&mut session) - } + let session = state.mut_session_by_id(id); + if let Some(session) = session { + if session.rtp_recv_sinkpad.is_some() { + None } else { - let session = BinSession::new(id, &settings); - let mut inner = session.inner.lock().unwrap(); - let ret = new_pad(&mut inner); - drop(inner); - state.sessions.push(session); - ret + new_pad(session) } - }) - } - "rtp_recv_sink_%u" => { - sess_parse(name, "rtp_recv_sink_", max_session_id).and_then(|id| { - let new_pad = move |session: &mut BinSessionInner| -> Option<( - gst::Pad, - Option, - usize, - Vec, - )> { - let sinkpad = gst::Pad::builder_from_template(templ) - .chain_function(move |pad, parent, buffer| { - RtpBin2::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |this| this.rtp_recv_sink_chain(pad, id, buffer), - ) - }) - .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function( - parent, - || gst::Iterator::from_vec(vec![]), - |this| this.iterate_internal_links(pad), - ) - }) - .event_function(move |pad, parent, event| { - RtpBin2::catch_panic_pad_function( - parent, - || false, - |this| this.rtp_recv_sink_event(pad, event, id), - ) - }) - .query_function(move |pad, parent, query| { - RtpBin2::catch_panic_pad_function( - parent, - || false, - |this| this.rtp_recv_sink_query(pad, query, id), - ) - }) - .name(format!("rtp_recv_sink_{}", id)) - .build(); - session.rtp_recv_sinkpad = Some(sinkpad.clone()); - Some((sinkpad, None, id, vec![])) - }; + } else { + let shared_state = state + .shared_state + .get_or_insert_with(|| SharedRtpState::recv_get_or_init(rtp_id)); + let mut session = RecvSession::new(shared_state, id); + let ret = new_pad(&mut session); + state.sessions.push(session); + ret + } + }), + "rtcp_sink_%u" => sess_parse(name, "rtcp_sink_", max_session_id).and_then(|id| { + let new_pad = move |session: &mut RecvSession| -> Option<( + gst::Pad, + Option, + usize, + Vec, + )> { + let sinkpad = gst::Pad::builder_from_template(templ) + .chain_function(move |_pad, parent, buffer| { + RtpRecv::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.rtcp_recv_sink_chain(id, buffer), + ) + }) + .iterate_internal_links_function(|pad, parent| { + RtpRecv::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) + }) + .name(format!("rtcp_sink_{}", id)) + .build(); + session.rtcp_recv_sinkpad = Some(sinkpad.clone()); + Some((sinkpad, None, id, vec![])) + }; - let session = state.session_by_id(id); - if let Some(session) = session { - let mut session = session.inner.lock().unwrap(); - if session.rtp_send_sinkpad.is_some() { - None - } else { - new_pad(&mut session) - } + let session = state.mut_session_by_id(id); + if let Some(session) = session { + if session.rtcp_recv_sinkpad.is_some() { + None } else { - let session = BinSession::new(id, &settings); - let mut inner = session.inner.lock().unwrap(); - let ret = new_pad(&mut inner); - drop(inner); - state.sessions.push(session); - ret + new_pad(session) } - }) - } - "rtcp_recv_sink_%u" => { - sess_parse(name, "rtcp_recv_sink_", max_session_id).and_then(|id| { - state.session_by_id(id).and_then(|session| { - let mut session = session.inner.lock().unwrap(); - if session.rtcp_recv_sinkpad.is_some() { - None - } else { - let sinkpad = gst::Pad::builder_from_template(templ) - .chain_function(move |_pad, parent, buffer| { - RtpBin2::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |this| this.rtcp_recv_sink_chain(id, buffer), - ) - }) - .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function( - parent, - || gst::Iterator::from_vec(vec![]), - |this| this.iterate_internal_links(pad), - ) - }) - .name(format!("rtcp_recv_sink_{}", id)) - .build(); - session.rtcp_recv_sinkpad = Some(sinkpad.clone()); - Some((sinkpad, None, id, vec![])) - } - }) - }) - } - "rtcp_send_src_%u" => { - self.start_rtcp_task(); - sess_parse(name, "rtcp_send_src_", max_session_id).and_then(|id| { - state.session_by_id(id).and_then(|session| { - let mut session = session.inner.lock().unwrap(); - - if session.rtcp_send_srcpad.is_some() { - None - } else { - let srcpad = gst::Pad::builder_from_template(templ) - .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function( - parent, - || gst::Iterator::from_vec(vec![]), - |this| this.iterate_internal_links(pad), - ) - }) - .name(format!("rtcp_send_src_{}", id)) - .build(); - - let stream_id = format!("{}/rtcp", id); - let stream_start = gst::event::StreamStart::builder(&stream_id).build(); - let seqnum = stream_start.seqnum(); - - let caps = gst::Caps::new_empty_simple("application/x-rtcp"); - let caps = gst::event::Caps::builder(&caps).seqnum(seqnum).build(); - - let segment = gst::FormattedSegment::::new(); - let segment = gst::event::Segment::new(&segment); - - session.rtcp_send_srcpad = Some(srcpad.clone()); - Some((srcpad, None, id, vec![stream_start, caps, segment])) - } - }) - }) - } + } else { + let shared_state = state + .shared_state + .get_or_insert_with(|| SharedRtpState::recv_get_or_init(rtp_id)); + let mut session = RecvSession::new(shared_state, id); + let ret = new_pad(&mut session); + state.sessions.push(session); + ret + } + }), _ => None, } .map(|(pad, otherpad, id, sticky_events)| { @@ -2132,14 +1501,12 @@ impl ElementImpl for RtpBin2 { } fn release_pad(&self, pad: &gst::Pad) { - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let mut removed_pads = vec![]; let mut removed_session_ids = vec![]; if let Some(&id) = state.pads_session_id_map.get(pad) { removed_pads.push(pad.clone()); - if let Some(session) = state.session_by_id(id) { - let mut session = session.inner.lock().unwrap(); - + if let Some(session) = state.mut_session_by_id(id) { if Some(pad) == session.rtp_recv_sinkpad.as_ref() { session.rtp_recv_sinkpad = None; removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone())); @@ -2148,27 +1515,12 @@ impl ElementImpl for RtpBin2 { session.recv_store.clear(); } - if Some(pad) == session.rtp_send_sinkpad.as_ref() { - session.rtp_send_sinkpad = None; - if let Some(srcpad) = session.rtp_send_srcpad.take() { - removed_pads.push(srcpad); - } - } - - if Some(pad) == session.rtcp_send_srcpad.as_ref() { - session.rtcp_send_srcpad = None; - } - if Some(pad) == session.rtcp_recv_sinkpad.as_ref() { session.rtcp_recv_sinkpad = None; } - if session.rtp_recv_sinkpad.is_none() - && session.rtp_send_sinkpad.is_none() - && session.rtcp_recv_sinkpad.is_none() - && session.rtcp_send_srcpad.is_none() - { - removed_session_ids.push(session.id); + if session.rtp_recv_sinkpad.is_none() && session.rtcp_recv_sinkpad.is_none() { + removed_session_ids.push(session.internal_session.id); } } } @@ -2189,16 +1541,10 @@ impl ElementImpl for RtpBin2 { state.pads_session_id_map.remove(pad); } for id in removed_session_ids { - if let Some(session) = state.session_by_id(id) { - let session = session.inner.lock().unwrap(); - if session.rtp_recv_sinkpad.is_none() - && session.rtp_send_sinkpad.is_none() - && session.rtcp_recv_sinkpad.is_none() - && session.rtcp_send_srcpad.is_none() - { - let id = session.id; - drop(session); - state.sessions.retain(|s| s.id != id); + if let Some(session) = state.mut_session_by_id(id) { + if session.rtp_recv_sinkpad.is_none() && session.rtcp_recv_sinkpad.is_none() { + let id = session.internal_session.id; + state.sessions.retain(|s| s.internal_session.id != id); } } } @@ -2213,6 +1559,25 @@ impl ElementImpl for RtpBin2 { transition: gst::StateChange, ) -> Result { match transition { + gst::StateChange::NullToReady => { + let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + let rtp_id = settings.rtp_id.clone(); + let empty_sessions = state.sessions.is_empty(); + match state.shared_state.as_mut() { + Some(shared) => { + if !empty_sessions && shared.name() != rtp_id { + let other_name = shared.name().to_owned(); + drop(state); + self.post_error_message(gst::error_msg!(gst::LibraryError::Settings, ["rtp-id {rtp_id} does not match the currently set value {other_name}"])); + return Err(gst::StateChangeError); + } + } + None => { + state.shared_state = Some(SharedRtpState::send_get_or_init(rtp_id.clone())); + } + } + } gst::StateChange::ReadyToPaused => { let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); @@ -2225,17 +1590,13 @@ impl ElementImpl for RtpBin2 { let mut success = self.parent_change_state(transition)?; match transition { - gst::StateChange::ReadyToNull => { - self.stop_rtcp_task(); - } gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { let mut state = self.state.lock().unwrap(); let mut removed_pads = vec![]; - for session in &state.sessions { - let mut session = session.inner.lock().unwrap(); + for session in &mut state.sessions { removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone())); session.recv_flow_combiner.lock().unwrap().clear(); @@ -2246,8 +1607,6 @@ impl ElementImpl for RtpBin2 { session.rtp_recv_sink_segment = None; session.rtp_recv_sink_seqnum = None; session.rtp_recv_sink_group_id = None; - - session.pt_map.clear(); } state.sync_context = None; drop(state); @@ -2273,123 +1632,10 @@ impl ElementImpl for RtpBin2 { } } -pub fn pt_clock_rate_from_caps(caps: &gst::CapsRef) -> Option<(u8, u32)> { - let Some(s) = caps.structure(0) else { - gst::debug!(CAT, "no structure!"); - return None; - }; - let Some((clock_rate, pt)) = Option::zip( - s.get::("clock-rate").ok(), - s.get::("payload").ok(), - ) else { - gst::debug!( - CAT, - "could not retrieve clock-rate and/or payload from structure" - ); - return None; - }; - if (0..=127).contains(&pt) && clock_rate > 0 { - Some((pt as u8, clock_rate as u32)) - } else { - gst::debug!( - CAT, - "payload value {pt} out of bounds or clock-rate {clock_rate} out of bounds" - ); - None - } -} - -fn clock_rate_from_caps(caps: &gst::CapsRef) -> Option { - let Some(s) = caps.structure(0) else { - return None; - }; - let Some(clock_rate) = s.get::("clock-rate").ok() else { - return None; - }; - if clock_rate > 0 { - Some(clock_rate as u32) - } else { - None - } -} - -static RUST_CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - "rust-log", - gst::DebugColorFlags::empty(), - Some("Logs from rust crates"), - ) -}); - -static GST_RUST_LOGGER_ONCE: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); -static GST_RUST_LOGGER: GstRustLogger = GstRustLogger {}; - -pub(crate) struct GstRustLogger {} - -impl GstRustLogger { - pub fn install() { - GST_RUST_LOGGER_ONCE.get_or_init(|| { - if log::set_logger(&GST_RUST_LOGGER).is_err() { - gst::warning!( - RUST_CAT, - "Cannot install log->gst logger, already installed?" - ); - } else { - log::set_max_level(GstRustLogger::debug_level_to_log_level_filter( - RUST_CAT.threshold(), - )); - gst::info!(RUST_CAT, "installed log->gst logger"); - } - }); - } - - fn debug_level_to_log_level_filter(level: gst::DebugLevel) -> log::LevelFilter { - match level { - gst::DebugLevel::None => log::LevelFilter::Off, - gst::DebugLevel::Error => log::LevelFilter::Error, - gst::DebugLevel::Warning => log::LevelFilter::Warn, - gst::DebugLevel::Fixme | gst::DebugLevel::Info => log::LevelFilter::Info, - gst::DebugLevel::Debug => log::LevelFilter::Debug, - gst::DebugLevel::Log | gst::DebugLevel::Trace | gst::DebugLevel::Memdump => { - log::LevelFilter::Trace - } - _ => log::LevelFilter::Trace, - } - } - - fn log_level_to_debug_level(level: log::Level) -> gst::DebugLevel { - match level { - log::Level::Error => gst::DebugLevel::Error, - log::Level::Warn => gst::DebugLevel::Warning, - log::Level::Info => gst::DebugLevel::Info, - log::Level::Debug => gst::DebugLevel::Debug, - log::Level::Trace => gst::DebugLevel::Trace, +impl Drop for RtpRecv { + fn drop(&mut self) { + if let Some(ref shared_state) = self.state.lock().unwrap().shared_state { + shared_state.unmark_recv_outstanding(); } } } - -impl log::Log for GstRustLogger { - fn enabled(&self, metadata: &log::Metadata) -> bool { - RUST_CAT.above_threshold(GstRustLogger::log_level_to_debug_level(metadata.level())) - } - - fn log(&self, record: &log::Record) { - let gst_level = GstRustLogger::log_level_to_debug_level(record.metadata().level()); - let file = record - .file() - .map(glib::GString::from) - .unwrap_or_else(|| glib::GString::from("rust-log")); - let function = record.target(); - let line = record.line().unwrap_or(0); - RUST_CAT.log( - None::<&glib::Object>, - gst_level, - file.as_gstr(), - function, - line, - *record.args(), - ); - } - - fn flush(&self) {} -} diff --git a/net/rtp/src/rtpbin2/rtpsend.rs b/net/rtp/src/rtpbin2/rtpsend.rs new file mode 100644 index 00000000..305f9803 --- /dev/null +++ b/net/rtp/src/rtpbin2/rtpsend.rs @@ -0,0 +1,878 @@ +// SPDX-License-Identifier: MPL-2.0 + +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::Poll; +use std::time::{Duration, Instant, SystemTime}; + +use futures::future::{AbortHandle, Abortable}; +use futures::StreamExt; +use gst::{glib, prelude::*, subclass::prelude::*}; +use once_cell::sync::Lazy; + +use super::internal::{pt_clock_rate_from_caps, GstRustLogger, SharedRtpState, SharedSession}; +use super::session::{RtcpSendReply, RtpProfile, SendReply, RTCP_MIN_REPORT_INTERVAL}; +use super::source::SourceState; + +use crate::rtpbin2::RUNTIME; + +const DEFAULT_MIN_RTCP_INTERVAL: Duration = RTCP_MIN_REPORT_INTERVAL; +const DEFAULT_REDUCED_SIZE_RTCP: bool = false; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpsend", + gst::DebugColorFlags::empty(), + Some("RTP Sending"), + ) +}); + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstRtpSendProfile")] +enum Profile { + #[default] + #[enum_value(name = "AVP profile as specified in RFC 3550", nick = "avp")] + Avp, + #[enum_value(name = "AVPF profile as specified in RFC 4585", nick = "avpf")] + Avpf, +} + +impl From for Profile { + fn from(value: RtpProfile) -> Self { + match value { + RtpProfile::Avp => Self::Avp, + RtpProfile::Avpf => Self::Avpf, + } + } +} + +impl From for RtpProfile { + fn from(value: Profile) -> Self { + match value { + Profile::Avp => Self::Avp, + Profile::Avpf => Self::Avpf, + } + } +} + +#[derive(Debug, Clone)] +struct Settings { + rtp_id: String, + min_rtcp_interval: Duration, + profile: Profile, + reduced_size_rtcp: bool, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + rtp_id: String::from("rtp-id"), + min_rtcp_interval: DEFAULT_MIN_RTCP_INTERVAL, + profile: Profile::default(), + reduced_size_rtcp: DEFAULT_REDUCED_SIZE_RTCP, + } + } +} + +#[derive(Debug)] +#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] +struct RtcpSendStream { + state: Arc>, + session_id: usize, + sleep: Pin>, +} + +impl RtcpSendStream { + fn new(state: Arc>, session_id: usize) -> Self { + Self { + state, + session_id, + sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))), + } + } +} + +impl futures::stream::Stream for RtcpSendStream { + type Item = RtcpSendReply; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut state = self.state.lock().unwrap(); + let now = Instant::now(); + let ntp_now = SystemTime::now(); + let mut lowest_wait = None; + if let Some(session) = state.mut_session_by_id(self.session_id) { + let mut session_inner = session.internal_session.inner.lock().unwrap(); + if let Some(reply) = session_inner.session.poll_rtcp_send(now, ntp_now) { + return Poll::Ready(Some(reply)); + } + if let Some(wait) = session_inner.session.poll_rtcp_send_timeout(now) { + if lowest_wait.map_or(true, |lowest_wait| wait < lowest_wait) { + lowest_wait = Some(wait); + } + } + session_inner.rtcp_waker = Some(cx.waker().clone()); + } + drop(state); + + // default to the minimum initial rtcp delay so we don't busy loop if there are no sessions or no + // timeouts available + let lowest_wait = + lowest_wait.unwrap_or(now + crate::rtpbin2::session::RTCP_MIN_REPORT_INTERVAL / 2); + let this = self.get_mut(); + this.sleep.as_mut().reset(lowest_wait.into()); + if !std::future::Future::poll(this.sleep.as_mut(), cx).is_pending() { + // wake us again if the delay is not pending for another go at finding the next timeout + // value + cx.waker().wake_by_ref(); + } + Poll::Pending + } +} + +#[derive(Debug)] +struct SendSession { + internal_session: SharedSession, + + rtcp_task: Mutex>, + + // State for sending RTP streams + rtp_send_sinkpad: Option, + rtp_send_srcpad: Option, + + rtcp_send_srcpad: Option, +} + +impl SendSession { + fn new(shared_state: &SharedRtpState, id: usize, settings: &Settings) -> Self { + let internal_session = shared_state.session_get_or_init(id, || { + SharedSession::new( + id, + settings.profile.into(), + settings.min_rtcp_interval, + settings.reduced_size_rtcp, + ) + }); + let mut inner = internal_session.inner.lock().unwrap(); + inner.session.set_profile(settings.profile.into()); + inner + .session + .set_min_rtcp_interval(settings.min_rtcp_interval); + inner + .session + .set_reduced_size_rtcp(settings.reduced_size_rtcp); + drop(inner); + + Self { + internal_session, + + rtcp_task: Mutex::new(None), + rtp_send_sinkpad: None, + rtp_send_srcpad: None, + rtcp_send_srcpad: None, + } + } + + fn start_rtcp_task(&self, state: Arc>) { + let mut rtcp_task = self.rtcp_task.lock().unwrap(); + + if rtcp_task.is_some() { + return; + } + + // run the runtime from another task to prevent the "start a runtime from within a runtime" panic + // when the plugin is statically linked. + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let session_id = self.internal_session.id; + RUNTIME.spawn(async move { + let future = Abortable::new(Self::rtcp_task(state, session_id), abort_registration); + future.await + }); + + rtcp_task.replace(RtcpTask { abort_handle }); + } + + async fn rtcp_task(state: Arc>, session_id: usize) { + let mut stream = RtcpSendStream::new(state.clone(), session_id); + while let Some(reply) = stream.next().await { + let state = state.lock().unwrap(); + let Some(session) = state.session_by_id(session_id) else { + continue; + }; + match reply { + RtcpSendReply::Data(data) => { + let Some(rtcp_srcpad) = session.rtcp_send_srcpad.clone() else { + continue; + }; + drop(state); + RUNTIME.spawn_blocking(move || { + let buffer = gst::Buffer::from_mut_slice(data); + if let Err(e) = rtcp_srcpad.push(buffer) { + gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}"); + } + }); + } + RtcpSendReply::SsrcBye(ssrc) => session + .internal_session + .config + .emit_by_name::<()>("bye-ssrc", &[&ssrc]), + } + } + } + + fn stop_rtcp_task(&self) { + let mut rtcp_task = self.rtcp_task.lock().unwrap(); + + if let Some(rtcp) = rtcp_task.take() { + rtcp.abort_handle.abort(); + } + } +} + +#[derive(Debug, Default)] +struct State { + shared_state: Option, + sessions: Vec, + max_session_id: usize, + pads_session_id_map: HashMap, +} + +impl State { + fn session_by_id(&self, id: usize) -> Option<&SendSession> { + self.sessions + .iter() + .find(|session| session.internal_session.id == id) + } + + fn mut_session_by_id(&mut self, id: usize) -> Option<&mut SendSession> { + self.sessions + .iter_mut() + .find(|session| session.internal_session.id == id) + } + + fn stats(&self) -> gst::Structure { + let mut ret = gst::Structure::builder("application/x-rtp2-stats"); + for session in self.sessions.iter() { + let sess_id = session.internal_session.id; + let session = session.internal_session.inner.lock().unwrap(); + + ret = ret.field(sess_id.to_string(), session.stats()); + } + ret.build() + } +} + +pub struct RtpSend { + settings: Mutex, + state: Arc>, +} + +#[derive(Debug)] +struct RtcpTask { + abort_handle: AbortHandle, +} + +impl RtpSend { + fn iterate_internal_links(&self, pad: &gst::Pad) -> gst::Iterator { + let state = self.state.lock().unwrap(); + if let Some(&id) = state.pads_session_id_map.get(pad) { + if let Some(session) = state.session_by_id(id) { + if let Some(ref sinkpad) = session.rtp_send_sinkpad { + if let Some(ref srcpad) = session.rtp_send_srcpad { + if sinkpad == pad { + return gst::Iterator::from_vec(vec![srcpad.clone()]); + } else if srcpad == pad { + return gst::Iterator::from_vec(vec![sinkpad.clone()]); + } + } + } + // nothing to do for rtcp pads + } + } + gst::Iterator::from_vec(vec![]) + } + + fn rtp_send_sink_chain( + &self, + id: usize, + buffer: gst::Buffer, + ) -> Result { + let state = self.state.lock().unwrap(); + let Some(session) = state.session_by_id(id) else { + gst::error!(CAT, "No session?"); + return Err(gst::FlowError::Error); + }; + + let mapped = buffer.map_readable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}"); + gst::FlowError::Error + })?; + let rtp = match rtp_types::RtpPacket::parse(&mapped) { + Ok(rtp) => rtp, + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}"); + return Ok(gst::FlowSuccess::Ok); + } + }; + + let srcpad = session.rtp_send_srcpad.clone().unwrap(); + let session = session.internal_session.clone(); + let mut session_inner = session.inner.lock().unwrap(); + drop(state); + + let now = Instant::now(); + loop { + match session_inner.session.handle_send(&rtp, now) { + SendReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision + SendReply::NewSsrc(ssrc, _pt) => { + drop(session_inner); + session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]); + session_inner = session.inner.lock().unwrap(); + } + SendReply::Passthrough => break, + SendReply::Drop => return Ok(gst::FlowSuccess::Ok), + } + } + // TODO: handle other processing + drop(mapped); + drop(session_inner); + srcpad.push(buffer) + } + + fn rtp_send_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool { + match event.view() { + gst::EventView::Caps(caps) => { + if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(caps.caps()) { + let state = self.state.lock().unwrap(); + if let Some(session) = state.session_by_id(id) { + let mut session = session.internal_session.inner.lock().unwrap(); + session.session.set_pt_clock_rate(pt, clock_rate); + session.add_caps(caps.caps_owned()); + } + } else { + gst::warning!(CAT, obj: pad, "input caps are missing payload or clock-rate fields"); + } + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + gst::EventView::Eos(_eos) => { + let now = Instant::now(); + let state = self.state.lock().unwrap(); + if let Some(session) = state.session_by_id(id) { + let mut session = session.internal_session.inner.lock().unwrap(); + let ssrcs = session.session.ssrcs().collect::>(); + // We want to bye all relevant ssrc's here. + // Relevant means they will not be used by something else which means that any + // local send ssrc that is not being used for Sr/Rr reports (internal_ssrc) can + // have the Bye state applied. + let mut all_local = true; + let internal_ssrc = session.session.internal_ssrc(); + for ssrc in ssrcs { + let Some(local_send) = session.session.mut_local_send_source_by_ssrc(ssrc) + else { + if let Some(local_recv) = + session.session.local_receive_source_by_ssrc(ssrc) + { + if local_recv.state() != SourceState::Bye + && Some(ssrc) != internal_ssrc + { + all_local = false; + } + } + continue; + }; + if Some(ssrc) != internal_ssrc { + local_send.mark_bye("End of Stream") + } + } + if all_local { + // if there are no non-local send ssrc's, then we can Bye the entire + // session. + session.session.schedule_bye("End of Stream", now); + } + if let Some(waker) = session.rtcp_waker.take() { + waker.wake(); + } + drop(session); + } + drop(state); + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RtpSend { + const NAME: &'static str = "GstRtpSend"; + type Type = super::RtpSend; + type ParentType = gst::Element; + + fn new() -> Self { + GstRustLogger::install(); + Self { + settings: Default::default(), + state: Default::default(), + } + } +} + +impl ObjectImpl for RtpSend { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("rtp-id") + .nick("The RTP Connection ID") + .blurb("A connection ID shared with a rtprecv element for implementing both sending and receiving using the same RTP context") + .default_value("rtp-id") + .build(), + glib::ParamSpecUInt::builder("min-rtcp-interval") + .nick("Minimum RTCP interval in ms") + .blurb("Minimum time (in ms) between RTCP reports") + .default_value(DEFAULT_MIN_RTCP_INTERVAL.as_millis() as u32) + .mutable_ready() + .build(), + glib::ParamSpecUInt::builder("stats") + .nick("Statistics") + .blurb("Statistics about the session") + .read_only() + .build(), + glib::ParamSpecEnum::builder::("rtp-profile") + .nick("RTP Profile") + .blurb("RTP Profile to use") + .default_value(Profile::default()) + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("reduced-size-rtcp") + .nick("Reduced Size RTCP") + .blurb("Use reduced size RTCP. Only has an effect if rtp-profile=avpf") + .default_value(DEFAULT_REDUCED_SIZE_RTCP) + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "rtp-id" => { + let mut settings = self.settings.lock().unwrap(); + settings.rtp_id = value.get::().expect("type checked upstream"); + } + "min-rtcp-interval" => { + let mut settings = self.settings.lock().unwrap(); + settings.min_rtcp_interval = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); + } + "rtp-profile" => { + let mut settings = self.settings.lock().unwrap(); + settings.profile = value.get::().expect("Type checked upstream"); + } + "reduced-size-rtcp" => { + let mut settings = self.settings.lock().unwrap(); + settings.reduced_size_rtcp = value.get::().expect("Type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "rtp-id" => { + let settings = self.settings.lock().unwrap(); + settings.rtp_id.to_value() + } + "min-rtcp-interval" => { + let settings = self.settings.lock().unwrap(); + (settings.min_rtcp_interval.as_millis() as u32).to_value() + } + "stats" => { + let state = self.state.lock().unwrap(); + state.stats().to_value() + } + "rtp-profile" => { + let settings = self.settings.lock().unwrap(); + settings.profile.to_value() + } + "reduced-size-rtcp" => { + let settings = self.settings.lock().unwrap(); + settings.reduced_size_rtcp.to_value() + } + _ => unimplemented!(), + } + } + + fn signals() -> &'static [glib::subclass::Signal] { + static SIGNALS: Lazy> = Lazy::new(|| { + vec![glib::subclass::Signal::builder("get-session") + .param_types([u32::static_type()]) + .return_type::() + .action() + .class_handler(|_token, args| { + let element = args[0].get::().expect("signal arg"); + let id = args[1].get::().expect("signal arg"); + let send = element.imp(); + let state = send.state.lock().unwrap(); + state + .session_by_id(id as usize) + .map(|sess| sess.internal_session.config.to_value()) + }) + .build()] + }); + + SIGNALS.as_ref() + } +} + +impl GstObjectImpl for RtpSend {} + +impl ElementImpl for RtpSend { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP Session Sender", + "Network/RTP/Filter", + "RTP session management (sender)", + "Matthew Waters ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let rtp_caps = gst::Caps::builder_full() + .structure(gst::Structure::builder("application/x-rtp").build()) + .build(); + let rtcp_caps = gst::Caps::builder_full() + .structure(gst::Structure::builder("application/x-rtcp").build()) + .build(); + + vec![ + gst::PadTemplate::new( + "rtp_sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &rtp_caps, + ) + .unwrap(), + gst::PadTemplate::new( + "rtp_src_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &rtp_caps, + ) + .unwrap(), + gst::PadTemplate::new( + "rtcp_src_%u", + gst::PadDirection::Src, + gst::PadPresence::Request, + &rtcp_caps, + ) + .unwrap(), + ] + }); + + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + _caps: Option<&gst::Caps>, // XXX: do something with caps? + ) -> Option { + let settings = self.settings.lock().unwrap().clone(); + let state_clone = self.state.clone(); + let mut state = self.state.lock().unwrap(); + let max_session_id = state.max_session_id; + let rtp_id = settings.rtp_id.clone(); + + // parse the possibly provided name into a session id or use the default + let sess_parse = move |name: Option<&str>, prefix, default_id| -> Option { + if let Some(name) = name { + name.strip_prefix(prefix).and_then(|suffix| { + if suffix.starts_with("%u") { + Some(default_id) + } else { + suffix.parse::().ok() + } + }) + } else { + Some(default_id) + } + }; + + match templ.name_template() { + "rtp_sink_%u" => sess_parse(name, "rtp_sink_", max_session_id).and_then(|id| { + let new_pad = move |session: &mut SendSession| -> Option<( + gst::Pad, + Option, + usize, + Vec, + )> { + let sinkpad = gst::Pad::builder_from_template(templ) + .chain_function(move |_pad, parent, buffer| { + RtpSend::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.rtp_send_sink_chain(id, buffer), + ) + }) + .iterate_internal_links_function(|pad, parent| { + RtpSend::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) + }) + .event_function(move |pad, parent, event| { + RtpSend::catch_panic_pad_function( + parent, + || false, + |this| this.rtp_send_sink_event(pad, event, id), + ) + }) + .flags(gst::PadFlags::PROXY_CAPS) + .name(format!("rtp_sink_{}", id)) + .build(); + let src_templ = self.obj().pad_template("rtp_src_%u").unwrap(); + let srcpad = gst::Pad::builder_from_template(&src_templ) + .iterate_internal_links_function(|pad, parent| { + RtpSend::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) + }) + .name(format!("rtp_src_{}", id)) + .build(); + session.rtp_send_sinkpad = Some(sinkpad.clone()); + session.rtp_send_srcpad = Some(srcpad.clone()); + session + .internal_session + .inner + .lock() + .unwrap() + .rtp_send_sinkpad = Some(sinkpad.clone()); + Some((sinkpad, Some(srcpad), id, vec![])) + }; + + let session = state.mut_session_by_id(id); + if let Some(session) = session { + if session.rtp_send_sinkpad.is_some() { + None + } else { + new_pad(session) + } + } else { + let shared_state = state + .shared_state + .get_or_insert_with(|| SharedRtpState::send_get_or_init(rtp_id)); + let mut session = SendSession::new(shared_state, id, &settings); + let ret = new_pad(&mut session); + state.sessions.push(session); + ret + } + }), + "rtcp_src_%u" => sess_parse(name, "rtcp_src_", max_session_id).and_then(|id| { + let new_pad = move |session: &mut SendSession| -> Option<( + gst::Pad, + Option, + usize, + Vec, + )> { + let srcpad = gst::Pad::builder_from_template(templ) + .iterate_internal_links_function(|pad, parent| { + RtpSend::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) + }) + .name(format!("rtcp_src_{}", id)) + .build(); + + let stream_id = format!("{}/rtcp", id); + let stream_start = gst::event::StreamStart::builder(&stream_id).build(); + let seqnum = stream_start.seqnum(); + + let caps = gst::Caps::new_empty_simple("application/x-rtcp"); + let caps = gst::event::Caps::builder(&caps).seqnum(seqnum).build(); + + let segment = gst::FormattedSegment::::new(); + let segment = gst::event::Segment::new(&segment); + + session.rtcp_send_srcpad = Some(srcpad.clone()); + session.start_rtcp_task(state_clone); + Some((srcpad, None, id, vec![stream_start, caps, segment])) + }; + + let session = state.mut_session_by_id(id); + if let Some(session) = session { + if session.rtcp_send_srcpad.is_some() { + None + } else { + new_pad(session) + } + } else { + let shared_state = state + .shared_state + .get_or_insert_with(|| SharedRtpState::send_get_or_init(rtp_id)); + let mut session = SendSession::new(shared_state, id, &settings); + let ret = new_pad(&mut session); + state.sessions.push(session); + ret + } + }), + _ => None, + } + .map(|(pad, otherpad, id, sticky_events)| { + state.max_session_id = (id + 1).max(state.max_session_id); + state.pads_session_id_map.insert(pad.clone(), id); + if let Some(ref pad) = otherpad { + state.pads_session_id_map.insert(pad.clone(), id); + } + + drop(state); + + pad.set_active(true).unwrap(); + for event in sticky_events { + let _ = pad.store_sticky_event(&event); + } + self.obj().add_pad(&pad).unwrap(); + + if let Some(pad) = otherpad { + pad.set_active(true).unwrap(); + self.obj().add_pad(&pad).unwrap(); + } + + pad + }) + } + + fn release_pad(&self, pad: &gst::Pad) { + let mut state = self.state.lock().unwrap(); + let mut removed_pads = vec![]; + let mut removed_session_ids = vec![]; + if let Some(&id) = state.pads_session_id_map.get(pad) { + removed_pads.push(pad.clone()); + if let Some(session) = state.mut_session_by_id(id) { + if Some(pad) == session.rtp_send_sinkpad.as_ref() { + session.rtp_send_sinkpad = None; + session + .internal_session + .inner + .lock() + .unwrap() + .rtp_send_sinkpad = None; + + if let Some(srcpad) = session.rtp_send_srcpad.take() { + removed_pads.push(srcpad); + } + } + + if Some(pad) == session.rtcp_send_srcpad.as_ref() { + session.rtcp_send_srcpad = None; + } + + if session.rtp_send_sinkpad.is_none() && session.rtcp_send_srcpad.is_none() { + removed_session_ids.push(session.internal_session.id); + } + } + } + drop(state); + + for pad in removed_pads.iter() { + let _ = pad.set_active(false); + // Pad might not have been added yet if it's a RTP recv srcpad + if pad.has_as_parent(&*self.obj()) { + let _ = self.obj().remove_pad(pad); + } + } + + { + let mut state = self.state.lock().unwrap(); + + for pad in removed_pads.iter() { + state.pads_session_id_map.remove(pad); + } + for id in removed_session_ids { + if let Some(session) = state.session_by_id(id) { + if session.rtp_send_sinkpad.is_none() && session.rtcp_send_srcpad.is_none() { + session.stop_rtcp_task(); + state.sessions.retain(|s| s.internal_session.id != id); + } + } + } + } + + self.parent_release_pad(pad) + } + + #[allow(clippy::single_match)] + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + match transition { + gst::StateChange::NullToReady => { + let settings = self.settings.lock().unwrap(); + let rtp_id = settings.rtp_id.clone(); + drop(settings); + + let state_clone = self.state.clone(); + let mut state = self.state.lock().unwrap(); + let empty_sessions = state.sessions.is_empty(); + match state.shared_state.as_mut() { + Some(shared) => { + if !empty_sessions && shared.name() != rtp_id { + let other_name = shared.name().to_owned(); + drop(state); + self.post_error_message(gst::error_msg!(gst::LibraryError::Settings, ["rtp-id {rtp_id} does not match the currently set value {other_name}"])); + return Err(gst::StateChangeError); + } + } + None => { + state.shared_state = Some(SharedRtpState::send_get_or_init(rtp_id.clone())); + } + } + for session in state.sessions.iter_mut() { + if session.rtcp_send_srcpad.is_some() { + session.start_rtcp_task(state_clone.clone()); + } + } + } + _ => (), + } + let success = self.parent_change_state(transition)?; + + match transition { + gst::StateChange::ReadyToNull => { + let mut state = self.state.lock().unwrap(); + for session in state.sessions.iter_mut() { + session.stop_rtcp_task(); + } + } + _ => (), + } + + Ok(success) + } +} + +impl Drop for RtpSend { + fn drop(&mut self) { + if let Some(ref shared_state) = self.state.lock().unwrap().shared_state { + shared_state.unmark_send_outstanding(); + } + } +} diff --git a/net/rtp/src/rtpbin2/session.rs b/net/rtp/src/rtpbin2/session.rs index 59ede807..32d07e18 100644 --- a/net/rtp/src/rtpbin2/session.rs +++ b/net/rtp/src/rtpbin2/session.rs @@ -1614,7 +1614,7 @@ pub(crate) mod tests { pub(crate) fn init_logs() { let _ = gst::init(); - use crate::rtpbin2::imp::GstRustLogger; + use crate::rtpbin2::internal::GstRustLogger; GstRustLogger::install(); } @@ -1680,8 +1680,8 @@ pub(crate) mod tests { .ssrc(ssrc) .sequence_number(seq_no) .timestamp(rtp_ts) - .payload(&payload) - .write_into(&mut rtp_data) + .payload(payload.as_slice()) + .write_into(rtp_data.as_mut_slice()) .unwrap(); rtp_data[..len].to_vec() } diff --git a/net/rtp/src/rtpbin2/source.rs b/net/rtp/src/rtpbin2/source.rs index c1746ebf..6482e5cf 100644 --- a/net/rtp/src/rtpbin2/source.rs +++ b/net/rtp/src/rtpbin2/source.rs @@ -734,7 +734,7 @@ impl RemoteSendSource { }; self.transit = Some(transit); trace!("jitter {} diff {diff}", self.jitter); - self.jitter += diff.saturating_sub((self.jitter + 8) >> 4); + self.jitter += diff.saturating_sub((self.jitter.saturating_add(8)) >> 4); } self.source.payload_type = Some(payload_type); diff --git a/net/rtp/src/rtpbin2/sync.rs b/net/rtp/src/rtpbin2/sync.rs index 31354f02..ff2511ac 100644 --- a/net/rtp/src/rtpbin2/sync.rs +++ b/net/rtp/src/rtpbin2/sync.rs @@ -118,6 +118,10 @@ impl Context { } } + pub fn clock_rate(&self, ssrc_val: u32) -> Option { + self.ssrcs.get(&ssrc_val).and_then(|ssrc| ssrc.clock_rate) + } + fn disassociate(&mut self, ssrc_val: u32, cname: &str) { self.cname_to_largest_delays.remove(cname); diff --git a/net/rtp/tests/rtpbin2.rs b/net/rtp/tests/rtpbin2.rs index 57b79d58..add575bf 100644 --- a/net/rtp/tests/rtpbin2.rs +++ b/net/rtp/tests/rtpbin2.rs @@ -7,12 +7,18 @@ // // SPDX-License-Identifier: MPL-2.0 -use std::sync::{Arc, Mutex}; +use std::sync::{atomic::AtomicUsize, Arc, Mutex}; use gst::{prelude::*, Caps}; use gst_check::Harness; use rtp_types::*; +static ELEMENT_COUNTER: AtomicUsize = AtomicUsize::new(0); + +fn next_element_counter() -> usize { + ELEMENT_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst) +} + fn init() { use std::sync::Once; static INIT: Once = Once::new(); @@ -34,7 +40,7 @@ fn generate_rtp_buffer(seqno: u16, rtpts: u32, payload_len: usize) -> gst::Buffe .payload_type(TEST_PT) .sequence_number(seqno) .timestamp(rtpts) - .payload(&payload); + .payload(payload.as_slice()); let size = packet.calculate_size().unwrap(); let mut data = vec![0; size]; packet.write_into(&mut data).unwrap(); @@ -44,8 +50,13 @@ fn generate_rtp_buffer(seqno: u16, rtpts: u32, payload_len: usize) -> gst::Buffe #[test] fn test_send() { init(); + let id = next_element_counter(); - let mut h = Harness::with_padnames("rtpbin2", Some("rtp_send_sink_0"), Some("rtp_send_src_0")); + let elem = gst::ElementFactory::make("rtpsend") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let mut h = Harness::with_element(&elem, Some("rtp_sink_0"), Some("rtp_src_0")); h.play(); let caps = Caps::builder("application/x-rtp") @@ -89,10 +100,15 @@ fn test_send() { #[test] fn test_receive() { init(); + let id = next_element_counter(); - let h = Arc::new(Mutex::new(Harness::with_padnames( - "rtpbin2", - Some("rtp_recv_sink_0"), + let elem = gst::ElementFactory::make("rtprecv") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let h = Arc::new(Mutex::new(Harness::with_element( + &elem, + Some("rtp_sink_0"), None, ))); let weak_h = Arc::downgrade(&h); @@ -124,7 +140,7 @@ fn test_receive() { let push_pad = inner .element() .unwrap() - .static_pad("rtp_recv_sink_0") + .static_pad("rtp_sink_0") .unwrap() .peer() .unwrap(); @@ -181,10 +197,15 @@ fn test_receive() { #[test] fn test_receive_flush() { init(); + let id = next_element_counter(); - let h = Arc::new(Mutex::new(Harness::with_padnames( - "rtpbin2", - Some("rtp_recv_sink_0"), + let elem = gst::ElementFactory::make("rtprecv") + .property("rtp-id", id.to_string()) + .build() + .unwrap(); + let h = Arc::new(Mutex::new(Harness::with_element( + &elem, + Some("rtp_sink_0"), None, ))); let weak_h = Arc::downgrade(&h); @@ -216,7 +237,7 @@ fn test_receive_flush() { let push_pad = inner .element() .unwrap() - .static_pad("rtp_recv_sink_0") + .static_pad("rtp_sink_0") .unwrap() .peer() .unwrap();