gstreamer/ext/srt/gstsrtobject.c
Jakub Adam 19391ae4c7 srtobject: allow passing SRT Stream ID in stream URI
Based on Stream ID, the application can accept or reject the connection,
select the desired data stream, or set an appropriate passphrase for the
connection. Example usage:

  srt://127.0.0.1:1234?streamid=mystream
2019-11-20 17:09:34 +00:00

1546 lines
45 KiB
C

/* GStreamer
* Copyright (C) 2018, Collabora Ltd.
* Copyright (C) 2018, SK Telecom, Co., Ltd.
* Author: Jeongseok Kim <jeongseok.kim@sk.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
/* Needed for GValueArray */
#define GLIB_DISABLE_DEPRECATION_WARNINGS
#include "gstsrtobject.h"
#include <gst/base/gstbasesink.h>
#include <gio/gnetworking.h>
#include <stdlib.h>
GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
#define GST_CAT_DEFAULT gst_debug_srtobject
/* Property identifiers used by gst_srt_object_install_properties_helper()
 * and by the set/get property helpers below.  Numbering starts at 1. */
enum
{
PROP_URI = 1,
PROP_MODE,
PROP_LOCALADDRESS,
PROP_LOCALPORT,
PROP_PASSPHRASE,
PROP_PBKEYLEN,
PROP_POLL_TIMEOUT,
PROP_LATENCY,
/* NOTE(review): PROP_MSG_SIZE is not installed or handled in the visible
 * part of this file -- confirm whether it is still needed. */
PROP_MSG_SIZE,
PROP_STATS,
/* presumably an end-of-list sentinel; not referenced in the visible code */
PROP_LAST
};
/* Book-keeping for one remote peer accepted by the listener thread
 * (see thread_func()).  Instances are stored in GstSRTObject::callers. */
typedef struct
{
/* socket returned by srt_accept(), SRT_INVALID_SOCK until assigned */
SRTSOCKET sock;
/* per-caller epoll container id, SRT_ERROR until created */
gint poll_id;
/* peer address built from the native sockaddr filled in by srt_accept() */
GSocketAddress *sockaddr;
/* TRUE once the stream headers have been sent to this caller */
gboolean sent_headers;
} SRTCaller;
static SRTCaller *
srt_caller_new (void)
{
SRTCaller *caller = g_new0 (SRTCaller, 1);
caller->sock = SRT_INVALID_SOCK;
caller->poll_id = SRT_ERROR;
caller->sent_headers = FALSE;
return caller;
}
/* Releases everything owned by @caller: the peer address reference, the
 * SRT socket (if one was assigned), the epoll container (if one was
 * created) and finally the structure itself. */
static void
srt_caller_free (SRTCaller * caller)
{
  g_return_if_fail (caller != NULL);

  g_clear_object (&caller->sockaddr);

  if (caller->sock != SRT_INVALID_SOCK)
    srt_close (caller->sock);

  if (caller->poll_id != SRT_ERROR)
    srt_epoll_release (caller->poll_id);

  g_free (caller);
}
/* Invokes the "caller removed" closure of @srtobject, if any, passing the
 * caller's socket (as int) and its socket address.  The signature matches
 * GFunc so it can be used with g_list_foreach(). */
static void
srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
{
  GValue args[2] = { G_VALUE_INIT, G_VALUE_INIT };
  GClosure *closure = srtobject->caller_removed_closure;

  /* nobody is interested in the notification */
  if (closure == NULL)
    return;

  g_value_init (&args[0], G_TYPE_INT);
  g_value_set_int (&args[0], caller->sock);

  g_value_init (&args[1], G_TYPE_SOCKET_ADDRESS);
  g_value_set_object (&args[1], caller->sockaddr);

  g_closure_invoke (closure, NULL, 2, args, NULL);

  g_value_unset (&args[0]);
  g_value_unset (&args[1]);
}
/* One fixed socket option applied to every SRT socket by
 * gst_srt_object_set_common_params(). */
struct srt_constant_params
{
/* option name, used only in error messages; NULL terminates the table */
const gchar *name;
/* option identifier passed to srt_setsockopt() */
gint param;
/* integer value the option is set to */
gint val;
};
/* Socket options that are always applied, terminated by a NULL name.
 * The name is what shows up in the "failed to set ..." error message, so
 * it must match the option constant next to it. */
static struct srt_constant_params srt_params[] = {
  {"SRTO_SNDSYN", SRTO_SNDSYN, 0},      /* 0: non-blocking */
  {"SRTO_RCVSYN", SRTO_RCVSYN, 0},      /* 0: non-blocking */
  {"SRTO_LINGER", SRTO_LINGER, 0},
  {"SRTO_TSBPDMODE", SRTO_TSBPDMODE, 1},        /* Timestamp-based Packet Delivery mode must be enabled */
  {NULL, -1, -1},
};
/* Counts live GstSRTObject instances: incremented in gst_srt_object_new()
 * (which calls srt_startup() when it reads 0) and decremented in
 * gst_srt_object_destroy() (which calls srt_cleanup() when it drops to 0).
 * Accessed through the g_atomic_int_* functions. */
static gint srt_init_refcount = 0;
/* Applies the options shared by listener and caller sockets to @sock:
 * the fixed srt_params table, then passphrase and key length (only when a
 * non-empty passphrase is configured), latency, and the optional stream ID.
 *
 * @sock: a freshly created SRT socket
 * @srtobject: source of the passphrase and the "parameters" structure
 * @error: set when FALSE is returned
 *
 * Returns: TRUE when every option was applied, FALSE (with @error set) on
 * the first option that SRT rejects. */
static gboolean
gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
    GError ** error)
{
  struct srt_constant_params *params = srt_params;

  for (; params->name != NULL; params++) {
    if (srt_setsockopt (sock, 0, params->param, &params->val, sizeof (gint))) {
      g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
          "failed to set %s (reason: %s)", params->name,
          srt_getlasterror_str ());
      return FALSE;
    }
  }

  if (srtobject->passphrase != NULL && srtobject->passphrase[0] != '\0') {
    gint pbkeylen;

    if (srt_setsockopt (sock, 0, SRTO_PASSPHRASE, srtobject->passphrase,
            strlen (srtobject->passphrase))) {
      g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
          "failed to set passphrase (reason: %s)", srt_getlasterror_str ());
      return FALSE;
    }

    /* "pbkeylen" is stored as a GST_TYPE_SRT_KEY_LENGTH enum by both the
     * property setter and the URI parser, so it has to be read as an enum;
     * a plain int field is still accepted as a fallback. */
    if (!gst_structure_get_enum (srtobject->parameters, "pbkeylen",
            GST_TYPE_SRT_KEY_LENGTH, &pbkeylen)
        && !gst_structure_get_int (srtobject->parameters, "pbkeylen",
            &pbkeylen)) {
      pbkeylen = GST_SRT_DEFAULT_PBKEYLEN;
    }

    if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
      g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
          "failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
      return FALSE;
    }
  }

  {
    int latency;

    if (!gst_structure_get_int (srtobject->parameters, "latency", &latency))
      latency = GST_SRT_DEFAULT_LATENCY;

    if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
      g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
          "failed to set latency (reason: %s)", srt_getlasterror_str ());
      return FALSE;
    }
  }

  if (gst_structure_has_field (srtobject->parameters, "streamid")) {
    const gchar *streamid;

    streamid = gst_structure_get_string (srtobject->parameters, "streamid");
    if (streamid != NULL && streamid[0] != '\0') {
      if (srt_setsockopt (sock, 0, SRTO_STREAMID, streamid, strlen (streamid))) {
        g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
            "failed to set stream ID (reason: %s)", srt_getlasterror_str ());
        /* an error has been set, so success must not be reported */
        return FALSE;
      }
    }
  }

  return TRUE;
}
/* Creates the SRT helper object for @element.
 *
 * Starts up the SRT library when the instance counter reads zero, then
 * allocates the object with its default parameters (poll-timeout, latency,
 * mode), an epoll container for the data socket and invalid socket
 * handles.  Free with gst_srt_object_destroy(). */
