From 40732dd30892910aa681b691c7da586da9e30eec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 29 Oct 2009 11:30:11 +0100 Subject: [PATCH] queue2: Integrate into coreplugins --- plugins/elements/Makefile.am | 2 + plugins/elements/gstelements.c | 2 + plugins/elements/gstqueue2.c | 467 +++++++++++++++------------------ plugins/elements/gstqueue2.h | 49 ++-- 4 files changed, 252 insertions(+), 268 deletions(-) diff --git a/plugins/elements/Makefile.am b/plugins/elements/Makefile.am index 733aa48bb4..66f6d2c242 100644 --- a/plugins/elements/Makefile.am +++ b/plugins/elements/Makefile.am @@ -21,6 +21,7 @@ libgstcoreelements_la_SOURCES = \ gstfilesrc.c \ gstidentity.c \ gstqueue.c \ + gstqueue2.c \ gsttee.c \ gsttypefindelement.c \ gstmultiqueue.c @@ -42,6 +43,7 @@ noinst_HEADERS = \ gstfilesrc.h \ gstidentity.h \ gstqueue.h \ + gstqueue2.h \ gsttee.h \ gsttypefindelement.h \ gstmultiqueue.h diff --git a/plugins/elements/gstelements.c b/plugins/elements/gstelements.c index 795f79df32..bc5f50bc6e 100644 --- a/plugins/elements/gstelements.c +++ b/plugins/elements/gstelements.c @@ -36,6 +36,7 @@ #include "gstfilesrc.h" #include "gstidentity.h" #include "gstqueue.h" +#include "gstqueue2.h" #include "gsttee.h" #include "gsttypefindelement.h" #include "gstmultiqueue.h" @@ -59,6 +60,7 @@ static struct _elements_entry _elements[] = { {"filesrc", GST_RANK_PRIMARY, gst_file_src_get_type}, {"identity", GST_RANK_NONE, gst_identity_get_type}, {"queue", GST_RANK_NONE, gst_queue_get_type}, + {"queue2", GST_RANK_NONE, gst_queue2_get_type}, {"filesink", GST_RANK_PRIMARY, gst_file_sink_get_type}, {"tee", GST_RANK_NONE, gst_tee_get_type}, {"typefind", GST_RANK_NONE, gst_type_find_element_get_type}, diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index dd6e723d9f..9bc4c6619f 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -27,8 +27,8 @@ * @short_description: Asynchronous data queue. * * Data is queued until one of the limits specified by the - * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or - * #GstQueue2:max-size-time properties has been reached. Any attempt to push + * #GstQueue22:max-size-buffers, #GstQueue22:max-size-bytes and/or + * #GstQueue22:max-size-time properties has been reached. Any attempt to push * more buffers into the queue will block the pushing thread until more space * becomes available. * @@ -36,7 +36,7 @@ * processing on sink and source pad. * * You can query how many buffers are queued by reading the - * #GstQueue2:current-level-buffers property. + * #GstQueue22:current-level-buffers property. * * The default queue size limits are 100 buffers, 2MB of data, or * two seconds worth of data, whichever is reached first. @@ -62,7 +62,7 @@ #include -#include +#include "gst/gst-i18n-lib.h" #ifdef G_OS_WIN32 #include /* lseek, open, close, read */ @@ -74,7 +74,8 @@ #include #endif -static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", +static const GstElementDetails gst_queue2_details = +GST_ELEMENT_DETAILS ("Queue", "Generic", "Simple data queue", "Erik Walthinsen , " @@ -129,16 +130,7 @@ enum PROP_TEMP_LOCATION }; -/* used to keep track of sizes (current and max) */ -struct _GstQueueSize -{ - guint buffers; - guint bytes; - guint64 time; - guint64 rate_time; -}; - -#define GST_QUEUE_CLEAR_LEVEL(l) G_STMT_START { \ +#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \ l.buffers = 0; \ l.bytes = 0; \ l.time = 0; \ @@ -161,21 +153,21 @@ struct _GstQueueSize queue->writing_pos - queue->max_reading_pos : \ queue->queue->length)) -#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ +#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \ g_mutex_lock (q->qlock); \ } G_STMT_END -#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \ - GST_QUEUE_MUTEX_LOCK (q); \ +#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \ + GST_QUEUE2_MUTEX_LOCK (q); \ if (q->srcresult != GST_FLOW_OK) \ goto label; \ } G_STMT_END -#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ +#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START { \ g_mutex_unlock (q->qlock); \ } G_STMT_END -#define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \ +#define GST_QUEUE2_WAIT_DEL_CHECK(q, label) G_STMT_START { \ STATUS (queue, q->sinkpad, "wait for DEL"); \ q->waiting_del = TRUE; \ g_cond_wait (q->item_del, queue->qlock); \ @@ -187,7 +179,7 @@ struct _GstQueueSize STATUS (queue, q->sinkpad, "received DEL"); \ } G_STMT_END -#define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START { \ +#define GST_QUEUE2_WAIT_ADD_CHECK(q, label) G_STMT_START { \ STATUS (queue, q->srcpad, "wait for ADD"); \ q->waiting_add = TRUE; \ g_cond_wait (q->item_add, q->qlock); \ @@ -199,14 +191,14 @@ struct _GstQueueSize STATUS (queue, q->srcpad, "received ADD"); \ } G_STMT_END -#define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START { \ +#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START { \ if (q->waiting_del) { \ STATUS (q, q->srcpad, "signal DEL"); \ g_cond_signal (q->item_del); \ } \ } G_STMT_END -#define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START { \ +#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START { \ if (q->waiting_add) { \ STATUS (q, q->sinkpad, "signal ADD"); \ g_cond_signal (q->item_add); \ @@ -217,82 +209,82 @@ struct _GstQueueSize /* can't use boilerplate as we need to register with Queue2 to avoid conflicts * with queue in core elements */ -static void gst_queue_class_init (GstQueueClass * klass); -static void gst_queue_init (GstQueue * queue, GstQueueClass * g_class); +static void gst_queue2_class_init (GstQueue2Class * klass); +static void gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class); static GstElementClass *parent_class; -static GType -gst_queue_get_type (void) +GType +gst_queue2_get_type (void) { - static GType gst_queue_type = 0; + static GType gst_queue2_type = 0; - if (!gst_queue_type) { - static const GTypeInfo gst_queue_info = { - sizeof (GstQueueClass), + if (!gst_queue2_type) { + static const GTypeInfo gst_queue2_info = { + sizeof (GstQueue2Class), NULL, NULL, - (GClassInitFunc) gst_queue_class_init, + (GClassInitFunc) gst_queue2_class_init, NULL, NULL, - sizeof (GstQueue), + sizeof (GstQueue2), 0, - (GInstanceInitFunc) gst_queue_init, + (GInstanceInitFunc) gst_queue2_init, NULL }; - gst_queue_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstQueue2", - &gst_queue_info, 0); + gst_queue2_type = + g_type_register_static (GST_TYPE_ELEMENT, "GstQueue22", + &gst_queue2_info, 0); } - return gst_queue_type; + return gst_queue2_type; } -static void gst_queue_finalize (GObject * object); +static void gst_queue2_finalize (GObject * object); -static void gst_queue_set_property (GObject * object, +static void gst_queue2_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); -static void gst_queue_get_property (GObject * object, +static void gst_queue2_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer); -static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset, +static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer); +static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf); -static GstFlowReturn gst_queue_push_one (GstQueue * queue); -static void gst_queue_loop (GstPad * pad); +static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue); +static void gst_queue2_loop (GstPad * pad); -static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event); +static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event); -static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event); -static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query); +static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event); +static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query); -static GstCaps *gst_queue_getcaps (GstPad * pad); -static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps); +static GstCaps *gst_queue2_getcaps (GstPad * pad); +static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps); -static GstFlowReturn gst_queue_get_range (GstPad * pad, guint64 offset, +static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, GstBuffer ** buffer); -static gboolean gst_queue_src_checkgetrange_function (GstPad * pad); +static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad); -static gboolean gst_queue_src_activate_pull (GstPad * pad, gboolean active); -static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active); -static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active); -static GstStateChangeReturn gst_queue_change_state (GstElement * element, +static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active); +static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active); +static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active); +static GstStateChangeReturn gst_queue2_change_state (GstElement * element, GstStateChange transition); -static gboolean gst_queue_is_empty (GstQueue * queue); -static gboolean gst_queue_is_filled (GstQueue * queue); +static gboolean gst_queue2_is_empty (GstQueue2 * queue); +static gboolean gst_queue2_is_filled (GstQueue2 * queue); -/* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */ +/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */ static void -gst_queue_class_init (GstQueueClass * klass) +gst_queue2_class_init (GstQueue2Class * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); parent_class = g_type_class_peek_parent (klass); - gobject_class->set_property = gst_queue_set_property; - gobject_class->get_property = gst_queue_get_property; + gobject_class->set_property = gst_queue2_set_property; + gobject_class->get_property = gst_queue2_get_property; /* properties */ g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, @@ -358,55 +350,55 @@ gst_queue_class_init (GstQueueClass * klass) gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); - gst_element_class_set_details (gstelement_class, &gst_queue_details); + gst_element_class_set_details (gstelement_class, &gst_queue2_details); /* set several parent class virtual functions */ - gobject_class->finalize = gst_queue_finalize; + gobject_class->finalize = gst_queue2_finalize; - gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state); } static void -gst_queue_init (GstQueue * queue, GstQueueClass * g_class) +gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class) { queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); gst_pad_set_chain_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_chain)); + GST_DEBUG_FUNCPTR (gst_queue2_chain)); gst_pad_set_activatepush_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_sink_activate_push)); + GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push)); gst_pad_set_event_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event)); + GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event)); gst_pad_set_getcaps_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_getcaps)); + GST_DEBUG_FUNCPTR (gst_queue2_getcaps)); gst_pad_set_acceptcaps_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_acceptcaps)); + GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps)); gst_pad_set_bufferalloc_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_bufferalloc)); + GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); gst_pad_set_activatepull_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_src_activate_pull)); + GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull)); gst_pad_set_activatepush_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_src_activate_push)); + GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push)); gst_pad_set_getrange_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_get_range)); + GST_DEBUG_FUNCPTR (gst_queue2_get_range)); gst_pad_set_checkgetrange_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_src_checkgetrange_function)); + GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function)); gst_pad_set_getcaps_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_getcaps)); + GST_DEBUG_FUNCPTR (gst_queue2_getcaps)); gst_pad_set_acceptcaps_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_acceptcaps)); + GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps)); gst_pad_set_event_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_handle_src_event)); + GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event)); gst_pad_set_query_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue_handle_src_query)); + GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); /* levels */ - GST_QUEUE_CLEAR_LEVEL (queue->cur_level); + GST_QUEUE2_CLEAR_LEVEL (queue->cur_level); queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS; queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES; queue->max_level.time = DEFAULT_MAX_SIZE_TIME; @@ -442,9 +434,9 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class) /* called only once, as opposed to dispose */ static void -gst_queue_finalize (GObject * object) +gst_queue2_finalize (GObject * object) { - GstQueue *queue = GST_QUEUE (object); + GstQueue2 *queue = GST_QUEUE2 (object); GST_DEBUG_OBJECT (queue, "finalizing queue"); @@ -469,13 +461,13 @@ gst_queue_finalize (GObject * object) } static gboolean -gst_queue_acceptcaps (GstPad * pad, GstCaps * caps) +gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps) { - GstQueue *queue; + GstQueue2 *queue; GstPad *otherpad; gboolean result; - queue = GST_QUEUE (GST_PAD_PARENT (pad)); + queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad); result = gst_pad_peer_accept_caps (otherpad, caps); @@ -484,13 +476,13 @@ gst_queue_acceptcaps (GstPad * pad, GstCaps * caps) } static GstCaps * -gst_queue_getcaps (GstPad * pad) +gst_queue2_getcaps (GstPad * pad) { - GstQueue *queue; + GstQueue2 *queue; GstPad *otherpad; GstCaps *result; - queue = GST_QUEUE (GST_PAD_PARENT (pad)); + queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad); result = gst_pad_peer_get_caps (otherpad); @@ -501,13 +493,13 @@ gst_queue_getcaps (GstPad * pad) } static GstFlowReturn -gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps, - GstBuffer ** buf) +gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size, + GstCaps * caps, GstBuffer ** buf) { - GstQueue *queue; + GstQueue2 *queue; GstFlowReturn result; - queue = GST_QUEUE (GST_PAD_PARENT (pad)); + queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); /* Forward to src pad, without setting caps on the src pad */ result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf); @@ -518,7 +510,7 @@ gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps, /* calculate the diff between running time on the sink and src of the queue. * This is the total amount of time in the queue. */ static void -update_time_level (GstQueue * queue) +update_time_level (GstQueue2 * queue) { gint64 sink_time, src_time; @@ -541,7 +533,7 @@ update_time_level (GstQueue * queue) /* take a NEWSEGMENT event and apply the values to segment, updating the time * level of queue. */ static void -apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment) +apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment) { gboolean update; GstFormat format; @@ -583,7 +575,7 @@ apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment) /* take a buffer and update segment, updating the time level of the queue. */ static void -apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment) +apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment) { GstClockTime duration, timestamp; @@ -609,7 +601,7 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment) } static void -update_buffering (GstQueue * queue) +update_buffering (GstQueue2 * queue) { gint percent; gboolean post = FALSE; @@ -679,7 +671,7 @@ update_buffering (GstQueue * queue) } static void -reset_rate_timer (GstQueue * queue) +reset_rate_timer (GstQueue2 * queue) { queue->bytes_in = 0; queue->bytes_out = 0; @@ -701,7 +693,7 @@ reset_rate_timer (GstQueue * queue) #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0 static void -update_in_rates (GstQueue * queue) +update_in_rates (GstQueue2 * queue) { gdouble elapsed, period; gdouble byte_in_rate; @@ -742,7 +734,7 @@ update_in_rates (GstQueue * queue) } static void -update_out_rates (GstQueue * queue) +update_out_rates (GstQueue2 * queue) { gdouble elapsed, period; gdouble byte_out_rate; @@ -782,7 +774,7 @@ update_out_rates (GstQueue * queue) } static void -gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer) +gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) { guint size; guint8 *data; @@ -814,7 +806,7 @@ gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer) /* see if there is enough data in the file to read a full buffer */ static gboolean -gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) +gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) { GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset, @@ -829,7 +821,7 @@ gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) } static GstFlowReturn -gst_queue_create_read (GstQueue * queue, guint64 offset, guint length, +gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, GstBuffer ** buffer) { size_t res; @@ -837,8 +829,8 @@ gst_queue_create_read (GstQueue * queue, guint64 offset, guint length, /* check if we have enough data at @offset. If there is not enough data, we * block and wait. */ - while (!gst_queue_have_data (queue, offset, length)) { - GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing); + while (!gst_queue2_have_data (queue, offset, length)) { + GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing); } #ifdef HAVE_FSEEKO @@ -913,7 +905,7 @@ eos: /* should be called with QUEUE_LOCK */ static GstMiniObject * -gst_queue_read_item_from_file (GstQueue * queue) +gst_queue2_read_item_from_file (GstQueue2 * queue) { GstMiniObject *item; @@ -925,7 +917,7 @@ gst_queue_read_item_from_file (GstQueue * queue) GstBuffer *buffer; ret = - gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE, + gst_queue2_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE, &buffer); switch (ret) { case GST_FLOW_OK: @@ -943,7 +935,7 @@ gst_queue_read_item_from_file (GstQueue * queue) } static gboolean -gst_queue_open_temp_location_file (GstQueue * queue) +gst_queue2_open_temp_location_file (GstQueue2 * queue) { gint fd = -1; gchar *name = NULL; @@ -1018,7 +1010,7 @@ open_failed: } static void -gst_queue_close_temp_location_file (GstQueue * queue) +gst_queue2_close_temp_location_file (GstQueue2 * queue) { /* nothing to do */ if (queue->temp_file == NULL) @@ -1035,7 +1027,7 @@ gst_queue_close_temp_location_file (GstQueue * queue) } static void -gst_queue_flush_temp_file (GstQueue * queue) +gst_queue2_flush_temp_file (GstQueue2 * queue) { if (queue->temp_file == NULL) return; @@ -1050,10 +1042,10 @@ gst_queue_flush_temp_file (GstQueue * queue) } static void -gst_queue_locked_flush (GstQueue * queue) +gst_queue2_locked_flush (GstQueue2 * queue) { if (QUEUE_IS_USING_TEMP_FILE (queue)) { - gst_queue_flush_temp_file (queue); + gst_queue2_flush_temp_file (queue); } else { while (!g_queue_is_empty (queue->queue)) { GstMiniObject *data = g_queue_pop_head (queue->queue); @@ -1063,7 +1055,7 @@ gst_queue_locked_flush (GstQueue * queue) gst_mini_object_unref (data); } } - GST_QUEUE_CLEAR_LEVEL (queue->cur_level); + GST_QUEUE2_CLEAR_LEVEL (queue->cur_level); gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); if (queue->starting_segment != NULL) @@ -1072,12 +1064,12 @@ gst_queue_locked_flush (GstQueue * queue) queue->segment_event_received = FALSE; /* we deleted a lot of something */ - GST_QUEUE_SIGNAL_DEL (queue); + GST_QUEUE2_SIGNAL_DEL (queue); } /* enqueue an item an update the level stats */ static void -gst_queue_locked_enqueue (GstQueue * queue, gpointer item) +gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) { if (GST_IS_BUFFER (item)) { GstBuffer *buffer; @@ -1097,7 +1089,7 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item) update_in_rates (queue); if (QUEUE_IS_USING_TEMP_FILE (queue)) { - gst_queue_write_buffer_to_file (queue, buffer); + gst_queue2_write_buffer_to_file (queue, buffer); } } else if (GST_IS_EVENT (item)) { @@ -1150,7 +1142,7 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item) else gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); - GST_QUEUE_SIGNAL_ADD (queue); + GST_QUEUE2_SIGNAL_ADD (queue); } return; @@ -1169,12 +1161,12 @@ unexpected_event: /* dequeue an item from the queue and update level stats */ static GstMiniObject * -gst_queue_locked_dequeue (GstQueue * queue) +gst_queue2_locked_dequeue (GstQueue2 * queue) { GstMiniObject *item; if (QUEUE_IS_USING_TEMP_FILE (queue)) - item = gst_queue_read_item_from_file (queue); + item = gst_queue2_read_item_from_file (queue); else item = g_queue_pop_head (queue->queue); @@ -1209,7 +1201,7 @@ gst_queue_locked_dequeue (GstQueue * queue) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: /* queue is empty now that we dequeued the EOS */ - GST_QUEUE_CLEAR_LEVEL (queue->cur_level); + GST_QUEUE2_CLEAR_LEVEL (queue->cur_level); break; case GST_EVENT_NEWSEGMENT: apply_segment (queue, event, &queue->src_segment); @@ -1223,7 +1215,7 @@ gst_queue_locked_dequeue (GstQueue * queue) item, GST_OBJECT_NAME (queue)); item = NULL; } - GST_QUEUE_SIGNAL_DEL (queue); + GST_QUEUE2_SIGNAL_DEL (queue); return item; @@ -1236,11 +1228,11 @@ no_item: } static gboolean -gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) +gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) { - GstQueue *queue; + GstQueue2 *queue; - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad)); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: @@ -1250,12 +1242,12 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) gst_pad_push_event (queue->srcpad, event); /* now unblock the chain function */ - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; /* unblock the loop and chain functions */ g_cond_signal (queue->item_add); g_cond_signal (queue->item_del); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); /* make sure it pauses, this should happen since we sent * flush_start downstream. */ @@ -1269,27 +1261,27 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) /* forward event */ gst_pad_push_event (queue->srcpad, event); - GST_QUEUE_MUTEX_LOCK (queue); - gst_queue_locked_flush (queue); + GST_QUEUE2_MUTEX_LOCK (queue); + gst_queue2_locked_flush (queue); queue->srcresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; /* reset rate counters */ reset_rate_timer (queue); - gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, + gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, queue->srcpad); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); goto done; } default: if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); /* refuse more events on EOS */ if (queue->is_eos) goto out_eos; - gst_queue_locked_enqueue (queue, event); - GST_QUEUE_MUTEX_UNLOCK (queue); + gst_queue2_locked_enqueue (queue, event); + GST_QUEUE2_MUTEX_UNLOCK (queue); } else { /* non-serialized events are passed upstream. */ gst_pad_push_event (queue->srcpad, event); @@ -1303,21 +1295,21 @@ done: out_flushing: { GST_DEBUG_OBJECT (queue, "refusing event, we are flushing"); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); return FALSE; } out_eos: { GST_DEBUG_OBJECT (queue, "refusing event, we are EOS"); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); return FALSE; } } static gboolean -gst_queue_is_empty (GstQueue * queue) +gst_queue2_is_empty (GstQueue2 * queue) { /* never empty on EOS */ if (queue->is_eos) @@ -1334,7 +1326,7 @@ gst_queue_is_empty (GstQueue * queue) } static gboolean -gst_queue_is_filled (GstQueue * queue) +gst_queue2_is_filled (GstQueue2 * queue) { gboolean res; @@ -1366,11 +1358,11 @@ gst_queue_is_filled (GstQueue * queue) } static GstFlowReturn -gst_queue_chain (GstPad * pad, GstBuffer * buffer) +gst_queue2_chain (GstPad * pad, GstBuffer * buffer) { - GstQueue *queue; + GstQueue2 *queue; - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad)); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %" @@ -1379,7 +1371,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); /* we have to lock the queue since we span threads */ - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); /* when we received EOS, we refuse more data */ if (queue->is_eos) goto out_eos; @@ -1389,7 +1381,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* We make space available if we're "full" according to whatever * the user defined as "full". */ - if (gst_queue_is_filled (queue)) { + if (gst_queue2_is_filled (queue)) { gboolean started; /* pause the timer while we wait. The fact that we are waiting does not mean @@ -1401,9 +1393,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) "queue is full, waiting for free space"); do { /* Wait for space to be available, we could be unlocked because of a flush. */ - GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing); + GST_QUEUE2_WAIT_DEL_CHECK (queue, out_flushing); } - while (gst_queue_is_filled (queue)); + while (gst_queue2_is_filled (queue)); /* and continue if we were running before */ if (started) @@ -1411,8 +1403,8 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) } /* put buffer in queue now */ - gst_queue_locked_enqueue (queue, buffer); - GST_QUEUE_MUTEX_UNLOCK (queue); + gst_queue2_locked_enqueue (queue, buffer); + GST_QUEUE2_MUTEX_UNLOCK (queue); return GST_FLOW_OK; @@ -1423,7 +1415,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %s", gst_flow_get_name (ret)); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); return ret; @@ -1431,7 +1423,7 @@ out_flushing: out_eos: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); return GST_FLOW_UNEXPECTED; @@ -1440,7 +1432,7 @@ out_unexpected: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received UNEXPECTED"); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); return GST_FLOW_UNEXPECTED; @@ -1450,12 +1442,12 @@ out_unexpected: /* dequeue an item from the queue an push it downstream. This functions returns * the result of the push. */ static GstFlowReturn -gst_queue_push_one (GstQueue * queue) +gst_queue2_push_one (GstQueue2 * queue) { GstFlowReturn result = GST_FLOW_OK; GstMiniObject *data; - data = gst_queue_locked_dequeue (queue); + data = gst_queue2_locked_dequeue (queue); if (data == NULL) goto no_item; @@ -1467,7 +1459,7 @@ next: buffer = GST_BUFFER_CAST (data); caps = GST_BUFFER_CAPS (buffer); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); /* set caps before pushing the buffer so that core does not try to do * something fancy to check if this is possible. */ @@ -1477,7 +1469,7 @@ next: result = gst_pad_push (queue->srcpad, buffer); /* need to check for srcresult here as well */ - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); if (result == GST_FLOW_UNEXPECTED) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream"); @@ -1486,7 +1478,7 @@ next: * queue we can push, we set a flag to make the sinkpad refuse more * buffers with an UNEXPECTED return value until we receive something * pushable again or we get flushed. */ - while ((data = gst_queue_locked_dequeue (queue))) { + while ((data = gst_queue2_locked_dequeue (queue))) { if (GST_IS_BUFFER (data)) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping UNEXPECTED buffer %p", data); @@ -1518,11 +1510,11 @@ next: GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); gst_pad_push_event (queue->srcpad, event); - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); /* if we're EOS, return UNEXPECTED so that the task pauses. */ if (type == GST_EVENT_EOS) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, @@ -1549,17 +1541,17 @@ out_flushing: /* called repeadedly with @pad as the source pad. This function should push out * data to the peer element. */ static void -gst_queue_loop (GstPad * pad) +gst_queue2_loop (GstPad * pad) { - GstQueue *queue; + GstQueue2 *queue; GstFlowReturn ret; - queue = GST_QUEUE (GST_PAD_PARENT (pad)); + queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); /* have to lock for thread-safety */ - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); - if (gst_queue_is_empty (queue)) { + if (gst_queue2_is_empty (queue)) { gboolean started; /* pause the timer while we wait. The fact that we are waiting does not mean @@ -1571,20 +1563,20 @@ gst_queue_loop (GstPad * pad) "queue is empty, waiting for new data"); do { /* Wait for data to be available, we could be unlocked because of a flush. */ - GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing); + GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing); } - while (gst_queue_is_empty (queue)); + while (gst_queue2_is_empty (queue)); /* and continue if we were running before */ if (started) g_timer_continue (queue->out_timer); } - ret = gst_queue_push_one (queue); + ret = gst_queue2_push_one (queue); queue->srcresult = ret; if (ret != GST_FLOW_OK) goto out_flushing; - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); return; @@ -1597,7 +1589,7 @@ out_flushing: gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pause task, reason: %s", gst_flow_get_name (queue->srcresult)); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); /* let app know about us giving up if upstream is not expected to do so */ /* UNEXPECTED is already taken care of elsewhere */ if (eos && (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) && @@ -1613,10 +1605,10 @@ out_flushing: } static gboolean -gst_queue_handle_src_event (GstPad * pad, GstEvent * event) +gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) { gboolean res = TRUE; - GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); + GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); #ifndef GST_DISABLE_GST_DEBUG GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)", @@ -1636,7 +1628,7 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) } static gboolean -gst_queue_peer_query (GstQueue * queue, GstPad * pad, GstQuery * query) +gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query) { gboolean ret = FALSE; GstPad *peer; @@ -1649,11 +1641,11 @@ gst_queue_peer_query (GstQueue * queue, GstPad * pad, GstQuery * query) } static gboolean -gst_queue_handle_src_query (GstPad * pad, GstQuery * query) +gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) { - GstQueue *queue; + GstQueue2 *queue; - queue = GST_QUEUE (GST_PAD_PARENT (pad)); + queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); switch (GST_QUERY_TYPE (query)) { case GST_QUERY_POSITION: @@ -1661,7 +1653,7 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) gint64 peer_pos; GstFormat format; - if (!gst_queue_peer_query (queue, queue->sinkpad, query)) + if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) goto peer_failed; /* get peer position */ @@ -1688,7 +1680,7 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) { GST_DEBUG_OBJECT (queue, "doing peer query"); - if (!gst_queue_peer_query (queue, queue->sinkpad, query)) + if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) goto peer_failed; GST_DEBUG_OBJECT (queue, "peer query success"); @@ -1702,7 +1694,7 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) if (!QUEUE_IS_USING_TEMP_FILE (queue)) { /* no temp file, just forward to the peer */ - if (!gst_queue_peer_query (queue, queue->sinkpad, query)) + if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) goto peer_failed; GST_DEBUG_OBJECT (queue, "buffering forwarded to peer"); } else { @@ -1749,7 +1741,7 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) } default: /* peer handled other queries */ - if (!gst_queue_peer_query (queue, queue->sinkpad, query)) + if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) goto peer_failed; break; } @@ -1765,21 +1757,21 @@ peer_failed: } static GstFlowReturn -gst_queue_get_range (GstPad * pad, guint64 offset, guint length, +gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, GstBuffer ** buffer) { - GstQueue *queue; + GstQueue2 *queue; GstFlowReturn ret; - queue = GST_QUEUE_CAST (gst_pad_get_parent (pad)); + queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad)); - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); length = (length == -1) ? DEFAULT_BUFFER_SIZE : length; offset = (offset == -1) ? queue->reading_pos : offset; /* function will block when the range is not yet available */ - ret = gst_queue_create_read (queue, offset, length, buffer); - GST_QUEUE_MUTEX_UNLOCK (queue); + ret = gst_queue2_create_read (queue, offset, length, buffer); + GST_QUEUE2_MUTEX_UNLOCK (queue); gst_object_unref (queue); @@ -1789,18 +1781,18 @@ gst_queue_get_range (GstPad * pad, guint64 offset, guint length, out_flushing: { GST_DEBUG_OBJECT (queue, "we are flushing"); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); return GST_FLOW_WRONG_STATE; } } static gboolean -gst_queue_src_checkgetrange_function (GstPad * pad) +gst_queue2_src_checkgetrange_function (GstPad * pad) { - GstQueue *queue; + GstQueue2 *queue; gboolean ret; - queue = GST_QUEUE (gst_pad_get_parent (pad)); + queue = GST_QUEUE2 (gst_pad_get_parent (pad)); /* we can operate in pull mode when we are using a tempfile */ ret = QUEUE_IS_USING_TEMP_FILE (queue); @@ -1812,28 +1804,28 @@ gst_queue_src_checkgetrange_function (GstPad * pad) /* sink currently only operates in push mode */ static gboolean -gst_queue_sink_activate_push (GstPad * pad, gboolean active) +gst_queue2_sink_activate_push (GstPad * pad, gboolean active) { gboolean result = TRUE; - GstQueue *queue; + GstQueue2 *queue; - queue = GST_QUEUE (gst_pad_get_parent (pad)); + queue = GST_QUEUE2 (gst_pad_get_parent (pad)); if (active) { - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; reset_rate_timer (queue); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); } else { /* unblock chain function */ - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_WRONG_STATE; - gst_queue_locked_flush (queue); - GST_QUEUE_MUTEX_UNLOCK (queue); + gst_queue2_locked_flush (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); } gst_object_unref (queue); @@ -1844,29 +1836,29 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) /* src operating in push mode, we start a task on the source pad that pushes out * buffers from the queue */ static gboolean -gst_queue_src_activate_push (GstPad * pad, gboolean active) +gst_queue2_src_activate_push (GstPad * pad, gboolean active) { gboolean result = FALSE; - GstQueue *queue; + GstQueue2 *queue; - queue = GST_QUEUE (gst_pad_get_parent (pad)); + queue = GST_QUEUE2 (gst_pad_get_parent (pad)); if (active) { - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; - result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); - GST_QUEUE_MUTEX_UNLOCK (queue); + result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad); + GST_QUEUE2_MUTEX_UNLOCK (queue); } else { /* unblock loop function */ - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_WRONG_STATE; /* the item add signal will unblock */ g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); @@ -1879,39 +1871,39 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) /* pull mode, downstream will call our getrange function */ static gboolean -gst_queue_src_activate_pull (GstPad * pad, gboolean active) +gst_queue2_src_activate_pull (GstPad * pad, gboolean active) { gboolean result; - GstQueue *queue; + GstQueue2 *queue; - queue = GST_QUEUE (gst_pad_get_parent (pad)); + queue = GST_QUEUE2 (gst_pad_get_parent (pad)); if (active) { if (QUEUE_IS_USING_TEMP_FILE (queue)) { - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating pull mode"); queue->srcresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; result = TRUE; - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); } else { - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode"); /* this is not allowed, we cannot operate in pull mode without a temp * file. */ queue->srcresult = GST_FLOW_WRONG_STATE; result = FALSE; - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); } } else { - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating pull mode"); queue->srcresult = GST_FLOW_WRONG_STATE; /* this will unlock getrange */ g_cond_signal (queue->item_add); result = TRUE; - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); } gst_object_unref (queue); @@ -1919,19 +1911,19 @@ gst_queue_src_activate_pull (GstPad * pad, gboolean active) } static GstStateChangeReturn -gst_queue_change_state (GstElement * element, GstStateChange transition) +gst_queue2_change_state (GstElement * element, GstStateChange transition) { - GstQueue *queue; + GstQueue2 *queue; GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; - queue = GST_QUEUE (element); + queue = GST_QUEUE2 (element); switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: if (QUEUE_IS_USING_TEMP_FILE (queue)) { - if (!gst_queue_open_temp_location_file (queue)) + if (!gst_queue2_open_temp_location_file (queue)) ret = GST_STATE_CHANGE_FAILURE; } queue->segment_event_received = FALSE; @@ -1950,7 +1942,7 @@ gst_queue_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_READY: if (QUEUE_IS_USING_TEMP_FILE (queue)) - gst_queue_close_temp_location_file (queue); + gst_queue2_close_temp_location_file (queue); if (queue->starting_segment != NULL) { gst_event_unref (queue->starting_segment); queue->starting_segment = NULL; @@ -1979,7 +1971,7 @@ gst_queue_change_state (GstElement * element, GstStateChange transition) g_cond_signal (queue->item_add); static void -gst_queue_set_temp_template (GstQueue * queue, const gchar * template) +gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template) { GstState state; @@ -2005,14 +1997,14 @@ wrong_state: } static void -gst_queue_set_property (GObject * object, +gst_queue2_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstQueue *queue = GST_QUEUE (object); + GstQueue2 *queue = GST_QUEUE2 (object); /* someone could change levels here, and since this * affects the get/put funcs, we need to lock for safety. */ - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); switch (prop_id) { case PROP_MAX_SIZE_BYTES: @@ -2043,7 +2035,7 @@ gst_queue_set_property (GObject * object, queue->high_percent = g_value_get_int (value); break; case PROP_TEMP_TEMPLATE: - gst_queue_set_temp_template (queue, g_value_get_string (value)); + gst_queue2_set_temp_template (queue, g_value_get_string (value)); break; case PROP_TEMP_LOCATION: g_free (queue->temp_location); @@ -2057,16 +2049,16 @@ gst_queue_set_property (GObject * object, break; } - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); } static void -gst_queue_get_property (GObject * object, +gst_queue2_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstQueue *queue = GST_QUEUE (object); + GstQueue2 *queue = GST_QUEUE2 (object); - GST_QUEUE_MUTEX_LOCK (queue); + GST_QUEUE2_MUTEX_LOCK (queue); switch (prop_id) { case PROP_CUR_LEVEL_BYTES: @@ -2110,28 +2102,5 @@ gst_queue_get_property (GObject * object, break; } - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); } - -static gboolean -plugin_init (GstPlugin * plugin) -{ - GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); - GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, - "dataflow inside the queue element"); - -#ifdef ENABLE_NLS - GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE, - LOCALEDIR); - bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR); - bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8"); -#endif /* ENABLE_NLS */ - - return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE); -} - -GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, - GST_VERSION_MINOR, - "queue2", - "Queue newer version", plugin_init, VERSION, GST_LICENSE, - GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/plugins/elements/gstqueue2.h b/plugins/elements/gstqueue2.h index a541f246e5..44165b02ff 100644 --- a/plugins/elements/gstqueue2.h +++ b/plugins/elements/gstqueue2.h @@ -28,24 +28,33 @@ G_BEGIN_DECLS -#define GST_TYPE_QUEUE \ - (gst_queue_get_type()) -#define GST_QUEUE(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_QUEUE,GstQueue)) -#define GST_QUEUE_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_QUEUE,GstQueueClass)) -#define GST_IS_QUEUE(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_QUEUE)) -#define GST_IS_QUEUE_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE)) -#define GST_QUEUE_CAST(obj) \ - ((GstQueue *)(obj)) +#define GST_TYPE_QUEUE2 \ + (gst_queue2_get_type()) +#define GST_QUEUE2(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_QUEUE2,GstQueue2)) +#define GST_QUEUE2_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_QUEUE2,GstQueue2Class)) +#define GST_IS_QUEUE2(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_QUEUE2)) +#define GST_IS_QUEUE2_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE2)) +#define GST_QUEUE2_CAST(obj) \ + ((GstQueue2 *)(obj)) -typedef struct _GstQueue GstQueue; -typedef struct _GstQueueSize GstQueueSize; -typedef struct _GstQueueClass GstQueueClass; +typedef struct _GstQueue2 GstQueue2; +typedef struct _GstQueue2Size GstQueue2Size; +typedef struct _GstQueue2Class GstQueue2Class; -struct _GstQueue +/* used to keep track of sizes (current and max) */ +struct _GstQueue2Size +{ + guint buffers; + guint bytes; + guint64 time; + guint64 rate_time; +}; + +struct _GstQueue2 { GstElement element; @@ -65,8 +74,8 @@ struct _GstQueue /* the queue of data we're keeping our hands on */ GQueue *queue; - GstQueueSize cur_level; /* currently in the queue */ - GstQueueSize max_level; /* max. amount of data allowed in the queue */ + GstQueue2Size cur_level; /* currently in the queue */ + GstQueue2Size max_level; /* max. amount of data allowed in the queue */ gboolean use_buffering; gboolean use_rate_estimate; GstClockTime buffering_interval; @@ -111,11 +120,13 @@ struct _GstQueue }; -struct _GstQueueClass +struct _GstQueue2Class { GstElementClass parent_class; }; +GType gst_queue2_get_type (void); + G_END_DECLS #endif /* __GST_QUEUE2_H__ */