diff --git a/gst/gstbus.c b/gst/gstbus.c new file mode 100644 index 0000000000..717886773b --- /dev/null +++ b/gst/gstbus.c @@ -0,0 +1,311 @@ +/* GStreamer + * Copyright (C) 2004 Wim Taymans + * + * gstbus.c: GstBus subsystem + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#include +#include +#include + +#include "gst_private.h" +#include "gstinfo.h" + +#include "gstbus.h" + +enum +{ + ARG_0, +}; + +static void gst_bus_class_init (GstBusClass * klass); +static void gst_bus_init (GstBus * bus); +static void gst_bus_dispose (GObject * object); + +static void gst_bus_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_bus_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static GstObjectClass *parent_class = NULL; + +/* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */ + +GType +gst_bus_get_type (void) +{ + static GType bus_type = 0; + + if (!bus_type) { + static const GTypeInfo bus_info = { + sizeof (GstBusClass), + NULL, + NULL, + (GClassInitFunc) gst_bus_class_init, + NULL, + NULL, + sizeof (GstBus), + 0, + (GInstanceInitFunc) gst_bus_init, + NULL + }; + + bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0); + } + return bus_type; +} + +static void +gst_bus_class_init (GstBusClass * klass) +{ + GObjectClass *gobject_class; + GstObjectClass *gstobject_class; + + gobject_class = (GObjectClass *) klass; + gstobject_class = (GstObjectClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_OBJECT); + + if (!g_thread_supported ()) + g_thread_init (NULL); + + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property); +} + +static void +gst_bus_init (GstBus * bus) +{ + bus->queue = g_async_queue_new (); + + if (socketpair (PF_UNIX, SOCK_STREAM, 0, bus->control_socket) < 0) { + g_warning ("cannot create io channel"); + } else { + bus->io_channel = g_io_channel_unix_new (bus->control_socket[0]); + } +} + +static void +gst_bus_dispose (GObject * object) +{ + GstBus *bus; + + bus = GST_BUS (object); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static void +gst_bus_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstBus *bus; + + bus = GST_BUS (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_bus_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstBus *bus; + + bus = GST_BUS (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +gboolean +gst_bus_post (GstBus * bus, GstMessage * message) +{ + gchar c; + GstBusSyncReply reply = GST_BUS_PASS; + + g_return_val_if_fail (GST_IS_BUS (bus), FALSE); + g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE); + + if (bus->sync_handler) { + reply = bus->sync_handler (bus, message, bus->sync_handler_data); + } + + switch (reply) { + case GST_BUS_DROP: + break; + case GST_BUS_PASS: + g_async_queue_push (bus->queue, message); + c = 'p'; + write (bus->control_socket[1], &c, 1); + break; + case GST_BUS_ASYNC: + { + GMutex *lock = g_mutex_new (); + GCond *cond = g_cond_new (); + + message->cond = cond; + message->lock = lock; + + GST_DEBUG ("waiting for async delivery of message %p", message); + + g_mutex_lock (lock); + g_async_queue_push (bus->queue, message); + c = 'p'; + write (bus->control_socket[1], &c, 1); + g_cond_wait (cond, lock); + g_mutex_unlock (lock); + + GST_DEBUG ("message %p delivered asynchronously", message); + + g_mutex_free (lock); + g_cond_free (cond); + break; + } + } + + return TRUE; +} + +gboolean +gst_bus_have_pending (GstBus * bus) +{ + gint length; + + g_return_val_if_fail (GST_IS_BUS (bus), FALSE); + + length = g_async_queue_length (bus->queue); + + return (length > 0); +} + +GstMessage * +gst_bus_pop (GstBus * bus) +{ + GstMessage *message; + gchar c; + + g_return_val_if_fail (GST_IS_BUS (bus), FALSE); + + message = g_async_queue_pop (bus->queue); + read (bus->control_socket[0], &c, 1); + + return message; +} + +void +gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data) +{ + g_return_if_fail (GST_IS_BUS (bus)); + + bus->sync_handler = func; + bus->sync_handler_data = data; +} + +GSource * +gst_bus_create_watch (GstBus * bus) +{ + GSource *source; + + g_return_val_if_fail (GST_IS_BUS (bus), FALSE); + + source = g_io_create_watch (bus->io_channel, G_IO_IN); + + return source; +} + +typedef struct +{ + GSource *source; + GstBus *bus; + gint priority; + GstBusHandler handler; + gpointer user_data; + GDestroyNotify notify; +} GstBusWatch; + +static gboolean +bus_callback (GIOChannel * channel, GIOCondition cond, GstBusWatch * watch) +{ + GstMessage *message; + + g_return_val_if_fail (GST_IS_BUS (watch->bus), FALSE); + + message = gst_bus_pop (watch->bus); + + if (watch->handler) + watch->handler (watch->bus, message, watch->user_data); + + return TRUE; +} + +static void +bus_destroy (GstBusWatch * watch) +{ + g_print ("destroy\n"); + if (watch->notify) { + watch->notify (watch->user_data); + } + g_free (watch); +} + +guint +gst_bus_add_watch_full (GstBus * bus, gint priority, + GstBusHandler handler, gpointer user_data, GDestroyNotify notify) +{ + guint id; + GstBusWatch *watch; + + g_return_val_if_fail (GST_IS_BUS (bus), 0); + + watch = g_new (GstBusWatch, 1); + + watch->source = gst_bus_create_watch (bus); + watch->bus = bus; + watch->priority = priority; + watch->handler = handler; + watch->user_data = user_data; + watch->notify = notify; + + if (priority != G_PRIORITY_DEFAULT) + g_source_set_priority (watch->source, priority); + + g_source_set_callback (watch->source, (GSourceFunc) bus_callback, watch, + (GDestroyNotify) bus_destroy); + + id = g_source_attach (watch->source, NULL); + g_source_unref (watch->source); + + return id; +} + +guint +gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data) +{ + return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data, + NULL); +} diff --git a/gst/gstbus.h b/gst/gstbus.h new file mode 100644 index 0000000000..63494a0328 --- /dev/null +++ b/gst/gstbus.h @@ -0,0 +1,94 @@ +/* GStreamer + * Copyright (C) 2004 Wim Taymans + * + * gstbus.h: Header for GstBus subsystem + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_BUS_H__ +#define __GST_BUS_H__ + +#include +#include + +G_BEGIN_DECLS + +/* --- standard type macros --- */ +#define GST_TYPE_BUS (gst_bus_get_type ()) +#define GST_BUS(bus) (G_TYPE_CHECK_INSTANCE_CAST ((bus), GST_TYPE_BUS, GstBus)) +#define GST_IS_BUS(bus) (G_TYPE_CHECK_INSTANCE_TYPE ((bus), GST_TYPE_BUS)) +#define GST_BUS_CLASS(bclass) (G_TYPE_CHECK_CLASS_CAST ((bclass), GST_TYPE_BUS, GstBusClass)) +#define GST_IS_BUS_CLASS(bclass) (G_TYPE_CHECK_CLASS_TYPE ((bclass), GST_TYPE_BUS)) +#define GST_BUS_GET_CLASS(bus) (G_TYPE_INSTANCE_GET_CLASS ((bus), GST_TYPE_BUS, GstBusClass)) + +typedef enum +{ + GST_BUS_DROP = 0, /* drop message */ + GST_BUS_PASS = 1, /* pass message to async queue */ + GST_BUS_ASYNC = 2, /* pass message to async queue, continue if message is handled */ +} GstBusSyncReply; + +typedef struct _GstBus GstBus; +typedef struct _GstBusClass GstBusClass; + +typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus *bus, GstMessage *message, gpointer data); +typedef gboolean (*GstBusHandler) (GstBus *bus, GstMessage *message, gpointer data); + +struct _GstBus { + GstObject object; + + GAsyncQueue *queue; + + GstBusSyncHandler sync_handler; + gpointer sync_handler_data; + + gint control_socket[2]; + GIOChannel *io_channel; + + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstBusClass { + GstObjectClass parent_class; + + gpointer _gst_reserved[GST_PADDING]; +}; + +GType gst_bus_get_type (void); + +gboolean gst_bus_post (GstBus *bus, GstMessage *message); + +gboolean gst_bus_have_pending (GstBus *bus); +const GstMessage* gst_bus_peek (GstBus *bus); +GstMessage* gst_bus_pop (GstBus *bus); + +void gst_bus_set_sync_handler (GstBus *bus, GstBusSyncHandler func, + gpointer data); + +GSource* gst_bus_create_watch (GstBus *bus); +guint gst_bus_add_watch_full (GstBus *bus, + gint priority, + GstBusHandler handler, + gpointer user_data, + GDestroyNotify notify); +guint gst_bus_add_watch (GstBus *bus, + GstBusHandler handler, + gpointer user_data); + +G_END_DECLS + +#endif /* __GST_BUS_H__ */ diff --git a/gst/gstiterator.c b/gst/gstiterator.c new file mode 100644 index 0000000000..4c995c239d --- /dev/null +++ b/gst/gstiterator.c @@ -0,0 +1,261 @@ +/* GStreamer + * Copyright (C) 2004 Wim Taymans + * + * gstiterator.h: Base class for iterating lists. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include "gst_private.h" +#include + +static void +gst_iterator_init (GstIterator * it, + GMutex * lock, + guint32 * master_cookie, + GstIteratorNextFunction next, + GstIteratorResyncFunction resync, GstIteratorFreeFunction free) +{ + it->lock = lock; + it->master_cookie = master_cookie; + it->cookie = *master_cookie; + it->next = next; + it->resync = resync; + it->free = free; +} + +GstIterator * +gst_iterator_new (guint size, + GMutex * lock, + guint32 * master_cookie, + GstIteratorNextFunction next, + GstIteratorResyncFunction resync, GstIteratorFreeFunction free) +{ + GstIterator *result; + + g_return_val_if_fail (size >= sizeof (GstIterator), NULL); + g_return_val_if_fail (master_cookie != NULL, NULL); + g_return_val_if_fail (next != NULL, NULL); + g_return_val_if_fail (resync != NULL, NULL); + g_return_val_if_fail (free != NULL, NULL); + + result = g_malloc (size); + gst_iterator_init (result, lock, master_cookie, next, resync, free); + + return result; +} + +GstIteratorResult +gst_iterator_next (GstIterator * it, gpointer * elem) +{ + GstIteratorResult result; + + g_return_val_if_fail (it != NULL, GST_ITERATOR_ERROR); + g_return_val_if_fail (elem != NULL, GST_ITERATOR_ERROR); + + if (it->lock) + g_mutex_lock (it->lock); + if (*it->master_cookie != it->cookie) { + result = GST_ITERATOR_RESYNC; + goto done; + } + + result = it->next (it, elem); + +done: + if (it->lock) + g_mutex_unlock (it->lock); + + return result; +} + +void +gst_iterator_resync (GstIterator * it) +{ + g_return_if_fail (it != NULL); + + if (it->lock) + g_mutex_lock (it->lock); + it->resync (it); + it->cookie = *it->master_cookie; + if (it->lock) + g_mutex_unlock (it->lock); +} + +void +gst_iterator_free (GstIterator * it) +{ + g_return_if_fail (it != NULL); + + it->free (it); +} + +typedef struct _GstIteratorFilter +{ + GstIterator iterator; + GstIterator *slave; + + GCompareFunc func; + gpointer user_data; + + gboolean compare; + gboolean first; + gboolean found; + +} GstIteratorFilter; + +static GstIteratorResult +filter_next (GstIteratorFilter * it, gpointer * elem) +{ + GstIteratorResult result; + gboolean done = FALSE; + + *elem = NULL; + + if (it->found) + return GST_ITERATOR_DONE; + + while (!done) { + gpointer item; + + result = gst_iterator_next (it->slave, &item); + switch (result) { + case GST_ITERATOR_OK: + if (GST_ITERATOR (it)->lock) + g_mutex_unlock (GST_ITERATOR (it)->lock); + if (it->compare) { + if (it->func (item, it->user_data) == 0) { + *elem = item; + done = TRUE; + if (it->first) + it->found = TRUE; + } + } else { + it->func (item, it->user_data); + } + if (GST_ITERATOR (it)->lock) + g_mutex_lock (GST_ITERATOR (it)->lock); + break; + case GST_ITERATOR_RESYNC: + case GST_ITERATOR_DONE: + done = TRUE; + break; + default: + g_assert_not_reached (); + break; + } + } + return result; +} + +static void +filter_resync (GstIteratorFilter * it) +{ + gst_iterator_resync (it->slave); + it->found = FALSE; +} + +static void +filter_uninit (GstIteratorFilter * it) +{ + it->slave->lock = GST_ITERATOR (it)->lock; +} + +static void +filter_free (GstIteratorFilter * it) +{ + filter_uninit (it); + gst_iterator_free (it->slave); + g_free (it); +} + +GstIterator * +gst_iterator_filter (GstIterator * it, gpointer user_data, GCompareFunc func) +{ + GstIteratorFilter *result; + + g_return_val_if_fail (it != NULL, NULL); + g_return_val_if_fail (func != NULL, NULL); + + result = (GstIteratorFilter *) gst_iterator_new (sizeof (GstIteratorFilter), + it->lock, it->master_cookie, + (GstIteratorNextFunction) filter_next, + (GstIteratorResyncFunction) filter_resync, + (GstIteratorFreeFunction) filter_free); + it->lock = NULL; + result->func = func; + result->user_data = user_data; + result->slave = it; + result->compare = TRUE; + result->first = FALSE; + result->found = FALSE; + + return GST_ITERATOR (result); +} + +void +gst_iterator_foreach (GstIterator * it, GFunc function, gpointer user_data) +{ + GstIteratorFilter filter; + gpointer dummy; + + g_return_if_fail (it != NULL); + g_return_if_fail (function != NULL); + + gst_iterator_init (GST_ITERATOR (&filter), + it->lock, it->master_cookie, + (GstIteratorNextFunction) filter_next, + (GstIteratorResyncFunction) filter_resync, + (GstIteratorFreeFunction) filter_uninit); + it->lock = NULL; + filter.func = (GCompareFunc) function; + filter.user_data = user_data; + filter.slave = it; + filter.compare = FALSE; + filter.first = FALSE; + filter.found = FALSE; + gst_iterator_next (GST_ITERATOR (&filter), &dummy); + gst_iterator_free (GST_ITERATOR (&filter)); +} + +gpointer +gst_iterator_find_custom (GstIterator * it, gpointer user_data, + GCompareFunc func) +{ + GstIteratorFilter filter; + gpointer result = NULL; + + g_return_val_if_fail (it != NULL, NULL); + g_return_val_if_fail (func != NULL, NULL); + + gst_iterator_init (GST_ITERATOR (&filter), + it->lock, it->master_cookie, + (GstIteratorNextFunction) filter_next, + (GstIteratorResyncFunction) filter_resync, + (GstIteratorFreeFunction) filter_uninit); + it->lock = NULL; + filter.func = func; + filter.user_data = user_data; + filter.slave = it; + filter.compare = TRUE; + filter.first = TRUE; + filter.found = FALSE; + + gst_iterator_next (GST_ITERATOR (&filter), &result); + gst_iterator_free (GST_ITERATOR (&filter)); + + return result; +} diff --git a/gst/gstiterator.h b/gst/gstiterator.h new file mode 100644 index 0000000000..d28f0c4c6a --- /dev/null +++ b/gst/gstiterator.h @@ -0,0 +1,78 @@ +/* GStreamer + * Copyright (C) 2004 Wim Taymans + * + * gstiterator.h: Header for GstIterator + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_ITERATOR_H__ +#define __GST_ITERATOR_H__ + +#include + +G_BEGIN_DECLS + +typedef enum { + GST_ITERATOR_DONE = 0, + GST_ITERATOR_OK = 1, + GST_ITERATOR_RESYNC = 2, + GST_ITERATOR_ERROR = 3, +} GstIteratorResult; + +typedef struct _GstIterator GstIterator; + +typedef GstIteratorResult (*GstIteratorNextFunction) (GstIterator *it, gpointer *result); +typedef void (*GstIteratorResyncFunction) (GstIterator *it); +typedef void (*GstIteratorFreeFunction) (GstIterator *it); + +#define GST_ITERATOR(it) ((GstIterator*)(it)) +#define GST_ITERATOR_LOCK(it) (GST_ITERATOR(it)->lock) +#define GST_ITERATOR_COOKIE(it) (GST_ITERATOR(it)->cookie) +#define GST_ITERATOR_ORIG_COOKIE(it) (GST_ITERATOR(it)->master_cookie) + +struct _GstIterator { + GstIteratorNextFunction next; + GstIteratorResyncFunction resync; + GstIteratorFreeFunction free; + + GMutex *lock; + guint32 cookie; /* cookie of the iterator */ + guint32 *master_cookie; /* pointer to guint32 holding the cookie when this + iterator was created */ +}; + +GstIterator* gst_iterator_new (guint size, + GMutex *lock, + guint32 *master_cookie, + GstIteratorNextFunction next, + GstIteratorResyncFunction resync, + GstIteratorFreeFunction free); + +GstIteratorResult gst_iterator_next (GstIterator *it, gpointer *result); +void gst_iterator_resync (GstIterator *it); +void gst_iterator_free (GstIterator *it); + +void gst_iterator_foreach (GstIterator *it, GFunc function, + gpointer user_data); +gpointer gst_iterator_find_custom (GstIterator *it, gpointer user_data, + GCompareFunc func); +GstIterator* gst_iterator_filter (GstIterator *it, gpointer user_data, + GCompareFunc func); + +G_END_DECLS + +#endif /* __GST_ITERATOR_H__ */ diff --git a/gst/gstmessage.c b/gst/gstmessage.c new file mode 100644 index 0000000000..6bce2bdfda --- /dev/null +++ b/gst/gstmessage.c @@ -0,0 +1,214 @@ +/* GStreamer + * Copyright (C) 2004 Wim Taymans + * + * gstmessage.c: GstMessage subsystem + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#include /* memcpy */ + +#include "gst_private.h" +#include "gstdata_private.h" +#include "gstinfo.h" +#include "gstmemchunk.h" +#include "gstmessage.h" +#include "gsttag.h" + +#ifndef GST_DISABLE_TRACE +/* #define GST_WITH_ALLOC_TRACE */ +#include "gsttrace.h" +static GstAllocTrace *_message_trace; +#endif + +static GstMemChunk *chunk; + +/* #define MEMPROF */ + +GType _gst_message_type; + +void +_gst_message_initialize (void) +{ + /* register the type */ + _gst_message_type = g_boxed_type_register_static ("GstMessage", + (GBoxedCopyFunc) gst_data_copy, (GBoxedFreeFunc) gst_data_unref); + +#ifndef GST_DISABLE_TRACE + _message_trace = gst_alloc_trace_register (GST_MESSAGE_TRACE_NAME); +#endif + + chunk = gst_mem_chunk_new ("GstMessageChunk", sizeof (GstMessage), + sizeof (GstMessage) * 50, 0); +} + +static GstMessage * +_gst_message_copy (GstMessage * message) +{ + GstMessage *copy; + + copy = gst_mem_chunk_alloc (chunk); +#ifndef GST_DISABLE_TRACE + gst_alloc_trace_new (_message_trace, copy); +#endif + + memcpy (copy, message, sizeof (GstMessage)); + + /* FIXME copy/ref additional fields */ + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_TAG: + copy->message_data.structure.structure = + gst_tag_list_copy ((GstTagList *) message->message_data.structure. + structure); + break; + default: + break; + } + + return copy; +} + +static void +_gst_message_free (GstMessage * message) +{ + GST_CAT_INFO (GST_CAT_MESSAGE, "freeing message %p", message); + + if (GST_MESSAGE_SRC (message)) { + gst_object_unref (GST_MESSAGE_SRC (message)); + } + if (message->lock) { + g_mutex_lock (message->lock); + g_cond_signal (message->cond); + g_mutex_unlock (message->lock); + } + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ERROR: + g_error_free (GST_MESSAGE_ERROR_ERROR (message)); + g_free (GST_MESSAGE_ERROR_DEBUG (message)); + break; + case GST_MESSAGE_TAG: + if (GST_IS_TAG_LIST (message->message_data.tag.list)) { + gst_tag_list_free (message->message_data.tag.list); + } else { + g_warning ("tag message %p didn't contain a valid tag list!", message); + GST_ERROR ("tag message %p didn't contain a valid tag list!", message); + } + break; + default: + break; + } + _GST_DATA_DISPOSE (GST_DATA (message)); +#ifndef GST_DISABLE_TRACE + gst_alloc_trace_free (_message_trace, message); +#endif + gst_mem_chunk_free (chunk, message); +} + +GType +gst_message_get_type (void) +{ + return _gst_message_type; +} + +/** + * gst_message_new: + * @type: The type of the new message + * + * Allocate a new message of the given type. + * + * Returns: A new message. + */ +GstMessage * +gst_message_new (GstMessageType type, GstObject * src) +{ + GstMessage *message; + + message = gst_mem_chunk_alloc0 (chunk); +#ifndef GST_DISABLE_TRACE + gst_alloc_trace_new (_message_trace, message); +#endif + + GST_CAT_INFO (GST_CAT_MESSAGE, "creating new message %p %d", message, type); + + _GST_DATA_INIT (GST_DATA (message), + _gst_message_type, + 0, + (GstDataFreeFunction) _gst_message_free, + (GstDataCopyFunction) _gst_message_copy); + + GST_MESSAGE_TYPE (message) = type; + GST_MESSAGE_TIMESTAMP (message) = G_GINT64_CONSTANT (0); + gst_object_ref (src); + GST_MESSAGE_SRC (message) = src; + + return message; +} + + +/** + * gst_message_new_eos: + * + * Create a new eos message. + * + * Returns: The new eos message. + */ +GstMessage * +gst_message_new_eos (GstObject * src) +{ + GstMessage *message; + + message = gst_message_new (GST_MESSAGE_EOS, src); + + return message; +} + +/** + * gst_message_new_error: + * + * Create a new error message. + * + * Returns: The new error message. + */ +GstMessage * +gst_message_new_error (GstObject * src, GError * error, gchar * debug) +{ + GstMessage *message; + + message = gst_message_new (GST_MESSAGE_ERROR, src); + GST_MESSAGE_ERROR_ERROR (message) = error; + GST_MESSAGE_ERROR_DEBUG (message) = debug; + + return message; +} + +/** + * gst_message_new_tag: + * + * Create a new tag message. + * + * Returns: The new tag message. + */ +GstMessage * +gst_message_new_tag (GstObject * src, GstTagList * tag_list) +{ + GstMessage *message; + + message = gst_message_new (GST_MESSAGE_TAG, src); + GST_MESSAGE_TAG_LIST (message) = tag_list; + + return message; +} diff --git a/gst/gstmessage.h b/gst/gstmessage.h new file mode 100644 index 0000000000..6d7862c369 --- /dev/null +++ b/gst/gstmessage.h @@ -0,0 +1,106 @@ +/* GStreamer + * Copyright (C) 2004 Wim Taymans + * + * gstmessage.h: Header for GstMessage subsystem + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_MESSAGE_H__ +#define __GST_MESSAGE_H__ + +#include +#include +#include +#include +#include + +G_BEGIN_DECLS + +GST_EXPORT GType _gst_message_type; + +typedef enum { + GST_MESSAGE_UNKNOWN = 0, + GST_MESSAGE_EOS = 1, + GST_MESSAGE_ERROR = 2, + GST_MESSAGE_WARNING = 3, + GST_MESSAGE_INFO = 4, + GST_MESSAGE_TAG = 5, + GST_MESSAGE_BUFFERING = 6, + GST_MESSAGE_STATE_CHANGED = 7, + GST_MESSAGE_STEP_DONE = 8, +} GstMessageType; + +#define GST_MESSAGE_TRACE_NAME "GstMessage" + +#define GST_TYPE_MESSAGE (_gst_message_type) +#define GST_MESSAGE(message) ((GstMessage*)(message)) +#define GST_IS_MESSAGE(message) (GST_DATA_TYPE(message) == GST_TYPE_MESSAGE) + +#define GST_MESSAGE_TYPE(message) (GST_MESSAGE(message)->type) +#define GST_MESSAGE_TIMESTAMP(message) (GST_MESSAGE(message)->timestamp) +#define GST_MESSAGE_SRC(message) (GST_MESSAGE(message)->src) + +#define GST_MESSAGE_TAG_LIST(message) (GST_MESSAGE(message)->message_data.tag.list) + +#define GST_MESSAGE_ERROR_ERROR(message) (GST_MESSAGE(message)->message_data.error.error) +#define GST_MESSAGE_ERROR_DEBUG(message) (GST_MESSAGE(message)->message_data.error.debug) + +struct _GstMessage { + GstData data; + + GstMessageType type; + guint64 timestamp; + GstObject *src; + + GMutex *lock; /* lock and cond for async delivery */ + GCond *cond; + + union { + struct { + GError *error; + gchar *debug; + } error; + struct { + GstStructure *structure; + } structure; + struct { + GstTagList *list; + } tag; + } message_data; + + gpointer _gst_reserved[GST_PADDING]; +}; + +void _gst_message_initialize (void); + +GType gst_message_get_type (void); +GstMessage* gst_message_new (GstMessageType type, GstObject *src); + +/* refcounting */ +#define gst_message_ref(ev) GST_MESSAGE (gst_data_ref (GST_DATA (ev))) +#define gst_message_ref_by_count(ev,c) GST_MESSAGE (gst_data_ref_by_count (GST_DATA (ev), c)) +#define gst_message_unref(ev) gst_data_unref (GST_DATA (ev)) +/* copy message */ +#define gst_message_copy(ev) GST_MESSAGE (gst_data_copy (GST_DATA (ev))) + +GstMessage* gst_message_new_eos (GstObject *src); +GstMessage* gst_message_new_error (GstObject *src, GError *error, gchar *debug); +GstMessage* gst_message_new_tag (GstObject *src, GstTagList *tag_list); + +G_END_DECLS + +#endif /* __GST_MESSAGE_H__ */ diff --git a/gst/gsttask.c b/gst/gsttask.c new file mode 100644 index 0000000000..be12028957 --- /dev/null +++ b/gst/gsttask.c @@ -0,0 +1,118 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen + * 2000 Wim Taymans + * + * gsttask.c: Streaming tasks + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include "gst_private.h" + +#include "gstinfo.h" +#include "gsttask.h" + +static void gst_task_class_init (GstTaskClass * klass); +static void gst_task_init (GstTask * sched); +static void gst_task_dispose (GObject * object); + +static GstObjectClass *parent_class = NULL; + +GType +gst_task_get_type (void) +{ + static GType _gst_task_type = 0; + + if (!_gst_task_type) { + static const GTypeInfo task_info = { + sizeof (GstTaskClass), + NULL, + NULL, + (GClassInitFunc) gst_task_class_init, + NULL, + NULL, + sizeof (GstTask), + 0, + (GInstanceInitFunc) gst_task_init, + NULL + }; + + _gst_task_type = + g_type_register_static (GST_TYPE_OBJECT, "GstTask", + &task_info, G_TYPE_FLAG_ABSTRACT); + } + return _gst_task_type; +} + +static void +gst_task_class_init (GstTaskClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_OBJECT); + + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_task_dispose); +} + +static void +gst_task_init (GstTask * sched) +{ +} + +static void +gst_task_dispose (GObject * object) +{ + GstTask *task = GST_TASK (object); + + /* thse lists should all be NULL */ + GST_DEBUG ("task %p dispose", task); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +gboolean +gst_task_start (GstTask * task) +{ + GstTaskClass *tclass; + gboolean result = FALSE; + + g_return_val_if_fail (GST_IS_TASK (task), result); + + tclass = GST_TASK_GET_CLASS (task); + + if (tclass->start) + result = tclass->start (task); + + return result; +} + +gboolean +gst_task_stop (GstTask * task) +{ + GstTaskClass *tclass; + gboolean result = FALSE; + + g_return_val_if_fail (GST_IS_TASK (task), result); + + tclass = GST_TASK_GET_CLASS (task); + + if (tclass->stop) + result = tclass->stop (task); + + return result; +} diff --git a/gst/gsttask.h b/gst/gsttask.h new file mode 100644 index 0000000000..4b7caad8d9 --- /dev/null +++ b/gst/gsttask.h @@ -0,0 +1,64 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * <2004> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_TASK_H__ +#define __GST_TASK_H__ + +#include + +G_BEGIN_DECLS + +typedef void (*GstTaskFunction) (void *data); + +/* --- standard type macros --- */ +#define GST_TYPE_TASK (gst_task_get_type ()) +#define GST_TASK(task) (G_TYPE_CHECK_INSTANCE_CAST ((task), GST_TYPE_TASK, GstTask)) +#define GST_IS_TASK(task) (G_TYPE_CHECK_INSTANCE_TYPE ((task), GST_TYPE_TASK)) +#define GST_TASK_CLASS(tclass) (G_TYPE_CHECK_CLASS_CAST ((tclass), GST_TYPE_TASK, GstTaskClass)) +#define GST_IS_TASK_CLASS(tclass) (G_TYPE_CHECK_CLASS_TYPE ((tclass), GST_TYPE_TASK)) +#define GST_TASK_GET_CLASS(task) (G_TYPE_INSTANCE_GET_CLASS ((task), GST_TYPE_TASK, GstTaskClass)) + +typedef struct _GstTask GstTask; +typedef struct _GstTaskClass GstTaskClass; + +struct _GstTask { + GstObject object; + + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstTaskClass { + GstObjectClass parent_class; + + gboolean (*start) (GstTask *task); + gboolean (*stop) (GstTask *task); + + gpointer _gst_reserved[GST_PADDING]; +}; + +GType gst_task_get_type (void); + +gboolean gst_task_start (GstTask *task); +gboolean gst_task_stop (GstTask *task); + +G_END_DECLS + +#endif /* __GST_TASK_H__ */ + diff --git a/gst/schedulers/threadscheduler.c b/gst/schedulers/threadscheduler.c new file mode 100644 index 0000000000..e473f7c11f --- /dev/null +++ b/gst/schedulers/threadscheduler.c @@ -0,0 +1,331 @@ +/* GStreamer2 + * Copyright (C) 2004 Wim Taymans + * + * threadscheduler.c: scheduler using threads + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include "../gst-i18n-lib.h" + +GST_DEBUG_CATEGORY_STATIC (debug_scheduler); +#define GST_CAT_DEFAULT debug_scheduler + +#define GST_TYPE_THREAD_SCHEDULER \ + (gst_thread_scheduler_get_type ()) +#define GST_THREAD_SCHEDULER(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_THREAD_SCHEDULER,GstThreadScheduler)) +#define GST_THREAD_SCHEDULER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_THREAD_SCHEDULER,GstThreadSchedulerClass)) +#define GST_IS_THREAD_SCHEDULER(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_THREAD_SCHEDULER)) +#define GST_IS_THREAD_SCHEDULER_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_THREAD_SCHEDULER)) + +#define SCHED(element) (GST_THREAD_SCHEDULER ((element)->sched)) + +GType gst_thread_scheduler_get_type (void); + +typedef struct _GstThreadScheduler GstThreadScheduler; +typedef struct _GstThreadSchedulerClass GstThreadSchedulerClass; + +struct _GstThreadScheduler +{ + GstScheduler scheduler; + + GThreadPool *pool; +}; + +struct _GstThreadSchedulerClass +{ + GstSchedulerClass scheduler_class; +}; + +#define ELEMENT_PRIVATE(element) GST_ELEMENT (element)->sched_private +#define PAD_PRIVATE(pad) (GST_REAL_PAD (pad))->sched_private + +#define GST_TYPE_THREAD_SCHEDULER_TASK \ + (gst_thread_scheduler_task_get_type ()) +#define GST_THREAD_SCHEDULER_TASK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_THREAD_SCHEDULER_TASK,GstThreadSchedulerTask)) +#define GST_THREAD_SCHEDULER_TASK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_THREAD_SCHEDULER_TASK,GstThreadSchedulerTaskClass)) +#define GST_IS_THREAD_SCHEDULER_TASK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_THREAD_SCHEDULER_TASK)) +#define GST_IS_THREAD_SCHEDULER_TASK_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_THREAD_SCHEDULER_TASK)) + +typedef struct _GstThreadSchedulerTask GstThreadSchedulerTask; +typedef struct _GstThreadSchedulerTaskClass GstThreadSchedulerTaskClass; + +typedef enum +{ + STATE_STOPPED, + STATE_STARTED +} TaskState; + + +struct _GstThreadSchedulerTask +{ + GstTask task; + + TaskState state; + GMutex *lock; + + GstTaskFunction func; + gpointer data; +}; + +struct _GstThreadSchedulerTaskClass +{ + GstTaskClass parent_class; + +}; + +static void gst_thread_scheduler_task_class_init (gpointer g_class, + gpointer data); +static void gst_thread_scheduler_task_init (GstThreadSchedulerTask * object); + +static gboolean gst_thread_scheduler_task_start (GstTask * task); +static gboolean gst_thread_scheduler_task_stop (GstTask * task); + +GType +gst_thread_scheduler_task_get_type (void) +{ + static GType object_type = 0; + + if (object_type == 0) { + static const GTypeInfo object_info = { + sizeof (GstThreadSchedulerTaskClass), + NULL, + NULL, + gst_thread_scheduler_task_class_init, + NULL, + NULL, + sizeof (GstThreadSchedulerTask), + 0, + (GInstanceInitFunc) gst_thread_scheduler_task_init + }; + + object_type = + g_type_register_static (GST_TYPE_TASK, + "GstThreadSchedulerTask", &object_info, 0); + } + return object_type; +} + +static void +gst_thread_scheduler_task_class_init (gpointer klass, gpointer class_data) +{ + GstTaskClass *task = GST_TASK_CLASS (klass); + + task->start = gst_thread_scheduler_task_start; + task->stop = gst_thread_scheduler_task_stop; +} + +static void +gst_thread_scheduler_task_init (GstThreadSchedulerTask * task) +{ + task->state = STATE_STOPPED; + task->lock = g_mutex_new (); +} + +static gboolean +gst_thread_scheduler_task_start (GstTask * task) +{ + GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task); + GstThreadScheduler *tsched = + GST_THREAD_SCHEDULER (gst_object_get_parent (GST_OBJECT (task))); + + g_mutex_lock (ttask->lock); + if (ttask->state != STATE_STARTED) { + ttask->state = STATE_STARTED; + + g_thread_pool_push (tsched->pool, task, NULL); + } + g_mutex_unlock (ttask->lock); + + return TRUE; +} + +static gboolean +gst_thread_scheduler_task_stop (GstTask * task) +{ + GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task); + + g_mutex_lock (ttask->lock); + if (ttask->state != STATE_STOPPED) { + ttask->state = STATE_STOPPED; + } + g_mutex_unlock (ttask->lock); + return TRUE; +} + + +static void gst_thread_scheduler_class_init (gpointer g_class, gpointer data); +static void gst_thread_scheduler_init (GstThreadScheduler * object); + +GType +gst_thread_scheduler_get_type (void) +{ + static GType object_type = 0; + + if (object_type == 0) { + static const GTypeInfo object_info = { + sizeof (GstThreadSchedulerClass), + NULL, + NULL, + gst_thread_scheduler_class_init, + NULL, + NULL, + sizeof (GstThreadScheduler), + 0, + (GInstanceInitFunc) gst_thread_scheduler_init + }; + + object_type = + g_type_register_static (GST_TYPE_SCHEDULER, + "GstThreadScheduler", &object_info, 0); + } + return object_type; +} + +static void gst_thread_scheduler_setup (GstScheduler * sched); +static void gst_thread_scheduler_reset (GstScheduler * sched); +static void gst_thread_scheduler_add_element (GstScheduler * sched, + GstElement * element); +static void gst_thread_scheduler_remove_element (GstScheduler * sched, + GstElement * element); +static GstTask *gst_thread_scheduler_create_task (GstScheduler * sched, + GstTaskFunction func, gpointer data); +static void gst_thread_scheduler_show (GstScheduler * scheduler); + +static void +gst_thread_scheduler_class_init (gpointer klass, gpointer class_data) +{ + GstSchedulerClass *scheduler = GST_SCHEDULER_CLASS (klass); + + scheduler->setup = gst_thread_scheduler_setup; + scheduler->reset = gst_thread_scheduler_reset; + scheduler->add_element = gst_thread_scheduler_add_element; + scheduler->remove_element = gst_thread_scheduler_remove_element; + scheduler->create_task = gst_thread_scheduler_create_task; + scheduler->clock_wait = NULL; + scheduler->show = gst_thread_scheduler_show; +} + +static void +gst_thread_scheduler_func (GstThreadSchedulerTask * task, + GstThreadScheduler * sched) +{ + gst_object_ref (GST_OBJECT (task)); + GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task, + g_thread_self ()); + g_mutex_lock (task->lock); + while (task->state == STATE_STARTED) { + g_mutex_unlock (task->lock); + task->func (task->data); + g_mutex_lock (task->lock); + } + g_mutex_unlock (task->lock); + GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ()); + gst_object_unref (GST_OBJECT (task)); +} + +static void +gst_thread_scheduler_init (GstThreadScheduler * scheduler) +{ + scheduler->pool = g_thread_pool_new ( + (GFunc) gst_thread_scheduler_func, scheduler, -1, FALSE, NULL); +} + +static GstTask * +gst_thread_scheduler_create_task (GstScheduler * sched, GstTaskFunction func, + gpointer data) +{ + GstThreadSchedulerTask *task; + + task = + GST_THREAD_SCHEDULER_TASK (g_object_new (GST_TYPE_THREAD_SCHEDULER_TASK, + NULL)); + gst_object_set_parent (GST_OBJECT (task), GST_OBJECT (sched)); + task->func = func; + task->data = data; + + GST_DEBUG_OBJECT (sched, "Created task %p", task); + + return GST_TASK (task); +} + +static void +gst_thread_scheduler_setup (GstScheduler * sched) +{ +} + +static void +gst_thread_scheduler_reset (GstScheduler * sched) +{ +} + +static void +gst_thread_scheduler_add_element (GstScheduler * scheduler, + GstElement * element) +{ + g_print ("add element\n"); +} + +static void +gst_thread_scheduler_remove_element (GstScheduler * scheduler, + GstElement * element) +{ + GstThreadSchedulerTask *task;; + + task = ELEMENT_PRIVATE (element); + if (task) { + g_object_unref (G_OBJECT (task)); + ELEMENT_PRIVATE (element) = NULL;; + } +} + +static void +gst_thread_scheduler_show (GstScheduler * scheduler) +{ +} + +static gboolean +plugin_init (GstPlugin * plugin) +{ + GstSchedulerFactory *factory; + + GST_DEBUG_CATEGORY_INIT (debug_scheduler, "thread", 0, "thread scheduler"); + + factory = gst_scheduler_factory_new ("thread", + "A scheduler using threads", GST_TYPE_THREAD_SCHEDULER); + if (factory == NULL) + return FALSE; + + gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory)); + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR, "gstthreadscheduler", + "a thread scheduler", plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, + GST_ORIGIN)