srt: Use g_cancellable_get_fd for poll cancellation

Removing sockets from the epoll for cancellation is unreliable and might
not be thread-safe. Rather, have SRT watch a FD from the cancellable if
available. Keep the cancellable cancelled while we're not open.

Use the regular single-socket `sock` and `poll_id` fields for the
listening thread instead of duplicating them.

Before polling we need to check the socket state. SRT closes broken
sockets by itself and when the epoll contains our cancellation FD it can
no longer be empty, which was an error before.

Treat more failures in the read and write operations as an opportunity
to try a reconnect.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4087>
This commit is contained in:
Jan Alexander Steffens (heftig) 2023-03-01 16:46:19 -05:00 committed by GStreamer Marge Bot
parent 3f75836822
commit c533010e20
2 changed files with 343 additions and 261 deletions

View file

@ -37,6 +37,15 @@
GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
#define GST_CAT_DEFAULT gst_debug_srtobject
#define ERROR_TO_WARNING(srtobject, error, suffix) \
G_STMT_START { \
gchar *text = g_strdup_printf ("%s%s", (error)->message, (suffix)); \
GST_WARNING_OBJECT ((srtobject)->element, "warning: %s", text); \
gst_element_message_full ((srtobject)->element, GST_MESSAGE_WARNING, \
(error)->domain, (error)->code, text, NULL, __FILE__, GST_FUNCTION, \
__LINE__); \
} G_STMT_END
#if SRT_VERSION_VALUE > 0x10402
#define REASON_FORMAT "s"
#define REASON_ARGS(reason) srt_rejectreason_str (reason)
@ -338,6 +347,7 @@ GstSRTObject *
gst_srt_object_new (GstElement * element)
{
GstSRTObject *srtobject;
gint fd, fd_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
if (g_atomic_int_add (&srt_init_refcount, 1) == 0) {
GST_DEBUG_OBJECT (element, "Starting up SRT");
@ -353,12 +363,15 @@ gst_srt_object_new (GstElement * element)
srtobject->parameters = gst_structure_new_empty ("application/x-srt-params");
srtobject->sock = SRT_INVALID_SOCK;
srtobject->poll_id = srt_epoll_create ();
srtobject->listener_sock = SRT_INVALID_SOCK;
srtobject->listener_poll_id = SRT_ERROR;
srtobject->sent_headers = FALSE;
srtobject->wait_for_connection = GST_SRT_DEFAULT_WAIT_FOR_CONNECTION;
srtobject->auto_reconnect = GST_SRT_DEFAULT_AUTO_RECONNECT;
fd = g_cancellable_get_fd (srtobject->cancellable);
if (fd >= 0)
srt_epoll_add_ssock (srtobject->poll_id, fd, &fd_flags);
g_cancellable_cancel (srtobject->cancellable);
g_cond_init (&srtobject->sock_cond);
return srtobject;
}
@ -928,20 +941,8 @@ static gpointer
thread_func (gpointer data)
{
GstSRTObject *srtobject = data;
SRTSOCKET caller_sock;
union
{
struct sockaddr_storage ss;
struct sockaddr sa;
} caller_sa;
int caller_sa_len = sizeof (caller_sa);
gint poll_timeout;
SRTSOCKET rsock;
gint rsocklen = 1;
for (;;) {
GST_OBJECT_LOCK (srtobject->element);
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
@ -949,29 +950,75 @@ thread_func (gpointer data)
}
GST_OBJECT_UNLOCK (srtobject->element);
GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
for (;;) {
SRTSOCKET rsock;
gint rsocklen = 1;
SYSSOCKET rsys, wsys;
gint rsyslen = 1, wsyslen = 1;
gint ret;
SRTSOCKET caller_sock;
union
{
struct sockaddr_storage ss;
struct sockaddr sa;
} caller_sa;
int caller_sa_len = sizeof (caller_sa);
SRTCaller *caller;
gint flag = SRT_EPOLL_ERR;
gint fd, fd_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, &rsocklen, 0, 0,
poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
GST_OBJECT_LOCK (srtobject->element);
if (!srtobject->opened) {
GST_OBJECT_UNLOCK (srtobject->element);
break;
}
GST_OBJECT_UNLOCK (srtobject->element);
if (srtobject->listener_poll_id == SRT_ERROR)
switch (srt_getsockstate (srtobject->sock)) {
case SRTS_BROKEN:
case SRTS_CLOSING:
case SRTS_CLOSED:
case SRTS_NONEXIST:
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
("Socket is broken or closed"), (NULL));
return NULL;
default:
break;
}
GST_TRACE_OBJECT (srtobject->element, "Waiting on listening socket");
ret =
srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, NULL, 0,
poll_timeout, &rsys, &rsyslen, &wsys, &wsyslen);
GST_OBJECT_LOCK (srtobject->element);
if (!srtobject->opened) {
GST_OBJECT_UNLOCK (srtobject->element);
break;
}
GST_OBJECT_UNLOCK (srtobject->element);
if (ret < 0) {
gint srt_errno = srt_getlasterror (NULL);
if (srt_errno == SRT_ETIMEOUT)
continue;
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
("abort polling: %s", srt_getlasterror_str ()), (NULL));
("Failed to poll socket: %s", srt_getlasterror_str ()), (NULL));
return NULL;
}
caller_sock =
srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
if (rsocklen != 1)
continue;
if (caller_sock != SRT_INVALID_SOCK) {
SRTCaller *caller;
gint flag = SRT_EPOLL_ERR;
caller_sock = srt_accept (rsock, &caller_sa.sa, &caller_sa_len);
if (caller_sock == SRT_INVALID_SOCK) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
("Failed to accept connection: %s", srt_getlasterror_str ()), (NULL));
return NULL;
}
caller = srt_caller_new ();
caller->sockaddr =
@ -979,6 +1026,10 @@ thread_func (gpointer data)
caller->poll_id = srt_epoll_create ();
caller->sock = caller_sock;
fd = g_cancellable_get_fd (srtobject->cancellable);
if (fd >= 0)
srt_epoll_add_ssock (srtobject->poll_id, fd, &fd_flags);
if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
(srtobject->element)) == GST_URI_SRC) {
flag |= SRT_EPOLL_IN;
@ -986,9 +1037,8 @@ thread_func (gpointer data)
flag |= SRT_EPOLL_OUT;
}
if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag) < 0) {
GST_ELEMENT_WARNING (srtobject->element, LIBRARY, SETTINGS,
("%s", srt_getlasterror_str ()), (NULL));
srt_caller_free (caller);
@ -997,8 +1047,7 @@ thread_func (gpointer data)
continue;
}
GST_DEBUG_OBJECT (srtobject->element, "Accept to connect %d",
caller->sock);
GST_DEBUG_OBJECT (srtobject->element, "Accept to connect %d", caller->sock);
g_mutex_lock (&srtobject->sock_lock);
srtobject->callers = g_list_prepend (srtobject->callers, caller);
@ -1011,10 +1060,11 @@ thread_func (gpointer data)
if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
GST_URI_SRC)
break;
}
return NULL;
}
}
}
static GSocketAddress *
peeraddr_to_g_socket_address (const struct sockaddr *peeraddr)
@ -1080,6 +1130,7 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, gpointer sa,
const gchar *local_address = NULL;
guint local_port = 0;
gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
gboolean poll_added = FALSE;
gpointer bind_sa;
gsize bind_sa_len;
@ -1132,25 +1183,25 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, gpointer sa,
goto failed;
}
if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags) < 0) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
srt_getlasterror_str ());
goto failed;
}
poll_added = TRUE;
GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
if (srt_listen (sock, 1) == SRT_ERROR) {
g_set_error (error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
srt_getlasterror_str ());
goto failed;
}
srtobject->listener_sock = sock;
srtobject->sock = sock;
/* Register the SRT listen callback */
if (srt_listen_callback (srtobject->listener_sock,
if (srt_listen_callback (srtobject->sock,
(srt_listen_callback_fn *) srt_listen_callback_func, srtobject)) {
goto failed;
}
@ -1164,10 +1215,10 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, gpointer sa,
return TRUE;
failed:
if (srtobject->listener_poll_id != SRT_ERROR) {
srt_epoll_release (srtobject->listener_poll_id);
failed:
if (poll_added) {
srt_epoll_remove_usock (srtobject->poll_id, sock);
}
if (sock != SRT_INVALID_SOCK) {
@ -1176,8 +1227,7 @@ failed:
g_clear_object (&bind_addr);
srtobject->listener_poll_id = SRT_ERROR;
srtobject->listener_sock = SRT_INVALID_SOCK;
srtobject->sock = SRT_INVALID_SOCK;
return FALSE;
}
@ -1272,7 +1322,7 @@ gst_srt_object_connect (GstSRTObject * srtobject,
}
}
if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags) < 0) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
srt_getlasterror_str ());
goto failed;
@ -1294,23 +1344,6 @@ failed:
return FALSE;
}
static gboolean
gst_srt_object_open_connection (GstSRTObject * srtobject,
GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
GError ** error)
{
gboolean ret = FALSE;
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
ret = gst_srt_object_wait_connect (srtobject, sa, sa_len, error);
} else {
ret =
gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error);
}
return ret;
}
static gboolean
gst_srt_object_open_internal (GstSRTObject * srtobject, GError ** error)
{
@ -1325,8 +1358,6 @@ gst_srt_object_open_internal (GstSRTObject * srtobject, GError ** error)
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = FALSE;
if (!gst_structure_get_enum (srtobject->parameters,
"mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
GST_WARNING_OBJECT (srtobject->element,
@ -1363,15 +1394,12 @@ gst_srt_object_open_internal (GstSRTObject * srtobject, GError ** error)
goto out;
}
srtobject->listener_poll_id = srt_epoll_create ();
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
ret = gst_srt_object_wait_connect (srtobject, sa, sa_len, error);
} else {
ret =
gst_srt_object_open_connection (srtobject, connection_mode, sa, sa_len,
error);
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = ret;
GST_OBJECT_UNLOCK (srtobject->element);
gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error);
}
out:
g_clear_object (&socket_address);
@ -1382,13 +1410,18 @@ out:
gboolean
gst_srt_object_open (GstSRTObject * srtobject, GError ** error)
{
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = TRUE;
GST_OBJECT_UNLOCK (srtobject->element);
g_cancellable_reset (srtobject->cancellable);
srtobject->bytes = 0;
return gst_srt_object_open_internal (srtobject, error);
}
void
gst_srt_object_close (GstSRTObject * srtobject)
static void
gst_srt_object_close_internal (GstSRTObject * srtobject)
{
g_mutex_lock (&srtobject->sock_lock);
@ -1402,15 +1435,6 @@ gst_srt_object_close (GstSRTObject * srtobject)
srtobject->sock = SRT_INVALID_SOCK;
}
if (srtobject->listener_poll_id != SRT_ERROR) {
if (srtobject->listener_sock != SRT_INVALID_SOCK) {
srt_epoll_remove_usock (srtobject->listener_poll_id,
srtobject->listener_sock);
}
srt_epoll_release (srtobject->listener_poll_id);
srtobject->listener_poll_id = SRT_ERROR;
}
if (srtobject->thread) {
GThread *thread = g_steal_pointer (&srtobject->thread);
g_mutex_unlock (&srtobject->sock_lock);
@ -1418,25 +1442,27 @@ gst_srt_object_close (GstSRTObject * srtobject)
g_mutex_lock (&srtobject->sock_lock);
}
if (srtobject->listener_sock != SRT_INVALID_SOCK) {
GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
srtobject->listener_sock);
srt_close (srtobject->listener_sock);
srtobject->listener_sock = SRT_INVALID_SOCK;
}
if (srtobject->callers) {
GList *callers = g_steal_pointer (&srtobject->callers);
g_list_foreach (callers, (GFunc) srt_caller_signal_removed, srtobject);
g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
}
g_mutex_unlock (&srtobject->sock_lock);
srtobject->sent_headers = FALSE;
g_mutex_unlock (&srtobject->sock_lock);
}
void
gst_srt_object_close (GstSRTObject * srtobject)
{
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = FALSE;
GST_OBJECT_UNLOCK (srtobject->element);
g_cancellable_cancel (srtobject->cancellable);
gst_srt_object_close_internal (srtobject);
}
static gboolean
@ -1472,7 +1498,9 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size,
gint poll_timeout;
GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
gint poll_id = SRT_ERROR;
SRTSOCKET sock = SRT_INVALID_SOCK;
gboolean auto_reconnect;
GError *internal_error = NULL;
/* Only source element can read data */
g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
@ -1492,6 +1520,7 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size,
GST_OBJECT_UNLOCK (srtobject->element);
retry:
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (!gst_srt_object_wait_caller (srtobject))
return 0;
@ -1500,6 +1529,7 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size,
if (srtobject->callers) {
SRTCaller *caller = srtobject->callers->data;
poll_id = caller->poll_id;
sock = caller->sock;
}
g_mutex_unlock (&srtobject->sock_lock);
@ -1507,92 +1537,78 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size,
return 0;
} else {
poll_id = srtobject->poll_id;
sock = srtobject->sock;
}
while (!g_cancellable_is_cancelled (srtobject->cancellable)) {
SRTSOCKET rsock, wsock;
gint rsocklen = 1, wsocklen = 1;
SYSSOCKET rsys, wsys;
gint rsyslen = 1, wsyslen = 1;
gint ret;
SRTSOCKET rsock;
gint rsocklen = 1;
SRTSOCKET wsock;
gint wsocklen = 1;
switch (srt_getsockstate (sock)) {
case SRTS_BROKEN:
case SRTS_CLOSING:
case SRTS_CLOSED:
case SRTS_NONEXIST:
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_READ, "Socket is broken or closed");
goto err;
if (srt_epoll_wait (poll_id, &rsock, &rsocklen, &wsock, &wsocklen,
poll_timeout, NULL, 0, NULL, 0) < 0) {
default:
break;
}
GST_TRACE_OBJECT (srtobject->element, "Waiting for read");
ret =
srt_epoll_wait (poll_id, &rsock, &rsocklen, &wsock, &wsocklen,
poll_timeout, &rsys, &rsyslen, &wsys, &wsyslen);
if (g_cancellable_is_cancelled (srtobject->cancellable))
break;
if (ret < 0) {
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY
&& g_cancellable_is_cancelled (srtobject->cancellable))
return 0;
#endif
if (srt_errno == SRT_ETIMEOUT)
continue;
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
g_set_error (&internal_error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
"Failed to poll socket: %s", srt_getlasterror_str ());
return -1;
goto err;
}
if (rsocklen != 1)
continue;
if (wsocklen == 1 && rsocklen == 1) {
/* Socket reported in wsock AND rsock signifies an error. */
gint reason = srt_getrejectreason (wsock);
if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
("Caller failed to authenticate: %" REASON_FORMAT,
REASON_ARGS (reason)), (NULL));
return 0;
}
if (!auto_reconnect) {
g_set_error (error, GST_RESOURCE_ERROR,
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_NOT_AUTHORIZED,
"Failed to authenticate: %" REASON_FORMAT, REASON_ARGS (reason));
return -1;
}
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
} else {
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
/* Caller has disappeared. */
return 0;
}
if (!auto_reconnect) {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_READ,
"Error on SRT socket: %" REASON_FORMAT, REASON_ARGS (reason));
return -1;
}
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, READ,
("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
goto err;
}
gst_srt_object_close (srtobject);
if (!gst_srt_object_open_internal (srtobject, error)) {
return -1;
}
continue;
}
srt_msgctrl_init (mctrl);
len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl);
if (len == SRT_ERROR) {
gint srt_errno = srt_getlasterror (NULL);
if (srt_errno == SRT_EASYNCRCV) {
if (srt_errno == SRT_EASYNCRCV)
continue;
} else {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
g_set_error (&internal_error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
"Failed to receive from SRT socket: %s", srt_getlasterror_str ());
return -1;
}
goto err;
}
srtobject->bytes += len;
@ -1600,6 +1616,32 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size,
}
return len;
err:
if (g_cancellable_is_cancelled (srtobject->cancellable))
return 0;
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
/* Caller has disappeared. */
ERROR_TO_WARNING (srtobject, internal_error, "");
g_clear_error (&internal_error);
return 0;
}
if (!auto_reconnect) {
g_propagate_error (error, internal_error);
return -1;
}
ERROR_TO_WARNING (srtobject, internal_error, ". Trying to reconnect");
g_clear_error (&internal_error);
gst_srt_object_close_internal (srtobject);
if (!gst_srt_object_open_internal (srtobject, error)) {
return -1;
}
goto retry;
}
void
@ -1607,12 +1649,6 @@ gst_srt_object_unlock (GstSRTObject * srtobject)
{
GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
/* Removing all socket descriptors from the monitoring list
* wakes up SRT's threads. We only have one to remove. */
if (srtobject->sock != SRT_INVALID_SOCK) {
srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
}
/* connection is only waited for in listener mode,
* but there is no harm in raising signal in any case */
g_mutex_lock (&srtobject->sock_lock);
@ -1646,25 +1682,41 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
for (i = 0; i < size; i++) {
SRTSOCKET wsock = sock;
gint wsocklen = 1;
gint sent;
SYSSOCKET rsys, wsys;
gint rsyslen = 1, wsyslen = 1;
gint ret = 0, sent;
GstBuffer *buffer = gst_buffer_list_get (headers, i);
GstMapInfo mapinfo;
if (g_cancellable_is_cancelled (srtobject->cancellable)) {
return TRUE;
if (g_cancellable_is_cancelled (srtobject->cancellable))
break;
if (poll_id >= 0) {
switch (srt_getsockstate (sock)) {
case SRTS_BROKEN:
case SRTS_CLOSING:
case SRTS_CLOSED:
case SRTS_NONEXIST:
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
"Socket is broken or closed");
return FALSE;
default:
break;
}
if (poll_id >= 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, &wsocklen,
poll_timeout, NULL, 0, NULL, 0) < 0) {
GST_TRACE_OBJECT (srtobject->element, "Waiting for header write");
ret =
srt_epoll_wait (poll_id, NULL, 0, &wsock, &wsocklen, poll_timeout,
&rsys, &rsyslen, &wsys, &wsyslen);
if (g_cancellable_is_cancelled (srtobject->cancellable))
break;
}
if (ret < 0) {
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY
&& g_cancellable_is_cancelled (srtobject->cancellable))
return TRUE;
#endif
if (srt_errno == SRT_ETIMEOUT)
continue;
@ -1673,6 +1725,9 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
return FALSE;
}
if (wsocklen != 1)
continue;
GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
i, buffer);
@ -1776,6 +1831,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject, GstBufferList * headers,
const guint8 *msg = mapinfo->data;
gint payload_size, optlen = sizeof (payload_size);
gboolean wait_for_connection, auto_reconnect;
GError *internal_error = NULL;
GST_OBJECT_LOCK (srtobject->element);
wait_for_connection = srtobject->wait_for_connection;
@ -1787,106 +1843,133 @@ gst_srt_object_write_one (GstSRTObject * srtobject, GstBufferList * headers,
}
GST_OBJECT_UNLOCK (srtobject->element);
retry:
if (!srtobject->sent_headers) {
if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
srtobject->poll_id, poll_timeout, headers, error)) {
return -1;
goto err;
}
srtobject->sent_headers = TRUE;
}
while (len < mapinfo->size) {
SRTSOCKET rsock;
gint rsocklen = 1;
SRTSOCKET wsock;
gint wsocklen = 1;
SRTSOCKET rsock, wsock;
gint rsocklen = 1, wsocklen = 1;
SYSSOCKET rsys, wsys;
gint rsyslen = 1, wsyslen = 1;
gint ret, sent, rest;
gboolean connecting_but_not_waiting = FALSE;
gint sent;
gint rest;
if (g_cancellable_is_cancelled (srtobject->cancellable))
break;
if (g_cancellable_is_cancelled (srtobject->cancellable)) {
switch (srt_getsockstate (srtobject->sock)) {
case SRTS_BROKEN:
case SRTS_CLOSING:
case SRTS_CLOSED:
case SRTS_NONEXIST:
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_WRITE, "Socket is broken or closed");
goto err;
case SRTS_CONNECTING:
if (!wait_for_connection) {
/* We need to check for SRT_EPOLL_ERR */
connecting_but_not_waiting = TRUE;
}
break;
default:
break;
}
if (!wait_for_connection &&
srt_getsockstate (srtobject->sock) == SRTS_CONNECTING) {
GST_LOG_OBJECT (srtobject->element,
"Not connected yet. Dropping the buffer.");
break;
}
GST_TRACE_OBJECT (srtobject->element, "Waiting a write");
ret =
srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
&wsocklen, connecting_but_not_waiting ? 0 : poll_timeout, &rsys,
&rsyslen, &wsys, &wsyslen);
if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
&wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
if (g_cancellable_is_cancelled (srtobject->cancellable))
break;
if (ret < 0) {
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY
&& g_cancellable_is_cancelled (srtobject->cancellable))
return 0;
#endif
if (srt_errno == SRT_ETIMEOUT)
continue;
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
"Failed to poll socket: %s", srt_getlasterror_str ());
return -1;
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_WRITE, "Failed to poll socket: %s",
srt_getlasterror_str ());
goto err;
}
if (wsocklen != 1)
continue;
if (wsocklen == 1 && rsocklen == 1) {
/* Socket reported in wsock AND rsock signifies an error. */
gint reason = srt_getrejectreason (wsock);
if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
if (!auto_reconnect) {
g_set_error (error, GST_RESOURCE_ERROR,
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_NOT_AUTHORIZED,
"Failed to authenticate: %" REASON_FORMAT, REASON_ARGS (reason));
return -1;
}
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
} else {
if (!auto_reconnect) {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_WRITE,
"Error on SRT socket: %" REASON_FORMAT, REASON_ARGS (reason));
return -1;
}
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, WRITE,
("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
goto err;
}
gst_srt_object_close (srtobject);
if (!gst_srt_object_open_internal (srtobject, error)) {
return -1;
}
continue;
if (connecting_but_not_waiting) {
GST_LOG_OBJECT (srtobject->element,
"Not connected yet. Dropping the buffer.");
break;
}
if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
srt_getlasterror_str ());
return -1;
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_WRITE, "%s", srt_getlasterror_str ());
goto err;
}
rest = MIN (mapinfo->size - len, payload_size);
sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
if (sent < 0) {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
srt_getlasterror_str ());
return -1;
g_set_error (&internal_error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_WRITE, "%s", srt_getlasterror_str ());
goto err;
}
len += sent;
srtobject->bytes += sent;
continue;
}
return len;
err:
if (g_cancellable_is_cancelled (srtobject->cancellable))
return 0;
if (!auto_reconnect) {
g_propagate_error (error, internal_error);
return -1;
}
ERROR_TO_WARNING (srtobject, internal_error, ". Trying to reconnect");
g_clear_error (&internal_error);
gst_srt_object_close_internal (srtobject);
if (!gst_srt_object_open_internal (srtobject, error)) {
return -1;
}
goto retry;
}
gssize
@ -1988,7 +2071,8 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
g_mutex_lock (&srtobject->sock_lock);
if (srtobject->sock != SRT_INVALID_SOCK) {
if (srtobject->thread == NULL) {
/* Not a listening socket */
s = get_stats_for_srtsock (srtobject, srtobject->sock);
}

View file

@ -59,8 +59,6 @@ struct _GstSRTObject
gboolean sent_headers;
GTask *listener_task;
SRTSOCKET listener_sock;
gint listener_poll_id;
GThread *thread;