GstBusHandler -> GstBusFunc, return value has the same meaning as any other GSource (FALSE == remove source).

Original commit message from CVS:
* check/gst/gstbin.c: (pop_messages), (GST_START_TEST):
* check/gst/gstbus.c: (message_func_eos), (message_func_app),
(send_messages), (GST_START_TEST), (gstbus_suite):
* check/gst/gstpipeline.c: (GST_START_TEST):
* check/pipelines/cleanup.c: (run_pipeline):
* check/pipelines/simple_launch_lines.c: (run_pipeline),
(GST_START_TEST):
* gst/gstbus.c: (gst_bus_have_pending), (gst_bus_source_prepare),
(gst_bus_source_check), (gst_bus_source_dispatch),
(gst_bus_create_watch), (gst_bus_add_watch_full),
(gst_bus_add_watch), (poll_func), (poll_timeout), (gst_bus_poll):
* gst/gstbus.h:
* tools/gst-launch.c: (event_loop):
* tools/gst-md5sum.c: (event_loop):
GstBusHandler -> GstBusFunc, return value has the same meaning as
any other GSource (FALSE == remove source).
_add_watch() and _add_watch_full() now take a MessageType mask to
only handle specific types of messages.
_poll() returns the GstMessage instead of the message type to avoid
race conditions.
_have_pending() takes a MessageType mask now too.
Added testsuite for multiple bus watches.
Fix testsuites and applications for new bus API.
This commit is contained in:
Wim Taymans 2005-09-19 11:18:03 +00:00
parent 9edf72b191
commit 1c1af875d4
15 changed files with 395 additions and 176 deletions

View file

@ -1,3 +1,29 @@
2005-09-19 Wim Taymans <wim@fluendo.com>
* check/gst/gstbin.c: (pop_messages), (GST_START_TEST):
* check/gst/gstbus.c: (message_func_eos), (message_func_app),
(send_messages), (GST_START_TEST), (gstbus_suite):
* check/gst/gstpipeline.c: (GST_START_TEST):
* check/pipelines/cleanup.c: (run_pipeline):
* check/pipelines/simple_launch_lines.c: (run_pipeline),
(GST_START_TEST):
* gst/gstbus.c: (gst_bus_have_pending), (gst_bus_source_prepare),
(gst_bus_source_check), (gst_bus_source_dispatch),
(gst_bus_create_watch), (gst_bus_add_watch_full),
(gst_bus_add_watch), (poll_func), (poll_timeout), (gst_bus_poll):
* gst/gstbus.h:
* tools/gst-launch.c: (event_loop):
* tools/gst-md5sum.c: (event_loop):
GstBusHandler -> GstBusFunc, return value has the same meaning as
any other GSource (FALSE == remove source).
_add_watch() and _add_watch_full() now take a MessageType mask to
only handle specific types of messages.
_poll() returns the GstMessage instead of the message type to avoid
race conditions.
_have_pending() takes a MessageType mask now too.
Added testsuite for multiple bus watches.
Fix testsuites and applications for new bus API.
2005-09-19 Thomas Vander Stichele <thomas at apestaart dot org>
* check/Makefile.am:

View file

