srt: Clean up error handling

- Make the srt_epoll_wait loops more uniform.

- Error only via GError when possible; let the element send the error
  message. Avoids a second error message.

- Return 0 when cancelled. Avoids an error message from the element.

- Don't send an error message from send_headers when we're a server
  sink.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3156>
This commit is contained in:
Jan Alexander Steffens (heftig) 2022-06-08 16:35:54 +02:00 committed by GStreamer Marge Bot
parent a3cc5cf257
commit 4e05100e8c

View file

@ -38,11 +38,13 @@ GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
#define GST_CAT_DEFAULT gst_debug_srtobject
#if SRT_VERSION_VALUE > 0x10402
#define SRTSOCK_ERROR_DEBUG ("libsrt reported: %s", srt_rejectreason_str (reason))
#define REASON_FORMAT "s"
#define REASON_ARGS(reason) srt_rejectreason_str (reason)
#else
/* srt_rejectreason_str() is unavailable in libsrt 1.4.2 and prior due to
* unexported symbol. See https://github.com/Haivision/srt/pull/1728. */
#define SRTSOCK_ERROR_DEBUG ("libsrt reported reject reason code %d", reason)
#define REASON_FORMAT "s %d"
#define REASON_ARGS(reason) "reject reason code", (reason)
#endif
/* Define options added in later revisions */
@ -52,10 +54,6 @@ GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
#define SRTO_RETRANSMITALGO 61
#endif
#define ELEMENT_WARNING_SRTSOCK_ERROR(code, reason) \
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, code, \
("Error on SRT socket. Trying to reconnect."), SRTSOCK_ERROR_DEBUG)
enum
{
PROP_URI = 1,
@ -924,19 +922,19 @@ thread_func (gpointer data)
GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
&rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, &rsocklen, 0, 0,
poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
if (srtobject->listener_poll_id == SRT_ERROR)
return NULL;
if (srt_errno == SRT_ETIMEOUT) {
if (srt_errno == SRT_ETIMEOUT)
continue;
} else {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
("abort polling: %s", srt_getlasterror_str ()), (NULL));
return NULL;
}
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
("abort polling: %s", srt_getlasterror_str ()), (NULL));
return NULL;
}
caller_sock =
@ -1433,33 +1431,25 @@ gst_srt_object_close (GstSRTObject * srtobject)
static gboolean
gst_srt_object_wait_caller (GstSRTObject * srtobject,
GCancellable * cancellable, GError ** error)
GCancellable * cancellable)
{
gboolean ret;
g_mutex_lock (&srtobject->sock_lock);
if (srtobject->callers == NULL) {
ret = (srtobject->callers != NULL);
if (!ret) {
GST_INFO_OBJECT (srtobject->element, "Waiting for connection");
while (!g_cancellable_is_cancelled (cancellable)) {
ret = (srtobject->callers != NULL);
if (ret) {
GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
break;
}
while (!ret && !g_cancellable_is_cancelled (cancellable)) {
g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
ret = (srtobject->callers != NULL);
}
} else {
ret = TRUE;
}
g_mutex_unlock (&srtobject->sock_lock);
if (!ret) {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
"Canceled waiting for a connection.");
if (ret) {
GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
}
return ret;
@ -1492,8 +1482,8 @@ gst_srt_object_read (GstSRTObject * srtobject,
GST_OBJECT_UNLOCK (srtobject->element);
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
if (!gst_srt_object_wait_caller (srtobject, cancellable))
return 0;
g_mutex_lock (&srtobject->sock_lock);
if (srtobject->callers) {
@ -1519,34 +1509,48 @@ gst_srt_object_read (GstSRTObject * srtobject,
poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
if (srt_errno != SRT_ETIMEOUT) {
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY)
return 0;
}
continue;
#endif
if (srt_errno == SRT_ETIMEOUT)
continue;
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
"Failed to poll socket: %s", srt_getlasterror_str ());
return -1;
}
if (wsocklen == 1 && rsocklen == 1) {
/* Socket reported in wsock AND rsock signifies an error. */
gint reason = srt_getrejectreason (wsock);
gboolean is_auth_error = (reason == SRT_REJ_BADSECRET
|| reason == SRT_REJ_UNSECURE);
if (is_auth_error) {
ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
("Caller failed to authenticate: %" REASON_FORMAT,
REASON_ARGS (reason)), (NULL));
return 0;
}
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
} else {
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
/* Caller has disappeared. */
return 0;
}
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, READ,
("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
}
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
/* Caller has disappeared. */
return 0;
} else {
if (!is_auth_error) {
ELEMENT_WARNING_SRTSOCK_ERROR (READ, reason);
}
gst_srt_object_close (srtobject);
if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
return -1;
}
gst_srt_object_close (srtobject);
if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
return -1;
}
continue;
}
@ -1598,7 +1602,7 @@ gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
static gboolean
gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
gint poll_id, gint poll_timeout, GstBufferList * headers,
GCancellable * cancellable)
GCancellable * cancellable, GError ** error)
{
guint size, i;
@ -1618,27 +1622,39 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
GstMapInfo mapinfo;
if (g_cancellable_is_cancelled (cancellable)) {
return FALSE;
return TRUE;
}
if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
&wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
continue;
if (poll_id >= 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, &wsocklen,
poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY)
return TRUE;
#endif
if (srt_errno == SRT_ETIMEOUT)
continue;
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
"Failed to poll socket: %s", srt_getlasterror_str ());
return FALSE;
}
GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
i, buffer);
if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
("Could not map the input stream"), (NULL));
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
"Failed to map header buffer");
return FALSE;
}
sent = srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size, 0);
if (sent == SRT_ERROR) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
("%s", srt_getlasterror_str ()));
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
srt_getlasterror_str ());
gst_buffer_unmap (buffer, &mapinfo);
return FALSE;
}
@ -1654,7 +1670,7 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
static gssize
gst_srt_object_write_to_callers (GstSRTObject * srtobject,
GstBufferList * headers,
const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
const GstMapInfo * mapinfo, GCancellable * cancellable)
{
GList *callers;
@ -1674,10 +1690,17 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
}
if (!caller->sent_headers) {
if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
-1, headers, cancellable)) {
GError *error = NULL;
if (!gst_srt_object_send_headers (srtobject, caller->sock, -1, 0,
headers, cancellable, &error)) {
GST_WARNING_OBJECT (srtobject->element,
"Failed to send headers to caller %d: %s",
caller->sock, error->message);
g_error_free (error);
goto err;
}
caller->sent_headers = TRUE;
}
@ -1712,7 +1735,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
cancelled:
g_mutex_unlock (&srtobject->sock_lock);
return -1;
return 0;
}
static gssize
@ -1736,9 +1759,10 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
if (!srtobject->sent_headers) {
if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
srtobject->poll_id, poll_timeout, headers, cancellable)) {
srtobject->poll_id, poll_timeout, headers, cancellable, error)) {
return -1;
}
srtobject->sent_headers = TRUE;
}
@ -1764,7 +1788,19 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
&wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
continue;
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY)
return 0;
#endif
if (srt_errno == SRT_ETIMEOUT)
continue;
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
"Failed to poll socket: %s", srt_getlasterror_str ());
return -1;
}
if (wsocklen == 1 && rsocklen == 1) {
@ -1772,9 +1808,13 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
gint reason = srt_getrejectreason (wsock);
if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
} else {
ELEMENT_WARNING_SRTSOCK_ERROR (WRITE, reason);
GST_ELEMENT_WARNING (srtobject->element, RESOURCE, WRITE,
("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
REASON_ARGS (reason)), (NULL));
}
gst_srt_object_close (srtobject);
@ -1785,18 +1825,18 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
}
if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
("%s", srt_getlasterror_str ()));
break;
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
srt_getlasterror_str ());
return -1;
}
rest = MIN (mapinfo->size - len, payload_size);
sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
if (sent < 0) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
("%s", srt_getlasterror_str ()));
break;
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
srt_getlasterror_str ());
return -1;
}
len += sent;
srtobject->bytes += sent;
@ -1826,12 +1866,12 @@ gst_srt_object_write (GstSRTObject * srtobject,
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (wait_for_connection) {
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
if (!gst_srt_object_wait_caller (srtobject, cancellable))
return 0;
}
len =
gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
cancellable, error);
cancellable);
} else {
len =
gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,