splitmuxsink: Added new async-finalize mode

This mode is useful for muxers that can take a long time to finalize a
file. Instead of blocking the whole upstream pipeline while the muxer is
doing its stuff, we can unlink it and spawn a new muxer+sink combination
to continue running normally.

This requires us to receive the muxer and sink (if needed) as factories,
optionally accompanied by their respective properties structures. Also
added the muxer-added and sink-added signals, in case custom code has to
be called for them.

https://bugzilla.gnome.org/show_bug.cgi?id=783754
This commit is contained in:
Vivia Nikolaidou 2017-06-13 17:42:55 +03:00
parent d35f893715
commit d11339d616
3 changed files with 611 additions and 33 deletions

View file

@ -38,6 +38,13 @@
* file parts to be played individually correctly. In the absence of a video
* stream, the first available stream is used as reference for synchronization.
*
* In the async-finalize mode, when the threshold is crossed, the old muxer
* and sink is disconnected from the pipeline and left to finish the file
* asynchronously, and a new muxer and sink is created to continue with the
* next fragment. For that reason, instead of muxer and sink objects, the
* muxer-factory and sink-factory properties are used to construct the new
* objects, together with muxer-properties and sink-properties.
*
* <refsect2>
* <title>Example pipelines</title>
* |[
@ -46,6 +53,13 @@
* Records a video stream captured from a v4l2 device and muxes it into
* ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
* and 1MB maximum size.
*
* |[
* gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
* ]|
* Records a video stream captured from a v4l2 device and muxer it into
* streamable Matroska files, splitting as needed to limit size/duration to 10
* seconds. Each file will finalize asynchronously.
* </refsect2>
*/
@ -86,6 +100,11 @@ enum
PROP_MUXER,
PROP_SINK,
PROP_RESET_MUXER,
PROP_ASYNC_FINALIZE,
PROP_MUXER_FACTORY,
PROP_MUXER_PROPERTIES,
PROP_SINK_FACTORY,
PROP_SINK_PROPERTIES
};
#define DEFAULT_MAX_SIZE_TIME 0
@ -98,12 +117,21 @@ enum
#define DEFAULT_SINK "filesink"
#define DEFAULT_USE_ROBUST_MUXING FALSE
#define DEFAULT_RESET_MUXER TRUE
#define DEFAULT_ASYNC_FINALIZE FALSE
typedef struct _AsyncEosHelper
{
MqStreamCtx *ctx;
GstPad *pad;
} AsyncEosHelper;
enum
{
SIGNAL_FORMAT_LOCATION,
SIGNAL_FORMAT_LOCATION_FULL,
SIGNAL_SPLIT_NOW,
SIGNAL_MUXER_ADDED,
SIGNAL_SINK_ADDED,
SIGNAL_LAST
};
@ -131,11 +159,30 @@ GST_STATIC_PAD_TEMPLATE ("caption_%u",
GST_STATIC_CAPS_ANY);
static GQuark PAD_CONTEXT;
static GQuark EOS_FROM_US;
static GQuark RUNNING_TIME;
/* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
* to forward an incoming EOS message, but we cannot rely on the state of the
* splitmux anymore, so we set this qdata on the sink instead.
* The muxer and sink must be destroyed after both of these things have
* finished:
* 1) The EOS message has been sent when the fragment is ending
* 2) The muxer has been unlinked and relinked
* Therefore, EOS_FROM_US can have these two values:
* 0: EOS was not requested from us. Forward the message. The muxer and the
* sink will be destroyed together with the rest of the bin.
* 1: EOS was requested from us, but the other of the two tasks hasn't
* finished. Set EOS_FROM_US to 2 and do your stuff.
* 2: EOS was requested from us and the other of the two tasks has finished.
* Now we can destroy the muxer and the sink.
*/
static void
_do_init (void)
{
PAD_CONTEXT = g_quark_from_static_string ("pad-context");
EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
RUNNING_TIME = g_quark_from_static_string ("running-time");
}
#define gst_splitmux_sink_parent_class parent_class
@ -275,11 +322,13 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
g_object_class_install_property (gobject_class, PROP_MUXER,
g_param_spec_object ("muxer", "Muxer",
"The muxer element to use (NULL = default mp4mux)",
"The muxer element to use (NULL = default mp4mux). "
"Valid only for async-finalize = FALSE",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK,
g_param_spec_object ("sink", "Sink",
"The sink element (or element chain) to use (NULL = default filesink)",
"The sink element (or element chain) to use (NULL = default filesink). "
"Valid only for async-finalize = FALSE",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
@ -299,6 +348,34 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
"Reset the muxer after each segment. Disabling this will not work for most muxers.",
DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
g_param_spec_boolean ("async-finalize",
"Finalize fragments asynchronously",
"Finalize each fragment asynchronously and start a new one",
DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
g_param_spec_string ("muxer-factory", "Muxer factory",
"The muxer element factory to use (default = mp4mux). "
"Valid only for async-finalize = TRUE",
"mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
g_param_spec_boxed ("muxer-properties", "Muxer properties",
"The muxer element properties to use. "
"Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
"Valid only for async-finalize = TRUE",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
g_param_spec_string ("sink-factory", "Sink factory",
"The sink element factory to use (default = filesink). "
"Valid only for async-finalize = TRUE",
"filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
g_param_spec_boxed ("sink-properties", "Sink properties",
"The sink element properties to use. "
"Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
"Valid only for async-finalize = TRUE",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSplitMuxSink::format-location:
* @splitmux: the #GstSplitMuxSink
@ -333,12 +410,33 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
*
* Since: 1.14
*/
signals[SIGNAL_SPLIT_NOW] =
g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
/**
* GstSplitMuxSink::muxer-added:
* @splitmux: the #GstSplitMuxSink
* @muxer: the newly added muxer element
*
* Since: 1.14
*/
signals[SIGNAL_MUXER_ADDED] =
g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
/**
* GstSplitMuxSink::sink-added:
* @splitmux: the #GstSplitMuxSink
* @sink: the newly added sink element
*
* Since: 1.14
*/
signals[SIGNAL_SINK_ADDED] =
g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
klass->split_now = split_now;
}
@ -362,6 +460,12 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
splitmux->threshold_timecode_str = NULL;
splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
splitmux->muxer_properties = NULL;
splitmux->sink_factory = g_strdup (DEFAULT_SINK);
splitmux->sink_properties = NULL;
GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
splitmux->split_now = FALSE;
}
@ -409,6 +513,15 @@ gst_splitmux_sink_finalize (GObject * object)
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
if (splitmux->muxer_factory)
g_free (splitmux->muxer_factory);
if (splitmux->muxer_properties)
gst_structure_free (splitmux->muxer_properties);
if (splitmux->sink_factory)
g_free (splitmux->sink_factory);
if (splitmux->sink_properties)
gst_structure_free (splitmux->sink_properties);
if (splitmux->threshold_timecode_str)
g_free (splitmux->threshold_timecode_str);
@ -533,6 +646,47 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id,
splitmux->reset_muxer = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_ASYNC_FINALIZE:
GST_OBJECT_LOCK (splitmux);
splitmux->async_finalize = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_FACTORY:
GST_OBJECT_LOCK (splitmux);
if (splitmux->muxer_factory)
g_free (splitmux->muxer_factory);
splitmux->muxer_factory = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
if (splitmux->muxer_properties)
gst_structure_free (splitmux->muxer_properties);
if (gst_value_get_structure (value))
splitmux->muxer_properties =
gst_structure_copy (gst_value_get_structure (value));
else
splitmux->muxer_properties = NULL;
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_FACTORY:
GST_OBJECT_LOCK (splitmux);
if (splitmux->sink_factory)
g_free (splitmux->sink_factory);
splitmux->sink_factory = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
if (splitmux->sink_properties)
gst_structure_free (splitmux->sink_properties);
if (gst_value_get_structure (value))
splitmux->sink_properties =
gst_structure_copy (gst_value_get_structure (value));
else
splitmux->sink_properties = NULL;
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -606,6 +760,31 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id,
g_value_set_boolean (value, splitmux->reset_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_ASYNC_FINALIZE:
GST_OBJECT_LOCK (splitmux);
g_value_set_boolean (value, splitmux->async_finalize);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_FACTORY:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->muxer_factory);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
gst_value_set_structure (value, splitmux->muxer_properties);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_FACTORY:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->sink_factory);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
gst_value_set_structure (value, splitmux->sink_properties);
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -690,14 +869,22 @@ _pad_block_destroy_src_notify (MqStreamCtx * ctx)
}
static void
send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
GstElement * sink)
{
gchar *location = NULL;
GstMessage *msg;
const gchar *msg_name = opened ?
"splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
GstClockTime running_time = splitmux->reference_ctx->out_running_time;
g_object_get (splitmux->sink, "location", &location, NULL);
if (!opened) {
GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
if (rtime)
running_time = *rtime;
}
g_object_get (sink, "location", &location, NULL);
/* If it's in the middle of a teardown, the reference_ctc might have become
* NULL */
@ -705,14 +892,36 @@ send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
msg = gst_message_new_element (GST_OBJECT (splitmux),
gst_structure_new (msg_name,
"location", G_TYPE_STRING, location,
"running-time", GST_TYPE_CLOCK_TIME,
splitmux->reference_ctx->out_running_time, NULL));
"running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
}
g_free (location);
}
static void
send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
{
GstEvent *eos;
GstPad *pad;
MqStreamCtx *ctx;
eos = gst_event_new_eos ();
pad = helper->pad;
ctx = helper->ctx;
GST_SPLITMUX_LOCK (splitmux);
if (!pad)
pad = gst_pad_get_peer (ctx->srcpad);
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (pad, eos);
GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
gst_object_unref (pad);
g_free (helper);
}
/* Called with lock held, drops the lock to send EOS to the
* pad
*/
@ -735,6 +944,54 @@ send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
gst_object_unref (pad);
}
/* Called with lock held. Schedules an EOS event to the ctx pad
* to happen in another thread */
static void
eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
GstPad *srcpad, *sinkpad;
srcpad = ctx->srcpad;
sinkpad = gst_pad_get_peer (srcpad);
helper->ctx = ctx;
helper->pad = sinkpad; /* Takes the reference */
ctx->out_eos_async_done = TRUE;
/* HACK: Here, we explicitly unset the SINK flag on the target sink element
* that's about to be asynchronously disposed, so that it no longer
* participates in GstBin EOS logic. This fixes a race where if
* splitmuxsink really reaches EOS before an asynchronous background
* element has finished, then the bin won't actually send EOS to the
* pipeline. Even after finishing and removing the old element, the
* bin doesn't re-check EOS status on removing a SINK element. This
* should be fixed in core, making this hack unnecessary. */
GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
sinkpad, ctx);
g_assert_nonnull (helper->pad);
gst_element_call_async (GST_ELEMENT (splitmux),
(GstElementCallAsyncFunc) send_eos_async, helper, NULL);
}
/* Called with lock held. TRUE iff all contexts have a
* pending (or delivered) async eos event */
static gboolean
all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
{
gboolean ret = TRUE;
GList *item;
for (item = splitmux->contexts; item; item = item->next) {
MqStreamCtx *ctx = item->data;
ret &= ctx->out_eos_async_done;
}
return ret;
}
/* Called with splitmux lock held to check if this output
* context needs to sleep to wait for the release of the
* next GOP, or to send EOS to close out the current file
@ -789,8 +1046,31 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
/* We've reached the max out running_time to get here, so end this file now */
if (ctx->out_eos == FALSE) {
send_eos (splitmux, ctx);
continue;
if (splitmux->async_finalize) {
/* We must set EOS asynchronously at this point. We cannot defer
* it, because we need all contexts to wake up, for the
* reference context to eventually give us something at
* START_NEXT_FILE. Otherwise, collectpads might choose another
* context to give us the first buffer, and format-location-full
* will not contain a valid sample. */
g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
GINT_TO_POINTER (1));
eos_context_async (ctx, splitmux);
if (all_contexts_are_async_eos (splitmux)) {
GST_INFO_OBJECT (splitmux,
"All contexts are async_eos. Moving to the next file.");
/* We can start the next file once we've asked each pad to go EOS */
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
continue;
}
} else {
send_eos (splitmux, ctx);
continue;
}
} else {
GST_INFO_OBJECT (splitmux,
"At end-of-file state, but context %p is already EOS", ctx);
}
break;
case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
@ -798,11 +1078,12 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
/* Special handling on the reference ctx to start new fragments
* and collect commands from the command queue */
/* drops the splitmux lock briefly: */
/* We must have reference ctx in order for format-location-full to
* have a sample */
start_next_fragment (splitmux, ctx);
continue;
}
break;
case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
do {
SplitMuxOutputCommand *cmd =
@ -982,6 +1263,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
ctx->out_eos = TRUE;
GST_INFO_OBJECT (splitmux,
"Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
@ -1169,6 +1452,15 @@ resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
return gst_pad_send_event (peer, gst_event_ref (*event));
}
static void
unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
if (ctx->fragment_block_id > 0) {
gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
ctx->fragment_block_id = 0;
}
}
static void
restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
@ -1179,10 +1471,83 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
/* Clear EOS flag if not actually EOS */
ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
ctx->out_eos_async_done = ctx->out_eos;
gst_object_unref (peer);
}
static void
relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
GstPad *sinkpad, *srcpad, *newpad;
GstPadTemplate *templ;
srcpad = ctx->srcpad;
sinkpad = gst_pad_get_peer (srcpad);
templ = sinkpad->padtemplate;
newpad =
gst_element_request_pad (splitmux->muxer, templ,
GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
newpad);
if (!gst_pad_unlink (srcpad, sinkpad)) {
gst_object_unref (sinkpad);
goto fail;
}
if (gst_pad_link_full (srcpad, newpad,
GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
gst_element_release_request_pad (splitmux->muxer, newpad);
gst_object_unref (sinkpad);
gst_object_unref (newpad);
goto fail;
}
gst_object_unref (newpad);
gst_object_unref (sinkpad);
return;
fail:
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
("Could not create the new muxer/sink"), NULL);
}
static GstPadProbeReturn
_block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
return GST_PAD_PROBE_OK;
}
static void
block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
ctx->fragment_block_id =
gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
NULL, NULL);
}
static gboolean
_set_property_from_structure (GQuark field_id, const GValue * value,
gpointer user_data)
{
const gchar *property_name = g_quark_to_string (field_id);
GObject *element = G_OBJECT (user_data);
g_object_set_property (element, property_name, value);
return TRUE;
}
static void
_lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
{
gst_element_set_locked_state (element, TRUE);
gst_element_set_state (element, GST_STATE_NULL);
GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
gst_bin_remove (GST_BIN (splitmux), element);
}
static void
_send_event (const GValue * value, gpointer user_data)
{
@ -1201,6 +1566,8 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GstElement *muxer, *sink;
g_assert (ctx->is_reference);
/* 1 change to new file */
splitmux->switching_fragment = TRUE;
@ -1212,27 +1579,87 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
GST_SPLITMUX_UNLOCK (splitmux);
GST_STATE_LOCK (splitmux);
gst_element_set_locked_state (muxer, TRUE);
gst_element_set_locked_state (sink, TRUE);
gst_element_set_state (sink, GST_STATE_NULL);
if (splitmux->async_finalize) {
if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
gchar *newname;
GstElement *new_sink, *new_muxer;
if (splitmux->reset_muxer) {
gst_element_set_state (muxer, GST_STATE_NULL);
GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
splitmux->fragment_id);
g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
GST_SPLITMUX_LOCK (splitmux);
if ((splitmux->sink =
create_element (splitmux, splitmux->sink_factory, newname,
TRUE)) == NULL)
goto fail;
if (splitmux->sink_properties)
gst_structure_foreach (splitmux->sink_properties,
_set_property_from_structure, splitmux->sink);
splitmux->active_sink = splitmux->sink;
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
g_free (newname);
newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
if ((splitmux->muxer =
create_element (splitmux, splitmux->muxer_factory, newname,
TRUE)) == NULL)
goto fail;
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
"async") != NULL) {
/* async child elements are causing state change races and weird
* failures, so let's try and turn that off */
g_object_set (splitmux->sink, "async", FALSE, NULL);
}
if (splitmux->muxer_properties)
gst_structure_foreach (splitmux->muxer_properties,
_set_property_from_structure, splitmux->muxer);
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
g_free (newname);
new_sink = splitmux->sink;
new_muxer = splitmux->muxer;
GST_SPLITMUX_UNLOCK (splitmux);
g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
gst_element_link (new_muxer, new_sink);
if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
EOS_FROM_US)) == 2) {
_lock_and_set_to_null (muxer, splitmux);
_lock_and_set_to_null (sink, splitmux);
} else {
g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
GINT_TO_POINTER (2));
}
}
muxer = new_muxer;
sink = new_sink;
gst_object_ref (muxer);
gst_object_ref (sink);
}
} else {
GstIterator *it = gst_element_iterate_sink_pads (muxer);
GstEvent *ev;
ev = gst_event_new_flush_start ();
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
gst_event_unref (ev);
gst_element_set_locked_state (muxer, TRUE);
gst_element_set_locked_state (sink, TRUE);
gst_element_set_state (sink, GST_STATE_NULL);
gst_iterator_resync (it);
if (splitmux->reset_muxer) {
gst_element_set_state (muxer, GST_STATE_NULL);
} else {
GstIterator *it = gst_element_iterate_sink_pads (muxer);
GstEvent *ev;
ev = gst_event_new_flush_stop (TRUE);
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
gst_event_unref (ev);
ev = gst_event_new_flush_start ();
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
gst_event_unref (ev);
gst_iterator_free (it);
gst_iterator_resync (it);
ev = gst_event_new_flush_stop (TRUE);
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
gst_event_unref (ev);
gst_iterator_free (it);
}
}
GST_SPLITMUX_LOCK (splitmux);
@ -1256,13 +1683,20 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
splitmux->ready_for_output = TRUE;
g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
send_fragment_opened_closed_msg (splitmux, TRUE);
send_fragment_opened_closed_msg (splitmux, TRUE, sink);
/* FIXME: Is this always the correct next state? */
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
return;
fail:
GST_STATE_UNLOCK (splitmux);
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
("Could not create the new muxer/sink"), NULL);
}
static void
@ -1271,13 +1705,49 @@ bus_handler (GstBin * bin, GstMessage * message)
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
case GST_MESSAGE_EOS:{
/* If the state is draining out the current file, drop this EOS */
GstElement *sink;
sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
GST_SPLITMUX_LOCK (splitmux);
send_fragment_opened_closed_msg (splitmux, FALSE);
send_fragment_opened_closed_msg (splitmux, FALSE, sink);
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
if (splitmux->async_finalize) {
if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
EOS_FROM_US)) == 2) {
GstElement *muxer;
GstPad *sinksink, *muxersrc;
sinksink = gst_element_get_static_pad (sink, "sink");
muxersrc = gst_pad_get_peer (sinksink);
muxer = gst_pad_get_parent_element (muxersrc);
gst_object_unref (sinksink);
gst_object_unref (muxersrc);
gst_element_call_async (muxer,
(GstElementCallAsyncFunc) _lock_and_set_to_null,
gst_object_ref (splitmux), gst_object_unref);
gst_element_call_async (sink,
(GstElementCallAsyncFunc) _lock_and_set_to_null,
gst_object_ref (splitmux), gst_object_unref);
gst_object_unref (muxer);
} else {
g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
GINT_TO_POINTER (2));
}
GST_DEBUG_OBJECT (splitmux,
"Caught async EOS from previous muxer+sink. Dropping.");
/* We forward the EOS so that it gets aggregated as normal. If the sink
* finishes and is removed before the end, it will be de-aggregated */
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
}
} else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
@ -1293,9 +1763,11 @@ bus_handler (GstBin * bin, GstMessage * message)
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_MESSAGE_ASYNC_START:
case GST_MESSAGE_ASYNC_DONE:
/* Ignore state changes from our children while switching */
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->switching_fragment) {
if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
|| GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
@ -1303,9 +1775,11 @@ bus_handler (GstBin * bin, GstMessage * message)
"Ignoring state change from child %" GST_PTR_FORMAT
" while switching", GST_MESSAGE_SRC (message));
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
}
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_MESSAGE_WARNING:
{
@ -1473,6 +1947,10 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
/* Check for overrun - have we output at least one byte and overrun
* either threshold? */
if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
GstClockTime *sink_running_time = g_new (GstClockTime, 1);
*sink_running_time = splitmux->reference_ctx->out_running_time;
g_object_set_qdata_full (G_OBJECT (splitmux->sink),
RUNNING_TIME, sink_running_time, g_free);
g_atomic_int_set (&(splitmux->split_now), FALSE);
/* Tell the output side to start a new fragment */
GST_INFO_OBJECT (splitmux,
@ -1998,6 +2476,7 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
GST_SPLITMUX_LOCK (splitmux);
if (!create_muxer (splitmux))
goto fail;
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
if (templ->name_template) {
if (g_str_equal (templ->name_template, "video")) {
@ -2107,7 +2586,7 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
" feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
splitmux->contexts = g_list_append (splitmux->contexts, ctx);
g_free (gname);
@ -2224,10 +2703,19 @@ create_muxer (GstSplitMuxSink * splitmux)
provided_muxer = gst_object_ref (splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
if (provided_muxer == NULL) {
if ((!splitmux->async_finalize && provided_muxer == NULL) ||
(splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
if ((splitmux->muxer =
create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
goto fail;
} else if (splitmux->async_finalize) {
if ((splitmux->muxer =
create_element (splitmux, splitmux->muxer_factory, "muxer",
FALSE)) == NULL)
goto fail;
if (splitmux->muxer_properties)
gst_structure_foreach (splitmux->muxer_properties,
_set_property_from_structure, splitmux->muxer);
} else {
/* Ensure it's not in locked state (we might be reusing an old element) */
gst_element_set_locked_state (provided_muxer, FALSE);
@ -2308,11 +2796,21 @@ create_sink (GstSplitMuxSink * splitmux)
provided_sink = gst_object_ref (splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
if (provided_sink == NULL) {
if ((!splitmux->async_finalize && provided_sink == NULL) ||
(splitmux->async_finalize && splitmux->sink_factory == NULL)) {
if ((splitmux->sink =
create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
goto fail;
splitmux->active_sink = splitmux->sink;
} else if (splitmux->async_finalize) {
if ((splitmux->sink =
create_element (splitmux, splitmux->sink_factory, "sink",
TRUE)) == NULL)
goto fail;
if (splitmux->sink_properties)
gst_structure_foreach (splitmux->sink_properties,
_set_property_from_structure, splitmux->sink);
splitmux->active_sink = splitmux->sink;
} else {
/* Ensure the sink starts in locked state and NULL - it will be changed
* by the filename setting code */
@ -2452,6 +2950,8 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
GST_SPLITMUX_UNLOCK (splitmux);
goto beach;
}
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
GST_SPLITMUX_UNLOCK (splitmux);
splitmux->fragment_id = 0;
break;

View file

@ -75,12 +75,14 @@ typedef struct _MqStreamCtx
guint q_overrun_id;
guint sink_pad_block_id;
guint src_pad_block_id;
gulong fragment_block_id;
gboolean is_reference;
gboolean flushing;
gboolean in_eos;
gboolean out_eos;
gboolean out_eos_async_done;
gboolean need_unblock;
gboolean caps_change;
@ -172,6 +174,13 @@ struct _GstSplitMuxSink
gboolean muxer_has_reserved_props;
gboolean split_now;
/* Async finalize options */
gboolean async_finalize;
gchar *muxer_factory;
GstStructure *muxer_properties;
gchar *sink_factory;
GstStructure *sink_properties;
};
struct _GstSplitMuxSinkClass

View file

@ -195,6 +195,7 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
GstMessage *msg;
GstElement *pipeline;
GstElement *fakesink;
GstElement *fakesink2;
gchar *uri;
pipeline = gst_element_factory_make ("playbin", NULL);
@ -203,6 +204,9 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
fakesink = gst_element_factory_make ("fakesink", NULL);
fail_if (fakesink == NULL);
g_object_set (G_OBJECT (pipeline), "video-sink", fakesink, NULL);
fakesink2 = gst_element_factory_make ("fakesink", NULL);
fail_if (fakesink2 == NULL);
g_object_set (G_OBJECT (pipeline), "audio-sink", fakesink2, NULL);
uri = g_strdup_printf ("splitmux://%s", in_pattern);
@ -371,6 +375,70 @@ GST_START_TEST (test_splitmuxsink)
GST_END_TEST;
GST_START_TEST (test_splitmuxsink_async)
{
GstMessage *msg;
GstElement *pipeline;
GstElement *sink;
GstPad *splitmux_sink_pad;
GstPad *enc_src_pad;
gchar *dest_pattern;
guint count;
gchar *in_pattern;
pipeline =
gst_parse_launch
("videotestsrc num-buffers=15 ! video/x-raw,width=80,height=64,framerate=5/1 ! videoconvert !"
" queue ! theoraenc keyframe-force=5 ! splitmuxsink name=splitsink "
" max-size-time=1000000000 async-finalize=true "
" muxer-factory=matroskamux audiotestsrc num-buffers=15 samplesperbuffer=9600 ! "
" audio/x-raw,rate=48000 ! splitsink.audio_%u", NULL);
fail_if (pipeline == NULL);
sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink");
fail_if (sink == NULL);
g_signal_connect (sink, "format-location-full",
(GCallback) check_format_location, NULL);
dest_pattern = g_build_filename (tmpdir, "matroska%05d.mkv", NULL);
g_object_set (G_OBJECT (sink), "location", dest_pattern, NULL);
g_free (dest_pattern);
g_object_unref (sink);
msg = run_pipeline (pipeline);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg);
fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
gst_message_unref (msg);
/* unlink manually and relase request pad to ensure that we *can* do that
* - https://bugzilla.gnome.org/show_bug.cgi?id=753622 */
sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink");
fail_if (sink == NULL);
splitmux_sink_pad = gst_element_get_static_pad (sink, "video");
fail_if (splitmux_sink_pad == NULL);
enc_src_pad = gst_pad_get_peer (splitmux_sink_pad);
fail_if (enc_src_pad == NULL);
fail_unless (gst_pad_unlink (enc_src_pad, splitmux_sink_pad));
gst_object_unref (enc_src_pad);
gst_element_release_request_pad (sink, splitmux_sink_pad);
gst_object_unref (splitmux_sink_pad);
/* at this point the pad must be releaased - try to find it again to verify */
splitmux_sink_pad = gst_element_get_static_pad (sink, "video");
fail_if (splitmux_sink_pad != NULL);
g_object_unref (sink);
gst_object_unref (pipeline);
count = count_files (tmpdir);
fail_unless (count == 3, "Expected 3 output files, got %d", count);
in_pattern = g_build_filename (tmpdir, "matroska*.mkv", NULL);
test_playback (in_pattern, 0, 3 * GST_SECOND);
g_free (in_pattern);
}
GST_END_TEST;
static GstPadProbeReturn
intercept_stream_start (GstPad * pad, GstPadProbeInfo * info,
gpointer user_data)
@ -771,6 +839,7 @@ splitmux_suite (void)
tempdir_cleanup);
tcase_add_test (tc_chain_complex, test_splitmuxsrc_sparse_streams);
tcase_add_test (tc_chain, test_splitmuxsink_async);
} else {
GST_INFO ("Skipping tests, missing plugins: matroska and/or vorbis");
}