gstreamer/gst/multifile/gstsplitmuxsink.c

2264 lines
74 KiB
C
Raw Normal View History

/* GStreamer Muxer bin that splits output stream by size/time
* Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-splitmuxsink
* @short_description: Muxer wrapper for splitting output stream by size or time
*
* This element wraps a muxer and a sink, and starts a new file when the mux
* contents are about to cross a threshold of maximum size of maximum time,
* splitting at video keyframe boundaries. Exactly one input video stream
* can be muxed, with as many accompanying audio and subtitle streams as
* desired.
*
* By default, it uses mp4mux and filesink, but they can be changed via
* the 'muxer' and 'sink' properties.
*
* The minimum file size is 1 GOP, however - so limits may be overrun if the
* distance between any 2 keyframes is larger than the limits.
*
* If a video stream is available, the splitting process is driven by the video
* stream contents, and the video stream must contain closed GOPs for the output
* file parts to be played individually correctly. In the absence of a video
* stream, the first available stream is used as reference for synchronization.
*
* <refsect2>
* <title>Example pipelines</title>
* |[
* gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
* ]|
* Records a video stream captured from a v4l2 device and muxes it into
* ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
* and 1MB maximum size.
* </refsect2>
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include <glib/gstdio.h>
#include <gst/video/video.h>
#include "gstsplitmuxsink.h"
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_CAT_DEFAULT splitmux_debug
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
enum
{
PROP_0,
PROP_LOCATION,
PROP_MAX_SIZE_TIME,
PROP_MAX_SIZE_BYTES,
PROP_MAX_SIZE_TIMECODE,
PROP_SEND_KEYFRAME_REQUESTS,
PROP_MAX_FILES,
PROP_MUXER_OVERHEAD,
PROP_MUXER,
PROP_SINK
};
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
#define DEFAULT_MAX_FILES 0
#define DEFAULT_MUXER_OVERHEAD 0.02
#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"
enum
{
SIGNAL_FORMAT_LOCATION,
SIGNAL_FORMAT_LOCATION_FULL,
SIGNAL_LAST
};
static guint signals[SIGNAL_LAST];
static GstStaticPadTemplate video_sink_template =
GST_STATIC_PAD_TEMPLATE ("video",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate audio_sink_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate subtitle_sink_template =
GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GQuark PAD_CONTEXT;
static void
_do_init (void)
{
PAD_CONTEXT = g_quark_from_static_string ("pad-context");
}
#define gst_splitmux_sink_parent_class parent_class
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
_do_init ());
static gboolean create_muxer (GstSplitMuxSink * splitmux);
static gboolean create_sink (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_dispose (GObject * object);
static void gst_splitmux_sink_finalize (GObject * object);
static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
element, GstStateChange transition);
static void bus_handler (GstBin * bin, GstMessage * msg);
static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void mq_stream_ctx_unref (MqStreamCtx * ctx);
static void grow_blocked_queues (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
static GstElement *create_element (GstSplitMuxSink * splitmux,
const gchar * factory, const gchar * name, gboolean locked);
static void do_async_done (GstSplitMuxSink * splitmux);
static MqStreamBuf *
mq_stream_buf_new (void)
{
return g_slice_new0 (MqStreamBuf);
}
static void
mq_stream_buf_free (MqStreamBuf * data)
{
g_slice_free (MqStreamBuf, data);
}
static SplitMuxOutputCommand *
out_cmd_buf_new (void)
{
return g_slice_new0 (SplitMuxOutputCommand);
}
static void
out_cmd_buf_free (SplitMuxOutputCommand * data)
{
g_slice_free (SplitMuxOutputCommand, data);
}
static void
gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
GstElementClass *gstelement_class = (GstElementClass *) klass;
GstBinClass *gstbin_class = (GstBinClass *) klass;
gobject_class->set_property = gst_splitmux_sink_set_property;
gobject_class->get_property = gst_splitmux_sink_get_property;
gobject_class->dispose = gst_splitmux_sink_dispose;
gobject_class->finalize = gst_splitmux_sink_finalize;
gst_element_class_set_static_metadata (gstelement_class,
"Split Muxing Bin", "Generic/Bin/Muxer",
"Convenience bin that muxes incoming streams into multiple time/size limited files",
"Jan Schmidt <jan@centricular.com>");
gst_element_class_add_static_pad_template (gstelement_class,
&video_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&audio_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&subtitle_sink_template);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
gstelement_class->release_pad =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
gstbin_class->handle_message = bus_handler;
g_object_class_install_property (gobject_class, PROP_LOCATION,
g_param_spec_string ("location", "File Output Pattern",
"Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
g_param_spec_double ("mux-overhead", "Muxing Overhead",
"Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
DEFAULT_MUXER_OVERHEAD,
G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
"Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
"Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
"Maximum difference in timecode between first and last frame. "
"Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
"Will only be effective if a timecode track is present.",
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
g_param_spec_boolean ("send-keyframe-requests",
"Request keyframes at max-size-time",
"Request a keyframe every max-size-time ns to try splitting at that point. "
"Needs max-size-bytes to be 0 in order to be effective.",
DEFAULT_SEND_KEYFRAME_REQUESTS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_FILES,
g_param_spec_uint ("max-files", "Max files",
"Maximum number of files to keep on disk. Once the maximum is reached,"
"old files start to be deleted to make room for new ones.", 0,
G_MAXUINT, DEFAULT_MAX_FILES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER,
g_param_spec_object ("muxer", "Muxer",
"The muxer element to use (NULL = default mp4mux)",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK,
g_param_spec_object ("sink", "Sink",
"The sink element (or element chain) to use (NULL = default filesink)",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSplitMuxSink::format-location:
* @splitmux: the #GstSplitMuxSink
* @fragment_id: the sequence number of the file to be created
*
* Returns: the location to be used for the next output file
*/
signals[SIGNAL_FORMAT_LOCATION] =
g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
/**
* GstSplitMuxSink::format-location-full:
* @splitmux: the #GstSplitMuxSink
* @fragment_id: the sequence number of the file to be created
* @first_sample: A #GstSample containing the first buffer
* from the reference stream in the new file
*
* Returns: the location to be used for the next output file
*/
signals[SIGNAL_FORMAT_LOCATION_FULL] =
g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
GST_TYPE_SAMPLE);
}
static void
gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
{
g_mutex_init (&splitmux->lock);
g_cond_init (&splitmux->input_cond);
g_cond_init (&splitmux->output_cond);
g_queue_init (&splitmux->out_cmd_q);
splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
splitmux->max_files = DEFAULT_MAX_FILES;
splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
splitmux->threshold_timecode_str = NULL;
GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
}
static void
gst_splitmux_reset (GstSplitMuxSink * splitmux)
{
if (splitmux->muxer) {
gst_element_set_locked_state (splitmux->muxer, TRUE);
gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
}
if (splitmux->active_sink) {
gst_element_set_locked_state (splitmux->active_sink, TRUE);
gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
}
splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
}
static void
gst_splitmux_sink_dispose (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
/* Calling parent dispose invalidates all child pointers */
splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_splitmux_sink_finalize (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
g_cond_clear (&splitmux->input_cond);
g_cond_clear (&splitmux->output_cond);
2015-04-09 11:58:26 +00:00
g_mutex_clear (&splitmux->lock);
g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
g_queue_clear (&splitmux->out_cmd_q);
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
if (splitmux->threshold_timecode_str)
g_free (splitmux->threshold_timecode_str);
g_free (splitmux->location);
/* Make sure to free any un-released contexts */
g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
g_list_free (splitmux->contexts);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
switch (prop_id) {
case PROP_LOCATION:{
GST_OBJECT_LOCK (splitmux);
g_free (splitmux->location);
splitmux->location = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
}
case PROP_MAX_SIZE_BYTES:
GST_OBJECT_LOCK (splitmux);
splitmux->threshold_bytes = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (splitmux);
splitmux->threshold_time = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIMECODE:
GST_OBJECT_LOCK (splitmux);
splitmux->threshold_timecode_str = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SEND_KEYFRAME_REQUESTS:
GST_OBJECT_LOCK (splitmux);
splitmux->send_keyframe_requests = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_FILES:
GST_OBJECT_LOCK (splitmux);
splitmux->max_files = g_value_get_uint (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
splitmux->mux_overhead = g_value_get_double (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
splitmux->provided_sink = g_value_get_object (value);
gst_object_ref_sink (splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
splitmux->provided_muxer = g_value_get_object (value);
gst_object_ref_sink (splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_splitmux_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
switch (prop_id) {
case PROP_LOCATION:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->location);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_BYTES:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint64 (value, splitmux->threshold_bytes);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint64 (value, splitmux->threshold_time);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIMECODE:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->threshold_timecode_str);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SEND_KEYFRAME_REQUESTS:
GST_OBJECT_LOCK (splitmux);
g_value_set_boolean (value, splitmux->send_keyframe_requests);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_FILES:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint (value, splitmux->max_files);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
g_value_set_double (value, splitmux->mux_overhead);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* Convenience function */
static inline GstClockTimeDiff
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (val)) {
gboolean sign =
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
if (sign > 0)
res = val;
else if (sign < 0)
res = -val;
}
return res;
}
static MqStreamCtx *
mq_stream_ctx_new (GstSplitMuxSink * splitmux)
{
MqStreamCtx *ctx;
ctx = g_new0 (MqStreamCtx, 1);
g_atomic_int_set (&ctx->refcount, 1);
ctx->splitmux = splitmux;
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
g_queue_init (&ctx->queued_bufs);
return ctx;
}
static void
mq_stream_ctx_free (MqStreamCtx * ctx)
{
if (ctx->q) {
g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
gst_element_set_locked_state (ctx->q, TRUE);
gst_element_set_state (ctx->q, GST_STATE_NULL);
gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
gst_object_unref (ctx->q);
}
gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
gst_object_unref (ctx->sinkpad);
gst_object_unref (ctx->srcpad);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
g_free (ctx);
}
static void
mq_stream_ctx_unref (MqStreamCtx * ctx)
{
if (g_atomic_int_dec_and_test (&ctx->refcount))
mq_stream_ctx_free (ctx);
}
static void
mq_stream_ctx_ref (MqStreamCtx * ctx)
{
g_atomic_int_inc (&ctx->refcount);
}
static void
_pad_block_destroy_sink_notify (MqStreamCtx * ctx)
{
ctx->sink_pad_block_id = 0;
mq_stream_ctx_unref (ctx);
}
static void
_pad_block_destroy_src_notify (MqStreamCtx * ctx)
{
ctx->src_pad_block_id = 0;
mq_stream_ctx_unref (ctx);
}
static void
send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
{
gchar *location = NULL;
GstMessage *msg;
const gchar *msg_name = opened ?
"splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
g_object_get (splitmux->sink, "location", &location, NULL);
msg = gst_message_new_element (GST_OBJECT (splitmux),
gst_structure_new (msg_name,
"location", G_TYPE_STRING, location,
"running-time", GST_TYPE_CLOCK_TIME,
splitmux->reference_ctx->out_running_time, NULL));
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
g_free (location);
}
/* Called with lock held, drops the lock to send EOS to the
* pad
*/
static void
send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GstEvent *eos;
GstPad *pad;
eos = gst_event_new_eos ();
pad = gst_pad_get_peer (ctx->srcpad);
ctx->out_eos = TRUE;
GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (pad, eos);
GST_SPLITMUX_LOCK (splitmux);
gst_object_unref (pad);
}
/* Called with splitmux lock held to check if this output
* context needs to sleep to wait for the release of the
* next GOP, or to send EOS to close out the current file
*/
static void
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
if (ctx->caps_change)
return;
do {
/* When first starting up, the reference stream has to output
* the first buffer to prepare the muxer and sink */
gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
if (ctx->flushing
|| splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
return;
GST_LOG_OBJECT (ctx->srcpad,
"Checking running time %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
if (can_output) {
if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
ctx->out_running_time < splitmux->max_out_running_time) {
return;
}
switch (splitmux->output_state) {
case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
/* We only get here if we've finished outputting a GOP and need to know
* what to do next */
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
continue;
case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
/* We've reached the max out running_time to get here, so end this file now */
if (ctx->out_eos == FALSE) {
send_eos (splitmux, ctx);
continue;
}
break;
case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
if (ctx->is_reference) {
/* Special handling on the reference ctx to start new fragments
* and collect commands from the command queue */
/* drops the splitmux lock briefly: */
start_next_fragment (splitmux, ctx);
continue;
}
break;
case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
do {
SplitMuxOutputCommand *cmd =
g_queue_pop_tail (&splitmux->out_cmd_q);
if (cmd != NULL) {
/* If we pop the last command, we need to make our queues bigger */
if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
grow_blocked_queues (splitmux);
if (cmd->start_new_fragment) {
GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
} else {
GST_DEBUG_OBJECT (splitmux,
"Got new output cmd for time %" GST_STIME_FORMAT,
GST_STIME_ARGS (cmd->max_output_ts));
/* Extend the output range immediately */
splitmux->max_out_running_time = cmd->max_output_ts;
splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
}
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
out_cmd_buf_free (cmd);
break;
} else {
GST_SPLITMUX_WAIT_OUTPUT (splitmux);
}
} while (splitmux->output_state ==
SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
/* loop and re-check the state */
continue;
}
case SPLITMUX_OUTPUT_STATE_STOPPED:
return;
}
}
GST_INFO_OBJECT (ctx->srcpad,
"Sleeping for running time %"
GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
GST_SPLITMUX_WAIT_OUTPUT (splitmux);
GST_INFO_OBJECT (ctx->srcpad,
"Woken for new max running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
}
while (1);
}
static GstClockTime
calculate_next_max_timecode (GstSplitMuxSink * splitmux,
const GstVideoTimeCode * cur_tc)
{
GstVideoTimeCode *target_tc;
GstVideoTimeCodeInterval *tc_inter;
GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
return GST_CLOCK_TIME_NONE;
tc_inter =
gst_video_time_code_interval_new_from_string
(splitmux->threshold_timecode_str);
target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
gst_video_time_code_interval_free (tc_inter);
/* Convert to ns */
target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
/* Add fragment_start_time, accounting for wraparound */
if (target_tc_time >= cur_tc_time) {
next_max_tc_time =
target_tc_time - cur_tc_time + splitmux->fragment_start_time;
} else {
GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
next_max_tc_time =
day_in_ns - cur_tc_time + target_tc_time +
splitmux->fragment_start_time;
}
GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
" from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
GST_TIME_ARGS (cur_tc_time));
gst_video_time_code_free (target_tc);
return next_max_tc_time;
}
static gboolean
request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
{
GstEvent *ev;
GstClockTime target_time;
gboolean timecode_based = FALSE;
splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
if (splitmux->threshold_timecode_str) {
GstVideoTimeCodeMeta *tc_meta;
if (buffer != NULL) {
tc_meta = gst_buffer_get_video_time_code_meta (buffer);
if (tc_meta) {
splitmux->next_max_tc_time =
calculate_next_max_timecode (splitmux, &tc_meta->tc);
timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
}
} else {
/* This can happen in the presence of GAP events that trigger
* a new fragment start */
GST_WARNING_OBJECT (splitmux,
"No buffer available to calculate next timecode");
}
}
if (splitmux->send_keyframe_requests == FALSE
|| (splitmux->threshold_time == 0 && !timecode_based)
|| splitmux->threshold_bytes != 0)
return TRUE;
if (timecode_based) {
/* We might have rounding errors: aim slightly earlier */
target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
} else {
target_time = splitmux->fragment_start_time + splitmux->threshold_time;
}
ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
GST_TIME_ARGS (target_time));
return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
}
static GstPadProbeReturn
handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstSplitMuxSink *splitmux = ctx->splitmux;
MqStreamBuf *buf_info = NULL;
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
g_warning ("Buffer list handling not implemented");
return GST_PAD_PROBE_DROP;
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
gboolean locked = FALSE;
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->out_segment);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
ctx->flushing = FALSE;
break;
case GST_EVENT_FLUSH_START:
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
GST_LOG_OBJECT (pad, "Flush start");
ctx->flushing = TRUE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
ctx->out_eos = TRUE;
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
GstClockTimeDiff rtime;
gst_event_parse_gap (event, &gap_ts, NULL);
if (gap_ts == GST_CLOCK_TIME_NONE)
break;
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
/* When we get a gap event on the
* reference stream and we're trying to open a
* new file, we need to store it until we get
* the buffer afterwards
*/
if (ctx->is_reference &&
(splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
gst_event_replace (&ctx->pending_gap, event);
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_HANDLED;
}
rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_STIME_ARGS (rtime));
if (rtime != GST_CLOCK_STIME_NONE) {
ctx->out_running_time = rtime;
complete_or_wait_on_out (splitmux, ctx);
}
break;
}
splitmuxsink: Fix occasional deadlock when ending file with subtitle Deadlock occurs when splitting files if one stream received no buffer during the first GOP of the next file. That can happen in that scenario for example: 1) The first GOP of video is collected, it has a duration of 10s. max_in_running_time is set to 10s. 2) Other streams catchup and we receive the first subtitle buffer at ts=0 and has a duration of 1min. 3) We receive the 2nd subtitle buffer with a ts=1min. in_running_time is set to 1min. That buffer is blocked in handle_mq_input() because max_in_running_time is still 10s. 4) Since all in_running_time are now > 10s, max_out_running_time is now set to 10s. That first GOP gets recorded into the file. The muxer pop buffers out of the mq, when it tries to pop a 2nd subtitle buffer it blocks because the GstDataQueue is empty. 5) A 2nd GOP of video is collected and has a duration of 10s as well. max_in_running_time is now 20s. Since subtitle's in_running_time is already 1min, that GOP is already complete. 6) But let's say we overran the max file size, we thus set state to SPLITMUX_STATE_ENDING_FILE now. As soon as a buffer with ts > 10s (end of previous GOP) arrives in handle_mq_output(), EOS event is sent downstream instead. But since the subtitle queue is empty, that's never going to happen. Pipeline is now deadlocked. To fix this situation we have to: - Send a dummy event through the queue to wakeup output thread. - Update out_running_time to at least max_out_running_time so it sends EOS. - Respect time order, so we set out_running_tim=max_in_running_time because that's bigger than previous buffer and smaller than next. https://bugzilla.gnome.org/show_bug.cgi?id=763711
2016-03-18 19:45:01 +00:00
case GST_EVENT_CUSTOM_DOWNSTREAM:{
const GstStructure *s;
GstClockTimeDiff ts = 0;
splitmuxsink: Fix occasional deadlock when ending file with subtitle Deadlock occurs when splitting files if one stream received no buffer during the first GOP of the next file. That can happen in that scenario for example: 1) The first GOP of video is collected, it has a duration of 10s. max_in_running_time is set to 10s. 2) Other streams catchup and we receive the first subtitle buffer at ts=0 and has a duration of 1min. 3) We receive the 2nd subtitle buffer with a ts=1min. in_running_time is set to 1min. That buffer is blocked in handle_mq_input() because max_in_running_time is still 10s. 4) Since all in_running_time are now > 10s, max_out_running_time is now set to 10s. That first GOP gets recorded into the file. The muxer pop buffers out of the mq, when it tries to pop a 2nd subtitle buffer it blocks because the GstDataQueue is empty. 5) A 2nd GOP of video is collected and has a duration of 10s as well. max_in_running_time is now 20s. Since subtitle's in_running_time is already 1min, that GOP is already complete. 6) But let's say we overran the max file size, we thus set state to SPLITMUX_STATE_ENDING_FILE now. As soon as a buffer with ts > 10s (end of previous GOP) arrives in handle_mq_output(), EOS event is sent downstream instead. But since the subtitle queue is empty, that's never going to happen. Pipeline is now deadlocked. To fix this situation we have to: - Send a dummy event through the queue to wakeup output thread. - Update out_running_time to at least max_out_running_time so it sends EOS. - Respect time order, so we set out_running_tim=max_in_running_time because that's bigger than previous buffer and smaller than next. https://bugzilla.gnome.org/show_bug.cgi?id=763711
2016-03-18 19:45:01 +00:00
s = gst_event_get_structure (event);
if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
break;
gst_structure_get_int64 (s, "timestamp", &ts);
splitmuxsink: Fix occasional deadlock when ending file with subtitle Deadlock occurs when splitting files if one stream received no buffer during the first GOP of the next file. That can happen in that scenario for example: 1) The first GOP of video is collected, it has a duration of 10s. max_in_running_time is set to 10s. 2) Other streams catchup and we receive the first subtitle buffer at ts=0 and has a duration of 1min. 3) We receive the 2nd subtitle buffer with a ts=1min. in_running_time is set to 1min. That buffer is blocked in handle_mq_input() because max_in_running_time is still 10s. 4) Since all in_running_time are now > 10s, max_out_running_time is now set to 10s. That first GOP gets recorded into the file. The muxer pop buffers out of the mq, when it tries to pop a 2nd subtitle buffer it blocks because the GstDataQueue is empty. 5) A 2nd GOP of video is collected and has a duration of 10s as well. max_in_running_time is now 20s. Since subtitle's in_running_time is already 1min, that GOP is already complete. 6) But let's say we overran the max file size, we thus set state to SPLITMUX_STATE_ENDING_FILE now. As soon as a buffer with ts > 10s (end of previous GOP) arrives in handle_mq_output(), EOS event is sent downstream instead. But since the subtitle queue is empty, that's never going to happen. Pipeline is now deadlocked. To fix this situation we have to: - Send a dummy event through the queue to wakeup output thread. - Update out_running_time to at least max_out_running_time so it sends EOS. - Respect time order, so we set out_running_tim=max_in_running_time because that's bigger than previous buffer and smaller than next. https://bugzilla.gnome.org/show_bug.cgi?id=763711
2016-03-18 19:45:01 +00:00
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
splitmuxsink: Fix occasional deadlock when ending file with subtitle Deadlock occurs when splitting files if one stream received no buffer during the first GOP of the next file. That can happen in that scenario for example: 1) The first GOP of video is collected, it has a duration of 10s. max_in_running_time is set to 10s. 2) Other streams catchup and we receive the first subtitle buffer at ts=0 and has a duration of 1min. 3) We receive the 2nd subtitle buffer with a ts=1min. in_running_time is set to 1min. That buffer is blocked in handle_mq_input() because max_in_running_time is still 10s. 4) Since all in_running_time are now > 10s, max_out_running_time is now set to 10s. That first GOP gets recorded into the file. The muxer pop buffers out of the mq, when it tries to pop a 2nd subtitle buffer it blocks because the GstDataQueue is empty. 5) A 2nd GOP of video is collected and has a duration of 10s as well. max_in_running_time is now 20s. Since subtitle's in_running_time is already 1min, that GOP is already complete. 6) But let's say we overran the max file size, we thus set state to SPLITMUX_STATE_ENDING_FILE now. As soon as a buffer with ts > 10s (end of previous GOP) arrives in handle_mq_output(), EOS event is sent downstream instead. But since the subtitle queue is empty, that's never going to happen. Pipeline is now deadlocked. To fix this situation we have to: - Send a dummy event through the queue to wakeup output thread. - Update out_running_time to at least max_out_running_time so it sends EOS. - Respect time order, so we set out_running_tim=max_in_running_time because that's bigger than previous buffer and smaller than next. https://bugzilla.gnome.org/show_bug.cgi?id=763711
2016-03-18 19:45:01 +00:00
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
splitmuxsink: Fix occasional deadlock when ending file with subtitle Deadlock occurs when splitting files if one stream received no buffer during the first GOP of the next file. That can happen in that scenario for example: 1) The first GOP of video is collected, it has a duration of 10s. max_in_running_time is set to 10s. 2) Other streams catchup and we receive the first subtitle buffer at ts=0 and has a duration of 1min. 3) We receive the 2nd subtitle buffer with a ts=1min. in_running_time is set to 1min. That buffer is blocked in handle_mq_input() because max_in_running_time is still 10s. 4) Since all in_running_time are now > 10s, max_out_running_time is now set to 10s. That first GOP gets recorded into the file. The muxer pop buffers out of the mq, when it tries to pop a 2nd subtitle buffer it blocks because the GstDataQueue is empty. 5) A 2nd GOP of video is collected and has a duration of 10s as well. max_in_running_time is now 20s. Since subtitle's in_running_time is already 1min, that GOP is already complete. 6) But let's say we overran the max file size, we thus set state to SPLITMUX_STATE_ENDING_FILE now. As soon as a buffer with ts > 10s (end of previous GOP) arrives in handle_mq_output(), EOS event is sent downstream instead. But since the subtitle queue is empty, that's never going to happen. Pipeline is now deadlocked. To fix this situation we have to: - Send a dummy event through the queue to wakeup output thread. - Update out_running_time to at least max_out_running_time so it sends EOS. - Respect time order, so we set out_running_tim=max_in_running_time because that's bigger than previous buffer and smaller than next. https://bugzilla.gnome.org/show_bug.cgi?id=763711
2016-03-18 19:45:01 +00:00
goto beach;
ctx->out_running_time = ts;
if (!ctx->is_reference)
complete_or_wait_on_out (splitmux, ctx);
splitmuxsink: Fix occasional deadlock when ending file with subtitle Deadlock occurs when splitting files if one stream received no buffer during the first GOP of the next file. That can happen in that scenario for example: 1) The first GOP of video is collected, it has a duration of 10s. max_in_running_time is set to 10s. 2) Other streams catchup and we receive the first subtitle buffer at ts=0 and has a duration of 1min. 3) We receive the 2nd subtitle buffer with a ts=1min. in_running_time is set to 1min. That buffer is blocked in handle_mq_input() because max_in_running_time is still 10s. 4) Since all in_running_time are now > 10s, max_out_running_time is now set to 10s. That first GOP gets recorded into the file. The muxer pop buffers out of the mq, when it tries to pop a 2nd subtitle buffer it blocks because the GstDataQueue is empty. 5) A 2nd GOP of video is collected and has a duration of 10s as well. max_in_running_time is now 20s. Since subtitle's in_running_time is already 1min, that GOP is already complete. 6) But let's say we overran the max file size, we thus set state to SPLITMUX_STATE_ENDING_FILE now. As soon as a buffer with ts > 10s (end of previous GOP) arrives in handle_mq_output(), EOS event is sent downstream instead. But since the subtitle queue is empty, that's never going to happen. Pipeline is now deadlocked. To fix this situation we have to: - Send a dummy event through the queue to wakeup output thread. - Update out_running_time to at least max_out_running_time so it sends EOS. - Respect time order, so we set out_running_tim=max_in_running_time because that's bigger than previous buffer and smaller than next. https://bugzilla.gnome.org/show_bug.cgi?id=763711
2016-03-18 19:45:01 +00:00
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_DROP;
}
case GST_EVENT_CAPS:{
GstPad *peer;
if (!ctx->is_reference)
break;
peer = gst_pad_get_peer (pad);
if (peer) {
gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
gst_object_unref (peer);
if (ok)
break;
} else {
break;
}
/* This is in the case the muxer doesn't allow this change of caps */
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
ctx->caps_change = TRUE;
splitmux->ready_for_output = FALSE;
if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
if (ctx->out_eos == FALSE) {
send_eos (splitmux, ctx);
}
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
}
/* Lets it fall through, if it fails again, then the muxer just can't
* support this format, but at least we have a closed file.
*/
break;
}
default:
break;
}
/* We need to make sure events aren't passed
* until the muxer / sink are ready for it */
if (!locked)
GST_SPLITMUX_LOCK (splitmux);
if (!ctx->is_reference)
complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
/* Don't try to forward sticky events before the next buffer is there
* because it would cause a new file to be created without the first
* buffer being available.
*/
if (ctx->caps_change && GST_EVENT_IS_STICKY (event))
return GST_PAD_PROBE_DROP;
else
return GST_PAD_PROBE_PASS;
}
/* Allow everything through until the configured next stopping point */
GST_SPLITMUX_LOCK (splitmux);
buf_info = g_queue_pop_tail (&ctx->queued_bufs);
if (buf_info == NULL)
/* Can only happen due to a poorly timed flush */
goto beach;
/* If we have popped a keyframe, decrement the queued_gop count */
if (buf_info->keyframe && splitmux->queued_keyframes > 0)
splitmux->queued_keyframes--;
ctx->out_running_time = buf_info->run_ts;
ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
" size %" G_GUINT64_FORMAT,
pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
ctx->caps_change = FALSE;
complete_or_wait_on_out (splitmux, ctx);
splitmux->muxed_out_bytes += buf_info->buf_size;
#ifndef GST_DISABLE_GST_DEBUG
{
GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf,
GST_STIME_ARGS (ctx->out_running_time));
}
#endif
ctx->cur_out_buffer = NULL;
GST_SPLITMUX_UNLOCK (splitmux);
/* pending_gap is protected by the STREAM lock */
if (ctx->pending_gap) {
/* If we previously stored a gap event, send it now */
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
GST_DEBUG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
gst_pad_send_event (peer, ctx->pending_gap);
ctx->pending_gap = NULL;
gst_object_unref (peer);
}
mq_stream_buf_free (buf_info);
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_DROP;
}
static gboolean
resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
{
return gst_pad_send_event (peer, gst_event_ref (*event));
}
static void
restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
gst_pad_sticky_events_foreach (ctx->srcpad,
(GstPadStickyEventsForeachFunction) (resend_sticky), peer);
/* Clear EOS flag if not actually EOS */
ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
gst_object_unref (peer);
}
/* Called with lock held when a fragment
* reaches EOS and it is time to restart
* a new fragment
*/
static void
start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GstElement *muxer, *sink;
/* 1 change to new file */
splitmux->switching_fragment = TRUE;
/* We need to drop the splitmux lock to acquire the state lock
* here and ensure there's no racy state change going on elsewhere */
muxer = gst_object_ref (splitmux->muxer);
sink = gst_object_ref (splitmux->active_sink);
GST_SPLITMUX_UNLOCK (splitmux);
GST_STATE_LOCK (splitmux);
gst_element_set_locked_state (muxer, TRUE);
gst_element_set_locked_state (sink, TRUE);
gst_element_set_state (muxer, GST_STATE_NULL);
gst_element_set_state (sink, GST_STATE_NULL);
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
set_next_filename (splitmux, ctx);
splitmux->muxed_out_bytes = 0;
GST_SPLITMUX_UNLOCK (splitmux);
gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
gst_element_set_locked_state (muxer, FALSE);
gst_element_set_locked_state (sink, FALSE);
gst_object_unref (sink);
gst_object_unref (muxer);
GST_SPLITMUX_LOCK (splitmux);
GST_STATE_UNLOCK (splitmux);
splitmux->switching_fragment = FALSE;
do_async_done (splitmux);
splitmux->ready_for_output = TRUE;
g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
send_fragment_opened_closed_msg (splitmux, TRUE);
/* FIXME: Is this always the correct next state? */
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
static void
bus_handler (GstBin * bin, GstMessage * message)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
/* If the state is draining out the current file, drop this EOS */
GST_SPLITMUX_LOCK (splitmux);
send_fragment_opened_closed_msg (splitmux, FALSE);
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
} else {
GST_DEBUG_OBJECT (splitmux,
"Passing EOS message. Output state %d max_out_running_time %"
GST_STIME_FORMAT, splitmux->output_state,
GST_STIME_ARGS (splitmux->max_out_running_time));
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_MESSAGE_ASYNC_START:
case GST_MESSAGE_ASYNC_DONE:
/* Ignore state changes from our children while switching */
if (splitmux->switching_fragment) {
if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
|| GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
GST_LOG_OBJECT (splitmux,
"Ignoring state change from child %" GST_PTR_FORMAT
" while switching", GST_MESSAGE_SRC (message));
gst_message_unref (message);
return;
}
}
break;
default:
break;
}
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
}
static void
ctx_set_unblock (MqStreamCtx * ctx)
{
ctx->need_unblock = TRUE;
}
/* Called with splitmux lock held */
/* Called when entering ProcessingCompleteGop state
* Assess if mq contents overflowed the current file
* -> If yes, need to switch to new file
* -> if no, set max_out_running_time to let this GOP in and
* go to COLLECTING_GOP_START state
*/
static void
handle_gathered_gop (GstSplitMuxSink * splitmux)
{
guint64 queued_bytes;
GstClockTimeDiff queued_time = 0;
GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
SplitMuxOutputCommand *cmd;
/* Assess if the multiqueue contents overflowed the current file */
/* When considering if a newly gathered GOP overflows
* the time limit for the file, only consider the running time of the
* reference stream. Other streams might have run ahead a little bit,
* but extra pieces won't be released to the muxer beyond the reference
* stream cut-off anyway - so it forms the limit. */
queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
queued_time = splitmux->reference_ctx->in_running_time;
GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
g_assert (queued_time >= splitmux->fragment_start_time);
queued_time -= splitmux->fragment_start_time;
/* Expand queued bytes estimate by muxer overhead */
queued_bytes += (queued_bytes * splitmux->mux_overhead);
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
" bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
GST_LOG_OBJECT (splitmux,
"timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
}
/* Check for overrun - have we output at least one byte and overrun
* either threshold? */
/* Timecode-based threshold accounts for possible rounding errors:
* 5us should be bigger than all possible rounding errors but nowhere near
* big enough to skip to another frame */
if ((splitmux->fragment_total_bytes > 0 &&
((splitmux->threshold_bytes > 0 &&
queued_bytes > splitmux->threshold_bytes) ||
(splitmux->threshold_time > 0 &&
queued_time > splitmux->threshold_time) ||
(splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
splitmux->reference_ctx->in_running_time >
splitmux->next_max_tc_time + 5 * GST_USECOND)))) {
/* Tell the output side to start a new fragment */
GST_INFO_OBJECT (splitmux,
"This GOP (dur %" GST_STIME_FORMAT
") would overflow the fragment, Sending start_new_fragment cmd",
GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
splitmux->gop_start_time));
cmd = out_cmd_buf_new ();
cmd->start_new_fragment = TRUE;
g_queue_push_head (&splitmux->out_cmd_q, cmd);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
new_out_ts = splitmux->reference_ctx->in_running_time;
splitmux->fragment_start_time = splitmux->gop_start_time;
splitmux->fragment_total_bytes = 0;
if (request_next_keyframe (splitmux,
splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
GST_WARNING_OBJECT (splitmux,
"Could not request a keyframe. Files may not split at the exact location they should");
}
gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
}
/* And set up to collect the next GOP */
if (!splitmux->reference_ctx->in_eos) {
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
splitmux->gop_start_time = new_out_ts;
} else {
/* This is probably already the current state, but just in case: */
splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
}
/* And wake all input contexts to send a wake-up event */
g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
/* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
if (splitmux->gop_total_bytes > 0) {
GST_LOG_OBJECT (splitmux,
"Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
" time %" GST_STIME_FORMAT,
splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
/* Send this GOP to the output command queue */
cmd = out_cmd_buf_new ();
cmd->start_new_fragment = FALSE;
cmd->max_output_ts = new_out_ts;
GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
g_queue_push_head (&splitmux->out_cmd_q, cmd);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
splitmux->gop_total_bytes = 0;
}
/* Called with splitmux lock held */
/* Called from each input pad when it is has all the pieces
* for a GOP or EOS, starting with the reference pad which has set the
* splitmux->max_in_running_time
*/
static void
check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GList *cur;
GstEvent *event;
/* On ENDING_FILE, the reference stream sends a command to start a new
* fragment, then releases the GOP for output in the new fragment.
* If somes streams received no buffer during the last GOP that overran,
* because its next buffer has a timestamp bigger than
* ctx->max_in_running_time, its queue is empty. In that case the only
* way to wakeup the output thread is by injecting an event in the
* queue. This usually happen with subtitle streams.
* See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
if (ctx->need_unblock) {
GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
GST_EVENT_TYPE_SERIALIZED,
gst_structure_new ("splitmuxsink-unblock", "timestamp",
G_TYPE_INT64, splitmux->max_in_running_time, NULL));
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (ctx->sinkpad, event);
GST_SPLITMUX_LOCK (splitmux);
ctx->need_unblock = FALSE;
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
/* state may have changed while we were unlocked. Loop again if so */
if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
return;
}
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
gboolean ready = TRUE;
/* Iterate each pad, and check that the input running time is at least
* up to the reference running time, and if so handle the collected GOP */
GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
GST_STIME_FORMAT " ctx %p",
GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
for (cur = g_list_first (splitmux->contexts); cur != NULL;
cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
GST_LOG_OBJECT (splitmux,
"Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
" EOS %d", tmpctx, tmpctx->srcpad,
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
tmpctx->in_running_time < splitmux->max_in_running_time &&
!tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
"Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
tmpctx, tmpctx->srcpad);
ready = FALSE;
break;
}
}
if (ready) {
GST_DEBUG_OBJECT (splitmux,
"Collected GOP is complete. Processing (ctx %p)", ctx);
/* All pads have a complete GOP, release it into the multiqueue */
handle_gathered_gop (splitmux);
}
}
/* If upstream reached EOS we are not expecting more data, no need to wait
* here. */
if (ctx->in_eos)
return;
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
!ctx->flushing &&
(ctx->in_running_time >= splitmux->max_in_running_time) &&
(splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
}
}
static GstPadProbeReturn
handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstSplitMuxSink *splitmux = ctx->splitmux;
GstBuffer *buf;
MqStreamBuf *buf_info = NULL;
GstClockTime ts;
gboolean loop_again;
gboolean keyframe = FALSE;
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
g_warning ("Buffer list handling not implemented");
return GST_PAD_PROBE_DROP;
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->in_segment);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
ctx->in_eos = FALSE;
ctx->in_running_time = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
ctx->in_eos = TRUE;
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
goto beach;
if (ctx->is_reference) {
GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
/* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
} else if (splitmux->input_state ==
SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
/* If we are waiting for a GOP to be completed (ie, for aux
* pads to catch up), then this pad is complete, so check
* if the whole GOP is.
*/
check_completed_gop (splitmux, ctx);
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
GstClockTimeDiff rtime;
gst_event_parse_gap (event, &gap_ts, NULL);
if (gap_ts == GST_CLOCK_TIME_NONE)
break;
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
goto beach;
rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_STIME_ARGS (rtime));
if (ctx->is_reference
&& splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->fragment_start_time));
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
splitmux->max_in_running_time = splitmux->fragment_start_time;
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
default:
break;
}
return GST_PAD_PROBE_PASS;
}
buf = gst_pad_probe_info_get_buffer (info);
buf_info = mq_stream_buf_new ();
if (GST_BUFFER_PTS_IS_VALID (buf))
ts = GST_BUFFER_PTS (buf);
else
ts = GST_BUFFER_DTS (buf);
GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
goto beach;
/* If this buffer has a timestamp, advance the input timestamp of the
* stream */
if (GST_CLOCK_TIME_IS_VALID (ts)) {
GstClockTimeDiff running_time =
my_segment_to_running_time (&ctx->in_segment, ts);
GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
GST_STIME_ARGS (running_time));
if (GST_CLOCK_STIME_IS_VALID (running_time)
&& running_time > ctx->in_running_time)
ctx->in_running_time = running_time;
}
/* Try to make sure we have a valid running time */
if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
ctx->in_running_time =
my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
}
GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
buf_info->run_ts = ctx->in_running_time;
buf_info->buf_size = gst_buffer_get_size (buf);
buf_info->duration = GST_BUFFER_DURATION (buf);
/* initialize fragment_start_time */
if (ctx->is_reference
&& splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->fragment_start_time));
gst_buffer_replace (&ctx->prev_in_keyframe, buf);
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
splitmux->max_in_running_time = splitmux->fragment_start_time;
if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
GST_WARNING_OBJECT (splitmux,
"Could not request a keyframe. Files may not split at the exact location they should");
}
gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
}
GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
" total GOP bytes %" G_GUINT64_FORMAT,
GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
loop_again = TRUE;
do {
if (ctx->flushing)
break;
switch (splitmux->input_state) {
case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
if (ctx->is_reference) {
/* If a keyframe, we have a complete GOP */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
splitmux->max_in_running_time >= ctx->in_running_time) {
/* Pass this buffer through */
loop_again = FALSE;
/* Allow other input pads to catch up to here too */
splitmux->max_in_running_time = ctx->in_running_time;
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
break;
}
GST_INFO_OBJECT (pad,
"Have keyframe with running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
keyframe = TRUE;
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
splitmux->max_in_running_time = ctx->in_running_time;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
/* Store this new keyframe to remember the start of GOP */
gst_buffer_replace (&ctx->prev_in_keyframe, buf);
} else {
/* Pass this buffer if the reference ctx is far enough ahead */
if (ctx->in_running_time < splitmux->max_in_running_time) {
loop_again = FALSE;
break;
}
/* We're still waiting for a keyframe on the reference pad, sleep */
GST_LOG_OBJECT (pad, "Sleeping for GOP start");
GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (pad,
"Done sleeping for GOP start input state now %d",
splitmux->input_state);
}
break;
case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
/* We're collecting a GOP. If this is the reference context,
* we need to check if this is a keyframe that marks the start
* of the next GOP. If it is, it marks the end of the GOP we're
* collecting, so sleep and wait until all the other pads also
* reach that timestamp - at which point, we have an entire GOP
* and either go to ENDING_FILE or release this GOP to the muxer and
* go back to COLLECT_GOP_START. */
/* If we overran the target timestamp, it might be time to process
* the GOP, otherwise bail out for more data
*/
GST_LOG_OBJECT (pad,
"Checking TS %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
GST_STIME_ARGS (splitmux->max_in_running_time));
if (ctx->in_running_time < splitmux->max_in_running_time) {
loop_again = FALSE;
break;
}
GST_LOG_OBJECT (pad,
"Collected last packet of GOP. Checking other pads");
check_completed_gop (splitmux, ctx);
break;
splitmuxsink: Fix occasional deadlock when ending file with subtitle Deadlock occurs when splitting files if one stream received no buffer during the first GOP of the next file. That can happen in that scenario for example: 1) The first GOP of video is collected, it has a duration of 10s. max_in_running_time is set to 10s. 2) Other streams catchup and we receive the first subtitle buffer at ts=0 and has a duration of 1min. 3) We receive the 2nd subtitle buffer with a ts=1min. in_running_time is set to 1min. That buffer is blocked in handle_mq_input() because max_in_running_time is still 10s. 4) Since all in_running_time are now > 10s, max_out_running_time is now set to 10s. That first GOP gets recorded into the file. The muxer pop buffers out of the mq, when it tries to pop a 2nd subtitle buffer it blocks because the GstDataQueue is empty. 5) A 2nd GOP of video is collected and has a duration of 10s as well. max_in_running_time is now 20s. Since subtitle's in_running_time is already 1min, that GOP is already complete. 6) But let's say we overran the max file size, we thus set state to SPLITMUX_STATE_ENDING_FILE now. As soon as a buffer with ts > 10s (end of previous GOP) arrives in handle_mq_output(), EOS event is sent downstream instead. But since the subtitle queue is empty, that's never going to happen. Pipeline is now deadlocked. To fix this situation we have to: - Send a dummy event through the queue to wakeup output thread. - Update out_running_time to at least max_out_running_time so it sends EOS. - Respect time order, so we set out_running_tim=max_in_running_time because that's bigger than previous buffer and smaller than next. https://bugzilla.gnome.org/show_bug.cgi?id=763711
2016-03-18 19:45:01 +00:00
}
case SPLITMUX_INPUT_STATE_FINISHING_UP:
loop_again = FALSE;
break;
default:
loop_again = FALSE;
break;
}
}
while (loop_again);
if (keyframe) {
splitmux->queued_keyframes++;
buf_info->keyframe = TRUE;
}
/* Update total input byte counter for overflow detect */
splitmux->gop_total_bytes += buf_info->buf_size;
/* Now add this buffer to the queue just before returning */
g_queue_push_head (&ctx->queued_bufs, buf_info);
GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
if (buf_info)
mq_stream_buf_free (buf_info);
return GST_PAD_PROBE_PASS;
}
static void
grow_blocked_queues (GstSplitMuxSink * splitmux)
{
GList *cur;
/* Scan other queues for full-ness and grow them */
for (cur = g_list_first (splitmux->contexts);
cur != NULL; cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
guint cur_limit;
guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
if (cur_len >= cur_limit) {
cur_limit = cur_len + 1;
GST_DEBUG_OBJECT (tmpctx->q,
"Queue overflowed and needs enlarging. Growing to %u buffers",
cur_limit);
g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
}
}
}
static void
handle_q_underrun (GstElement * q, gpointer user_data)
{
MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
GstSplitMuxSink *splitmux = ctx->splitmux;
GST_SPLITMUX_LOCK (splitmux);
GST_DEBUG_OBJECT (q,
"Queue reported underrun with %d keyframes and %d cmds enqueued",
splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
grow_blocked_queues (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
}
static void
handle_q_overrun (GstElement * q, gpointer user_data)
{
MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
GstSplitMuxSink *splitmux = ctx->splitmux;
gboolean allow_grow = FALSE;
GST_SPLITMUX_LOCK (splitmux);
GST_DEBUG_OBJECT (q,
"Queue reported overrun with %d keyframes and %d cmds enqueued",
splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
if (splitmux->queued_keyframes < 2) {
/* Less than a full GOP queued, grow the queue */
allow_grow = TRUE;
} else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
allow_grow = TRUE;
} else {
/* If another queue is starved, grow */
GList *cur;
for (cur = g_list_first (splitmux->contexts);
cur != NULL; cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
allow_grow = TRUE;
}
}
}
GST_SPLITMUX_UNLOCK (splitmux);
if (allow_grow) {
guint cur_limit;
g_object_get (q, "max-size-buffers", &cur_limit, NULL);
cur_limit++;
GST_DEBUG_OBJECT (q,
"Queue overflowed and needs enlarging. Growing to %u buffers",
cur_limit);
g_object_set (q, "max-size-buffers", cur_limit, NULL);
}
}
static GstPad *
gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPadTemplate *mux_template = NULL;
GstPad *res = NULL;
GstElement *q;
GstPad *q_sink = NULL, *q_src = NULL;
gchar *gname;
gboolean is_video = FALSE;
MqStreamCtx *ctx;
GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
GST_SPLITMUX_LOCK (splitmux);
if (!create_muxer (splitmux))
goto fail;
if (templ->name_template) {
if (g_str_equal (templ->name_template, "video")) {
if (splitmux->have_video)
goto already_have_video;
/* FIXME: Look for a pad template with matching caps, rather than by name */
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "video_%u");
/* Fallback to find sink pad templates named 'video' (flvmux) */
if (!mux_template) {
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "video");
}
is_video = TRUE;
name = NULL;
} else {
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), templ->name_template);
/* Fallback to find sink pad templates named 'audio' (flvmux) */
if (!mux_template) {
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "audio");
}
}
if (mux_template == NULL) {
/* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "sink_%d");
}
}
res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
if (res == NULL)
goto fail;
if (is_video)
gname = g_strdup ("video");
else if (name == NULL)
gname = gst_pad_get_name (res);
else
gname = g_strdup (name);
if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
goto fail;
gst_element_set_state (q, GST_STATE_TARGET (splitmux));
g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
"max-size-buffers", 5, NULL);
q_sink = gst_element_get_static_pad (q, "sink");
q_src = gst_element_get_static_pad (q, "src");
if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
gst_element_release_request_pad (splitmux->muxer, res);
gst_object_unref (GST_OBJECT (res));
goto fail;
}
gst_object_unref (GST_OBJECT (res));
ctx = mq_stream_ctx_new (splitmux);
/* Context holds a ref: */
ctx->q = gst_object_ref (q);
ctx->srcpad = q_src;
ctx->sinkpad = q_sink;
ctx->q_overrun_id =
g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
mq_stream_ctx_ref (ctx);
ctx->src_pad_block_id =
gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
_pad_block_destroy_src_notify);
if (is_video && splitmux->reference_ctx != NULL) {
splitmux->reference_ctx->is_reference = FALSE;
splitmux->reference_ctx = NULL;
}
if (splitmux->reference_ctx == NULL) {
splitmux->reference_ctx = ctx;
ctx->is_reference = TRUE;
}
res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
mq_stream_ctx_ref (ctx);
ctx->sink_pad_block_id =
gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
_pad_block_destroy_sink_notify);
GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
" feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
g_free (gname);
if (is_video)
splitmux->have_video = TRUE;
gst_pad_set_active (res, TRUE);
gst_element_add_pad (element, res);
GST_SPLITMUX_UNLOCK (splitmux);
return res;
fail:
GST_SPLITMUX_UNLOCK (splitmux);
if (q_sink)
gst_object_unref (q_sink);
if (q_src)
gst_object_unref (q_src);
return NULL;
already_have_video:
GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
GST_SPLITMUX_UNLOCK (splitmux);
return NULL;
}
static void
gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPad *muxpad = NULL;
MqStreamCtx *ctx =
(MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->muxer == NULL)
goto fail; /* Elements don't exist yet - nothing to release */
GST_INFO_OBJECT (pad, "releasing request pad");
muxpad = gst_pad_get_peer (ctx->srcpad);
/* Remove the context from our consideration */
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
if (ctx->sink_pad_block_id)
gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
if (ctx->src_pad_block_id)
gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
/* Can release the context now */
mq_stream_ctx_unref (ctx);
if (ctx == splitmux->reference_ctx)
splitmux->reference_ctx = NULL;
/* Release and free the muxer input */
if (muxpad) {
gst_element_release_request_pad (splitmux->muxer, muxpad);
gst_object_unref (muxpad);
}
if (GST_PAD_PAD_TEMPLATE (pad) &&
g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
(pad)), "video"))
splitmux->have_video = FALSE;
gst_element_remove_pad (element, pad);
/* Reset the internal elements only after all request pads are released */
if (splitmux->contexts == NULL)
gst_splitmux_reset (splitmux);
fail:
GST_SPLITMUX_UNLOCK (splitmux);
}
static GstElement *
create_element (GstSplitMuxSink * splitmux,
const gchar * factory, const gchar * name, gboolean locked)
{
GstElement *ret = gst_element_factory_make (factory, name);
if (ret == NULL) {
g_warning ("Failed to create %s - splitmuxsink will not work", name);
return NULL;
}
if (locked) {
/* Ensure the sink starts in locked state and NULL - it will be changed
* by the filename setting code */
gst_element_set_locked_state (ret, TRUE);
gst_element_set_state (ret, GST_STATE_NULL);
}
if (!gst_bin_add (GST_BIN (splitmux), ret)) {
g_warning ("Could not add %s element - splitmuxsink will not work", name);
gst_object_unref (ret);
return NULL;
}
return ret;
}
static gboolean
create_muxer (GstSplitMuxSink * splitmux)
{
/* Create internal elements */
if (splitmux->muxer == NULL) {
GstElement *provided_muxer = NULL;
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_muxer != NULL)
provided_muxer = gst_object_ref (splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
if (provided_muxer == NULL) {
if ((splitmux->muxer =
create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
goto fail;
} else {
/* Ensure it's not in locked state (we might be reusing an old element) */
gst_element_set_locked_state (provided_muxer, FALSE);
if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
g_warning ("Could not add muxer element - splitmuxsink will not work");
gst_object_unref (provided_muxer);
goto fail;
}
splitmux->muxer = provided_muxer;
gst_object_unref (provided_muxer);
}
}
return TRUE;
fail:
return FALSE;
}
static GstElement *
find_sink (GstElement * e)
{
GstElement *res = NULL;
GstIterator *iter;
gboolean done = FALSE;
GValue data = { 0, };
if (!GST_IS_BIN (e))
return e;
if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
return e;
iter = gst_bin_iterate_sinks (GST_BIN (e));
while (!done) {
switch (gst_iterator_next (iter, &data)) {
case GST_ITERATOR_OK:
{
GstElement *child = g_value_get_object (&data);
if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
"location") != NULL) {
res = child;
done = TRUE;
}
g_value_reset (&data);
break;
}
case GST_ITERATOR_RESYNC:
gst_iterator_resync (iter);
break;
case GST_ITERATOR_DONE:
done = TRUE;
break;
case GST_ITERATOR_ERROR:
g_assert_not_reached ();
break;
}
}
g_value_unset (&data);
gst_iterator_free (iter);
return res;
}
static gboolean
create_sink (GstSplitMuxSink * splitmux)
{
GstElement *provided_sink = NULL;
if (splitmux->active_sink == NULL) {
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_sink != NULL)
provided_sink = gst_object_ref (splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
if (provided_sink == NULL) {
if ((splitmux->sink =
create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
goto fail;
splitmux->active_sink = splitmux->sink;
} else {
/* Ensure the sink starts in locked state and NULL - it will be changed
* by the filename setting code */
gst_element_set_locked_state (provided_sink, TRUE);
gst_element_set_state (provided_sink, GST_STATE_NULL);
if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
g_warning ("Could not add sink elements - splitmuxsink will not work");
gst_object_unref (provided_sink);
goto fail;
}
splitmux->active_sink = provided_sink;
/* The bin holds a ref now, we can drop our tmp ref */
gst_object_unref (provided_sink);
/* Find the sink element */
splitmux->sink = find_sink (splitmux->active_sink);
if (splitmux->sink == NULL) {
g_warning
("Could not locate sink element in provided sink - splitmuxsink will not work");
goto fail;
}
}
#if 1
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
"async") != NULL) {
/* async child elements are causing state change races and weird
* failures, so let's try and turn that off */
g_object_set (splitmux->sink, "async", FALSE, NULL);
}
#endif
if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
goto fail;
}
}
return TRUE;
fail:
return FALSE;
}
#ifdef __GNUC__
#pragma GCC diagnostic ignored "-Wformat-nonliteral"
#endif
static void
set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
gchar *fname = NULL;
GstSample *sample;
GstCaps *caps;
gst_splitmux_sink_ensure_max_files (splitmux);
if (ctx->cur_out_buffer == NULL) {
GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
}
caps = gst_pad_get_current_caps (ctx->srcpad);
sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
splitmux->fragment_id, sample, &fname);
gst_sample_unref (sample);
if (caps)
gst_caps_unref (caps);
if (fname == NULL) {
/* Fallback to the old signal if the new one returned nothing */
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
splitmux->fragment_id, &fname);
}
if (!fname)
fname = splitmux->location ?
g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
if (fname) {
GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
g_object_set (splitmux->sink, "location", fname, NULL);
g_free (fname);
splitmux->fragment_id++;
}
}
static void
do_async_start (GstSplitMuxSink * splitmux)
{
GstMessage *message;
if (!splitmux->need_async_start) {
GST_INFO_OBJECT (splitmux, "no async_start needed");
return;
}
splitmux->async_pending = TRUE;
GST_INFO_OBJECT (splitmux, "Sending async_start message");
message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
(splitmux), message);
}
static void
do_async_done (GstSplitMuxSink * splitmux)
{
GstMessage *message;
if (splitmux->async_pending) {
GST_INFO_OBJECT (splitmux, "Sending async_done message");
message =
gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
GST_CLOCK_TIME_NONE);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
(splitmux), message);
splitmux->async_pending = FALSE;
}
splitmux->need_async_start = FALSE;
}
static GstStateChangeReturn
gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
{
GstStateChangeReturn ret;
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:{
GST_SPLITMUX_LOCK (splitmux);
if (!create_muxer (splitmux) || !create_sink (splitmux)) {
ret = GST_STATE_CHANGE_FAILURE;
GST_SPLITMUX_UNLOCK (splitmux);
goto beach;
}
GST_SPLITMUX_UNLOCK (splitmux);
splitmux->fragment_id = 0;
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
GST_SPLITMUX_LOCK (splitmux);
/* Start by collecting one input on each pad */
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
splitmux->gop_start_time = splitmux->fragment_start_time =
GST_CLOCK_STIME_NONE;
splitmux->muxed_out_bytes = 0;
splitmux->ready_for_output = FALSE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
/* Wake up any blocked threads */
GST_LOG_OBJECT (splitmux,
"State change -> NULL or READY. Waking threads");
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE)
goto beach;
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
splitmux->need_async_start = TRUE;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:{
/* Change state async, because our child sink might not
* be ready to do that for us yet if it's state is still locked */
splitmux->need_async_start = TRUE;
/* we want to go async to PAUSED until we managed to configure and add the
* sink */
GST_SPLITMUX_LOCK (splitmux);
do_async_start (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
ret = GST_STATE_CHANGE_ASYNC;
break;
}
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->fragment_id = 0;
/* Reset internal elements only if no pad contexts are using them */
if (splitmux->contexts == NULL)
gst_splitmux_reset (splitmux);
do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
break;
}
beach:
if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
ret == GST_STATE_CHANGE_FAILURE) {
/* Cleanup elements on failed transition out of NULL */
gst_splitmux_reset (splitmux);
GST_SPLITMUX_LOCK (splitmux);
do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
}
return ret;
}
gboolean
register_splitmuxsink (GstPlugin * plugin)
{
GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
"Split File Muxing Sink");
return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
GST_TYPE_SPLITMUX_SINK);
}
static void
gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
{
if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
splitmux->fragment_id = 0;
}
}