diff --git a/ChangeLog b/ChangeLog index ced2825393..a7b7e46631 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,43 @@ +2006-11-28 Edward Hervey + + * libs/gst/base/Makefile.am: + * libs/gst/base/gstdataqueue.c: (gst_data_queue_get_type), + (gst_data_queue_base_init), (gst_data_queue_class_init), + (gst_data_queue_init), (gst_data_queue_new), + (gst_data_queue_cleanup), (gst_data_queue_finalize), + (gst_data_queue_locked_flush), (gst_data_queue_locked_is_empty), + (gst_data_queue_locked_is_full), (gst_data_queue_flush), + (gst_data_queue_is_empty), (gst_data_queue_is_full), + (gst_data_queue_set_flushing), (gst_data_queue_push), + (gst_data_queue_pop), (gst_data_queue_drop_head), + (gst_data_queue_set_property), (gst_data_queue_get_property): + * libs/gst/base/gstdataqueue.h: + New GstDataQueue object for threadsafe queueing. Most useful for + elements that need some queueing functionnality. + * docs/libs/gstreamer-libs-docs.sgml: + * docs/libs/gstreamer-libs-sections.txt: + Insert documentation for GstDataQueue + * plugins/elements/Makefile.am: + * plugins/elements/gstelements.c: + * plugins/elements/gstmultiqueue.c: (gst_multi_queue_base_init), + (gst_multi_queue_class_init), (gst_multi_queue_init), + (gst_multi_queue_finalize), (gst_multi_queue_set_property), + (gst_multi_queue_get_property), (gst_multi_queue_request_new_pad), + (gst_multi_queue_release_pad), (gst_single_queue_push_one), + (gst_multi_queue_item_destroy), (gst_multi_queue_item_new), + (gst_multi_queue_loop), (gst_multi_queue_chain), + (gst_multi_queue_sink_activate_push), (gst_multi_queue_sink_event), + (gst_multi_queue_getcaps), (gst_multi_queue_bufferalloc), + (gst_multi_queue_src_activate_push), (gst_multi_queue_acceptcaps), + (gst_multi_queue_src_event), (gst_multi_queue_src_query), + (wake_up_next_non_linked), (compute_next_non_linked), + (single_queue_overrun_cb), (single_queue_underrun_cb), + (single_queue_check_full), (gst_single_queue_new): + * plugins/elements/gstmultiqueue.h: + New multiqueue element, using GstDataQueue. Used for queuing multiple + streams. + Closes #344639 and #347785 + 2006-11-22 Stefan Kost * docs/pwg/advanced-types.xml: diff --git a/docs/libs/gstreamer-libs-docs.sgml b/docs/libs/gstreamer-libs-docs.sgml index 9150d26a6a..37e6cd89e9 100644 --- a/docs/libs/gstreamer-libs-docs.sgml +++ b/docs/libs/gstreamer-libs-docs.sgml @@ -11,6 +11,7 @@ + @@ -59,6 +60,7 @@ &GstAdapter; &GstCollectPads; &GstTypeFindHelper; + &GstDataQueue; diff --git a/docs/libs/gstreamer-libs-sections.txt b/docs/libs/gstreamer-libs-sections.txt index fd7e13b957..f027750985 100644 --- a/docs/libs/gstreamer-libs-sections.txt +++ b/docs/libs/gstreamer-libs-sections.txt @@ -303,6 +303,33 @@ gst_type_find_helper_get_range +
+gstdataqueue +GstDataQueue +gst/base/gstdataqueue.h +GstDataQueue +GstDataQueueSize +GstDataQueueCheckFullFunction +GstDataQueueItem +gst_data_queue_new +gst_data_queue_push +gst_data_queue_pop +gst_data_queue_flush +gst_data_queue_set_flushing +gst_data_queue_drop_head +gst_data_queue_is_full +gst_data_queue_is_empty + +GstDataQueueClass +GST_DATA_QUEUE +GST_IS_DATA_QUEUE +GST_TYPE_DATA_QUEUE +GST_DATA_QUEUE_CLASS +GST_IS_DATA_QUEUE_CLASS + +gst_data_queue_get_type +
+ # net
diff --git a/libs/gst/base/Makefile.am b/libs/gst/base/Makefile.am index ab5b99097a..0c72e5da39 100644 --- a/libs/gst/base/Makefile.am +++ b/libs/gst/base/Makefile.am @@ -9,7 +9,8 @@ libgstbase_@GST_MAJORMINOR@_la_SOURCES = \ gstbasetransform.c \ gstcollectpads.c \ gstpushsrc.c \ - gsttypefindhelper.c + gsttypefindhelper.c \ + gstdataqueue.c libgstbase_@GST_MAJORMINOR@_la_CFLAGS = $(GST_OBJ_CFLAGS) libgstbase_@GST_MAJORMINOR@_la_LIBADD = $(GST_OBJ_LIBS) @@ -25,7 +26,8 @@ libgstbase_@GST_MAJORMINOR@include_HEADERS = \ gstbasetransform.h \ gstcollectpads.h \ gstpushsrc.h \ - gsttypefindhelper.h + gsttypefindhelper.h \ + gstdataqueue.h CLEANFILES = *.gcno *.gcda *.gcov diff --git a/libs/gst/base/gstdataqueue.c b/libs/gst/base/gstdataqueue.c new file mode 100644 index 0000000000..a9683ec056 --- /dev/null +++ b/libs/gst/base/gstdataqueue.c @@ -0,0 +1,589 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * + * gstdataqueue.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:gstdataqueue + * @short_description: Threadsafe queueing object + * + * #GstDataQueue is an object that handles threadsafe queueing of object. It + * also provides size-related functionnality. This object should be used for + * any #GstElement that wishes to provide some sort of queueing functionnality. + */ + +#include +#include "gstdataqueue.h" + +GST_DEBUG_CATEGORY_STATIC (data_queue_debug); +#define GST_CAT_DEFAULT (data_queue_debug) +GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow); + + +/* Queue signals and args */ +enum +{ + SIGNAL_EMPTY, + SIGNAL_FULL, + LAST_SIGNAL +}; + +enum +{ + ARG_0, + ARG_CUR_LEVEL_VISIBLE, + ARG_CUR_LEVEL_BYTES, + ARG_CUR_LEVEL_TIME, + /* FILL ME */ +}; + +#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ + GST_CAT_LOG (data_queue_dataflow, \ + "locking qlock from thread %p", \ + g_thread_self ()); \ + g_mutex_lock (q->qlock); \ + GST_CAT_LOG (data_queue_dataflow, \ + "locked qlock from thread %p", \ + g_thread_self ()); \ +} G_STMT_END + +#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \ + GST_DATA_QUEUE_MUTEX_LOCK (q); \ + if (q->flushing) \ + goto label; \ + } G_STMT_END + +#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ + GST_CAT_LOG (data_queue_dataflow, \ + "unlocking qlock from thread %p", \ + g_thread_self ()); \ + g_mutex_unlock (q->qlock); \ +} G_STMT_END + +#define STATUS(q, msg) \ + GST_CAT_LOG (data_queue_dataflow, \ + "queue:%p " msg ": %u visible items, %u " \ + "bytes, %"G_GUINT64_FORMAT \ + " ns, %u elements", \ + queue, \ + q->cur_level.visible, \ + q->cur_level.bytes, \ + q->cur_level.time, \ + q->queue->length) + +static void gst_data_queue_base_init (GstDataQueueClass * klass); +static void gst_data_queue_class_init (GstDataQueueClass * klass); +static void gst_data_queue_init (GstDataQueue * queue); +static void gst_data_queue_finalize (GObject * object); + +static void gst_data_queue_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_data_queue_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec); + +static GObjectClass *parent_class = NULL; +static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 }; + +GType +gst_data_queue_get_type (void) +{ + static GType queue_type = 0; + + if (!queue_type) { + static const GTypeInfo queue_info = { + sizeof (GstDataQueueClass), + (GBaseInitFunc) gst_data_queue_base_init, + NULL, + (GClassInitFunc) gst_data_queue_class_init, + NULL, + NULL, + sizeof (GstDataQueue), + 0, + (GInstanceInitFunc) gst_data_queue_init, + NULL + }; + + queue_type = g_type_register_static (G_TYPE_OBJECT, + "GstDataQueue", &queue_info, 0); + GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, + "data queue object"); + GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, + "dataflow inside the data queue object"); + } + + return queue_type; +} + +static void +gst_data_queue_base_init (GstDataQueueClass * klass) +{ + /* Do we need anything here ?? */ + return; +} + +static void +gst_data_queue_class_init (GstDataQueueClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + parent_class = g_type_class_peek_parent (klass); + + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_data_queue_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_data_queue_get_property); + + /* signals */ + /** + * GstDataQueue::empty: + * @queue: the queue instance + * + * Reports that the queue became empty (empty). + * A queue is empty if the total amount of visible items inside it (num-visible, time, + * size) is lower than the boundary values which can be set through the GObject + * properties. + */ + gst_data_queue_signals[SIGNAL_EMPTY] = + g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + /** + * GstDataQueue::full: + * @queue: the queue instance + * + * Reports that the queue became full (full). + * A queue is full if the total amount of data inside it (num-visible, time, + * size) is higher than the boundary values which can be set through the GObject + * properties. + */ + gst_data_queue_signals[SIGNAL_FULL] = + g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + /* properties */ + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, + g_param_spec_uint ("current-level-bytes", "Current level (kB)", + "Current amount of data in the queue (bytes)", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE, + g_param_spec_uint ("current-level-visible", + "Current level (visible items)", + "Current number of visible items in the queue", 0, G_MAXUINT, 0, + G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, + g_param_spec_uint64 ("current-level-time", "Current level (ns)", + "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0, + G_PARAM_READABLE)); + + /* set several parent class virtual functions */ + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_data_queue_finalize); + +} + +static void +gst_data_queue_init (GstDataQueue * queue) +{ + queue->cur_level.visible = 0; /* no content */ + queue->cur_level.bytes = 0; /* no content */ + queue->cur_level.time = 0; /* no content */ + + queue->checkfull = NULL; + + queue->qlock = g_mutex_new (); + queue->item_add = g_cond_new (); + queue->item_del = g_cond_new (); + queue->queue = g_queue_new (); + + GST_DEBUG ("initialized queue's not_empty & not_full conditions"); +} + +/** + * gst_data_queue_new: + * @checkfull: the callback used to tell if the element considers the queue full + * or not. + * @checkdata: a #gpointer that will be given in the @checkfull callback. + * + * Returns: a new #GstDataQueue. + */ + +GstDataQueue * +gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata) +{ + GstDataQueue *ret; + + g_return_val_if_fail (checkfull != NULL, NULL); + + ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL); + ret->checkfull = checkfull; + ret->checkdata = checkdata; + + return ret; +} + +static void +gst_data_queue_cleanup (GstDataQueue * queue) +{ + while (!g_queue_is_empty (queue->queue)) { + GstDataQueueItem *item = g_queue_pop_head (queue->queue); + + /* Just call the destroy notify on the item */ + item->destroy (item); + } + queue->cur_level.visible = 0; + queue->cur_level.bytes = 0; + queue->cur_level.time = 0; + +} + +/* called only once, as opposed to dispose */ +static void +gst_data_queue_finalize (GObject * object) +{ + GstDataQueue *queue = GST_DATA_QUEUE (object); + + GST_DEBUG ("finalizing queue"); + + gst_data_queue_cleanup (queue); + g_queue_free (queue->queue); + + GST_DEBUG ("free mutex"); + g_mutex_free (queue->qlock); + GST_DEBUG ("done free mutex"); + + g_cond_free (queue->item_add); + g_cond_free (queue->item_del); + + if (G_OBJECT_CLASS (parent_class)->finalize) + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_data_queue_locked_flush (GstDataQueue * queue) +{ + STATUS (queue, "before flushing"); + gst_data_queue_cleanup (queue); + STATUS (queue, "after flushing"); + /* we deleted something... */ + g_cond_signal (queue->item_del); +} + +static gboolean +gst_data_queue_locked_is_empty (GstDataQueue * queue) +{ + return (queue->queue->length == 0); +} + +static gboolean +gst_data_queue_locked_is_full (GstDataQueue * queue) +{ + return queue->checkfull (queue, queue->cur_level.visible, + queue->cur_level.bytes, queue->cur_level.time, queue->checkdata); +} + +/** + * gst_data_queue_flush: + * @queue: a #GstDataQueue. + * + * Flushes all the contents of the @queue. Any call to #gst_data_queue_pull and + * #gst_data_queue_pop will be released. + * MT safe. + */ + +void +gst_data_queue_flush (GstDataQueue * queue) +{ + GST_DEBUG ("queue:%p", queue); + GST_DATA_QUEUE_MUTEX_LOCK (queue); + gst_data_queue_locked_flush (queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} + +/** + * gst_data_queue_is_empty: + * @queue: a #GstDataQueue. + * + * Queries if there are any items in the @queue. + * MT safe. + * + * Returns: #TRUE if @queue is empty. + */ + +gboolean +gst_data_queue_is_empty (GstDataQueue * queue) +{ + gboolean res; + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + res = gst_data_queue_locked_is_empty (queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + return res; +} + +/** + * gst_data_queue_is_full: + * @queue: a #GstDataQueue. + * + * Queries if @queue is full. This check will be done using the + * #GstDataQueueCheckFullCallback registered with @queue. + * MT safe. + * + * Returns: #TRUE if @queue is full. + */ + +gboolean +gst_data_queue_is_full (GstDataQueue * queue) +{ + gboolean res; + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + res = gst_data_queue_locked_is_full (queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + return res; +} + +/** + * gst_data_queue_set_flushing: + * @queue: a #GstDataQueue. + * @flushing: a #gboolean stating if the queue will be flushing or not. + * + * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing + * state, any incoming data on the @queue will be discarded. Any call currently + * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight + * away with a return value of #FALSE. While the @queue is in flushing state, + * all calls to those two functions will return #FALSE. + * MT Safe. + */ + +void +gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) +{ + + GST_DEBUG ("queue:%p , flushing:%d", queue, flushing); + GST_DATA_QUEUE_MUTEX_LOCK (queue); + queue->flushing = flushing; + if (flushing) { + /* release push/pop functions */ + g_cond_signal (queue->item_add); + g_cond_signal (queue->item_del); + } + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} + +/** + * gst_data_queue_push: + * @queue: a #GstDataQueue. + * @item: a #GstDataQueueItem. + * + * Pushes a #GstDataQueueItem (or a structure that begins with the same fields) + * on the @queue. If the @queue is full, the call will block until space is + * available, OR the @queue is set to flushing state. + * MT safe. + * + * Returns: #TRUE if the @item was successfully pushed on the @queue. + */ +gboolean +gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) +{ + gboolean res = FALSE; + + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + + STATUS (queue, "before pushing"); + + /* We ALWAYS need to check for queue fillness */ + while (gst_data_queue_locked_is_full (queue)) { + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + + /* signal might have removed some items */ + while (gst_data_queue_locked_is_full (queue)) { + g_cond_wait (queue->item_del, queue->qlock); + if (queue->flushing) + goto done; + } + } + + g_queue_push_tail (queue->queue, item); + res = TRUE; + + if (item->visible) + queue->cur_level.visible++; + queue->cur_level.bytes += item->size; + queue->cur_level.time += item->duration; + + STATUS (queue, "after pushing"); + + g_cond_signal (queue->item_add); + +done: + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + GST_DEBUG ("queue:%p, result:%d", queue, res); + + return res; +} + +/** + * gst_data_queue_pop: + * @queue: a #GstDataQueue. + * @item: pointer to store the returned #GstDataQueueItem. + * + * Retrieves the first @item available on the @queue. If the queue is currently + * empty, the call will block until at least one item is available, OR the + * @queue is set to the flushing state. + * MT safe. + * + * Returns: #TRUE if an @item was successfully retrieved from the @queue. + */ + +gboolean +gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) +{ + gboolean res = FALSE; + + GST_DEBUG ("queue:%p", queue); + + g_return_val_if_fail (item != NULL, FALSE); + + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + + STATUS (queue, "before popping"); + + while (gst_data_queue_locked_is_empty (queue)) { + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + + while (gst_data_queue_locked_is_empty (queue)) { + g_cond_wait (queue->item_add, queue->qlock); + if (queue->flushing) + goto done; + } + } + + /* Get the item from the GQueue */ + *item = g_queue_pop_head (queue->queue); + res = TRUE; + + /* update current level counter */ + if ((*item)->visible) + queue->cur_level.visible--; + queue->cur_level.bytes -= (*item)->size; + queue->cur_level.time -= (*item)->duration; + + g_cond_signal (queue->item_del); + +done: + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + GST_DEBUG ("queue:%p , res:%d", queue, res); + + return res; +} + +/** + * gst_data_queue_drop_head: + * @queue: The #GstDataQueue to drop an item from. + * @type: The #GType of the item to drop. + * + * Pop and unref the head-most #GstMiniObject with the given #GType. + * + * Returns: TRUE if an element was removed. + */ + +gboolean +gst_data_queue_drop_head (GstDataQueue * queue, GType type) +{ + gboolean res = FALSE; + GList *item; + GstDataQueueItem *leak = NULL; + + GST_DEBUG ("queue:%p", queue); + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) { + GstDataQueueItem *tmp = (GstDataQueueItem *) item->data; + + if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) { + leak = tmp; + break; + } + } + + if (!leak) + goto done; + + g_queue_delete_link (queue->queue, item); + + if (leak->visible) + queue->cur_level.visible--; + queue->cur_level.bytes -= leak->size; + queue->cur_level.time -= leak->duration; + + leak->destroy (leak); + + res = TRUE; + +done: + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + + GST_DEBUG ("queue:%p , res:%d", queue, res); + + return res; +} + +static void +gst_data_queue_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_data_queue_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstDataQueue *queue = GST_DATA_QUEUE (object); + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + + switch (prop_id) { + case ARG_CUR_LEVEL_BYTES: + g_value_set_uint (value, queue->cur_level.bytes); + break; + case ARG_CUR_LEVEL_VISIBLE: + g_value_set_uint (value, queue->cur_level.visible); + break; + case ARG_CUR_LEVEL_TIME: + g_value_set_uint64 (value, queue->cur_level.time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} diff --git a/libs/gst/base/gstdataqueue.h b/libs/gst/base/gstdataqueue.h new file mode 100644 index 0000000000..b3cd87b554 --- /dev/null +++ b/libs/gst/base/gstdataqueue.h @@ -0,0 +1,153 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * + * gstdataqueue.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_DATA_QUEUE_H__ +#define __GST_DATA_QUEUE_H__ + +#include + +G_BEGIN_DECLS +#define GST_TYPE_DATA_QUEUE \ + (gst_data_queue_get_type()) +#define GST_DATA_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_DATA_QUEUE,GstDataQueue)) +#define GST_DATA_QUEUE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_DATA_QUEUE,GstDataQueueClass)) +#define GST_IS_DATA_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_DATA_QUEUE)) +#define GST_IS_DATA_QUEUE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_DATA_QUEUE)) +typedef struct _GstDataQueue GstDataQueue; +typedef struct _GstDataQueueClass GstDataQueueClass; +typedef struct _GstDataQueueSize GstDataQueueSize; +typedef struct _GstDataQueueItem GstDataQueueItem; + +/** + * GstDataQueueItem: + * @object: the #GstMiniObject to queue. + * @size: the size in bytes of the miniobject. + * @duration: the duration in #GstClockTime of the miniobject. Can not be + * #GST_CLOCK_TIME_NONE. + * @visible: #TRUE if @object should be considered as a visible object. + * @destroy: The #GDestroyNotify to use to free the #GstDataQueueItem. + * + * Structure used by #GstDataQueue. You can supply a different structure, as + * long as the top of the structure is identical to this structure. + */ + +struct _GstDataQueueItem +{ + GstMiniObject *object; + guint size; + guint64 duration; + gboolean visible; + + /* user supplied destroy function */ + GDestroyNotify destroy; +}; + +/** + * GstDataQueueSize: + * @visible: number of buffers + * @bytes: number of bytes + * @time: amount of time + * + * Structure describing the size of a queue. + */ +struct _GstDataQueueSize +{ + guint visible; + guint bytes; + guint64 time; +}; + +/** + * GstDataQueueCheckFullFunction: + * @queue: a #GstDataQueue. + * @visible: The number of visible items currently in the queue. + * @bytes: The amount of bytes currently in the queue. + * @time: The accumulated duration of the items currently in the queue. + * @checkdata: The #gpointer registered when the #GstDataQueue was created. + * + * The prototype of the function used to inform the queue that it should be + * considered as full. + * + * Returns: #TRUE if the queue should be considered full. + */ +typedef gboolean (*GstDataQueueCheckFullFunction) (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata); + +/** + * GstDataQueue: + * + * Opaque #GstDataQueue structure. + */ +struct _GstDataQueue +{ + GObject object; + + /*< private > */ + /* the queue of data we're keeping our grubby hands on */ + GQueue *queue; + + GstDataQueueSize cur_level; /* size of the queue */ + GstDataQueueCheckFullFunction checkfull; /* Callback to check if the queue is full */ + gpointer *checkdata; + + GMutex *qlock; /* lock for queue (vs object lock) */ + GCond *item_add; /* signals buffers now available for reading */ + GCond *item_del; /* signals space now available for writing */ + gboolean flushing; /* indicates whether conditions where signalled because + * of external flushing */ + + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstDataQueueClass +{ + GObjectClass parent_class; + + /* signals */ + void (*empty) (GstDataQueue * queue); + void (*full) (GstDataQueue * queue); + + gpointer _gst_reserved[GST_PADDING]; +}; + +GType gst_data_queue_get_type (void); + +GstDataQueue *gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, + gpointer checkdata); + +gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item); +gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item); + +void gst_data_queue_flush (GstDataQueue * queue); +void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing); + +gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type); + +gboolean gst_data_queue_is_full (GstDataQueue * queue); +gboolean gst_data_queue_is_empty (GstDataQueue * queue); + +G_END_DECLS +#endif /* __GST_DATA_QUEUE_H__ */ diff --git a/plugins/elements/Makefile.am b/plugins/elements/Makefile.am index 0f2a88fa74..9099aca01a 100644 --- a/plugins/elements/Makefile.am +++ b/plugins/elements/Makefile.am @@ -28,7 +28,8 @@ libgstcoreelements_la_SOURCES = \ gstidentity.c \ gstqueue.c \ gsttee.c \ - gsttypefindelement.c + gsttypefindelement.c \ + gstmultiqueue.c libgstcoreelements_la_CFLAGS = $(GST_OBJ_CFLAGS) libgstcoreelements_la_LIBADD = \ @@ -47,7 +48,8 @@ noinst_HEADERS = \ gstidentity.h \ gstqueue.h \ gsttee.h \ - gsttypefindelement.h + gsttypefindelement.h \ + gstmultiqueue.h EXTRA_DIST = gstfdsrc.c \ gstfdsink.c diff --git a/plugins/elements/gstelements.c b/plugins/elements/gstelements.c index cc25a7ecab..93d383a497 100644 --- a/plugins/elements/gstelements.c +++ b/plugins/elements/gstelements.c @@ -37,6 +37,7 @@ #include "gstqueue.h" #include "gsttee.h" #include "gsttypefindelement.h" +#include "gstmultiqueue.h" struct _elements_entry { @@ -63,6 +64,7 @@ static struct _elements_entry _elements[] = { {"filesink", GST_RANK_PRIMARY, gst_file_sink_get_type}, {"tee", GST_RANK_NONE, gst_tee_get_type}, {"typefind", GST_RANK_NONE, gst_type_find_element_get_type}, + {"multiqueue", GST_RANK_NONE, gst_multi_queue_get_type}, {NULL, 0}, }; diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c new file mode 100644 index 0000000000..5d5f0de64a --- /dev/null +++ b/plugins/elements/gstmultiqueue.c @@ -0,0 +1,1046 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * + * gstmultiqueue.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include "gstmultiqueue.h" + +/** + * GstSingleQueue: + * @sinkpad: associated sink #GstPad + * @srcpad: associated source #GstPad + * + * Structure containing all information and properties about + * a single queue. + */ + +typedef struct _GstSingleQueue GstSingleQueue; + +struct _GstSingleQueue +{ + /* unique identifier of the queue */ + guint id; + + GstMultiQueue *mqueue; + + GstPad *sinkpad; + GstPad *srcpad; + + /* flowreturn of previous srcpad push */ + GstFlowReturn srcresult; + + /* queue of data */ + GstDataQueue *queue; + GstDataQueueSize max_size, extra_size; + + /* Protected by global lock */ + guint32 nextid; /* ID of the next object waiting to be pushed */ + guint32 oldid; /* ID of the last object pushed (last in a series) */ + GCond *turn; /* SingleQueue turn waiting conditional */ +}; + + +/* Extension of GstDataQueueItem structure for our usage */ +typedef struct _GstMultiQueueItem GstMultiQueueItem; + +struct _GstMultiQueueItem +{ + GstMiniObject *object; + guint size; + guint64 duration; + gboolean visible; + + GDestroyNotify destroy; + guint32 posid; +}; + +static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); + +static void wake_up_next_non_linked (GstMultiQueue * mq); +static void compute_next_non_linked (GstMultiQueue * mq); + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC (multi_queue_debug); +#define GST_CAT_DEFAULT (multi_queue_debug) + +static const GstElementDetails gst_multi_queue_details = +GST_ELEMENT_DETAILS ("MultiQueue", + "Generic", + "Multiple data queue", + "Edward Hervey "); + +#define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ +#define DEFAULT_MAX_SIZE_BUFFERS 200 +#define DEFAULT_MAX_SIZE_TIME GST_SECOND + +#define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ +#define DEFAULT_EXTRA_SIZE_BUFFERS 200 +#define DEFAULT_EXTRA_SIZE_TIME GST_SECOND + +/* Signals and args */ +enum +{ + SIGNAL_UNDERRUN, + SIGNAL_OVERRUN, + LAST_SIGNAL +}; + +enum +{ + ARG_0, + ARG_EXTRA_SIZE_BYTES, + ARG_EXTRA_SIZE_BUFFERS, + ARG_EXTRA_SIZE_TIME, + ARG_MAX_SIZE_BYTES, + ARG_MAX_SIZE_BUFFERS, + ARG_MAX_SIZE_TIME, +}; + +#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ + GST_CAT_LOG_OBJECT (multi_queue_debug, q, \ + "locking qlock from thread %p", \ + g_thread_self ()); \ + g_mutex_lock (q->qlock); \ + GST_CAT_LOG_OBJECT (multi_queue_debug, q, \ + "locked qlock from thread %p", \ + g_thread_self ()); \ +} G_STMT_END + +#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ + GST_CAT_LOG_OBJECT (multi_queue_debug, q, \ + "unlocking qlock from thread %p", \ + g_thread_self ()); \ + g_mutex_unlock (q->qlock); \ +} G_STMT_END + +static void gst_multi_queue_finalize (GObject * object); +static void gst_multi_queue_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_multi_queue_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec); + +static GstPad *gst_multi_queue_request_new_pad (GstElement * element, + GstPadTemplate * temp, const gchar * name); +static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad); + +#define _do_init(bla) \ + GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element"); + +GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement, + GST_TYPE_ELEMENT, _do_init); + +static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 }; + +static void +gst_multi_queue_base_init (gpointer g_class) +{ + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&sinktemplate)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&srctemplate)); + gst_element_class_set_details (gstelement_class, &gst_multi_queue_details); +} + +static void +gst_multi_queue_class_init (GstMultiQueueClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->set_property = + GST_DEBUG_FUNCPTR (gst_multi_queue_set_property); + gobject_class->get_property = + GST_DEBUG_FUNCPTR (gst_multi_queue_get_property); + + /* SIGNALS */ + gst_multi_queue_signals[SIGNAL_UNDERRUN] = + g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + gst_multi_queue_signals[SIGNAL_OVERRUN] = + g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + /* PROPERTIES */ + + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, + g_param_spec_uint ("max-size-bytes", "Max. size (kB)", + "Max. amount of data in the queue (bytes, 0=disable)", + 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, + g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", + "Max. number of buffers in the queue (0=disable)", + 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, + g_param_spec_uint64 ("max-size-time", "Max. size (ns)", + "Max. amount of data in the queue (in ns, 0=disable)", + 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES, + g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)", + "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)", + 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS, + g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)", + "Amount of buffers the queues can grow if one of them is empty (0=disable)", + 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME, + g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)", + "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)", + 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE)); + + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize); + + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad); +} + +static void +gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) +{ + + mqueue->nbqueues = 0; + mqueue->queues = NULL; + + mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES; + mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS; + mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME; + + mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES; + mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS; + mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME; + + mqueue->counter = 0; + mqueue->highid = -1; + mqueue->nextnotlinked = -1; + + mqueue->qlock = g_mutex_new (); + + /* FILLME ? */ +} + +static void +gst_multi_queue_finalize (GObject * object) +{ + GstMultiQueue *mqueue = GST_MULTI_QUEUE (object); + GList *tmp = mqueue->queues; + + /* FILLME ? */ + /* DRAIN QUEUES */ + while (tmp) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + gst_data_queue_flush (sq->queue); + g_object_unref (G_OBJECT (sq->queue)); + + tmp = g_list_next (tmp); + } + g_list_free (mqueue->queues); + + /* free/unref instance data */ + g_mutex_free (mqueue->qlock); + + if (G_OBJECT_CLASS (parent_class)->finalize) + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +#define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START { \ + GList * tmp = mq->queues; \ + while (tmp) { \ + GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ + g_object_set_property ((GObject*) q->queue, name, value); \ + tmp = g_list_next(tmp); \ + }; \ +} G_STMT_END + +static void +gst_multi_queue_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstMultiQueue *mq = GST_MULTI_QUEUE (object); + + switch (prop_id) { + case ARG_MAX_SIZE_BYTES: + mq->max_size.bytes = g_value_get_uint (value); + SET_CHILD_PROPERTY (mq, "max-size-bytes", value); + break; + case ARG_MAX_SIZE_BUFFERS: + mq->max_size.visible = g_value_get_uint (value); + SET_CHILD_PROPERTY (mq, "max-size-visible", value); + break; + case ARG_MAX_SIZE_TIME: + mq->max_size.time = g_value_get_uint64 (value); + SET_CHILD_PROPERTY (mq, "max-size-time", value); + break; + case ARG_EXTRA_SIZE_BYTES: + mq->extra_size.bytes = g_value_get_uint (value); + break; + case ARG_EXTRA_SIZE_BUFFERS: + mq->extra_size.visible = g_value_get_uint (value); + break; + case ARG_EXTRA_SIZE_TIME: + mq->extra_size.time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + +} + +static void +gst_multi_queue_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstMultiQueue *mq = GST_MULTI_QUEUE (object); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + switch (prop_id) { + case ARG_EXTRA_SIZE_BYTES: + g_value_set_uint (value, mq->extra_size.bytes); + break; + case ARG_EXTRA_SIZE_BUFFERS: + g_value_set_uint (value, mq->extra_size.visible); + break; + case ARG_EXTRA_SIZE_TIME: + g_value_set_uint64 (value, mq->extra_size.time); + break; + case ARG_MAX_SIZE_BYTES: + g_value_set_uint (value, mq->max_size.bytes); + break; + case ARG_MAX_SIZE_BUFFERS: + g_value_set_uint (value, mq->max_size.visible); + break; + case ARG_MAX_SIZE_TIME: + g_value_set_uint64 (value, mq->max_size.time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); +} + + +/* + * GstElement methods + */ + +static GstPad * +gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp, + const gchar * name) +{ + GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); + GstSingleQueue *squeue; + + GST_LOG_OBJECT (element, "name : %s", name); + + /* Create a new single queue, add the sink and source pad and return the sink pad */ + squeue = gst_single_queue_new (mqueue); + + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); + mqueue->queues = g_list_append (mqueue->queues, squeue); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); + + GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s", + GST_DEBUG_PAD_NAME (squeue->sinkpad)); + + return squeue->sinkpad; +} + +static void +gst_multi_queue_release_pad (GstElement * element, GstPad * pad) +{ + GstMultiQueue *mqueue = GST_MULTI_QUEUE (element); + GstSingleQueue *sq = NULL; + GList *tmp; + + GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); + /* Find which single queue it belongs to, knowing that it should be a sinkpad */ + for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) { + sq = (GstSingleQueue *) tmp->data; + + if (sq->sinkpad == pad) + break; + } + + if (!tmp) { + GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???"); + return; + } + + /* remove it from the list */ + mqueue->queues = g_list_delete_link (mqueue->queues, tmp); + + /* delete SingleQueue */ + gst_data_queue_set_flushing (sq->queue, TRUE); + gst_data_queue_flush (sq->queue); + + g_object_unref (G_OBJECT (sq->queue)); + + gst_element_remove_pad (element, sq->srcpad); + gst_element_remove_pad (element, sq->sinkpad); + + /* FIXME : recompute next-non-linked */ + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); +} + +static gboolean +gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, + GstMiniObject * object) +{ + if (GST_IS_BUFFER (object)) { + sq->srcresult = gst_pad_push (sq->srcpad, GST_BUFFER (object)); + + if ((sq->srcresult != GST_FLOW_OK) + && (sq->srcresult != GST_FLOW_NOT_LINKED)) { + GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, reason %s", + sq->id, gst_flow_get_name (sq->srcresult)); + gst_data_queue_set_flushing (sq->queue, TRUE); + gst_pad_pause_task (sq->srcpad); + } + } else if (GST_IS_EVENT (object)) { + if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) { + sq->srcresult = GST_FLOW_UNEXPECTED; + + GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, got EOS", + sq->id); + gst_data_queue_set_flushing (sq->queue, TRUE); + gst_pad_pause_task (sq->srcpad); + } + gst_pad_push_event (sq->srcpad, GST_EVENT (object)); + } else { + g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", + sq->id); + } + + return FALSE; +} + +static void +gst_multi_queue_item_destroy (GstMultiQueueItem * item) +{ + gst_mini_object_unref (item->object); + g_free (item); +} + +static GstMultiQueueItem * +gst_multi_queue_item_new (GstMiniObject * object) +{ + GstMultiQueueItem *item; + + item = g_new0 (GstMultiQueueItem, 1); + item->object = gst_mini_object_ref (object); + item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; + + if (GST_IS_BUFFER (object)) { + item->size = GST_BUFFER_SIZE (object); + item->duration = GST_BUFFER_DURATION (object); + if (item->duration == GST_CLOCK_TIME_NONE) + item->duration = 0; + item->visible = TRUE; + } + + return item; +} + +static void +gst_multi_queue_loop (GstPad * pad) +{ + GstSingleQueue *sq; + GstMultiQueueItem *item; + GstDataQueueItem *sitem; + GstMultiQueue *mq; + GstMiniObject *object; + guint32 newid; + guint32 oldid = -1; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) gst_pad_get_parent (pad); + +restart: + GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); + if (!(gst_data_queue_pop (sq->queue, &sitem))) { + /* QUEUE FLUSHING */ + if (sq->srcresult != GST_FLOW_OK) + goto out_flushing; + else + GST_WARNING_OBJECT (mq, + "data_queue_pop() returned FALSE, but srcresult == GST_FLOW_OK !"); + } else { + item = (GstMultiQueueItem *) sitem; + newid = item->posid; + object = item->object; + + GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", + sq->id, newid, oldid); + + /* 1. Only check turn if : + * _ We haven't pushed anything yet + * _ OR the new id isn't the follower of the previous one (continuous segment) */ + if ((oldid == -1) || (newid != (oldid + 1))) { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + /* preamble : if we're not linked, set the newid as the next one we want */ + if (sq->srcresult == GST_FLOW_NOT_LINKED) + sq->nextid = newid; + + /* store the last id we outputted */ + if (oldid != -1) + sq->oldid = oldid; + + /* 2. If there's a queue waiting to push, wake it up. If it's us the */ + /* check below (3.) will avoid us waiting. */ + wake_up_next_non_linked (mq); + + /* 3. If we're not linked AND our nextid is higher than the highest oldid outputted + * _ Update global next-not-linked + * _ Wait on our conditional + */ + while ((sq->srcresult == GST_FLOW_NOT_LINKED) + && (mq->nextnotlinked != sq->id)) { + compute_next_non_linked (mq); + g_cond_wait (sq->turn, mq->qlock); + } + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + /* 4. Check again status, maybe we're flushing */ + if ((sq->srcresult != GST_FLOW_OK) + && (sq->srcresult != GST_FLOW_NOT_LINKED)) { + gst_multi_queue_item_destroy (item); + goto out_flushing; + } + } + + GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + /* 4. Try to push out the new object */ + gst_single_queue_push_one (mq, sq, object); + + GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + gst_multi_queue_item_destroy (item); + oldid = newid; + + /* 5. if GstFlowReturn is non-fatal, goto restart */ + if ((sq->srcresult == GST_FLOW_OK) + || (sq->srcresult == GST_FLOW_NOT_LINKED)) + goto restart; + } + + +beach: + gst_object_unref (mq); + return; + +out_flushing: + { + gst_pad_pause_task (sq->srcpad); + GST_CAT_LOG_OBJECT (multi_queue_debug, mq, + "SingleQueue[%d] task paused, reason:%s", + sq->id, gst_flow_get_name (sq->srcresult)); + goto beach; + } +} + + +/** + * gst_multi_queue_chain: + * + * This is similar to GstQueue's chain function, except: + * _ we don't have leak behavioures, + * _ we push with a unique id (curid) + */ + +static GstFlowReturn +gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) +{ + GstSingleQueue *sq; + GstMultiQueue *mq; + GstMultiQueueItem *item; + GstFlowReturn ret = GST_FLOW_OK; + guint32 curid; + + sq = gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) gst_pad_get_parent (pad); + + /* Get an unique incrementing id */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + curid = mq->counter++; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d", + sq->id, curid); + + item = gst_multi_queue_item_new ((GstMiniObject *) buffer); + item->posid = curid; + + if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) { + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", + sq->id, gst_flow_get_name (sq->srcresult)); + gst_multi_queue_item_destroy (item); + gst_buffer_unref (buffer); + ret = sq->srcresult; + } + + gst_object_unref (mq); + return ret; +} + +static gboolean +gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active) +{ + GstSingleQueue *sq; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + + if (active) + sq->srcresult = GST_FLOW_OK; + else { + sq->srcresult = GST_FLOW_WRONG_STATE; + gst_data_queue_flush (sq->queue); + } + + return TRUE; +} + +static gboolean +gst_multi_queue_sink_event (GstPad * pad, GstEvent * event) +{ + GstSingleQueue *sq; + GstMultiQueue *mq; + guint32 curid; + GstMultiQueueItem *item; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) gst_pad_get_parent (pad); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event", + sq->id); + + gst_pad_push_event (sq->srcpad, event); + + sq->srcresult = GST_FLOW_WRONG_STATE; + gst_data_queue_set_flushing (sq->queue, TRUE); + + /* wake up non-linked task */ + GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", + sq->id); + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + g_cond_signal (sq->turn); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + gst_pad_pause_task (sq->srcpad); + goto done; + + case GST_EVENT_FLUSH_STOP: + GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event", + sq->id); + + gst_pad_push_event (sq->srcpad, event); + + gst_data_queue_flush (sq->queue); + gst_data_queue_set_flushing (sq->queue, FALSE); + sq->srcresult = GST_FLOW_OK; + sq->nextid = -1; + sq->oldid = -1; + + GST_DEBUG_OBJECT (mq, "SingleQueue %d : restarting task", sq->id); + gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, + sq->srcpad); + goto done; + + default: + if (!(GST_EVENT_IS_SERIALIZED (event))) { + gst_pad_push_event (sq->srcpad, event); + goto done; + } + break; + } + + /* Get an unique incrementing id */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + curid = mq->counter++; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + item = gst_multi_queue_item_new ((GstMiniObject *) event); + item->posid = curid; + + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event, + GST_EVENT_TYPE_NAME (event), curid); + + if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) { + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", + sq->id, gst_flow_get_name (sq->srcresult)); + gst_multi_queue_item_destroy (item); + gst_event_unref (event); + } + +done: + gst_object_unref (mq); + return TRUE; +} + +static GstCaps * +gst_multi_queue_getcaps (GstPad * pad) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + GstPad *otherpad; + GstCaps *result; + + GST_LOG_OBJECT (pad, "..."); + + otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad; + + GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad"); + + result = gst_pad_peer_get_caps (otherpad); + if (result == NULL) + result = gst_caps_new_any (); + + return result; +} + +static GstFlowReturn +gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, + GstCaps * caps, GstBuffer ** buf) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + + return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf); +} + +static gboolean +gst_multi_queue_src_activate_push (GstPad * pad, gboolean active) +{ + GstMultiQueue *mq; + GstSingleQueue *sq; + gboolean result = FALSE; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) gst_pad_get_parent (pad); + + GST_LOG ("SingleQueue %d", sq->id); + + if (active) { + sq->srcresult = GST_FLOW_OK; + gst_data_queue_set_flushing (sq->queue, FALSE); + result = gst_pad_start_task (pad, (GstTaskFunction) gst_multi_queue_loop, + pad); + } else { + /* 1. unblock loop function */ + sq->srcresult = GST_FLOW_WRONG_STATE; + gst_data_queue_set_flushing (sq->queue, TRUE); + + /* 2. unblock potentially non-linked pad */ + GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", + sq->id); + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + g_cond_signal (sq->turn); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + /* 3. make sure streaming finishes */ + result = gst_pad_stop_task (pad); + gst_data_queue_set_flushing (sq->queue, FALSE); + } + + gst_object_unref (mq); + return result; +} + +static gboolean +gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps) +{ + return TRUE; +} + +static gboolean +gst_multi_queue_src_event (GstPad * pad, GstEvent * event) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + + return gst_pad_push_event (sq->sinkpad, event); +} + +static gboolean +gst_multi_queue_src_query (GstPad * pad, GstQuery * query) +{ + GstSingleQueue *sq = gst_pad_get_element_private (pad); + GstPad *peerpad; + gboolean res; + + /* FILLME */ + /* Handle position offset depending on queue size */ + + /* default handling */ + if (!(peerpad = gst_pad_get_peer (sq->sinkpad))) + goto no_peer; + + res = gst_pad_query (peerpad, query); + + gst_object_unref (peerpad); + + return res; + +no_peer: + { + GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer"); + return FALSE; + } +} + + +/* + * Next-non-linked functions + */ + +/* WITH LOCK TAKEN */ +static void +wake_up_next_non_linked (GstMultiQueue * mq) +{ + GList *tmp; + + GST_LOG ("mq->nextnotlinked:%d", mq->nextnotlinked); + + /* maybe no-one is waiting */ + if (mq->nextnotlinked == -1) + return; + + /* Else figure out which singlequeue it is and wake it up */ + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + if (sq->srcresult == GST_FLOW_NOT_LINKED) + if (sq->id == mq->nextnotlinked) { + GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); + g_cond_signal (sq->turn); + return; + } + } +} + +/* WITH LOCK TAKEN */ +static void +compute_next_non_linked (GstMultiQueue * mq) +{ + GList *tmp; + guint32 lowest = G_MAXUINT32; + gint nextid = -1; + + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + GST_LOG ("inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s", + sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult)); + + if (sq->srcresult == GST_FLOW_NOT_LINKED) + if (lowest > sq->nextid) { + lowest = sq->nextid; + nextid = sq->id; + } + + /* If we don't have a global highid, or the global highid is lower than */ + /* this single queue's last outputted id, store the queue's one */ + if ((mq->highid == -1) || (mq->highid < sq->oldid)) + mq->highid = sq->oldid; + } + + mq->nextnotlinked = nextid; + GST_LOG_OBJECT (mq, + "Next-non-linked is sq #%d with nextid : %d. Highid is now : %d", nextid, + lowest, mq->highid); +} + +/* + * GstSingleQueue functions + */ + +static void +single_queue_overrun_cb (GstSingleQueue * sq, GstMultiQueue * mq) +{ + GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); + /* Overrun is always forwarded, since this is blocking the upstream element */ + g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); +} + +static void +single_queue_underrun_cb (GstSingleQueue * sq, GstMultiQueue * mq) +{ + gboolean empty = TRUE; + GList *tmp; + + GST_LOG_OBJECT (mq, + "Single Queue %d is empty, Checking if all single queues are empty", + sq->id); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + if (!gst_data_queue_is_empty (sq->queue)) { + empty = FALSE; + break; + } + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + if (empty) { + GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it"); + g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0); + } +} + +static gboolean +single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, + guint64 time, GstSingleQueue * sq) +{ + gboolean res; + + res = (((sq->max_size.visible != 0) && + sq->max_size.visible < visible) || + ((sq->max_size.bytes != 0) && + sq->max_size.bytes < bytes) || + ((sq->max_size.time != 0) && sq->max_size.time < time)); + + return res; +} + +static GstSingleQueue * +gst_single_queue_new (GstMultiQueue * mqueue) +{ + GstSingleQueue *sq; + gchar *tmp; + + sq = g_new0 (GstSingleQueue, 1); + + GST_MULTI_QUEUE_MUTEX_LOCK (mqueue); + sq->id = mqueue->nbqueues++; + + /* copy over max_size and extra_size so we don't need to take the lock + * any longer when checking if the queue is full. */ + /* FIXME : We can't modify those values once the single queue is created + * since we don't have any lock protecting those values. */ + sq->max_size.visible = mqueue->max_size.visible; + sq->max_size.bytes = mqueue->max_size.bytes; + sq->max_size.time = mqueue->max_size.time; + + sq->extra_size.visible = mqueue->extra_size.visible; + sq->extra_size.bytes = mqueue->extra_size.bytes; + sq->extra_size.time = mqueue->extra_size.time; + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue); + + GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id); + + sq->mqueue = mqueue; + sq->srcresult = GST_FLOW_OK; + sq->queue = + gst_data_queue_new ((GstDataQueueCheckFullFunction) + single_queue_check_full, sq); + + sq->nextid = -1; + sq->oldid = -1; + sq->turn = g_cond_new (); + + /* FIXME : attach to underrun/overrun signals to handle non-starvation + * OR should this be handled when we check if the queue is full/empty before pushing/popping ? */ + + g_signal_connect (G_OBJECT (sq->queue), "full", + G_CALLBACK (single_queue_overrun_cb), mqueue); + + g_signal_connect (G_OBJECT (sq->queue), "empty", + G_CALLBACK (single_queue_underrun_cb), mqueue); + + tmp = g_strdup_printf ("sink%d", sq->id); + sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp); + g_free (tmp); + + gst_pad_set_chain_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_chain)); + gst_pad_set_activatepush_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push)); + gst_pad_set_event_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event)); + gst_pad_set_getcaps_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps)); + gst_pad_set_bufferalloc_function (sq->sinkpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc)); + + tmp = g_strdup_printf ("src%d", sq->id); + sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp); + g_free (tmp); + + gst_pad_set_activatepush_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push)); + gst_pad_set_acceptcaps_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps)); + gst_pad_set_getcaps_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps)); + gst_pad_set_event_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_src_event)); + gst_pad_set_query_function (sq->srcpad, + GST_DEBUG_FUNCPTR (gst_multi_queue_src_query)); + + gst_pad_set_element_private (sq->sinkpad, (gpointer) sq); + gst_pad_set_element_private (sq->srcpad, (gpointer) sq); + + gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); + gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); + + gst_pad_set_active (sq->srcpad, TRUE); + gst_pad_set_active (sq->sinkpad, TRUE); + + GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added", + sq->id); + + return sq; +} diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h new file mode 100644 index 0000000000..74cdaf34fb --- /dev/null +++ b/plugins/elements/gstmultiqueue.h @@ -0,0 +1,88 @@ +/* GStreamer + * Copyright (C) 2006 Edward Hervey + * + * gstmultiqueue.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_MULTI_QUEUE_H__ +#define __GST_MULTI_QUEUE_H__ + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_MULTI_QUEUE \ + (gst_multi_queue_get_type()) +#define GST_MULTI_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_QUEUE,GstMultiQueue)) +#define GST_MULTI_QUEUE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_QUEUE,GstMultiQueueClass)) +#define GST_IS_MULTI_QUEUE(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_QUEUE)) +#define GST_IS_MULTI_QUEUE_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_QUEUE)) + +typedef struct _GstMultiQueue GstMultiQueue; +typedef struct _GstMultiQueueClass GstMultiQueueClass; + +/** + * GstMultiQueue: + * + * Opaque #GstMultiQueue structure. + */ +struct _GstMultiQueue { + GstElement element; + + /* number of queues */ + guint nbqueues; + + /* The list of individual queues */ + GList *queues; + + GstDataQueueSize max_size, extra_size; + + guint32 counter; /* incoming object counter */ + guint32 highid; /* contains highest id of last outputted object */ + + GMutex * qlock; /* Global queue lock (vs object lock or individual */ + /* queues lock). Protects nbqueues, queues, global */ + /* GstMultiQueueSize, counter and highid */ + + gint nextnotlinked; /* ID of the next queue not linked (-1 : none) */ + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +struct _GstMultiQueueClass { + GstElementClass parent_class; + + /* signals emitted when ALL queues are either full or empty */ + void (*underrun) (GstMultiQueue *queue); + void (*overrun) (GstMultiQueue *queue); + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GType gst_multi_queue_get_type (void); + +G_END_DECLS + + +#endif /* __GST_MULTI_QUEUE_H__ */