/* GStreamer Muxer bin that splits output stream by size/time * Copyright (C) <2014> Jan Schmidt * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ /** * SECTION:element-splitmuxsink * @title: splitmuxsink * @short_description: Muxer wrapper for splitting output stream by size or time * * This element wraps a muxer and a sink, and starts a new file when the mux * contents are about to cross a threshold of maximum size or maximum time, * splitting at video keyframe boundaries. Exactly one input video stream * can be muxed, with as many accompanying audio and subtitle streams as * desired. * * By default, it uses mp4mux and filesink, but they can be changed via * the 'muxer' and 'sink' properties. * * The minimum file size is 1 GOP, however - so limits may be overrun if the * distance between any 2 keyframes is larger than the limits. * * If a video stream is available, the splitting process is driven by the video * stream contents, and the video stream must contain closed GOPs for the output * file parts to be played individually correctly. In the absence of a video * stream, the first available stream is used as reference for synchronization. 
* * In the async-finalize mode, when the threshold is crossed, the old muxer * and sink are disconnected from the pipeline and left to finish the file * asynchronously, and a new muxer and sink are created to continue with the * next fragment. For that reason, instead of muxer and sink objects, the * muxer-factory and sink-factory properties are used to construct the new * objects, together with muxer-properties and sink-properties. * * ## Example pipelines * |[ * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000 * ]| * Records a video stream captured from a v4l2 device and muxes it into * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds * and 1MB maximum size. * * |[ * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true" * ]| * Records a video stream captured from a v4l2 device and muxes it into * streamable Matroska files, splitting as needed to limit size/duration to 10 * seconds. Each file will finalize asynchronously. 
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

/* NOTE(review): the next three #include directives carry no header name in
 * this copy of the file - presumably the angle-bracket names were lost; they
 * must be restored before this translation unit can build. TODO confirm. */
#include
#include
#include
#include "gstsplitmuxsink.h"

GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_CAT_DEFAULT splitmux_debug

/* Locking helpers. Every wait macro pairs one of the two condition variables
 * (input_cond / output_cond) with the same (s)->lock mutex, so the lock must
 * be held when waiting. */
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)

#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)

/* Default handlers for the split-now / split-after / split-at-running-time
 * signals; installed into the class structure in class_init. */
static void split_now (GstSplitMuxSink * splitmux);
static void split_after (GstSplitMuxSink * splitmux);
static void split_at_running_time (GstSplitMuxSink * splitmux,
    GstClockTime split_time);

/* Property identifiers, dispatched on in set_property / get_property */
enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_MAX_SIZE_TIME,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIMECODE,
  PROP_SEND_KEYFRAME_REQUESTS,
  PROP_MAX_FILES,
  PROP_MUXER_OVERHEAD,
  PROP_USE_ROBUST_MUXING,
  PROP_ALIGNMENT_THRESHOLD,
  PROP_MUXER,
  PROP_SINK,
  PROP_RESET_MUXER,
  PROP_ASYNC_FINALIZE,
  PROP_MUXER_FACTORY,
  PROP_MUXER_PROPERTIES,
  PROP_SINK_FACTORY,
  PROP_SINK_PROPERTIES
};

/* Property defaults, used both by the param specs and by instance init */
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
#define DEFAULT_MAX_FILES 0
#define DEFAULT_MUXER_OVERHEAD 0.02
#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
#define DEFAULT_ALIGNMENT_THRESHOLD 0
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"
#define DEFAULT_USE_ROBUST_MUXING FALSE
#define DEFAULT_RESET_MUXER TRUE
#define DEFAULT_ASYNC_FINALIZE FALSE

/* Argument bundle handed to send_eos_async() through
 * gst_element_call_async(): the stream context and the pad that should
 * receive the EOS event. */
typedef struct _AsyncEosHelper
{
  MqStreamCtx *ctx;
  GstPad *pad;
} AsyncEosHelper;

/* Indices into the signals[] table below */
enum
{
  SIGNAL_FORMAT_LOCATION,
  SIGNAL_FORMAT_LOCATION_FULL,
  SIGNAL_SPLIT_NOW,
  SIGNAL_SPLIT_AFTER,
  SIGNAL_SPLIT_AT_RUNNING_TIME,
  SIGNAL_MUXER_ADDED,
  SIGNAL_SINK_ADDED,
  SIGNAL_LAST
};

/* Signal ids, filled in by class_init */
static guint signals[SIGNAL_LAST];

