splitmuxsink: Fix race in unit tests. Add fragment-id to messages

Publish fragment-id in the messages that splitmuxsink and splitmuxsrc
send, so when they are received out of order (due to async finalization,
for example), they can still be identified / ordered correctly.

Fix a race in the splitmuxsink unit test where messages might be
received out of order

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7053>
This commit is contained in:
Jan Schmidt 2024-06-21 11:53:19 +10:00
parent 356710f6fa
commit 44005ab9fb
4 changed files with 38 additions and 26 deletions

View file

@ -1153,6 +1153,7 @@ send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
* NULL */
if (splitmux->reference_ctx) {
GstStructure *s = gst_structure_new (msg_name,
"fragment-id", G_TYPE_UINT, out_fragment_info->fragment_id,
"location", G_TYPE_STRING, location,
"running-time", GST_TYPE_CLOCK_TIME,
out_fragment_info->last_running_time, "sink", GST_TYPE_ELEMENT,
@ -1296,6 +1297,7 @@ update_output_fragment_info (GstSplitMuxSink * splitmux)
GST_STIME_ARGS (splitmux->reference_ctx->out_running_time),
&offset, &duration);
splitmux->out_fragment_info.fragment_id = splitmux->cur_fragment_id;
splitmux->out_fragment_info.last_running_time =
splitmux->reference_ctx->out_running_time;
splitmux->out_fragment_info.fragment_offset = offset;
@ -1960,7 +1962,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
ctx->out_fragment_start_runts = buf_info->run_ts;
/* For the first fragment check if this is the earliest of all start running times */
if (splitmux->fragment_id == 1) {
if (splitmux->cur_fragment_id == splitmux->start_index) {
if (!GST_CLOCK_STIME_IS_VALID (splitmux->out_start_runts)
|| (ctx->out_fragment_start_runts < splitmux->out_start_runts)) {
splitmux->out_start_runts = ctx->out_fragment_start_runts;
@ -2185,14 +2187,14 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
if (splitmux->async_finalize) {
if (splitmux->muxed_out_bytes > 0
|| splitmux->fragment_id != splitmux->start_index) {
|| splitmux->cur_fragment_id != splitmux->start_index) {
gchar *newname;
GstElement *new_sink, *new_muxer;
GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
splitmux->fragment_id);
splitmux->next_fragment_id);
g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
newname = g_strdup_printf ("sink_%u", splitmux->next_fragment_id);
GST_SPLITMUX_LOCK (splitmux);
if ((splitmux->sink =
create_element (splitmux, splitmux->sink_factory, newname,
@ -2207,7 +2209,7 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
splitmux->active_sink = splitmux->sink;
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
g_free (newname);
newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
newname = g_strdup_printf ("muxer_%u", splitmux->next_fragment_id);
if ((splitmux->muxer =
create_element (splitmux, splitmux->muxer_factory, newname,
TRUE)) == NULL)
@ -2280,6 +2282,7 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
GST_SPLITMUX_LOCK (splitmux);
set_next_filename (splitmux, ctx);
splitmux->next_fragment_id++;
splitmux->muxed_out_bytes = 0;
splitmux->out_fragment_start_runts = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux);
@ -4029,7 +4032,7 @@ set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
caps = gst_pad_get_current_caps (ctx->srcpad);
sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
splitmux->fragment_id, sample, &fname);
splitmux->next_fragment_id, sample, &fname);
gst_sample_unref (sample);
if (caps)
gst_caps_unref (caps);
@ -4037,22 +4040,23 @@ set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
if (fname == NULL) {
/* Fallback to the old signal if the new one returned nothing */
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
splitmux->fragment_id, &fname);
splitmux->next_fragment_id, &fname);
}
if (!fname)
if (!fname) {
fname = splitmux->location ?
g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
g_strdup_printf (splitmux->location, splitmux->next_fragment_id) : NULL;
}
if (fname) {
GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
"location") != NULL)
"location") != NULL) {
g_object_set (splitmux->sink, "location", fname, NULL);
}
g_free (fname);
splitmux->cur_fragment_id = splitmux->next_fragment_id;
}
splitmux->fragment_id++;
}
/* called with GST_SPLITMUX_LOCK */
@ -4151,7 +4155,7 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
GST_SPLITMUX_UNLOCK (splitmux);
splitmux->fragment_id = splitmux->start_index;
splitmux->next_fragment_id = splitmux->start_index;
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
@ -4217,7 +4221,7 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
}
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->fragment_id = 0;
splitmux->cur_fragment_id = splitmux->next_fragment_id = 0;
/* Reset internal elements only if no pad contexts are using them */
if (splitmux->contexts == NULL)
gst_splitmux_reset_elements (splitmux);
@ -4252,8 +4256,8 @@ beach:
static void
gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
{
if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
splitmux->fragment_id = 0;
if (splitmux->max_files && splitmux->next_fragment_id >= splitmux->max_files) {
splitmux->next_fragment_id = 0;
}
}

