splitmuxsink: Fix a race in fragment switching with async handling

Only do output/muxer operations at the output side of splitmuxsink
to avoid races if fragments are small, by moving the RUNNING_TIME
qdata setting.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7053>
This commit is contained in:
Jan Schmidt 2024-04-30 00:33:35 +10:00
parent eca97e7940
commit b0df6ee408

View file

@ -1304,6 +1304,14 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
/* We've reached the max out running_time to get here, so end this file now */ /* We've reached the max out running_time to get here, so end this file now */
if (ctx->out_eos == FALSE) { if (ctx->out_eos == FALSE) {
if (splitmux->async_finalize) { if (splitmux->async_finalize) {
/* For async finalization, we must store the fragment timing
* info on the element via qdata, because EOS will be processed
* asynchronously */
GstClockTime *sink_running_time = g_new (GstClockTime, 1);
*sink_running_time = splitmux->reference_ctx->out_running_time;
g_object_set_qdata_full (G_OBJECT (splitmux->sink),
RUNNING_TIME, sink_running_time, g_free);
/* We must set EOS asynchronously at this point. We cannot defer /* We must set EOS asynchronously at this point. We cannot defer
* it, because we need all contexts to wake up, for the * it, because we need all contexts to wake up, for the
* reference context to eventually give us something at * reference context to eventually give us something at
@ -1717,6 +1725,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
if (ctx == splitmux->reference_ctx) { if (ctx == splitmux->reference_ctx) {
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM; splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
// Waiting before outputting will ensure the muxer end-of-stream
// qdata is set without racing against this EOS event reaching the muxer
wait = TRUE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
} }
@ -2550,12 +2561,6 @@ handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
/* Check for overrun - have we output at least one byte and overrun /* Check for overrun - have we output at least one byte and overrun
* either threshold? */ * either threshold? */
if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) { if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
if (splitmux->async_finalize) {
GstClockTime *sink_running_time = g_new (GstClockTime, 1);
*sink_running_time = splitmux->reference_ctx->out_running_time;
g_object_set_qdata_full (G_OBJECT (splitmux->sink),
RUNNING_TIME, sink_running_time, g_free);
}
g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE); g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
/* Tell the output side to start a new fragment */ /* Tell the output side to start a new fragment */
GST_INFO_OBJECT (splitmux, GST_INFO_OBJECT (splitmux,