splitmuxsink: Handle negative running time

Use signed clock times for running time everywhere
so that we handle negative running times without
going haywire, similar to what queue and multiqueue
do these days.
This commit is contained in:
Jan Schmidt 2016-07-17 22:41:02 +10:00
parent e2505dd7df
commit 6755691b28
2 changed files with 106 additions and 72 deletions

View file

@ -399,6 +399,23 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id,
}
}
/* Convenience function */
static inline GstClockTimeDiff
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (val)) {
gboolean sign =
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
if (sign > 0)
res = val;
else if (sign < 0)
res = -val;
}
return res;
}
static GstPad *
mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
{
@ -454,7 +471,7 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux)
ctx->splitmux = splitmux;
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
ctx->in_running_time = ctx->out_running_time = 0;
ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
g_queue_init (&ctx->queued_bufs);
return ctx;
}
@ -546,11 +563,11 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
do {
GST_LOG_OBJECT (ctx->srcpad,
"Checking running time %" GST_TIME_FORMAT " against max %"
GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
GST_TIME_ARGS (splitmux->max_out_running_time));
"Checking running time %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
ctx->out_running_time < splitmux->max_out_running_time) {
splitmux->have_muxed_something = TRUE;
return;
@ -571,17 +588,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
GST_INFO_OBJECT (ctx->srcpad,
"Sleeping for running time %"
GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
GST_TIME_ARGS (ctx->out_running_time),
GST_TIME_ARGS (splitmux->max_out_running_time));
GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
ctx->out_blocked = TRUE;
/* Expand the mq if needed before sleeping */
check_queue_length (splitmux, ctx);
GST_SPLITMUX_WAIT (splitmux);
ctx->out_blocked = FALSE;
GST_INFO_OBJECT (ctx->srcpad,
"Woken for new max running time %" GST_TIME_FORMAT,
GST_TIME_ARGS (splitmux->max_out_running_time));
"Woken for new max running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
} while (1);
}
@ -631,6 +648,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
GstClockTimeDiff rtime;
gst_event_parse_gap (event, &gap_ts, NULL);
if (gap_ts == GST_CLOCK_TIME_NONE)
@ -638,28 +656,30 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
GST_SPLITMUX_LOCK (splitmux);
gap_ts = gst_segment_to_running_time (&ctx->out_segment,
GST_FORMAT_TIME, gap_ts);
rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
GST_TIME_ARGS (gap_ts));
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_STIME_ARGS (rtime));
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
ctx->out_running_time = gap_ts;
complete_or_wait_on_out (splitmux, ctx);
if (rtime != GST_CLOCK_STIME_NONE) {
ctx->out_running_time = rtime;
complete_or_wait_on_out (splitmux, ctx);
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_EVENT_CUSTOM_DOWNSTREAM:{
const GstStructure *s;
GstClockTime ts = 0;
GstClockTimeDiff ts = 0;
s = gst_event_get_structure (event);
if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
break;
gst_structure_get_uint64 (s, "timestamp", &ts);
gst_structure_get_int64 (s, "timestamp", &ts);
GST_SPLITMUX_LOCK (splitmux);
@ -691,9 +711,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
ctx->out_running_time = buf_info->run_ts;
GST_LOG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
"Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
" size %" G_GSIZE_FORMAT,
pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
if (splitmux->opening_first_fragment) {
send_fragment_opened_closed_msg (splitmux, TRUE);
@ -702,7 +722,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
complete_or_wait_on_out (splitmux, ctx);
if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
splitmux->muxed_out_time < buf_info->run_ts)
splitmux->muxed_out_time = buf_info->run_ts;
@ -712,8 +732,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
" run ts %" GST_TIME_FORMAT, buf,
GST_TIME_ARGS (ctx->out_running_time));
" run ts %" GST_STIME_FORMAT, buf,
GST_STIME_ARGS (ctx->out_running_time));
}
#endif
@ -776,7 +796,7 @@ start_next_fragment (GstSplitMuxSink * splitmux)
splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
} else {
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
splitmux->have_muxed_something = FALSE;
}
splitmux->have_muxed_something =
@ -787,8 +807,8 @@ start_next_fragment (GstSplitMuxSink * splitmux)
splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
GST_DEBUG_OBJECT (splitmux,
"Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
GST_TIME_ARGS (splitmux->max_out_running_time));
"Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
send_fragment_opened_closed_msg (splitmux, TRUE);
@ -808,7 +828,7 @@ bus_handler (GstBin * bin, GstMessage * message)
send_fragment_opened_closed_msg (splitmux, FALSE);
if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
GST_SPLITMUX_BROADCAST (splitmux);
@ -838,7 +858,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
{
GList *cur;
gsize queued_bytes = 0;
GstClockTime queued_time = 0;
GstClockTimeDiff queued_time = 0;
/* Assess if the multiqueue contents overflowed the current file */
for (cur = g_list_first (splitmux->contexts);
@ -858,8 +878,8 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
/* Expand queued bytes estimate by muxer overhead */
queued_bytes += (queued_bytes * splitmux->mux_overhead);
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
" bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
" bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
/* Check for overrun - have we output at least one byte and overrun
* either threshold? */
@ -873,16 +893,16 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
GST_INFO_OBJECT (splitmux,
"mq overflowed since last, draining out. max out TS is %"
GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
GST_SPLITMUX_BROADCAST (splitmux);
} else {
/* No overflow */
GST_LOG_OBJECT (splitmux,
"This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
" queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
" queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
queued_bytes, GST_TIME_ARGS (queued_time));
queued_bytes, GST_STIME_ARGS (queued_time));
/* Wake everyone up to push this one GOP, then sleep */
splitmux->have_muxed_something = TRUE;
@ -892,11 +912,11 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
} else {
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
}
GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
GST_SPLITMUX_BROADCAST (splitmux);
}
@ -912,24 +932,24 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GList *cur;
gboolean ready = TRUE;
GstClockTime current_max_in_running_time;
GstClockTimeDiff current_max_in_running_time;
if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
/* Iterate each pad, and check that the input running time is at least
* up to the reference running time, and if so handle the collected GOP */
GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
GST_TIME_FORMAT " ctx %p",
GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
GST_STIME_FORMAT " ctx %p",
GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
for (cur = g_list_first (splitmux->contexts); cur != NULL;
cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
GST_LOG_OBJECT (splitmux,
"Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
"Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
" EOS %d", tmpctx, tmpctx->srcpad,
GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
if (splitmux->max_in_running_time != G_MAXINT64 &&
tmpctx->in_running_time < splitmux->max_in_running_time &&
!tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
@ -1050,7 +1070,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
ctx->in_eos = FALSE;
ctx->in_bytes = 0;
ctx->in_running_time = 0;
ctx->in_running_time = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_EOS:
@ -1063,7 +1083,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
if (ctx->is_reference) {
GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
/* Act as if this is a new keyframe with infinite timestamp */
splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
splitmux->max_in_running_time = G_MAXINT64;
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST (splitmux);
@ -1091,6 +1111,8 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
else
ts = GST_BUFFER_DTS (buf);
GST_LOG_OBJECT (pad, "Buffer TS is %" GST_STIME_FORMAT, GST_STIME_ARGS (ts));
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->state == SPLITMUX_STATE_STOPPED)
@ -1099,23 +1121,27 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
/* If this buffer has a timestamp, advance the input timestamp of the
* stream */
if (GST_CLOCK_TIME_IS_VALID (ts)) {
GstClockTime running_time =
gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
GstClockTimeDiff running_time =
my_segment_to_running_time (&ctx->in_segment,
GST_BUFFER_TIMESTAMP (buf));
if (GST_CLOCK_TIME_IS_VALID (running_time) &&
(ctx->in_running_time == GST_CLOCK_TIME_NONE
|| running_time > ctx->in_running_time))
GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
GST_STIME_ARGS (running_time));
if (GST_CLOCK_STIME_IS_VALID (running_time)
&& running_time > ctx->in_running_time)
ctx->in_running_time = running_time;
}
/* Try to make sure we have a valid running time */
if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
ctx->in_running_time =
gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
ctx->in_segment.start);
my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
}
GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
buf_info->run_ts = ctx->in_running_time;
buf_info->buf_size = gst_buffer_get_size (buf);
@ -1123,12 +1149,19 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
ctx->in_bytes += buf_info->buf_size;
/* initialize mux_start_time */
if (ctx->is_reference && splitmux->mux_start_time == 0)
if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
splitmux->mux_start_time = buf_info->run_ts;
GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->mux_start_time));
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
splitmux->max_in_running_time = splitmux->mux_start_time;
}
GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
" total in_bytes %" G_GSIZE_FORMAT,
GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
loop_again = TRUE;
do {
@ -1140,15 +1173,15 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
if (ctx->is_reference) {
/* If a keyframe, we have a complete GOP */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
splitmux->max_in_running_time >= ctx->in_running_time) {
/* Pass this buffer through */
loop_again = FALSE;
break;
}
GST_INFO_OBJECT (pad,
"Have keyframe with running time %" GST_TIME_FORMAT,
GST_TIME_ARGS (ctx->in_running_time));
"Have keyframe with running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
keyframe = TRUE;
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_in_running_time = ctx->in_running_time;
@ -1164,14 +1197,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
}
break;
case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
/* After a GOP start is found, this buffer might complete the GOP */
/* If we overran the target timestamp, it might be time to process
* the GOP, otherwise bail out for more data
*/
GST_LOG_OBJECT (pad,
"Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
GST_TIME_ARGS (ctx->in_running_time),
GST_TIME_ARGS (splitmux->max_in_running_time));
"Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time),
GST_STIME_ARGS (splitmux->max_in_running_time));
if (ctx->in_running_time < splitmux->max_in_running_time) {
loop_again = FALSE;
@ -1195,7 +1228,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
GST_EVENT_TYPE_SERIALIZED,
gst_structure_new ("splitmuxsink-unblock", "timestamp",
G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
G_TYPE_INT64, splitmux->max_in_running_time, NULL));
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (ctx->sinkpad, event);
@ -1227,7 +1260,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
check_queue_length (splitmux, ctx);
GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
" run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
" run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_PASS;
@ -1605,8 +1638,9 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
GST_SPLITMUX_LOCK (splitmux);
/* Start by collecting one input on each pad */
splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
splitmux->max_in_running_time = 0;
splitmux->muxed_out_time = splitmux->mux_start_time = 0;
splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
splitmux->muxed_out_time = splitmux->mux_start_time =
GST_CLOCK_STIME_NONE;
splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
splitmux->opening_first_fragment = TRUE;
GST_SPLITMUX_UNLOCK (splitmux);

View file

@ -48,7 +48,7 @@ typedef enum _SplitMuxState {
typedef struct _MqStreamBuf
{
gboolean keyframe;
GstClockTime run_ts;
GstClockTimeDiff run_ts;
gsize buf_size;
} MqStreamBuf;
@ -70,8 +70,8 @@ typedef struct _MqStreamCtx
GstSegment in_segment;
GstSegment out_segment;
GstClockTime in_running_time;
GstClockTime out_running_time;
GstClockTimeDiff in_running_time;
GstClockTimeDiff out_running_time;
gsize in_bytes;
@ -114,14 +114,14 @@ struct _GstSplitMuxSink {
MqStreamCtx *reference_ctx;
guint queued_gops;
GstClockTime max_in_running_time;
GstClockTime max_out_running_time;
GstClockTimeDiff max_in_running_time;
GstClockTimeDiff max_out_running_time;
GstClockTime muxed_out_time;
GstClockTimeDiff muxed_out_time;
gsize muxed_out_bytes;
gboolean have_muxed_something;
GstClockTime mux_start_time;
GstClockTimeDiff mux_start_time;
gsize mux_start_bytes;
gboolean opening_first_fragment;