srt: Accumulate total bytes sent/received over all connections/callers

So we don't lose them. Split gst_srt_object_open_internal for internal
reconnections that don't reset the accumulated bytes.
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-03-18 17:58:52 +01:00
parent d19b3fccb5
commit 97c05d3f4b
No known key found for this signature in database
GPG key ID: DE5E0C5F25941CA5
2 changed files with 64 additions and 10 deletions

View file

@ -59,6 +59,9 @@ typedef struct
gboolean sent_headers; gboolean sent_headers;
} SRTCaller; } SRTCaller;
static GstStructure *gst_srt_object_accumulate_stats (GstSRTObject * srtobject,
SRTSOCKET srtsock);
static SRTCaller * static SRTCaller *
srt_caller_new (void) srt_caller_new (void)
{ {
@ -92,6 +95,14 @@ srt_caller_free (SRTCaller * caller)
static void static void
srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject) srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject)
{ {
GstStructure *stats;
stats = gst_srt_object_accumulate_stats (srtobject, caller->sock);
/* FIXME: These are the final statistics for the caller before we close its
* socket. Deliver the stats to the app before we throw them away. */
gst_structure_free (stats);
g_signal_emit_by_name (srtobject->element, "caller-removed", caller->sock, g_signal_emit_by_name (srtobject->element, "caller-removed", caller->sock,
caller->sockaddr); caller->sockaddr);
} }
@ -966,9 +977,9 @@ gst_srt_object_open_connection (GstSRTObject * srtobject,
return ret; return ret;
} }
gboolean static gboolean
gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable, gst_srt_object_open_internal (GstSRTObject * srtobject,
GError ** error) GCancellable * cancellable, GError ** error)
{ {
GSocketAddress *socket_address = NULL; GSocketAddress *socket_address = NULL;
GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE; GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
@ -1044,6 +1055,15 @@ out:
return ret; return ret;
} }
gboolean
gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
GError ** error)
{
srtobject->previous_bytes = 0;
return gst_srt_object_open_internal (srtobject, cancellable, error);
}
void void
gst_srt_object_close (GstSRTObject * srtobject) gst_srt_object_close (GstSRTObject * srtobject)
{ {
@ -1053,6 +1073,13 @@ gst_srt_object_close (GstSRTObject * srtobject)
} }
if (srtobject->sock != SRT_INVALID_SOCK) { if (srtobject->sock != SRT_INVALID_SOCK) {
GstStructure *stats;
stats = gst_srt_object_accumulate_stats (srtobject, srtobject->sock);
/* FIXME: These are the final statistics for the socket before we close it.
* Deliver the stats to the app before we throw them away. */
gst_structure_free (stats);
GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)", GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
srtobject->sock); srtobject->sock);
@ -1192,7 +1219,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
GST_WARNING_OBJECT (srtobject->element, GST_WARNING_OBJECT (srtobject->element,
"Invalid SRT socket. Trying to reconnect"); "Invalid SRT socket. Trying to reconnect");
gst_srt_object_close (srtobject); gst_srt_object_close (srtobject);
if (!gst_srt_object_open (srtobject, cancellable, error)) { if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
return -1; return -1;
} }
continue; continue;
@ -1402,7 +1429,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
GST_WARNING_OBJECT (srtobject->element, GST_WARNING_OBJECT (srtobject->element,
"Invalid SRT socket. Trying to reconnect"); "Invalid SRT socket. Trying to reconnect");
gst_srt_object_close (srtobject); gst_srt_object_close (srtobject);
if (!gst_srt_object_open (srtobject, cancellable, error)) { if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
return -1; return -1;
} }
continue; continue;
@ -1465,7 +1492,7 @@ gst_srt_object_write (GstSRTObject * srtobject,
} }
static GstStructure * static GstStructure *
get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender) get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes)
{ {
GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics"); GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
int ret; int ret;
@ -1474,7 +1501,7 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
ret = srt_bstats (srtsock, &stats, 0); ret = srt_bstats (srtsock, &stats, 0);
if (ret >= 0) { if (ret >= 0) {
if (is_sender) if (is_sender) {
gst_structure_set (s, gst_structure_set (s,
/* number of sent data packets, including retransmissions */ /* number of sent data packets, including retransmissions */
"packets-sent", G_TYPE_INT64, stats.pktSent, "packets-sent", G_TYPE_INT64, stats.pktSent,
@ -1501,7 +1528,8 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
/* busy sending time (i.e., idle time exclusive) */ /* busy sending time (i.e., idle time exclusive) */
"send-duration-us", G_TYPE_UINT64, stats.usSndDuration, "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
"negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL); "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
else *bytes += stats.byteSent;
} else {
gst_structure_set (s, gst_structure_set (s,
"packets-received", G_TYPE_INT64, stats.pktRecvTotal, "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
"packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal, "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
@ -1513,6 +1541,8 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
"bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal, "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal,
"receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate, "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
"negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL); "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
*bytes += stats.byteRecvTotal;
}
gst_structure_set (s, gst_structure_set (s,
/* estimated bandwidth, in Mb/s */ /* estimated bandwidth, in Mb/s */
@ -1529,10 +1559,14 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
{ {
GstStructure *s = NULL; GstStructure *s = NULL;
gboolean is_sender = GST_IS_BASE_SINK (srtobject->element); gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
guint64 bytes;
g_mutex_lock (&srtobject->sock_lock); g_mutex_lock (&srtobject->sock_lock);
bytes = srtobject->previous_bytes;
if (srtobject->sock != SRT_INVALID_SOCK) { if (srtobject->sock != SRT_INVALID_SOCK) {
s = get_stats_for_srtsock (srtobject->sock, is_sender); s = get_stats_for_srtsock (srtobject->sock, is_sender, &bytes);
goto done; goto done;
} }
@ -1545,9 +1579,11 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
for (item = srtobject->callers; item; item = item->next) { for (item = srtobject->callers; item; item = item->next) {
SRTCaller *caller = item->data; SRTCaller *caller = item->data;
GstStructure *tmp = get_stats_for_srtsock (caller->sock, is_sender); GstStructure *tmp;
GValue *v; GValue *v;
tmp = get_stats_for_srtsock (caller->sock, is_sender, &bytes);
g_value_array_append (callers_stats, NULL); g_value_array_append (callers_stats, NULL);
v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1); v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
g_value_init (v, GST_TYPE_STRUCTURE); g_value_init (v, GST_TYPE_STRUCTURE);
@ -1560,7 +1596,23 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
} }
done: done:
gst_structure_set (s, is_sender ? "bytes-sent-total" : "bytes-received-total",
G_TYPE_UINT64, bytes, NULL);
g_mutex_unlock (&srtobject->sock_lock); g_mutex_unlock (&srtobject->sock_lock);
return s; return s;
} }
static GstStructure *
gst_srt_object_accumulate_stats (GstSRTObject * srtobject, SRTSOCKET srtsock)
{
gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
GstStructure *stats;
guint64 bytes = 0;
stats = get_stats_for_srtsock (srtsock, is_sender, &bytes);
srtobject->previous_bytes += bytes;
return stats;
}

View file

@ -71,6 +71,8 @@ struct _GstSRTObject
gchar *passphrase; gchar *passphrase;
gboolean wait_for_connection; gboolean wait_for_connection;
guint64 previous_bytes;
}; };
GstSRTObject *gst_srt_object_new (GstElement *element); GstSRTObject *gst_srt_object_new (GstElement *element);