splitmuxsink: Enhancement for timecode based split

The calculated threshold for timecode might be varying depending on
"max-size-timecode" and framerate.
For instance, with framerate 29.97 (30000/1001) and
"max-size-timecode=00:02:00;02", every fragment will have identical
number of frames 3598. However, when "max-size-timecode=00:02:00;00",
calculated next keyframe via gst_video_time_code_add_interval()
can be different per fragment, but this is the nature of timecode.
To compensate such timecode drift, we should keep track of expected
timecode of next fragment based on observed timecode.
This commit is contained in:
Seungha Yang 2020-04-10 23:52:45 +09:00
parent fe73c3b0f3
commit ca48f5265e
2 changed files with 169 additions and 44 deletions

View file

@ -234,6 +234,7 @@ static GstElement *create_element (GstSplitMuxSink * splitmux,
const gchar * factory, const gchar * name, gboolean locked); const gchar * factory, const gchar * name, gboolean locked);
static void do_async_done (GstSplitMuxSink * splitmux); static void do_async_done (GstSplitMuxSink * splitmux);
static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux);
static MqStreamBuf * static MqStreamBuf *
mq_stream_buf_new (void) mq_stream_buf_new (void)
@ -554,7 +555,7 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
splitmux->reset_muxer = DEFAULT_RESET_MUXER; splitmux->reset_muxer = DEFAULT_RESET_MUXER;
splitmux->threshold_timecode_str = NULL; splitmux->threshold_timecode_str = NULL;
splitmux->threshold_timecode = GST_CLOCK_TIME_NONE; gst_splitmux_reset_timecode (splitmux);
splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE; splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
splitmux->muxer_factory = g_strdup (DEFAULT_MUXER); splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
@ -566,7 +567,6 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
splitmux->split_requested = FALSE; splitmux->split_requested = FALSE;
splitmux->do_split_next_gop = FALSE; splitmux->do_split_next_gop = FALSE;
splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8); splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
splitmux->last_fku_time = GST_CLOCK_TIME_NONE;
splitmux->next_fku_time = GST_CLOCK_TIME_NONE; splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
} }
@ -587,6 +587,15 @@ gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
} }
static void
gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux)
{
g_clear_pointer (&splitmux->in_tc, gst_video_time_code_free);
g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free);
splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
}
static void static void
gst_splitmux_sink_dispose (GObject * object) gst_splitmux_sink_dispose (GObject * object)
{ {
@ -639,6 +648,7 @@ gst_splitmux_sink_finalize (GObject * object)
* because the dispose will have freed all request pads though */ * because the dispose will have freed all request pads though */
g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL); g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
g_list_free (splitmux->contexts); g_list_free (splitmux->contexts);
gst_splitmux_reset_timecode (splitmux);
G_OBJECT_CLASS (parent_class)->finalize (object); G_OBJECT_CLASS (parent_class)->finalize (object);
} }
@ -711,9 +721,9 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id,
GST_OBJECT_LOCK (splitmux); GST_OBJECT_LOCK (splitmux);
g_free (splitmux->threshold_timecode_str); g_free (splitmux->threshold_timecode_str);
/* will be calculated later */ /* will be calculated later */
splitmux->threshold_timecode = GST_CLOCK_TIME_NONE;
g_clear_pointer (&splitmux->tc_interval, g_clear_pointer (&splitmux->tc_interval,
gst_video_time_code_interval_free); gst_video_time_code_interval_free);
gst_splitmux_reset_timecode (splitmux);
splitmux->threshold_timecode_str = g_value_dup_string (value); splitmux->threshold_timecode_str = g_value_dup_string (value);
if (splitmux->threshold_timecode_str) { if (splitmux->threshold_timecode_str) {
@ -1284,7 +1294,8 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
static GstClockTime static GstClockTime
calculate_next_max_timecode (GstSplitMuxSink * splitmux, calculate_next_max_timecode (GstSplitMuxSink * splitmux,
const GstVideoTimeCode * cur_tc, GstClockTime running_time) const GstVideoTimeCode * cur_tc, GstClockTime running_time,
GstVideoTimeCode ** next_tc)
{ {
GstVideoTimeCode *target_tc; GstVideoTimeCode *target_tc;
GstClockTime cur_tc_time, target_tc_time, next_max_tc_time; GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
@ -1335,7 +1346,10 @@ calculate_next_max_timecode (GstSplitMuxSink * splitmux,
GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
" from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time), " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
GST_TIME_ARGS (cur_tc_time)); GST_TIME_ARGS (cur_tc_time));
gst_video_time_code_free (target_tc); if (next_tc)
*next_tc = target_tc;
else
gst_video_time_code_free (target_tc);
return next_max_tc_time; return next_max_tc_time;
} }
@ -1347,19 +1361,32 @@ request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
GstEvent *ev; GstEvent *ev;
GstClockTime target_time; GstClockTime target_time;
gboolean timecode_based = FALSE; gboolean timecode_based = FALSE;
GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE; GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
GstClockTime next_fku_time = GST_CLOCK_TIME_NONE; GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
GstClockTime tc_rounding_error = 5 * GST_USECOND;
if (!splitmux->send_keyframe_requests)
return TRUE;
if (splitmux->tc_interval) { if (splitmux->tc_interval) {
GstVideoTimeCodeMeta *tc_meta; if (splitmux->in_tc && gst_video_time_code_is_valid (splitmux->in_tc)) {
GstVideoTimeCode *next_tc = NULL;
max_tc_time =
calculate_next_max_timecode (splitmux, splitmux->in_tc,
running_time, &next_tc);
if (buffer != NULL) { /* calculate the next expected keyframe time to prevent too early fku
tc_meta = gst_buffer_get_video_time_code_meta (buffer); * event */
if (tc_meta) { if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
next_max_tc_time = next_max_tc_time =
calculate_next_max_timecode (splitmux, &tc_meta->tc, running_time); calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
timecode_based = GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
} }
if (next_tc)
gst_video_time_code_free (next_tc);
timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
} else { } else {
/* This can happen in the presence of GAP events that trigger /* This can happen in the presence of GAP events that trigger
* a new fragment start */ * a new fragment start */
@ -1368,42 +1395,73 @@ request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
} }
} }
/* even if we don't send keyframe request, this should be done here in order if ((splitmux->threshold_time == 0 && !timecode_based)
* to calculate the threshold timecode */
if (timecode_based && !GST_CLOCK_TIME_IS_VALID (splitmux->threshold_timecode)) {
splitmux->threshold_timecode = next_max_tc_time - running_time;
GST_DEBUG_OBJECT (splitmux, "Calculated threshold timecode duration %"
GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->threshold_timecode));
}
if (splitmux->send_keyframe_requests == FALSE
|| (splitmux->threshold_time == 0 && !timecode_based)
|| splitmux->threshold_bytes != 0) || splitmux->threshold_bytes != 0)
return TRUE; return TRUE;
if (timecode_based) { if (timecode_based) {
/* We might have rounding errors: aim slightly earlier */ /* We might have rounding errors: aim slightly earlier */
target_time = next_max_tc_time - 5 * GST_USECOND; if (max_tc_time >= tc_rounding_error) {
target_time = max_tc_time - tc_rounding_error;
} else {
/* unreliable target time */
GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
" is smaller than allowed rounding error, set it to zero",
GST_TIME_ARGS (max_tc_time));
target_time = 0;
}
if (next_max_tc_time >= tc_rounding_error) {
next_fku_time = next_max_tc_time - tc_rounding_error;
} else {
/* unreliable target time */
GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
" is smaller than allowed rounding error, set it to zero",
GST_TIME_ARGS (next_max_tc_time));
next_fku_time = 0;
}
} else { } else {
target_time = running_time + splitmux->threshold_time; target_time = running_time + splitmux->threshold_time;
} }
if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time) && if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
target_time < splitmux->next_fku_time) { GstClockTime allowed_time = splitmux->next_fku_time;
GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
" is smaller than expected next keyframe time %" GST_TIME_FORMAT,
GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
return TRUE; if (timecode_based) {
if (allowed_time >= tc_rounding_error) {
allowed_time -= tc_rounding_error;
} else {
/* unreliable next force key unit time */
GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
GST_TIME_FORMAT
" is smaller than allowed rounding error, set it to zero",
GST_TIME_ARGS (splitmux->next_fku_time));
allowed_time = 0;
}
}
if (target_time < allowed_time) {
GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
" is smaller than expected next keyframe time %" GST_TIME_FORMAT
", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
GST_TIME_ARGS (target_time),
GST_TIME_ARGS (splitmux->next_fku_time),
GST_TIME_ARGS (allowed_time));
return TRUE;
} else if (allowed_time != splitmux->next_fku_time &&
target_time < splitmux->next_fku_time) {
GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
" is smaller than expected next keyframe time %" GST_TIME_FORMAT
", but the difference is smaller than allowed rounding error",
GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
}
} }
if (timecode_based) { if (!timecode_based) {
next_fku_time = target_time + splitmux->threshold_timecode;
} else {
next_fku_time = target_time + splitmux->threshold_time; next_fku_time = target_time + splitmux->threshold_time;
} }
splitmux->last_fku_time = target_time;
splitmux->next_fku_time = next_fku_time; splitmux->next_fku_time = next_fku_time;
ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0); ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
@ -2098,11 +2156,16 @@ need_new_fragment (GstSplitMuxSink * splitmux,
return TRUE; /* Would overrun time limit */ return TRUE; /* Would overrun time limit */
} }
/* 5us possible rounding error was already accounted around keyframe request */ if (splitmux->tc_interval &&
if (splitmux->threshold_timecode != GST_CLOCK_TIME_NONE && GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
(queued_time > splitmux->threshold_timecode)) { splitmux->reference_ctx->in_running_time >
GST_TRACE_OBJECT (splitmux, "Splitting at timecode mark"); splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
return TRUE; /* Timecode threshold */ GST_TRACE_OBJECT (splitmux,
"in running time %" GST_STIME_FORMAT " overruns time limit %"
GST_TIME_FORMAT,
GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
return TRUE;
} }
if (check_robust_muxing) { if (check_robust_muxing) {
@ -2129,6 +2192,26 @@ need_new_fragment (GstSplitMuxSink * splitmux,
return FALSE; return FALSE;
} }
/* probably we want to add this API? */
static void
video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
{
GstVideoTimeCode *timecode = NULL;
g_return_if_fail (old_tc != NULL);
if (*old_tc == new_tc)
return;
if (new_tc)
timecode = gst_video_time_code_copy (new_tc);
if (*old_tc)
gst_video_time_code_free (*old_tc);
*old_tc = timecode;
}
/* Called with splitmux lock held */ /* Called with splitmux lock held */
/* Called when entering ProcessingCompleteGop state /* Called when entering ProcessingCompleteGop state
* Assess if mq contents overflowed the current file * Assess if mq contents overflowed the current file
@ -2176,7 +2259,11 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
queued_bytes += (queued_bytes * splitmux->mux_overhead); queued_bytes += (queued_bytes * splitmux->mux_overhead);
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
" bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes); " bytes %" G_GUINT64_FORMAT " in running time %" GST_STIME_FORMAT
" gop start time %" GST_STIME_FORMAT,
GST_STIME_ARGS (queued_time), queued_bytes,
GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
GST_STIME_ARGS (splitmux->gop_start_time));
/* Check for overrun - have we output at least one byte and overrun /* Check for overrun - have we output at least one byte and overrun
* either threshold? */ * either threshold? */
@ -2202,12 +2289,28 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
new_out_ts = splitmux->reference_ctx->in_running_time; new_out_ts = splitmux->reference_ctx->in_running_time;
splitmux->fragment_start_time = splitmux->gop_start_time; splitmux->fragment_start_time = splitmux->gop_start_time;
splitmux->fragment_total_bytes = 0; splitmux->fragment_total_bytes = 0;
if (splitmux->tc_interval) {
video_time_code_replace (&splitmux->fragment_start_tc,
splitmux->gop_start_tc);
splitmux->next_fragment_start_tc_time =
calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
splitmux->fragment_start_time, NULL);
if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
GST_WARNING_OBJECT (splitmux,
"Couldn't calculate next fragment start time for timecode mode");
/* shouldn't happen, but reset all and try again with next buffers */
gst_splitmux_reset_timecode (splitmux);
}
}
} }
/* And set up to collect the next GOP */ /* And set up to collect the next GOP */
if (!splitmux->reference_ctx->in_eos) { if (!splitmux->reference_ctx->in_eos) {
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START; splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
splitmux->gop_start_time = new_out_ts; splitmux->gop_start_time = new_out_ts;
if (splitmux->tc_interval)
video_time_code_replace (&splitmux->gop_start_tc, splitmux->in_tc);
} else { } else {
/* This is probably already the current state, but just in case: */ /* This is probably already the current state, but just in case: */
splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP; splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
@ -2496,6 +2599,26 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
splitmux->max_in_running_time = splitmux->fragment_start_time; splitmux->max_in_running_time = splitmux->fragment_start_time;
} }
if (splitmux->tc_interval) {
GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
if (tc_meta) {
video_time_code_replace (&splitmux->in_tc, &tc_meta->tc);
if (!splitmux->fragment_start_tc) {
/* also initialize fragment_start_tc */
video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc);
video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
splitmux->next_fragment_start_tc_time =
calculate_next_max_timecode (splitmux, splitmux->in_tc,
ctx->in_running_time, NULL);
GST_DEBUG_OBJECT (splitmux, "Initialize next fragment start tc time %"
GST_TIME_FORMAT,
GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
}
}
}
/* Check whether we need to request next keyframe depending on /* Check whether we need to request next keyframe depending on
* current running time */ * current running time */
if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) && if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) &&
@ -3292,7 +3415,6 @@ gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
g_atomic_int_set (&(splitmux->split_requested), FALSE); g_atomic_int_set (&(splitmux->split_requested), FALSE);
g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE); g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
splitmux->last_fku_time = GST_CLOCK_TIME_NONE;
splitmux->next_fku_time = GST_CLOCK_TIME_NONE; splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
gst_queue_array_clear (splitmux->times_to_split); gst_queue_array_clear (splitmux->times_to_split);

