gstreamer/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c
Jan Alexander Steffens (heftig) 424b331afc srt: Remove callers for which srt_bstats fails
This keeps them from accumulating in the element and in the stats while
the sink is not being fed, as long as we at least periodically grab
stats.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3156>
2022-11-04 13:07:34 +00:00

1988 lines
58 KiB
C

/* GStreamer
* Copyright (C) 2018, Collabora Ltd.
* Copyright (C) 2018, SK Telecom, Co., Ltd.
* Author: Jeongseok Kim <jeongseok.kim@sk.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
/* Needed for GValueArray */
#define GLIB_DISABLE_DEPRECATION_WARNINGS
#include "gstsrtobject.h"
#include <gst/base/gstbasesink.h>
#include <gio/gnetworking.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
/* The debug category is only referenced here (EXTERN); it becomes the default
 * category for the GST_*_OBJECT logging calls in this file. */
GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
#define GST_CAT_DEFAULT gst_debug_srtobject
/* REASON_FORMAT is a conversion-specifier fragment and REASON_ARGS the
 * matching argument list for describing an SRT reject reason. */
#if SRT_VERSION_VALUE > 0x10402
#define REASON_FORMAT "s"
#define REASON_ARGS(reason) srt_rejectreason_str (reason)
#else
/* srt_rejectreason_str() is unavailable in libsrt 1.4.2 and prior due to
* unexported symbol. See https://github.com/Haivision/srt/pull/1728. */
#define REASON_FORMAT "s %d"
#define REASON_ARGS(reason) "reject reason code", (reason)
#endif
/* Define options added in later revisions */
#if SRT_VERSION_VALUE < 0x10402
#define SRTO_DRIFTTRACER 37
/* We can't define SRTO_BINDTODEVICE since it depends on configuration flags *sigh* */
#define SRTO_RETRANSMITALGO 61
#endif
/* Property identifiers used as prop_id by the property helpers in this file.
 * Numbering starts at 1; PROP_LAST marks the end of the list. */
enum
{
PROP_URI = 1,
PROP_MODE,
PROP_LOCALADDRESS,
PROP_LOCALPORT,
PROP_PASSPHRASE,
PROP_PBKEYLEN,
PROP_POLL_TIMEOUT,
PROP_LATENCY,
PROP_MSG_SIZE,
PROP_STATS,
PROP_WAIT_FOR_CONNECTION,
PROP_STREAMID,
PROP_AUTHENTICATION,
PROP_LAST
};
/* Per-connection record for a peer accepted by the listener thread. */
typedef struct
{
/* accepted SRT socket, SRT_INVALID_SOCK while unset */
SRTSOCKET sock;
/* epoll id watching sock, SRT_ERROR while unset */
gint poll_id;
/* peer address; owned by this record and released in srt_caller_free() */
GSocketAddress *sockaddr;
/* initialised to FALSE by srt_caller_new() */
gboolean sent_headers;
} SRTCaller;
static SRTCaller *
srt_caller_new (void)
{
SRTCaller *caller = g_new0 (SRTCaller, 1);
caller->sock = SRT_INVALID_SOCK;
caller->poll_id = SRT_ERROR;
caller->sent_headers = FALSE;
return caller;
}
/* Releases everything owned by a caller record: the peer address reference,
 * the SRT socket and the epoll id (each only when set), then the record. */
static void
srt_caller_free (SRTCaller * caller)
{
  g_return_if_fail (caller != NULL);

  g_clear_object (&caller->sockaddr);

  if (caller->sock != SRT_INVALID_SOCK)
    srt_close (caller->sock);

  if (caller->poll_id != SRT_ERROR)
    srt_epoll_release (caller->poll_id);

  g_free (caller);
}
/* called with sock_lock */
/* Emits "caller-removed" on the owning element, passing the peer address. */
static void
srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject)
{
  GSocketAddress *peer = caller->sockaddr;

  g_signal_emit_by_name (srtobject->element, "caller-removed", 0, peer);
}
/* One fixed socket option: a name used in error messages, the SRT option id,
 * and a pointer/length pair handed to srt_setsockopt(). */
struct srt_constant_params
{
const gchar *name;
SRT_SOCKOPT param;
const void *val;
int val_len;
};
/* Addressable constants referenced by the srt_params table below. */
static const bool bool_false = false;
static const bool bool_true = true;
static const struct linger no_linger = { 0, 0 };
/* *INDENT-OFF* */
/* Options applied to every socket by gst_srt_object_set_common_params();
 * iteration stops at the entry whose name is NULL. */
static const struct srt_constant_params srt_params[] = {
{"SRTO_SNDSYN", SRTO_SNDSYN, &bool_false, sizeof bool_false}, /* non-blocking */
{"SRTO_RCVSYN", SRTO_RCVSYN, &bool_false, sizeof bool_false}, /* non-blocking */
{"SRTO_LINGER", SRTO_LINGER, &no_linger, sizeof no_linger}, /* no linger time */
{"SRTO_TSBPDMODE", SRTO_TSBPDMODE, &bool_true, sizeof bool_true}, /* Timestamp-based Packet Delivery mode must be enabled */
{NULL, -1, NULL, 0},
};
/* *INDENT-ON* */
/* Describes one user-configurable socket option: the field name under which
 * it is stored in the parameters structure, the SRT option id, and the GType
 * its value must hold. */
typedef struct
{
const gchar *name;
SRT_SOCKOPT opt;
GType gtype;
} SrtOption;
/* Table of socket options that can be supplied as parameters. It is walked
 * by gst_srt_object_set_common_params() (to apply stored values) and by
 * gst_srt_object_set_socket_option() (to pick the conversion for a key).
 * The final entry with a NULL name terminates the table. */
SrtOption srt_options[] = {
{"mss", SRTO_MSS, G_TYPE_INT},
{"fc", SRTO_FC, G_TYPE_INT},
{"sndbuf", SRTO_SNDBUF, G_TYPE_INT},
{"rcvbuf", SRTO_RCVBUF, G_TYPE_INT},
{"maxbw", SRTO_MAXBW, G_TYPE_INT64},
{"tsbpdmode", SRTO_TSBPDMODE, G_TYPE_BOOLEAN},
{"latency", SRTO_LATENCY, G_TYPE_INT},
{"inputbw", SRTO_INPUTBW, G_TYPE_INT64},
{"oheadbw", SRTO_OHEADBW, G_TYPE_INT},
{"passphrase", SRTO_PASSPHRASE, G_TYPE_STRING},
{"pbkeylen", SRTO_PBKEYLEN, G_TYPE_INT},
{"ipttl", SRTO_IPTTL, G_TYPE_INT},
{"iptos", SRTO_IPTOS, G_TYPE_INT},
{"tlpktdrop", SRTO_TLPKTDROP, G_TYPE_BOOLEAN},
{"snddropdelay", SRTO_SNDDROPDELAY, G_TYPE_INT},
{"nakreport", SRTO_NAKREPORT, G_TYPE_BOOLEAN},
{"conntimeo", SRTO_CONNTIMEO, G_TYPE_INT},
{"drifttracer", SRTO_DRIFTTRACER, G_TYPE_BOOLEAN},
{"lossmaxttl", SRTO_LOSSMAXTTL, G_TYPE_INT},
{"rcvlatency", SRTO_RCVLATENCY, G_TYPE_INT},
{"peerlatency", SRTO_PEERLATENCY, G_TYPE_INT},
{"minversion", SRTO_MINVERSION, G_TYPE_INT},
{"streamid", SRTO_STREAMID, G_TYPE_STRING},
{"congestion", SRTO_CONGESTION, G_TYPE_STRING},
{"messageapi", SRTO_MESSAGEAPI, G_TYPE_BOOLEAN},
{"payloadsize", SRTO_PAYLOADSIZE, G_TYPE_INT},
{"transtype", SRTO_TRANSTYPE, G_TYPE_INT},
{"kmrefreshrate", SRTO_KMREFRESHRATE, G_TYPE_INT},
{"kmpreannounce", SRTO_KMPREANNOUNCE, G_TYPE_INT},
{"enforcedencryption", SRTO_ENFORCEDENCRYPTION, G_TYPE_BOOLEAN},
{"ipv6only", SRTO_IPV6ONLY, G_TYPE_INT},
{"peeridletimeo", SRTO_PEERIDLETIMEO, G_TYPE_INT},
#if SRT_VERSION_VALUE >= 0x10402
{"bindtodevice", SRTO_BINDTODEVICE, G_TYPE_STRING},
#endif
{"packetfilter", SRTO_PACKETFILTER, G_TYPE_STRING},
{"retransmitalgo", SRTO_RETRANSMITALGO, G_TYPE_INT},
{NULL}
};
/* Number of live GstSRTObject instances, updated atomically:
 * gst_srt_object_new() calls srt_startup() when it goes from 0 to 1 and
 * gst_srt_object_destroy() calls srt_cleanup() when it drops back to 0. */
