plugins/elements/gstmultiqueue.c: Fix setting max-* properties after adding queues.

Original commit message from CVS:
* plugins/elements/gstmultiqueue.c: (gst_multi_queue_set_property),
(update_time_level), (gst_single_queue_push_one),
(gst_multi_queue_chain), (gst_multi_queue_sink_event),
(single_queue_overrun_cb), (single_queue_underrun_cb),
(single_queue_check_full):
Fix setting max-* properties after adding queues.
Use IS_FILLED for checking visible items.
Signal overrun if multiple queues overrun.
Add extra debug output.
Patch by: Wim Taymans <wim@fluendo.com>
This commit is contained in:
Wim Taymans 2007-06-21 14:35:03 +00:00 committed by Jan Schmidt
parent 9eeb1c8e2b
commit a689b50cab
2 changed files with 52 additions and 17 deletions

View file

@ -1,3 +1,16 @@
2007-06-21 Jan Schmidt <thaytan@mad.scientist.com>
* plugins/elements/gstmultiqueue.c: (gst_multi_queue_set_property),
(update_time_level), (gst_single_queue_push_one),
(gst_multi_queue_chain), (gst_multi_queue_sink_event),
(single_queue_overrun_cb), (single_queue_underrun_cb),
(single_queue_check_full):
Fix setting max-* properties after adding queues.
Use IS_FILLED for checking visible items.
Signal overrun if multiple queues overrun.
Add extra debug output.
Patch by: Wim Taymans <wim@fluendo.com>
2007-06-21 Stefan Kost <ensonic@users.sf.net> 2007-06-21 Stefan Kost <ensonic@users.sf.net>
* gst/gstelement.c: (gst_element_class_set_details_simple): * gst/gstelement.c: (gst_element_class_set_details_simple):

View file