/* Request sink pad templates; all of them accept any caps */
static GstStaticPadTemplate video_sink_template =
GST_STATIC_PAD_TEMPLATE ("video",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate audio_sink_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate subtitle_sink_template =
GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate caption_sink_template =
GST_STATIC_PAD_TEMPLATE ("caption_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

/* qdata keys, initialised in _do_init() */
static GQuark PAD_CONTEXT;
static GQuark EOS_FROM_US;
static GQuark RUNNING_TIME;
/* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
 * to forward an incoming EOS message, but we cannot rely on the state of the
 * splitmux anymore, so we set this qdata on the sink instead.
 * The muxer and sink must be destroyed after both of these things have
 * finished:
 * 1) The EOS message has been sent when the fragment is ending
 * 2) The muxer has been unlinked and relinked
 * Therefore, EOS_FROM_US can have these three values:
 * 0: EOS was not requested from us. Forward the message. The muxer and the
 * sink will be destroyed together with the rest of the bin.
 * 1: EOS was requested from us, but the other of the two tasks hasn't
 * finished. Set EOS_FROM_US to 2 and do your stuff.
 * 2: EOS was requested from us and the other of the two tasks has finished.
 * Now we can destroy the muxer and the sink.
*/ static void _do_init (void) { PAD_CONTEXT = g_quark_from_static_string ("pad-context"); EOS_FROM_US = g_quark_from_static_string ("eos-from-us"); RUNNING_TIME = g_quark_from_static_string ("running-time"); } #define gst_splitmux_sink_parent_class parent_class G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0, _do_init ()); static gboolean create_muxer (GstSplitMuxSink * splitmux); static gboolean create_sink (GstSplitMuxSink * splitmux); static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_splitmux_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_splitmux_sink_dispose (GObject * object); static void gst_splitmux_sink_finalize (GObject * object); static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name, const GstCaps * caps); static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad); static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition); static void bus_handler (GstBin * bin, GstMessage * msg); static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void mq_stream_ctx_free (MqStreamCtx * ctx); static void grow_blocked_queues (GstSplitMuxSink * splitmux); static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux); static GstElement *create_element (GstSplitMuxSink * splitmux, const gchar * factory, const gchar * name, gboolean locked); static void do_async_done (GstSplitMuxSink * splitmux); static MqStreamBuf * mq_stream_buf_new (void) { return g_slice_new0 (MqStreamBuf); } static void mq_stream_buf_free (MqStreamBuf * data) { g_slice_free (MqStreamBuf, data); } static SplitMuxOutputCommand * out_cmd_buf_new (void) { return g_slice_new0 
(SplitMuxOutputCommand); } static void out_cmd_buf_free (SplitMuxOutputCommand * data) { g_slice_free (SplitMuxOutputCommand, data); } static void gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; GstElementClass *gstelement_class = (GstElementClass *) klass; GstBinClass *gstbin_class = (GstBinClass *) klass; gobject_class->set_property = gst_splitmux_sink_set_property; gobject_class->get_property = gst_splitmux_sink_get_property; gobject_class->dispose = gst_splitmux_sink_dispose; gobject_class->finalize = gst_splitmux_sink_finalize; gst_element_class_set_static_metadata (gstelement_class, "Split Muxing Bin", "Generic/Bin/Muxer", "Convenience bin that muxes incoming streams into multiple time/size limited files", "Jan Schmidt "); gst_element_class_add_static_pad_template (gstelement_class, &video_sink_template); gst_element_class_add_static_pad_template (gstelement_class, &audio_sink_template); gst_element_class_add_static_pad_template (gstelement_class, &subtitle_sink_template); gst_element_class_add_static_pad_template (gstelement_class, &caption_sink_template); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad); gstbin_class->handle_message = bus_handler; g_object_class_install_property (gobject_class, PROP_LOCATION, g_param_spec_string ("location", "File Output Pattern", "Format string pattern for the location of the files to write (e.g. 
video%05d.mp4)", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD, g_param_spec_double ("mux-overhead", "Muxing Overhead", "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0, DEFAULT_MUXER_OVERHEAD, G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, g_param_spec_uint64 ("max-size-time", "Max. size (ns)", "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, g_param_spec_uint64 ("max-size-bytes", "Max. size bytes", "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE, g_param_spec_string ("max-size-timecode", "Maximum timecode difference", "Maximum difference in timecode between first and last frame. " "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). " "Will only be effective if a timecode track is present.", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS, g_param_spec_boolean ("send-keyframe-requests", "Request keyframes at max-size-time", "Request a keyframe every max-size-time ns to try splitting at that point. " "Needs max-size-bytes to be 0 in order to be effective.", DEFAULT_SEND_KEYFRAME_REQUESTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_FILES, g_param_spec_uint ("max-files", "Max files", "Maximum number of files to keep on disk. 
Once the maximum is reached," "old files start to be deleted to make room for new ones.", 0, G_MAXUINT, DEFAULT_MAX_FILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD, g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)", "Allow non-reference streams to be that many ns before the reference" " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MUXER, g_param_spec_object ("muxer", "Muxer", "The muxer element to use (NULL = default mp4mux). " "Valid only for async-finalize = FALSE", GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SINK, g_param_spec_object ("sink", "Sink", "The sink element (or element chain) to use (NULL = default filesink). " "Valid only for async-finalize = FALSE", GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING, g_param_spec_boolean ("use-robust-muxing", "Support robust-muxing mode of some muxers", "Check if muxers support robust muxing via the reserved-max-duration and " "reserved-duration-remaining properties and use them if so. " "(Only present on qtmux and mp4mux for now). splitmuxsink may then also " " create new fragments if the reserved header space is about to overflow. " "Note that for mp4mux and qtmux, reserved-moov-update-period must be set " "manually by the app to a non-zero value for robust muxing to have an effect.", DEFAULT_USE_ROBUST_MUXING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RESET_MUXER, g_param_spec_boolean ("reset-muxer", "Reset Muxer", "Reset the muxer after each segment. 
Disabling this will not work for most muxers.", DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE, g_param_spec_boolean ("async-finalize", "Finalize fragments asynchronously", "Finalize each fragment asynchronously and start a new one", DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY, g_param_spec_string ("muxer-factory", "Muxer factory", "The muxer element factory to use (default = mp4mux). " "Valid only for async-finalize = TRUE", "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES, g_param_spec_boxed ("muxer-properties", "Muxer properties", "The muxer element properties to use. " "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. " "Valid only for async-finalize = TRUE", GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SINK_FACTORY, g_param_spec_string ("sink-factory", "Sink factory", "The sink element factory to use (default = filesink). " "Valid only for async-finalize = TRUE", "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES, g_param_spec_boxed ("sink-properties", "Sink properties", "The sink element properties to use. " "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. 
" "Valid only for async-finalize = TRUE", GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstSplitMuxSink::format-location: * @splitmux: the #GstSplitMuxSink * @fragment_id: the sequence number of the file to be created * * Returns: the location to be used for the next output file */ signals[SIGNAL_FORMAT_LOCATION] = g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT); /** * GstSplitMuxSink::format-location-full: * @splitmux: the #GstSplitMuxSink * @fragment_id: the sequence number of the file to be created * @first_sample: A #GstSample containing the first buffer * from the reference stream in the new file * * Returns: the location to be used for the next output file */ signals[SIGNAL_FORMAT_LOCATION_FULL] = g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT, GST_TYPE_SAMPLE); /** * GstSplitMuxSink::split-now: * @splitmux: the #GstSplitMuxSink * * When called by the user, this action signal splits the video file (and begins a new one) immediately. * The current GOP will be output to the new file. * * Since: 1.14 */ signals[SIGNAL_SPLIT_NOW] = g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL, G_TYPE_NONE, 0); /** * GstSplitMuxSink::split-after: * @splitmux: the #GstSplitMuxSink * * When called by the user, this action signal splits the video file (and begins a new one) immediately. * The current GOP will be output to the old file. 
* * Since: 1.16 */ signals[SIGNAL_SPLIT_AFTER] = g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL, G_TYPE_NONE, 0); /** * GstSplitMuxSink::split-now: * @splitmux: the #GstSplitMuxSink * * When called by the user, this action signal splits the video file (and * begins a new one) as soon as the given running time is reached. If this * action signal is called multiple times, running times are queued up and * processed in the order they were given. * * Note that this is prone to race conditions, where said running time is * reached and surpassed before we had a chance to split. The file will * still split immediately, but in order to make sure that the split doesn't * happen too late, it is recommended to call this action signal from * something that will prevent further buffers from flowing into * splitmuxsink before the split is completed, such as a pad probe before * splitmuxsink. * * * Since: 1.16 */ signals[SIGNAL_SPLIT_AT_RUNNING_TIME] = g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT64); /** * GstSplitMuxSink::muxer-added: * @splitmux: the #GstSplitMuxSink * @muxer: the newly added muxer element * * Since: 1.14 */ signals[SIGNAL_MUXER_ADDED] = g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT); /** * GstSplitMuxSink::sink-added: * @splitmux: the #GstSplitMuxSink * @sink: the newly added sink element * * Since: 1.14 */ signals[SIGNAL_SINK_ADDED] = g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT); klass->split_now = split_now; klass->split_after = split_after; klass->split_at_running_time = split_at_running_time; } static void gst_splitmux_sink_init (GstSplitMuxSink * 
splitmux) { g_mutex_init (&splitmux->lock); g_cond_init (&splitmux->input_cond); g_cond_init (&splitmux->output_cond); g_queue_init (&splitmux->out_cmd_q); splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD; splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME; splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES; splitmux->max_files = DEFAULT_MAX_FILES; splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS; splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE; splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD; splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING; splitmux->reset_muxer = DEFAULT_RESET_MUXER; splitmux->threshold_timecode_str = NULL; splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE; splitmux->muxer_factory = g_strdup (DEFAULT_MUXER); splitmux->muxer_properties = NULL; splitmux->sink_factory = g_strdup (DEFAULT_SINK); splitmux->sink_properties = NULL; GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK); splitmux->split_requested = FALSE; splitmux->do_split_next_gop = FALSE; splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8); } static void gst_splitmux_reset (GstSplitMuxSink * splitmux) { if (splitmux->muxer) { gst_element_set_locked_state (splitmux->muxer, TRUE); gst_element_set_state (splitmux->muxer, GST_STATE_NULL); gst_bin_remove (GST_BIN (splitmux), splitmux->muxer); } if (splitmux->active_sink) { gst_element_set_locked_state (splitmux->active_sink, TRUE); gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink); } splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; } static void gst_splitmux_sink_dispose (GObject * object) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); /* Calling parent dispose invalidates all child pointers */ splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; G_OBJECT_CLASS (parent_class)->dispose (object); } static void gst_splitmux_sink_finalize (GObject * object) { GstSplitMuxSink *splitmux = 
GST_SPLITMUX_SINK (object); g_cond_clear (&splitmux->input_cond); g_cond_clear (&splitmux->output_cond); g_mutex_clear (&splitmux->lock); g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL); g_queue_clear (&splitmux->out_cmd_q); if (splitmux->provided_sink) gst_object_unref (splitmux->provided_sink); if (splitmux->provided_muxer) gst_object_unref (splitmux->provided_muxer); if (splitmux->muxer_factory) g_free (splitmux->muxer_factory); if (splitmux->muxer_properties) gst_structure_free (splitmux->muxer_properties); if (splitmux->sink_factory) g_free (splitmux->sink_factory); if (splitmux->sink_properties) gst_structure_free (splitmux->sink_properties); if (splitmux->threshold_timecode_str) g_free (splitmux->threshold_timecode_str); if (splitmux->times_to_split) gst_queue_array_free (splitmux->times_to_split); g_free (splitmux->location); /* Make sure to free any un-released contexts. There should not be any, * because the dispose will have freed all request pads though */ g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL); g_list_free (splitmux->contexts); G_OBJECT_CLASS (parent_class)->finalize (object); } /* * Set any time threshold to the muxer, if it has * reserved-max-duration and reserved-duration-remaining * properties. 
Called when creating/claiming the muxer * in create_elements() */ static void update_muxer_properties (GstSplitMuxSink * sink) { GObjectClass *klass; GstClockTime threshold_time; sink->muxer_has_reserved_props = FALSE; if (sink->muxer == NULL) return; klass = G_OBJECT_GET_CLASS (sink->muxer); if (g_object_class_find_property (klass, "reserved-max-duration") == NULL) return; if (g_object_class_find_property (klass, "reserved-duration-remaining") == NULL) return; sink->muxer_has_reserved_props = TRUE; GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT, GST_TIME_ARGS (sink->threshold_time)); GST_OBJECT_LOCK (sink); threshold_time = sink->threshold_time; GST_OBJECT_UNLOCK (sink); if (threshold_time > 0) { /* Tell the muxer how much space to reserve */ GstClockTime muxer_threshold = threshold_time; g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL); } } static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); switch (prop_id) { case PROP_LOCATION:{ GST_OBJECT_LOCK (splitmux); g_free (splitmux->location); splitmux->location = g_value_dup_string (value); GST_OBJECT_UNLOCK (splitmux); break; } case PROP_MAX_SIZE_BYTES: GST_OBJECT_LOCK (splitmux); splitmux->threshold_bytes = g_value_get_uint64 (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_TIME: GST_OBJECT_LOCK (splitmux); splitmux->threshold_time = g_value_get_uint64 (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_TIMECODE: GST_OBJECT_LOCK (splitmux); splitmux->threshold_timecode_str = g_value_dup_string (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SEND_KEYFRAME_REQUESTS: GST_OBJECT_LOCK (splitmux); splitmux->send_keyframe_requests = g_value_get_boolean (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_FILES: GST_OBJECT_LOCK (splitmux); splitmux->max_files = g_value_get_uint (value); GST_OBJECT_UNLOCK 
(splitmux); break; case PROP_MUXER_OVERHEAD: GST_OBJECT_LOCK (splitmux); splitmux->mux_overhead = g_value_get_double (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_USE_ROBUST_MUXING: GST_OBJECT_LOCK (splitmux); splitmux->use_robust_muxing = g_value_get_boolean (value); GST_OBJECT_UNLOCK (splitmux); if (splitmux->use_robust_muxing) update_muxer_properties (splitmux); break; case PROP_ALIGNMENT_THRESHOLD: GST_OBJECT_LOCK (splitmux); splitmux->alignment_threshold = g_value_get_uint64 (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK: GST_OBJECT_LOCK (splitmux); if (splitmux->provided_sink) gst_object_unref (splitmux->provided_sink); splitmux->provided_sink = g_value_get_object (value); gst_object_ref_sink (splitmux->provided_sink); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER: GST_OBJECT_LOCK (splitmux); if (splitmux->provided_muxer) gst_object_unref (splitmux->provided_muxer); splitmux->provided_muxer = g_value_get_object (value); gst_object_ref_sink (splitmux->provided_muxer); GST_OBJECT_UNLOCK (splitmux); break; case PROP_RESET_MUXER: GST_OBJECT_LOCK (splitmux); splitmux->reset_muxer = g_value_get_boolean (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_ASYNC_FINALIZE: GST_OBJECT_LOCK (splitmux); splitmux->async_finalize = g_value_get_boolean (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER_FACTORY: GST_OBJECT_LOCK (splitmux); if (splitmux->muxer_factory) g_free (splitmux->muxer_factory); splitmux->muxer_factory = g_value_dup_string (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER_PROPERTIES: GST_OBJECT_LOCK (splitmux); if (splitmux->muxer_properties) gst_structure_free (splitmux->muxer_properties); if (gst_value_get_structure (value)) splitmux->muxer_properties = gst_structure_copy (gst_value_get_structure (value)); else splitmux->muxer_properties = NULL; GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK_FACTORY: GST_OBJECT_LOCK (splitmux); if (splitmux->sink_factory) g_free (splitmux->sink_factory); 
splitmux->sink_factory = g_value_dup_string (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK_PROPERTIES: GST_OBJECT_LOCK (splitmux); if (splitmux->sink_properties) gst_structure_free (splitmux->sink_properties); if (gst_value_get_structure (value)) splitmux->sink_properties = gst_structure_copy (gst_value_get_structure (value)); else splitmux->sink_properties = NULL; GST_OBJECT_UNLOCK (splitmux); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static void gst_splitmux_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); switch (prop_id) { case PROP_LOCATION: GST_OBJECT_LOCK (splitmux); g_value_set_string (value, splitmux->location); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_BYTES: GST_OBJECT_LOCK (splitmux); g_value_set_uint64 (value, splitmux->threshold_bytes); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_TIME: GST_OBJECT_LOCK (splitmux); g_value_set_uint64 (value, splitmux->threshold_time); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_TIMECODE: GST_OBJECT_LOCK (splitmux); g_value_set_string (value, splitmux->threshold_timecode_str); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SEND_KEYFRAME_REQUESTS: GST_OBJECT_LOCK (splitmux); g_value_set_boolean (value, splitmux->send_keyframe_requests); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_FILES: GST_OBJECT_LOCK (splitmux); g_value_set_uint (value, splitmux->max_files); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER_OVERHEAD: GST_OBJECT_LOCK (splitmux); g_value_set_double (value, splitmux->mux_overhead); GST_OBJECT_UNLOCK (splitmux); break; case PROP_USE_ROBUST_MUXING: GST_OBJECT_LOCK (splitmux); g_value_set_boolean (value, splitmux->use_robust_muxing); GST_OBJECT_UNLOCK (splitmux); break; case PROP_ALIGNMENT_THRESHOLD: GST_OBJECT_LOCK (splitmux); g_value_set_uint64 (value, splitmux->alignment_threshold); GST_OBJECT_UNLOCK 
(splitmux); break; case PROP_SINK: GST_OBJECT_LOCK (splitmux); g_value_set_object (value, splitmux->provided_sink); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER: GST_OBJECT_LOCK (splitmux); g_value_set_object (value, splitmux->provided_muxer); GST_OBJECT_UNLOCK (splitmux); break; case PROP_RESET_MUXER: GST_OBJECT_LOCK (splitmux); g_value_set_boolean (value, splitmux->reset_muxer); GST_OBJECT_UNLOCK (splitmux); break; case PROP_ASYNC_FINALIZE: GST_OBJECT_LOCK (splitmux); g_value_set_boolean (value, splitmux->async_finalize); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER_FACTORY: GST_OBJECT_LOCK (splitmux); g_value_set_string (value, splitmux->muxer_factory); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER_PROPERTIES: GST_OBJECT_LOCK (splitmux); gst_value_set_structure (value, splitmux->muxer_properties); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK_FACTORY: GST_OBJECT_LOCK (splitmux); g_value_set_string (value, splitmux->sink_factory); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK_PROPERTIES: GST_OBJECT_LOCK (splitmux); gst_value_set_structure (value, splitmux->sink_properties); GST_OBJECT_UNLOCK (splitmux); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } /* Convenience function */ static inline GstClockTimeDiff my_segment_to_running_time (GstSegment * segment, GstClockTime val) { GstClockTimeDiff res = GST_CLOCK_STIME_NONE; if (GST_CLOCK_TIME_IS_VALID (val)) { gboolean sign = gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val); if (sign > 0) res = val; else if (sign < 0) res = -val; } return res; } static MqStreamCtx * mq_stream_ctx_new (GstSplitMuxSink * splitmux) { MqStreamCtx *ctx; ctx = g_new0 (MqStreamCtx, 1); ctx->splitmux = splitmux; gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE; g_queue_init (&ctx->queued_bufs); return ctx; } 
static void mq_stream_ctx_free (MqStreamCtx * ctx) { if (ctx->q) { GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q)); g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id); if (parent == GST_OBJECT_CAST (ctx->splitmux)) { gst_element_set_locked_state (ctx->q, TRUE); gst_element_set_state (ctx->q, GST_STATE_NULL); gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q); gst_object_unref (parent); } gst_object_unref (ctx->q); } gst_buffer_replace (&ctx->prev_in_keyframe, NULL); gst_object_unref (ctx->sinkpad); gst_object_unref (ctx->srcpad); g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); g_queue_clear (&ctx->queued_bufs); g_free (ctx); } static void send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened, GstElement * sink) { gchar *location = NULL; GstMessage *msg; const gchar *msg_name = opened ? "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed"; GstClockTime running_time = splitmux->reference_ctx->out_running_time; if (!opened) { GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME); if (rtime) running_time = *rtime; } g_object_get (sink, "location", &location, NULL); /* If it's in the middle of a teardown, the reference_ctc might have become * NULL */ if (splitmux->reference_ctx) { msg = gst_message_new_element (GST_OBJECT (splitmux), gst_structure_new (msg_name, "location", G_TYPE_STRING, location, "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL)); gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg); } g_free (location); } static void send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper) { GstEvent *eos; GstPad *pad; MqStreamCtx *ctx; eos = gst_event_new_eos (); pad = helper->pad; ctx = helper->ctx; GST_SPLITMUX_LOCK (splitmux); if (!pad) pad = gst_pad_get_peer (ctx->srcpad); GST_SPLITMUX_UNLOCK (splitmux); gst_pad_send_event (pad, eos); GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad); gst_object_unref (pad); g_free 
(helper); } /* Called with lock held, drops the lock to send EOS to the * pad */ static void send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GstEvent *eos; GstPad *pad; eos = gst_event_new_eos (); pad = gst_pad_get_peer (ctx->srcpad); ctx->out_eos = TRUE; GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad); GST_SPLITMUX_UNLOCK (splitmux); gst_pad_send_event (pad, eos); GST_SPLITMUX_LOCK (splitmux); gst_object_unref (pad); } /* Called with lock held. Schedules an EOS event to the ctx pad * to happen in another thread */ static void eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) { AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1); GstPad *srcpad, *sinkpad; srcpad = ctx->srcpad; sinkpad = gst_pad_get_peer (srcpad); helper->ctx = ctx; helper->pad = sinkpad; /* Takes the reference */ ctx->out_eos_async_done = TRUE; /* HACK: Here, we explicitly unset the SINK flag on the target sink element * that's about to be asynchronously disposed, so that it no longer * participates in GstBin EOS logic. This fixes a race where if * splitmuxsink really reaches EOS before an asynchronous background * element has finished, then the bin won't actually send EOS to the * pipeline. Even after finishing and removing the old element, the * bin doesn't re-check EOS status on removing a SINK element. This * should be fixed in core, making this hack unnecessary. */ GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK); GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p", sinkpad, ctx); g_assert_nonnull (helper->pad); gst_element_call_async (GST_ELEMENT (splitmux), (GstElementCallAsyncFunc) send_eos_async, helper, NULL); } /* Called with lock held. 
TRUE iff all contexts have a * pending (or delivered) async eos event */ static gboolean all_contexts_are_async_eos (GstSplitMuxSink * splitmux) { gboolean ret = TRUE; GList *item; for (item = splitmux->contexts; item; item = item->next) { MqStreamCtx *ctx = item->data; ret &= ctx->out_eos_async_done; } return ret; } /* Called with splitmux lock held to check if this output * context needs to sleep to wait for the release of the * next GOP, or to send EOS to close out the current file */ static void complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { if (ctx->caps_change) return; do { /* When first starting up, the reference stream has to output * the first buffer to prepare the muxer and sink */ gboolean can_output = (ctx->is_reference || splitmux->ready_for_output); GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time; if (!(splitmux->max_out_running_time == 0 || splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || splitmux->alignment_threshold == 0 || splitmux->max_out_running_time < splitmux->alignment_threshold)) { my_max_out_running_time -= splitmux->alignment_threshold; GST_LOG_OBJECT (ctx->srcpad, "Max out running time currently %" GST_STIME_FORMAT ", with threshold applied it is %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time), GST_STIME_ARGS (my_max_out_running_time)); } if (ctx->flushing || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) return; GST_LOG_OBJECT (ctx->srcpad, "Checking running time %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time), GST_STIME_ARGS (my_max_out_running_time)); if (can_output) { if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || ctx->out_running_time < my_max_out_running_time) { return; } switch (splitmux->output_state) { case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP: /* We only get here if we've finished outputting a GOP and need to know * what to do next */ splitmux->output_state = 
SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); continue; case SPLITMUX_OUTPUT_STATE_ENDING_FILE: /* We've reached the max out running_time to get here, so end this file now */ if (ctx->out_eos == FALSE) { if (splitmux->async_finalize) { /* We must set EOS asynchronously at this point. We cannot defer * it, because we need all contexts to wake up, for the * reference context to eventually give us something at * START_NEXT_FILE. Otherwise, collectpads might choose another * context to give us the first buffer, and format-location-full * will not contain a valid sample. */ g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US, GINT_TO_POINTER (1)); eos_context_async (ctx, splitmux); if (all_contexts_are_async_eos (splitmux)) { GST_INFO_OBJECT (splitmux, "All contexts are async_eos. Moving to the next file."); /* We can start the next file once we've asked each pad to go EOS */ splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); continue; } } else { send_eos (splitmux, ctx); continue; } } else { GST_INFO_OBJECT (splitmux, "At end-of-file state, but context %p is already EOS", ctx); } break; case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE: if (ctx->is_reference) { /* Special handling on the reference ctx to start new fragments * and collect commands from the command queue */ /* drops the splitmux lock briefly: */ /* We must have reference ctx in order for format-location-full to * have a sample */ start_next_fragment (splitmux, ctx); continue; } break; case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{ do { SplitMuxOutputCommand *cmd = g_queue_pop_tail (&splitmux->out_cmd_q); if (cmd != NULL) { /* If we pop the last command, we need to make our queues bigger */ if (g_queue_get_length (&splitmux->out_cmd_q) == 0) grow_blocked_queues (splitmux); if (cmd->start_new_fragment) { GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment"); splitmux->output_state = 
SPLITMUX_OUTPUT_STATE_ENDING_FILE; } else { GST_DEBUG_OBJECT (splitmux, "Got new output cmd for time %" GST_STIME_FORMAT, GST_STIME_ARGS (cmd->max_output_ts)); /* Extend the output range immediately */ splitmux->max_out_running_time = cmd->max_output_ts; splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP; } GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); out_cmd_buf_free (cmd); break; } else { GST_SPLITMUX_WAIT_OUTPUT (splitmux); } } while (splitmux->output_state == SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND); /* loop and re-check the state */ continue; } case SPLITMUX_OUTPUT_STATE_STOPPED: return; } } else { GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output"); } GST_INFO_OBJECT (ctx->srcpad, "Sleeping for running time %" GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.", GST_STIME_ARGS (ctx->out_running_time), GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_WAIT_OUTPUT (splitmux); GST_INFO_OBJECT (ctx->srcpad, "Woken for new max running time %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); } while (1); } static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux, const GstVideoTimeCode * cur_tc) { GstVideoTimeCode *target_tc; GstVideoTimeCodeInterval *tc_inter; GstClockTime cur_tc_time, target_tc_time, next_max_tc_time; if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL) return GST_CLOCK_TIME_NONE; tc_inter = gst_video_time_code_interval_new_from_string (splitmux->threshold_timecode_str); target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter); gst_video_time_code_interval_free (tc_inter); /* Convert to ns */ target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc); cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc); /* Add fragment_start_time, accounting for wraparound */ if (target_tc_time >= cur_tc_time) { next_max_tc_time = target_tc_time - cur_tc_time + splitmux->fragment_start_time; } else { GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND; 
if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) && (cur_tc->config.fps_d == 1001)) { /* Checking fps_d is probably unneeded, but better safe than sorry * (e.g. someone accidentally set a flag) */ GstVideoTimeCode *tc_for_offset; /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day, * but slightly less. Calculate that duration from a fake timecode. The * problem is that 24:00:00;00 isn't a valid timecode, so the workaround * is to add one frame to 23:59:59;29 */ tc_for_offset = gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d, NULL, cur_tc->config.flags, 23, 59, 59, cur_tc->config.fps_n / cur_tc->config.fps_d, 0); day_in_ns = gst_video_time_code_nsec_since_daily_jam (tc_for_offset) + gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d, cur_tc->config.fps_n); gst_video_time_code_free (tc_for_offset); } next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + splitmux->fragment_start_time; } GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time), GST_TIME_ARGS (cur_tc_time)); gst_video_time_code_free (target_tc); return next_max_tc_time; } static gboolean request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer) { GstEvent *ev; GstClockTime target_time; gboolean timecode_based = FALSE; splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE; if (splitmux->threshold_timecode_str) { GstVideoTimeCodeMeta *tc_meta; if (buffer != NULL) { tc_meta = gst_buffer_get_video_time_code_meta (buffer); if (tc_meta) { splitmux->next_max_tc_time = calculate_next_max_timecode (splitmux, &tc_meta->tc); timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE); } } else { /* This can happen in the presence of GAP events that trigger * a new fragment start */ GST_WARNING_OBJECT (splitmux, "No buffer available to calculate next timecode"); } } if (splitmux->send_keyframe_requests == FALSE || (splitmux->threshold_time == 0 && 
!timecode_based) || splitmux->threshold_bytes != 0) return TRUE; if (timecode_based) { /* We might have rounding errors: aim slightly earlier */ target_time = splitmux->next_max_tc_time - 5 * GST_USECOND; } else { target_time = splitmux->fragment_start_time + splitmux->threshold_time; } ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0); GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT, GST_TIME_ARGS (target_time)); return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev); } static GstPadProbeReturn handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; MqStreamBuf *buf_info = NULL; GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); /* FIXME: Handle buffer lists, until then make it clear they won't work */ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { g_warning ("Buffer list handling not implemented"); return GST_PAD_PROBE_DROP; } if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM || info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) { GstEvent *event = gst_pad_probe_info_get_event (info); gboolean locked = FALSE; GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEGMENT: gst_event_copy_segment (event, &ctx->out_segment); break; case GST_EVENT_FLUSH_STOP: GST_SPLITMUX_LOCK (splitmux); locked = TRUE; gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); g_queue_clear (&ctx->queued_bufs); ctx->flushing = FALSE; break; case GST_EVENT_FLUSH_START: GST_SPLITMUX_LOCK (splitmux); locked = TRUE; GST_LOG_OBJECT (pad, "Flush start"); ctx->flushing = TRUE; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); break; case GST_EVENT_EOS: GST_SPLITMUX_LOCK (splitmux); locked = TRUE; if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) goto beach; ctx->out_eos = TRUE; GST_INFO_OBJECT (splitmux, "Have EOS event at pad %" 
GST_PTR_FORMAT " ctx %p", pad, ctx); break; case GST_EVENT_GAP:{ GstClockTime gap_ts; GstClockTimeDiff rtime; gst_event_parse_gap (event, &gap_ts, NULL); if (gap_ts == GST_CLOCK_TIME_NONE) break; GST_SPLITMUX_LOCK (splitmux); locked = TRUE; if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) goto beach; /* When we get a gap event on the * reference stream and we're trying to open a * new file, we need to store it until we get * the buffer afterwards */ if (ctx->is_reference && (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) { GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives"); gst_event_replace (&ctx->pending_gap, event); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_HANDLED; } rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, GST_STIME_ARGS (rtime)); if (rtime != GST_CLOCK_STIME_NONE) { ctx->out_running_time = rtime; complete_or_wait_on_out (splitmux, ctx); } break; } case GST_EVENT_CUSTOM_DOWNSTREAM:{ const GstStructure *s; GstClockTimeDiff ts = 0; s = gst_event_get_structure (event); if (!gst_structure_has_name (s, "splitmuxsink-unblock")) break; gst_structure_get_int64 (s, "timestamp", &ts); GST_SPLITMUX_LOCK (splitmux); locked = TRUE; if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) goto beach; ctx->out_running_time = ts; if (!ctx->is_reference) complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_DROP; } case GST_EVENT_CAPS:{ GstPad *peer; if (!ctx->is_reference) break; peer = gst_pad_get_peer (pad); if (peer) { gboolean ok = gst_pad_send_event (peer, gst_event_ref (event)); gst_object_unref (peer); if (ok) break; } else { break; } /* This is in the case the muxer doesn't allow this change of caps */ GST_SPLITMUX_LOCK (splitmux); locked = TRUE; ctx->caps_change = TRUE; if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) { GST_DEBUG_OBJECT (splitmux, "New caps were not 
accepted. Switching output file"); if (ctx->out_eos == FALSE) { splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); } } /* Lets it fall through, if it fails again, then the muxer just can't * support this format, but at least we have a closed file. */ break; } default: break; } /* We need to make sure events aren't passed * until the muxer / sink are ready for it */ if (!locked) GST_SPLITMUX_LOCK (splitmux); if (!ctx->is_reference) complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); /* Don't try to forward sticky events before the next buffer is there * because it would cause a new file to be created without the first * buffer being available. */ if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) { gst_event_unref (event); return GST_PAD_PROBE_HANDLED; } else return GST_PAD_PROBE_PASS; } /* Allow everything through until the configured next stopping point */ GST_SPLITMUX_LOCK (splitmux); buf_info = g_queue_pop_tail (&ctx->queued_bufs); if (buf_info == NULL) /* Can only happen due to a poorly timed flush */ goto beach; /* If we have popped a keyframe, decrement the queued_gop count */ if (buf_info->keyframe && splitmux->queued_keyframes > 0) splitmux->queued_keyframes--; ctx->out_running_time = buf_info->run_ts; ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info); GST_LOG_OBJECT (splitmux, "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT " size %" G_GUINT64_FORMAT, pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); ctx->caps_change = FALSE; complete_or_wait_on_out (splitmux, ctx); splitmux->muxed_out_bytes += buf_info->buf_size; #ifndef GST_DISABLE_GST_DEBUG { GstBuffer *buf = gst_pad_probe_info_get_buffer (info); GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->out_running_time)); } #endif ctx->cur_out_buffer = NULL; GST_SPLITMUX_UNLOCK (splitmux); /* pending_gap is protected by the 
STREAM lock */ if (ctx->pending_gap) { /* If we previously stored a gap event, send it now */ GstPad *peer = gst_pad_get_peer (ctx->srcpad); GST_DEBUG_OBJECT (splitmux, "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad); gst_pad_send_event (peer, ctx->pending_gap); ctx->pending_gap = NULL; gst_object_unref (peer); } mq_stream_buf_free (buf_info); return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_DROP; } static gboolean resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer) { return gst_pad_send_event (peer, gst_event_ref (*event)); } static void unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) { if (ctx->fragment_block_id > 0) { gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id); ctx->fragment_block_id = 0; } } static void restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) { GstPad *peer = gst_pad_get_peer (ctx->srcpad); gst_pad_sticky_events_foreach (ctx->srcpad, (GstPadStickyEventsForeachFunction) (resend_sticky), peer); /* Clear EOS flag if not actually EOS */ ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad); ctx->out_eos_async_done = ctx->out_eos; gst_object_unref (peer); } static void relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) { GstPad *sinkpad, *srcpad, *newpad; GstPadTemplate *templ; srcpad = ctx->srcpad; sinkpad = gst_pad_get_peer (srcpad); templ = sinkpad->padtemplate; newpad = gst_element_request_pad (splitmux->muxer, templ, GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL); GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx, newpad); if (!gst_pad_unlink (srcpad, sinkpad)) { gst_object_unref (sinkpad); goto fail; } if (gst_pad_link_full (srcpad, newpad, GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) { gst_element_release_request_pad (splitmux->muxer, newpad); gst_object_unref (sinkpad); gst_object_unref (newpad); goto fail; } gst_object_unref (newpad); gst_object_unref (sinkpad); return; fail: GST_ELEMENT_ERROR 
(splitmux, RESOURCE, SETTINGS, ("Could not create the new muxer/sink"), NULL); } static GstPadProbeReturn _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { return GST_PAD_PROBE_OK; } static void block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) { ctx->fragment_block_id = gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad, NULL, NULL); } static gboolean _set_property_from_structure (GQuark field_id, const GValue * value, gpointer user_data) { const gchar *property_name = g_quark_to_string (field_id); GObject *element = G_OBJECT (user_data); g_object_set_property (element, property_name, value); return TRUE; } static void _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux) { gst_element_set_locked_state (element, TRUE); gst_element_set_state (element, GST_STATE_NULL); GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element); gst_bin_remove (GST_BIN (splitmux), element); } static void _send_event (const GValue * value, gpointer user_data) { GstPad *pad = g_value_get_object (value); GstEvent *ev = user_data; gst_pad_send_event (pad, gst_event_ref (ev)); } /* Called with lock held when a fragment * reaches EOS and it is time to restart * a new fragment */ static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GstElement *muxer, *sink; g_assert (ctx->is_reference); /* 1 change to new file */ splitmux->switching_fragment = TRUE; /* We need to drop the splitmux lock to acquire the state lock * here and ensure there's no racy state change going on elsewhere */ muxer = gst_object_ref (splitmux->muxer); sink = gst_object_ref (splitmux->active_sink); GST_SPLITMUX_UNLOCK (splitmux); GST_STATE_LOCK (splitmux); if (splitmux->async_finalize) { if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) { gchar *newname; GstElement *new_sink, *new_muxer; GST_DEBUG_OBJECT (splitmux, "Starting fragment %u", splitmux->fragment_id); g_list_foreach 
(splitmux->contexts, (GFunc) block_context, splitmux); newname = g_strdup_printf ("sink_%u", splitmux->fragment_id); GST_SPLITMUX_LOCK (splitmux); if ((splitmux->sink = create_element (splitmux, splitmux->sink_factory, newname, TRUE)) == NULL) goto fail; if (splitmux->sink_properties) gst_structure_foreach (splitmux->sink_properties, _set_property_from_structure, splitmux->sink); splitmux->active_sink = splitmux->sink; g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink); g_free (newname); newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id); if ((splitmux->muxer = create_element (splitmux, splitmux->muxer_factory, newname, TRUE)) == NULL) goto fail; if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink), "async") != NULL) { /* async child elements are causing state change races and weird * failures, so let's try and turn that off */ g_object_set (splitmux->sink, "async", FALSE, NULL); } if (splitmux->muxer_properties) gst_structure_foreach (splitmux->muxer_properties, _set_property_from_structure, splitmux->muxer); g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer); g_free (newname); new_sink = splitmux->sink; new_muxer = splitmux->muxer; GST_SPLITMUX_UNLOCK (splitmux); g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux); gst_element_link (new_muxer, new_sink); if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) { if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) == 2) { _lock_and_set_to_null (muxer, splitmux); _lock_and_set_to_null (sink, splitmux); } else { g_object_set_qdata ((GObject *) sink, EOS_FROM_US, GINT_TO_POINTER (2)); } } gst_object_unref (muxer); gst_object_unref (sink); muxer = new_muxer; sink = new_sink; gst_object_ref (muxer); gst_object_ref (sink); } } else { gst_element_set_locked_state (muxer, TRUE); gst_element_set_locked_state (sink, TRUE); gst_element_set_state (sink, GST_STATE_NULL); if (splitmux->reset_muxer) { 
gst_element_set_state (muxer, GST_STATE_NULL); } else { GstIterator *it = gst_element_iterate_sink_pads (muxer); GstEvent *ev; ev = gst_event_new_flush_start (); while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC); gst_event_unref (ev); gst_iterator_resync (it); ev = gst_event_new_flush_stop (TRUE); while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC); gst_event_unref (ev); gst_iterator_free (it); } } GST_SPLITMUX_LOCK (splitmux); if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0) set_next_filename (splitmux, ctx); splitmux->muxed_out_bytes = 0; GST_SPLITMUX_UNLOCK (splitmux); gst_element_set_state (sink, GST_STATE_TARGET (splitmux)); gst_element_set_state (muxer, GST_STATE_TARGET (splitmux)); gst_element_set_locked_state (muxer, FALSE); gst_element_set_locked_state (sink, FALSE); gst_object_unref (sink); gst_object_unref (muxer); GST_SPLITMUX_LOCK (splitmux); GST_STATE_UNLOCK (splitmux); splitmux->switching_fragment = FALSE; do_async_done (splitmux); splitmux->ready_for_output = TRUE; g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux); g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); send_fragment_opened_closed_msg (splitmux, TRUE, sink); /* FIXME: Is this always the correct next state? 
*/ GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND"); splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); return; fail: GST_STATE_UNLOCK (splitmux); GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, ("Could not create the new muxer/sink"), NULL); } static void bus_handler (GstBin * bin, GstMessage * message) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin); switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_EOS:{ /* If the state is draining out the current file, drop this EOS */ GstElement *sink; sink = GST_ELEMENT (GST_MESSAGE_SRC (message)); GST_SPLITMUX_LOCK (splitmux); send_fragment_opened_closed_msg (splitmux, FALSE, sink); if (splitmux->async_finalize) { if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) { if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) == 2) { GstElement *muxer; GstPad *sinksink, *muxersrc; sinksink = gst_element_get_static_pad (sink, "sink"); muxersrc = gst_pad_get_peer (sinksink); muxer = gst_pad_get_parent_element (muxersrc); gst_object_unref (sinksink); gst_object_unref (muxersrc); gst_element_call_async (muxer, (GstElementCallAsyncFunc) _lock_and_set_to_null, gst_object_ref (splitmux), gst_object_unref); gst_element_call_async (sink, (GstElementCallAsyncFunc) _lock_and_set_to_null, gst_object_ref (splitmux), gst_object_unref); gst_object_unref (muxer); } else { g_object_set_qdata ((GObject *) sink, EOS_FROM_US, GINT_TO_POINTER (2)); } GST_DEBUG_OBJECT (splitmux, "Caught async EOS from previous muxer+sink. Dropping."); /* We forward the EOS so that it gets aggregated as normal. 
If the sink * finishes and is removed before the end, it will be de-aggregated */ gst_message_unref (message); GST_SPLITMUX_UNLOCK (splitmux); return; } } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) { GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); gst_message_unref (message); GST_SPLITMUX_UNLOCK (splitmux); return; } else { GST_DEBUG_OBJECT (splitmux, "Passing EOS message. Output state %d max_out_running_time %" GST_STIME_FORMAT, splitmux->output_state, GST_STIME_ARGS (splitmux->max_out_running_time)); } GST_SPLITMUX_UNLOCK (splitmux); break; } case GST_MESSAGE_ASYNC_START: case GST_MESSAGE_ASYNC_DONE: /* Ignore state changes from our children while switching */ GST_SPLITMUX_LOCK (splitmux); if (splitmux->switching_fragment) { if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) { GST_LOG_OBJECT (splitmux, "Ignoring state change from child %" GST_PTR_FORMAT " while switching", GST_MESSAGE_SRC (message)); gst_message_unref (message); GST_SPLITMUX_UNLOCK (splitmux); return; } } GST_SPLITMUX_UNLOCK (splitmux); break; case GST_MESSAGE_WARNING: { GError *gerror = NULL; gst_message_parse_warning (message, &gerror, NULL); if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) { GList *item; gboolean caps_change = FALSE; GST_SPLITMUX_LOCK (splitmux); for (item = splitmux->contexts; item; item = item->next) { MqStreamCtx *ctx = item->data; if (ctx->caps_change) { caps_change = TRUE; break; } } GST_SPLITMUX_UNLOCK (splitmux); if (caps_change) { GST_LOG_OBJECT (splitmux, "Ignoring warning change from child %" GST_PTR_FORMAT " while switching caps", GST_MESSAGE_SRC (message)); gst_message_unref (message); return; } } break; } default: break; } GST_BIN_CLASS (parent_class)->handle_message (bin, message); } static void 
ctx_set_unblock (MqStreamCtx * ctx) { ctx->need_unblock = TRUE; } static gboolean need_new_fragment (GstSplitMuxSink * splitmux, GstClockTime queued_time, GstClockTime queued_gop_time, guint64 queued_bytes) { guint64 thresh_bytes; GstClockTime thresh_time; gboolean check_robust_muxing; GstClockTime time_to_split = GST_CLOCK_TIME_NONE; GstClockTime *ptr_to_time; GST_OBJECT_LOCK (splitmux); thresh_bytes = splitmux->threshold_bytes; thresh_time = splitmux->threshold_time; ptr_to_time = (GstClockTime *) gst_queue_array_peek_head_struct (splitmux->times_to_split); if (ptr_to_time) time_to_split = *ptr_to_time; check_robust_muxing = splitmux->use_robust_muxing && splitmux->muxer_has_reserved_props; GST_OBJECT_UNLOCK (splitmux); /* Have we muxed anything into the new file at all? */ if (splitmux->fragment_total_bytes <= 0) return FALSE; /* User told us to split now */ if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) { GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set"); return TRUE; } /* User told us to split at this running time */ if (splitmux->reference_ctx->in_running_time > time_to_split) { GST_OBJECT_LOCK (splitmux); /* Dequeue running time */ gst_queue_array_pop_head_struct (splitmux->times_to_split); /* Empty any running times after this that are past now */ ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split); while (ptr_to_time) { time_to_split = *ptr_to_time; if (splitmux->reference_ctx->in_running_time <= time_to_split) { break; } gst_queue_array_pop_head_struct (splitmux->times_to_split); ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split); } GST_OBJECT_UNLOCK (splitmux); return TRUE; } if (thresh_bytes > 0 && queued_bytes > thresh_bytes) { GST_TRACE_OBJECT (splitmux, "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes); return TRUE; /* Would overrun byte limit */ } if (thresh_time > 0 && queued_time > thresh_time) { GST_TRACE_OBJECT (splitmux, "queued time %" 
GST_STIME_FORMAT " overruns time limit", GST_STIME_ARGS (queued_time)); return TRUE; /* Would overrun time limit */ } /* Timecode-based threshold accounts for possible rounding errors: * 5us should be bigger than all possible rounding errors but nowhere near * big enough to skip to another frame */ if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE && splitmux->reference_ctx->in_running_time > splitmux->next_max_tc_time + 5 * GST_USECOND) { GST_TRACE_OBJECT (splitmux, "Splitting at timecode mark"); return TRUE; /* Timecode threshold */ } if (check_robust_muxing) { GstClockTime mux_reserved_remain; g_object_get (splitmux->muxer, "reserved-duration-remaining", &mux_reserved_remain, NULL); GST_LOG_OBJECT (splitmux, "Muxer robust muxing report - %" G_GUINT64_FORMAT " remaining. New GOP would enqueue %" G_GUINT64_FORMAT, mux_reserved_remain, queued_gop_time); if (queued_gop_time >= mux_reserved_remain) { GST_INFO_OBJECT (splitmux, "File is about to run out of header room - %" G_GUINT64_FORMAT " remaining. New GOP would enqueue %" G_GUINT64_FORMAT ". Switching to new file", mux_reserved_remain, queued_gop_time); return TRUE; } } /* Continue and mux this GOP */ return FALSE; } /* Called with splitmux lock held */ /* Called when entering ProcessingCompleteGop state * Assess if mq contents overflowed the current file * -> If yes, need to switch to new file * -> if no, set max_out_running_time to let this GOP in and * go to COLLECTING_GOP_START state */ static void handle_gathered_gop (GstSplitMuxSink * splitmux) { guint64 queued_bytes; GstClockTimeDiff queued_time = 0; GstClockTimeDiff queued_gop_time = 0; GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time; SplitMuxOutputCommand *cmd; /* Assess if the multiqueue contents overflowed the current file */ /* When considering if a newly gathered GOP overflows * the time limit for the file, only consider the running time of the * reference stream. 
Other streams might have run ahead a little bit, * but extra pieces won't be released to the muxer beyond the reference * stream cut-off anyway - so it forms the limit. */ queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes; queued_time = splitmux->reference_ctx->in_running_time; /* queued_gop_time tracks how much unwritten data there is waiting to * be written to this fragment including this GOP */ if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE) queued_gop_time = splitmux->reference_ctx->in_running_time - splitmux->reference_ctx->out_running_time; else queued_gop_time = splitmux->reference_ctx->in_running_time - splitmux->gop_start_time; GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes); g_assert (queued_gop_time >= 0); g_assert (queued_time >= splitmux->fragment_start_time); queued_time -= splitmux->fragment_start_time; if (queued_time < queued_gop_time) queued_gop_time = queued_time; /* Expand queued bytes estimate by muxer overhead */ queued_bytes += (queued_bytes * splitmux->mux_overhead); GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes); if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) { GST_LOG_OBJECT (splitmux, "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->reference_ctx->in_running_time), GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND)); } /* Check for overrun - have we output at least one byte and overrun * either threshold? 
*/ if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) { if (splitmux->async_finalize) { GstClockTime *sink_running_time = g_new (GstClockTime, 1); *sink_running_time = splitmux->reference_ctx->out_running_time; g_object_set_qdata_full (G_OBJECT (splitmux->sink), RUNNING_TIME, sink_running_time, g_free); } g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE); /* Tell the output side to start a new fragment */ GST_INFO_OBJECT (splitmux, "This GOP (dur %" GST_STIME_FORMAT ") would overflow the fragment, Sending start_new_fragment cmd", GST_STIME_ARGS (splitmux->reference_ctx->in_running_time - splitmux->gop_start_time)); cmd = out_cmd_buf_new (); cmd->start_new_fragment = TRUE; g_queue_push_head (&splitmux->out_cmd_q, cmd); GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); new_out_ts = splitmux->reference_ctx->in_running_time; splitmux->fragment_start_time = splitmux->gop_start_time; splitmux->fragment_total_bytes = 0; if (request_next_keyframe (splitmux, splitmux->reference_ctx->prev_in_keyframe) == FALSE) { GST_WARNING_OBJECT (splitmux, "Could not request a keyframe. 
Files may not split at the exact location they should"); } gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL); } /* And set up to collect the next GOP */ if (!splitmux->reference_ctx->in_eos) { splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START; splitmux->gop_start_time = new_out_ts; } else { /* This is probably already the current state, but just in case: */ splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP; new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */ } /* And wake all input contexts to send a wake-up event */ g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL); GST_SPLITMUX_BROADCAST_INPUT (splitmux); /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */ splitmux->fragment_total_bytes += splitmux->gop_total_bytes; if (splitmux->gop_total_bytes > 0) { GST_LOG_OBJECT (splitmux, "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT, splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time)); /* Send this GOP to the output command queue */ cmd = out_cmd_buf_new (); cmd->start_new_fragment = FALSE; cmd->max_output_ts = new_out_ts; GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %" GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts)); g_queue_push_head (&splitmux->out_cmd_q, cmd); GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); } splitmux->gop_total_bytes = 0; } /* Called with splitmux lock held */ /* Called from each input pad when it is has all the pieces * for a GOP or EOS, starting with the reference pad which has set the * splitmux->max_in_running_time */ static void check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GList *cur; GstEvent *event; /* On ENDING_FILE, the reference stream sends a command to start a new * fragment, then releases the GOP for output in the new fragment. 
* If somes streams received no buffer during the last GOP that overran, * because its next buffer has a timestamp bigger than * ctx->max_in_running_time, its queue is empty. In that case the only * way to wakeup the output thread is by injecting an event in the * queue. This usually happen with subtitle streams. * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */ if (ctx->need_unblock) { GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event"); event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED, gst_structure_new ("splitmuxsink-unblock", "timestamp", G_TYPE_INT64, splitmux->max_in_running_time, NULL)); GST_SPLITMUX_UNLOCK (splitmux); gst_pad_send_event (ctx->sinkpad, event); GST_SPLITMUX_LOCK (splitmux); ctx->need_unblock = FALSE; GST_SPLITMUX_BROADCAST_INPUT (splitmux); /* state may have changed while we were unlocked. Loop again if so */ if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) return; } if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) { gboolean ready = TRUE; /* Iterate each pad, and check that the input running time is at least * up to the reference running time, and if so handle the collected GOP */ GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (splitmux->max_in_running_time), ctx); for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); GST_LOG_OBJECT (splitmux, "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT " EOS %d", tmpctx, tmpctx->sinkpad, GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE && tmpctx->in_running_time < splitmux->max_in_running_time && !tmpctx->in_eos) { GST_LOG_OBJECT (splitmux, "Context %p sink pad %" GST_PTR_FORMAT " not ready. 
We'll sleep", tmpctx, tmpctx->sinkpad); ready = FALSE; break; } } if (ready) { GST_DEBUG_OBJECT (splitmux, "Collected GOP is complete. Processing (ctx %p)", ctx); /* All pads have a complete GOP, release it into the multiqueue */ handle_gathered_gop (splitmux); /* The user has requested a split, we can split now that the previous GOP * has been collected to the correct location */ if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE, FALSE)) { g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE); } } } /* If upstream reached EOS we are not expecting more data, no need to wait * here. */ if (ctx->in_eos) return; /* Some pad is not yet ready, or GOP is being pushed * either way, sleep and wait to get woken */ while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT && !ctx->flushing && (ctx->in_running_time >= splitmux->max_in_running_time) && (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) { GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx); GST_SPLITMUX_WAIT_INPUT (splitmux); GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx); } } static GstPadProbeReturn handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; GstBuffer *buf; MqStreamBuf *buf_info = NULL; GstClockTime ts; gboolean loop_again; gboolean keyframe = FALSE; GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); /* FIXME: Handle buffer lists, until then make it clear they won't work */ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { g_warning ("Buffer list handling not implemented"); return GST_PAD_PROBE_DROP; } if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM || info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) { GstEvent *event = gst_pad_probe_info_get_event (info); GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEGMENT: gst_event_copy_segment (event, &ctx->in_segment); break; case 
GST_EVENT_FLUSH_STOP: GST_SPLITMUX_LOCK (splitmux); gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); ctx->in_eos = FALSE; ctx->in_running_time = GST_CLOCK_STIME_NONE; GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_EOS: GST_SPLITMUX_LOCK (splitmux); ctx->in_eos = TRUE; if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) goto beach; if (ctx->is_reference) { GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */ splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT; /* Wake up other input pads to collect this GOP */ GST_SPLITMUX_BROADCAST_INPUT (splitmux); check_completed_gop (splitmux, ctx); } else if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) { /* If we are waiting for a GOP to be completed (ie, for aux * pads to catch up), then this pad is complete, so check * if the whole GOP is. */ check_completed_gop (splitmux, ctx); } GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_GAP:{ GstClockTime gap_ts; GstClockTimeDiff rtime; gst_event_parse_gap (event, &gap_ts, NULL); if (gap_ts == GST_CLOCK_TIME_NONE) break; GST_SPLITMUX_LOCK (splitmux); if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) goto beach; rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts); GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, GST_STIME_ARGS (rtime)); if (ctx->is_reference && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) { splitmux->gop_start_time = splitmux->fragment_start_time = rtime; GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->fragment_start_time)); /* Also take this as the first start time when starting up, * so that we start counting overflow from the first frame */ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) splitmux->max_in_running_time = splitmux->fragment_start_time; } GST_SPLITMUX_UNLOCK (splitmux); break; } default: break; } 
return GST_PAD_PROBE_PASS; } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) { switch (GST_QUERY_TYPE (GST_QUERY (info->data))) { case GST_QUERY_ALLOCATION: return GST_PAD_PROBE_DROP; default: return GST_PAD_PROBE_PASS; } } buf = gst_pad_probe_info_get_buffer (info); buf_info = mq_stream_buf_new (); if (GST_BUFFER_PTS_IS_VALID (buf)) ts = GST_BUFFER_PTS (buf); else ts = GST_BUFFER_DTS (buf); GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts)); GST_SPLITMUX_LOCK (splitmux); if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) goto beach; /* If this buffer has a timestamp, advance the input timestamp of the * stream */ if (GST_CLOCK_TIME_IS_VALID (ts)) { GstClockTimeDiff running_time = my_segment_to_running_time (&ctx->in_segment, ts); GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT, GST_STIME_ARGS (running_time)); if (GST_CLOCK_STIME_IS_VALID (running_time) && running_time > ctx->in_running_time) ctx->in_running_time = running_time; } /* Try to make sure we have a valid running time */ if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) { ctx->in_running_time = my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start); } GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time)); buf_info->run_ts = ctx->in_running_time; buf_info->buf_size = gst_buffer_get_size (buf); buf_info->duration = GST_BUFFER_DURATION (buf); /* initialize fragment_start_time */ if (ctx->is_reference && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) { splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts; GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->fragment_start_time)); gst_buffer_replace (&ctx->prev_in_keyframe, buf); /* Also take this as the first start time when starting up, * so that we start counting overflow from the first frame */ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) 
splitmux->max_in_running_time = splitmux->fragment_start_time; if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) { GST_WARNING_OBJECT (splitmux, "Could not request a keyframe. Files may not split at the exact location they should"); } gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL); } GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT " total GOP bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes); loop_again = TRUE; do { if (ctx->flushing) break; switch (splitmux->input_state) { case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START: if (ctx->is_reference) { /* This is the reference context. If it's a keyframe, * it marks the start of a new GOP and we should wait in * check_completed_gop before continuing, but either way * (keyframe or no, we'll pass this buffer through after * so set loop_again to FALSE */ loop_again = FALSE; if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) { /* Allow other input pads to catch up to here too */ splitmux->max_in_running_time = ctx->in_running_time; GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_in_running_time)); GST_SPLITMUX_BROADCAST_INPUT (splitmux); break; } GST_INFO_OBJECT (pad, "Have keyframe with running time %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time)); keyframe = TRUE; splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT; splitmux->max_in_running_time = ctx->in_running_time; GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_in_running_time)); /* Wake up other input pads to collect this GOP */ GST_SPLITMUX_BROADCAST_INPUT (splitmux); check_completed_gop (splitmux, ctx); /* Store this new keyframe to remember the start of GOP */ gst_buffer_replace (&ctx->prev_in_keyframe, buf); } else { /* Pass this buffer if the reference ctx is far enough ahead */ if (ctx->in_running_time < splitmux->max_in_running_time) { 
loop_again = FALSE; break; } /* We're still waiting for a keyframe on the reference pad, sleep */ GST_LOG_OBJECT (pad, "Sleeping for GOP start"); GST_SPLITMUX_WAIT_INPUT (splitmux); GST_LOG_OBJECT (pad, "Done sleeping for GOP start input state now %d", splitmux->input_state); } break; case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{ /* We're collecting a GOP. If this is the reference context, * we need to check if this is a keyframe that marks the start * of the next GOP. If it is, it marks the end of the GOP we're * collecting, so sleep and wait until all the other pads also * reach that timestamp - at which point, we have an entire GOP * and either go to ENDING_FILE or release this GOP to the muxer and * go back to COLLECT_GOP_START. */ /* If we overran the target timestamp, it might be time to process * the GOP, otherwise bail out for more data */ GST_LOG_OBJECT (pad, "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time), GST_STIME_ARGS (splitmux->max_in_running_time)); if (ctx->in_running_time < splitmux->max_in_running_time) { loop_again = FALSE; break; } GST_LOG_OBJECT (pad, "Collected last packet of GOP. 
Checking other pads"); check_completed_gop (splitmux, ctx); break; } case SPLITMUX_INPUT_STATE_FINISHING_UP: loop_again = FALSE; break; default: loop_again = FALSE; break; } } while (loop_again); if (keyframe) { splitmux->queued_keyframes++; buf_info->keyframe = TRUE; } /* Update total input byte counter for overflow detect */ splitmux->gop_total_bytes += buf_info->buf_size; /* Now add this buffer to the queue just before returning */ g_queue_push_head (&ctx->queued_bufs, buf_info); GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); if (buf_info) mq_stream_buf_free (buf_info); return GST_PAD_PROBE_PASS; } static void grow_blocked_queues (GstSplitMuxSink * splitmux) { GList *cur; /* Scan other queues for full-ness and grow them */ for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); guint cur_limit; guint cur_len = g_queue_get_length (&tmpctx->queued_bufs); g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL); GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len); if (cur_len >= cur_limit) { cur_limit = cur_len + 1; GST_DEBUG_OBJECT (tmpctx->q, "Queue overflowed and needs enlarging. 
Growing to %u buffers", cur_limit); g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL); } } } static void handle_q_underrun (GstElement * q, gpointer user_data) { MqStreamCtx *ctx = (MqStreamCtx *) (user_data); GstSplitMuxSink *splitmux = ctx->splitmux; GST_SPLITMUX_LOCK (splitmux); GST_DEBUG_OBJECT (q, "Queue reported underrun with %d keyframes and %d cmds enqueued", splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q)); grow_blocked_queues (splitmux); GST_SPLITMUX_UNLOCK (splitmux); } static void handle_q_overrun (GstElement * q, gpointer user_data) { MqStreamCtx *ctx = (MqStreamCtx *) (user_data); GstSplitMuxSink *splitmux = ctx->splitmux; gboolean allow_grow = FALSE; GST_SPLITMUX_LOCK (splitmux); GST_DEBUG_OBJECT (q, "Queue reported overrun with %d keyframes and %d cmds enqueued", splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q)); if (splitmux->queued_keyframes < 2) { /* Less than a full GOP queued, grow the queue */ allow_grow = TRUE; } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) { allow_grow = TRUE; } else { /* If another queue is starved, grow */ GList *cur; for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) { allow_grow = TRUE; } } } GST_SPLITMUX_UNLOCK (splitmux); if (allow_grow) { guint cur_limit; g_object_get (q, "max-size-buffers", &cur_limit, NULL); cur_limit++; GST_DEBUG_OBJECT (q, "Queue overflowed and needs enlarging. 
Growing to %u buffers", cur_limit); g_object_set (q, "max-size-buffers", cur_limit, NULL); } } static GstPad * gst_splitmux_sink_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name, const GstCaps * caps) { GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; GstPadTemplate *mux_template = NULL; GstPad *res = NULL; GstElement *q; GstPad *q_sink = NULL, *q_src = NULL; gchar *gname, *qname; gboolean is_video = FALSE; MqStreamCtx *ctx; GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name); GST_SPLITMUX_LOCK (splitmux); if (!create_muxer (splitmux)) goto fail; g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer); if (templ->name_template) { if (g_str_equal (templ->name_template, "video")) { if (splitmux->have_video) goto already_have_video; /* FIXME: Look for a pad template with matching caps, rather than by name */ GST_DEBUG_OBJECT (element, "searching for pad-template with name 'video_%%u'"); mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), "video_%u"); /* Fallback to find sink pad templates named 'video' (flvmux) */ if (!mux_template) { GST_DEBUG_OBJECT (element, "searching for pad-template with name 'video'"); mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), "video"); } is_video = TRUE; name = NULL; } else { GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'", templ->name_template); mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), templ->name_template); /* Fallback to find sink pad templates named 'audio' (flvmux) */ if (!mux_template) { GST_DEBUG_OBJECT (element, "searching for pad-template with name 'audio'"); mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), "audio"); name = NULL; } } if (mux_template == NULL) { GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink_%%d'"); 
mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), "sink_%d"); name = NULL; } if (mux_template == NULL) { GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'"); mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), "sink"); name = NULL; } } if (mux_template == NULL) { GST_ERROR_OBJECT (element, "unable to find a suitable sink pad-template on the muxer"); goto fail; } GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer", mux_template->name_template); if (mux_template->presence == GST_PAD_REQUEST) { GST_DEBUG_OBJECT (element, "requesting pad from pad-template"); res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps); if (res == NULL) goto fail; } else if (mux_template->presence == GST_PAD_ALWAYS) { GST_DEBUG_OBJECT (element, "accessing always pad from pad-template"); res = gst_element_get_static_pad (splitmux->muxer, mux_template->name_template); if (res == NULL) goto fail; } else { GST_ERROR_OBJECT (element, "unexpected pad presence %d", mux_template->presence); goto fail; } if (is_video) gname = g_strdup ("video"); else if (name == NULL) gname = gst_pad_get_name (res); else gname = g_strdup (name); qname = g_strdup_printf ("queue_%s", gname); if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) { g_free (qname); goto fail; } g_free (qname); gst_element_set_state (q, GST_STATE_TARGET (splitmux)); g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0), "max-size-buffers", 5, NULL); q_sink = gst_element_get_static_pad (q, "sink"); q_src = gst_element_get_static_pad (q, "src"); if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) { gst_element_release_request_pad (splitmux->muxer, res); gst_object_unref (GST_OBJECT (res)); goto fail; } gst_object_unref (GST_OBJECT (res)); ctx = mq_stream_ctx_new (splitmux); /* Context holds a ref: */ ctx->q = gst_object_ref (q); ctx->srcpad = q_src; ctx->sinkpad = q_sink; 
ctx->q_overrun_id = g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx); g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx); ctx->src_pad_block_id = gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) handle_mq_output, ctx, NULL); if (is_video && splitmux->reference_ctx != NULL) { splitmux->reference_ctx->is_reference = FALSE; splitmux->reference_ctx = NULL; } if (splitmux->reference_ctx == NULL) { splitmux->reference_ctx = ctx; ctx->is_reference = TRUE; } res = gst_ghost_pad_new_from_template (gname, q_sink, templ); g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx); ctx->sink_pad_block_id = gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, (GstPadProbeCallback) handle_mq_input, ctx, NULL); GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT " feeds queue pad %" GST_PTR_FORMAT, res, q_sink); splitmux->contexts = g_list_append (splitmux->contexts, ctx); g_free (gname); if (is_video) splitmux->have_video = TRUE; gst_pad_set_active (res, TRUE); gst_element_add_pad (element, res); GST_SPLITMUX_UNLOCK (splitmux); return res; fail: GST_SPLITMUX_UNLOCK (splitmux); if (q_sink) gst_object_unref (q_sink); if (q_src) gst_object_unref (q_src); return NULL; already_have_video: GST_DEBUG_OBJECT (splitmux, "video sink pad already requested"); GST_SPLITMUX_UNLOCK (splitmux); return NULL; } static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) { GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; GstPad *muxpad = NULL; MqStreamCtx *ctx = (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT)); GST_SPLITMUX_LOCK (splitmux); if (splitmux->muxer == NULL) goto fail; /* Elements don't exist yet - nothing to release */ GST_INFO_OBJECT (pad, "releasing request pad"); muxpad = gst_pad_get_peer (ctx->srcpad); /* Remove the context from our consideration 
*/ splitmux->contexts = g_list_remove (splitmux->contexts, ctx); if (ctx->sink_pad_block_id) gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id); if (ctx->src_pad_block_id) gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id); /* Can release the context now */ mq_stream_ctx_free (ctx); if (ctx == splitmux->reference_ctx) splitmux->reference_ctx = NULL; /* Release and free the muxer input */ if (muxpad) { gst_element_release_request_pad (splitmux->muxer, muxpad); gst_object_unref (muxpad); } if (GST_PAD_PAD_TEMPLATE (pad) && g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE (pad)), "video")) splitmux->have_video = FALSE; gst_element_remove_pad (element, pad); /* Reset the internal elements only after all request pads are released */ if (splitmux->contexts == NULL) gst_splitmux_reset (splitmux); fail: GST_SPLITMUX_UNLOCK (splitmux); } static GstElement * create_element (GstSplitMuxSink * splitmux, const gchar * factory, const gchar * name, gboolean locked) { GstElement *ret = gst_element_factory_make (factory, name); if (ret == NULL) { g_warning ("Failed to create %s - splitmuxsink will not work", name); return NULL; } if (locked) { /* Ensure the sink starts in locked state and NULL - it will be changed * by the filename setting code */ gst_element_set_locked_state (ret, TRUE); gst_element_set_state (ret, GST_STATE_NULL); } if (!gst_bin_add (GST_BIN (splitmux), ret)) { g_warning ("Could not add %s element - splitmuxsink will not work", name); gst_object_unref (ret); return NULL; } return ret; } static gboolean create_muxer (GstSplitMuxSink * splitmux) { /* Create internal elements */ if (splitmux->muxer == NULL) { GstElement *provided_muxer = NULL; GST_OBJECT_LOCK (splitmux); if (splitmux->provided_muxer != NULL) provided_muxer = gst_object_ref (splitmux->provided_muxer); GST_OBJECT_UNLOCK (splitmux); if ((!splitmux->async_finalize && provided_muxer == NULL) || (splitmux->async_finalize && splitmux->muxer_factory == NULL)) { if 
((splitmux->muxer = create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL) goto fail; } else if (splitmux->async_finalize) { if ((splitmux->muxer = create_element (splitmux, splitmux->muxer_factory, "muxer", FALSE)) == NULL) goto fail; if (splitmux->muxer_properties) gst_structure_foreach (splitmux->muxer_properties, _set_property_from_structure, splitmux->muxer); } else { /* Ensure it's not in locked state (we might be reusing an old element) */ gst_element_set_locked_state (provided_muxer, FALSE); if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) { g_warning ("Could not add muxer element - splitmuxsink will not work"); gst_object_unref (provided_muxer); goto fail; } splitmux->muxer = provided_muxer; gst_object_unref (provided_muxer); } if (splitmux->use_robust_muxing) { update_muxer_properties (splitmux); } } return TRUE; fail: return FALSE; } static GstElement * find_sink (GstElement * e) { GstElement *res = NULL; GstIterator *iter; gboolean done = FALSE; GValue data = { 0, }; if (!GST_IS_BIN (e)) return e; if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL) return e; iter = gst_bin_iterate_sinks (GST_BIN (e)); while (!done) { switch (gst_iterator_next (iter, &data)) { case GST_ITERATOR_OK: { GstElement *child = g_value_get_object (&data); if (g_object_class_find_property (G_OBJECT_GET_CLASS (child), "location") != NULL) { res = child; done = TRUE; } g_value_reset (&data); break; } case GST_ITERATOR_RESYNC: gst_iterator_resync (iter); break; case GST_ITERATOR_DONE: done = TRUE; break; case GST_ITERATOR_ERROR: g_assert_not_reached (); break; } } g_value_unset (&data); gst_iterator_free (iter); return res; } static gboolean create_sink (GstSplitMuxSink * splitmux) { GstElement *provided_sink = NULL; if (splitmux->active_sink == NULL) { GST_OBJECT_LOCK (splitmux); if (splitmux->provided_sink != NULL) provided_sink = gst_object_ref (splitmux->provided_sink); GST_OBJECT_UNLOCK (splitmux); if ((!splitmux->async_finalize && 
provided_sink == NULL) || (splitmux->async_finalize && splitmux->sink_factory == NULL)) { if ((splitmux->sink = create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL) goto fail; splitmux->active_sink = splitmux->sink; } else if (splitmux->async_finalize) { if ((splitmux->sink = create_element (splitmux, splitmux->sink_factory, "sink", TRUE)) == NULL) goto fail; if (splitmux->sink_properties) gst_structure_foreach (splitmux->sink_properties, _set_property_from_structure, splitmux->sink); splitmux->active_sink = splitmux->sink; } else { /* Ensure the sink starts in locked state and NULL - it will be changed * by the filename setting code */ gst_element_set_locked_state (provided_sink, TRUE); gst_element_set_state (provided_sink, GST_STATE_NULL); if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) { g_warning ("Could not add sink elements - splitmuxsink will not work"); gst_object_unref (provided_sink); goto fail; } splitmux->active_sink = provided_sink; /* The bin holds a ref now, we can drop our tmp ref */ gst_object_unref (provided_sink); /* Find the sink element */ splitmux->sink = find_sink (splitmux->active_sink); if (splitmux->sink == NULL) { g_warning ("Could not locate sink element in provided sink - splitmuxsink will not work"); goto fail; } } #if 1 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink), "async") != NULL) { /* async child elements are causing state change races and weird * failures, so let's try and turn that off */ g_object_set (splitmux->sink, "async", FALSE, NULL); } #endif if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); goto fail; } } return TRUE; fail: return FALSE; } #ifdef __GNUC__ #pragma GCC diagnostic ignored "-Wformat-nonliteral" #endif static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { gchar *fname = NULL; GstSample *sample; GstCaps *caps; gst_splitmux_sink_ensure_max_files (splitmux); 
if (ctx->cur_out_buffer == NULL) {
    GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
  }

  /* Offer the first buffer of the new fragment (with caps and segment) to
   * the format-location-full signal so handlers can derive a file name */
  caps = gst_pad_get_current_caps (ctx->srcpad);
  sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
  g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
      splitmux->fragment_id, sample, &fname);
  gst_sample_unref (sample);
  if (caps)
    gst_caps_unref (caps);

  if (fname == NULL) {
    /* Fallback to the old signal if the new one returned nothing */
    g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
        splitmux->fragment_id, &fname);
  }

  /* No handler supplied a name: expand the location pattern with the
   * fragment index (this is the non-literal format the pragma above is for) */
  if (!fname)
    fname = splitmux->location ?
        g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;

  if (fname) {
    GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
    g_object_set (splitmux->sink, "location", fname, NULL);
    g_free (fname);

    /* The fragment index only advances when a file name was actually set */
    splitmux->fragment_id++;
  }
}