@ -31,10 +31,11 @@ pop_messages (GstBus * bus, int count)
GST_DEBUG ("popping %d messages", count);
for (i = 0; i < count; ++i) {
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
gst_message_unref (message);
}
GST_DEBUG ("popped %d messages", count);
@ -121,11 +122,11 @@ GST_START_TEST (test_message_state_changed)
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the bin */
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED,
-1) == GST_MESSAGE_STATE_CHANGED,
"did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
gst_message_unref (message);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 1);
@ -166,10 +167,10 @@ GST_START_TEST (test_message_state_changed_child)
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the src */
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (src));
gst_message_unref (message);
@ -177,10 +178,10 @@ GST_START_TEST (test_message_state_changed_child)
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref message 2, causing a decref on the bin */
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (bin));
gst_message_unref (message);
@ -341,7 +342,7 @@ GST_START_TEST (test_watch_for_state_change)
pop_messages (bus, 5);
fail_unless (gst_bus_have_pending (bus) == FALSE,
fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
gst_bin_watch_for_state_change (GST_BIN (bin));
@ -349,7 +350,7 @@ GST_START_TEST (test_watch_for_state_change)
/* should get the bin's state change message now */
pop_messages (bus, 1);
fail_unless (gst_bus_have_pending (bus) == FALSE,
fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING)
@ -364,7 +365,7 @@ GST_START_TEST (test_watch_for_state_change)
pop_messages (bus, 3);
fail_unless (gst_bus_have_pending (bus) == FALSE,
fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
/* setting bin to NULL flushes the bus automatically */

View file

@ -19,11 +19,10 @@
* Boston, MA 02111-1307, USA.
*/
#include <gst/check/gstcheck.h>
static GstBus *test_bus = NULL;
static GMainLoop *main_loop;
#define NUM_MESSAGES 1000
#define NUM_THREADS 10
@ -99,8 +98,86 @@ GST_START_TEST (test_hammer_bus)
gst_object_unref ((GstObject *) test_bus);
}
GST_END_TEST Suite *
gstbus_suite (void)
GST_END_TEST static gboolean
message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
{
const GstStructure *s;
gint i;
g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
GST_DEBUG ("got EOS message");
s = gst_message_get_structure (message);
if (!gst_structure_get_int (s, "msg_id", &i))
g_critical ("Invalid message");
return i != 9;
}
static gboolean
message_func_app (GstBus * bus, GstMessage * message, gpointer data)
{
const GstStructure *s;
gint i;
g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
FALSE);
GST_DEBUG ("got APP message");
s = gst_message_get_structure (message);
if (!gst_structure_get_int (s, "msg_id", &i))
g_critical ("Invalid message");
return i != 9;
}
static gboolean
send_messages (gpointer data)
{
GstMessage *m;
GstStructure *s;
gint i;
for (i = 0; i < 10; i++) {
s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
m = gst_message_new_application (NULL, s);
gst_bus_post (test_bus, m);
s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
gst_bus_post (test_bus, m);
}
return FALSE;
}
/* test id adding two watches for different message types calls the
* respective callbacks. */
GST_START_TEST (test_watch)
{
guint id1, id2;
test_bus = gst_bus_new ();
main_loop = g_main_loop_new (NULL, FALSE);
id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL);
id1 =
gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app,
NULL);
g_idle_add ((GSourceFunc) send_messages, NULL);
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, FALSE);
g_source_remove (id1);
g_source_remove (id2);
g_main_loop_unref (main_loop);
gst_object_unref ((GstObject *) test_bus);
}
GST_END_TEST Suite * gstbus_suite (void)
{
Suite *s = suite_create ("GstBus");
TCase *tc_chain = tcase_create ("stresstest");
@ -109,6 +186,7 @@ gstbus_suite (void)
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_hammer_bus);
tcase_add_test (tc_chain, test_watch);
return s;
}

View file

@ -87,15 +87,15 @@ GST_START_TEST (test_async_state_change_fake)
while (!done) {
GstMessage *message;
GstState old, new;
GstMessageType type;
type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
message = gst_bus_pop (bus);
gst_message_parse_state_changed (message, &old, &new);
GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
done = TRUE;
gst_message_unref (message);
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
if (message) {
gst_message_parse_state_changed (message, &old, &new);
GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
done = TRUE;
gst_message_unref (message);
}
}
g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);

View file

