From 44005ab9fbcd554a518c5ab3e6981ca3eca0c338 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Fri, 21 Jun 2024 11:53:19 +1000 Subject: [PATCH] splitmuxsink: Fix race in unit tests. Add fragment-id to messages Publish fragment-id in the messages that splitmuxsink and splitmuxsrc send, so when they are received out of order (due to async finalization, for example), they can still be identified / ordered correctly. Fix a race in the splitmuxsink unit test where messages might be received out of order Part-of: --- .../gst/multifile/gstsplitmuxsink.c | 36 ++++++++++--------- .../gst/multifile/gstsplitmuxsink.h | 5 ++- .../gst/multifile/gstsplitmuxsrc.c | 1 + .../tests/check/elements/splitmuxsink.c | 22 +++++++----- 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c index e244403830..047fc2e38a 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c @@ -1153,6 +1153,7 @@ send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened, * NULL */ if (splitmux->reference_ctx) { GstStructure *s = gst_structure_new (msg_name, + "fragment-id", G_TYPE_UINT, out_fragment_info->fragment_id, "location", G_TYPE_STRING, location, "running-time", GST_TYPE_CLOCK_TIME, out_fragment_info->last_running_time, "sink", GST_TYPE_ELEMENT, @@ -1296,6 +1297,7 @@ update_output_fragment_info (GstSplitMuxSink * splitmux) GST_STIME_ARGS (splitmux->reference_ctx->out_running_time), &offset, &duration); + splitmux->out_fragment_info.fragment_id = splitmux->cur_fragment_id; splitmux->out_fragment_info.last_running_time = splitmux->reference_ctx->out_running_time; splitmux->out_fragment_info.fragment_offset = offset; @@ -1960,7 +1962,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->out_fragment_start_runts = buf_info->run_ts; /* For the first fragment check if this is the earliest of all start running times */ - if (splitmux->fragment_id == 1) { + if (splitmux->cur_fragment_id == splitmux->start_index) { if (!GST_CLOCK_STIME_IS_VALID (splitmux->out_start_runts) || (ctx->out_fragment_start_runts < splitmux->out_start_runts)) { splitmux->out_start_runts = ctx->out_fragment_start_runts; @@ -2185,14 +2187,14 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (splitmux->async_finalize) { if (splitmux->muxed_out_bytes > 0 - || splitmux->fragment_id != splitmux->start_index) { + || splitmux->cur_fragment_id != splitmux->start_index) { gchar *newname; GstElement *new_sink, *new_muxer; GST_DEBUG_OBJECT (splitmux, "Starting fragment %u", - splitmux->fragment_id); + splitmux->next_fragment_id); g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux); - newname = g_strdup_printf ("sink_%u", splitmux->fragment_id); + newname = g_strdup_printf ("sink_%u", splitmux->next_fragment_id); GST_SPLITMUX_LOCK (splitmux); if ((splitmux->sink = create_element (splitmux, splitmux->sink_factory, newname, @@ -2207,7 +2209,7 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) splitmux->active_sink = splitmux->sink; g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink); g_free (newname); - newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id); + newname = g_strdup_printf ("muxer_%u", splitmux->next_fragment_id); if ((splitmux->muxer = create_element (splitmux, splitmux->muxer_factory, newname, TRUE)) == NULL) @@ -2280,6 +2282,7 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); set_next_filename (splitmux, ctx); + splitmux->next_fragment_id++; splitmux->muxed_out_bytes = 0; splitmux->out_fragment_start_runts = GST_CLOCK_STIME_NONE; GST_SPLITMUX_UNLOCK (splitmux); @@ -4029,7 +4032,7 @@ set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) caps = gst_pad_get_current_caps (ctx->srcpad); sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL); g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0, - splitmux->fragment_id, sample, &fname); + splitmux->next_fragment_id, sample, &fname); gst_sample_unref (sample); if (caps) gst_caps_unref (caps); @@ -4037,22 +4040,23 @@ set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (fname == NULL) { /* Fallback to the old signal if the new one returned nothing */ g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, - splitmux->fragment_id, &fname); + splitmux->next_fragment_id, &fname); } - if (!fname) + if (!fname) { fname = splitmux->location ? - g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL; + g_strdup_printf (splitmux->location, splitmux->next_fragment_id) : NULL; + } if (fname) { GST_INFO_OBJECT (splitmux, "Setting file to %s", fname); if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink), - "location") != NULL) + "location") != NULL) { g_object_set (splitmux->sink, "location", fname, NULL); + } g_free (fname); + splitmux->cur_fragment_id = splitmux->next_fragment_id; } - - splitmux->fragment_id++; } /* called with GST_SPLITMUX_LOCK */ @@ -4151,7 +4155,7 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer); g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink); GST_SPLITMUX_UNLOCK (splitmux); - splitmux->fragment_id = splitmux->start_index; + splitmux->next_fragment_id = splitmux->start_index; break; } case GST_STATE_CHANGE_READY_TO_PAUSED:{ @@ -4217,7 +4221,7 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) } case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_LOCK (splitmux); - splitmux->fragment_id = 0; + splitmux->cur_fragment_id = splitmux->next_fragment_id = 0; /* Reset internal elements only if no pad contexts are using them */ if (splitmux->contexts == NULL) gst_splitmux_reset_elements (splitmux); @@ -4252,8 +4256,8 @@ beach: static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux) { - if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) { - splitmux->fragment_id = 0; + if (splitmux->max_files && splitmux->next_fragment_id >= splitmux->max_files) { + splitmux->next_fragment_id = 0; } } diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h index 1a3395c9c4..3b8ecfd98a 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h @@ -70,6 +70,7 @@ typedef struct _SplitMuxOutputCommand typedef struct { + guint fragment_id; GstClockTime last_running_time; GstClockTime fragment_offset; @@ -181,7 +182,9 @@ struct _GstSplitMuxSink gboolean ready_for_output; gchar *location; - guint fragment_id; + guint cur_fragment_id; + + guint next_fragment_id; guint start_index; GList *contexts; diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c index 76ea174394..581c887a18 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c @@ -577,6 +577,7 @@ gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part, /* Post part measured info message */ GstMessage *msg = gst_message_new_element (GST_OBJECT (splitmux), gst_structure_new ("splitmuxsrc-fragment-info", + "fragment-id", G_TYPE_UINT, idx, "location", G_TYPE_STRING, filename, "fragment-offset", GST_TYPE_CLOCK_TIME, offset, "fragment-duration", GST_TYPE_CLOCK_TIME, duration, diff --git a/subprojects/gst-plugins-good/tests/check/elements/splitmuxsink.c b/subprojects/gst-plugins-good/tests/check/elements/splitmuxsink.c index dc33480c94..6d0178125e 100644 --- a/subprojects/gst-plugins-good/tests/check/elements/splitmuxsink.c +++ b/subprojects/gst-plugins-good/tests/check/elements/splitmuxsink.c @@ -109,7 +109,7 @@ run_pipeline (GstElement * pipeline, guint num_fragments_expected, { GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline)); GstMessage *msg; - guint fragment_number = 0; + guint fragments_seen = 0; gst_element_set_state (pipeline, GST_STATE_PLAYING); do { @@ -126,25 +126,29 @@ run_pipeline (GstElement * pipeline, guint num_fragments_expected, if (gst_structure_has_name (s, "splitmuxsrc-fragment-info") || gst_structure_has_name (s, "splitmuxsink-fragment-closed")) { GstClockTime fragment_offset, fragment_duration; + guint fragment_id; + fail_unless (gst_structure_get_uint (s, "fragment-id", &fragment_id)); + fail_unless (fragment_id < num_fragments_expected); + fail_unless (gst_structure_get_clock_time (s, "fragment-offset", &fragment_offset)); fail_unless (gst_structure_get_clock_time (s, "fragment-duration", &fragment_duration)); if (fragment_offsets != NULL) { - fail_unless (fragment_offsets[fragment_number] == fragment_offset, + fail_unless (fragment_offsets[fragment_id] == fragment_offset, "Expected offset %" GST_TIME_FORMAT " for fragment %u. Got offset %" GST_TIME_FORMAT, - GST_TIME_ARGS (fragment_offsets[fragment_number]), - fragment_number, GST_TIME_ARGS (fragment_offset)); + GST_TIME_ARGS (fragment_offsets[fragment_id]), + fragment_id, GST_TIME_ARGS (fragment_offset)); } if (fragment_durations != NULL) { - fail_unless (fragment_durations[fragment_number] == fragment_duration, + fail_unless (fragment_durations[fragment_id] == fragment_duration, "Expected duration %" GST_TIME_FORMAT " for fragment %u. Got duration %" GST_TIME_FORMAT, - GST_TIME_ARGS (fragment_durations[fragment_number]), - fragment_number, GST_TIME_ARGS (fragment_duration)); + GST_TIME_ARGS (fragment_durations[fragment_id]), + fragment_id, GST_TIME_ARGS (fragment_duration)); } - fragment_number++; + fragments_seen++; } } gst_message_unref (msg); @@ -158,7 +162,7 @@ run_pipeline (GstElement * pipeline, guint num_fragments_expected, dump_error (msg); else if (num_fragments_expected != 0) { // Success. Check we got the expected number of fragment messages - fail_unless (fragment_number == num_fragments_expected); + fail_unless (fragments_seen == num_fragments_expected); } return msg;