@ -268,11 +268,11 @@ gst_multi_queue_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object); G_OBJECT_CLASS (parent_class)->finalize (object);
} }
#define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START { \ #define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \
GList * tmp = mq->queues; \ GList * tmp = mq->queues; \
while (tmp) { \ while (tmp) { \
GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ GstSingleQueue *q = (GstSingleQueue*)tmp->data; \
g_object_set_property ((GObject*) q->queue, name, value); \ q->max_size.format = mq->max_size.format; \
tmp = g_list_next(tmp); \ tmp = g_list_next(tmp); \
}; \ }; \
} G_STMT_END } G_STMT_END
@ -286,15 +286,15 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
switch (prop_id) { switch (prop_id) {
case ARG_MAX_SIZE_BYTES: case ARG_MAX_SIZE_BYTES:
mq->max_size.bytes = g_value_get_uint (value); mq->max_size.bytes = g_value_get_uint (value);
SET_CHILD_PROPERTY (mq, "max-size-bytes", value); SET_CHILD_PROPERTY (mq, bytes);
break; break;
case ARG_MAX_SIZE_BUFFERS: case ARG_MAX_SIZE_BUFFERS:
mq->max_size.visible = g_value_get_uint (value); mq->max_size.visible = g_value_get_uint (value);
SET_CHILD_PROPERTY (mq, "max-size-visible", value); SET_CHILD_PROPERTY (mq, visible);
break; break;
case ARG_MAX_SIZE_TIME: case ARG_MAX_SIZE_TIME:
mq->max_size.time = g_value_get_uint64 (value); mq->max_size.time = g_value_get_uint64 (value);
SET_CHILD_PROPERTY (mq, "max-size-time", value); SET_CHILD_PROPERTY (mq, time);
break; break;
case ARG_EXTRA_SIZE_BYTES: case ARG_EXTRA_SIZE_BYTES:
mq->extra_size.bytes = g_value_get_uint (value); mq->extra_size.bytes = g_value_get_uint (value);
@ -476,6 +476,8 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
"queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
/* This allows for streams with out of order timestamping - sometimes the
* emerging timestamp is later than the arriving one(s) */
if (sink_time >= src_time) if (sink_time >= src_time)
sq->cur_time = sink_time - src_time; sq->cur_time = sink_time - src_time;
else else
@ -556,6 +558,10 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
apply_buffer (mq, sq, timestamp, duration, &sq->src_segment); apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
sq->id, buffer, GST_TIME_ARGS (timestamp));
result = gst_pad_push (sq->srcpad, buffer); result = gst_pad_push (sq->srcpad, buffer);
} else if (GST_IS_EVENT (object)) { } else if (GST_IS_EVENT (object)) {
GstEvent *event; GstEvent *event;
@ -573,6 +579,10 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
break; break;
} }
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Pushing event %p of type %s",
sq->id, event, GST_EVENT_TYPE_NAME (event));
gst_pad_push_event (sq->srcpad, event); gst_pad_push_event (sq->srcpad, event);
} else { } else {
g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
@ -751,8 +761,8 @@ gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
curid = mq->counter++; curid = mq->counter++;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d", GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
sq->id, curid); sq->id, buffer, curid);
item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid); item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
@ -854,8 +864,8 @@ gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
item = gst_multi_queue_item_new ((GstMiniObject *) event, curid); item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event, "SingleQueue %d : Enqueuing event %p of type %s with id %d",
GST_EVENT_TYPE_NAME (event), curid); sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
goto flushing; goto flushing;
@ -1041,6 +1051,9 @@ compute_next_non_linked (GstMultiQueue * mq)
lowest, mq->highid); lowest, mq->highid);
} }
#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
(sq->max_size.format) <= (value))
/* /*
* GstSingleQueue functions * GstSingleQueue functions
*/ */
@ -1050,17 +1063,19 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GstMultiQueue *mq = sq->mqueue; GstMultiQueue *mq = sq->mqueue;
GList *tmp; GList *tmp;
GstDataQueueSize size; GstDataQueueSize size;
gboolean filled = FALSE;
gst_data_queue_get_level (sq->queue, &size); gst_data_queue_get_level (sq->queue, &size);
GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id); GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
GstDataQueueSize ssize;
if (gst_data_queue_is_empty (ssq->queue)) { if (gst_data_queue_is_empty (ssq->queue)) {
if (size.visible == sq->max_size.visible) { if (IS_FILLED (visible, size.visible)) {
sq->max_size.visible++; sq->max_size.visible++;
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"Another queue is empty, bumping single queue %d max visible to %d", "Another queue is empty, bumping single queue %d max visible to %d",
@ -1069,12 +1084,22 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
goto beach; goto beach;
} }
/* check if we reached the hard time/bytes limits */
gst_data_queue_get_level (ssq->queue, &ssize);
/* if this queue is filled completely we must signal overrun */
if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, ssize.time)) {
filled = TRUE;
}
} }
/* no queues were empty */
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Overrun is always forwarded, since this is blocking the upstream element */ /* Overrun is always forwarded, since this is blocking the upstream element */
g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN], if (filled) {
0); GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
}
beach: beach:
return; return;
@ -1098,7 +1123,7 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GstDataQueueSize size; GstDataQueueSize size;
gst_data_queue_get_level (sq->queue, &size); gst_data_queue_get_level (sq->queue, &size);
if (size.visible == sq->max_size.visible) { if (IS_FILLED (visible, size.visible)) {
sq->max_size.visible++; sq->max_size.visible++;
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"queue %d is filled, bumping its max visible to %d", sq->id, "queue %d is filled, bumping its max visible to %d", sq->id,
@ -1131,9 +1156,6 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
if (sq->is_eos) if (sq->is_eos)
return TRUE; return TRUE;
#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
(sq->max_size.format) <= (value))
/* we never go past the max visible items */ /* we never go past the max visible items */
if (IS_FILLED (visible, visible)) if (IS_FILLED (visible, visible))
return TRUE; return TRUE;