From 97c05d3f4be168352799d1394c857b8bf005670b Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 18 Mar 2020 17:58:52 +0100 Subject: [PATCH] srt: Accumulate total bytes sent/received over all connections/callers So we don't lose them. Split gst_srt_object_open_internal for internal reconnections that don't reset the accumulated bytes. --- ext/srt/gstsrtobject.c | 72 ++++++++++++++++++++++++++++++++++++------ ext/srt/gstsrtobject.h | 2 ++ 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/ext/srt/gstsrtobject.c b/ext/srt/gstsrtobject.c index 23566ce8e9..6970474e70 100644 --- a/ext/srt/gstsrtobject.c +++ b/ext/srt/gstsrtobject.c @@ -59,6 +59,9 @@ typedef struct gboolean sent_headers; } SRTCaller; +static GstStructure *gst_srt_object_accumulate_stats (GstSRTObject * srtobject, + SRTSOCKET srtsock); + static SRTCaller * srt_caller_new (void) { @@ -92,6 +95,14 @@ srt_caller_free (SRTCaller * caller) static void srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject) { + GstStructure *stats; + + stats = gst_srt_object_accumulate_stats (srtobject, caller->sock); + + /* FIXME: These are the final statistics for the caller before we close its + * socket. Deliver the stats to the app before we throw them away. */ + gst_structure_free (stats); + g_signal_emit_by_name (srtobject->element, "caller-removed", caller->sock, caller->sockaddr); } @@ -966,9 +977,9 @@ gst_srt_object_open_connection (GstSRTObject * srtobject, return ret; } -gboolean -gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable, - GError ** error) +static gboolean +gst_srt_object_open_internal (GstSRTObject * srtobject, + GCancellable * cancellable, GError ** error) { GSocketAddress *socket_address = NULL; GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE; @@ -1044,6 +1055,15 @@ out: return ret; } +gboolean +gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable, + GError ** error) +{ + srtobject->previous_bytes = 0; + + return gst_srt_object_open_internal (srtobject, cancellable, error); +} + void gst_srt_object_close (GstSRTObject * srtobject) { @@ -1053,6 +1073,13 @@ gst_srt_object_close (GstSRTObject * srtobject) } if (srtobject->sock != SRT_INVALID_SOCK) { + GstStructure *stats; + + stats = gst_srt_object_accumulate_stats (srtobject, srtobject->sock); + + /* FIXME: These are the final statistics for the socket before we close it. + * Deliver the stats to the app before we throw them away. */ + gst_structure_free (stats); GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)", srtobject->sock); @@ -1192,7 +1219,7 @@ gst_srt_object_read (GstSRTObject * srtobject, GST_WARNING_OBJECT (srtobject->element, "Invalid SRT socket. Trying to reconnect"); gst_srt_object_close (srtobject); - if (!gst_srt_object_open (srtobject, cancellable, error)) { + if (!gst_srt_object_open_internal (srtobject, cancellable, error)) { return -1; } continue; @@ -1402,7 +1429,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject, GST_WARNING_OBJECT (srtobject->element, "Invalid SRT socket. Trying to reconnect"); gst_srt_object_close (srtobject); - if (!gst_srt_object_open (srtobject, cancellable, error)) { + if (!gst_srt_object_open_internal (srtobject, cancellable, error)) { return -1; } continue; @@ -1465,7 +1492,7 @@ gst_srt_object_write (GstSRTObject * srtobject, } static GstStructure * -get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender) +get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes) { GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics"); int ret; @@ -1474,7 +1501,7 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender) ret = srt_bstats (srtsock, &stats, 0); if (ret >= 0) { - if (is_sender) + if (is_sender) { gst_structure_set (s, /* number of sent data packets, including retransmissions */ "packets-sent", G_TYPE_INT64, stats.pktSent, @@ -1501,7 +1528,8 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender) /* busy sending time (i.e., idle time exclusive) */ "send-duration-us", G_TYPE_UINT64, stats.usSndDuration, "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL); - else + *bytes += stats.byteSent; + } else { gst_structure_set (s, "packets-received", G_TYPE_INT64, stats.pktRecvTotal, "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal, @@ -1513,6 +1541,8 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender) "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal, "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate, "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL); + *bytes += stats.byteRecvTotal; + } gst_structure_set (s, /* estimated bandwidth, in Mb/s */ @@ -1529,10 +1559,14 @@ gst_srt_object_get_stats (GstSRTObject * srtobject) { GstStructure *s = NULL; gboolean is_sender = GST_IS_BASE_SINK (srtobject->element); + guint64 bytes; g_mutex_lock (&srtobject->sock_lock); + + bytes = srtobject->previous_bytes; + if (srtobject->sock != SRT_INVALID_SOCK) { - s = get_stats_for_srtsock (srtobject->sock, is_sender); + s = get_stats_for_srtsock (srtobject->sock, is_sender, &bytes); goto done; } @@ -1545,9 +1579,11 @@ gst_srt_object_get_stats (GstSRTObject * srtobject) for (item = srtobject->callers; item; item = item->next) { SRTCaller *caller = item->data; - GstStructure *tmp = get_stats_for_srtsock (caller->sock, is_sender); + GstStructure *tmp; GValue *v; + tmp = get_stats_for_srtsock (caller->sock, is_sender, &bytes); + g_value_array_append (callers_stats, NULL); v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1); g_value_init (v, GST_TYPE_STRUCTURE); @@ -1560,7 +1596,23 @@ gst_srt_object_get_stats (GstSRTObject * srtobject) } done: + gst_structure_set (s, is_sender ? "bytes-sent-total" : "bytes-received-total", + G_TYPE_UINT64, bytes, NULL); + g_mutex_unlock (&srtobject->sock_lock); return s; } + +static GstStructure * +gst_srt_object_accumulate_stats (GstSRTObject * srtobject, SRTSOCKET srtsock) +{ + gboolean is_sender = GST_IS_BASE_SINK (srtobject->element); + GstStructure *stats; + guint64 bytes = 0; + + stats = get_stats_for_srtsock (srtsock, is_sender, &bytes); + srtobject->previous_bytes += bytes; + + return stats; +} diff --git a/ext/srt/gstsrtobject.h b/ext/srt/gstsrtobject.h index 2bc800ac58..2e0e2cd914 100644 --- a/ext/srt/gstsrtobject.h +++ b/ext/srt/gstsrtobject.h @@ -71,6 +71,8 @@ struct _GstSRTObject gchar *passphrase; gboolean wait_for_connection; + + guint64 previous_bytes; }; GstSRTObject *gst_srt_object_new (GstElement *element);