GstSRTObject *
gst_srt_object_new (GstElement * element)
{
  GstSRTObject *self;

  if (g_atomic_int_get (&srt_init_refcount) == 0) {
    GST_DEBUG_OBJECT (element, "Starting up SRT");
    if (srt_startup () != 0) {
      g_warning ("Failed to initialize SRT (reason: %s)",
          srt_getlasterror_str ());
    }
  }
  g_atomic_int_inc (&srt_init_refcount);

  self = g_new0 (GstSRTObject, 1);
  self->element = element;

  self->parameters = gst_structure_new ("application/x-srt-params",
      "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
      "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY,
      "mode", GST_TYPE_SRT_CONNECTION_MODE, GST_SRT_DEFAULT_MODE, NULL);

  /* data socket and its epoll container */
  self->sock = SRT_INVALID_SOCK;
  self->poll_id = srt_epoll_create ();

  /* listener side is set up lazily when the connection is opened */
  self->listener_sock = SRT_INVALID_SOCK;
  self->listener_poll_id = SRT_ERROR;

  self->sent_headers = FALSE;
  g_cond_init (&self->sock_cond);

  return self;
}
/* Frees @srtobject: releases the data epoll container, the condition
 * variable, the parameters structure, the passphrase and the URI, and
 * shuts the SRT library down when this was the last live instance. */
void
gst_srt_object_destroy (GstSRTObject * srtobject)
{
  GstElement *owner;

  g_return_if_fail (srtobject != NULL);

  owner = srtobject->element;

  if (srtobject->poll_id != SRT_ERROR) {
    srt_epoll_release (srtobject->poll_id);
    srtobject->poll_id = SRT_ERROR;
  }

  g_cond_clear (&srtobject->sock_cond);

  GST_DEBUG_OBJECT (owner, "Destroying srtobject");

  gst_structure_free (srtobject->parameters);
  g_free (srtobject->passphrase);

  /* last instance gone: tear the library down */
  if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
    srt_cleanup ();
    GST_DEBUG_OBJECT (owner, "Cleaning up SRT");
  }

  g_clear_pointer (&srtobject->uri, gst_uri_unref);
  g_free (srtobject);
}
/* Stores the property @prop_id on @srtobject.
 *
 * The URI is routed through gst_srt_object_set_uri(), the passphrase is
 * kept as a plain string on the object, and every other supported
 * property is copied verbatim into the "parameters" structure under its
 * field name.
 *
 * Returns: TRUE when @prop_id is handled here, FALSE otherwise. */
gboolean
gst_srt_object_set_property_helper (GstSRTObject * srtobject,
    guint prop_id, const GValue * value, GParamSpec * pspec)
{
  const gchar *field_name = NULL;

  switch (prop_id) {
    case PROP_URI:
      gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL);
      return TRUE;
    case PROP_PASSPHRASE:
      g_free (srtobject->passphrase);
      srtobject->passphrase = g_value_dup_string (value);
      return TRUE;
    case PROP_MODE:
      field_name = "mode";
      break;
    case PROP_POLL_TIMEOUT:
      field_name = "poll-timeout";
      break;
    case PROP_LATENCY:
      field_name = "latency";
      break;
    case PROP_LOCALADDRESS:
      field_name = "localaddress";
      break;
    case PROP_LOCALPORT:
      field_name = "localport";
      break;
    case PROP_PBKEYLEN:
      field_name = "pbkeylen";
      break;
    default:
      return FALSE;
  }

  /* all remaining properties live in the parameters structure */
  gst_structure_set_value (srtobject->parameters, field_name, value);

  return TRUE;
}
/* Fills @value with the current value of property @prop_id.
 *
 * Values kept in the "parameters" structure fall back to their default
 * (after logging a warning) when the field cannot be read with the
 * expected type.
 *
 * Returns: TRUE when @prop_id is handled here, FALSE otherwise. */
gboolean
gst_srt_object_get_property_helper (GstSRTObject * srtobject,
    guint prop_id, GValue * value, GParamSpec * pspec)
{
  switch (prop_id) {
    case PROP_URI:
      g_value_take_string (value, gst_uri_to_string (srtobject->uri));
      return TRUE;

    case PROP_STATS:
      g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
      return TRUE;

    case PROP_LOCALADDRESS:
      g_value_set_string (value,
          gst_structure_get_string (srtobject->parameters, "localaddress"));
      return TRUE;

    case PROP_MODE:{
      gint mode;

      if (!gst_structure_get_enum (srtobject->parameters, "mode",
              GST_TYPE_SRT_CONNECTION_MODE, &mode)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
        mode = GST_SRT_CONNECTION_MODE_NONE;
      }
      g_value_set_enum (value, mode);
      return TRUE;
    }

    case PROP_PBKEYLEN:{
      gint key_length;

      if (!gst_structure_get_enum (srtobject->parameters, "pbkeylen",
              GST_TYPE_SRT_KEY_LENGTH, &key_length)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
        key_length = GST_SRT_KEY_LENGTH_NO_KEY;
      }
      g_value_set_enum (value, key_length);
      return TRUE;
    }

    case PROP_LOCALPORT:{
      guint port;

      if (!gst_structure_get_uint (srtobject->parameters, "localport", &port)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
        port = GST_SRT_DEFAULT_PORT;
      }
      g_value_set_uint (value, port);
      return TRUE;
    }

    case PROP_POLL_TIMEOUT:{
      gint timeout;

      if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
              &timeout)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
        timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
      }
      g_value_set_int (value, timeout);
      return TRUE;
    }

    case PROP_LATENCY:{
      gint latency;

      if (!gst_structure_get_int (srtobject->parameters, "latency", &latency)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
        latency = GST_SRT_DEFAULT_LATENCY;
      }
      g_value_set_int (value, latency);
      return TRUE;
    }

    default:
      break;
  }

  return FALSE;
}
/* Installs the properties shared by the SRT elements on @gobject_class.
 * Each GParamSpec is built first and then installed under its PROP_* id. */