static gint srt_init_refcount = 0;
/* Turns @address / @port into a socket address.
 *
 * @address is first tried as a literal IP address; when that fails it is
 * looked up through the default resolver and the first result is used.
 *
 * Returns: a new GSocketAddress, or NULL with @err_out set when the name
 * lookup fails. */
static GSocketAddress *
gst_srt_object_resolve (GstSRTObject * srtobject, const gchar * address,
    guint port, GCancellable * cancellable, GError ** err_out)
{
  GSocketAddress *saddr = g_inet_socket_address_new_from_string (address, port);

  if (saddr == NULL) {
    GError *lookup_err = NULL;
    GResolver *resolver;
    GList *candidates;

    GST_DEBUG_OBJECT (srtobject->element, "resolving IP address for host %s",
        address);

    resolver = g_resolver_get_default ();
    candidates =
        g_resolver_lookup_by_name (resolver, address, cancellable, &lookup_err);

    if (candidates == NULL) {
      GST_WARNING_OBJECT (srtobject->element, "Failed to resolve %s: %s",
          address, lookup_err->message);
      g_set_error (err_out, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
          "Failed to resolve host '%s': %s", address, lookup_err->message);
      g_clear_error (&lookup_err);
      g_object_unref (resolver);
      return NULL;
    }

    /* Only the first resolved address is used. */
    saddr =
        g_inet_socket_address_new (G_INET_ADDRESS (candidates->data), port);
    g_resolver_free_addresses (candidates);
    g_object_unref (resolver);
  }
#ifndef GST_DISABLE_GST_DEBUG
  {
    GInetAddress *inet =
        g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (saddr));
    gchar *ip = g_inet_address_to_string (inet);

    GST_DEBUG_OBJECT (srtobject->element, "IP address for host %s is %s",
        address, ip);
    g_free (ip);
  }
#endif

  return saddr;
}
/* Applies one entry of the option table to @sock, taking the value from
 * @value.
 *
 * The value must hold exactly the GType the table entry declares. Empty or
 * NULL strings are treated as "nothing to set" and succeed without touching
 * the socket.
 *
 * Returns: TRUE on success; FALSE with @error set on a type mismatch or when
 * srt_setsockopt() fails. */
static gboolean
gst_srt_object_apply_socket_option (SRTSOCKET sock, SrtOption * option,
    const GValue * value, GError ** error)
{
  union
  {
    int32_t i;
    int64_t i64;
    gboolean b;
    const gchar *c;
  } storage;
  const void *optval = &storage;
  gint optlen = 0;
  gboolean type_ok = G_VALUE_HOLDS (value, option->gtype);

  if (type_ok) {
    switch (option->gtype) {
      case G_TYPE_INT:
        storage.i = g_value_get_int (value);
        optlen = sizeof storage.i;
        break;
      case G_TYPE_INT64:
        storage.i64 = g_value_get_int64 (value);
        optlen = sizeof storage.i64;
        break;
      case G_TYPE_BOOLEAN:
        storage.b = g_value_get_boolean (value);
        optlen = sizeof storage.b;
        break;
      case G_TYPE_STRING:
        /* Strings are passed by their own pointer, not via the union. */
        storage.c = g_value_get_string (value);
        optval = storage.c;
        optlen = storage.c ? strlen (storage.c) : 0;
        if (optlen == 0)
          return TRUE;
        break;
      default:
        type_ok = FALSE;
        break;
    }
  }

  if (!type_ok) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
        "option %s has unsupported type", option->name);
    return FALSE;
  }

  if (srt_setsockopt (sock, 0, option->opt, optval, optlen) != 0) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
        "failed to set %s (reason: %s)", option->name, srt_getlasterror_str ());
    return FALSE;
  }

  return TRUE;
}
/* Configures @sock with the fixed options from srt_params followed by every
 * option from srt_options that has a value stored in the parameters
 * structure. The element's object lock is held for the whole operation.
 *
 * Returns: TRUE when all options were applied; FALSE with @error set at the
 * first failure. */
static gboolean
gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
    GError ** error)
{
  const struct srt_constant_params *fixed;
  SrtOption *opt;
  gboolean ok = TRUE;

  GST_OBJECT_LOCK (srtobject->element);

  for (fixed = srt_params; ok && fixed->name != NULL; fixed++) {
    if (srt_setsockopt (sock, 0, fixed->param, fixed->val,
            fixed->val_len) != 0) {
      g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
          "failed to set %s (reason: %s)", fixed->name,
          srt_getlasterror_str ());
      ok = FALSE;
    }
  }

  for (opt = srt_options; ok && opt->name != NULL; ++opt) {
    const GValue *stored =
        gst_structure_get_value (srtobject->parameters, opt->name);

    if (stored != NULL
        && !gst_srt_object_apply_socket_option (sock, opt, stored, error))
      ok = FALSE;
  }

  GST_OBJECT_UNLOCK (srtobject->element);

  return ok;
}
/* Creates the SRT helper object for @element.
 *
 * The first instance created also starts up the SRT library; a startup
 * failure is only reported with a warning. The new object has an empty
 * parameters structure, no sockets, and a freshly created epoll id.
 *
 * Returns: a new GstSRTObject, to be released with gst_srt_object_destroy(). */
GstSRTObject *
gst_srt_object_new (GstElement * element)
{
  GstSRTObject *self;
  const gboolean first_user = (g_atomic_int_add (&srt_init_refcount, 1) == 0);

  if (first_user) {
    GST_DEBUG_OBJECT (element, "Starting up SRT");
    if (srt_startup () < 0)
      g_warning ("Failed to initialize SRT (reason: %s)",
          srt_getlasterror_str ());
  }

  self = g_new0 (GstSRTObject, 1);
  self->element = element;
  self->parameters = gst_structure_new_empty ("application/x-srt-params");

  self->sock = SRT_INVALID_SOCK;
  self->poll_id = srt_epoll_create ();
  self->listener_sock = SRT_INVALID_SOCK;
  self->listener_poll_id = SRT_ERROR;

  self->sent_headers = FALSE;
  self->wait_for_connection = GST_SRT_DEFAULT_WAIT_FOR_CONNECTION;

  g_cond_init (&self->sock_cond);

  return self;
}
/* Tears down an object created by gst_srt_object_new(): closes the socket if
 * one is open, releases the epoll id, frees the parameters and URI, and
 * cleans up the SRT library when the last instance goes away. */
void
gst_srt_object_destroy (GstSRTObject * srtobject)
{
  g_return_if_fail (srtobject != NULL);

  if (srtobject->sock != SRT_INVALID_SOCK)
    srt_close (srtobject->sock);
  srt_epoll_release (srtobject->poll_id);

  g_cond_clear (&srtobject->sock_cond);

  GST_DEBUG_OBJECT (srtobject->element, "Destroying srtobject");
  gst_structure_free (srtobject->parameters);

  /* Pairs with the startup done by the first gst_srt_object_new(). */
  if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
    srt_cleanup ();
    GST_DEBUG_OBJECT (srtobject->element, "Cleaning up SRT");
  }

  g_clear_pointer (&srtobject->uri, gst_uri_unref);

  g_free (srtobject);
}
/* Stores a property value on @srtobject while holding the element's object
 * lock.
 *
 * Most properties are copied verbatim into the parameters structure under a
 * fixed field name; "uri", "pbkeylen", "wait-for-connection" and
 * "authentication" are handled individually.
 *
 * Returns: TRUE when @prop_id was recognised, FALSE otherwise. */
gboolean
gst_srt_object_set_property_helper (GstSRTObject * srtobject,
    guint prop_id, const GValue * value, GParamSpec * pspec)
{
  /* Field name for properties that are stored as-is in the structure. */
  const gchar *field = NULL;
  gboolean handled = TRUE;

  GST_OBJECT_LOCK (srtobject->element);

  switch (prop_id) {
    case PROP_URI:
      gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL);
      break;
    case PROP_PBKEYLEN:
      /* Stored as a plain int rather than as the enum type. */
      gst_structure_set (srtobject->parameters, "pbkeylen", G_TYPE_INT,
          g_value_get_enum (value), NULL);
      break;
    case PROP_WAIT_FOR_CONNECTION:
      srtobject->wait_for_connection = g_value_get_boolean (value);
      break;
    case PROP_AUTHENTICATION:
      srtobject->authentication = g_value_get_boolean (value);
      break;
    case PROP_MODE:
      field = "mode";
      break;
    case PROP_POLL_TIMEOUT:
      field = "poll-timeout";
      break;
    case PROP_LATENCY:
      field = "latency";
      break;
    case PROP_LOCALADDRESS:
      field = "localaddress";
      break;
    case PROP_LOCALPORT:
      field = "localport";
      break;
    case PROP_PASSPHRASE:
      field = "passphrase";
      break;
    case PROP_STREAMID:
      field = "streamid";
      break;
    default:
      handled = FALSE;
      break;
  }

  if (field != NULL)
    gst_structure_set_value (srtobject->parameters, field, value);

  GST_OBJECT_UNLOCK (srtobject->element);

  return handled;
}
/* Reads a property from @srtobject into @value.
 *
 * Values kept in the parameters structure fall back to their defaults (with
 * a warning) when the field is missing. Fields of @srtobject are read under
 * the element's object lock, matching the setter. "stats" is delegated to
 * gst_srt_object_get_stats(), which is called without the object lock held.
 *
 * Returns: TRUE when @prop_id was recognised, FALSE otherwise. */
