splitmuxsink: Add format-location-full signal

Add a new signal for formatting the filename, which receives
a GstSample containing the first buffer from the reference
stream that will be muxed into that file.

Useful for creating filenames that are based on the
running time or other attributes of the buffer.

To make it work, opening of files and setting filenames is
now deferred until there is some data to write to it,
which also requires some changes to how async state changes
and gap events are handled.
This commit is contained in:
Jan Schmidt 2016-11-17 23:40:27 +11:00
parent 107902ec51
commit f7009eb5d7
3 changed files with 193 additions and 37 deletions

View file

@ -90,6 +90,7 @@ enum
enum
{
SIGNAL_FORMAT_LOCATION,
SIGNAL_FORMAT_LOCATION_FULL,
SIGNAL_LAST
};
@ -140,13 +141,15 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
element, GstStateChange transition);
static void bus_handler (GstBin * bin, GstMessage * msg);
static void set_next_filename (GstSplitMuxSink * splitmux);
static void start_next_fragment (GstSplitMuxSink * splitmux);
static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void mq_stream_ctx_unref (MqStreamCtx * ctx);
static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
static void do_async_done (GstSplitMuxSink * splitmux);
static MqStreamBuf *
mq_stream_buf_new (void)
{
@ -244,6 +247,20 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
signals[SIGNAL_FORMAT_LOCATION] =
g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
/**
* GstSplitMuxSink::format-location-full:
* @splitmux: the #GstSplitMuxSink
* @fragment_id: the sequence number of the file to be created
* @first_sample: A #GstSample containing the first buffer
* from the reference stream in the new file
*
* Returns: the location to be used for the next output file
*/
signals[SIGNAL_FORMAT_LOCATION_FULL] =
g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
GST_TYPE_SAMPLE);
}
static void
@ -614,8 +631,9 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
send_eos (splitmux, ctx);
continue;
}
} else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
start_next_fragment (splitmux);
} else if (ctx->is_reference
&& splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
start_next_fragment (splitmux, ctx);
continue;
}
@ -713,6 +731,20 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
/* When we get a gap event on the
* reference stream and we're trying to open a
* new file, we need to store it until we get
* the buffer afterwards
*/
if (ctx->is_reference &&
(splitmux->opening_first_fragment ||
splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT)) {
GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
gst_event_replace (&ctx->pending_gap, event);
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_HANDLED;
}
if (rtime != GST_CLOCK_STIME_NONE) {
ctx->out_running_time = rtime;
complete_or_wait_on_out (splitmux, ctx);
@ -735,7 +767,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
ctx->out_running_time = ts;
complete_or_wait_on_out (splitmux, ctx);
if (!ctx->is_reference)
complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_DROP;
}
@ -758,17 +791,18 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
splitmux->queued_gops--;
ctx->out_running_time = buf_info->run_ts;
ctx->cur_buffer = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
" size %" G_GUINT64_FORMAT,
pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
if (splitmux->opening_first_fragment) {
if (ctx->is_reference && splitmux->opening_first_fragment) {
if (request_next_keyframe (splitmux) == FALSE)
GST_WARNING_OBJECT (splitmux,
"Could not request a keyframe. Files may not split at the exact location they should");
send_fragment_opened_closed_msg (splitmux, TRUE);
start_next_fragment (splitmux, ctx);
splitmux->opening_first_fragment = FALSE;
}
@ -797,8 +831,23 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
}
#endif
ctx->cur_buffer = NULL;
GST_SPLITMUX_UNLOCK (splitmux);
/* pending_gap is protected by the STREAM lock */
if (ctx->pending_gap) {
/* If we previously stored a gap event, send it now */
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
GST_DEBUG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
gst_pad_send_event (peer, ctx->pending_gap);
ctx->pending_gap = NULL;
gst_object_unref (peer);
}
mq_stream_buf_free (buf_info);
return GST_PAD_PROBE_PASS;
@ -833,7 +882,7 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
* a new fragment
*/
static void
start_next_fragment (GstSplitMuxSink * splitmux)
start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
/* 1 change to new file */
splitmux->switching_fragment = TRUE;
@ -843,7 +892,7 @@ start_next_fragment (GstSplitMuxSink * splitmux)
gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
set_next_filename (splitmux);
set_next_filename (splitmux, ctx);
gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
@ -851,27 +900,30 @@ start_next_fragment (GstSplitMuxSink * splitmux)
gst_element_set_locked_state (splitmux->active_sink, FALSE);
splitmux->switching_fragment = FALSE;
do_async_done (splitmux);
g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
/* Switch state and go back to processing */
if (!splitmux->reference_ctx->in_eos) {
splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
} else {
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
splitmux->have_muxed_something = FALSE;
if (!splitmux->opening_first_fragment) {
/* Switch state and go back to processing */
if (!splitmux->reference_ctx->in_eos) {
splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
} else {
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
splitmux->have_muxed_something = FALSE;
}
splitmux->have_muxed_something =
(splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
/* Store the overflow parameters as the basis for the next fragment */
splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
GST_DEBUG_OBJECT (splitmux,
"Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
}
splitmux->have_muxed_something =
(splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
/* Store the overflow parameters as the basis for the next fragment */
splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
GST_DEBUG_OBJECT (splitmux,
"Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
send_fragment_opened_closed_msg (splitmux, TRUE);
@ -1561,7 +1613,7 @@ fail:
static GstElement *
create_element (GstSplitMuxSink * splitmux,
const gchar * factory, const gchar * name)
const gchar * factory, const gchar * name, gboolean locked)
{
GstElement *ret = gst_element_factory_make (factory, name);
if (ret == NULL) {
@ -1569,6 +1621,13 @@ create_element (GstSplitMuxSink * splitmux,
return NULL;
}
if (locked) {
/* Ensure the sink starts in locked state and NULL - it will be changed
* by the filename setting code */
gst_element_set_locked_state (ret, TRUE);
gst_element_set_state (ret, GST_STATE_NULL);
}
if (!gst_bin_add (GST_BIN (splitmux), ret)) {
g_warning ("Could not add %s element - splitmuxsink will not work", name);
gst_object_unref (ret);
@ -1584,7 +1643,8 @@ create_elements (GstSplitMuxSink * splitmux)
/* Create internal elements */
if (splitmux->mq == NULL) {
if ((splitmux->mq =
create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
create_element (splitmux, "multiqueue", "multiqueue",
FALSE)) == NULL)
goto fail;
splitmux->mq_max_buffers = 5;
@ -1603,7 +1663,7 @@ create_elements (GstSplitMuxSink * splitmux)
if (provided_muxer == NULL) {
if ((splitmux->muxer =
create_element (splitmux, "mp4mux", "muxer")) == NULL)
create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
goto fail;
} else {
/* Ensure it's not in locked state (we might be reusing an old element) */
@ -1680,12 +1740,14 @@ create_sink (GstSplitMuxSink * splitmux)
if (provided_sink == NULL) {
if ((splitmux->sink =
create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
goto fail;
splitmux->active_sink = splitmux->sink;
} else {
/* Ensure it's not in locked state (we might be reusing an old element) */
gst_element_set_locked_state (provided_sink, FALSE);
/* Ensure the sink starts in locked state and NULL - it will be changed
* by the filename setting code */
gst_element_set_locked_state (provided_sink, TRUE);
gst_element_set_state (provided_sink, GST_STATE_NULL);
if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
g_warning ("Could not add sink elements - splitmuxsink will not work");
gst_object_unref (provided_sink);
@ -1721,13 +1783,30 @@ fail:
#pragma GCC diagnostic ignored "-Wformat-nonliteral"
#endif
static void
set_next_filename (GstSplitMuxSink * splitmux)
set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
gchar *fname = NULL;
GstSample *sample;
GstCaps *caps;
gst_splitmux_sink_ensure_max_files (splitmux);
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
splitmux->fragment_id, &fname);
if (ctx->cur_buffer == NULL)
g_warning ("Starting next file without buffer");
caps = gst_pad_get_current_caps (ctx->srcpad);
sample = gst_sample_new (ctx->cur_buffer, caps, &ctx->out_segment, NULL);
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
splitmux->fragment_id, sample, &fname);
gst_sample_unref (sample);
if (caps)
gst_caps_unref (caps);
if (fname == NULL) {
/* Fallback to the old signal if the new one returned nothing */
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
splitmux->fragment_id, &fname);
}
if (!fname)
fname = splitmux->location ?
@ -1742,6 +1821,43 @@ set_next_filename (GstSplitMuxSink * splitmux)
}
}
static void
do_async_start (GstSplitMuxSink * splitmux)
{
GstMessage *message;
if (!splitmux->need_async_start) {
GST_INFO_OBJECT (splitmux, "no async_start needed");
return;
}
splitmux->async_pending = TRUE;
GST_INFO_OBJECT (splitmux, "Sending async_start message");
message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
(splitmux), message);
}
static void
do_async_done (GstSplitMuxSink * splitmux)
{
GstMessage *message;
if (splitmux->async_pending) {
GST_INFO_OBJECT (splitmux, "Sending async_done message");
message =
gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
GST_CLOCK_TIME_NONE);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
(splitmux), message);
splitmux->async_pending = FALSE;
}
splitmux->need_async_start = FALSE;
}
static GstStateChangeReturn
gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
{
@ -1758,7 +1874,6 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
}
GST_SPLITMUX_UNLOCK (splitmux);
splitmux->fragment_id = 0;
set_next_filename (splitmux);
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
@ -1792,12 +1907,29 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
goto beach;
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
splitmux->need_async_start = TRUE;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:{
/* Change state async, because our child sink might not
* be ready to do that for us yet if it's state is still locked */
splitmux->need_async_start = TRUE;
/* we want to go async to PAUSED until we managed to configure and add the
* sink */
GST_SPLITMUX_LOCK (splitmux);
do_async_start (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
ret = GST_STATE_CHANGE_ASYNC;
break;
}
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->fragment_id = 0;
/* Reset internal elements only if no pad contexts are using them */
if (splitmux->contexts == NULL)
gst_splitmux_reset (splitmux);
do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
@ -1805,12 +1937,14 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
}
beach:
if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
ret == GST_STATE_CHANGE_FAILURE) {
/* Cleanup elements on failed transition out of NULL */
gst_splitmux_reset (splitmux);
}
GST_SPLITMUX_LOCK (splitmux);
do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
return ret;
}

View file

@ -82,6 +82,9 @@ typedef struct _MqStreamCtx
GstPad *srcpad;
gboolean out_blocked;
GstBuffer *cur_buffer;
GstEvent *pending_gap;
} MqStreamCtx;
struct _GstSplitMuxSink {
@ -131,6 +134,9 @@ struct _GstSplitMuxSink {
gboolean switching_fragment;
gboolean have_video;
gboolean need_async_start;
gboolean async_pending;
};
struct _GstSplitMuxSinkClass {

View file

@ -192,6 +192,20 @@ GST_START_TEST (test_splitmuxsrc_format_location)
GST_END_TEST;
static gchar *
check_format_location (GstElement * object,
guint fragment_id, GstSample * first_sample)
{
GstBuffer *buf = gst_sample_get_buffer (first_sample);
/* Must have a buffer */
fail_if (buf == NULL);
GST_LOG ("New file - first buffer %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
return NULL;
}
GST_START_TEST (test_splitmuxsink)
{
GstMessage *msg;
@ -213,6 +227,8 @@ GST_START_TEST (test_splitmuxsink)
fail_if (pipeline == NULL);
sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink");
fail_if (sink == NULL);
g_signal_connect (sink, "format-location-full",
(GCallback) check_format_location, NULL);
dest_pattern = g_build_filename (tmpdir, "out%05d.ogg", NULL);
g_object_set (G_OBJECT (sink), "location", dest_pattern, NULL);
g_free (dest_pattern);