newly added files

Original commit message from CVS:
newly added files
This commit is contained in:
Thomas Vander Stichele 2004-12-08 18:05:14 +00:00
parent 3ffce00efc
commit 65285e1acf
9 changed files with 1577 additions and 0 deletions

311
gst/gstbus.c Normal file
View file

@ -0,0 +1,311 @@
/* GStreamer
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* gstbus.c: GstBus subsystem
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "gst_private.h"
#include "gstinfo.h"
#include "gstbus.h"
enum
{
ARG_0,
};
static void gst_bus_class_init (GstBusClass * klass);
static void gst_bus_init (GstBus * bus);
static void gst_bus_dispose (GObject * object);
static void gst_bus_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_bus_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstObjectClass *parent_class = NULL;
/* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_bus_get_type (void)
{
static GType bus_type = 0;
if (!bus_type) {
static const GTypeInfo bus_info = {
sizeof (GstBusClass),
NULL,
NULL,
(GClassInitFunc) gst_bus_class_init,
NULL,
NULL,
sizeof (GstBus),
0,
(GInstanceInitFunc) gst_bus_init,
NULL
};
bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
}
return bus_type;
}
static void
gst_bus_class_init (GstBusClass * klass)
{
GObjectClass *gobject_class;
GstObjectClass *gstobject_class;
gobject_class = (GObjectClass *) klass;
gstobject_class = (GstObjectClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_OBJECT);
if (!g_thread_supported ())
g_thread_init (NULL);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
}
static void
gst_bus_init (GstBus * bus)
{
bus->queue = g_async_queue_new ();
if (socketpair (PF_UNIX, SOCK_STREAM, 0, bus->control_socket) < 0) {
g_warning ("cannot create io channel");
} else {
bus->io_channel = g_io_channel_unix_new (bus->control_socket[0]);
}
}
static void
gst_bus_dispose (GObject * object)
{
GstBus *bus;
bus = GST_BUS (object);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_bus_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstBus *bus;
bus = GST_BUS (object);
switch (prop_id) {
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_bus_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstBus *bus;
bus = GST_BUS (object);
switch (prop_id) {
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
gboolean
gst_bus_post (GstBus * bus, GstMessage * message)
{
gchar c;
GstBusSyncReply reply = GST_BUS_PASS;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
if (bus->sync_handler) {
reply = bus->sync_handler (bus, message, bus->sync_handler_data);
}
switch (reply) {
case GST_BUS_DROP:
break;
case GST_BUS_PASS:
g_async_queue_push (bus->queue, message);
c = 'p';
write (bus->control_socket[1], &c, 1);
break;
case GST_BUS_ASYNC:
{
GMutex *lock = g_mutex_new ();
GCond *cond = g_cond_new ();
message->cond = cond;
message->lock = lock;
GST_DEBUG ("waiting for async delivery of message %p", message);
g_mutex_lock (lock);
g_async_queue_push (bus->queue, message);
c = 'p';
write (bus->control_socket[1], &c, 1);
g_cond_wait (cond, lock);
g_mutex_unlock (lock);
GST_DEBUG ("message %p delivered asynchronously", message);
g_mutex_free (lock);
g_cond_free (cond);
break;
}
}
return TRUE;
}
gboolean
gst_bus_have_pending (GstBus * bus)
{
gint length;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
length = g_async_queue_length (bus->queue);
return (length > 0);
}
GstMessage *
gst_bus_pop (GstBus * bus)
{
GstMessage *message;
gchar c;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
message = g_async_queue_pop (bus->queue);
read (bus->control_socket[0], &c, 1);
return message;
}
void
gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
{
g_return_if_fail (GST_IS_BUS (bus));
bus->sync_handler = func;
bus->sync_handler_data = data;
}
GSource *
gst_bus_create_watch (GstBus * bus)
{
GSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
source = g_io_create_watch (bus->io_channel, G_IO_IN);
return source;
}
typedef struct
{
GSource *source;
GstBus *bus;
gint priority;
GstBusHandler handler;
gpointer user_data;
GDestroyNotify notify;
} GstBusWatch;
static gboolean
bus_callback (GIOChannel * channel, GIOCondition cond, GstBusWatch * watch)
{
GstMessage *message;
g_return_val_if_fail (GST_IS_BUS (watch->bus), FALSE);
message = gst_bus_pop (watch->bus);
if (watch->handler)
watch->handler (watch->bus, message, watch->user_data);
return TRUE;
}
static void
bus_destroy (GstBusWatch * watch)
{
g_print ("destroy\n");
if (watch->notify) {
watch->notify (watch->user_data);
}
g_free (watch);
}
guint
gst_bus_add_watch_full (GstBus * bus, gint priority,
GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
{
guint id;
GstBusWatch *watch;
g_return_val_if_fail (GST_IS_BUS (bus), 0);
watch = g_new (GstBusWatch, 1);
watch->source = gst_bus_create_watch (bus);
watch->bus = bus;
watch->priority = priority;
watch->handler = handler;
watch->user_data = user_data;
watch->notify = notify;
if (priority != G_PRIORITY_DEFAULT)
g_source_set_priority (watch->source, priority);
g_source_set_callback (watch->source, (GSourceFunc) bus_callback, watch,
(GDestroyNotify) bus_destroy);
id = g_source_attach (watch->source, NULL);
g_source_unref (watch->source);
return id;
}
guint
gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
{
return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
NULL);
}

94
gst/gstbus.h Normal file
View file

@ -0,0 +1,94 @@
/* GStreamer
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* gstbus.h: Header for GstBus subsystem
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_BUS_H__
#define __GST_BUS_H__
#include <gst/gsttypes.h>
#include <gst/gstmessage.h>
G_BEGIN_DECLS
/* --- standard type macros --- */
#define GST_TYPE_BUS (gst_bus_get_type ())
#define GST_BUS(bus) (G_TYPE_CHECK_INSTANCE_CAST ((bus), GST_TYPE_BUS, GstBus))
#define GST_IS_BUS(bus) (G_TYPE_CHECK_INSTANCE_TYPE ((bus), GST_TYPE_BUS))
#define GST_BUS_CLASS(bclass) (G_TYPE_CHECK_CLASS_CAST ((bclass), GST_TYPE_BUS, GstBusClass))
#define GST_IS_BUS_CLASS(bclass) (G_TYPE_CHECK_CLASS_TYPE ((bclass), GST_TYPE_BUS))
#define GST_BUS_GET_CLASS(bus) (G_TYPE_INSTANCE_GET_CLASS ((bus), GST_TYPE_BUS, GstBusClass))
typedef enum
{
GST_BUS_DROP = 0, /* drop message */
GST_BUS_PASS = 1, /* pass message to async queue */
GST_BUS_ASYNC = 2, /* pass message to async queue, continue if message is handled */
} GstBusSyncReply;
typedef struct _GstBus GstBus;
typedef struct _GstBusClass GstBusClass;
typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus *bus, GstMessage *message, gpointer data);
typedef gboolean (*GstBusHandler) (GstBus *bus, GstMessage *message, gpointer data);
struct _GstBus {
GstObject object;
GAsyncQueue *queue;
GstBusSyncHandler sync_handler;
gpointer sync_handler_data;
gint control_socket[2];
GIOChannel *io_channel;
gpointer _gst_reserved[GST_PADDING];
};
struct _GstBusClass {
GstObjectClass parent_class;
gpointer _gst_reserved[GST_PADDING];
};
GType gst_bus_get_type (void);
gboolean gst_bus_post (GstBus *bus, GstMessage *message);
gboolean gst_bus_have_pending (GstBus *bus);
const GstMessage* gst_bus_peek (GstBus *bus);
GstMessage* gst_bus_pop (GstBus *bus);
void gst_bus_set_sync_handler (GstBus *bus, GstBusSyncHandler func,
gpointer data);
GSource* gst_bus_create_watch (GstBus *bus);
guint gst_bus_add_watch_full (GstBus *bus,
gint priority,
GstBusHandler handler,
gpointer user_data,
GDestroyNotify notify);
guint gst_bus_add_watch (GstBus *bus,
GstBusHandler handler,
gpointer user_data);
G_END_DECLS
#endif /* __GST_BUS_H__ */