gboolean
gst_srt_object_get_property_helper (GstSRTObject * srtobject,
    guint prop_id, GValue * value, GParamSpec * pspec)
{
  switch (prop_id) {
    case PROP_URI:
      GST_OBJECT_LOCK (srtobject->element);
      /* No URI may have been set (or parsed) yet; report NULL rather than
       * handing a NULL GstUri to gst_uri_to_string(). */
      g_value_take_string (value,
          srtobject->uri != NULL ? gst_uri_to_string (srtobject->uri) : NULL);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    case PROP_MODE:{
      GstSRTConnectionMode v;

      GST_OBJECT_LOCK (srtobject->element);
      if (!gst_structure_get_enum (srtobject->parameters, "mode",
              GST_TYPE_SRT_CONNECTION_MODE, (gint *) & v)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
        v = GST_SRT_CONNECTION_MODE_NONE;
      }
      g_value_set_enum (value, v);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    }
    case PROP_LOCALADDRESS:
      GST_OBJECT_LOCK (srtobject->element);
      g_value_set_string (value,
          gst_structure_get_string (srtobject->parameters, "localaddress"));
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    case PROP_LOCALPORT:{
      guint v;

      GST_OBJECT_LOCK (srtobject->element);
      if (!gst_structure_get_uint (srtobject->parameters, "localport", &v)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
        v = GST_SRT_DEFAULT_PORT;
      }
      g_value_set_uint (value, v);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    }
    case PROP_PBKEYLEN:{
      GstSRTKeyLength v;

      GST_OBJECT_LOCK (srtobject->element);
      /* The setter stores this as a plain int, so read it back as one. */
      if (!gst_structure_get_int (srtobject->parameters, "pbkeylen",
              (gint *) & v)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
        v = GST_SRT_KEY_LENGTH_NO_KEY;
      }
      g_value_set_enum (value, v);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    }
    case PROP_POLL_TIMEOUT:{
      gint v;

      GST_OBJECT_LOCK (srtobject->element);
      if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &v)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
        v = GST_SRT_DEFAULT_POLL_TIMEOUT;
      }
      g_value_set_int (value, v);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    }
    case PROP_LATENCY:{
      gint v;

      GST_OBJECT_LOCK (srtobject->element);
      if (!gst_structure_get_int (srtobject->parameters, "latency", &v)) {
        GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
        v = GST_SRT_DEFAULT_LATENCY;
      }
      g_value_set_int (value, v);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    }
    case PROP_STATS:
      g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
      break;
    case PROP_WAIT_FOR_CONNECTION:
      GST_OBJECT_LOCK (srtobject->element);
      g_value_set_boolean (value, srtobject->wait_for_connection);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    case PROP_STREAMID:
      GST_OBJECT_LOCK (srtobject->element);
      g_value_set_string (value,
          gst_structure_get_string (srtobject->parameters, "streamid"));
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    case PROP_AUTHENTICATION:
      /* Written under the object lock by the setter; read it the same way. */
      GST_OBJECT_LOCK (srtobject->element);
      g_value_set_boolean (value, srtobject->authentication);
      GST_OBJECT_UNLOCK (srtobject->element);
      break;
    default:
      return FALSE;
  }

  return TRUE;
}
/* Installs the properties shared by the SRT elements on @gobject_class, using
 * the PROP_* identifiers handled by the set/get property helpers, and marks
 * the two enum types used by those properties as plugin API. */