View file

@ -118,12 +118,7 @@ struct _GstSplitMuxSink
gchar *threshold_timecode_str; gchar *threshold_timecode_str;
/* created from threshold_timecode_str */ /* created from threshold_timecode_str */
GstVideoTimeCodeInterval *tc_interval; GstVideoTimeCodeInterval *tc_interval;
/* allowed max size of queued time based on timecode */
GstClockTime threshold_timecode;
GstClockTime next_max_tc_time;
GstClockTime alignment_threshold; GstClockTime alignment_threshold;
/* previously sent running time of force keyframe unit event */
GstClockTime last_fku_time;
/* expected running time of next force keyframe unit event */ /* expected running time of next force keyframe unit event */
GstClockTime next_fku_time; GstClockTime next_fku_time;
@ -156,6 +151,14 @@ struct _GstSplitMuxSink
GstClockTimeDiff fragment_start_time; GstClockTimeDiff fragment_start_time;
/* Start time of the current GOP */ /* Start time of the current GOP */
GstClockTimeDiff gop_start_time; GstClockTimeDiff gop_start_time;
/* The last timecode we have */
GstVideoTimeCode *in_tc;
/* Start timecode of the current fragment */
GstVideoTimeCode *fragment_start_tc;
/* Start timecode of the current GOP */
GstVideoTimeCode *gop_start_tc;
/* expected running time of next fragment in timecode mode */
GstClockTime next_fragment_start_tc_time;
GQueue out_cmd_q; /* Queue of commands for output thread */ GQueue out_cmd_q; /* Queue of commands for output thread */