Merge remote-tracking branch 'origin/0.10'

Conflicts:
	plugins/elements/gstmultiqueue.c
This commit is contained in:
Sebastian Dröge 2012-03-29 14:54:53 +02:00
commit 4b6c3c7fea
3 changed files with 78 additions and 15 deletions

View file

@ -549,9 +549,10 @@ gst_input_selector_wait_running_time (GstInputSelector * sel,
* d) the active pad has no running time or the active * d) the active pad has no running time or the active
* pad's running time is before this running time * pad's running time is before this running time
* e) the active pad has a non-time segment * e) the active pad has a non-time segment
* f) the active pad changed and has not pushed anything
*/ */
while (pad != active_selpad && !sel->flushing && !pad->flushing && while (pad != active_selpad && !sel->flushing && !pad->flushing
(sel->blocked || active_running_time == -1 && active_selpad->pushed && (sel->blocked || active_running_time == -1
|| running_time >= active_running_time)) { || running_time >= active_running_time)) {
if (!sel->blocked) if (!sel->blocked)
GST_DEBUG_OBJECT (pad, GST_DEBUG_OBJECT (pad,

View file

@ -136,6 +136,12 @@ struct _GstSingleQueue
/* flowreturn of previous srcpad push */ /* flowreturn of previous srcpad push */
GstFlowReturn srcresult; GstFlowReturn srcresult;
/* If something was actually pushed on
* this pad after flushing/pad activation
* and the srcresult corresponds to something
* real
*/
gboolean pushed;
/* segments */ /* segments */
GstSegment sink_segment; GstSegment sink_segment;
@ -744,15 +750,16 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->id); sq->id);
if (flush) { if (flush) {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
sq->srcresult = GST_FLOW_FLUSHING; sq->srcresult = GST_FLOW_FLUSHING;
gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_set_flushing (sq->queue, TRUE);
sq->flushing = TRUE; sq->flushing = TRUE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* wake up non-linked task */ /* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id); sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
g_cond_signal (&sq->turn); g_cond_signal (&sq->turn);
sq->last_query = FALSE; sq->last_query = FALSE;
g_cond_signal (&sq->query_handled); g_cond_signal (&sq->query_handled);
@ -762,11 +769,13 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
result = gst_pad_pause_task (sq->srcpad); result = gst_pad_pause_task (sq->srcpad);
sq->sink_tainted = sq->src_tainted = TRUE; sq->sink_tainted = sq->src_tainted = TRUE;
} else { } else {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
gst_data_queue_flush (sq->queue); gst_data_queue_flush (sq->queue);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
/* All pads start off not-linked for a smooth kick-off */ /* All pads start off not-linked for a smooth kick-off */
sq->srcresult = GST_FLOW_OK; sq->srcresult = GST_FLOW_OK;
sq->pushed = FALSE;
sq->cur_time = 0; sq->cur_time = 0;
sq->max_size.visible = mq->max_size.visible; sq->max_size.visible = mq->max_size.visible;
sq->is_eos = FALSE; sq->is_eos = FALSE;
@ -778,11 +787,10 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
gst_data_queue_set_flushing (sq->queue, FALSE); gst_data_queue_set_flushing (sq->queue, FALSE);
/* Reset high time to be recomputed next */ /* Reset high time to be recomputed next */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->high_time = GST_CLOCK_TIME_NONE; mq->high_time = GST_CLOCK_TIME_NONE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
sq->flushing = FALSE; sq->flushing = FALSE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result = result =
@ -1201,14 +1209,13 @@ gst_multi_queue_loop (GstPad * pad)
* or it's the first loop, or we just passed the previous highid, * or it's the first loop, or we just passed the previous highid,
* we might need to wake some sleeping pad up, so there's extra work * we might need to wake some sleeping pad up, so there's extra work
* there too */ * there too */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (sq->srcresult == GST_FLOW_NOT_LINKED if (sq->srcresult == GST_FLOW_NOT_LINKED
|| (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
|| sq->last_oldid > mq->highid) { || sq->last_oldid > mq->highid) {
GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult)); gst_flow_get_name (sq->srcresult));
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
/* Check again if we're flushing after the lock is taken, /* Check again if we're flushing after the lock is taken,
* the flush flag might have been changed in the meantime */ * the flush flag might have been changed in the meantime */
if (sq->flushing) { if (sq->flushing) {
@ -1275,9 +1282,8 @@ gst_multi_queue_loop (GstPad * pad)
/* We're done waiting, we can clear the nextid and nexttime */ /* We're done waiting, we can clear the nextid and nexttime */
sq->nextid = 0; sq->nextid = 0;
sq->next_time = GST_CLOCK_TIME_NONE; sq->next_time = GST_CLOCK_TIME_NONE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
if (sq->flushing) if (sq->flushing)
goto out_flushing; goto out_flushing;
@ -1286,6 +1292,7 @@ gst_multi_queue_loop (GstPad * pad)
gst_flow_get_name (sq->srcresult)); gst_flow_get_name (sq->srcresult));
/* Update time stats */ /* Update time stats */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
next_time = get_running_time (&sq->src_segment, object, FALSE); next_time = get_running_time (&sq->src_segment, object, FALSE);
if (next_time != GST_CLOCK_TIME_NONE) { if (next_time != GST_CLOCK_TIME_NONE) {
if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time) if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
@ -1296,10 +1303,51 @@ gst_multi_queue_loop (GstPad * pad)
wake_up_next_non_linked (mq); wake_up_next_non_linked (mq);
} }
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Try to push out the new object */ /* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object); result = gst_single_queue_push_one (mq, sq, object);
/* Check if we pushed something already and if this is
* now a switch from an active to a non-active stream.
*
* If it is, we reset all the waiting streams, let them
* push another buffer to see if they're now active again.
* This allows faster switching between streams and prevents
* deadlocks if downstream does any waiting too.
*/
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (sq->pushed && sq->srcresult == GST_FLOW_OK
&& result == GST_FLOW_NOT_LINKED) {
GList *tmp;
GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
sq->id);
compute_high_id (mq);
/* maybe no-one is waiting */
if (mq->numwaiting > 0) {
/* Else figure out which singlequeue(s) need waking up */
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
sq2->pushed = FALSE;
sq2->srcresult = GST_FLOW_OK;
g_cond_signal (&sq2->turn);
}
}
}
}
if (GST_IS_BUFFER (object))
sq->pushed = TRUE;
sq->srcresult = result; sq->srcresult = result;
sq->last_oldid = newid;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
object = NULL; object = NULL;
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
@ -1309,8 +1357,6 @@ gst_multi_queue_loop (GstPad * pad)
GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult)); gst_flow_get_name (sq->srcresult));
sq->last_oldid = newid;
return; return;
out_flushing: out_flushing:
@ -1320,6 +1366,7 @@ out_flushing:
/* Need to make sure wake up any sleeping pads when we exit */ /* Need to make sure wake up any sleeping pads when we exit */
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
compute_high_time (mq);
compute_high_id (mq); compute_high_id (mq);
wake_up_next_non_linked (mq); wake_up_next_non_linked (mq);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
@ -1406,14 +1453,22 @@ gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
{ {
gboolean res; gboolean res;
GstSingleQueue *sq; GstSingleQueue *sq;
GstMultiQueue *mq;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad); sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = (GstMultiQueue *) gst_pad_get_parent (pad);
/* mq is NULL if the pad is activated/deactivated before being
* added to the multiqueue */
if (mq)
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
switch (mode) { switch (mode) {
case GST_PAD_MODE_PUSH: case GST_PAD_MODE_PUSH:
if (active) { if (active) {
/* All pads start off linked until they push one buffer */ /* All pads start off linked until they push one buffer */
sq->srcresult = GST_FLOW_OK; sq->srcresult = GST_FLOW_OK;
sq->pushed = FALSE;
} else { } else {
sq->srcresult = GST_FLOW_FLUSHING; sq->srcresult = GST_FLOW_FLUSHING;
gst_data_queue_flush (sq->queue); gst_data_queue_flush (sq->queue);
@ -1424,6 +1479,12 @@ gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
res = FALSE; res = FALSE;
break; break;
} }
if (mq) {
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_object_unref (mq);
}
return res; return res;
} }
@ -1928,6 +1989,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
sq->mqueue = mqueue; sq->mqueue = mqueue;
sq->srcresult = GST_FLOW_FLUSHING; sq->srcresult = GST_FLOW_FLUSHING;
sq->pushed = FALSE;
sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction) sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction)
single_queue_check_full, single_queue_check_full,
(GstDataQueueFullCallback) single_queue_overrun_cb, (GstDataQueueFullCallback) single_queue_overrun_cb,

View file

@ -592,7 +592,7 @@ event_loop (GstElement * pipeline, gboolean blocking, GstState target_state)
} }
case GST_MESSAGE_TAG: case GST_MESSAGE_TAG:
if (tags) { if (tags) {
GstTagList *tags; GstTagList *tag_list;
if (GST_IS_ELEMENT (GST_MESSAGE_SRC (message))) { if (GST_IS_ELEMENT (GST_MESSAGE_SRC (message))) {
PRINT (_("FOUND TAG : found by element \"%s\".\n"), PRINT (_("FOUND TAG : found by element \"%s\".\n"),
@ -607,9 +607,9 @@ event_loop (GstElement * pipeline, gboolean blocking, GstState target_state)
PRINT (_("FOUND TAG\n")); PRINT (_("FOUND TAG\n"));
} }
gst_message_parse_tag (message, &tags); gst_message_parse_tag (message, &tag_list);
gst_tag_list_foreach (tags, print_tag, NULL); gst_tag_list_foreach (tag_list, print_tag, NULL);
gst_tag_list_free (tags); gst_tag_list_free (tag_list);
} }
break; break;
case GST_MESSAGE_INFO:{ case GST_MESSAGE_INFO:{