261
gst/gstiterator.c Normal file
View file

@ -0,0 +1,261 @@
/* GStreamer
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* gstiterator.h: Base class for iterating lists.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "gst_private.h"
#include <gst/gstiterator.h>
static void
gst_iterator_init (GstIterator * it,
GMutex * lock,
guint32 * master_cookie,
GstIteratorNextFunction next,
GstIteratorResyncFunction resync, GstIteratorFreeFunction free)
{
it->lock = lock;
it->master_cookie = master_cookie;
it->cookie = *master_cookie;
it->next = next;
it->resync = resync;
it->free = free;
}
GstIterator *
gst_iterator_new (guint size,
GMutex * lock,
guint32 * master_cookie,
GstIteratorNextFunction next,
GstIteratorResyncFunction resync, GstIteratorFreeFunction free)
{
GstIterator *result;
g_return_val_if_fail (size >= sizeof (GstIterator), NULL);
g_return_val_if_fail (master_cookie != NULL, NULL);
g_return_val_if_fail (next != NULL, NULL);
g_return_val_if_fail (resync != NULL, NULL);
g_return_val_if_fail (free != NULL, NULL);
result = g_malloc (size);
gst_iterator_init (result, lock, master_cookie, next, resync, free);
return result;
}
GstIteratorResult
gst_iterator_next (GstIterator * it, gpointer * elem)
{
GstIteratorResult result;
g_return_val_if_fail (it != NULL, GST_ITERATOR_ERROR);
g_return_val_if_fail (elem != NULL, GST_ITERATOR_ERROR);
if (it->lock)
g_mutex_lock (it->lock);
if (*it->master_cookie != it->cookie) {
result = GST_ITERATOR_RESYNC;
goto done;
}
result = it->next (it, elem);
done:
if (it->lock)
g_mutex_unlock (it->lock);
return result;
}
void
gst_iterator_resync (GstIterator * it)
{
g_return_if_fail (it != NULL);
if (it->lock)
g_mutex_lock (it->lock);
it->resync (it);
it->cookie = *it->master_cookie;
if (it->lock)
g_mutex_unlock (it->lock);
}
void
gst_iterator_free (GstIterator * it)
{
g_return_if_fail (it != NULL);
it->free (it);
}
typedef struct _GstIteratorFilter
{
GstIterator iterator;
GstIterator *slave;
GCompareFunc func;
gpointer user_data;
gboolean compare;
gboolean first;
gboolean found;
} GstIteratorFilter;
static GstIteratorResult
filter_next (GstIteratorFilter * it, gpointer * elem)
{
GstIteratorResult result;
gboolean done = FALSE;
*elem = NULL;
if (it->found)
return GST_ITERATOR_DONE;
while (!done) {
gpointer item;
result = gst_iterator_next (it->slave, &item);
switch (result) {
case GST_ITERATOR_OK:
if (GST_ITERATOR (it)->lock)
g_mutex_unlock (GST_ITERATOR (it)->lock);
if (it->compare) {
if (it->func (item, it->user_data) == 0) {
*elem = item;
done = TRUE;
if (it->first)
it->found = TRUE;
}
} else {
it->func (item, it->user_data);
}
if (GST_ITERATOR (it)->lock)
g_mutex_lock (GST_ITERATOR (it)->lock);
break;
case GST_ITERATOR_RESYNC:
case GST_ITERATOR_DONE:
done = TRUE;
break;
default:
g_assert_not_reached ();
break;
}
}
return result;
}
static void
filter_resync (GstIteratorFilter * it)
{
gst_iterator_resync (it->slave);
it->found = FALSE;
}
static void
filter_uninit (GstIteratorFilter * it)
{
it->slave->lock = GST_ITERATOR (it)->lock;
}
static void
filter_free (GstIteratorFilter * it)
{
filter_uninit (it);
gst_iterator_free (it->slave);
g_free (it);
}
GstIterator *
gst_iterator_filter (GstIterator * it, gpointer user_data, GCompareFunc func)
{
GstIteratorFilter *result;
g_return_val_if_fail (it != NULL, NULL);
g_return_val_if_fail (func != NULL, NULL);
result = (GstIteratorFilter *) gst_iterator_new (sizeof (GstIteratorFilter),
it->lock, it->master_cookie,
(GstIteratorNextFunction) filter_next,
(GstIteratorResyncFunction) filter_resync,
(GstIteratorFreeFunction) filter_free);
it->lock = NULL;
result->func = func;
result->user_data = user_data;
result->slave = it;
result->compare = TRUE;
result->first = FALSE;
result->found = FALSE;
return GST_ITERATOR (result);
}
void
gst_iterator_foreach (GstIterator * it, GFunc function, gpointer user_data)
{
GstIteratorFilter filter;
gpointer dummy;
g_return_if_fail (it != NULL);
g_return_if_fail (function != NULL);
gst_iterator_init (GST_ITERATOR (&filter),
it->lock, it->master_cookie,
(GstIteratorNextFunction) filter_next,
(GstIteratorResyncFunction) filter_resync,
(GstIteratorFreeFunction) filter_uninit);
it->lock = NULL;
filter.func = (GCompareFunc) function;
filter.user_data = user_data;
filter.slave = it;
filter.compare = FALSE;
filter.first = FALSE;
filter.found = FALSE;
gst_iterator_next (GST_ITERATOR (&filter), &dummy);
gst_iterator_free (GST_ITERATOR (&filter));
}
gpointer
gst_iterator_find_custom (GstIterator * it, gpointer user_data,
GCompareFunc func)
{
GstIteratorFilter filter;
gpointer result = NULL;
g_return_val_if_fail (it != NULL, NULL);
g_return_val_if_fail (func != NULL, NULL);
gst_iterator_init (GST_ITERATOR (&filter),
it->lock, it->master_cookie,
(GstIteratorNextFunction) filter_next,
(GstIteratorResyncFunction) filter_resync,
(GstIteratorFreeFunction) filter_uninit);
it->lock = NULL;
filter.func = func;
filter.user_data = user_data;
filter.slave = it;
filter.compare = TRUE;
filter.first = TRUE;
filter.found = FALSE;
gst_iterator_next (GST_ITERATOR (&filter), &result);
gst_iterator_free (GST_ITERATOR (&filter));
return result;
}

78
gst/gstiterator.h Normal file
View file

@ -0,0 +1,78 @@
/* GStreamer
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* gstiterator.h: Header for GstIterator
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_ITERATOR_H__
#define __GST_ITERATOR_H__
#include <glib.h>
G_BEGIN_DECLS
typedef enum {
GST_ITERATOR_DONE = 0,
GST_ITERATOR_OK = 1,
GST_ITERATOR_RESYNC = 2,
GST_ITERATOR_ERROR = 3,
} GstIteratorResult;
typedef struct _GstIterator GstIterator;
typedef GstIteratorResult (*GstIteratorNextFunction) (GstIterator *it, gpointer *result);
typedef void (*GstIteratorResyncFunction) (GstIterator *it);
typedef void (*GstIteratorFreeFunction) (GstIterator *it);
#define GST_ITERATOR(it) ((GstIterator*)(it))
#define GST_ITERATOR_LOCK(it) (GST_ITERATOR(it)->lock)
#define GST_ITERATOR_COOKIE(it) (GST_ITERATOR(it)->cookie)
#define GST_ITERATOR_ORIG_COOKIE(it) (GST_ITERATOR(it)->master_cookie)
struct _GstIterator {
GstIteratorNextFunction next;
GstIteratorResyncFunction resync;
GstIteratorFreeFunction free;
GMutex *lock;
guint32 cookie; /* cookie of the iterator */
guint32 *master_cookie; /* pointer to guint32 holding the cookie when this
iterator was created */
};
GstIterator* gst_iterator_new (guint size,
GMutex *lock,
guint32 *master_cookie,
GstIteratorNextFunction next,
GstIteratorResyncFunction resync,
GstIteratorFreeFunction free);
GstIteratorResult gst_iterator_next (GstIterator *it, gpointer *result);
void gst_iterator_resync (GstIterator *it);
void gst_iterator_free (GstIterator *it);
void gst_iterator_foreach (GstIterator *it, GFunc function,
gpointer user_data);
gpointer gst_iterator_find_custom (GstIterator *it, gpointer user_data,
GCompareFunc func);
GstIterator* gst_iterator_filter (GstIterator *it, gpointer user_data,
GCompareFunc func);
G_END_DECLS
#endif /* __GST_ITERATOR_H__ */