void
gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
{
  /* flag combinations used by the properties below */
  const GParamFlags rw_ready_flags = (GParamFlags) (G_PARAM_READWRITE |
      GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
  const GParamFlags wo_ready_flags = (GParamFlags) (G_PARAM_WRITABLE |
      GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
  const GParamFlags rw_flags = (GParamFlags) (G_PARAM_READWRITE |
      G_PARAM_STATIC_STRINGS);
  const GParamFlags ro_flags = (GParamFlags) (G_PARAM_READABLE |
      G_PARAM_STATIC_STRINGS);
  GParamSpec *spec;

  /**
   * GstSRTSrc:uri:
   *
   * The URI used by SRT connection. User can specify SRT specific options by URI parameters.
   * Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Medium: SRT</a>
   */
  spec = g_param_spec_string ("uri", "URI",
      "URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
      rw_ready_flags);
  g_object_class_install_property (gobject_class, PROP_URI, spec);

  /**
   * GstSRTSrc:mode:
   *
   * The SRT connection mode.
   * This property can be set by URI parameters.
   */
  spec = g_param_spec_enum ("mode", "Connection mode",
      "SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
      GST_SRT_CONNECTION_MODE_CALLER, rw_ready_flags);
  g_object_class_install_property (gobject_class, PROP_MODE, spec);

  /**
   * GstSRTSrc:localaddress:
   *
   * The address to bind when #GstSRTSrc:mode is listener or rendezvous.
   * This property can be set by URI parameters.
   */
  spec = g_param_spec_string ("localaddress", "Local address",
      "Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS, rw_ready_flags);
  g_object_class_install_property (gobject_class, PROP_LOCALADDRESS, spec);

  /**
   * GstSRTSrc:localport:
   *
   * The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
   * This property can be set by URI parameters.
   */
  spec = g_param_spec_uint ("localport", "Local port",
      "Local port to bind", 0, 65535, GST_SRT_DEFAULT_PORT, rw_ready_flags);
  g_object_class_install_property (gobject_class, PROP_LOCALPORT, spec);

  /**
   * GstSRTSrc:passphrase:
   *
   * The password for the encrypted transmission.
   * This property can be set by URI parameters.
   */
  spec = g_param_spec_string ("passphrase", "Passphrase",
      "Password for the encrypted transmission", "", wo_ready_flags);
  g_object_class_install_property (gobject_class, PROP_PASSPHRASE, spec);

  /**
   * GstSRTSrc:pbkeylen:
   *
   * The crypto key length.
   * This property can be set by URI parameters.
   */
  spec = g_param_spec_enum ("pbkeylen", "Crypto key length",
      "Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
      GST_SRT_DEFAULT_PBKEYLEN, rw_ready_flags);
  g_object_class_install_property (gobject_class, PROP_PBKEYLEN, spec);

  /**
   * GstSRTSrc:poll-timeout:
   *
   * The polling timeout used when srt poll is started.
   * Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
   * This property can be set by URI parameters.
   */
  spec = g_param_spec_int ("poll-timeout", "Poll timeout",
      "Return poll wait after timeout milliseconds (-1 = infinite)", -1,
      G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT, rw_ready_flags);
  g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT, spec);

  /**
   * GstSRTSrc:latency:
   *
   * The maximum accepted transmission latency.
   */
  spec = g_param_spec_int ("latency", "latency",
      "Minimum latency (milliseconds)", 0,
      G_MAXINT32, GST_SRT_DEFAULT_LATENCY, rw_flags);
  g_object_class_install_property (gobject_class, PROP_LATENCY, spec);

  /**
   * GstSRTSrc:stats:
   *
   * The statistics from SRT.
   */
  spec = g_param_spec_boxed ("stats", "Statistics",
      "SRT Statistics", GST_TYPE_STRUCTURE, ro_flags);
  g_object_class_install_property (gobject_class, PROP_STATS, spec);
}
/* Stores the enum value whose nick is @value in field @key of @s.
 *
 * @s: structure to update
 * @enum_type: GType of the enumeration the nick belongs to
 * @key: field name (a NUL-terminated string)
 * @value: enum nick (a NUL-terminated string); may be NULL when the URI
 *   query contains the key without a value, in which case nothing is set
 *
 * Unknown nicks leave the field untouched; both cases are logged. */
static void
gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
    gconstpointer key, gconstpointer value)
{
  GEnumClass *enum_class;
  GEnumValue *enum_value;

  if (value == NULL) {
    GST_WARNING ("Ignoring '%s' without a value", (const gchar *) key);
    return;
  }

  enum_class = g_type_class_ref (enum_type);
  enum_value = g_enum_get_value_by_nick (enum_class, value);

  if (enum_value) {
    GValue v = G_VALUE_INIT;

    g_value_init (&v, enum_type);
    g_value_set_enum (&v, enum_value->value);
    /* the structure keeps its own copy of the value */
    gst_structure_set_value (s, key, &v);
    g_value_unset (&v);
  } else {
    GST_WARNING ("Ignoring unknown value '%s' for '%s'",
        (const gchar *) value, (const gchar *) key);
  }

  g_type_class_unref (enum_class);
}
/* Sets field @key of @s to the string @value.  The temporary GValue only
 * borrows @value; the string is not freed when the GValue is unset. */
static void
gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
    const gchar * value)
{
  GValue str_value = G_VALUE_INIT;

  g_value_init (&str_value, G_TYPE_STRING);
  g_value_set_static_string (&str_value, value);

  gst_structure_set_value (s, key, &str_value);

  g_value_unset (&str_value);
}
/* Parses @value as a base-10 unsigned integer and stores it as a
 * G_TYPE_UINT in field @key of @s.
 *
 * @value may be NULL when the URI query contains the key without a value.
 * A NULL value, a string that does not start with a number, or a number
 * that does not fit in a guint is logged and ignored, leaving the field
 * unchanged. */
static void
gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
    const gchar * value)
{
  GValue v = G_VALUE_INIT;
  gchar *end = NULL;
  gulong parsed;

  if (value == NULL) {
    GST_WARNING ("Ignoring '%s' without a value", key);
    return;
  }

  parsed = strtoul (value, &end, 10);
  if (end == value || parsed > G_MAXUINT) {
    GST_WARNING ("Ignoring invalid value '%s' for '%s'", value, key);
    return;
  }

  g_value_init (&v, G_TYPE_UINT);
  g_value_set_uint (&v, (guint) parsed);
  gst_structure_set_value (s, key, &v);
  g_value_unset (&v);
}
/* Makes sure a listener or rendezvous configuration has a local endpoint.
 *
 * When "localaddress" or "localport" is missing from @s it is filled in
 * from the host / port of @uri, falling back to the built-in defaults when
 * the URI does not carry them either.  Other modes are left untouched. */
