harness: Fix race condition when torn down during the handling of a non-serialized query or event

It's possible and normal to tear down a harness while the pipeline is
running. At the same time, it's desired for the
`gst_harness_pad_link_tear_down()` function to be synchronous.

This has created the conflict where the main thread may request a
harness to be torn down while it's in use or about to be used by a pad
in the streaming thread.

The previous implementation of `gst_harness_pad_link_tear_down()` tried
to handle this by taking the stream lock of the harnessed pad and
resetting all the pad functions while holding it. That approach was
however insufficient to handle the case where a non-serialized event
or query is being handled or about to be handled in a different thread.

This edge case was one race condition behind the flakes in the flvmux
check tests -- the rest being covered by https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/2803.

This patch fixes the problem by adding an intermediate ref-counted
object, GstHarnessLink, which replaces the usage of the HARNESS_KEY
association. GstHarnessLink allows the pad functions such as event,
query and chain to borrow a reference to GstHarness and more
importantly, to lock the GstHarnessLink during their usage to block
(delay) its destruction until no users are left, and guarantee that any
future user will not receive an invalid GstHarness handle past its
destruction.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5017>
This commit is contained in:
Alicia Boya García 2023-07-11 23:24:41 +02:00 committed by GStreamer Marge Bot
parent da3b51c0c4
commit 9ca194d8cc
5 changed files with 346 additions and 19 deletions

View file

