mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-18 05:16:05 +00:00
3042 lines
103 KiB
C
3042 lines
103 KiB
C
/* GStreamer Muxer bin that splits output stream by size/time
|
|
* Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
/**
|
|
* SECTION:element-splitmuxsink
|
|
* @short_description: Muxer wrapper for splitting output stream by size or time
|
|
*
|
|
* This element wraps a muxer and a sink, and starts a new file when the mux
|
|
* contents are about to cross a threshold of maximum size or maximum time,
|
|
* splitting at video keyframe boundaries. Exactly one input video stream
|
|
* can be muxed, with as many accompanying audio and subtitle streams as
|
|
* desired.
|
|
*
|
|
* By default, it uses mp4mux and filesink, but they can be changed via
|
|
* the 'muxer' and 'sink' properties.
|
|
*
|
|
* The minimum file size is 1 GOP, however - so limits may be overrun if the
|
|
* distance between any 2 keyframes is larger than the limits.
|
|
*
|
|
* If a video stream is available, the splitting process is driven by the video
|
|
* stream contents, and the video stream must contain closed GOPs for the output
|
|
* file parts to be played individually correctly. In the absence of a video
|
|
* stream, the first available stream is used as reference for synchronization.
|
|
*
|
|
* In the async-finalize mode, when the threshold is crossed, the old muxer
|
|
* and sink is disconnected from the pipeline and left to finish the file
|
|
* asynchronously, and a new muxer and sink is created to continue with the
|
|
* next fragment. For that reason, instead of muxer and sink objects, the
|
|
* muxer-factory and sink-factory properties are used to construct the new
|
|
* objects, together with muxer-properties and sink-properties.
|
|
*
|
|
* <refsect2>
|
|
* <title>Example pipelines</title>
|
|
* |[
|
|
* gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
|
|
* ]|
|
|
* Records a video stream captured from a v4l2 device and muxes it into
|
|
* ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
|
|
* and 1MB maximum size.
|
|
*
|
|
* |[
|
|
* gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
|
|
* ]|
|
|
* Records a video stream captured from a v4l2 device and muxes it into
|
|
* streamable Matroska files, splitting as needed to limit size/duration to 10
|
|
* seconds. Each file will finalize asynchronously.
|
|
* </refsect2>
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include <string.h>
|
|
#include <glib/gstdio.h>
|
|
#include <gst/video/video.h>
|
|
#include "gstsplitmuxsink.h"
|
|
|
|
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
|
|
#define GST_CAT_DEFAULT splitmux_debug
|
|
|
|
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
|
|
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
|
|
#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
|
|
#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
|
|
|
|
#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
|
|
#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
|
|
|
|
static void split_now (GstSplitMuxSink * splitmux);
|
|
|
|
/* Property identifiers, as passed to g_object_class_install_property() and
 * dispatched on in the set_property / get_property handlers. */
enum
{
  PROP_0,
  /* output naming and split thresholds */
  PROP_LOCATION,
  PROP_MAX_SIZE_TIME,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIMECODE,
  PROP_SEND_KEYFRAME_REQUESTS,
  PROP_MAX_FILES,
  PROP_MUXER_OVERHEAD,
  PROP_USE_ROBUST_MUXING,
  PROP_ALIGNMENT_THRESHOLD,
  /* caller-provided elements */
  PROP_MUXER,
  PROP_SINK,
  PROP_RESET_MUXER,
  /* async-finalize mode */
  PROP_ASYNC_FINALIZE,
  PROP_MUXER_FACTORY,
  PROP_MUXER_PROPERTIES,
  PROP_SINK_FACTORY,
  PROP_SINK_PROPERTIES
};
|
|
|
|
/* Default property values, used both for the GParamSpecs in class_init and
 * for the instance fields in gst_splitmux_sink_init(). */