static void
gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
{
  GstSRTConnectionMode mode = GST_SRT_CONNECTION_MODE_NONE;
  guint port;

  gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
      (gint *) & mode);

  /* only modes that bind locally need these fields */
  if (mode != GST_SRT_CONNECTION_MODE_RENDEZVOUS &&
      mode != GST_SRT_CONNECTION_MODE_LISTENER)
    return;

  if (gst_structure_get_string (s, "localaddress") == NULL) {
    const gchar *host = gst_uri_get_host (uri);

    if (host == NULL)
      host = GST_SRT_DEFAULT_LOCALADDRESS;

    gst_srt_object_set_string_value (s, "localaddress", host);
  }

  if (!gst_structure_get_uint (s, "localport", &port)) {
    port = gst_uri_get_port (uri);

    if (port == GST_URI_NO_PORT)
      port = GST_SRT_DEFAULT_PORT;

    gst_structure_set (s, "localport", G_TYPE_UINT, port, NULL);
  }
}
/* Parses @uri and updates the connection parameters of @srtobject.
 *
 * The mode defaults to "caller" when the URI has a host and to "listener"
 * otherwise; the query keys mode, localaddress, localport, passphrase,
 * pbkeylen and streamid override the stored parameters.  Other keys are
 * ignored.
 *
 * @uri: an srt:// URI
 * @err: set when FALSE is returned
 *
 * Returns: FALSE when the socket is already opened, or when @uri is NULL,
 * has the wrong scheme or cannot be parsed.  In all of those cases the
 * previously stored URI and parameters are left untouched. */
gboolean
gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
    GError ** err)
{
  GHashTable *query_table = NULL;
  GHashTableIter iter;
  gpointer key, value;
  const char *addr_str;
  GstUri *parsed;

  if (srtobject->opened) {
    g_warning
        ("It's not supported to change the 'uri' property when SRT socket is opened.");
    g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
        "It's not supported to change the 'uri' property when SRT socket is opened");

    return FALSE;
  }

  if (uri == NULL || !g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
    g_warning ("Given uri cannot be used for SRT connection.");
    g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
        "Invalid SRT URI scheme");
    return FALSE;
  }

  /* parse before dropping the old URI so a malformed string cannot leave
   * the object without one */
  parsed = gst_uri_from_string (uri);
  if (parsed == NULL) {
    g_warning ("Given uri cannot be parsed.");
    g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
        "Could not parse SRT URI");
    return FALSE;
  }

  g_clear_pointer (&srtobject->uri, gst_uri_unref);
  srtobject->uri = parsed;

  query_table = gst_uri_get_query_table (srtobject->uri);

  GST_DEBUG_OBJECT (srtobject->element,
      "set uri to (host: %s, port: %u) with %u query strings",
      gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
      query_table == NULL ? 0 : g_hash_table_size (query_table));

  /* a host means we connect to it; no host means we wait for callers */
  addr_str = gst_uri_get_host (srtobject->uri);
  if (addr_str)
    gst_srt_object_set_enum_value (srtobject->parameters,
        GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
  else
    gst_srt_object_set_enum_value (srtobject->parameters,
        GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");

  if (query_table) {
    g_hash_table_iter_init (&iter, query_table);
    while (g_hash_table_iter_next (&iter, &key, &value)) {
      if (!g_strcmp0 ("mode", key)) {
        gst_srt_object_set_enum_value (srtobject->parameters,
            GST_TYPE_SRT_CONNECTION_MODE, key, value);
      } else if (!g_strcmp0 ("localaddress", key)) {
        gst_srt_object_set_string_value (srtobject->parameters, key, value);
      } else if (!g_strcmp0 ("localport", key)) {
        gst_srt_object_set_uint_value (srtobject->parameters, key, value);
      } else if (!g_strcmp0 ("passphrase", key)) {
        g_free (srtobject->passphrase);
        srtobject->passphrase = g_strdup (value);
      } else if (!g_strcmp0 ("pbkeylen", key)) {
        gst_srt_object_set_enum_value (srtobject->parameters,
            GST_TYPE_SRT_KEY_LENGTH, key, value);
      } else if (!g_strcmp0 ("streamid", key)) {
        gst_srt_object_set_string_value (srtobject->parameters, key, value);
      }
    }

    g_hash_table_unref (query_table);
  }

  gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);

  return TRUE;
}
/* Listener thread: waits on the listener epoll container and accepts
 * incoming callers.
 *
 * Every accepted caller gets its own SRTCaller (with a dedicated epoll
 * container watching for input on source elements and output on sinks),
 * is appended to srtobject->callers under the element's object lock, and
 * is announced through the "caller added" closure.
 *
 * The thread ends when polling fails (which includes the listener epoll
 * id having been reset to SRT_ERROR) or, for source elements, right after
 * the first caller has been accepted.
 *
 * @data: the GstSRTObject
 * Returns: always NULL */
static gpointer
thread_func (gpointer data)
{
  GstSRTObject *srtobject = data;
  SRTSOCKET caller_sock;
  union
  {
    struct sockaddr_storage ss;
    struct sockaddr sa;
  } caller_sa;
  int caller_sa_len;

  gint poll_timeout;

  SRTSOCKET rsock;
  gint rsocklen;

  for (;;) {
    if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
            &poll_timeout)) {
      poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
    }

    GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");

    /* in/out argument: capacity of the rsock array on input, number of
     * ready sockets on output, so it must be reset before every wait */
    rsocklen = 1;

    if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
            &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
      gint srt_errno = srt_getlasterror (NULL);

      /* the listener has been torn down; leave quietly */
      if (srtobject->listener_poll_id == SRT_ERROR)
        return NULL;
      if (srt_errno == SRT_ETIMEOUT) {
        continue;
      } else {
        GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
            ("abort polling: %s", srt_getlasterror_str ()), (NULL));
        return NULL;
      }
    }

    /* tell srt_accept() how much room the address buffer offers */
    caller_sa_len = sizeof (caller_sa);
    caller_sock =
        srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);

    if (caller_sock != SRT_INVALID_SOCK) {
      SRTCaller *caller;
      gint flag = SRT_EPOLL_ERR;

      caller = srt_caller_new ();
      caller->sockaddr =
          g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
      caller->poll_id = srt_epoll_create ();
      caller->sock = caller_sock;

      if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
              (srtobject->element)) == GST_URI_SRC) {
        flag |= SRT_EPOLL_IN;
      } else {
        flag |= SRT_EPOLL_OUT;
      }

      if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {

        GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
            ("%s", srt_getlasterror_str ()), (NULL));

        srt_caller_free (caller);

        /* try-again */
        continue;
      }

      GST_OBJECT_LOCK (srtobject->element);
      srtobject->callers = g_list_append (srtobject->callers, caller);
      g_cond_signal (&srtobject->sock_cond);
      GST_OBJECT_UNLOCK (srtobject->element);

      /* notifying caller-added */
      if (srtobject->caller_added_closure != NULL) {
        GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };

        g_value_init (&values[0], G_TYPE_INT);
        g_value_set_int (&values[0], caller->sock);

        g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
        g_value_set_object (&values[1], caller->sockaddr);

        g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
            NULL);

        g_value_unset (&values[0]);
        g_value_unset (&values[1]);
      }

      GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");

      /* a source reads from a single caller only */
      if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
          GST_URI_SRC)
        return NULL;
    }
  }
}
/* Sets up listener mode: creates a socket, binds it to the configured
 * local address and port, registers it with the listener epoll container,
 * starts listening and spawns the accept thread (thread_func()).
 *
 * @cancellable, @sa and @sa_len are unused here.
 * @error: set when FALSE is returned; may be NULL
 *
 * Returns: TRUE on success.  On failure the listener epoll container is
 * released, the socket is closed and both handles on @srtobject are reset
 * to their invalid values. */
