filter: Release lock in filter functions

Release the object lock before calling the filter functions. We need to
keep a cookie to detect when the list changed during the filter
callback. We also keep a hashtable to make sure we only call the filter
function once for each object in case of concurrent modification.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=732950
This commit is contained in:
Wim Taymans 2014-07-10 11:32:20 +02:00
parent 6543082d2b
commit 945c93fde0
5 changed files with 194 additions and 61 deletions

View file

@ -83,6 +83,7 @@ struct _GstRTSPClientPrivate
GList *transports;
GList *sessions;
guint sessions_cookie;
gboolean drop_backlog;
};
@ -305,6 +306,7 @@ client_watch_session (GstRTSPClient * client, GstRTSPSession * session)
GST_INFO ("watching session %p", session);
priv->sessions = g_list_prepend (priv->sessions, g_object_ref (session));
priv->sessions_cookie++;
/* connect removed session handler, it will be disconnected when the last
* session gets removed */
@ -334,12 +336,12 @@ client_unwatch_session (GstRTSPClient * client, GstRTSPSession * session,
}
priv->sessions = g_list_delete_link (priv->sessions, link);
priv->sessions_cookie++;
/* if this was the last session, disconnect the handler.
* This will also drop the extra client ref */
if (!priv->sessions) {
g_signal_handler_disconnect (priv->session_pool,
priv->session_removed_id);
g_signal_handler_disconnect (priv->session_pool, priv->session_removed_id);
priv->session_removed_id = 0;
}
@ -3455,29 +3457,50 @@ gst_rtsp_client_session_filter (GstRTSPClient * client,
{
GstRTSPClientPrivate *priv;
GList *result, *walk, *next;
GHashTable *visited;
guint cookie;
g_return_val_if_fail (GST_IS_RTSP_CLIENT (client), NULL);
priv = client->priv;
result = NULL;
if (func)
visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
restart:
cookie = priv->sessions_cookie;
for (walk = priv->sessions; walk; walk = next) {
GstRTSPSession *sess = walk->data;
GstRTSPFilterResult res;
gboolean changed;
next = g_list_next (walk);
if (func)
if (func) {
/* only visit each session once */
if (g_hash_table_contains (visited, sess))
continue;
g_hash_table_add (visited, g_object_ref (sess));
g_mutex_unlock (&priv->lock);
res = func (client, sess, user_data);
else
g_mutex_lock (&priv->lock);
} else
res = GST_RTSP_FILTER_REF;
changed = (cookie != priv->sessions_cookie);
switch (res) {
case GST_RTSP_FILTER_REMOVE:
/* stop watching the session and pretent it went away */
client_unwatch_session (client, sess, walk);
/* stop watching the session and pretend it went away, if the list was
* changed, we can't use the current list position, try to see if we
* still have the session */
client_unwatch_session (client, sess, changed ? NULL : walk);
cookie = priv->sessions_cookie;
break;
case GST_RTSP_FILTER_REF:
result = g_list_prepend (result, g_object_ref (sess));
@ -3486,8 +3509,13 @@ gst_rtsp_client_session_filter (GstRTSPClient * client,
default:
break;
}
if (changed)
goto restart;
}
g_mutex_unlock (&priv->lock);
if (func)
g_hash_table_unref (visited);
return result;
}

View file

@ -90,6 +90,7 @@ struct _GstRTSPServerPrivate
/* the clients that are connected */
GList *clients;
guint clients_cookie;
};
#define DEFAULT_ADDRESS "0.0.0.0"
@ -999,6 +1000,7 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx)
GST_RTSP_SERVER_LOCK (server);
priv->clients = g_list_remove (priv->clients, ctx);
priv->clients_cookie++;
GST_RTSP_SERVER_UNLOCK (server);
if (ctx->thread) {
@ -1050,6 +1052,7 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
g_signal_connect (client, "closed", (GCallback) unmanage_client, cctx);
priv->clients = g_list_prepend (priv->clients, cctx);
priv->clients_cookie++;
gst_rtsp_client_attach (client, mainctx);
@ -1361,38 +1364,62 @@ gst_rtsp_server_client_filter (GstRTSPServer * server,
{
GstRTSPServerPrivate *priv;
GList *result, *walk, *next;
GHashTable *visited;
guint cookie;
g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL);
priv = server->priv;
result = NULL;
if (func)
visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
GST_RTSP_SERVER_LOCK (server);
restart:
cookie = priv->clients_cookie;
for (walk = priv->clients; walk; walk = next) {
ClientContext *cctx = walk->data;
GstRTSPClient *client = cctx->client;
GstRTSPFilterResult res;
gboolean changed;
next = g_list_next (walk);
if (func)
res = func (server, cctx->client, user_data);
else
if (func) {
/* only visit each media once */
if (g_hash_table_contains (visited, client))
continue;
g_hash_table_add (visited, g_object_ref (client));
GST_RTSP_SERVER_UNLOCK (server);
res = func (server, client, user_data);
GST_RTSP_SERVER_LOCK (server);
} else
res = GST_RTSP_FILTER_REF;
changed = (cookie != priv->clients_cookie);
switch (res) {
case GST_RTSP_FILTER_REMOVE:
/* remove client, FIXME */
break;
case GST_RTSP_FILTER_REF:
result = g_list_prepend (result, g_object_ref (cctx->client));
result = g_list_prepend (result, g_object_ref (client));
break;
case GST_RTSP_FILTER_KEEP:
default:
break;
}
if (changed)
goto restart;
}
GST_RTSP_SERVER_UNLOCK (server);
if (func)
g_hash_table_unref (visited);
return result;
}

View file

@ -50,6 +50,7 @@ struct _GstRTSPSessionPoolPrivate
GMutex lock; /* protects everything in this struct */
guint max_sessions;
GHashTable *sessions;
guint sessions_cookie;
};
#define DEFAULT_MAX_SESSIONS 0
@ -394,6 +395,7 @@ gst_rtsp_session_pool_create (GstRTSPSessionPool * pool)
g_object_ref (result);
g_hash_table_insert (priv->sessions,
(gchar *) gst_rtsp_session_get_sessionid (result), result);
priv->sessions_cookie++;
}
g_mutex_unlock (&priv->lock);
@ -455,6 +457,7 @@ gst_rtsp_session_pool_remove (GstRTSPSessionPool * pool, GstRTSPSession * sess)
g_hash_table_remove (priv->sessions,
gst_rtsp_session_get_sessionid (sess));
if (found) {
priv->sessions_cookie++;
g_signal_emit (pool, gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED],
0, sess);
}
@ -511,44 +514,13 @@ gst_rtsp_session_pool_cleanup (GstRTSPSessionPool * pool)
result =
g_hash_table_foreach_remove (priv->sessions, (GHRFunc) cleanup_func,
&data);
if (result > 0)
priv->sessions_cookie++;
g_mutex_unlock (&priv->lock);
return result;
}
typedef struct
{
GstRTSPSessionPool *pool;
GstRTSPSessionPoolFilterFunc func;
gpointer user_data;
GList *list;
} FilterData;
static gboolean
filter_func (gchar * sessionid, GstRTSPSession * sess, FilterData * data)
{
GstRTSPFilterResult res;
if (data->func)
res = data->func (data->pool, sess, data->user_data);
else
res = GST_RTSP_FILTER_REF;
switch (res) {
case GST_RTSP_FILTER_REMOVE:
g_signal_emit (data->pool,
gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, sess);
return TRUE;
case GST_RTSP_FILTER_REF:
/* keep ref */
data->list = g_list_prepend (data->list, g_object_ref (sess));
/* fallthrough */
default:
case GST_RTSP_FILTER_KEEP:
return FALSE;
}
}
/**
* gst_rtsp_session_pool_filter:
* @pool: a #GstRTSPSessionPool
@ -580,22 +552,73 @@ gst_rtsp_session_pool_filter (GstRTSPSessionPool * pool,
GstRTSPSessionPoolFilterFunc func, gpointer user_data)
{
GstRTSPSessionPoolPrivate *priv;
FilterData data;
GHashTableIter iter;
gpointer key, value;
GList *result;
GHashTable *visited;
guint cookie;
g_return_val_if_fail (GST_IS_RTSP_SESSION_POOL (pool), NULL);
priv = pool->priv;
data.pool = pool;
data.func = func;
data.user_data = user_data;
data.list = NULL;
result = NULL;
if (func)
visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
g_hash_table_foreach_remove (priv->sessions, (GHRFunc) filter_func, &data);
restart:
g_hash_table_iter_init (&iter, priv->sessions);
cookie = priv->sessions_cookie;
while (g_hash_table_iter_next (&iter, &key, &value)) {
GstRTSPSession *session = value;
GstRTSPFilterResult res;
gboolean changed;
if (func) {
/* only visit each session once */
if (g_hash_table_contains (visited, session))
continue;
g_hash_table_add (visited, g_object_ref (session));
g_mutex_unlock (&priv->lock);
res = func (pool, session, user_data);
g_mutex_lock (&priv->lock);
} else
res = GST_RTSP_FILTER_REF;
changed = (cookie != priv->sessions_cookie);
switch (res) {
case GST_RTSP_FILTER_REMOVE:
g_signal_emit (pool,
gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, session);
if (changed)
g_hash_table_remove (priv->sessions, key);
else
g_hash_table_iter_remove (&iter);
cookie = ++priv->sessions_cookie;
break;
case GST_RTSP_FILTER_REF:
/* keep ref */
result = g_list_prepend (result, g_object_ref (session));
break;
case GST_RTSP_FILTER_KEEP:
default:
break;
}
if (changed)
goto restart;
}
g_mutex_unlock (&priv->lock);
return data.list;
if (func)
g_hash_table_unref (visited);
return result;
}
typedef struct

