mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-28 04:31:06 +00:00
queue: more queue optimizations
Split gst_queue_locked_enqueue() into variant for buffer and event to get rid of the if() and make the code more readable (constant boolean parameters are never nice). Removes the if (item) checks as we dereference the pointer before anyway. Also apply the same idea of reusing the previous knowledge in gst_queue_locked_dequeue to remove more type checks.
This commit is contained in:
parent
963932b1f1
commit
9f9353a84a
1 changed files with 57 additions and 47 deletions
|
@ -656,55 +656,59 @@ gst_queue_locked_flush (GstQueue * queue)
|
|||
|
||||
/* enqueue an item an update the level stats, with QUEUE_LOCK */
|
||||
static inline void
|
||||
gst_queue_locked_enqueue (GstQueue * queue, gpointer item, gboolean isbuffer)
|
||||
gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
|
||||
{
|
||||
if (isbuffer) {
|
||||
GstBuffer *buffer = GST_BUFFER_CAST (item);
|
||||
GstBuffer *buffer = GST_BUFFER_CAST (item);
|
||||
|
||||
/* add buffer to the statistics */
|
||||
queue->cur_level.buffers++;
|
||||
queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
|
||||
apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
|
||||
/* add buffer to the statistics */
|
||||
queue->cur_level.buffers++;
|
||||
queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
|
||||
apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
|
||||
|
||||
/* if this is the first buffer update the end side as well, but without the
|
||||
* duration. */
|
||||
/* FIXME : This will only be useful for current time level if the
|
||||
* source task is running, which is not the case for ex in
|
||||
* gstplaybasebin when pre-rolling.
|
||||
* See #482147 */
|
||||
/* if (queue->cur_level.buffers == 1) */
|
||||
/* apply_buffer (queue, buffer, &queue->src_segment, FALSE); */
|
||||
} else {
|
||||
GstEvent *event = GST_EVENT_CAST (item);
|
||||
/* if this is the first buffer update the end side as well, but without the
|
||||
* duration. */
|
||||
/* FIXME : This will only be useful for current time level if the
|
||||
* source task is running, which is not the case for ex in
|
||||
* gstplaybasebin when pre-rolling.
|
||||
* See #482147 */
|
||||
/* if (queue->cur_level.buffers == 1) */
|
||||
/* apply_buffer (queue, buffer, &queue->src_segment, FALSE); */
|
||||
|
||||
switch (GST_EVENT_TYPE (event)) {
|
||||
case GST_EVENT_EOS:
|
||||
/* Zero the thresholds, this makes sure the queue is completely
|
||||
* filled and we can read all data from the queue. */
|
||||
GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
|
||||
/* mark the queue as EOS. This prevents us from accepting more data. */
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
|
||||
queue->eos = TRUE;
|
||||
break;
|
||||
case GST_EVENT_NEWSEGMENT:
|
||||
apply_segment (queue, event, &queue->sink_segment, TRUE);
|
||||
/* a new segment allows us to accept more buffers if we got UNEXPECTED
|
||||
* from downstream */
|
||||
queue->unexpected = FALSE;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
g_queue_push_tail (queue->queue, item);
|
||||
GST_QUEUE_SIGNAL_ADD (queue);
|
||||
}
|
||||
|
||||
static inline void
|
||||
gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
|
||||
{
|
||||
GstEvent *event = GST_EVENT_CAST (item);
|
||||
|
||||
switch (GST_EVENT_TYPE (event)) {
|
||||
case GST_EVENT_EOS:
|
||||
/* Zero the thresholds, this makes sure the queue is completely
|
||||
* filled and we can read all data from the queue. */
|
||||
GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
|
||||
/* mark the queue as EOS. This prevents us from accepting more data. */
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
|
||||
queue->eos = TRUE;
|
||||
break;
|
||||
case GST_EVENT_NEWSEGMENT:
|
||||
apply_segment (queue, event, &queue->sink_segment, TRUE);
|
||||
/* a new segment allows us to accept more buffers if we got UNEXPECTED
|
||||
* from downstream */
|
||||
queue->unexpected = FALSE;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (item)
|
||||
g_queue_push_tail (queue->queue, item);
|
||||
g_queue_push_tail (queue->queue, item);
|
||||
GST_QUEUE_SIGNAL_ADD (queue);
|
||||
}
|
||||
|
||||
/* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
|
||||
static GstMiniObject *
|
||||
gst_queue_locked_dequeue (GstQueue * queue)
|
||||
gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
|
||||
{
|
||||
GstMiniObject *item;
|
||||
|
||||
|
@ -725,6 +729,8 @@ gst_queue_locked_dequeue (GstQueue * queue)
|
|||
/* if the queue is empty now, update the other side */
|
||||
if (queue->cur_level.buffers == 0)
|
||||
queue->cur_level.time = 0;
|
||||
|
||||
*is_buffer = TRUE;
|
||||
} else if (GST_IS_EVENT (item)) {
|
||||
GstEvent *event = GST_EVENT_CAST (item);
|
||||
|
||||
|
@ -742,6 +748,8 @@ gst_queue_locked_dequeue (GstQueue * queue)
|
|||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
*is_buffer = FALSE;
|
||||
} else {
|
||||
g_warning
|
||||
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
|
||||
|
@ -817,7 +825,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
|||
/* refuse more events on EOS */
|
||||
if (queue->eos)
|
||||
goto out_eos;
|
||||
gst_queue_locked_enqueue (queue, event, FALSE);
|
||||
gst_queue_locked_enqueue_event (queue, event);
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
} else {
|
||||
/* non-serialized events are passed upstream. */
|
||||
|
@ -880,8 +888,9 @@ gst_queue_leak_downstream (GstQueue * queue)
|
|||
/* for as long as the queue is filled, dequeue an item and discard it */
|
||||
while (gst_queue_is_filled (queue)) {
|
||||
GstMiniObject *leak;
|
||||
gboolean is_buffer;
|
||||
|
||||
leak = gst_queue_locked_dequeue (queue);
|
||||
leak = gst_queue_locked_dequeue (queue, &is_buffer);
|
||||
/* there is nothing to dequeue and the queue is still filled.. This should
|
||||
* not happen */
|
||||
g_assert (leak != NULL);
|
||||
|
@ -981,7 +990,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
}
|
||||
|
||||
/* put buffer in queue now */
|
||||
gst_queue_locked_enqueue (queue, buffer, TRUE);
|
||||
gst_queue_locked_enqueue_buffer (queue, buffer);
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
|
||||
return GST_FLOW_OK;
|
||||
|
@ -1034,13 +1043,14 @@ gst_queue_push_one (GstQueue * queue)
|
|||
{
|
||||
GstFlowReturn result = GST_FLOW_OK;
|
||||
GstMiniObject *data;
|
||||
gboolean is_buffer;
|
||||
|
||||
data = gst_queue_locked_dequeue (queue);
|
||||
data = gst_queue_locked_dequeue (queue, &is_buffer);
|
||||
if (data == NULL)
|
||||
goto no_item;
|
||||
|
||||
next:
|
||||
if (GST_IS_BUFFER (data)) {
|
||||
if (is_buffer) {
|
||||
GstBuffer *buffer;
|
||||
GstCaps *caps;
|
||||
|
||||
|
@ -1081,12 +1091,12 @@ next:
|
|||
* can push again, which is EOS or NEWSEGMENT. If there is nothing in the
|
||||
* queue we can push, we set a flag to make the sinkpad refuse more
|
||||
* buffers with an UNEXPECTED return value. */
|
||||
while ((data = gst_queue_locked_dequeue (queue))) {
|
||||
if (GST_IS_BUFFER (data)) {
|
||||
while ((data = gst_queue_locked_dequeue (queue, &is_buffer))) {
|
||||
if (is_buffer) {
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"dropping UNEXPECTED buffer %p", data);
|
||||
gst_buffer_unref (GST_BUFFER_CAST (data));
|
||||
} else if (GST_IS_EVENT (data)) {
|
||||
} else {
|
||||
GstEvent *event = GST_EVENT_CAST (data);
|
||||
GstEventType type = GST_EVENT_TYPE (event);
|
||||
|
||||
|
@ -1109,7 +1119,7 @@ next:
|
|||
queue->unexpected = TRUE;
|
||||
result = GST_FLOW_OK;
|
||||
}
|
||||
} else if (GST_IS_EVENT (data)) {
|
||||
} else {
|
||||
GstEvent *event = GST_EVENT_CAST (data);
|
||||
GstEventType type = GST_EVENT_TYPE (event);
|
||||
|
||||
|
|
Loading…
Reference in a new issue