API: add gst_bus_pop_filtered

Original commit message from CVS:
* docs/gst/gstreamer-sections.txt:
* gst/gstbus.c:
* gst/gstbus.h:
API: add gst_bus_pop_filtered
API: add gst_bus_timed_pop_filtered
Two new functions for waiting for specific message types on the
bus for a specified amount of time without iterating any main
loops or main contexts.
* tests/check/gst/gstbus.c:
Some tests for the new functions.
This commit is contained in:
Tim-Philipp Müller 2007-10-16 20:30:13 +00:00
parent 4f6acb610f
commit 13c6e89d6c
5 changed files with 282 additions and 49 deletions

View file

@ -1,3 +1,17 @@
2007-10-16 Tim-Philipp Müller <tim at centricular dot net>
* docs/gst/gstreamer-sections.txt:
* gst/gstbus.c:
* gst/gstbus.h:
API: add gst_bus_pop_filtered
API: add gst_bus_timed_pop_filtered
Two new functions for waiting for specific message types on the
bus for a specified amount of time without iterating any main
loops or main contexts.
* tests/check/gst/gstbus.c:
Some tests for the new functions.
2007-10-16 Tim-Philipp Müller <tim at centricular dot net> 2007-10-16 Tim-Philipp Müller <tim at centricular dot net>
* docs/libs/gstreamer-libs-sections.txt: * docs/libs/gstreamer-libs-sections.txt:

View file

@ -97,7 +97,9 @@ gst_bus_post
gst_bus_have_pending gst_bus_have_pending
gst_bus_peek gst_bus_peek
gst_bus_pop gst_bus_pop
gst_bus_pop_filtered
gst_bus_timed_pop gst_bus_timed_pop
gst_bus_timed_pop_filtered
gst_bus_set_flushing gst_bus_set_flushing
gst_bus_set_sync_handler gst_bus_set_sync_handler
gst_bus_sync_signal_handler gst_bus_sync_signal_handler

View file