214
gst/gstmessage.c Normal file
View file

@ -0,0 +1,214 @@
/* GStreamer
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* gstmessage.c: GstMessage subsystem
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include <string.h> /* memcpy */
#include "gst_private.h"
#include "gstdata_private.h"
#include "gstinfo.h"
#include "gstmemchunk.h"
#include "gstmessage.h"
#include "gsttag.h"
#ifndef GST_DISABLE_TRACE
/* #define GST_WITH_ALLOC_TRACE */
#include "gsttrace.h"
static GstAllocTrace *_message_trace;
#endif
static GstMemChunk *chunk;
/* #define MEMPROF */
GType _gst_message_type;
void
_gst_message_initialize (void)
{
/* register the type */
_gst_message_type = g_boxed_type_register_static ("GstMessage",
(GBoxedCopyFunc) gst_data_copy, (GBoxedFreeFunc) gst_data_unref);
#ifndef GST_DISABLE_TRACE
_message_trace = gst_alloc_trace_register (GST_MESSAGE_TRACE_NAME);
#endif
chunk = gst_mem_chunk_new ("GstMessageChunk", sizeof (GstMessage),
sizeof (GstMessage) * 50, 0);
}
static GstMessage *
_gst_message_copy (GstMessage * message)
{
GstMessage *copy;
copy = gst_mem_chunk_alloc (chunk);
#ifndef GST_DISABLE_TRACE
gst_alloc_trace_new (_message_trace, copy);
#endif
memcpy (copy, message, sizeof (GstMessage));
/* FIXME copy/ref additional fields */
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_TAG:
copy->message_data.structure.structure =
gst_tag_list_copy ((GstTagList *) message->message_data.structure.
structure);
break;
default:
break;
}
return copy;
}
static void
_gst_message_free (GstMessage * message)
{
GST_CAT_INFO (GST_CAT_MESSAGE, "freeing message %p", message);
if (GST_MESSAGE_SRC (message)) {
gst_object_unref (GST_MESSAGE_SRC (message));
}
if (message->lock) {
g_mutex_lock (message->lock);
g_cond_signal (message->cond);
g_mutex_unlock (message->lock);
}
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:
g_error_free (GST_MESSAGE_ERROR_ERROR (message));
g_free (GST_MESSAGE_ERROR_DEBUG (message));
break;
case GST_MESSAGE_TAG:
if (GST_IS_TAG_LIST (message->message_data.tag.list)) {
gst_tag_list_free (message->message_data.tag.list);
} else {
g_warning ("tag message %p didn't contain a valid tag list!", message);
GST_ERROR ("tag message %p didn't contain a valid tag list!", message);
}
break;
default:
break;
}
_GST_DATA_DISPOSE (GST_DATA (message));
#ifndef GST_DISABLE_TRACE
gst_alloc_trace_free (_message_trace, message);
#endif
gst_mem_chunk_free (chunk, message);
}
GType
gst_message_get_type (void)
{
return _gst_message_type;
}
/**
* gst_message_new:
* @type: The type of the new message
*
* Allocate a new message of the given type.
*
* Returns: A new message.
*/
GstMessage *
gst_message_new (GstMessageType type, GstObject * src)
{
GstMessage *message;
message = gst_mem_chunk_alloc0 (chunk);
#ifndef GST_DISABLE_TRACE
gst_alloc_trace_new (_message_trace, message);
#endif
GST_CAT_INFO (GST_CAT_MESSAGE, "creating new message %p %d", message, type);
_GST_DATA_INIT (GST_DATA (message),
_gst_message_type,
0,
(GstDataFreeFunction) _gst_message_free,
(GstDataCopyFunction) _gst_message_copy);
GST_MESSAGE_TYPE (message) = type;
GST_MESSAGE_TIMESTAMP (message) = G_GINT64_CONSTANT (0);
gst_object_ref (src);
GST_MESSAGE_SRC (message) = src;
return message;
}
/**
* gst_message_new_eos:
*
* Create a new eos message.
*
* Returns: The new eos message.
*/
GstMessage *
gst_message_new_eos (GstObject * src)
{
GstMessage *message;
message = gst_message_new (GST_MESSAGE_EOS, src);
return message;
}
/**
* gst_message_new_error:
*
* Create a new error message.
*
* Returns: The new error message.
*/
GstMessage *
gst_message_new_error (GstObject * src, GError * error, gchar * debug)
{
GstMessage *message;
message = gst_message_new (GST_MESSAGE_ERROR, src);
GST_MESSAGE_ERROR_ERROR (message) = error;
GST_MESSAGE_ERROR_DEBUG (message) = debug;
return message;
}
/**
* gst_message_new_tag:
*
* Create a new tag message.
*
* Returns: The new tag message.
*/
GstMessage *
gst_message_new_tag (GstObject * src, GstTagList * tag_list)
{
GstMessage *message;
message = gst_message_new (GST_MESSAGE_TAG, src);
GST_MESSAGE_TAG_LIST (message) = tag_list;
return message;
}