void
gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
{
/**
* GstSRTSrc:uri:
*
* The URI used by SRT connection. User can specify SRT specific options by URI parameters.
* Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Medium: SRT</a>
*/
g_object_class_install_property (gobject_class, PROP_URI,
g_param_spec_string ("uri", "URI",
"URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
/**
* GstSRTSrc:mode:
*
* The SRT connection mode.
* This property can be set by URI parameters.
*/
g_object_class_install_property (gobject_class, PROP_MODE,
g_param_spec_enum ("mode", "Connection mode",
"SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
GST_SRT_CONNECTION_MODE_CALLER,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
gst_type_mark_as_plugin_api (GST_TYPE_SRT_CONNECTION_MODE, 0);
/**
* GstSRTSrc:localaddress:
*
* The address to bind when #GstSRTSrc:mode is listener or rendezvous.
* This property can be set by URI parameters.
*/
g_object_class_install_property (gobject_class, PROP_LOCALADDRESS,
g_param_spec_string ("localaddress", "Local address",
"Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
/**
* GstSRTSrc:localport:
*
* The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
* This property can be set by URI parameters.
*/
g_object_class_install_property (gobject_class, PROP_LOCALPORT,
g_param_spec_uint ("localport", "Local port",
"Local port to bind", 0,
65535, GST_SRT_DEFAULT_PORT,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
/**
* GstSRTSrc:passphrase:
*
* The password for the encrypted transmission.
* This property can be set by URI parameters.
*/
/* Write-only: there is no getter case for PROP_PASSPHRASE. */
g_object_class_install_property (gobject_class, PROP_PASSPHRASE,
g_param_spec_string ("passphrase", "Passphrase",
"Password for the encrypted transmission", "",
G_PARAM_WRITABLE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS));
/**
* GstSRTSrc:pbkeylen:
*
* The crypto key length.
* This property can be set by URI parameters.
*/
g_object_class_install_property (gobject_class, PROP_PBKEYLEN,
g_param_spec_enum ("pbkeylen", "Crypto key length",
"Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
GST_SRT_DEFAULT_PBKEYLEN,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
gst_type_mark_as_plugin_api (GST_TYPE_SRT_KEY_LENGTH, 0);
/**
* GstSRTSrc:poll-timeout:
*
* The polling timeout used when srt poll is started.
* Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
* This property can be set by URI parameters.
*/
g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT,
g_param_spec_int ("poll-timeout", "Poll timeout",
"Return poll wait after timeout milliseconds (-1 = infinite)", -1,
G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
/**
* GstSRTSrc:latency:
*
* The maximum accepted transmission latency.
*/
g_object_class_install_property (gobject_class, PROP_LATENCY,
g_param_spec_int ("latency", "latency",
"Minimum latency (milliseconds)", 0,
G_MAXINT32, GST_SRT_DEFAULT_LATENCY,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSRTSrc:stats:
*
* The statistics from SRT.
*/
g_object_class_install_property (gobject_class, PROP_STATS,
g_param_spec_boxed ("stats", "Statistics",
"SRT Statistics", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstSRTSink:wait-for-connection:
*
* Boolean to block streaming until a client connects. If TRUE,
* `srtsink` will stream only when a client is connected.
*/
g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION,
g_param_spec_boolean ("wait-for-connection",
"Wait for a connection",
"Block the stream until a client connects",
GST_SRT_DEFAULT_WAIT_FOR_CONNECTION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSRTSrc:streamid:
*
* The stream id for the SRT access control.
*/
g_object_class_install_property (gobject_class, PROP_STREAMID,
g_param_spec_string ("streamid", "Stream ID",
"Stream ID for the SRT access control", "",
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
/**
* GstSRTSink:authentication:
*
* Boolean to authenticate a connection. If TRUE,
* the incoming connection is authenticated. Else,
* all the connections are accepted.
*
* Since: 1.20
*
*/
g_object_class_install_property (gobject_class, PROP_AUTHENTICATION,
g_param_spec_boolean ("authentication",
"Authentication",
"Authenticate a connection",
FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
/* Looks up @value as a nick of @enum_type and, when it matches, stores the
 * corresponding enum value in @s under @key. Unknown nicks leave @s
 * untouched. */
static void
gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
    gconstpointer key, gconstpointer value)
{
  GEnumClass *klass = g_type_class_ref (enum_type);
  GEnumValue *match = g_enum_get_value_by_nick (klass, value);

  if (match != NULL)
    gst_structure_set (s, key, enum_type, match->value, NULL);

  g_type_class_unref (klass);
}
/* Stores @value in @s as a string field named @key. */
static void
gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
    const gchar * value)
{
  const gchar *text = value;

  gst_structure_set (s, key, G_TYPE_STRING, text, NULL);
}
/* Parses @value as a base-10 integer and stores it in @s as an unsigned int
 * field named @key. */
static void
gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
    const gchar * value)
{
  const guint parsed = (guint) g_ascii_strtoll (value, NULL, 10);

  gst_structure_set (s, key, G_TYPE_UINT, parsed, NULL);
}
/* Parses @value as a base-10 integer and stores it in @s as an int field
 * named @key. */
static void
gst_srt_object_set_int_value (GstStructure * s, const gchar * key,
    const gchar * value)
{
  const gint parsed = (gint) g_ascii_strtoll (value, NULL, 10);

  gst_structure_set (s, key, G_TYPE_INT, parsed, NULL);
}
/* Parses @value as a base-10 integer and stores it in @s as a 64-bit int
 * field named @key. */
static void
gst_srt_object_set_int64_value (GstStructure * s, const gchar * key,
    const gchar * value)
{
  const gint64 parsed = g_ascii_strtoll (value, NULL, 10);

  gst_structure_set (s, key, G_TYPE_INT64, parsed, NULL);
}
/* Interprets @value as a boolean keyword and stores the result in @s under
 * @key. Accepted spellings are "1"/"yes"/"on"/"true" and
 * "0"/"no"/"off"/"false" (exact match); anything else leaves @s untouched. */
static void
gst_srt_object_set_boolean_value (GstStructure * s, const gchar * key,
    const gchar * value)
{
  static const gchar *truthy[] = {
    "1", "yes", "on", "true", NULL
  };
  static const gchar *falsy[] = {
    "0", "no", "off", "false", NULL
  };
  gboolean is_true = g_strv_contains (truthy, value);

  /* Neither a true nor a false spelling: ignore the parameter. */
  if (!is_true && !g_strv_contains (falsy, value))
    return;

  gst_structure_set (s, key, G_TYPE_BOOLEAN, is_true ? TRUE : FALSE, NULL);
}
/* Stores a URI query parameter in @s when @key names a known socket option.
 *
 * The option table decides how @value is converted (int, int64, string or
 * boolean). Keys that match no table entry are ignored.
 *
 * The loop must stop at the table's NULL-name sentinel: iterating on the
 * pointer itself never terminates and would hand a NULL name to
 * g_str_equal() for any unknown key. */
static void
gst_srt_object_set_socket_option (GstStructure * s, const gchar * key,
    const gchar * value)
{
  SrtOption *option;

  for (option = srt_options; option->name != NULL; ++option) {
    if (!g_str_equal (key, option->name))
      continue;

    switch (option->gtype) {
      case G_TYPE_INT:
        gst_srt_object_set_int_value (s, key, value);
        break;
      case G_TYPE_INT64:
        gst_srt_object_set_int64_value (s, key, value);
        break;
      case G_TYPE_STRING:
        gst_srt_object_set_string_value (s, key, value);
        break;
      case G_TYPE_BOOLEAN:
        gst_srt_object_set_boolean_value (s, key, value);
        break;
      default:
        /* No converter for this GType; leave the structure untouched. */
        break;
    }
    return;
  }

  GST_DEBUG ("Ignoring unknown SRT parameter '%s'", key);
}
/* Fills in the bind address and port for listener and rendezvous modes.
 *
 * When "localaddress" or "localport" is missing from @s, it is taken from the
 * host or port of @uri, falling back to the built-in defaults. Other modes
 * are left untouched. */
static void
gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
{
  GstSRTConnectionMode mode = GST_SRT_CONNECTION_MODE_NONE;
  guint port;

  gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
      (gint *) & mode);

  if (mode != GST_SRT_CONNECTION_MODE_RENDEZVOUS &&
      mode != GST_SRT_CONNECTION_MODE_LISTENER)
    return;

  if (gst_structure_get_string (s, "localaddress") == NULL) {
    const gchar *host = gst_uri_get_host (uri);

    if (host == NULL)
      host = GST_SRT_DEFAULT_LOCALADDRESS;
    gst_srt_object_set_string_value (s, "localaddress", host);
  }

  if (!gst_structure_get_uint (s, "localport", &port)) {
    port = gst_uri_get_port (uri);
    if (port == GST_URI_NO_PORT)
      port = GST_SRT_DEFAULT_PORT;
    gst_structure_set (s, "localport", G_TYPE_UINT, port, NULL);
  }
}
/* called with GST_OBJECT_LOCK (srtobject->element) held */
/* Replaces the object's URI and rebuilds the parameters structure from it.
 *
 * The parameters start from the default poll timeout and latency. The mode
 * defaults to "caller" when the URI has a host and "listener" otherwise, and
 * the URI's query parameters are then applied on top. Finally the local
 * bind address and port are filled in for listener and rendezvous modes.
 *
 * The URI is parsed before any state is touched, so a NULL, wrongly prefixed
 * or unparsable @uri leaves the previous URI and parameters in place.
 *
 * Returns: TRUE on success; FALSE with @err set when the socket is already
 * opened or @uri is not usable. */
gboolean
gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
    GError ** err)
{
  GHashTable *query_table = NULL;
  GHashTableIter iter;
  gpointer key, value;
  const char *addr_str;
  GstUri *parsed;

  if (srtobject->opened) {
    g_warning
        ("It's not supported to change the 'uri' property when SRT socket is opened.");
    g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
        "It's not supported to change the 'uri' property when SRT socket is opened");

    return FALSE;
  }

  if (uri == NULL || !g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
    g_warning ("Given uri cannot be used for SRT connection.");
    g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
        "Invalid SRT URI scheme");
    return FALSE;
  }

  /* Parse first: gst_uri_from_string() may fail, and the current state must
   * survive that. */
  parsed = gst_uri_from_string (uri);
  if (parsed == NULL) {
    g_warning ("Given uri could not be parsed.");
    g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
        "Could not parse SRT URI");
    return FALSE;
  }

  g_clear_pointer (&srtobject->uri, gst_uri_unref);
  srtobject->uri = parsed;

  g_clear_pointer (&srtobject->parameters, gst_structure_free);
  srtobject->parameters = gst_structure_new ("application/x-srt-params",
      "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
      "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY, NULL);

  query_table = gst_uri_get_query_table (srtobject->uri);

  GST_DEBUG_OBJECT (srtobject->element,
      "set uri to (host: %s, port: %u) with %u query strings",
      gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
      query_table == NULL ? 0 : g_hash_table_size (query_table));

  /* A host in the URI implies we are the calling side by default. */
  addr_str = gst_uri_get_host (srtobject->uri);
  if (addr_str)
    gst_srt_object_set_enum_value (srtobject->parameters,
        GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
  else
    gst_srt_object_set_enum_value (srtobject->parameters,
        GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");

  if (query_table) {
    g_hash_table_iter_init (&iter, query_table);
    while (g_hash_table_iter_next (&iter, &key, &value)) {
      if (!g_strcmp0 ("mode", key)) {
        gst_srt_object_set_enum_value (srtobject->parameters,
            GST_TYPE_SRT_CONNECTION_MODE, key, value);
      } else if (!g_strcmp0 ("localaddress", key)) {
        gst_srt_object_set_string_value (srtobject->parameters, key, value);
      } else if (!g_strcmp0 ("localport", key)) {
        gst_srt_object_set_uint_value (srtobject->parameters, key, value);
      } else if (!g_strcmp0 ("poll-timeout", key)) {
        gst_srt_object_set_int_value (srtobject->parameters, key, value);
      } else {
        gst_srt_object_set_socket_option (srtobject->parameters, key, value);
      }
    }

    g_hash_table_unref (query_table);
  }

  gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);

  return TRUE;
}
/* Listener thread: waits on the listener epoll for incoming connections,
 * accepts them and appends each one to srtobject->callers.
 *
 * @data is the GstSRTObject. The loop ends when polling fails (an element
 * error is posted unless listener_poll_id has been reset to SRT_ERROR) or,
 * for source elements, right after the first caller has been accepted.
 *
 * Returns: always NULL. */
static gpointer
thread_func (gpointer data)
{
  GstSRTObject *srtobject = data;

  for (;;) {
    SRTSOCKET caller_sock;
    union
    {
      struct sockaddr_storage ss;
      struct sockaddr sa;
    } caller_sa;
    /* Both lengths are in/out arguments for the SRT calls below, so they
     * are re-initialised on every iteration; otherwise a short address from
     * a previous accept would shrink the buffer offered to the next one. */
    int caller_sa_len = sizeof (caller_sa);
    gint rsocklen = 1;
    SRTSOCKET rsock;
    gint poll_timeout;

    GST_OBJECT_LOCK (srtobject->element);
    if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
            &poll_timeout)) {
      poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
    }
    GST_OBJECT_UNLOCK (srtobject->element);

    GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");

    if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, &rsocklen, 0, 0,
            poll_timeout, NULL, 0, NULL, 0) < 0) {
      gint srt_errno = srt_getlasterror (NULL);

      /* The poll id is reset to SRT_ERROR when the listener is torn down. */
      if (srtobject->listener_poll_id == SRT_ERROR)
        return NULL;
      if (srt_errno == SRT_ETIMEOUT)
        continue;

      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
          ("abort polling: %s", srt_getlasterror_str ()), (NULL));
      return NULL;
    }

    caller_sock =
        srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);

    if (caller_sock != SRT_INVALID_SOCK) {
      SRTCaller *caller;
      gint flag = SRT_EPOLL_ERR;

      caller = srt_caller_new ();
      caller->sockaddr =
          g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
      caller->poll_id = srt_epoll_create ();
      caller->sock = caller_sock;

      /* Sources wait for readability, sinks for writability. */
      if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
              (srtobject->element)) == GST_URI_SRC) {
        flag |= SRT_EPOLL_IN;
      } else {
        flag |= SRT_EPOLL_OUT;
      }

      if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
        GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
            ("%s", srt_getlasterror_str ()), (NULL));

        srt_caller_free (caller);

        /* try-again */
        continue;
      }

      GST_DEBUG_OBJECT (srtobject->element, "Accept to connect %d",
          caller->sock);

      g_mutex_lock (&srtobject->sock_lock);
      srtobject->callers = g_list_prepend (srtobject->callers, caller);
      g_cond_signal (&srtobject->sock_cond);
      g_mutex_unlock (&srtobject->sock_lock);

      /* notifying caller-added */
      g_signal_emit_by_name (srtobject->element, "caller-added", 0,
          caller->sockaddr);

      /* A source stops listening once it has its first caller. */
      if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
          GST_URI_SRC)
        return NULL;
    }
  }
}
/* Wraps a native peer address in a GSocketAddress.
 *
 * Only AF_INET and AF_INET6 are supported; the native length is derived from
 * the address family.
 *
 * Returns: a new GSocketAddress, or NULL (after a warning) for any other
 * family. */
static GSocketAddress *
peeraddr_to_g_socket_address (const struct sockaddr *peeraddr)
{
  gsize native_len;

  if (peeraddr->sa_family == AF_INET) {
    native_len = sizeof (struct sockaddr_in);
  } else if (peeraddr->sa_family == AF_INET6) {
    native_len = sizeof (struct sockaddr_in6);
  } else {
    g_warning ("Unsupported address family %d", peeraddr->sa_family);
    return NULL;
  }

  return g_socket_address_new_from_native ((gpointer) peeraddr, native_len);
}
/* Listen callback registered with srt_listen_callback(); decides whether an
 * incoming connection is accepted.
 *
 * Connections whose peer address cannot be converted are rejected. When
 * authentication is enabled, "caller-connecting" is emitted and the handler's
 * answer decides; a refusal additionally emits "caller-rejected". Without
 * authentication every connection is accepted.
 *
 * Returns: 0 to accept the connection, -1 to reject it. */
static gint
srt_listen_callback_func (GstSRTObject * self, SRTSOCKET sock, int hs_version,
    const struct sockaddr *peeraddr, const char *stream_id)
{
  GSocketAddress *peer = peeraddr_to_g_socket_address (peeraddr);
  gint verdict = 0;

  if (peer == NULL) {
    GST_WARNING_OBJECT (self->element,
        "Invalid peer address. Rejecting sink %d streamid: %s", sock,
        stream_id);
    return -1;
  }

  if (self->authentication) {
    gboolean authenticated = FALSE;

    /* notifying caller-connecting */
    g_signal_emit_by_name (self->element, "caller-connecting", peer,
        stream_id, &authenticated);

    if (!authenticated)
      verdict = -1;
  }

  if (verdict == 0) {
    GST_DEBUG_OBJECT (self->element,
        "Accepting sink %d streamid: %s", sock, stream_id);
  } else {
    /* notifying caller-rejected */
    GST_WARNING_OBJECT (self->element,
        "Rejecting sink %d streamid: %s", sock, stream_id);
    g_signal_emit_by_name (self->element, "caller-rejected", peer, stream_id);
  }

  g_object_unref (peer);

  return verdict;
}
/* Sets up the listening side: binds a new SRT socket to the configured local
 * address and port, adds it to the listener epoll, starts listening,
 * registers the listen callback and spawns the listener thread.
 *
 * @sa and @sa_len are not used by this function.
 *
 * Returns: TRUE when the listener thread is running. On any failure the
 * socket is closed, the listener epoll id is released and reset, @error is
 * set, and FALSE is returned. */
static gboolean
gst_srt_object_wait_connect (GstSRTObject * srtobject,
    GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
{
  SRTSOCKET sock = SRT_INVALID_SOCK;
  const gchar *local_address = NULL;
  guint local_port = 0;
  gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;

  gpointer bind_sa;
  gsize bind_sa_len;
  GSocketAddress *bind_addr = NULL;

  GST_OBJECT_LOCK (srtobject->element);

  gst_structure_get_uint (srtobject->parameters, "localport", &local_port);

  local_address =
      gst_structure_get_string (srtobject->parameters, "localaddress");
  if (local_address == NULL)
    local_address = GST_SRT_DEFAULT_LOCALADDRESS;

  GST_OBJECT_UNLOCK (srtobject->element);

  bind_addr =
      gst_srt_object_resolve (srtobject, local_address, local_port, cancellable,
      error);
  if (!bind_addr) {
    goto failed;
  }

  /* Keep a native copy of the address so the GObject can be dropped early. */
  bind_sa_len = g_socket_address_get_native_size (bind_addr);
  bind_sa = g_alloca (bind_sa_len);

  if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
    goto failed;
  }

  g_clear_object (&bind_addr);

  sock = srt_create_socket ();
  if (sock == SRT_INVALID_SOCK) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
    goto failed;
  }

  GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
      local_address, local_port);

  if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
    g_set_error (error, GST_RESOURCE_ERROR,
        GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
        local_address, local_port, srt_getlasterror_str ());
    goto failed;
  }

  if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
  if (srt_listen (sock, 1) == SRT_ERROR) {
    g_set_error (error, GST_RESOURCE_ERROR,
        GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
        srt_getlasterror_str ());

    goto failed;
  }

  srtobject->listener_sock = sock;

  /* Register the SRT listen callback */
  if (srt_listen_callback (srtobject->listener_sock,
          (srt_listen_callback_fn *) srt_listen_callback_func, srtobject)) {
    /* Every failure path must leave @error set for the caller. */
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
        "Cannot register listen callback: %s", srt_getlasterror_str ());
    goto failed;
  }

  srtobject->thread =
      g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
  if (srtobject->thread == NULL) {
    GST_ERROR_OBJECT (srtobject->element, "Failed to start thread");
    goto failed;
  }

  return TRUE;

failed:

  if (srtobject->listener_poll_id != SRT_ERROR) {
    srt_epoll_release (srtobject->listener_poll_id);
  }

  if (sock != SRT_INVALID_SOCK) {
    srt_close (sock);
  }

  g_clear_object (&bind_addr);

  srtobject->listener_poll_id = SRT_ERROR;
  srtobject->listener_sock = SRT_INVALID_SOCK;

  return FALSE;
}
/* Creates an SRT socket for caller or rendezvous mode, configures it and
 * connects it to the peer address @sa (of @sa_len bytes).
 *
 * The stream direction is derived from the URI type of the owning element:
 * a source polls for readability, a sink for writability; errors are always
 * polled for. When both "localaddress" and "localport" are present in the
 * parameters, the socket is bound to that address before connecting.
 *
 * On success the socket has been added to srtobject->poll_id and stored in
 * srtobject->sock.
 *
 * Returns: TRUE on success; FALSE with @error set otherwise, in which case
 * the socket created here has been closed again.
 */
