gst/gstmessage.h gst/gstmessage.c (_gst_message_free)

Original commit message from CVS:
2005-02-23  Andy Wingo  <wingo@pobox.com>

* gst/gstmessage.h
* gst/gstmessage.c (_gst_message_free)
(gst_message_new_application): New message type, APPLICATION,
explicitly for use by applications.

* gst/gstbus.c (gst_bus_post): Only write a byte to the wakeup
socket if the queue is coming from an empty state. Fixes a bug
where posting a message could block, waiting for someone to read
out bytes from the socket.
(bus_watch_callback): Renamed from bus_callback, let gst_bus_pop
handle the socket read.
(bus_watch_destroy): Renamed from bus_destroy.

* check/Makefile.am: Re-enable the gst-register test so we can
deal with elements here. Add the gstbus tests.

* check/gst/gstbus.c: New check, spawns off a bunch of threads all
posting messages to a bus, then reads to see if they are in the
right order.

* check/pipelines/simple_launch_lines.c: Add some tests, and make
sure to pop the message off the bus after the poll.

* gst/gstbus.c (gst_bus_pop): Read off the control byte if the
queue becomes empty.
This commit is contained in:
Andy Wingo 2005-02-23 16:08:21 +00:00
parent b152bc1c9c
commit 0d557a7bc2
8 changed files with 196 additions and 69 deletions

View file

@ -1,3 +1,31 @@
2005-02-23 Andy Wingo <wingo@pobox.com>
* gst/gstmessage.h
* gst/gstmessage.c (_gst_message_free)
(gst_message_new_application): New message type, APPLICATION,
explicitly for use by applications.
* gst/gstbus.c (gst_bus_post): Only write a byte to the wakeup
socket if the queue is coming from an empty state. Fixes a bug
where posting a message could block, waiting for someone to read
out bytes from the socket.
(bus_watch_callback): Renamed from bus_callback, let gst_bus_pop
handle the socket read.
(bus_watch_destroy): Renamed from bus_destroy.
* check/Makefile.am: Re-enable the gst-register test so we can
deal with elements here. Add the gstbus tests.
* check/gst/gstbus.c: New check, spawns off a bunch of threads all
posting messages to a bus, then reads to see if they are in the
right order.
* check/pipelines/simple_launch_lines.c: Add some tests, and make
sure to pop the message off the bus after the poll.
* gst/gstbus.c (gst_bus_pop): Read off the control byte if the
queue becomes empty.
2005-02-23 Wim Taymans <wim@fluendo.com>
* docs/design/part-states.txt:

View file

@ -5,11 +5,9 @@ TESTS_ENVIRONMENT=\
plugindir = $(libdir)/gstreamer-@GST_MAJORMINOR@
# make all tests depend on the versioned gst-register
#$(TESTS): $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@
$(TESTS): $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@
# rebuild gst-register-@GST_MAJORMINOR@ if needed
# the EXEEXT is because am 1.6 complained about overrides
#$(top_builddir)/tools/gst-register-@GST_MAJORMINOR@$(EXEEXT):
$(top_builddir)/tools/gst-register-@GST_MAJORMINOR@:
cd $(top_builddir)/tools && make
@ -20,9 +18,8 @@ install-pluginLTLIBRARIES:
# dumps have PIDs appended
CLEANFILES = core.*
#TESTS = $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@
TESTS = \
TESTS = $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@ \
gst/gstbus \
gst/gstcaps \
gst/gstdata \
gst/gstobject \

View file