@ -113,6 +113,7 @@ foreach h: ['gettext.h', 'glib-compat-private.h', 'glib-compat.h',
'../libs/gst/base/gstindex.h',
'../libs/gst/base/gstindex.c',
'../libs/gst/check/internal-check.h',
'../libs/gst/check/gstharnesslink.h',
'parser/grammar.tab.pre.h', 'parse/parse_lex.h', 'types.h',
'gst-printf.h', 'printf-args.h', 'printf-extension.h',
'printf-parse.h', 'vasnprintf.h', 'gstregistrybinary.c',

View file

@ -126,6 +126,7 @@
#endif
#include "gstharness.h"
#include "gstharnesslink.h"
#include <stdio.h>
#include <string.h>
@ -133,8 +134,12 @@
static void gst_harness_stress_free (GstHarnessThread * t);
#define HARNESS_KEY "harness"
/* Keys used for storing and retrieving associations to pads and elements with
* g_object_set_data (): */
/* The HARNESS_REF association inside a GstElement will keep track of how many
* GstHarness'es are attached to that element. */
#define HARNESS_REF "harness-ref"
#define HARNESS_LOCK(h) g_mutex_lock (&(h)->priv->priv_mutex)
#define HARNESS_UNLOCK(h) g_mutex_unlock (&(h)->priv->priv_mutex)
@ -211,10 +216,16 @@ struct _GstHarnessPrivate
static GstFlowReturn
gst_harness_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstHarness *h = g_object_get_data (G_OBJECT (pad), HARNESS_KEY);
GstHarnessPrivate *priv = h->priv;
GstHarness *h;
GstHarnessLink *link;
GstHarnessPrivate *priv;
(void) parent;
if (!(link = gst_harness_pad_link_lock (pad, &h)))
return GST_FLOW_FLUSHING;
g_assert (h != NULL);
priv = h->priv;
g_mutex_lock (&priv->blocking_push_mutex);
g_atomic_int_inc (&priv->recv_buffers);
@ -233,31 +244,45 @@ gst_harness_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
}
g_mutex_unlock (&priv->blocking_push_mutex);
gst_harness_link_unlock (link);
return GST_FLOW_OK;
}
static gboolean
gst_harness_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstHarness *h = g_object_get_data (G_OBJECT (pad), HARNESS_KEY);
GstHarnessPrivate *priv = h->priv;
GstHarness *h;
GstHarnessLink *link;
GstHarnessPrivate *priv;
(void) parent;
if (!(link = gst_harness_pad_link_lock (pad, &h)))
return FALSE;
g_assert (h != NULL);
priv = h->priv;
g_atomic_int_inc (&priv->recv_upstream_events);
g_async_queue_push (priv->src_event_queue, event);
gst_harness_link_unlock (link);
return TRUE;
}
static gboolean
gst_harness_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstHarness *h = g_object_get_data (G_OBJECT (pad), HARNESS_KEY);
GstHarnessPrivate *priv = h->priv;
GstHarness *h;
GstHarnessLink *link;
GstHarnessPrivate *priv;
gboolean ret = TRUE;
gboolean forward;
g_assert (h != NULL);
(void) parent;
if (!(link = gst_harness_pad_link_lock (pad, &h)))
return FALSE;
g_assert (h != NULL);
priv = h->priv;
g_atomic_int_inc (&priv->recv_events);
switch (GST_EVENT_TYPE (event)) {
@ -289,6 +314,7 @@ gst_harness_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
}
HARNESS_UNLOCK (h);
gst_harness_link_unlock (link);
return ret;
}
@ -369,10 +395,16 @@ gst_harness_negotiate (GstHarness * h)
static gboolean
gst_harness_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstHarness *h = g_object_get_data (G_OBJECT (pad), HARNESS_KEY);
GstHarnessPrivate *priv = h->priv;
GstHarness *h;
GstHarnessLink *link;
GstHarnessPrivate *priv;
gboolean res = TRUE;
(void) parent;
if (!(link = gst_harness_pad_link_lock (pad, &h)))
return FALSE;
g_assert (h != NULL);
priv = h->priv;
// FIXME: forward all queries?
@ -456,16 +488,23 @@ gst_harness_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
res = gst_pad_query_default (pad, parent, query);
}
gst_harness_link_unlock (link);
return res;
}
static gboolean
gst_harness_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstHarness *h = g_object_get_data (G_OBJECT (pad), HARNESS_KEY);
GstHarnessPrivate *priv = h->priv;
GstHarness *h;
GstHarnessLink *link;
GstHarnessPrivate *priv;
gboolean res = TRUE;
(void) parent;
if (!(link = gst_harness_pad_link_lock (pad, &h)))
return FALSE;
g_assert (h != NULL);
priv = h->priv;
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_LATENCY:
@ -495,6 +534,7 @@ gst_harness_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
default:
res = gst_pad_query_default (pad, parent, query);
}
gst_harness_link_unlock (link);
return res;
}
@ -579,8 +619,8 @@ gst_harness_setup_src_pad (GstHarness * h,
/* sending pad */
h->srcpad = gst_pad_new_from_static_template (src_tmpl, "src");
g_assert (h->srcpad);
g_object_set_data (G_OBJECT (h->srcpad), HARNESS_KEY, h);
gst_harness_pad_link_set (h->srcpad, h);
gst_pad_set_query_function (h->srcpad, gst_harness_src_query);
gst_pad_set_event_function (h->srcpad, gst_harness_src_event);
@ -600,8 +640,8 @@ gst_harness_setup_sink_pad (GstHarness * h,
/* receiving pad */
h->sinkpad = gst_pad_new_from_static_template (sink_tmpl, "sink");
g_assert (h->sinkpad);
g_object_set_data (G_OBJECT (h->sinkpad), HARNESS_KEY, h);
gst_harness_pad_link_set (h->sinkpad, h);
gst_pad_set_chain_function (h->sinkpad, gst_harness_chain);
gst_pad_set_query_function (h->sinkpad, gst_harness_sink_query);
gst_pad_set_event_function (h->sinkpad, gst_harness_sink_event);
@ -1090,10 +1130,9 @@ gst_harness_teardown (GstHarness * h)
/* Make sure our funcs are not called after harness is teared down since
* they try to access this harness through pad data */
GST_PAD_STREAM_LOCK (h->srcpad);
gst_harness_pad_link_tear_down (h->srcpad);
gst_pad_set_event_function (h->srcpad, NULL);
gst_pad_set_query_function (h->srcpad, NULL);
GST_PAD_STREAM_UNLOCK (h->srcpad);
gst_object_unref (h->srcpad);
}
@ -1108,11 +1147,10 @@ gst_harness_teardown (GstHarness * h)
/* Make sure our funcs are not called after harness is teared down since
* they try to access this harness through pad data */
GST_PAD_STREAM_LOCK (h->sinkpad);
gst_harness_pad_link_tear_down (h->sinkpad);
gst_pad_set_chain_function (h->sinkpad, NULL);
gst_pad_set_event_function (h->sinkpad, NULL);
gst_pad_set_query_function (h->sinkpad, NULL);
GST_PAD_STREAM_UNLOCK (h->sinkpad);
gst_object_unref (h->sinkpad);
}

View file