static gboolean
gst_srt_object_connect (GstSRTObject * srtobject, GCancellable * cancellable,
    GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
    GError ** error)
{
  SRTSOCKET sock;
  gint sock_flags = SRT_EPOLL_ERR;
  guint local_port = 0;
  gchar *local_address = NULL;
  bool sender;
  bool rendezvous;

  sock = srt_create_socket ();
  if (sock == SRT_INVALID_SOCK) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
        srt_getlasterror_str ());
    return FALSE;
  }

  if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
    goto failed;
  }

  switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
    case GST_URI_SRC:
      sender = false;
      sock_flags |= SRT_EPOLL_IN;
      break;
    case GST_URI_SINK:
      sender = true;
      sock_flags |= SRT_EPOLL_OUT;
      break;
    default:
      g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
          "Cannot determine stream direction");
      goto failed;
  }

  if (srt_setsockopt (sock, 0, SRTO_SENDER, &sender, sizeof sender)) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  rendezvous = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
  if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &rendezvous, sizeof rendezvous)) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  GST_OBJECT_LOCK (srtobject->element);
  gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
  /* The string returned by gst_structure_get_string() is owned by the
   * parameters structure. Take a private copy while the object lock is held
   * so it stays valid after the lock has been released. */
  local_address =
      g_strdup (gst_structure_get_string (srtobject->parameters,
          "localaddress"));
  GST_OBJECT_UNLOCK (srtobject->element);

  /* According to SRT norm, bind local address and port if specified */
  if (local_address != NULL && local_port != 0) {
    gpointer bind_sa;
    gsize bind_sa_len;
    GSocketAddress *bind_addr =
        gst_srt_object_resolve (srtobject, local_address,
        local_port, cancellable, error);

    if (!bind_addr) {
      goto failed;
    }

    bind_sa_len = g_socket_address_get_native_size (bind_addr);
    bind_sa = g_alloca (bind_sa_len);

    if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
      g_clear_object (&bind_addr);
      goto failed;
    }

    g_clear_object (&bind_addr);

    GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
        local_address, local_port);

    if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
      g_set_error (error, GST_RESOURCE_ERROR,
          GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
          local_address, local_port, srt_getlasterror_str ());
      goto failed;
    }
  }

  if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
        srt_getlasterror_str ());
    goto failed;
  }

  if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
    g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
        srt_getlasterror_str ());
    /* The socket is about to be closed; take it out of the poll set first. */
    srt_epoll_remove_usock (srtobject->poll_id, sock);
    goto failed;
  }

  srtobject->sock = sock;

  g_free (local_address);
  return TRUE;