static gboolean
gst_srt_object_wait_connect (GstSRTObject * srtobject,
    GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
{
  SRTSOCKET sock = SRT_INVALID_SOCK;
  const gchar *local_address = NULL;
  guint local_port = 0;
  gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;

  gpointer bind_sa;
  gsize bind_sa_len;
  GSocketAddress *bind_addr;

  gst_structure_get_uint (srtobject->parameters, "localport", &local_port);

  local_address =
      gst_structure_get_string (srtobject->parameters, "localaddress");
  if (local_address == NULL)
    local_address = GST_SRT_DEFAULT_LOCALADDRESS;

  bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
  if (bind_addr == NULL) {
    /* the address string could not be parsed */
    g_set_error (error, GST_RESOURCE_ERROR,
        GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Invalid local address %s",
        local_address);
    goto failed;
  }

  bind_sa_len = g_socket_address_get_native_size (bind_addr);
  bind_sa = g_alloca (bind_sa_len);

  if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
    goto failed;
  }

  g_clear_object (&bind_addr);

  sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
  if (sock == SRT_INVALID_SOCK) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
    goto failed;
  }

  GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %u)",
      local_address, local_port);

  if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
    g_set_error (error, GST_RESOURCE_ERROR,
        GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%u - %s",
        local_address, local_port, srt_getlasterror_str ());
    goto failed;
  }

  if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
  if (srt_listen (sock, 1) == SRT_ERROR) {
    g_set_error (error, GST_RESOURCE_ERROR,
        GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
        srt_getlasterror_str ());

    goto failed;
  }

  srtobject->listener_sock = sock;

  srtobject->thread =
      g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);

  /* check the returned thread rather than *error: @error may be NULL */
  if (srtobject->thread == NULL) {
    goto failed;
  }

  return TRUE;

failed:

  if (srtobject->listener_poll_id != SRT_ERROR) {
    srt_epoll_release (srtobject->listener_poll_id);
  }

  if (sock != SRT_INVALID_SOCK) {
    srt_close (sock);
  }

  g_clear_object (&bind_addr);

  srtobject->listener_poll_id = SRT_ERROR;
  srtobject->listener_sock = SRT_INVALID_SOCK;

  return FALSE;
}
/* Sets up caller or rendezvous mode: creates a socket, configures it for
 * the element's direction (sender for sinks, receiver for sources),
 * optionally binds it to the configured local address and port, registers
 * it with the object's epoll container and connects to the peer.
 *
 * @connection_mode: rendezvous enables SRTO_RENDEZVOUS on the socket
 * @sa, @sa_len: native address of the remote peer
 * @error: set when FALSE is returned
 *
 * Returns: TRUE on success.  On failure the socket is closed and the
 * object's epoll container is released and reset to SRT_ERROR. */
static gboolean
gst_srt_object_connect (GstSRTObject * srtobject,
    GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
    GError ** error)
{
  SRTSOCKET sock;
  gint option_val = -1;
  gint sock_flags = SRT_EPOLL_ERR;
  guint local_port = 0;
  const gchar *local_address = NULL;

  sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
  if (sock == SRT_INVALID_SOCK) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
    goto failed;
  }

  switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
    case GST_URI_SRC:
      option_val = 0;
      sock_flags |= SRT_EPOLL_IN;
      break;
    case GST_URI_SINK:
      option_val = 1;
      sock_flags |= SRT_EPOLL_OUT;
      break;
    default:
      g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
          "Cannot determine stream direction");
      goto failed;
  }

  if (srt_setsockopt (sock, 0, SRTO_SENDER, &option_val, sizeof (gint))) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  option_val = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
  if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &option_val, sizeof (gint))) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
  local_address =
      gst_structure_get_string (srtobject->parameters, "localaddress");

  /* According to SRT norm, bind local address and port if specified */
  if (local_address != NULL && local_port != 0) {
    gpointer bind_sa;
    gsize bind_sa_len;

    GSocketAddress *bind_addr =
        g_inet_socket_address_new_from_string (local_address,
        local_port);

    if (bind_addr == NULL) {
      /* the address string could not be parsed; without this check the
       * size query below would fail and feed a bogus size to g_alloca() */
      g_set_error (error, GST_RESOURCE_ERROR,
          GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Invalid local address %s",
          local_address);
      goto failed;
    }

    bind_sa_len = g_socket_address_get_native_size (bind_addr);
    bind_sa = g_alloca (bind_sa_len);

    if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
      g_clear_object (&bind_addr);
      goto failed;
    }

    g_clear_object (&bind_addr);

    GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %u)",
        local_address, local_port);

    if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
      g_set_error (error, GST_RESOURCE_ERROR,
          GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%u - %s",
          local_address, local_port, srt_getlasterror_str ());
      goto failed;
    }
  }

  if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
    g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  srtobject->sock = sock;

  return TRUE;

failed:

  if (srtobject->poll_id != SRT_ERROR) {
    srt_epoll_release (srtobject->poll_id);
  }

  if (sock != SRT_INVALID_SOCK) {
    srt_close (sock);
  }

  srtobject->poll_id = SRT_ERROR;
  srtobject->sock = SRT_INVALID_SOCK;

  return FALSE;
}
/* Dispatches on @connection_mode: listener mode waits for incoming
 * callers, every other mode actively connects to the peer at @sa.
 * Returns the result of the chosen helper. */
static gboolean
gst_srt_object_open_connection (GstSRTObject * srtobject,
    GCancellable * cancellable, GstSRTConnectionMode connection_mode,
    gpointer sa, size_t sa_len, GError ** error)
{
  if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER)
    return gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len,
        error);

  return gst_srt_object_connect (srtobject, connection_mode, sa, sa_len,
      error);
}
/* Opens the SRT connection without caller added/removed callbacks.
 * Equivalent to gst_srt_object_open_full() with both callbacks NULL. */
gboolean
gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
    GError ** error)
{
  gboolean opened;

  opened = gst_srt_object_open_full (srtobject, NULL, NULL, cancellable,
      error);

  return opened;
}
/* Opens the SRT connection described by the object's URI and parameters.
 *
 * @caller_added_func: optional callback wrapped in a closure and invoked
 *   by the listener thread for every accepted caller
 * @caller_removed_func: optional callback wrapped in a closure and invoked
 *   for every caller dropped in gst_srt_object_close()
 * @cancellable: passed on to the connection helpers
 * @error: set when FALSE is returned
 *
 * The URI host (or the default local address when the URI has none) and
 * port are resolved to a native IPv4 address, a fresh listener epoll
 * container is created, and the connection is opened according to the
 * "mode" parameter.
 *
 * Returns: the new value of srtobject->opened. */