@ -331,8 +331,10 @@ gst_bus_post (GstBus * bus, GstMessage * message)
g_return_val_if_fail (GST_IS_BUS (bus), FALSE); g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE); g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %s, %" GST_PTR_FORMAT, GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %s, %" GST_PTR_FORMAT
message, GST_MESSAGE_TYPE_NAME (message), message->structure); " from source %" GST_PTR_FORMAT,
message, GST_MESSAGE_TYPE_NAME (message), message->structure,
message->src);
GST_OBJECT_LOCK (bus); GST_OBJECT_LOCK (bus);
/* check if the bus is flushing */ /* check if the bus is flushing */
@ -484,6 +486,98 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing)
GST_OBJECT_UNLOCK (bus); GST_OBJECT_UNLOCK (bus);
} }
/**
* gst_bus_timed_pop_filtered:
* @bus: a #GstBus to pop from
* @timeout: a timeout in nanoseconds, or GST_CLOCK_TIME_NONE to wait forever
* @types: message types to take into account, GST_MESSAGE_ANY for any type
*
* Get a message from the bus whose type matches the message type mask @types,
* waiting up to the specified timeout (and discarding any messages that do not
* match the mask provided).
*
* If @timeout is 0, this function behaves like gst_bus_pop_filtered(). If
* @timeout is #GST_CLOCK_TIME_NONE, this function will block forever until a
* matching message was posted on the bus.
*
* Returns: a #GstMessage matching the filter in @types, or NULL if no matching
* message was found on the bus until the timeout expired.
* The message is taken from the bus and needs to be unreffed with
* gst_message_unref() after usage.
*
* MT safe.
*
* Since: 0.10.15
*/
GstMessage *
gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
GstMessageType types)
{
GstMessage *message;
GTimeVal *timeval, abstimeout;
gboolean first_round = TRUE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_return_val_if_fail (types != 0, NULL);
g_mutex_lock (bus->queue_lock);
while (TRUE) {
GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
while ((message = g_queue_pop_head (bus->queue))) {
GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
if ((GST_MESSAGE_TYPE (message) & types) != 0) {
/* exit the loop, we have a message */
goto beach;
} else {
GST_DEBUG_OBJECT (bus, "discarding message, does not match mask");
gst_message_unref (message);
message = NULL;
}
}
/* no need to wait, exit loop */
if (timeout == 0)
break;
if (timeout == GST_CLOCK_TIME_NONE) {
/* wait forever */
timeval = NULL;
} else if (first_round) {
glong add = timeout / 1000;
if (add == 0)
/* no need to wait */
break;
/* make timeout absolute */
g_get_current_time (&abstimeout);
g_time_val_add (&abstimeout, add);
timeval = &abstimeout;
first_round = FALSE;
GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
} else {
/* calculated the absolute end time already, no need to do it again */
GST_DEBUG_OBJECT (bus, "blocking for message, again");
timeval = &abstimeout; /* fool compiler */
}
if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
GST_INFO_OBJECT (bus, "timed out, breaking loop");
break;
} else {
GST_INFO_OBJECT (bus, "we got woken up, recheck for message");
}
}
beach:
g_mutex_unlock (bus->queue_lock);
return message;
}
/** /**
* gst_bus_timed_pop: * gst_bus_timed_pop:
@ -508,53 +602,37 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing)
GstMessage * GstMessage *
gst_bus_timed_pop (GstBus * bus, GstClockTime timeout) gst_bus_timed_pop (GstBus * bus, GstClockTime timeout)
{ {
GstMessage *message;
GTimeVal *timeval, abstimeout;
g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock); return gst_bus_timed_pop_filtered (bus, timeout, GST_MESSAGE_ANY);
while (TRUE) { }
message = g_queue_pop_head (bus->queue);
if (message) {
GST_DEBUG_OBJECT (bus,
"pop from bus, have %d messages, got message %p, %s",
g_queue_get_length (bus->queue) + 1, message,
GST_MESSAGE_TYPE_NAME (message));
/* exit the loop, we have a message */
break;
} else {
GST_DEBUG_OBJECT (bus, "pop from bus, no messages");
/* no need to wait, exit loop */
if (timeout == 0)
break;
if (timeout == GST_CLOCK_TIME_NONE) {
/* wait forever */
timeval = NULL;
} else {
glong add = timeout / 1000;
if (add == 0) /**
/* no need to wait */ * gst_bus_pop_filtered:
break; * @bus: a #GstBus to pop
* @types: message types to take into account
*
* Get a message matching @type from the bus. Will discard all messages on
* the bus that do not match @type and that have been posted before the first
* message that does match @type. If there is no message matching @type on
* the bus, all messages will be discarded.
*
* Returns: The next #GstMessage matching @type that is on the bus, or NULL if
* the bus is empty or there is no message matching @type.
* The message is taken from the bus and needs to be unreffed with
* gst_message_unref() after usage.
*
* MT safe.
*
* Since: 0.10.15
*/
GstMessage *
gst_bus_pop_filtered (GstBus * bus, GstMessageType types)
{
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_return_val_if_fail (types != 0, NULL);
/* make timeout absolute */ return gst_bus_timed_pop_filtered (bus, 0, types);
g_get_current_time (&abstimeout);
g_time_val_add (&abstimeout, add);
timeval = &abstimeout;
GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
}
if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
GST_INFO_OBJECT (bus, "timed out, breaking loop");
break;
} else {
GST_INFO_OBJECT (bus, "we got woken up, recheck for message");
}
}
}
g_mutex_unlock (bus->queue_lock);
return message;
} }
/** /**
@ -574,7 +652,7 @@ gst_bus_pop (GstBus * bus)
{ {
g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_return_val_if_fail (GST_IS_BUS (bus), NULL);
return gst_bus_timed_pop (bus, 0); return gst_bus_timed_pop_filtered (bus, 0, GST_MESSAGE_ANY);
} }
/** /**

View file

@ -150,7 +150,9 @@ gboolean gst_bus_post (GstBus * bus, GstMessage * message);
gboolean gst_bus_have_pending (GstBus * bus); gboolean gst_bus_have_pending (GstBus * bus);
GstMessage * gst_bus_peek (GstBus * bus); GstMessage * gst_bus_peek (GstBus * bus);
GstMessage * gst_bus_pop (GstBus * bus); GstMessage * gst_bus_pop (GstBus * bus);
GstMessage * gst_bus_pop_filtered (GstBus * bus, GstMessageType types);
GstMessage * gst_bus_timed_pop (GstBus * bus, GstClockTime timeout); GstMessage * gst_bus_timed_pop (GstBus * bus, GstClockTime timeout);
GstMessage * gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, GstMessageType types);
void gst_bus_set_flushing (GstBus * bus, gboolean flushing); void gst_bus_set_flushing (GstBus * bus, gboolean flushing);
/* synchronous dispatching */ /* synchronous dispatching */

View file

