gst/gstbus.*: Implement a real GSource and use g_main_context_wakeup() to signal new messages instead of the socketpair.

Original commit message from CVS:
* gst/gstbus.c: (gst_bus_init), (gst_bus_dispose), (gst_bus_post),
(gst_bus_pop), (gst_bus_source_prepare), (gst_bus_source_check),
(gst_bus_source_dispatch), (gst_bus_source_finalize),
(gst_bus_create_watch), (gst_bus_add_watch_full):
* gst/gstbus.h:
Implement a real GSource and use g_main_context_wakeup() to
signal new messages instead of the socketpair.
This commit is contained in:
Wim Taymans 2005-05-26 10:48:53 +00:00
parent 56d9730d20
commit ea2dd0057c
3 changed files with 93 additions and 147 deletions

View file

@ -1,3 +1,13 @@
2005-05-26 Wim Taymans <wim@fluendo.com>
* gst/gstbus.c: (gst_bus_init), (gst_bus_dispose), (gst_bus_post),
(gst_bus_pop), (gst_bus_source_prepare), (gst_bus_source_check),
(gst_bus_source_dispatch), (gst_bus_source_finalize),
(gst_bus_create_watch), (gst_bus_add_watch_full):
* gst/gstbus.h:
Implement a real GSource and use g_main_context_wakeup() to
signal new messages instead of the socketpair.
2005-05-25 Wim Taymans <wim@fluendo.com>
* gst/gstbin.c: (bin_element_is_sink), (has_ancestor),

View file