View file

@ -62,6 +62,7 @@ struct _GstRTSPSessionPrivate
gint expire_count;
GList *medias;
guint medias_cookie;
};
#undef DEBUG
@ -238,6 +239,7 @@ gst_rtsp_session_manage_media (GstRTSPSession * sess, const gchar * path,
g_mutex_lock (&priv->lock);
priv->medias = g_list_prepend (priv->medias, result);
priv->medias_cookie++;
g_mutex_unlock (&priv->lock);
GST_INFO ("manage new media %p in session %p", media, result);
@ -269,8 +271,10 @@ gst_rtsp_session_release_media (GstRTSPSession * sess,
g_mutex_lock (&priv->lock);
find = g_list_find (priv->medias, media);
if (find)
if (find) {
priv->medias = g_list_delete_link (priv->medias, find);
priv->medias_cookie++;
}
more = (priv->medias != NULL);
g_mutex_unlock (&priv->lock);
@ -359,29 +363,51 @@ gst_rtsp_session_filter (GstRTSPSession * sess,
{
GstRTSPSessionPrivate *priv;
GList *result, *walk, *next;
GHashTable *visited;
guint cookie;
g_return_val_if_fail (GST_IS_RTSP_SESSION (sess), NULL);
priv = sess->priv;
result = NULL;
if (func)
visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
restart:
cookie = priv->medias_cookie;
for (walk = priv->medias; walk; walk = next) {
GstRTSPSessionMedia *media = walk->data;
GstRTSPFilterResult res;
gboolean changed;
next = g_list_next (walk);
if (func)
if (func) {
/* only visit each media once */
if (g_hash_table_contains (visited, media))
continue;
g_hash_table_add (visited, g_object_ref (media));
g_mutex_unlock (&priv->lock);
res = func (sess, media, user_data);
else
g_mutex_lock (&priv->lock);
} else
res = GST_RTSP_FILTER_REF;
changed = (cookie != priv->medias_cookie);
switch (res) {
case GST_RTSP_FILTER_REMOVE:
if (changed)
priv->medias = g_list_remove (priv->medias, media);
else
priv->medias = g_list_delete_link (priv->medias, walk);
cookie = ++priv->medias_cookie;
g_object_unref (media);
priv->medias = g_list_delete_link (priv->medias, walk);
break;
case GST_RTSP_FILTER_REF:
result = g_list_prepend (result, g_object_ref (media));
@ -390,9 +416,14 @@ gst_rtsp_session_filter (GstRTSPSession * sess,
default:
break;
}
if (changed)
goto restart;
}
g_mutex_unlock (&priv->lock);
if (func)
g_hash_table_unref (visited);
return result;
}

View file

@ -124,8 +124,9 @@ struct _GstRTSPStreamPrivate
/* transports we stream to */
guint n_active;
GList *transports;
gboolean tr_changed;
guint transports_cookie;
GList *tr_cache;
guint tr_cache_cookie;
gint dscp_qos;
@ -1503,13 +1504,13 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
g_mutex_lock (&priv->lock);
if (priv->tr_changed) {
if (priv->tr_cache_cookie != priv->transports_cookie) {
clear_tr_cache (priv);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
}
priv->tr_changed = FALSE;
priv->tr_cache_cookie = priv->transports_cookie;
}
g_mutex_unlock (&priv->lock);
@ -2268,7 +2269,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
priv->transports = g_list_remove (priv->transports, trans);
}
priv->tr_changed = TRUE;
priv->transports_cookie++;
break;
}
case GST_RTSP_LOWER_TRANS_TCP:
@ -2279,7 +2280,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
GST_INFO ("removing TCP %s", tr->destination);
priv->transports = g_list_remove (priv->transports, trans);
}
priv->tr_changed = TRUE;
priv->transports_cookie++;
break;
default:
goto unknown_transport;
@ -2497,25 +2498,43 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
{
GstRTSPStreamPrivate *priv;
GList *result, *walk, *next;
GHashTable *visited;
guint cookie;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
priv = stream->priv;
result = NULL;
if (func)
visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
restart:
cookie = priv->transports_cookie;
for (walk = priv->transports; walk; walk = next) {
GstRTSPStreamTransport *trans = walk->data;
GstRTSPFilterResult res;
gboolean changed;
next = g_list_next (walk);
if (func)
if (func) {
/* only visit each transport once */
if (g_hash_table_contains (visited, trans))
continue;
g_hash_table_add (visited, g_object_ref (trans));
g_mutex_unlock (&priv->lock);
res = func (stream, trans, user_data);
else
g_mutex_lock (&priv->lock);
} else
res = GST_RTSP_FILTER_REF;
changed = (cookie != priv->transports_cookie);
switch (res) {
case GST_RTSP_FILTER_REMOVE:
update_transport (stream, trans, FALSE);
@ -2527,9 +2546,14 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
default:
break;
}
if (changed)
goto restart;
}
g_mutex_unlock (&priv->lock);
if (func)
g_hash_table_unref (visited);
return result;
}