failed:
  g_free (local_address);
  srt_close (sock);
  return FALSE;
}
/* Dispatches to the mode-specific open routine: listener mode waits for
 * incoming connections, every other mode actively connects to @sa.
 *
 * Returns: the result of the routine that was invoked.
 */
static gboolean
gst_srt_object_open_connection (GstSRTObject * srtobject,
    GCancellable * cancellable, GstSRTConnectionMode connection_mode,
    gpointer sa, size_t sa_len, GError ** error)
{
  if (connection_mode != GST_SRT_CONNECTION_MODE_LISTENER) {
    return gst_srt_object_connect (srtobject, cancellable, connection_mode,
        sa, sa_len, error);
  }

  return gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len,
      error);
}
/* Resolves the address to use from the object's URI and parameters and opens
 * the SRT connection in the configured mode.
 *
 * When the URI carries no host, listener mode on the default local address
 * is used regardless of the configured "mode" parameter.
 *
 * srtobject->opened is cleared on entry and set to the result once the
 * connection attempt has been made.
 *
 * Returns: TRUE on success, FALSE otherwise.
 */
static gboolean
gst_srt_object_open_internal (GstSRTObject * srtobject,
    GCancellable * cancellable, GError ** error)
{
  GSocketAddress *socket_address = NULL;
  GstSRTConnectionMode connection_mode;
  gpointer sa;
  size_t sa_len;
  const gchar *addr_str;
  guint port;
  gboolean ret = FALSE;

  GST_OBJECT_LOCK (srtobject->element);

  srtobject->opened = FALSE;

  if (!gst_structure_get_enum (srtobject->parameters,
          "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
    GST_WARNING_OBJECT (srtobject->element,
        "Cannot get connection mode information." " Use default mode");
    connection_mode = GST_SRT_DEFAULT_MODE;
  }

  addr_str = gst_uri_get_host (srtobject->uri);
  if (addr_str == NULL) {
    connection_mode = GST_SRT_CONNECTION_MODE_LISTENER;
    addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
    GST_DEBUG_OBJECT (srtobject->element,
        "Given uri doesn't have hostname or address. Use any (%s) and"
        " setting listener mode", addr_str);
  }

  port = gst_uri_get_port (srtobject->uri);

  GST_DEBUG_OBJECT (srtobject->element,
      "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
      srtobject->parameters);

  GST_OBJECT_UNLOCK (srtobject->element);

  socket_address =
      gst_srt_object_resolve (srtobject, addr_str, port, cancellable, error);
  if (socket_address == NULL) {
    goto out;
  }

  sa_len = g_socket_address_get_native_size (socket_address);
  sa = g_alloca (sa_len);

  if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
    goto out;
  }

  srtobject->listener_poll_id = srt_epoll_create ();

  ret =
      gst_srt_object_open_connection
      (srtobject, cancellable, connection_mode, sa, sa_len, error);

  /* The epoll container created just above is only handed over to a
   * successfully opened connection. If opening failed and the id is still
   * valid, release it here so that a later open does not overwrite and leak
   * it. */
  if (!ret && srtobject->listener_poll_id != SRT_ERROR) {
    srt_epoll_release (srtobject->listener_poll_id);
    srtobject->listener_poll_id = SRT_ERROR;
  }

  GST_OBJECT_LOCK (srtobject->element);
  srtobject->opened = ret;
  GST_OBJECT_UNLOCK (srtobject->element);

out:
  g_clear_object (&socket_address);

  return ret;
}
/* Public entry point for opening the SRT connection: resets the transferred
 * byte counter and then performs the actual open.
 *
 * Returns: the result of gst_srt_object_open_internal().
 */
gboolean
gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
    GError ** error)
{
  gboolean opened;

  /* Byte accounting starts from zero for every explicit open. */
  srtobject->bytes = 0;

  opened = gst_srt_object_open_internal (srtobject, cancellable, error);

  return opened;
}
/* Tears down everything an open created: the connected socket, the listener
 * poll set, the listener thread, the listener socket and the list of
 * connected callers. Finally marks the object as not opened.
 *
 * All socket state is touched with sock_lock held; the lock is dropped only
 * while joining the listener thread.
 */
void
gst_srt_object_close (GstSRTObject * srtobject)
{
  g_mutex_lock (&srtobject->sock_lock);

  /* Connected (caller / rendezvous) socket. */
  if (srtobject->sock != SRT_INVALID_SOCK) {
    srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);

    GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
        srtobject->sock);

    srt_close (srtobject->sock);
    srtobject->sock = SRT_INVALID_SOCK;
  }

  /* Listener poll set: detach the listener socket, then drop the set. */
  if (srtobject->listener_poll_id != SRT_ERROR) {
    if (srtobject->listener_sock != SRT_INVALID_SOCK) {
      srt_epoll_remove_usock (srtobject->listener_poll_id,
          srtobject->listener_sock);
    }

    srt_epoll_release (srtobject->listener_poll_id);
    srtobject->listener_poll_id = SRT_ERROR;
  }

  /* Listener thread: join it without holding sock_lock. */
  if (srtobject->thread != NULL) {
    GThread *listener_thread = g_steal_pointer (&srtobject->thread);

    g_mutex_unlock (&srtobject->sock_lock);
    g_thread_join (listener_thread);
    g_mutex_lock (&srtobject->sock_lock);
  }

  /* Listener socket itself. */
  if (srtobject->listener_sock != SRT_INVALID_SOCK) {
    GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
        srtobject->listener_sock);

    srt_close (srtobject->listener_sock);
    srtobject->listener_sock = SRT_INVALID_SOCK;
  }

  /* Connected callers: announce every removal first, then free them all. */
  if (srtobject->callers != NULL) {
    GList *stale_callers = g_steal_pointer (&srtobject->callers);
    GList *walk;

    for (walk = stale_callers; walk != NULL; walk = walk->next) {
      srt_caller_signal_removed (walk->data, srtobject);
    }
    g_list_free_full (stale_callers, (GDestroyNotify) srt_caller_free);
  }

  g_mutex_unlock (&srtobject->sock_lock);

  GST_OBJECT_LOCK (srtobject->element);
  srtobject->opened = FALSE;
  GST_OBJECT_UNLOCK (srtobject->element);
}
/* Blocks on sock_cond until the callers list is non-empty or @cancellable
 * has been cancelled.
 *
 * Returns: TRUE if at least one caller is in the list, FALSE if the wait
 * ended without one.
 */
static gboolean
gst_srt_object_wait_caller (GstSRTObject * srtobject,
    GCancellable * cancellable)
{
  gboolean have_caller;

  g_mutex_lock (&srtobject->sock_lock);

  have_caller = (srtobject->callers != NULL);
  if (!have_caller) {
    GST_INFO_OBJECT (srtobject->element, "Waiting for connection");
  }

  while (!have_caller && !g_cancellable_is_cancelled (cancellable)) {
    g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
    have_caller = (srtobject->callers != NULL);
  }

  g_mutex_unlock (&srtobject->sock_lock);

  if (have_caller) {
    GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
  }

  return have_caller;
}
/* Receives one SRT message of at most @size bytes into @data and fills
 * @mctrl with the message control data of that message.
 *
 * In listener mode the first connected caller is read from, after waiting
 * for one to appear. In every other mode the object's own socket is read
 * and, when an error is reported on it, the connection is closed and
 * reopened before polling again.
 *
 * Returns: the number of bytes received; 0 when cancelled, when no caller is
 * available, when the caller went away or when the poll set is empty;
 * -1 on failure.
 */
