diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index f76b8f16f7..8d5d70fe20 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -399,6 +399,23 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id, } } +/* Convenience function */ +static inline GstClockTimeDiff +my_segment_to_running_time (GstSegment * segment, GstClockTime val) +{ + GstClockTimeDiff res = GST_CLOCK_STIME_NONE; + + if (GST_CLOCK_TIME_IS_VALID (val)) { + gboolean sign = + gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val); + if (sign > 0) + res = val; + else if (sign < 0) + res = -val; + } + return res; +} + static GstPad * mq_sink_to_src (GstElement * mq, GstPad * sink_pad) { @@ -454,7 +471,7 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux) ctx->splitmux = splitmux; gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); - ctx->in_running_time = ctx->out_running_time = 0; + ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE; g_queue_init (&ctx->queued_bufs); return ctx; } @@ -546,11 +563,11 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) do { GST_LOG_OBJECT (ctx->srcpad, - "Checking running time %" GST_TIME_FORMAT " against max %" - GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time), - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Checking running time %" GST_STIME_FORMAT " against max %" + GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time), + GST_STIME_ARGS (splitmux->max_out_running_time)); - if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE || + if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || ctx->out_running_time < splitmux->max_out_running_time) { splitmux->have_muxed_something = TRUE; return; @@ -571,17 +588,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_INFO_OBJECT (ctx->srcpad, "Sleeping for running time %" - GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")", - GST_TIME_ARGS (ctx->out_running_time), - GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")", + GST_STIME_ARGS (ctx->out_running_time), + GST_STIME_ARGS (splitmux->max_out_running_time)); ctx->out_blocked = TRUE; /* Expand the mq if needed before sleeping */ check_queue_length (splitmux, ctx); GST_SPLITMUX_WAIT (splitmux); ctx->out_blocked = FALSE; GST_INFO_OBJECT (ctx->srcpad, - "Woken for new max running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Woken for new max running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_out_running_time)); } while (1); } @@ -631,6 +648,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) break; case GST_EVENT_GAP:{ GstClockTime gap_ts; + GstClockTimeDiff rtime; gst_event_parse_gap (event, &gap_ts, NULL); if (gap_ts == GST_CLOCK_TIME_NONE) @@ -638,28 +656,30 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - gap_ts = gst_segment_to_running_time (&ctx->out_segment, - GST_FORMAT_TIME, gap_ts); + rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); - GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT, - GST_TIME_ARGS (gap_ts)); + GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, + GST_STIME_ARGS (rtime)); if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; - ctx->out_running_time = gap_ts; - complete_or_wait_on_out (splitmux, ctx); + + if (rtime != GST_CLOCK_STIME_NONE) { + ctx->out_running_time = rtime; + complete_or_wait_on_out (splitmux, ctx); + } GST_SPLITMUX_UNLOCK (splitmux); break; } case GST_EVENT_CUSTOM_DOWNSTREAM:{ const GstStructure *s; - GstClockTime ts = 0; + GstClockTimeDiff ts = 0; s = gst_event_get_structure (event); if (!gst_structure_has_name (s, "splitmuxsink-unblock")) break; - gst_structure_get_uint64 (s, "timestamp", &ts); + gst_structure_get_int64 (s, "timestamp", &ts); GST_SPLITMUX_LOCK (splitmux); @@ -691,9 +711,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->out_running_time = buf_info->run_ts; GST_LOG_OBJECT (splitmux, - "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT + "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT " size %" G_GSIZE_FORMAT, - pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size); + pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); if (splitmux->opening_first_fragment) { send_fragment_opened_closed_msg (splitmux, TRUE); @@ -702,7 +722,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) complete_or_wait_on_out (splitmux, ctx); - if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE || + if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE || splitmux->muxed_out_time < buf_info->run_ts) splitmux->muxed_out_time = buf_info->run_ts; @@ -712,8 +732,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstBuffer *buf = gst_pad_probe_info_get_buffer (info); GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT - " run ts %" GST_TIME_FORMAT, buf, - GST_TIME_ARGS (ctx->out_running_time)); + " run ts %" GST_STIME_FORMAT, buf, + GST_STIME_ARGS (ctx->out_running_time)); } #endif @@ -776,7 +796,7 @@ start_next_fragment (GstSplitMuxSink * splitmux) splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; } else { splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; - splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; splitmux->have_muxed_something = FALSE; } splitmux->have_muxed_something = @@ -787,8 +807,8 @@ start_next_fragment (GstSplitMuxSink * splitmux) splitmux->mux_start_bytes = splitmux->muxed_out_bytes; GST_DEBUG_OBJECT (splitmux, - "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_out_running_time)); send_fragment_opened_closed_msg (splitmux, TRUE); @@ -808,7 +828,7 @@ bus_handler (GstBin * bin, GstMessage * message) send_fragment_opened_closed_msg (splitmux, FALSE); if (splitmux->state == SPLITMUX_STATE_ENDING_FILE && - splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) { + splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) { GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT; GST_SPLITMUX_BROADCAST (splitmux); @@ -838,7 +858,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) { GList *cur; gsize queued_bytes = 0; - GstClockTime queued_time = 0; + GstClockTimeDiff queued_time = 0; /* Assess if the multiqueue contents overflowed the current file */ for (cur = g_list_first (splitmux->contexts); @@ -858,8 +878,8 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) /* Expand queued bytes estimate by muxer overhead */ queued_bytes += (queued_bytes * splitmux->mux_overhead); - GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT - " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes); + GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT + " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes); /* Check for overrun - have we output at least one byte and overrun * either threshold? */ @@ -873,16 +893,16 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) GST_INFO_OBJECT (splitmux, "mq overflowed since last, draining out. max out TS is %" - GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } else { /* No overflow */ GST_LOG_OBJECT (splitmux, "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT - " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.", + " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.", splitmux->muxed_out_bytes - splitmux->mux_start_bytes, - queued_bytes, GST_TIME_ARGS (queued_time)); + queued_bytes, GST_STIME_ARGS (queued_time)); /* Wake everyone up to push this one GOP, then sleep */ splitmux->have_muxed_something = TRUE; @@ -892,11 +912,11 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; } else { splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; - splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; } GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %" - GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } @@ -912,24 +932,24 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GList *cur; gboolean ready = TRUE; - GstClockTime current_max_in_running_time; + GstClockTimeDiff current_max_in_running_time; if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { /* Iterate each pad, and check that the input running time is at least * up to the reference running time, and if so handle the collected GOP */ GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" - GST_TIME_FORMAT " ctx %p", - GST_TIME_ARGS (splitmux->max_in_running_time), ctx); + GST_STIME_FORMAT " ctx %p", + GST_STIME_ARGS (splitmux->max_in_running_time), ctx); for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); GST_LOG_OBJECT (splitmux, - "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT + "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT " EOS %d", tmpctx, tmpctx->srcpad, - GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); + GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); - if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE && + if (splitmux->max_in_running_time != G_MAXINT64 && tmpctx->in_running_time < splitmux->max_in_running_time && !tmpctx->in_eos) { GST_LOG_OBJECT (splitmux, @@ -1050,7 +1070,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); ctx->in_eos = FALSE; ctx->in_bytes = 0; - ctx->in_running_time = 0; + ctx->in_running_time = GST_CLOCK_STIME_NONE; GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_EOS: @@ -1063,7 +1083,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (ctx->is_reference) { GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); /* Act as if this is a new keyframe with infinite timestamp */ - splitmux->max_in_running_time = GST_CLOCK_TIME_NONE; + splitmux->max_in_running_time = G_MAXINT64; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; /* Wake up other input pads to collect this GOP */ GST_SPLITMUX_BROADCAST (splitmux); @@ -1091,6 +1111,8 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) else ts = GST_BUFFER_DTS (buf); + GST_LOG_OBJECT (pad, "Buffer TS is %" GST_STIME_FORMAT, GST_STIME_ARGS (ts)); + GST_SPLITMUX_LOCK (splitmux); if (splitmux->state == SPLITMUX_STATE_STOPPED) @@ -1099,23 +1121,27 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) /* If this buffer has a timestamp, advance the input timestamp of the * stream */ if (GST_CLOCK_TIME_IS_VALID (ts)) { - GstClockTime running_time = - gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, + GstClockTimeDiff running_time = + my_segment_to_running_time (&ctx->in_segment, GST_BUFFER_TIMESTAMP (buf)); - if (GST_CLOCK_TIME_IS_VALID (running_time) && - (ctx->in_running_time == GST_CLOCK_TIME_NONE - || running_time > ctx->in_running_time)) + GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT, + GST_STIME_ARGS (running_time)); + + if (GST_CLOCK_STIME_IS_VALID (running_time) + && running_time > ctx->in_running_time) ctx->in_running_time = running_time; } /* Try to make sure we have a valid running time */ - if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) { + if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) { ctx->in_running_time = - gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, - ctx->in_segment.start); + my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start); } + GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time)); + buf_info->run_ts = ctx->in_running_time; buf_info->buf_size = gst_buffer_get_size (buf); @@ -1123,12 +1149,19 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->in_bytes += buf_info->buf_size; /* initialize mux_start_time */ - if (ctx->is_reference && splitmux->mux_start_time == 0) + if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) { splitmux->mux_start_time = buf_info->run_ts; + GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->mux_start_time)); + /* Also take this as the first start time when starting up, + * so that we start counting overflow from the first frame */ + if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) + splitmux->max_in_running_time = splitmux->mux_start_time; + } - GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT + GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT " total in_bytes %" G_GSIZE_FORMAT, - GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes); + GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes); loop_again = TRUE; do { @@ -1140,15 +1173,15 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (ctx->is_reference) { /* If a keyframe, we have a complete GOP */ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || - !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) || + !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) || splitmux->max_in_running_time >= ctx->in_running_time) { /* Pass this buffer through */ loop_again = FALSE; break; } GST_INFO_OBJECT (pad, - "Have keyframe with running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (ctx->in_running_time)); + "Have keyframe with running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time)); keyframe = TRUE; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; splitmux->max_in_running_time = ctx->in_running_time; @@ -1164,14 +1197,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } break; case SPLITMUX_STATE_WAITING_GOP_COMPLETE: - /* After a GOP start is found, this buffer might complete the GOP */ + /* If we overran the target timestamp, it might be time to process * the GOP, otherwise bail out for more data */ GST_LOG_OBJECT (pad, - "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT, - GST_TIME_ARGS (ctx->in_running_time), - GST_TIME_ARGS (splitmux->max_in_running_time)); + "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time), + GST_STIME_ARGS (splitmux->max_in_running_time)); if (ctx->in_running_time < splitmux->max_in_running_time) { loop_again = FALSE; @@ -1195,7 +1228,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED, gst_structure_new ("splitmuxsink-unblock", "timestamp", - G_TYPE_UINT64, splitmux->max_in_running_time, NULL)); + G_TYPE_INT64, splitmux->max_in_running_time, NULL)); GST_SPLITMUX_UNLOCK (splitmux); gst_pad_send_event (ctx->sinkpad, event); @@ -1227,7 +1260,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) check_queue_length (splitmux, ctx); GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT - " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time)); + " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_PASS; @@ -1605,8 +1638,9 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) GST_SPLITMUX_LOCK (splitmux); /* Start by collecting one input on each pad */ splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - splitmux->max_in_running_time = 0; - splitmux->muxed_out_time = splitmux->mux_start_time = 0; + splitmux->max_in_running_time = GST_CLOCK_STIME_NONE; + splitmux->muxed_out_time = splitmux->mux_start_time = + GST_CLOCK_STIME_NONE; splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0; splitmux->opening_first_fragment = TRUE; GST_SPLITMUX_UNLOCK (splitmux); diff --git a/gst/multifile/gstsplitmuxsink.h b/gst/multifile/gstsplitmuxsink.h index a233642c7a..82e03a7286 100644 --- a/gst/multifile/gstsplitmuxsink.h +++ b/gst/multifile/gstsplitmuxsink.h @@ -48,7 +48,7 @@ typedef enum _SplitMuxState { typedef struct _MqStreamBuf { gboolean keyframe; - GstClockTime run_ts; + GstClockTimeDiff run_ts; gsize buf_size; } MqStreamBuf; @@ -70,8 +70,8 @@ typedef struct _MqStreamCtx GstSegment in_segment; GstSegment out_segment; - GstClockTime in_running_time; - GstClockTime out_running_time; + GstClockTimeDiff in_running_time; + GstClockTimeDiff out_running_time; gsize in_bytes; @@ -114,14 +114,14 @@ struct _GstSplitMuxSink { MqStreamCtx *reference_ctx; guint queued_gops; - GstClockTime max_in_running_time; - GstClockTime max_out_running_time; + GstClockTimeDiff max_in_running_time; + GstClockTimeDiff max_out_running_time; - GstClockTime muxed_out_time; + GstClockTimeDiff muxed_out_time; gsize muxed_out_bytes; gboolean have_muxed_something; - GstClockTime mux_start_time; + GstClockTimeDiff mux_start_time; gsize mux_start_bytes; gboolean opening_first_fragment;