106
gst/gstmessage.h Normal file
View file

@ -0,0 +1,106 @@
/* GStreamer
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* gstmessage.h: Header for GstMessage subsystem
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_MESSAGE_H__
#define __GST_MESSAGE_H__
#include <gst/gsttypes.h>
#include <gst/gstdata.h>
#include <gst/gstobject.h>
#include <gst/gsttag.h>
#include <gst/gststructure.h>
G_BEGIN_DECLS
GST_EXPORT GType _gst_message_type;
typedef enum {
GST_MESSAGE_UNKNOWN = 0,
GST_MESSAGE_EOS = 1,
GST_MESSAGE_ERROR = 2,
GST_MESSAGE_WARNING = 3,
GST_MESSAGE_INFO = 4,
GST_MESSAGE_TAG = 5,
GST_MESSAGE_BUFFERING = 6,
GST_MESSAGE_STATE_CHANGED = 7,
GST_MESSAGE_STEP_DONE = 8,
} GstMessageType;
#define GST_MESSAGE_TRACE_NAME "GstMessage"
#define GST_TYPE_MESSAGE (_gst_message_type)
#define GST_MESSAGE(message) ((GstMessage*)(message))
#define GST_IS_MESSAGE(message) (GST_DATA_TYPE(message) == GST_TYPE_MESSAGE)
#define GST_MESSAGE_TYPE(message) (GST_MESSAGE(message)->type)
#define GST_MESSAGE_TIMESTAMP(message) (GST_MESSAGE(message)->timestamp)
#define GST_MESSAGE_SRC(message) (GST_MESSAGE(message)->src)
#define GST_MESSAGE_TAG_LIST(message) (GST_MESSAGE(message)->message_data.tag.list)
#define GST_MESSAGE_ERROR_ERROR(message) (GST_MESSAGE(message)->message_data.error.error)
#define GST_MESSAGE_ERROR_DEBUG(message) (GST_MESSAGE(message)->message_data.error.debug)
struct _GstMessage {
GstData data;
GstMessageType type;
guint64 timestamp;
GstObject *src;
GMutex *lock; /* lock and cond for async delivery */
GCond *cond;
union {
struct {
GError *error;
gchar *debug;
} error;
struct {
GstStructure *structure;
} structure;
struct {
GstTagList *list;
} tag;
} message_data;
gpointer _gst_reserved[GST_PADDING];
};
void _gst_message_initialize (void);
GType gst_message_get_type (void);
GstMessage* gst_message_new (GstMessageType type, GstObject *src);
/* refcounting */
#define gst_message_ref(ev) GST_MESSAGE (gst_data_ref (GST_DATA (ev)))
#define gst_message_ref_by_count(ev,c) GST_MESSAGE (gst_data_ref_by_count (GST_DATA (ev), c))
#define gst_message_unref(ev) gst_data_unref (GST_DATA (ev))
/* copy message */
#define gst_message_copy(ev) GST_MESSAGE (gst_data_copy (GST_DATA (ev)))
GstMessage* gst_message_new_eos (GstObject *src);
GstMessage* gst_message_new_error (GstObject *src, GError *error, gchar *debug);
GstMessage* gst_message_new_tag (GstObject *src, GstTagList *tag_list);
G_END_DECLS
#endif /* __GST_MESSAGE_H__ */