@ -1,7 +1,7 @@
/* GStreamer
* Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
*
* gstcaps.c: Unit test for GstCaps
* simple_launch_lines.c: Unit test for simple pipelines
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@ -37,7 +37,8 @@ setup_pipeline (gchar * pipe_descr)
the poll call will time out after half a second.
*/
static void
run_pipeline (GstElement * pipe, GstMessageType events, GstMessageType tevent)
run_pipeline (GstElement * pipe, gchar * descr,
GstMessageType events, GstMessageType tevent)
{
GstBus *bus;
GstMessageType revent;
@ -48,15 +49,22 @@ run_pipeline (GstElement * pipe, GstMessageType events, GstMessageType tevent)
while (1) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
/* always have to pop the message before getting back into poll */
if (revent != GST_MESSAGE_UNKNOWN)
gst_message_unref (gst_bus_pop (bus));
if (revent == tevent) {
break;
} else if (revent == GST_MESSAGE_UNKNOWN) {
g_critical ("Unexpected timeout in gst_bus_poll, looking for %d", tevent);
g_critical ("Unexpected timeout in gst_bus_poll, looking for %d: %s",
tevent, descr);
break;
} else if (revent & events) {
continue;
}
g_critical ("Unexpected message received of type %d!", revent);
g_critical ("Unexpected message received of type %d, looking for %d: %s",
revent, tevent, descr);
}
gst_element_set_state (pipe, GST_STATE_NULL);
@ -67,9 +75,25 @@ START_TEST (test_2_elements)
{
gchar *s;
s = "fakesrc ! fakesink";
run_pipeline (setup_pipeline (s),
s = "fakesrc has-loop=false ! fakesink has-loop=true";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
s = "fakesrc has-loop=true ! fakesink has-loop=false";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
s = "fakesrc has-loop=false num-buffers=10 ! fakesink has-loop=true";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
s = "fakesrc has-loop=true num-buffers=10 ! fakesink has-loop=false";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
s = "fakesrc has-loop=false ! fakesink has-loop=false";
ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN));
}
END_TEST Suite * simple_launch_lines_suite (void)
{

View file

@ -197,11 +197,13 @@ gst_bus_post (GstBus * bus, GstMessage * message)
GstBusSyncReply reply = GST_BUS_PASS;
GstBusSyncHandler handler;
gpointer handler_data;
gboolean need_write = FALSE;
ssize_t write_ret = -1;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
//g_print ("posting message on bus, type %d\n", GST_MESSAGE_TYPE (message));
GST_DEBUG_OBJECT (bus, "posting message on bus");
GST_LOCK (bus);
@ -222,20 +224,27 @@ gst_bus_post (GstBus * bus, GstMessage * message)
case GST_BUS_PASS:
/* pass the message to the async queue */
g_mutex_lock (bus->queue_lock);
if (g_queue_get_length (bus->queue) == 0)
need_write = TRUE;
g_queue_push_tail (bus->queue, message);
g_mutex_unlock (bus->queue_lock);
c = 'p';
errno = EAGAIN;
while (write_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_post: could not write to fd");
return FALSE;
if (g_queue_get_length (bus->queue) == 0)
need_write = TRUE;
if (need_write) {
c = 'p';
errno = EAGAIN;
while (write_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_post: could not write to fd");
return FALSE;
}
write_ret = write (bus->control_socket[1], &c, 1);
}
write_ret = write (bus->control_socket[1], &c, 1);
}
break;
case GST_BUS_ASYNC:
@ -255,21 +264,27 @@ gst_bus_post (GstBus * bus, GstMessage * message)
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
g_mutex_lock (bus->queue_lock);
if (g_queue_get_length (bus->queue) == 0)
need_write = TRUE;
g_queue_push_tail (bus->queue, message);
g_mutex_unlock (bus->queue_lock);
c = 'p';
errno = EAGAIN;
while (write_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_post: could not write to fd");
return FALSE;
if (need_write) {
c = 'p';
errno = EAGAIN;
while (write_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_post: could not write to fd");
return FALSE;
}
write_ret = write (bus->control_socket[1], &c, 1);
}
write_ret = write (bus->control_socket[1], &c, 1);
}
/* now block till the message is freed */
g_cond_wait (cond, lock);
g_mutex_unlock (lock);
@ -324,13 +339,36 @@ GstMessage *
gst_bus_pop (GstBus * bus)
{
GstMessage *message;
gboolean needs_read = FALSE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock);
message = g_queue_pop_head (bus->queue);
if (message && g_queue_get_length (bus->queue) == 0)
needs_read = TRUE;
g_mutex_unlock (bus->queue_lock);
if (needs_read) {
gchar c;
ssize_t read_ret = -1;
/* the char in the fd is essentially just a way to wake us up. read it off so
we're not woken up again. */
errno = EAGAIN;
while (read_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_pop: could not read from fd");
return NULL;
}
read_ret = read (bus->control_socket[0], &c, 1);
}
}
return message;
}
@ -413,30 +451,14 @@ typedef struct
} GstBusWatch;
static gboolean
bus_callback (GIOChannel * channel, GIOCondition cond, GstBusWatch * watch)
bus_watch_callback (GIOChannel * channel, GIOCondition cond,
GstBusWatch * watch)
{
GstMessage *message;
gboolean needs_pop = TRUE;
gchar c;
ssize_t read_ret = -1;
g_return_val_if_fail (GST_IS_BUS (watch->bus), FALSE);
/* the char in the fd is essentially just a way to wake us up. read it off so
we're not woken up again. */
errno = EAGAIN;
while (read_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_pop: could not read from fd");
return TRUE;
}
read_ret = read (watch->bus->control_socket[0], &c, 1);
}
message = gst_bus_peek (watch->bus);
g_return_val_if_fail (message != NULL, TRUE);
@ -451,7 +473,7 @@ bus_callback (GIOChannel * channel, GIOCondition cond, GstBusWatch * watch)
}
static void
bus_destroy (GstBusWatch * watch)
bus_watch_destroy (GstBusWatch * watch)
{
if (watch->notify) {
watch->notify (watch->user_data);
@ -496,8 +518,8 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
if (priority != G_PRIORITY_DEFAULT)
g_source_set_priority (watch->source, priority);
g_source_set_callback (watch->source, (GSourceFunc) bus_callback, watch,
(GDestroyNotify) bus_destroy);
g_source_set_callback (watch->source, (GSourceFunc) bus_watch_callback,
watch, (GDestroyNotify) bus_watch_destroy);
id = g_source_attach (watch->source, NULL);
g_source_unref (watch->source);

View file

@ -114,6 +114,12 @@ _gst_message_free (GstMessage * message)
GST_ERROR ("tag message %p didn't contain a valid tag list!", message);
}
break;
case GST_MESSAGE_APPLICATION:
if (message->message_data.structure.structure) {
gst_structure_free (message->message_data.structure.structure);
message->message_data.structure.structure = NULL;
}
break;
default:
break;
}
@ -160,8 +166,10 @@ gst_message_new (GstMessageType type, GstObject * src)
GST_MESSAGE_TYPE (message) = type;
GST_MESSAGE_TIMESTAMP (message) = G_GINT64_CONSTANT (0);
gst_object_ref (src);
GST_MESSAGE_SRC (message) = src;
if (src) {
gst_object_ref (src);
GST_MESSAGE_SRC (message) = src;
}
return message;
}
@ -269,3 +277,26 @@ gst_message_new_state_changed (GstObject * src, GstElementState old,
return message;
}
/**
* gst_message_new_application:
* @structure: The structure for the message. The message will take ownership of
* the structure.
*
* Create a new application-specific message. These messages can be used by
* application-specific plugins to pass data to the app.
*
* Returns: The new message.
*
* MT safe.
*/
GstMessage *
gst_message_new_application (GstStructure * structure)
{
GstMessage *message;
message = gst_message_new (GST_MESSAGE_APPLICATION, NULL);
message->message_data.structure.structure = structure;
return message;
}

