webrtcbin: Fixes for bundled statistics generation

When multiple streams are bundled on the same transport,
the statistics would end up incorrectly generated,
as each pad would regenerate stats for every ssrc on the
transport, overwriting previous iterations and assigning
bogus media kind and other values to the wrong ssrc.

Fix by making sure each pad only loops and generates
statistics for the one ssrc that pad is receiving / sending.

Add a unit test that the codec kind field in RTP statistics
are now generated correctly.

Fixes: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/2555
Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7338>
This commit is contained in:
Jan Schmidt 2024-08-12 22:17:14 +10:00
parent de6de83986
commit 7da5d03b29
2 changed files with 188 additions and 59 deletions

View file

@ -106,10 +106,10 @@ _gst_structure_take_structure (GstStructure * s, const char *fieldname,
/* https://www.w3.org/TR/webrtc-stats/#remoteinboundrtpstats-dict* */
static gboolean
_get_stats_from_remote_rtp_source_stats (GstWebRTCBin * webrtc,
TransportStream * stream, const GstStructure * source_stats,
guint ssrc, guint clock_rate, const gchar * codec_id, const gchar * kind,
const gchar * transport_id, GstStructure * s)
_get_stats_from_remote_rtp_source_stats (TransportStream * stream,
const GstStructure * source_stats, guint ssrc, guint clock_rate,
const gchar * codec_id, const gchar * kind, const gchar * transport_id,
GstStructure * s)
{
gboolean have_rb = FALSE, internal = FALSE;
int lost;
@ -207,10 +207,9 @@ _get_stats_from_remote_rtp_source_stats (GstWebRTCBin * webrtc,
/* https://www.w3.org/TR/webrtc-stats/#inboundrtpstats-dict*
https://www.w3.org/TR/webrtc-stats/#outboundrtpstats-dict* */
static void
_get_stats_from_rtp_source_stats (GstWebRTCBin * webrtc,
TransportStream * stream, const GstStructure * source_stats,
const gchar * codec_id, const gchar * kind, const gchar * transport_id,
GstStructure * s)
_get_stats_from_rtp_source_stats (TransportStream * stream,
const GstStructure * source_stats, const gchar * codec_id,
const gchar * kind, const gchar * transport_id, GstStructure * s)
{
guint ssrc, fir, pli, nack, jitter;
int clock_rate;
@ -875,45 +874,54 @@ _get_codec_stats_from_pad (GstWebRTCBin * webrtc, GstPad * pad,
struct transport_stream_stats
{
GstWebRTCBin *webrtc;
TransportStream *stream;
char *transport_id;
char *codec_id;
guint ssrc;
const char *kind;
guint clock_rate;
GValueArray *source_stats;
GstStructure *s;
GstStructure *s; /* Return value stats accumulator */
};
static gboolean
webrtc_stats_get_from_transport (SsrcMapItem * entry,
webrtc_stats_get_from_transport_for_one_ssrc (SsrcMapItem * entry,
struct transport_stream_stats *ts_stats)
{
double ts;
int i;
/* We're only interested in the map entry for the ssrc for the
* pad under inspection */
if (ts_stats->ssrc != entry->ssrc)
return FALSE; /* Continue iterating */
gst_structure_get_double (ts_stats->s, "timestamp", &ts);
/* construct stats objects */
for (i = 0; i < ts_stats->source_stats->n_values; i++) {
const GstStructure *stats;
const GValue *val = g_value_array_get_nth (ts_stats->source_stats, i);
guint stats_ssrc = 0;
const GstStructure *stats = gst_value_get_structure (val);
stats = gst_value_get_structure (val);
guint stats_ssrc = 0;
/* skip foreign sources */
if (gst_structure_get_uint (stats, "ssrc", &stats_ssrc) &&
entry->ssrc == stats_ssrc)
_get_stats_from_rtp_source_stats (ts_stats->webrtc, ts_stats->stream,
stats, ts_stats->codec_id, ts_stats->kind, ts_stats->transport_id,
ts_stats->s);
else if (gst_structure_get_uint (stats, "rb-ssrc", &stats_ssrc)
&& entry->ssrc == stats_ssrc)
_get_stats_from_remote_rtp_source_stats (ts_stats->webrtc,
ts_stats->stream, stats, entry->ssrc, ts_stats->clock_rate,
entry->ssrc == stats_ssrc) {
GST_TRACE ("Found source stats for ssrc %u: %" GST_PTR_FORMAT, stats_ssrc,
stats);
_get_stats_from_rtp_source_stats (ts_stats->stream, stats,
ts_stats->codec_id, ts_stats->kind, ts_stats->transport_id,
ts_stats->s);
} else if (gst_structure_get_uint (stats, "rb-ssrc", &stats_ssrc)
&& entry->ssrc == stats_ssrc) {
GST_TRACE ("Found remote source stats for ssrc %u: %" GST_PTR_FORMAT,
stats_ssrc, stats);
_get_stats_from_remote_rtp_source_stats (ts_stats->stream, stats,
entry->ssrc, ts_stats->clock_rate, ts_stats->codec_id, ts_stats->kind,
ts_stats->transport_id, ts_stats->s);
}
}
/* we want to look at all the entries */
@ -925,14 +933,14 @@ _get_stats_from_pad (GstWebRTCBin * webrtc, GstPad * pad, GstStructure * s)
{
GstWebRTCBinPad *wpad = GST_WEBRTC_BIN_PAD (pad);
struct transport_stream_stats ts_stats = { NULL, };
guint ssrc, clock_rate;
guint clock_rate;
GObject *rtp_session;
GObject *gst_rtp_session;
GstStructure *rtp_stats, *twcc_stats;
GstWebRTCKind kind;
_get_codec_stats_from_pad (webrtc, pad, s, &ts_stats.codec_id, &ssrc,
&clock_rate);
_get_codec_stats_from_pad (webrtc, pad, s, &ts_stats.codec_id,
&ts_stats.ssrc, &clock_rate);
if (!wpad->trans)
goto out;
@ -983,7 +991,7 @@ _get_stats_from_pad (GstWebRTCBin * webrtc, GstPad * pad, GstStructure * s)
ts_stats.clock_rate = clock_rate;
transport_stream_find_ssrc_map_item (ts_stats.stream, &ts_stats,
(FindSsrcMapFunc) webrtc_stats_get_from_transport);
(FindSsrcMapFunc) webrtc_stats_get_from_transport_for_one_ssrc);
g_clear_object (&rtp_session);
g_clear_object (&gst_rtp_session);

View file

@ -1185,6 +1185,9 @@ create_audio_test (void)
return t;
}
static void add_audio_test_src_harness (GstHarness * h, guint ssrc);
static void add_video_test_src_harness (GstHarness * h, guint ssrc);
static void
on_new_transceiver_expected_kind (GstWebRTCBin * webrtc,
GstWebRTCRTPTransceiver * trans, gpointer user_data)
@ -1599,6 +1602,7 @@ validate_rtc_stream_stats (const GstStructure * s, const GstStructure * stats)
{
gchar *codec_id, *transport_id, *kind;
GstStructure *codec, *transport;
guint ssrc;
fail_unless (gst_structure_get (s, "codec-id", G_TYPE_STRING, &codec_id,
NULL));
@ -1616,8 +1620,18 @@ validate_rtc_stream_stats (const GstStructure * s, const GstStructure * stats)
gst_structure_free (transport);
gst_structure_free (codec);
fail_unless (gst_structure_get (s, "ssrc", G_TYPE_UINT, &ssrc, NULL));
fail_unless (gst_structure_get (s, "kind", G_TYPE_STRING, &kind, NULL));
fail_unless (g_str_equal (kind, "audio") || g_str_equal (kind, "video"));
// Using ssrc to differentiate video from audio streams is the easiest
// way, otherwise we have to pass the info some other way or look up
// caps in the codec stats entries.
if (ssrc == 0xCAFECAFE) {
fail_unless (g_str_equal (kind, "video"));
} else {
fail_unless (g_str_equal (kind, "audio"));
}
g_free (codec_id);
g_free (transport_id);
@ -1627,7 +1641,7 @@ validate_rtc_stream_stats (const GstStructure * s, const GstStructure * stats)
static void
validate_inbound_rtp_stats (const GstStructure * s, const GstStructure * stats)
{
guint ssrc, fir, pli, nack;
guint fir, pli, nack;
gint64 packets_lost;
guint64 packets_received, bytes_received;
double jitter;
@ -1636,7 +1650,6 @@ validate_inbound_rtp_stats (const GstStructure * s, const GstStructure * stats)
validate_rtc_stream_stats (s, stats);
fail_unless (gst_structure_get (s, "ssrc", G_TYPE_UINT, &ssrc, NULL));
fail_unless (gst_structure_get (s, "fir-count", G_TYPE_UINT, &fir, NULL));
fail_unless (gst_structure_get (s, "pli-count", G_TYPE_UINT, &pli, NULL));
fail_unless (gst_structure_get (s, "nack-count", G_TYPE_UINT, &nack, NULL));
@ -1661,7 +1674,6 @@ static void
validate_remote_inbound_rtp_stats (const GstStructure * s,
const GstStructure * stats)
{
guint ssrc;
gint64 packets_lost;
double jitter, rtt;
gchar *local_id;
@ -1669,7 +1681,6 @@ validate_remote_inbound_rtp_stats (const GstStructure * s,
validate_rtc_stream_stats (s, stats);
fail_unless (gst_structure_get (s, "ssrc", G_TYPE_UINT, &ssrc, NULL));
fail_unless (gst_structure_get (s, "jitter", G_TYPE_DOUBLE, &jitter, NULL));
fail_unless (gst_structure_get (s, "packets-lost", G_TYPE_INT64,
&packets_lost, NULL));
@ -1688,14 +1699,13 @@ validate_remote_inbound_rtp_stats (const GstStructure * s,
static void
validate_outbound_rtp_stats (const GstStructure * s, const GstStructure * stats)
{
guint ssrc, fir, pli, nack;
guint fir, pli, nack;
guint64 packets_sent, bytes_sent;
gchar *remote_id;
GstStructure *remote;
validate_rtc_stream_stats (s, stats);
fail_unless (gst_structure_get (s, "ssrc", G_TYPE_UINT, &ssrc, NULL));
fail_unless (gst_structure_get (s, "fir-count", G_TYPE_UINT, &fir, NULL));
fail_unless (gst_structure_get (s, "pli-count", G_TYPE_UINT, &pli, NULL));
fail_unless (gst_structure_get (s, "nack-count", G_TYPE_UINT, &nack, NULL));
@ -1717,13 +1727,11 @@ static void
validate_remote_outbound_rtp_stats (const GstStructure * s,
const GstStructure * stats)
{
guint ssrc;
gchar *local_id;
GstStructure *local;
validate_rtc_stream_stats (s, stats);
fail_unless (gst_structure_get (s, "ssrc", G_TYPE_UINT, &ssrc, NULL));
fail_unless (gst_structure_get (s, "local-id", G_TYPE_STRING, &local_id,
NULL));
fail_unless (gst_structure_get (stats, local_id, GST_TYPE_STRUCTURE, &local,
@ -1769,17 +1777,32 @@ validate_peer_connection_stats (const GstStructure * s)
fail_unless (opened >= closed);
}
struct stats_check_state
{
struct test_webrtc *t;
gint n_streams;
const GstStructure *stats;
gboolean saw_outbound_rtp;
gboolean saw_remote_outbound_rtp;
gboolean saw_inbound_rtp;
gboolean saw_remote_inbound_rtp;
};
static gboolean
validate_stats_foreach (GQuark field_id, const GValue * value,
const GstStructure * stats)
gpointer user_data)
{
struct stats_check_state *state = (struct stats_check_state *) (user_data);
const GstStructure *stats = state->stats;
const gchar *field = g_quark_to_string (field_id);
GstWebRTCStatsType type;
const GstStructure *s;
fail_unless (GST_VALUE_HOLDS_STRUCTURE (value));
s = gst_value_get_structure (value);
const GstStructure *s = gst_value_get_structure (value);
GST_INFO ("validating field %s %" GST_PTR_FORMAT, field, s);
@ -1789,12 +1812,16 @@ validate_stats_foreach (GQuark field_id, const GValue * value,
validate_codec_stats (s);
} else if (type == GST_WEBRTC_STATS_INBOUND_RTP) {
validate_inbound_rtp_stats (s, stats);
state->saw_inbound_rtp = TRUE;
} else if (type == GST_WEBRTC_STATS_OUTBOUND_RTP) {
validate_outbound_rtp_stats (s, stats);
state->saw_outbound_rtp = TRUE;
} else if (type == GST_WEBRTC_STATS_REMOTE_INBOUND_RTP) {
validate_remote_inbound_rtp_stats (s, stats);
state->saw_remote_inbound_rtp = TRUE;
} else if (type == GST_WEBRTC_STATS_REMOTE_OUTBOUND_RTP) {
validate_remote_outbound_rtp_stats (s, stats);
state->saw_remote_outbound_rtp = TRUE;
} else if (type == GST_WEBRTC_STATS_CSRC) {
} else if (type == GST_WEBRTC_STATS_PEER_CONNECTION) {
validate_peer_connection_stats (s);
@ -1815,29 +1842,38 @@ validate_stats_foreach (GQuark field_id, const GValue * value,
}
static void
validate_stats (const GstStructure * stats)
validate_stats (struct stats_check_state *state)
{
gst_structure_foreach (stats,
(GstStructureForeachFunc) validate_stats_foreach, (gpointer) stats);
gst_structure_foreach (state->stats,
(GstStructureForeachFunc) validate_stats_foreach, (gpointer) state);
}
static void
_on_stats (GstPromise * promise, gpointer user_data)
{
struct test_webrtc *t = user_data;
struct stats_check_state *state = user_data;
struct test_webrtc *t = state->t;
const GstStructure *reply = gst_promise_get_reply (promise);
int i;
validate_stats (reply);
GST_LOG ("Got stats %" GST_PTR_FORMAT, reply);
state->stats = reply;
g_mutex_lock (&t->lock);
i = GPOINTER_TO_INT (t->user_data);
i++;
t->user_data = GINT_TO_POINTER (i);
g_mutex_unlock (&t->lock);
validate_stats (state);
if (i >= 2)
if (state->n_streams > 0 && state->saw_inbound_rtp && state->saw_outbound_rtp
&& state->saw_remote_inbound_rtp && state->saw_remote_outbound_rtp) {
g_mutex_lock (&t->lock);
i = GPOINTER_TO_INT (t->user_data);
i++;
t->user_data = GINT_TO_POINTER (i);
g_mutex_unlock (&t->lock);
if (i >= 2)
test_webrtc_signal_state (t, STATE_CUSTOM);
} else {
test_webrtc_signal_state (t, STATE_CUSTOM);
}
gst_promise_unref (promise);
}
@ -1845,15 +1881,16 @@ _on_stats (GstPromise * promise, gpointer user_data)
GST_START_TEST (test_session_stats)
{
struct test_webrtc *t = test_webrtc_new ();
struct stats_check_state state = {.t = t, 0 };
GstPromise *p;
/* test that the stats generated without any streams are sane */
t->on_negotiation_needed = NULL;
test_validate_sdp (t, NULL, NULL);
p = gst_promise_new_with_change_func (_on_stats, t, NULL);
p = gst_promise_new_with_change_func (_on_stats, &state, NULL);
g_signal_emit_by_name (t->webrtc1, "get-stats", NULL, p);
p = gst_promise_new_with_change_func (_on_stats, t, NULL);
p = gst_promise_new_with_change_func (_on_stats, &state, NULL);
g_signal_emit_by_name (t->webrtc2, "get-stats", NULL, p);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
@ -1866,9 +1903,8 @@ GST_END_TEST;
GST_START_TEST (test_stats_with_stream)
{
struct test_webrtc *t = create_audio_test ();
struct stats_check_state state = {.t = t, 0 };
GstPromise *p;
GstCaps *caps;
GstPad *pad;
/* test that the stats generated with stream are sane */
@ -1889,22 +1925,81 @@ GST_START_TEST (test_stats_with_stream)
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE);
/* set caps for webrtcbin sink to validate codec stats */
caps = gst_caps_from_string (OPUS_RTP_CAPS (96));
pad = gst_element_get_static_pad (t->webrtc1, "sink_0");
GstCaps *caps = gst_caps_from_string (OPUS_RTP_CAPS (96));
GstPad *pad = gst_element_get_static_pad (t->webrtc1, "sink_0");
gst_pad_set_caps (pad, caps);
gst_caps_unref (caps);
gst_object_unref (pad);
test_webrtc_wait_for_answer_error_eos (t);
test_webrtc_signal_state (t, STATE_REMOTE_ANSWER_SET);
p = gst_promise_new_with_change_func (_on_stats, t, NULL);
p = gst_promise_new_with_change_func (_on_stats, &state, NULL);
g_signal_emit_by_name (t->webrtc1, "get-stats", NULL, p);
p = gst_promise_new_with_change_func (_on_stats, t, NULL);
p = gst_promise_new_with_change_func (_on_stats, &state, NULL);
g_signal_emit_by_name (t->webrtc2, "get-stats", NULL, p);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
gst_object_unref (pad);
test_webrtc_free (t);
}
GST_END_TEST;
GST_START_TEST (test_stats_with_two_streams)
{
/* test that the stats generated with audio and video stream have correct info */
struct test_webrtc *t = test_webrtc_new ();
GstPromise *p;
t->on_offer_created = NULL;
t->on_answer_created = NULL;
t->on_negotiation_needed = NULL;
t->on_ice_candidate = NULL;
t->on_pad_added = _pad_added_fakesink;
GstHarness *h1 = gst_harness_new_with_element (t->webrtc1, "sink_0", NULL);
add_audio_test_src_harness (h1, 0xDEADBEEF);
t->harnesses = g_list_prepend (t->harnesses, h1);
GstHarness *h2 = gst_harness_new_with_element (t->webrtc1, "sink_1", NULL);
add_video_test_src_harness (h2, 0xCAFECAFE);
t->harnesses = g_list_prepend (t->harnesses, h2);
fail_if (gst_element_set_state (t->webrtc1,
GST_STATE_READY) == GST_STATE_CHANGE_FAILURE);
fail_if (gst_element_set_state (t->webrtc2,
GST_STATE_READY) == GST_STATE_CHANGE_FAILURE);
test_webrtc_create_offer (t);
fail_if (gst_element_set_state (t->webrtc1,
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE);
fail_if (gst_element_set_state (t->webrtc2,
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE);
test_webrtc_wait_for_answer_error_eos (t);
/* We need to push data until the connection is established and the
* statistics start reporting outbound-rtp stats before things will
* unblock below */
gst_harness_push_from_src (h1);
gst_harness_push_from_src (h2);
while (TRUE) {
struct stats_check_state state = {.t = t,.n_streams = 2, 0 };
g_usleep (100 * 1000);
p = gst_promise_new_with_change_func (_on_stats, &state, NULL);
g_signal_emit_by_name (t->webrtc1, "get-stats", NULL, p);
p = gst_promise_new_with_change_func (_on_stats, &state, NULL);
g_signal_emit_by_name (t->webrtc2, "get-stats", NULL, p);
if (test_webrtc_check_for_state_mask (t, 1 << STATE_CUSTOM))
break;
}
test_webrtc_free (t);
}
@ -4694,6 +4789,25 @@ add_audio_test_src_harness (GstHarness * h, guint ssrc)
#undef L16_CAPS
}
static void
add_video_test_src_harness (GstHarness * h, guint ssrc)
{
GstCaps *caps = gst_caps_from_string (VP8_RTP_CAPS (96));
GstElement *capsfilter;
if (ssrc != 0) {
gst_caps_set_simple (caps, "ssrc", G_TYPE_UINT, ssrc, NULL);
}
gst_harness_add_src_parse (h,
"videotestsrc is-live=true ! video/x-raw,width=16,height=16 ! vp8enc ! rtpvp8pay ! "
"capsfilter name=capsfilter ! identity", TRUE);
capsfilter =
gst_bin_get_by_name (GST_BIN (h->src_harness->element), "capsfilter");
g_object_set (G_OBJECT (capsfilter), "caps", caps, NULL);
gst_harness_set_src_caps (h, caps);
caps = NULL;
gst_clear_object (&capsfilter);
}
struct pad_added_harness_data
{
GList *sink_harnesses;
@ -6249,7 +6363,7 @@ webrtcbin_suite (void)
Suite *s = suite_create ("webrtcbin");
TCase *tc = tcase_create ("general");
GstPluginFeature *nicesrc, *nicesink, *dtlssrtpdec, *dtlssrtpenc;
GstPluginFeature *sctpenc, *sctpdec;
GstPluginFeature *sctpenc, *sctpdec, *vp8enc;
GstRegistry *registry;
registry = gst_registry_get ();
@ -6259,6 +6373,7 @@ webrtcbin_suite (void)
dtlssrtpdec = gst_registry_lookup_feature (registry, "dtlssrtpdec");
sctpenc = gst_registry_lookup_feature (registry, "sctpenc");
sctpdec = gst_registry_lookup_feature (registry, "sctpdec");
vp8enc = gst_registry_lookup_feature (registry, "vp8enc");
tcase_add_test (tc, test_no_nice_elements_request_pad);
tcase_add_test (tc, test_no_nice_elements_state_change);
@ -6266,6 +6381,12 @@ webrtcbin_suite (void)
tcase_add_test (tc, test_sdp_no_media);
tcase_add_test (tc, test_session_stats);
tcase_add_test (tc, test_stats_with_stream);
if (vp8enc) {
tcase_add_test (tc, test_stats_with_two_streams);
} else {
GST_WARNING ("A required element was not found: vp8enc. Skipping tests");
}
tcase_add_test (tc, test_audio);
tcase_add_test (tc, test_audio_sendrecv);
tcase_add_test (tc, test_ice_port_restriction);