diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index 0f05f7b0e3..5d8b75f093 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -90,6 +90,7 @@ enum enum { SIGNAL_FORMAT_LOCATION, + SIGNAL_FORMAT_LOCATION_FULL, SIGNAL_LAST }; @@ -140,13 +141,15 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition); static void bus_handler (GstBin * bin, GstMessage * msg); -static void set_next_filename (GstSplitMuxSink * splitmux); -static void start_next_fragment (GstSplitMuxSink * splitmux); +static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); +static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void mq_stream_ctx_unref (MqStreamCtx * ctx); static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux); +static void do_async_done (GstSplitMuxSink * splitmux); + static MqStreamBuf * mq_stream_buf_new (void) { @@ -244,6 +247,20 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) signals[SIGNAL_FORMAT_LOCATION] = g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT); + + /** + * GstSplitMuxSink::format-location-full: + * @splitmux: the #GstSplitMuxSink + * @fragment_id: the sequence number of the file to be created + * @first_sample: A #GstSample containing the first buffer + * from the reference stream in the new file + * + * Returns: the location to be used for the next output file + */ + signals[SIGNAL_FORMAT_LOCATION_FULL] = + g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT, + GST_TYPE_SAMPLE); } static void @@ -614,8 +631,9 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) send_eos (splitmux, ctx); continue; } - } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) { - start_next_fragment (splitmux); + } else if (ctx->is_reference + && splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) { + start_next_fragment (splitmux, ctx); continue; } @@ -713,6 +731,20 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; + /* When we get a gap event on the + * reference stream and we're trying to open a + * new file, we need to store it until we get + * the buffer afterwards + */ + if (ctx->is_reference && + (splitmux->opening_first_fragment || + splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT)) { + GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives"); + gst_event_replace (&ctx->pending_gap, event); + GST_SPLITMUX_UNLOCK (splitmux); + return GST_PAD_PROBE_HANDLED; + } + if (rtime != GST_CLOCK_STIME_NONE) { ctx->out_running_time = rtime; complete_or_wait_on_out (splitmux, ctx); @@ -735,7 +767,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; ctx->out_running_time = ts; - complete_or_wait_on_out (splitmux, ctx); + if (!ctx->is_reference) + complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_DROP; } @@ -758,17 +791,18 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) splitmux->queued_gops--; ctx->out_running_time = buf_info->run_ts; + ctx->cur_buffer = gst_pad_probe_info_get_buffer (info); GST_LOG_OBJECT (splitmux, "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT " size %" G_GUINT64_FORMAT, pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); - if (splitmux->opening_first_fragment) { + if (ctx->is_reference && splitmux->opening_first_fragment) { if (request_next_keyframe (splitmux) == FALSE) GST_WARNING_OBJECT (splitmux, "Could not request a keyframe. Files may not split at the exact location they should"); - send_fragment_opened_closed_msg (splitmux, TRUE); + start_next_fragment (splitmux, ctx); splitmux->opening_first_fragment = FALSE; } @@ -797,8 +831,23 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } #endif + ctx->cur_buffer = NULL; GST_SPLITMUX_UNLOCK (splitmux); + /* pending_gap is protected by the STREAM lock */ + if (ctx->pending_gap) { + /* If we previously stored a gap event, send it now */ + GstPad *peer = gst_pad_get_peer (ctx->srcpad); + + GST_DEBUG_OBJECT (splitmux, + "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad); + + gst_pad_send_event (peer, ctx->pending_gap); + ctx->pending_gap = NULL; + + gst_object_unref (peer); + } + mq_stream_buf_free (buf_info); return GST_PAD_PROBE_PASS; @@ -833,7 +882,7 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) * a new fragment */ static void -start_next_fragment (GstSplitMuxSink * splitmux) +start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { /* 1 change to new file */ splitmux->switching_fragment = TRUE; @@ -843,7 +892,7 @@ start_next_fragment (GstSplitMuxSink * splitmux) gst_element_set_state (splitmux->muxer, GST_STATE_NULL); gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); - set_next_filename (splitmux); + set_next_filename (splitmux, ctx); gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux)); gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux)); @@ -851,27 +900,30 @@ start_next_fragment (GstSplitMuxSink * splitmux) gst_element_set_locked_state (splitmux->active_sink, FALSE); splitmux->switching_fragment = FALSE; + do_async_done (splitmux); g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); - /* Switch state and go back to processing */ - if (!splitmux->reference_ctx->in_eos) { - splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; - } else { - splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; - splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; - splitmux->have_muxed_something = FALSE; + if (!splitmux->opening_first_fragment) { + /* Switch state and go back to processing */ + if (!splitmux->reference_ctx->in_eos) { + splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; + splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; + } else { + splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; + splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; + splitmux->have_muxed_something = FALSE; + } + splitmux->have_muxed_something = + (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time); + + /* Store the overflow parameters as the basis for the next fragment */ + splitmux->mux_start_bytes = splitmux->muxed_out_bytes; + + GST_DEBUG_OBJECT (splitmux, + "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_out_running_time)); } - splitmux->have_muxed_something = - (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time); - - /* Store the overflow parameters as the basis for the next fragment */ - splitmux->mux_start_bytes = splitmux->muxed_out_bytes; - - GST_DEBUG_OBJECT (splitmux, - "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT, - GST_STIME_ARGS (splitmux->max_out_running_time)); send_fragment_opened_closed_msg (splitmux, TRUE); @@ -1561,7 +1613,7 @@ fail: static GstElement * create_element (GstSplitMuxSink * splitmux, - const gchar * factory, const gchar * name) + const gchar * factory, const gchar * name, gboolean locked) { GstElement *ret = gst_element_factory_make (factory, name); if (ret == NULL) { @@ -1569,6 +1621,13 @@ create_element (GstSplitMuxSink * splitmux, return NULL; } + if (locked) { + /* Ensure the sink starts in locked state and NULL - it will be changed + * by the filename setting code */ + gst_element_set_locked_state (ret, TRUE); + gst_element_set_state (ret, GST_STATE_NULL); + } + if (!gst_bin_add (GST_BIN (splitmux), ret)) { g_warning ("Could not add %s element - splitmuxsink will not work", name); gst_object_unref (ret); @@ -1584,7 +1643,8 @@ create_elements (GstSplitMuxSink * splitmux) /* Create internal elements */ if (splitmux->mq == NULL) { if ((splitmux->mq = - create_element (splitmux, "multiqueue", "multiqueue")) == NULL) + create_element (splitmux, "multiqueue", "multiqueue", + FALSE)) == NULL) goto fail; splitmux->mq_max_buffers = 5; @@ -1603,7 +1663,7 @@ create_elements (GstSplitMuxSink * splitmux) if (provided_muxer == NULL) { if ((splitmux->muxer = - create_element (splitmux, "mp4mux", "muxer")) == NULL) + create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL) goto fail; } else { /* Ensure it's not in locked state (we might be reusing an old element) */ @@ -1680,12 +1740,14 @@ create_sink (GstSplitMuxSink * splitmux) if (provided_sink == NULL) { if ((splitmux->sink = - create_element (splitmux, DEFAULT_SINK, "sink")) == NULL) + create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL) goto fail; splitmux->active_sink = splitmux->sink; } else { - /* Ensure it's not in locked state (we might be reusing an old element) */ - gst_element_set_locked_state (provided_sink, FALSE); + /* Ensure the sink starts in locked state and NULL - it will be changed + * by the filename setting code */ + gst_element_set_locked_state (provided_sink, TRUE); + gst_element_set_state (provided_sink, GST_STATE_NULL); if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) { g_warning ("Could not add sink elements - splitmuxsink will not work"); gst_object_unref (provided_sink); @@ -1721,13 +1783,30 @@ fail: #pragma GCC diagnostic ignored "-Wformat-nonliteral" #endif static void -set_next_filename (GstSplitMuxSink * splitmux) +set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { gchar *fname = NULL; + GstSample *sample; + GstCaps *caps; + gst_splitmux_sink_ensure_max_files (splitmux); - g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, - splitmux->fragment_id, &fname); + if (ctx->cur_buffer == NULL) + g_warning ("Starting next file without buffer"); + + caps = gst_pad_get_current_caps (ctx->srcpad); + sample = gst_sample_new (ctx->cur_buffer, caps, &ctx->out_segment, NULL); + g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0, + splitmux->fragment_id, sample, &fname); + gst_sample_unref (sample); + if (caps) + gst_caps_unref (caps); + + if (fname == NULL) { + /* Fallback to the old signal if the new one returned nothing */ + g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, + splitmux->fragment_id, &fname); + } if (!fname) fname = splitmux->location ? @@ -1742,6 +1821,43 @@ set_next_filename (GstSplitMuxSink * splitmux) } } +static void +do_async_start (GstSplitMuxSink * splitmux) +{ + GstMessage *message; + + if (!splitmux->need_async_start) { + GST_INFO_OBJECT (splitmux, "no async_start needed"); + return; + } + + splitmux->async_pending = TRUE; + + GST_INFO_OBJECT (splitmux, "Sending async_start message"); + message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux)); + GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST + (splitmux), message); +} + +static void +do_async_done (GstSplitMuxSink * splitmux) +{ + GstMessage *message; + + if (splitmux->async_pending) { + GST_INFO_OBJECT (splitmux, "Sending async_done message"); + message = + gst_message_new_async_done (GST_OBJECT_CAST (splitmux), + GST_CLOCK_TIME_NONE); + GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST + (splitmux), message); + + splitmux->async_pending = FALSE; + } + + splitmux->need_async_start = FALSE; +} + static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) { @@ -1758,7 +1874,6 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) } GST_SPLITMUX_UNLOCK (splitmux); splitmux->fragment_id = 0; - set_next_filename (splitmux); break; } case GST_STATE_CHANGE_READY_TO_PAUSED:{ @@ -1792,12 +1907,29 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) goto beach; switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + splitmux->need_async_start = TRUE; + break; + case GST_STATE_CHANGE_READY_TO_PAUSED:{ + /* Change state async, because our child sink might not + * be ready to do that for us yet if it's state is still locked */ + + splitmux->need_async_start = TRUE; + /* we want to go async to PAUSED until we managed to configure and add the + * sink */ + GST_SPLITMUX_LOCK (splitmux); + do_async_start (splitmux); + GST_SPLITMUX_UNLOCK (splitmux); + ret = GST_STATE_CHANGE_ASYNC; + break; + } case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_LOCK (splitmux); splitmux->fragment_id = 0; /* Reset internal elements only if no pad contexts are using them */ if (splitmux->contexts == NULL) gst_splitmux_reset (splitmux); + do_async_done (splitmux); GST_SPLITMUX_UNLOCK (splitmux); break; default: @@ -1805,12 +1937,14 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) } beach: - if (transition == GST_STATE_CHANGE_NULL_TO_READY && ret == GST_STATE_CHANGE_FAILURE) { /* Cleanup elements on failed transition out of NULL */ gst_splitmux_reset (splitmux); } + GST_SPLITMUX_LOCK (splitmux); + do_async_done (splitmux); + GST_SPLITMUX_UNLOCK (splitmux); return ret; } diff --git a/gst/multifile/gstsplitmuxsink.h b/gst/multifile/gstsplitmuxsink.h index 534f54a220..11eb5d5e0a 100644 --- a/gst/multifile/gstsplitmuxsink.h +++ b/gst/multifile/gstsplitmuxsink.h @@ -82,6 +82,9 @@ typedef struct _MqStreamCtx GstPad *srcpad; gboolean out_blocked; + + GstBuffer *cur_buffer; + GstEvent *pending_gap; } MqStreamCtx; struct _GstSplitMuxSink { @@ -131,6 +134,9 @@ struct _GstSplitMuxSink { gboolean switching_fragment; gboolean have_video; + + gboolean need_async_start; + gboolean async_pending; }; struct _GstSplitMuxSinkClass { diff --git a/tests/check/elements/splitmux.c b/tests/check/elements/splitmux.c index 7f4896d7fe..ce11da96c4 100644 --- a/tests/check/elements/splitmux.c +++ b/tests/check/elements/splitmux.c @@ -192,6 +192,20 @@ GST_START_TEST (test_splitmuxsrc_format_location) GST_END_TEST; +static gchar * +check_format_location (GstElement * object, + guint fragment_id, GstSample * first_sample) +{ + GstBuffer *buf = gst_sample_get_buffer (first_sample); + + /* Must have a buffer */ + fail_if (buf == NULL); + GST_LOG ("New file - first buffer %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); + + return NULL; +} + GST_START_TEST (test_splitmuxsink) { GstMessage *msg; @@ -213,6 +227,8 @@ GST_START_TEST (test_splitmuxsink) fail_if (pipeline == NULL); sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink"); fail_if (sink == NULL); + g_signal_connect (sink, "format-location-full", + (GCallback) check_format_location, NULL); dest_pattern = g_build_filename (tmpdir, "out%05d.ogg", NULL); g_object_set (G_OBJECT (sink), "location", dest_pattern, NULL); g_free (dest_pattern);