gboolean
gst_srt_object_open_full (GstSRTObject * srtobject,
    GstSRTObjectCallerAdded caller_added_func,
    GstSRTObjectCallerRemoved caller_removed_func,
    GCancellable * cancellable, GError ** error)
{
  GSocketAddress *socket_address = NULL;
  GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;

  gpointer sa;
  size_t sa_len;
  const gchar *addr_str;

  srtobject->opened = FALSE;

  if (caller_added_func != NULL) {
    srtobject->caller_added_closure =
        g_cclosure_new (G_CALLBACK (caller_added_func), srtobject, NULL);
    g_closure_set_marshal (srtobject->caller_added_closure,
        g_cclosure_marshal_generic);
  }

  if (caller_removed_func != NULL) {
    srtobject->caller_removed_closure =
        g_cclosure_new (G_CALLBACK (caller_removed_func), srtobject, NULL);
    g_closure_set_marshal (srtobject->caller_removed_closure,
        g_cclosure_marshal_generic);
  }

  addr_str = gst_uri_get_host (srtobject->uri);
  if (addr_str == NULL) {
    addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
    GST_DEBUG_OBJECT (srtobject->element,
        "Given uri doesn't have hostname or address. Use any (%s) and"
        " setting listener mode", addr_str);
  }

  socket_address =
      g_inet_socket_address_new_from_string (addr_str,
      gst_uri_get_port (srtobject->uri));

  if (socket_address == NULL) {
    g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
        "Invalid host");
    goto out;
  }

  /* FIXME: Unfortunately, SRT doesn't support IPv6 currently. */
  if (g_socket_address_get_family (socket_address) != G_SOCKET_FAMILY_IPV4) {
    g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
        "SRT supports IPv4 only");
    goto out;
  }

  sa_len = g_socket_address_get_native_size (socket_address);
  sa = g_alloca (sa_len);

  if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
    goto out;
  }

  GST_DEBUG_OBJECT (srtobject->element,
      "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
      srtobject->parameters);

  if (!gst_structure_get_enum (srtobject->parameters,
          "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
    GST_WARNING_OBJECT (srtobject->element,
        "Cannot get connection mode information." " Use default mode");
    /* fall back to the default mode value, not the enum's GType */
    connection_mode = GST_SRT_DEFAULT_MODE;
  }

  srtobject->listener_poll_id = srt_epoll_create ();

  srtobject->opened =
      gst_srt_object_open_connection
      (srtobject, cancellable, connection_mode, sa, sa_len, error);

out:
  g_clear_object (&socket_address);

  return srtobject->opened;
}
/* Closes the SRT connection and returns @srtobject to the "not opened"
 * state.
 *
 * In order: the data socket is removed from its epoll container and
 * closed; the listener socket is removed from the listener epoll
 * container, whose id is reset so the listener thread can tell it must
 * stop; the thread is joined (with the object lock dropped); the listener
 * epoll container is released; the listener socket is closed; every
 * remaining caller is reported through the "caller removed" closure and
 * freed; and both closures are dropped. */
void
gst_srt_object_close (GstSRTObject * srtobject)
{
  gint listener_poll_id = SRT_ERROR;

  GST_OBJECT_LOCK (srtobject->element);
  if (srtobject->poll_id != SRT_ERROR) {
    srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
  }

  if (srtobject->sock != SRT_INVALID_SOCK) {

    GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
        srtobject->sock);

    srt_close (srtobject->sock);
    srtobject->sock = SRT_INVALID_SOCK;
  }

  if (srtobject->listener_poll_id != SRT_ERROR) {
    srt_epoll_remove_usock (srtobject->listener_poll_id,
        srtobject->listener_sock);
    /* remember the id: the container itself still has to be released once
     * the listener thread no longer waits on it */
    listener_poll_id = srtobject->listener_poll_id;
    srtobject->listener_poll_id = SRT_ERROR;
  }
  if (srtobject->thread) {
    GThread *thread = g_steal_pointer (&srtobject->thread);

    /* the thread takes the object lock itself, so drop it while joining */
    GST_OBJECT_UNLOCK (srtobject->element);
    g_thread_join (thread);
    GST_OBJECT_LOCK (srtobject->element);
  }

  /* every open creates a new listener epoll container; release this one
   * so it does not leak across open/close cycles */
  if (listener_poll_id != SRT_ERROR) {
    srt_epoll_release (listener_poll_id);
  }

  if (srtobject->listener_sock != SRT_INVALID_SOCK) {
    GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
        srtobject->listener_sock);

    srt_close (srtobject->listener_sock);
    srtobject->listener_sock = SRT_INVALID_SOCK;
  }

  if (srtobject->callers) {
    GList *callers = g_steal_pointer (&srtobject->callers);

    /* the closure runs application code; do not hold the lock for it */
    GST_OBJECT_UNLOCK (srtobject->element);
    g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
        srtobject);
    GST_OBJECT_LOCK (srtobject->element);
    g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
  }

  g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
  g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);

  srtobject->opened = FALSE;
  GST_OBJECT_UNLOCK (srtobject->element);
}
/* Blocks until at least one caller is connected or @cancellable has been
 * cancelled.  Waits on sock_cond with the element's object lock held.
 *
 * Returns: TRUE when a caller is available, FALSE when cancelled first.
 * The error argument is not used. */
static gboolean
gst_srt_object_wait_caller (GstSRTObject * srtobject,
    GCancellable * cancellable, GError ** errorj)
{
  gboolean have_caller = FALSE;

  GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");

  GST_OBJECT_LOCK (srtobject->element);
  for (;;) {
    if (g_cancellable_is_cancelled (cancellable))
      break;

    have_caller = (srtobject->callers != NULL);
    if (have_caller)
      break;

    g_cond_wait (&srtobject->sock_cond,
        GST_OBJECT_GET_LOCK (srtobject->element));
  }
  GST_OBJECT_UNLOCK (srtobject->element);

  GST_DEBUG_OBJECT (srtobject->element, "got %s connection",
      have_caller ? "a" : "no");

  return have_caller;
}
/* Reads one SRT message into @data.  Only valid on source elements.
 *
 * In listener mode this first waits for a caller and polls that caller's
 * epoll container; otherwise it polls the object's own container.  A
 * broken, closed or non-existent socket ends the stream in listener mode
 * and triggers a close-and-reopen in the other modes.
 *
 * @data: destination buffer
 * @size: capacity of @data in bytes
 * @cancellable: checked before every poll
 * @error: passed on to the helpers that wait for a caller or reopen
 *
 * Returns: the value of srt_recvmsg() on a connected socket; 0 when
 * cancelled, when polling fails for a reason other than a timeout, or
 * when the caller went away; -1 when waiting for a caller or reopening
 * the connection fails. */
