diff --git a/ChangeLog b/ChangeLog index 9d87657821..bb45255049 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,17 @@ +2007-02-27 Wim Taymans + + * docs/gst/gstreamer-sections.txt: + * gst/gstbus.c: (gst_bus_init), (gst_bus_dispose), (gst_bus_post), + (gst_bus_timed_pop), (gst_bus_pop): + * gst/gstbus.h: + API: gst_bus_timed_pop() + Implement gst_bus_timed_pop() to do a blocking timed wait for a + message to arrive on the bus. + + * tests/check/gst/gstbus.c: (GST_START_TEST), (pop_thread), + (gst_bus_suite): + Two unit tests for new _timed_pop() function. + 2007-02-23 Wim Taymans * gst/gstpipeline.c: (gst_pipeline_change_state), diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt index ea457277d8..9d9fab3dad 100644 --- a/docs/gst/gstreamer-sections.txt +++ b/docs/gst/gstreamer-sections.txt @@ -96,6 +96,7 @@ gst_bus_post gst_bus_have_pending gst_bus_peek gst_bus_pop +gst_bus_timed_pop gst_bus_set_flushing gst_bus_set_sync_handler gst_bus_sync_signal_handler diff --git a/gst/gstbus.c b/gst/gstbus.c index a5a5bfd3f4..facfee8e1c 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -105,6 +105,8 @@ static GMainContext *main_context; struct _GstBusPrivate { guint num_sync_message_emitters; + + GCond *queue_cond; }; GType @@ -225,6 +227,7 @@ gst_bus_init (GstBus * bus) bus->queue_lock = g_mutex_new (); bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate); + bus->priv->queue_cond = g_cond_new (); GST_DEBUG_OBJECT (bus, "created"); } @@ -250,6 +253,8 @@ gst_bus_dispose (GObject * object) g_mutex_unlock (bus->queue_lock); g_mutex_free (bus->queue_lock); bus->queue_lock = NULL; + g_cond_free (bus->priv->queue_cond); + bus->priv->queue_cond = NULL; } G_OBJECT_CLASS (parent_class)->dispose (object); @@ -360,6 +365,7 @@ gst_bus_post (GstBus * bus, GstMessage * message) GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); g_mutex_lock (bus->queue_lock); g_queue_push_tail (bus->queue, message); + g_cond_broadcast (bus->priv->queue_cond); g_mutex_unlock (bus->queue_lock); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); @@ -385,6 +391,7 @@ gst_bus_post (GstBus * bus, GstMessage * message) g_mutex_lock (lock); g_mutex_lock (bus->queue_lock); g_queue_push_tail (bus->queue, message); + g_cond_broadcast (bus->priv->queue_cond); g_mutex_unlock (bus->queue_lock); /* FIXME cannot assume sources are only in the default context */ @@ -478,6 +485,78 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing) } +/** + * gst_bus_timed_pop: + * @bus: a #GstBus to pop + * @timeout: a timeout + * + * Get a message from the bus, waiting up to the specified timeout. + * + * If @timeout is 0, this function behaves like gst_bus_pop(). If @timeout is + * #GST_CLOCK_TIME_NONE, this function will block forever until a message was + * posted on the bus. + * + * Returns: The #GstMessage that is on the bus after the specified timeout + * or NULL if the bus is empty after the timeout expired. + * The message is taken from the bus and needs to be unreffed with + * gst_message_unref() after usage. + * + * MT safe. + * + * Since: 0.10.12 + */ +GstMessage * +gst_bus_timed_pop (GstBus * bus, GstClockTime timeout) +{ + GstMessage *message; + GTimeVal *timeval, abstimeout; + + g_return_val_if_fail (GST_IS_BUS (bus), NULL); + + g_mutex_lock (bus->queue_lock); + while (TRUE) { + message = g_queue_pop_head (bus->queue); + if (message) { + GST_DEBUG_OBJECT (bus, + "pop from bus, have %d messages, got message %p, %s", + g_queue_get_length (bus->queue) + 1, message, + GST_MESSAGE_TYPE_NAME (message)); + /* exit the loop, we have a message */ + break; + } else { + GST_DEBUG_OBJECT (bus, "pop from bus, no messages"); + /* no need to wait, exit loop */ + if (timeout == 0) + break; + if (timeout == GST_CLOCK_TIME_NONE) { + /* wait forever */ + timeval = NULL; + } else { + glong add = timeout / 1000; + + if (add == 0) + /* no need to wait */ + break; + + /* make timeout absolute */ + g_get_current_time (&abstimeout); + g_time_val_add (&abstimeout, add); + timeval = &abstimeout; + GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add); + } + if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) { + GST_INFO_OBJECT (bus, "timed out, breaking loop"); + break; + } else { + GST_INFO_OBJECT (bus, "we got woken up, recheck for message"); + } + } + } + g_mutex_unlock (bus->queue_lock); + + return message; +} + /** * gst_bus_pop: * @bus: a #GstBus to pop @@ -493,21 +572,9 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing) GstMessage * gst_bus_pop (GstBus * bus) { - GstMessage *message; - g_return_val_if_fail (GST_IS_BUS (bus), NULL); - g_mutex_lock (bus->queue_lock); - message = g_queue_pop_head (bus->queue); - if (message) - GST_DEBUG_OBJECT (bus, "pop from bus, have %d messages, got message %p, %s", - g_queue_get_length (bus->queue) + 1, message, - GST_MESSAGE_TYPE_NAME (message)); - else - GST_DEBUG_OBJECT (bus, "pop from bus, no messages"); - g_mutex_unlock (bus->queue_lock); - - return message; + return gst_bus_timed_pop (bus, 0); } /** diff --git a/gst/gstbus.h b/gst/gstbus.h index 8e075813f7..af36438303 100644 --- a/gst/gstbus.h +++ b/gst/gstbus.h @@ -150,6 +150,7 @@ gboolean gst_bus_post (GstBus * bus, GstMessage * message); gboolean gst_bus_have_pending (GstBus * bus); GstMessage * gst_bus_peek (GstBus * bus); GstMessage * gst_bus_pop (GstBus * bus); +GstMessage * gst_bus_timed_pop (GstBus * bus, GstClockTime timeout); void gst_bus_set_flushing (GstBus * bus, gboolean flushing); /* synchronous dispatching */ diff --git a/tests/check/gst/gstbus.c b/tests/check/gst/gstbus.c index 9df63c2425..b06ae3560e 100644 --- a/tests/check/gst/gstbus.c +++ b/tests/check/gst/gstbus.c @@ -183,7 +183,7 @@ GST_START_TEST (test_watch) GST_END_TEST; -static gint messages_seen = 0; +static gint messages_seen; static void message_func (GstBus * bus, GstMessage * message, gpointer data) @@ -213,6 +213,7 @@ GST_START_TEST (test_watch_with_poll) guint i; test_bus = gst_bus_new (); + messages_seen = 0; gst_bus_add_signal_watch (test_bus); g_signal_connect (test_bus, "message", (GCallback) message_func, NULL); @@ -233,6 +234,65 @@ GST_START_TEST (test_watch_with_poll) GST_END_TEST; +/* test that you get the messages with pop. */ +GST_START_TEST (test_timed_pop) +{ + guint i; + + test_bus = gst_bus_new (); + + send_10_app_messages (); + + for (i = 0; i < 10; i++) + gst_message_unref (gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE)); + + fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus"); + + gst_object_unref (test_bus); +} + +GST_END_TEST; + +/* test that you get the messages with pop from another thread. */ +static gpointer +pop_thread (gpointer data) +{ + GstBus *bus = GST_BUS_CAST (data); + guint i; + + for (i = 0; i < 10; i++) + gst_message_unref (gst_bus_timed_pop (bus, GST_CLOCK_TIME_NONE)); + + return NULL; +} + +GST_START_TEST (test_timed_pop_thread) +{ + GThread *thread; + GError *error = NULL; + + test_bus = gst_bus_new (); + + thread = g_thread_create (pop_thread, test_bus, TRUE, &error); + fail_if (error != NULL); + + send_10_app_messages (); + + g_thread_join (thread); + + fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus"); + + /* try to pop a message without timeout. */ + fail_if (gst_bus_timed_pop (test_bus, 0) != NULL); + + /* with a small timeout */ + fail_if (gst_bus_timed_pop (test_bus, 1000) != NULL); + + gst_object_unref (test_bus); +} + +GST_END_TEST; + Suite * gst_bus_suite (void) { @@ -245,6 +305,8 @@ gst_bus_suite (void) tcase_add_test (tc_chain, test_hammer_bus); tcase_add_test (tc_chain, test_watch); tcase_add_test (tc_chain, test_watch_with_poll); + tcase_add_test (tc_chain, test_timed_pop); + tcase_add_test (tc_chain, test_timed_pop_thread); return s; }