118
gst/gsttask.c Normal file
View file

@ -0,0 +1,118 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wim.taymans@chello.be>
*
* gsttask.c: Streaming tasks
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "gst_private.h"
#include "gstinfo.h"
#include "gsttask.h"
static void gst_task_class_init (GstTaskClass * klass);
static void gst_task_init (GstTask * sched);
static void gst_task_dispose (GObject * object);
static GstObjectClass *parent_class = NULL;
GType
gst_task_get_type (void)
{
static GType _gst_task_type = 0;
if (!_gst_task_type) {
static const GTypeInfo task_info = {
sizeof (GstTaskClass),
NULL,
NULL,
(GClassInitFunc) gst_task_class_init,
NULL,
NULL,
sizeof (GstTask),
0,
(GInstanceInitFunc) gst_task_init,
NULL
};
_gst_task_type =
g_type_register_static (GST_TYPE_OBJECT, "GstTask",
&task_info, G_TYPE_FLAG_ABSTRACT);
}
return _gst_task_type;
}
static void
gst_task_class_init (GstTaskClass * klass)
{
GObjectClass *gobject_class;
gobject_class = (GObjectClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_OBJECT);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_task_dispose);
}
static void
gst_task_init (GstTask * sched)
{
}
static void
gst_task_dispose (GObject * object)
{
GstTask *task = GST_TASK (object);
/* thse lists should all be NULL */
GST_DEBUG ("task %p dispose", task);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
gboolean
gst_task_start (GstTask * task)
{
GstTaskClass *tclass;
gboolean result = FALSE;
g_return_val_if_fail (GST_IS_TASK (task), result);
tclass = GST_TASK_GET_CLASS (task);
if (tclass->start)
result = tclass->start (task);
return result;
}
gboolean
gst_task_stop (GstTask * task)
{
GstTaskClass *tclass;
gboolean result = FALSE;
g_return_val_if_fail (GST_IS_TASK (task), result);
tclass = GST_TASK_GET_CLASS (task);
if (tclass->stop)
result = tclass->stop (task);
return result;
}

64
gst/gsttask.h Normal file
View file

@ -0,0 +1,64 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* <2004> Wim Taymans <wim@fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TASK_H__
#define __GST_TASK_H__
#include <gst/gstobject.h>
G_BEGIN_DECLS
typedef void (*GstTaskFunction) (void *data);
/* --- standard type macros --- */
#define GST_TYPE_TASK (gst_task_get_type ())
#define GST_TASK(task) (G_TYPE_CHECK_INSTANCE_CAST ((task), GST_TYPE_TASK, GstTask))
#define GST_IS_TASK(task) (G_TYPE_CHECK_INSTANCE_TYPE ((task), GST_TYPE_TASK))
#define GST_TASK_CLASS(tclass) (G_TYPE_CHECK_CLASS_CAST ((tclass), GST_TYPE_TASK, GstTaskClass))
#define GST_IS_TASK_CLASS(tclass) (G_TYPE_CHECK_CLASS_TYPE ((tclass), GST_TYPE_TASK))
#define GST_TASK_GET_CLASS(task) (G_TYPE_INSTANCE_GET_CLASS ((task), GST_TYPE_TASK, GstTaskClass))
typedef struct _GstTask GstTask;
typedef struct _GstTaskClass GstTaskClass;
struct _GstTask {
GstObject object;
gpointer _gst_reserved[GST_PADDING];
};
struct _GstTaskClass {
GstObjectClass parent_class;
gboolean (*start) (GstTask *task);
gboolean (*stop) (GstTask *task);
gpointer _gst_reserved[GST_PADDING];
};
GType gst_task_get_type (void);
gboolean gst_task_start (GstTask *task);
gboolean gst_task_stop (GstTask *task);
G_END_DECLS
#endif /* __GST_TASK_H__ */

View file

@ -0,0 +1,331 @@
/* GStreamer2
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* threadscheduler.c: scheduler using threads
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <gst/gst.h>
#include "../gst-i18n-lib.h"
GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
#define GST_CAT_DEFAULT debug_scheduler
#define GST_TYPE_THREAD_SCHEDULER \
(gst_thread_scheduler_get_type ())
#define GST_THREAD_SCHEDULER(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_THREAD_SCHEDULER,GstThreadScheduler))
#define GST_THREAD_SCHEDULER_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_THREAD_SCHEDULER,GstThreadSchedulerClass))
#define GST_IS_THREAD_SCHEDULER(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_THREAD_SCHEDULER))
#define GST_IS_THREAD_SCHEDULER_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_THREAD_SCHEDULER))
#define SCHED(element) (GST_THREAD_SCHEDULER ((element)->sched))
GType gst_thread_scheduler_get_type (void);
typedef struct _GstThreadScheduler GstThreadScheduler;
typedef struct _GstThreadSchedulerClass GstThreadSchedulerClass;
struct _GstThreadScheduler
{
GstScheduler scheduler;
GThreadPool *pool;
};
struct _GstThreadSchedulerClass
{
GstSchedulerClass scheduler_class;
};
#define ELEMENT_PRIVATE(element) GST_ELEMENT (element)->sched_private
#define PAD_PRIVATE(pad) (GST_REAL_PAD (pad))->sched_private
#define GST_TYPE_THREAD_SCHEDULER_TASK \
(gst_thread_scheduler_task_get_type ())
#define GST_THREAD_SCHEDULER_TASK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_THREAD_SCHEDULER_TASK,GstThreadSchedulerTask))
#define GST_THREAD_SCHEDULER_TASK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_THREAD_SCHEDULER_TASK,GstThreadSchedulerTaskClass))
#define GST_IS_THREAD_SCHEDULER_TASK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_THREAD_SCHEDULER_TASK))
#define GST_IS_THREAD_SCHEDULER_TASK_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_THREAD_SCHEDULER_TASK))
typedef struct _GstThreadSchedulerTask GstThreadSchedulerTask;
typedef struct _GstThreadSchedulerTaskClass GstThreadSchedulerTaskClass;
typedef enum
{
STATE_STOPPED,
STATE_STARTED
} TaskState;
struct _GstThreadSchedulerTask
{
GstTask task;
TaskState state;
GMutex *lock;
GstTaskFunction func;
gpointer data;
};
struct _GstThreadSchedulerTaskClass
{
GstTaskClass parent_class;
};
static void gst_thread_scheduler_task_class_init (gpointer g_class,
gpointer data);
static void gst_thread_scheduler_task_init (GstThreadSchedulerTask * object);
static gboolean gst_thread_scheduler_task_start (GstTask * task);
static gboolean gst_thread_scheduler_task_stop (GstTask * task);
GType
gst_thread_scheduler_task_get_type (void)
{
static GType object_type = 0;
if (object_type == 0) {
static const GTypeInfo object_info = {
sizeof (GstThreadSchedulerTaskClass),
NULL,
NULL,
gst_thread_scheduler_task_class_init,
NULL,
NULL,
sizeof (GstThreadSchedulerTask),
0,
(GInstanceInitFunc) gst_thread_scheduler_task_init
};
object_type =
g_type_register_static (GST_TYPE_TASK,
"GstThreadSchedulerTask", &object_info, 0);
}
return object_type;
}
static void
gst_thread_scheduler_task_class_init (gpointer klass, gpointer class_data)
{
GstTaskClass *task = GST_TASK_CLASS (klass);
task->start = gst_thread_scheduler_task_start;
task->stop = gst_thread_scheduler_task_stop;
}
static void
gst_thread_scheduler_task_init (GstThreadSchedulerTask * task)
{
task->state = STATE_STOPPED;
task->lock = g_mutex_new ();
}
static gboolean
gst_thread_scheduler_task_start (GstTask * task)
{
GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task);
GstThreadScheduler *tsched =
GST_THREAD_SCHEDULER (gst_object_get_parent (GST_OBJECT (task)));
g_mutex_lock (ttask->lock);
if (ttask->state != STATE_STARTED) {
ttask->state = STATE_STARTED;
g_thread_pool_push (tsched->pool, task, NULL);
}
g_mutex_unlock (ttask->lock);
return TRUE;
}
static gboolean
gst_thread_scheduler_task_stop (GstTask * task)
{
GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task);
g_mutex_lock (ttask->lock);
if (ttask->state != STATE_STOPPED) {
ttask->state = STATE_STOPPED;
}
g_mutex_unlock (ttask->lock);
return TRUE;
}
static void gst_thread_scheduler_class_init (gpointer g_class, gpointer data);
static void gst_thread_scheduler_init (GstThreadScheduler * object);
GType
gst_thread_scheduler_get_type (void)
{
static GType object_type = 0;
if (object_type == 0) {
static const GTypeInfo object_info = {
sizeof (GstThreadSchedulerClass),
NULL,
NULL,
gst_thread_scheduler_class_init,
NULL,
NULL,
sizeof (GstThreadScheduler),
0,
(GInstanceInitFunc) gst_thread_scheduler_init
};
object_type =
g_type_register_static (GST_TYPE_SCHEDULER,
"GstThreadScheduler", &object_info, 0);
}
return object_type;
}
static void gst_thread_scheduler_setup (GstScheduler * sched);
static void gst_thread_scheduler_reset (GstScheduler * sched);
static void gst_thread_scheduler_add_element (GstScheduler * sched,
GstElement * element);
static void gst_thread_scheduler_remove_element (GstScheduler * sched,
GstElement * element);
static GstTask *gst_thread_scheduler_create_task (GstScheduler * sched,
GstTaskFunction func, gpointer data);
static void gst_thread_scheduler_show (GstScheduler * scheduler);
static void
gst_thread_scheduler_class_init (gpointer klass, gpointer class_data)
{
GstSchedulerClass *scheduler = GST_SCHEDULER_CLASS (klass);
scheduler->setup = gst_thread_scheduler_setup;
scheduler->reset = gst_thread_scheduler_reset;
scheduler->add_element = gst_thread_scheduler_add_element;
scheduler->remove_element = gst_thread_scheduler_remove_element;
scheduler->create_task = gst_thread_scheduler_create_task;
scheduler->clock_wait = NULL;
scheduler->show = gst_thread_scheduler_show;
}
static void
gst_thread_scheduler_func (GstThreadSchedulerTask * task,
GstThreadScheduler * sched)
{
gst_object_ref (GST_OBJECT (task));
GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task,
g_thread_self ());
g_mutex_lock (task->lock);
while (task->state == STATE_STARTED) {
g_mutex_unlock (task->lock);
task->func (task->data);
g_mutex_lock (task->lock);
}
g_mutex_unlock (task->lock);
GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ());
gst_object_unref (GST_OBJECT (task));
}
static void
gst_thread_scheduler_init (GstThreadScheduler * scheduler)
{
scheduler->pool = g_thread_pool_new (
(GFunc) gst_thread_scheduler_func, scheduler, -1, FALSE, NULL);
}
static GstTask *
gst_thread_scheduler_create_task (GstScheduler * sched, GstTaskFunction func,
gpointer data)
{
GstThreadSchedulerTask *task;
task =
GST_THREAD_SCHEDULER_TASK (g_object_new (GST_TYPE_THREAD_SCHEDULER_TASK,
NULL));
gst_object_set_parent (GST_OBJECT (task), GST_OBJECT (sched));
task->func = func;
task->data = data;
GST_DEBUG_OBJECT (sched, "Created task %p", task);
return GST_TASK (task);
}
static void
gst_thread_scheduler_setup (GstScheduler * sched)
{
}
static void
gst_thread_scheduler_reset (GstScheduler * sched)
{
}
static void
gst_thread_scheduler_add_element (GstScheduler * scheduler,
GstElement * element)
{
g_print ("add element\n");
}
static void
gst_thread_scheduler_remove_element (GstScheduler * scheduler,
GstElement * element)
{
GstThreadSchedulerTask *task;;
task = ELEMENT_PRIVATE (element);
if (task) {
g_object_unref (G_OBJECT (task));
ELEMENT_PRIVATE (element) = NULL;;
}
}
static void
gst_thread_scheduler_show (GstScheduler * scheduler)
{
}
static gboolean
plugin_init (GstPlugin * plugin)
{
GstSchedulerFactory *factory;
GST_DEBUG_CATEGORY_INIT (debug_scheduler, "thread", 0, "thread scheduler");
factory = gst_scheduler_factory_new ("thread",
"A scheduler using threads", GST_TYPE_THREAD_SCHEDULER);
if (factory == NULL)
return FALSE;
gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
return TRUE;
}
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR, "gstthreadscheduler",
"a thread scheduler", plugin_init, VERSION, GST_LICENSE, GST_PACKAGE,
GST_ORIGIN)