basesink: Add support for instant-rate-change events

Post instant-rate-request message when receiving an instant-rate-change
event, and handle the incoming instant-rate-sync-time events from the
pipeline.
This commit is contained in:
Sebastian Dröge 2018-05-15 18:42:25 +03:00 committed by Jan Schmidt
parent 74412496cf
commit f72c89b159

View file

@ -245,6 +245,25 @@ struct _GstBaseSinkPrivate
GstStepInfo current_step;
GstStepInfo pending_step;
/* instant rate change state */
/* seqnum of the last instant-rate-sync-time event
* received. %GST_SEQNUM_INVALID if there isn't one */
guint32 instant_rate_sync_seqnum;
/* Active instant-rate multipler. 0.0 if nothing pending */
gdouble instant_rate_multiplier;
/* seqnum of the last instant-rate event.
* %GST_SEQNUM_INVALID if there isn't one */
guint32 last_instant_rate_seqnum;
guint32 segment_seqnum;
GstSegment upstream_segment;
/* Running time at the start of the last segment event
* or instant-rate switch in *our* segment, not upstream */
GstClockTime last_anchor_running_time;
/* Difference between upstream running time and our own running time
* at the last segment event or instant-rate switch:
* upstream + offset = ours */
GstClockTimeDiff instant_rate_offset;
/* Cached GstClockID */
GstClockID cached_clock_id;
@ -2626,9 +2645,14 @@ gst_base_sink_do_sync (GstBaseSink * basesink,
GstFlowReturn ret;
GstStepInfo *current, *pending;
gboolean stepped;
guint32 current_instant_rate_seqnum;
priv = basesink->priv;
/* remember the currently handled instant-rate sequence number. If this
* changes after pre-rolling, we need to goto do_step again for updating
* the timing information of the current buffer */
current_instant_rate_seqnum = priv->instant_rate_sync_seqnum;
do_step:
sstart = sstop = rstart = rstop = rnext = GST_CLOCK_TIME_NONE;
do_sync = TRUE;
@ -2697,6 +2721,13 @@ again:
goto do_step;
}
if (G_UNLIKELY (priv->instant_rate_sync_seqnum !=
current_instant_rate_seqnum)) {
current_instant_rate_seqnum = priv->instant_rate_sync_seqnum;
// TODO rename the goto label - it does more these days.
goto do_step;
}
/* After rendering we store the position of the last buffer so that we can use
* it to report the position. We need to take the lock here. */
GST_OBJECT_LOCK (basesink);
@ -2745,6 +2776,13 @@ again:
/* check for unlocked by a state change, we are not flushing so
* we can try to preroll on the current buffer. */
if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
if (G_UNLIKELY (priv->instant_rate_sync_seqnum !=
current_instant_rate_seqnum)) {
current_instant_rate_seqnum = priv->instant_rate_sync_seqnum;
// TODO rename
goto do_step;
}
GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
priv->call_preroll = TRUE;
goto again;
@ -2803,6 +2841,60 @@ gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
"qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
/* Compensate for any instant-rate-change related running time offset
* between upstream and the internal running time of the sink */
if (basesink->priv->instant_rate_sync_seqnum != GST_SEQNUM_INVALID) {
GstClockTime actual_duration;
GstClockTime upstream_duration;
GstClockTimeDiff difference;
gboolean negative_duration;
GST_DEBUG_OBJECT (basesink,
"Current internal running time %" GST_TIME_FORMAT
", last internal running time %" GST_TIME_FORMAT, GST_TIME_ARGS (time),
GST_TIME_ARGS (basesink->priv->last_anchor_running_time));
/* Calculate how much running time was spent since the last switch/segment
* in the "corrected upstream segment", our segment */
/* Due to rounding errors and other inaccuracies, it can happen
* that our calculated internal running time is before the upstream
* running time. We need to compensate for that */
if (time < basesink->priv->last_anchor_running_time) {
actual_duration = basesink->priv->last_anchor_running_time - time;
negative_duration = TRUE;
} else {
actual_duration = time - basesink->priv->last_anchor_running_time;
negative_duration = FALSE;
}
/* Transpose that duration (i.e. what upstream beliefs) */
upstream_duration =
(actual_duration * basesink->segment.rate) /
basesink->priv->upstream_segment.rate;
/* Add the difference to the previously accumulated correction */
if (negative_duration)
difference = upstream_duration - actual_duration;
else
difference = actual_duration - upstream_duration;
GST_DEBUG_OBJECT (basesink,
"Current instant rate correction offset. Actual duration %"
GST_TIME_FORMAT ", upstream duration %" GST_TIME_FORMAT
", negative %d, difference %" GST_STIME_FORMAT ", current offset %"
GST_STIME_FORMAT, GST_TIME_ARGS (actual_duration),
GST_TIME_ARGS (upstream_duration), negative_duration,
GST_STIME_ARGS (difference),
GST_STIME_ARGS (basesink->priv->instant_rate_offset + difference));
difference = basesink->priv->instant_rate_offset + difference;
if (difference > 0 && time < difference)
time = 0;
else
time -= difference;
}
event = gst_event_new_qos (type, proportion, diff, time);
/* send upstream */
@ -3151,8 +3243,17 @@ gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad,
/* we need new segment info after the flush. */
basesink->have_newsegment = FALSE;
if (reset_time) {
gst_segment_init (&basesink->priv->upstream_segment,
GST_FORMAT_UNDEFINED);
gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
GST_ELEMENT_START_TIME (basesink) = 0;
basesink->priv->last_instant_rate_seqnum = GST_SEQNUM_INVALID;
basesink->priv->instant_rate_sync_seqnum = GST_SEQNUM_INVALID;
basesink->priv->instant_rate_multiplier = 0;
basesink->priv->segment_seqnum = GST_SEQNUM_INVALID;
basesink->priv->instant_rate_offset = 0;
basesink->priv->last_anchor_running_time = 0;
}
}
GST_OBJECT_UNLOCK (basesink);
@ -3299,21 +3400,140 @@ gst_base_sink_default_event (GstBaseSink * basesink, GstEvent * event)
gst_caps_unref (current_caps);
break;
}
case GST_EVENT_SEGMENT:
case GST_EVENT_SEGMENT:{
guint32 seqnum = gst_event_get_seqnum (event);
GstSegment new_segment;
/* configure the segment */
/* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
* We protect with the OBJECT_LOCK so that we can use the values to
* safely answer a POSITION query. */
GST_OBJECT_LOCK (basesink);
/* the newsegment event is needed to bring the buffer timestamps to the
/* the new segment event is needed to bring the buffer timestamps to the
* stream time and to drop samples outside of the playback segment. */
gst_event_copy_segment (event, &basesink->segment);
gst_event_copy_segment (event, &new_segment);
GST_DEBUG_OBJECT (basesink,
"received upstream segment %u %" GST_SEGMENT_FORMAT, seqnum,
&new_segment);
/* Make sure that the position stays between start and stop */
new_segment.position =
CLAMP (new_segment.position, new_segment.start, new_segment.stop);
if (basesink->priv->instant_rate_sync_seqnum != GST_SEQNUM_INVALID) {
GstClockTime upstream_duration;
GstClockTime actual_duration;
GstClockTime new_segment_running_time;
GstClockTimeDiff difference;
gboolean negative_duration;
/* Calculate how much running time upstream believes has passed since
* the last switch/segment */
new_segment_running_time =
gst_segment_to_running_time (&new_segment, GST_FORMAT_TIME,
new_segment.position);
GST_DEBUG_OBJECT (basesink,
"Current upstream running time %" GST_TIME_FORMAT
", last upstream running time %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_segment_running_time),
GST_TIME_ARGS (basesink->priv->last_anchor_running_time -
basesink->priv->instant_rate_offset));
/* Due to rounding errors and other inaccuracies, it can happen
* that our calculated internal running time is before the upstream
* running time. We need to compensate for that */
if (new_segment_running_time <
basesink->priv->last_anchor_running_time -
basesink->priv->instant_rate_offset) {
upstream_duration =
basesink->priv->last_anchor_running_time -
basesink->priv->instant_rate_offset - new_segment_running_time;
negative_duration = TRUE;
} else {
upstream_duration =
new_segment_running_time -
basesink->priv->last_anchor_running_time +
basesink->priv->instant_rate_offset;
negative_duration = FALSE;
}
/* Calculate the actual running-time duration of the previous segment */
actual_duration =
(upstream_duration * basesink->priv->instant_rate_multiplier);
if (negative_duration)
difference = upstream_duration - actual_duration;
else
difference = actual_duration - upstream_duration;
GST_DEBUG_OBJECT (basesink,
"Current internal running time %" GST_TIME_FORMAT
", last internal running time %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_segment_running_time +
basesink->priv->instant_rate_offset + difference),
GST_TIME_ARGS (basesink->priv->last_anchor_running_time));
/* Add the difference to the previously accumulated correction. */
basesink->priv->instant_rate_offset += difference;
GST_DEBUG_OBJECT (basesink,
"Updating instant rate correction offset. Actual duration %"
GST_TIME_FORMAT ", upstream duration %" GST_TIME_FORMAT
", negative %d, difference %" GST_STIME_FORMAT ", new offset %"
GST_STIME_FORMAT, GST_TIME_ARGS (actual_duration),
GST_TIME_ARGS (upstream_duration),
negative_duration,
GST_STIME_ARGS (difference),
GST_STIME_ARGS (basesink->priv->instant_rate_offset));
if (basesink->priv->instant_rate_offset < 0 &&
new_segment_running_time < -basesink->priv->instant_rate_offset) {
GST_WARNING_OBJECT (basesink,
"Upstream current running time %" GST_TIME_FORMAT
" is smaller than calculated offset %" GST_STIME_FORMAT,
GST_TIME_ARGS (new_segment_running_time),
GST_STIME_ARGS (basesink->priv->instant_rate_offset));
basesink->priv->last_anchor_running_time = 0;
basesink->priv->instant_rate_offset = 0;
} else {
basesink->priv->last_anchor_running_time =
new_segment_running_time + basesink->priv->instant_rate_offset;
}
/* Update the segments from the event and with the newly calculated
* correction offset */
basesink->priv->upstream_segment = new_segment;
basesink->segment = new_segment;
basesink->segment.rate *= basesink->priv->instant_rate_multiplier;
gst_segment_offset_running_time (&basesink->segment, GST_FORMAT_TIME,
basesink->priv->instant_rate_offset);
GST_DEBUG_OBJECT (basesink,
"Adjusted segment is now %" GST_SEGMENT_FORMAT, &basesink->segment);
} else {
/* otherwise both segments are simply the same, no correction needed */
basesink->priv->upstream_segment = new_segment;
basesink->segment = new_segment;
basesink->priv->last_anchor_running_time =
gst_segment_to_running_time (&new_segment, new_segment.format,
new_segment.position);
basesink->priv->instant_rate_offset = 0; /* Should already be 0, but to be sure */
}
GST_DEBUG_OBJECT (basesink, "configured segment %" GST_SEGMENT_FORMAT,
&basesink->segment);
basesink->priv->segment_seqnum = seqnum;
basesink->have_newsegment = TRUE;
gst_base_sink_reset_qos (basesink);
GST_OBJECT_UNLOCK (basesink);
break;
}
case GST_EVENT_GAP:
{
if (G_UNLIKELY (gst_base_sink_wait_event (basesink,
@ -3354,6 +3574,41 @@ gst_base_sink_default_event (GstBaseSink * basesink, GstEvent * event)
gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
break;
}
case GST_EVENT_INSTANT_RATE_CHANGE:
{
GstMessage *msg;
gdouble rate_multiplier;
guint32 seqnum = gst_event_get_seqnum (event);
GST_OBJECT_LOCK (basesink);
if (G_UNLIKELY (basesink->priv->last_instant_rate_seqnum == seqnum)) {
/* Ignore repeated event */
GST_LOG_OBJECT (basesink,
"Ignoring repeated instant-rate-change event");
GST_OBJECT_UNLOCK (basesink);
break;
}
if (basesink->priv->instant_rate_sync_seqnum == seqnum) {
/* Ignore if we already received the instant-rate-sync-time event from the pipeline */
GST_LOG_OBJECT (basesink,
"Ignoring instant-rate-change event for which we already received instant-rate-sync-time");
GST_OBJECT_UNLOCK (basesink);
break;
}
basesink->priv->last_instant_rate_seqnum = seqnum;
GST_OBJECT_UNLOCK (basesink);
gst_event_parse_instant_rate_change (event, &rate_multiplier, NULL);
msg =
gst_message_new_instant_rate_request (GST_OBJECT_CAST (basesink),
rate_multiplier);
gst_message_set_seqnum (msg, seqnum);
gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
break;
}
default:
break;
}
@ -4163,6 +4418,114 @@ gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
return TRUE;
}
static gboolean
gst_base_sink_perform_instant_rate_change (GstBaseSink * sink, GstPad * pad,
GstEvent * event)
{
GstBaseSinkPrivate *priv;
guint32 seqnum;
gdouble rate;
GstClockTime running_time, upstream_running_time;
GstClockTime switch_time;
gint res;
priv = sink->priv;
GST_DEBUG_OBJECT (sink, "performing instant-rate-change with event %p",
event);
seqnum = gst_event_get_seqnum (event);
gst_event_parse_instant_rate_sync_time (event, &rate, &running_time,
&upstream_running_time);
GST_DEBUG_OBJECT (sink, "instant-rate-change %u %lf at %" GST_TIME_FORMAT
", upstream %" GST_TIME_FORMAT,
seqnum, rate, GST_TIME_ARGS (running_time),
GST_TIME_ARGS (upstream_running_time));
/* Take the preroll lock so we can change the segment. We do not call unlock
* like for stepping as that would cause the PLAYING state to be lost and
* would get us into prerolling again first
*
* FIXME: The below potentially blocks until the chain function returns, but
* the lock is not taken during all waiting operations inside the chain
* function (clock, preroll) so this should be fine in most cases. Only
* problem is if the render() or prepare() functions are waiting themselves!
*
* FIXME: If the subclass is calling gst_base_sink_wait() it will be woken
* up but there is no way for it to update the timestamps, or to report back
* to the base class that it should recalculate the values. The current
* change would not be instantaneous in that case but would wait until the
* next buffer.
*/
GST_BASE_SINK_PREROLL_LOCK (sink);
/* We can safely change the segment and everything here as we hold the
* PREROLL_LOCK and it is taken for the whole chain function */
sink->priv->instant_rate_sync_seqnum = seqnum;
sink->priv->instant_rate_multiplier = rate;
sink->priv->instant_rate_offset = running_time - upstream_running_time;
sink->priv->last_anchor_running_time = running_time;
GST_DEBUG_OBJECT (sink, "Current internal running time %" GST_TIME_FORMAT
", last internal running time %" GST_TIME_FORMAT,
GST_TIME_ARGS (running_time),
GST_TIME_ARGS (sink->priv->last_anchor_running_time));
/* Calculate the current position in the segment and do a seek with the
* new rate. This updates rate, base and offset accordingly */
res =
gst_segment_position_from_running_time_full (&sink->segment,
GST_FORMAT_TIME, running_time, &switch_time);
GST_DEBUG_OBJECT (sink, "Before adjustment seg is %" GST_SEGMENT_FORMAT
" new running_time %" GST_TIME_FORMAT
" position %" GST_STIME_FORMAT " res %d", &sink->segment,
GST_TIME_ARGS (running_time),
GST_STIME_ARGS ((GstClockTimeDiff) switch_time), res);
if (res < 0) {
GST_WARNING_OBJECT (sink,
"Negative position calculated. Can't instant-rate change to there");
GST_BASE_SINK_PREROLL_UNLOCK (sink);
return TRUE;
}
sink->segment.position = switch_time;
/* Calculate new output rate based on upstream value */
rate *= sink->priv->upstream_segment.rate;
gst_segment_do_seek (&sink->segment, rate, GST_FORMAT_TIME,
sink->segment.flags & (~GST_SEEK_FLAG_FLUSH) &
GST_SEEK_FLAG_INSTANT_RATE_CHANGE, GST_SEEK_TYPE_NONE, -1,
GST_SEEK_TYPE_NONE, -1, NULL);
GST_DEBUG_OBJECT (sink, "Adjusted segment is now %" GST_SEGMENT_FORMAT,
&sink->segment);
priv->current_sstart = GST_CLOCK_TIME_NONE;
priv->current_sstop = GST_CLOCK_TIME_NONE;
priv->eos_rtime = GST_CLOCK_TIME_NONE;
gst_base_sink_reset_qos (sink);
if (sink->clock_id) {
gst_clock_id_unschedule (sink->clock_id);
}
if (sink->have_preroll) {
GST_DEBUG_OBJECT (sink, "signal waiter");
/* TODO: Rename this, and GST_FLOW_STEP */
priv->step_unlock = TRUE;
GST_BASE_SINK_PREROLL_SIGNAL (sink);
}
GST_BASE_SINK_PREROLL_UNLOCK (sink);
return TRUE;
}
/* with STREAM_LOCK
*/
static void
@ -4616,6 +4979,18 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event)
* when a particular piece of data will be rendered. */
break;
}
case GST_EVENT_INSTANT_RATE_SYNC_TIME:
{
gst_base_sink_perform_instant_rate_change (basesink, pad, event);
/* Forward the event. If upstream handles it already, it is supposed to
* send a SEGMENT event with the same seqnum and the final rate before
* the next buffer
*/
forward = TRUE;
break;
}
case GST_EVENT_SEEK:
/* in pull mode we will execute the seek */
if (mode == GST_PAD_MODE_PULL)
@ -4632,6 +5007,84 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event)
if (forward) {
GST_DEBUG_OBJECT (basesink, "sending event %p %" GST_PTR_FORMAT, event,
event);
/* Compensate for any instant-rate-change related running time offset
* between upstream and the internal running time of the sink */
if (basesink->priv->instant_rate_sync_seqnum != GST_SEQNUM_INVALID) {
GstClockTime now = GST_CLOCK_TIME_NONE;
GstClockTime actual_duration;
GstClockTime upstream_duration;
GstClockTimeDiff difference;
gboolean is_playing, negative_duration;
GST_OBJECT_LOCK (basesink);
is_playing = GST_STATE (basesink) == GST_STATE_PLAYING
&& (GST_STATE_PENDING (basesink) == GST_STATE_VOID_PENDING ||
GST_STATE_PENDING (basesink) == GST_STATE_PLAYING);
if (is_playing) {
GstClockTime base_time, clock_time;
GstClock *clock;
base_time = GST_ELEMENT_CAST (basesink)->base_time;
clock = GST_ELEMENT_CLOCK (basesink);
GST_OBJECT_UNLOCK (basesink);
if (clock) {
clock_time = gst_clock_get_time (clock);
now = clock_time - base_time;
}
} else {
now = GST_ELEMENT_START_TIME (basesink);
GST_OBJECT_UNLOCK (basesink);
}
GST_DEBUG_OBJECT (basesink,
"Current internal running time %" GST_TIME_FORMAT
", last internal running time %" GST_TIME_FORMAT, GST_TIME_ARGS (now),
GST_TIME_ARGS (basesink->priv->last_anchor_running_time));
if (now != GST_CLOCK_TIME_NONE) {
/* Calculate how much running time was spent since the last switch/segment
* in the "corrected upstream segment", our segment */
/* Due to rounding errors and other inaccuracies, it can happen
* that our calculated internal running time is before the upstream
* running time. We need to compensate for that */
if (now < basesink->priv->last_anchor_running_time) {
actual_duration = basesink->priv->last_anchor_running_time - now;
negative_duration = TRUE;
} else {
actual_duration = now - basesink->priv->last_anchor_running_time;
negative_duration = FALSE;
}
/* Transpose that duration (i.e. what upstream beliefs) */
upstream_duration =
(actual_duration * basesink->segment.rate) /
basesink->priv->upstream_segment.rate;
/* Add the difference to the previously accumulated correction */
if (negative_duration)
difference = upstream_duration - actual_duration;
else
difference = actual_duration - upstream_duration;
GST_DEBUG_OBJECT (basesink,
"Current instant rate correction offset. Actual duration %"
GST_TIME_FORMAT ", upstream duration %" GST_TIME_FORMAT
", negative %d, difference %" GST_STIME_FORMAT ", current offset %"
GST_STIME_FORMAT, GST_TIME_ARGS (actual_duration),
GST_TIME_ARGS (upstream_duration), negative_duration,
GST_STIME_ARGS (difference),
GST_STIME_ARGS (basesink->priv->instant_rate_offset + difference));
difference = basesink->priv->instant_rate_offset + difference;
event = gst_event_make_writable (event);
gst_event_set_running_time_offset (event, -difference);
}
}
result = gst_pad_push_event (pad, event);
} else {
/* not forwarded, unref the event */
@ -5190,6 +5643,8 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
basesink->have_newsegment = FALSE;
gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&basesink->priv->upstream_segment,
GST_FORMAT_UNDEFINED);
basesink->offset = 0;
basesink->have_preroll = FALSE;
priv->step_unlock = FALSE;
@ -5207,6 +5662,12 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
priv->call_preroll = TRUE;
priv->current_step.valid = FALSE;
priv->pending_step.valid = FALSE;
priv->instant_rate_sync_seqnum = GST_SEQNUM_INVALID;
priv->instant_rate_multiplier = 0;
priv->last_instant_rate_seqnum = GST_SEQNUM_INVALID;
priv->segment_seqnum = GST_SEQNUM_INVALID;
priv->instant_rate_offset = 0;
priv->last_anchor_running_time = 0;
if (priv->async_enabled) {
GST_DEBUG_OBJECT (basesink, "doing async state change");
/* when async enabled, post async-start message and return ASYNC from