From b50d3b9c9fa93af1dcf5162b3b2424209659c2bf Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Thu, 8 Jul 2021 00:12:52 +1000 Subject: [PATCH] splitmuxsink: Prevent hang going back to NULL after failures Prevent a condition where splitmuxsink won't go back to NULL state after a child element fails to change state by making sure that a READY->READY state change doesn't fail, and by returning GST_FLOW_ERROR or GST_FLOW_FLUSHING upstream to shut down streaming as quickly as possible. This can happen after (for example) setting an invalid filename on the sink element. In that case, the READY->PAUSED transition fails, but with internal elements still in the NULL state. Trying to set splitmuxsink back to NULL then ends up trying to bring those NULL elements up to READY with a READY->READY transition, (which fails, prevent splitmuxsink from getting to NULL) Part-of: --- gst/multifile/gstsplitmuxsink.c | 118 ++++++++++++++++++++++------ tests/check/elements/splitmuxsink.c | 38 +++++++++ 2 files changed, 133 insertions(+), 23 deletions(-) diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index 8b0247643d..b04085747a 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -234,7 +234,8 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * static void bus_handler (GstBin * bin, GstMessage * msg); static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); -static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); +static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux, + MqStreamCtx * ctx); static void mq_stream_ctx_free (MqStreamCtx * ctx); static void grow_blocked_queues (GstSplitMuxSink * splitmux); @@ -1226,11 +1227,11 @@ all_contexts_are_async_eos (GstSplitMuxSink * splitmux) * context needs to sleep to wait for the release of the * next GOP, or to send EOS to close out the current file */ -static void +static GstFlowReturn complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { if (ctx->caps_change) - return; + return GST_FLOW_OK; do { /* When first starting up, the reference stream has to output @@ -1252,7 +1253,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (ctx->flushing || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) - return; + return GST_FLOW_FLUSHING; GST_LOG_OBJECT (ctx->srcpad, "Checking running time %" GST_STIME_FORMAT " against max %" @@ -1262,7 +1263,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (can_output) { if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || ctx->out_running_time < my_max_out_running_time) { - return; + return GST_FLOW_OK; } switch (splitmux->output_state) { @@ -1306,12 +1307,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) break; case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE: if (ctx->is_reference) { + GstFlowReturn ret = GST_FLOW_OK; + /* Special handling on the reference ctx to start new fragments * and collect commands from the command queue */ /* drops the splitmux lock briefly: */ /* We must have reference ctx in order for format-location-full to * have a sample */ - start_next_fragment (splitmux, ctx); + ret = start_next_fragment (splitmux, ctx); + if (ret != GST_FLOW_OK) + return ret; + continue; } break; @@ -1354,7 +1360,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) continue; } case SPLITMUX_OUTPUT_STATE_STOPPED: - return; + return GST_FLOW_FLUSHING; } } else { GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output"); @@ -1371,6 +1377,8 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_STIME_ARGS (splitmux->max_out_running_time)); } while (1); + + return GST_FLOW_OK; } static GstClockTime @@ -1557,6 +1565,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; MqStreamBuf *buf_info = NULL; + GstFlowReturn ret = GST_FLOW_OK; GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); @@ -1667,8 +1676,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) goto beach; ctx->out_running_time = ts; if (!ctx->is_reference) - complete_or_wait_on_out (splitmux, ctx); + ret = complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_DROP; } case GST_EVENT_CAPS:{ @@ -1717,27 +1727,31 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (!locked) GST_SPLITMUX_LOCK (splitmux); if (wait) - complete_or_wait_on_out (splitmux, ctx); + ret = complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); /* Don't try to forward sticky events before the next buffer is there * because it would cause a new file to be created without the first * buffer being available. */ + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) { gst_event_unref (event); return GST_PAD_PROBE_HANDLED; - } else + } else { return GST_PAD_PROBE_PASS; + } } /* Allow everything through until the configured next stopping point */ GST_SPLITMUX_LOCK (splitmux); buf_info = g_queue_pop_tail (&ctx->queued_bufs); - if (buf_info == NULL) + if (buf_info == NULL) { /* Can only happen due to a poorly timed flush */ + ret = GST_FLOW_FLUSHING; goto beach; + } /* If we have popped a keyframe, decrement the queued_gop count */ if (buf_info->keyframe && splitmux->queued_keyframes > 0) @@ -1753,7 +1767,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->caps_change = FALSE; - complete_or_wait_on_out (splitmux, ctx); + ret = complete_or_wait_on_out (splitmux, ctx); splitmux->muxed_out_bytes += buf_info->buf_size; @@ -1785,10 +1799,12 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) mq_stream_buf_free (buf_info); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_DROP; } @@ -1907,7 +1923,7 @@ _send_event (const GValue * value, gpointer user_data) * reaches EOS and it is time to restart * a new fragment */ -static void +static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GstElement *muxer, *sink; @@ -1928,8 +1944,9 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (splitmux->shutdown) { GST_DEBUG_OBJECT (splitmux, "Shutdown requested. Aborting fragment switch."); + GST_SPLITMUX_LOCK (splitmux); GST_SPLITMUX_STATE_UNLOCK (splitmux); - return; + return GST_FLOW_FLUSHING; } if (splitmux->async_finalize) { @@ -2032,8 +2049,24 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) splitmux->muxed_out_bytes = 0; GST_SPLITMUX_UNLOCK (splitmux); - gst_element_set_state (sink, GST_STATE_TARGET (splitmux)); - gst_element_set_state (muxer, GST_STATE_TARGET (splitmux)); + if (gst_element_set_state (sink, + GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) { + gst_element_set_state (sink, GST_STATE_NULL); + gst_element_set_locked_state (muxer, FALSE); + gst_element_set_locked_state (sink, FALSE); + + goto fail_output; + } + + if (gst_element_set_state (muxer, + GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) { + gst_element_set_state (muxer, GST_STATE_NULL); + gst_element_set_state (sink, GST_STATE_NULL); + gst_element_set_locked_state (muxer, FALSE); + gst_element_set_locked_state (sink, FALSE); + goto fail_muxer; + } + gst_element_set_locked_state (muxer, FALSE); gst_element_set_locked_state (sink, FALSE); @@ -2056,12 +2089,32 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND"); splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); - return; + return GST_FLOW_OK; fail: + GST_SPLITMUX_LOCK (splitmux); GST_SPLITMUX_STATE_UNLOCK (splitmux); GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, ("Could not create the new muxer/sink"), NULL); + return GST_FLOW_ERROR; + +fail_output: + GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, + ("Could not start new output sink"), NULL); + + GST_SPLITMUX_LOCK (splitmux); + GST_SPLITMUX_STATE_UNLOCK (splitmux); + splitmux->switching_fragment = FALSE; + return GST_FLOW_ERROR; + +fail_muxer: + GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, + ("Could not start new muxer"), NULL); + + GST_SPLITMUX_LOCK (splitmux); + GST_SPLITMUX_STATE_UNLOCK (splitmux); + splitmux->switching_fragment = FALSE; + return GST_FLOW_ERROR; } static void @@ -2578,6 +2631,7 @@ static GstPadProbeReturn handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; + GstFlowReturn ret = GST_FLOW_OK; GstBuffer *buf; MqStreamBuf *buf_info = NULL; GstClockTime ts; @@ -2612,8 +2666,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); ctx->in_eos = TRUE; - if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) { + ret = GST_FLOW_FLUSHING; goto beach; + } if (ctx->is_reference) { GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); @@ -2642,8 +2698,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) { + ret = GST_FLOW_FLUSHING; goto beach; + } rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts); GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, @@ -2688,8 +2746,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) { + ret = GST_FLOW_FLUSHING; goto beach; + } /* If this buffer has a timestamp, advance the input timestamp of the * stream */ @@ -2875,12 +2935,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); GST_SPLITMUX_UNLOCK (splitmux); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); if (buf_info) mq_stream_buf_free (buf_info); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_PASS; } @@ -3625,9 +3687,10 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) break; } case GST_STATE_CHANGE_PAUSED_TO_READY: + case GST_STATE_CHANGE_READY_TO_READY: g_atomic_int_set (&(splitmux->split_requested), FALSE); g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE); - + /* Fall through */ case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_STATE_LOCK (splitmux); splitmux->shutdown = TRUE; @@ -3682,15 +3745,24 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) break; } + return ret; + beach: - if (transition == GST_STATE_CHANGE_NULL_TO_READY && - ret == GST_STATE_CHANGE_FAILURE) { + if (transition == GST_STATE_CHANGE_NULL_TO_READY) { /* Cleanup elements on failed transition out of NULL */ gst_splitmux_reset_elements (splitmux); GST_SPLITMUX_LOCK (splitmux); do_async_done (splitmux); GST_SPLITMUX_UNLOCK (splitmux); } + if (transition == GST_STATE_CHANGE_READY_TO_READY) { + /* READY to READY transition only happens when we're already + * in READY state, but a child element is in NULL, which + * happens when there's an error changing the state of the sink. + * We need to make sure not to fail the state transition, or + * the core won't transition us back to NULL successfully */ + ret = GST_STATE_CHANGE_SUCCESS; + } return ret; } diff --git a/tests/check/elements/splitmuxsink.c b/tests/check/elements/splitmuxsink.c index 9db9b91cf2..93c86e7486 100644 --- a/tests/check/elements/splitmuxsink.c +++ b/tests/check/elements/splitmuxsink.c @@ -433,6 +433,43 @@ GST_START_TEST (test_splitmuxsink) GST_END_TEST; +GST_START_TEST (test_splitmuxsink_clean_failure) +{ + GstMessage *msg; + GstElement *pipeline; + GstElement *sink, *fakesink; + + /* This pipeline has a small time cutoff - it should start a new file + * every GOP, ie 1 second */ + pipeline = + gst_parse_launch + ("videotestsrc horizontal-speed=2 is-live=true ! video/x-raw,width=80,height=64,framerate=5/1 ! videoconvert !" + " queue ! theoraenc keyframe-force=5 ! splitmuxsink name=splitsink " + " max-size-time=1000000 max-size-bytes=1000000 muxer=oggmux", NULL); + fail_if (pipeline == NULL); + sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink"); + fail_if (sink == NULL); + + fakesink = gst_element_factory_make ("fakesink", "fakesink-fail"); + fail_if (fakesink == NULL); + + /* Trigger an error on READY->PAUSED */ + g_object_set (fakesink, "state-error", 2, NULL); + g_object_set (sink, "sink", fakesink, NULL); + gst_object_unref (sink); + + msg = run_pipeline (pipeline); + + fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR); + gst_message_unref (msg); + + fail_unless (gst_element_set_state (pipeline, + GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); + gst_object_unref (pipeline); +} + +GST_END_TEST; + GST_START_TEST (test_splitmuxsink_multivid) { GstMessage *msg; @@ -807,6 +844,7 @@ splitmuxsink_suite (void) tcase_add_checked_fixture (tc_chain, tempdir_setup, tempdir_cleanup); tcase_add_test (tc_chain, test_splitmuxsink); + tcase_add_test (tc_chain, test_splitmuxsink_clean_failure); if (have_matroska && have_vorbis) { tcase_add_checked_fixture (tc_chain_complex, tempdir_setup,