mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-28 04:31:06 +00:00
multiqueue: handle serialized queries
This commit is contained in:
parent
07c9fba116
commit
d540a5fc68
1 changed files with 44 additions and 4 deletions
|
@ -160,6 +160,10 @@ struct _GstSingleQueue
|
||||||
GstClockTime next_time; /* End running time of next buffer to be pushed */
|
GstClockTime next_time; /* End running time of next buffer to be pushed */
|
||||||
GstClockTime last_time; /* Start running time of last pushed buffer */
|
GstClockTime last_time; /* Start running time of last pushed buffer */
|
||||||
GCond turn; /* SingleQueue turn waiting conditional */
|
GCond turn; /* SingleQueue turn waiting conditional */
|
||||||
|
|
||||||
|
/* for serialized queries */
|
||||||
|
GCond query_handled;
|
||||||
|
gboolean last_query;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -750,6 +754,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
|
||||||
sq->id);
|
sq->id);
|
||||||
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
g_cond_signal (&sq->turn);
|
g_cond_signal (&sq->turn);
|
||||||
|
sq->last_query = FALSE;
|
||||||
|
g_cond_signal (&sq->query_handled);
|
||||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
|
|
||||||
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
|
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
|
||||||
|
@ -1073,6 +1079,18 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
|
||||||
sq->id, event, GST_EVENT_TYPE_NAME (event));
|
sq->id, event, GST_EVENT_TYPE_NAME (event));
|
||||||
|
|
||||||
gst_pad_push_event (sq->srcpad, event);
|
gst_pad_push_event (sq->srcpad, event);
|
||||||
|
} else if (GST_IS_QUERY (object)) {
|
||||||
|
GstQuery *query;
|
||||||
|
gboolean res;
|
||||||
|
|
||||||
|
query = GST_QUERY_CAST (object);
|
||||||
|
|
||||||
|
res = gst_pad_peer_query (sq->srcpad, query);
|
||||||
|
|
||||||
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
|
sq->last_query = res;
|
||||||
|
g_cond_signal (&sq->query_handled);
|
||||||
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
} else {
|
} else {
|
||||||
g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
|
g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
|
||||||
sq->id);
|
sq->id);
|
||||||
|
@ -1121,7 +1139,7 @@ gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
|
||||||
}
|
}
|
||||||
|
|
||||||
static GstMultiQueueItem *
|
static GstMultiQueueItem *
|
||||||
gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid)
|
gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
|
||||||
{
|
{
|
||||||
GstMultiQueueItem *item;
|
GstMultiQueueItem *item;
|
||||||
|
|
||||||
|
@ -1464,7 +1482,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
|
||||||
/* Get an unique incrementing id. */
|
/* Get an unique incrementing id. */
|
||||||
curid = g_atomic_int_add ((gint *) & mq->counter, 1);
|
curid = g_atomic_int_add ((gint *) & mq->counter, 1);
|
||||||
|
|
||||||
item = gst_multi_queue_event_item_new ((GstMiniObject *) event, curid);
|
item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (mq,
|
GST_DEBUG_OBJECT (mq,
|
||||||
"SingleQueue %d : Enqueuing event %p of type %s with id %d",
|
"SingleQueue %d : Enqueuing event %p of type %s with id %d",
|
||||||
|
@ -1516,12 +1534,32 @@ static gboolean
|
||||||
gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
|
gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
|
||||||
{
|
{
|
||||||
gboolean res;
|
gboolean res;
|
||||||
|
GstSingleQueue *sq;
|
||||||
|
GstMultiQueue *mq;
|
||||||
|
|
||||||
|
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
|
||||||
|
mq = (GstMultiQueue *) parent;
|
||||||
|
|
||||||
switch (GST_QUERY_TYPE (query)) {
|
switch (GST_QUERY_TYPE (query)) {
|
||||||
default:
|
default:
|
||||||
if (GST_QUERY_IS_SERIALIZED (query)) {
|
if (GST_QUERY_IS_SERIALIZED (query)) {
|
||||||
GST_WARNING_OBJECT (pad, "unhandled serialized query");
|
guint32 curid;
|
||||||
res = FALSE;
|
GstMultiQueueItem *item;
|
||||||
|
|
||||||
|
/* Get an unique incrementing id. */
|
||||||
|
curid = g_atomic_int_add ((gint *) & mq->counter, 1);
|
||||||
|
|
||||||
|
item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (mq,
|
||||||
|
"SingleQueue %d : Enqueuing query %p of type %s with id %d",
|
||||||
|
sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
|
||||||
|
|
||||||
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
|
res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
|
||||||
|
g_cond_wait (&sq->query_handled, &mq->qlock);
|
||||||
|
res = sq->last_query;
|
||||||
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
} else {
|
} else {
|
||||||
/* default handling */
|
/* default handling */
|
||||||
res = gst_pad_query_default (pad, parent, query);
|
res = gst_pad_query_default (pad, parent, query);
|
||||||
|
@ -1839,6 +1877,7 @@ gst_single_queue_free (GstSingleQueue * sq)
|
||||||
gst_data_queue_flush (sq->queue);
|
gst_data_queue_flush (sq->queue);
|
||||||
g_object_unref (sq->queue);
|
g_object_unref (sq->queue);
|
||||||
g_cond_clear (&sq->turn);
|
g_cond_clear (&sq->turn);
|
||||||
|
g_cond_clear (&sq->query_handled);
|
||||||
g_free (sq);
|
g_free (sq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1903,6 +1942,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
|
||||||
sq->next_time = GST_CLOCK_TIME_NONE;
|
sq->next_time = GST_CLOCK_TIME_NONE;
|
||||||
sq->last_time = GST_CLOCK_TIME_NONE;
|
sq->last_time = GST_CLOCK_TIME_NONE;
|
||||||
g_cond_init (&sq->turn);
|
g_cond_init (&sq->turn);
|
||||||
|
g_cond_init (&sq->query_handled);
|
||||||
|
|
||||||
sq->sinktime = GST_CLOCK_TIME_NONE;
|
sq->sinktime = GST_CLOCK_TIME_NONE;
|
||||||
sq->srctime = GST_CLOCK_TIME_NONE;
|
sq->srctime = GST_CLOCK_TIME_NONE;
|
||||||
|
|
Loading…
Reference in a new issue