From c533010e206b7b7e66e8c9694830203142980e5c Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 1 Mar 2023 16:46:19 -0500 Subject: [PATCH] srt: Use g_cancellable_get_fd for poll cancellation Removing sockets from the epoll for cancellation is unreliable and might not be thread-safe. Rather, have SRT watch a FD from the cancellable if available. Keep the cancellable cancelled while we're not open. Use the regular single-socket `sock` and `poll_id` fields for the listening thread instead of duplicating them. Before polling we need to check the socket state. SRT closes broken sockets by itself and when the epoll contains our cancellation FD it can no longer be empty, which was an error before. Treat more failures in the read and write operations as an opportunity to try a reconnect. Part-of: --- .../gst-plugins-bad/ext/srt/gstsrtobject.c | 602 ++++++++++-------- .../gst-plugins-bad/ext/srt/gstsrtobject.h | 2 - 2 files changed, 343 insertions(+), 261 deletions(-) diff --git a/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c b/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c index 836478a87d..cf6b20d86c 100644 --- a/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c +++ b/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c @@ -37,6 +37,15 @@ GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject); #define GST_CAT_DEFAULT gst_debug_srtobject +#define ERROR_TO_WARNING(srtobject, error, suffix) \ +G_STMT_START { \ + gchar *text = g_strdup_printf ("%s%s", (error)->message, (suffix)); \ + GST_WARNING_OBJECT ((srtobject)->element, "warning: %s", text); \ + gst_element_message_full ((srtobject)->element, GST_MESSAGE_WARNING, \ + (error)->domain, (error)->code, text, NULL, __FILE__, GST_FUNCTION, \ + __LINE__); \ +} G_STMT_END + #if SRT_VERSION_VALUE > 0x10402 #define REASON_FORMAT "s" #define REASON_ARGS(reason) srt_rejectreason_str (reason) @@ -338,6 +347,7 @@ GstSRTObject * gst_srt_object_new (GstElement * element) { GstSRTObject *srtobject; + gint fd, fd_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN; if (g_atomic_int_add (&srt_init_refcount, 1) == 0) { GST_DEBUG_OBJECT (element, "Starting up SRT"); @@ -353,12 +363,15 @@ gst_srt_object_new (GstElement * element) srtobject->parameters = gst_structure_new_empty ("application/x-srt-params"); srtobject->sock = SRT_INVALID_SOCK; srtobject->poll_id = srt_epoll_create (); - srtobject->listener_sock = SRT_INVALID_SOCK; - srtobject->listener_poll_id = SRT_ERROR; srtobject->sent_headers = FALSE; srtobject->wait_for_connection = GST_SRT_DEFAULT_WAIT_FOR_CONNECTION; srtobject->auto_reconnect = GST_SRT_DEFAULT_AUTO_RECONNECT; + fd = g_cancellable_get_fd (srtobject->cancellable); + if (fd >= 0) + srt_epoll_add_ssock (srtobject->poll_id, fd, &fd_flags); + g_cancellable_cancel (srtobject->cancellable); + g_cond_init (&srtobject->sock_cond); return srtobject; } @@ -928,92 +941,129 @@ static gpointer thread_func (gpointer data) { GstSRTObject *srtobject = data; - SRTSOCKET caller_sock; - union - { - struct sockaddr_storage ss; - struct sockaddr sa; - } caller_sa; - int caller_sa_len = sizeof (caller_sa); - gint poll_timeout; - SRTSOCKET rsock; - gint rsocklen = 1; + GST_OBJECT_LOCK (srtobject->element); + if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", + &poll_timeout)) { + poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; + } + GST_OBJECT_UNLOCK (srtobject->element); for (;;) { + SRTSOCKET rsock; + gint rsocklen = 1; + SYSSOCKET rsys, wsys; + gint rsyslen = 1, wsyslen = 1; + gint ret; + SRTSOCKET caller_sock; + union + { + struct sockaddr_storage ss; + struct sockaddr sa; + } caller_sa; + int caller_sa_len = sizeof (caller_sa); + SRTCaller *caller; + gint flag = SRT_EPOLL_ERR; + gint fd, fd_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN; + GST_OBJECT_LOCK (srtobject->element); - if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", - &poll_timeout)) { - poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; + if (!srtobject->opened) { + GST_OBJECT_UNLOCK (srtobject->element); + break; } GST_OBJECT_UNLOCK (srtobject->element); - GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller"); - - if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, &rsocklen, 0, 0, - poll_timeout, NULL, 0, NULL, 0) < 0) { - gint srt_errno = srt_getlasterror (NULL); - - if (srtobject->listener_poll_id == SRT_ERROR) + switch (srt_getsockstate (srtobject->sock)) { + case SRTS_BROKEN: + case SRTS_CLOSING: + case SRTS_CLOSED: + case SRTS_NONEXIST: + GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED, + ("Socket is broken or closed"), (NULL)); return NULL; + default: + break; + } + + GST_TRACE_OBJECT (srtobject->element, "Waiting on listening socket"); + ret = + srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, NULL, 0, + poll_timeout, &rsys, &rsyslen, &wsys, &wsyslen); + + GST_OBJECT_LOCK (srtobject->element); + if (!srtobject->opened) { + GST_OBJECT_UNLOCK (srtobject->element); + break; + } + GST_OBJECT_UNLOCK (srtobject->element); + + if (ret < 0) { + gint srt_errno = srt_getlasterror (NULL); if (srt_errno == SRT_ETIMEOUT) continue; GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED, - ("abort polling: %s", srt_getlasterror_str ()), (NULL)); + ("Failed to poll socket: %s", srt_getlasterror_str ()), (NULL)); return NULL; } - caller_sock = - srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len); + if (rsocklen != 1) + continue; - if (caller_sock != SRT_INVALID_SOCK) { - SRTCaller *caller; - gint flag = SRT_EPOLL_ERR; + caller_sock = srt_accept (rsock, &caller_sa.sa, &caller_sa_len); - caller = srt_caller_new (); - caller->sockaddr = - g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len); - caller->poll_id = srt_epoll_create (); - caller->sock = caller_sock; - - if (gst_uri_handler_get_uri_type (GST_URI_HANDLER - (srtobject->element)) == GST_URI_SRC) { - flag |= SRT_EPOLL_IN; - } else { - flag |= SRT_EPOLL_OUT; - } - - if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) { - - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS, - ("%s", srt_getlasterror_str ()), (NULL)); - - srt_caller_free (caller); - - /* try-again */ - continue; - } - - GST_DEBUG_OBJECT (srtobject->element, "Accept to connect %d", - caller->sock); - - g_mutex_lock (&srtobject->sock_lock); - srtobject->callers = g_list_prepend (srtobject->callers, caller); - g_cond_signal (&srtobject->sock_cond); - g_mutex_unlock (&srtobject->sock_lock); - - /* notifying caller-added */ - g_signal_emit_by_name (srtobject->element, "caller-added", 0, - caller->sockaddr); - - if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) == - GST_URI_SRC) - return NULL; + if (caller_sock == SRT_INVALID_SOCK) { + GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED, + ("Failed to accept connection: %s", srt_getlasterror_str ()), (NULL)); + return NULL; } + + caller = srt_caller_new (); + caller->sockaddr = + g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len); + caller->poll_id = srt_epoll_create (); + caller->sock = caller_sock; + + fd = g_cancellable_get_fd (srtobject->cancellable); + if (fd >= 0) + srt_epoll_add_ssock (srtobject->poll_id, fd, &fd_flags); + + if (gst_uri_handler_get_uri_type (GST_URI_HANDLER + (srtobject->element)) == GST_URI_SRC) { + flag |= SRT_EPOLL_IN; + } else { + flag |= SRT_EPOLL_OUT; + } + + if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag) < 0) { + GST_ELEMENT_WARNING (srtobject->element, LIBRARY, SETTINGS, + ("%s", srt_getlasterror_str ()), (NULL)); + + srt_caller_free (caller); + + /* try-again */ + continue; + } + + GST_DEBUG_OBJECT (srtobject->element, "Accept to connect %d", caller->sock); + + g_mutex_lock (&srtobject->sock_lock); + srtobject->callers = g_list_prepend (srtobject->callers, caller); + g_cond_signal (&srtobject->sock_cond); + g_mutex_unlock (&srtobject->sock_lock); + + /* notifying caller-added */ + g_signal_emit_by_name (srtobject->element, "caller-added", 0, + caller->sockaddr); + + if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) == + GST_URI_SRC) + break; } + + return NULL; } static GSocketAddress * @@ -1080,6 +1130,7 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, gpointer sa, const gchar *local_address = NULL; guint local_port = 0; gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN; + gboolean poll_added = FALSE; gpointer bind_sa; gsize bind_sa_len; @@ -1132,25 +1183,25 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, gpointer sa, goto failed; } - if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) { + if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags) < 0) { g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s", srt_getlasterror_str ()); goto failed; } + poll_added = TRUE; GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket"); if (srt_listen (sock, 1) == SRT_ERROR) { g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s", srt_getlasterror_str ()); - goto failed; } - srtobject->listener_sock = sock; + srtobject->sock = sock; /* Register the SRT listen callback */ - if (srt_listen_callback (srtobject->listener_sock, + if (srt_listen_callback (srtobject->sock, (srt_listen_callback_fn *) srt_listen_callback_func, srtobject)) { goto failed; } @@ -1164,10 +1215,10 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, gpointer sa, return TRUE; -failed: - if (srtobject->listener_poll_id != SRT_ERROR) { - srt_epoll_release (srtobject->listener_poll_id); +failed: + if (poll_added) { + srt_epoll_remove_usock (srtobject->poll_id, sock); } if (sock != SRT_INVALID_SOCK) { @@ -1176,8 +1227,7 @@ failed: g_clear_object (&bind_addr); - srtobject->listener_poll_id = SRT_ERROR; - srtobject->listener_sock = SRT_INVALID_SOCK; + srtobject->sock = SRT_INVALID_SOCK; return FALSE; } @@ -1272,7 +1322,7 @@ gst_srt_object_connect (GstSRTObject * srtobject, } } - if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) { + if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags) < 0) { g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s", srt_getlasterror_str ()); goto failed; @@ -1294,23 +1344,6 @@ failed: return FALSE; } -static gboolean -gst_srt_object_open_connection (GstSRTObject * srtobject, - GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len, - GError ** error) -{ - gboolean ret = FALSE; - - if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { - ret = gst_srt_object_wait_connect (srtobject, sa, sa_len, error); - } else { - ret = - gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error); - } - - return ret; -} - static gboolean gst_srt_object_open_internal (GstSRTObject * srtobject, GError ** error) { @@ -1325,8 +1358,6 @@ gst_srt_object_open_internal (GstSRTObject * srtobject, GError ** error) GST_OBJECT_LOCK (srtobject->element); - srtobject->opened = FALSE; - if (!gst_structure_get_enum (srtobject->parameters, "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) { GST_WARNING_OBJECT (srtobject->element, @@ -1363,15 +1394,12 @@ gst_srt_object_open_internal (GstSRTObject * srtobject, GError ** error) goto out; } - srtobject->listener_poll_id = srt_epoll_create (); - - ret = - gst_srt_object_open_connection (srtobject, connection_mode, sa, sa_len, - error); - - GST_OBJECT_LOCK (srtobject->element); - srtobject->opened = ret; - GST_OBJECT_UNLOCK (srtobject->element); + if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { + ret = gst_srt_object_wait_connect (srtobject, sa, sa_len, error); + } else { + ret = + gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error); + } out: g_clear_object (&socket_address); @@ -1382,13 +1410,18 @@ out: gboolean gst_srt_object_open (GstSRTObject * srtobject, GError ** error) { + GST_OBJECT_LOCK (srtobject->element); + srtobject->opened = TRUE; + GST_OBJECT_UNLOCK (srtobject->element); + + g_cancellable_reset (srtobject->cancellable); srtobject->bytes = 0; return gst_srt_object_open_internal (srtobject, error); } -void -gst_srt_object_close (GstSRTObject * srtobject) +static void +gst_srt_object_close_internal (GstSRTObject * srtobject) { g_mutex_lock (&srtobject->sock_lock); @@ -1402,15 +1435,6 @@ gst_srt_object_close (GstSRTObject * srtobject) srtobject->sock = SRT_INVALID_SOCK; } - if (srtobject->listener_poll_id != SRT_ERROR) { - if (srtobject->listener_sock != SRT_INVALID_SOCK) { - srt_epoll_remove_usock (srtobject->listener_poll_id, - srtobject->listener_sock); - } - srt_epoll_release (srtobject->listener_poll_id); - srtobject->listener_poll_id = SRT_ERROR; - } - if (srtobject->thread) { GThread *thread = g_steal_pointer (&srtobject->thread); g_mutex_unlock (&srtobject->sock_lock); @@ -1418,25 +1442,27 @@ gst_srt_object_close (GstSRTObject * srtobject) g_mutex_lock (&srtobject->sock_lock); } - if (srtobject->listener_sock != SRT_INVALID_SOCK) { - GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)", - srtobject->listener_sock); - - srt_close (srtobject->listener_sock); - srtobject->listener_sock = SRT_INVALID_SOCK; - } - if (srtobject->callers) { GList *callers = g_steal_pointer (&srtobject->callers); g_list_foreach (callers, (GFunc) srt_caller_signal_removed, srtobject); g_list_free_full (callers, (GDestroyNotify) srt_caller_free); } - g_mutex_unlock (&srtobject->sock_lock); + srtobject->sent_headers = FALSE; + g_mutex_unlock (&srtobject->sock_lock); +} + +void +gst_srt_object_close (GstSRTObject * srtobject) +{ GST_OBJECT_LOCK (srtobject->element); srtobject->opened = FALSE; GST_OBJECT_UNLOCK (srtobject->element); + + g_cancellable_cancel (srtobject->cancellable); + + gst_srt_object_close_internal (srtobject); } static gboolean @@ -1472,7 +1498,9 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size, gint poll_timeout; GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE; gint poll_id = SRT_ERROR; + SRTSOCKET sock = SRT_INVALID_SOCK; gboolean auto_reconnect; + GError *internal_error = NULL; /* Only source element can read data */ g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER @@ -1492,6 +1520,7 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size, GST_OBJECT_UNLOCK (srtobject->element); +retry: if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { if (!gst_srt_object_wait_caller (srtobject)) return 0; @@ -1500,6 +1529,7 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size, if (srtobject->callers) { SRTCaller *caller = srtobject->callers->data; poll_id = caller->poll_id; + sock = caller->sock; } g_mutex_unlock (&srtobject->sock_lock); @@ -1507,92 +1537,78 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size, return 0; } else { poll_id = srtobject->poll_id; + sock = srtobject->sock; } while (!g_cancellable_is_cancelled (srtobject->cancellable)) { + SRTSOCKET rsock, wsock; + gint rsocklen = 1, wsocklen = 1; + SYSSOCKET rsys, wsys; + gint rsyslen = 1, wsyslen = 1; + gint ret; - SRTSOCKET rsock; - gint rsocklen = 1; - SRTSOCKET wsock; - gint wsocklen = 1; + switch (srt_getsockstate (sock)) { + case SRTS_BROKEN: + case SRTS_CLOSING: + case SRTS_CLOSED: + case SRTS_NONEXIST: + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_READ, "Socket is broken or closed"); + goto err; - if (srt_epoll_wait (poll_id, &rsock, &rsocklen, &wsock, &wsocklen, - poll_timeout, NULL, 0, NULL, 0) < 0) { + default: + break; + } + + GST_TRACE_OBJECT (srtobject->element, "Waiting for read"); + ret = + srt_epoll_wait (poll_id, &rsock, &rsocklen, &wsock, &wsocklen, + poll_timeout, &rsys, &rsyslen, &wsys, &wsyslen); + + if (g_cancellable_is_cancelled (srtobject->cancellable)) + break; + + if (ret < 0) { gint srt_errno = srt_getlasterror (NULL); - -#if SRT_VERSION_VALUE >= 0x010402 - if (srt_errno == SRT_EPOLLEMPTY - && g_cancellable_is_cancelled (srtobject->cancellable)) - return 0; -#endif - if (srt_errno == SRT_ETIMEOUT) continue; - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + g_set_error (&internal_error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ, "Failed to poll socket: %s", srt_getlasterror_str ()); - return -1; + goto err; } + if (rsocklen != 1) + continue; + if (wsocklen == 1 && rsocklen == 1) { /* Socket reported in wsock AND rsock signifies an error. */ gint reason = srt_getrejectreason (wsock); if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) { - if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { - GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED, - ("Caller failed to authenticate: %" REASON_FORMAT, - REASON_ARGS (reason)), (NULL)); - return 0; - } - - if (!auto_reconnect) { - g_set_error (error, GST_RESOURCE_ERROR, - GST_RESOURCE_ERROR_NOT_AUTHORIZED, - "Failed to authenticate: %" REASON_FORMAT, REASON_ARGS (reason)); - return -1; - } - - GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED, - ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect", - REASON_ARGS (reason)), (NULL)); + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_NOT_AUTHORIZED, + "Failed to authenticate: %" REASON_FORMAT, REASON_ARGS (reason)); } else { - if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { - /* Caller has disappeared. */ - return 0; - } - - if (!auto_reconnect) { - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ, - "Error on SRT socket: %" REASON_FORMAT, REASON_ARGS (reason)); - return -1; - } - - GST_ELEMENT_WARNING (srtobject->element, RESOURCE, READ, - ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect", - REASON_ARGS (reason)), (NULL)); + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_READ, + "Error on SRT socket: %" REASON_FORMAT, REASON_ARGS (reason)); } - gst_srt_object_close (srtobject); - if (!gst_srt_object_open_internal (srtobject, error)) { - return -1; - } - continue; + goto err; } - srt_msgctrl_init (mctrl); len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl); if (len == SRT_ERROR) { gint srt_errno = srt_getlasterror (NULL); - if (srt_errno == SRT_EASYNCRCV) { + if (srt_errno == SRT_EASYNCRCV) continue; - } else { - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ, - "Failed to receive from SRT socket: %s", srt_getlasterror_str ()); - return -1; - } + + g_set_error (&internal_error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ, + "Failed to receive from SRT socket: %s", srt_getlasterror_str ()); + goto err; } srtobject->bytes += len; @@ -1600,6 +1616,32 @@ gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size, } return len; + +err: + if (g_cancellable_is_cancelled (srtobject->cancellable)) + return 0; + + if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { + /* Caller has disappeared. */ + ERROR_TO_WARNING (srtobject, internal_error, ""); + g_clear_error (&internal_error); + return 0; + } + + if (!auto_reconnect) { + g_propagate_error (error, internal_error); + return -1; + } + + ERROR_TO_WARNING (srtobject, internal_error, ". Trying to reconnect"); + g_clear_error (&internal_error); + + gst_srt_object_close_internal (srtobject); + if (!gst_srt_object_open_internal (srtobject, error)) { + return -1; + } + + goto retry; } void @@ -1607,12 +1649,6 @@ gst_srt_object_unlock (GstSRTObject * srtobject) { GST_DEBUG_OBJECT (srtobject->element, "waking up SRT"); - /* Removing all socket descriptors from the monitoring list - * wakes up SRT's threads. We only have one to remove. */ - if (srtobject->sock != SRT_INVALID_SOCK) { - srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock); - } - /* connection is only waited for in listener mode, * but there is no harm in raising signal in any case */ g_mutex_lock (&srtobject->sock_lock); @@ -1646,25 +1682,41 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock, for (i = 0; i < size; i++) { SRTSOCKET wsock = sock; gint wsocklen = 1; - gint sent; + SYSSOCKET rsys, wsys; + gint rsyslen = 1, wsyslen = 1; + gint ret = 0, sent; GstBuffer *buffer = gst_buffer_list_get (headers, i); GstMapInfo mapinfo; - if (g_cancellable_is_cancelled (srtobject->cancellable)) { - return TRUE; + if (g_cancellable_is_cancelled (srtobject->cancellable)) + break; + + if (poll_id >= 0) { + switch (srt_getsockstate (sock)) { + case SRTS_BROKEN: + case SRTS_CLOSING: + case SRTS_CLOSED: + case SRTS_NONEXIST: + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + "Socket is broken or closed"); + return FALSE; + + default: + break; + } + + GST_TRACE_OBJECT (srtobject->element, "Waiting for header write"); + ret = + srt_epoll_wait (poll_id, NULL, 0, &wsock, &wsocklen, poll_timeout, + &rsys, &rsyslen, &wsys, &wsyslen); + + if (g_cancellable_is_cancelled (srtobject->cancellable)) + break; } - if (poll_id >= 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, &wsocklen, - poll_timeout, NULL, 0, NULL, 0) < 0) { + if (ret < 0) { gint srt_errno = srt_getlasterror (NULL); - -#if SRT_VERSION_VALUE >= 0x010402 - if (srt_errno == SRT_EPOLLEMPTY - && g_cancellable_is_cancelled (srtobject->cancellable)) - return TRUE; -#endif - if (srt_errno == SRT_ETIMEOUT) continue; @@ -1673,6 +1725,9 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock, return FALSE; } + if (wsocklen != 1) + continue; + GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT, i, buffer); @@ -1776,6 +1831,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject, GstBufferList * headers, const guint8 *msg = mapinfo->data; gint payload_size, optlen = sizeof (payload_size); gboolean wait_for_connection, auto_reconnect; + GError *internal_error = NULL; GST_OBJECT_LOCK (srtobject->element); wait_for_connection = srtobject->wait_for_connection; @@ -1787,106 +1843,133 @@ gst_srt_object_write_one (GstSRTObject * srtobject, GstBufferList * headers, } GST_OBJECT_UNLOCK (srtobject->element); +retry: if (!srtobject->sent_headers) { if (!gst_srt_object_send_headers (srtobject, srtobject->sock, srtobject->poll_id, poll_timeout, headers, error)) { - return -1; + goto err; } srtobject->sent_headers = TRUE; } while (len < mapinfo->size) { - SRTSOCKET rsock; - gint rsocklen = 1; - SRTSOCKET wsock; - gint wsocklen = 1; + SRTSOCKET rsock, wsock; + gint rsocklen = 1, wsocklen = 1; + SYSSOCKET rsys, wsys; + gint rsyslen = 1, wsyslen = 1; + gint ret, sent, rest; + gboolean connecting_but_not_waiting = FALSE; - gint sent; - gint rest; - - if (g_cancellable_is_cancelled (srtobject->cancellable)) { + if (g_cancellable_is_cancelled (srtobject->cancellable)) break; + + switch (srt_getsockstate (srtobject->sock)) { + case SRTS_BROKEN: + case SRTS_CLOSING: + case SRTS_CLOSED: + case SRTS_NONEXIST: + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_WRITE, "Socket is broken or closed"); + goto err; + + case SRTS_CONNECTING: + if (!wait_for_connection) { + /* We need to check for SRT_EPOLL_ERR */ + connecting_but_not_waiting = TRUE; + } + break; + + default: + break; } - if (!wait_for_connection && - srt_getsockstate (srtobject->sock) == SRTS_CONNECTING) { - GST_LOG_OBJECT (srtobject->element, - "Not connected yet. Dropping the buffer."); - break; - } + GST_TRACE_OBJECT (srtobject->element, "Waiting a write"); + ret = + srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock, + &wsocklen, connecting_but_not_waiting ? 0 : poll_timeout, &rsys, + &rsyslen, &wsys, &wsyslen); - if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock, - &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) { + if (g_cancellable_is_cancelled (srtobject->cancellable)) + break; + + if (ret < 0) { gint srt_errno = srt_getlasterror (NULL); - -#if SRT_VERSION_VALUE >= 0x010402 - if (srt_errno == SRT_EPOLLEMPTY - && g_cancellable_is_cancelled (srtobject->cancellable)) - return 0; -#endif - if (srt_errno == SRT_ETIMEOUT) continue; - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, - "Failed to poll socket: %s", srt_getlasterror_str ()); - return -1; + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_WRITE, "Failed to poll socket: %s", + srt_getlasterror_str ()); + goto err; } + if (wsocklen != 1) + continue; + if (wsocklen == 1 && rsocklen == 1) { /* Socket reported in wsock AND rsock signifies an error. */ gint reason = srt_getrejectreason (wsock); if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) { - if (!auto_reconnect) { - g_set_error (error, GST_RESOURCE_ERROR, - GST_RESOURCE_ERROR_NOT_AUTHORIZED, - "Failed to authenticate: %" REASON_FORMAT, REASON_ARGS (reason)); - return -1; - } - - GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED, - ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect", - REASON_ARGS (reason)), (NULL)); + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_NOT_AUTHORIZED, + "Failed to authenticate: %" REASON_FORMAT, REASON_ARGS (reason)); } else { - if (!auto_reconnect) { - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, - "Error on SRT socket: %" REASON_FORMAT, REASON_ARGS (reason)); - return -1; - } - - GST_ELEMENT_WARNING (srtobject->element, RESOURCE, WRITE, - ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect", - REASON_ARGS (reason)), (NULL)); + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_WRITE, + "Error on SRT socket: %" REASON_FORMAT, REASON_ARGS (reason)); } - gst_srt_object_close (srtobject); - if (!gst_srt_object_open_internal (srtobject, error)) { - return -1; - } - continue; + goto err; + } + + if (connecting_but_not_waiting) { + GST_LOG_OBJECT (srtobject->element, + "Not connected yet. Dropping the buffer."); + break; } if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) { - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s", - srt_getlasterror_str ()); - return -1; + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_WRITE, "%s", srt_getlasterror_str ()); + goto err; } rest = MIN (mapinfo->size - len, payload_size); sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0); if (sent < 0) { - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s", - srt_getlasterror_str ()); - return -1; + g_set_error (&internal_error, GST_RESOURCE_ERROR, + GST_RESOURCE_ERROR_WRITE, "%s", srt_getlasterror_str ()); + goto err; } + len += sent; srtobject->bytes += sent; + continue; } return len; + +err: + if (g_cancellable_is_cancelled (srtobject->cancellable)) + return 0; + + if (!auto_reconnect) { + g_propagate_error (error, internal_error); + return -1; + } + + ERROR_TO_WARNING (srtobject, internal_error, ". Trying to reconnect"); + g_clear_error (&internal_error); + + gst_srt_object_close_internal (srtobject); + if (!gst_srt_object_open_internal (srtobject, error)) { + return -1; + } + + goto retry; } gssize @@ -1988,7 +2071,8 @@ gst_srt_object_get_stats (GstSRTObject * srtobject) g_mutex_lock (&srtobject->sock_lock); - if (srtobject->sock != SRT_INVALID_SOCK) { + if (srtobject->thread == NULL) { + /* Not a listening socket */ s = get_stats_for_srtsock (srtobject, srtobject->sock); } diff --git a/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.h b/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.h index d92efb22f5..e6e5f07ad4 100644 --- a/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.h +++ b/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.h @@ -59,8 +59,6 @@ struct _GstSRTObject gboolean sent_headers; GTask *listener_task; - SRTSOCKET listener_sock; - gint listener_poll_id; GThread *thread;