diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index 8b0247643d..b04085747a 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -234,7 +234,8 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * static void bus_handler (GstBin * bin, GstMessage * msg); static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); -static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); +static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux, + MqStreamCtx * ctx); static void mq_stream_ctx_free (MqStreamCtx * ctx); static void grow_blocked_queues (GstSplitMuxSink * splitmux); @@ -1226,11 +1227,11 @@ all_contexts_are_async_eos (GstSplitMuxSink * splitmux) * context needs to sleep to wait for the release of the * next GOP, or to send EOS to close out the current file */ -static void +static GstFlowReturn complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { if (ctx->caps_change) - return; + return GST_FLOW_OK; do { /* When first starting up, the reference stream has to output @@ -1252,7 +1253,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (ctx->flushing || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) - return; + return GST_FLOW_FLUSHING; GST_LOG_OBJECT (ctx->srcpad, "Checking running time %" GST_STIME_FORMAT " against max %" @@ -1262,7 +1263,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (can_output) { if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || ctx->out_running_time < my_max_out_running_time) { - return; + return GST_FLOW_OK; } switch (splitmux->output_state) { @@ -1306,12 +1307,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) break; case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE: if (ctx->is_reference) { + GstFlowReturn ret = GST_FLOW_OK; + /* Special handling on the reference ctx to start new fragments * and collect commands from the command queue */ /* drops the splitmux lock briefly: */ /* We must have reference ctx in order for format-location-full to * have a sample */ - start_next_fragment (splitmux, ctx); + ret = start_next_fragment (splitmux, ctx); + if (ret != GST_FLOW_OK) + return ret; + continue; } break; @@ -1354,7 +1360,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) continue; } case SPLITMUX_OUTPUT_STATE_STOPPED: - return; + return GST_FLOW_FLUSHING; } } else { GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output"); @@ -1371,6 +1377,8 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_STIME_ARGS (splitmux->max_out_running_time)); } while (1); + + return GST_FLOW_OK; } static GstClockTime @@ -1557,6 +1565,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; MqStreamBuf *buf_info = NULL; + GstFlowReturn ret = GST_FLOW_OK; GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); @@ -1667,8 +1676,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) goto beach; ctx->out_running_time = ts; if (!ctx->is_reference) - complete_or_wait_on_out (splitmux, ctx); + ret = complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_DROP; } case GST_EVENT_CAPS:{ @@ -1717,27 +1727,31 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (!locked) GST_SPLITMUX_LOCK (splitmux); if (wait) - complete_or_wait_on_out (splitmux, ctx); + ret = complete_or_wait_on_out (splitmux, ctx); GST_SPLITMUX_UNLOCK (splitmux); /* Don't try to forward sticky events before the next buffer is there * because it would cause a new file to be created without the first * buffer being available. */ + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) { gst_event_unref (event); return GST_PAD_PROBE_HANDLED; - } else + } else { return GST_PAD_PROBE_PASS; + } } /* Allow everything through until the configured next stopping point */ GST_SPLITMUX_LOCK (splitmux); buf_info = g_queue_pop_tail (&ctx->queued_bufs); - if (buf_info == NULL) + if (buf_info == NULL) { /* Can only happen due to a poorly timed flush */ + ret = GST_FLOW_FLUSHING; goto beach; + } /* If we have popped a keyframe, decrement the queued_gop count */ if (buf_info->keyframe && splitmux->queued_keyframes > 0) @@ -1753,7 +1767,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->caps_change = FALSE; - complete_or_wait_on_out (splitmux, ctx); + ret = complete_or_wait_on_out (splitmux, ctx); splitmux->muxed_out_bytes += buf_info->buf_size; @@ -1785,10 +1799,12 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) mq_stream_buf_free (buf_info); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_DROP; } @@ -1907,7 +1923,7 @@ _send_event (const GValue * value, gpointer user_data) * reaches EOS and it is time to restart * a new fragment */ -static void +static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GstElement *muxer, *sink; @@ -1928,8 +1944,9 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (splitmux->shutdown) { GST_DEBUG_OBJECT (splitmux, "Shutdown requested. Aborting fragment switch."); + GST_SPLITMUX_LOCK (splitmux); GST_SPLITMUX_STATE_UNLOCK (splitmux); - return; + return GST_FLOW_FLUSHING; } if (splitmux->async_finalize) { @@ -2032,8 +2049,24 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) splitmux->muxed_out_bytes = 0; GST_SPLITMUX_UNLOCK (splitmux); - gst_element_set_state (sink, GST_STATE_TARGET (splitmux)); - gst_element_set_state (muxer, GST_STATE_TARGET (splitmux)); + if (gst_element_set_state (sink, + GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) { + gst_element_set_state (sink, GST_STATE_NULL); + gst_element_set_locked_state (muxer, FALSE); + gst_element_set_locked_state (sink, FALSE); + + goto fail_output; + } + + if (gst_element_set_state (muxer, + GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) { + gst_element_set_state (muxer, GST_STATE_NULL); + gst_element_set_state (sink, GST_STATE_NULL); + gst_element_set_locked_state (muxer, FALSE); + gst_element_set_locked_state (sink, FALSE); + goto fail_muxer; + } + gst_element_set_locked_state (muxer, FALSE); gst_element_set_locked_state (sink, FALSE); @@ -2056,12 +2089,32 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND"); splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); - return; + return GST_FLOW_OK; fail: + GST_SPLITMUX_LOCK (splitmux); GST_SPLITMUX_STATE_UNLOCK (splitmux); GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, ("Could not create the new muxer/sink"), NULL); + return GST_FLOW_ERROR; + +fail_output: + GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, + ("Could not start new output sink"), NULL); + + GST_SPLITMUX_LOCK (splitmux); + GST_SPLITMUX_STATE_UNLOCK (splitmux); + splitmux->switching_fragment = FALSE; + return GST_FLOW_ERROR; + +fail_muxer: + GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS, + ("Could not start new muxer"), NULL); + + GST_SPLITMUX_LOCK (splitmux); + GST_SPLITMUX_STATE_UNLOCK (splitmux); + splitmux->switching_fragment = FALSE; + return GST_FLOW_ERROR; } static void @@ -2578,6 +2631,7 @@ static GstPadProbeReturn handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstSplitMuxSink *splitmux = ctx->splitmux; + GstFlowReturn ret = GST_FLOW_OK; GstBuffer *buf; MqStreamBuf *buf_info = NULL; GstClockTime ts; @@ -2612,8 +2666,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); ctx->in_eos = TRUE; - if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) { + ret = GST_FLOW_FLUSHING; goto beach; + } if (ctx->is_reference) { GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); @@ -2642,8 +2698,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) { + ret = GST_FLOW_FLUSHING; goto beach; + } rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts); GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, @@ -2688,8 +2746,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) + if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) { + ret = GST_FLOW_FLUSHING; goto beach; + } /* If this buffer has a timestamp, advance the input timestamp of the * stream */ @@ -2875,12 +2935,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); GST_SPLITMUX_UNLOCK (splitmux); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_PASS; beach: GST_SPLITMUX_UNLOCK (splitmux); if (buf_info) mq_stream_buf_free (buf_info); + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret; return GST_PAD_PROBE_PASS; } @@ -3625,9 +3687,10 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) break; } case GST_STATE_CHANGE_PAUSED_TO_READY: + case GST_STATE_CHANGE_READY_TO_READY: g_atomic_int_set (&(splitmux->split_requested), FALSE); g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE); - + /* Fall through */ case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_STATE_LOCK (splitmux); splitmux->shutdown = TRUE; @@ -3682,15 +3745,24 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) break; } + return ret; + beach: - if (transition == GST_STATE_CHANGE_NULL_TO_READY && - ret == GST_STATE_CHANGE_FAILURE) { + if (transition == GST_STATE_CHANGE_NULL_TO_READY) { /* Cleanup elements on failed transition out of NULL */ gst_splitmux_reset_elements (splitmux); GST_SPLITMUX_LOCK (splitmux); do_async_done (splitmux); GST_SPLITMUX_UNLOCK (splitmux); } + if (transition == GST_STATE_CHANGE_READY_TO_READY) { + /* READY to READY transition only happens when we're already + * in READY state, but a child element is in NULL, which + * happens when there's an error changing the state of the sink. + * We need to make sure not to fail the state transition, or + * the core won't transition us back to NULL successfully */ + ret = GST_STATE_CHANGE_SUCCESS; + } return ret; } diff --git a/tests/check/elements/splitmuxsink.c b/tests/check/elements/splitmuxsink.c index 9db9b91cf2..93c86e7486 100644 --- a/tests/check/elements/splitmuxsink.c +++ b/tests/check/elements/splitmuxsink.c @@ -433,6 +433,43 @@ GST_START_TEST (test_splitmuxsink) GST_END_TEST; +GST_START_TEST (test_splitmuxsink_clean_failure) +{ + GstMessage *msg; + GstElement *pipeline; + GstElement *sink, *fakesink; + + /* This pipeline has a small time cutoff - it should start a new file + * every GOP, ie 1 second */ + pipeline = + gst_parse_launch + ("videotestsrc horizontal-speed=2 is-live=true ! video/x-raw,width=80,height=64,framerate=5/1 ! videoconvert !" + " queue ! theoraenc keyframe-force=5 ! splitmuxsink name=splitsink " + " max-size-time=1000000 max-size-bytes=1000000 muxer=oggmux", NULL); + fail_if (pipeline == NULL); + sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink"); + fail_if (sink == NULL); + + fakesink = gst_element_factory_make ("fakesink", "fakesink-fail"); + fail_if (fakesink == NULL); + + /* Trigger an error on READY->PAUSED */ + g_object_set (fakesink, "state-error", 2, NULL); + g_object_set (sink, "sink", fakesink, NULL); + gst_object_unref (sink); + + msg = run_pipeline (pipeline); + + fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR); + gst_message_unref (msg); + + fail_unless (gst_element_set_state (pipeline, + GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); + gst_object_unref (pipeline); +} + +GST_END_TEST; + GST_START_TEST (test_splitmuxsink_multivid) { GstMessage *msg; @@ -807,6 +844,7 @@ splitmuxsink_suite (void) tcase_add_checked_fixture (tc_chain, tempdir_setup, tempdir_cleanup); tcase_add_test (tc_chain, test_splitmuxsink); + tcase_add_test (tc_chain, test_splitmuxsink_clean_failure); if (have_matroska && have_vorbis) { tcase_add_checked_fixture (tc_chain_complex, tempdir_setup,