From 3121eeeb08799824a5812e22448902aa7ce89d07 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Fri, 10 May 2024 23:25:51 +1000 Subject: [PATCH] splitmuxsrc: Allow adding fragments during playback Trigger measurement / inclusion of new fragments into the playback timeline if they are added after the element is already running. Part-of: --- .../gst/multifile/gstsplitmuxsrc.c | 49 +++- .../gst/multifile/gstsplitmuxsrc.h | 1 + .../tests/examples/splitmux/meson.build | 6 + .../splitmux/splitmux-record-and-play-live.c | 245 ++++++++++++++++++ 4 files changed, 288 insertions(+), 13 deletions(-) create mode 100644 subprojects/gst-plugins-good/tests/examples/splitmux/splitmux-record-and-play-live.c diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c index 5416a43049..8f19cc3352 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c @@ -289,8 +289,8 @@ gst_splitmux_src_class_init (GstSplitMuxSrcClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_CALLBACK (gst_splitmuxsrc_add_fragment), NULL, NULL, NULL, - G_TYPE_BOOLEAN, 3, G_TYPE_STRING, GST_TYPE_CLOCK_TIME, - GST_TYPE_CLOCK_TIME); + G_TYPE_BOOLEAN, 3, G_TYPE_STRING | G_SIGNAL_TYPE_STATIC_SCOPE, + GST_TYPE_CLOCK_TIME, GST_TYPE_CLOCK_TIME); } static void @@ -517,10 +517,9 @@ gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part, splitmux->total_duration += gst_splitmux_part_reader_get_duration (splitmux->parts[idx]); splitmux->play_segment.duration = splitmux->total_duration; - GST_OBJECT_UNLOCK (splitmux); - splitmux->end_offset = gst_splitmux_part_reader_get_end_offset (splitmux->parts[idx]); + GST_OBJECT_UNLOCK (splitmux); GST_DEBUG_OBJECT (splitmux, "Duration %" GST_TIME_FORMAT ", total duration now: %" GST_TIME_FORMAT @@ -529,6 +528,7 @@ gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part, [idx])), GST_TIME_ARGS (splitmux->total_duration), GST_TIME_ARGS (splitmux->end_offset)); + SPLITMUX_SRC_LOCK (splitmux); splitmux->num_measured_parts++; /* If we're done or preparing the next part fails, finish here */ @@ -537,14 +537,18 @@ gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part, /* Store how many parts we actually prepared in the end */ splitmux->num_parts = splitmux->num_measured_parts; - /* All done preparing, activate the first part */ - GST_INFO_OBJECT (splitmux, - "All parts measured. Total duration %" GST_TIME_FORMAT - " Activating first part", GST_TIME_ARGS (splitmux->total_duration)); - gst_element_call_async (GST_ELEMENT_CAST (splitmux), - (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part, - NULL, NULL); + if (!splitmux->did_initial_measuring) { + /* All done preparing, activate the first part if this was the initial measurement phase */ + GST_INFO_OBJECT (splitmux, + "All parts measured. Total duration %" GST_TIME_FORMAT + " Activating first part", GST_TIME_ARGS (splitmux->total_duration)); + gst_element_call_async (GST_ELEMENT_CAST (splitmux), + (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part, + NULL, NULL); + } + splitmux->did_initial_measuring = TRUE; } + SPLITMUX_SRC_UNLOCK (splitmux); } static GstBusSyncReply @@ -561,6 +565,7 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg, GST_ERROR_OBJECT (splitmux, "Got error message from part %" GST_PTR_FORMAT ": %" GST_PTR_FORMAT, GST_MESSAGE_SRC (msg), msg); + SPLITMUX_SRC_LOCK (splitmux); if (splitmux->num_measured_parts < splitmux->num_parts) { guint idx = splitmux->num_measured_parts; @@ -582,9 +587,8 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg, /* Store how many parts we actually prepared in the end */ splitmux->num_parts = splitmux->num_measured_parts; - do_async_done (splitmux); - if (idx > 0) { + if (idx > 0 && !splitmux->did_initial_measuring) { /* All done preparing, activate the first part */ GST_INFO_OBJECT (splitmux, "All parts prepared. Total duration %" GST_TIME_FORMAT @@ -594,7 +598,13 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg, (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part, NULL, NULL); } + splitmux->did_initial_measuring = TRUE; + SPLITMUX_SRC_UNLOCK (splitmux); + + do_async_done (splitmux); } else { + SPLITMUX_SRC_UNLOCK (splitmux); + /* Need to update the message source so that it's part of the element * hierarchy the application would expect */ msg = gst_message_copy (msg); @@ -1051,10 +1061,14 @@ gst_splitmux_src_measure_next_part (GstSplitMuxSrc * splitmux) return TRUE; } + GST_OBJECT_LOCK (splitmux); + /* Get the end offset (start offset of the next piece) */ end_offset = gst_splitmux_part_reader_get_end_offset (reader); splitmux->total_duration += gst_splitmux_part_reader_get_duration (reader); splitmux->num_measured_parts++; + + GST_OBJECT_UNLOCK (splitmux); } return TRUE; @@ -1228,6 +1242,7 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux) splitmux->parts = NULL; splitmux->num_parts = 0; splitmux->num_measured_parts = 0; + splitmux->did_initial_measuring = FALSE; splitmux->num_parts_alloced = 0; splitmux->total_duration = GST_CLOCK_TIME_NONE; @@ -1783,6 +1798,14 @@ gst_splitmuxsrc_add_fragment (GstSplitMuxSrc * splitmux, splitmux->parts[splitmux->num_parts] = reader; splitmux->num_parts++; + /* If we already did the initial measuring, and we added a new first part here, + * call 'measure_next_part' to get it measured / added to our duration */ + if (splitmux->did_initial_measuring + && (splitmux->num_measured_parts + 1) == splitmux->num_parts) { + if (!gst_splitmux_src_measure_next_part (splitmux)) { + } + } + SPLITMUX_SRC_UNLOCK (splitmux); return TRUE; } diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h index cb39c54962..8d89e44afe 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h @@ -46,6 +46,7 @@ struct _GstSplitMuxSrc GMutex lock; GMutex msg_lock; gboolean running; + gboolean did_initial_measuring; gchar *location; /* OBJECT_LOCK */ diff --git a/subprojects/gst-plugins-good/tests/examples/splitmux/meson.build b/subprojects/gst-plugins-good/tests/examples/splitmux/meson.build index 7f168da8ca..38d4e45586 100644 --- a/subprojects/gst-plugins-good/tests/examples/splitmux/meson.build +++ b/subprojects/gst-plugins-good/tests/examples/splitmux/meson.build @@ -15,3 +15,9 @@ executable('splitmuxsrc-add-fragment', 'splitmuxsrc-add-fragment.c', c_args : gst_plugins_good_args, include_directories : [configinc], install: false) + +executable('splitmux-record-and-play-live', 'splitmux-record-and-play-live.c', + dependencies: [gst_dep], + c_args : gst_plugins_good_args, + include_directories : [configinc], + install: false) diff --git a/subprojects/gst-plugins-good/tests/examples/splitmux/splitmux-record-and-play-live.c b/subprojects/gst-plugins-good/tests/examples/splitmux/splitmux-record-and-play-live.c new file mode 100644 index 0000000000..7c45edd940 --- /dev/null +++ b/subprojects/gst-plugins-good/tests/examples/splitmux/splitmux-record-and-play-live.c @@ -0,0 +1,245 @@ +/* GStreamer + * Copyright (C) 2024 Jan Schmidt + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/* + * This example creates a test recording using splitmuxsink, + * listening for the fragment-closed messages from splitmuxsink + * and using them to pass fragments to splitmuxsrc for live playback + * as fragments are generated + */ +#include +#include +#include +#include + +typedef struct +{ + GMainLoop *loop; + gboolean running; + + GstElement *record_pipe; + GstElement *playback_pipe; + GstElement *splitmuxsrc; + + gboolean playback_started; + gsize num_fragments; + + GMutex lock; + GCond cond; + + // Pending fragment info for initial fragment + const gchar *fname; + GstClockTime start_offset; + GstClockTime duration; +} State; + +static gboolean +record_message_handler (GstBus * bus, GstMessage * message, gpointer data) +{ + State *state = data; + + if (message->type == GST_MESSAGE_ELEMENT) { + const GstStructure *s = gst_message_get_structure (message); + const gchar *name = gst_structure_get_name (s); + + if (strcmp (name, "splitmuxsink-fragment-closed") == 0) { + const gchar *fname = gst_structure_get_string (s, "location"); + GstClockTime start_offset, duration; + if (!gst_structure_get_uint64 (s, "fragment-offset", &start_offset) || + !gst_structure_get_uint64 (s, "fragment-duration", &duration)) { + g_assert_not_reached (); + } + + g_mutex_lock (&state->lock); + + if (!state->playback_started) { + g_print ("Finished first fragment. Starting playback\n"); + + state->fname = fname; + state->start_offset = start_offset; + state->duration = duration; + + g_mutex_unlock (&state->lock); + gst_element_set_state (state->playback_pipe, GST_STATE_PLAYING); + g_mutex_lock (&state->lock); + + state->playback_started = TRUE; + + while (state->splitmuxsrc == NULL && state->running) { + g_cond_wait (&state->cond, &state->lock); + } + } else { + gboolean add_result; + g_signal_emit_by_name (G_OBJECT (state->splitmuxsrc), "add-fragment", + fname, start_offset, duration, &add_result); + if (!add_result) { + g_printerr ("Failed to add fragment %" G_GSIZE_FORMAT + ": %s for playback\n", state->num_fragments, fname); + g_main_loop_quit (state->loop); + return FALSE; + } + } + + state->num_fragments++; + g_mutex_unlock (&state->lock); + } + } else if (message->type == GST_MESSAGE_EOS) { + g_print ("Recording finished.\n"); + } else if (message->type == GST_MESSAGE_ERROR) { + GError *err; + gchar *debug_info; + + gst_message_parse_error (message, &err, &debug_info); + g_printerr ("Error received from element %s: %s\n", + GST_OBJECT_NAME (message->src), err->message); + g_printerr ("Debugging information: %s\n", + debug_info ? debug_info : "none"); + g_main_loop_quit (state->loop); + } + return TRUE; +} + +static void +setup_splitmuxsrc (GstElement * playbin, GstElement * src, gpointer userdata) +{ + State *state = userdata; + + g_mutex_lock (&state->lock); + state->splitmuxsrc = src; + g_cond_broadcast (&state->cond); + + /* We need to give splitmuxsrc a first fragment when it starts to avoid races */ + gboolean add_result; + g_signal_emit_by_name (G_OBJECT (state->splitmuxsrc), "add-fragment", + state->fname, state->start_offset, state->duration, &add_result); + if (!add_result) { + g_printerr ("Failed to add fragment %" G_GSIZE_FORMAT ": %s for playback\n", + state->num_fragments, state->fname); + g_main_loop_quit (state->loop); + } + g_mutex_unlock (&state->lock); +} + +static gboolean +playback_message_handler (GstBus * bus, GstMessage * message, gpointer data) +{ + State *state = data; + + if (message->type == GST_MESSAGE_ERROR) { + GError *err; + gchar *debug_info; + + gst_message_parse_error (message, &err, &debug_info); + g_printerr ("Error received from element %s: %s\n", + GST_OBJECT_NAME (message->src), err->message); + g_printerr ("Debugging information: %s\n", + debug_info ? debug_info : "none"); + g_mutex_lock (&state->lock); + state->running = FALSE; + g_cond_broadcast (&state->cond); + g_mutex_unlock (&state->lock); + g_main_loop_quit (state->loop); + } + if (message->type == GST_MESSAGE_EOS) { + g_print ("Playback finished exiting.\n"); + g_main_loop_quit (state->loop); + } + return TRUE; +} + +int +main (int argc, char *argv[]) +{ + State state = { 0, }; + GstBus *bus; + + gst_init (&argc, &argv); + + if (argc < 2) { + g_printerr + ("Usage: %s target_dir\n Pass splitmuxsink target directory for generated recording\n", + argv[0]); + return 1; + } + + g_mutex_init (&state.lock); + g_cond_init (&state.cond); + + /* First create our playback pipeline that the recording pipe will pass fragments to */ + state.playback_pipe = gst_element_factory_make ("playbin3", NULL); + if (state.playback_pipe == NULL) { + g_print ("Failed to create playback pipeline. Check your installation\n"); + return 3; + } + + /* Connect to source-setup to set fragments on splitmuxsrc */ + g_signal_connect (state.playback_pipe, "source-setup", + G_CALLBACK (setup_splitmuxsrc), &state); + g_object_set (state.playback_pipe, "uri", "splitmux://", NULL); + + bus = gst_element_get_bus (state.playback_pipe); + gst_bus_add_watch (bus, playback_message_handler, &state); + gst_object_unref (bus); + + GError *error = NULL; + state.record_pipe = + gst_parse_launch + ("videotestsrc num-buffers=300 ! video/x-raw,framerate=30/1 ! timeoverlay ! x264enc key-int-max=30 ! " + "h264parse ! queue ! splitmuxsink name=sink " + "audiotestsrc samplesperbuffer=1600 num-buffers=300 ! audio/x-raw,rate=48000 ! opusenc ! queue ! sink.audio_0 ", + &error); + + if (state.record_pipe == NULL || error != NULL) { + g_print ("Failed to create generator pipeline. Error %s\n", error->message); + return 3; + } + + GstElement *splitmuxsink = + gst_bin_get_by_name (GST_BIN (state.record_pipe), "sink"); + + /* Set the files glob on src */ + gchar *file_pattern = g_strdup_printf ("%s/test%%05d.mp4", argv[1]); + g_object_set (splitmuxsink, "location", file_pattern, NULL); + g_object_set (splitmuxsink, "max-size-time", GST_SECOND, NULL); + + gst_object_unref (splitmuxsink); + + bus = gst_element_get_bus (state.record_pipe); + gst_bus_add_watch (bus, record_message_handler, &state); + gst_object_unref (bus); + + /* Start the recording pipeline. It will start playback once the first + * fragment is available */ + gst_element_set_state (state.record_pipe, GST_STATE_PLAYING); + + state.loop = g_main_loop_new (NULL, FALSE); + state.running = TRUE; + g_main_loop_run (state.loop); + + gst_element_set_state (state.record_pipe, GST_STATE_NULL); + gst_element_set_state (state.playback_pipe, GST_STATE_NULL); + + gst_object_unref (state.record_pipe); + gst_object_unref (state.playback_pipe); + + g_mutex_clear (&state.lock); + g_cond_clear (&state.cond); + + return 0; +}