gssize
gst_srt_object_read (GstSRTObject * srtobject,
    guint8 * data, gsize size, GCancellable * cancellable, GError ** error,
    SRT_MSGCTRL * mctrl)
{
  gssize len = 0;
  gint poll_timeout;
  GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
  gint poll_id = SRT_ERROR;

  /* Only source element can read data */
  g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
          (srtobject->element)) == GST_URI_SRC, -1);

  GST_OBJECT_LOCK (srtobject->element);

  gst_structure_get_enum (srtobject->parameters, "mode",
      GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);

  if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
          &poll_timeout)) {
    poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
  }

  GST_OBJECT_UNLOCK (srtobject->element);

  if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
    if (!gst_srt_object_wait_caller (srtobject, cancellable))
      return 0;

    /* The callers list may have changed since the wait returned, so look at
     * it again under the lock. */
    g_mutex_lock (&srtobject->sock_lock);
    if (srtobject->callers) {
      SRTCaller *caller = srtobject->callers->data;
      poll_id = caller->poll_id;
    }
    g_mutex_unlock (&srtobject->sock_lock);

    if (poll_id == SRT_ERROR)
      return 0;
  } else {
    poll_id = srtobject->poll_id;
  }

  while (!g_cancellable_is_cancelled (cancellable)) {
    /* Start from a well-defined handle so that a wait which reports no
     * socket never leaves an indeterminate value behind. */
    SRTSOCKET rsock = SRT_INVALID_SOCK;
    gint rsocklen = 1;
    SRTSOCKET wsock = SRT_INVALID_SOCK;
    gint wsocklen = 1;

    if (srt_epoll_wait (poll_id, &rsock, &rsocklen, &wsock, &wsocklen,
            poll_timeout, NULL, 0, NULL, 0) < 0) {
      gint srt_errno = srt_getlasterror (NULL);

#if SRT_VERSION_VALUE >= 0x010402
      if (srt_errno == SRT_EPOLLEMPTY)
        return 0;
#endif
      if (srt_errno == SRT_ETIMEOUT)
        continue;

      /* This is the receive path, so report the failure as a read error. */
      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
          "Failed to poll socket: %s", srt_getlasterror_str ());
      return -1;
    }

    if (wsocklen == 1 && rsocklen == 1) {
      /* Socket reported in wsock AND rsock signifies an error. */
      gint reason = srt_getrejectreason (wsock);

      if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
        if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
          GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
              ("Caller failed to authenticate: %" REASON_FORMAT,
                  REASON_ARGS (reason)), (NULL));
          return 0;
        }

        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
            ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
                REASON_ARGS (reason)), (NULL));
      } else {
        if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
          /* Caller has disappeared. */
          return 0;
        }

        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, READ,
            ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
                REASON_ARGS (reason)), (NULL));
      }

      /* Non-listener modes: drop the broken connection and open a new one,
       * then go back to polling. */
      gst_srt_object_close (srtobject);
      if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
        return -1;
      }
      continue;
    }

    srt_msgctrl_init (mctrl);
    len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl);

    if (len == SRT_ERROR) {
      gint srt_errno = srt_getlasterror (NULL);
      if (srt_errno == SRT_EASYNCRCV) {
        /* Nothing to receive right now; poll again. */
        continue;
      } else {
        g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
            "Failed to receive from SRT socket: %s", srt_getlasterror_str ());
        return -1;
      }
    }

    srtobject->bytes += len;
    break;
  }

  return len;
}
/* Interrupts pending SRT operations: takes the connected socket (if any)
 * out of the poll set, cancels @cancellable and signals sock_cond.
 */
void
gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
{
  SRTSOCKET connected = srtobject->sock;

  GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");

  /* SRT's threads wake up once the monitoring list has no socket
   * descriptors left, and there is only this one to take out. */
  if (connected != SRT_INVALID_SOCK) {
    srt_epoll_remove_usock (srtobject->poll_id, connected);
  }

  /* Only listener mode actually waits on the condition, but signalling it
   * in the other modes does no harm.
   *
   * The cancellable doubles as the 'flushing' flag, so it is cancelled with
   * the lock held: the waiter then observes it at the right moment instead
   * of racing with this function. */
  g_mutex_lock (&srtobject->sock_lock);
  g_cancellable_cancel (cancellable);
  g_cond_signal (&srtobject->sock_cond);
  g_mutex_unlock (&srtobject->sock_lock);
}
/* Sends every buffer of @headers, in order, as one SRT message each on
 * @sock and adds the sent bytes to srtobject->bytes.
 *
 * When @poll_id is non-negative, the poll set is waited on (for at most
 * @poll_timeout) before each header and the socket it reports is used for
 * sending; otherwise @sock is written to directly.
 *
 * Returns: TRUE when all headers were sent, when @headers is NULL, when
 * @cancellable was cancelled or when the poll set is empty; FALSE with
 * @error set when polling, mapping or sending failed.
 */
static gboolean
gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
    gint poll_id, gint poll_timeout, GstBufferList * headers,
    GCancellable * cancellable, GError ** error)
{
  guint size, i;

  if (!headers)
    return TRUE;

  size = gst_buffer_list_length (headers);

  GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);

  /* The index is advanced only after a header has actually been sent, so
   * that a poll timeout retries the same header instead of skipping it. */
  i = 0;
  while (i < size) {
    SRTSOCKET wsock = sock;
    gint wsocklen = 1;
    gint sent;
    GstBuffer *buffer;
    GstMapInfo mapinfo;

    if (g_cancellable_is_cancelled (cancellable)) {
      return TRUE;
    }

    if (poll_id >= 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, &wsocklen,
            poll_timeout, NULL, 0, NULL, 0) < 0) {
      gint srt_errno = srt_getlasterror (NULL);

#if SRT_VERSION_VALUE >= 0x010402
      if (srt_errno == SRT_EPOLLEMPTY)
        return TRUE;
#endif
      /* Not writable yet: try again with the same header. Cancellation is
       * re-checked at the top of the loop. */
      if (srt_errno == SRT_ETIMEOUT)
        continue;

      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
          "Failed to poll socket: %s", srt_getlasterror_str ());
      return FALSE;
    }

    buffer = gst_buffer_list_get (headers, i);

    GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
        i, buffer);

    if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
          "Failed to map header buffer");
      return FALSE;
    }

    sent = srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size, 0);
    if (sent == SRT_ERROR) {
      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
          srt_getlasterror_str ());
      gst_buffer_unmap (buffer, &mapinfo);
      return FALSE;
    }

    srtobject->bytes += sent;

    gst_buffer_unmap (buffer, &mapinfo);

    i++;
  }

  return TRUE;
}
/* Listener-mode write: delivers the mapped buffer to every connected caller,
 * split into messages of at most the caller socket's payload size.
 *
 * A caller that has not received the stream headers yet gets them first.
 * A caller for which any step fails is removed from the list, announced as
 * removed and freed; the remaining callers are still served.
 *
 * Returns: mapinfo->size once the whole list has been walked, or 0 if
 * @cancellable was found cancelled while walking it.
 */
static gssize
gst_srt_object_write_to_callers (GstSRTObject * srtobject,
    GstBufferList * headers,
    const GstMapInfo * mapinfo, GCancellable * cancellable)
{
  GList *link;

  g_mutex_lock (&srtobject->sock_lock);

  link = srtobject->callers;
  while (link != NULL) {
    /* Remember the successor up front: the current link may get deleted. */
    GList *following = link->next;
    SRTCaller *caller = link->data;
    const guint8 *payload = mapinfo->data;
    gssize offset = 0;
    gint payload_size;
    gint optlen = sizeof (payload_size);
    gboolean drop = FALSE;

    if (g_cancellable_is_cancelled (cancellable)) {
      g_mutex_unlock (&srtobject->sock_lock);
      return 0;
    }

    if (!caller->sent_headers) {
      GError *error = NULL;

      if (gst_srt_object_send_headers (srtobject, caller->sock, -1, 0,
              headers, cancellable, &error)) {
        caller->sent_headers = TRUE;
      } else {
        GST_WARNING_OBJECT (srtobject->element,
            "Failed to send headers to caller %d: %s",
            caller->sock, error->message);
        g_error_free (error);
        drop = TRUE;
      }
    }

    if (!drop && srt_getsockflag (caller->sock, SRTO_PAYLOADSIZE,
            &payload_size, &optlen)) {
      GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
      drop = TRUE;
    }

    while (!drop && offset < mapinfo->size) {
      gint chunk = MIN (mapinfo->size - offset, payload_size);
      gint sent =
          srt_sendmsg2 (caller->sock, (char *) (payload + offset), chunk, 0);

      if (sent < 0) {
        GST_WARNING_OBJECT (srtobject->element, "Dropping caller %d: %s",
            caller->sock, srt_getlasterror_str ());
        drop = TRUE;
      } else {
        offset += sent;
        srtobject->bytes += sent;
      }
    }

    if (drop) {
      srtobject->callers = g_list_delete_link (srtobject->callers, link);
      srt_caller_signal_removed (caller, srtobject);
      srt_caller_free (caller);
    }

    link = following;
  }

  g_mutex_unlock (&srtobject->sock_lock);

  return mapinfo->size;
}
/* Caller/rendezvous-mode write: sends the stream headers once, then the
 * mapped buffer in messages of at most the socket's payload size.
 *
 * When an error is reported on the socket, the connection is closed and
 * reopened and sending continues from the current offset. When
 * wait-for-connection is disabled and the socket is still connecting, the
 * remaining data is dropped.
 *
 * Returns: the number of payload bytes sent (possibly fewer than
 * mapinfo->size when cancelled or dropped), 0 when the poll set is empty,
 * or -1 on failure.
 */
