mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-10-04 17:52:29 +00:00
gst/playback/gstqueue2.c: Use separate timers for input and output rates.
Original commit message from CVS: * gst/playback/gstqueue2.c: (gst_queue_init), (gst_queue_finalize), (reset_rate_timer), (update_in_rates), (update_out_rates), (gst_queue_locked_enqueue), (gst_queue_locked_dequeue), (gst_queue_chain), (gst_queue_loop): Use separate timers for input and output rates. Pause measuring the output rate when we block for more data. See #503262.
This commit is contained in:
parent
3fb28dfc90
commit
4de6a788b8
1 changed files with 93 additions and 41 deletions
|
@ -177,13 +177,17 @@ struct _GstQueue
|
||||||
guint buffering_iteration;
|
guint buffering_iteration;
|
||||||
|
|
||||||
/* for measuring input/output rates */
|
/* for measuring input/output rates */
|
||||||
|
GTimer *in_timer;
|
||||||
|
gboolean in_timer_started;
|
||||||
|
gdouble last_in_elapsed;
|
||||||
guint64 bytes_in;
|
guint64 bytes_in;
|
||||||
guint64 bytes_out;
|
|
||||||
GTimer *timer;
|
|
||||||
gdouble byte_in_rate;
|
gdouble byte_in_rate;
|
||||||
|
|
||||||
|
GTimer *out_timer;
|
||||||
|
gboolean out_timer_started;
|
||||||
|
gdouble last_out_elapsed;
|
||||||
|
guint64 bytes_out;
|
||||||
gdouble byte_out_rate;
|
gdouble byte_out_rate;
|
||||||
gdouble last_elapsed;
|
|
||||||
gboolean timer_started;
|
|
||||||
|
|
||||||
GMutex *qlock; /* lock for queue (vs object lock) */
|
GMutex *qlock; /* lock for queue (vs object lock) */
|
||||||
gboolean waiting_add;
|
gboolean waiting_add;
|
||||||
|
@ -469,7 +473,8 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
|
||||||
|
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
queue->is_eos = FALSE;
|
queue->is_eos = FALSE;
|
||||||
queue->timer = g_timer_new ();
|
queue->in_timer = g_timer_new ();
|
||||||
|
queue->out_timer = g_timer_new ();
|
||||||
|
|
||||||
queue->qlock = g_mutex_new ();
|
queue->qlock = g_mutex_new ();
|
||||||
queue->waiting_add = FALSE;
|
queue->waiting_add = FALSE;
|
||||||
|
@ -504,7 +509,8 @@ gst_queue_finalize (GObject * object)
|
||||||
g_mutex_free (queue->qlock);
|
g_mutex_free (queue->qlock);
|
||||||
g_cond_free (queue->item_add);
|
g_cond_free (queue->item_add);
|
||||||
g_cond_free (queue->item_del);
|
g_cond_free (queue->item_del);
|
||||||
g_timer_destroy (queue->timer);
|
g_timer_destroy (queue->in_timer);
|
||||||
|
g_timer_destroy (queue->out_timer);
|
||||||
|
|
||||||
/* temp_file path cleanup */
|
/* temp_file path cleanup */
|
||||||
if (queue->temp_location != NULL)
|
if (queue->temp_location != NULL)
|
||||||
|
@ -703,8 +709,10 @@ reset_rate_timer (GstQueue * queue)
|
||||||
queue->bytes_out = 0;
|
queue->bytes_out = 0;
|
||||||
queue->byte_in_rate = 0.0;
|
queue->byte_in_rate = 0.0;
|
||||||
queue->byte_out_rate = 0.0;
|
queue->byte_out_rate = 0.0;
|
||||||
queue->last_elapsed = 0.0;
|
queue->last_in_elapsed = 0.0;
|
||||||
queue->timer_started = FALSE;
|
queue->last_out_elapsed = 0.0;
|
||||||
|
queue->in_timer_started = FALSE;
|
||||||
|
queue->out_timer_started = FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* the interval in seconds to recalculate the rate */
|
/* the interval in seconds to recalculate the rate */
|
||||||
|
@ -717,55 +725,80 @@ reset_rate_timer (GstQueue * queue)
|
||||||
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
|
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
|
||||||
|
|
||||||
static void
|
static void
|
||||||
update_rates (GstQueue * queue)
|
update_in_rates (GstQueue * queue)
|
||||||
{
|
{
|
||||||
gdouble elapsed, period;
|
gdouble elapsed, period;
|
||||||
gdouble byte_in_rate;
|
gdouble byte_in_rate;
|
||||||
gdouble byte_out_rate;
|
|
||||||
|
|
||||||
if (!queue->timer_started) {
|
if (!queue->in_timer_started) {
|
||||||
queue->timer_started = TRUE;
|
queue->in_timer_started = TRUE;
|
||||||
g_timer_start (queue->timer);
|
g_timer_start (queue->in_timer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
elapsed = g_timer_elapsed (queue->timer, NULL);
|
elapsed = g_timer_elapsed (queue->in_timer, NULL);
|
||||||
|
|
||||||
/* recalc after each interval. */
|
/* recalc after each interval. */
|
||||||
if (queue->last_elapsed + RATE_INTERVAL < elapsed) {
|
if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
|
||||||
period = elapsed - queue->last_elapsed;
|
period = elapsed - queue->last_in_elapsed;
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (queue,
|
GST_DEBUG_OBJECT (queue,
|
||||||
"rates: period %f, in %" G_GUINT64_FORMAT ", out %" G_GUINT64_FORMAT,
|
"rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
|
||||||
period, queue->bytes_in, queue->bytes_out);
|
|
||||||
|
|
||||||
byte_in_rate = queue->bytes_in / period;
|
byte_in_rate = queue->bytes_in / period;
|
||||||
byte_out_rate = queue->bytes_out / period;
|
|
||||||
|
|
||||||
if (queue->byte_in_rate == 0.0)
|
if (queue->byte_in_rate == 0.0)
|
||||||
queue->byte_in_rate = byte_in_rate;
|
queue->byte_in_rate = byte_in_rate;
|
||||||
else
|
else
|
||||||
queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
|
queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
|
||||||
|
|
||||||
|
/* reset the values to calculate rate over the next interval */
|
||||||
|
queue->last_in_elapsed = elapsed;
|
||||||
|
queue->bytes_in = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (queue->byte_in_rate > 0.0) {
|
||||||
|
queue->cur_level.rate_time =
|
||||||
|
queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
|
||||||
|
}
|
||||||
|
GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
|
||||||
|
queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
update_out_rates (GstQueue * queue)
|
||||||
|
{
|
||||||
|
gdouble elapsed, period;
|
||||||
|
gdouble byte_out_rate;
|
||||||
|
|
||||||
|
if (!queue->out_timer_started) {
|
||||||
|
queue->out_timer_started = TRUE;
|
||||||
|
g_timer_start (queue->out_timer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
elapsed = g_timer_elapsed (queue->out_timer, NULL);
|
||||||
|
|
||||||
|
/* recalc after each interval. */
|
||||||
|
if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
|
||||||
|
period = elapsed - queue->last_out_elapsed;
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (queue,
|
||||||
|
"rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
|
||||||
|
|
||||||
|
byte_out_rate = queue->bytes_out / period;
|
||||||
|
|
||||||
if (queue->byte_out_rate == 0.0)
|
if (queue->byte_out_rate == 0.0)
|
||||||
queue->byte_out_rate = byte_out_rate;
|
queue->byte_out_rate = byte_out_rate;
|
||||||
else
|
else
|
||||||
queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
|
queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
|
||||||
|
|
||||||
/* reset the values to calculate rate over the next interval */
|
/* reset the values to calculate rate over the next interval */
|
||||||
queue->last_elapsed = elapsed;
|
queue->last_out_elapsed = elapsed;
|
||||||
queue->bytes_in = 0;
|
|
||||||
queue->bytes_out = 0;
|
queue->bytes_out = 0;
|
||||||
}
|
}
|
||||||
|
GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
|
||||||
if (queue->byte_in_rate > 0.0) {
|
queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
|
||||||
queue->cur_level.rate_time =
|
|
||||||
queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
|
|
||||||
}
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (queue, "rates: in %f, out %f, time %" GST_TIME_FORMAT,
|
|
||||||
queue->byte_in_rate, queue->byte_out_rate,
|
|
||||||
GST_TIME_ARGS (queue->cur_level.rate_time));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -1006,7 +1039,7 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
|
||||||
/* apply new buffer to segment stats */
|
/* apply new buffer to segment stats */
|
||||||
apply_buffer (queue, buffer, &queue->sink_segment);
|
apply_buffer (queue, buffer, &queue->sink_segment);
|
||||||
/* update the byterate stats */
|
/* update the byterate stats */
|
||||||
update_rates (queue);
|
update_in_rates (queue);
|
||||||
|
|
||||||
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
|
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||||
gst_queue_write_buffer_to_file (queue, buffer);
|
gst_queue_write_buffer_to_file (queue, buffer);
|
||||||
|
@ -1102,7 +1135,7 @@ gst_queue_locked_dequeue (GstQueue * queue)
|
||||||
queue->bytes_out += size;
|
queue->bytes_out += size;
|
||||||
apply_buffer (queue, buffer, &queue->src_segment);
|
apply_buffer (queue, buffer, &queue->src_segment);
|
||||||
/* update the byterate stats */
|
/* update the byterate stats */
|
||||||
update_rates (queue);
|
update_out_rates (queue);
|
||||||
/* update the buffering */
|
/* update the buffering */
|
||||||
update_buffering (queue);
|
update_buffering (queue);
|
||||||
|
|
||||||
|
@ -1291,22 +1324,25 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
|
|
||||||
/* We make space available if we're "full" according to whatever
|
/* We make space available if we're "full" according to whatever
|
||||||
* the user defined as "full". */
|
* the user defined as "full". */
|
||||||
while (gst_queue_is_filled (queue)) {
|
if (gst_queue_is_filled (queue)) {
|
||||||
gboolean started;
|
gboolean started;
|
||||||
|
|
||||||
/* pause the timer while we wait. The fact that we are waiting does not mean
|
/* pause the timer while we wait. The fact that we are waiting does not mean
|
||||||
* the byterate on the input pad is lower */
|
* the byterate on the input pad is lower */
|
||||||
if ((started = queue->timer_started))
|
if ((started = queue->in_timer_started))
|
||||||
g_timer_stop (queue->timer);
|
g_timer_stop (queue->in_timer);
|
||||||
|
|
||||||
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
|
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
|
||||||
"queue is full, waiting for free space");
|
"queue is full, waiting for free space");
|
||||||
|
do {
|
||||||
/* Wait for space to be available, we could be unlocked because of a flush. */
|
/* Wait for space to be available, we could be unlocked because of a flush. */
|
||||||
GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
|
GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
|
||||||
|
}
|
||||||
|
while (gst_queue_is_filled (queue));
|
||||||
|
|
||||||
/* and continue if we were running before */
|
/* and continue if we were running before */
|
||||||
if (started)
|
if (started)
|
||||||
g_timer_continue (queue->timer);
|
g_timer_continue (queue->in_timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* put buffer in queue now */
|
/* put buffer in queue now */
|
||||||
|
@ -1458,10 +1494,26 @@ gst_queue_loop (GstPad * pad)
|
||||||
/* have to lock for thread-safety */
|
/* have to lock for thread-safety */
|
||||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
while (gst_queue_is_empty (queue)) {
|
if (gst_queue_is_empty (queue)) {
|
||||||
/* we recheck, we could be unlocked because of a flush. */
|
gboolean started;
|
||||||
|
|
||||||
|
/* pause the timer while we wait. The fact that we are waiting does not mean
|
||||||
|
* the byterate on the output pad is lower */
|
||||||
|
if ((started = queue->out_timer_started))
|
||||||
|
g_timer_stop (queue->out_timer);
|
||||||
|
|
||||||
|
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
|
||||||
|
"queue is empty, waiting for new data");
|
||||||
|
do {
|
||||||
|
/* Wait for data to be available, we could be unlocked because of a flush. */
|
||||||
GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
|
GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
|
||||||
}
|
}
|
||||||
|
while (gst_queue_is_empty (queue));
|
||||||
|
|
||||||
|
/* and continue if we were running before */
|
||||||
|
if (started)
|
||||||
|
g_timer_continue (queue->out_timer);
|
||||||
|
}
|
||||||
ret = gst_queue_push_one (queue);
|
ret = gst_queue_push_one (queue);
|
||||||
queue->srcresult = ret;
|
queue->srcresult = ret;
|
||||||
if (ret != GST_FLOW_OK)
|
if (ret != GST_FLOW_OK)
|
||||||
|
|
Loading…
Reference in a new issue