/* Post an async-start message through the parent bin's message handler and
 * mark an async state change as pending. Does nothing unless
 * need_async_start is set. */
static void
do_async_start (GstSplitMuxSink * splitmux)
{
  GstMessage *message;

  if (!splitmux->need_async_start) {
    GST_INFO_OBJECT (splitmux, "no async_start needed");
    return;
  }

  splitmux->async_pending = TRUE;

  GST_INFO_OBJECT (splitmux, "Sending async_start message");
  message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
  GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
      (splitmux), message);
}

/* Counterpart of do_async_start(): if an async change is pending, post the
 * async-done message through the parent bin's message handler. In all cases
 * need_async_start is cleared. */
static void
do_async_done (GstSplitMuxSink * splitmux)
{
  GstMessage *message;

  if (splitmux->async_pending) {
    GST_INFO_OBJECT (splitmux, "Sending async_done message");
    message =
        gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
        GST_CLOCK_TIME_NONE);
    GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
        (splitmux), message);

    splitmux->async_pending = FALSE;
  }

  splitmux->need_async_start = FALSE;
}

/* State change handler.
 *
 * Work done before chaining up to the parent class:
 *   NULL->READY     create muxer and sink, emit the *-added signals
 *   READY->PAUSED   reset the input/output state machines and counters
 *   PAUSED->READY   clear pending split requests, then stop (shared code)
 *   READY->NULL     stop both state machines and wake blocked threads
 * Work done after chaining up:
 *   PLAYING->PAUSED flag that the next async start is needed
 *   READY->PAUSED   go async until the sink has been configured
 *   READY->NULL     reset fragment id and, if no pads remain, the elements
 *
 * Returns the parent's result, GST_STATE_CHANGE_ASYNC for READY->PAUSED, or
 * GST_STATE_CHANGE_FAILURE if the internal elements could not be created.
 */
