diff --git a/ChangeLog b/ChangeLog index 732deaa725..c3c33c74c0 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +2007-06-21 Jan Schmidt + + * plugins/elements/gstmultiqueue.c: (gst_multi_queue_set_property), + (update_time_level), (gst_single_queue_push_one), + (gst_multi_queue_chain), (gst_multi_queue_sink_event), + (single_queue_overrun_cb), (single_queue_underrun_cb), + (single_queue_check_full): + Fix setting max-* properties after adding queues. + Use IS_FILLED for checking visible items. + Signal overrun if multiple queues overrun. + Add extra debug output. + Patch by: Wim Taymans + 2007-06-21 Stefan Kost * gst/gstelement.c: (gst_element_class_set_details_simple): diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index f1f078b5f7..b2e6ab3127 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -268,11 +268,11 @@ gst_multi_queue_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } -#define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START { \ +#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \ GList * tmp = mq->queues; \ while (tmp) { \ GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ - g_object_set_property ((GObject*) q->queue, name, value); \ + q->max_size.format = mq->max_size.format; \ tmp = g_list_next(tmp); \ }; \ } G_STMT_END @@ -286,15 +286,15 @@ gst_multi_queue_set_property (GObject * object, guint prop_id, switch (prop_id) { case ARG_MAX_SIZE_BYTES: mq->max_size.bytes = g_value_get_uint (value); - SET_CHILD_PROPERTY (mq, "max-size-bytes", value); + SET_CHILD_PROPERTY (mq, bytes); break; case ARG_MAX_SIZE_BUFFERS: mq->max_size.visible = g_value_get_uint (value); - SET_CHILD_PROPERTY (mq, "max-size-visible", value); + SET_CHILD_PROPERTY (mq, visible); break; case ARG_MAX_SIZE_TIME: mq->max_size.time = g_value_get_uint64 (value); - SET_CHILD_PROPERTY (mq, "max-size-time", value); + SET_CHILD_PROPERTY (mq, time); break; case ARG_EXTRA_SIZE_BYTES: mq->extra_size.bytes = g_value_get_uint (value); @@ -476,6 +476,8 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); + /* This allows for streams with out of order timestamping - sometimes the + * emerging timestamp is later than the arriving one(s) */ if (sink_time >= src_time) sq->cur_time = sink_time - src_time; else @@ -556,6 +558,10 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, apply_buffer (mq, sq, timestamp, duration, &sq->src_segment); + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, + sq->id, buffer, GST_TIME_ARGS (timestamp)); + result = gst_pad_push (sq->srcpad, buffer); } else if (GST_IS_EVENT (object)) { GstEvent *event; @@ -573,6 +579,10 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, break; } + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Pushing event %p of type %s", + sq->id, event, GST_EVENT_TYPE_NAME (event)); + gst_pad_push_event (sq->srcpad, event); } else { g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", @@ -751,8 +761,8 @@ gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) curid = mq->counter++; GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d", - sq->id, curid); + GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d", + sq->id, buffer, curid); item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid); @@ -854,8 +864,8 @@ gst_multi_queue_sink_event (GstPad * pad, GstEvent * event) item = gst_multi_queue_item_new ((GstMiniObject *) event, curid); GST_DEBUG_OBJECT (mq, - "SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event, - GST_EVENT_TYPE_NAME (event), curid); + "SingleQueue %d : Enqueuing event %p of type %s with id %d", + sq->id, event, GST_EVENT_TYPE_NAME (event), curid); if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) goto flushing; @@ -1041,6 +1051,9 @@ compute_next_non_linked (GstMultiQueue * mq) lowest, mq->highid); } +#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \ + (sq->max_size.format) <= (value)) + /* * GstSingleQueue functions */ @@ -1050,17 +1063,19 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GstMultiQueue *mq = sq->mqueue; GList *tmp; GstDataQueueSize size; + gboolean filled = FALSE; gst_data_queue_get_level (sq->queue, &size); - GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id); + GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); GST_MULTI_QUEUE_MUTEX_LOCK (mq); for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; + GstDataQueueSize ssize; if (gst_data_queue_is_empty (ssq->queue)) { - if (size.visible == sq->max_size.visible) { + if (IS_FILLED (visible, size.visible)) { sq->max_size.visible++; GST_DEBUG_OBJECT (mq, "Another queue is empty, bumping single queue %d max visible to %d", @@ -1069,12 +1084,22 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); goto beach; } + /* check if we reached the hard time/bytes limits */ + gst_data_queue_get_level (ssq->queue, &ssize); + + /* if this queue is filled completely we must signal overrun */ + if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, ssize.time)) { + filled = TRUE; + } } + /* no queues were empty */ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* Overrun is always forwarded, since this is blocking the upstream element */ - g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN], - 0); + if (filled) { + GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun"); + g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); + } beach: return; @@ -1098,7 +1123,7 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GstDataQueueSize size; gst_data_queue_get_level (sq->queue, &size); - if (size.visible == sq->max_size.visible) { + if (IS_FILLED (visible, size.visible)) { sq->max_size.visible++; GST_DEBUG_OBJECT (mq, "queue %d is filled, bumping its max visible to %d", sq->id, @@ -1131,9 +1156,6 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, if (sq->is_eos) return TRUE; -#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \ - (sq->max_size.format) <= (value)) - /* we never go past the max visible items */ if (IS_FILLED (visible, visible)) return TRUE;