From 335c9f28d6a58ff9ce2dec5b7f5db8fc3be41dd9 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Fri, 18 Nov 2016 22:42:18 +1100 Subject: [PATCH] splitmux: Rewrite buffer collection and scheduling Majorly change the way that splitmuxsink collects incoming data and sends it to the output, so that it makes all decisions about when / where to split files on the input side. Use separate queues for each stream, so they can be grown individually and kept as small as possible. This removes raciness I observed where sometimes some data would end up put in a different output file over multiple runs with the same input. Also fixes hangs with input queues getting full and causing muxing to stall out. --- gst/multifile/gstsplitmuxsink.c | 914 ++++++++++++++++++-------------- gst/multifile/gstsplitmuxsink.h | 82 ++- 2 files changed, 577 insertions(+), 419 deletions(-) diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index 5d8b75f093..4b2f01042b 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -63,8 +63,11 @@ GST_DEBUG_CATEGORY_STATIC (splitmux_debug); #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock) #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock) -#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock) -#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond) +#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock) +#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond) + +#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock) +#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond) enum { @@ -124,7 +127,7 @@ _do_init (void) G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0, _do_init ()); -static gboolean create_elements (GstSplitMuxSink * splitmux); +static gboolean create_muxer (GstSplitMuxSink * splitmux); static gboolean create_sink (GstSplitMuxSink * splitmux); static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -143,10 +146,12 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * static void bus_handler (GstBin * bin, GstMessage * msg); static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); -static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void mq_stream_ctx_unref (MqStreamCtx * ctx); +static void grow_blocked_queues (GstSplitMuxSink * splitmux); static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux); +static GstElement *create_element (GstSplitMuxSink * splitmux, + const gchar * factory, const gchar * name, gboolean locked); static void do_async_done (GstSplitMuxSink * splitmux); @@ -162,6 +167,18 @@ mq_stream_buf_free (MqStreamBuf * data) g_slice_free (MqStreamBuf, data); } +static SplitMuxOutputCommand * +out_cmd_buf_new (void) +{ + return g_slice_new0 (SplitMuxOutputCommand); +} + +static void +out_cmd_buf_free (SplitMuxOutputCommand * data) +{ + g_slice_free (SplitMuxOutputCommand, data); +} + static void gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) { @@ -267,7 +284,9 @@ static void gst_splitmux_sink_init (GstSplitMuxSink * splitmux) { g_mutex_init (&splitmux->lock); - g_cond_init (&splitmux->data_cond); + g_cond_init (&splitmux->input_cond); + g_cond_init (&splitmux->output_cond); + g_queue_init (&splitmux->out_cmd_q); splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD; splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME; @@ -275,19 +294,12 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux) splitmux->max_files = DEFAULT_MAX_FILES; splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS; - splitmux->update_mux_start_time = FALSE; - GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK); } static void gst_splitmux_reset (GstSplitMuxSink * splitmux) { - if (splitmux->mq) { - gst_element_set_locked_state (splitmux->mq, TRUE); - gst_element_set_state (splitmux->mq, GST_STATE_NULL); - gst_bin_remove (GST_BIN (splitmux), splitmux->mq); - } if (splitmux->muxer) { gst_element_set_locked_state (splitmux->muxer, TRUE); gst_element_set_state (splitmux->muxer, GST_STATE_NULL); @@ -299,8 +311,7 @@ gst_splitmux_reset (GstSplitMuxSink * splitmux) gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink); } - splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = - NULL; + splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; } static void @@ -309,8 +320,7 @@ gst_splitmux_sink_dispose (GObject * object) GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); /* Calling parent dispose invalidates all child pointers */ - splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = - NULL; + splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -319,8 +329,12 @@ static void gst_splitmux_sink_finalize (GObject * object) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); - g_cond_clear (&splitmux->data_cond); + g_cond_clear (&splitmux->input_cond); + g_cond_clear (&splitmux->output_cond); g_mutex_clear (&splitmux->lock); + g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL); + g_queue_clear (&splitmux->out_cmd_q); + if (splitmux->provided_sink) gst_object_unref (splitmux->provided_sink); if (splitmux->provided_muxer) @@ -466,51 +480,6 @@ my_segment_to_running_time (GstSegment * segment, GstClockTime val) return res; } -static GstPad * -mq_sink_to_src (GstElement * mq, GstPad * sink_pad) -{ - gchar *tmp, *sinkname, *srcname; - GstPad *mq_src; - - sinkname = gst_pad_get_name (sink_pad); - tmp = sinkname + 5; - srcname = g_strdup_printf ("src_%s", tmp); - - mq_src = gst_element_get_static_pad (mq, srcname); - - g_free (sinkname); - g_free (srcname); - - return mq_src; -} - -static gboolean -get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad, - GstPad ** src_pad) -{ - GstPad *mq_sink; - GstPad *mq_src; - - /* Request a pad from multiqueue, then connect this one, then - * discover the corresponding output pad and return both */ - mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u"); - if (mq_sink == NULL) - return FALSE; - - mq_src = mq_sink_to_src (splitmux->mq, mq_sink); - if (mq_src == NULL) - goto fail; - - *sink_pad = mq_sink; - *src_pad = mq_src; - - return TRUE; - -fail: - gst_element_release_request_pad (splitmux->mq, mq_sink); - return FALSE; -} - static MqStreamCtx * mq_stream_ctx_new (GstSplitMuxSink * splitmux) { @@ -529,6 +498,15 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux) static void mq_stream_ctx_free (MqStreamCtx * ctx) { + if (ctx->q) { + g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id); + gst_element_set_locked_state (ctx->q, TRUE); + gst_element_set_state (ctx->q, GST_STATE_NULL); + gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q); + gst_object_unref (ctx->q); + } + gst_object_unref (ctx->sinkpad); + gst_object_unref (ctx->srcpad); g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); g_queue_clear (&ctx->queued_bufs); g_free (ctx); @@ -611,46 +589,99 @@ static void complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { do { + /* When first starting up, the reference stream has to output + * the first buffer to prepare the muxer and sink */ + gboolean can_output = (ctx->is_reference || splitmux->ready_for_output); + + if (ctx->flushing + || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) + return; GST_LOG_OBJECT (ctx->srcpad, "Checking running time %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time), GST_STIME_ARGS (splitmux->max_out_running_time)); - if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || - ctx->out_running_time < splitmux->max_out_running_time) { - splitmux->have_muxed_something = TRUE; - return; - } - - if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED) - return; - - if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) { - if (ctx->out_eos == FALSE) { - send_eos (splitmux, ctx); - continue; + if (can_output) { + if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || + ctx->out_running_time < splitmux->max_out_running_time) { + return; + } + + switch (splitmux->output_state) { + case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP: + /* We only get here if we've finished outputting a GOP and need to know + * what to do next */ + splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); + continue; + + case SPLITMUX_OUTPUT_STATE_ENDING_FILE: + /* We've reached the max out running_time to get here, so end this file now */ + if (ctx->out_eos == FALSE) { + send_eos (splitmux, ctx); + continue; + } + break; + case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE: + if (ctx->is_reference) { + /* Special handling on the reference ctx to start new fragments + * and collect commands from the command queue */ + /* drops the splitmux lock briefly: */ + start_next_fragment (splitmux, ctx); + continue; + } + break; + + case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{ + do { + SplitMuxOutputCommand *cmd = + g_queue_pop_tail (&splitmux->out_cmd_q); + if (cmd != NULL) { + /* If we pop the last command, we need to make our queues bigger */ + if (g_queue_get_length (&splitmux->out_cmd_q) == 0) + grow_blocked_queues (splitmux); + + if (cmd->start_new_fragment) { + GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment"); + splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE; + } else { + GST_DEBUG_OBJECT (splitmux, + "Got new output cmd for time %" GST_STIME_FORMAT, + GST_STIME_ARGS (cmd->max_output_ts)); + + /* Extend the output range immediately */ + splitmux->max_out_running_time = cmd->max_output_ts; + splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP; + } + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); + + out_cmd_buf_free (cmd); + break; + } else { + GST_SPLITMUX_WAIT_OUTPUT (splitmux); + } + } while (splitmux->output_state == + SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND); + /* loop and re-check the state */ + continue; + } + case SPLITMUX_OUTPUT_STATE_STOPPED: + return; } - } else if (ctx->is_reference - && splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) { - start_next_fragment (splitmux, ctx); - continue; } GST_INFO_OBJECT (ctx->srcpad, "Sleeping for running time %" - GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")", + GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.", GST_STIME_ARGS (ctx->out_running_time), GST_STIME_ARGS (splitmux->max_out_running_time)); - ctx->out_blocked = TRUE; - /* Expand the mq if needed before sleeping */ - check_queue_length (splitmux, ctx); - GST_SPLITMUX_WAIT (splitmux); - ctx->out_blocked = FALSE; + GST_SPLITMUX_WAIT_OUTPUT (splitmux); GST_INFO_OBJECT (ctx->srcpad, "Woken for new max running time %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); - } while (1); + } + while (1); } static gboolean @@ -658,14 +689,14 @@ request_next_keyframe (GstSplitMuxSink * splitmux) { GstEvent *ev; - if (splitmux->send_keyframe_requests == FALSE || splitmux->threshold_time == 0 - || splitmux->threshold_bytes != 0) + if (splitmux->send_keyframe_requests == FALSE + || splitmux->threshold_time == 0 || splitmux->threshold_bytes != 0) return TRUE; - ev = gst_video_event_new_upstream_force_key_unit (splitmux->mux_start_time + - splitmux->threshold_time, TRUE, 0); + ev = gst_video_event_new_upstream_force_key_unit + (splitmux->fragment_start_time + splitmux->threshold_time, TRUE, 0); GST_DEBUG_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->mux_start_time + splitmux->threshold_time)); + GST_TIME_ARGS (splitmux->fragment_start_time + splitmux->threshold_time)); return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev); } @@ -684,6 +715,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { GstEvent *event = gst_pad_probe_info_get_event (info); + gboolean locked = FALSE; GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); @@ -693,25 +725,25 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) break; case GST_EVENT_FLUSH_STOP: GST_SPLITMUX_LOCK (splitmux); + locked = TRUE; gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); g_queue_clear (&ctx->queued_bufs); ctx->flushing = FALSE; - GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_FLUSH_START: GST_SPLITMUX_LOCK (splitmux); + locked = TRUE; GST_LOG_OBJECT (pad, "Flush start"); ctx->flushing = TRUE; - GST_SPLITMUX_BROADCAST (splitmux); - GST_SPLITMUX_UNLOCK (splitmux); + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); break; case GST_EVENT_EOS: GST_SPLITMUX_LOCK (splitmux); - if (splitmux->state == SPLITMUX_STATE_STOPPED) + locked = TRUE; + if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) goto beach; ctx->out_eos = TRUE; - GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_GAP:{ GstClockTime gap_ts; @@ -722,13 +754,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) break; GST_SPLITMUX_LOCK (splitmux); + locked = TRUE; - rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); - - GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, - GST_STIME_ARGS (rtime)); - - if (splitmux->state == SPLITMUX_STATE_STOPPED) + if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) goto beach; /* When we get a gap event on the @@ -737,19 +765,22 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) * the buffer afterwards */ if (ctx->is_reference && - (splitmux->opening_first_fragment || - splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT)) { + (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) { GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives"); gst_event_replace (&ctx->pending_gap, event); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_HANDLED; } + rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); + + GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, + GST_STIME_ARGS (rtime)); + if (rtime != GST_CLOCK_STIME_NONE) { ctx->out_running_time = rtime; complete_or_wait_on_out (splitmux, ctx); } - GST_SPLITMUX_UNLOCK (splitmux); break; } case GST_EVENT_CUSTOM_DOWNSTREAM:{ @@ -763,8 +794,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) gst_structure_get_int64 (s, "timestamp", &ts); GST_SPLITMUX_LOCK (splitmux); + locked = TRUE; - if (splitmux->state == SPLITMUX_STATE_STOPPED) + if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) goto beach; ctx->out_running_time = ts; if (!ctx->is_reference) @@ -775,6 +807,15 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) default: break; } + + /* We need to make sure events aren't passed + * until the muxer / sink are ready for it */ + if (!locked) + GST_SPLITMUX_LOCK (splitmux); + if (!ctx->is_reference) + complete_or_wait_on_out (splitmux, ctx); + GST_SPLITMUX_UNLOCK (splitmux); + return GST_PAD_PROBE_PASS; } @@ -787,8 +828,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) goto beach; /* If we have popped a keyframe, decrement the queued_gop count */ - if (buf_info->keyframe && splitmux->queued_gops > 0) - splitmux->queued_gops--; + if (buf_info->keyframe && splitmux->queued_keyframes > 0) + splitmux->queued_keyframes--; ctx->out_running_time = buf_info->run_ts; ctx->cur_buffer = gst_pad_probe_info_get_buffer (info); @@ -798,24 +839,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) " size %" G_GUINT64_FORMAT, pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); - if (ctx->is_reference && splitmux->opening_first_fragment) { - if (request_next_keyframe (splitmux) == FALSE) - GST_WARNING_OBJECT (splitmux, - "Could not request a keyframe. Files may not split at the exact location they should"); - start_next_fragment (splitmux, ctx); - splitmux->opening_first_fragment = FALSE; - } - complete_or_wait_on_out (splitmux, ctx); - if (splitmux->update_mux_start_time && ctx->is_reference) { - splitmux->mux_start_time = buf_info->run_ts; - splitmux->update_mux_start_time = FALSE; - if (request_next_keyframe (splitmux) == FALSE) - GST_WARNING_OBJECT (splitmux, - "Could not request a keyframe. Files may not split at the exact location they should"); - } - if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE || splitmux->muxed_out_time < buf_info->run_ts) splitmux->muxed_out_time = buf_info->run_ts; @@ -884,50 +909,50 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { + GstElement *muxer, *sink; + /* 1 change to new file */ splitmux->switching_fragment = TRUE; - gst_element_set_locked_state (splitmux->muxer, TRUE); - gst_element_set_locked_state (splitmux->active_sink, TRUE); - gst_element_set_state (splitmux->muxer, GST_STATE_NULL); - gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); + /* We need to drop the splitmux lock to acquire the state lock + * here and ensure there's no racy state change going on elsewhere */ + muxer = gst_object_ref (splitmux->muxer); + sink = gst_object_ref (splitmux->active_sink); + GST_SPLITMUX_UNLOCK (splitmux); + GST_STATE_LOCK (splitmux); + + gst_element_set_locked_state (muxer, TRUE); + gst_element_set_locked_state (sink, TRUE); + gst_element_set_state (muxer, GST_STATE_NULL); + gst_element_set_state (sink, GST_STATE_NULL); + + GST_SPLITMUX_LOCK (splitmux); set_next_filename (splitmux, ctx); + GST_SPLITMUX_UNLOCK (splitmux); - gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux)); - gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux)); - gst_element_set_locked_state (splitmux->muxer, FALSE); - gst_element_set_locked_state (splitmux->active_sink, FALSE); + gst_element_set_state (sink, GST_STATE_TARGET (splitmux)); + gst_element_set_state (muxer, GST_STATE_TARGET (splitmux)); + gst_element_set_locked_state (muxer, FALSE); + gst_element_set_locked_state (sink, FALSE); + gst_object_unref (sink); + gst_object_unref (muxer); + + GST_SPLITMUX_LOCK (splitmux); + GST_STATE_UNLOCK (splitmux); splitmux->switching_fragment = FALSE; do_async_done (splitmux); + splitmux->ready_for_output = TRUE; + g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); - if (!splitmux->opening_first_fragment) { - /* Switch state and go back to processing */ - if (!splitmux->reference_ctx->in_eos) { - splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; - } else { - splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; - splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; - splitmux->have_muxed_something = FALSE; - } - splitmux->have_muxed_something = - (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time); - - /* Store the overflow parameters as the basis for the next fragment */ - splitmux->mux_start_bytes = splitmux->muxed_out_bytes; - - GST_DEBUG_OBJECT (splitmux, - "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT, - GST_STIME_ARGS (splitmux->max_out_running_time)); - } - send_fragment_opened_closed_msg (splitmux, TRUE); - GST_SPLITMUX_BROADCAST (splitmux); + /* FIXME: Is this always the correct next state? */ + splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); } static void @@ -942,15 +967,19 @@ bus_handler (GstBin * bin, GstMessage * message) send_fragment_opened_closed_msg (splitmux, FALSE); - if (splitmux->state == SPLITMUX_STATE_ENDING_FILE && - splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) { + if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) { GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); - splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT; - GST_SPLITMUX_BROADCAST (splitmux); + splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); gst_message_unref (message); GST_SPLITMUX_UNLOCK (splitmux); return; + } else { + GST_DEBUG_OBJECT (splitmux, + "Passing EOS message. Output state %d max_out_running_time %" + GST_STIME_FORMAT, splitmux->output_state, + GST_STIME_ARGS (splitmux->max_out_running_time)); } GST_SPLITMUX_UNLOCK (splitmux); break; @@ -958,8 +987,8 @@ bus_handler (GstBin * bin, GstMessage * message) case GST_MESSAGE_ASYNC_DONE: /* Ignore state changes from our children while switching */ if (splitmux->switching_fragment) { - if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink || - GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) { + if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink + || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) { GST_LOG_OBJECT (splitmux, "Ignoring state change from child %" GST_PTR_FORMAT " while switching", GST_MESSAGE_SRC (message)); @@ -975,6 +1004,12 @@ bus_handler (GstBin * bin, GstMessage * message) GST_BIN_CLASS (parent_class)->handle_message (bin, message); } +static void +ctx_set_unblock (MqStreamCtx * ctx) +{ + ctx->need_unblock = TRUE; +} + /* Called with splitmux lock held */ /* Called when entering ProcessingCompleteGop state * Assess if mq contents overflowed the current file @@ -985,27 +1020,25 @@ bus_handler (GstBin * bin, GstMessage * message) static void handle_gathered_gop (GstSplitMuxSink * splitmux) { - GList *cur; - guint64 queued_bytes = 0; + guint64 queued_bytes; GstClockTimeDiff queued_time = 0; + GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time; + SplitMuxOutputCommand *cmd; /* Assess if the multiqueue contents overflowed the current file */ - for (cur = g_list_first (splitmux->contexts); - cur != NULL; cur = g_list_next (cur)) { - MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); - if (tmpctx->in_running_time > queued_time) - queued_time = tmpctx->in_running_time; - queued_bytes += tmpctx->in_bytes; - } + /* When considering if a newly gathered GOP overflows + * the time limit for the file, only consider the running time of the + * reference stream. Other streams might have run ahead a little bit, + * but extra pieces won't be released to the muxer beyond the reference + * stream cut-off anyway - so it forms the limit. */ + queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes; + queued_time = splitmux->reference_ctx->in_running_time; - GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT - " splitmuxsink->mux_start_bytes %" G_GUINT64_FORMAT, queued_bytes, - splitmux->mux_start_bytes); - g_assert (queued_bytes >= splitmux->mux_start_bytes); - g_assert (queued_time >= splitmux->mux_start_time); + GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes); - queued_bytes -= splitmux->mux_start_bytes; - queued_time -= splitmux->mux_start_time; + g_assert (queued_time >= splitmux->fragment_start_time); + + queued_time -= splitmux->fragment_start_time; /* Expand queued bytes estimate by muxer overhead */ queued_bytes += (queued_bytes * splitmux->mux_overhead); @@ -1015,43 +1048,68 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) /* Check for overrun - have we output at least one byte and overrun * either threshold? */ - if ((splitmux->have_muxed_something && + if ((splitmux->fragment_total_bytes > 0 && ((splitmux->threshold_bytes > 0 && queued_bytes > splitmux->threshold_bytes) || (splitmux->threshold_time > 0 && queued_time > splitmux->threshold_time)))) { - splitmux->state = SPLITMUX_STATE_ENDING_FILE; - splitmux->update_mux_start_time = TRUE; + /* Tell the output side to start a new fragment */ GST_INFO_OBJECT (splitmux, - "mq overflowed since last, draining out. max out TS is %" - GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); - GST_SPLITMUX_BROADCAST (splitmux); + "This GOP (dur %" GST_STIME_FORMAT + ") would overflow the fragment, Sending start_new_fragment cmd", + GST_STIME_ARGS (splitmux->reference_ctx->in_running_time - + splitmux->gop_start_time)); + cmd = out_cmd_buf_new (); + cmd->start_new_fragment = TRUE; + g_queue_push_head (&splitmux->out_cmd_q, cmd); + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); - } else { - /* No overflow */ - GST_LOG_OBJECT (splitmux, - "This GOP didn't overflow the fragment. Bytes sent %" G_GUINT64_FORMAT - " queued %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT " Continuing.", - splitmux->muxed_out_bytes - splitmux->mux_start_bytes, - queued_bytes, GST_STIME_ARGS (queued_time)); + new_out_ts = splitmux->reference_ctx->in_running_time; + splitmux->fragment_start_time = splitmux->gop_start_time; + splitmux->fragment_total_bytes = 0; - /* Wake everyone up to push this one GOP, then sleep */ - splitmux->have_muxed_something = TRUE; - - if (!splitmux->reference_ctx->in_eos) { - splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; - } else { - splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; - splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; + if (request_next_keyframe (splitmux) == FALSE) { + GST_WARNING_OBJECT (splitmux, + "Could not request a keyframe. Files may not split at the exact location they should"); } - - GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %" - GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); - GST_SPLITMUX_BROADCAST (splitmux); } + /* And set up to collect the next GOP */ + if (!splitmux->reference_ctx->in_eos) { + splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START; + splitmux->gop_start_time = new_out_ts; + } else { + /* This is probably already the current state, but just in case: */ + splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP; + new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */ + } + + /* And wake all input contexts to send a wake-up event */ + g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL); + GST_SPLITMUX_BROADCAST_INPUT (splitmux); + + /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */ + splitmux->fragment_total_bytes += splitmux->gop_total_bytes; + + if (splitmux->gop_total_bytes > 0) { + GST_LOG_OBJECT (splitmux, + "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT + " time %" GST_STIME_FORMAT, + splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time)); + + /* Send this GOP to the output command queue */ + cmd = out_cmd_buf_new (); + cmd->start_new_fragment = FALSE; + cmd->max_output_ts = new_out_ts; + GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %" + GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts)); + g_queue_push_head (&splitmux->out_cmd_q, cmd); + + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); + } + + splitmux->gop_total_bytes = 0; } /* Called with splitmux lock held */ @@ -1063,10 +1121,37 @@ static void check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GList *cur; - gboolean ready = TRUE; - GstClockTimeDiff current_max_in_running_time; + GstEvent *event; + + /* On ENDING_FILE, the reference stream sends a command to start a new + * fragment, then releases the GOP for output in the new fragment. + * If somes streams received no buffer during the last GOP that overran, + * because its next buffer has a timestamp bigger than + * ctx->max_in_running_time, its queue is empty. In that case the only + * way to wakeup the output thread is by injecting an event in the + * queue. This usually happen with subtitle streams. + * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */ + if (ctx->need_unblock) { + GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event"); + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | + GST_EVENT_TYPE_SERIALIZED, + gst_structure_new ("splitmuxsink-unblock", "timestamp", + G_TYPE_INT64, splitmux->max_in_running_time, NULL)); + + GST_SPLITMUX_UNLOCK (splitmux); + gst_pad_send_event (ctx->sinkpad, event); + GST_SPLITMUX_LOCK (splitmux); + + ctx->need_unblock = FALSE; + GST_SPLITMUX_BROADCAST_INPUT (splitmux); + /* state may have changed while we were unlocked. Loop again if so */ + if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) + return; + } + + if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) { + gboolean ready = TRUE; - if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { /* Iterate each pad, and check that the input running time is at least * up to the reference running time, and if so handle the collected GOP */ GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" @@ -1081,7 +1166,7 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) " EOS %d", tmpctx, tmpctx->srcpad, GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); - if (splitmux->max_in_running_time != G_MAXINT64 && + if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE && tmpctx->in_running_time < splitmux->max_in_running_time && !tmpctx->in_eos) { GST_LOG_OBJECT (splitmux, @@ -1106,74 +1191,17 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) /* Some pad is not yet ready, or GOP is being pushed * either way, sleep and wait to get woken */ - current_max_in_running_time = splitmux->max_in_running_time; - while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE || - splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) && + while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT && !ctx->flushing && - (current_max_in_running_time == splitmux->max_in_running_time)) { - - GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)", - splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ? - "GOP complete" : "EOF draining", ctx); - GST_SPLITMUX_WAIT (splitmux); + (ctx->in_running_time >= splitmux->max_in_running_time) && + (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) { + GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx); + GST_SPLITMUX_WAIT_INPUT (splitmux); GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx); } } -static void -check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) -{ - GList *cur; - guint cur_len = g_queue_get_length (&ctx->queued_bufs); - - GST_DEBUG_OBJECT (ctx->sinkpad, - "Checking queue length len %u cur_max %u queued gops %u", - cur_len, splitmux->mq_max_buffers, splitmux->queued_gops); - - if (cur_len >= splitmux->mq_max_buffers) { - gboolean allow_grow = FALSE; - - /* If collecting a GOP and this pad might block, - * and there isn't already a pending GOP in the queue - * then grow - */ - if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE && - ctx->in_running_time < splitmux->max_in_running_time && - splitmux->queued_gops <= 1) { - allow_grow = TRUE; - } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START && - ctx->is_reference && splitmux->queued_gops <= 1) { - allow_grow = TRUE; - } - - if (!allow_grow) { - for (cur = g_list_first (splitmux->contexts); - cur != NULL; cur = g_list_next (cur)) { - MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); - GST_DEBUG_OBJECT (tmpctx->sinkpad, - " len %u out_blocked %d", - g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked); - /* If another stream is starving, grow */ - if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) { - allow_grow = TRUE; - } - } - } - - if (allow_grow) { - splitmux->mq_max_buffers = cur_len + 1; - - GST_INFO_OBJECT (splitmux, - "Multiqueue overrun - enlarging to %u buffers ctx %p", - splitmux->mq_max_buffers, ctx); - - g_object_set (splitmux->mq, "max-size-buffers", - splitmux->mq_max_buffers, NULL); - } - } -} - static GstPadProbeReturn handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { @@ -1193,6 +1221,9 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { GstEvent *event = gst_pad_probe_info_get_event (info); + + GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEGMENT: gst_event_copy_segment (event, &ctx->in_segment); @@ -1201,7 +1232,6 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); ctx->in_eos = FALSE; - ctx->in_bytes = 0; ctx->in_running_time = GST_CLOCK_STIME_NONE; GST_SPLITMUX_UNLOCK (splitmux); break; @@ -1209,18 +1239,18 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); ctx->in_eos = TRUE; - if (splitmux->state == SPLITMUX_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) goto beach; if (ctx->is_reference) { GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); - /* Act as if this is a new keyframe with infinite timestamp */ - splitmux->max_in_running_time = G_MAXINT64; - splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; + /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */ + splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT; /* Wake up other input pads to collect this GOP */ - GST_SPLITMUX_BROADCAST (splitmux); + GST_SPLITMUX_BROADCAST_INPUT (splitmux); check_completed_gop (splitmux, ctx); - } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { + } else if (splitmux->input_state == + SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) { /* If we are waiting for a GOP to be completed (ie, for aux * pads to catch up), then this pad is complete, so check * if the whole GOP is. @@ -1229,6 +1259,37 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } GST_SPLITMUX_UNLOCK (splitmux); break; + case GST_EVENT_GAP:{ + GstClockTime gap_ts; + GstClockTimeDiff rtime; + + gst_event_parse_gap (event, &gap_ts, NULL); + if (gap_ts == GST_CLOCK_TIME_NONE) + break; + + GST_SPLITMUX_LOCK (splitmux); + + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) + goto beach; + rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts); + + GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, + GST_STIME_ARGS (rtime)); + + if (ctx->is_reference + && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) { + splitmux->gop_start_time = splitmux->fragment_start_time = rtime; + GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->fragment_start_time)); + /* Also take this as the first start time when starting up, + * so that we start counting overflow from the first frame */ + if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) + splitmux->max_in_running_time = splitmux->fragment_start_time; + } + + GST_SPLITMUX_UNLOCK (splitmux); + break; + } default: break; } @@ -1247,7 +1308,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - if (splitmux->state == SPLITMUX_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) goto beach; /* If this buffer has a timestamp, advance the input timestamp of the @@ -1277,31 +1338,33 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) buf_info->buf_size = gst_buffer_get_size (buf); buf_info->duration = GST_BUFFER_DURATION (buf); - /* Update total input byte counter for overflow detect */ - ctx->in_bytes += buf_info->buf_size; - - /* initialize mux_start_time */ - if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) { - splitmux->mux_start_time = buf_info->run_ts; + /* initialize fragment_start_time */ + if (ctx->is_reference + && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) { + splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts; GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, - GST_STIME_ARGS (splitmux->mux_start_time)); + GST_STIME_ARGS (splitmux->fragment_start_time)); /* Also take this as the first start time when starting up, * so that we start counting overflow from the first frame */ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) - splitmux->max_in_running_time = splitmux->mux_start_time; + splitmux->max_in_running_time = splitmux->fragment_start_time; + if (request_next_keyframe (splitmux) == FALSE) { + GST_WARNING_OBJECT (splitmux, + "Could not request a keyframe. Files may not split at the exact location they should"); + } } GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT - " total in_bytes %" G_GUINT64_FORMAT, - GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes); + " total GOP bytes %" G_GUINT64_FORMAT, + GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes); loop_again = TRUE; do { if (ctx->flushing) break; - switch (splitmux->state) { - case SPLITMUX_STATE_COLLECTING_GOP_START: + switch (splitmux->input_state) { + case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START: if (ctx->is_reference) { /* If a keyframe, we have a complete GOP */ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || @@ -1309,33 +1372,50 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) splitmux->max_in_running_time >= ctx->in_running_time) { /* Pass this buffer through */ loop_again = FALSE; + /* Allow other input pads to catch up to here too */ + splitmux->max_in_running_time = ctx->in_running_time; + GST_SPLITMUX_BROADCAST_INPUT (splitmux); break; } GST_INFO_OBJECT (pad, "Have keyframe with running time %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time)); keyframe = TRUE; - splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; + splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT; splitmux->max_in_running_time = ctx->in_running_time; /* Wake up other input pads to collect this GOP */ - GST_SPLITMUX_BROADCAST (splitmux); + GST_SPLITMUX_BROADCAST_INPUT (splitmux); check_completed_gop (splitmux, ctx); } else { + /* Pass this buffer if the reference ctx is far enough ahead */ + if (ctx->in_running_time < splitmux->max_in_running_time) { + loop_again = FALSE; + break; + } + /* We're still waiting for a keyframe on the reference pad, sleep */ GST_LOG_OBJECT (pad, "Sleeping for GOP start"); - GST_SPLITMUX_WAIT (splitmux); - GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d", - splitmux->state); + GST_SPLITMUX_WAIT_INPUT (splitmux); + GST_LOG_OBJECT (pad, + "Done sleeping for GOP start input state now %d", + splitmux->input_state); } break; - case SPLITMUX_STATE_WAITING_GOP_COMPLETE: + case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{ + /* We're collecting a GOP. If this is the reference context, + * we need to check if this is a keyframe that marks the start + * of the next GOP. If it is, it marks the end of the GOP we're + * collecting, so sleep and wait until all the other pads also + * reach that timestamp - at which point, we have an entire GOP + * and either go to ENDING_FILE or release this GOP to the muxer and + * go back to COLLECT_GOP_START. */ /* If we overran the target timestamp, it might be time to process * the GOP, otherwise bail out for more data */ GST_LOG_OBJECT (pad, - "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, - GST_STIME_ARGS (ctx->in_running_time), + "Checking TS %" GST_STIME_FORMAT " against max %" + GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time), GST_STIME_ARGS (splitmux->max_in_running_time)); if (ctx->in_running_time < splitmux->max_in_running_time) { @@ -1347,53 +1427,28 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) "Collected last packet of GOP. Checking other pads"); check_completed_gop (splitmux, ctx); break; - case SPLITMUX_STATE_ENDING_FILE:{ - GstEvent *event; - - /* If somes streams received no buffer during the last GOP that overran, - * because its next buffer has a timestamp bigger than - * ctx->max_in_running_time, its queue is empty. In that case the only - * way to wakeup the output thread is by injecting an event in the - * queue. This usually happen with subtitle streams. - * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */ - GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event"); - event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | - GST_EVENT_TYPE_SERIALIZED, - gst_structure_new ("splitmuxsink-unblock", "timestamp", - G_TYPE_INT64, splitmux->max_in_running_time, NULL)); - - GST_SPLITMUX_UNLOCK (splitmux); - gst_pad_send_event (ctx->sinkpad, event); - GST_SPLITMUX_LOCK (splitmux); - /* state may have changed while we were unlocked. Loop again if so */ - if (splitmux->state != SPLITMUX_STATE_ENDING_FILE) - break; - /* fallthrough */ } - case SPLITMUX_STATE_START_NEXT_FRAGMENT: - /* A fragment is ending, wait until that's done before continuing */ - GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart"); - GST_SPLITMUX_WAIT (splitmux); - GST_DEBUG_OBJECT (pad, - "Done sleeping for fragment restart state now %d", splitmux->state); + case SPLITMUX_INPUT_STATE_FINISHING_UP: + loop_again = FALSE; break; default: loop_again = FALSE; break; } - } while (loop_again); + } + while (loop_again); if (keyframe) { - splitmux->queued_gops++; + splitmux->queued_keyframes++; buf_info->keyframe = TRUE; } + /* Update total input byte counter for overflow detect */ + splitmux->gop_total_bytes += buf_info->buf_size; + /* Now add this buffer to the queue just before returning */ g_queue_push_head (&ctx->queued_bufs, buf_info); - /* Check the buffer will fit in the mq */ - check_queue_length (splitmux, ctx); - GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); @@ -1407,6 +1462,89 @@ beach: return GST_PAD_PROBE_PASS; } +static void +grow_blocked_queues (GstSplitMuxSink * splitmux) +{ + GList *cur; + + /* Scan other queues for full-ness and grow them */ + for (cur = g_list_first (splitmux->contexts); + cur != NULL; cur = g_list_next (cur)) { + MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); + guint cur_limit; + guint cur_len = g_queue_get_length (&tmpctx->queued_bufs); + + g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL); + GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len); + + if (cur_len >= cur_limit) { + cur_limit = cur_len + 1; + GST_DEBUG_OBJECT (tmpctx->q, + "Queue overflowed and needs enlarging. Growing to %u buffers", + cur_limit); + g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL); + } + } +} + +static void +handle_q_underrun (GstElement * q, gpointer user_data) +{ + MqStreamCtx *ctx = (MqStreamCtx *) (user_data); + GstSplitMuxSink *splitmux = ctx->splitmux; + + GST_SPLITMUX_LOCK (splitmux); + GST_DEBUG_OBJECT (q, + "Queue reported underrun with %d keyframes and %d cmds enqueued", + splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q)); + grow_blocked_queues (splitmux); + GST_SPLITMUX_UNLOCK (splitmux); +} + +static void +handle_q_overrun (GstElement * q, gpointer user_data) +{ + MqStreamCtx *ctx = (MqStreamCtx *) (user_data); + GstSplitMuxSink *splitmux = ctx->splitmux; + gboolean allow_grow = FALSE; + + GST_SPLITMUX_LOCK (splitmux); + GST_DEBUG_OBJECT (q, + "Queue reported overrun with %d keyframes and %d cmds enqueued", + splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q)); + + if (splitmux->queued_keyframes < 2) { + /* Less than a full GOP queued, grow the queue */ + allow_grow = TRUE; + } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) { + allow_grow = TRUE; + } else { + /* If another queue is starved, grow */ + GList *cur; + for (cur = g_list_first (splitmux->contexts); + cur != NULL; cur = g_list_next (cur)) { + MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); + if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) { + allow_grow = TRUE; + } + } + } + GST_SPLITMUX_UNLOCK (splitmux); + + if (allow_grow) { + guint cur_limit; + + g_object_get (q, "max-size-buffers", &cur_limit, NULL); + cur_limit++; + + GST_DEBUG_OBJECT (q, + "Queue overflowed and needs enlarging. Growing to %u buffers", + cur_limit); + + g_object_set (q, "max-size-buffers", cur_limit, NULL); + } +} + static GstPad * gst_splitmux_sink_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name, const GstCaps * caps) @@ -1414,7 +1552,8 @@ gst_splitmux_sink_request_new_pad (GstElement * element, GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; GstPadTemplate *mux_template = NULL; GstPad *res = NULL; - GstPad *mq_sink, *mq_src; + GstElement *q; + GstPad *q_sink = NULL, *q_src = NULL; gchar *gname; gboolean is_video = FALSE; MqStreamCtx *ctx; @@ -1422,7 +1561,7 @@ gst_splitmux_sink_request_new_pad (GstElement * element, GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name); GST_SPLITMUX_LOCK (splitmux); - if (!create_elements (splitmux)) + if (!create_muxer (splitmux)) goto fail; if (templ->name_template) { @@ -1474,29 +1613,37 @@ gst_splitmux_sink_request_new_pad (GstElement * element, else gname = g_strdup (name); - if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) { - gst_element_release_request_pad (splitmux->muxer, res); - gst_object_unref (GST_OBJECT (res)); + if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL) goto fail; - } - if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) { + gst_element_set_state (q, GST_STATE_TARGET (splitmux)); + + g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0), + "max-size-buffers", 5, NULL); + + q_sink = gst_element_get_static_pad (q, "sink"); + q_src = gst_element_get_static_pad (q, "src"); + + if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) { gst_element_release_request_pad (splitmux->muxer, res); gst_object_unref (GST_OBJECT (res)); - gst_element_release_request_pad (splitmux->mq, mq_sink); - gst_object_unref (GST_OBJECT (mq_sink)); goto fail; } gst_object_unref (GST_OBJECT (res)); ctx = mq_stream_ctx_new (splitmux); - ctx->srcpad = mq_src; - ctx->sinkpad = mq_sink; + /* Context holds a ref: */ + ctx->q = gst_object_ref (q); + ctx->srcpad = q_src; + ctx->sinkpad = q_sink; + ctx->q_overrun_id = + g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx); + g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx); mq_stream_ctx_ref (ctx); ctx->src_pad_block_id = - gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, + gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify) _pad_block_destroy_src_notify); if (is_video && splitmux->reference_ctx != NULL) { @@ -1508,25 +1655,22 @@ gst_splitmux_sink_request_new_pad (GstElement * element, ctx->is_reference = TRUE; } - res = gst_ghost_pad_new_from_template (gname, mq_sink, templ); + res = gst_ghost_pad_new_from_template (gname, q_sink, templ); g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx); mq_stream_ctx_ref (ctx); ctx->sink_pad_block_id = - gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, + gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify) _pad_block_destroy_sink_notify); GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT - " is mq pad %" GST_PTR_FORMAT, res, mq_sink); + " feeds queue pad %" GST_PTR_FORMAT, res, q_sink); splitmux->contexts = g_list_prepend (splitmux->contexts, ctx); g_free (gname); - gst_object_unref (mq_sink); - gst_object_unref (mq_src); - if (is_video) splitmux->have_video = TRUE; @@ -1538,6 +1682,11 @@ gst_splitmux_sink_request_new_pad (GstElement * element, return res; fail: GST_SPLITMUX_UNLOCK (splitmux); + + if (q_sink) + gst_object_unref (q_sink); + if (q_src) + gst_object_unref (q_src); return NULL; already_have_video: GST_DEBUG_OBJECT (splitmux, "video sink pad already requested"); @@ -1549,23 +1698,18 @@ static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) { GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; - GstPad *mqsink, *mqsrc = NULL, *muxpad = NULL; + GstPad *muxpad = NULL; MqStreamCtx *ctx = (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT)); GST_SPLITMUX_LOCK (splitmux); - if (splitmux->muxer == NULL || splitmux->mq == NULL) + if (splitmux->muxer == NULL) goto fail; /* Elements don't exist yet - nothing to release */ GST_INFO_OBJECT (pad, "releasing request pad"); - mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad)); - /* The ghostpad target might have disappeared during pipeline destruct */ - if (mqsink) - mqsrc = mq_sink_to_src (splitmux->mq, mqsink); - if (mqsrc) - muxpad = gst_pad_get_peer (mqsrc); + muxpad = gst_pad_get_peer (ctx->srcpad); /* Remove the context from our consideration */ splitmux->contexts = g_list_remove (splitmux->contexts, ctx); @@ -1581,24 +1725,15 @@ gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) if (ctx == splitmux->reference_ctx) splitmux->reference_ctx = NULL; - /* Release and free the mq input */ - if (mqsink) { - gst_element_release_request_pad (splitmux->mq, mqsink); - gst_object_unref (mqsink); - } - /* Release and free the muxer input */ if (muxpad) { gst_element_release_request_pad (splitmux->muxer, muxpad); gst_object_unref (muxpad); } - if (mqsrc) - gst_object_unref (mqsrc); - if (GST_PAD_PAD_TEMPLATE (pad) && - g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE (pad)), - "video")) + g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE + (pad)), "video")) splitmux->have_video = FALSE; gst_element_remove_pad (element, pad); @@ -1638,21 +1773,9 @@ create_element (GstSplitMuxSink * splitmux, } static gboolean -create_elements (GstSplitMuxSink * splitmux) +create_muxer (GstSplitMuxSink * splitmux) { /* Create internal elements */ - if (splitmux->mq == NULL) { - if ((splitmux->mq = - create_element (splitmux, "multiqueue", "multiqueue", - FALSE)) == NULL) - goto fail; - - splitmux->mq_max_buffers = 5; - /* No bytes or time limit, we limit buffers manually */ - g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time", - (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL); - } - if (splitmux->muxer == NULL) { GstElement *provided_muxer = NULL; @@ -1768,6 +1891,15 @@ create_sink (GstSplitMuxSink * splitmux) } } +#if 1 + if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink), + "async") != NULL) { + /* async child elements are causing state change races and weird + * failures, so let's try and turn that off */ + g_object_set (splitmux->sink, "async", FALSE, NULL); + } +#endif + if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); goto fail; @@ -1867,7 +1999,7 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY:{ GST_SPLITMUX_LOCK (splitmux); - if (!create_elements (splitmux) || !create_sink (splitmux)) { + if (!create_muxer (splitmux) || !create_sink (splitmux)) { ret = GST_STATE_CHANGE_FAILURE; GST_SPLITMUX_UNLOCK (splitmux); goto beach; @@ -1879,23 +2011,25 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_PAUSED:{ GST_SPLITMUX_LOCK (splitmux); /* Start by collecting one input on each pad */ - splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; + splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START; + splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; splitmux->max_in_running_time = GST_CLOCK_STIME_NONE; - splitmux->muxed_out_time = splitmux->mux_start_time = - GST_CLOCK_STIME_NONE; - splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0; - splitmux->opening_first_fragment = TRUE; + splitmux->gop_start_time = splitmux->muxed_out_time = + splitmux->fragment_start_time = GST_CLOCK_STIME_NONE; + splitmux->muxed_out_bytes = 0; GST_SPLITMUX_UNLOCK (splitmux); break; } case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_LOCK (splitmux); - splitmux->state = SPLITMUX_STATE_STOPPED; + splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED; + splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED; /* Wake up any blocked threads */ GST_LOG_OBJECT (splitmux, "State change -> NULL or READY. Waking threads"); - GST_SPLITMUX_BROADCAST (splitmux); + GST_SPLITMUX_BROADCAST_INPUT (splitmux); + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); GST_SPLITMUX_UNLOCK (splitmux); break; default: @@ -1941,10 +2075,10 @@ beach: ret == GST_STATE_CHANGE_FAILURE) { /* Cleanup elements on failed transition out of NULL */ gst_splitmux_reset (splitmux); + GST_SPLITMUX_LOCK (splitmux); + do_async_done (splitmux); + GST_SPLITMUX_UNLOCK (splitmux); } - GST_SPLITMUX_LOCK (splitmux); - do_async_done (splitmux); - GST_SPLITMUX_UNLOCK (splitmux); return ret; } diff --git a/gst/multifile/gstsplitmuxsink.h b/gst/multifile/gstsplitmuxsink.h index 11eb5d5e0a..afd73f7f2d 100644 --- a/gst/multifile/gstsplitmuxsink.h +++ b/gst/multifile/gstsplitmuxsink.h @@ -24,26 +24,39 @@ #include G_BEGIN_DECLS - #define GST_TYPE_SPLITMUX_SINK (gst_splitmux_sink_get_type()) #define GST_SPLITMUX_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSink)) #define GST_SPLITMUX_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSinkClass)) #define GST_IS_SPLITMUX_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SPLITMUX_SINK)) #define GST_IS_SPLITMUX_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SPLITMUX_SINK)) - typedef struct _GstSplitMuxSink GstSplitMuxSink; typedef struct _GstSplitMuxSinkClass GstSplitMuxSinkClass; -GType gst_splitmux_sink_get_type(void); +GType gst_splitmux_sink_get_type (void); gboolean register_splitmuxsink (GstPlugin * plugin); -typedef enum _SplitMuxState { - SPLITMUX_STATE_STOPPED, - SPLITMUX_STATE_COLLECTING_GOP_START, - SPLITMUX_STATE_WAITING_GOP_COMPLETE, - SPLITMUX_STATE_ENDING_FILE, - SPLITMUX_STATE_START_NEXT_FRAGMENT, -} SplitMuxState; +typedef enum _SplitMuxInputState +{ + SPLITMUX_INPUT_STATE_STOPPED, + SPLITMUX_INPUT_STATE_COLLECTING_GOP_START, /* Waiting for the next ref ctx keyframe */ + SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT, /* Waiting for all streams to collect GOP */ + SPLITMUX_INPUT_STATE_FINISHING_UP /* Got EOS from reference ctx, send everything */ +} SplitMuxInputState; + +typedef enum _SplitMuxOutputState +{ + SPLITMUX_OUTPUT_STATE_STOPPED, + SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND, /* Waiting first command packet from input */ + SPLITMUX_OUTPUT_STATE_OUTPUT_GOP, /* Outputting a collected GOP */ + SPLITMUX_OUTPUT_STATE_ENDING_FILE, /* Finishing the current fragment */ + SPLITMUX_OUTPUT_STATE_START_NEXT_FILE /* Restarting after ENDING_FILE */ +} SplitMuxOutputState; + +typedef struct _SplitMuxOutputCommand +{ + gboolean start_new_fragment; /* Whether to start a new fragment before advancing output ts */ + GstClockTimeDiff max_output_ts; /* Set the limit to stop GOP output */ +} SplitMuxOutputCommand; typedef struct _MqStreamBuf { @@ -59,6 +72,7 @@ typedef struct _MqStreamCtx GstSplitMuxSink *splitmux; + guint q_overrun_id; guint sink_pad_block_id; guint src_pad_block_id; @@ -67,6 +81,7 @@ typedef struct _MqStreamCtx gboolean flushing; gboolean in_eos; gboolean out_eos; + gboolean need_unblock; GstSegment in_segment; GstSegment out_segment; @@ -74,26 +89,24 @@ typedef struct _MqStreamCtx GstClockTimeDiff in_running_time; GstClockTimeDiff out_running_time; - guint64 in_bytes; - + GstElement *q; GQueue queued_bufs; GstPad *sinkpad; GstPad *srcpad; - gboolean out_blocked; - GstBuffer *cur_buffer; GstEvent *pending_gap; } MqStreamCtx; -struct _GstSplitMuxSink { +struct _GstSplitMuxSink +{ GstBin parent; GMutex lock; - GCond data_cond; + GCond input_cond; + GCond output_cond; - SplitMuxState state; gdouble mux_overhead; GstClockTime threshold_time; @@ -101,9 +114,6 @@ struct _GstSplitMuxSink { guint max_files; gboolean send_keyframe_requests; - guint mq_max_buffers; - - GstElement *mq; GstElement *muxer; GstElement *sink; @@ -112,25 +122,39 @@ struct _GstSplitMuxSink { GstElement *provided_sink; GstElement *active_sink; + gboolean ready_for_output; + gchar *location; guint fragment_id; GList *contexts; - MqStreamCtx *reference_ctx; - guint queued_gops; + SplitMuxInputState input_state; GstClockTimeDiff max_in_running_time; + /* Number of bytes sent to the + * current fragment */ + guint64 fragment_total_bytes; + /* Number of bytes we've collected into + * the GOP that's being collected */ + guint64 gop_total_bytes; + /* Start time of the current fragment */ + GstClockTimeDiff fragment_start_time; + /* Start time of the current GOP */ + GstClockTimeDiff gop_start_time; + + GQueue out_cmd_q; /* Queue of commands for output thread */ + + SplitMuxOutputState output_state; GstClockTimeDiff max_out_running_time; + GstClockTimeDiff next_max_out_running_time; GstClockTimeDiff muxed_out_time; guint64 muxed_out_bytes; - gboolean have_muxed_something; - gboolean update_mux_start_time; - GstClockTimeDiff mux_start_time; - guint64 mux_start_bytes; + MqStreamCtx *reference_ctx; + /* Count of queued keyframes in the reference ctx */ + guint queued_keyframes; - gboolean opening_first_fragment; gboolean switching_fragment; gboolean have_video; @@ -139,10 +163,10 @@ struct _GstSplitMuxSink { gboolean async_pending; }; -struct _GstSplitMuxSinkClass { +struct _GstSplitMuxSinkClass +{ GstBinClass parent_class; }; G_END_DECLS - #endif /* __GST_SPLITMUXSINK_H__ */