rtpbin2/config: add stats to session GObject

This commit is contained in:
Matthew Waters 2024-02-07 16:18:54 +11:00
parent 7e79d91580
commit 36cf3f6aae
2 changed files with 164 additions and 152 deletions

View file

@ -97,6 +97,14 @@ mod imp {
ret.build()
}
pub fn stats(&self) -> Option<gst::Structure> {
let Some(session) = self.session() else {
return None;
};
let session = session.lock().unwrap();
Some(session.stats())
}
}
#[glib::object_subclass]
@ -121,6 +129,7 @@ mod imp {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"pt-map" => self.pt_map().to_value(),
"stats" => self.stats().to_value(),
_ => unreachable!(),
}
}

View file

@ -399,6 +399,160 @@ impl BinSessionInner {
self.pt_map.iter().map(|(&k, v)| (k, v))
}
pub fn stats(&self) -> gst::Structure {
let mut session_stats = gst::Structure::builder("application/x-rtpbin2-session-stats")
.field("id", self.id as u64);
for ssrc in self.session.ssrcs() {
if let Some(ls) = self.session.local_send_source_by_ssrc(ssrc) {
let mut source_stats =
gst::Structure::builder("application/x-rtpbin2-source-stats")
.field("ssrc", ls.ssrc())
.field("sender", true)
.field("local", true)
.field("packets-sent", ls.packet_count())
.field("octets-sent", ls.octet_count())
.field("bitrate", ls.bitrate() as u64);
if let Some(pt) = ls.payload_type() {
if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) {
source_stats = source_stats.field("clock-rate", clock_rate);
}
}
if let Some(sr) = ls.last_sent_sr() {
source_stats = source_stats
.field("sr-ntptime", sr.ntp_timestamp().as_u64())
.field("sr-rtptime", sr.rtp_timestamp())
.field("sr-octet-count", sr.octet_count())
.field("sr-packet-count", sr.packet_count());
}
let rbs = gst::List::new(ls.received_report_blocks().map(
|(sender_ssrc, ReceivedRb { rb, .. })| {
gst::Structure::builder("application/x-rtcp-report-block")
.field("sender-ssrc", sender_ssrc)
.field("rb-fraction-lost", rb.fraction_lost())
.field("rb-packets-lost", rb.cumulative_lost())
.field("rb-extended_sequence_number", rb.extended_sequence_number())
.field("rb-jitter", rb.jitter())
.field("rb-last-sr-ntp-time", rb.last_sr_ntp_time())
.field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr())
.build()
},
));
match rbs.len() {
0 => (),
1 => {
source_stats =
source_stats.field("report-blocks", rbs.first().unwrap().clone());
}
_ => {
source_stats = source_stats.field("report-blocks", rbs);
}
}
// TODO: add jitter, packets-lost
session_stats = session_stats.field(ls.ssrc().to_string(), source_stats.build());
} else if let Some(lr) = self.session.local_receive_source_by_ssrc(ssrc) {
let mut source_stats =
gst::Structure::builder("application/x-rtpbin2-source-stats")
.field("ssrc", lr.ssrc())
.field("sender", false)
.field("local", true);
if let Some(pt) = lr.payload_type() {
if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) {
source_stats = source_stats.field("clock-rate", clock_rate);
}
}
// TODO: add rb stats
session_stats = session_stats.field(lr.ssrc().to_string(), source_stats.build());
} else if let Some(rs) = self.session.remote_send_source_by_ssrc(ssrc) {
let mut source_stats =
gst::Structure::builder("application/x-rtpbin2-source-stats")
.field("ssrc", rs.ssrc())
.field("sender", true)
.field("local", false)
.field("octets-received", rs.octet_count())
.field("packets-received", rs.packet_count())
.field("bitrate", rs.bitrate() as u64)
.field("jitter", rs.jitter())
.field("packets-lost", rs.packets_lost());
if let Some(pt) = rs.payload_type() {
if let Some(clock_rate) = self.session.clock_rate_from_pt(pt) {
source_stats = source_stats.field("clock-rate", clock_rate);
}
}
if let Some(rtp_from) = rs.rtp_from() {
source_stats = source_stats.field("rtp-from", rtp_from.to_string());
}
if let Some(rtcp_from) = rs.rtcp_from() {
source_stats = source_stats.field("rtcp-from", rtcp_from.to_string());
}
if let Some(sr) = rs.last_received_sr() {
source_stats = source_stats
.field("sr-ntptime", sr.ntp_timestamp().as_u64())
.field("sr-rtptime", sr.rtp_timestamp())
.field("sr-octet-count", sr.octet_count())
.field("sr-packet-count", sr.packet_count());
}
if let Some(rb) = rs.last_sent_rb() {
source_stats = source_stats
.field("sent-rb-fraction-lost", rb.fraction_lost())
.field("sent-rb-packets-lost", rb.cumulative_lost())
.field(
"sent-rb-extended-sequence-number",
rb.extended_sequence_number(),
)
.field("sent-rb-jitter", rb.jitter())
.field("sent-rb-last-sr-ntp-time", rb.last_sr_ntp_time())
.field(
"sent-rb-delay-since-last-sr-ntp-time",
rb.delay_since_last_sr(),
);
}
let rbs = gst::List::new(rs.received_report_blocks().map(
|(sender_ssrc, ReceivedRb { rb, .. })| {
gst::Structure::builder("application/x-rtcp-report-block")
.field("sender-ssrc", sender_ssrc)
.field("rb-fraction-lost", rb.fraction_lost())
.field("rb-packets-lost", rb.cumulative_lost())
.field("rb-extended_sequence_number", rb.extended_sequence_number())
.field("rb-jitter", rb.jitter())
.field("rb-last-sr-ntp-time", rb.last_sr_ntp_time())
.field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr())
.build()
},
));
match rbs.len() {
0 => (),
1 => {
source_stats =
source_stats.field("report-blocks", rbs.first().unwrap().clone());
}
_ => {
source_stats = source_stats.field("report-blocks", rbs);
}
}
session_stats = session_stats.field(rs.ssrc().to_string(), source_stats.build());
} else if let Some(rr) = self.session.remote_receive_source_by_ssrc(ssrc) {
let source_stats = gst::Structure::builder("application/x-rtpbin2-source-stats")
.field("ssrc", rr.ssrc())
.field("sender", false)
.field("local", false)
.build();
session_stats = session_stats.field(rr.ssrc().to_string(), source_stats);
}
}
let jb_stats = gst::List::new(self.rtp_recv_srcpads.iter().map(|pad| {
let mut jb_stats = pad.jitter_buffer_store.lock().unwrap().jitterbuffer.stats();
jb_stats.set_value("ssrc", (pad.ssrc as i32).to_send_value());
jb_stats.set_value("pt", (pad.pt as i32).to_send_value());
jb_stats
}));
session_stats = session_stats.field("jitterbuffer-stats", jb_stats);
session_stats.build()
}
fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
gst::debug!(CAT, obj: pad, "Starting rtp recv src task");
@ -570,159 +724,8 @@ impl State {
for session in self.sessions.iter() {
let sess_id = session.id;
let session = session.inner.lock().unwrap();
let mut session_stats = gst::Structure::builder("application/x-rtp-session-stats");
for ssrc in session.session.ssrcs() {
if let Some(ls) = session.session.local_send_source_by_ssrc(ssrc) {
let mut source_stats =
gst::Structure::builder("application/x-rtp-source-stats")
.field("ssrc", ls.ssrc())
.field("sender", true)
.field("local", true)
.field("packets-sent", ls.packet_count())
.field("octets-sent", ls.octet_count())
.field("bitrate", ls.bitrate() as u64);
if let Some(pt) = ls.payload_type() {
if let Some(clock_rate) = session.session.clock_rate_from_pt(pt) {
source_stats = source_stats.field("clock-rate", clock_rate);
}
}
if let Some(sr) = ls.last_sent_sr() {
source_stats = source_stats
.field("sr-ntptime", sr.ntp_timestamp().as_u64())
.field("sr-rtptime", sr.rtp_timestamp())
.field("sr-octet-count", sr.octet_count())
.field("sr-packet-count", sr.packet_count());
}
let rbs = gst::List::new(ls.received_report_blocks().map(
|(sender_ssrc, ReceivedRb { rb, .. })| {
gst::Structure::builder("application/x-rtcp-report-block")
.field("sender-ssrc", sender_ssrc)
.field("rb-fraction-lost", rb.fraction_lost())
.field("rb-packets-lost", rb.cumulative_lost())
.field("rb-extended_sequence_number", rb.extended_sequence_number())
.field("rb-jitter", rb.jitter())
.field("rb-last-sr-ntp-time", rb.last_sr_ntp_time())
.field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr())
.build()
},
));
match rbs.len() {
0 => (),
1 => {
source_stats =
source_stats.field("report-blocks", rbs.first().unwrap().clone());
}
_ => {
source_stats = source_stats.field("report-blocks", rbs);
}
}
// TODO: add jitter, packets-lost
session_stats =
session_stats.field(ls.ssrc().to_string(), source_stats.build());
} else if let Some(lr) = session.session.local_receive_source_by_ssrc(ssrc) {
let mut source_stats =
gst::Structure::builder("application/x-rtp-source-stats")
.field("ssrc", lr.ssrc())
.field("sender", false)
.field("local", true);
if let Some(pt) = lr.payload_type() {
if let Some(clock_rate) = session.session.clock_rate_from_pt(pt) {
source_stats = source_stats.field("clock-rate", clock_rate);
}
}
// TODO: add rb stats
session_stats =
session_stats.field(lr.ssrc().to_string(), source_stats.build());
} else if let Some(rs) = session.session.remote_send_source_by_ssrc(ssrc) {
let mut source_stats =
gst::Structure::builder("application/x-rtp-source-stats")
.field("ssrc", rs.ssrc())
.field("sender", true)
.field("local", false)
.field("octets-received", rs.octet_count())
.field("packets-received", rs.packet_count())
.field("bitrate", rs.bitrate() as u64)
.field("jitter", rs.jitter())
.field("packets-lost", rs.packets_lost());
if let Some(pt) = rs.payload_type() {
if let Some(clock_rate) = session.session.clock_rate_from_pt(pt) {
source_stats = source_stats.field("clock-rate", clock_rate);
}
}
if let Some(rtp_from) = rs.rtp_from() {
source_stats = source_stats.field("rtp-from", rtp_from.to_string());
}
if let Some(rtcp_from) = rs.rtcp_from() {
source_stats = source_stats.field("rtcp-from", rtcp_from.to_string());
}
if let Some(sr) = rs.last_received_sr() {
source_stats = source_stats
.field("sr-ntptime", sr.ntp_timestamp().as_u64())
.field("sr-rtptime", sr.rtp_timestamp())
.field("sr-octet-count", sr.octet_count())
.field("sr-packet-count", sr.packet_count());
}
if let Some(rb) = rs.last_sent_rb() {
source_stats = source_stats
.field("sent-rb-fraction-lost", rb.fraction_lost())
.field("sent-rb-packets-lost", rb.cumulative_lost())
.field(
"sent-rb-extended-sequence-number",
rb.extended_sequence_number(),
)
.field("sent-rb-jitter", rb.jitter())
.field("sent-rb-last-sr-ntp-time", rb.last_sr_ntp_time())
.field(
"sent-rb-delay-since-last-sr-ntp-time",
rb.delay_since_last_sr(),
);
}
let rbs = gst::List::new(rs.received_report_blocks().map(
|(sender_ssrc, ReceivedRb { rb, .. })| {
gst::Structure::builder("application/x-rtcp-report-block")
.field("sender-ssrc", sender_ssrc)
.field("rb-fraction-lost", rb.fraction_lost())
.field("rb-packets-lost", rb.cumulative_lost())
.field("rb-extended_sequence_number", rb.extended_sequence_number())
.field("rb-jitter", rb.jitter())
.field("rb-last-sr-ntp-time", rb.last_sr_ntp_time())
.field("rb-delay_since_last-sr-ntp-time", rb.delay_since_last_sr())
.build()
},
));
match rbs.len() {
0 => (),
1 => {
source_stats =
source_stats.field("report-blocks", rbs.first().unwrap().clone());
}
_ => {
source_stats = source_stats.field("report-blocks", rbs);
}
}
session_stats =
session_stats.field(rs.ssrc().to_string(), source_stats.build());
} else if let Some(rr) = session.session.remote_receive_source_by_ssrc(ssrc) {
let source_stats = gst::Structure::builder("application/x-rtp-source-stats")
.field("ssrc", rr.ssrc())
.field("sender", false)
.field("local", false)
.build();
session_stats = session_stats.field(rr.ssrc().to_string(), source_stats);
}
}
let jb_stats = gst::List::new(session.rtp_recv_srcpads.iter().map(|pad| {
let mut jb_stats = pad.jitter_buffer_store.lock().unwrap().jitterbuffer.stats();
jb_stats.set_value("ssrc", (pad.ssrc as i32).to_send_value());
jb_stats.set_value("pt", (pad.pt as i32).to_send_value());
jb_stats
}));
session_stats = session_stats.field("jitterbuffer-stats", jb_stats);
ret = ret.field(sess_id.to_string(), session_stats.build());
ret = ret.field(sess_id.to_string(), session.stats());
}
ret.build()
}