srt: Replace stats accumulation with naive byte counting

srt_bstats cannot be used to get the stats of closed connections, so the
best we can do is keep the running count ourselves.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3156>
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-10-05 19:50:13 +02:00 committed by GStreamer Marge Bot
parent bfed80a82c
commit b6974b6afc
2 changed files with 15 additions and 45 deletions

View file

@ -82,9 +82,6 @@ typedef struct
gboolean sent_headers; gboolean sent_headers;
} SRTCaller; } SRTCaller;
static GstStructure *gst_srt_object_accumulate_stats (GstSRTObject * srtobject,
SRTSOCKET srtsock);
static SRTCaller * static SRTCaller *
srt_caller_new (void) srt_caller_new (void)
{ {
@ -118,14 +115,6 @@ srt_caller_free (SRTCaller * caller)
static void static void
srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject) srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject)
{ {
GstStructure *stats;
stats = gst_srt_object_accumulate_stats (srtobject, caller->sock);
/* FIXME: These are the final statistics for the caller before we close its
* socket. Deliver the stats to the app before we throw them away. */
gst_structure_free (stats);
g_signal_emit_by_name (srtobject->element, "caller-removed", 0, g_signal_emit_by_name (srtobject->element, "caller-removed", 0,
caller->sockaddr); caller->sockaddr);
} }
@ -1383,7 +1372,7 @@ gboolean
gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable, gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
GError ** error) GError ** error)
{ {
srtobject->previous_bytes = 0; srtobject->bytes = 0;
return gst_srt_object_open_internal (srtobject, cancellable, error); return gst_srt_object_open_internal (srtobject, cancellable, error);
} }
@ -1394,18 +1383,10 @@ gst_srt_object_close (GstSRTObject * srtobject)
g_mutex_lock (&srtobject->sock_lock); g_mutex_lock (&srtobject->sock_lock);
if (srtobject->sock != SRT_INVALID_SOCK) { if (srtobject->sock != SRT_INVALID_SOCK) {
GstStructure *stats;
if (srtobject->poll_id != SRT_ERROR) { if (srtobject->poll_id != SRT_ERROR) {
srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock); srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
} }
stats = gst_srt_object_accumulate_stats (srtobject, srtobject->sock);
/* FIXME: These are the final statistics for the socket before we close it.
* Deliver the stats to the app before we throw them away. */
gst_structure_free (stats);
GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)", GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
srtobject->sock); srtobject->sock);
@ -1584,6 +1565,8 @@ gst_srt_object_read (GstSRTObject * srtobject,
return -1; return -1;
} }
} }
srtobject->bytes += len;
break; break;
} }
@ -1629,6 +1612,7 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
for (i = 0; i < size; i++) { for (i = 0; i < size; i++) {
SRTSOCKET wsock = sock; SRTSOCKET wsock = sock;
gint wsocklen = 1; gint wsocklen = 1;
gint sent;
GstBuffer *buffer = gst_buffer_list_get (headers, i); GstBuffer *buffer = gst_buffer_list_get (headers, i);
GstMapInfo mapinfo; GstMapInfo mapinfo;
@ -1651,14 +1635,16 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
return FALSE; return FALSE;
} }
if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size, sent = srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size, 0);
0) == SRT_ERROR) { if (sent == SRT_ERROR) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL, GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
("%s", srt_getlasterror_str ())); ("%s", srt_getlasterror_str ()));
gst_buffer_unmap (buffer, &mapinfo); gst_buffer_unmap (buffer, &mapinfo);
return FALSE; return FALSE;
} }
srtobject->bytes += sent;
gst_buffer_unmap (buffer, &mapinfo); gst_buffer_unmap (buffer, &mapinfo);
} }
@ -1710,6 +1696,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
goto err; goto err;
} }
len += sent; len += sent;
srtobject->bytes += sent;
} }
continue; continue;
@ -1812,6 +1799,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
break; break;
} }
len += sent; len += sent;
srtobject->bytes += sent;
} }
return len; return len;
@ -1854,7 +1842,7 @@ gst_srt_object_write (GstSRTObject * srtobject,
} }
static GstStructure * static GstStructure *
get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes) get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
{ {
GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics"); GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
int ret; int ret;
@ -1890,7 +1878,6 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes)
/* busy sending time (i.e., idle time exclusive) */ /* busy sending time (i.e., idle time exclusive) */
"send-duration-us", G_TYPE_UINT64, stats.usSndDuration, "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
"negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL); "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
*bytes += stats.byteSent;
} else { } else {
gst_structure_set (s, gst_structure_set (s,
"packets-received", G_TYPE_INT64, stats.pktRecvTotal, "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
@ -1903,7 +1890,6 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes)
"bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal, "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal,
"receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate, "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
"negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL); "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
*bytes += stats.byteRecvTotal;
} }
gst_structure_set (s, gst_structure_set (s,
@ -1921,14 +1907,11 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
{ {
GstStructure *s = NULL; GstStructure *s = NULL;
gboolean is_sender = GST_IS_BASE_SINK (srtobject->element); gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
guint64 bytes;
g_mutex_lock (&srtobject->sock_lock); g_mutex_lock (&srtobject->sock_lock);
bytes = srtobject->previous_bytes;
if (srtobject->sock != SRT_INVALID_SOCK) { if (srtobject->sock != SRT_INVALID_SOCK) {
s = get_stats_for_srtsock (srtobject->sock, is_sender, &bytes); s = get_stats_for_srtsock (srtobject->sock, is_sender);
goto done; goto done;
} }
@ -1944,7 +1927,7 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
GstStructure *tmp; GstStructure *tmp;
GValue *v; GValue *v;
tmp = get_stats_for_srtsock (caller->sock, is_sender, &bytes); tmp = get_stats_for_srtsock (caller->sock, is_sender);
gst_structure_set (tmp, "caller-address", G_TYPE_SOCKET_ADDRESS, gst_structure_set (tmp, "caller-address", G_TYPE_SOCKET_ADDRESS,
caller->sockaddr, NULL); caller->sockaddr, NULL);
@ -1962,22 +1945,9 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
done: done:
gst_structure_set (s, is_sender ? "bytes-sent-total" : "bytes-received-total", gst_structure_set (s, is_sender ? "bytes-sent-total" : "bytes-received-total",
G_TYPE_UINT64, bytes, NULL); G_TYPE_UINT64, srtobject->bytes, NULL);
g_mutex_unlock (&srtobject->sock_lock); g_mutex_unlock (&srtobject->sock_lock);
return s; return s;
} }
static GstStructure *
gst_srt_object_accumulate_stats (GstSRTObject * srtobject, SRTSOCKET srtsock)
{
gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
GstStructure *stats;
guint64 bytes = 0;
stats = get_stats_for_srtsock (srtsock, is_sender, &bytes);
srtobject->previous_bytes += bytes;
return stats;
}

View file

@ -72,7 +72,7 @@ struct _GstSRTObject
gboolean authentication; gboolean authentication;
guint64 previous_bytes; guint64 bytes;
}; };
GstSRTObject *gst_srt_object_new (GstElement *element); GstSRTObject *gst_srt_object_new (GstElement *element);