#define DEFAULT_MAX_SIZE_TIME           0
#define DEFAULT_MAX_SIZE_BYTES          0
#define DEFAULT_MAX_FILES               0
#define DEFAULT_MUXER_OVERHEAD          0.02
#define DEFAULT_SEND_KEYFRAME_REQUESTS  FALSE
#define DEFAULT_ALIGNMENT_THRESHOLD     0
#define DEFAULT_MUXER                   "mp4mux"
#define DEFAULT_SINK                    "filesink"
#define DEFAULT_USE_ROBUST_MUXING       FALSE
#define DEFAULT_RESET_MUXER             TRUE
#define DEFAULT_ASYNC_FINALIZE          FALSE
|
|
|
|
typedef struct _AsyncEosHelper
|
|
{
|
|
MqStreamCtx *ctx;
|
|
GstPad *pad;
|
|
} AsyncEosHelper;
|
|
|
|
enum
|
|
{
|
|
SIGNAL_FORMAT_LOCATION,
|
|
SIGNAL_FORMAT_LOCATION_FULL,
|
|
SIGNAL_SPLIT_NOW,
|
|
SIGNAL_MUXER_ADDED,
|
|
SIGNAL_SINK_ADDED,
|
|
SIGNAL_LAST
|
|
};
|
|
|
|
static guint signals[SIGNAL_LAST];
|
|
|
|
static GstStaticPadTemplate video_sink_template =
|
|
GST_STATIC_PAD_TEMPLATE ("video",
|
|
GST_PAD_SINK,
|
|
GST_PAD_REQUEST,
|
|
GST_STATIC_CAPS_ANY);
|
|
static GstStaticPadTemplate audio_sink_template =
|
|
GST_STATIC_PAD_TEMPLATE ("audio_%u",
|
|
GST_PAD_SINK,
|
|
GST_PAD_REQUEST,
|
|
GST_STATIC_CAPS_ANY);
|
|
static GstStaticPadTemplate subtitle_sink_template =
|
|
GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
|
|
GST_PAD_SINK,
|
|
GST_PAD_REQUEST,
|
|
GST_STATIC_CAPS_ANY);
|
|
static GstStaticPadTemplate caption_sink_template =
|
|
GST_STATIC_PAD_TEMPLATE ("caption_%u",
|
|
GST_PAD_SINK,
|
|
GST_PAD_REQUEST,
|
|
GST_STATIC_CAPS_ANY);
|
|
|
|
/* Quarks used as qdata keys; they are assigned in _do_init() */
static GQuark PAD_CONTEXT;      /* "pad-context" */
static GQuark EOS_FROM_US;      /* "eos-from-us" */
static GQuark RUNNING_TIME;     /* "running-time" */
|
|
/* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
|
|
* to forward an incoming EOS message, but we cannot rely on the state of the
|
|
* splitmux anymore, so we set this qdata on the sink instead.
|
|
* The muxer and sink must be destroyed after both of these things have
|
|
* finished:
|
|
* 1) The EOS message has been sent when the fragment is ending
|
|
* 2) The muxer has been unlinked and relinked
|
|
* Therefore, EOS_FROM_US can have these three values:
|
|
* 0: EOS was not requested from us. Forward the message. The muxer and the
|
|
* sink will be destroyed together with the rest of the bin.
|
|
* 1: EOS was requested from us, but the other of the two tasks hasn't
|
|
* finished. Set EOS_FROM_US to 2 and do your stuff.
|
|
* 2: EOS was requested from us and the other of the two tasks has finished.
|
|
* Now we can destroy the muxer and the sink.
|
|
*/
|
|
|
|
static void
|
|
_do_init (void)
|
|
{
|
|
PAD_CONTEXT = g_quark_from_static_string ("pad-context");
|
|
EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
|
|
RUNNING_TIME = g_quark_from_static_string ("running-time");
|
|
}
|
|
|
|
#define gst_splitmux_sink_parent_class parent_class
|
|
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
|
|
_do_init ());
|
|
|
|
static gboolean create_muxer (GstSplitMuxSink * splitmux);
|
|
static gboolean create_sink (GstSplitMuxSink * splitmux);
|
|
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
|
|
const GValue * value, GParamSpec * pspec);
|
|
static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
|
|
GValue * value, GParamSpec * pspec);
|
|
static void gst_splitmux_sink_dispose (GObject * object);
|
|
static void gst_splitmux_sink_finalize (GObject * object);
|
|
|
|
static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
|
|
GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
|
|
static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
|
|
|
|
static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
|
|
element, GstStateChange transition);
|
|
|
|
static void bus_handler (GstBin * bin, GstMessage * msg);
|
|
static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
|
|
static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
|
|
static void mq_stream_ctx_free (MqStreamCtx * ctx);
|
|
static void grow_blocked_queues (GstSplitMuxSink * splitmux);
|
|
|
|
static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
|
|
static GstElement *create_element (GstSplitMuxSink * splitmux,
|
|
const gchar * factory, const gchar * name, gboolean locked);
|
|
|
|
static void do_async_done (GstSplitMuxSink * splitmux);
|
|
|
|
static MqStreamBuf *
|
|
mq_stream_buf_new (void)
|
|
{
|
|
return g_slice_new0 (MqStreamBuf);
|
|
}
|
|
|
|
static void
|
|
mq_stream_buf_free (MqStreamBuf * data)
|
|
{
|
|
g_slice_free (MqStreamBuf, data);
|
|
}
|
|
|
|
static SplitMuxOutputCommand *
|
|
out_cmd_buf_new (void)
|
|
{
|
|
return g_slice_new0 (SplitMuxOutputCommand);
|
|
}
|
|
|
|
static void
|
|
out_cmd_buf_free (SplitMuxOutputCommand * data)
|
|
{
|
|
g_slice_free (SplitMuxOutputCommand, data);
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
|
|
{
|
|
GObjectClass *gobject_class = (GObjectClass *) klass;
|
|
GstElementClass *gstelement_class = (GstElementClass *) klass;
|
|
GstBinClass *gstbin_class = (GstBinClass *) klass;
|
|
|
|
gobject_class->set_property = gst_splitmux_sink_set_property;
|
|
gobject_class->get_property = gst_splitmux_sink_get_property;
|
|
gobject_class->dispose = gst_splitmux_sink_dispose;
|
|
gobject_class->finalize = gst_splitmux_sink_finalize;
|
|
|
|
gst_element_class_set_static_metadata (gstelement_class,
|
|
"Split Muxing Bin", "Generic/Bin/Muxer",
|
|
"Convenience bin that muxes incoming streams into multiple time/size limited files",
|
|
"Jan Schmidt <jan@centricular.com>");
|
|
|
|
gst_element_class_add_static_pad_template (gstelement_class,
|
|
&video_sink_template);
|
|
gst_element_class_add_static_pad_template (gstelement_class,
|
|
&audio_sink_template);
|
|
gst_element_class_add_static_pad_template (gstelement_class,
|
|
&subtitle_sink_template);
|
|
gst_element_class_add_static_pad_template (gstelement_class,
|
|
&caption_sink_template);
|
|
|
|
gstelement_class->change_state =
|
|
GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
|
|
gstelement_class->request_new_pad =
|
|
GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
|
|
gstelement_class->release_pad =
|
|
GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
|
|
|
|
gstbin_class->handle_message = bus_handler;
|
|
|
|
g_object_class_install_property (gobject_class, PROP_LOCATION,
|
|
g_param_spec_string ("location", "File Output Pattern",
|
|
"Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
|
|
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
|
|
g_param_spec_double ("mux-overhead", "Muxing Overhead",
|
|
"Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
|
|
DEFAULT_MUXER_OVERHEAD,
|
|
G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
|
|
|
|
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
|
|
g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
|
|
"Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
|
|
DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
|
|
g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
|
|
"Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
|
|
DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
|
|
g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
|
|
"Maximum difference in timecode between first and last frame. "
|
|
"Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
|
|
"Will only be effective if a timecode track is present.",
|
|
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
|
|
g_param_spec_boolean ("send-keyframe-requests",
|
|
"Request keyframes at max-size-time",
|
|
"Request a keyframe every max-size-time ns to try splitting at that point. "
|
|
"Needs max-size-bytes to be 0 in order to be effective.",
|
|
DEFAULT_SEND_KEYFRAME_REQUESTS,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_MAX_FILES,
|
|
g_param_spec_uint ("max-files", "Max files",
|
|
"Maximum number of files to keep on disk. Once the maximum is reached,"
|
|
"old files start to be deleted to make room for new ones.", 0,
|
|
G_MAXUINT, DEFAULT_MAX_FILES,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
|
|
g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
|
|
"Allow non-reference streams to be that many ns before the reference"
|
|
" stream",
|
|
0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
g_object_class_install_property (gobject_class, PROP_MUXER,
|
|
g_param_spec_object ("muxer", "Muxer",
|
|
"The muxer element to use (NULL = default mp4mux). "
|
|
"Valid only for async-finalize = FALSE",
|
|
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_SINK,
|
|
g_param_spec_object ("sink", "Sink",
|
|
"The sink element (or element chain) to use (NULL = default filesink). "
|
|
"Valid only for async-finalize = FALSE",
|
|
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
|
|
g_param_spec_boolean ("use-robust-muxing",
|
|
"Support robust-muxing mode of some muxers",
|
|
"Check if muxers support robust muxing via the reserved-max-duration and "
|
|
"reserved-duration-remaining properties and use them if so. "
|
|
"(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
|
|
" create new fragments if the reserved header space is about to overflow. "
|
|
"Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
|
|
"manually by the app to a non-zero value for robust muxing to have an effect.",
|
|
DEFAULT_USE_ROBUST_MUXING,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
|
|
g_param_spec_boolean ("reset-muxer",
|
|
"Reset Muxer",
|
|
"Reset the muxer after each segment. Disabling this will not work for most muxers.",
|
|
DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
|
|
g_param_spec_boolean ("async-finalize",
|
|
"Finalize fragments asynchronously",
|
|
"Finalize each fragment asynchronously and start a new one",
|
|
DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
|
|
g_param_spec_string ("muxer-factory", "Muxer factory",
|
|
"The muxer element factory to use (default = mp4mux). "
|
|
"Valid only for async-finalize = TRUE",
|
|
"mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
|
|
g_param_spec_boxed ("muxer-properties", "Muxer properties",
|
|
"The muxer element properties to use. "
|
|
"Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
|
|
"Valid only for async-finalize = TRUE",
|
|
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
|
|
g_param_spec_string ("sink-factory", "Sink factory",
|
|
"The sink element factory to use (default = filesink). "
|
|
"Valid only for async-finalize = TRUE",
|
|
"filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
|
|
g_param_spec_boxed ("sink-properties", "Sink properties",
|
|
"The sink element properties to use. "
|
|
"Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
|
|
"Valid only for async-finalize = TRUE",
|
|
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstSplitMuxSink::format-location:
|
|
* @splitmux: the #GstSplitMuxSink
|
|
* @fragment_id: the sequence number of the file to be created
|
|
*
|
|
* Returns: the location to be used for the next output file
|
|
*/
|
|
signals[SIGNAL_FORMAT_LOCATION] =
|
|
g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
|
|
|
|
/**
|
|
* GstSplitMuxSink::format-location-full:
|
|
* @splitmux: the #GstSplitMuxSink
|
|
* @fragment_id: the sequence number of the file to be created
|
|
* @first_sample: A #GstSample containing the first buffer
|
|
* from the reference stream in the new file
|
|
*
|
|
* Returns: the location to be used for the next output file
|
|
*/
|
|
signals[SIGNAL_FORMAT_LOCATION_FULL] =
|
|
g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
|
|
GST_TYPE_SAMPLE);
|
|
|
|
/**
|
|
* GstSplitMuxSink::split-now:
|
|
* @splitmux: the #GstSplitMuxSink
|
|
*
|
|
* When called by the user, this action signal splits the video file (and begins a new one) immediately.
|
|
*
|
|
*
|
|
* Since: 1.14
|
|
*/
|
|
signals[SIGNAL_SPLIT_NOW] =
|
|
g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
|
|
split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
|
|
|
|
/**
|
|
* GstSplitMuxSink::muxer-added:
|
|
* @splitmux: the #GstSplitMuxSink
|
|
* @muxer: the newly added muxer element
|
|
*
|
|
* Since: 1.14
|
|
*/
|
|
signals[SIGNAL_MUXER_ADDED] =
|
|
g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
|
|
|
|
/**
|
|
* GstSplitMuxSink::sink-added:
|
|
* @splitmux: the #GstSplitMuxSink
|
|
* @sink: the newly added sink element
|
|
*
|
|
* Since: 1.14
|
|
*/
|
|
signals[SIGNAL_SINK_ADDED] =
|
|
g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
|
|
|
|
klass->split_now = split_now;
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
|
|
{
|
|
g_mutex_init (&splitmux->lock);
|
|
g_cond_init (&splitmux->input_cond);
|
|
g_cond_init (&splitmux->output_cond);
|
|
g_queue_init (&splitmux->out_cmd_q);
|
|
|
|
splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
|
|
splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
|
|
splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
|
|
splitmux->max_files = DEFAULT_MAX_FILES;
|
|
splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
|
|
splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
|
|
splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
|
|
splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
|
|
splitmux->reset_muxer = DEFAULT_RESET_MUXER;
|
|
|
|
splitmux->threshold_timecode_str = NULL;
|
|
|
|
splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
|
|
splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
|
|
splitmux->muxer_properties = NULL;
|
|
splitmux->sink_factory = g_strdup (DEFAULT_SINK);
|
|
splitmux->sink_properties = NULL;
|
|
|
|
GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
|
|
splitmux->split_now = FALSE;
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_reset (GstSplitMuxSink * splitmux)
|
|
{
|
|
if (splitmux->muxer) {
|
|
gst_element_set_locked_state (splitmux->muxer, TRUE);
|
|
gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
|
|
gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
|
|
}
|
|
if (splitmux->active_sink) {
|
|
gst_element_set_locked_state (splitmux->active_sink, TRUE);
|
|
gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
|
|
gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
|
|
}
|
|
|
|
splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_dispose (GObject * object)
|
|
{
|
|
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
|
|
|
|
/* Calling parent dispose invalidates all child pointers */
|
|
splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
|
|
|
|
G_OBJECT_CLASS (parent_class)->dispose (object);
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_finalize (GObject * object)
|
|
{
|
|
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
|
|
g_cond_clear (&splitmux->input_cond);
|
|
g_cond_clear (&splitmux->output_cond);
|
|
g_mutex_clear (&splitmux->lock);
|
|
g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
|
|
g_queue_clear (&splitmux->out_cmd_q);
|
|
|
|
if (splitmux->provided_sink)
|
|
gst_object_unref (splitmux->provided_sink);
|
|
if (splitmux->provided_muxer)
|
|
gst_object_unref (splitmux->provided_muxer);
|
|
|
|
if (splitmux->muxer_factory)
|
|
g_free (splitmux->muxer_factory);
|
|
if (splitmux->muxer_properties)
|
|
gst_structure_free (splitmux->muxer_properties);
|
|
if (splitmux->sink_factory)
|
|
g_free (splitmux->sink_factory);
|
|
if (splitmux->sink_properties)
|
|
gst_structure_free (splitmux->sink_properties);
|
|
|
|
if (splitmux->threshold_timecode_str)
|
|
g_free (splitmux->threshold_timecode_str);
|
|
|
|
g_free (splitmux->location);
|
|
|
|
/* Make sure to free any un-released contexts. There should not be any,
|
|
* because the dispose will have freed all request pads though */
|
|
g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
|
|
g_list_free (splitmux->contexts);
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
|
}
|
|
|
|
/*
|
|
* Set any time threshold to the muxer, if it has
|
|
* reserved-max-duration and reserved-duration-remaining
|
|
* properties. Called when creating/claiming the muxer
|
|
* in create_elements() */
|
|
static void
|
|
update_muxer_properties (GstSplitMuxSink * sink)
|
|
{
|
|
GObjectClass *klass;
|
|
GstClockTime threshold_time;
|
|
|
|
sink->muxer_has_reserved_props = FALSE;
|
|
if (sink->muxer == NULL)
|
|
return;
|
|
klass = G_OBJECT_GET_CLASS (sink->muxer);
|
|
if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
|
|
return;
|
|
if (g_object_class_find_property (klass,
|
|
"reserved-duration-remaining") == NULL)
|
|
return;
|
|
sink->muxer_has_reserved_props = TRUE;
|
|
|
|
GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (sink->threshold_time));
|
|
GST_OBJECT_LOCK (sink);
|
|
threshold_time = sink->threshold_time;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
if (threshold_time > 0) {
|
|
/* Tell the muxer how much space to reserve */
|
|
GstClockTime muxer_threshold = threshold_time;
|
|
g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_set_property (GObject * object, guint prop_id,
|
|
const GValue * value, GParamSpec * pspec)
|
|
{
|
|
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
|
|
|
|
switch (prop_id) {
|
|
case PROP_LOCATION:{
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_free (splitmux->location);
|
|
splitmux->location = g_value_dup_string (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
}
|
|
case PROP_MAX_SIZE_BYTES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->threshold_bytes = g_value_get_uint64 (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MAX_SIZE_TIME:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->threshold_time = g_value_get_uint64 (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MAX_SIZE_TIMECODE:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->threshold_timecode_str = g_value_dup_string (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SEND_KEYFRAME_REQUESTS:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->send_keyframe_requests = g_value_get_boolean (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MAX_FILES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->max_files = g_value_get_uint (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER_OVERHEAD:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->mux_overhead = g_value_get_double (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_USE_ROBUST_MUXING:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->use_robust_muxing = g_value_get_boolean (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
if (splitmux->use_robust_muxing)
|
|
update_muxer_properties (splitmux);
|
|
break;
|
|
case PROP_ALIGNMENT_THRESHOLD:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->alignment_threshold = g_value_get_uint64 (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SINK:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->provided_sink)
|
|
gst_object_unref (splitmux->provided_sink);
|
|
splitmux->provided_sink = g_value_get_object (value);
|
|
gst_object_ref_sink (splitmux->provided_sink);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->provided_muxer)
|
|
gst_object_unref (splitmux->provided_muxer);
|
|
splitmux->provided_muxer = g_value_get_object (value);
|
|
gst_object_ref_sink (splitmux->provided_muxer);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_RESET_MUXER:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->reset_muxer = g_value_get_boolean (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_ASYNC_FINALIZE:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
splitmux->async_finalize = g_value_get_boolean (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER_FACTORY:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->muxer_factory)
|
|
g_free (splitmux->muxer_factory);
|
|
splitmux->muxer_factory = g_value_dup_string (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER_PROPERTIES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->muxer_properties)
|
|
gst_structure_free (splitmux->muxer_properties);
|
|
if (gst_value_get_structure (value))
|
|
splitmux->muxer_properties =
|
|
gst_structure_copy (gst_value_get_structure (value));
|
|
else
|
|
splitmux->muxer_properties = NULL;
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SINK_FACTORY:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->sink_factory)
|
|
g_free (splitmux->sink_factory);
|
|
splitmux->sink_factory = g_value_dup_string (value);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SINK_PROPERTIES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->sink_properties)
|
|
gst_structure_free (splitmux->sink_properties);
|
|
if (gst_value_get_structure (value))
|
|
splitmux->sink_properties =
|
|
gst_structure_copy (gst_value_get_structure (value));
|
|
else
|
|
splitmux->sink_properties = NULL;
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_get_property (GObject * object, guint prop_id,
|
|
GValue * value, GParamSpec * pspec)
|
|
{
|
|
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
|
|
|
|
switch (prop_id) {
|
|
case PROP_LOCATION:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_string (value, splitmux->location);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MAX_SIZE_BYTES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_uint64 (value, splitmux->threshold_bytes);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MAX_SIZE_TIME:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_uint64 (value, splitmux->threshold_time);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MAX_SIZE_TIMECODE:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_string (value, splitmux->threshold_timecode_str);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SEND_KEYFRAME_REQUESTS:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_boolean (value, splitmux->send_keyframe_requests);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MAX_FILES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_uint (value, splitmux->max_files);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER_OVERHEAD:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_double (value, splitmux->mux_overhead);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_USE_ROBUST_MUXING:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_boolean (value, splitmux->use_robust_muxing);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_ALIGNMENT_THRESHOLD:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_uint64 (value, splitmux->alignment_threshold);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SINK:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_object (value, splitmux->provided_sink);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_object (value, splitmux->provided_muxer);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_RESET_MUXER:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_boolean (value, splitmux->reset_muxer);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_ASYNC_FINALIZE:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_boolean (value, splitmux->async_finalize);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER_FACTORY:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_string (value, splitmux->muxer_factory);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_MUXER_PROPERTIES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
gst_value_set_structure (value, splitmux->muxer_properties);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SINK_FACTORY:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
g_value_set_string (value, splitmux->sink_factory);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
case PROP_SINK_PROPERTIES:
|
|
GST_OBJECT_LOCK (splitmux);
|
|
gst_value_set_structure (value, splitmux->sink_properties);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
break;
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/* Convenience function */
|
|
static inline GstClockTimeDiff
|
|
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
|
|
{
|
|
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
|
|
|
|
if (GST_CLOCK_TIME_IS_VALID (val)) {
|
|
gboolean sign =
|
|
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
|
|
if (sign > 0)
|
|
res = val;
|
|
else if (sign < 0)
|
|
res = -val;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
static MqStreamCtx *
|
|
mq_stream_ctx_new (GstSplitMuxSink * splitmux)
|
|
{
|
|
MqStreamCtx *ctx;
|
|
|
|
ctx = g_new0 (MqStreamCtx, 1);
|
|
ctx->splitmux = splitmux;
|
|
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
|
|
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
|
|
ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
|
|
g_queue_init (&ctx->queued_bufs);
|
|
return ctx;
|
|
}
|
|
|
|
static void
|
|
mq_stream_ctx_free (MqStreamCtx * ctx)
|
|
{
|
|
if (ctx->q) {
|
|
GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
|
|
|
|
g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
|
|
|
|
if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
|
|
gst_element_set_locked_state (ctx->q, TRUE);
|
|
gst_element_set_state (ctx->q, GST_STATE_NULL);
|
|
gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
|
|
gst_object_unref (parent);
|
|
}
|
|
gst_object_unref (ctx->q);
|
|
}
|
|
gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
|
|
gst_object_unref (ctx->sinkpad);
|
|
gst_object_unref (ctx->srcpad);
|
|
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
|
|
g_queue_clear (&ctx->queued_bufs);
|
|
g_free (ctx);
|
|
}
|
|
|
|
static void
|
|
send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
|
|
GstElement * sink)
|
|
{
|
|
gchar *location = NULL;
|
|
GstMessage *msg;
|
|
const gchar *msg_name = opened ?
|
|
"splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
|
|
GstClockTime running_time = splitmux->reference_ctx->out_running_time;
|
|
|
|
if (!opened) {
|
|
GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
|
|
if (rtime)
|
|
running_time = *rtime;
|
|
}
|
|
|
|
g_object_get (sink, "location", &location, NULL);
|
|
|
|
/* If it's in the middle of a teardown, the reference_ctc might have become
|
|
* NULL */
|
|
if (splitmux->reference_ctx) {
|
|
msg = gst_message_new_element (GST_OBJECT (splitmux),
|
|
gst_structure_new (msg_name,
|
|
"location", G_TYPE_STRING, location,
|
|
"running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
|
|
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
|
|
}
|
|
|
|
g_free (location);
|
|
}
|
|
|
|
static void
|
|
send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
|
|
{
|
|
GstEvent *eos;
|
|
GstPad *pad;
|
|
MqStreamCtx *ctx;
|
|
|
|
eos = gst_event_new_eos ();
|
|
pad = helper->pad;
|
|
ctx = helper->ctx;
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
if (!pad)
|
|
pad = gst_pad_get_peer (ctx->srcpad);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
gst_pad_send_event (pad, eos);
|
|
GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
|
|
|
|
gst_object_unref (pad);
|
|
g_free (helper);
|
|
}
|
|
|
|
/* Called with lock held, drops the lock to send EOS to the
|
|
* pad
|
|
*/
|
|
static void
|
|
send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
|
|
{
|
|
GstEvent *eos;
|
|
GstPad *pad;
|
|
|
|
eos = gst_event_new_eos ();
|
|
pad = gst_pad_get_peer (ctx->srcpad);
|
|
|
|
ctx->out_eos = TRUE;
|
|
|
|
GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
gst_pad_send_event (pad, eos);
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
gst_object_unref (pad);
|
|
}
|
|
|
|
/* Called with lock held. Schedules an EOS event to the ctx pad
|
|
* to happen in another thread */
|
|
static void
|
|
eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
|
|
{
|
|
AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
|
|
GstPad *srcpad, *sinkpad;
|
|
|
|
srcpad = ctx->srcpad;
|
|
sinkpad = gst_pad_get_peer (srcpad);
|
|
|
|
helper->ctx = ctx;
|
|
helper->pad = sinkpad; /* Takes the reference */
|
|
|
|
ctx->out_eos_async_done = TRUE;
|
|
/* HACK: Here, we explicitly unset the SINK flag on the target sink element
|
|
* that's about to be asynchronously disposed, so that it no longer
|
|
* participates in GstBin EOS logic. This fixes a race where if
|
|
* splitmuxsink really reaches EOS before an asynchronous background
|
|
* element has finished, then the bin won't actually send EOS to the
|
|
* pipeline. Even after finishing and removing the old element, the
|
|
* bin doesn't re-check EOS status on removing a SINK element. This
|
|
* should be fixed in core, making this hack unnecessary. */
|
|
GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
|
|
|
|
GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
|
|
sinkpad, ctx);
|
|
|
|
g_assert_nonnull (helper->pad);
|
|
gst_element_call_async (GST_ELEMENT (splitmux),
|
|
(GstElementCallAsyncFunc) send_eos_async, helper, NULL);
|
|
}
|
|
|
|
/* Called with lock held. TRUE iff all contexts have a
|
|
* pending (or delivered) async eos event */
|
|
static gboolean
|
|
all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
|
|
{
|
|
gboolean ret = TRUE;
|
|
GList *item;
|
|
|
|
for (item = splitmux->contexts; item; item = item->next) {
|
|
MqStreamCtx *ctx = item->data;
|
|
ret &= ctx->out_eos_async_done;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
/* Called with splitmux lock held to check if this output
|
|
* context needs to sleep to wait for the release of the
|
|
* next GOP, or to send EOS to close out the current file
|
|
*/
|
|
static void
|
|
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
|
|
{
|
|
if (ctx->caps_change)
|
|
return;
|
|
|
|
do {
|
|
/* When first starting up, the reference stream has to output
|
|
* the first buffer to prepare the muxer and sink */
|
|
gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
|
|
GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
|
|
|
|
if (!(splitmux->max_out_running_time == 0 ||
|
|
splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
|
|
splitmux->alignment_threshold == 0 ||
|
|
splitmux->max_out_running_time < splitmux->alignment_threshold)) {
|
|
my_max_out_running_time -= splitmux->alignment_threshold;
|
|
GST_LOG_OBJECT (ctx->srcpad,
|
|
"Max out running time currently %" GST_STIME_FORMAT
|
|
", with threshold applied it is %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (splitmux->max_out_running_time),
|
|
GST_STIME_ARGS (my_max_out_running_time));
|
|
}
|
|
|
|
if (ctx->flushing
|
|
|| splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
|
|
return;
|
|
|
|
GST_LOG_OBJECT (ctx->srcpad,
|
|
"Checking running time %" GST_STIME_FORMAT " against max %"
|
|
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
|
|
GST_STIME_ARGS (my_max_out_running_time));
|
|
|
|
if (can_output) {
|
|
if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
|
|
ctx->out_running_time < my_max_out_running_time) {
|
|
return;
|
|
}
|
|
|
|
switch (splitmux->output_state) {
|
|
case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
|
|
/* We only get here if we've finished outputting a GOP and need to know
|
|
* what to do next */
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
continue;
|
|
|
|
case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
|
|
/* We've reached the max out running_time to get here, so end this file now */
|
|
if (ctx->out_eos == FALSE) {
|
|
if (splitmux->async_finalize) {
|
|
/* We must set EOS asynchronously at this point. We cannot defer
|
|
* it, because we need all contexts to wake up, for the
|
|
* reference context to eventually give us something at
|
|
* START_NEXT_FILE. Otherwise, collectpads might choose another
|
|
* context to give us the first buffer, and format-location-full
|
|
* will not contain a valid sample. */
|
|
g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
|
|
GINT_TO_POINTER (1));
|
|
eos_context_async (ctx, splitmux);
|
|
if (all_contexts_are_async_eos (splitmux)) {
|
|
GST_INFO_OBJECT (splitmux,
|
|
"All contexts are async_eos. Moving to the next file.");
|
|
/* We can start the next file once we've asked each pad to go EOS */
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
continue;
|
|
}
|
|
} else {
|
|
send_eos (splitmux, ctx);
|
|
continue;
|
|
}
|
|
} else {
|
|
GST_INFO_OBJECT (splitmux,
|
|
"At end-of-file state, but context %p is already EOS", ctx);
|
|
}
|
|
break;
|
|
case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
|
|
if (ctx->is_reference) {
|
|
/* Special handling on the reference ctx to start new fragments
|
|
* and collect commands from the command queue */
|
|
/* drops the splitmux lock briefly: */
|
|
/* We must have reference ctx in order for format-location-full to
|
|
* have a sample */
|
|
start_next_fragment (splitmux, ctx);
|
|
continue;
|
|
}
|
|
break;
|
|
case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
|
|
do {
|
|
SplitMuxOutputCommand *cmd =
|
|
g_queue_pop_tail (&splitmux->out_cmd_q);
|
|
if (cmd != NULL) {
|
|
/* If we pop the last command, we need to make our queues bigger */
|
|
if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
|
|
grow_blocked_queues (splitmux);
|
|
|
|
if (cmd->start_new_fragment) {
|
|
GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
|
|
} else {
|
|
GST_DEBUG_OBJECT (splitmux,
|
|
"Got new output cmd for time %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (cmd->max_output_ts));
|
|
|
|
/* Extend the output range immediately */
|
|
splitmux->max_out_running_time = cmd->max_output_ts;
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
|
|
}
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
|
|
out_cmd_buf_free (cmd);
|
|
break;
|
|
} else {
|
|
GST_SPLITMUX_WAIT_OUTPUT (splitmux);
|
|
}
|
|
} while (splitmux->output_state ==
|
|
SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
|
|
/* loop and re-check the state */
|
|
continue;
|
|
}
|
|
case SPLITMUX_OUTPUT_STATE_STOPPED:
|
|
return;
|
|
}
|
|
}
|
|
|
|
GST_INFO_OBJECT (ctx->srcpad,
|
|
"Sleeping for running time %"
|
|
GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
|
|
GST_STIME_ARGS (ctx->out_running_time),
|
|
GST_STIME_ARGS (splitmux->max_out_running_time));
|
|
GST_SPLITMUX_WAIT_OUTPUT (splitmux);
|
|
GST_INFO_OBJECT (ctx->srcpad,
|
|
"Woken for new max running time %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (splitmux->max_out_running_time));
|
|
}
|
|
while (1);
|
|
}
|
|
|
|
static GstClockTime
|
|
calculate_next_max_timecode (GstSplitMuxSink * splitmux,
|
|
const GstVideoTimeCode * cur_tc)
|
|
{
|
|
GstVideoTimeCode *target_tc;
|
|
GstVideoTimeCodeInterval *tc_inter;
|
|
GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
|
|
|
|
if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
|
|
return GST_CLOCK_TIME_NONE;
|
|
|
|
tc_inter =
|
|
gst_video_time_code_interval_new_from_string
|
|
(splitmux->threshold_timecode_str);
|
|
target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
|
|
gst_video_time_code_interval_free (tc_inter);
|
|
|
|
/* Convert to ns */
|
|
target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
|
|
cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
|
|
|
|
/* Add fragment_start_time, accounting for wraparound */
|
|
if (target_tc_time >= cur_tc_time) {
|
|
next_max_tc_time =
|
|
target_tc_time - cur_tc_time + splitmux->fragment_start_time;
|
|
} else {
|
|
GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
|
|
|
|
next_max_tc_time =
|
|
day_in_ns - cur_tc_time + target_tc_time +
|
|
splitmux->fragment_start_time;
|
|
}
|
|
GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
|
|
" from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
|
|
GST_TIME_ARGS (cur_tc_time));
|
|
gst_video_time_code_free (target_tc);
|
|
|
|
return next_max_tc_time;
|
|
}
|
|
|
|
static gboolean
|
|
request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
|
|
{
|
|
GstEvent *ev;
|
|
GstClockTime target_time;
|
|
gboolean timecode_based = FALSE;
|
|
|
|
splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
|
|
if (splitmux->threshold_timecode_str) {
|
|
GstVideoTimeCodeMeta *tc_meta;
|
|
|
|
if (buffer != NULL) {
|
|
tc_meta = gst_buffer_get_video_time_code_meta (buffer);
|
|
if (tc_meta) {
|
|
splitmux->next_max_tc_time =
|
|
calculate_next_max_timecode (splitmux, &tc_meta->tc);
|
|
timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
|
|
}
|
|
} else {
|
|
/* This can happen in the presence of GAP events that trigger
|
|
* a new fragment start */
|
|
GST_WARNING_OBJECT (splitmux,
|
|
"No buffer available to calculate next timecode");
|
|
}
|
|
}
|
|
|
|
if (splitmux->send_keyframe_requests == FALSE
|
|
|| (splitmux->threshold_time == 0 && !timecode_based)
|
|
|| splitmux->threshold_bytes != 0)
|
|
return TRUE;
|
|
|
|
if (timecode_based) {
|
|
/* We might have rounding errors: aim slightly earlier */
|
|
target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
|
|
} else {
|
|
target_time = splitmux->fragment_start_time + splitmux->threshold_time;
|
|
}
|
|
ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
|
|
GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (target_time));
|
|
return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
|
|
{
|
|
GstSplitMuxSink *splitmux = ctx->splitmux;
|
|
MqStreamBuf *buf_info = NULL;
|
|
|
|
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
|
|
|
|
/* FIXME: Handle buffer lists, until then make it clear they won't work */
|
|
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
|
|
g_warning ("Buffer list handling not implemented");
|
|
return GST_PAD_PROBE_DROP;
|
|
}
|
|
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
|
|
info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
|
|
GstEvent *event = gst_pad_probe_info_get_event (info);
|
|
gboolean locked = FALSE;
|
|
|
|
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_SEGMENT:
|
|
gst_event_copy_segment (event, &ctx->out_segment);
|
|
break;
|
|
case GST_EVENT_FLUSH_STOP:
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
locked = TRUE;
|
|
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
|
|
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
|
|
g_queue_clear (&ctx->queued_bufs);
|
|
ctx->flushing = FALSE;
|
|
break;
|
|
case GST_EVENT_FLUSH_START:
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
locked = TRUE;
|
|
GST_LOG_OBJECT (pad, "Flush start");
|
|
ctx->flushing = TRUE;
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
break;
|
|
case GST_EVENT_EOS:
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
locked = TRUE;
|
|
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
|
|
goto beach;
|
|
ctx->out_eos = TRUE;
|
|
GST_INFO_OBJECT (splitmux,
|
|
"Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
|
|
break;
|
|
case GST_EVENT_GAP:{
|
|
GstClockTime gap_ts;
|
|
GstClockTimeDiff rtime;
|
|
|
|
gst_event_parse_gap (event, &gap_ts, NULL);
|
|
if (gap_ts == GST_CLOCK_TIME_NONE)
|
|
break;
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
locked = TRUE;
|
|
|
|
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
|
|
goto beach;
|
|
|
|
/* When we get a gap event on the
|
|
* reference stream and we're trying to open a
|
|
* new file, we need to store it until we get
|
|
* the buffer afterwards
|
|
*/
|
|
if (ctx->is_reference &&
|
|
(splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
|
|
GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
|
|
gst_event_replace (&ctx->pending_gap, event);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return GST_PAD_PROBE_HANDLED;
|
|
}
|
|
|
|
rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
|
|
|
|
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (rtime));
|
|
|
|
if (rtime != GST_CLOCK_STIME_NONE) {
|
|
ctx->out_running_time = rtime;
|
|
complete_or_wait_on_out (splitmux, ctx);
|
|
}
|
|
break;
|
|
}
|
|
case GST_EVENT_CUSTOM_DOWNSTREAM:{
|
|
const GstStructure *s;
|
|
GstClockTimeDiff ts = 0;
|
|
|
|
s = gst_event_get_structure (event);
|
|
if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
|
|
break;
|
|
|
|
gst_structure_get_int64 (s, "timestamp", &ts);
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
locked = TRUE;
|
|
|
|
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
|
|
goto beach;
|
|
ctx->out_running_time = ts;
|
|
if (!ctx->is_reference)
|
|
complete_or_wait_on_out (splitmux, ctx);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return GST_PAD_PROBE_DROP;
|
|
}
|
|
case GST_EVENT_CAPS:{
|
|
GstPad *peer;
|
|
|
|
if (!ctx->is_reference)
|
|
break;
|
|
|
|
peer = gst_pad_get_peer (pad);
|
|
if (peer) {
|
|
gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
|
|
|
|
gst_object_unref (peer);
|
|
|
|
if (ok)
|
|
break;
|
|
|
|
} else {
|
|
break;
|
|
}
|
|
/* This is in the case the muxer doesn't allow this change of caps */
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
locked = TRUE;
|
|
ctx->caps_change = TRUE;
|
|
|
|
if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
|
|
GST_DEBUG_OBJECT (splitmux,
|
|
"New caps were not accepted. Switching output file");
|
|
if (ctx->out_eos == FALSE) {
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
}
|
|
}
|
|
|
|
/* Lets it fall through, if it fails again, then the muxer just can't
|
|
* support this format, but at least we have a closed file.
|
|
*/
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
/* We need to make sure events aren't passed
|
|
* until the muxer / sink are ready for it */
|
|
if (!locked)
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
if (!ctx->is_reference)
|
|
complete_or_wait_on_out (splitmux, ctx);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
/* Don't try to forward sticky events before the next buffer is there
|
|
* because it would cause a new file to be created without the first
|
|
* buffer being available.
|
|
*/
|
|
if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
|
|
gst_event_unref (event);
|
|
return GST_PAD_PROBE_HANDLED;
|
|
} else
|
|
return GST_PAD_PROBE_PASS;
|
|
}
|
|
|
|
/* Allow everything through until the configured next stopping point */
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
buf_info = g_queue_pop_tail (&ctx->queued_bufs);
|
|
if (buf_info == NULL)
|
|
/* Can only happen due to a poorly timed flush */
|
|
goto beach;
|
|
|
|
/* If we have popped a keyframe, decrement the queued_gop count */
|
|
if (buf_info->keyframe && splitmux->queued_keyframes > 0)
|
|
splitmux->queued_keyframes--;
|
|
|
|
ctx->out_running_time = buf_info->run_ts;
|
|
ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
|
|
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
|
|
" size %" G_GUINT64_FORMAT,
|
|
pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
|
|
|
|
ctx->caps_change = FALSE;
|
|
|
|
complete_or_wait_on_out (splitmux, ctx);
|
|
|
|
splitmux->muxed_out_bytes += buf_info->buf_size;
|
|
|
|
#ifndef GST_DISABLE_GST_DEBUG
|
|
{
|
|
GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
|
|
GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
|
|
" run ts %" GST_STIME_FORMAT, buf,
|
|
GST_STIME_ARGS (ctx->out_running_time));
|
|
}
|
|
#endif
|
|
|
|
ctx->cur_out_buffer = NULL;
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
/* pending_gap is protected by the STREAM lock */
|
|
if (ctx->pending_gap) {
|
|
/* If we previously stored a gap event, send it now */
|
|
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
|
|
|
|
GST_DEBUG_OBJECT (splitmux,
|
|
"Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
|
|
|
|
gst_pad_send_event (peer, ctx->pending_gap);
|
|
ctx->pending_gap = NULL;
|
|
|
|
gst_object_unref (peer);
|
|
}
|
|
|
|
mq_stream_buf_free (buf_info);
|
|
|
|
return GST_PAD_PROBE_PASS;
|
|
|
|
beach:
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return GST_PAD_PROBE_DROP;
|
|
}
|
|
|
|
static gboolean
|
|
resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
|
|
{
|
|
return gst_pad_send_event (peer, gst_event_ref (*event));
|
|
}
|
|
|
|
static void
|
|
unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
|
|
{
|
|
if (ctx->fragment_block_id > 0) {
|
|
gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
|
|
ctx->fragment_block_id = 0;
|
|
}
|
|
}
|
|
|
|
static void
|
|
restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
|
|
{
|
|
GstPad *peer = gst_pad_get_peer (ctx->srcpad);
|
|
|
|
gst_pad_sticky_events_foreach (ctx->srcpad,
|
|
(GstPadStickyEventsForeachFunction) (resend_sticky), peer);
|
|
|
|
/* Clear EOS flag if not actually EOS */
|
|
ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
|
|
ctx->out_eos_async_done = ctx->out_eos;
|
|
|
|
gst_object_unref (peer);
|
|
}
|
|
|
|
static void
|
|
relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
|
|
{
|
|
GstPad *sinkpad, *srcpad, *newpad;
|
|
GstPadTemplate *templ;
|
|
|
|
srcpad = ctx->srcpad;
|
|
sinkpad = gst_pad_get_peer (srcpad);
|
|
|
|
templ = sinkpad->padtemplate;
|
|
newpad =
|
|
gst_element_request_pad (splitmux->muxer, templ,
|
|
GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
|
|
|
|
GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
|
|
newpad);
|
|
if (!gst_pad_unlink (srcpad, sinkpad)) {
|
|
gst_object_unref (sinkpad);
|
|
goto fail;
|
|
}
|
|
if (gst_pad_link_full (srcpad, newpad,
|
|
GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
|
|
gst_element_release_request_pad (splitmux->muxer, newpad);
|
|
gst_object_unref (sinkpad);
|
|
gst_object_unref (newpad);
|
|
goto fail;
|
|
}
|
|
gst_object_unref (newpad);
|
|
gst_object_unref (sinkpad);
|
|
return;
|
|
|
|
fail:
|
|
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
|
|
("Could not create the new muxer/sink"), NULL);
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
_block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
|
|
{
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
static void
|
|
block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
|
|
{
|
|
ctx->fragment_block_id =
|
|
gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
|
|
NULL, NULL);
|
|
}
|
|
|
|
static gboolean
|
|
_set_property_from_structure (GQuark field_id, const GValue * value,
|
|
gpointer user_data)
|
|
{
|
|
const gchar *property_name = g_quark_to_string (field_id);
|
|
GObject *element = G_OBJECT (user_data);
|
|
|
|
g_object_set_property (element, property_name, value);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
_lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
|
|
{
|
|
gst_element_set_locked_state (element, TRUE);
|
|
gst_element_set_state (element, GST_STATE_NULL);
|
|
GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
|
|
gst_bin_remove (GST_BIN (splitmux), element);
|
|
}
|
|
|
|
|
|
static void
|
|
_send_event (const GValue * value, gpointer user_data)
|
|
{
|
|
GstPad *pad = g_value_get_object (value);
|
|
GstEvent *ev = user_data;
|
|
|
|
gst_pad_send_event (pad, gst_event_ref (ev));
|
|
}
|
|
|
|
/* Called with lock held when a fragment
|
|
* reaches EOS and it is time to restart
|
|
* a new fragment
|
|
*/
|
|
static void
|
|
start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
|
|
{
|
|
GstElement *muxer, *sink;
|
|
|
|
g_assert (ctx->is_reference);
|
|
|
|
/* 1 change to new file */
|
|
splitmux->switching_fragment = TRUE;
|
|
|
|
/* We need to drop the splitmux lock to acquire the state lock
|
|
* here and ensure there's no racy state change going on elsewhere */
|
|
muxer = gst_object_ref (splitmux->muxer);
|
|
sink = gst_object_ref (splitmux->active_sink);
|
|
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
GST_STATE_LOCK (splitmux);
|
|
|
|
if (splitmux->async_finalize) {
|
|
if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
|
|
gchar *newname;
|
|
GstElement *new_sink, *new_muxer;
|
|
|
|
GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
|
|
splitmux->fragment_id);
|
|
g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
|
|
newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
if ((splitmux->sink =
|
|
create_element (splitmux, splitmux->sink_factory, newname,
|
|
TRUE)) == NULL)
|
|
goto fail;
|
|
if (splitmux->sink_properties)
|
|
gst_structure_foreach (splitmux->sink_properties,
|
|
_set_property_from_structure, splitmux->sink);
|
|
splitmux->active_sink = splitmux->sink;
|
|
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
|
|
g_free (newname);
|
|
newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
|
|
if ((splitmux->muxer =
|
|
create_element (splitmux, splitmux->muxer_factory, newname,
|
|
TRUE)) == NULL)
|
|
goto fail;
|
|
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
|
|
"async") != NULL) {
|
|
/* async child elements are causing state change races and weird
|
|
* failures, so let's try and turn that off */
|
|
g_object_set (splitmux->sink, "async", FALSE, NULL);
|
|
}
|
|
if (splitmux->muxer_properties)
|
|
gst_structure_foreach (splitmux->muxer_properties,
|
|
_set_property_from_structure, splitmux->muxer);
|
|
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
|
|
g_free (newname);
|
|
new_sink = splitmux->sink;
|
|
new_muxer = splitmux->muxer;
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
|
|
gst_element_link (new_muxer, new_sink);
|
|
|
|
if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
|
|
if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
|
|
EOS_FROM_US)) == 2) {
|
|
_lock_and_set_to_null (muxer, splitmux);
|
|
_lock_and_set_to_null (sink, splitmux);
|
|
} else {
|
|
g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
|
|
GINT_TO_POINTER (2));
|
|
}
|
|
}
|
|
gst_object_unref (muxer);
|
|
gst_object_unref (sink);
|
|
muxer = new_muxer;
|
|
sink = new_sink;
|
|
gst_object_ref (muxer);
|
|
gst_object_ref (sink);
|
|
}
|
|
} else {
|
|
|
|
gst_element_set_locked_state (muxer, TRUE);
|
|
gst_element_set_locked_state (sink, TRUE);
|
|
gst_element_set_state (sink, GST_STATE_NULL);
|
|
|
|
if (splitmux->reset_muxer) {
|
|
gst_element_set_state (muxer, GST_STATE_NULL);
|
|
} else {
|
|
GstIterator *it = gst_element_iterate_sink_pads (muxer);
|
|
GstEvent *ev;
|
|
|
|
ev = gst_event_new_flush_start ();
|
|
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
|
|
gst_event_unref (ev);
|
|
|
|
gst_iterator_resync (it);
|
|
|
|
ev = gst_event_new_flush_stop (TRUE);
|
|
while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
|
|
gst_event_unref (ev);
|
|
|
|
gst_iterator_free (it);
|
|
}
|
|
}
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
|
|
set_next_filename (splitmux, ctx);
|
|
splitmux->muxed_out_bytes = 0;
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
|
|
gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
|
|
gst_element_set_locked_state (muxer, FALSE);
|
|
gst_element_set_locked_state (sink, FALSE);
|
|
|
|
gst_object_unref (sink);
|
|
gst_object_unref (muxer);
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
GST_STATE_UNLOCK (splitmux);
|
|
splitmux->switching_fragment = FALSE;
|
|
do_async_done (splitmux);
|
|
|
|
splitmux->ready_for_output = TRUE;
|
|
|
|
g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
|
|
g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
|
|
|
|
send_fragment_opened_closed_msg (splitmux, TRUE, sink);
|
|
|
|
/* FIXME: Is this always the correct next state? */
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
return;
|
|
|
|
fail:
|
|
GST_STATE_UNLOCK (splitmux);
|
|
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
|
|
("Could not create the new muxer/sink"), NULL);
|
|
}
|
|
|
|
static void
|
|
bus_handler (GstBin * bin, GstMessage * message)
|
|
{
|
|
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
|
|
|
|
switch (GST_MESSAGE_TYPE (message)) {
|
|
case GST_MESSAGE_EOS:{
|
|
/* If the state is draining out the current file, drop this EOS */
|
|
GstElement *sink;
|
|
|
|
sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
send_fragment_opened_closed_msg (splitmux, FALSE, sink);
|
|
|
|
if (splitmux->async_finalize) {
|
|
|
|
if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
|
|
if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
|
|
EOS_FROM_US)) == 2) {
|
|
GstElement *muxer;
|
|
GstPad *sinksink, *muxersrc;
|
|
|
|
sinksink = gst_element_get_static_pad (sink, "sink");
|
|
muxersrc = gst_pad_get_peer (sinksink);
|
|
muxer = gst_pad_get_parent_element (muxersrc);
|
|
gst_object_unref (sinksink);
|
|
gst_object_unref (muxersrc);
|
|
|
|
gst_element_call_async (muxer,
|
|
(GstElementCallAsyncFunc) _lock_and_set_to_null,
|
|
gst_object_ref (splitmux), gst_object_unref);
|
|
gst_element_call_async (sink,
|
|
(GstElementCallAsyncFunc) _lock_and_set_to_null,
|
|
gst_object_ref (splitmux), gst_object_unref);
|
|
gst_object_unref (muxer);
|
|
} else {
|
|
g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
|
|
GINT_TO_POINTER (2));
|
|
}
|
|
GST_DEBUG_OBJECT (splitmux,
|
|
"Caught async EOS from previous muxer+sink. Dropping.");
|
|
/* We forward the EOS so that it gets aggregated as normal. If the sink
|
|
* finishes and is removed before the end, it will be de-aggregated */
|
|
gst_message_unref (message);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return;
|
|
}
|
|
} else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
|
|
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
|
|
gst_message_unref (message);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return;
|
|
} else {
|
|
GST_DEBUG_OBJECT (splitmux,
|
|
"Passing EOS message. Output state %d max_out_running_time %"
|
|
GST_STIME_FORMAT, splitmux->output_state,
|
|
GST_STIME_ARGS (splitmux->max_out_running_time));
|
|
}
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
}
|
|
case GST_MESSAGE_ASYNC_START:
|
|
case GST_MESSAGE_ASYNC_DONE:
|
|
/* Ignore state changes from our children while switching */
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
if (splitmux->switching_fragment) {
|
|
if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
|
|
|| GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Ignoring state change from child %" GST_PTR_FORMAT
|
|
" while switching", GST_MESSAGE_SRC (message));
|
|
gst_message_unref (message);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return;
|
|
}
|
|
}
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
case GST_MESSAGE_WARNING:
|
|
{
|
|
GError *gerror = NULL;
|
|
|
|
gst_message_parse_warning (message, &gerror, NULL);
|
|
|
|
if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
|
|
GList *item;
|
|
gboolean caps_change = FALSE;
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
for (item = splitmux->contexts; item; item = item->next) {
|
|
MqStreamCtx *ctx = item->data;
|
|
|
|
if (ctx->caps_change) {
|
|
caps_change = TRUE;
|
|
break;
|
|
}
|
|
}
|
|
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
if (caps_change) {
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Ignoring warning change from child %" GST_PTR_FORMAT
|
|
" while switching caps", GST_MESSAGE_SRC (message));
|
|
gst_message_unref (message);
|
|
return;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
|
|
}
|
|
|
|
static void
|
|
ctx_set_unblock (MqStreamCtx * ctx)
|
|
{
|
|
ctx->need_unblock = TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
need_new_fragment (GstSplitMuxSink * splitmux,
|
|
GstClockTime queued_time, GstClockTime queued_gop_time,
|
|
guint64 queued_bytes)
|
|
{
|
|
guint64 thresh_bytes;
|
|
GstClockTime thresh_time;
|
|
gboolean check_robust_muxing;
|
|
|
|
GST_OBJECT_LOCK (splitmux);
|
|
thresh_bytes = splitmux->threshold_bytes;
|
|
thresh_time = splitmux->threshold_time;
|
|
check_robust_muxing = splitmux->use_robust_muxing
|
|
&& splitmux->muxer_has_reserved_props;
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
|
|
/* Have we muxed anything into the new file at all? */
|
|
if (splitmux->fragment_total_bytes <= 0)
|
|
return FALSE;
|
|
|
|
/* User told us to split now */
|
|
if (g_atomic_int_get (&(splitmux->split_now)) == TRUE)
|
|
return TRUE;
|
|
|
|
if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
|
|
return TRUE; /* Would overrun byte limit */
|
|
|
|
if (thresh_time > 0 && queued_time > thresh_time)
|
|
return TRUE; /* Would overrun byte limit */
|
|
|
|
/* Timecode-based threshold accounts for possible rounding errors:
|
|
* 5us should be bigger than all possible rounding errors but nowhere near
|
|
* big enough to skip to another frame */
|
|
if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
|
|
splitmux->reference_ctx->in_running_time >
|
|
splitmux->next_max_tc_time + 5 * GST_USECOND)
|
|
return TRUE; /* Timecode threshold */
|
|
|
|
if (check_robust_muxing) {
|
|
GstClockTime mux_reserved_remain;
|
|
|
|
g_object_get (splitmux->muxer,
|
|
"reserved-duration-remaining", &mux_reserved_remain, NULL);
|
|
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Muxer robust muxing report - %" G_GUINT64_FORMAT
|
|
" remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
|
|
mux_reserved_remain, queued_gop_time);
|
|
|
|
if (queued_gop_time >= mux_reserved_remain) {
|
|
GST_INFO_OBJECT (splitmux,
|
|
"File is about to run out of header room - %" G_GUINT64_FORMAT
|
|
" remaining. New GOP would enqueue %" G_GUINT64_FORMAT
|
|
". Switching to new file", mux_reserved_remain, queued_gop_time);
|
|
return TRUE;
|
|
}
|
|
}
|
|
|
|
/* Continue and mux this GOP */
|
|
return FALSE;
|
|
}
|
|
|
|
/* Called with splitmux lock held */
|
|
/* Called when entering ProcessingCompleteGop state
|
|
* Assess if mq contents overflowed the current file
|
|
* -> If yes, need to switch to new file
|
|
* -> if no, set max_out_running_time to let this GOP in and
|
|
* go to COLLECTING_GOP_START state
|
|
*/
|
|
static void
|
|
handle_gathered_gop (GstSplitMuxSink * splitmux)
|
|
{
|
|
guint64 queued_bytes;
|
|
GstClockTimeDiff queued_time = 0;
|
|
GstClockTimeDiff queued_gop_time = 0;
|
|
GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
|
|
SplitMuxOutputCommand *cmd;
|
|
|
|
/* Assess if the multiqueue contents overflowed the current file */
|
|
/* When considering if a newly gathered GOP overflows
|
|
* the time limit for the file, only consider the running time of the
|
|
* reference stream. Other streams might have run ahead a little bit,
|
|
* but extra pieces won't be released to the muxer beyond the reference
|
|
* stream cut-off anyway - so it forms the limit. */
|
|
queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
|
|
queued_time = splitmux->reference_ctx->in_running_time;
|
|
/* queued_gop_time tracks how much unwritten data there is waiting to
|
|
* be written to this fragment including this GOP */
|
|
if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
|
|
queued_gop_time =
|
|
splitmux->reference_ctx->in_running_time -
|
|
splitmux->reference_ctx->out_running_time;
|
|
else
|
|
queued_gop_time =
|
|
splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
|
|
|
|
GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
|
|
|
|
g_assert (queued_gop_time >= 0);
|
|
g_assert (queued_time >= splitmux->fragment_start_time);
|
|
|
|
queued_time -= splitmux->fragment_start_time;
|
|
if (queued_time < queued_gop_time)
|
|
queued_gop_time = queued_time;
|
|
|
|
/* Expand queued bytes estimate by muxer overhead */
|
|
queued_bytes += (queued_bytes * splitmux->mux_overhead);
|
|
|
|
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
|
|
" bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
|
|
if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
|
|
GST_LOG_OBJECT (splitmux,
|
|
"timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
|
|
GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
|
|
}
|
|
|
|
/* Check for overrun - have we output at least one byte and overrun
|
|
* either threshold? */
|
|
if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
|
|
GstClockTime *sink_running_time = g_new (GstClockTime, 1);
|
|
*sink_running_time = splitmux->reference_ctx->out_running_time;
|
|
g_object_set_qdata_full (G_OBJECT (splitmux->sink),
|
|
RUNNING_TIME, sink_running_time, g_free);
|
|
g_atomic_int_set (&(splitmux->split_now), FALSE);
|
|
/* Tell the output side to start a new fragment */
|
|
GST_INFO_OBJECT (splitmux,
|
|
"This GOP (dur %" GST_STIME_FORMAT
|
|
") would overflow the fragment, Sending start_new_fragment cmd",
|
|
GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
|
|
splitmux->gop_start_time));
|
|
cmd = out_cmd_buf_new ();
|
|
cmd->start_new_fragment = TRUE;
|
|
g_queue_push_head (&splitmux->out_cmd_q, cmd);
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
|
|
new_out_ts = splitmux->reference_ctx->in_running_time;
|
|
splitmux->fragment_start_time = splitmux->gop_start_time;
|
|
splitmux->fragment_total_bytes = 0;
|
|
|
|
if (request_next_keyframe (splitmux,
|
|
splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
|
|
GST_WARNING_OBJECT (splitmux,
|
|
"Could not request a keyframe. Files may not split at the exact location they should");
|
|
}
|
|
gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
|
|
}
|
|
|
|
/* And set up to collect the next GOP */
|
|
if (!splitmux->reference_ctx->in_eos) {
|
|
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
|
|
splitmux->gop_start_time = new_out_ts;
|
|
} else {
|
|
/* This is probably already the current state, but just in case: */
|
|
splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
|
|
new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
|
|
}
|
|
|
|
/* And wake all input contexts to send a wake-up event */
|
|
g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
|
|
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
|
|
|
|
/* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
|
|
splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
|
|
|
|
if (splitmux->gop_total_bytes > 0) {
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
|
|
" time %" GST_STIME_FORMAT,
|
|
splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
|
|
|
|
/* Send this GOP to the output command queue */
|
|
cmd = out_cmd_buf_new ();
|
|
cmd->start_new_fragment = FALSE;
|
|
cmd->max_output_ts = new_out_ts;
|
|
GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
|
|
GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
|
|
g_queue_push_head (&splitmux->out_cmd_q, cmd);
|
|
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
}
|
|
|
|
splitmux->gop_total_bytes = 0;
|
|
}
|
|
|
|
/* Called with splitmux lock held */
|
|
/* Called from each input pad when it is has all the pieces
|
|
* for a GOP or EOS, starting with the reference pad which has set the
|
|
* splitmux->max_in_running_time
|
|
*/
|
|
static void
|
|
check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
|
|
{
|
|
GList *cur;
|
|
GstEvent *event;
|
|
|
|
/* On ENDING_FILE, the reference stream sends a command to start a new
|
|
* fragment, then releases the GOP for output in the new fragment.
|
|
* If somes streams received no buffer during the last GOP that overran,
|
|
* because its next buffer has a timestamp bigger than
|
|
* ctx->max_in_running_time, its queue is empty. In that case the only
|
|
* way to wakeup the output thread is by injecting an event in the
|
|
* queue. This usually happen with subtitle streams.
|
|
* See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
|
|
if (ctx->need_unblock) {
|
|
GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
|
|
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
|
|
GST_EVENT_TYPE_SERIALIZED,
|
|
gst_structure_new ("splitmuxsink-unblock", "timestamp",
|
|
G_TYPE_INT64, splitmux->max_in_running_time, NULL));
|
|
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
gst_pad_send_event (ctx->sinkpad, event);
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
ctx->need_unblock = FALSE;
|
|
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
|
|
/* state may have changed while we were unlocked. Loop again if so */
|
|
if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
|
|
return;
|
|
}
|
|
|
|
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
|
|
gboolean ready = TRUE;
|
|
|
|
/* Iterate each pad, and check that the input running time is at least
|
|
* up to the reference running time, and if so handle the collected GOP */
|
|
GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
|
|
GST_STIME_FORMAT " ctx %p",
|
|
GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
|
|
for (cur = g_list_first (splitmux->contexts); cur != NULL;
|
|
cur = g_list_next (cur)) {
|
|
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
|
|
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
|
|
" EOS %d", tmpctx, tmpctx->srcpad,
|
|
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
|
|
|
|
if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
|
|
tmpctx->in_running_time < splitmux->max_in_running_time &&
|
|
!tmpctx->in_eos) {
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
|
|
tmpctx, tmpctx->srcpad);
|
|
ready = FALSE;
|
|
break;
|
|
}
|
|
}
|
|
if (ready) {
|
|
GST_DEBUG_OBJECT (splitmux,
|
|
"Collected GOP is complete. Processing (ctx %p)", ctx);
|
|
/* All pads have a complete GOP, release it into the multiqueue */
|
|
handle_gathered_gop (splitmux);
|
|
}
|
|
}
|
|
|
|
/* If upstream reached EOS we are not expecting more data, no need to wait
|
|
* here. */
|
|
if (ctx->in_eos)
|
|
return;
|
|
|
|
/* Some pad is not yet ready, or GOP is being pushed
|
|
* either way, sleep and wait to get woken */
|
|
while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
|
|
!ctx->flushing &&
|
|
(ctx->in_running_time >= splitmux->max_in_running_time) &&
|
|
(splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
|
|
|
|
GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
|
|
GST_SPLITMUX_WAIT_INPUT (splitmux);
|
|
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
|
|
}
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
|
|
{
|
|
GstSplitMuxSink *splitmux = ctx->splitmux;
|
|
GstBuffer *buf;
|
|
MqStreamBuf *buf_info = NULL;
|
|
GstClockTime ts;
|
|
gboolean loop_again;
|
|
gboolean keyframe = FALSE;
|
|
|
|
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
|
|
|
|
/* FIXME: Handle buffer lists, until then make it clear they won't work */
|
|
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
|
|
g_warning ("Buffer list handling not implemented");
|
|
return GST_PAD_PROBE_DROP;
|
|
}
|
|
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
|
|
info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
|
|
GstEvent *event = gst_pad_probe_info_get_event (info);
|
|
|
|
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_SEGMENT:
|
|
gst_event_copy_segment (event, &ctx->in_segment);
|
|
break;
|
|
case GST_EVENT_FLUSH_STOP:
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
|
|
ctx->in_eos = FALSE;
|
|
ctx->in_running_time = GST_CLOCK_STIME_NONE;
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
case GST_EVENT_EOS:
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
ctx->in_eos = TRUE;
|
|
|
|
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
|
|
goto beach;
|
|
|
|
if (ctx->is_reference) {
|
|
GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
|
|
/* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
|
|
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
|
|
/* Wake up other input pads to collect this GOP */
|
|
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
|
|
check_completed_gop (splitmux, ctx);
|
|
} else if (splitmux->input_state ==
|
|
SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
|
|
/* If we are waiting for a GOP to be completed (ie, for aux
|
|
* pads to catch up), then this pad is complete, so check
|
|
* if the whole GOP is.
|
|
*/
|
|
check_completed_gop (splitmux, ctx);
|
|
}
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
case GST_EVENT_GAP:{
|
|
GstClockTime gap_ts;
|
|
GstClockTimeDiff rtime;
|
|
|
|
gst_event_parse_gap (event, &gap_ts, NULL);
|
|
if (gap_ts == GST_CLOCK_TIME_NONE)
|
|
break;
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
|
|
goto beach;
|
|
rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
|
|
|
|
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (rtime));
|
|
|
|
if (ctx->is_reference
|
|
&& splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
|
|
splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
|
|
GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (splitmux->fragment_start_time));
|
|
/* Also take this as the first start time when starting up,
|
|
* so that we start counting overflow from the first frame */
|
|
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
|
|
splitmux->max_in_running_time = splitmux->fragment_start_time;
|
|
}
|
|
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
return GST_PAD_PROBE_PASS;
|
|
} else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
|
|
switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
|
|
case GST_QUERY_ALLOCATION:
|
|
return GST_PAD_PROBE_DROP;
|
|
default:
|
|
return GST_PAD_PROBE_PASS;
|
|
}
|
|
}
|
|
|
|
buf = gst_pad_probe_info_get_buffer (info);
|
|
buf_info = mq_stream_buf_new ();
|
|
|
|
if (GST_BUFFER_PTS_IS_VALID (buf))
|
|
ts = GST_BUFFER_PTS (buf);
|
|
else
|
|
ts = GST_BUFFER_DTS (buf);
|
|
|
|
GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
|
|
goto beach;
|
|
|
|
/* If this buffer has a timestamp, advance the input timestamp of the
|
|
* stream */
|
|
if (GST_CLOCK_TIME_IS_VALID (ts)) {
|
|
GstClockTimeDiff running_time =
|
|
my_segment_to_running_time (&ctx->in_segment, ts);
|
|
|
|
GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (running_time));
|
|
|
|
if (GST_CLOCK_STIME_IS_VALID (running_time)
|
|
&& running_time > ctx->in_running_time)
|
|
ctx->in_running_time = running_time;
|
|
}
|
|
|
|
/* Try to make sure we have a valid running time */
|
|
if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
|
|
ctx->in_running_time =
|
|
my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
|
|
}
|
|
|
|
GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (ctx->in_running_time));
|
|
|
|
buf_info->run_ts = ctx->in_running_time;
|
|
buf_info->buf_size = gst_buffer_get_size (buf);
|
|
buf_info->duration = GST_BUFFER_DURATION (buf);
|
|
|
|
/* initialize fragment_start_time */
|
|
if (ctx->is_reference
|
|
&& splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
|
|
splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
|
|
GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (splitmux->fragment_start_time));
|
|
gst_buffer_replace (&ctx->prev_in_keyframe, buf);
|
|
|
|
/* Also take this as the first start time when starting up,
|
|
* so that we start counting overflow from the first frame */
|
|
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
|
|
splitmux->max_in_running_time = splitmux->fragment_start_time;
|
|
if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
|
|
GST_WARNING_OBJECT (splitmux,
|
|
"Could not request a keyframe. Files may not split at the exact location they should");
|
|
}
|
|
gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
|
|
" total GOP bytes %" G_GUINT64_FORMAT,
|
|
GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
|
|
|
|
loop_again = TRUE;
|
|
do {
|
|
if (ctx->flushing)
|
|
break;
|
|
|
|
switch (splitmux->input_state) {
|
|
case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
|
|
if (ctx->is_reference) {
|
|
/* This is the reference context. If it's a keyframe,
|
|
* it marks the start of a new GOP and we should wait in
|
|
* check_completed_gop before continuing, but either way
|
|
* (keyframe or no, we'll pass this buffer through after
|
|
* so set loop_again to FALSE */
|
|
loop_again = FALSE;
|
|
|
|
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
|
|
/* Allow other input pads to catch up to here too */
|
|
splitmux->max_in_running_time = ctx->in_running_time;
|
|
GST_LOG_OBJECT (splitmux,
|
|
"Max in running time now %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (splitmux->max_in_running_time));
|
|
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
|
|
break;
|
|
}
|
|
GST_INFO_OBJECT (pad,
|
|
"Have keyframe with running time %" GST_STIME_FORMAT,
|
|
GST_STIME_ARGS (ctx->in_running_time));
|
|
keyframe = TRUE;
|
|
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
|
|
splitmux->max_in_running_time = ctx->in_running_time;
|
|
GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (splitmux->max_in_running_time));
|
|
/* Wake up other input pads to collect this GOP */
|
|
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
|
|
check_completed_gop (splitmux, ctx);
|
|
/* Store this new keyframe to remember the start of GOP */
|
|
gst_buffer_replace (&ctx->prev_in_keyframe, buf);
|
|
} else {
|
|
/* Pass this buffer if the reference ctx is far enough ahead */
|
|
if (ctx->in_running_time < splitmux->max_in_running_time) {
|
|
loop_again = FALSE;
|
|
break;
|
|
}
|
|
|
|
/* We're still waiting for a keyframe on the reference pad, sleep */
|
|
GST_LOG_OBJECT (pad, "Sleeping for GOP start");
|
|
GST_SPLITMUX_WAIT_INPUT (splitmux);
|
|
GST_LOG_OBJECT (pad,
|
|
"Done sleeping for GOP start input state now %d",
|
|
splitmux->input_state);
|
|
}
|
|
break;
|
|
case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
|
|
/* We're collecting a GOP. If this is the reference context,
|
|
* we need to check if this is a keyframe that marks the start
|
|
* of the next GOP. If it is, it marks the end of the GOP we're
|
|
* collecting, so sleep and wait until all the other pads also
|
|
* reach that timestamp - at which point, we have an entire GOP
|
|
* and either go to ENDING_FILE or release this GOP to the muxer and
|
|
* go back to COLLECT_GOP_START. */
|
|
|
|
/* If we overran the target timestamp, it might be time to process
|
|
* the GOP, otherwise bail out for more data
|
|
*/
|
|
GST_LOG_OBJECT (pad,
|
|
"Checking TS %" GST_STIME_FORMAT " against max %"
|
|
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
|
|
GST_STIME_ARGS (splitmux->max_in_running_time));
|
|
|
|
if (ctx->in_running_time < splitmux->max_in_running_time) {
|
|
loop_again = FALSE;
|
|
break;
|
|
}
|
|
|
|
GST_LOG_OBJECT (pad,
|
|
"Collected last packet of GOP. Checking other pads");
|
|
check_completed_gop (splitmux, ctx);
|
|
break;
|
|
}
|
|
case SPLITMUX_INPUT_STATE_FINISHING_UP:
|
|
loop_again = FALSE;
|
|
break;
|
|
default:
|
|
loop_again = FALSE;
|
|
break;
|
|
}
|
|
}
|
|
while (loop_again);
|
|
|
|
if (keyframe) {
|
|
splitmux->queued_keyframes++;
|
|
buf_info->keyframe = TRUE;
|
|
}
|
|
|
|
/* Update total input byte counter for overflow detect */
|
|
splitmux->gop_total_bytes += buf_info->buf_size;
|
|
|
|
/* Now add this buffer to the queue just before returning */
|
|
g_queue_push_head (&ctx->queued_bufs, buf_info);
|
|
|
|
GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
|
|
" run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
|
|
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return GST_PAD_PROBE_PASS;
|
|
|
|
beach:
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
if (buf_info)
|
|
mq_stream_buf_free (buf_info);
|
|
return GST_PAD_PROBE_PASS;
|
|
}
|
|
|
|
static void
|
|
grow_blocked_queues (GstSplitMuxSink * splitmux)
|
|
{
|
|
GList *cur;
|
|
|
|
/* Scan other queues for full-ness and grow them */
|
|
for (cur = g_list_first (splitmux->contexts);
|
|
cur != NULL; cur = g_list_next (cur)) {
|
|
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
|
|
guint cur_limit;
|
|
guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
|
|
|
|
g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
|
|
GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
|
|
|
|
if (cur_len >= cur_limit) {
|
|
cur_limit = cur_len + 1;
|
|
GST_DEBUG_OBJECT (tmpctx->q,
|
|
"Queue overflowed and needs enlarging. Growing to %u buffers",
|
|
cur_limit);
|
|
g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
handle_q_underrun (GstElement * q, gpointer user_data)
|
|
{
|
|
MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
|
|
GstSplitMuxSink *splitmux = ctx->splitmux;
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
GST_DEBUG_OBJECT (q,
|
|
"Queue reported underrun with %d keyframes and %d cmds enqueued",
|
|
splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
|
|
grow_blocked_queues (splitmux);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
}
|
|
|
|
static void
|
|
handle_q_overrun (GstElement * q, gpointer user_data)
|
|
{
|
|
MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
|
|
GstSplitMuxSink *splitmux = ctx->splitmux;
|
|
gboolean allow_grow = FALSE;
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
GST_DEBUG_OBJECT (q,
|
|
"Queue reported overrun with %d keyframes and %d cmds enqueued",
|
|
splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
|
|
|
|
if (splitmux->queued_keyframes < 2) {
|
|
/* Less than a full GOP queued, grow the queue */
|
|
allow_grow = TRUE;
|
|
} else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
|
|
allow_grow = TRUE;
|
|
} else {
|
|
/* If another queue is starved, grow */
|
|
GList *cur;
|
|
for (cur = g_list_first (splitmux->contexts);
|
|
cur != NULL; cur = g_list_next (cur)) {
|
|
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
|
|
if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
|
|
allow_grow = TRUE;
|
|
}
|
|
}
|
|
}
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
if (allow_grow) {
|
|
guint cur_limit;
|
|
|
|
g_object_get (q, "max-size-buffers", &cur_limit, NULL);
|
|
cur_limit++;
|
|
|
|
GST_DEBUG_OBJECT (q,
|
|
"Queue overflowed and needs enlarging. Growing to %u buffers",
|
|
cur_limit);
|
|
|
|
g_object_set (q, "max-size-buffers", cur_limit, NULL);
|
|
}
|
|
}
|
|
|
|
static GstPad *
|
|
gst_splitmux_sink_request_new_pad (GstElement * element,
|
|
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
|
|
{
|
|
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
|
|
GstPadTemplate *mux_template = NULL;
|
|
GstPad *res = NULL;
|
|
GstElement *q;
|
|
GstPad *q_sink = NULL, *q_src = NULL;
|
|
gchar *gname;
|
|
gboolean is_video = FALSE;
|
|
MqStreamCtx *ctx;
|
|
|
|
GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
if (!create_muxer (splitmux))
|
|
goto fail;
|
|
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
|
|
|
|
if (templ->name_template) {
|
|
if (g_str_equal (templ->name_template, "video")) {
|
|
if (splitmux->have_video)
|
|
goto already_have_video;
|
|
|
|
/* FIXME: Look for a pad template with matching caps, rather than by name */
|
|
mux_template =
|
|
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
|
|
(splitmux->muxer), "video_%u");
|
|
|
|
/* Fallback to find sink pad templates named 'video' (flvmux) */
|
|
if (!mux_template) {
|
|
mux_template =
|
|
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
|
|
(splitmux->muxer), "video");
|
|
}
|
|
is_video = TRUE;
|
|
name = NULL;
|
|
} else {
|
|
mux_template =
|
|
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
|
|
(splitmux->muxer), templ->name_template);
|
|
|
|
/* Fallback to find sink pad templates named 'audio' (flvmux) */
|
|
if (!mux_template) {
|
|
mux_template =
|
|
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
|
|
(splitmux->muxer), "audio");
|
|
name = NULL;
|
|
}
|
|
}
|
|
if (mux_template == NULL) {
|
|
/* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
|
|
mux_template =
|
|
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
|
|
(splitmux->muxer), "sink_%d");
|
|
name = NULL;
|
|
}
|
|
}
|
|
|
|
res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
|
|
if (res == NULL)
|
|
goto fail;
|
|
|
|
if (is_video)
|
|
gname = g_strdup ("video");
|
|
else if (name == NULL)
|
|
gname = gst_pad_get_name (res);
|
|
else
|
|
gname = g_strdup (name);
|
|
|
|
if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
|
|
goto fail;
|
|
|
|
gst_element_set_state (q, GST_STATE_TARGET (splitmux));
|
|
|
|
g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
|
|
"max-size-buffers", 5, NULL);
|
|
|
|
q_sink = gst_element_get_static_pad (q, "sink");
|
|
q_src = gst_element_get_static_pad (q, "src");
|
|
|
|
if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
|
|
gst_element_release_request_pad (splitmux->muxer, res);
|
|
gst_object_unref (GST_OBJECT (res));
|
|
goto fail;
|
|
}
|
|
|
|
gst_object_unref (GST_OBJECT (res));
|
|
|
|
ctx = mq_stream_ctx_new (splitmux);
|
|
/* Context holds a ref: */
|
|
ctx->q = gst_object_ref (q);
|
|
ctx->srcpad = q_src;
|
|
ctx->sinkpad = q_sink;
|
|
ctx->q_overrun_id =
|
|
g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
|
|
g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
|
|
|
|
ctx->src_pad_block_id =
|
|
gst_pad_add_probe (q_src,
|
|
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
|
|
(GstPadProbeCallback) handle_mq_output, ctx, NULL);
|
|
if (is_video && splitmux->reference_ctx != NULL) {
|
|
splitmux->reference_ctx->is_reference = FALSE;
|
|
splitmux->reference_ctx = NULL;
|
|
}
|
|
if (splitmux->reference_ctx == NULL) {
|
|
splitmux->reference_ctx = ctx;
|
|
ctx->is_reference = TRUE;
|
|
}
|
|
|
|
res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
|
|
g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
|
|
|
|
ctx->sink_pad_block_id =
|
|
gst_pad_add_probe (q_sink,
|
|
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
|
|
GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
|
|
(GstPadProbeCallback) handle_mq_input, ctx, NULL);
|
|
|
|
GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
|
|
" feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
|
|
|
|
splitmux->contexts = g_list_append (splitmux->contexts, ctx);
|
|
|
|
g_free (gname);
|
|
|
|
if (is_video)
|
|
splitmux->have_video = TRUE;
|
|
|
|
gst_pad_set_active (res, TRUE);
|
|
gst_element_add_pad (element, res);
|
|
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
return res;
|
|
fail:
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
|
|
if (q_sink)
|
|
gst_object_unref (q_sink);
|
|
if (q_src)
|
|
gst_object_unref (q_src);
|
|
return NULL;
|
|
already_have_video:
|
|
GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
return NULL;
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
|
|
{
|
|
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
|
|
GstPad *muxpad = NULL;
|
|
MqStreamCtx *ctx =
|
|
(MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
|
|
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
|
|
if (splitmux->muxer == NULL)
|
|
goto fail; /* Elements don't exist yet - nothing to release */
|
|
|
|
GST_INFO_OBJECT (pad, "releasing request pad");
|
|
|
|
muxpad = gst_pad_get_peer (ctx->srcpad);
|
|
|
|
/* Remove the context from our consideration */
|
|
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
|
|
|
|
if (ctx->sink_pad_block_id)
|
|
gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
|
|
|
|
if (ctx->src_pad_block_id)
|
|
gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
|
|
|
|
/* Can release the context now */
|
|
mq_stream_ctx_free (ctx);
|
|
if (ctx == splitmux->reference_ctx)
|
|
splitmux->reference_ctx = NULL;
|
|
|
|
/* Release and free the muxer input */
|
|
if (muxpad) {
|
|
gst_element_release_request_pad (splitmux->muxer, muxpad);
|
|
gst_object_unref (muxpad);
|
|
}
|
|
|
|
if (GST_PAD_PAD_TEMPLATE (pad) &&
|
|
g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
|
|
(pad)), "video"))
|
|
splitmux->have_video = FALSE;
|
|
|
|
gst_element_remove_pad (element, pad);
|
|
|
|
/* Reset the internal elements only after all request pads are released */
|
|
if (splitmux->contexts == NULL)
|
|
gst_splitmux_reset (splitmux);
|
|
|
|
fail:
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
}
|
|
|
|
static GstElement *
|
|
create_element (GstSplitMuxSink * splitmux,
|
|
const gchar * factory, const gchar * name, gboolean locked)
|
|
{
|
|
GstElement *ret = gst_element_factory_make (factory, name);
|
|
if (ret == NULL) {
|
|
g_warning ("Failed to create %s - splitmuxsink will not work", name);
|
|
return NULL;
|
|
}
|
|
|
|
if (locked) {
|
|
/* Ensure the sink starts in locked state and NULL - it will be changed
|
|
* by the filename setting code */
|
|
gst_element_set_locked_state (ret, TRUE);
|
|
gst_element_set_state (ret, GST_STATE_NULL);
|
|
}
|
|
|
|
if (!gst_bin_add (GST_BIN (splitmux), ret)) {
|
|
g_warning ("Could not add %s element - splitmuxsink will not work", name);
|
|
gst_object_unref (ret);
|
|
return NULL;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static gboolean
|
|
create_muxer (GstSplitMuxSink * splitmux)
|
|
{
|
|
/* Create internal elements */
|
|
if (splitmux->muxer == NULL) {
|
|
GstElement *provided_muxer = NULL;
|
|
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->provided_muxer != NULL)
|
|
provided_muxer = gst_object_ref (splitmux->provided_muxer);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
|
|
if ((!splitmux->async_finalize && provided_muxer == NULL) ||
|
|
(splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
|
|
if ((splitmux->muxer =
|
|
create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
|
|
goto fail;
|
|
} else if (splitmux->async_finalize) {
|
|
if ((splitmux->muxer =
|
|
create_element (splitmux, splitmux->muxer_factory, "muxer",
|
|
FALSE)) == NULL)
|
|
goto fail;
|
|
if (splitmux->muxer_properties)
|
|
gst_structure_foreach (splitmux->muxer_properties,
|
|
_set_property_from_structure, splitmux->muxer);
|
|
} else {
|
|
/* Ensure it's not in locked state (we might be reusing an old element) */
|
|
gst_element_set_locked_state (provided_muxer, FALSE);
|
|
if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
|
|
g_warning ("Could not add muxer element - splitmuxsink will not work");
|
|
gst_object_unref (provided_muxer);
|
|
goto fail;
|
|
}
|
|
|
|
splitmux->muxer = provided_muxer;
|
|
gst_object_unref (provided_muxer);
|
|
}
|
|
|
|
if (splitmux->use_robust_muxing) {
|
|
update_muxer_properties (splitmux);
|
|
}
|
|
}
|
|
|
|
return TRUE;
|
|
fail:
|
|
return FALSE;
|
|
}
|
|
|
|
/* Find the element that carries a "location" property.
 *
 * If @e is not a bin, or @e itself has a "location" property, @e is
 * returned as-is. Otherwise the elements yielded by
 * gst_bin_iterate_sinks() are checked and the first one with a "location"
 * property is returned.
 *
 * Returns: the matching element, or NULL if none was found. No additional
 * reference is taken on the returned element. */
static GstElement *
find_sink (GstElement * e)
{
  GstElement *res = NULL;
  GstIterator *iter;
  gboolean done = FALSE;
  GValue data = { 0, };

  if (!GST_IS_BIN (e))
    return e;

  if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
    return e;

  iter = gst_bin_iterate_sinks (GST_BIN (e));
  while (!done) {
    switch (gst_iterator_next (iter, &data)) {
      case GST_ITERATOR_OK:
      {
        GstElement *child = g_value_get_object (&data);

        if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
                "location") != NULL) {
          res = child;
          done = TRUE;
        }
        g_value_reset (&data);
        break;
      }
      case GST_ITERATOR_RESYNC:
        gst_iterator_resync (iter);
        break;
      case GST_ITERATOR_DONE:
        done = TRUE;
        break;
      case GST_ITERATOR_ERROR:
        g_assert_not_reached ();
        /* When assertions are compiled out, make sure we still leave the
         * loop instead of polling a failed iterator forever */
        done = TRUE;
        break;
      default:
        /* Unknown iterator result: stop rather than loop indefinitely */
        done = TRUE;
        break;
    }
  }
  g_value_unset (&data);
  gst_iterator_free (iter);

  return res;
}
|
|
|
|
static gboolean
|
|
create_sink (GstSplitMuxSink * splitmux)
|
|
{
|
|
GstElement *provided_sink = NULL;
|
|
|
|
if (splitmux->active_sink == NULL) {
|
|
|
|
GST_OBJECT_LOCK (splitmux);
|
|
if (splitmux->provided_sink != NULL)
|
|
provided_sink = gst_object_ref (splitmux->provided_sink);
|
|
GST_OBJECT_UNLOCK (splitmux);
|
|
|
|
if ((!splitmux->async_finalize && provided_sink == NULL) ||
|
|
(splitmux->async_finalize && splitmux->sink_factory == NULL)) {
|
|
if ((splitmux->sink =
|
|
create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
|
|
goto fail;
|
|
splitmux->active_sink = splitmux->sink;
|
|
} else if (splitmux->async_finalize) {
|
|
if ((splitmux->sink =
|
|
create_element (splitmux, splitmux->sink_factory, "sink",
|
|
TRUE)) == NULL)
|
|
goto fail;
|
|
if (splitmux->sink_properties)
|
|
gst_structure_foreach (splitmux->sink_properties,
|
|
_set_property_from_structure, splitmux->sink);
|
|
splitmux->active_sink = splitmux->sink;
|
|
} else {
|
|
/* Ensure the sink starts in locked state and NULL - it will be changed
|
|
* by the filename setting code */
|
|
gst_element_set_locked_state (provided_sink, TRUE);
|
|
gst_element_set_state (provided_sink, GST_STATE_NULL);
|
|
if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
|
|
g_warning ("Could not add sink elements - splitmuxsink will not work");
|
|
gst_object_unref (provided_sink);
|
|
goto fail;
|
|
}
|
|
|
|
splitmux->active_sink = provided_sink;
|
|
|
|
/* The bin holds a ref now, we can drop our tmp ref */
|
|
gst_object_unref (provided_sink);
|
|
|
|
/* Find the sink element */
|
|
splitmux->sink = find_sink (splitmux->active_sink);
|
|
if (splitmux->sink == NULL) {
|
|
g_warning
|
|
("Could not locate sink element in provided sink - splitmuxsink will not work");
|
|
goto fail;
|
|
}
|
|
}
|
|
|
|
#if 1
|
|
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
|
|
"async") != NULL) {
|
|
/* async child elements are causing state change races and weird
|
|
* failures, so let's try and turn that off */
|
|
g_object_set (splitmux->sink, "async", FALSE, NULL);
|
|
}
|
|
#endif
|
|
|
|
if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
|
|
g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
|
|
goto fail;
|
|
}
|
|
}
|
|
|
|
return TRUE;
|
|
fail:
|
|
return FALSE;
|
|
}
|
|
|
|
#ifdef __GNUC__
|
|
#pragma GCC diagnostic ignored "-Wformat-nonliteral"
|
|
#endif
|
|
static void
|
|
set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
|
|
{
|
|
gchar *fname = NULL;
|
|
GstSample *sample;
|
|
GstCaps *caps;
|
|
|
|
gst_splitmux_sink_ensure_max_files (splitmux);
|
|
|
|
if (ctx->cur_out_buffer == NULL) {
|
|
GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
|
|
}
|
|
|
|
caps = gst_pad_get_current_caps (ctx->srcpad);
|
|
sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
|
|
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
|
|
splitmux->fragment_id, sample, &fname);
|
|
gst_sample_unref (sample);
|
|
if (caps)
|
|
gst_caps_unref (caps);
|
|
|
|
if (fname == NULL) {
|
|
/* Fallback to the old signal if the new one returned nothing */
|
|
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
|
|
splitmux->fragment_id, &fname);
|
|
}
|
|
|
|
if (!fname)
|
|
fname = splitmux->location ?
|
|
g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
|
|
|
|
if (fname) {
|
|
GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
|
|
g_object_set (splitmux->sink, "location", fname, NULL);
|
|
g_free (fname);
|
|
|
|
splitmux->fragment_id++;
|
|
}
|
|
}
|
|
|
|
static void
|
|
do_async_start (GstSplitMuxSink * splitmux)
|
|
{
|
|
GstMessage *message;
|
|
|
|
if (!splitmux->need_async_start) {
|
|
GST_INFO_OBJECT (splitmux, "no async_start needed");
|
|
return;
|
|
}
|
|
|
|
splitmux->async_pending = TRUE;
|
|
|
|
GST_INFO_OBJECT (splitmux, "Sending async_start message");
|
|
message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
|
|
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
|
|
(splitmux), message);
|
|
}
|
|
|
|
static void
|
|
do_async_done (GstSplitMuxSink * splitmux)
|
|
{
|
|
GstMessage *message;
|
|
|
|
if (splitmux->async_pending) {
|
|
GST_INFO_OBJECT (splitmux, "Sending async_done message");
|
|
message =
|
|
gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
|
|
GST_CLOCK_TIME_NONE);
|
|
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
|
|
(splitmux), message);
|
|
|
|
splitmux->async_pending = FALSE;
|
|
}
|
|
|
|
splitmux->need_async_start = FALSE;
|
|
}
|
|
|
|
static GstStateChangeReturn
|
|
gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
|
|
{
|
|
GstStateChangeReturn ret;
|
|
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
|
|
|
|
switch (transition) {
|
|
case GST_STATE_CHANGE_NULL_TO_READY:{
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
if (!create_muxer (splitmux) || !create_sink (splitmux)) {
|
|
ret = GST_STATE_CHANGE_FAILURE;
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
goto beach;
|
|
}
|
|
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
|
|
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
splitmux->fragment_id = 0;
|
|
break;
|
|
}
|
|
case GST_STATE_CHANGE_READY_TO_PAUSED:{
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
/* Start by collecting one input on each pad */
|
|
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
|
|
splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
|
|
splitmux->gop_start_time = splitmux->fragment_start_time =
|
|
GST_CLOCK_STIME_NONE;
|
|
splitmux->muxed_out_bytes = 0;
|
|
splitmux->ready_for_output = FALSE;
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
}
|
|
case GST_STATE_CHANGE_PAUSED_TO_READY:
|
|
g_atomic_int_set (&(splitmux->split_now), FALSE);
|
|
case GST_STATE_CHANGE_READY_TO_NULL:
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
|
|
splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
|
|
/* Wake up any blocked threads */
|
|
GST_LOG_OBJECT (splitmux,
|
|
"State change -> NULL or READY. Waking threads");
|
|
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
|
|
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
|
|
if (ret == GST_STATE_CHANGE_FAILURE)
|
|
goto beach;
|
|
|
|
switch (transition) {
|
|
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
|
|
splitmux->need_async_start = TRUE;
|
|
break;
|
|
case GST_STATE_CHANGE_READY_TO_PAUSED:{
|
|
/* Change state async, because our child sink might not
|
|
* be ready to do that for us yet if it's state is still locked */
|
|
|
|
splitmux->need_async_start = TRUE;
|
|
/* we want to go async to PAUSED until we managed to configure and add the
|
|
* sink */
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
do_async_start (splitmux);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
ret = GST_STATE_CHANGE_ASYNC;
|
|
break;
|
|
}
|
|
case GST_STATE_CHANGE_READY_TO_NULL:
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
splitmux->fragment_id = 0;
|
|
/* Reset internal elements only if no pad contexts are using them */
|
|
if (splitmux->contexts == NULL)
|
|
gst_splitmux_reset (splitmux);
|
|
do_async_done (splitmux);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
beach:
|
|
if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
|
|
ret == GST_STATE_CHANGE_FAILURE) {
|
|
/* Cleanup elements on failed transition out of NULL */
|
|
gst_splitmux_reset (splitmux);
|
|
GST_SPLITMUX_LOCK (splitmux);
|
|
do_async_done (splitmux);
|
|
GST_SPLITMUX_UNLOCK (splitmux);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
gboolean
|
|
register_splitmuxsink (GstPlugin * plugin)
|
|
{
|
|
GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
|
|
"Split File Muxing Sink");
|
|
|
|
return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
|
|
GST_TYPE_SPLITMUX_SINK);
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
|
|
{
|
|
if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
|
|
splitmux->fragment_id = 0;
|
|
}
|
|
}
|
|
|
|
static void
|
|
split_now (GstSplitMuxSink * splitmux)
|
|
{
|
|
g_atomic_int_set (&(splitmux->split_now), TRUE);
|
|
}
|