static gssize
gst_srt_object_write_one (GstSRTObject * srtobject,
    GstBufferList * headers,
    const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
{
  const guint8 *payload = mapinfo->data;
  gssize written = 0;
  gint timeout_ms;
  gint payload_size;
  gint optlen = sizeof (payload_size);
  gboolean block_until_connected;

  GST_OBJECT_LOCK (srtobject->element);
  block_until_connected = srtobject->wait_for_connection;
  if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
          &timeout_ms)) {
    timeout_ms = GST_SRT_DEFAULT_POLL_TIMEOUT;
  }
  GST_OBJECT_UNLOCK (srtobject->element);

  if (!srtobject->sent_headers) {
    if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
            srtobject->poll_id, timeout_ms, headers, cancellable, error)) {
      return -1;
    }

    srtobject->sent_headers = TRUE;
  }

  while (written < mapinfo->size) {
    SRTSOCKET rsock;
    SRTSOCKET wsock;
    gint rsocklen = 1;
    gint wsocklen = 1;
    gint chunk;
    gint sent;

    if (g_cancellable_is_cancelled (cancellable)) {
      break;
    }

    if (!block_until_connected &&
        srt_getsockstate (srtobject->sock) == SRTS_CONNECTING) {
      GST_LOG_OBJECT (srtobject->element,
          "Not connected yet. Dropping the buffer.");
      break;
    }

    if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
            &wsocklen, timeout_ms, NULL, 0, NULL, 0) < 0) {
      gint srt_errno = srt_getlasterror (NULL);

#if SRT_VERSION_VALUE >= 0x010402
      if (srt_errno == SRT_EPOLLEMPTY)
        return 0;
#endif
      if (srt_errno != SRT_ETIMEOUT) {
        g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
            "Failed to poll socket: %s", srt_getlasterror_str ());
        return -1;
      }

      /* Timed out: go around again, re-checking cancellation first. */
      continue;
    }

    if (rsocklen == 1 && wsocklen == 1) {
      /* The same socket showing up as readable AND writable means error. */
      gint reason = srt_getrejectreason (wsock);

      if (reason != SRT_REJ_BADSECRET && reason != SRT_REJ_UNSECURE) {
        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, WRITE,
            ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
                REASON_ARGS (reason)), (NULL));
      } else {
        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
            ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
                REASON_ARGS (reason)), (NULL));
      }

      gst_srt_object_close (srtobject);
      if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
        return -1;
      }
      continue;
    }

    if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
          srt_getlasterror_str ());
      return -1;
    }

    chunk = MIN (mapinfo->size - written, payload_size);

    sent = srt_sendmsg2 (wsock, (char *) (payload + written), chunk, 0);
    if (sent < 0) {
      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
          srt_getlasterror_str ());
      return -1;
    }

    written += sent;
    srtobject->bytes += sent;
  }

  return written;
}
/* Writes the mapped buffer to the SRT peer(s), sending @headers first to any
 * peer that has not received them yet.
 *
 * Listener mode fans the data out to all connected callers, optionally
 * waiting for the first caller when wait-for-connection is set; every other
 * mode writes to the single connected socket.
 *
 * Returns: the value of the mode-specific write routine, 0 when waiting for
 * a caller ended without one, or -1 when the element is not a sink.
 */
gssize
gst_srt_object_write (GstSRTObject * srtobject,
    GstBufferList * headers,
    const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
{
  GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
  gboolean wait_for_connection;

  /* Writing is reserved for sink elements. */
  g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
          (srtobject->element)) == GST_URI_SINK, -1);

  GST_OBJECT_LOCK (srtobject->element);
  gst_structure_get_enum (srtobject->parameters, "mode",
      GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
  wait_for_connection = srtobject->wait_for_connection;
  GST_OBJECT_UNLOCK (srtobject->element);

  if (connection_mode != GST_SRT_CONNECTION_MODE_LISTENER) {
    return gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
        error);
  }

  if (wait_for_connection
      && !gst_srt_object_wait_caller (srtobject, cancellable)) {
    return 0;
  }

  return gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
      cancellable);
}
/* Queries the SRT statistics of @srtsock (without clearing them) and wraps
 * them in a new "application/x-srt-statistics" structure.
 *
 * NOTE(review): "send-duration-us" and "negotiated-latency-ms" each appear
 * twice in the field list below; presumably the later occurrence is the one
 * that ends up in the structure -- confirm against GstStructure semantics.
 *
 * Returns: a new structure owned by the caller, or NULL when the statistics
 * could not be retrieved.
 */
static GstStructure *
get_stats_for_srtsock (GstSRTObject * srtobject, SRTSOCKET srtsock)
{
  SRT_TRACEBSTATS stats;
  GstStructure *result;

  if (srt_bstats (srtsock, &stats, 0) < 0) {
    GST_WARNING_OBJECT (srtobject->element,
        "failed to retrieve stats for socket %d (reason %s)",
        srtsock, srt_getlasterror_str ());
    return NULL;
  }

  result = gst_structure_new ("application/x-srt-statistics",
      /* sender side: packet counters */
      "packets-sent", G_TYPE_INT64, stats.pktSent,
      "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
      "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
      "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
      "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
      /* sender side: time spent sending */
      "send-duration-us", G_TYPE_INT64, stats.usSndDuration,
      /* sender side: byte counters */
      "bytes-sent", G_TYPE_UINT64, stats.byteSent,
      "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
      "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
      "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
      /* sender side: rate in Mb/s, duration and latency */
      "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
      "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
      "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay,
      /* receiver side: packet counters */
      "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
      "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
      "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
      "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
      /* receiver side: byte counters, rate in Mb/s and latency */
      "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
      "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal,
      "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
      "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay,
      /* link: bandwidth in Mb/s and round-trip time */
      "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
      "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);

  GST_DEBUG_OBJECT (srtobject->element,
      "retreived stats for socket %d: %" GST_PTR_FORMAT, srtsock, result);

  return result;
}
/* Builds the statistics structure for the whole object.
 *
 * The structure carries the statistics of the connected socket (if there is
 * one and they can be retrieved), a "callers" array with one structure per
 * connected caller including its "caller-address", and the running byte
 * total under "bytes-sent-total" (sinks) or "bytes-received-total"
 * (sources). Callers whose statistics cannot be retrieved are removed from
 * the callers list, announced as removed and freed.
 *
 * Returns: a new structure owned by the caller; never NULL.
 */
GstStructure *
gst_srt_object_get_stats (GstSRTObject * srtobject)
{
  const gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
  const gchar *total_field =
      is_sender ? "bytes-sent-total" : "bytes-received-total";
  GstStructure *stats = NULL;

  g_mutex_lock (&srtobject->sock_lock);

  if (srtobject->sock != SRT_INVALID_SOCK) {
    stats = get_stats_for_srtsock (srtobject, srtobject->sock);
  }

  /* No socket, or its statistics were unavailable: start from empty. */
  if (stats == NULL) {
    stats = gst_structure_new_empty ("application/x-srt-statistics");
  }

  if (srtobject->callers != NULL) {
    GValueArray *per_caller = g_value_array_new (1);
    GValue per_caller_value = G_VALUE_INIT;
    GList *link = srtobject->callers;

    while (link != NULL) {
      /* Fetch the successor first; the current link may be deleted below. */
      GList *following = link->next;
      SRTCaller *caller = link->data;
      GstStructure *caller_stats =
          get_stats_for_srtsock (srtobject, caller->sock);

      if (caller_stats != NULL) {
        GValue *slot;

        gst_structure_set (caller_stats, "caller-address",
            G_TYPE_SOCKET_ADDRESS, caller->sockaddr, NULL);

        /* Append an empty slot and fill it in place, handing ownership of
         * the caller's structure to the array. */
        g_value_array_append (per_caller, NULL);
        slot = g_value_array_get_nth (per_caller, per_caller->n_values - 1);
        g_value_init (slot, GST_TYPE_STRUCTURE);
        g_value_take_boxed (slot, caller_stats);
      } else {
        srtobject->callers = g_list_delete_link (srtobject->callers, link);
        srt_caller_signal_removed (caller, srtobject);
        srt_caller_free (caller);
      }

      link = following;
    }

    g_value_init (&per_caller_value, G_TYPE_VALUE_ARRAY);
    g_value_take_boxed (&per_caller_value, per_caller);
    gst_structure_take_value (stats, "callers", &per_caller_value);
  }

  gst_structure_set (stats, total_field, G_TYPE_UINT64, srtobject->bytes,
      NULL);

  g_mutex_unlock (&srtobject->sock_lock);

  return stats;
}