@ -97,19 +97,7 @@ gst_bus_init (GstBus * bus)
bus->queue = g_queue_new ();
bus->queue_lock = g_mutex_new ();
if (socketpair (PF_UNIX, SOCK_STREAM, 0, bus->control_socket) < 0)
goto no_socketpair;
bus->io_channel = g_io_channel_unix_new (bus->control_socket[0]);
return;
/* errors */
no_socketpair:
{
g_warning ("cannot create io channel");
bus->io_channel = NULL;
}
}
static void
@ -119,14 +107,6 @@ gst_bus_dispose (GObject * object)
bus = GST_BUS (object);
if (bus->io_channel) {
g_io_channel_shutdown (bus->io_channel, TRUE, NULL);
g_io_channel_unref (bus->io_channel);
bus->io_channel = NULL;
}
close (bus->control_socket[0]);
close (bus->control_socket[1]);
if (bus->queue) {
g_mutex_lock (bus->queue_lock);
g_queue_free (bus->queue);
@ -193,12 +173,9 @@ gst_bus_new (void)
gboolean
gst_bus_post (GstBus * bus, GstMessage * message)
{
gchar c;
GstBusSyncReply reply = GST_BUS_PASS;
GstBusSyncHandler handler;
gpointer handler_data;
gboolean need_write = FALSE;
ssize_t write_ret = -1;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
@ -207,7 +184,6 @@ gst_bus_post (GstBus * bus, GstMessage * message)
GST_MESSAGE_TYPE (message));
GST_LOCK (bus);
if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) {
gst_message_unref (message);
GST_UNLOCK (bus);
@ -232,26 +208,12 @@ gst_bus_post (GstBus * bus, GstMessage * message)
case GST_BUS_PASS:
/* pass the message to the async queue */
g_mutex_lock (bus->queue_lock);
if (g_queue_get_length (bus->queue) == 0)
need_write = TRUE;
g_queue_push_tail (bus->queue, message);
g_mutex_unlock (bus->queue_lock);
if (need_write) {
c = 'p';
errno = EAGAIN;
while (write_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_post: could not write to fd");
return FALSE;
}
write_ret = write (bus->control_socket[1], &c, 1);
}
}
/* FIXME cannot assume the source is only in the default context */
g_main_context_wakeup (NULL);
break;
case GST_BUS_ASYNC:
{
@ -270,26 +232,11 @@ gst_bus_post (GstBus * bus, GstMessage * message)
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
g_mutex_lock (bus->queue_lock);
if (g_queue_get_length (bus->queue) == 0)
need_write = TRUE;
g_queue_push_tail (bus->queue, message);
g_mutex_unlock (bus->queue_lock);
if (need_write) {
c = 'p';
errno = EAGAIN;
while (write_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_post: could not write to fd");
return FALSE;
}
write_ret = write (bus->control_socket[1], &c, 1);
}
}
/* FIXME cannot assume the source is only in the default context */
g_main_context_wakeup (NULL);
/* now block till the message is freed */
g_cond_wait (cond, lock);
@ -375,36 +322,13 @@ GstMessage *
gst_bus_pop (GstBus * bus)
{
GstMessage *message;
gboolean needs_read = FALSE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock);
message = g_queue_pop_head (bus->queue);
if (message && g_queue_get_length (bus->queue) == 0)
needs_read = TRUE;
g_mutex_unlock (bus->queue_lock);
if (needs_read) {
gchar c;
ssize_t read_ret = -1;
/* the char in the fd is essentially just a way to wake us up. read it off so
we're not woken up again. */
errno = EAGAIN;
while (read_ret == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
perror ("gst_bus_pop: could not read from fd");
return NULL;
}
read_ret = read (bus->control_socket[0], &c, 1);
}
}
return message;
}
@ -454,6 +378,71 @@ gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
GST_UNLOCK (bus);
}
/* GSource for the bus
*/
typedef struct
{
GSource source;
GstBus *bus;
} GstBusSource;
gboolean
gst_bus_source_prepare (GSource * source, gint * timeout)
{
*timeout = -1;
return gst_bus_have_pending (((GstBusSource *) source)->bus);
}
gboolean
gst_bus_source_check (GSource * source)
{
return gst_bus_have_pending (((GstBusSource *) source)->bus);
}
gboolean
gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
gpointer user_data)
{
GstBusHandler handler = (GstBusHandler) callback;
GstBusSource *bsource = (GstBusSource *) source;
GstMessage *message;
gboolean needs_pop = TRUE;
g_return_val_if_fail (GST_IS_BUS (bsource->bus), FALSE);
message = gst_bus_peek (bsource->bus);
g_return_val_if_fail (message != NULL, TRUE);
if (!handler) {
g_warning ("GstBus watch dispatched without callback\n"
"You must call g_source_connect().");
return FALSE;
}
needs_pop = handler (bsource->bus, message, user_data);
if (needs_pop)
gst_message_unref (gst_bus_pop (bsource->bus));
return TRUE;
}
void
gst_bus_source_finalize (GSource * source)
{
GstBusSource *bsource = (GstBusSource *) source;
gst_object_unref (GST_OBJECT_CAST (bsource->bus));
}
static GSourceFuncs gst_bus_source_funcs = {
gst_bus_source_prepare,
gst_bus_source_check,
gst_bus_source_dispatch,
gst_bus_source_finalize
};
/**
* gst_bus_create_watch:
* @bus: a #GstBus to create the watch for
@ -465,57 +454,16 @@ gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
GSource *
gst_bus_create_watch (GstBus * bus)
{
GSource *source;
GstBusSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
/* FIXME, we need to ref the bus and unref it when the source
* is destroyed */
source = g_io_create_watch (bus->io_channel, G_IO_IN);
source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
gst_object_ref (GST_OBJECT_CAST (bus));
source->bus = bus;
return source;
}
typedef struct
{
GSource *source;
GstBus *bus;
gint priority;
GstBusHandler handler;
gpointer user_data;
GDestroyNotify notify;
} GstBusWatch;
static gboolean
bus_watch_callback (GIOChannel * channel, GIOCondition cond,
GstBusWatch * watch)
{
GstMessage *message;
gboolean needs_pop = TRUE;
g_return_val_if_fail (GST_IS_BUS (watch->bus), FALSE);
message = gst_bus_peek (watch->bus);
g_return_val_if_fail (message != NULL, TRUE);
if (watch->handler)
needs_pop = watch->handler (watch->bus, message, watch->user_data);
if (needs_pop)
gst_message_unref (gst_bus_pop (watch->bus));
return TRUE;
}
static void
bus_watch_destroy (GstBusWatch * watch)
{
if (watch->notify) {
watch->notify (watch->user_data);
}
gst_object_unref (GST_OBJECT_CAST (watch->bus));
g_free (watch);
return (GSource *) source;
}
/**
@ -540,28 +488,19 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
{
guint id;
GstBusWatch *watch;
GSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), 0);
watch = g_new (GstBusWatch, 1);
gst_object_ref (GST_OBJECT_CAST (bus));
watch->source = gst_bus_create_watch (bus);
watch->bus = bus;
watch->priority = priority;
watch->handler = handler;
watch->user_data = user_data;
watch->notify = notify;
source = gst_bus_create_watch (bus);
if (priority != G_PRIORITY_DEFAULT)
g_source_set_priority (watch->source, priority);
g_source_set_priority (source, priority);
g_source_set_callback (watch->source, (GSourceFunc) bus_watch_callback,
watch, (GDestroyNotify) bus_watch_destroy);
g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
id = g_source_attach (watch->source, NULL);
g_source_unref (watch->source);
id = g_source_attach (source, NULL);
g_source_unref (source);
return id;
}

View file

@ -64,9 +64,6 @@ struct _GstBus
GstBusSyncHandler sync_handler;
gpointer sync_handler_data;
gint control_socket[2];
GIOChannel *io_channel;
/*< private > */
gpointer _gst_reserved[GST_PADDING];
};
@ -105,6 +102,6 @@ guint gst_bus_add_watch (GstBus * bus,
GstMessageType gst_bus_poll (GstBus *bus, GstMessageType events,
GstClockTimeDiff timeout);
G_END_DECLS
#endif /* __GST_BUS_H__ */