gssize
gst_srt_object_read (GstSRTObject * srtobject,
    guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
{
  gssize len = 0;
  gint poll_timeout;
  GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
  gint poll_id = SRT_ERROR;

  /* Only source element can read data */
  g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
          (srtobject->element)) == GST_URI_SRC, -1);

  gst_structure_get_enum (srtobject->parameters, "mode",
      GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);

  if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
    if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
      return -1;

    GST_OBJECT_LOCK (srtobject->element);
    /* the list may have been emptied since the wait returned, so look at
     * its head only after checking that it is still there */
    if (srtobject->callers) {
      SRTCaller *caller = srtobject->callers->data;

      poll_id = caller->poll_id;
    }
    GST_OBJECT_UNLOCK (srtobject->element);

    if (poll_id == SRT_ERROR)
      return 0;
  } else {
    poll_id = srtobject->poll_id;
  }

  if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
          &poll_timeout)) {
    poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
  }

  while (!g_cancellable_is_cancelled (cancellable)) {

    SRTSOCKET rsock;
    gint rsocklen = 1;
    int pollret;

    pollret = srt_epoll_wait (poll_id, &rsock,
        &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
    if (pollret < 0) {
      gint srt_errno = srt_getlasterror (NULL);

      /* anything but a timeout ends the read; timeouts poll again */
      if (srt_errno != SRT_ETIMEOUT) {
        return 0;
      }
      continue;
    }

    if (rsocklen < 0) {
      GST_WARNING_OBJECT (srtobject->element,
          "abnormal SRT socket is detected");
      srt_close (rsock);
    }

    switch (srt_getsockstate (rsock)) {
      case SRTS_BROKEN:
      case SRTS_NONEXIST:
      case SRTS_CLOSED:
        if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
          /* Caller has been disappeared. */
          return 0;
        } else {
          GST_WARNING_OBJECT (srtobject->element,
              "Invalid SRT socket. Trying to reconnect");
          gst_srt_object_close (srtobject);
          if (!gst_srt_object_open (srtobject, cancellable, error)) {
            return -1;
          }
          continue;
        }
      case SRTS_CONNECTED:
        /* good to go */
        break;
      default:
        /* not-ready */
        continue;
    }

    len = srt_recvmsg (rsock, (char *) (data), size);
    break;
  }

  return len;
}
/* Interrupts pending waits on @srtobject: removes the data socket from
 * the object's epoll container, cancels @cancellable and signals
 * sock_cond.  The cancel and the signal both happen while the element's
 * object lock is held. */
void
gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
{
GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
/* Removing all socket descriptors from the monitoring list
* wakes up SRT's threads. We only have one to remove. */
srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
/* connection is only waited for in listener mode,
* but there is no harm in raising signal in any case */
GST_OBJECT_LOCK (srtobject->element);
/* however, a race might be harmful ...
* the cancellation is used as 'flushing' flag here,
* so make sure it is so detected by the intended part at proper time */
g_cancellable_cancel (cancellable);
g_cond_signal (&srtobject->sock_cond);
GST_OBJECT_UNLOCK (srtobject->element);
}
/* Sends every buffer of @headers as a separate SRT message.
 *
 * When @poll_id is positive, each header is preceded by a wait (of at most
 * @poll_timeout per attempt) on that poll set, and the header is sent on the
 * socket the poll reports as writable. Otherwise the header is sent on @sock
 * directly without polling.
 *
 * A poll timeout retries the same header, so a slow peer can no longer cause
 * a header to be skipped while the caller still records the headers as sent.
 * Any other poll error aborts.
 *
 * Returns: TRUE if @headers is NULL or every header was sent. FALSE if
 * @cancellable was cancelled, polling failed for a reason other than a
 * timeout, a buffer could not be mapped, or srt_sendmsg2() failed.
 */
static gboolean
gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
    gint poll_id, gint poll_timeout, GstBufferList * headers,
    GCancellable * cancellable)
{
  guint size, i;

  if (!headers)
    return TRUE;

  size = gst_buffer_list_length (headers);

  GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);

  /* The index is only advanced once the header has actually been sent. */
  i = 0;
  while (i < size) {
    SRTSOCKET wsock = sock;
    gint wsocklen = 1;
    GstBuffer *buffer = gst_buffer_list_get (headers, i);
    GstMapInfo mapinfo;

    if (g_cancellable_is_cancelled (cancellable)) {
      return FALSE;
    }

    if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
            &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
      gint srt_errno = srt_getlasterror (NULL);

      if (srt_errno != SRT_ETIMEOUT) {
        GST_WARNING_OBJECT (srtobject->element,
            "Failed to poll before sending header %u: %s", i,
            srt_getlasterror_str ());
        return FALSE;
      }

      /* Timed out: try the same header again (cancellation is re-checked
       * at the top of the loop). */
      continue;
    }

    GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
        i, buffer);

    if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
          ("Could not map the input stream"), (NULL));
      return FALSE;
    }

    if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size,
            0) == SRT_ERROR) {
      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
          ("%s", srt_getlasterror_str ()));
      gst_buffer_unmap (buffer, &mapinfo);
      return FALSE;
    }

    gst_buffer_unmap (buffer, &mapinfo);
    i++;
  }

  return TRUE;
}
/* Sends the mapped buffer to every caller in srtobject->callers.
 *
 * The whole iteration runs with the element's object lock held. For each
 * caller the stream headers are sent first if caller->sent_headers is not
 * set yet, then the payload is sent in chunks of at most the socket's
 * SRTO_PAYLOADSIZE. A caller for which any of these steps fails is removed
 * from the list, its "removed" closure is invoked and it is freed; the
 * remaining callers are still served.
 *
 * @error is not used by this function.
 *
 * Returns: -1 if @cancellable is cancelled while iterating, otherwise
 * mapinfo->size (regardless of per-caller failures).
 */
static gssize
gst_srt_object_write_to_callers (GstSRTObject * srtobject,
    GstBufferList * headers,
    const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
{
  GList *callers;

  GST_OBJECT_LOCK (srtobject->element);
  callers = srtobject->callers;
  while (callers != NULL) {
    gssize len = 0;
    const guint8 *msg = mapinfo->data;
    gint sent;
    gint payload_size;
    /* srt_getsockflag() takes the size of the value buffer in optlen */
    gint optlen = sizeof (payload_size);
    SRTCaller *caller = callers->data;

    /* Advance first: the current node may be removed in the error path. */
    callers = callers->next;

    if (g_cancellable_is_cancelled (cancellable)) {
      GST_OBJECT_UNLOCK (srtobject->element);
      return -1;
    }

    if (!caller->sent_headers) {
      /* No polling here (poll_id and timeout of -1): send directly. */
      if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
              -1, headers, cancellable)) {
        goto err;
      }
      caller->sent_headers = TRUE;
    }

    if (srt_getsockflag (caller->sock, SRTO_PAYLOADSIZE, &payload_size,
            &optlen)) {
      GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
      goto err;
    }

    while (len < mapinfo->size) {
      gint rest = MIN (mapinfo->size - len, payload_size);

      sent = srt_sendmsg2 (caller->sock, (char *) (msg + len), rest, 0);
      if (sent < 0) {
        GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
        goto err;
      }
      len += sent;
    }

    continue;

  err:
    /* Drop the failing caller and carry on with the next one. */
    srtobject->callers = g_list_remove (srtobject->callers, caller);
    srt_caller_invoke_removed_closure (caller, srtobject);
    srt_caller_free (caller);
  }
  GST_OBJECT_UNLOCK (srtobject->element);

  return mapinfo->size;
}
/* Sends the mapped buffer over the object's single SRT connection.
 *
 * The stream headers are sent first if srtobject->sent_headers is not set.
 * The payload is then sent in chunks of at most the socket's
 * SRTO_PAYLOADSIZE; before each chunk the function waits (up to the
 * "poll-timeout" parameter, or GST_SRT_DEFAULT_POLL_TIMEOUT if unset) for a
 * writable socket in srtobject->poll_id. If the socket is found broken,
 * non-existent or closed, the connection is closed and reopened.
 *
 * Returns: the number of bytes sent, which is smaller than mapinfo->size if
 * @cancellable was cancelled or a socket option/send call failed part-way;
 * -1 if sending the headers or reopening the connection failed.
 */
