diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 7028203703..cf18dc54ac 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -160,6 +160,10 @@ struct _GstSingleQueue GstClockTime next_time; /* End running time of next buffer to be pushed */ GstClockTime last_time; /* Start running time of last pushed buffer */ GCond turn; /* SingleQueue turn waiting conditional */ + + /* for serialized queries */ + GCond query_handled; + gboolean last_query; }; @@ -750,6 +754,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) sq->id); GST_MULTI_QUEUE_MUTEX_LOCK (mq); g_cond_signal (&sq->turn); + sq->last_query = FALSE; + g_cond_signal (&sq->query_handled); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); @@ -1073,6 +1079,18 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, sq->id, event, GST_EVENT_TYPE_NAME (event)); gst_pad_push_event (sq->srcpad, event); + } else if (GST_IS_QUERY (object)) { + GstQuery *query; + gboolean res; + + query = GST_QUERY_CAST (object); + + res = gst_pad_peer_query (sq->srcpad, query); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + sq->last_query = res; + g_cond_signal (&sq->query_handled); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } else { g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", sq->id); @@ -1121,7 +1139,7 @@ gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid) } static GstMultiQueueItem * -gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid) +gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid) { GstMultiQueueItem *item; @@ -1464,7 +1482,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) /* Get an unique incrementing id. */ curid = g_atomic_int_add ((gint *) & mq->counter, 1); - item = gst_multi_queue_event_item_new ((GstMiniObject *) event, curid); + item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid); GST_DEBUG_OBJECT (mq, "SingleQueue %d : Enqueuing event %p of type %s with id %d", @@ -1516,12 +1534,32 @@ static gboolean gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) { gboolean res; + GstSingleQueue *sq; + GstMultiQueue *mq; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) parent; switch (GST_QUERY_TYPE (query)) { default: if (GST_QUERY_IS_SERIALIZED (query)) { - GST_WARNING_OBJECT (pad, "unhandled serialized query"); - res = FALSE; + guint32 curid; + GstMultiQueueItem *item; + + /* Get an unique incrementing id. */ + curid = g_atomic_int_add ((gint *) & mq->counter, 1); + + item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid); + + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Enqueuing query %p of type %s with id %d", + sq->id, query, GST_QUERY_TYPE_NAME (query), curid); + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item); + g_cond_wait (&sq->query_handled, &mq->qlock); + res = sq->last_query; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } else { /* default handling */ res = gst_pad_query_default (pad, parent, query); @@ -1839,6 +1877,7 @@ gst_single_queue_free (GstSingleQueue * sq) gst_data_queue_flush (sq->queue); g_object_unref (sq->queue); g_cond_clear (&sq->turn); + g_cond_clear (&sq->query_handled); g_free (sq); } @@ -1903,6 +1942,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->next_time = GST_CLOCK_TIME_NONE; sq->last_time = GST_CLOCK_TIME_NONE; g_cond_init (&sq->turn); + g_cond_init (&sq->query_handled); sq->sinktime = GST_CLOCK_TIME_NONE; sq->srctime = GST_CLOCK_TIME_NONE;