gstreamer/gst/multifile/gstsplitmuxsink.c
Jan Schmidt 8ff5dd8029 splitmuxsink: Fix GObject warnings on shutdown.
Commit 83e718 added a pad template to splitmux request
pads, which means that GstElement now releases the pads on
dispose, but after having removed all elements in the bin
and unlinked them. Make sure we can handle cleanup in that case
without throwing assertions.

https://bugzilla.gnome.org/show_bug.cgi?id=773784
2016-11-02 11:02:12 +11:00

1816 lines
58 KiB
C

/* GStreamer Muxer bin that splits output stream by size/time
* Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-splitmuxsink
* @short_description: Muxer wrapper for splitting output stream by size or time
*
* This element wraps a muxer and a sink, and starts a new file when the mux
* contents are about to cross a threshold of maximum size of maximum time,
* splitting at video keyframe boundaries. Exactly one input video stream
* can be muxed, with as many accompanying audio and subtitle streams as
* desired.
*
* By default, it uses mp4mux and filesink, but they can be changed via
* the 'muxer' and 'sink' properties.
*
* The minimum file size is 1 GOP, however - so limits may be overrun if the
* distance between any 2 keyframes is larger than the limits.
*
* If a video stream is available, the splitting process is driven by the video
* stream contents, and the video stream must contain closed GOPs for the output
* file parts to be played individually correctly. In the absence of a video
* stream, the first available stream is used as reference for synchronization.
*
* <refsect2>
* <title>Example pipelines</title>
* |[
* gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
* ]|
* Records a video stream captured from a v4l2 device and muxes it into
* ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
* and 1MB maximum size.
* </refsect2>
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include <glib/gstdio.h>
#include <gst/video/video.h>
#include "gstsplitmuxsink.h"
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_CAT_DEFAULT splitmux_debug
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
enum
{
PROP_0,
PROP_LOCATION,
PROP_MAX_SIZE_TIME,
PROP_MAX_SIZE_BYTES,
PROP_SEND_KEYFRAME_REQUESTS,
PROP_MAX_FILES,
PROP_MUXER_OVERHEAD,
PROP_MUXER,
PROP_SINK
};
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
#define DEFAULT_MAX_FILES 0
#define DEFAULT_MUXER_OVERHEAD 0.02
#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"
enum
{
SIGNAL_FORMAT_LOCATION,
SIGNAL_LAST
};
static guint signals[SIGNAL_LAST];
static GstStaticPadTemplate video_sink_template =
GST_STATIC_PAD_TEMPLATE ("video",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate audio_sink_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate subtitle_sink_template =
GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GQuark PAD_CONTEXT;
static void
_do_init (void)
{
PAD_CONTEXT = g_quark_from_static_string ("pad-context");
}
#define gst_splitmux_sink_parent_class parent_class
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
_do_init ());
static gboolean create_elements (GstSplitMuxSink * splitmux);
static gboolean create_sink (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_dispose (GObject * object);
static void gst_splitmux_sink_finalize (GObject * object);
static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
element, GstStateChange transition);
static void bus_handler (GstBin * bin, GstMessage * msg);
static void set_next_filename (GstSplitMuxSink * splitmux);
static void start_next_fragment (GstSplitMuxSink * splitmux);
static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void mq_stream_ctx_unref (MqStreamCtx * ctx);
static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
static MqStreamBuf *
mq_stream_buf_new (void)
{
return g_slice_new0 (MqStreamBuf);
}
static void
mq_stream_buf_free (MqStreamBuf * data)
{
g_slice_free (MqStreamBuf, data);
}
static void
gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
GstElementClass *gstelement_class = (GstElementClass *) klass;
GstBinClass *gstbin_class = (GstBinClass *) klass;
gobject_class->set_property = gst_splitmux_sink_set_property;
gobject_class->get_property = gst_splitmux_sink_get_property;
gobject_class->dispose = gst_splitmux_sink_dispose;
gobject_class->finalize = gst_splitmux_sink_finalize;
gst_element_class_set_static_metadata (gstelement_class,
"Split Muxing Bin", "Generic/Bin/Muxer",
"Convenience bin that muxes incoming streams into multiple time/size limited files",
"Jan Schmidt <jan@centricular.com>");
gst_element_class_add_static_pad_template (gstelement_class,
&video_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&audio_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&subtitle_sink_template);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
gstelement_class->release_pad =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
gstbin_class->handle_message = bus_handler;
g_object_class_install_property (gobject_class, PROP_LOCATION,
g_param_spec_string ("location", "File Output Pattern",
"Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
g_param_spec_double ("mux-overhead", "Muxing Overhead",
"Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
DEFAULT_MUXER_OVERHEAD,
G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
"Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
"Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
g_param_spec_boolean ("send-keyframe-requests",
"Request keyframes at max-size-time",
"Request a keyframe every max-size-time ns to try splitting at that point. "
"Needs max-size-bytes to be 0 in order to be effective.",
DEFAULT_SEND_KEYFRAME_REQUESTS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_FILES,
g_param_spec_uint ("max-files", "Max files",
"Maximum number of files to keep on disk. Once the maximum is reached,"
"old files start to be deleted to make room for new ones.", 0,
G_MAXUINT, DEFAULT_MAX_FILES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER,
g_param_spec_object ("muxer", "Muxer",
"The muxer element to use (NULL = default mp4mux)",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK,
g_param_spec_object ("sink", "Sink",
"The sink element (or element chain) to use (NULL = default filesink)",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSplitMuxSink::format-location:
* @splitmux: the #GstSplitMuxSink
* @fragment_id: the sequence number of the file to be created
*
* Returns: the location to be used for the next output file
*/
signals[SIGNAL_FORMAT_LOCATION] =
g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
}
static void
gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
{
g_mutex_init (&splitmux->lock);
g_cond_init (&splitmux->data_cond);
splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
splitmux->max_files = DEFAULT_MAX_FILES;
splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
}
static void
gst_splitmux_reset (GstSplitMuxSink * splitmux)
{
if (splitmux->mq) {
gst_element_set_locked_state (splitmux->mq, TRUE);
gst_element_set_state (splitmux->mq, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
}
if (splitmux->muxer) {
gst_element_set_locked_state (splitmux->muxer, TRUE);
gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
}
if (splitmux->active_sink) {
gst_element_set_locked_state (splitmux->active_sink, TRUE);
gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
}
splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
NULL;
}
static void
gst_splitmux_sink_dispose (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
/* Calling parent dispose invalidates all child pointers */
splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
NULL;
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_splitmux_sink_finalize (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
g_cond_clear (&splitmux->data_cond);
g_mutex_clear (&splitmux->lock);
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
g_free (splitmux->location);
/* Make sure to free any un-released contexts */
g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
g_list_free (splitmux->contexts);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
switch (prop_id) {
case PROP_LOCATION:{
GST_OBJECT_LOCK (splitmux);
g_free (splitmux->location);
splitmux->location = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
}
case PROP_MAX_SIZE_BYTES:
GST_OBJECT_LOCK (splitmux);
splitmux->threshold_bytes = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (splitmux);
splitmux->threshold_time = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SEND_KEYFRAME_REQUESTS:
GST_OBJECT_LOCK (splitmux);
splitmux->send_keyframe_requests = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_FILES:
GST_OBJECT_LOCK (splitmux);
splitmux->max_files = g_value_get_uint (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
splitmux->mux_overhead = g_value_get_double (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
splitmux->provided_sink = g_value_get_object (value);
gst_object_ref_sink (splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
splitmux->provided_muxer = g_value_get_object (value);
gst_object_ref_sink (splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_splitmux_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
switch (prop_id) {
case PROP_LOCATION:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->location);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_BYTES:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint64 (value, splitmux->threshold_bytes);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint64 (value, splitmux->threshold_time);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SEND_KEYFRAME_REQUESTS:
GST_OBJECT_LOCK (splitmux);
g_value_set_boolean (value, splitmux->send_keyframe_requests);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_FILES:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint (value, splitmux->max_files);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
g_value_set_double (value, splitmux->mux_overhead);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* Convenience function */
static inline GstClockTimeDiff
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (val)) {
gboolean sign =
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
if (sign > 0)
res = val;
else if (sign < 0)
res = -val;
}
return res;
}
static GstPad *
mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
{
gchar *tmp, *sinkname, *srcname;
GstPad *mq_src;
sinkname = gst_pad_get_name (sink_pad);
tmp = sinkname + 5;
srcname = g_strdup_printf ("src_%s", tmp);
mq_src = gst_element_get_static_pad (mq, srcname);
g_free (sinkname);
g_free (srcname);
return mq_src;
}
static gboolean
get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
GstPad ** src_pad)
{
GstPad *mq_sink;
GstPad *mq_src;
/* Request a pad from multiqueue, then connect this one, then
* discover the corresponding output pad and return both */
mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
if (mq_sink == NULL)
return FALSE;
mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
if (mq_src == NULL)
goto fail;
*sink_pad = mq_sink;
*src_pad = mq_src;
return TRUE;
fail:
gst_element_release_request_pad (splitmux->mq, mq_sink);
return FALSE;
}
static MqStreamCtx *
mq_stream_ctx_new (GstSplitMuxSink * splitmux)
{
MqStreamCtx *ctx;
ctx = g_new0 (MqStreamCtx, 1);
g_atomic_int_set (&ctx->refcount, 1);
ctx->splitmux = splitmux;
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
g_queue_init (&ctx->queued_bufs);
return ctx;
}
static void
mq_stream_ctx_free (MqStreamCtx * ctx)
{
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
g_free (ctx);
}
static void
mq_stream_ctx_unref (MqStreamCtx * ctx)
{
if (g_atomic_int_dec_and_test (&ctx->refcount))
mq_stream_ctx_free (ctx);
}
static void
mq_stream_ctx_ref (MqStreamCtx * ctx)
{
g_atomic_int_inc (&ctx->refcount);
}
static void
_pad_block_destroy_sink_notify (MqStreamCtx * ctx)
{
ctx->sink_pad_block_id = 0;
mq_stream_ctx_unref (ctx);
}
static void
_pad_block_destroy_src_notify (MqStreamCtx * ctx)
{
ctx->src_pad_block_id = 0;
mq_stream_ctx_unref (ctx);
}
static void
send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
{
gchar *location = NULL;
GstMessage *msg;
const gchar *msg_name = opened ?
"splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
g_object_get (splitmux->sink, "location", &location, NULL);
msg = gst_message_new_element (GST_OBJECT (splitmux),
gst_structure_new (msg_name,
"location", G_TYPE_STRING, location,
"running-time", GST_TYPE_CLOCK_TIME,
splitmux->reference_ctx->out_running_time, NULL));
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
g_free (location);
}
/* Called with lock held, drops the lock to send EOS to the
* pad
*/
static void
send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GstEvent *eos;
GstPad *pad;
eos = gst_event_new_eos ();
pad = gst_pad_get_peer (ctx->srcpad);
ctx->out_eos = TRUE;
GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (pad, eos);
GST_SPLITMUX_LOCK (splitmux);
gst_object_unref (pad);
}
/* Called with splitmux lock held to check if this output
* context needs to sleep to wait for the release of the
* next GOP, or to send EOS to close out the current file
*/
static void
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
do {
GST_LOG_OBJECT (ctx->srcpad,
"Checking running time %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
ctx->out_running_time < splitmux->max_out_running_time) {
splitmux->have_muxed_something = TRUE;
return;
}
if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
return;
if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
if (ctx->out_eos == FALSE) {
send_eos (splitmux, ctx);
continue;
}
} else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
start_next_fragment (splitmux);
continue;
}
GST_INFO_OBJECT (ctx->srcpad,
"Sleeping for running time %"
GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
ctx->out_blocked = TRUE;
/* Expand the mq if needed before sleeping */
check_queue_length (splitmux, ctx);
GST_SPLITMUX_WAIT (splitmux);
ctx->out_blocked = FALSE;
GST_INFO_OBJECT (ctx->srcpad,
"Woken for new max running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
} while (1);
}
static gboolean
request_next_keyframe (GstSplitMuxSink * splitmux)
{
GstEvent *ev;
if (splitmux->send_keyframe_requests == FALSE || splitmux->threshold_time == 0
|| splitmux->threshold_bytes != 0)
return TRUE;
ev = gst_video_event_new_upstream_force_key_unit (splitmux->fragment_id *
splitmux->threshold_time, TRUE, 0);
GST_DEBUG_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
GST_TIME_ARGS (splitmux->fragment_id * splitmux->threshold_time));
return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
}
static GstPadProbeReturn
handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstSplitMuxSink *splitmux = ctx->splitmux;
MqStreamBuf *buf_info = NULL;
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
g_warning ("Buffer list handling not implemented");
return GST_PAD_PROBE_DROP;
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->out_segment);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
ctx->flushing = FALSE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_FLUSH_START:
GST_SPLITMUX_LOCK (splitmux);
GST_LOG_OBJECT (pad, "Flush start");
ctx->flushing = TRUE;
GST_SPLITMUX_BROADCAST (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
ctx->out_eos = TRUE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
GstClockTimeDiff rtime;
gst_event_parse_gap (event, &gap_ts, NULL);
if (gap_ts == GST_CLOCK_TIME_NONE)
break;
GST_SPLITMUX_LOCK (splitmux);
rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_STIME_ARGS (rtime));
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
if (rtime != GST_CLOCK_STIME_NONE) {
ctx->out_running_time = rtime;
complete_or_wait_on_out (splitmux, ctx);
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_EVENT_CUSTOM_DOWNSTREAM:{
const GstStructure *s;
GstClockTimeDiff ts = 0;
s = gst_event_get_structure (event);
if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
break;
gst_structure_get_int64 (s, "timestamp", &ts);
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
ctx->out_running_time = ts;
complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_DROP;
}
default:
break;
}
return GST_PAD_PROBE_PASS;
}
/* Allow everything through until the configured next stopping point */
GST_SPLITMUX_LOCK (splitmux);
buf_info = g_queue_pop_tail (&ctx->queued_bufs);
if (buf_info == NULL)
/* Can only happen due to a poorly timed flush */
goto beach;
/* If we have popped a keyframe, decrement the queued_gop count */
if (buf_info->keyframe && splitmux->queued_gops > 0)
splitmux->queued_gops--;
ctx->out_running_time = buf_info->run_ts;
GST_LOG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
" size %" G_GUINT64_FORMAT,
pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
if (splitmux->opening_first_fragment) {
if (request_next_keyframe (splitmux) == FALSE)
GST_WARNING_OBJECT (splitmux,
"Could not request a keyframe. Files may not split at the exact location they should");
send_fragment_opened_closed_msg (splitmux, TRUE);
splitmux->opening_first_fragment = FALSE;
}
complete_or_wait_on_out (splitmux, ctx);
if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
splitmux->muxed_out_time < buf_info->run_ts)
splitmux->muxed_out_time = buf_info->run_ts;
splitmux->muxed_out_bytes += buf_info->buf_size;
splitmux->last_frame_duration = buf_info->duration;
#ifndef GST_DISABLE_GST_DEBUG
{
GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf,
GST_STIME_ARGS (ctx->out_running_time));
}
#endif
GST_SPLITMUX_UNLOCK (splitmux);
mq_stream_buf_free (buf_info);
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_DROP;
}
static gboolean
resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
{
return gst_pad_send_event (peer, gst_event_ref (*event));
}
static void
restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
gst_pad_sticky_events_foreach (ctx->srcpad,
(GstPadStickyEventsForeachFunction) (resend_sticky), peer);
/* Clear EOS flag if not actually EOS */
ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
gst_object_unref (peer);
}
/* Called with lock held when a fragment
* reaches EOS and it is time to restart
* a new fragment
*/
static void
start_next_fragment (GstSplitMuxSink * splitmux)
{
/* 1 change to new file */
splitmux->switching_fragment = TRUE;
gst_element_set_locked_state (splitmux->muxer, TRUE);
gst_element_set_locked_state (splitmux->active_sink, TRUE);
gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
set_next_filename (splitmux);
gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
gst_element_set_locked_state (splitmux->muxer, FALSE);
gst_element_set_locked_state (splitmux->active_sink, FALSE);
splitmux->switching_fragment = FALSE;
g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
/* Switch state and go back to processing */
if (!splitmux->reference_ctx->in_eos) {
splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
} else {
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
splitmux->have_muxed_something = FALSE;
}
splitmux->have_muxed_something =
(splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
/* Store the overflow parameters as the basis for the next fragment */
splitmux->mux_start_time = splitmux->muxed_out_time;
if (splitmux->last_frame_duration != GST_CLOCK_STIME_NONE)
splitmux->mux_start_time += splitmux->last_frame_duration;
splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
GST_DEBUG_OBJECT (splitmux,
"Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
send_fragment_opened_closed_msg (splitmux, TRUE);
if (request_next_keyframe (splitmux) == FALSE)
GST_WARNING_OBJECT (splitmux,
"Could not request a keyframe. Files may not split at the exact location they should");
GST_SPLITMUX_BROADCAST (splitmux);
}
static void
bus_handler (GstBin * bin, GstMessage * message)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
/* If the state is draining out the current file, drop this EOS */
GST_SPLITMUX_LOCK (splitmux);
send_fragment_opened_closed_msg (splitmux, FALSE);
if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
GST_SPLITMUX_BROADCAST (splitmux);
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_MESSAGE_ASYNC_START:
case GST_MESSAGE_ASYNC_DONE:
/* Ignore state changes from our children while switching */
if (splitmux->switching_fragment) {
if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
GST_LOG_OBJECT (splitmux,
"Ignoring state change from child %" GST_PTR_FORMAT
" while switching", GST_MESSAGE_SRC (message));
gst_message_unref (message);
return;
}
}
break;
default:
break;
}
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
}
/* Called with splitmux lock held */
/* Called when entering ProcessingCompleteGop state
* Assess if mq contents overflowed the current file
* -> If yes, need to switch to new file
* -> if no, set max_out_running_time to let this GOP in and
* go to COLLECTING_GOP_START state
*/
static void
handle_gathered_gop (GstSplitMuxSink * splitmux)
{
GList *cur;
guint64 queued_bytes = 0;
GstClockTimeDiff queued_time = 0;
/* Assess if the multiqueue contents overflowed the current file */
for (cur = g_list_first (splitmux->contexts);
cur != NULL; cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
if (tmpctx->in_running_time > queued_time)
queued_time = tmpctx->in_running_time;
queued_bytes += tmpctx->in_bytes;
}
GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT
" splitmuxsink->mux_start_bytes %" G_GUINT64_FORMAT, queued_bytes,
splitmux->mux_start_bytes);
g_assert (queued_bytes >= splitmux->mux_start_bytes);
g_assert (queued_time >= splitmux->mux_start_time);
queued_bytes -= splitmux->mux_start_bytes;
queued_time -= splitmux->mux_start_time;
/* Expand queued bytes estimate by muxer overhead */
queued_bytes += (queued_bytes * splitmux->mux_overhead);
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
" bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
/* Check for overrun - have we output at least one byte and overrun
* either threshold? */
if ((splitmux->have_muxed_something &&
((splitmux->threshold_bytes > 0 &&
queued_bytes > splitmux->threshold_bytes) ||
(splitmux->threshold_time > 0 &&
queued_time > splitmux->threshold_time)))) {
splitmux->state = SPLITMUX_STATE_ENDING_FILE;
GST_INFO_OBJECT (splitmux,
"mq overflowed since last, draining out. max out TS is %"
GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
GST_SPLITMUX_BROADCAST (splitmux);
} else {
/* No overflow */
GST_LOG_OBJECT (splitmux,
"This GOP didn't overflow the fragment. Bytes sent %" G_GUINT64_FORMAT
" queued %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
queued_bytes, GST_STIME_ARGS (queued_time));
/* Wake everyone up to push this one GOP, then sleep */
splitmux->have_muxed_something = TRUE;
if (!splitmux->reference_ctx->in_eos) {
splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
} else {
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
}
GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
GST_SPLITMUX_BROADCAST (splitmux);
}
}
/* Called with splitmux lock held */
/* Called from each input pad when it is has all the pieces
* for a GOP or EOS, starting with the reference pad which has set the
* splitmux->max_in_running_time
*/
static void
check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GList *cur;
gboolean ready = TRUE;
GstClockTimeDiff current_max_in_running_time;
if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
/* Iterate each pad, and check that the input running time is at least
* up to the reference running time, and if so handle the collected GOP */
GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
GST_STIME_FORMAT " ctx %p",
GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
for (cur = g_list_first (splitmux->contexts); cur != NULL;
cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
GST_LOG_OBJECT (splitmux,
"Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
" EOS %d", tmpctx, tmpctx->srcpad,
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
if (splitmux->max_in_running_time != G_MAXINT64 &&
tmpctx->in_running_time < splitmux->max_in_running_time &&
!tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
"Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
tmpctx, tmpctx->srcpad);
ready = FALSE;
break;
}
}
if (ready) {
GST_DEBUG_OBJECT (splitmux,
"Collected GOP is complete. Processing (ctx %p)", ctx);
/* All pads have a complete GOP, release it into the multiqueue */
handle_gathered_gop (splitmux);
}
}
/* If upstream reached EOS we are not expecting more data, no need to wait
* here. */
if (ctx->in_eos)
return;
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
current_max_in_running_time = splitmux->max_in_running_time;
while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
!ctx->flushing &&
(current_max_in_running_time == splitmux->max_in_running_time)) {
GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
"GOP complete" : "EOF draining", ctx);
GST_SPLITMUX_WAIT (splitmux);
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
}
}
static void
check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GList *cur;
guint cur_len = g_queue_get_length (&ctx->queued_bufs);
GST_DEBUG_OBJECT (ctx->sinkpad,
"Checking queue length len %u cur_max %u queued gops %u",
cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
if (cur_len >= splitmux->mq_max_buffers) {
gboolean allow_grow = FALSE;
/* If collecting a GOP and this pad might block,
* and there isn't already a pending GOP in the queue
* then grow
*/
if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
ctx->in_running_time < splitmux->max_in_running_time &&
splitmux->queued_gops <= 1) {
allow_grow = TRUE;
} else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
ctx->is_reference && splitmux->queued_gops <= 1) {
allow_grow = TRUE;
}
if (!allow_grow) {
for (cur = g_list_first (splitmux->contexts);
cur != NULL; cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
GST_DEBUG_OBJECT (tmpctx->sinkpad,
" len %u out_blocked %d",
g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
/* If another stream is starving, grow */
if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
allow_grow = TRUE;
}
}
}
if (allow_grow) {
splitmux->mq_max_buffers = cur_len + 1;
GST_INFO_OBJECT (splitmux,
"Multiqueue overrun - enlarging to %u buffers ctx %p",
splitmux->mq_max_buffers, ctx);
g_object_set (splitmux->mq, "max-size-buffers",
splitmux->mq_max_buffers, NULL);
}
}
}
static GstPadProbeReturn
handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstSplitMuxSink *splitmux = ctx->splitmux;
GstBuffer *buf;
MqStreamBuf *buf_info = NULL;
GstClockTime ts;
gboolean loop_again;
gboolean keyframe = FALSE;
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
g_warning ("Buffer list handling not implemented");
return GST_PAD_PROBE_DROP;
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->in_segment);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
ctx->in_eos = FALSE;
ctx->in_bytes = 0;
ctx->in_running_time = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
ctx->in_eos = TRUE;
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
if (ctx->is_reference) {
GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
/* Act as if this is a new keyframe with infinite timestamp */
splitmux->max_in_running_time = G_MAXINT64;
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST (splitmux);
check_completed_gop (splitmux, ctx);
} else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
/* If we are waiting for a GOP to be completed (ie, for aux
* pads to catch up), then this pad is complete, so check
* if the whole GOP is.
*/
check_completed_gop (splitmux, ctx);
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
break;
}
return GST_PAD_PROBE_PASS;
}
buf = gst_pad_probe_info_get_buffer (info);
buf_info = mq_stream_buf_new ();
if (GST_BUFFER_PTS_IS_VALID (buf))
ts = GST_BUFFER_PTS (buf);
else
ts = GST_BUFFER_DTS (buf);
GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->state == SPLITMUX_STATE_STOPPED)
goto beach;
/* If this buffer has a timestamp, advance the input timestamp of the
* stream */
if (GST_CLOCK_TIME_IS_VALID (ts)) {
GstClockTimeDiff running_time =
my_segment_to_running_time (&ctx->in_segment, ts);
GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
GST_STIME_ARGS (running_time));
if (GST_CLOCK_STIME_IS_VALID (running_time)
&& running_time > ctx->in_running_time)
ctx->in_running_time = running_time;
}
/* Try to make sure we have a valid running time */
if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
ctx->in_running_time =
my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
}
GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
buf_info->run_ts = ctx->in_running_time;
buf_info->buf_size = gst_buffer_get_size (buf);
buf_info->duration = GST_BUFFER_DURATION (buf);
/* Update total input byte counter for overflow detect */
ctx->in_bytes += buf_info->buf_size;
/* initialize mux_start_time */
if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
splitmux->mux_start_time = buf_info->run_ts;
GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->mux_start_time));
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
splitmux->max_in_running_time = splitmux->mux_start_time;
}
GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
" total in_bytes %" G_GUINT64_FORMAT,
GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
loop_again = TRUE;
do {
if (ctx->flushing)
break;
switch (splitmux->state) {
case SPLITMUX_STATE_COLLECTING_GOP_START:
if (ctx->is_reference) {
/* If a keyframe, we have a complete GOP */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
splitmux->max_in_running_time >= ctx->in_running_time) {
/* Pass this buffer through */
loop_again = FALSE;
break;
}
GST_INFO_OBJECT (pad,
"Have keyframe with running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
keyframe = TRUE;
splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
splitmux->max_in_running_time = ctx->in_running_time;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST (splitmux);
check_completed_gop (splitmux, ctx);
} else {
/* We're still waiting for a keyframe on the reference pad, sleep */
GST_LOG_OBJECT (pad, "Sleeping for GOP start");
GST_SPLITMUX_WAIT (splitmux);
GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
splitmux->state);
}
break;
case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
/* If we overran the target timestamp, it might be time to process
* the GOP, otherwise bail out for more data
*/
GST_LOG_OBJECT (pad,
"Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time),
GST_STIME_ARGS (splitmux->max_in_running_time));
if (ctx->in_running_time < splitmux->max_in_running_time) {
loop_again = FALSE;
break;
}
GST_LOG_OBJECT (pad,
"Collected last packet of GOP. Checking other pads");
check_completed_gop (splitmux, ctx);
break;
case SPLITMUX_STATE_ENDING_FILE:{
GstEvent *event;
/* If somes streams received no buffer during the last GOP that overran,
* because its next buffer has a timestamp bigger than
* ctx->max_in_running_time, its queue is empty. In that case the only
* way to wakeup the output thread is by injecting an event in the
* queue. This usually happen with subtitle streams.
* See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
GST_EVENT_TYPE_SERIALIZED,
gst_structure_new ("splitmuxsink-unblock", "timestamp",
G_TYPE_INT64, splitmux->max_in_running_time, NULL));
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (ctx->sinkpad, event);
GST_SPLITMUX_LOCK (splitmux);
/* state may have changed while we were unlocked. Loop again if so */
if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
break;
/* fallthrough */
}
case SPLITMUX_STATE_START_NEXT_FRAGMENT:
/* A fragment is ending, wait until that's done before continuing */
GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
GST_SPLITMUX_WAIT (splitmux);
GST_DEBUG_OBJECT (pad,
"Done sleeping for fragment restart state now %d", splitmux->state);
break;
default:
loop_again = FALSE;
break;
}
} while (loop_again);
if (keyframe) {
splitmux->queued_gops++;
buf_info->keyframe = TRUE;
}
/* Now add this buffer to the queue just before returning */
g_queue_push_head (&ctx->queued_bufs, buf_info);
/* Check the buffer will fit in the mq */
check_queue_length (splitmux, ctx);
GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
if (buf_info)
mq_stream_buf_free (buf_info);
return GST_PAD_PROBE_PASS;
}
static GstPad *
gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPadTemplate *mux_template = NULL;
GstPad *res = NULL;
GstPad *mq_sink, *mq_src;
gchar *gname;
gboolean is_video = FALSE;
MqStreamCtx *ctx;
GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
GST_SPLITMUX_LOCK (splitmux);
if (!create_elements (splitmux))
goto fail;
if (templ->name_template) {
if (g_str_equal (templ->name_template, "video")) {
if (splitmux->have_video)
goto already_have_video;
/* FIXME: Look for a pad template with matching caps, rather than by name */
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "video_%u");
is_video = TRUE;
name = NULL;
} else {
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), templ->name_template);
}
if (mux_template == NULL) {
/* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "sink_%d");
}
}
res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
if (res == NULL)
goto fail;
if (is_video)
gname = g_strdup ("video");
else if (name == NULL)
gname = gst_pad_get_name (res);
else
gname = g_strdup (name);
if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
gst_element_release_request_pad (splitmux->muxer, res);
gst_object_unref (GST_OBJECT (res));
goto fail;
}
if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
gst_element_release_request_pad (splitmux->muxer, res);
gst_object_unref (GST_OBJECT (res));
gst_element_release_request_pad (splitmux->mq, mq_sink);
gst_object_unref (GST_OBJECT (mq_sink));
goto fail;
}
gst_object_unref (GST_OBJECT (res));
ctx = mq_stream_ctx_new (splitmux);
ctx->srcpad = mq_src;
ctx->sinkpad = mq_sink;
mq_stream_ctx_ref (ctx);
ctx->src_pad_block_id =
gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
_pad_block_destroy_src_notify);
if (is_video && splitmux->reference_ctx != NULL) {
splitmux->reference_ctx->is_reference = FALSE;
splitmux->reference_ctx = NULL;
}
if (splitmux->reference_ctx == NULL) {
splitmux->reference_ctx = ctx;
ctx->is_reference = TRUE;
}
res = gst_ghost_pad_new_from_template (gname, mq_sink, templ);
g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
mq_stream_ctx_ref (ctx);
ctx->sink_pad_block_id =
gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
_pad_block_destroy_sink_notify);
GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
" is mq pad %" GST_PTR_FORMAT, res, mq_sink);
splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
g_free (gname);
gst_object_unref (mq_sink);
gst_object_unref (mq_src);
if (is_video)
splitmux->have_video = TRUE;
gst_pad_set_active (res, TRUE);
gst_element_add_pad (element, res);
GST_SPLITMUX_UNLOCK (splitmux);
return res;
fail:
GST_SPLITMUX_UNLOCK (splitmux);
return NULL;
already_have_video:
GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
GST_SPLITMUX_UNLOCK (splitmux);
return NULL;
}
static void
gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPad *mqsink, *mqsrc = NULL, *muxpad = NULL;
MqStreamCtx *ctx =
(MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->muxer == NULL || splitmux->mq == NULL)
goto fail; /* Elements don't exist yet - nothing to release */
GST_INFO_OBJECT (pad, "releasing request pad");
mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
/* The ghostpad target might have disappeared during pipeline destruct */
if (mqsink)
mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
if (mqsrc)
muxpad = gst_pad_get_peer (mqsrc);
/* Remove the context from our consideration */
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
if (ctx->sink_pad_block_id)
gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
if (ctx->src_pad_block_id)
gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
/* Can release the context now */
mq_stream_ctx_unref (ctx);
if (ctx == splitmux->reference_ctx)
splitmux->reference_ctx = NULL;
/* Release and free the mq input */
if (mqsink) {
gst_element_release_request_pad (splitmux->mq, mqsink);
gst_object_unref (mqsink);
}
/* Release and free the muxer input */
if (muxpad) {
gst_element_release_request_pad (splitmux->muxer, muxpad);
gst_object_unref (muxpad);
}
if (mqsrc)
gst_object_unref (mqsrc);
if (GST_PAD_PAD_TEMPLATE (pad) &&
g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE (pad)),
"video"))
splitmux->have_video = FALSE;
gst_element_remove_pad (element, pad);
/* Reset the internal elements only after all request pads are released */
if (splitmux->contexts == NULL)
gst_splitmux_reset (splitmux);
fail:
GST_SPLITMUX_UNLOCK (splitmux);
}
static GstElement *
create_element (GstSplitMuxSink * splitmux,
const gchar * factory, const gchar * name)
{
GstElement *ret = gst_element_factory_make (factory, name);
if (ret == NULL) {
g_warning ("Failed to create %s - splitmuxsink will not work", name);
return NULL;
}
if (!gst_bin_add (GST_BIN (splitmux), ret)) {
g_warning ("Could not add %s element - splitmuxsink will not work", name);
gst_object_unref (ret);
return NULL;
}
return ret;
}
static gboolean
create_elements (GstSplitMuxSink * splitmux)
{
/* Create internal elements */
if (splitmux->mq == NULL) {
if ((splitmux->mq =
create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
goto fail;
splitmux->mq_max_buffers = 5;
/* No bytes or time limit, we limit buffers manually */
g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
(guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
}
if (splitmux->muxer == NULL) {
GstElement *provided_muxer = NULL;
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_muxer != NULL)
provided_muxer = gst_object_ref (splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
if (provided_muxer == NULL) {
if ((splitmux->muxer =
create_element (splitmux, "mp4mux", "muxer")) == NULL)
goto fail;
} else {
/* Ensure it's not in locked state (we might be reusing an old element) */
gst_element_set_locked_state (provided_muxer, FALSE);
if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
g_warning ("Could not add muxer element - splitmuxsink will not work");
gst_object_unref (provided_muxer);
goto fail;
}
splitmux->muxer = provided_muxer;
gst_object_unref (provided_muxer);
}
}
return TRUE;
fail:
return FALSE;
}
static GstElement *
find_sink (GstElement * e)
{
GstElement *res = NULL;
GstIterator *iter;
gboolean done = FALSE;
GValue data = { 0, };
if (!GST_IS_BIN (e))
return e;
iter = gst_bin_iterate_sinks (GST_BIN (e));
while (!done) {
switch (gst_iterator_next (iter, &data)) {
case GST_ITERATOR_OK:
{
GstElement *child = g_value_get_object (&data);
if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
"location") != NULL) {
res = child;
done = TRUE;
}
g_value_reset (&data);
break;
}
case GST_ITERATOR_RESYNC:
gst_iterator_resync (iter);
break;
case GST_ITERATOR_DONE:
done = TRUE;
break;
case GST_ITERATOR_ERROR:
g_assert_not_reached ();
break;
}
}
g_value_unset (&data);
gst_iterator_free (iter);
return res;
}
static gboolean
create_sink (GstSplitMuxSink * splitmux)
{
GstElement *provided_sink = NULL;
if (splitmux->active_sink == NULL) {
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_sink != NULL)
provided_sink = gst_object_ref (splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
if (provided_sink == NULL) {
if ((splitmux->sink =
create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
goto fail;
splitmux->active_sink = splitmux->sink;
} else {
/* Ensure it's not in locked state (we might be reusing an old element) */
gst_element_set_locked_state (provided_sink, FALSE);
if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
g_warning ("Could not add sink elements - splitmuxsink will not work");
gst_object_unref (provided_sink);
goto fail;
}
splitmux->active_sink = provided_sink;
/* The bin holds a ref now, we can drop our tmp ref */
gst_object_unref (provided_sink);
/* Find the sink element */
splitmux->sink = find_sink (splitmux->active_sink);
if (splitmux->sink == NULL) {
g_warning
("Could not locate sink element in provided sink - splitmuxsink will not work");
goto fail;
}
}
if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
goto fail;
}
}
return TRUE;
fail:
return FALSE;
}
#ifdef __GNUC__
#pragma GCC diagnostic ignored "-Wformat-nonliteral"
#endif
static void
set_next_filename (GstSplitMuxSink * splitmux)
{
gchar *fname = NULL;
gst_splitmux_sink_ensure_max_files (splitmux);
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
splitmux->fragment_id, &fname);
if (!fname)
fname = splitmux->location ?
g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
if (fname) {
GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
g_object_set (splitmux->sink, "location", fname, NULL);
g_free (fname);
splitmux->fragment_id++;
}
}
static GstStateChangeReturn
gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
{
GstStateChangeReturn ret;
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:{
GST_SPLITMUX_LOCK (splitmux);
if (!create_elements (splitmux) || !create_sink (splitmux)) {
ret = GST_STATE_CHANGE_FAILURE;
GST_SPLITMUX_UNLOCK (splitmux);
goto beach;
}
GST_SPLITMUX_UNLOCK (splitmux);
splitmux->fragment_id = 0;
set_next_filename (splitmux);
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
GST_SPLITMUX_LOCK (splitmux);
/* Start by collecting one input on each pad */
splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
splitmux->muxed_out_time = splitmux->mux_start_time =
splitmux->last_frame_duration = GST_CLOCK_STIME_NONE;
splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
splitmux->opening_first_fragment = TRUE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->state = SPLITMUX_STATE_STOPPED;
/* Wake up any blocked threads */
GST_LOG_OBJECT (splitmux,
"State change -> NULL or READY. Waking threads");
GST_SPLITMUX_BROADCAST (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE)
goto beach;
switch (transition) {
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->fragment_id = 0;
/* Reset internal elements only if no pad contexts are using them */
if (splitmux->contexts == NULL)
gst_splitmux_reset (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
break;
}
beach:
if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
ret == GST_STATE_CHANGE_FAILURE) {
/* Cleanup elements on failed transition out of NULL */
gst_splitmux_reset (splitmux);
}
return ret;
}
gboolean
register_splitmuxsink (GstPlugin * plugin)
{
GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
"Split File Muxing Sink");
return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
GST_TYPE_SPLITMUX_SINK);
}
static void
gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
{
if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
splitmux->fragment_id = 0;
}
}