From d11339d61689a750672cffc6b88906e5bd977ee0 Mon Sep 17 00:00:00 2001 From: Vivia Nikolaidou Date: Tue, 13 Jun 2017 17:42:55 +0300 Subject: [PATCH] splitmuxsink: Added new async-finalize mode This mode is useful for muxers that can take a long time to finalize a file. Instead of blocking the whole upstream pipeline while the muxer is doing its stuff, we can unlink it and spawn a new muxer+sink combination to continue running normally. This requires us to receive the muxer and sink (if needed) as factories, optionally accompanied by their respective properties structures. Also added the muxer-added and sink-added signals, in case custom code has to be called for them. https://bugzilla.gnome.org/show_bug.cgi?id=783754 --- gst/multifile/gstsplitmuxsink.c | 566 ++++++++++++++++++++++++++++++-- gst/multifile/gstsplitmuxsink.h | 9 + tests/check/elements/splitmux.c | 69 ++++ 3 files changed, 611 insertions(+), 33 deletions(-) diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index 7846bc5ee1..5ccc13cff0 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -38,6 +38,13 @@ * file parts to be played individually correctly. In the absence of a video * stream, the first available stream is used as reference for synchronization. * + * In the async-finalize mode, when the threshold is crossed, the old muxer + * and sink is disconnected from the pipeline and left to finish the file + * asynchronously, and a new muxer and sink is created to continue with the + * next fragment. For that reason, instead of muxer and sink objects, the + * muxer-factory and sink-factory properties are used to construct the new + * objects, together with muxer-properties and sink-properties. + * * * Example pipelines * |[ @@ -46,6 +53,13 @@ * Records a video stream captured from a v4l2 device and muxes it into * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds * and 1MB maximum size. + * + * |[ + * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true" + * ]| + * Records a video stream captured from a v4l2 device and muxer it into + * streamable Matroska files, splitting as needed to limit size/duration to 10 + * seconds. Each file will finalize asynchronously. * */ @@ -86,6 +100,11 @@ enum PROP_MUXER, PROP_SINK, PROP_RESET_MUXER, + PROP_ASYNC_FINALIZE, + PROP_MUXER_FACTORY, + PROP_MUXER_PROPERTIES, + PROP_SINK_FACTORY, + PROP_SINK_PROPERTIES }; #define DEFAULT_MAX_SIZE_TIME 0 @@ -98,12 +117,21 @@ enum #define DEFAULT_SINK "filesink" #define DEFAULT_USE_ROBUST_MUXING FALSE #define DEFAULT_RESET_MUXER TRUE +#define DEFAULT_ASYNC_FINALIZE FALSE + +typedef struct _AsyncEosHelper +{ + MqStreamCtx *ctx; + GstPad *pad; +} AsyncEosHelper; enum { SIGNAL_FORMAT_LOCATION, SIGNAL_FORMAT_LOCATION_FULL, SIGNAL_SPLIT_NOW, + SIGNAL_MUXER_ADDED, + SIGNAL_SINK_ADDED, SIGNAL_LAST }; @@ -131,11 +159,30 @@ GST_STATIC_PAD_TEMPLATE ("caption_%u", GST_STATIC_CAPS_ANY); static GQuark PAD_CONTEXT; +static GQuark EOS_FROM_US; +static GQuark RUNNING_TIME; +/* EOS_FROM_US is only valid in async-finalize mode. We need to know whether + * to forward an incoming EOS message, but we cannot rely on the state of the + * splitmux anymore, so we set this qdata on the sink instead. + * The muxer and sink must be destroyed after both of these things have + * finished: + * 1) The EOS message has been sent when the fragment is ending + * 2) The muxer has been unlinked and relinked + * Therefore, EOS_FROM_US can have these two values: + * 0: EOS was not requested from us. Forward the message. The muxer and the + * sink will be destroyed together with the rest of the bin. + * 1: EOS was requested from us, but the other of the two tasks hasn't + * finished. Set EOS_FROM_US to 2 and do your stuff. + * 2: EOS was requested from us and the other of the two tasks has finished. + * Now we can destroy the muxer and the sink. + */ static void _do_init (void) { PAD_CONTEXT = g_quark_from_static_string ("pad-context"); + EOS_FROM_US = g_quark_from_static_string ("eos-from-us"); + RUNNING_TIME = g_quark_from_static_string ("running-time"); } #define gst_splitmux_sink_parent_class parent_class @@ -275,11 +322,13 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) g_object_class_install_property (gobject_class, PROP_MUXER, g_param_spec_object ("muxer", "Muxer", - "The muxer element to use (NULL = default mp4mux)", + "The muxer element to use (NULL = default mp4mux). " + "Valid only for async-finalize = FALSE", GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SINK, g_param_spec_object ("sink", "Sink", - "The sink element (or element chain) to use (NULL = default filesink)", + "The sink element (or element chain) to use (NULL = default filesink). " + "Valid only for async-finalize = FALSE", GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING, @@ -299,6 +348,34 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) "Reset the muxer after each segment. Disabling this will not work for most muxers.", DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE, + g_param_spec_boolean ("async-finalize", + "Finalize fragments asynchronously", + "Finalize each fragment asynchronously and start a new one", + DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY, + g_param_spec_string ("muxer-factory", "Muxer factory", + "The muxer element factory to use (default = mp4mux). " + "Valid only for async-finalize = TRUE", + "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES, + g_param_spec_boxed ("muxer-properties", "Muxer properties", + "The muxer element properties to use. " + "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. " + "Valid only for async-finalize = TRUE", + GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SINK_FACTORY, + g_param_spec_string ("sink-factory", "Sink factory", + "The sink element factory to use (default = filesink). " + "Valid only for async-finalize = TRUE", + "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES, + g_param_spec_boxed ("sink-properties", "Sink properties", + "The sink element properties to use. " + "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. " + "Valid only for async-finalize = TRUE", + GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstSplitMuxSink::format-location: * @splitmux: the #GstSplitMuxSink @@ -333,12 +410,33 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) * * Since: 1.14 */ - signals[SIGNAL_SPLIT_NOW] = g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL, G_TYPE_NONE, 0); + /** + * GstSplitMuxSink::muxer-added: + * @splitmux: the #GstSplitMuxSink + * @muxer: the newly added muxer element + * + * Since: 1.14 + */ + signals[SIGNAL_MUXER_ADDED] = + g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT); + + /** + * GstSplitMuxSink::sink-added: + * @splitmux: the #GstSplitMuxSink + * @sink: the newly added sink element + * + * Since: 1.14 + */ + signals[SIGNAL_SINK_ADDED] = + g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT); + klass->split_now = split_now; } @@ -362,6 +460,12 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux) splitmux->threshold_timecode_str = NULL; + splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE; + splitmux->muxer_factory = g_strdup (DEFAULT_MUXER); + splitmux->muxer_properties = NULL; + splitmux->sink_factory = g_strdup (DEFAULT_SINK); + splitmux->sink_properties = NULL; + GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK); splitmux->split_now = FALSE; } @@ -409,6 +513,15 @@ gst_splitmux_sink_finalize (GObject * object) if (splitmux->provided_muxer) gst_object_unref (splitmux->provided_muxer); + if (splitmux->muxer_factory) + g_free (splitmux->muxer_factory); + if (splitmux->muxer_properties) + gst_structure_free (splitmux->muxer_properties); + if (splitmux->sink_factory) + g_free (splitmux->sink_factory); + if (splitmux->sink_properties) + gst_structure_free (splitmux->sink_properties); + if (splitmux->threshold_timecode_str) g_free (splitmux->threshold_timecode_str); @@ -533,6 +646,47 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id, splitmux->reset_muxer = g_value_get_boolean (value); GST_OBJECT_UNLOCK (splitmux); break; + case PROP_ASYNC_FINALIZE: + GST_OBJECT_LOCK (splitmux); + splitmux->async_finalize = g_value_get_boolean (value); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MUXER_FACTORY: + GST_OBJECT_LOCK (splitmux); + if (splitmux->muxer_factory) + g_free (splitmux->muxer_factory); + splitmux->muxer_factory = g_value_dup_string (value); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MUXER_PROPERTIES: + GST_OBJECT_LOCK (splitmux); + if (splitmux->muxer_properties) + gst_structure_free (splitmux->muxer_properties); + if (gst_value_get_structure (value)) + splitmux->muxer_properties = + gst_structure_copy (gst_value_get_structure (value)); + else + splitmux->muxer_properties = NULL; + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_SINK_FACTORY: + GST_OBJECT_LOCK (splitmux); + if (splitmux->sink_factory) + g_free (splitmux->sink_factory); + splitmux->sink_factory = g_value_dup_string (value); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_SINK_PROPERTIES: + GST_OBJECT_LOCK (splitmux); + if (splitmux->sink_properties) + gst_structure_free (splitmux->sink_properties); + if (gst_value_get_structure (value)) + splitmux->sink_properties = + gst_structure_copy (gst_value_get_structure (value)); + else + splitmux->sink_properties = NULL; + GST_OBJECT_UNLOCK (splitmux); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -606,6 +760,31 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, splitmux->reset_muxer); GST_OBJECT_UNLOCK (splitmux); break; + case PROP_ASYNC_FINALIZE: + GST_OBJECT_LOCK (splitmux); + g_value_set_boolean (value, splitmux->async_finalize); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MUXER_FACTORY: + GST_OBJECT_LOCK (splitmux); + g_value_set_string (value, splitmux->muxer_factory); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MUXER_PROPERTIES: + GST_OBJECT_LOCK (splitmux); + gst_value_set_structure (value, splitmux->muxer_properties); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_SINK_FACTORY: + GST_OBJECT_LOCK (splitmux); + g_value_set_string (value, splitmux->sink_factory); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_SINK_PROPERTIES: + GST_OBJECT_LOCK (splitmux); + gst_value_set_structure (value, splitmux->sink_properties); + GST_OBJECT_UNLOCK (splitmux); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -690,14 +869,22 @@ _pad_block_destroy_src_notify (MqStreamCtx * ctx) } static void -send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened) +send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened, + GstElement * sink) { gchar *location = NULL; GstMessage *msg; const gchar *msg_name = opened ? "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed"; + GstClockTime running_time = splitmux->reference_ctx->out_running_time; - g_object_get (splitmux->sink, "location", &location, NULL); + if (!opened) { + GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME); + if (rtime) + running_time = *rtime; + } + + g_object_get (sink, "location", &location, NULL); /* If it's in the middle of a teardown, the reference_ctc might have become * NULL */ @@ -705,14 +892,36 @@ send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened) msg = gst_message_new_element (GST_OBJECT (splitmux), gst_structure_new (msg_name, "location", G_TYPE_STRING, location, - "running-time", GST_TYPE_CLOCK_TIME, - splitmux->reference_ctx->out_running_time, NULL)); + "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL)); gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg); } g_free (location); } +static void +send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper) +{ + GstEvent *eos; + GstPad *pad; + MqStreamCtx *ctx; + + eos = gst_event_new_eos (); + pad = helper->pad; + ctx = helper->ctx; + + GST_SPLITMUX_LOCK (splitmux); + if (!pad) + pad = gst_pad_get_peer (ctx->srcpad); + GST_SPLITMUX_UNLOCK (splitmux); + + gst_pad_send_event (pad, eos); + GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad); + + gst_object_unref (pad); + g_free (helper); +} + /* Called with lock held, drops the lock to send EOS to the * pad */ @@ -735,6 +944,54 @@ send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) gst_object_unref (pad); } +/* Called with lock held. Schedules an EOS event to the ctx pad + * to happen in another thread */ +static void +eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) +{ + AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1); + GstPad *srcpad, *sinkpad; + + srcpad = ctx->srcpad; + sinkpad = gst_pad_get_peer (srcpad); + + helper->ctx = ctx; + helper->pad = sinkpad; /* Takes the reference */ + + ctx->out_eos_async_done = TRUE; + /* HACK: Here, we explicitly unset the SINK flag on the target sink element + * that's about to be asynchronously disposed, so that it no longer + * participates in GstBin EOS logic. This fixes a race where if + * splitmuxsink really reaches EOS before an asynchronous background + * element has finished, then the bin won't actually send EOS to the + * pipeline. Even after finishing and removing the old element, the + * bin doesn't re-check EOS status on removing a SINK element. This + * should be fixed in core, making this hack unnecessary. */ + GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK); + + GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p", + sinkpad, ctx); + + g_assert_nonnull (helper->pad); + gst_element_call_async (GST_ELEMENT (splitmux), + (GstElementCallAsyncFunc) send_eos_async, helper, NULL); +} + +/* Called with lock held. TRUE iff all contexts have a + * pending (or delivered) async eos event */ +static gboolean +all_contexts_are_async_eos (GstSplitMuxSink * splitmux) +{ + gboolean ret = TRUE; + GList *item; + + for (item = splitmux->contexts; item; item = item->next) { + MqStreamCtx *ctx = item->data; + ret &= ctx->out_eos_async_done; + } + return ret; +} + /* Called with splitmux lock held to check if this output * context needs to sleep to wait for the release of the * next GOP, or to send EOS to close out the current file @@ -789,8 +1046,31 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) case SPLITMUX_OUTPUT_STATE_ENDING_FILE: /* We've reached the max out running_time to get here, so end this file now */ if (ctx->out_eos == FALSE) { - send_eos (splitmux, ctx); - continue; + if (splitmux->async_finalize) { + /* We must set EOS asynchronously at this point. We cannot defer + * it, because we need all contexts to wake up, for the + * reference context to eventually give us something at + * START_NEXT_FILE. Otherwise, collectpads might choose another + * context to give us the first buffer, and format-location-full + * will not contain a valid sample. */ + g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US, + GINT_TO_POINTER (1)); + eos_context_async (ctx, splitmux); + if (all_contexts_are_async_eos (splitmux)) { + GST_INFO_OBJECT (splitmux, + "All contexts are async_eos. Moving to the next file."); + /* We can start the next file once we've asked each pad to go EOS */ + splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; + GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); + continue; + } + } else { + send_eos (splitmux, ctx); + continue; + } + } else { + GST_INFO_OBJECT (splitmux, + "At end-of-file state, but context %p is already EOS", ctx); } break; case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE: @@ -798,11 +1078,12 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) /* Special handling on the reference ctx to start new fragments * and collect commands from the command queue */ /* drops the splitmux lock briefly: */ + /* We must have reference ctx in order for format-location-full to + * have a sample */ start_next_fragment (splitmux, ctx); continue; } break; - case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{ do { SplitMuxOutputCommand *cmd = @@ -982,6 +1263,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) goto beach; ctx->out_eos = TRUE; + GST_INFO_OBJECT (splitmux, + "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx); break; case GST_EVENT_GAP:{ GstClockTime gap_ts; @@ -1169,6 +1452,15 @@ resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer) return gst_pad_send_event (peer, gst_event_ref (*event)); } +static void +unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) +{ + if (ctx->fragment_block_id > 0) { + gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id); + ctx->fragment_block_id = 0; + } +} + static void restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) { @@ -1179,10 +1471,83 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) /* Clear EOS flag if not actually EOS */ ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad); + ctx->out_eos_async_done = ctx->out_eos; gst_object_unref (peer); } +static void +relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) +{ + GstPad *sinkpad, *srcpad, *newpad; + GstPadTemplate *templ; + + srcpad = ctx->srcpad; + sinkpad = gst_pad_get_peer (srcpad); + + templ = sinkpad->padtemplate; + newpad = + gst_element_request_pad (splitmux->muxer, templ, + GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL); + + GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx, + newpad); + if (!gst_pad_unlink (srcpad, sinkpad)) { + gst_object_unref (sinkpad); + goto fail; + } + if (gst_pad_link_full (srcpad, newpad, + GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) { + gst_element_release_request_pad (splitmux->muxer, newpad); + gst_object_unref (sinkpad); + gst_object_unref (newpad); + goto fail; + } + gst_object_unref (newpad); + gst_object_unref (sinkpad); + return; + +fail: + GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, + ("Could not create the new muxer/sink"), NULL); +} + +static GstPadProbeReturn +_block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + return GST_PAD_PROBE_OK; +} + +static void +block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) +{ + ctx->fragment_block_id = + gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad, + NULL, NULL); +} + +static gboolean +_set_property_from_structure (GQuark field_id, const GValue * value, + gpointer user_data) +{ + const gchar *property_name = g_quark_to_string (field_id); + GObject *element = G_OBJECT (user_data); + + g_object_set_property (element, property_name, value); + + return TRUE; +} + +static void +_lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux) +{ + gst_element_set_locked_state (element, TRUE); + gst_element_set_state (element, GST_STATE_NULL); + GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element); + gst_bin_remove (GST_BIN (splitmux), element); +} + + static void _send_event (const GValue * value, gpointer user_data) { @@ -1201,6 +1566,8 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GstElement *muxer, *sink; + g_assert (ctx->is_reference); + /* 1 change to new file */ splitmux->switching_fragment = TRUE; @@ -1212,27 +1579,87 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_SPLITMUX_UNLOCK (splitmux); GST_STATE_LOCK (splitmux); - gst_element_set_locked_state (muxer, TRUE); - gst_element_set_locked_state (sink, TRUE); - gst_element_set_state (sink, GST_STATE_NULL); + if (splitmux->async_finalize) { + if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) { + gchar *newname; + GstElement *new_sink, *new_muxer; - if (splitmux->reset_muxer) { - gst_element_set_state (muxer, GST_STATE_NULL); + GST_DEBUG_OBJECT (splitmux, "Starting fragment %u", + splitmux->fragment_id); + g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux); + newname = g_strdup_printf ("sink_%u", splitmux->fragment_id); + GST_SPLITMUX_LOCK (splitmux); + if ((splitmux->sink = + create_element (splitmux, splitmux->sink_factory, newname, + TRUE)) == NULL) + goto fail; + if (splitmux->sink_properties) + gst_structure_foreach (splitmux->sink_properties, + _set_property_from_structure, splitmux->sink); + splitmux->active_sink = splitmux->sink; + g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink); + g_free (newname); + newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id); + if ((splitmux->muxer = + create_element (splitmux, splitmux->muxer_factory, newname, + TRUE)) == NULL) + goto fail; + if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink), + "async") != NULL) { + /* async child elements are causing state change races and weird + * failures, so let's try and turn that off */ + g_object_set (splitmux->sink, "async", FALSE, NULL); + } + if (splitmux->muxer_properties) + gst_structure_foreach (splitmux->muxer_properties, + _set_property_from_structure, splitmux->muxer); + g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer); + g_free (newname); + new_sink = splitmux->sink; + new_muxer = splitmux->muxer; + GST_SPLITMUX_UNLOCK (splitmux); + g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux); + gst_element_link (new_muxer, new_sink); + + if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) { + if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink, + EOS_FROM_US)) == 2) { + _lock_and_set_to_null (muxer, splitmux); + _lock_and_set_to_null (sink, splitmux); + } else { + g_object_set_qdata ((GObject *) sink, EOS_FROM_US, + GINT_TO_POINTER (2)); + } + } + muxer = new_muxer; + sink = new_sink; + gst_object_ref (muxer); + gst_object_ref (sink); + } } else { - GstIterator *it = gst_element_iterate_sink_pads (muxer); - GstEvent *ev; - ev = gst_event_new_flush_start (); - while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC); - gst_event_unref (ev); + gst_element_set_locked_state (muxer, TRUE); + gst_element_set_locked_state (sink, TRUE); + gst_element_set_state (sink, GST_STATE_NULL); - gst_iterator_resync (it); + if (splitmux->reset_muxer) { + gst_element_set_state (muxer, GST_STATE_NULL); + } else { + GstIterator *it = gst_element_iterate_sink_pads (muxer); + GstEvent *ev; - ev = gst_event_new_flush_stop (TRUE); - while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC); - gst_event_unref (ev); + ev = gst_event_new_flush_start (); + while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC); + gst_event_unref (ev); - gst_iterator_free (it); + gst_iterator_resync (it); + + ev = gst_event_new_flush_stop (TRUE); + while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC); + gst_event_unref (ev); + + gst_iterator_free (it); + } } GST_SPLITMUX_LOCK (splitmux); @@ -1256,13 +1683,20 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) splitmux->ready_for_output = TRUE; + g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux); g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); - send_fragment_opened_closed_msg (splitmux, TRUE); + send_fragment_opened_closed_msg (splitmux, TRUE, sink); /* FIXME: Is this always the correct next state? */ splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); + return; + +fail: + GST_STATE_UNLOCK (splitmux); + GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, + ("Could not create the new muxer/sink"), NULL); } static void @@ -1271,13 +1705,49 @@ bus_handler (GstBin * bin, GstMessage * message) GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin); switch (GST_MESSAGE_TYPE (message)) { - case GST_MESSAGE_EOS: + case GST_MESSAGE_EOS:{ /* If the state is draining out the current file, drop this EOS */ + GstElement *sink; + + sink = GST_ELEMENT (GST_MESSAGE_SRC (message)); GST_SPLITMUX_LOCK (splitmux); - send_fragment_opened_closed_msg (splitmux, FALSE); + send_fragment_opened_closed_msg (splitmux, FALSE, sink); - if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) { + if (splitmux->async_finalize) { + + if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) { + if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink, + EOS_FROM_US)) == 2) { + GstElement *muxer; + GstPad *sinksink, *muxersrc; + + sinksink = gst_element_get_static_pad (sink, "sink"); + muxersrc = gst_pad_get_peer (sinksink); + muxer = gst_pad_get_parent_element (muxersrc); + gst_object_unref (sinksink); + gst_object_unref (muxersrc); + + gst_element_call_async (muxer, + (GstElementCallAsyncFunc) _lock_and_set_to_null, + gst_object_ref (splitmux), gst_object_unref); + gst_element_call_async (sink, + (GstElementCallAsyncFunc) _lock_and_set_to_null, + gst_object_ref (splitmux), gst_object_unref); + gst_object_unref (muxer); + } else { + g_object_set_qdata ((GObject *) sink, EOS_FROM_US, + GINT_TO_POINTER (2)); + } + GST_DEBUG_OBJECT (splitmux, + "Caught async EOS from previous muxer+sink. Dropping."); + /* We forward the EOS so that it gets aggregated as normal. If the sink + * finishes and is removed before the end, it will be de-aggregated */ + gst_message_unref (message); + GST_SPLITMUX_UNLOCK (splitmux); + return; + } + } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) { GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); @@ -1293,9 +1763,11 @@ bus_handler (GstBin * bin, GstMessage * message) } GST_SPLITMUX_UNLOCK (splitmux); break; + } case GST_MESSAGE_ASYNC_START: case GST_MESSAGE_ASYNC_DONE: /* Ignore state changes from our children while switching */ + GST_SPLITMUX_LOCK (splitmux); if (splitmux->switching_fragment) { if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) { @@ -1303,9 +1775,11 @@ bus_handler (GstBin * bin, GstMessage * message) "Ignoring state change from child %" GST_PTR_FORMAT " while switching", GST_MESSAGE_SRC (message)); gst_message_unref (message); + GST_SPLITMUX_UNLOCK (splitmux); return; } } + GST_SPLITMUX_UNLOCK (splitmux); break; case GST_MESSAGE_WARNING: { @@ -1473,6 +1947,10 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) /* Check for overrun - have we output at least one byte and overrun * either threshold? */ if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) { + GstClockTime *sink_running_time = g_new (GstClockTime, 1); + *sink_running_time = splitmux->reference_ctx->out_running_time; + g_object_set_qdata_full (G_OBJECT (splitmux->sink), + RUNNING_TIME, sink_running_time, g_free); g_atomic_int_set (&(splitmux->split_now), FALSE); /* Tell the output side to start a new fragment */ GST_INFO_OBJECT (splitmux, @@ -1998,6 +2476,7 @@ gst_splitmux_sink_request_new_pad (GstElement * element, GST_SPLITMUX_LOCK (splitmux); if (!create_muxer (splitmux)) goto fail; + g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer); if (templ->name_template) { if (g_str_equal (templ->name_template, "video")) { @@ -2107,7 +2586,7 @@ gst_splitmux_sink_request_new_pad (GstElement * element, GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT " feeds queue pad %" GST_PTR_FORMAT, res, q_sink); - splitmux->contexts = g_list_prepend (splitmux->contexts, ctx); + splitmux->contexts = g_list_append (splitmux->contexts, ctx); g_free (gname); @@ -2224,10 +2703,19 @@ create_muxer (GstSplitMuxSink * splitmux) provided_muxer = gst_object_ref (splitmux->provided_muxer); GST_OBJECT_UNLOCK (splitmux); - if (provided_muxer == NULL) { + if ((!splitmux->async_finalize && provided_muxer == NULL) || + (splitmux->async_finalize && splitmux->muxer_factory == NULL)) { if ((splitmux->muxer = - create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL) + create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL) goto fail; + } else if (splitmux->async_finalize) { + if ((splitmux->muxer = + create_element (splitmux, splitmux->muxer_factory, "muxer", + FALSE)) == NULL) + goto fail; + if (splitmux->muxer_properties) + gst_structure_foreach (splitmux->muxer_properties, + _set_property_from_structure, splitmux->muxer); } else { /* Ensure it's not in locked state (we might be reusing an old element) */ gst_element_set_locked_state (provided_muxer, FALSE); @@ -2308,11 +2796,21 @@ create_sink (GstSplitMuxSink * splitmux) provided_sink = gst_object_ref (splitmux->provided_sink); GST_OBJECT_UNLOCK (splitmux); - if (provided_sink == NULL) { + if ((!splitmux->async_finalize && provided_sink == NULL) || + (splitmux->async_finalize && splitmux->sink_factory == NULL)) { if ((splitmux->sink = create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL) goto fail; splitmux->active_sink = splitmux->sink; + } else if (splitmux->async_finalize) { + if ((splitmux->sink = + create_element (splitmux, splitmux->sink_factory, "sink", + TRUE)) == NULL) + goto fail; + if (splitmux->sink_properties) + gst_structure_foreach (splitmux->sink_properties, + _set_property_from_structure, splitmux->sink); + splitmux->active_sink = splitmux->sink; } else { /* Ensure the sink starts in locked state and NULL - it will be changed * by the filename setting code */ @@ -2452,6 +2950,8 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) GST_SPLITMUX_UNLOCK (splitmux); goto beach; } + g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer); + g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink); GST_SPLITMUX_UNLOCK (splitmux); splitmux->fragment_id = 0; break; diff --git a/gst/multifile/gstsplitmuxsink.h b/gst/multifile/gstsplitmuxsink.h index 0d35647843..eb4512ff15 100644 --- a/gst/multifile/gstsplitmuxsink.h +++ b/gst/multifile/gstsplitmuxsink.h @@ -75,12 +75,14 @@ typedef struct _MqStreamCtx guint q_overrun_id; guint sink_pad_block_id; guint src_pad_block_id; + gulong fragment_block_id; gboolean is_reference; gboolean flushing; gboolean in_eos; gboolean out_eos; + gboolean out_eos_async_done; gboolean need_unblock; gboolean caps_change; @@ -172,6 +174,13 @@ struct _GstSplitMuxSink gboolean muxer_has_reserved_props; gboolean split_now; + + /* Async finalize options */ + gboolean async_finalize; + gchar *muxer_factory; + GstStructure *muxer_properties; + gchar *sink_factory; + GstStructure *sink_properties; }; struct _GstSplitMuxSinkClass diff --git a/tests/check/elements/splitmux.c b/tests/check/elements/splitmux.c index 5fb78d27c7..2a411d94b5 100644 --- a/tests/check/elements/splitmux.c +++ b/tests/check/elements/splitmux.c @@ -195,6 +195,7 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time, GstMessage *msg; GstElement *pipeline; GstElement *fakesink; + GstElement *fakesink2; gchar *uri; pipeline = gst_element_factory_make ("playbin", NULL); @@ -203,6 +204,9 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time, fakesink = gst_element_factory_make ("fakesink", NULL); fail_if (fakesink == NULL); g_object_set (G_OBJECT (pipeline), "video-sink", fakesink, NULL); + fakesink2 = gst_element_factory_make ("fakesink", NULL); + fail_if (fakesink2 == NULL); + g_object_set (G_OBJECT (pipeline), "audio-sink", fakesink2, NULL); uri = g_strdup_printf ("splitmux://%s", in_pattern); @@ -371,6 +375,70 @@ GST_START_TEST (test_splitmuxsink) GST_END_TEST; +GST_START_TEST (test_splitmuxsink_async) +{ + GstMessage *msg; + GstElement *pipeline; + GstElement *sink; + GstPad *splitmux_sink_pad; + GstPad *enc_src_pad; + gchar *dest_pattern; + guint count; + gchar *in_pattern; + + pipeline = + gst_parse_launch + ("videotestsrc num-buffers=15 ! video/x-raw,width=80,height=64,framerate=5/1 ! videoconvert !" + " queue ! theoraenc keyframe-force=5 ! splitmuxsink name=splitsink " + " max-size-time=1000000000 async-finalize=true " + " muxer-factory=matroskamux audiotestsrc num-buffers=15 samplesperbuffer=9600 ! " + " audio/x-raw,rate=48000 ! splitsink.audio_%u", NULL); + fail_if (pipeline == NULL); + sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink"); + fail_if (sink == NULL); + g_signal_connect (sink, "format-location-full", + (GCallback) check_format_location, NULL); + dest_pattern = g_build_filename (tmpdir, "matroska%05d.mkv", NULL); + g_object_set (G_OBJECT (sink), "location", dest_pattern, NULL); + g_free (dest_pattern); + g_object_unref (sink); + + msg = run_pipeline (pipeline); + + if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) + dump_error (msg); + fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS); + gst_message_unref (msg); + + /* unlink manually and relase request pad to ensure that we *can* do that + * - https://bugzilla.gnome.org/show_bug.cgi?id=753622 */ + sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink"); + fail_if (sink == NULL); + splitmux_sink_pad = gst_element_get_static_pad (sink, "video"); + fail_if (splitmux_sink_pad == NULL); + enc_src_pad = gst_pad_get_peer (splitmux_sink_pad); + fail_if (enc_src_pad == NULL); + fail_unless (gst_pad_unlink (enc_src_pad, splitmux_sink_pad)); + gst_object_unref (enc_src_pad); + gst_element_release_request_pad (sink, splitmux_sink_pad); + gst_object_unref (splitmux_sink_pad); + /* at this point the pad must be releaased - try to find it again to verify */ + splitmux_sink_pad = gst_element_get_static_pad (sink, "video"); + fail_if (splitmux_sink_pad != NULL); + g_object_unref (sink); + + gst_object_unref (pipeline); + + count = count_files (tmpdir); + fail_unless (count == 3, "Expected 3 output files, got %d", count); + + in_pattern = g_build_filename (tmpdir, "matroska*.mkv", NULL); + test_playback (in_pattern, 0, 3 * GST_SECOND); + g_free (in_pattern); +} + +GST_END_TEST; + static GstPadProbeReturn intercept_stream_start (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) @@ -771,6 +839,7 @@ splitmux_suite (void) tempdir_cleanup); tcase_add_test (tc_chain_complex, test_splitmuxsrc_sparse_streams); + tcase_add_test (tc_chain, test_splitmuxsink_async); } else { GST_INFO ("Skipping tests, missing plugins: matroska and/or vorbis"); }