gstreamer/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c
Sebastian Dröge 3044b0992f Revert "splitmuxsink: Avoid assertion when WAITING_GOP_COLLECT on reference context"
This reverts commit f29c19be58. If this is
called for the reference context then we would run into an infinite
loop, which is not really better than an assertion.

By fixing up DTS to never be ahead of the PTS in the previous commit
this situation should be impossible to hit now.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4498>
2023-04-28 11:00:19 +00:00

4140 lines
142 KiB
C

/* GStreamer Muxer bin that splits output stream by size/time
* Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-splitmuxsink
* @title: splitmuxsink
* @short_description: Muxer wrapper for splitting output stream by size or time
*
* This element wraps a muxer and a sink, and starts a new file when the mux
* contents are about to cross a threshold of maximum size of maximum time,
* splitting at video keyframe boundaries. Exactly one input video stream
* can be muxed, with as many accompanying audio and subtitle streams as
* desired.
*
* By default, it uses mp4mux and filesink, but they can be changed via
* the 'muxer' and 'sink' properties.
*
* The minimum file size is 1 GOP, however - so limits may be overrun if the
* distance between any 2 keyframes is larger than the limits.
*
* If a video stream is available, the splitting process is driven by the video
* stream contents, and the video stream must contain closed GOPs for the output
* file parts to be played individually correctly. In the absence of a video
* stream, the first available stream is used as reference for synchronization.
*
* In the async-finalize mode, when the threshold is crossed, the old muxer
* and sink is disconnected from the pipeline and left to finish the file
* asynchronously, and a new muxer and sink is created to continue with the
* next fragment. For that reason, instead of muxer and sink objects, the
* muxer-factory and sink-factory properties are used to construct the new
* objects, together with muxer-properties and sink-properties.
*
* ## Example pipelines
* |[
* gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
* ]|
* Records a video stream captured from a v4l2 device and muxes it into
* ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
* and 1MB maximum size.
*
* |[
* gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
* ]|
* Records a video stream captured from a v4l2 device and muxer it into
* streamable Matroska files, splitting as needed to limit size/duration to 10
* seconds. Each file will finalize asynchronously.
*
* |[
* gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
* ]|
* Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
* it will deliver to.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include <glib/gstdio.h>
#include <gst/video/video.h>
#include "gstsplitmuxsink.h"
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_CAT_DEFAULT splitmux_debug
#define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
#define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
static void split_now (GstSplitMuxSink * splitmux);
static void split_after (GstSplitMuxSink * splitmux);
static void split_at_running_time (GstSplitMuxSink * splitmux,
GstClockTime split_time);
enum
{
PROP_0,
PROP_LOCATION,
PROP_START_INDEX,
PROP_MAX_SIZE_TIME,
PROP_MAX_SIZE_BYTES,
PROP_MAX_SIZE_TIMECODE,
PROP_SEND_KEYFRAME_REQUESTS,
PROP_MAX_FILES,
PROP_MUXER_OVERHEAD,
PROP_USE_ROBUST_MUXING,
PROP_ALIGNMENT_THRESHOLD,
PROP_MUXER,
PROP_SINK,
PROP_RESET_MUXER,
PROP_ASYNC_FINALIZE,
PROP_MUXER_FACTORY,
PROP_MUXER_PRESET,
PROP_MUXER_PROPERTIES,
PROP_SINK_FACTORY,
PROP_SINK_PRESET,
PROP_SINK_PROPERTIES,
PROP_MUXERPAD_MAP
};
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
#define DEFAULT_MAX_FILES 0
#define DEFAULT_MUXER_OVERHEAD 0.02
#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
#define DEFAULT_ALIGNMENT_THRESHOLD 0
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"
#define DEFAULT_USE_ROBUST_MUXING FALSE
#define DEFAULT_RESET_MUXER TRUE
#define DEFAULT_ASYNC_FINALIZE FALSE
#define DEFAULT_START_INDEX 0
typedef struct _AsyncEosHelper
{
MqStreamCtx *ctx;
GstPad *pad;
} AsyncEosHelper;
enum
{
SIGNAL_FORMAT_LOCATION,
SIGNAL_FORMAT_LOCATION_FULL,
SIGNAL_SPLIT_NOW,
SIGNAL_SPLIT_AFTER,
SIGNAL_SPLIT_AT_RUNNING_TIME,
SIGNAL_MUXER_ADDED,
SIGNAL_SINK_ADDED,
SIGNAL_LAST
};
static guint signals[SIGNAL_LAST];
static GstStaticPadTemplate video_sink_template =
GST_STATIC_PAD_TEMPLATE ("video",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate video_aux_sink_template =
GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate audio_sink_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate subtitle_sink_template =
GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate caption_sink_template =
GST_STATIC_PAD_TEMPLATE ("caption_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GQuark PAD_CONTEXT;
static GQuark EOS_FROM_US;
static GQuark RUNNING_TIME;
/* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
* to forward an incoming EOS message, but we cannot rely on the state of the
* splitmux anymore, so we set this qdata on the sink instead.
* The muxer and sink must be destroyed after both of these things have
* finished:
* 1) The EOS message has been sent when the fragment is ending
* 2) The muxer has been unlinked and relinked
* Therefore, EOS_FROM_US can have these two values:
* 0: EOS was not requested from us. Forward the message. The muxer and the
* sink will be destroyed together with the rest of the bin.
* 1: EOS was requested from us, but the other of the two tasks hasn't
* finished. Set EOS_FROM_US to 2 and do your stuff.
* 2: EOS was requested from us and the other of the two tasks has finished.
* Now we can destroy the muxer and the sink.
*/
static void
_do_init (void)
{
PAD_CONTEXT = g_quark_from_static_string ("pad-context");
EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
RUNNING_TIME = g_quark_from_static_string ("running-time");
GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
"Split File Muxing Sink");
}
#define gst_splitmux_sink_parent_class parent_class
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
_do_init ());
GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
GST_TYPE_SPLITMUX_SINK);
static gboolean create_muxer (GstSplitMuxSink * splitmux);
static gboolean create_sink (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_dispose (GObject * object);
static void gst_splitmux_sink_finalize (GObject * object);
static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
element, GstStateChange transition);
static void bus_handler (GstBin * bin, GstMessage * msg);
static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
MqStreamCtx * ctx);
static void mq_stream_ctx_free (MqStreamCtx * ctx);
static void grow_blocked_queues (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
static GstElement *create_element (GstSplitMuxSink * splitmux,
const gchar * factory, const gchar * name, gboolean locked);
static void do_async_done (GstSplitMuxSink * splitmux);
static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
const GstVideoTimeCode * cur_tc, GstClockTime running_time,
GstVideoTimeCode ** next_tc);
static MqStreamBuf *
mq_stream_buf_new (void)
{
return g_new0 (MqStreamBuf, 1);
}
static void
mq_stream_buf_free (MqStreamBuf * data)
{
g_free (data);
}
static SplitMuxOutputCommand *
out_cmd_buf_new (void)
{
return g_new0 (SplitMuxOutputCommand, 1);
}
static void
out_cmd_buf_free (SplitMuxOutputCommand * data)
{
g_free (data);
}
static void
input_gop_free (InputGop * gop)
{
g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
g_free (gop);
}
static void
gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
GstElementClass *gstelement_class = (GstElementClass *) klass;
GstBinClass *gstbin_class = (GstBinClass *) klass;
gobject_class->set_property = gst_splitmux_sink_set_property;
gobject_class->get_property = gst_splitmux_sink_get_property;
gobject_class->dispose = gst_splitmux_sink_dispose;
gobject_class->finalize = gst_splitmux_sink_finalize;
gst_element_class_set_static_metadata (gstelement_class,
"Split Muxing Bin", "Generic/Bin/Muxer",
"Convenience bin that muxes incoming streams into multiple time/size limited files",
"Jan Schmidt <jan@centricular.com>");
gst_element_class_add_static_pad_template (gstelement_class,
&video_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&video_aux_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&audio_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&subtitle_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&caption_sink_template);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
gstelement_class->release_pad =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
gstbin_class->handle_message = bus_handler;
g_object_class_install_property (gobject_class, PROP_LOCATION,
g_param_spec_string ("location", "File Output Pattern",
"Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
g_param_spec_double ("mux-overhead", "Muxing Overhead",
"Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
DEFAULT_MUXER_OVERHEAD,
G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
"Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
DEFAULT_MAX_SIZE_TIME,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
"Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
DEFAULT_MAX_SIZE_BYTES,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
"Maximum difference in timecode between first and last frame. "
"Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
"Will only be effective if a timecode track is present.", NULL,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
g_param_spec_boolean ("send-keyframe-requests",
"Request keyframes at max-size-time",
"Request a keyframe every max-size-time ns to try splitting at that point. "
"Needs max-size-bytes to be 0 in order to be effective.",
DEFAULT_SEND_KEYFRAME_REQUESTS,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_FILES,
g_param_spec_uint ("max-files", "Max files",
"Maximum number of files to keep on disk. Once the maximum is reached,"
"old files start to be deleted to make room for new ones.", 0,
G_MAXUINT, DEFAULT_MAX_FILES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
"Allow non-reference streams to be that many ns before the reference"
" stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER,
g_param_spec_object ("muxer", "Muxer",
"The muxer element to use (NULL = default mp4mux). "
"Valid only for async-finalize = FALSE",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK,
g_param_spec_object ("sink", "Sink",
"The sink element (or element chain) to use (NULL = default filesink). "
"Valid only for async-finalize = FALSE",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
g_param_spec_boolean ("use-robust-muxing",
"Support robust-muxing mode of some muxers",
"Check if muxers support robust muxing via the reserved-max-duration and "
"reserved-duration-remaining properties and use them if so. "
"(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
" create new fragments if the reserved header space is about to overflow. "
"Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
"manually by the app to a non-zero value for robust muxing to have an effect.",
DEFAULT_USE_ROBUST_MUXING,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
g_param_spec_boolean ("reset-muxer",
"Reset Muxer",
"Reset the muxer after each segment. Disabling this will not work for most muxers.",
DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
g_param_spec_boolean ("async-finalize",
"Finalize fragments asynchronously",
"Finalize each fragment asynchronously and start a new one",
DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
g_param_spec_string ("muxer-factory", "Muxer factory",
"The muxer element factory to use (default = mp4mux). "
"Valid only for async-finalize = TRUE",
"mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSplitMuxSink:muxer-preset
*
* An optional #GstPreset name to use for the muxer. This only has an effect
* in `async-finalize=TRUE` mode.
*
* Since: 1.18
*/
g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
g_param_spec_string ("muxer-preset", "Muxer preset",
"The muxer preset to use. "
"Valid only for async-finalize = TRUE",
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
g_param_spec_boxed ("muxer-properties", "Muxer properties",
"The muxer element properties to use. "
"Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
"Valid only for async-finalize = TRUE",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
g_param_spec_string ("sink-factory", "Sink factory",
"The sink element factory to use (default = filesink). "
"Valid only for async-finalize = TRUE",
"filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSplitMuxSink:sink-preset
*
* An optional #GstPreset name to use for the sink. This only has an effect
* in `async-finalize=TRUE` mode.
*
* Since: 1.18
*/
g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
g_param_spec_string ("sink-preset", "Sink preset",
"The sink preset to use. "
"Valid only for async-finalize = TRUE",
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
g_param_spec_boxed ("sink-properties", "Sink properties",
"The sink element properties to use. "
"Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
"Valid only for async-finalize = TRUE",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_START_INDEX,
g_param_spec_int ("start-index", "Start Index",
"Start value of fragment index.",
0, G_MAXINT, DEFAULT_START_INDEX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSplitMuxSink::muxer-pad-map
*
* An optional GstStructure that provides a map from splitmuxsink sinkpad
* names to muxer pad names they should feed. Splitmuxsink has some default
* mapping behaviour to link video to video pads and audio to audio pads
* that usually works fine. This property is useful if you need to ensure
* a particular mapping to muxed streams.
*
* The GstStructure contains string fields like so:
* splitmuxsink muxer-pad-map=x-pad-map,video=video_1
*
* Since: 1.18
*/
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
"A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
GST_TYPE_STRUCTURE,
(GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
/**
* GstSplitMuxSink::format-location:
* @splitmux: the #GstSplitMuxSink
* @fragment_id: the sequence number of the file to be created
*
* Returns: the location to be used for the next output file. This must be
* a newly-allocated string which will be freed with g_free() by the
* splitmuxsink element when it no longer needs it, so use g_strdup() or
* g_strdup_printf() or similar functions to allocate it.
*/
signals[SIGNAL_FORMAT_LOCATION] =
g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
/**
* GstSplitMuxSink::format-location-full:
* @splitmux: the #GstSplitMuxSink
* @fragment_id: the sequence number of the file to be created
* @first_sample: A #GstSample containing the first buffer
* from the reference stream in the new file
*
* Returns: the location to be used for the next output file. This must be
* a newly-allocated string which will be freed with g_free() by the
* splitmuxsink element when it no longer needs it, so use g_strdup() or
* g_strdup_printf() or similar functions to allocate it.
*
* Since: 1.12
*/
signals[SIGNAL_FORMAT_LOCATION_FULL] =
g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
GST_TYPE_SAMPLE);
/**
* GstSplitMuxSink::split-now:
* @splitmux: the #GstSplitMuxSink
*
* When called by the user, this action signal splits the video file (and begins a new one) immediately.
* The current GOP will be output to the new file.
*
* Since: 1.14
*/
signals[SIGNAL_SPLIT_NOW] =
g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
G_TYPE_NONE, 0);
/**
* GstSplitMuxSink::split-after:
* @splitmux: the #GstSplitMuxSink
*
* When called by the user, this action signal splits the video file (and begins a new one) immediately.
* Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
*
* Since: 1.16
*/
signals[SIGNAL_SPLIT_AFTER] =
g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
G_TYPE_NONE, 0);
/**
* GstSplitMuxSink::split-at-running-time:
* @splitmux: the #GstSplitMuxSink
*
* When called by the user, this action signal splits the video file (and
* begins a new one) as soon as the given running time is reached. If this
* action signal is called multiple times, running times are queued up and
* processed in the order they were given.
*
* Note that this is prone to race conditions, where said running time is
* reached and surpassed before we had a chance to split. The file will
* still split immediately, but in order to make sure that the split doesn't
* happen too late, it is recommended to call this action signal from
* something that will prevent further buffers from flowing into
* splitmuxsink before the split is completed, such as a pad probe before
* splitmuxsink.
*
*
* Since: 1.16
*/
signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
/**
* GstSplitMuxSink::muxer-added:
* @splitmux: the #GstSplitMuxSink
* @muxer: the newly added muxer element
*
* Since: 1.16
*/
signals[SIGNAL_MUXER_ADDED] =
g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
/**
* GstSplitMuxSink::sink-added:
* @splitmux: the #GstSplitMuxSink
* @sink: the newly added sink element
*
* Since: 1.16
*/
signals[SIGNAL_SINK_ADDED] =
g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
klass->split_now = split_now;
klass->split_after = split_after;
klass->split_at_running_time = split_at_running_time;
}
static void
gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
{
g_mutex_init (&splitmux->lock);
g_mutex_init (&splitmux->state_lock);
g_cond_init (&splitmux->input_cond);
g_cond_init (&splitmux->output_cond);
g_queue_init (&splitmux->out_cmd_q);
splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
splitmux->max_files = DEFAULT_MAX_FILES;
splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
splitmux->reset_muxer = DEFAULT_RESET_MUXER;
splitmux->threshold_timecode_str = NULL;
splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
splitmux->muxer_properties = NULL;
splitmux->sink_factory = g_strdup (DEFAULT_SINK);
splitmux->sink_properties = NULL;
GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
splitmux->split_requested = FALSE;
splitmux->do_split_next_gop = FALSE;
splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
g_queue_init (&splitmux->pending_input_gops);
}
static void
gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
{
if (splitmux->muxer) {
gst_element_set_locked_state (splitmux->muxer, TRUE);
gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
}
if (splitmux->active_sink) {
gst_element_set_locked_state (splitmux->active_sink, TRUE);
gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
}
splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
}
static void
gst_splitmux_sink_dispose (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
/* Calling parent dispose invalidates all child pointers */
splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_splitmux_sink_finalize (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
g_cond_clear (&splitmux->input_cond);
g_cond_clear (&splitmux->output_cond);
g_mutex_clear (&splitmux->lock);
g_mutex_clear (&splitmux->state_lock);
g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
g_queue_clear (&splitmux->out_cmd_q);
g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
g_queue_clear (&splitmux->pending_input_gops);
g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
if (splitmux->muxerpad_map)
gst_structure_free (splitmux->muxerpad_map);
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
if (splitmux->muxer_factory)
g_free (splitmux->muxer_factory);
if (splitmux->muxer_preset)
g_free (splitmux->muxer_preset);
if (splitmux->muxer_properties)
gst_structure_free (splitmux->muxer_properties);
if (splitmux->sink_factory)
g_free (splitmux->sink_factory);
if (splitmux->sink_preset)
g_free (splitmux->sink_preset);
if (splitmux->sink_properties)
gst_structure_free (splitmux->sink_properties);
if (splitmux->threshold_timecode_str)
g_free (splitmux->threshold_timecode_str);
if (splitmux->tc_interval)
gst_video_time_code_interval_free (splitmux->tc_interval);
if (splitmux->times_to_split)
gst_queue_array_free (splitmux->times_to_split);
g_free (splitmux->location);
/* Make sure to free any un-released contexts. There should not be any,
* because the dispose will have freed all request pads though */
g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
g_list_free (splitmux->contexts);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
/*
* Set any time threshold to the muxer, if it has
* reserved-max-duration and reserved-duration-remaining
* properties. Called when creating/claiming the muxer
* in create_elements() */
static void
update_muxer_properties (GstSplitMuxSink * sink)
{
GObjectClass *klass;
GstClockTime threshold_time;
sink->muxer_has_reserved_props = FALSE;
if (sink->muxer == NULL)
return;
klass = G_OBJECT_GET_CLASS (sink->muxer);
if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
return;
if (g_object_class_find_property (klass,
"reserved-duration-remaining") == NULL)
return;
sink->muxer_has_reserved_props = TRUE;
GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
GST_TIME_ARGS (sink->threshold_time));
GST_OBJECT_LOCK (sink);
threshold_time = sink->threshold_time;
GST_OBJECT_UNLOCK (sink);
if (threshold_time > 0) {
/* Tell the muxer how much space to reserve */
GstClockTime muxer_threshold = threshold_time;
g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
}
}
static void
gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
switch (prop_id) {
case PROP_LOCATION:{
GST_OBJECT_LOCK (splitmux);
g_free (splitmux->location);
splitmux->location = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
}
case PROP_START_INDEX:
GST_OBJECT_LOCK (splitmux);
splitmux->start_index = g_value_get_int (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_BYTES:
GST_OBJECT_LOCK (splitmux);
splitmux->threshold_bytes = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (splitmux);
splitmux->threshold_time = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIMECODE:
GST_OBJECT_LOCK (splitmux);
g_free (splitmux->threshold_timecode_str);
/* will be calculated later */
g_clear_pointer (&splitmux->tc_interval,
gst_video_time_code_interval_free);
splitmux->threshold_timecode_str = g_value_dup_string (value);
if (splitmux->threshold_timecode_str) {
splitmux->tc_interval =
gst_video_time_code_interval_new_from_string
(splitmux->threshold_timecode_str);
if (!splitmux->tc_interval) {
g_warning ("Wrong timecode string %s",
splitmux->threshold_timecode_str);
g_free (splitmux->threshold_timecode_str);
splitmux->threshold_timecode_str = NULL;
}
}
splitmux->next_fragment_start_tc_time =
calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
splitmux->fragment_start_time, NULL);
if (splitmux->tc_interval && splitmux->fragment_start_tc
&& !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
GST_WARNING_OBJECT (splitmux,
"Couldn't calculate next fragment start time for timecode mode");
}
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SEND_KEYFRAME_REQUESTS:
GST_OBJECT_LOCK (splitmux);
splitmux->send_keyframe_requests = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_FILES:
GST_OBJECT_LOCK (splitmux);
splitmux->max_files = g_value_get_uint (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
splitmux->mux_overhead = g_value_get_double (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_USE_ROBUST_MUXING:
GST_OBJECT_LOCK (splitmux);
splitmux->use_robust_muxing = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
if (splitmux->use_robust_muxing)
update_muxer_properties (splitmux);
break;
case PROP_ALIGNMENT_THRESHOLD:
GST_OBJECT_LOCK (splitmux);
splitmux->alignment_threshold = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
GST_OBJECT_LOCK (splitmux);
gst_clear_object (&splitmux->provided_sink);
splitmux->provided_sink = g_value_get_object (value);
if (splitmux->provided_sink)
gst_object_ref_sink (splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
GST_OBJECT_LOCK (splitmux);
gst_clear_object (&splitmux->provided_muxer);
splitmux->provided_muxer = g_value_get_object (value);
if (splitmux->provided_muxer)
gst_object_ref_sink (splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_RESET_MUXER:
GST_OBJECT_LOCK (splitmux);
splitmux->reset_muxer = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_ASYNC_FINALIZE:
GST_OBJECT_LOCK (splitmux);
splitmux->async_finalize = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_FACTORY:
GST_OBJECT_LOCK (splitmux);
if (splitmux->muxer_factory)
g_free (splitmux->muxer_factory);
splitmux->muxer_factory = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_PRESET:
GST_OBJECT_LOCK (splitmux);
if (splitmux->muxer_preset)
g_free (splitmux->muxer_preset);
splitmux->muxer_preset = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
if (splitmux->muxer_properties)
gst_structure_free (splitmux->muxer_properties);
if (gst_value_get_structure (value))
splitmux->muxer_properties =
gst_structure_copy (gst_value_get_structure (value));
else
splitmux->muxer_properties = NULL;
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_FACTORY:
GST_OBJECT_LOCK (splitmux);
if (splitmux->sink_factory)
g_free (splitmux->sink_factory);
splitmux->sink_factory = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_PRESET:
GST_OBJECT_LOCK (splitmux);
if (splitmux->sink_preset)
g_free (splitmux->sink_preset);
splitmux->sink_preset = g_value_dup_string (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
if (splitmux->sink_properties)
gst_structure_free (splitmux->sink_properties);
if (gst_value_get_structure (value))
splitmux->sink_properties =
gst_structure_copy (gst_value_get_structure (value));
else
splitmux->sink_properties = NULL;
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXERPAD_MAP:
{
const GstStructure *s = gst_value_get_structure (value);
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->muxerpad_map) {
gst_structure_free (splitmux->muxerpad_map);
}
if (s)
splitmux->muxerpad_map = gst_structure_copy (s);
else
splitmux->muxerpad_map = NULL;
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_splitmux_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
switch (prop_id) {
case PROP_LOCATION:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->location);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_START_INDEX:
GST_OBJECT_LOCK (splitmux);
g_value_set_int (value, splitmux->start_index);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_BYTES:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint64 (value, splitmux->threshold_bytes);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint64 (value, splitmux->threshold_time);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_SIZE_TIMECODE:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->threshold_timecode_str);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SEND_KEYFRAME_REQUESTS:
GST_OBJECT_LOCK (splitmux);
g_value_set_boolean (value, splitmux->send_keyframe_requests);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MAX_FILES:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint (value, splitmux->max_files);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
g_value_set_double (value, splitmux->mux_overhead);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_USE_ROBUST_MUXING:
GST_OBJECT_LOCK (splitmux);
g_value_set_boolean (value, splitmux->use_robust_muxing);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_ALIGNMENT_THRESHOLD:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint64 (value, splitmux->alignment_threshold);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_RESET_MUXER:
GST_OBJECT_LOCK (splitmux);
g_value_set_boolean (value, splitmux->reset_muxer);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_ASYNC_FINALIZE:
GST_OBJECT_LOCK (splitmux);
g_value_set_boolean (value, splitmux->async_finalize);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_FACTORY:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->muxer_factory);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_PRESET:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->muxer_preset);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
gst_value_set_structure (value, splitmux->muxer_properties);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_FACTORY:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->sink_factory);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_PRESET:
GST_OBJECT_LOCK (splitmux);
g_value_set_string (value, splitmux->sink_preset);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK_PROPERTIES:
GST_OBJECT_LOCK (splitmux);
gst_value_set_structure (value, splitmux->sink_properties);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXERPAD_MAP:
GST_SPLITMUX_LOCK (splitmux);
gst_value_set_structure (value, splitmux->muxerpad_map);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* Convenience function */
static inline GstClockTimeDiff
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (val)) {
gboolean sign =
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
if (sign > 0)
res = val;
else if (sign < 0)
res = -val;
}
return res;
}
static void
mq_stream_ctx_reset (MqStreamCtx * ctx)
{
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
}
static MqStreamCtx *
mq_stream_ctx_new (GstSplitMuxSink * splitmux)
{
MqStreamCtx *ctx;
ctx = g_new0 (MqStreamCtx, 1);
ctx->splitmux = splitmux;
g_queue_init (&ctx->queued_bufs);
mq_stream_ctx_reset (ctx);
return ctx;
}
static void
mq_stream_ctx_free (MqStreamCtx * ctx)
{
if (ctx->q) {
GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
gst_element_set_locked_state (ctx->q, TRUE);
gst_element_set_state (ctx->q, GST_STATE_NULL);
gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
gst_object_unref (parent);
}
gst_object_unref (ctx->q);
}
gst_object_unref (ctx->sinkpad);
gst_object_unref (ctx->srcpad);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
g_free (ctx);
}
static void
send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
GstElement * sink)
{
gchar *location = NULL;
GstMessage *msg;
const gchar *msg_name = opened ?
"splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
GstClockTime running_time = splitmux->reference_ctx->out_running_time;
if (!opened) {
GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
if (rtime)
running_time = *rtime;
}
if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
"location") != NULL)
g_object_get (sink, "location", &location, NULL);
GST_DEBUG_OBJECT (splitmux,
"Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
/* If it's in the middle of a teardown, the reference_ctc might have become
* NULL */
if (splitmux->reference_ctx) {
msg = gst_message_new_element (GST_OBJECT (splitmux),
gst_structure_new (msg_name,
"location", G_TYPE_STRING, location,
"running-time", GST_TYPE_CLOCK_TIME, running_time,
"sink", GST_TYPE_ELEMENT, sink, NULL));
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
}
g_free (location);
}
static void
send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
{
GstEvent *eos;
GstPad *pad;
MqStreamCtx *ctx;
eos = gst_event_new_eos ();
pad = helper->pad;
ctx = helper->ctx;
GST_SPLITMUX_LOCK (splitmux);
if (!pad)
pad = gst_pad_get_peer (ctx->srcpad);
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (pad, eos);
GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
gst_object_unref (pad);
g_free (helper);
}
/* Called with lock held, drops the lock to send EOS to the
* pad
*/
static void
send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GstEvent *eos;
GstPad *pad;
eos = gst_event_new_eos ();
pad = gst_pad_get_peer (ctx->srcpad);
ctx->out_eos = TRUE;
GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (pad, eos);
GST_SPLITMUX_LOCK (splitmux);
gst_object_unref (pad);
}
/* Called with lock held. Schedules an EOS event to the ctx pad
* to happen in another thread */
static void
eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
GstPad *srcpad, *sinkpad;
srcpad = ctx->srcpad;
sinkpad = gst_pad_get_peer (srcpad);
helper->ctx = ctx;
helper->pad = sinkpad; /* Takes the reference */
ctx->out_eos_async_done = TRUE;
/* There used to be a bug here, where we had to explicitly remove
* the SINK flag so that GstBin would ignore it for EOS purposes.
* That fixed a race where if splitmuxsink really reaches EOS
* before an asynchronous background element has finished, then
* the bin wouldn't actually send EOS to the pipeline. Even after
* finishing and removing the old element, the bin didn't re-check
* EOS status on removing a SINK element. That bug was fixed
* in core. */
GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
sinkpad, ctx);
g_assert_nonnull (helper->pad);
gst_element_call_async (GST_ELEMENT (splitmux),
(GstElementCallAsyncFunc) send_eos_async, helper, NULL);
}
/* Called with lock held. TRUE iff all contexts have a
* pending (or delivered) async eos event */
static gboolean
all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
{
gboolean ret = TRUE;
GList *item;
for (item = splitmux->contexts; item; item = item->next) {
MqStreamCtx *ctx = item->data;
ret &= ctx->out_eos_async_done;
}
return ret;
}
/* Called with splitmux lock held to check if this output
* context needs to sleep to wait for the release of the
* next GOP, or to send EOS to close out the current file
*/
static GstFlowReturn
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
if (ctx->caps_change)
return GST_FLOW_OK;
do {
/* When first starting up, the reference stream has to output
* the first buffer to prepare the muxer and sink */
gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
if (my_max_out_running_time != GST_CLOCK_STIME_NONE
&& my_max_out_running_time != G_MAXINT64) {
my_max_out_running_time -= splitmux->alignment_threshold;
GST_LOG_OBJECT (ctx->srcpad,
"Max out running time currently %" GST_STIME_FORMAT
", with threshold applied it is %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time),
GST_STIME_ARGS (my_max_out_running_time));
}
if (ctx->flushing
|| splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
return GST_FLOW_FLUSHING;
GST_LOG_OBJECT (ctx->srcpad,
"Checking running time %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (my_max_out_running_time));
if (can_output) {
if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
ctx->out_running_time < my_max_out_running_time) {
return GST_FLOW_OK;
}
switch (splitmux->output_state) {
case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
/* We only get here if we've finished outputting a GOP and need to know
* what to do next */
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
continue;
case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
/* We've reached the max out running_time to get here, so end this file now */
if (ctx->out_eos == FALSE) {
if (splitmux->async_finalize) {
/* We must set EOS asynchronously at this point. We cannot defer
* it, because we need all contexts to wake up, for the
* reference context to eventually give us something at
* START_NEXT_FILE. Otherwise, collectpads might choose another
* context to give us the first buffer, and format-location-full
* will not contain a valid sample. */
g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
GINT_TO_POINTER (1));
eos_context_async (ctx, splitmux);
if (all_contexts_are_async_eos (splitmux)) {
GST_INFO_OBJECT (splitmux,
"All contexts are async_eos. Moving to the next file.");
/* We can start the next file once we've asked each pad to go EOS */
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
continue;
}
} else {
send_eos (splitmux, ctx);
continue;
}
} else {
GST_INFO_OBJECT (splitmux,
"At end-of-file state, but context %p is already EOS", ctx);
}
break;
case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
if (ctx->is_reference) {
GstFlowReturn ret = GST_FLOW_OK;
/* Special handling on the reference ctx to start new fragments
* and collect commands from the command queue */
/* drops the splitmux lock briefly: */
/* We must have reference ctx in order for format-location-full to
* have a sample */
ret = start_next_fragment (splitmux, ctx);
if (ret != GST_FLOW_OK)
return ret;
continue;
}
break;
case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
do {
SplitMuxOutputCommand *cmd =
g_queue_pop_tail (&splitmux->out_cmd_q);
if (cmd != NULL) {
/* If we pop the last command, we need to make our queues bigger */
if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
grow_blocked_queues (splitmux);
if (cmd->start_new_fragment) {
if (splitmux->muxed_out_bytes > 0) {
GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
} else {
GST_DEBUG_OBJECT (splitmux,
"Got cmd to start new fragment, but fragment is empty - ignoring.");
}
} else {
GST_DEBUG_OBJECT (splitmux,
"Got new output cmd for time %" GST_STIME_FORMAT,
GST_STIME_ARGS (cmd->max_output_ts));
/* Extend the output range immediately */
if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
|| cmd->max_output_ts > splitmux->max_out_running_time)
splitmux->max_out_running_time = cmd->max_output_ts;
GST_DEBUG_OBJECT (splitmux,
"Max out running time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
}
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
out_cmd_buf_free (cmd);
break;
} else {
GST_SPLITMUX_WAIT_OUTPUT (splitmux);
}
} while (!ctx->flushing && splitmux->output_state ==
SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
/* loop and re-check the state */
continue;
}
case SPLITMUX_OUTPUT_STATE_STOPPED:
return GST_FLOW_FLUSHING;
}
} else {
GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
}
GST_INFO_OBJECT (ctx->srcpad,
"Sleeping for running time %"
GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
GST_SPLITMUX_WAIT_OUTPUT (splitmux);
GST_INFO_OBJECT (ctx->srcpad,
"Woken for new max running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
}
while (1);
return GST_FLOW_OK;
}
static GstClockTime
calculate_next_max_timecode (GstSplitMuxSink * splitmux,
const GstVideoTimeCode * cur_tc, GstClockTime running_time,
GstVideoTimeCode ** next_tc)
{
GstVideoTimeCode *target_tc;
GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
if (cur_tc == NULL || splitmux->tc_interval == NULL)
return GST_CLOCK_TIME_NONE;
target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
if (!target_tc) {
GST_ELEMENT_ERROR (splitmux,
STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
return GST_CLOCK_TIME_NONE;
}
/* Convert to ns */
target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
/* Add running_time, accounting for wraparound. */
if (target_tc_time >= cur_tc_time) {
next_max_tc_time = target_tc_time - cur_tc_time + running_time;
} else {
GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
(cur_tc->config.fps_d == 1001)) {
/* Checking fps_d is probably unneeded, but better safe than sorry
* (e.g. someone accidentally set a flag) */
GstVideoTimeCode *tc_for_offset;
/* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
* but slightly less. Calculate that duration from a fake timecode. The
* problem is that 24:00:00;00 isn't a valid timecode, so the workaround
* is to add one frame to 23:59:59;29 */
tc_for_offset =
gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
NULL, cur_tc->config.flags, 23, 59, 59,
cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
day_in_ns =
gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
cur_tc->config.fps_n);
gst_video_time_code_free (tc_for_offset);
}
next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
}
#ifndef GST_DISABLE_GST_DEBUG
{
gchar *next_max_tc_str, *cur_tc_str;
cur_tc_str = gst_video_time_code_to_string (cur_tc);
next_max_tc_str = gst_video_time_code_to_string (target_tc);
GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
" from ref timecode %s time: %" GST_TIME_FORMAT,
next_max_tc_str,
GST_TIME_ARGS (next_max_tc_time),
cur_tc_str, GST_TIME_ARGS (cur_tc_time));
g_free (next_max_tc_str);
g_free (cur_tc_str);
}
#endif
if (next_tc)
*next_tc = target_tc;
else
gst_video_time_code_free (target_tc);
return next_max_tc_time;
}
static gboolean
request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
GstClockTimeDiff running_time_dts)
{
GstEvent *ev;
GstClockTime target_time;
gboolean timecode_based = FALSE;
GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
GstClockTime tc_rounding_error = 5 * GST_USECOND;
InputGop *newest_gop = NULL;
GList *l;
if (!splitmux->send_keyframe_requests)
return TRUE;
/* Find the newest GOP where we passed in DTS the start PTS */
for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
InputGop *tmp = l->data;
GST_TRACE_OBJECT (splitmux,
"Having pending input GOP with start PTS %" GST_STIME_FORMAT
" and start time %" GST_STIME_FORMAT,
GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
if (tmp->sent_fku) {
GST_DEBUG_OBJECT (splitmux,
"Already checked for a keyframe request for this GOP");
return TRUE;
}
if (running_time_dts == GST_CLOCK_STIME_NONE ||
tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
running_time_dts >= tmp->start_time_pts) {
GST_DEBUG_OBJECT (splitmux,
"Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
GST_STIME_ARGS (tmp->start_time));
newest_gop = tmp;
break;
}
}
if (!newest_gop) {
GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
return TRUE;
}
if (splitmux->tc_interval) {
if (newest_gop->start_tc
&& gst_video_time_code_is_valid (newest_gop->start_tc)) {
GstVideoTimeCode *next_tc = NULL;
max_tc_time =
calculate_next_max_timecode (splitmux, newest_gop->start_tc,
newest_gop->start_time, &next_tc);
/* calculate the next expected keyframe time to prevent too early fku
* event */
if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
next_max_tc_time =
calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
}
if (next_tc)
gst_video_time_code_free (next_tc);
timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
if (!timecode_based) {
GST_WARNING_OBJECT (splitmux,
"Couldn't calculate maximum fragment time for timecode mode");
}
} else {
/* This can happen in the presence of GAP events that trigger
* a new fragment start */
GST_WARNING_OBJECT (splitmux,
"No buffer available to calculate next timecode");
}
}
if ((splitmux->threshold_time == 0 && !timecode_based)
|| splitmux->threshold_bytes != 0)
return TRUE;
if (timecode_based) {
/* We might have rounding errors: aim slightly earlier */
if (max_tc_time >= tc_rounding_error) {
target_time = max_tc_time - tc_rounding_error;
} else {
/* unreliable target time */
GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
" is smaller than allowed rounding error, set it to zero",
GST_TIME_ARGS (max_tc_time));
target_time = 0;
}
if (next_max_tc_time >= tc_rounding_error) {
next_fku_time = next_max_tc_time - tc_rounding_error;
} else {
/* unreliable target time */
GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
" is smaller than allowed rounding error, set it to zero",
GST_TIME_ARGS (next_max_tc_time));
next_fku_time = 0;
}
} else {
target_time = newest_gop->start_time + splitmux->threshold_time;
}
if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
GstClockTime allowed_time = splitmux->next_fku_time;
if (timecode_based) {
if (allowed_time >= tc_rounding_error) {
allowed_time -= tc_rounding_error;
} else {
/* unreliable next force key unit time */
GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
GST_TIME_FORMAT
" is smaller than allowed rounding error, set it to zero",
GST_TIME_ARGS (splitmux->next_fku_time));
allowed_time = 0;
}
}
if (target_time < allowed_time) {
GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
" is smaller than expected next keyframe time %" GST_TIME_FORMAT
", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
GST_TIME_ARGS (target_time),
GST_TIME_ARGS (splitmux->next_fku_time),
GST_TIME_ARGS (allowed_time));
return TRUE;
} else if (allowed_time != splitmux->next_fku_time &&
target_time < splitmux->next_fku_time) {
GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
" is smaller than expected next keyframe time %" GST_TIME_FORMAT
", but the difference is smaller than allowed rounding error",
GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
}
}
if (!timecode_based) {
next_fku_time = target_time + splitmux->threshold_time;
}
GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
", the next expected keyframe request time is %" GST_TIME_FORMAT,
GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
newest_gop->sent_fku = TRUE;
splitmux->next_fku_time = next_fku_time;
ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
}
static GstPadProbeReturn
handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstSplitMuxSink *splitmux = ctx->splitmux;
MqStreamBuf *buf_info = NULL;
GstFlowReturn ret = GST_FLOW_OK;
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
g_warning ("Buffer list handling not implemented");
return GST_PAD_PROBE_DROP;
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
GstEvent *event = gst_pad_probe_info_get_event (info);
gboolean locked = FALSE, wait = !ctx->is_reference;
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->out_segment);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
g_queue_clear (&ctx->queued_bufs);
/* If this is the reference context, we just threw away any queued keyframes */
if (ctx->is_reference)
splitmux->queued_keyframes = 0;
ctx->flushing = FALSE;
wait = FALSE;
break;
case GST_EVENT_FLUSH_START:
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
GST_LOG_OBJECT (pad, "Flush start");
ctx->flushing = TRUE;
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
ctx->out_eos = TRUE;
if (ctx == splitmux->reference_ctx) {
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
GST_INFO_OBJECT (splitmux,
"Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
GstClockTimeDiff rtime;
gst_event_parse_gap (event, &gap_ts, NULL);
if (gap_ts == GST_CLOCK_TIME_NONE)
break;
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
/* When we get a gap event on the
* reference stream and we're trying to open a
* new file, we need to store it until we get
* the buffer afterwards
*/
if (ctx->is_reference &&
(splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
gst_event_replace (&ctx->pending_gap, event);
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_HANDLED;
}
rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_STIME_ARGS (rtime));
if (rtime != GST_CLOCK_STIME_NONE) {
ctx->out_running_time = rtime;
complete_or_wait_on_out (splitmux, ctx);
}
break;
}
case GST_EVENT_CUSTOM_DOWNSTREAM:{
const GstStructure *s;
GstClockTimeDiff ts = 0;
s = gst_event_get_structure (event);
if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
break;
gst_structure_get_int64 (s, "timestamp", &ts);
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
ctx->out_running_time = ts;
if (!ctx->is_reference)
ret = complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_DROP;
}
case GST_EVENT_CAPS:{
GstPad *peer;
if (!ctx->is_reference)
break;
peer = gst_pad_get_peer (pad);
if (peer) {
gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
gst_object_unref (peer);
if (ok)
break;
} else {
break;
}
/* This is in the case the muxer doesn't allow this change of caps */
GST_SPLITMUX_LOCK (splitmux);
locked = TRUE;
ctx->caps_change = TRUE;
if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
GST_DEBUG_OBJECT (splitmux,
"New caps were not accepted. Switching output file");
if (ctx->out_eos == FALSE) {
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
}
/* Lets it fall through, if it fails again, then the muxer just can't
* support this format, but at least we have a closed file.
*/
break;
}
default:
break;
}
/* We need to make sure events aren't passed
* until the muxer / sink are ready for it */
if (!locked)
GST_SPLITMUX_LOCK (splitmux);
if (wait)
ret = complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
/* Don't try to forward sticky events before the next buffer is there
* because it would cause a new file to be created without the first
* buffer being available.
*/
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
gst_event_unref (event);
return GST_PAD_PROBE_HANDLED;
} else {
return GST_PAD_PROBE_PASS;
}
}
/* Allow everything through until the configured next stopping point */
GST_SPLITMUX_LOCK (splitmux);
buf_info = g_queue_pop_tail (&ctx->queued_bufs);
if (buf_info == NULL) {
/* Can only happen due to a poorly timed flush */
ret = GST_FLOW_FLUSHING;
goto beach;
}
/* If we have popped a keyframe, decrement the queued_gop count */
if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
splitmux->queued_keyframes--;
ctx->out_running_time = buf_info->run_ts;
ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
" size %" G_GUINT64_FORMAT,
pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
ctx->caps_change = FALSE;
ret = complete_or_wait_on_out (splitmux, ctx);
splitmux->muxed_out_bytes += buf_info->buf_size;
#ifndef GST_DISABLE_GST_DEBUG
{
GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf,
GST_STIME_ARGS (ctx->out_running_time));
}
#endif
ctx->cur_out_buffer = NULL;
GST_SPLITMUX_UNLOCK (splitmux);
/* pending_gap is protected by the STREAM lock */
if (ctx->pending_gap) {
/* If we previously stored a gap event, send it now */
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
GST_DEBUG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
gst_pad_send_event (peer, ctx->pending_gap);
ctx->pending_gap = NULL;
gst_object_unref (peer);
}
mq_stream_buf_free (buf_info);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_DROP;
}
static gboolean
resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
{
return gst_pad_send_event (peer, gst_event_ref (*event));
}
static void
unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
if (ctx->fragment_block_id > 0) {
gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
ctx->fragment_block_id = 0;
}
}
static void
restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
gst_pad_sticky_events_foreach (ctx->srcpad,
(GstPadStickyEventsForeachFunction) (resend_sticky), peer);
/* Clear EOS flag if not actually EOS */
ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
ctx->out_eos_async_done = ctx->out_eos;
gst_object_unref (peer);
}
static void
relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
GstPad *sinkpad, *srcpad, *newpad;
GstPadTemplate *templ;
srcpad = ctx->srcpad;
sinkpad = gst_pad_get_peer (srcpad);
templ = sinkpad->padtemplate;
newpad =
gst_element_request_pad (splitmux->muxer, templ,
GST_PAD_NAME (sinkpad), NULL);
GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
newpad);
if (!gst_pad_unlink (srcpad, sinkpad)) {
gst_object_unref (sinkpad);
goto fail;
}
if (gst_pad_link_full (srcpad, newpad,
GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
gst_element_release_request_pad (splitmux->muxer, newpad);
gst_object_unref (sinkpad);
gst_object_unref (newpad);
goto fail;
}
gst_object_unref (newpad);
gst_object_unref (sinkpad);
return;
fail:
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
("Could not create the new muxer/sink"), NULL);
}
static GstPadProbeReturn
_block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
return GST_PAD_PROBE_OK;
}
static void
block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
{
ctx->fragment_block_id =
gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
NULL, NULL);
}
static gboolean
_set_property_from_structure (GQuark field_id, const GValue * value,
gpointer user_data)
{
const gchar *property_name = g_quark_to_string (field_id);
GObject *element = G_OBJECT (user_data);
g_object_set_property (element, property_name, value);
return TRUE;
}
static void
_lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
{
gst_element_set_locked_state (element, TRUE);
gst_element_set_state (element, GST_STATE_NULL);
GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
gst_bin_remove (GST_BIN (splitmux), element);
}
static void
_send_event (const GValue * value, gpointer user_data)
{
GstPad *pad = g_value_get_object (value);
GstEvent *ev = user_data;
gst_pad_send_event (pad, gst_event_ref (ev));
}
/* Called with lock held when a fragment
* reaches EOS and it is time to restart
* a new fragment
*/
static GstFlowReturn
start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GstElement *muxer, *sink;
g_assert (ctx->is_reference);
/* 1 change to new file */
splitmux->switching_fragment = TRUE;
/* We need to drop the splitmux lock to acquire the state lock
* here and ensure there's no racy state change going on elsewhere */
muxer = gst_object_ref (splitmux->muxer);
sink = gst_object_ref (splitmux->active_sink);
GST_SPLITMUX_UNLOCK (splitmux);
GST_SPLITMUX_STATE_LOCK (splitmux);
if (splitmux->shutdown) {
GST_DEBUG_OBJECT (splitmux,
"Shutdown requested. Aborting fragment switch.");
GST_SPLITMUX_LOCK (splitmux);
GST_SPLITMUX_STATE_UNLOCK (splitmux);
gst_object_unref (muxer);
gst_object_unref (sink);
return GST_FLOW_FLUSHING;
}
if (splitmux->async_finalize) {
if (splitmux->muxed_out_bytes > 0
|| splitmux->fragment_id != splitmux->start_index) {
gchar *newname;
GstElement *new_sink, *new_muxer;
GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
splitmux->fragment_id);
g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
GST_SPLITMUX_LOCK (splitmux);
if ((splitmux->sink =
create_element (splitmux, splitmux->sink_factory, newname,
TRUE)) == NULL)
goto fail;
if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
gst_preset_load_preset (GST_PRESET (splitmux->sink),
splitmux->sink_preset);
if (splitmux->sink_properties)
gst_structure_foreach (splitmux->sink_properties,
_set_property_from_structure, splitmux->sink);
splitmux->active_sink = splitmux->sink;
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
g_free (newname);
newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
if ((splitmux->muxer =
create_element (splitmux, splitmux->muxer_factory, newname,
TRUE)) == NULL)
goto fail;
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
"async") != NULL) {
/* async child elements are causing state change races and weird
* failures, so let's try and turn that off */
g_object_set (splitmux->sink, "async", FALSE, NULL);
}
if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
gst_preset_load_preset (GST_PRESET (splitmux->muxer),
splitmux->muxer_preset);
if (splitmux->muxer_properties)
gst_structure_foreach (splitmux->muxer_properties,
_set_property_from_structure, splitmux->muxer);
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
g_free (newname);
new_sink = splitmux->sink;
new_muxer = splitmux->muxer;
GST_SPLITMUX_UNLOCK (splitmux);
g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
gst_element_link (new_muxer, new_sink);
if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
EOS_FROM_US)) == 2) {
_lock_and_set_to_null (muxer, splitmux);
_lock_and_set_to_null (sink, splitmux);
} else {
g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
GINT_TO_POINTER (2));
}
}
gst_object_unref (muxer);
gst_object_unref (sink);
muxer = new_muxer;
sink = new_sink;
gst_object_ref (muxer);
gst_object_ref (sink);
}
} else {
gst_element_set_locked_state (muxer, TRUE);
gst_element_set_locked_state (sink, TRUE);
gst_element_set_state (sink, GST_STATE_NULL);
if (splitmux->reset_muxer) {
gst_element_set_state (muxer, GST_STATE_NULL);
} else {
GstIterator *it = gst_element_iterate_sink_pads (muxer);
GstEvent *ev;
guint32 seqnum;
ev = gst_event_new_flush_start ();
seqnum = gst_event_get_seqnum (ev);
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
gst_event_unref (ev);
gst_iterator_resync (it);
ev = gst_event_new_flush_stop (TRUE);
gst_event_set_seqnum (ev, seqnum);
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
gst_event_unref (ev);
gst_iterator_free (it);
}
}
GST_SPLITMUX_LOCK (splitmux);
set_next_filename (splitmux, ctx);
splitmux->muxed_out_bytes = 0;
GST_SPLITMUX_UNLOCK (splitmux);
if (gst_element_set_state (sink,
GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
gst_element_set_state (sink, GST_STATE_NULL);
gst_element_set_locked_state (muxer, FALSE);
gst_element_set_locked_state (sink, FALSE);
goto fail_output;
}
if (gst_element_set_state (muxer,
GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
gst_element_set_state (muxer, GST_STATE_NULL);
gst_element_set_state (sink, GST_STATE_NULL);
gst_element_set_locked_state (muxer, FALSE);
gst_element_set_locked_state (sink, FALSE);
goto fail_muxer;
}
gst_element_set_locked_state (muxer, FALSE);
gst_element_set_locked_state (sink, FALSE);
gst_object_unref (sink);
gst_object_unref (muxer);
GST_SPLITMUX_LOCK (splitmux);
GST_SPLITMUX_STATE_UNLOCK (splitmux);
splitmux->switching_fragment = FALSE;
do_async_done (splitmux);
splitmux->ready_for_output = TRUE;
g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
send_fragment_opened_closed_msg (splitmux, TRUE, sink);
/* FIXME: Is this always the correct next state? */
GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
return GST_FLOW_OK;
fail:
gst_object_unref (sink);
gst_object_unref (muxer);
GST_SPLITMUX_LOCK (splitmux);
GST_SPLITMUX_STATE_UNLOCK (splitmux);
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
("Could not create the new muxer/sink"), NULL);
return GST_FLOW_ERROR;
fail_output:
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
("Could not start new output sink"), NULL);
gst_object_unref (sink);
gst_object_unref (muxer);
GST_SPLITMUX_LOCK (splitmux);
GST_SPLITMUX_STATE_UNLOCK (splitmux);
splitmux->switching_fragment = FALSE;
return GST_FLOW_ERROR;
fail_muxer:
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
("Could not start new muxer"), NULL);
gst_object_unref (sink);
gst_object_unref (muxer);
GST_SPLITMUX_LOCK (splitmux);
GST_SPLITMUX_STATE_UNLOCK (splitmux);
splitmux->switching_fragment = FALSE;
return GST_FLOW_ERROR;
}
static void
bus_handler (GstBin * bin, GstMessage * message)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:{
/* If the state is draining out the current file, drop this EOS */
GstElement *sink;
sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
GST_SPLITMUX_LOCK (splitmux);
send_fragment_opened_closed_msg (splitmux, FALSE, sink);
if (splitmux->async_finalize) {
if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
EOS_FROM_US)) == 2) {
GstElement *muxer;
GstPad *sinksink, *muxersrc;
sinksink = gst_element_get_static_pad (sink, "sink");
muxersrc = gst_pad_get_peer (sinksink);
muxer = gst_pad_get_parent_element (muxersrc);
gst_object_unref (sinksink);
gst_object_unref (muxersrc);
gst_element_call_async (muxer,
(GstElementCallAsyncFunc) _lock_and_set_to_null,
gst_object_ref (splitmux), gst_object_unref);
gst_element_call_async (sink,
(GstElementCallAsyncFunc) _lock_and_set_to_null,
gst_object_ref (splitmux), gst_object_unref);
gst_object_unref (muxer);
} else {
g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
GINT_TO_POINTER (2));
}
GST_DEBUG_OBJECT (splitmux,
"Caught async EOS from previous muxer+sink. Dropping.");
/* We forward the EOS so that it gets aggregated as normal. If the sink
* finishes and is removed before the end, it will be de-aggregated */
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
}
} else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
GST_DEBUG_OBJECT (splitmux,
"Passing EOS message. Output state %d max_out_running_time %"
GST_STIME_FORMAT, splitmux->output_state,
GST_STIME_ARGS (splitmux->max_out_running_time));
} else {
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_MESSAGE_ASYNC_START:
case GST_MESSAGE_ASYNC_DONE:
/* Ignore state changes from our children while switching */
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->switching_fragment) {
if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
|| GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
GST_LOG_OBJECT (splitmux,
"Ignoring state change from child %" GST_PTR_FORMAT
" while switching", GST_MESSAGE_SRC (message));
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
}
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_MESSAGE_WARNING:
{
GError *gerror = NULL;
gst_message_parse_warning (message, &gerror, NULL);
if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
GList *item;
gboolean caps_change = FALSE;
GST_SPLITMUX_LOCK (splitmux);
for (item = splitmux->contexts; item; item = item->next) {
MqStreamCtx *ctx = item->data;
if (ctx->caps_change) {
caps_change = TRUE;
break;
}
}
GST_SPLITMUX_UNLOCK (splitmux);
if (caps_change) {
GST_LOG_OBJECT (splitmux,
"Ignoring warning change from child %" GST_PTR_FORMAT
" while switching caps", GST_MESSAGE_SRC (message));
gst_message_unref (message);
return;
}
}
break;
}
default:
break;
}
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
}
static void
ctx_set_unblock (MqStreamCtx * ctx)
{
ctx->need_unblock = TRUE;
}
static gboolean
need_new_fragment (GstSplitMuxSink * splitmux,
GstClockTime queued_time, GstClockTime queued_gop_time,
guint64 queued_bytes)
{
guint64 thresh_bytes;
GstClockTime thresh_time;
gboolean check_robust_muxing;
GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
GstClockTime *ptr_to_time;
const InputGop *gop, *next_gop;
GST_OBJECT_LOCK (splitmux);
thresh_bytes = splitmux->threshold_bytes;
thresh_time = splitmux->threshold_time;
ptr_to_time = (GstClockTime *)
gst_queue_array_peek_head_struct (splitmux->times_to_split);
if (ptr_to_time)
time_to_split = *ptr_to_time;
check_robust_muxing = splitmux->use_robust_muxing
&& splitmux->muxer_has_reserved_props;
GST_OBJECT_UNLOCK (splitmux);
/* Have we muxed at least one thing from the reference
* stream into the file? If not, no other streams can have
* either */
if (splitmux->fragment_reference_bytes <= 0) {
GST_TRACE_OBJECT (splitmux,
"Not ready to split - nothing muxed on the reference stream");
return FALSE;
}
/* User told us to split now */
if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
return TRUE;
}
gop = g_queue_peek_head (&splitmux->pending_input_gops);
/* We need a full GOP queued up at this point */
g_assert (gop != NULL);
next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
/* And the beginning of the next GOP or otherwise EOS */
/* User told us to split at this running time */
if (gop->start_time >= time_to_split) {
GST_OBJECT_LOCK (splitmux);
/* Dequeue running time */
gst_queue_array_pop_head_struct (splitmux->times_to_split);
/* Empty any running times after this that are past now */
ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
while (ptr_to_time) {
time_to_split = *ptr_to_time;
if (gop->start_time < time_to_split) {
break;
}
gst_queue_array_pop_head_struct (splitmux->times_to_split);
ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
}
GST_TRACE_OBJECT (splitmux,
"GOP start time %" GST_STIME_FORMAT " is after requested split point %"
GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
GST_STIME_ARGS (time_to_split));
GST_OBJECT_UNLOCK (splitmux);
return TRUE;
}
if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
GST_TRACE_OBJECT (splitmux,
"queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
return TRUE; /* Would overrun byte limit */
}
if (thresh_time > 0 && queued_time > thresh_time) {
GST_TRACE_OBJECT (splitmux,
"queued time %" GST_STIME_FORMAT " overruns time limit",
GST_STIME_ARGS (queued_time));
return TRUE; /* Would overrun time limit */
}
if (splitmux->tc_interval) {
GstClockTime next_gop_start_time =
next_gop ? next_gop->start_time : splitmux->max_in_running_time;
if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
next_gop_start_time >
splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
GST_TRACE_OBJECT (splitmux,
"in running time %" GST_STIME_FORMAT " overruns time limit %"
GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
return TRUE;
}
}
if (check_robust_muxing) {
GstClockTime mux_reserved_remain;
g_object_get (splitmux->muxer,
"reserved-duration-remaining", &mux_reserved_remain, NULL);
GST_LOG_OBJECT (splitmux,
"Muxer robust muxing report - %" G_GUINT64_FORMAT
" remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
mux_reserved_remain, queued_gop_time);
if (queued_gop_time >= mux_reserved_remain) {
GST_INFO_OBJECT (splitmux,
"File is about to run out of header room - %" G_GUINT64_FORMAT
" remaining. New GOP would enqueue %" G_GUINT64_FORMAT
". Switching to new file", mux_reserved_remain, queued_gop_time);
return TRUE;
}
}
/* Continue and mux this GOP */
return FALSE;
}
/* probably we want to add this API? */
static void
video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
{
GstVideoTimeCode *timecode = NULL;
g_return_if_fail (old_tc != NULL);
if (*old_tc == new_tc)
return;
if (new_tc)
timecode = gst_video_time_code_copy (new_tc);
if (*old_tc)
gst_video_time_code_free (*old_tc);
*old_tc = timecode;
}
/* Called with splitmux lock held */
/* Called when entering ProcessingCompleteGop state
* Assess if mq contents overflowed the current file
* -> If yes, need to switch to new file
* -> if no, set max_out_running_time to let this GOP in and
* go to COLLECTING_GOP_START state
*/
static void
handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
{
guint64 queued_bytes;
GstClockTimeDiff queued_time = 0;
GstClockTimeDiff queued_gop_time = 0;
SplitMuxOutputCommand *cmd;
/* Assess if the multiqueue contents overflowed the current file */
/* When considering if a newly gathered GOP overflows
* the time limit for the file, only consider the running time of the
* reference stream. Other streams might have run ahead a little bit,
* but extra pieces won't be released to the muxer beyond the reference
* stream cut-off anyway - so it forms the limit. */
queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
queued_time = next_gop_start_time;
/* queued_gop_time tracks how much unwritten data there is waiting to
* be written to this fragment including this GOP */
if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
else
queued_gop_time = queued_time - gop->start_time;
GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
" bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
" gop start time %" GST_STIME_FORMAT,
GST_STIME_ARGS (queued_time), queued_bytes,
GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
if (queued_gop_time < 0)
goto error_gop_duration;
if (queued_time < splitmux->fragment_start_time)
goto error_queued_time;
queued_time -= splitmux->fragment_start_time;
if (queued_time < queued_gop_time)
queued_gop_time = queued_time;
/* Expand queued bytes estimate by muxer overhead */
queued_bytes += (queued_bytes * splitmux->mux_overhead);
/* Check for overrun - have we output at least one byte and overrun
* either threshold? */
if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
if (splitmux->async_finalize) {
GstClockTime *sink_running_time = g_new (GstClockTime, 1);
*sink_running_time = splitmux->reference_ctx->out_running_time;
g_object_set_qdata_full (G_OBJECT (splitmux->sink),
RUNNING_TIME, sink_running_time, g_free);
}
g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
/* Tell the output side to start a new fragment */
GST_INFO_OBJECT (splitmux,
"This GOP (dur %" GST_STIME_FORMAT
") would overflow the fragment, Sending start_new_fragment cmd",
GST_STIME_ARGS (queued_gop_time));
cmd = out_cmd_buf_new ();
cmd->start_new_fragment = TRUE;
g_queue_push_head (&splitmux->out_cmd_q, cmd);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
splitmux->fragment_start_time = gop->start_time;
splitmux->fragment_start_time_pts = gop->start_time_pts;
splitmux->fragment_total_bytes = 0;
splitmux->fragment_reference_bytes = 0;
video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
splitmux->next_fragment_start_tc_time =
calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
splitmux->fragment_start_time, NULL);
if (splitmux->tc_interval && splitmux->fragment_start_tc
&& !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
GST_WARNING_OBJECT (splitmux,
"Couldn't calculate next fragment start time for timecode mode");
}
}
/* And set up to collect the next GOP */
if (max_out_running_time != G_MAXINT64) {
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
} else {
/* This is probably already the current state, but just in case: */
splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
}
/* And wake all input contexts to send a wake-up event */
g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
/* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
splitmux->fragment_total_bytes += gop->total_bytes;
splitmux->fragment_reference_bytes += gop->reference_bytes;
if (gop->total_bytes > 0) {
GST_LOG_OBJECT (splitmux,
"Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
" time %" GST_STIME_FORMAT,
splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
/* Send this GOP to the output command queue */
cmd = out_cmd_buf_new ();
cmd->start_new_fragment = FALSE;
cmd->max_output_ts = max_out_running_time;
GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
g_queue_push_head (&splitmux->out_cmd_q, cmd);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
return;
error_gop_duration:
GST_ELEMENT_ERROR (splitmux,
STREAM, FAILED, ("Timestamping error on input streams"),
("Queued GOP time is negative %" GST_STIME_FORMAT,
GST_STIME_ARGS (queued_gop_time)));
return;
error_queued_time:
GST_ELEMENT_ERROR (splitmux,
STREAM, FAILED, ("Timestamping error on input streams"),
("Queued time is negative. Input went backwards. queued_time - %"
GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
return;
}
/* Called with splitmux lock held */
/* Called from each input pad when it is has all the pieces
* for a GOP or EOS, starting with the reference pad which has set the
* splitmux->max_in_running_time
*/
static void
check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GList *cur;
GstEvent *event;
/* On ENDING_FILE, the reference stream sends a command to start a new
* fragment, then releases the GOP for output in the new fragment.
* If some streams received no buffer during the last GOP that overran,
* because its next buffer has a timestamp bigger than
* ctx->max_in_running_time, its queue is empty. In that case the only
* way to wakeup the output thread is by injecting an event in the
* queue. This usually happen with subtitle streams.
* See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
if (ctx->need_unblock) {
GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
GST_EVENT_TYPE_SERIALIZED,
gst_structure_new ("splitmuxsink-unblock", "timestamp",
G_TYPE_INT64, splitmux->max_in_running_time, NULL));
GST_SPLITMUX_UNLOCK (splitmux);
gst_pad_send_event (ctx->sinkpad, event);
GST_SPLITMUX_LOCK (splitmux);
ctx->need_unblock = FALSE;
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
/* state may have changed while we were unlocked. Loop again if so */
if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
return;
}
do {
GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
GstClockTimeDiff max_out_running_time;
gboolean ready = TRUE;
InputGop *gop;
const InputGop *next_gop;
gop = g_queue_peek_head (&splitmux->pending_input_gops);
next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
/* If we have no GOP or no next GOP here then the reference context is
* at EOS, otherwise use the start time of the next GOP if we're far
* enough in the GOP to know it */
if (gop && next_gop) {
if (!splitmux->reference_ctx->in_eos
&& splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
&& splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
GST_LOG_OBJECT (splitmux,
"No further GOPs finished collecting, waiting until current DTS %"
GST_STIME_FORMAT " has passed next GOP start PTS %"
GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_in_running_time_dts),
GST_STIME_ARGS (next_gop->start_time_pts));
break;
}
GST_LOG_OBJECT (splitmux,
"Finished collecting GOP with start time %" GST_STIME_FORMAT
", next GOP start time %" GST_STIME_FORMAT,
GST_STIME_ARGS (gop->start_time),
GST_STIME_ARGS (next_gop->start_time));
next_gop_start = next_gop->start_time;
max_out_running_time =
splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
} else if (!next_gop) {
GST_LOG_OBJECT (splitmux, "Reference context is EOS");
next_gop_start = splitmux->max_in_running_time;
max_out_running_time = G_MAXINT64;
} else if (!gop) {
GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
break;
} else {
g_assert_not_reached ();
}
g_assert (gop != NULL);
/* Iterate each pad, and check that the input running time is at least
* up to the start running time of the next GOP or EOS, and if so handle
* the collected GOP */
GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
for (cur = g_list_first (splitmux->contexts); cur != NULL;
cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
GST_LOG_OBJECT (splitmux,
"Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
" EOS %d", tmpctx, tmpctx->sinkpad,
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
if (next_gop_start != GST_CLOCK_STIME_NONE &&
tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
"Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
tmpctx, tmpctx->sinkpad);
ready = FALSE;
break;
}
}
if (ready) {
GST_DEBUG_OBJECT (splitmux,
"Collected GOP is complete. Processing (ctx %p)", ctx);
/* All pads have a complete GOP, release it into the multiqueue */
handle_gathered_gop (splitmux, gop, next_gop_start,
max_out_running_time);
g_queue_pop_head (&splitmux->pending_input_gops);
input_gop_free (gop);
/* The user has requested a split, we can split now that the previous GOP
* has been collected to the correct location */
if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
TRUE, FALSE)) {
g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
}
}
}
/* If upstream reached EOS we are not expecting more data, no need to wait
* here. */
if (ctx->in_eos)
return;
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
!ctx->flushing &&
ctx->in_running_time >= next_gop_start &&
next_gop_start != GST_CLOCK_STIME_NONE) {
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
} else {
/* This pad is not ready or the state changed - break out and get another
* buffer / event */
break;
}
} while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
}
static GstPadProbeReturn
handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstSplitMuxSink *splitmux = ctx->splitmux;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
MqStreamBuf *buf_info = NULL;
GstClockTime ts, pts, dts;
GstClockTimeDiff running_time, running_time_pts, running_time_dts;
gboolean loop_again;
gboolean keyframe = FALSE;
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
g_warning ("Buffer list handling not implemented");
return GST_PAD_PROBE_DROP;
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
GstEvent *event = gst_pad_probe_info_get_event (info);
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->in_segment);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
ctx->in_eos = FALSE;
ctx->in_running_time = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
ctx->in_eos = TRUE;
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
ret = GST_FLOW_FLUSHING;
goto beach;
}
if (ctx->is_reference) {
GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
/* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
if (g_queue_is_empty (&splitmux->pending_input_gops)) {
GST_WARNING_OBJECT (splitmux,
"EOS with no buffers received on the reference pad");
/* - child muxer and sink might be still locked state
* (see gst_splitmux_reset_elements()) so should be unlocked
* for state change of splitmuxsink to be applied to child
* - would need to post async done message
* - location on sink element is still null then it will post
* error message on bus (muxer will produce something, header
* data for example)
*
* Calls start_next_fragment() here, the method will address
* everything the above mentioned one */
ret = start_next_fragment (splitmux, ctx);
if (ret != GST_FLOW_OK)
goto beach;
} else {
check_completed_gop (splitmux, ctx);
}
} else if (splitmux->input_state ==
SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
/* If we are waiting for a GOP to be completed (ie, for aux
* pads to catch up), then this pad is complete, so check
* if the whole GOP is.
*/
if (!g_queue_is_empty (&splitmux->pending_input_gops))
check_completed_gop (splitmux, ctx);
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
GstClockTimeDiff rtime;
gst_event_parse_gap (event, &gap_ts, NULL);
if (gap_ts == GST_CLOCK_TIME_NONE)
break;
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
ret = GST_FLOW_FLUSHING;
goto beach;
}
rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_STIME_ARGS (rtime));
if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
/* If this GAP event happens before the first fragment then
* initialize the fragment start time here. */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
splitmux->fragment_start_time = rtime;
GST_LOG_OBJECT (splitmux,
"Fragment start time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->fragment_start_time));
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
splitmux->max_in_running_time = rtime;
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
splitmux->max_in_running_time_dts = rtime;
}
/* Similarly take it as fragment start PTS and GOP start time if
* these are not set */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
splitmux->fragment_start_time_pts = rtime;
if (g_queue_is_empty (&splitmux->pending_input_gops)) {
InputGop *gop = g_new0 (InputGop, 1);
gop->from_gap = TRUE;
gop->start_time = rtime;
gop->start_time_pts = rtime;
g_queue_push_tail (&splitmux->pending_input_gops, gop);
}
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
default:
break;
}
return GST_PAD_PROBE_PASS;
} else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
case GST_QUERY_ALLOCATION:
return GST_PAD_PROBE_DROP;
default:
return GST_PAD_PROBE_PASS;
}
}
buf = gst_pad_probe_info_get_buffer (info);
buf_info = mq_stream_buf_new ();
pts = GST_BUFFER_PTS (buf);
dts = GST_BUFFER_DTS (buf);
if (GST_BUFFER_PTS_IS_VALID (buf))
ts = GST_BUFFER_PTS (buf);
else
ts = GST_BUFFER_DTS (buf);
GST_LOG_OBJECT (pad,
"Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
GST_TIME_ARGS (dts));
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
ret = GST_FLOW_FLUSHING;
goto beach;
}
/* If this buffer has a timestamp, advance the input timestamp of the
* stream */
if (GST_CLOCK_TIME_IS_VALID (ts)) {
running_time = my_segment_to_running_time (&ctx->in_segment, ts);
GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
GST_STIME_ARGS (running_time));
/* in running time is always the maximum PTS (or DTS) that was observed so far */
if (GST_CLOCK_STIME_IS_VALID (running_time)
&& running_time > ctx->in_running_time)
ctx->in_running_time = running_time;
} else {
running_time = ctx->in_running_time;
}
if (GST_CLOCK_TIME_IS_VALID (pts))
running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
else
running_time_pts = GST_CLOCK_STIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (dts)) {
running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
/* DTS > PTS makes conceptually no sense so catch such invalid DTS here
* by clamping to the PTS */
running_time_dts = MIN (running_time_pts, running_time_dts);
} else {
/* If there is no DTS then assume PTS=DTS */
running_time_dts = running_time_pts;
}
/* Try to make sure we have a valid running time */
if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
ctx->in_running_time =
my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
}
GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
buf_info->run_ts = ctx->in_running_time;
buf_info->buf_size = gst_buffer_get_size (buf);
buf_info->duration = GST_BUFFER_DURATION (buf);
if (ctx->is_reference) {
InputGop *gop = NULL;
GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
/* initialize fragment_start_time if it was not set yet (i.e. for the
* first fragment), or otherwise set it to the minimum observed time */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
|| splitmux->fragment_start_time > running_time) {
if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
splitmux->fragment_start_time_pts = running_time_pts;
splitmux->fragment_start_time = running_time;
GST_LOG_OBJECT (splitmux,
"Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
GST_STIME_ARGS (splitmux->fragment_start_time_pts));
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
|| splitmux->max_in_running_time < splitmux->fragment_start_time)
splitmux->max_in_running_time = splitmux->fragment_start_time;
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
splitmux->max_in_running_time_dts = running_time_dts;
if (tc_meta) {
video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
splitmux->next_fragment_start_tc_time =
calculate_next_max_timecode (splitmux, &tc_meta->tc,
running_time, NULL);
if (splitmux->tc_interval
&& !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
{
GST_WARNING_OBJECT (splitmux,
"Couldn't calculate next fragment start time for timecode mode");
}
#ifndef GST_DISABLE_GST_DEBUG
{
gchar *tc_str;
tc_str = gst_video_time_code_to_string (&tc_meta->tc);
GST_DEBUG_OBJECT (splitmux,
"Initialize fragment start timecode %s, next fragment start timecode time %"
GST_TIME_FORMAT, tc_str,
GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
g_free (tc_str);
}
#endif
}
}
/* First check if we're at the very first GOP and the tracking was created
* from a GAP event. In that case don't start a new GOP on keyframes but
* just updated it as needed */
gop = g_queue_peek_tail (&splitmux->pending_input_gops);
if (!gop || (!gop->from_gap
&& !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
gop = g_new0 (InputGop, 1);
gop->start_time = running_time;
gop->start_time_pts = running_time_pts;
GST_LOG_OBJECT (splitmux,
"Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
GST_STIME_ARGS (gop->start_time_pts));
if (tc_meta) {
video_time_code_replace (&gop->start_tc, &tc_meta->tc);
#ifndef GST_DISABLE_GST_DEBUG
{
gchar *tc_str;
tc_str = gst_video_time_code_to_string (&tc_meta->tc);
GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
g_free (tc_str);
}
#endif
}
g_queue_push_tail (&splitmux->pending_input_gops, gop);
} else {
gop->from_gap = FALSE;
if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
|| gop->start_time > running_time) {
gop->start_time = running_time;
GST_LOG_OBJECT (splitmux,
"GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
GST_STIME_ARGS (gop->start_time_pts));
if (tc_meta) {
video_time_code_replace (&gop->start_tc, &tc_meta->tc);
#ifndef GST_DISABLE_GST_DEBUG
{
gchar *tc_str;
tc_str = gst_video_time_code_to_string (&tc_meta->tc);
GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
tc_str);
g_free (tc_str);
}
#endif
}
}
}
/* Check whether we need to request next keyframe depending on
* current running time */
if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
GST_WARNING_OBJECT (splitmux,
"Could not request a keyframe. Files may not split at the exact location they should");
}
}
{
InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
if (gop) {
GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
" total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
gop->total_bytes, gop->total_bytes);
}
}
loop_again = TRUE;
do {
if (ctx->flushing) {
ret = GST_FLOW_FLUSHING;
goto beach;
}
switch (splitmux->input_state) {
case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
if (ctx->is_reference) {
const InputGop *gop, *next_gop;
/* This is the reference context. If it's a keyframe,
* it marks the start of a new GOP and we should wait in
* check_completed_gop before continuing, but either way
* (keyframe or no, we'll pass this buffer through after
* so set loop_again to FALSE */
loop_again = FALSE;
gop = g_queue_peek_head (&splitmux->pending_input_gops);
g_assert (gop != NULL);
next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
if (ctx->in_running_time > splitmux->max_in_running_time)
splitmux->max_in_running_time = ctx->in_running_time;
if (running_time_dts > splitmux->max_in_running_time_dts)
splitmux->max_in_running_time_dts = running_time_dts;
GST_LOG_OBJECT (splitmux,
"Max in running time now %" GST_STIME_FORMAT ", DTS %"
GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
GST_STIME_ARGS (splitmux->max_in_running_time_dts));
if (!next_gop) {
GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
/* Allow other input pads to catch up to here too */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
break;
}
if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
GST_INFO_OBJECT (pad,
"Have keyframe with running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
keyframe = TRUE;
}
if (running_time_dts != GST_CLOCK_STIME_NONE
&& running_time_dts < next_gop->start_time_pts) {
GST_DEBUG_OBJECT (splitmux,
"Waiting until DTS (%" GST_STIME_FORMAT
") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
GST_STIME_ARGS (running_time_dts),
GST_STIME_ARGS (next_gop->start_time_pts));
/* Allow other input pads to catch up to here too */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
break;
}
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
} else {
/* Pass this buffer if the reference ctx is far enough ahead */
if (ctx->in_running_time < splitmux->max_in_running_time) {
loop_again = FALSE;
break;
}
/* We're still waiting for a keyframe on the reference pad, sleep */
GST_LOG_OBJECT (pad, "Sleeping for GOP start");
GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (pad,
"Done sleeping for GOP start input state now %d",
splitmux->input_state);
}
break;
case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
/* We're collecting a GOP, this is only ever called for non-reference
* contexts as the reference context would be waiting inside
* check_completed_gop() */
g_assert (!ctx->is_reference);
/* If we overran the target timestamp, it might be time to process
* the GOP, otherwise bail out for more data. */
GST_LOG_OBJECT (pad,
"Checking TS %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
GST_STIME_ARGS (splitmux->max_in_running_time));
if (ctx->in_running_time < splitmux->max_in_running_time) {
loop_again = FALSE;
break;
}
GST_LOG_OBJECT (pad,
"Collected last packet of GOP. Checking other pads");
if (g_queue_is_empty (&splitmux->pending_input_gops)) {
GST_WARNING_OBJECT (pad,
"Reference was closed without GOP, dropping");
goto drop;
}
check_completed_gop (splitmux, ctx);
break;
}
case SPLITMUX_INPUT_STATE_FINISHING_UP:
loop_again = FALSE;
break;
default:
loop_again = FALSE;
break;
}
}
while (loop_again);
if (keyframe && ctx->is_reference)
splitmux->queued_keyframes++;
buf_info->keyframe = keyframe;
/* Update total input byte counter for overflow detect unless we're after
* EOS now */
if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
&& splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
/* We must have a GOP at this point */
g_assert (gop != NULL);
gop->total_bytes += buf_info->buf_size;
if (ctx->is_reference) {
gop->reference_bytes += buf_info->buf_size;
}
}
/* Now add this buffer to the queue just before returning */
g_queue_push_head (&ctx->queued_bufs, buf_info);
GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
GST_SPLITMUX_UNLOCK (splitmux);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
if (buf_info)
mq_stream_buf_free (buf_info);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_PASS;
drop:
GST_SPLITMUX_UNLOCK (splitmux);
if (buf_info)
mq_stream_buf_free (buf_info);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_EOS;
return GST_PAD_PROBE_DROP;
}
static void
grow_blocked_queues (GstSplitMuxSink * splitmux)
{
GList *cur;
/* Scan other queues for full-ness and grow them */
for (cur = g_list_first (splitmux->contexts);
cur != NULL; cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
guint cur_limit;
guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
if (cur_len >= cur_limit) {
cur_limit = cur_len + 1;
GST_DEBUG_OBJECT (tmpctx->q,
"Queue overflowed and needs enlarging. Growing to %u buffers",
cur_limit);
g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
}
}
}
static void
handle_q_underrun (GstElement * q, gpointer user_data)
{
MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
GstSplitMuxSink *splitmux = ctx->splitmux;
GST_SPLITMUX_LOCK (splitmux);
GST_DEBUG_OBJECT (q,
"Queue reported underrun with %d keyframes and %d cmds enqueued",
splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
grow_blocked_queues (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
}
static void
handle_q_overrun (GstElement * q, gpointer user_data)
{
MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
GstSplitMuxSink *splitmux = ctx->splitmux;
gboolean allow_grow = FALSE;
GST_SPLITMUX_LOCK (splitmux);
GST_DEBUG_OBJECT (q,
"Queue reported overrun with %d keyframes and %d cmds enqueued",
splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
if (splitmux->queued_keyframes < 2) {
/* Less than a full GOP queued, grow the queue */
allow_grow = TRUE;
} else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
allow_grow = TRUE;
} else {
/* If another queue is starved, grow */
GList *cur;
for (cur = g_list_first (splitmux->contexts);
cur != NULL; cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
allow_grow = TRUE;
}
}
}
GST_SPLITMUX_UNLOCK (splitmux);
if (allow_grow) {
guint cur_limit;
g_object_get (q, "max-size-buffers", &cur_limit, NULL);
cur_limit++;
GST_DEBUG_OBJECT (q,
"Queue overflowed and needs enlarging. Growing to %u buffers",
cur_limit);
g_object_set (q, "max-size-buffers", cur_limit, NULL);
}
}
/* Called with SPLITMUX lock held */
static const gchar *
lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
{
const gchar *ret = NULL;
if (splitmux->muxerpad_map == NULL)
return NULL;
if (sinkpad_name == NULL) {
GST_WARNING_OBJECT (splitmux,
"Can't look up request pad in pad map without providing a pad name");
return NULL;
}
ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
if (ret) {
GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
ret);
return g_strdup (ret);
}
return NULL;
}
static GstPad *
gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPadTemplate *mux_template = NULL;
GstPad *ret = NULL, *muxpad = NULL;
GstElement *q;
GstPad *q_sink = NULL, *q_src = NULL;
gchar *gname, *qname;
gboolean is_primary_video = FALSE, is_video = FALSE,
muxer_is_requestpad = FALSE;
MqStreamCtx *ctx;
const gchar *muxer_padname = NULL;
GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
GST_SPLITMUX_LOCK (splitmux);
if (!create_muxer (splitmux))
goto fail;
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
if (g_str_equal (templ->name_template, "video") ||
g_str_has_prefix (templ->name_template, "video_aux_")) {
is_primary_video = g_str_equal (templ->name_template, "video");
if (is_primary_video && splitmux->have_video)
goto already_have_video;
is_video = TRUE;
}
/* See if there's a pad map and it lists this pad */
muxer_padname = lookup_muxer_pad (splitmux, name);
if (muxer_padname == NULL) {
if (is_video) {
/* FIXME: Look for a pad template with matching caps, rather than by name */
GST_DEBUG_OBJECT (element,
"searching for pad-template with name 'video_%%u'");
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "video_%u");
/* Fallback to find sink pad templates named 'video' (flvmux) */
if (!mux_template) {
GST_DEBUG_OBJECT (element,
"searching for pad-template with name 'video'");
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "video");
}
name = NULL;
} else {
GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
templ->name_template);
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), templ->name_template);
/* Fallback to find sink pad templates named 'audio' (flvmux) */
if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
GST_DEBUG_OBJECT (element,
"searching for pad-template with name 'audio'");
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "audio");
name = NULL;
}
}
if (mux_template == NULL) {
GST_DEBUG_OBJECT (element,
"searching for pad-template with name 'sink_%%d'");
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "sink_%d");
name = NULL;
}
if (mux_template == NULL) {
GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "sink");
name = NULL;
}
if (mux_template == NULL) {
GST_ERROR_OBJECT (element,
"unable to find a suitable sink pad-template on the muxer");
goto fail;
}
GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
mux_template->name_template);
if (mux_template->presence == GST_PAD_REQUEST) {
GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
muxpad =
gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
muxer_is_requestpad = TRUE;
} else if (mux_template->presence == GST_PAD_ALWAYS) {
GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
muxpad =
gst_element_get_static_pad (splitmux->muxer,
mux_template->name_template);
} else {
GST_ERROR_OBJECT (element,
"unexpected pad presence %d", mux_template->presence);
goto fail;
}
} else {
/* Have a muxer pad name */
if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
if ((muxpad =
gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
muxer_is_requestpad = TRUE;
}
g_free ((gchar *) muxer_padname);
muxer_padname = NULL;
}
/* One way or another, we must have a muxer pad by now */
if (muxpad == NULL)
goto fail;
if (is_primary_video)
gname = g_strdup ("video");
else if (name == NULL)
gname = gst_pad_get_name (muxpad);
else
gname = g_strdup (name);
qname = g_strdup_printf ("queue_%s", gname);
if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
g_free (qname);
goto fail;
}
g_free (qname);
gst_element_set_state (q, GST_STATE_TARGET (splitmux));
g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
"max-size-buffers", 5, NULL);
q_sink = gst_element_get_static_pad (q, "sink");
q_src = gst_element_get_static_pad (q, "src");
if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
if (muxer_is_requestpad)
gst_element_release_request_pad (splitmux->muxer, muxpad);
gst_object_unref (GST_OBJECT (muxpad));
goto fail;
}
gst_object_unref (GST_OBJECT (muxpad));
ctx = mq_stream_ctx_new (splitmux);
/* Context holds a ref: */
ctx->q = gst_object_ref (q);
ctx->srcpad = q_src;
ctx->sinkpad = q_sink;
ctx->q_overrun_id =
g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
ctx->src_pad_block_id =
gst_pad_add_probe (q_src,
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
(GstPadProbeCallback) handle_mq_output, ctx, NULL);
if (is_primary_video && splitmux->reference_ctx != NULL) {
splitmux->reference_ctx->is_reference = FALSE;
splitmux->reference_ctx = NULL;
}
if (splitmux->reference_ctx == NULL) {
splitmux->reference_ctx = ctx;
ctx->is_reference = TRUE;
}
ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
ctx->sink_pad_block_id =
gst_pad_add_probe (q_sink,
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_input, ctx, NULL);
GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
" feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
splitmux->contexts = g_list_append (splitmux->contexts, ctx);
g_free (gname);
if (is_primary_video)
splitmux->have_video = TRUE;
gst_pad_set_active (ret, TRUE);
gst_element_add_pad (GST_ELEMENT (splitmux), ret);
GST_SPLITMUX_UNLOCK (splitmux);
return ret;
fail:
GST_SPLITMUX_UNLOCK (splitmux);
if (q_sink)
gst_object_unref (q_sink);
if (q_src)
gst_object_unref (q_src);
return NULL;
already_have_video:
GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
GST_SPLITMUX_UNLOCK (splitmux);
return NULL;
}
static void
gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPad *muxpad = NULL;
MqStreamCtx *ctx =
(MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
GST_SPLITMUX_LOCK (splitmux);
if (splitmux->muxer == NULL)
goto fail; /* Elements don't exist yet - nothing to release */
GST_INFO_OBJECT (pad, "releasing request pad");
muxpad = gst_pad_get_peer (ctx->srcpad);
/* Remove the context from our consideration */
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
ctx->flushing = TRUE;
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
if (ctx->sink_pad_block_id) {
gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
}
if (ctx->src_pad_block_id)
gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
/* Wait for the pad to be free */
GST_PAD_STREAM_LOCK (pad);
GST_SPLITMUX_LOCK (splitmux);
GST_PAD_STREAM_UNLOCK (pad);
/* Can release the context now */
mq_stream_ctx_free (ctx);
if (ctx == splitmux->reference_ctx)
splitmux->reference_ctx = NULL;
/* Release and free the muxer input */
if (muxpad) {
gst_element_release_request_pad (splitmux->muxer, muxpad);
gst_object_unref (muxpad);
}
if (GST_PAD_PAD_TEMPLATE (pad) &&
g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
(pad)), "video"))
splitmux->have_video = FALSE;
gst_element_remove_pad (element, pad);
/* Reset the internal elements only after all request pads are released */
if (splitmux->contexts == NULL)
gst_splitmux_reset_elements (splitmux);
/* Wake up other input streams to check if the completion conditions have
* changed */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
fail:
GST_SPLITMUX_UNLOCK (splitmux);
}
static GstElement *
create_element (GstSplitMuxSink * splitmux,
const gchar * factory, const gchar * name, gboolean locked)
{
GstElement *ret = gst_element_factory_make (factory, name);
if (ret == NULL) {
g_warning ("Failed to create %s - splitmuxsink will not work", name);
return NULL;
}
if (locked) {
/* Ensure the sink starts in locked state and NULL - it will be changed
* by the filename setting code */
gst_element_set_locked_state (ret, TRUE);
gst_element_set_state (ret, GST_STATE_NULL);
}
if (!gst_bin_add (GST_BIN (splitmux), ret)) {
g_warning ("Could not add %s element - splitmuxsink will not work", name);
gst_object_unref (ret);
return NULL;
}
return ret;
}
static gboolean
create_muxer (GstSplitMuxSink * splitmux)
{
/* Create internal elements */
if (splitmux->muxer == NULL) {
GstElement *provided_muxer = NULL;
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_muxer != NULL)
provided_muxer = gst_object_ref (splitmux->provided_muxer);
GST_OBJECT_UNLOCK (splitmux);
if ((!splitmux->async_finalize && provided_muxer == NULL) ||
(splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
if ((splitmux->muxer =
create_element (splitmux,
splitmux->muxer_factory ? splitmux->
muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
goto fail;
} else if (splitmux->async_finalize) {
if ((splitmux->muxer =
create_element (splitmux, splitmux->muxer_factory, "muxer",
FALSE)) == NULL)
goto fail;
if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
gst_preset_load_preset (GST_PRESET (splitmux->muxer),
splitmux->muxer_preset);
if (splitmux->muxer_properties)
gst_structure_foreach (splitmux->muxer_properties,
_set_property_from_structure, splitmux->muxer);
} else {
/* Ensure it's not in locked state (we might be reusing an old element) */
gst_element_set_locked_state (provided_muxer, FALSE);
if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
g_warning ("Could not add muxer element - splitmuxsink will not work");
gst_object_unref (provided_muxer);
goto fail;
}
splitmux->muxer = provided_muxer;
gst_object_unref (provided_muxer);
}
if (splitmux->use_robust_muxing) {
update_muxer_properties (splitmux);
}
}
return TRUE;
fail:
return FALSE;
}
static GstElement *
find_sink (GstElement * e)
{
GstElement *res = NULL;
GstIterator *iter;
gboolean done = FALSE;
GValue data = { 0, };
if (!GST_IS_BIN (e))
return e;
if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
return e;
iter = gst_bin_iterate_sinks (GST_BIN (e));
while (!done) {
switch (gst_iterator_next (iter, &data)) {
case GST_ITERATOR_OK:
{
GstElement *child = g_value_get_object (&data);
if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
"location") != NULL) {
res = child;
done = TRUE;
}
g_value_reset (&data);
break;
}
case GST_ITERATOR_RESYNC:
gst_iterator_resync (iter);
break;
case GST_ITERATOR_DONE:
done = TRUE;
break;
case GST_ITERATOR_ERROR:
g_assert_not_reached ();
break;
}
}
g_value_unset (&data);
gst_iterator_free (iter);
return res;
}
static gboolean
create_sink (GstSplitMuxSink * splitmux)
{
GstElement *provided_sink = NULL;
if (splitmux->active_sink == NULL) {
GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_sink != NULL)
provided_sink = gst_object_ref (splitmux->provided_sink);
GST_OBJECT_UNLOCK (splitmux);
if ((!splitmux->async_finalize && provided_sink == NULL) ||
(splitmux->async_finalize && splitmux->sink_factory == NULL)) {
if ((splitmux->sink =
create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
goto fail;
splitmux->active_sink = splitmux->sink;
} else if (splitmux->async_finalize) {
if ((splitmux->sink =
create_element (splitmux, splitmux->sink_factory, "sink",
TRUE)) == NULL)
goto fail;
if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
gst_preset_load_preset (GST_PRESET (splitmux->sink),
splitmux->sink_preset);
if (splitmux->sink_properties)
gst_structure_foreach (splitmux->sink_properties,
_set_property_from_structure, splitmux->sink);
splitmux->active_sink = splitmux->sink;
} else {
/* Ensure the sink starts in locked state and NULL - it will be changed
* by the filename setting code */
gst_element_set_locked_state (provided_sink, TRUE);
gst_element_set_state (provided_sink, GST_STATE_NULL);
if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
g_warning ("Could not add sink elements - splitmuxsink will not work");
gst_object_unref (provided_sink);
goto fail;
}
splitmux->active_sink = provided_sink;
/* The bin holds a ref now, we can drop our tmp ref */
gst_object_unref (provided_sink);
/* Find the sink element */
splitmux->sink = find_sink (splitmux->active_sink);
if (splitmux->sink == NULL) {
g_warning
("Could not locate sink element in provided sink - splitmuxsink will not work");
goto fail;
}
}
#if 1
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
"async") != NULL) {
/* async child elements are causing state change races and weird
* failures, so let's try and turn that off */
g_object_set (splitmux->sink, "async", FALSE, NULL);
}
#endif
if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
goto fail;
}
}
return TRUE;
fail:
return FALSE;
}
#ifdef __GNUC__
#pragma GCC diagnostic ignored "-Wformat-nonliteral"
#endif
static void
set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
gchar *fname = NULL;
GstSample *sample;
GstCaps *caps;
gst_splitmux_sink_ensure_max_files (splitmux);
if (ctx->cur_out_buffer == NULL) {
GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
}
caps = gst_pad_get_current_caps (ctx->srcpad);
sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
splitmux->fragment_id, sample, &fname);
gst_sample_unref (sample);
if (caps)
gst_caps_unref (caps);
if (fname == NULL) {
/* Fallback to the old signal if the new one returned nothing */
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
splitmux->fragment_id, &fname);
}
if (!fname)
fname = splitmux->location ?
g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
if (fname) {
GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
"location") != NULL)
g_object_set (splitmux->sink, "location", fname, NULL);
g_free (fname);
}
splitmux->fragment_id++;
}
/* called with GST_SPLITMUX_LOCK */
static void
do_async_start (GstSplitMuxSink * splitmux)
{
GstMessage *message;
if (!splitmux->need_async_start) {
GST_INFO_OBJECT (splitmux, "no async_start needed");
return;
}
splitmux->async_pending = TRUE;
GST_INFO_OBJECT (splitmux, "Sending async_start message");
message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
GST_SPLITMUX_UNLOCK (splitmux);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
(splitmux), message);
GST_SPLITMUX_LOCK (splitmux);
}
/* called with GST_SPLITMUX_LOCK */
static void
do_async_done (GstSplitMuxSink * splitmux)
{
GstMessage *message;
if (splitmux->async_pending) {
GST_INFO_OBJECT (splitmux, "Sending async_done message");
splitmux->async_pending = FALSE;
GST_SPLITMUX_UNLOCK (splitmux);
message =
gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
GST_CLOCK_TIME_NONE);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
(splitmux), message);
GST_SPLITMUX_LOCK (splitmux);
}
splitmux->need_async_start = FALSE;
}
static void
gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
{
splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
g_queue_clear (&splitmux->pending_input_gops);
splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
splitmux->fragment_total_bytes = 0;
splitmux->fragment_reference_bytes = 0;
splitmux->muxed_out_bytes = 0;
splitmux->ready_for_output = FALSE;
g_atomic_int_set (&(splitmux->split_requested), FALSE);
g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
gst_queue_array_clear (splitmux->times_to_split);
g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
splitmux->queued_keyframes = 0;
g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
g_queue_clear (&splitmux->out_cmd_q);
}
static GstStateChangeReturn
gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
{
GstStateChangeReturn ret;
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:{
GST_SPLITMUX_LOCK (splitmux);
if (!create_muxer (splitmux) || !create_sink (splitmux)) {
ret = GST_STATE_CHANGE_FAILURE;
GST_SPLITMUX_UNLOCK (splitmux);
goto beach;
}
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
GST_SPLITMUX_UNLOCK (splitmux);
splitmux->fragment_id = splitmux->start_index;
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
GST_SPLITMUX_LOCK (splitmux);
/* Make sure contexts and tracking times are cleared, in case we're being reused */
gst_splitmux_sink_reset (splitmux);
/* Start by collecting one input on each pad */
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
GST_SPLITMUX_UNLOCK (splitmux);
GST_SPLITMUX_STATE_LOCK (splitmux);
splitmux->shutdown = FALSE;
GST_SPLITMUX_STATE_UNLOCK (splitmux);
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
case GST_STATE_CHANGE_READY_TO_READY:
g_atomic_int_set (&(splitmux->split_requested), FALSE);
g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
/* Fall through */
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_STATE_LOCK (splitmux);
splitmux->shutdown = TRUE;
GST_SPLITMUX_STATE_UNLOCK (splitmux);
GST_SPLITMUX_LOCK (splitmux);
gst_splitmux_sink_reset (splitmux);
splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
/* Wake up any blocked threads */
GST_LOG_OBJECT (splitmux,
"State change -> NULL or READY. Waking threads");
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE)
goto beach;
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
splitmux->need_async_start = TRUE;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:{
/* Change state async, because our child sink might not
* be ready to do that for us yet if it's state is still locked */
splitmux->need_async_start = TRUE;
/* we want to go async to PAUSED until we managed to configure and add the
* sink */
GST_SPLITMUX_LOCK (splitmux);
do_async_start (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
ret = GST_STATE_CHANGE_ASYNC;
break;
}
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->fragment_id = 0;
/* Reset internal elements only if no pad contexts are using them */
if (splitmux->contexts == NULL)
gst_splitmux_reset_elements (splitmux);
do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
break;
}
return ret;
beach:
if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
/* Cleanup elements on failed transition out of NULL */
gst_splitmux_reset_elements (splitmux);
GST_SPLITMUX_LOCK (splitmux);
do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
}
if (transition == GST_STATE_CHANGE_READY_TO_READY) {
/* READY to READY transition only happens when we're already
* in READY state, but a child element is in NULL, which
* happens when there's an error changing the state of the sink.
* We need to make sure not to fail the state transition, or
* the core won't transition us back to NULL successfully */
ret = GST_STATE_CHANGE_SUCCESS;
}
return ret;
}
static void
gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
{
if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
splitmux->fragment_id = 0;
}
}
static void
split_now (GstSplitMuxSink * splitmux)
{
g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
}
static void
split_after (GstSplitMuxSink * splitmux)
{
g_atomic_int_set (&(splitmux->split_requested), TRUE);
}
static void
split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
{
gboolean send_keyframe_requests;
GST_SPLITMUX_LOCK (splitmux);
gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
send_keyframe_requests = splitmux->send_keyframe_requests;
GST_SPLITMUX_UNLOCK (splitmux);
if (send_keyframe_requests) {
GstEvent *ev =
gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
GST_TIME_ARGS (split_time));
if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
GST_WARNING_OBJECT (splitmux,
"Could not request keyframe at %" GST_TIME_FORMAT,
GST_TIME_ARGS (split_time));
}
}
}