From afebd394fa5d29357eebc5e1d23636a9931f23c1 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Tue, 26 Jun 2007 14:45:15 +0000 Subject: [PATCH] plugins/elements/gstmultiqueue.*: Take the multiqueue lock when updating the fill level so we don't get confused. Original commit message from CVS: * plugins/elements/gstmultiqueue.c: (gst_multi_queue_init), (gst_single_queue_flush), (apply_segment), (apply_buffer), (gst_single_queue_push_one), (gst_multi_queue_loop), (gst_multi_queue_sink_activate_push), (gst_multi_queue_sink_event), (gst_multi_queue_src_activate_push), (wake_up_next_non_linked), (compute_high_id), (gst_single_queue_new): * plugins/elements/gstmultiqueue.h: Take the multiqueue lock when updating the fill level so we don't get confused. After applying a buffer or event on the src pad segment, make sure to call gst_data_queue_limits_changed() to get the data queue to unblock and check the filled state again. Rework the not-linked pad handling so the logic is that not-linked pads can push as fast as they like, but only so they never get ahead of any linked pads. * tests/check/elements/multiqueue.c: (mq_sinkpad_to_srcpad), (mq_dummypad_getcaps), (mq_dummypad_chain), (mq_dummypad_event), (run_output_order_test), (GST_START_TEST), (multiqueue_suite): Add a test to check that not-linked pads always stay behind linked pads. --- ChangeLog | 27 ++++ plugins/elements/gstmultiqueue.c | 234 +++++++++++++++++----------- plugins/elements/gstmultiqueue.h | 2 + tests/check/elements/multiqueue.c | 245 ++++++++++++++++++++++++++++++ 4 files changed, 417 insertions(+), 91 deletions(-) diff --git a/ChangeLog b/ChangeLog index bff6a1c33a..4137dee1ea 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,30 @@ +2007-06-26 Jan Schmidt + + * plugins/elements/gstmultiqueue.c: (gst_multi_queue_init), + (gst_single_queue_flush), (apply_segment), (apply_buffer), + (gst_single_queue_push_one), (gst_multi_queue_loop), + (gst_multi_queue_sink_activate_push), (gst_multi_queue_sink_event), + (gst_multi_queue_src_activate_push), (wake_up_next_non_linked), + (compute_high_id), (gst_single_queue_new): + * plugins/elements/gstmultiqueue.h: + Take the multiqueue lock when updating the fill level so we don't get + confused. + + After applying a buffer or event on the src pad segment, make sure to + call gst_data_queue_limits_changed() to get the data queue to unblock + and check the filled state again. + + Rework the not-linked pad handling so the logic is that not-linked + pads can push as fast as they like, but only so they never get + ahead of any linked pads. + + * tests/check/elements/multiqueue.c: (mq_sinkpad_to_srcpad), + (mq_dummypad_getcaps), (mq_dummypad_chain), (mq_dummypad_event), + (run_output_order_test), (GST_START_TEST), (multiqueue_suite): + + Add a test to check that not-linked pads always stay behind + linked pads. + 2007-06-26 Jan Schmidt * docs/random/release: diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index b2e6ab3127..f795f9625c 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -1,5 +1,7 @@ /* GStreamer * Copyright (C) 2006 Edward Hervey + * Copyright (C) 2007 Jan Schmidt + * Copyright (C) 2007 Wim Taymans * * gstmultiqueue.c: * @@ -83,7 +85,7 @@ static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); static void gst_single_queue_free (GstSingleQueue * squeue); static void wake_up_next_non_linked (GstMultiQueue * mq); -static void compute_next_non_linked (GstMultiQueue * mq); +static void compute_high_id (GstMultiQueue * mq); static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", GST_PAD_SINK, @@ -246,7 +248,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS; mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME; - mqueue->counter = 0; + mqueue->counter = 1; mqueue->highid = -1; mqueue->nextnotlinked = -1; @@ -441,13 +443,14 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) gst_data_queue_flush (sq->queue); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); - sq->srcresult = GST_FLOW_OK; + /* All pads start off not-linked for a smooth kick-off */ + sq->srcresult = GST_FLOW_NOT_LINKED; sq->cur_time = 0; sq->max_size.visible = mq->max_size.visible; sq->is_eos = FALSE; sq->inextra = FALSE; - sq->nextid = -1; - sq->oldid = -1; + sq->nextid = 0; + sq->oldid = 0; gst_data_queue_set_flushing (sq->queue, FALSE); GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); @@ -459,7 +462,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) } /* calculate the diff between running time on the sink and src of the queue. - * This is the total amount of time in the queue. */ + * This is the total amount of time in the queue. + * WITH LOCK TAKEN */ static void update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) { @@ -509,6 +513,9 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, stop = -1; time = 0; } + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + gst_segment_set_newsegment_full (segment, update, rate, arate, format, start, stop, time); @@ -517,6 +524,8 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, /* segment can update the time level of the queue */ update_time_level (mq, sq); + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } /* take a buffer and update segment, updating the time level of the queue. */ @@ -524,6 +533,8 @@ static void apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, GstClockTime duration, GstSegment * segment) { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + /* if no timestamp is set, assume it's continuous with the previous * time */ if (timestamp == GST_CLOCK_TIME_NONE) @@ -540,6 +551,7 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, /* calc diff with other end */ update_time_level (mq, sq); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } static GstFlowReturn @@ -558,6 +570,9 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, apply_buffer (mq, sq, timestamp, duration, &sq->src_segment); + /* Applying the buffer may have made the queue non-full again, unblock it if needed */ + gst_data_queue_limits_changed (sq->queue); + GST_DEBUG_OBJECT (mq, "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, sq->id, buffer, GST_TIME_ARGS (timestamp)); @@ -574,6 +589,8 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, break; case GST_EVENT_NEWSEGMENT: apply_segment (mq, sq, event, &sq->src_segment); + /* Applying the segment may have made the queue non-full again, unblock it if needed */ + gst_data_queue_limits_changed (sq->queue); break; default: break; @@ -637,6 +654,10 @@ gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) return item; } +/* Each main loop attempts to push buffers until the return value + * is not-linked. not-linked pads are not allowed to push data beyond + * any linked pads, so they don't 'rush ahead of the pack'. + */ static void gst_multi_queue_loop (GstPad * pad) { @@ -652,81 +673,102 @@ gst_multi_queue_loop (GstPad * pad) sq = (GstSingleQueue *) gst_pad_get_element_private (pad); mq = sq->mqueue; -restart: - GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); + do { + GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); - if (!(gst_data_queue_pop (sq->queue, &sitem))) - goto out_flushing; + /* Get something from the queue, blocking until that happens, or we get + * flushed */ + if (!(gst_data_queue_pop (sq->queue, &sitem))) + goto out_flushing; - item = (GstMultiQueueItem *) sitem; - newid = item->posid; - /* steal the object and destroy the item */ - object = gst_multi_queue_item_steal_object (item); - gst_multi_queue_item_destroy (item); + item = (GstMultiQueueItem *) sitem; + newid = item->posid; - GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", - sq->id, newid, oldid); + /* steal the object and destroy the item */ + object = gst_multi_queue_item_steal_object (item); + gst_multi_queue_item_destroy (item); - /* 1. Only check turn if : - * _ We haven't pushed anything yet - * _ OR the new id isn't the follower of the previous one (continuous segment) */ - if ((oldid == -1) || (newid != (oldid + 1))) { - GST_MULTI_QUEUE_MUTEX_LOCK (mq); + GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", + sq->id, newid, oldid); - GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", - gst_flow_get_name (sq->srcresult)); + /* If we're not-linked, we do some extra work because we might need to + * wait before pushing. If we're linked but there's a gap in the IDs, + * or it's the first loop, or we just passed the previous highid, + * we might need to wake some sleeping pad up, so there's extra work + * there too */ + if (sq->srcresult == GST_FLOW_NOT_LINKED || + (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) { + GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); - /* preamble : if we're not linked, set the newid as the next one we want */ - if (sq->srcresult == GST_FLOW_NOT_LINKED) + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + /* Update the nextid so other threads know when to wake us up */ sq->nextid = newid; - /* store the last id we outputted */ - if (oldid != -1) - sq->oldid = oldid; + /* Update the oldid (the last ID we output) for highid tracking */ + if (oldid != -1) + sq->oldid = oldid; - /* 2. If there's a queue waiting to push, wake it up. If it's us the */ - /* check below (3.) will avoid us waiting. */ - wake_up_next_non_linked (mq); + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + /* Go to sleep until it's time to push this buffer */ + + /* Recompute the highid */ + compute_high_id (mq); + while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " + "newid %u and highid %u", sq->id, newid, mq->highid); + + + /* Wake up all non-linked pads before we sleep */ + wake_up_next_non_linked (mq); + + mq->numwaiting++; + g_cond_wait (sq->turn, mq->qlock); + mq->numwaiting--; + + GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " + "wakeup with newid %u and highid %u", sq->id, newid, mq->highid); + } + + /* Re-compute the high_id in case someone else pushed */ + compute_high_id (mq); + } else { + compute_high_id (mq); + /* Wake up all non-linked pads */ + wake_up_next_non_linked (mq); + } + /* We're done waiting, we can clear the nextid */ + sq->nextid = 0; - /* 3. If we're not linked AND our nextid is higher than the highest oldid outputted - * _ Update global next-not-linked - * _ Wait on our conditional - */ - while ((sq->srcresult == GST_FLOW_NOT_LINKED) - && (mq->nextnotlinked != sq->id)) { - compute_next_non_linked (mq); - g_cond_wait (sq->turn, mq->qlock); - } - /* 4. Check again status, maybe we're flushing */ - if ((sq->srcresult != GST_FLOW_OK)) { GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - gst_mini_object_unref (object); - goto out_flushing; } - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + /* Try to push out the new object */ + result = gst_single_queue_push_one (mq, sq, object); + sq->srcresult = result; + + if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED) + goto out_flushing; + + GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + oldid = newid; } + while (TRUE); - GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", - gst_flow_get_name (sq->srcresult)); - - /* 4. Try to push out the new object */ - result = gst_single_queue_push_one (mq, sq, object); - sq->srcresult = result; - - if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED) - goto out_flushing; - - GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", - gst_flow_get_name (sq->srcresult)); - - oldid = newid; - - /* restart to get the next element */ - goto restart; - - /* ERRORS */ out_flushing: { + /* Need to make sure wake up any sleeping pads when we exit */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + compute_high_id (mq); + wake_up_next_non_linked (mq); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + gst_data_queue_set_flushing (sq->queue, TRUE); gst_pad_pause_task (sq->srcpad); GST_CAT_LOG_OBJECT (multi_queue_debug, mq, @@ -800,7 +842,8 @@ gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active) sq = (GstSingleQueue *) gst_pad_get_element_private (pad); if (active) { - sq->srcresult = GST_FLOW_OK; + /* All pads start off not-linked for a smooth kick-off */ + sq->srcresult = GST_FLOW_NOT_LINKED; } else { sq->srcresult = GST_FLOW_WRONG_STATE; gst_data_queue_flush (sq->queue); @@ -937,7 +980,7 @@ gst_multi_queue_src_activate_push (GstPad * pad, gboolean active) sq = (GstSingleQueue *) gst_pad_get_element_private (pad); mq = sq->mqueue; - GST_LOG ("SingleQueue %d", sq->id); + GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id); if (active) { result = gst_single_queue_flush (mq, sq, FALSE); @@ -1000,55 +1043,64 @@ wake_up_next_non_linked (GstMultiQueue * mq) { GList *tmp; - GST_LOG ("mq->nextnotlinked:%d", mq->nextnotlinked); - /* maybe no-one is waiting */ - if (mq->nextnotlinked == -1) + if (mq->numwaiting < 1) return; - /* Else figure out which singlequeue it is and wake it up */ + /* Else figure out which singlequeue(s) need waking up */ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; - if (sq->srcresult == GST_FLOW_NOT_LINKED) - if (sq->id == mq->nextnotlinked) { + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + if (sq->nextid != 0 && sq->nextid <= mq->highid) { GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); g_cond_signal (sq->turn); - return; } + } } } /* WITH LOCK TAKEN */ static void -compute_next_non_linked (GstMultiQueue * mq) +compute_high_id (GstMultiQueue * mq) { + /* The high-id is either the highest id among the linked pads, or if all + * pads are not-linked, it's the lowest not-linked pad */ GList *tmp; guint32 lowest = G_MAXUINT32; - gint nextid = -1; + guint32 highid = G_MAXUINT32; for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; - GST_LOG ("inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s", + GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s", sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult)); - if (sq->srcresult == GST_FLOW_NOT_LINKED) - if (lowest > sq->nextid) { - lowest = sq->nextid; - nextid = sq->id; + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + /* No need to consider queues which are not waiting */ + if (sq->nextid == 0) { + GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); + continue; } - /* If we don't have a global highid, or the global highid is lower than */ - /* this single queue's last outputted id, store the queue's one */ - if ((mq->highid == -1) || (mq->highid < sq->oldid)) - mq->highid = sq->oldid; + if (sq->nextid < lowest) + lowest = sq->nextid; + } else if (sq->srcresult != GST_FLOW_UNEXPECTED) { + /* If we don't have a global highid, or the global highid is lower than + * this single queue's last outputted id, store the queue's one, + * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */ + if ((highid == G_MAXUINT32) || (sq->oldid > highid)) + highid = sq->oldid; + } } - mq->nextnotlinked = nextid; - GST_LOG_OBJECT (mq, - "Next-non-linked is sq #%d with nextid : %d. Highid is now : %d", nextid, - lowest, mq->highid); + if (highid == G_MAXUINT32 || lowest < highid) + mq->highid = lowest; + else + mq->highid = highid; + + GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid, + lowest); } #define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \ @@ -1213,8 +1265,8 @@ gst_single_queue_new (GstMultiQueue * mqueue) gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); - sq->nextid = -1; - sq->oldid = -1; + sq->nextid = 0; + sq->oldid = 0; sq->turn = g_cond_new (); /* attach to underrun/overrun signals to handle non-starvation */ diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h index b85b69e490..4874563236 100644 --- a/plugins/elements/gstmultiqueue.h +++ b/plugins/elements/gstmultiqueue.h @@ -66,6 +66,8 @@ struct _GstMultiQueue { /* GstMultiQueueSize, counter and highid */ gint nextnotlinked; /* ID of the next queue not linked (-1 : none) */ + + gint numwaiting; /* number of not-linked pads waiting */ }; struct _GstMultiQueueClass { diff --git a/tests/check/elements/multiqueue.c b/tests/check/elements/multiqueue.c index e6826e2c31..20fb9761cd 100644 --- a/tests/check/elements/multiqueue.c +++ b/tests/check/elements/multiqueue.c @@ -250,6 +250,250 @@ again: GST_END_TEST; +static GstPad * +mq_sinkpad_to_srcpad (GstElement * mq, GstPad * sink) +{ + GstPad *srcpad = NULL; + + gchar *mq_sinkpad_name; + gchar *mq_srcpad_name; + + mq_sinkpad_name = gst_pad_get_name (sink); + fail_unless (g_str_has_prefix (mq_sinkpad_name, "sink")); + mq_srcpad_name = g_strdup_printf ("src%s", mq_sinkpad_name + 4); + srcpad = gst_element_get_pad (mq, mq_srcpad_name); + fail_unless (srcpad != NULL); + + g_free (mq_sinkpad_name); + g_free (mq_srcpad_name); + + return srcpad; +} + +static GstCaps * +mq_dummypad_getcaps (GstPad * sinkpad) +{ + return gst_caps_new_any (); +} + +struct PadData +{ + guint8 pad_num; + guint32 *max_linked_id_ptr; + guint32 *eos_count_ptr; + gboolean is_linked; + gint n_linked; + + GMutex *mutex; + GCond *cond; +}; + +static GstFlowReturn +mq_dummypad_chain (GstPad * sinkpad, GstBuffer * buf) +{ + guint32 cur_id; + struct PadData *pad_data; + + pad_data = gst_pad_get_element_private (sinkpad); + fail_if (pad_data == NULL); + + /* Read an ID from the first 4 bytes of the buffer data and check it's + * what we expect */ + fail_unless (GST_BUFFER_SIZE (buf) >= 4); + cur_id = GST_READ_UINT32_BE (GST_BUFFER_DATA (buf)); + + g_mutex_lock (pad_data->mutex); + + /* For not-linked pads, ensure that we're not running ahead of the 'linked' + * pads */ + if (!pad_data->is_linked) { + /* If there are no linked pads, we can't track a max_id for them :) */ + if (pad_data->n_linked > 0) { + fail_unless (cur_id <= *(pad_data->max_linked_id_ptr) + 1, + "Got buffer %u on pad %u before buffer %u was seen on a " + "linked pad (max: %u)", cur_id, pad_data->pad_num, cur_id - 1, + *(pad_data->max_linked_id_ptr)); + } + } else { + /* Update the max_id value */ + if (cur_id > *(pad_data->max_linked_id_ptr)) + *(pad_data->max_linked_id_ptr) = cur_id; + } + + g_mutex_unlock (pad_data->mutex); + + /* Unref the buffer */ + gst_buffer_unref (buf); + + /* Return OK or not-linked as indicated */ + return pad_data->is_linked ? GST_FLOW_OK : GST_FLOW_NOT_LINKED; +} + +static gboolean +mq_dummypad_event (GstPad * sinkpad, GstEvent * event) +{ + struct PadData *pad_data; + + pad_data = gst_pad_get_element_private (sinkpad); + fail_if (pad_data == NULL); + + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + g_mutex_lock (pad_data->mutex); + + /* Accumulate that we've seen the EOS and signal the main thread */ + *(pad_data->eos_count_ptr) += 1; + + GST_DEBUG ("EOS on pad %u", pad_data->pad_num); + + g_cond_broadcast (pad_data->cond); + g_mutex_unlock (pad_data->mutex); + } + + gst_event_unref (event); + return TRUE; +} + +static void +run_output_order_test (gint n_linked) +{ + /* This test creates a multiqueue with 2 linked output, and 3 outputs that + * return 'not-linked' when data is pushed, then verifies that all buffers + * are received on not-linked pads only after earlier buffers on the + * 'linked' pads are made */ + GstElement *pipe; + GstElement *mq; + GstPad *inputpads[5]; + GstPad *sinkpads[5]; + struct PadData pad_data[5]; + guint32 max_linked_id; + guint32 eos_seen; + GMutex *mutex; + GCond *cond; + gint i; + const gint NPADS = 5; + const gint NBUFFERS = 1000; + + mutex = g_mutex_new (); + cond = g_cond_new (); + + pipe = gst_bin_new ("testbin"); + + mq = gst_element_factory_make ("multiqueue", NULL); + fail_unless (mq != NULL); + gst_bin_add (GST_BIN (pipe), mq); + + /* Construct NPADS dummy output pads. The first 'n_linked' return FLOW_OK, the rest + * return NOT_LINKED. The not-linked ones check the expected ordering of + * output buffers */ + for (i = 0; i < NPADS; i++) { + GstPad *mq_srcpad, *mq_sinkpad; + gchar *name; + + name = g_strdup_printf ("dummysrc%d", i); + inputpads[i] = gst_pad_new (name, GST_PAD_SRC); + g_free (name); + gst_pad_set_getcaps_function (inputpads[i], mq_dummypad_getcaps); + + mq_sinkpad = gst_element_get_request_pad (mq, "sink%d"); + fail_unless (mq_sinkpad != NULL); + gst_pad_link (inputpads[i], mq_sinkpad); + + gst_pad_set_active (inputpads[i], TRUE); + + mq_srcpad = mq_sinkpad_to_srcpad (mq, mq_sinkpad); + + name = g_strdup_printf ("dummysink%d", i); + sinkpads[i] = gst_pad_new (name, GST_PAD_SINK); + g_free (name); + gst_pad_set_chain_function (sinkpads[i], mq_dummypad_chain); + gst_pad_set_event_function (sinkpads[i], mq_dummypad_event); + gst_pad_set_getcaps_function (sinkpads[i], mq_dummypad_getcaps); + + pad_data[i].pad_num = i; + pad_data[i].max_linked_id_ptr = &max_linked_id; + pad_data[i].eos_count_ptr = &eos_seen; + pad_data[i].is_linked = (i < n_linked ? TRUE : FALSE); + pad_data[i].n_linked = n_linked; + pad_data[i].cond = cond; + pad_data[i].mutex = mutex; + gst_pad_set_element_private (sinkpads[i], pad_data + i); + + gst_pad_link (mq_srcpad, sinkpads[i]); + gst_pad_set_active (sinkpads[i], TRUE); + + gst_object_unref (mq_sinkpad); + gst_object_unref (mq_srcpad); + } + + /* Run the test. Push 1000 buffers through the multiqueue in a pattern */ + + max_linked_id = 0; + eos_seen = 0; + gst_element_set_state (pipe, GST_STATE_PLAYING); + + for (i = 0; i < NBUFFERS; i++) { + const guint8 pad_pattern[] = + { 0, 0, 0, 0, 1, 1, 2, 1, 0, 2, 3, 2, 3, 1, 4 }; + const guint n = sizeof (pad_pattern) / sizeof (guint8); + guint8 cur_pad; + GstBuffer *buf; + GstFlowReturn ret; + + cur_pad = pad_pattern[i % n]; + + buf = gst_buffer_new_and_alloc (4); + fail_if (buf == NULL); + GST_WRITE_UINT32_BE (GST_BUFFER_DATA (buf), i); + + ret = gst_pad_push (inputpads[cur_pad], buf); + if (pad_data[cur_pad].is_linked) { + fail_unless (ret == GST_FLOW_OK, + "Push on pad %d returned %d when FLOW_OK was expected", cur_pad, ret); + } else { + /* Expect OK initially, then NOT_LINKED when the srcpad starts pushing */ + fail_unless (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED, + "Push on pad %d returned %d when FLOW_OK or NOT_LINKED was expected", + cur_pad, ret); + } + } + for (i = 0; i < NPADS; i++) { + gst_pad_push_event (inputpads[i], gst_event_new_eos ()); + } + + /* Wait while the buffers are processed */ + g_mutex_lock (mutex); + while (eos_seen < 5) { + g_cond_wait (cond, mutex); + } + g_mutex_unlock (mutex); + + /* Clean up */ + for (i = 0; i < 5; i++) { + GstPad *mq_input = gst_pad_get_peer (inputpads[i]); + + gst_pad_unlink (inputpads[i], mq_input); + gst_element_release_request_pad (mq, mq_input); + gst_object_unref (mq_input); + gst_object_unref (inputpads[i]); + + gst_object_unref (sinkpads[i]); + } + + gst_element_set_state (pipe, GST_STATE_NULL); + gst_object_unref (pipe); + + g_cond_free (cond); + g_mutex_free (mutex); +} + +GST_START_TEST (test_output_order) +{ + run_output_order_test (2); + run_output_order_test (0); +} + +GST_END_TEST; + static Suite * multiqueue_suite (void) { @@ -264,6 +508,7 @@ multiqueue_suite (void) /* FIXME: test_request_pads() needs some more fixes, see comments there */ tcase_add_test (tc_chain, test_request_pads); + tcase_add_test (tc_chain, test_output_order); return s; }