@ -0,0 +1,230 @@
/* GstHarnessLink - A ref counted class arbitrating access to a
* pad harness in a thread-safe manner.
*
* Copyright (C) 2023 Igalia S.L.
* Copyright (C) 2023 Metrological
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstharnesslink.h"
/* The HARNESS_LINK association inside a GstPad will store a pointer to
* GstHarnessLink, which can be used to read and atomically lock the harness
* while in use. */
#define HARNESS_LINK "harness-link"
struct _GstHarnessLink
{
/* rw_lock will be locked for writing for tearing down the harness,
* and locked for reading for any other use. The goal is to allow simultaneous
* access to the harness from multiple threads while guaranteeing that the
* resources of harness won't be freed during use. */
GRWLock rw_lock;
GstHarness *harness;
};
static void
gst_harness_link_init (GstHarnessLink * link)
{
g_rw_lock_init (&link->rw_lock);
}
static void
gst_harness_link_dispose (GstHarnessLink * link)
{
gboolean lock_acquired = g_rw_lock_writer_trylock (&link->rw_lock);
if (lock_acquired)
g_rw_lock_writer_unlock (&link->rw_lock);
else
g_critical
("GstHarnessLink was about to be disposed while having the lock in use.");
g_rw_lock_clear (&link->rw_lock);
}
static GstHarnessLink *
gst_harness_link_new (void)
{
GstHarnessLink *link = g_atomic_rc_box_new0 (GstHarnessLink);
gst_harness_link_init (link);
return link;
}
static GstHarnessLink *
gst_harness_link_ref (GstHarnessLink * link)
{
return g_atomic_rc_box_acquire (link);
}
static void
gst_harness_link_unref (GstHarnessLink * link)
{
g_atomic_rc_box_release_full (link,
(GDestroyNotify) gst_harness_link_dispose);
}
/**
* gst_harness_pad_link_set: (skip)
* @pad: (transfer none): The pad that will be associated with a #GstHarness.
* @harness: (transfer none): The #GstHarness that will be associated with the
* pad.
*
* Creates a new #GstHarnessLink pointing to the provided @harness and
* associates it to the provided @pad.
*
* Once this association is set, the #GstHarness can be obtained using
* @gst_harness_pad_link_lock, which will also lock it until
* @gst_harness_link_unlock is called to prevent the #GstHarness from being
* destroyed while in use.
*/
void
gst_harness_pad_link_set (GstPad * pad, GstHarness * harness)
{
GstHarnessLink *link = gst_harness_link_new ();
link->harness = harness;
// The pad will own a reference to GstHarnessLink.
g_object_set_data_full (G_OBJECT (pad), HARNESS_LINK,
link, (GDestroyNotify) gst_harness_link_unref);
}
static gpointer
_gst_harness_link_dup_func (gpointer harness_link, gpointer user_data)
{
if (harness_link)
gst_harness_link_ref (harness_link);
return harness_link;
}
/**
* gst_harness_pad_link_lock: (skip)
* @pad: (transfer none): A #GstPad that has been at one point associated to a
* #GstHarness.
* @dst_harness: (transfer none): An address where the pointer to #GstHarness
* will be placed.
*
* Find a #GstHarness associated with this @pad and place a pointer to it on
* @dst_harness, locking it to prevent it being destroyed while in use.
*
* Both @dst_harness and the return value will be set to %NULL if the @pad is no
* longer linked to a GstHarness. Generally user code will need to handle this
* gracefully.
*
* Call @gst_harness_link_unlock once you're done using #GstHarness.
*
* Locking the link in this manner is reentrant: it is valid to lock the pad
* link more than once from the same thread as long as @gst_harness_link_unlock
* is called after exactly that many times.
*
* Returns: (transfer full) (nullable): a #GstHarnessLink object that you must
* pass to @gst_harness_link_unlock after being done with the #GstHarness, or
* %NULL if the link has been torn down at this point.
*/
GstHarnessLink *
gst_harness_pad_link_lock (GstPad * pad, GstHarness ** dst_harness)
{
// g_object_dup_data() will call _gst_harness_link_dup_func() while holding
// the mutex of the GObject association table. This guarantees that the
// GstHarnessLink is not destroyed between the time we get the pointer to it
// and increase its refcount.
GstHarnessLink *link = g_object_dup_data (G_OBJECT (pad), HARNESS_LINK,
_gst_harness_link_dup_func, NULL);
if (!link) {
// There is no longer a link between this pad and a GstHarness, as there is
// no associated GstHarnessLink.
*dst_harness = NULL;
return NULL;
}
g_rw_lock_reader_lock (&link->rw_lock);
if ((*dst_harness = link->harness)) {
// This GstHarnessLink has a valid link to GstHarness and will remain valid
// for at least as long as the user holds the lock.
return link;
} else {
// This GstHarnessLink has been torn down, it no longer points to a
// GstHarness. This will happen if we lock the link just after another
// thread torn it down. The GstHarnessLink will stay alive for a little
// longer until its refcount runs out.
g_rw_lock_reader_unlock (&link->rw_lock);
gst_harness_link_unref (link);
return NULL;
}
}
/**
* gst_harness_link_unlock: (skip)
* @link: (nullable) (transfer full): A #GstHarnessLink.
*
* Release the lock of the harness link for this particular thread.
*
* Whenever @gst_harness_pad_link_lock returns non-NULL this function must be
* called after the caller has finished use of the #GstHarness.
*
* The harness data must not be accessed after this function is called, as it is
* no longer guaranteed not to be destroyed.
*
* For convenience, the function will accept %NULL, in which case it will do
* nothing.
*/
void
gst_harness_link_unlock (GstHarnessLink * link)
{
if (!link)
return;
g_rw_lock_reader_unlock (&link->rw_lock);
gst_harness_link_unref (link);
}
/**
* gst_harness_pad_link_tear_down: (skip)
* @pad: (transfer none): A #GstPad that has been at one point associated to a
* #GstHarness.
*
* Reset the link to the harness. Further calls to @gst_harness_pad_link_lock
* will return NULL.
*
* This function will block until every thread that successfully locked the
* harness link with @gst_harness_pad_link_lock has unlocked it with
* @gst_harness_link_unlock.
*/
void
gst_harness_pad_link_tear_down (GstPad * pad)
{
// Steal the reference from the pad. This is still synchronized with
// g_object_dup_data().
GstHarnessLink *link = g_object_steal_data (G_OBJECT (pad), HARNESS_LINK);
g_return_if_fail (link != NULL);
// Take the lock for writing, which will wait for all threads that have locked
// the harness and will block future lock attempts until we unlock.
g_rw_lock_writer_lock (&link->rw_lock);
link->harness = NULL;
g_rw_lock_writer_unlock (&link->rw_lock);
// Unref the reference. In the likely case that no other thread has just done
// g_object_dup_data() and has therefore increase the refcount, this will be
// the last reference and terminate the GstHarnessLink.
gst_harness_link_unref (link);
// Even in the case where there is a remaining reference to GstHarnessLink in
// a different thread, the GstHarness pointer has been cleared at this point,
// so the caller thread can safely tear down the GstHarness.
}