@ -48,11 +48,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
gst_element_set_state (pipe, GST_STATE_PLAYING);
while (1) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
/* always have to pop the message before getting back into poll */
if (revent != GST_MESSAGE_UNKNOWN)
gst_message_unref (gst_bus_pop (bus));
if (message) {
revent = GST_MESSAGE_TYPE (message);
gst_message_unref (message);
} else {
revent = GST_MESSAGE_UNKNOWN;
}
if (revent == tevent) {
break;

View file

@ -59,11 +59,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
}
while (1) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
/* always have to pop the message before getting back into poll */
if (revent != GST_MESSAGE_UNKNOWN)
gst_message_unref (gst_bus_pop (bus));
if (message) {
revent = GST_MESSAGE_TYPE (message);
gst_message_unref (message);
} else {
revent = GST_MESSAGE_UNKNOWN;
}
if (revent == tevent) {
break;
@ -138,6 +141,7 @@ GST_START_TEST (test_stop_from_app)
GstElement *fakesrc, *fakesink, *pipeline;
GstBus *bus;
GstMessageType revent;
GstMessage *message;
assert_live_count (GST_TYPE_BUFFER, 0);
@ -159,17 +163,22 @@ GST_START_TEST (test_stop_from_app)
g_assert (bus);
/* will time out after half a second */
revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
if (message) {
revent = GST_MESSAGE_TYPE (message);
gst_message_unref (message);
} else {
revent = GST_MESSAGE_UNKNOWN;
}
g_return_if_fail (revent == GST_MESSAGE_APPLICATION);
gst_message_unref (gst_bus_pop (bus));
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (pipeline);
assert_live_count (GST_TYPE_BUFFER, 0);
}
GST_END_TEST Suite * simple_launch_lines_suite (void)
GST_END_TEST Suite *
simple_launch_lines_suite (void)
{
Suite *s = suite_create ("Pipelines");
TCase *tc_chain = tcase_create ("linear");

View file

@ -42,8 +42,8 @@
* up to the specified timeout value until one of the specified messages types
* is posted on the bus. The application can then _pop() the messages from the
* bus to handle them.
* Alternatively the application can register an asynchronous bus handler using
* gst_bus_add_watch_full() orgst_bus_add_watch(). This handler will receive
* Alternatively the application can register an asynchronous bus function using
* gst_bus_add_watch_full() orgst_bus_add_watch(). This function will receive
* messages a short while after they have been posted.
*
* It is also possible to get messages from the bus without any thread
@ -52,12 +52,7 @@
* message on the bus. This should only be used if the application is able
* to deal with messages from different threads.
*
* It is important to make sure that every message is popped from the bus at
* some point in time. Otherwise it will be presented to the watches (#GSource
* elements) again and again. One way to implement it is having one watch with a
* low priority (see gst_add_watch_full()) that pops all messages.
*
* Every #GstPipeline has one bus.
* Every #GstBin has one bus.
*/
#include <errno.h>
@ -324,26 +319,35 @@ is_flushing:
/**
* gst_bus_have_pending:
* @bus: a #GstBus to check
* @events: a mask of #GstMessageType, representing the set of message types to
* watch for.
*
* Check if there are pending messages on the bus that should be
* handled.
* Check if there are pending messages on the bus of the given types that
* should be handled.
*
* Returns: TRUE if there are messages on the bus to be handled.
*
* MT safe.
*/
gboolean
gst_bus_have_pending (GstBus * bus)
gst_bus_have_pending (GstBus * bus, GstMessageType events)
{
gint length;
GstMessage *message;
gboolean result;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_mutex_lock (bus->queue_lock);
length = g_queue_get_length (bus->queue);
/* see if there is a message on the bus that satisfies the
* event mask */
message = g_queue_peek_head (bus->queue);
if (message)
result = (GST_MESSAGE_TYPE (message) & events) != 0;
else
result = FALSE;
g_mutex_unlock (bus->queue_lock);
return (length > 0);
return result;
}
/**
@ -470,29 +474,34 @@ typedef struct
{
GSource source;
GstBus *bus;
GstMessageType events;
} GstBusSource;
static gboolean
gst_bus_source_prepare (GSource * source, gint * timeout)
{
GstBusSource *bsrc = (GstBusSource *) source;
*timeout = -1;
return gst_bus_have_pending (((GstBusSource *) source)->bus);
return gst_bus_have_pending (bsrc->bus, bsrc->events);
}
static gboolean
gst_bus_source_check (GSource * source)
{
return gst_bus_have_pending (((GstBusSource *) source)->bus);
GstBusSource *bsrc = (GstBusSource *) source;
return gst_bus_have_pending (bsrc->bus, bsrc->events);
}
static gboolean
gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
gpointer user_data)
{
GstBusHandler handler = (GstBusHandler) callback;
GstBusFunc handler = (GstBusFunc) callback;
GstBusSource *bsource = (GstBusSource *) source;
GstMessage *message;
gboolean needs_pop = TRUE;
gboolean keep;
GstBus *bus;
g_return_val_if_fail (bsource != NULL, FALSE);
@ -501,34 +510,20 @@ gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
message = gst_bus_peek (bus);
GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
g_return_val_if_fail (message != NULL, TRUE);
message = gst_bus_pop (bus);
g_return_val_if_fail (message != NULL, FALSE);
if (!handler)
goto no_handler;
GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
needs_pop = handler (bus, message, user_data);
keep = handler (bus, message, user_data);
gst_message_unref (message);
GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
if (needs_pop) {
message = gst_bus_pop (bus);
if (message) {
gst_message_unref (message);
} else {
/* after executing the handler, the app could have disposed
* the pipeline and set the bus to flushing. It is possible
* then that there are no more messages on the bus. this is
* not a problem. */
GST_DEBUG ("handler requested pop but no message on the bus");
}
}
return TRUE;
GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
return keep;
no_handler:
{
@ -558,13 +553,19 @@ static GSourceFuncs gst_bus_source_funcs = {
/**
* gst_bus_create_watch:
* @bus: a #GstBus to create the watch for
* @events: a mask of #GstMessageType, representing the set of message types to
* watch for.
*
* Create watch for this bus.
* Create watch for this bus. The source will only act on messages of the
* given types, messages of other types will simply remain on the bus and
* this GSource will not be dispatched again before the message is popped off
* the bus. For this reason one typically has a low priority GSource that
* pops all remaining messages from the bus not handled by the other GSources.
*
* Returns: A #GSource that can be added to a mainloop.
*/
GSource *
gst_bus_create_watch (GstBus * bus)
gst_bus_create_watch (GstBus * bus, GstMessageType events)
{
GstBusSource *source;
@ -574,6 +575,7 @@ gst_bus_create_watch (GstBus * bus)
sizeof (GstBusSource));
gst_object_ref (bus);
source->bus = bus;
source->events = events;
return (GSource *) source;
}
@ -582,34 +584,37 @@ gst_bus_create_watch (GstBus * bus)
* gst_bus_add_watch_full:
* @bus: a #GstBus to create the watch for.
* @priority: The priority of the watch.
* @handler: A function to call when a message is received.
* @user_data: user data passed to @handler.
* @events: a mask of #GstMessageType, representing the set of message types to
* watch for.
* @func: A function to call when a message is received.
* @user_data: user data passed to @func.
* @notify: the function to call when the source is removed.
*
* Adds the bus to the mainloop with the given priority. If the handler returns
* TRUE, the message will then be popped off the queue. When the handler is
* called, the message belongs to the caller; if you want to keep a copy of it,
* call gst_message_ref before leaving the handler.
* Adds the bus to the mainloop with the given priority. If the func returns
* FALSE, the func will be removed.
*
* When the func is called, the message belongs to the caller; if you want to
* keep a copy of it, call gst_message_ref before leaving the func.
*
* Returns: The event source id.
*
* MT safe.
*/
guint
gst_bus_add_watch_full (GstBus * bus, gint priority,
GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
gst_bus_add_watch_full (GstBus * bus, gint priority, GstMessageType events,
GstBusFunc func, gpointer user_data, GDestroyNotify notify)
{
guint id;
GSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), 0);
source = gst_bus_create_watch (bus);
source = gst_bus_create_watch (bus, events);
if (priority != G_PRIORITY_DEFAULT)
g_source_set_priority (source, priority);
g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
id = g_source_attach (source, NULL);
g_source_unref (source);
@ -621,8 +626,10 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
/**
* gst_bus_add_watch:
* @bus: a #GstBus to create the watch for
* @handler: A function to call when a message is received.
* @user_data: user data passed to @handler.
* @events: a mask of #GstMessageType, representing the set of message types to
* watch for.
* @func: A function to call when a message is received.
* @user_data: user data passed to @func.
*
* Adds the bus to the mainloop with the default priority.
*
@ -631,10 +638,11 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
* MT safe.
*/
guint
gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
gst_bus_add_watch (GstBus * bus, GstMessageType events, GstBusFunc func,
gpointer user_data)
{
return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
NULL);
return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, events, func,
user_data, NULL);
}
typedef struct
@ -643,25 +651,26 @@ typedef struct
guint timeout_id;
gboolean source_running;
GstMessageType events;
GstMessageType revent;
GstMessage *message;
} GstBusPollData;
static gboolean
poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
{
if (!g_main_loop_is_running (poll_data->loop))
return FALSE;
return TRUE;
if (GST_MESSAGE_TYPE (message) & poll_data->events) {
poll_data->revent = GST_MESSAGE_TYPE (message);
g_return_val_if_fail (poll_data->message == NULL, FALSE);
/* keep ref to message */
poll_data->message = gst_message_ref (message);
g_main_loop_quit (poll_data->loop);
/* keep the message on the queue */
return FALSE;
} else {
/* pop and unref the message */
return TRUE;
/* don't remove the source. */
}
/* we always keep the source alive so that we don't accidentialy
* free the poll_data */
return TRUE;
}
static gboolean
@ -669,8 +678,9 @@ poll_timeout (GstBusPollData * poll_data)
{
g_main_loop_quit (poll_data->loop);
/* returning FALSE will remove the source id */
return FALSE;
/* we don't remove the GSource as this would free our poll_data,
* which we still need */
return TRUE;
}
static void
@ -706,24 +716,21 @@ poll_destroy_timeout (GstBusPollData * poll_data)
*
* This function will enter the default mainloop while polling.
*
* Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
* the poll timed out. The message will remain in the bus queue; you will need
* to gst_bus_pop() it off before entering gst_bus_poll() again.
* Returns: The message that was received, or NULL if the poll timed out.
* The message is taken from the bus and needs to be unreffed after usage.
*/
GstMessageType
GstMessage *
gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
{
GstBusPollData *poll_data;
GstMessageType ret;
GstMessage *ret;
guint id;
poll_data = g_new0 (GstBusPollData, 1);
g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
poll_data->source_running = TRUE;
poll_data->loop = g_main_loop_new (NULL, FALSE);
poll_data->events = events;
poll_data->revent = GST_MESSAGE_UNKNOWN;
poll_data->message = NULL;
if (timeout >= 0)
poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
@ -732,10 +739,12 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
else
poll_data->timeout_id = 0;
id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
(GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE, GST_MESSAGE_ANY,
(GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy);
g_main_loop_run (poll_data->loop);
ret = poll_data->revent;
/* holds a ref */
ret = poll_data->message;
if (poll_data->timeout_id)
g_source_remove (poll_data->timeout_id);
@ -743,7 +752,7 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
/* poll_data may get destroyed at any time now */
g_source_remove (id);
GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);
GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
return ret;
}