static GstStateChangeReturn
gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
{
  GstStateChangeReturn ret;
  GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;

  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:{
      GST_SPLITMUX_LOCK (splitmux);
      if (!create_muxer (splitmux) || !create_sink (splitmux)) {
        ret = GST_STATE_CHANGE_FAILURE;
        GST_SPLITMUX_UNLOCK (splitmux);
        goto beach;
      }
      g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
      g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
      GST_SPLITMUX_UNLOCK (splitmux);
      splitmux->fragment_id = 0;
      break;
    }
    case GST_STATE_CHANGE_READY_TO_PAUSED:{
      GST_SPLITMUX_LOCK (splitmux);
      /* Start by collecting one input on each pad */
      splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
      splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
      splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
      splitmux->gop_start_time = splitmux->fragment_start_time =
          GST_CLOCK_STIME_NONE;
      splitmux->muxed_out_bytes = 0;
      splitmux->ready_for_output = FALSE;
      GST_SPLITMUX_UNLOCK (splitmux);
      break;
    }
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      g_atomic_int_set (&(splitmux->split_requested), FALSE);
      g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
      /* fall through: PAUSED->READY shares the stop/wake-up code below */
    case GST_STATE_CHANGE_READY_TO_NULL:
      GST_SPLITMUX_LOCK (splitmux);
      gst_queue_array_clear (splitmux->times_to_split);
      splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
      splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
      /* Wake up any blocked threads */
      GST_LOG_OBJECT (splitmux,
          "State change -> NULL or READY. Waking threads");
      GST_SPLITMUX_BROADCAST_INPUT (splitmux);
      GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
      GST_SPLITMUX_UNLOCK (splitmux);
      break;
    default:
      break;
  }

  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
  if (ret == GST_STATE_CHANGE_FAILURE)
    goto beach;

  switch (transition) {
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
      splitmux->need_async_start = TRUE;
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:{
      /* Change state async, because our child sink might not
       * be ready to do that for us yet if its state is still locked */

      splitmux->need_async_start = TRUE;
      /* we want to go async to PAUSED until we managed to configure and add the
       * sink */
      GST_SPLITMUX_LOCK (splitmux);
      do_async_start (splitmux);
      GST_SPLITMUX_UNLOCK (splitmux);
      ret = GST_STATE_CHANGE_ASYNC;
      break;
    }
    case GST_STATE_CHANGE_READY_TO_NULL:
      GST_SPLITMUX_LOCK (splitmux);
      splitmux->fragment_id = 0;
      /* Reset internal elements only if no pad contexts are using them */
      if (splitmux->contexts == NULL)
        gst_splitmux_reset (splitmux);
      do_async_done (splitmux);
      GST_SPLITMUX_UNLOCK (splitmux);
      break;
    default:
      break;
  }

