Revert lockfree GstBus for the release

Drop in old GstBus code for the release to play it safe, since
regressions that are apparently hard to track down and reproduce
have been reported (on windows/OSX mostly) against the lockfree
version, and more time is needed to fix them.

This reverts commit 03391a8970.
This reverts commit 43cdbc17e6.
This reverts commit 80eb160e0f.
This reverts commit c41b0ade28.
This reverts commit 874d60e589.
This reverts commit 79370d4b17.
This reverts commit 2cb3e52351.
This reverts commit bd1c400114.
This reverts commit 4bf8f1524f.
This reverts commit 14d7db1b52.

https://bugzilla.gnome.org/show_bug.cgi?id=647493
This commit is contained in:
Tim-Philipp Müller 2011-04-26 15:42:46 +01:00
parent 36fc7179cb
commit 06ca275916
4 changed files with 105 additions and 114 deletions

View file

@ -123,6 +123,7 @@ gboolean priv_gst_structure_append_to_gstring (const GstStructure * structure,
gboolean gst_registry_binary_read_cache (GstRegistry * registry, const char *location);
gboolean gst_registry_binary_write_cache (GstRegistry * registry, const char *location);
/* used in gstvalue.c and gststructure.c */
#define GST_ASCII_IS_STRING(c) (g_ascii_isalnum((c)) || ((c) == '_') || \
((c) == '-') || ((c) == '+') || ((c) == '/') || ((c) == ':') || \

View file

@ -540,7 +540,7 @@ gst_bin_init (GstBin * bin, GstBinClass * klass)
bin->clock_dirty = FALSE;
/* Set up a bus for listening to child elements */
bus = g_object_new (GST_TYPE_BUS, "enable-async", FALSE, NULL);
bus = gst_bus_new ();
bin->child_bus = bus;
GST_DEBUG_OBJECT (bin, "using bus %" GST_PTR_FORMAT " to listen to children",
bus);

View file

@ -75,7 +75,6 @@
#include <sys/types.h>
#include "gstinfo.h"
#include "gstpoll.h"
#include "gstbus.h"
@ -89,27 +88,19 @@ enum
LAST_SIGNAL
};
#define DEFAULT_ENABLE_ASYNC (TRUE)
enum
{
PROP_0,
PROP_ENABLE_ASYNC
};
static void gst_bus_dispose (GObject * object);
static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
static GstObjectClass *parent_class = NULL;
static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
struct _GstBusPrivate
{
guint num_sync_message_emitters;
GCond *queue_cond;
GSource *watch_id;
gboolean enable_async;
GstPoll *poll;
GPollFD pollfd;
GMainContext *main_context;
};
G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
@ -142,33 +133,6 @@ marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value,
callback (data1, gst_value_get_mini_object (param_values + 1), data2);
}
static void
gst_bus_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstBus *bus = GST_BUS_CAST (object);
switch (prop_id) {
case PROP_ENABLE_ASYNC:
bus->priv->enable_async = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_bus_constructed (GObject * object)
{
GstBus *bus = GST_BUS_CAST (object);
if (bus->priv->enable_async) {
bus->priv->poll = gst_poll_new_timer ();
gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
}
}
static void
gst_bus_class_init (GstBusClass * klass)
{
@ -177,25 +141,6 @@ gst_bus_class_init (GstBusClass * klass)
parent_class = g_type_class_peek_parent (klass);
gobject_class->dispose = gst_bus_dispose;
gobject_class->set_property = gst_bus_set_property;
gobject_class->constructed = gst_bus_constructed;
/* GstBus:enable-async:
*
* Enable async message delivery support for bus watches,
* gst_bus_pop() and similar API. Without this only the
* synchronous message handlers are called.
*
* This property is used to create the child element buses
* in #GstBin.
*
* Since: 0.10.33
*/
g_object_class_install_property (gobject_class, PROP_ENABLE_ASYNC,
g_param_spec_boolean ("enable-async", "Enable Async",
"Enable async message delivery for bus watches and gst_bus_pop()",
DEFAULT_ENABLE_ASYNC,
G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
/**
* GstBus::sync-message:
@ -239,11 +184,11 @@ gst_bus_class_init (GstBusClass * klass)
static void
gst_bus_init (GstBus * bus)
{
bus->queue = gst_atomic_queue_new (32);
bus->queue = g_queue_new ();
bus->queue_lock = g_mutex_new ();
bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
bus->priv->queue_cond = g_cond_new ();
GST_DEBUG_OBJECT (bus, "created");
}
@ -258,24 +203,63 @@ gst_bus_dispose (GObject * object)
g_mutex_lock (bus->queue_lock);
do {
message = gst_atomic_queue_pop (bus->queue);
message = g_queue_pop_head (bus->queue);
if (message)
gst_message_unref (message);
} while (message != NULL);
gst_atomic_queue_unref (bus->queue);
g_queue_free (bus->queue);
bus->queue = NULL;
g_mutex_unlock (bus->queue_lock);
g_mutex_free (bus->queue_lock);
bus->queue_lock = NULL;
g_cond_free (bus->priv->queue_cond);
bus->priv->queue_cond = NULL;
}
if (bus->priv->poll)
gst_poll_free (bus->priv->poll);
bus->priv->poll = NULL;
if (bus->priv->main_context) {
g_main_context_unref (bus->priv->main_context);
bus->priv->main_context = NULL;
}
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_bus_wakeup_main_context (GstBus * bus)
{
GMainContext *ctx;
GST_OBJECT_LOCK (bus);
if ((ctx = bus->priv->main_context))
g_main_context_ref (ctx);
GST_OBJECT_UNLOCK (bus);
g_main_context_wakeup (ctx);
if (ctx)
g_main_context_unref (ctx);
}
static void
gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
{
GST_OBJECT_LOCK (bus);
if (bus->priv->main_context != NULL) {
g_main_context_unref (bus->priv->main_context);
bus->priv->main_context = NULL;
}
if (ctx != NULL) {
bus->priv->main_context = g_main_context_ref (ctx);
}
GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
ctx, g_main_context_default ());
GST_OBJECT_UNLOCK (bus);
}
/**
* gst_bus_new:
*
@ -342,11 +326,6 @@ gst_bus_post (GstBus * bus, GstMessage * message)
&& handler != gst_bus_sync_signal_handler)
gst_bus_sync_signal_handler (bus, message, NULL);
/* If this is a bus without async message delivery
* always drop the message */
if (!bus->priv->poll)
reply = GST_BUS_DROP;
/* now see what we should do with the message */
switch (reply) {
case GST_BUS_DROP:
@ -356,10 +335,14 @@ gst_bus_post (GstBus * bus, GstMessage * message)
case GST_BUS_PASS:
/* pass the message to the async queue, refcount passed in the queue */
GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
gst_atomic_queue_push (bus->queue, message);
gst_poll_write_control (bus->priv->poll);
g_mutex_lock (bus->queue_lock);
g_queue_push_tail (bus->queue, message);
g_cond_broadcast (bus->priv->queue_cond);
g_mutex_unlock (bus->queue_lock);
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
gst_bus_wakeup_main_context (bus);
break;
case GST_BUS_ASYNC:
{
@ -377,9 +360,12 @@ gst_bus_post (GstBus * bus, GstMessage * message)
* queue. When the message is handled by the app and destroyed,
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
g_mutex_lock (bus->queue_lock);
g_queue_push_tail (bus->queue, message);
g_cond_broadcast (bus->priv->queue_cond);
g_mutex_unlock (bus->queue_lock);
gst_atomic_queue_push (bus->queue, message);
gst_poll_write_control (bus->priv->poll);
gst_bus_wakeup_main_context (bus);
/* now block till the message is freed */
g_cond_wait (cond, lock);
@ -427,8 +413,10 @@ gst_bus_have_pending (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_mutex_lock (bus->queue_lock);
/* see if there is a message on the bus */
result = gst_atomic_queue_length (bus->queue) != 0;
result = !g_queue_is_empty (bus->queue);
g_mutex_unlock (bus->queue_lock);
return result;
}
@ -494,24 +482,18 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
GstMessageType types)
{
GstMessage *message;
GTimeVal now, then;
GTimeVal *timeval, abstimeout;
gboolean first_round = TRUE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_return_val_if_fail (types != 0, NULL);
g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL);
g_mutex_lock (bus->queue_lock);
while (TRUE) {
gint ret;
GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
GST_LOG_OBJECT (bus, "have %d messages",
gst_atomic_queue_length (bus->queue));
while ((message = gst_atomic_queue_pop (bus->queue))) {
if (bus->priv->poll)
gst_poll_read_control (bus->priv->poll);
while ((message = g_queue_pop_head (bus->queue))) {
GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
if ((GST_MESSAGE_TYPE (message) & types) != 0) {
@ -528,30 +510,28 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
if (timeout == 0)
break;
else if (timeout != GST_CLOCK_TIME_NONE) {
if (first_round) {
g_get_current_time (&then);
first_round = FALSE;
} else {
GstClockTime elapsed;
if (timeout == GST_CLOCK_TIME_NONE) {
/* wait forever */
timeval = NULL;
} else if (first_round) {
glong add = timeout / 1000;
g_get_current_time (&now);
if (add == 0)
/* no need to wait */
break;
elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then);
if (timeout > elapsed)
timeout -= elapsed;
else
timeout = 0;
}
/* make timeout absolute */
g_get_current_time (&abstimeout);
g_time_val_add (&abstimeout, add);
timeval = &abstimeout;
first_round = FALSE;
GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
} else {
/* calculated the absolute end time already, no need to do it again */
GST_DEBUG_OBJECT (bus, "blocking for message, again");
timeval = &abstimeout; /* fool compiler */
}
/* only here in timeout case */
g_assert (bus->priv->poll);
g_mutex_unlock (bus->queue_lock);
ret = gst_poll_wait (bus->priv->poll, timeout);
g_mutex_lock (bus->queue_lock);
if (ret == 0) {
if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
GST_INFO_OBJECT (bus, "timed out, breaking loop");
break;
} else {
@ -664,7 +644,7 @@ gst_bus_peek (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock);
message = gst_atomic_queue_peek (bus->queue);
message = g_queue_peek_head (bus->queue);
if (message)
gst_message_ref (message);
g_mutex_unlock (bus->queue_lock);
@ -722,13 +702,24 @@ typedef struct
{
GSource source;
GstBus *bus;
gboolean inited;
} GstBusSource;
static gboolean
gst_bus_source_prepare (GSource * source, gint * timeout)
{
GstBusSource *bsrc = (GstBusSource *) source;
/* we do this here now that we know that we're attached to a main context
* (we don't support detaching a source from a main context and then
* re-attaching it to a different main context) */
if (G_UNLIKELY (!bsrc->inited)) {
gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
bsrc->inited = TRUE;
}
*timeout = -1;
return FALSE;
return gst_bus_have_pending (bsrc->bus);
}
static gboolean
@ -736,7 +727,7 @@ gst_bus_source_check (GSource * source)
{
GstBusSource *bsrc = (GstBusSource *) source;
return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR);
return gst_bus_have_pending (bsrc->bus);
}
static gboolean
@ -798,6 +789,7 @@ gst_bus_source_finalize (GSource * source)
bus->priv->watch_id = NULL;
GST_OBJECT_UNLOCK (bus);
gst_bus_set_main_context (bsource->bus, NULL);
gst_object_unref (bsource->bus);
bsource->bus = NULL;
}
@ -825,12 +817,11 @@ gst_bus_create_watch (GstBus * bus)
GstBusSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_return_val_if_fail (bus->priv->poll != NULL, NULL);
source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
source->bus = gst_object_ref (bus);
g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
source->inited = FALSE;
return (GSource *) source;
}

View file

@ -28,7 +28,6 @@ typedef struct _GstBusClass GstBusClass;
#include <gst/gstmessage.h>
#include <gst/gstclock.h>
#include <gst/gstatomicqueue.h>
G_BEGIN_DECLS
@ -116,7 +115,7 @@ struct _GstBus
GstObject object;
/*< private >*/
GstAtomicQueue *queue;
GQueue *queue;
GMutex *queue_lock;
GstBusSyncHandler sync_handler;