From d480c6c2d3ed27b0f1feb69a87ac6336908e9d1b Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Wed, 7 Feb 2024 16:18:54 +1100 Subject: [PATCH] rtpbin2/config: add stats to session GObject Part-of: --- net/rtp/src/rtpbin2/config.rs | 7 + net/rtp/src/rtpbin2/imp.rs | 307 +++++++++++++++++----------------- 2 files changed, 162 insertions(+), 152 deletions(-) diff --git a/net/rtp/src/rtpbin2/config.rs b/net/rtp/src/rtpbin2/config.rs index fd89cab0..7757341d 100644 --- a/net/rtp/src/rtpbin2/config.rs +++ b/net/rtp/src/rtpbin2/config.rs @@ -96,6 +96,12 @@ mod imp { ret.build() } + + pub fn stats(&self) -> Option { + let session = self.session()?; + let session = session.lock().unwrap(); + Some(session.stats()) + } } #[glib::object_subclass] @@ -120,6 +126,7 @@ mod imp { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "pt-map" => self.pt_map().to_value(), + "stats" => self.stats().to_value(), _ => unreachable!(), } } diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/imp.rs index f109fb16..23c4ded2 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/imp.rs @@ -399,6 +399,160 @@ impl BinSessionInner { self.pt_map.iter().map(|(&k, v)| (k, v)) } + pub fn stats(&self) -> gst::Structure { + let mut session_stats = gst::Structure::builder("application/x-rtpbin2-session-stats") + .field("id", self.id as u64); + for ssrc in self.session.ssrcs() { + if let Some(ls) = self.session.local_send_source_by_ssrc(ssrc) { + let mut source_stats = + gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", ls.ssrc()) + .field("sender", true) + .field("local", true) + .field("packets-sent", ls.packet_count()) + .field("octets-sent", ls.octet_count()) + .field("bitrate", ls.bitrate() as u64); + if let Some(pt) = ls.payload_type() { + if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { + source_stats = source_stats.field("clock-rate", clock_rate); + } + } + if let Some(sr) = ls.last_sent_sr() { + source_stats = source_stats + .field("sr-ntptime", sr.ntp_timestamp().as_u64()) + .field("sr-rtptime", sr.rtp_timestamp()) + .field("sr-octet-count", sr.octet_count()) + .field("sr-packet-count", sr.packet_count()); + } + let rbs = gst::List::new(ls.received_report_blocks().map( + |(sender_ssrc, ReceivedRb { rb, .. })| { + gst::Structure::builder("application/x-rtcp-report-block") + .field("sender-ssrc", sender_ssrc) + .field("rb-fraction-lost", rb.fraction_lost()) + .field("rb-packets-lost", rb.cumulative_lost()) + .field("rb-extended_sequence_number", rb.extended_sequence_number()) + .field("rb-jitter", rb.jitter()) + .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) + .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) + .build() + }, + )); + match rbs.len() { + 0 => (), + 1 => { + source_stats = + source_stats.field("report-blocks", rbs.first().unwrap().clone()); + } + _ => { + source_stats = source_stats.field("report-blocks", rbs); + } + } + + // TODO: add jitter, packets-lost + session_stats = session_stats.field(ls.ssrc().to_string(), source_stats.build()); + } else if let Some(lr) = self.session.local_receive_source_by_ssrc(ssrc) { + let mut source_stats = + gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", lr.ssrc()) + .field("sender", false) + .field("local", true); + if let Some(pt) = lr.payload_type() { + if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { + source_stats = source_stats.field("clock-rate", clock_rate); + } + } + // TODO: add rb stats + session_stats = session_stats.field(lr.ssrc().to_string(), source_stats.build()); + } else if let Some(rs) = self.session.remote_send_source_by_ssrc(ssrc) { + let mut source_stats = + gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", rs.ssrc()) + .field("sender", true) + .field("local", false) + .field("octets-received", rs.octet_count()) + .field("packets-received", rs.packet_count()) + .field("bitrate", rs.bitrate() as u64) + .field("jitter", rs.jitter()) + .field("packets-lost", rs.packets_lost()); + if let Some(pt) = rs.payload_type() { + if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) { + source_stats = source_stats.field("clock-rate", clock_rate); + } + } + if let Some(rtp_from) = rs.rtp_from() { + source_stats = source_stats.field("rtp-from", rtp_from.to_string()); + } + if let Some(rtcp_from) = rs.rtcp_from() { + source_stats = source_stats.field("rtcp-from", rtcp_from.to_string()); + } + if let Some(sr) = rs.last_received_sr() { + source_stats = source_stats + .field("sr-ntptime", sr.ntp_timestamp().as_u64()) + .field("sr-rtptime", sr.rtp_timestamp()) + .field("sr-octet-count", sr.octet_count()) + .field("sr-packet-count", sr.packet_count()); + } + if let Some(rb) = rs.last_sent_rb() { + source_stats = source_stats + .field("sent-rb-fraction-lost", rb.fraction_lost()) + .field("sent-rb-packets-lost", rb.cumulative_lost()) + .field( + "sent-rb-extended-sequence-number", + rb.extended_sequence_number(), + ) + .field("sent-rb-jitter", rb.jitter()) + .field("sent-rb-last-sr-ntp-time", rb.last_sr_ntp_time()) + .field( + "sent-rb-delay-since-last-sr-ntp-time", + rb.delay_since_last_sr(), + ); + } + let rbs = gst::List::new(rs.received_report_blocks().map( + |(sender_ssrc, ReceivedRb { rb, .. })| { + gst::Structure::builder("application/x-rtcp-report-block") + .field("sender-ssrc", sender_ssrc) + .field("rb-fraction-lost", rb.fraction_lost()) + .field("rb-packets-lost", rb.cumulative_lost()) + .field("rb-extended_sequence_number", rb.extended_sequence_number()) + .field("rb-jitter", rb.jitter()) + .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) + .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) + .build() + }, + )); + match rbs.len() { + 0 => (), + 1 => { + source_stats = + source_stats.field("report-blocks", rbs.first().unwrap().clone()); + } + _ => { + source_stats = source_stats.field("report-blocks", rbs); + } + } + session_stats = session_stats.field(rs.ssrc().to_string(), source_stats.build()); + } else if let Some(rr) = self.session.remote_receive_source_by_ssrc(ssrc) { + let source_stats = gst::Structure::builder("application/x-rtpbin2-source-stats") + .field("ssrc", rr.ssrc()) + .field("sender", false) + .field("local", false) + .build(); + session_stats = session_stats.field(rr.ssrc().to_string(), source_stats); + } + } + + let jb_stats = gst::List::new(self.rtp_recv_srcpads.iter().map(|pad| { + let mut jb_stats = pad.jitter_buffer_store.lock().unwrap().jitterbuffer.stats(); + jb_stats.set_value("ssrc", (pad.ssrc as i32).to_send_value()); + jb_stats.set_value("pt", (pad.pt as i32).to_send_value()); + jb_stats + })); + + session_stats = session_stats.field("jitterbuffer-stats", jb_stats); + + session_stats.build() + } + fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { gst::debug!(CAT, obj: pad, "Starting rtp recv src task"); @@ -574,159 +728,8 @@ impl State { for session in self.sessions.iter() { let sess_id = session.id; let session = session.inner.lock().unwrap(); - let mut session_stats = gst::Structure::builder("application/x-rtp-session-stats"); - for ssrc in session.session.ssrcs() { - if let Some(ls) = session.session.local_send_source_by_ssrc(ssrc) { - let mut source_stats = - gst::Structure::builder("application/x-rtp-source-stats") - .field("ssrc", ls.ssrc()) - .field("sender", true) - .field("local", true) - .field("packets-sent", ls.packet_count()) - .field("octets-sent", ls.octet_count()) - .field("bitrate", ls.bitrate() as u64); - if let Some(pt) = ls.payload_type() { - if let Some(clock_rate) = session.session.clock_rate_from_pt(pt) { - source_stats = source_stats.field("clock-rate", clock_rate); - } - } - if let Some(sr) = ls.last_sent_sr() { - source_stats = source_stats - .field("sr-ntptime", sr.ntp_timestamp().as_u64()) - .field("sr-rtptime", sr.rtp_timestamp()) - .field("sr-octet-count", sr.octet_count()) - .field("sr-packet-count", sr.packet_count()); - } - let rbs = gst::List::new(ls.received_report_blocks().map( - |(sender_ssrc, ReceivedRb { rb, .. })| { - gst::Structure::builder("application/x-rtcp-report-block") - .field("sender-ssrc", sender_ssrc) - .field("rb-fraction-lost", rb.fraction_lost()) - .field("rb-packets-lost", rb.cumulative_lost()) - .field("rb-extended_sequence_number", rb.extended_sequence_number()) - .field("rb-jitter", rb.jitter()) - .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) - .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) - .build() - }, - )); - match rbs.len() { - 0 => (), - 1 => { - source_stats = - source_stats.field("report-blocks", rbs.first().unwrap().clone()); - } - _ => { - source_stats = source_stats.field("report-blocks", rbs); - } - } - // TODO: add jitter, packets-lost - session_stats = - session_stats.field(ls.ssrc().to_string(), source_stats.build()); - } else if let Some(lr) = session.session.local_receive_source_by_ssrc(ssrc) { - let mut source_stats = - gst::Structure::builder("application/x-rtp-source-stats") - .field("ssrc", lr.ssrc()) - .field("sender", false) - .field("local", true); - if let Some(pt) = lr.payload_type() { - if let Some(clock_rate) = session.session.clock_rate_from_pt(pt) { - source_stats = source_stats.field("clock-rate", clock_rate); - } - } - // TODO: add rb stats - session_stats = - session_stats.field(lr.ssrc().to_string(), source_stats.build()); - } else if let Some(rs) = session.session.remote_send_source_by_ssrc(ssrc) { - let mut source_stats = - gst::Structure::builder("application/x-rtp-source-stats") - .field("ssrc", rs.ssrc()) - .field("sender", true) - .field("local", false) - .field("octets-received", rs.octet_count()) - .field("packets-received", rs.packet_count()) - .field("bitrate", rs.bitrate() as u64) - .field("jitter", rs.jitter()) - .field("packets-lost", rs.packets_lost()); - if let Some(pt) = rs.payload_type() { - if let Some(clock_rate) = session.session.clock_rate_from_pt(pt) { - source_stats = source_stats.field("clock-rate", clock_rate); - } - } - if let Some(rtp_from) = rs.rtp_from() { - source_stats = source_stats.field("rtp-from", rtp_from.to_string()); - } - if let Some(rtcp_from) = rs.rtcp_from() { - source_stats = source_stats.field("rtcp-from", rtcp_from.to_string()); - } - if let Some(sr) = rs.last_received_sr() { - source_stats = source_stats - .field("sr-ntptime", sr.ntp_timestamp().as_u64()) - .field("sr-rtptime", sr.rtp_timestamp()) - .field("sr-octet-count", sr.octet_count()) - .field("sr-packet-count", sr.packet_count()); - } - if let Some(rb) = rs.last_sent_rb() { - source_stats = source_stats - .field("sent-rb-fraction-lost", rb.fraction_lost()) - .field("sent-rb-packets-lost", rb.cumulative_lost()) - .field( - "sent-rb-extended-sequence-number", - rb.extended_sequence_number(), - ) - .field("sent-rb-jitter", rb.jitter()) - .field("sent-rb-last-sr-ntp-time", rb.last_sr_ntp_time()) - .field( - "sent-rb-delay-since-last-sr-ntp-time", - rb.delay_since_last_sr(), - ); - } - let rbs = gst::List::new(rs.received_report_blocks().map( - |(sender_ssrc, ReceivedRb { rb, .. })| { - gst::Structure::builder("application/x-rtcp-report-block") - .field("sender-ssrc", sender_ssrc) - .field("rb-fraction-lost", rb.fraction_lost()) - .field("rb-packets-lost", rb.cumulative_lost()) - .field("rb-extended_sequence_number", rb.extended_sequence_number()) - .field("rb-jitter", rb.jitter()) - .field("rb-last-sr-ntp-time", rb.last_sr_ntp_time()) - .field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr()) - .build() - }, - )); - match rbs.len() { - 0 => (), - 1 => { - source_stats = - source_stats.field("report-blocks", rbs.first().unwrap().clone()); - } - _ => { - source_stats = source_stats.field("report-blocks", rbs); - } - } - session_stats = - session_stats.field(rs.ssrc().to_string(), source_stats.build()); - } else if let Some(rr) = session.session.remote_receive_source_by_ssrc(ssrc) { - let source_stats = gst::Structure::builder("application/x-rtp-source-stats") - .field("ssrc", rr.ssrc()) - .field("sender", false) - .field("local", false) - .build(); - session_stats = session_stats.field(rr.ssrc().to_string(), source_stats); - } - } - - let jb_stats = gst::List::new(session.rtp_recv_srcpads.iter().map(|pad| { - let mut jb_stats = pad.jitter_buffer_store.lock().unwrap().jitterbuffer.stats(); - jb_stats.set_value("ssrc", (pad.ssrc as i32).to_send_value()); - jb_stats.set_value("pt", (pad.pt as i32).to_send_value()); - jb_stats - })); - - session_stats = session_stats.field("jitterbuffer-stats", jb_stats); - - ret = ret.field(sess_id.to_string(), session_stats.build()); + ret = ret.field(sess_id.to_string(), session.stats()); } ret.build() }