@ -1,7 +1,6 @@
/* GStreamer /* GStreamer message bus unit tests
* Copyright (C) 2005 Andy Wingo <wingo@pobox.com> * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
* * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
* gstbus.c: Unit test for the message bus
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
@ -193,6 +192,42 @@ message_func (GstBus * bus, GstMessage * message, gpointer data)
messages_seen++; messages_seen++;
} }
static void
send_5app_1el_1err_2app_messages (guint interval_usecs)
{
GstMessage *m;
GstStructure *s;
gint i;
for (i = 0; i < 5; i++) {
s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
m = gst_message_new_application (NULL, s);
GST_LOG ("posting application message");
gst_bus_post (test_bus, m);
g_usleep (interval_usecs);
}
for (i = 0; i < 1; i++) {
s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
m = gst_message_new_element (NULL, s);
GST_LOG ("posting element message");
gst_bus_post (test_bus, m);
g_usleep (interval_usecs);
}
for (i = 0; i < 1; i++) {
m = gst_message_new_error (NULL, NULL, "debug string");
GST_LOG ("posting error message");
gst_bus_post (test_bus, m);
g_usleep (interval_usecs);
}
for (i = 0; i < 2; i++) {
s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
m = gst_message_new_application (NULL, s);
GST_LOG ("posting application message");
gst_bus_post (test_bus, m);
g_usleep (interval_usecs);
}
}
static void static void
send_10_app_messages (void) send_10_app_messages (void)
{ {
@ -253,6 +288,106 @@ GST_START_TEST (test_timed_pop)
GST_END_TEST; GST_END_TEST;
/* test that you get the messages with pop_filtered */
GST_START_TEST (test_timed_pop_filtered)
{
GstMessage *msg;
guint i;
test_bus = gst_bus_new ();
send_10_app_messages ();
for (i = 0; i < 10; i++) {
msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
GST_MESSAGE_ANY);
fail_unless (msg != NULL);
gst_message_unref (msg);
}
/* should flush all messages on the bus with types not matching */
send_10_app_messages ();
msg = gst_bus_timed_pop_filtered (test_bus, 0,
GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
fail_unless (msg == NULL);
msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 2,
GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
fail_unless (msg == NULL);
/* there should be nothing on the bus now */
fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ANY);
fail_unless (msg == NULL);
send_5app_1el_1err_2app_messages (0);
msg = gst_bus_timed_pop_filtered (test_bus, 0,
GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
fail_unless (msg != NULL);
fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
gst_message_unref (msg);
fail_unless (gst_bus_have_pending (test_bus), "expected messages on bus");
msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_APPLICATION);
fail_unless (msg != NULL);
fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
gst_message_unref (msg);
msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ERROR);
fail_unless (msg == NULL);
gst_object_unref (test_bus);
}
GST_END_TEST;
static gpointer
post_delayed_thread (gpointer data)
{
THREAD_START ();
send_5app_1el_1err_2app_messages (1 * G_USEC_PER_SEC);
return NULL;
}
/* test that you get the messages with pop_filtered if there's a timeout*/
GST_START_TEST (test_timed_pop_filtered_with_timeout)
{
GstMessage *msg;
MAIN_INIT ();
test_bus = gst_bus_new ();
MAIN_START_THREAD_FUNCTIONS (1, post_delayed_thread, NULL);
MAIN_SYNCHRONIZE ();
msg = gst_bus_timed_pop_filtered (test_bus, 2 * GST_SECOND,
GST_MESSAGE_ERROR);
fail_unless (msg == NULL, "Got unexpected %s message",
(msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
msg = gst_bus_timed_pop_filtered (test_bus, (3 + 1 + 1 + 1) * GST_SECOND,
GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT);
fail_unless (msg != NULL, "expected element message, but got nothing");
fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
gst_message_unref (msg);
msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
GST_MESSAGE_APPLICATION);
fail_unless (msg != NULL, "expected application message, but got nothing");
fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
gst_message_unref (msg);
msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
GST_MESSAGE_APPLICATION);
fail_unless (msg != NULL, "expected application message, but got nothing");
fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
gst_message_unref (msg);
msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 4,
GST_MESSAGE_TAG | GST_MESSAGE_ERROR);
fail_unless (msg == NULL, "Got unexpected %s message",
(msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
MAIN_STOP_THREADS ();
gst_object_unref (test_bus);
}
GST_END_TEST;
/* test that you get the messages with pop from another thread. */ /* test that you get the messages with pop from another thread. */
static gpointer static gpointer
pop_thread (gpointer data) pop_thread (gpointer data)
@ -307,6 +442,8 @@ gst_bus_suite (void)
tcase_add_test (tc_chain, test_watch_with_poll); tcase_add_test (tc_chain, test_watch_with_poll);
tcase_add_test (tc_chain, test_timed_pop); tcase_add_test (tc_chain, test_timed_pop);
tcase_add_test (tc_chain, test_timed_pop_thread); tcase_add_test (tc_chain, test_timed_pop_thread);
tcase_add_test (tc_chain, test_timed_pop_filtered);
tcase_add_test (tc_chain, test_timed_pop_filtered_with_timeout);
return s; return s;
} }