srt: Clean up locking

Use GST_OBJECT_LOCK (srtobject->element) to protect only the fields
involved in property access.

Introduce a new mutex srtobject->sock_lock to go with
srtobject->sock_cond and protect the list of callers from concurrent
access.
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-03-18 17:55:38 +01:00
parent 37ee389913
commit d2d00e07ac
No known key found for this signature in database
GPG key ID: DE5E0C5F25941CA5
4 changed files with 134 additions and 73 deletions

View file

@ -88,6 +88,7 @@ srt_caller_free (SRTCaller * caller)
g_free (caller);
}
/* called with sock_lock */
static void
srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
{
@ -132,12 +133,14 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
{
struct srt_constant_params *params = srt_params;
GST_OBJECT_LOCK (srtobject->element);
for (; params->name != NULL; params++) {
if (srt_setsockopt (sock, 0, params->param, &params->val, sizeof (gint))) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set %s (reason: %s)", params->name,
srt_getlasterror_str ());
return FALSE;
goto err;
}
}
@ -149,7 +152,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set passphrase (reason: %s)", srt_getlasterror_str ());
return FALSE;
goto err;
}
if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) {
@ -159,7 +162,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
return FALSE;
goto err;
}
}
@ -171,7 +174,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set latency (reason: %s)", srt_getlasterror_str ());
return FALSE;
goto err;
}
}
@ -187,7 +190,12 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
}
}
GST_OBJECT_UNLOCK (srtobject->element);
return TRUE;
err:
GST_OBJECT_UNLOCK (srtobject->element);
return FALSE;
}
GstSRTObject *
@ -254,12 +262,12 @@ gboolean
gst_srt_object_set_property_helper (GstSRTObject * srtobject,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GST_OBJECT_LOCK (srtobject->element);
switch (prop_id) {
case PROP_URI:{
const gchar *uri = g_value_get_string (value);
gst_srt_object_set_uri (srtobject, uri, NULL);
case PROP_URI:
gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL);
break;
}
case PROP_MODE:
gst_structure_set_value (srtobject->parameters, "mode", value);
break;
@ -283,20 +291,26 @@ gst_srt_object_set_property_helper (GstSRTObject * srtobject,
gst_structure_set_value (srtobject->parameters, "pbkeylen", value);
break;
case PROP_WAIT_FOR_CONNECTION:
GST_OBJECT_LOCK (srtobject->element);
srtobject->wait_for_connection = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (srtobject->element);
break;
default:
return FALSE;
goto err;
}
GST_OBJECT_UNLOCK (srtobject->element);
return TRUE;
err:
GST_OBJECT_UNLOCK (srtobject->element);
return FALSE;
}
gboolean
gst_srt_object_get_property_helper (GstSRTObject * srtobject,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GST_OBJECT_LOCK (srtobject->element);
switch (prop_id) {
case PROP_URI:
g_value_take_string (value, gst_uri_to_string (srtobject->uri));
@ -355,15 +369,19 @@ gst_srt_object_get_property_helper (GstSRTObject * srtobject,
case PROP_STATS:
g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
break;
case PROP_WAIT_FOR_CONNECTION:{
case PROP_WAIT_FOR_CONNECTION:
g_value_set_boolean (value, srtobject->wait_for_connection);
break;
}
default:
return FALSE;
goto err;
}
GST_OBJECT_UNLOCK (srtobject->element);
return TRUE;
err:
GST_OBJECT_UNLOCK (srtobject->element);
return FALSE;
}
void
@ -563,6 +581,7 @@ gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
}
}
/* called with GST_OBJECT_LOCK (srtobject->element) held */
gboolean
gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
GError ** err)
@ -653,10 +672,12 @@ thread_func (gpointer data)
gint rsocklen = 1;
for (;;) {
GST_OBJECT_LOCK (srtobject->element);
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
}
GST_OBJECT_UNLOCK (srtobject->element);
GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
@ -706,10 +727,10 @@ thread_func (gpointer data)
continue;
}
GST_OBJECT_LOCK (srtobject->element);
g_mutex_lock (&srtobject->sock_lock);
srtobject->callers = g_list_append (srtobject->callers, caller);
g_cond_signal (&srtobject->sock_cond);
GST_OBJECT_UNLOCK (srtobject->element);
g_mutex_unlock (&srtobject->sock_lock);
/* notifying caller-added */
if (srtobject->caller_added_closure != NULL) {
@ -749,6 +770,8 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
gsize bind_sa_len;
GSocketAddress *bind_addr;
GST_OBJECT_LOCK (srtobject->element);
gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
local_address =
@ -756,6 +779,8 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
if (local_address == NULL)
local_address = GST_SRT_DEFAULT_LOCALADDRESS;
GST_OBJECT_UNLOCK (srtobject->element);
bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
bind_sa_len = g_socket_address_get_native_size (bind_addr);
bind_sa = g_alloca (bind_sa_len);
@ -881,9 +906,12 @@ gst_srt_object_connect (GstSRTObject * srtobject,
goto failed;
}
GST_OBJECT_LOCK (srtobject->element);
gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
local_address =
gst_structure_get_string (srtobject->parameters, "localaddress");
GST_OBJECT_UNLOCK (srtobject->element);
/* According to SRT norm, bind local address and port if specified */
if (local_address != NULL && local_port != 0) {
gpointer bind_sa;
@ -983,6 +1011,10 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
gpointer sa;
size_t sa_len;
const gchar *addr_str;
guint port;
gboolean ret = FALSE;
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = FALSE;
@ -1001,7 +1033,6 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
}
addr_str = gst_uri_get_host (srtobject->uri);
if (addr_str == NULL) {
addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
GST_DEBUG_OBJECT (srtobject->element,
@ -1009,9 +1040,22 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
" setting listener mode", addr_str);
}
socket_address =
g_inet_socket_address_new_from_string (addr_str,
gst_uri_get_port (srtobject->uri));
port = gst_uri_get_port (srtobject->uri);
GST_DEBUG_OBJECT (srtobject->element,
"Opening SRT socket with parameters: %" GST_PTR_FORMAT,
srtobject->parameters);
if (!gst_structure_get_enum (srtobject->parameters,
"mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
GST_WARNING_OBJECT (srtobject->element,
"Cannot get connection mode information." " Use default mode");
connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
}
GST_OBJECT_UNLOCK (srtobject->element);
socket_address = g_inet_socket_address_new_from_string (addr_str, port);
if (socket_address == NULL) {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
@ -1033,33 +1077,26 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
goto out;
}
GST_DEBUG_OBJECT (srtobject->element,
"Opening SRT socket with parameters: %" GST_PTR_FORMAT,
srtobject->parameters);
if (!gst_structure_get_enum (srtobject->parameters,
"mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
GST_WARNING_OBJECT (srtobject->element,
"Cannot get connection mode information." " Use default mode");
connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
}
srtobject->listener_poll_id = srt_epoll_create ();
srtobject->opened =
ret =
gst_srt_object_open_connection
(srtobject, cancellable, connection_mode, sa, sa_len, error);
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = ret;
GST_OBJECT_UNLOCK (srtobject->element);
out:
g_clear_object (&socket_address);
return srtobject->opened;
return ret;
}
void
gst_srt_object_close (GstSRTObject * srtobject)
{
GST_OBJECT_LOCK (srtobject->element);
g_mutex_lock (&srtobject->sock_lock);
if (srtobject->poll_id != SRT_ERROR) {
srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
}
@ -1080,9 +1117,9 @@ gst_srt_object_close (GstSRTObject * srtobject)
}
if (srtobject->thread) {
GThread *thread = g_steal_pointer (&srtobject->thread);
GST_OBJECT_UNLOCK (srtobject->element);
g_mutex_unlock (&srtobject->sock_lock);
g_thread_join (thread);
GST_OBJECT_LOCK (srtobject->element);
g_mutex_lock (&srtobject->sock_lock);
}
if (srtobject->listener_sock != SRT_INVALID_SOCK) {
@ -1095,16 +1132,17 @@ gst_srt_object_close (GstSRTObject * srtobject)
if (srtobject->callers) {
GList *callers = g_steal_pointer (&srtobject->callers);
GST_OBJECT_UNLOCK (srtobject->element);
g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
srtobject);
GST_OBJECT_LOCK (srtobject->element);
g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
}
g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
g_mutex_unlock (&srtobject->sock_lock);
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = FALSE;
GST_OBJECT_UNLOCK (srtobject->element);
}
@ -1117,15 +1155,14 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");
GST_OBJECT_LOCK (srtobject->element);
g_mutex_lock (&srtobject->sock_lock);
while (!g_cancellable_is_cancelled (cancellable)) {
ret = (srtobject->callers != NULL);
if (ret)
break;
g_cond_wait (&srtobject->sock_cond,
GST_OBJECT_GET_LOCK (srtobject->element));
g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
}
GST_OBJECT_UNLOCK (srtobject->element);
g_mutex_unlock (&srtobject->sock_lock);
GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no");
@ -1145,31 +1182,35 @@ gst_srt_object_read (GstSRTObject * srtobject,
g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
(srtobject->element)) == GST_URI_SRC, -1);
GST_OBJECT_LOCK (srtobject->element);
gst_structure_get_enum (srtobject->parameters, "mode",
GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
SRTCaller *caller;
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
GST_OBJECT_LOCK (srtobject->element);
caller = srtobject->callers->data;
if (srtobject->callers)
poll_id = caller->poll_id;
GST_OBJECT_UNLOCK (srtobject->element);
if (poll_id == SRT_ERROR)
return 0;
} else {
poll_id = srtobject->poll_id;
}
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
}
GST_OBJECT_UNLOCK (srtobject->element);
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
g_mutex_lock (&srtobject->sock_lock);
if (srtobject->callers) {
SRTCaller *caller = srtobject->callers->data;
poll_id = caller->poll_id;
}
g_mutex_unlock (&srtobject->sock_lock);
if (poll_id == SRT_ERROR)
return 0;
} else {
poll_id = srtobject->poll_id;
}
while (!g_cancellable_is_cancelled (cancellable)) {
SRTSOCKET rsock;
@ -1236,13 +1277,13 @@ gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
/* connection is only waited for in listener mode,
* but there is no harm in raising signal in any case */
GST_OBJECT_LOCK (srtobject->element);
g_mutex_lock (&srtobject->sock_lock);
/* however, a race might be harmful ...
* the cancellation is used as 'flushing' flag here,
* so make sure it is so detected by the intended part at proper time */
g_cancellable_cancel (cancellable);
g_cond_signal (&srtobject->sock_cond);
GST_OBJECT_UNLOCK (srtobject->element);
g_mutex_unlock (&srtobject->sock_lock);
}
static gboolean
@ -1305,7 +1346,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
{
GList *callers;
GST_OBJECT_LOCK (srtobject->element);
g_mutex_lock (&srtobject->sock_lock);
callers = srtobject->callers;
while (callers != NULL) {
gssize len = 0;
@ -1317,8 +1358,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
callers = callers->next;
if (g_cancellable_is_cancelled (cancellable)) {
GST_OBJECT_UNLOCK (srtobject->element);
return -1;
goto cancelled;
}
if (!caller->sent_headers) {
@ -1352,9 +1392,12 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
srt_caller_free (caller);
}
GST_OBJECT_UNLOCK (srtobject->element);
g_mutex_unlock (&srtobject->sock_lock);
return mapinfo->size;
cancelled:
g_mutex_unlock (&srtobject->sock_lock);
return -1;
}
static gssize
@ -1367,10 +1410,12 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
const guint8 *msg = mapinfo->data;
gint payload_size, optlen = 1;
GST_OBJECT_LOCK (srtobject->element);
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
}
GST_OBJECT_UNLOCK (srtobject->element);
if (!srtobject->sent_headers) {
if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
@ -1443,16 +1488,20 @@ gst_srt_object_write (GstSRTObject * srtobject,
{
gssize len = 0;
GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
gboolean wait_for_connection;
/* Only sink element can write data */
g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
(srtobject->element)) == GST_URI_SINK, -1);
GST_OBJECT_LOCK (srtobject->element);
gst_structure_get_enum (srtobject->parameters, "mode",
GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
wait_for_connection = srtobject->wait_for_connection;
GST_OBJECT_UNLOCK (srtobject->element);
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (srtobject->wait_for_connection) {
if (wait_for_connection) {
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
}
@ -1534,7 +1583,7 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
GstStructure *s = NULL;
gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
GST_OBJECT_LOCK (srtobject->element);
g_mutex_lock (&srtobject->sock_lock);
if (srtobject->sock != SRT_INVALID_SOCK) {
s = get_stats_for_srtsock (srtobject->sock, is_sender);
goto done;
@ -1564,7 +1613,7 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
}
done:
GST_OBJECT_UNLOCK (srtobject->element);
g_mutex_unlock (&srtobject->sock_lock);
return s;
}

View file

@ -56,14 +56,16 @@ struct _GstSRTObject
gint poll_id;
gboolean sent_headers;
GCond sock_cond;
GTask *listener_task;
SRTSOCKET listener_sock;
gint listener_poll_id;
GThread *thread;
/* Protects the list of callers */
GMutex sock_lock;
GCond sock_cond;
GList *callers;
GClosure *caller_added_closure;

View file

@ -374,8 +374,13 @@ gst_srt_sink_uri_set_uri (GstURIHandler * handler,
const gchar * uri, GError ** error)
{
GstSRTSink *self = GST_SRT_SINK (handler);
gboolean ret;
return gst_srt_object_set_uri (self->srtobject, uri, error);
GST_OBJECT_LOCK (self);
ret = gst_srt_object_set_uri (self->srtobject, uri, error);
GST_OBJECT_UNLOCK (self);
return ret;
}
static void

View file

@ -342,8 +342,13 @@ gst_srt_src_uri_set_uri (GstURIHandler * handler,
const gchar * uri, GError ** error)
{
GstSRTSrc *self = GST_SRT_SRC (handler);
gboolean ret;
return gst_srt_object_set_uri (self->srtobject, uri, error);
GST_OBJECT_LOCK (self);
ret = gst_srt_object_set_uri (self->srtobject, uri, error);
GST_OBJECT_UNLOCK (self);
return ret;
}
static void