View file

@ -0,0 +1,57 @@
/* GstHarnessLink - A ref counted class arbitrating access to a
* pad harness in a thread-safe manner.
*
* Copyright (C) 2023 Igalia S.L.
* Copyright (C) 2023 Metrological
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_HARNESS_LINK_H__
#define __GST_HARNESS_LINK_H__
#include <gst/gst.h>
#include <gst/check/check-prelude.h>
G_BEGIN_DECLS
typedef struct _GstHarness GstHarness;
/**
* GstHarnessLink: (skip)
*
* Opaque handle that can be used to release a pad lock over the harness.
*/
typedef struct _GstHarnessLink GstHarnessLink;
G_GNUC_INTERNAL
GType gst_harness_link_get_type (void);
G_GNUC_INTERNAL
void gst_harness_pad_link_set (GstPad* pad, GstHarness* harness);
G_GNUC_INTERNAL
GstHarnessLink* gst_harness_pad_link_lock (GstPad* pad, GstHarness** dst_harness);
G_GNUC_INTERNAL
void gst_harness_link_unlock (GstHarnessLink* link);
G_GNUC_INTERNAL
void gst_harness_pad_link_tear_down (GstPad* pad);
G_END_DECLS
#endif /* __GST_HARNESS_LINK_H__ */

View file

@ -3,6 +3,7 @@ gst_check_sources = files(
'gstcheck.c',
'gstconsistencychecker.c',
'gstharness.c',
'gstharnesslink.c',
'gsttestclock.c',
)
gst_check_headers = files(