mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-18 07:47:17 +00:00
API: gst_bus_timed_pop()
Original commit message from CVS: * docs/gst/gstreamer-sections.txt: * gst/gstbus.c: (gst_bus_init), (gst_bus_dispose), (gst_bus_post), (gst_bus_timed_pop), (gst_bus_pop): * gst/gstbus.h: API: gst_bus_timed_pop() Implement gst_bus_timed_pop() to do a blocking timed wait for a message to arrive on the bus. * tests/check/gst/gstbus.c: (GST_START_TEST), (pop_thread), (gst_bus_suite): Two unit tests for new _timed_pop() function.
This commit is contained in:
parent
c574a01484
commit
7553c996a4
5 changed files with 159 additions and 14 deletions
14
ChangeLog
14
ChangeLog
|
@ -1,3 +1,17 @@
|
|||
2007-02-27 Wim Taymans <wim@fluendo.com>
|
||||
|
||||
* docs/gst/gstreamer-sections.txt:
|
||||
* gst/gstbus.c: (gst_bus_init), (gst_bus_dispose), (gst_bus_post),
|
||||
(gst_bus_timed_pop), (gst_bus_pop):
|
||||
* gst/gstbus.h:
|
||||
API: gst_bus_timed_pop()
|
||||
Implement gst_bus_timed_pop() to do a blocking timed wait for a
|
||||
message to arrive on the bus.
|
||||
|
||||
* tests/check/gst/gstbus.c: (GST_START_TEST), (pop_thread),
|
||||
(gst_bus_suite):
|
||||
Two unit tests for new _timed_pop() function.
|
||||
|
||||
2007-02-23 Wim Taymans <wim@fluendo.com>
|
||||
|
||||
* gst/gstpipeline.c: (gst_pipeline_change_state),
|
||||
|
|
|
@ -96,6 +96,7 @@ gst_bus_post
|
|||
gst_bus_have_pending
|
||||
gst_bus_peek
|
||||
gst_bus_pop
|
||||
gst_bus_timed_pop
|
||||
gst_bus_set_flushing
|
||||
gst_bus_set_sync_handler
|
||||
gst_bus_sync_signal_handler
|
||||
|
|
93
gst/gstbus.c
93
gst/gstbus.c
|
@ -105,6 +105,8 @@ static GMainContext *main_context;
|
|||
struct _GstBusPrivate
|
||||
{
|
||||
guint num_sync_message_emitters;
|
||||
|
||||
GCond *queue_cond;
|
||||
};
|
||||
|
||||
GType
|
||||
|
@ -225,6 +227,7 @@ gst_bus_init (GstBus * bus)
|
|||
bus->queue_lock = g_mutex_new ();
|
||||
|
||||
bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
|
||||
bus->priv->queue_cond = g_cond_new ();
|
||||
|
||||
GST_DEBUG_OBJECT (bus, "created");
|
||||
}
|
||||
|
@ -250,6 +253,8 @@ gst_bus_dispose (GObject * object)
|
|||
g_mutex_unlock (bus->queue_lock);
|
||||
g_mutex_free (bus->queue_lock);
|
||||
bus->queue_lock = NULL;
|
||||
g_cond_free (bus->priv->queue_cond);
|
||||
bus->priv->queue_cond = NULL;
|
||||
}
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->dispose (object);
|
||||
|
@ -360,6 +365,7 @@ gst_bus_post (GstBus * bus, GstMessage * message)
|
|||
GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
|
||||
g_mutex_lock (bus->queue_lock);
|
||||
g_queue_push_tail (bus->queue, message);
|
||||
g_cond_broadcast (bus->priv->queue_cond);
|
||||
g_mutex_unlock (bus->queue_lock);
|
||||
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
|
||||
|
||||
|
@ -385,6 +391,7 @@ gst_bus_post (GstBus * bus, GstMessage * message)
|
|||
g_mutex_lock (lock);
|
||||
g_mutex_lock (bus->queue_lock);
|
||||
g_queue_push_tail (bus->queue, message);
|
||||
g_cond_broadcast (bus->priv->queue_cond);
|
||||
g_mutex_unlock (bus->queue_lock);
|
||||
|
||||
/* FIXME cannot assume sources are only in the default context */
|
||||
|
@ -478,6 +485,78 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing)
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* gst_bus_timed_pop:
|
||||
* @bus: a #GstBus to pop
|
||||
* @timeout: a timeout
|
||||
*
|
||||
* Get a message from the bus, waiting up to the specified timeout.
|
||||
*
|
||||
* If @timeout is 0, this function behaves like gst_bus_pop(). If @timeout is
|
||||
* #GST_CLOCK_TIME_NONE, this function will block forever until a message was
|
||||
* posted on the bus.
|
||||
*
|
||||
* Returns: The #GstMessage that is on the bus after the specified timeout
|
||||
* or NULL if the bus is empty after the timeout expired.
|
||||
* The message is taken from the bus and needs to be unreffed with
|
||||
* gst_message_unref() after usage.
|
||||
*
|
||||
* MT safe.
|
||||
*
|
||||
* Since: 0.10.12
|
||||
*/
|
||||
GstMessage *
|
||||
gst_bus_timed_pop (GstBus * bus, GstClockTime timeout)
|
||||
{
|
||||
GstMessage *message;
|
||||
GTimeVal *timeval, abstimeout;
|
||||
|
||||
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
|
||||
|
||||
g_mutex_lock (bus->queue_lock);
|
||||
while (TRUE) {
|
||||
message = g_queue_pop_head (bus->queue);
|
||||
if (message) {
|
||||
GST_DEBUG_OBJECT (bus,
|
||||
"pop from bus, have %d messages, got message %p, %s",
|
||||
g_queue_get_length (bus->queue) + 1, message,
|
||||
GST_MESSAGE_TYPE_NAME (message));
|
||||
/* exit the loop, we have a message */
|
||||
break;
|
||||
} else {
|
||||
GST_DEBUG_OBJECT (bus, "pop from bus, no messages");
|
||||
/* no need to wait, exit loop */
|
||||
if (timeout == 0)
|
||||
break;
|
||||
if (timeout == GST_CLOCK_TIME_NONE) {
|
||||
/* wait forever */
|
||||
timeval = NULL;
|
||||
} else {
|
||||
glong add = timeout / 1000;
|
||||
|
||||
if (add == 0)
|
||||
/* no need to wait */
|
||||
break;
|
||||
|
||||
/* make timeout absolute */
|
||||
g_get_current_time (&abstimeout);
|
||||
g_time_val_add (&abstimeout, add);
|
||||
timeval = &abstimeout;
|
||||
GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
|
||||
}
|
||||
if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
|
||||
GST_INFO_OBJECT (bus, "timed out, breaking loop");
|
||||
break;
|
||||
} else {
|
||||
GST_INFO_OBJECT (bus, "we got woken up, recheck for message");
|
||||
}
|
||||
}
|
||||
}
|
||||
g_mutex_unlock (bus->queue_lock);
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
* gst_bus_pop:
|
||||
* @bus: a #GstBus to pop
|
||||
|
@ -493,21 +572,9 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing)
|
|||
GstMessage *
|
||||
gst_bus_pop (GstBus * bus)
|
||||
{
|
||||
GstMessage *message;
|
||||
|
||||
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
|
||||
|
||||
g_mutex_lock (bus->queue_lock);
|
||||
message = g_queue_pop_head (bus->queue);
|
||||
if (message)
|
||||
GST_DEBUG_OBJECT (bus, "pop from bus, have %d messages, got message %p, %s",
|
||||
g_queue_get_length (bus->queue) + 1, message,
|
||||
GST_MESSAGE_TYPE_NAME (message));
|
||||
else
|
||||
GST_DEBUG_OBJECT (bus, "pop from bus, no messages");
|
||||
g_mutex_unlock (bus->queue_lock);
|
||||
|
||||
return message;
|
||||
return gst_bus_timed_pop (bus, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -150,6 +150,7 @@ gboolean gst_bus_post (GstBus * bus, GstMessage * message);
|
|||
gboolean gst_bus_have_pending (GstBus * bus);
|
||||
GstMessage * gst_bus_peek (GstBus * bus);
|
||||
GstMessage * gst_bus_pop (GstBus * bus);
|
||||
GstMessage * gst_bus_timed_pop (GstBus * bus, GstClockTime timeout);
|
||||
void gst_bus_set_flushing (GstBus * bus, gboolean flushing);
|
||||
|
||||
/* synchronous dispatching */
|
||||
|
|
|
@ -183,7 +183,7 @@ GST_START_TEST (test_watch)
|
|||
|
||||
GST_END_TEST;
|
||||
|
||||
static gint messages_seen = 0;
|
||||
static gint messages_seen;
|
||||
|
||||
static void
|
||||
message_func (GstBus * bus, GstMessage * message, gpointer data)
|
||||
|
@ -213,6 +213,7 @@ GST_START_TEST (test_watch_with_poll)
|
|||
guint i;
|
||||
|
||||
test_bus = gst_bus_new ();
|
||||
messages_seen = 0;
|
||||
|
||||
gst_bus_add_signal_watch (test_bus);
|
||||
g_signal_connect (test_bus, "message", (GCallback) message_func, NULL);
|
||||
|
@ -233,6 +234,65 @@ GST_START_TEST (test_watch_with_poll)
|
|||
|
||||
GST_END_TEST;
|
||||
|
||||
/* test that you get the messages with pop. */
|
||||
GST_START_TEST (test_timed_pop)
|
||||
{
|
||||
guint i;
|
||||
|
||||
test_bus = gst_bus_new ();
|
||||
|
||||
send_10_app_messages ();
|
||||
|
||||
for (i = 0; i < 10; i++)
|
||||
gst_message_unref (gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE));
|
||||
|
||||
fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
|
||||
|
||||
gst_object_unref (test_bus);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
/* test that you get the messages with pop from another thread. */
|
||||
static gpointer
|
||||
pop_thread (gpointer data)
|
||||
{
|
||||
GstBus *bus = GST_BUS_CAST (data);
|
||||
guint i;
|
||||
|
||||
for (i = 0; i < 10; i++)
|
||||
gst_message_unref (gst_bus_timed_pop (bus, GST_CLOCK_TIME_NONE));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
GST_START_TEST (test_timed_pop_thread)
|
||||
{
|
||||
GThread *thread;
|
||||
GError *error = NULL;
|
||||
|
||||
test_bus = gst_bus_new ();
|
||||
|
||||
thread = g_thread_create (pop_thread, test_bus, TRUE, &error);
|
||||
fail_if (error != NULL);
|
||||
|
||||
send_10_app_messages ();
|
||||
|
||||
g_thread_join (thread);
|
||||
|
||||
fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
|
||||
|
||||
/* try to pop a message without timeout. */
|
||||
fail_if (gst_bus_timed_pop (test_bus, 0) != NULL);
|
||||
|
||||
/* with a small timeout */
|
||||
fail_if (gst_bus_timed_pop (test_bus, 1000) != NULL);
|
||||
|
||||
gst_object_unref (test_bus);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
Suite *
|
||||
gst_bus_suite (void)
|
||||
{
|
||||
|
@ -245,6 +305,8 @@ gst_bus_suite (void)
|
|||
tcase_add_test (tc_chain, test_hammer_bus);
|
||||
tcase_add_test (tc_chain, test_watch);
|
||||
tcase_add_test (tc_chain, test_watch_with_poll);
|
||||
tcase_add_test (tc_chain, test_timed_pop);
|
||||
tcase_add_test (tc_chain, test_timed_pop_thread);
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue