/* GStreamer Muxer bin that splits output stream by size/time
 * Copyright (C) <2014> Jan Schmidt
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:element-splitmuxsink
 * @short_description: Muxer wrapper for splitting output stream by size or time
 *
 * This element wraps a muxer and a sink, and starts a new file when the mux
 * contents are about to cross a threshold of maximum size or maximum time,
 * splitting at video keyframe boundaries. Exactly one input video stream
 * can be muxed, with as many accompanying audio and subtitle streams as
 * desired.
 *
 * By default, it uses mp4mux and filesink, but they can be changed via
 * the 'muxer' and 'sink' properties.
 *
 * The minimum file size is 1 GOP, however - so limits may be overrun if the
 * distance between any 2 keyframes is larger than the limits.
 *
 * If a video stream is available, the splitting process is driven by the video
 * stream contents, and the video stream must contain closed GOPs for the output
 * file parts to be played individually correctly. In the absence of a video
 * stream, the first available stream is used as reference for synchronization.
 *
 *
 * Example pipelines
 * |[
 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
 * ]|
 * Records a video stream captured from a v4l2 device and muxes it into
 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
 * and 1MB maximum size.
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

/* NOTE(review): the operands of the next three #include directives are
 * missing in this copy of the file (they look like <...> system headers that
 * were stripped). Restore them from the original source before building;
 * the code below uses at least the GStreamer video force-key-unit API. */
#include
#include
#include
#include "gstsplitmuxsink.h"

/* Debug category used by all GST_*_OBJECT logging in this file */
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_CAT_DEFAULT splitmux_debug

/* Helpers around the element's own mutex ('lock') and condition variable
 * ('data_cond'); GST_SPLITMUX_WAIT must be called with the lock held */
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)

/* GObject property identifiers */
enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_MAX_SIZE_TIME,
  PROP_MAX_SIZE_BYTES,
  PROP_SEND_KEYFRAME_REQUESTS,
  PROP_MAX_FILES,
  PROP_MUXER_OVERHEAD,
  PROP_MUXER,
  PROP_SINK
};

/* Property defaults; the property blurbs document 0 as "disabled" for the
 * time and byte thresholds */
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
#define DEFAULT_MAX_FILES 0
#define DEFAULT_MUXER_OVERHEAD 0.02
#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"

/* Signal identifiers, used as indices into signals[] */
enum
{
  SIGNAL_FORMAT_LOCATION,
  SIGNAL_LAST
};

static guint signals[SIGNAL_LAST];

/* Request pad templates: a single "video" pad plus numbered audio and
 * subtitle pads, all accepting any caps */
static GstStaticPadTemplate video_sink_template =
GST_STATIC_PAD_TEMPLATE ("video",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate audio_sink_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate subtitle_sink_template =
GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

/* Quark for the "pad-context" key, set up once by _do_init() */
static GQuark PAD_CONTEXT;

/* Type-registration hook passed to G_DEFINE_TYPE_EXTENDED below */
static void
_do_init (void)
{
  PAD_CONTEXT = g_quark_from_static_string ("pad-context");
}

#define gst_splitmux_sink_parent_class parent_class
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
    _do_init ());

static gboolean create_elements (GstSplitMuxSink * splitmux);
static gboolean create_sink (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_splitmux_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_splitmux_sink_dispose (GObject * object); static void gst_splitmux_sink_finalize (GObject * object); static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name, const GstCaps * caps); static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad); static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition); static void bus_handler (GstBin * bin, GstMessage * msg); static void set_next_filename (GstSplitMuxSink * splitmux); static void start_next_fragment (GstSplitMuxSink * splitmux); static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void mq_stream_ctx_unref (MqStreamCtx * ctx); static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux); static MqStreamBuf * mq_stream_buf_new (void) { return g_slice_new0 (MqStreamBuf); } static void mq_stream_buf_free (MqStreamBuf * data) { g_slice_free (MqStreamBuf, data); } static void gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; GstElementClass *gstelement_class = (GstElementClass *) klass; GstBinClass *gstbin_class = (GstBinClass *) klass; gobject_class->set_property = gst_splitmux_sink_set_property; gobject_class->get_property = gst_splitmux_sink_get_property; gobject_class->dispose = gst_splitmux_sink_dispose; gobject_class->finalize = gst_splitmux_sink_finalize; gst_element_class_set_static_metadata (gstelement_class, "Split Muxing Bin", "Generic/Bin/Muxer", "Convenience bin that muxes incoming streams into multiple time/size limited files", "Jan Schmidt "); gst_element_class_add_static_pad_template (gstelement_class, 
&video_sink_template); gst_element_class_add_static_pad_template (gstelement_class, &audio_sink_template); gst_element_class_add_static_pad_template (gstelement_class, &subtitle_sink_template); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad); gstbin_class->handle_message = bus_handler; g_object_class_install_property (gobject_class, PROP_LOCATION, g_param_spec_string ("location", "File Output Pattern", "Format string pattern for the location of the files to write (e.g. video%05d.mp4)", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD, g_param_spec_double ("mux-overhead", "Muxing Overhead", "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0, DEFAULT_MUXER_OVERHEAD, G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, g_param_spec_uint64 ("max-size-time", "Max. size (ns)", "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, g_param_spec_uint64 ("max-size-bytes", "Max. size bytes", "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS, g_param_spec_boolean ("send-keyframe-requests", "Request keyframes at max-size-time", "Request a keyframe every max-size-time ns to try splitting at that point. 
" "Needs max-size-bytes to be 0 in order to be effective.", DEFAULT_SEND_KEYFRAME_REQUESTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_FILES, g_param_spec_uint ("max-files", "Max files", "Maximum number of files to keep on disk. Once the maximum is reached," "old files start to be deleted to make room for new ones.", 0, G_MAXUINT, DEFAULT_MAX_FILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MUXER, g_param_spec_object ("muxer", "Muxer", "The muxer element to use (NULL = default mp4mux)", GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SINK, g_param_spec_object ("sink", "Sink", "The sink element (or element chain) to use (NULL = default filesink)", GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstSplitMuxSink::format-location: * @splitmux: the #GstSplitMuxSink * @fragment_id: the sequence number of the file to be created * * Returns: the location to be used for the next output file */ signals[SIGNAL_FORMAT_LOCATION] = g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT); } static void gst_splitmux_sink_init (GstSplitMuxSink * splitmux) { g_mutex_init (&splitmux->lock); g_cond_init (&splitmux->data_cond); splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD; splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME; splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES; splitmux->max_files = DEFAULT_MAX_FILES; splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS; GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK); } static void gst_splitmux_reset (GstSplitMuxSink * splitmux) { if (splitmux->mq) { gst_element_set_locked_state (splitmux->mq, TRUE); gst_element_set_state (splitmux->mq, GST_STATE_NULL); gst_bin_remove (GST_BIN (splitmux), splitmux->mq); } if (splitmux->muxer) { 
gst_element_set_locked_state (splitmux->muxer, TRUE); gst_element_set_state (splitmux->muxer, GST_STATE_NULL); gst_bin_remove (GST_BIN (splitmux), splitmux->muxer); } if (splitmux->active_sink) { gst_element_set_locked_state (splitmux->active_sink, TRUE); gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink); } splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = NULL; } static void gst_splitmux_sink_dispose (GObject * object) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); G_OBJECT_CLASS (parent_class)->dispose (object); /* Calling parent dispose invalidates all child pointers */ splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = NULL; } static void gst_splitmux_sink_finalize (GObject * object) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); g_cond_clear (&splitmux->data_cond); g_mutex_clear (&splitmux->lock); if (splitmux->provided_sink) gst_object_unref (splitmux->provided_sink); if (splitmux->provided_muxer) gst_object_unref (splitmux->provided_muxer); g_free (splitmux->location); /* Make sure to free any un-released contexts */ g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL); g_list_free (splitmux->contexts); G_OBJECT_CLASS (parent_class)->finalize (object); } static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); switch (prop_id) { case PROP_LOCATION:{ GST_OBJECT_LOCK (splitmux); g_free (splitmux->location); splitmux->location = g_value_dup_string (value); GST_OBJECT_UNLOCK (splitmux); break; } case PROP_MAX_SIZE_BYTES: GST_OBJECT_LOCK (splitmux); splitmux->threshold_bytes = g_value_get_uint64 (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_TIME: GST_OBJECT_LOCK (splitmux); splitmux->threshold_time = g_value_get_uint64 (value); GST_OBJECT_UNLOCK (splitmux); break; case 
PROP_SEND_KEYFRAME_REQUESTS: GST_OBJECT_LOCK (splitmux); splitmux->send_keyframe_requests = g_value_get_boolean (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_FILES: GST_OBJECT_LOCK (splitmux); splitmux->max_files = g_value_get_uint (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER_OVERHEAD: GST_OBJECT_LOCK (splitmux); splitmux->mux_overhead = g_value_get_double (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK: GST_OBJECT_LOCK (splitmux); if (splitmux->provided_sink) gst_object_unref (splitmux->provided_sink); splitmux->provided_sink = g_value_get_object (value); gst_object_ref_sink (splitmux->provided_sink); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER: GST_OBJECT_LOCK (splitmux); if (splitmux->provided_muxer) gst_object_unref (splitmux->provided_muxer); splitmux->provided_muxer = g_value_get_object (value); gst_object_ref_sink (splitmux->provided_muxer); GST_OBJECT_UNLOCK (splitmux); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static void gst_splitmux_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); switch (prop_id) { case PROP_LOCATION: GST_OBJECT_LOCK (splitmux); g_value_set_string (value, splitmux->location); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_BYTES: GST_OBJECT_LOCK (splitmux); g_value_set_uint64 (value, splitmux->threshold_bytes); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_SIZE_TIME: GST_OBJECT_LOCK (splitmux); g_value_set_uint64 (value, splitmux->threshold_time); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SEND_KEYFRAME_REQUESTS: GST_OBJECT_LOCK (splitmux); g_value_set_boolean (value, splitmux->send_keyframe_requests); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MAX_FILES: GST_OBJECT_LOCK (splitmux); g_value_set_uint (value, splitmux->max_files); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER_OVERHEAD: GST_OBJECT_LOCK (splitmux); 
g_value_set_double (value, splitmux->mux_overhead); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK: GST_OBJECT_LOCK (splitmux); g_value_set_object (value, splitmux->provided_sink); GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER: GST_OBJECT_LOCK (splitmux); g_value_set_object (value, splitmux->provided_muxer); GST_OBJECT_UNLOCK (splitmux); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } /* Convenience function */ static inline GstClockTimeDiff my_segment_to_running_time (GstSegment * segment, GstClockTime val) { GstClockTimeDiff res = GST_CLOCK_STIME_NONE; if (GST_CLOCK_TIME_IS_VALID (val)) { gboolean sign = gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val); if (sign > 0) res = val; else if (sign < 0) res = -val; } return res; } static GstPad * mq_sink_to_src (GstElement * mq, GstPad * sink_pad) { gchar *tmp, *sinkname, *srcname; GstPad *mq_src; sinkname = gst_pad_get_name (sink_pad); tmp = sinkname + 5; srcname = g_strdup_printf ("src_%s", tmp); mq_src = gst_element_get_static_pad (mq, srcname); g_free (sinkname); g_free (srcname); return mq_src; } static gboolean get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad, GstPad ** src_pad) { GstPad *mq_sink; GstPad *mq_src; /* Request a pad from multiqueue, then connect this one, then * discover the corresponding output pad and return both */ mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u"); if (mq_sink == NULL) return FALSE; mq_src = mq_sink_to_src (splitmux->mq, mq_sink); if (mq_src == NULL) goto fail; *sink_pad = mq_sink; *src_pad = mq_src; return TRUE; fail: gst_element_release_request_pad (splitmux->mq, mq_sink); return FALSE; } static MqStreamCtx * mq_stream_ctx_new (GstSplitMuxSink * splitmux) { MqStreamCtx *ctx; ctx = g_new0 (MqStreamCtx, 1); g_atomic_int_set (&ctx->refcount, 1); ctx->splitmux = splitmux; gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&ctx->out_segment, 
GST_FORMAT_UNDEFINED); ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE; g_queue_init (&ctx->queued_bufs); return ctx; } static void mq_stream_ctx_free (MqStreamCtx * ctx) { g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); g_queue_clear (&ctx->queued_bufs); g_free (ctx); } static void mq_stream_ctx_unref (MqStreamCtx * ctx) { if (g_atomic_int_dec_and_test (&ctx->refcount)) mq_stream_ctx_free (ctx); } static void mq_stream_ctx_ref (MqStreamCtx * ctx) { g_atomic_int_inc (&ctx->refcount); } static void _pad_block_destroy_sink_notify (MqStreamCtx * ctx) { ctx->sink_pad_block_id = 0; mq_stream_ctx_unref (ctx); } static void _pad_block_destroy_src_notify (MqStreamCtx * ctx) { ctx->src_pad_block_id = 0; mq_stream_ctx_unref (ctx); } static void send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened) { gchar *location = NULL; GstMessage *msg; const gchar *msg_name = opened ? "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed"; g_object_get (splitmux->sink, "location", &location, NULL); msg = gst_message_new_element (GST_OBJECT (splitmux), gst_structure_new (msg_name, "location", G_TYPE_STRING, location, "running-time", GST_TYPE_CLOCK_TIME, splitmux->reference_ctx->out_running_time, NULL)); gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg); g_free (location); } /* Called with lock held, drops the lock to send EOS to the * pad */ static void send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GstEvent *eos; GstPad *pad; eos = gst_event_new_eos (); pad = gst_pad_get_peer (ctx->srcpad); ctx->out_eos = TRUE; GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad); GST_SPLITMUX_UNLOCK (splitmux); gst_pad_send_event (pad, eos); GST_SPLITMUX_LOCK (splitmux); gst_object_unref (pad); } /* Called with splitmux lock held to check if this output * context needs to sleep to wait for the release of the * next GOP, or to send EOS to close out the current file */ static void 
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { do { GST_LOG_OBJECT (ctx->srcpad, "Checking running time %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time), GST_STIME_ARGS (splitmux->max_out_running_time)); if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || ctx->out_running_time < splitmux->max_out_running_time) { splitmux->have_muxed_something = TRUE; return; } if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED) return; if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) { if (ctx->out_eos == FALSE) { send_eos (splitmux, ctx); continue; } } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) { start_next_fragment (splitmux); continue; } GST_INFO_OBJECT (ctx->srcpad, "Sleeping for running time %" GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")", GST_STIME_ARGS (ctx->out_running_time), GST_STIME_ARGS (splitmux->max_out_running_time)); ctx->out_blocked = TRUE; /* Expand the mq if needed before sleeping */ check_queue_length (splitmux, ctx); GST_SPLITMUX_WAIT (splitmux); ctx->out_blocked = FALSE; GST_INFO_OBJECT (ctx->srcpad, "Woken for new max running time %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); } while (1); } static gboolean request_next_keyframe (GstSplitMuxSink * splitmux) { GstEvent *ev; if (splitmux->send_keyframe_requests == FALSE || splitmux->threshold_time == 0 || splitmux->threshold_bytes != 0) return TRUE; ev = gst_video_event_new_upstream_force_key_unit (splitmux->fragment_id * splitmux->threshold_time, TRUE, 0); GST_DEBUG_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->fragment_id * splitmux->threshold_time)); return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev); } static GstPadProbeReturn handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; MqStreamBuf *buf_info = NULL; GST_LOG_OBJECT (pad, "Fired probe 
type 0x%x", info->type); /* FIXME: Handle buffer lists, until then make it clear they won't work */ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { g_warning ("Buffer list handling not implemented"); return GST_PAD_PROBE_DROP; } if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { GstEvent *event = gst_pad_probe_info_get_event (info); GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEGMENT: gst_event_copy_segment (event, &ctx->out_segment); break; case GST_EVENT_FLUSH_STOP: GST_SPLITMUX_LOCK (splitmux); gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); g_queue_clear (&ctx->queued_bufs); ctx->flushing = FALSE; GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_FLUSH_START: GST_SPLITMUX_LOCK (splitmux); GST_LOG_OBJECT (pad, "Flush start"); ctx->flushing = TRUE; GST_SPLITMUX_BROADCAST (splitmux); GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_EOS: GST_SPLITMUX_LOCK (splitmux); if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; ctx->out_eos = TRUE; GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_GAP:{ GstClockTime gap_ts; GstClockTimeDiff rtime; gst_event_parse_gap (event, &gap_ts, NULL); if (gap_ts == GST_CLOCK_TIME_NONE) break; GST_SPLITMUX_LOCK (splitmux); rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, GST_STIME_ARGS (rtime)); if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; if (rtime != GST_CLOCK_STIME_NONE) { ctx->out_running_time = rtime; complete_or_wait_on_out (splitmux, ctx); } GST_SPLITMUX_UNLOCK (splitmux); break; } case GST_EVENT_CUSTOM_DOWNSTREAM:{ const GstStructure *s; GstClockTimeDiff ts = 0; s = gst_event_get_structure (event); if (!gst_structure_has_name (s, "splitmuxsink-unblock")) break; gst_structure_get_int64 (s, "timestamp", &ts); GST_SPLITMUX_LOCK (splitmux); if (splitmux->state == 
SPLITMUX_STATE_STOPPED) goto beach; ctx->out_running_time = ts; complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_DROP; } default: break; } return GST_PAD_PROBE_PASS; } /* Allow everything through until the configured next stopping point */ GST_SPLITMUX_LOCK (splitmux); buf_info = g_queue_pop_tail (&ctx->queued_bufs); if (buf_info == NULL) /* Can only happen due to a poorly timed flush */ goto beach; /* If we have popped a keyframe, decrement the queued_gop count */ if (buf_info->keyframe && splitmux->queued_gops > 0) splitmux->queued_gops--; ctx->out_running_time = buf_info->run_ts; GST_LOG_OBJECT (splitmux, "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT " size %" G_GUINT64_FORMAT, pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); if (splitmux->opening_first_fragment) { if (request_next_keyframe (splitmux) == FALSE) GST_WARNING_OBJECT (splitmux, "Could not request a keyframe. Files may not split at the exact location they should"); send_fragment_opened_closed_msg (splitmux, TRUE); splitmux->opening_first_fragment = FALSE; } complete_or_wait_on_out (splitmux, ctx); if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE || splitmux->muxed_out_time < buf_info->run_ts) splitmux->muxed_out_time = buf_info->run_ts; splitmux->muxed_out_bytes += buf_info->buf_size; splitmux->last_frame_duration = buf_info->duration; #ifndef GST_DISABLE_GST_DEBUG { GstBuffer *buf = gst_pad_probe_info_get_buffer (info); GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->out_running_time)); } #endif GST_SPLITMUX_UNLOCK (splitmux); mq_stream_buf_free (buf_info); return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_DROP; } static gboolean resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer) { return gst_pad_send_event (peer, gst_event_ref (*event)); } static void restart_context (MqStreamCtx * ctx, 
GstSplitMuxSink * splitmux) { GstPad *peer = gst_pad_get_peer (ctx->srcpad); gst_pad_sticky_events_foreach (ctx->srcpad, (GstPadStickyEventsForeachFunction) (resend_sticky), peer); /* Clear EOS flag if not actually EOS */ ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad); gst_object_unref (peer); } /* Called with lock held when a fragment * reaches EOS and it is time to restart * a new fragment */ static void start_next_fragment (GstSplitMuxSink * splitmux) { /* 1 change to new file */ splitmux->switching_fragment = TRUE; gst_element_set_locked_state (splitmux->muxer, TRUE); gst_element_set_locked_state (splitmux->active_sink, TRUE); gst_element_set_state (splitmux->muxer, GST_STATE_NULL); gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); set_next_filename (splitmux); gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux)); gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux)); gst_element_set_locked_state (splitmux->muxer, FALSE); gst_element_set_locked_state (splitmux->active_sink, FALSE); splitmux->switching_fragment = FALSE; g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); /* Switch state and go back to processing */ if (!splitmux->reference_ctx->in_eos) { splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; } else { splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; splitmux->have_muxed_something = FALSE; } splitmux->have_muxed_something = (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time); /* Store the overflow parameters as the basis for the next fragment */ splitmux->mux_start_time = splitmux->muxed_out_time; if (splitmux->last_frame_duration != GST_CLOCK_STIME_NONE) splitmux->mux_start_time += splitmux->last_frame_duration; splitmux->mux_start_bytes = splitmux->muxed_out_bytes; GST_DEBUG_OBJECT (splitmux, "Restarting flow for new fragment. 
New running time %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); send_fragment_opened_closed_msg (splitmux, TRUE); if (request_next_keyframe (splitmux) == FALSE) GST_WARNING_OBJECT (splitmux, "Could not request a keyframe. Files may not split at the exact location they should"); GST_SPLITMUX_BROADCAST (splitmux); } static void bus_handler (GstBin * bin, GstMessage * message) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin); switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_EOS: /* If the state is draining out the current file, drop this EOS */ GST_SPLITMUX_LOCK (splitmux); send_fragment_opened_closed_msg (splitmux, FALSE); if (splitmux->state == SPLITMUX_STATE_ENDING_FILE && splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) { GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT; GST_SPLITMUX_BROADCAST (splitmux); gst_message_unref (message); GST_SPLITMUX_UNLOCK (splitmux); return; } GST_SPLITMUX_UNLOCK (splitmux); break; case GST_MESSAGE_ASYNC_START: case GST_MESSAGE_ASYNC_DONE: /* Ignore state changes from our children while switching */ if (splitmux->switching_fragment) { if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) { GST_LOG_OBJECT (splitmux, "Ignoring state change from child %" GST_PTR_FORMAT " while switching", GST_MESSAGE_SRC (message)); gst_message_unref (message); return; } } break; default: break; } GST_BIN_CLASS (parent_class)->handle_message (bin, message); } /* Called with splitmux lock held */ /* Called when entering ProcessingCompleteGop state * Assess if mq contents overflowed the current file * -> If yes, need to switch to new file * -> if no, set max_out_running_time to let this GOP in and * go to COLLECTING_GOP_START state */ static void handle_gathered_gop (GstSplitMuxSink * splitmux) { GList *cur; guint64 queued_bytes = 0; GstClockTimeDiff queued_time 
= 0; /* Assess if the multiqueue contents overflowed the current file */ for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); if (tmpctx->in_running_time > queued_time) queued_time = tmpctx->in_running_time; queued_bytes += tmpctx->in_bytes; } GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT " splitmuxsink->mux_start_bytes %" G_GUINT64_FORMAT, queued_bytes, splitmux->mux_start_bytes); g_assert (queued_bytes >= splitmux->mux_start_bytes); g_assert (queued_time >= splitmux->mux_start_time); queued_bytes -= splitmux->mux_start_bytes; queued_time -= splitmux->mux_start_time; /* Expand queued bytes estimate by muxer overhead */ queued_bytes += (queued_bytes * splitmux->mux_overhead); GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes); /* Check for overrun - have we output at least one byte and overrun * either threshold? */ if ((splitmux->have_muxed_something && ((splitmux->threshold_bytes > 0 && queued_bytes > splitmux->threshold_bytes) || (splitmux->threshold_time > 0 && queued_time > splitmux->threshold_time)))) { splitmux->state = SPLITMUX_STATE_ENDING_FILE; GST_INFO_OBJECT (splitmux, "mq overflowed since last, draining out. max out TS is %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } else { /* No overflow */ GST_LOG_OBJECT (splitmux, "This GOP didn't overflow the fragment. 
Bytes sent %" G_GUINT64_FORMAT " queued %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT " Continuing.", splitmux->muxed_out_bytes - splitmux->mux_start_bytes, queued_bytes, GST_STIME_ARGS (queued_time)); /* Wake everyone up to push this one GOP, then sleep */ splitmux->have_muxed_something = TRUE; if (!splitmux->reference_ctx->in_eos) { splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; } else { splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; } GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } } /* Called with splitmux lock held */ /* Called from each input pad when it is has all the pieces * for a GOP or EOS, starting with the reference pad which has set the * splitmux->max_in_running_time */ static void check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GList *cur; gboolean ready = TRUE; GstClockTimeDiff current_max_in_running_time; if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { /* Iterate each pad, and check that the input running time is at least * up to the reference running time, and if so handle the collected GOP */ GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (splitmux->max_in_running_time), ctx); for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); GST_LOG_OBJECT (splitmux, "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT " EOS %d", tmpctx, tmpctx->srcpad, GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); if (splitmux->max_in_running_time != G_MAXINT64 && tmpctx->in_running_time < splitmux->max_in_running_time && !tmpctx->in_eos) { GST_LOG_OBJECT (splitmux, "Context %p (src pad %" 
GST_PTR_FORMAT ") not ready. We'll sleep", tmpctx, tmpctx->srcpad); ready = FALSE; break; } } if (ready) { GST_DEBUG_OBJECT (splitmux, "Collected GOP is complete. Processing (ctx %p)", ctx); /* All pads have a complete GOP, release it into the multiqueue */ handle_gathered_gop (splitmux); } } /* If upstream reached EOS we are not expecting more data, no need to wait * here. */ if (ctx->in_eos) return; /* Some pad is not yet ready, or GOP is being pushed * either way, sleep and wait to get woken */ current_max_in_running_time = splitmux->max_in_running_time; while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE || splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) && !ctx->flushing && (current_max_in_running_time == splitmux->max_in_running_time)) { GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)", splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ? "GOP complete" : "EOF draining", ctx); GST_SPLITMUX_WAIT (splitmux); GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx); } } static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GList *cur; guint cur_len = g_queue_get_length (&ctx->queued_bufs); GST_DEBUG_OBJECT (ctx->sinkpad, "Checking queue length len %u cur_max %u queued gops %u", cur_len, splitmux->mq_max_buffers, splitmux->queued_gops); if (cur_len >= splitmux->mq_max_buffers) { gboolean allow_grow = FALSE; /* If collecting a GOP and this pad might block, * and there isn't already a pending GOP in the queue * then grow */ if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE && ctx->in_running_time < splitmux->max_in_running_time && splitmux->queued_gops <= 1) { allow_grow = TRUE; } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START && ctx->is_reference && splitmux->queued_gops <= 1) { allow_grow = TRUE; } if (!allow_grow) { for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); GST_DEBUG_OBJECT 
(tmpctx->sinkpad, " len %u out_blocked %d", g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked); /* If another stream is starving, grow */ if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) { allow_grow = TRUE; } } } if (allow_grow) { splitmux->mq_max_buffers = cur_len + 1; GST_INFO_OBJECT (splitmux, "Multiqueue overrun - enlarging to %u buffers ctx %p", splitmux->mq_max_buffers, ctx); g_object_set (splitmux->mq, "max-size-buffers", splitmux->mq_max_buffers, NULL); } } } static GstPadProbeReturn handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; GstBuffer *buf; MqStreamBuf *buf_info = NULL; GstClockTime ts; gboolean loop_again; gboolean keyframe = FALSE; GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); /* FIXME: Handle buffer lists, until then make it clear they won't work */ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { g_warning ("Buffer list handling not implemented"); return GST_PAD_PROBE_DROP; } if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { GstEvent *event = gst_pad_probe_info_get_event (info); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEGMENT: gst_event_copy_segment (event, &ctx->in_segment); break; case GST_EVENT_FLUSH_STOP: GST_SPLITMUX_LOCK (splitmux); gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); ctx->in_eos = FALSE; ctx->in_bytes = 0; ctx->in_running_time = GST_CLOCK_STIME_NONE; GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_EOS: GST_SPLITMUX_LOCK (splitmux); ctx->in_eos = TRUE; if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; if (ctx->is_reference) { GST_INFO_OBJECT (splitmux, "Got Reference EOS. 
Finishing up"); /* Act as if this is a new keyframe with infinite timestamp */ splitmux->max_in_running_time = G_MAXINT64; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; /* Wake up other input pads to collect this GOP */ GST_SPLITMUX_BROADCAST (splitmux); check_completed_gop (splitmux, ctx); } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { /* If we are waiting for a GOP to be completed (ie, for aux * pads to catch up), then this pad is complete, so check * if the whole GOP is. */ check_completed_gop (splitmux, ctx); } GST_SPLITMUX_UNLOCK (splitmux); break; default: break; } return GST_PAD_PROBE_PASS; } buf = gst_pad_probe_info_get_buffer (info); buf_info = mq_stream_buf_new (); if (GST_BUFFER_PTS_IS_VALID (buf)) ts = GST_BUFFER_PTS (buf); else ts = GST_BUFFER_DTS (buf); GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts)); GST_SPLITMUX_LOCK (splitmux); if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; /* If this buffer has a timestamp, advance the input timestamp of the * stream */ if (GST_CLOCK_TIME_IS_VALID (ts)) { GstClockTimeDiff running_time = my_segment_to_running_time (&ctx->in_segment, ts); GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT, GST_STIME_ARGS (running_time)); if (GST_CLOCK_STIME_IS_VALID (running_time) && running_time > ctx->in_running_time) ctx->in_running_time = running_time; } /* Try to make sure we have a valid running time */ if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) { ctx->in_running_time = my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start); } GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time)); buf_info->run_ts = ctx->in_running_time; buf_info->buf_size = gst_buffer_get_size (buf); buf_info->duration = GST_BUFFER_DURATION (buf); /* Update total input byte counter for overflow detect */ ctx->in_bytes += buf_info->buf_size; /* initialize mux_start_time */ if (ctx->is_reference && 
splitmux->mux_start_time == GST_CLOCK_STIME_NONE) { splitmux->mux_start_time = buf_info->run_ts; GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->mux_start_time)); /* Also take this as the first start time when starting up, * so that we start counting overflow from the first frame */ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) splitmux->max_in_running_time = splitmux->mux_start_time; } GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT " total in_bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes); loop_again = TRUE; do { if (ctx->flushing) break; switch (splitmux->state) { case SPLITMUX_STATE_COLLECTING_GOP_START: if (ctx->is_reference) { /* If a keyframe, we have a complete GOP */ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) || splitmux->max_in_running_time >= ctx->in_running_time) { /* Pass this buffer through */ loop_again = FALSE; break; } GST_INFO_OBJECT (pad, "Have keyframe with running time %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time)); keyframe = TRUE; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; splitmux->max_in_running_time = ctx->in_running_time; /* Wake up other input pads to collect this GOP */ GST_SPLITMUX_BROADCAST (splitmux); check_completed_gop (splitmux, ctx); } else { /* We're still waiting for a keyframe on the reference pad, sleep */ GST_LOG_OBJECT (pad, "Sleeping for GOP start"); GST_SPLITMUX_WAIT (splitmux); GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d", splitmux->state); } break; case SPLITMUX_STATE_WAITING_GOP_COMPLETE: /* If we overran the target timestamp, it might be time to process * the GOP, otherwise bail out for more data */ GST_LOG_OBJECT (pad, "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time), GST_STIME_ARGS (splitmux->max_in_running_time)); if (ctx->in_running_time < 
splitmux->max_in_running_time) { loop_again = FALSE; break; } GST_LOG_OBJECT (pad, "Collected last packet of GOP. Checking other pads"); check_completed_gop (splitmux, ctx); break; case SPLITMUX_STATE_ENDING_FILE:{ GstEvent *event; /* If somes streams received no buffer during the last GOP that overran, * because its next buffer has a timestamp bigger than * ctx->max_in_running_time, its queue is empty. In that case the only * way to wakeup the output thread is by injecting an event in the * queue. This usually happen with subtitle streams. * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */ GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event"); event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED, gst_structure_new ("splitmuxsink-unblock", "timestamp", G_TYPE_INT64, splitmux->max_in_running_time, NULL)); GST_SPLITMUX_UNLOCK (splitmux); gst_pad_send_event (ctx->sinkpad, event); GST_SPLITMUX_LOCK (splitmux); /* state may have changed while we were unlocked. 
Loop again if so */ if (splitmux->state != SPLITMUX_STATE_ENDING_FILE) break; /* fallthrough */ } case SPLITMUX_STATE_START_NEXT_FRAGMENT: /* A fragment is ending, wait until that's done before continuing */ GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart"); GST_SPLITMUX_WAIT (splitmux); GST_DEBUG_OBJECT (pad, "Done sleeping for fragment restart state now %d", splitmux->state); break; default: loop_again = FALSE; break; } } while (loop_again); if (keyframe) { splitmux->queued_gops++; buf_info->keyframe = TRUE; } /* Now add this buffer to the queue just before returning */ g_queue_push_head (&ctx->queued_bufs, buf_info); /* Check the buffer will fit in the mq */ check_queue_length (splitmux, ctx); GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); if (buf_info) mq_stream_buf_free (buf_info); return GST_PAD_PROBE_PASS; } static GstPad * gst_splitmux_sink_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name, const GstCaps * caps) { GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; GstPadTemplate *mux_template = NULL; GstPad *res = NULL; GstPad *mq_sink, *mq_src; gchar *gname; gboolean is_video = FALSE; MqStreamCtx *ctx; GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name); GST_SPLITMUX_LOCK (splitmux); if (!create_elements (splitmux)) goto fail; if (templ->name_template) { if (g_str_equal (templ->name_template, "video")) { if (splitmux->have_video) goto already_have_video; /* FIXME: Look for a pad template with matching caps, rather than by name */ mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), "video_%u"); is_video = TRUE; name = NULL; } else { mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), templ->name_template); } if 
(mux_template == NULL) { /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */ mux_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), "sink_%d"); } } res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps); if (res == NULL) goto fail; if (is_video) gname = g_strdup ("video"); else if (name == NULL) gname = gst_pad_get_name (res); else gname = g_strdup (name); if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) { gst_element_release_request_pad (splitmux->muxer, res); gst_object_unref (GST_OBJECT (res)); goto fail; } if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) { gst_element_release_request_pad (splitmux->muxer, res); gst_object_unref (GST_OBJECT (res)); gst_element_release_request_pad (splitmux->mq, mq_sink); gst_object_unref (GST_OBJECT (mq_sink)); goto fail; } gst_object_unref (GST_OBJECT (res)); ctx = mq_stream_ctx_new (splitmux); ctx->srcpad = mq_src; ctx->sinkpad = mq_sink; mq_stream_ctx_ref (ctx); ctx->src_pad_block_id = gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify) _pad_block_destroy_src_notify); if (is_video && splitmux->reference_ctx != NULL) { splitmux->reference_ctx->is_reference = FALSE; splitmux->reference_ctx = NULL; } if (splitmux->reference_ctx == NULL) { splitmux->reference_ctx = ctx; ctx->is_reference = TRUE; } res = gst_ghost_pad_new_from_template (gname, mq_sink, templ); g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx); mq_stream_ctx_ref (ctx); ctx->sink_pad_block_id = gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify) _pad_block_destroy_sink_notify); GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT " is mq pad %" GST_PTR_FORMAT, res, mq_sink); splitmux->contexts = g_list_prepend (splitmux->contexts, ctx); g_free (gname); gst_object_unref (mq_sink); gst_object_unref (mq_src); if (is_video) 
splitmux->have_video = TRUE; gst_pad_set_active (res, TRUE); gst_element_add_pad (element, res); GST_SPLITMUX_UNLOCK (splitmux); return res; fail: GST_SPLITMUX_UNLOCK (splitmux); return NULL; already_have_video: GST_DEBUG_OBJECT (splitmux, "video sink pad already requested"); GST_SPLITMUX_UNLOCK (splitmux); return NULL; } static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) { GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; GstPad *mqsink, *mqsrc, *muxpad; MqStreamCtx *ctx = (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT)); GST_SPLITMUX_LOCK (splitmux); if (splitmux->muxer == NULL || splitmux->mq == NULL) goto fail; /* Elements don't exist yet - nothing to release */ GST_INFO_OBJECT (pad, "releasing request pad"); mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad)); mqsrc = mq_sink_to_src (splitmux->mq, mqsink); muxpad = gst_pad_get_peer (mqsrc); /* Remove the context from our consideration */ splitmux->contexts = g_list_remove (splitmux->contexts, ctx); if (ctx->sink_pad_block_id) gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id); if (ctx->src_pad_block_id) gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id); /* Can release the context now */ mq_stream_ctx_unref (ctx); if (ctx == splitmux->reference_ctx) splitmux->reference_ctx = NULL; /* Release and free the mq input */ gst_element_release_request_pad (splitmux->mq, mqsink); /* Release and free the muxer input */ gst_element_release_request_pad (splitmux->muxer, muxpad); gst_object_unref (mqsink); gst_object_unref (mqsrc); gst_object_unref (muxpad); if (GST_PAD_PAD_TEMPLATE (pad) && g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE (pad)), "video")) splitmux->have_video = FALSE; gst_element_remove_pad (element, pad); /* Reset the internal elements only after all request pads are released */ if (splitmux->contexts == NULL) gst_splitmux_reset (splitmux); fail: GST_SPLITMUX_UNLOCK (splitmux); } static GstElement * 
create_element (GstSplitMuxSink * splitmux, const gchar * factory, const gchar * name) { GstElement *ret = gst_element_factory_make (factory, name); if (ret == NULL) { g_warning ("Failed to create %s - splitmuxsink will not work", name); return NULL; } if (!gst_bin_add (GST_BIN (splitmux), ret)) { g_warning ("Could not add %s element - splitmuxsink will not work", name); gst_object_unref (ret); return NULL; } return ret; } static gboolean create_elements (GstSplitMuxSink * splitmux) { /* Create internal elements */ if (splitmux->mq == NULL) { if ((splitmux->mq = create_element (splitmux, "multiqueue", "multiqueue")) == NULL) goto fail; splitmux->mq_max_buffers = 5; /* No bytes or time limit, we limit buffers manually */ g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time", (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL); } if (splitmux->muxer == NULL) { GstElement *provided_muxer = NULL; GST_OBJECT_LOCK (splitmux); if (splitmux->provided_muxer != NULL) provided_muxer = gst_object_ref (splitmux->provided_muxer); GST_OBJECT_UNLOCK (splitmux); if (provided_muxer == NULL) { if ((splitmux->muxer = create_element (splitmux, "mp4mux", "muxer")) == NULL) goto fail; } else { /* Ensure it's not in locked state (we might be reusing an old element) */ gst_element_set_locked_state (provided_muxer, FALSE); if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) { g_warning ("Could not add muxer element - splitmuxsink will not work"); gst_object_unref (provided_muxer); goto fail; } splitmux->muxer = provided_muxer; gst_object_unref (provided_muxer); } } return TRUE; fail: return FALSE; } static GstElement * find_sink (GstElement * e) { GstElement *res = NULL; GstIterator *iter; gboolean done = FALSE; GValue data = { 0, }; if (!GST_IS_BIN (e)) return e; iter = gst_bin_iterate_sinks (GST_BIN (e)); while (!done) { switch (gst_iterator_next (iter, &data)) { case GST_ITERATOR_OK: { GstElement *child = g_value_get_object (&data); if 
(g_object_class_find_property (G_OBJECT_GET_CLASS (child), "location") != NULL) { res = child; done = TRUE; } g_value_reset (&data); break; } case GST_ITERATOR_RESYNC: gst_iterator_resync (iter); break; case GST_ITERATOR_DONE: done = TRUE; break; case GST_ITERATOR_ERROR: g_assert_not_reached (); break; } } g_value_unset (&data); gst_iterator_free (iter); return res; } static gboolean create_sink (GstSplitMuxSink * splitmux) { GstElement *provided_sink = NULL; if (splitmux->active_sink == NULL) { GST_OBJECT_LOCK (splitmux); if (splitmux->provided_sink != NULL) provided_sink = gst_object_ref (splitmux->provided_sink); GST_OBJECT_UNLOCK (splitmux); if (provided_sink == NULL) { if ((splitmux->sink = create_element (splitmux, DEFAULT_SINK, "sink")) == NULL) goto fail; splitmux->active_sink = splitmux->sink; } else { /* Ensure it's not in locked state (we might be reusing an old element) */ gst_element_set_locked_state (provided_sink, FALSE); if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) { g_warning ("Could not add sink elements - splitmuxsink will not work"); gst_object_unref (provided_sink); goto fail; } splitmux->active_sink = provided_sink; /* The bin holds a ref now, we can drop our tmp ref */ gst_object_unref (provided_sink); /* Find the sink element */ splitmux->sink = find_sink (splitmux->active_sink); if (splitmux->sink == NULL) { g_warning ("Could not locate sink element in provided sink - splitmuxsink will not work"); goto fail; } } if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); goto fail; } } return TRUE; fail: return FALSE; } #ifdef __GNUC__ #pragma GCC diagnostic ignored "-Wformat-nonliteral" #endif static void set_next_filename (GstSplitMuxSink * splitmux) { gchar *fname = NULL; gst_splitmux_sink_ensure_max_files (splitmux); g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, splitmux->fragment_id, &fname); if (!fname) fname = splitmux->location ? 
g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL; if (fname) { GST_INFO_OBJECT (splitmux, "Setting file to %s", fname); g_object_set (splitmux->sink, "location", fname, NULL); g_free (fname); splitmux->fragment_id++; } } static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) { GstStateChangeReturn ret; GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY:{ GST_SPLITMUX_LOCK (splitmux); if (!create_elements (splitmux) || !create_sink (splitmux)) { ret = GST_STATE_CHANGE_FAILURE; GST_SPLITMUX_UNLOCK (splitmux); goto beach; } GST_SPLITMUX_UNLOCK (splitmux); splitmux->fragment_id = 0; set_next_filename (splitmux); break; } case GST_STATE_CHANGE_READY_TO_PAUSED:{ GST_SPLITMUX_LOCK (splitmux); /* Start by collecting one input on each pad */ splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; splitmux->max_in_running_time = GST_CLOCK_STIME_NONE; splitmux->muxed_out_time = splitmux->mux_start_time = splitmux->last_frame_duration = GST_CLOCK_STIME_NONE; splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0; splitmux->opening_first_fragment = TRUE; GST_SPLITMUX_UNLOCK (splitmux); break; } case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_LOCK (splitmux); splitmux->state = SPLITMUX_STATE_STOPPED; /* Wake up any blocked threads */ GST_LOG_OBJECT (splitmux, "State change -> NULL or READY. 
Waking threads"); GST_SPLITMUX_BROADCAST (splitmux); GST_SPLITMUX_UNLOCK (splitmux); break; default: break; } ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); if (ret == GST_STATE_CHANGE_FAILURE) goto beach; switch (transition) { case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_LOCK (splitmux); splitmux->fragment_id = 0; /* Reset internal elements only if no pad contexts are using them */ if (splitmux->contexts == NULL) gst_splitmux_reset (splitmux); GST_SPLITMUX_UNLOCK (splitmux); break; default: break; } beach: if (transition == GST_STATE_CHANGE_NULL_TO_READY && ret == GST_STATE_CHANGE_FAILURE) { /* Cleanup elements on failed transition out of NULL */ gst_splitmux_reset (splitmux); } return ret; } gboolean register_splitmuxsink (GstPlugin * plugin) { GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0, "Split File Muxing Sink"); return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE, GST_TYPE_SPLITMUX_SINK); } static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux) { if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) { splitmux->fragment_id = 0; } }