gstreamer/gst/gstpromise.c
Matthew Waters fdf6a793dc gst: don't use volatile to mean atomic
volatile is not sufficient to provide atomic guarantees and real atomics
should be used instead.  GCC 11 has started warning about using volatile
with atomic operations.

https://gitlab.gnome.org/GNOME/glib/-/merge_requests/1719

Discovered in https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/issues/868

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/793>
2021-04-13 01:58:54 +01:00

442 lines
14 KiB
C

/* GStreamer
* Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#define GST_DISABLE_MINIOBJECT_INLINE_FUNCTIONS
#include "gst_private.h"
#include "gstpromise.h"
#define GST_CAT_DEFAULT gst_promise_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
/**
* SECTION:gstpromise
* @title: GstPromise
* @short_description: a miniobject for future/promise-like functionality
*
* The #GstPromise object implements the container for values that may
* be available later. i.e. a Future or a Promise in
* <https://en.wikipedia.org/wiki/Futures_and_promises>.
* As with all Future/Promise-like functionality, there is the concept of the
* producer of the value and the consumer of the value.
*
* A #GstPromise is created with gst_promise_new() by the consumer and passed
* to the producer to avoid thread safety issues with the change callback.
* A #GstPromise can be replied to with a value (or an error) by the producer
* with gst_promise_reply(). The exact value returned is defined by the API
* contract of the producer and %NULL may be a valid reply.
* gst_promise_interrupt() is for the consumer to
* indicate to the producer that the value is not needed anymore and producing
* that value can stop. The @GST_PROMISE_RESULT_EXPIRED state set by a call
* to gst_promise_expire() indicates to the consumer that a value will never
* be produced and is intended to be called by a third party that implements
* some notion of message handling such as #GstBus.
* A callback can also be installed at #GstPromise creation for
* result changes with gst_promise_new_with_change_func().
* The change callback can be used to chain #GstPromises's together as in the
* following example.
* |[<!-- language="C" -->
* const GstStructure *reply;
* GstPromise *p;
* if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
* return; // interrupted or expired value
* reply = gst_promise_get_reply (promise);
* if (error in reply)
* return; // propagate error
* p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
* pass p to promise-using API
* ]|
*
* Each #GstPromise starts out with a #GstPromiseResult of
* %GST_PROMISE_RESULT_PENDING and only ever transitions once
* into one of the other #GstPromiseResult's.
*
* In order to support multi-threaded code, gst_promise_reply(),
* gst_promise_interrupt() and gst_promise_expire() may all be from
* different threads with some restrictions and the final result of the promise
* is whichever call is made first. There are two restrictions on ordering:
*
* 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
* after gst_promise_expire()
* 2. That gst_promise_reply() and gst_promise_interrupt()
* cannot be called twice.
*
* The change function set with gst_promise_new_with_change_func() is
* called directly from either the gst_promise_reply(),
* gst_promise_interrupt() or gst_promise_expire() and can be called
* from an arbitrary thread. #GstPromise using APIs can restrict this to
* a single thread or a subset of threads but that is entirely up to the API
* that uses #GstPromise.
*/
static const int immutable_structure_refcount = 2;
#define GST_PROMISE_REPLY(p) (((GstPromiseImpl *)(p))->reply)
#define GST_PROMISE_RESULT(p) (((GstPromiseImpl *)(p))->result)
#define GST_PROMISE_LOCK(p) (&(((GstPromiseImpl *)(p))->lock))
#define GST_PROMISE_COND(p) (&(((GstPromiseImpl *)(p))->cond))
#define GST_PROMISE_CHANGE_FUNC(p) (((GstPromiseImpl *)(p))->change_func)
#define GST_PROMISE_CHANGE_DATA(p) (((GstPromiseImpl *)(p))->user_data)
#define GST_PROMISE_CHANGE_NOTIFY(p) (((GstPromiseImpl *)(p))->notify)
typedef struct
{
GstPromise promise;
GstPromiseResult result;
GstStructure *reply;
GMutex lock;
GCond cond;
GstPromiseChangeFunc change_func;
gpointer user_data;
GDestroyNotify notify;
} GstPromiseImpl;
/**
* gst_promise_wait:
* @promise: a #GstPromise
*
* Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
* If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
* immediately with the current result.
*
* Returns: the result of the promise
*
* Since: 1.14
*/
GstPromiseResult
gst_promise_wait (GstPromise * promise)
{
GstPromiseResult ret;
g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
g_mutex_lock (GST_PROMISE_LOCK (promise));
ret = GST_PROMISE_RESULT (promise);
while (ret == GST_PROMISE_RESULT_PENDING) {
GST_LOG ("%p waiting", promise);
g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
ret = GST_PROMISE_RESULT (promise);
}
GST_LOG ("%p waited", promise);
g_mutex_unlock (GST_PROMISE_LOCK (promise));
return ret;
}
/**
* gst_promise_reply:
* @promise: (allow-none): a #GstPromise
* @s: (transfer full) (nullable): a #GstStructure with the the reply contents
*
* Set a reply on @promise. This will wake up any waiters with
* %GST_PROMISE_RESULT_REPLIED. Called by the producer of the value to
* indicate success (or failure).
*
* If @promise has already been interrupted by the consumer, then this reply
* is not visible to the consumer.
*
* Since: 1.14
*/
void
gst_promise_reply (GstPromise * promise, GstStructure * s)
{
GstPromiseChangeFunc change_func = NULL;
gpointer change_data = NULL;
/* Caller requested that no reply is necessary */
if (promise == NULL)
return;
g_mutex_lock (GST_PROMISE_LOCK (promise));
if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
GstPromiseResult result = GST_PROMISE_RESULT (promise);
g_mutex_unlock (GST_PROMISE_LOCK (promise));
g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
result == GST_PROMISE_RESULT_INTERRUPTED);
}
/* XXX: is this necessary and valid? */
if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
gst_structure_free (GST_PROMISE_REPLY (promise));
/* Only reply iff we are currently in pending */
if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
if (s
&& !gst_structure_set_parent_refcount (s,
(int *) &immutable_structure_refcount)) {
g_critical ("Input structure has a parent already!");
g_mutex_unlock (GST_PROMISE_LOCK (promise));
return;
}
GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
GST_LOG ("%p replied", promise);
GST_PROMISE_REPLY (promise) = s;
change_func = GST_PROMISE_CHANGE_FUNC (promise);
change_data = GST_PROMISE_CHANGE_DATA (promise);
} else {
/* eat the value */
if (s)
gst_structure_free (s);
}
g_cond_broadcast (GST_PROMISE_COND (promise));
g_mutex_unlock (GST_PROMISE_LOCK (promise));
if (change_func)
change_func (promise, change_data);
}
/**
* gst_promise_get_reply:
* @promise: a #GstPromise
*
* Retrieve the reply set on @promise. @promise must be in
* %GST_PROMISE_RESULT_REPLIED and the returned structure is owned by @promise
*
* Returns: (transfer none) (nullable): The reply set on @promise
*
* Since: 1.14
*/
const GstStructure *
gst_promise_get_reply (GstPromise * promise)
{
g_return_val_if_fail (promise != NULL, NULL);
g_mutex_lock (GST_PROMISE_LOCK (promise));
if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
GstPromiseResult result = GST_PROMISE_RESULT (promise);
g_mutex_unlock (GST_PROMISE_LOCK (promise));
g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
}
g_mutex_unlock (GST_PROMISE_LOCK (promise));
return GST_PROMISE_REPLY (promise);
}
/**
* gst_promise_interrupt:
* @promise: a #GstPromise
*
* Interrupt waiting for a @promise. This will wake up any waiters with
* %GST_PROMISE_RESULT_INTERRUPTED. Called when the consumer does not want
* the value produced anymore.
*
* Since: 1.14
*/
void
gst_promise_interrupt (GstPromise * promise)
{
GstPromiseChangeFunc change_func = NULL;
gpointer change_data = NULL;
g_return_if_fail (promise != NULL);
g_mutex_lock (GST_PROMISE_LOCK (promise));
if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
GstPromiseResult result = GST_PROMISE_RESULT (promise);
g_mutex_unlock (GST_PROMISE_LOCK (promise));
g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
result == GST_PROMISE_RESULT_REPLIED);
}
/* only interrupt if we are currently in pending */
if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
g_cond_broadcast (GST_PROMISE_COND (promise));
GST_LOG ("%p interrupted", promise);
change_func = GST_PROMISE_CHANGE_FUNC (promise);
change_data = GST_PROMISE_CHANGE_DATA (promise);
}
g_mutex_unlock (GST_PROMISE_LOCK (promise));
if (change_func)
change_func (promise, change_data);
}
/**
* gst_promise_expire:
* @promise: a #GstPromise
*
* Expire a @promise. This will wake up any waiters with
* %GST_PROMISE_RESULT_EXPIRED. Called by a message loop when the parent
* message is handled and/or destroyed (possibly unanswered).
*
* Since: 1.14
*/
void
gst_promise_expire (GstPromise * promise)
{
GstPromiseChangeFunc change_func = NULL;
gpointer change_data = NULL;
g_return_if_fail (promise != NULL);
g_mutex_lock (GST_PROMISE_LOCK (promise));
if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
g_cond_broadcast (GST_PROMISE_COND (promise));
GST_LOG ("%p expired", promise);
change_func = GST_PROMISE_CHANGE_FUNC (promise);
change_data = GST_PROMISE_CHANGE_DATA (promise);
GST_PROMISE_CHANGE_FUNC (promise) = NULL;
GST_PROMISE_CHANGE_DATA (promise) = NULL;
}
g_mutex_unlock (GST_PROMISE_LOCK (promise));
if (change_func)
change_func (promise, change_data);
}
static void
gst_promise_free (GstMiniObject * object)
{
GstPromise *promise = (GstPromise *) object;
/* the promise *must* be dealt with in some way before destruction */
g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
if (GST_PROMISE_CHANGE_NOTIFY (promise))
GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
if (GST_PROMISE_REPLY (promise)) {
gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
gst_structure_free (GST_PROMISE_REPLY (promise));
}
g_mutex_clear (GST_PROMISE_LOCK (promise));
g_cond_clear (GST_PROMISE_COND (promise));
GST_LOG ("%p finalized", promise);
#ifdef USE_POISONING
memset (promise, 0xff, sizeof (GstPromiseImpl));
#endif
g_free (promise);
}
static void
gst_promise_init (GstPromise * promise)
{
static gsize _init = 0;
if (g_once_init_enter (&_init)) {
GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
g_once_init_leave (&_init, 1);
}
gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
NULL, gst_promise_free);
GST_PROMISE_REPLY (promise) = NULL;
GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
g_mutex_init (GST_PROMISE_LOCK (promise));
g_cond_init (GST_PROMISE_COND (promise));
}
/**
* gst_promise_new:
*
* Returns: a new #GstPromise
*
* Since: 1.14
*/
GstPromise *
gst_promise_new (void)
{
GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
gst_promise_init (promise);
GST_LOG ("new promise %p", promise);
return promise;
}
/**
* gst_promise_new_with_change_func:
* @func: (scope notified): a #GstPromiseChangeFunc to call
* @user_data: (closure): argument to call @func with
* @notify: notification function that @user_data is no longer needed
*
* @func will be called exactly once when transitioning out of
* %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
* states.
*
* Returns: a new #GstPromise
*
* Since: 1.14
*/
GstPromise *
gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
GDestroyNotify notify)
{
GstPromise *promise = gst_promise_new ();
GST_PROMISE_CHANGE_FUNC (promise) = func;
GST_PROMISE_CHANGE_DATA (promise) = user_data;
GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
return promise;
}
GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);
/**
* gst_promise_ref:
* @promise: a #GstPromise.
*
* Increases the refcount of the given @promise by one.
*
* Returns: (transfer full): @promise
*
* Since: 1.14
*/
GstPromise *
gst_promise_ref (GstPromise * promise)
{
return (GstPromise *) gst_mini_object_ref (GST_MINI_OBJECT_CAST (promise));
}
/**
* gst_promise_unref:
* @promise: (transfer full): a #GstPromise.
*
* Decreases the refcount of the promise. If the refcount reaches 0, the
* promise will be freed.
*
* Since: 1.14
*/
void
gst_promise_unref (GstPromise * promise)
{
gst_mini_object_unref (GST_MINI_OBJECT_CAST (promise));
}