View file

@ -47,6 +47,8 @@ GST_EXPORT GType _gst_message_type;
* @GST_MESSAGE_STRUCTURE_CHANGE: the structure of the pipeline changed.
* @GST_MESSAGE_STREAM_STATUS: status about a stream, emited when it starts,
* stops, errors, etc..
* @GST_MESSAGE_APPLICATION: message posted by the application, possibly
* via an application-specific element.
* @GST_MESSAGE_ANY: mask for all of the above messages.
*/
typedef enum
@ -63,6 +65,7 @@ typedef enum
GST_MESSAGE_NEW_CLOCK = (1 << 8),
GST_MESSAGE_STRUCTURE_CHANGE = (1 << 9),
GST_MESSAGE_STREAM_STATUS = (1 << 10),
GST_MESSAGE_APPLICATION = (1 << 11),
GST_MESSAGE_ANY = 0xffffffff
} GstMessageType;
@ -156,6 +159,7 @@ GstMessage * gst_message_new_warning (GstObject * src, GError * error, gchar *
GstMessage * gst_message_new_tag (GstObject * src, GstTagList * tag_list);
GstMessage * gst_message_new_state_changed (GstObject * src, GstElementState old,
GstElementState new);
GstMessage * gst_message_new_application (GstStructure *structure);
G_END_DECLS
#endif /* __GST_MESSAGE_H__ */

View file

@ -5,11 +5,9 @@ TESTS_ENVIRONMENT=\
plugindir = $(libdir)/gstreamer-@GST_MAJORMINOR@
# make all tests depend on the versioned gst-register
#$(TESTS): $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@
$(TESTS): $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@
# rebuild gst-register-@GST_MAJORMINOR@ if needed
# the EXEEXT is because am 1.6 complained about overrides
#$(top_builddir)/tools/gst-register-@GST_MAJORMINOR@$(EXEEXT):
$(top_builddir)/tools/gst-register-@GST_MAJORMINOR@:
cd $(top_builddir)/tools && make
@ -20,9 +18,8 @@ install-pluginLTLIBRARIES:
# dumps have PIDs appended
CLEANFILES = core.*
#TESTS = $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@
TESTS = \
TESTS = $(top_builddir)/tools/gst-register-@GST_MAJORMINOR@ \
gst/gstbus \
gst/gstcaps \
gst/gstdata \
gst/gstobject \

View file

@ -1,7 +1,7 @@
/* GStreamer
* Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
*
* gstcaps.c: Unit test for GstCaps
* simple_launch_lines.c: Unit test for simple pipelines
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@ -37,7 +37,8 @@ setup_pipeline (gchar * pipe_descr)
the poll call will time out after half a second.
*/
static void
run_pipeline (GstElement * pipe, GstMessageType events, GstMessageType tevent)
run_pipeline (GstElement * pipe, gchar * descr,
GstMessageType events, GstMessageType tevent)
{
GstBus *bus;
GstMessageType revent;
@ -48,15 +49,22 @@ run_pipeline (GstElement * pipe, GstMessageType events, GstMessageType tevent)
while (1) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
/* always have to pop the message before getting back into poll */
if (revent != GST_MESSAGE_UNKNOWN)
gst_message_unref (gst_bus_pop (bus));
if (revent == tevent) {
break;
} else if (revent == GST_MESSAGE_UNKNOWN) {
g_critical ("Unexpected timeout in gst_bus_poll, looking for %d", tevent);
g_critical ("Unexpected timeout in gst_bus_poll, looking for %d: %s",
tevent, descr);
break;
} else if (revent & events) {
continue;
}
g_critical ("Unexpected message received of type %d!", revent);
g_critical ("Unexpected message received of type %d, looking for %d: %s",
revent, tevent, descr);
}
gst_element_set_state (pipe, GST_STATE_NULL);
@ -67,9 +75,25 @@ START_TEST (test_2_elements)
{
gchar *s;
s = "fakesrc ! fakesink";
run_pipeline (setup_pipeline (s),
s = "fakesrc has-loop=false ! fakesink has-loop=true";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
s = "fakesrc has-loop=true ! fakesink has-loop=false";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
s = "fakesrc has-loop=false num-buffers=10 ! fakesink has-loop=true";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
s = "fakesrc has-loop=true num-buffers=10 ! fakesink has-loop=false";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
s = "fakesrc has-loop=false ! fakesink has-loop=false";
ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN));
}
END_TEST Suite * simple_launch_lines_suite (void)
{