beach:
  if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
      ret == GST_STATE_CHANGE_FAILURE) {
    /* Cleanup elements on failed transition out of NULL */
    /* NOTE(review): this reset runs without the splitmux lock, while the
     * READY->NULL path above calls it with the lock held - confirm which
     * one gst_splitmux_reset() expects */
    gst_splitmux_reset (splitmux);

    GST_SPLITMUX_LOCK (splitmux);
    do_async_done (splitmux);
    GST_SPLITMUX_UNLOCK (splitmux);
  }

  return ret;
}

/* Register the "splitmuxsink" element (rank NONE) with @plugin and set up
 * its debug category. Returns the result of gst_element_register(). */
gboolean
register_splitmuxsink (GstPlugin * plugin)
{
  GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
      "Split File Muxing Sink");

  return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
      GST_TYPE_SPLITMUX_SINK);
}

/* Wrap the fragment index back to 0 once it reaches max_files; a max_files
 * of 0 disables the wrap-around. */
static void
gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
{
  if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
    splitmux->fragment_id = 0;
  }
}

/* Atomically raise do_split_next_gop, the flag need_new_fragment() checks
 * to force a split. */
static void
split_now (GstSplitMuxSink * splitmux)
{
  g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
}

static void
split_after (GstSplitMuxSink * splitmux)
{
  g_atomic_int_set
(&(splitmux->split_requested), TRUE); } static void split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time) { gboolean send_keyframe_requests; GST_SPLITMUX_LOCK (splitmux); gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time); send_keyframe_requests = splitmux->send_keyframe_requests; GST_SPLITMUX_UNLOCK (splitmux); if (send_keyframe_requests) { GstEvent *ev = gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0); GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT, GST_TIME_ARGS (split_time)); if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) { GST_WARNING_OBJECT (splitmux, "Could not request keyframe at %" GST_TIME_FORMAT, GST_TIME_ARGS (split_time)); } } }