View file

@ -59,24 +59,29 @@ typedef enum
* @data: user data that has been given, when registering the handler
*
* Handler will be invoked synchronously, when a new message has been injected
* into the bus.
* into the bus. This function is mostly used internally. Only one sync handler
* can be attached to a given bus.
*
* Returns: #GstBusSyncReply stating what to do with the message
*/
typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus * bus, GstMessage * message, gpointer data);
typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus * bus, GstMessage * message, gpointer data);
/**
* GstBusHandler:
* GstBusFunc:
* @bus: the #GstBus that sent the message
* @message: the #GstMessage
* @data: user data that has been given, when registering the handler
*
* Handler will be invoked asynchronously, after a new message has been injected
* into the bus. Return %TRUE if the message has been handled. It will then be
* taken from the bus and _unref()'ed.
* Specifies the type of function passed to #gst_bus_add_watch() or
* #gst_bus_add_watch_full(), which is called from the mainloop when a message
* is available on the bus.
*
* Returns: %TRUE if message should be taken from the bus
* The message passed to the function will be unreffed after execution of this
* function so it should not be freed in the function.
*
* Returns: %FALSE if the event source should be removed.
*/
typedef gboolean (*GstBusHandler) (GstBus * bus, GstMessage * message, gpointer data);
typedef gboolean (*GstBusFunc) (GstBus * bus, GstMessage * message, gpointer data);
struct _GstBus
{
@ -107,7 +112,7 @@ GstBus* gst_bus_new (void);
gboolean gst_bus_post (GstBus * bus, GstMessage * message);
gboolean gst_bus_have_pending (GstBus * bus);
gboolean gst_bus_have_pending (GstBus * bus, GstMessageType events);
GstMessage * gst_bus_peek (GstBus * bus);
GstMessage * gst_bus_pop (GstBus * bus);
void gst_bus_set_flushing (GstBus * bus, gboolean flushing);
@ -115,16 +120,19 @@ void gst_bus_set_flushing (GstBus * bus, gboolean flushing);
void gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func,
gpointer data);
GSource * gst_bus_create_watch (GstBus * bus);
GSource * gst_bus_create_watch (GstBus * bus, GstMessageType events);
guint gst_bus_add_watch_full (GstBus * bus,
gint priority,
GstBusHandler handler,
GstMessageType events,
GstBusFunc func,
gpointer user_data,
GDestroyNotify notify);
guint gst_bus_add_watch (GstBus * bus,
GstBusHandler handler,
GstMessageType events,
GstBusFunc func,
gpointer user_data);
GstMessageType gst_bus_poll (GstBus *bus, GstMessageType events,
GstMessage* gst_bus_poll (GstBus *bus, GstMessageType events,
GstClockTimeDiff timeout);
G_END_DECLS

View file

@ -31,10 +31,11 @@ pop_messages (GstBus * bus, int count)
GST_DEBUG ("popping %d messages", count);
for (i = 0; i < count; ++i) {
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
gst_message_unref (message);
}
GST_DEBUG ("popped %d messages", count);
@ -121,11 +122,11 @@ GST_START_TEST (test_message_state_changed)
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the bin */
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED,
-1) == GST_MESSAGE_STATE_CHANGED,
"did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
gst_message_unref (message);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 1);
@ -166,10 +167,10 @@ GST_START_TEST (test_message_state_changed_child)
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the src */
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (src));
gst_message_unref (message);
@ -177,10 +178,10 @@ GST_START_TEST (test_message_state_changed_child)
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref message 2, causing a decref on the bin */
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (bin));
gst_message_unref (message);
@ -341,7 +342,7 @@ GST_START_TEST (test_watch_for_state_change)
pop_messages (bus, 5);
fail_unless (gst_bus_have_pending (bus) == FALSE,
fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
gst_bin_watch_for_state_change (GST_BIN (bin));
@ -349,7 +350,7 @@ GST_START_TEST (test_watch_for_state_change)
/* should get the bin's state change message now */
pop_messages (bus, 1);
fail_unless (gst_bus_have_pending (bus) == FALSE,
fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING)
@ -364,7 +365,7 @@ GST_START_TEST (test_watch_for_state_change)
pop_messages (bus, 3);
fail_unless (gst_bus_have_pending (bus) == FALSE,
fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
/* setting bin to NULL flushes the bus automatically */

View file

@ -19,11 +19,10 @@
* Boston, MA 02111-1307, USA.
*/
#include <gst/check/gstcheck.h>
static GstBus *test_bus = NULL;
static GMainLoop *main_loop;
#define NUM_MESSAGES 1000
#define NUM_THREADS 10
@ -99,8 +98,86 @@ GST_START_TEST (test_hammer_bus)
gst_object_unref ((GstObject *) test_bus);
}
GST_END_TEST Suite *
gstbus_suite (void)
GST_END_TEST static gboolean
message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
{
const GstStructure *s;
gint i;
g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
GST_DEBUG ("got EOS message");
s = gst_message_get_structure (message);
if (!gst_structure_get_int (s, "msg_id", &i))
g_critical ("Invalid message");
return i != 9;
}
static gboolean
message_func_app (GstBus * bus, GstMessage * message, gpointer data)
{
const GstStructure *s;
gint i;
g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
FALSE);
GST_DEBUG ("got APP message");
s = gst_message_get_structure (message);
if (!gst_structure_get_int (s, "msg_id", &i))
g_critical ("Invalid message");
return i != 9;
}
static gboolean
send_messages (gpointer data)
{
GstMessage *m;
GstStructure *s;
gint i;
for (i = 0; i < 10; i++) {
s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
m = gst_message_new_application (NULL, s);
gst_bus_post (test_bus, m);
s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
gst_bus_post (test_bus, m);
}
return FALSE;
}
/* test id adding two watches for different message types calls the
* respective callbacks. */
GST_START_TEST (test_watch)
{
guint id1, id2;
test_bus = gst_bus_new ();
main_loop = g_main_loop_new (NULL, FALSE);
id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL);
id1 =
gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app,
NULL);
g_idle_add ((GSourceFunc) send_messages, NULL);
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, FALSE);
g_source_remove (id1);
g_source_remove (id2);
g_main_loop_unref (main_loop);
gst_object_unref ((GstObject *) test_bus);
}
GST_END_TEST Suite * gstbus_suite (void)
{
Suite *s = suite_create ("GstBus");
TCase *tc_chain = tcase_create ("stresstest");
@ -109,6 +186,7 @@ gstbus_suite (void)
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_hammer_bus);
tcase_add_test (tc_chain, test_watch);
return s;
}

