From 8252ca0ef2f9e43070f791f6f9461b6146f529cc Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Wed, 12 Jul 2023 02:43:18 +0900 Subject: [PATCH] queue, queue2: Flush internal queue on flow error This is to fix an infinitely blocked upstream streaming thread if * upstream has fixed-size buffer pool, some H/W decoders for example * downstream returned flow error without releasing buffer When the fixed-size buffer pool hits its configured max-buffers and also downstream of queue returned flow error without releasing corresponding buffer, upstream has no chance to run the next processing loop because it will be blocked by acquire_buffer(), and therefore downstream flow will not be propagated to upstream. Part-of: --- .../gstreamer/plugins/elements/gstqueue.c | 6 +- .../gstreamer/plugins/elements/gstqueue2.c | 6 +- .../gstreamer/tests/check/elements/queue.c | 150 ++++++++++++++++++ .../gstreamer/tests/check/elements/queue2.c | 150 ++++++++++++++++++ 4 files changed, 310 insertions(+), 2 deletions(-) diff --git a/subprojects/gstreamer/plugins/elements/gstqueue.c b/subprojects/gstreamer/plugins/elements/gstqueue.c index 6955d63cf5..a154c7842a 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue.c +++ b/subprojects/gstreamer/plugins/elements/gstqueue.c @@ -1556,7 +1556,11 @@ out_flushing: gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pause task, reason: %s", gst_flow_get_name (ret)); - if (ret == GST_FLOW_FLUSHING) { + + /* flush internal queue except for not-linked and eos + * not-linked: reconfigure event will start srcpad task + * eos: stream-start can clear eos and will start srcpad task again */ + if (ret != GST_FLOW_NOT_LINKED && ret != GST_FLOW_EOS) { gst_queue_locked_flush (queue, FALSE); } else { GST_QUEUE_SIGNAL_DEL (queue); diff --git a/subprojects/gstreamer/plugins/elements/gstqueue2.c b/subprojects/gstreamer/plugins/elements/gstqueue2.c index 50068ab8c7..19488e664c 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue2.c +++ b/subprojects/gstreamer/plugins/elements/gstqueue2.c @@ -3235,7 +3235,11 @@ out_flushing: GstFlowReturn ret = queue->srcresult; gst_pad_pause_task (queue->srcpad); - if (ret == GST_FLOW_FLUSHING) { + + /* flush internal queue except for not-linked and eos + * not-linked: reconfigure event will start srcpad task + * eos: stream-start can clear eos and will start srcpad task again */ + if (ret != GST_FLOW_NOT_LINKED && ret != GST_FLOW_EOS) { gst_queue2_locked_flush (queue, FALSE, FALSE); } else { GST_QUEUE2_SIGNAL_DEL (queue); diff --git a/subprojects/gstreamer/tests/check/elements/queue.c b/subprojects/gstreamer/tests/check/elements/queue.c index 9663a68639..c2bba73d91 100644 --- a/subprojects/gstreamer/tests/check/elements/queue.c +++ b/subprojects/gstreamer/tests/check/elements/queue.c @@ -1181,6 +1181,155 @@ GST_START_TEST (test_initial_events_nodelay) GST_END_TEST; +typedef struct +{ + GstBuffer *buffer; + GMutex lock; + GCond cond; + gboolean blocked; +} FlushOnErrorData; + +static GstPadProbeReturn +flush_on_error_block_probe (GstPad * pad, GstPadProbeInfo * info, + FlushOnErrorData * data) +{ + g_mutex_lock (&data->lock); + data->blocked = TRUE; + g_cond_signal (&data->cond); + g_mutex_unlock (&data->lock); + + return GST_PAD_PROBE_OK; +} + +static GstPadProbeReturn +flush_on_error_probe (GstPad * pad, GstPadProbeInfo * info, + FlushOnErrorData * data) +{ + if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) + return GST_PAD_PROBE_DROP; + + g_mutex_lock (&data->lock); + data->buffer = GST_PAD_PROBE_INFO_BUFFER (info); + g_cond_signal (&data->cond); + g_mutex_unlock (&data->lock); + + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_ERROR; + + return GST_PAD_PROBE_HANDLED; +} + +static gpointer +alloc_thread (GstBufferPool * pool) +{ + GstFlowReturn ret; + GstBuffer *buf; + + /* This call will be blocked */ + ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL); + fail_unless (ret == GST_FLOW_OK); + + gst_buffer_unref (buf); + + return NULL; +} + +GST_START_TEST (test_flush_on_error) +{ + GstElement *elem; + GstPad *sinkpad; + GstPad *srcpad; + GstSegment segment; + GstCaps *caps; + gboolean ret; + gulong block_id; + FlushOnErrorData data; + GstBufferPool *pool; + GstStructure *config; + GstBuffer *buf; + GstFlowReturn flow_ret; + GThread *thread; + + data.buffer = NULL; + data.blocked = FALSE; + g_mutex_init (&data.lock); + g_cond_init (&data.cond); + + /* Setup bufferpool with max-buffers 2 */ + caps = gst_caps_new_empty_simple ("foo/x-bar"); + pool = gst_buffer_pool_new (); + config = gst_buffer_pool_get_config (pool); + gst_buffer_pool_config_set_params (config, caps, 4, 0, 2); + gst_buffer_pool_set_config (pool, config); + gst_buffer_pool_set_active (pool, TRUE); + + elem = gst_element_factory_make ("queue", NULL); + gst_object_ref_sink (elem); + sinkpad = gst_element_get_static_pad (elem, "sink"); + srcpad = gst_element_get_static_pad (elem, "src"); + + block_id = gst_pad_add_probe (srcpad, + GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) flush_on_error_block_probe, &data, NULL); + gst_pad_add_probe (srcpad, + GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) flush_on_error_probe, &data, NULL); + + fail_unless (gst_element_set_state (elem, + GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS, + "could not set to playing"); + + ret = gst_pad_send_event (sinkpad, + gst_event_new_stream_start ("test-stream-start")); + fail_unless (ret); + + ret = gst_pad_send_event (sinkpad, gst_event_new_caps (caps)); + gst_caps_unref (caps); + fail_unless (ret); + + gst_segment_init (&segment, GST_FORMAT_TIME); + ret = gst_pad_send_event (sinkpad, gst_event_new_segment (&segment)); + fail_unless (ret); + + flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL); + fail_unless (flow_ret == GST_FLOW_OK); + GST_BUFFER_PTS (buf) = 0; + flow_ret = gst_pad_chain (sinkpad, buf); + fail_unless (flow_ret == GST_FLOW_OK); + + flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL); + fail_unless (flow_ret == GST_FLOW_OK); + GST_BUFFER_PTS (buf) = GST_SECOND; + flow_ret = gst_pad_chain (sinkpad, buf); + fail_unless (flow_ret == GST_FLOW_OK); + + /* Acquire buffer from other thread. The acquire_buffer() will be blocked + * due to max-buffers 2 */ + thread = g_thread_new (NULL, (GThreadFunc) alloc_thread, pool); + + g_mutex_lock (&data.lock); + while (!data.blocked) + g_cond_wait (&data.cond, &data.lock); + g_mutex_unlock (&data.lock); + + gst_pad_remove_probe (srcpad, block_id); + + /* Then now acquire thread can be unblocked since queue will flush + * internal queue on flow error */ + g_thread_join (thread); + + gst_element_set_state (elem, GST_STATE_NULL); + gst_clear_buffer (&data.buffer); + gst_buffer_pool_set_active (pool, FALSE); + gst_object_unref (pool); + gst_object_unref (sinkpad); + gst_object_unref (srcpad); + gst_object_unref (elem); + g_mutex_clear (&data.lock); + g_cond_clear (&data.cond); +} + +GST_END_TEST; + static Suite * queue_suite (void) { @@ -1204,6 +1353,7 @@ queue_suite (void) tcase_add_test (tc_chain, test_sticky_not_linked); tcase_add_test (tc_chain, test_time_level_buffer_list); tcase_add_test (tc_chain, test_initial_events_nodelay); + tcase_add_test (tc_chain, test_flush_on_error); return s; } diff --git a/subprojects/gstreamer/tests/check/elements/queue2.c b/subprojects/gstreamer/tests/check/elements/queue2.c index c27e973073..e05ec35ddd 100644 --- a/subprojects/gstreamer/tests/check/elements/queue2.c +++ b/subprojects/gstreamer/tests/check/elements/queue2.c @@ -715,6 +715,155 @@ GST_START_TEST (test_ready_paused_buffering_message) GST_END_TEST; +typedef struct +{ + GstBuffer *buffer; + GMutex lock; + GCond cond; + gboolean blocked; +} FlushOnErrorData; + +static GstPadProbeReturn +flush_on_error_block_probe (GstPad * pad, GstPadProbeInfo * info, + FlushOnErrorData * data) +{ + g_mutex_lock (&data->lock); + data->blocked = TRUE; + g_cond_signal (&data->cond); + g_mutex_unlock (&data->lock); + + return GST_PAD_PROBE_OK; +} + +static GstPadProbeReturn +flush_on_error_probe (GstPad * pad, GstPadProbeInfo * info, + FlushOnErrorData * data) +{ + if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) + return GST_PAD_PROBE_DROP; + + g_mutex_lock (&data->lock); + data->buffer = GST_PAD_PROBE_INFO_BUFFER (info); + g_cond_signal (&data->cond); + g_mutex_unlock (&data->lock); + + GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_ERROR; + + return GST_PAD_PROBE_HANDLED; +} + +static gpointer +alloc_thread (GstBufferPool * pool) +{ + GstFlowReturn ret; + GstBuffer *buf; + + /* This call will be blocked */ + ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL); + fail_unless (ret == GST_FLOW_OK); + + gst_buffer_unref (buf); + + return NULL; +} + +GST_START_TEST (test_flush_on_error) +{ + GstElement *elem; + GstPad *sinkpad; + GstPad *srcpad; + GstSegment segment; + GstCaps *caps; + gboolean ret; + gulong block_id; + FlushOnErrorData data; + GstBufferPool *pool; + GstStructure *config; + GstBuffer *buf; + GstFlowReturn flow_ret; + GThread *thread; + + data.buffer = NULL; + data.blocked = FALSE; + g_mutex_init (&data.lock); + g_cond_init (&data.cond); + + /* Setup bufferpool with max-buffers 2 */ + caps = gst_caps_new_empty_simple ("foo/x-bar"); + pool = gst_buffer_pool_new (); + config = gst_buffer_pool_get_config (pool); + gst_buffer_pool_config_set_params (config, caps, 4, 0, 2); + gst_buffer_pool_set_config (pool, config); + gst_buffer_pool_set_active (pool, TRUE); + + elem = gst_element_factory_make ("queue2", NULL); + gst_object_ref_sink (elem); + sinkpad = gst_element_get_static_pad (elem, "sink"); + srcpad = gst_element_get_static_pad (elem, "src"); + + block_id = gst_pad_add_probe (srcpad, + GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) flush_on_error_block_probe, &data, NULL); + gst_pad_add_probe (srcpad, + GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) flush_on_error_probe, &data, NULL); + + fail_unless (gst_element_set_state (elem, + GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS, + "could not set to playing"); + + ret = gst_pad_send_event (sinkpad, + gst_event_new_stream_start ("test-stream-start")); + fail_unless (ret); + + ret = gst_pad_send_event (sinkpad, gst_event_new_caps (caps)); + gst_caps_unref (caps); + fail_unless (ret); + + gst_segment_init (&segment, GST_FORMAT_TIME); + ret = gst_pad_send_event (sinkpad, gst_event_new_segment (&segment)); + fail_unless (ret); + + flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL); + fail_unless (flow_ret == GST_FLOW_OK); + GST_BUFFER_PTS (buf) = 0; + flow_ret = gst_pad_chain (sinkpad, buf); + fail_unless (flow_ret == GST_FLOW_OK); + + flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL); + fail_unless (flow_ret == GST_FLOW_OK); + GST_BUFFER_PTS (buf) = GST_SECOND; + flow_ret = gst_pad_chain (sinkpad, buf); + fail_unless (flow_ret == GST_FLOW_OK); + + /* Acquire buffer from other thread. The acquire_buffer() will be blocked + * due to max-buffers 2 */ + thread = g_thread_new (NULL, (GThreadFunc) alloc_thread, pool); + + g_mutex_lock (&data.lock); + while (!data.blocked) + g_cond_wait (&data.cond, &data.lock); + g_mutex_unlock (&data.lock); + + gst_pad_remove_probe (srcpad, block_id); + + /* Then now acquire thread can be unblocked since queue will flush + * internal queue on flow error */ + g_thread_join (thread); + + gst_element_set_state (elem, GST_STATE_NULL); + gst_clear_buffer (&data.buffer); + gst_buffer_pool_set_active (pool, FALSE); + gst_object_unref (pool); + gst_object_unref (sinkpad); + gst_object_unref (srcpad); + gst_object_unref (elem); + g_mutex_clear (&data.lock); + g_cond_clear (&data.cond); +} + +GST_END_TEST; + static Suite * queue2_suite (void) { @@ -733,6 +882,7 @@ queue2_suite (void) tcase_add_test (tc_chain, test_small_ring_buffer); tcase_add_test (tc_chain, test_bitrate_query); tcase_add_test (tc_chain, test_ready_paused_buffering_message); + tcase_add_test (tc_chain, test_flush_on_error); return s; }