View file

@ -70,6 +70,7 @@ typedef struct _SplitMuxOutputCommand
typedef struct
{
guint fragment_id;
GstClockTime last_running_time;
GstClockTime fragment_offset;
@ -181,7 +182,9 @@ struct _GstSplitMuxSink
gboolean ready_for_output;
gchar *location;
guint fragment_id;
guint cur_fragment_id;
guint next_fragment_id;
guint start_index;
GList *contexts;

View file

@ -577,6 +577,7 @@ gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part,
/* Post part measured info message */
GstMessage *msg = gst_message_new_element (GST_OBJECT (splitmux),
gst_structure_new ("splitmuxsrc-fragment-info",
"fragment-id", G_TYPE_UINT, idx,
"location", G_TYPE_STRING, filename,
"fragment-offset", GST_TYPE_CLOCK_TIME, offset,
"fragment-duration", GST_TYPE_CLOCK_TIME, duration,

View file

@ -109,7 +109,7 @@ run_pipeline (GstElement * pipeline, guint num_fragments_expected,
{
GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline));
GstMessage *msg;
guint fragment_number = 0;
guint fragments_seen = 0;
gst_element_set_state (pipeline, GST_STATE_PLAYING);
do {
@ -126,25 +126,29 @@ run_pipeline (GstElement * pipeline, guint num_fragments_expected,
if (gst_structure_has_name (s, "splitmuxsrc-fragment-info") ||
gst_structure_has_name (s, "splitmuxsink-fragment-closed")) {
GstClockTime fragment_offset, fragment_duration;
guint fragment_id;
fail_unless (gst_structure_get_uint (s, "fragment-id", &fragment_id));
fail_unless (fragment_id < num_fragments_expected);
fail_unless (gst_structure_get_clock_time (s, "fragment-offset",
&fragment_offset));
fail_unless (gst_structure_get_clock_time (s, "fragment-duration",
&fragment_duration));
if (fragment_offsets != NULL) {
fail_unless (fragment_offsets[fragment_number] == fragment_offset,
fail_unless (fragment_offsets[fragment_id] == fragment_offset,
"Expected offset %" GST_TIME_FORMAT
" for fragment %u. Got offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (fragment_offsets[fragment_number]),
fragment_number, GST_TIME_ARGS (fragment_offset));
GST_TIME_ARGS (fragment_offsets[fragment_id]),
fragment_id, GST_TIME_ARGS (fragment_offset));
}
if (fragment_durations != NULL) {
fail_unless (fragment_durations[fragment_number] == fragment_duration,
fail_unless (fragment_durations[fragment_id] == fragment_duration,
"Expected duration %" GST_TIME_FORMAT
" for fragment %u. Got duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (fragment_durations[fragment_number]),
fragment_number, GST_TIME_ARGS (fragment_duration));
GST_TIME_ARGS (fragment_durations[fragment_id]),
fragment_id, GST_TIME_ARGS (fragment_duration));
}
fragment_number++;
fragments_seen++;
}
}
gst_message_unref (msg);
@ -158,7 +162,7 @@ run_pipeline (GstElement * pipeline, guint num_fragments_expected,
dump_error (msg);
else if (num_fragments_expected != 0) {
// Success. Check we got the expected number of fragment messages
fail_unless (fragment_number == num_fragments_expected);
fail_unless (fragments_seen == num_fragments_expected);
}
return msg;