static gssize
gst_srt_object_write_one (GstSRTObject * srtobject,
    GstBufferList * headers,
    const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
{
  gssize len = 0;
  gint poll_timeout;
  const guint8 *msg = mapinfo->data;
  gint payload_size;
  /* srt_getsockflag() takes the size of the value buffer in optlen */
  gint optlen = sizeof (payload_size);

  if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
          &poll_timeout)) {
    poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
  }

  if (!srtobject->sent_headers) {
    if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
            srtobject->poll_id, poll_timeout, headers, cancellable)) {
      return -1;
    }
    srtobject->sent_headers = TRUE;
  }

  while (len < mapinfo->size) {
    SRTSOCKET wsock;
    gint wsocklen = 1;
    gint sent;
    gint rest;

    if (g_cancellable_is_cancelled (cancellable)) {
      break;
    }

    /* On any poll failure just try again; cancellation is re-checked at the
     * top of the loop. */
    if (srt_epoll_wait (srtobject->poll_id, 0, 0, &wsock,
            &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
      continue;
    }

    /* NOTE(review): the payload size is queried before the socket state is
     * examined, so a failure here leaves the loop without reaching the
     * reconnect branch below - confirm this is intended. */
    if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
      GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
      break;
    }

    rest = MIN (mapinfo->size - len, payload_size);

    switch (srt_getsockstate (wsock)) {
      case SRTS_BROKEN:
      case SRTS_NONEXIST:
      case SRTS_CLOSED:
        GST_WARNING_OBJECT (srtobject->element,
            "Invalid SRT socket. Trying to reconnect");
        gst_srt_object_close (srtobject);
        if (!gst_srt_object_open (srtobject, cancellable, error)) {
          return -1;
        }
        continue;
      case SRTS_CONNECTED:
        /* good to go */
        GST_LOG_OBJECT (srtobject->element, "good to go");
        break;
      default:
        GST_WARNING_OBJECT (srtobject->element, "not ready");
        /* not-ready */
        continue;
    }

    sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
    if (sent < 0) {
      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
          ("%s", srt_getlasterror_str ()));
      break;
    }
    len += sent;
  }

  return len;
}
/* Writes the mapped buffer to the SRT peer(s) of a sink element.
 *
 * In listener mode the function first waits for a caller and then sends the
 * data to all connected callers; in every other mode the data goes out over
 * the object's single connection.
 *
 * Returns: the value of the mode-specific write helper, or -1 if the element
 * is not a sink or waiting for a caller failed.
 */
gssize
gst_srt_object_write (GstSRTObject * srtobject,
    GstBufferList * headers,
    const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
{
  GstSRTConnectionMode mode = GST_SRT_CONNECTION_MODE_NONE;

  /* Only sink element can write data */
  g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
          (srtobject->element)) == GST_URI_SINK, -1);

  gst_structure_get_enum (srtobject->parameters, "mode",
      GST_TYPE_SRT_CONNECTION_MODE, (gint *) & mode);

  /* Caller / rendezvous / unset mode: one connection to write to. */
  if (mode != GST_SRT_CONNECTION_MODE_LISTENER) {
    return gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
        error);
  }

  /* Listener mode: there must be at least one caller before writing. */
  if (!gst_srt_object_wait_caller (srtobject, cancellable, error)) {
    return -1;
  }

  return gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
      cancellable, error);
}
/* Builds an "application/x-srt-statistics" structure for @srtsock.
 *
 * The statistics are read with srt_bstats() (clear argument 0). If that call
 * fails the returned structure is left empty. Sender-side or receiver-side
 * fields are added depending on @is_sender; bandwidth and RTT are added in
 * both cases.
 *
 * Every GType passed to gst_structure_set() must match the C type of the
 * value that follows it, since the values are read back from the variable
 * argument list using that type.
 *
 * Returns: a newly allocated GstStructure owned by the caller.
 */
static GstStructure *
get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
{
  GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
  int ret;
  SRT_TRACEBSTATS stats;

  ret = srt_bstats (srtsock, &stats, 0);
  if (ret >= 0) {
    if (is_sender)
      gst_structure_set (s,
          /* number of sent data packets, including retransmissions */
          "packets-sent", G_TYPE_INT64, stats.pktSent,
          /* number of lost packets (sender side) */
          "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
          /* number of retransmitted packets */
          "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
          /* number of received ACK packets */
          "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
          /* number of received NAK packets */
          "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
          /* busy sending time (i.e., idle time exclusive); set exactly once,
           * with the type the field previously ended up with */
          "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
          /* number of sent data bytes, including retransmissions */
          "bytes-sent", G_TYPE_UINT64, stats.byteSent,
          /* number of retransmitted bytes */
          "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
          /* number of too-late-to-send dropped bytes */
          "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
          /* number of too-late-to-send dropped packets */
          "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
          /* sending rate in Mb/s */
          "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
          "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
    else
      gst_structure_set (s,
          "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
          "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
          /* number of sent ACK packets */
          "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
          /* number of sent NAK packets */
          "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
          "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
          /* byteRcvLossTotal is a 64-bit counter, so it must not be passed
           * as G_TYPE_INT */
          "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal,
          "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
          "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);

    gst_structure_set (s,
        /* estimated bandwidth, in Mb/s */
        "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
        "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);
  }

  return s;
}
/* Collects SRT statistics for @srtobject while holding the object lock.
 *
 * If the object has a valid socket of its own, the result is that socket's
 * statistics. Otherwise the result is an "application/x-srt-statistics"
 * structure which, when callers are connected, carries a "callers" field:
 * a GValueArray holding one statistics structure per caller.
 *
 * Returns: a newly allocated GstStructure owned by the caller.
 */
GstStructure *
gst_srt_object_get_stats (GstSRTObject * srtobject)
{
  GstStructure *result = NULL;
  gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);

  GST_OBJECT_LOCK (srtobject->element);

  if (srtobject->sock != SRT_INVALID_SOCK) {
    /* Single connection: report it directly. */
    result = get_stats_for_srtsock (srtobject->sock, is_sender);
  } else {
    result = gst_structure_new_empty ("application/x-srt-statistics");

    if (srtobject->callers) {
      GValueArray *per_caller = g_value_array_new (1);
      GValue per_caller_value = G_VALUE_INIT;
      GList *walk = srtobject->callers;

      while (walk != NULL) {
        SRTCaller *caller = walk->data;
        GstStructure *caller_stats =
            get_stats_for_srtsock (caller->sock, is_sender);
        GValue *slot;

        /* Append an empty slot, then fill it in place so the array takes
         * ownership of the structure without copying it. */
        g_value_array_append (per_caller, NULL);
        slot = g_value_array_get_nth (per_caller, per_caller->n_values - 1);
        g_value_init (slot, GST_TYPE_STRUCTURE);
        g_value_take_boxed (slot, caller_stats);

        walk = walk->next;
      }

      g_value_init (&per_caller_value, G_TYPE_VALUE_ARRAY);
      g_value_take_boxed (&per_caller_value, per_caller);
      gst_structure_take_value (result, "callers", &per_caller_value);
    }
  }

  GST_OBJECT_UNLOCK (srtobject->element);

  return result;
}