View file

@ -87,15 +87,15 @@ GST_START_TEST (test_async_state_change_fake)
while (!done) {
GstMessage *message;
GstState old, new;
GstMessageType type;
type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
message = gst_bus_pop (bus);
gst_message_parse_state_changed (message, &old, &new);
GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
done = TRUE;
gst_message_unref (message);
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
if (message) {
gst_message_parse_state_changed (message, &old, &new);
GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
done = TRUE;
gst_message_unref (message);
}
}
g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);

View file

@ -48,11 +48,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
gst_element_set_state (pipe, GST_STATE_PLAYING);
while (1) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
/* always have to pop the message before getting back into poll */
if (revent != GST_MESSAGE_UNKNOWN)
gst_message_unref (gst_bus_pop (bus));
if (message) {
revent = GST_MESSAGE_TYPE (message);
gst_message_unref (message);
} else {
revent = GST_MESSAGE_UNKNOWN;
}
if (revent == tevent) {
break;

View file

@ -59,11 +59,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
}
while (1) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
/* always have to pop the message before getting back into poll */
if (revent != GST_MESSAGE_UNKNOWN)
gst_message_unref (gst_bus_pop (bus));
if (message) {
revent = GST_MESSAGE_TYPE (message);
gst_message_unref (message);
} else {
revent = GST_MESSAGE_UNKNOWN;
}
if (revent == tevent) {
break;
@ -138,6 +141,7 @@ GST_START_TEST (test_stop_from_app)
GstElement *fakesrc, *fakesink, *pipeline;
GstBus *bus;
GstMessageType revent;
GstMessage *message;
assert_live_count (GST_TYPE_BUFFER, 0);
@ -159,17 +163,22 @@ GST_START_TEST (test_stop_from_app)
g_assert (bus);
/* will time out after half a second */
revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
if (message) {
revent = GST_MESSAGE_TYPE (message);
gst_message_unref (message);
} else {
revent = GST_MESSAGE_UNKNOWN;
}
g_return_if_fail (revent == GST_MESSAGE_APPLICATION);
gst_message_unref (gst_bus_pop (bus));
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (pipeline);
assert_live_count (GST_TYPE_BUFFER, 0);
}
GST_END_TEST Suite * simple_launch_lines_suite (void)
GST_END_TEST Suite *
simple_launch_lines_suite (void)
{
Suite *s = suite_create ("Pipelines");
TCase *tc_chain = tcase_create ("linear");

View file

@ -356,7 +356,6 @@ static gboolean
event_loop (GstElement * pipeline, gboolean blocking)
{
GstBus *bus;
GstMessageType revent;
GstMessage *message = NULL;
bus = gst_element_get_bus (GST_ELEMENT (pipeline));
@ -364,17 +363,14 @@ event_loop (GstElement * pipeline, gboolean blocking)
g_timeout_add (50, (GSourceFunc) check_intr, pipeline);
while (TRUE) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0);
message = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0);
/* if the poll timed out, only when !blocking */
if (revent == GST_MESSAGE_UNKNOWN) {
if (message == NULL) {
gst_object_unref (bus);
return FALSE;
}
message = gst_bus_pop (bus);
g_return_val_if_fail (message != NULL, TRUE);
if (messages) {
const GstStructure *s;
@ -390,7 +386,7 @@ event_loop (GstElement * pipeline, gboolean blocking)
}
}
switch (revent) {
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
g_print (_
("Got EOS from element \"%s\".\n"),

View file

@ -12,18 +12,16 @@ static gboolean
event_loop (GstElement * pipeline)
{
GstBus *bus;
GstMessageType revent;
GstMessage *message = NULL;
bus = gst_element_get_bus (GST_ELEMENT (pipeline));
while (TRUE) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
message = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
message = gst_bus_pop (bus);
g_return_val_if_fail (message != NULL, TRUE);
switch (revent) {
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
gst_message_unref (message);
return FALSE;