pipeline: Instant rate change handling

Implement aggregation of INSTANT_RATE_REQUEST messages and sending of
INSTANT_RATE_SYNC_TIME events.
This commit is contained in:
Jan Schmidt 2019-02-07 23:59:51 +11:00
parent f72c89b159
commit acf66273f5

View file

@ -116,6 +116,12 @@ struct _GstPipelinePrivate
gboolean update_clock;
GstClockTime latency;
/* seqnum of the most recent instant-rate-request, %GST_SEQNUM_INVALID if none */
guint32 instant_rate_seqnum;
gdouble active_instant_rate;
GstClockTime instant_rate_upstream_anchor;
GstClockTime instant_rate_clock_anchor;
};
@ -131,6 +137,8 @@ static GstStateChangeReturn gst_pipeline_change_state (GstElement * element,
static void gst_pipeline_handle_message (GstBin * bin, GstMessage * message);
static gboolean gst_pipeline_do_latency (GstBin * bin);
static gboolean gst_pipeline_handle_instant_rate (GstPipeline * pipeline,
gdouble rate, guint32 seqnum);
/* static guint gst_pipeline_signals[LAST_SIGNAL] = { 0 }; */
@ -302,6 +310,15 @@ reset_start_time (GstPipeline * pipeline, GstClockTime start_time)
GST_DEBUG_OBJECT (pipeline, "reset start_time to 0");
GST_ELEMENT_START_TIME (pipeline) = start_time;
pipeline->priv->last_start_time = -1;
/* Reset instant rate multiplier because we flushed / reset time.
* Old anchor's don't make sense */
pipeline->priv->instant_rate_seqnum = GST_SEQNUM_INVALID;
pipeline->priv->instant_rate_upstream_anchor =
pipeline->priv->instant_rate_clock_anchor = GST_CLOCK_TIME_NONE;
pipeline->priv->active_instant_rate = 1.0;
GST_DEBUG_OBJECT (pipeline, "Reset start time to %" GST_TIME_FORMAT,
GST_TIME_ARGS (start_time));
} else {
GST_DEBUG_OBJECT (pipeline, "application asked to not reset stream_time");
}
@ -575,6 +592,12 @@ invalid_clock:
* the PAUSED, pending PAUSED state. When the ASYNC_DONE message is received the
* pipeline will redistribute the new base_time and will bring the elements back
* to the desired state of the pipeline. */
/* GST_MESSAGE_INSTANT_RATE_REQUEST: This message is only posted by sinks and
* bins containing sinks (which are also considered sinks). Once all sinks
* have posted this message it is posted to the parent bin, or if this is
* a top-level bin (e.g. pipeline), a instant-rate-sync-time event with
* the current running time is sent to the whole pipeline.
*/
static void
gst_pipeline_handle_message (GstBin * bin, GstMessage * message)
{
@ -605,6 +628,18 @@ gst_pipeline_handle_message (GstBin * bin, GstMessage * message)
pipeline->priv->update_clock = TRUE;
}
GST_OBJECT_UNLOCK (bin);
}
break;
case GST_MESSAGE_INSTANT_RATE_REQUEST:{
guint32 seqnum = gst_message_get_seqnum (message);
gdouble rate_multiplier;
gst_message_parse_instant_rate_request (message, &rate_multiplier);
gst_pipeline_handle_instant_rate (pipeline, rate_multiplier, seqnum);
break;
}
default:
break;
@ -1026,3 +1061,88 @@ gst_pipeline_get_latency (GstPipeline * pipeline)
return latency;
}
static gboolean
gst_pipeline_handle_instant_rate (GstPipeline * pipeline, gdouble rate,
guint32 seqnum)
{
GstClockTime running_time = GST_CLOCK_TIME_NONE;
GstClockTime upstream_running_time = GST_CLOCK_TIME_NONE;
gboolean is_playing;
GstEvent *event;
GST_OBJECT_LOCK (pipeline);
if (pipeline->priv->instant_rate_seqnum != GST_SEQNUM_INVALID &&
pipeline->priv->instant_rate_seqnum == seqnum) {
GST_DEBUG_OBJECT (pipeline,
"Handling duplicate instant-rate-request message with seqnum %u",
seqnum);
upstream_running_time = pipeline->priv->instant_rate_upstream_anchor;
running_time = pipeline->priv->instant_rate_clock_anchor;
if (G_UNLIKELY (rate != pipeline->priv->active_instant_rate)) {
GST_WARNING_OBJECT (pipeline,
"Repeated instant-rate-request has a different rate to before! %f != %f",
rate, pipeline->priv->active_instant_rate);
rate = pipeline->priv->active_instant_rate;
}
} else {
/* Get the current running time of the pipeline */
is_playing = GST_STATE (pipeline) == GST_STATE_PLAYING
&& (GST_STATE_PENDING (pipeline) == GST_STATE_VOID_PENDING ||
GST_STATE_PENDING (pipeline) == GST_STATE_PLAYING);
if (is_playing) {
GstClockTime base_time, clock_time;
GstClock *clock;
base_time = GST_ELEMENT_CAST (pipeline)->base_time;
clock = GST_ELEMENT_CLOCK (pipeline);
if (clock) {
clock_time = gst_clock_get_time (clock);
running_time = clock_time - base_time;
}
} else {
running_time = GST_ELEMENT_START_TIME (pipeline);
}
if (!GST_CLOCK_TIME_IS_VALID (running_time)) {
GST_OBJECT_UNLOCK (pipeline);
return FALSE;
}
if (GST_CLOCK_TIME_IS_VALID (pipeline->priv->instant_rate_upstream_anchor)) {
/* Already had an override, calculate the adjustment due to that
* elapsed duration */
GstClockTime elapsed =
running_time - pipeline->priv->instant_rate_clock_anchor;
pipeline->priv->instant_rate_upstream_anchor +=
elapsed * pipeline->priv->active_instant_rate;
pipeline->priv->instant_rate_clock_anchor = running_time;
} else {
/* Else this is the first override event */
pipeline->priv->instant_rate_upstream_anchor =
pipeline->priv->instant_rate_clock_anchor = running_time;
}
upstream_running_time = pipeline->priv->instant_rate_upstream_anchor;
pipeline->priv->instant_rate_seqnum = seqnum;
pipeline->priv->active_instant_rate = rate;
}
GST_OBJECT_UNLOCK (pipeline);
GST_DEBUG_OBJECT (pipeline,
"Instant rate multiplier to %f rt %" GST_TIME_FORMAT " upstream %"
GST_TIME_FORMAT, rate, GST_TIME_ARGS (running_time),
GST_TIME_ARGS (upstream_running_time));
event =
gst_event_new_instant_rate_sync_time (rate, running_time,
upstream_running_time);
gst_event_set_seqnum (event, seqnum);
return gst_element_send_event (GST_ELEMENT_CAST (pipeline), event);
}