mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-18 22:36:33 +00:00
gst/playback/gstqueue2.c: Also fix #476514 for queue2.
Original commit message from CVS: * gst/playback/gstqueue2.c: (update_buffering), (gst_queue_locked_flush), (gst_queue_locked_enqueue), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_push_one), (gst_queue_sink_activate_push), (gst_queue_src_activate_push), (gst_queue_src_activate_pull): Also fix #476514 for queue2.
This commit is contained in:
parent
523fd097e6
commit
d133f1548e
2 changed files with 100 additions and 6 deletions
|
@ -1,3 +1,12 @@
|
||||||
|
2007-09-17 Wim Taymans <wim.taymans@gmail.com>
|
||||||
|
|
||||||
|
* gst/playback/gstqueue2.c: (update_buffering),
|
||||||
|
(gst_queue_locked_flush), (gst_queue_locked_enqueue),
|
||||||
|
(gst_queue_handle_sink_event), (gst_queue_chain),
|
||||||
|
(gst_queue_push_one), (gst_queue_sink_activate_push),
|
||||||
|
(gst_queue_src_activate_push), (gst_queue_src_activate_pull):
|
||||||
|
Also fix #476514 for queue2.
|
||||||
|
|
||||||
2007-09-16 Wim Taymans <wim.taymans@gmail.com>
|
2007-09-16 Wim Taymans <wim.taymans@gmail.com>
|
||||||
|
|
||||||
* gst-libs/gst/rtp/gstbasertpdepayload.c:
|
* gst-libs/gst/rtp/gstbasertpdepayload.c:
|
||||||
|
|
|
@ -158,6 +158,7 @@ struct _GstQueue
|
||||||
/* flowreturn when srcpad is paused */
|
/* flowreturn when srcpad is paused */
|
||||||
GstFlowReturn srcresult;
|
GstFlowReturn srcresult;
|
||||||
gboolean is_eos;
|
gboolean is_eos;
|
||||||
|
gboolean unexpected;
|
||||||
|
|
||||||
/* the queue of data we're keeping our hands on */
|
/* the queue of data we're keeping our hands on */
|
||||||
GQueue *queue;
|
GQueue *queue;
|
||||||
|
@ -166,11 +167,13 @@ struct _GstQueue
|
||||||
GstQueueSize max_level; /* max. amount of data allowed in the queue */
|
GstQueueSize max_level; /* max. amount of data allowed in the queue */
|
||||||
gboolean use_buffering;
|
gboolean use_buffering;
|
||||||
gboolean use_rate_estimate;
|
gboolean use_rate_estimate;
|
||||||
|
GstClockTime buffering_interval;
|
||||||
gint low_percent; /* low/high watermarks for buffering */
|
gint low_percent; /* low/high watermarks for buffering */
|
||||||
gint high_percent;
|
gint high_percent;
|
||||||
|
|
||||||
/* current buffering state */
|
/* current buffering state */
|
||||||
gboolean is_buffering;
|
gboolean is_buffering;
|
||||||
|
guint buffering_iteration;
|
||||||
|
|
||||||
/* for measuring input/output rates */
|
/* for measuring input/output rates */
|
||||||
guint64 bytes_in;
|
guint64 bytes_in;
|
||||||
|
@ -682,6 +685,7 @@ update_buffering (GstQueue * queue)
|
||||||
* below the low threshold */
|
* below the low threshold */
|
||||||
if (percent < queue->low_percent) {
|
if (percent < queue->low_percent) {
|
||||||
queue->is_buffering = TRUE;
|
queue->is_buffering = TRUE;
|
||||||
|
queue->buffering_iteration++;
|
||||||
post = TRUE;
|
post = TRUE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -985,7 +989,6 @@ gst_queue_locked_flush (GstQueue * queue)
|
||||||
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
|
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
|
||||||
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
|
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
|
||||||
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
|
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
|
||||||
queue->is_eos = FALSE;
|
|
||||||
if (queue->starting_segment != NULL)
|
if (queue->starting_segment != NULL)
|
||||||
gst_event_unref (queue->starting_segment);
|
gst_event_unref (queue->starting_segment);
|
||||||
queue->starting_segment = NULL;
|
queue->starting_segment = NULL;
|
||||||
|
@ -1041,6 +1044,9 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
|
||||||
queue->segment_event_received = TRUE;
|
queue->segment_event_received = TRUE;
|
||||||
queue->starting_segment = event;
|
queue->starting_segment = event;
|
||||||
}
|
}
|
||||||
|
/* a new segment allows us to accept more buffers if we got UNEXPECTED
|
||||||
|
* from downstream */
|
||||||
|
queue->unexpected = FALSE;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
if (QUEUE_IS_USING_TEMP_FILE (queue))
|
if (QUEUE_IS_USING_TEMP_FILE (queue))
|
||||||
|
@ -1182,6 +1188,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
GST_QUEUE_MUTEX_LOCK (queue);
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
gst_queue_locked_flush (queue);
|
gst_queue_locked_flush (queue);
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->is_eos = FALSE;
|
||||||
|
queue->unexpected = FALSE;
|
||||||
/* reset rate counters */
|
/* reset rate counters */
|
||||||
reset_rate_timer (queue);
|
reset_rate_timer (queue);
|
||||||
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
||||||
|
@ -1193,6 +1201,9 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
if (GST_EVENT_IS_SERIALIZED (event)) {
|
if (GST_EVENT_IS_SERIALIZED (event)) {
|
||||||
/* serialized events go in the queue */
|
/* serialized events go in the queue */
|
||||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
/* refuse more events on EOS */
|
||||||
|
if (queue->is_eos)
|
||||||
|
goto out_eos;
|
||||||
gst_queue_locked_enqueue (queue, event);
|
gst_queue_locked_enqueue (queue, event);
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1207,10 +1218,16 @@ done:
|
||||||
/* ERRORS */
|
/* ERRORS */
|
||||||
out_flushing:
|
out_flushing:
|
||||||
{
|
{
|
||||||
GST_DEBUG_OBJECT (queue, "we are flushing");
|
GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
gst_event_unref (event);
|
||||||
gst_buffer_unref (event);
|
return FALSE;
|
||||||
|
}
|
||||||
|
out_eos:
|
||||||
|
{
|
||||||
|
GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
|
||||||
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
gst_event_unref (event);
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1275,6 +1292,12 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
|
|
||||||
/* we have to lock the queue since we span threads */
|
/* we have to lock the queue since we span threads */
|
||||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
/* when we received EOS, we refuse more data */
|
||||||
|
if (queue->is_eos)
|
||||||
|
goto out_eos;
|
||||||
|
/* when we received unexpected from downstream, refuse more buffers */
|
||||||
|
if (queue->unexpected)
|
||||||
|
goto out_unexpected;
|
||||||
|
|
||||||
/* We make space available if we're "full" according to whatever
|
/* We make space available if we're "full" according to whatever
|
||||||
* the user defined as "full". */
|
* the user defined as "full". */
|
||||||
|
@ -1299,11 +1322,27 @@ out_flushing:
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"exit because task paused, reason: %s", gst_flow_get_name (ret));
|
"exit because task paused, reason: %s", gst_flow_get_name (ret));
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
gst_buffer_unref (buffer);
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
out_eos:
|
||||||
|
{
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
|
||||||
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
|
return GST_FLOW_UNEXPECTED;
|
||||||
|
}
|
||||||
|
out_unexpected:
|
||||||
|
{
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
|
"exit because we received UNEXPECTED");
|
||||||
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
|
return GST_FLOW_UNEXPECTED;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* dequeue an item from the queue an push it downstream. This functions returns
|
/* dequeue an item from the queue an push it downstream. This functions returns
|
||||||
|
@ -1318,6 +1357,7 @@ gst_queue_push_one (GstQueue * queue)
|
||||||
if (data == NULL)
|
if (data == NULL)
|
||||||
goto no_item;
|
goto no_item;
|
||||||
|
|
||||||
|
next:
|
||||||
if (GST_IS_BUFFER (data)) {
|
if (GST_IS_BUFFER (data)) {
|
||||||
GstBuffer *buffer = GST_BUFFER_CAST (data);
|
GstBuffer *buffer = GST_BUFFER_CAST (data);
|
||||||
|
|
||||||
|
@ -1327,6 +1367,42 @@ gst_queue_push_one (GstQueue * queue)
|
||||||
|
|
||||||
/* need to check for srcresult here as well */
|
/* need to check for srcresult here as well */
|
||||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
if (result == GST_FLOW_UNEXPECTED) {
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
|
"got UNEXPECTED from downstream");
|
||||||
|
/* stop pushing buffers, we dequeue all items until we see an item that we
|
||||||
|
* can push again, which is EOS or NEWSEGMENT. If there is nothing in the
|
||||||
|
* queue we can push, we set a flag to make the sinkpad refuse more
|
||||||
|
* buffers with an UNEXPECTED return value until we receive something
|
||||||
|
* pushable again or we get flushed. */
|
||||||
|
while ((data = gst_queue_locked_dequeue (queue))) {
|
||||||
|
if (GST_IS_BUFFER (data)) {
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
|
"dropping UNEXPECTED buffer %p", data);
|
||||||
|
gst_buffer_unref (GST_BUFFER_CAST (data));
|
||||||
|
} else if (GST_IS_EVENT (data)) {
|
||||||
|
GstEvent *event = GST_EVENT_CAST (data);
|
||||||
|
GstEventType type = GST_EVENT_TYPE (event);
|
||||||
|
|
||||||
|
if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
|
||||||
|
/* we found a pushable item in the queue, push it out */
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
|
"pushing pushable event %s after UNEXPECTED %p",
|
||||||
|
GST_EVENT_TYPE_NAME (event));
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
|
"dropping UNEXPECTED event %p", event);
|
||||||
|
gst_event_unref (event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* no more items in the queue. Set the unexpected flag so that upstream
|
||||||
|
* make us refuse any more buffers on the sinkpad. Since we will still
|
||||||
|
* accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
|
||||||
|
* task function does not shut down. */
|
||||||
|
queue->unexpected = TRUE;
|
||||||
|
result = GST_FLOW_OK;
|
||||||
|
}
|
||||||
} else if (GST_IS_EVENT (data)) {
|
} else if (GST_IS_EVENT (data)) {
|
||||||
GstEvent *event = GST_EVENT_CAST (data);
|
GstEvent *event = GST_EVENT_CAST (data);
|
||||||
GstEventType type = GST_EVENT_TYPE (event);
|
GstEventType type = GST_EVENT_TYPE (event);
|
||||||
|
@ -1337,9 +1413,12 @@ gst_queue_push_one (GstQueue * queue)
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
/* if we're EOS, return UNEXPECTED so that the task pauses. */
|
/* if we're EOS, return UNEXPECTED so that the task pauses. */
|
||||||
if (type == GST_EVENT_EOS)
|
if (type == GST_EVENT_EOS) {
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
|
"pushed EOS event %p, return UNEXPECTED", event);
|
||||||
result = GST_FLOW_UNEXPECTED;
|
result = GST_FLOW_UNEXPECTED;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
/* ERRORS */
|
/* ERRORS */
|
||||||
|
@ -1557,6 +1636,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE_MUTEX_LOCK (queue);
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "activating push mode");
|
GST_DEBUG_OBJECT (queue, "activating push mode");
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->is_eos = FALSE;
|
||||||
|
queue->unexpected = FALSE;
|
||||||
reset_rate_timer (queue);
|
reset_rate_timer (queue);
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1587,6 +1668,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE_MUTEX_LOCK (queue);
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "activating push mode");
|
GST_DEBUG_OBJECT (queue, "activating push mode");
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->is_eos = FALSE;
|
||||||
|
queue->unexpected = FALSE;
|
||||||
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1621,6 +1704,8 @@ gst_queue_src_activate_pull (GstPad * pad, gboolean active)
|
||||||
GST_QUEUE_MUTEX_LOCK (queue);
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
GST_DEBUG_OBJECT (queue, "activating pull mode");
|
GST_DEBUG_OBJECT (queue, "activating pull mode");
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
|
queue->is_eos = FALSE;
|
||||||
|
queue->unexpected = FALSE;
|
||||||
result = TRUE;
|
result = TRUE;
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue