queue, queue2: Flush internal queue on flow error

This is to fix an infinitely blocked upstream streaming thread if
* upstream has fixed-size buffer pool, some H/W decoders for example
* downstream returned flow error without releasing buffer

When the fixed-size buffer pool hits its configured max-buffers and
also downstream of queue returned flow error without releasing corresponding
buffer, upstream has no chance to run the next processing loop
because it will be blocked by acquire_buffer(), and therefore
downstream flow will not be propagated to upstream.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5023>
This commit is contained in:
Seungha Yang 2023-07-12 02:43:18 +09:00 committed by GStreamer Marge Bot
parent 87c3dffc51
commit 8252ca0ef2
4 changed files with 310 additions and 2 deletions

View file

@ -1556,7 +1556,11 @@ out_flushing:
gst_pad_pause_task (queue->srcpad); gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (ret)); "pause task, reason: %s", gst_flow_get_name (ret));
if (ret == GST_FLOW_FLUSHING) {
/* flush internal queue except for not-linked and eos
* not-linked: reconfigure event will start srcpad task
* eos: stream-start can clear eos and will start srcpad task again */
if (ret != GST_FLOW_NOT_LINKED && ret != GST_FLOW_EOS) {
gst_queue_locked_flush (queue, FALSE); gst_queue_locked_flush (queue, FALSE);
} else { } else {
GST_QUEUE_SIGNAL_DEL (queue); GST_QUEUE_SIGNAL_DEL (queue);

View file

@ -3235,7 +3235,11 @@ out_flushing:
GstFlowReturn ret = queue->srcresult; GstFlowReturn ret = queue->srcresult;
gst_pad_pause_task (queue->srcpad); gst_pad_pause_task (queue->srcpad);
if (ret == GST_FLOW_FLUSHING) {
/* flush internal queue except for not-linked and eos
* not-linked: reconfigure event will start srcpad task
* eos: stream-start can clear eos and will start srcpad task again */
if (ret != GST_FLOW_NOT_LINKED && ret != GST_FLOW_EOS) {
gst_queue2_locked_flush (queue, FALSE, FALSE); gst_queue2_locked_flush (queue, FALSE, FALSE);
} else { } else {
GST_QUEUE2_SIGNAL_DEL (queue); GST_QUEUE2_SIGNAL_DEL (queue);

View file

@ -1181,6 +1181,155 @@ GST_START_TEST (test_initial_events_nodelay)
GST_END_TEST; GST_END_TEST;
typedef struct
{
GstBuffer *buffer;
GMutex lock;
GCond cond;
gboolean blocked;
} FlushOnErrorData;
static GstPadProbeReturn
flush_on_error_block_probe (GstPad * pad, GstPadProbeInfo * info,
FlushOnErrorData * data)
{
g_mutex_lock (&data->lock);
data->blocked = TRUE;
g_cond_signal (&data->cond);
g_mutex_unlock (&data->lock);
return GST_PAD_PROBE_OK;
}
static GstPadProbeReturn
flush_on_error_probe (GstPad * pad, GstPadProbeInfo * info,
FlushOnErrorData * data)
{
if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info)))
return GST_PAD_PROBE_DROP;
g_mutex_lock (&data->lock);
data->buffer = GST_PAD_PROBE_INFO_BUFFER (info);
g_cond_signal (&data->cond);
g_mutex_unlock (&data->lock);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_ERROR;
return GST_PAD_PROBE_HANDLED;
}
static gpointer
alloc_thread (GstBufferPool * pool)
{
GstFlowReturn ret;
GstBuffer *buf;
/* This call will be blocked */
ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL);
fail_unless (ret == GST_FLOW_OK);
gst_buffer_unref (buf);
return NULL;
}
GST_START_TEST (test_flush_on_error)
{
GstElement *elem;
GstPad *sinkpad;
GstPad *srcpad;
GstSegment segment;
GstCaps *caps;
gboolean ret;
gulong block_id;
FlushOnErrorData data;
GstBufferPool *pool;
GstStructure *config;
GstBuffer *buf;
GstFlowReturn flow_ret;
GThread *thread;
data.buffer = NULL;
data.blocked = FALSE;
g_mutex_init (&data.lock);
g_cond_init (&data.cond);
/* Setup bufferpool with max-buffers 2 */
caps = gst_caps_new_empty_simple ("foo/x-bar");
pool = gst_buffer_pool_new ();
config = gst_buffer_pool_get_config (pool);
gst_buffer_pool_config_set_params (config, caps, 4, 0, 2);
gst_buffer_pool_set_config (pool, config);
gst_buffer_pool_set_active (pool, TRUE);
elem = gst_element_factory_make ("queue", NULL);
gst_object_ref_sink (elem);
sinkpad = gst_element_get_static_pad (elem, "sink");
srcpad = gst_element_get_static_pad (elem, "src");
block_id = gst_pad_add_probe (srcpad,
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) flush_on_error_block_probe, &data, NULL);
gst_pad_add_probe (srcpad,
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) flush_on_error_probe, &data, NULL);
fail_unless (gst_element_set_state (elem,
GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
"could not set to playing");
ret = gst_pad_send_event (sinkpad,
gst_event_new_stream_start ("test-stream-start"));
fail_unless (ret);
ret = gst_pad_send_event (sinkpad, gst_event_new_caps (caps));
gst_caps_unref (caps);
fail_unless (ret);
gst_segment_init (&segment, GST_FORMAT_TIME);
ret = gst_pad_send_event (sinkpad, gst_event_new_segment (&segment));
fail_unless (ret);
flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL);
fail_unless (flow_ret == GST_FLOW_OK);
GST_BUFFER_PTS (buf) = 0;
flow_ret = gst_pad_chain (sinkpad, buf);
fail_unless (flow_ret == GST_FLOW_OK);
flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL);
fail_unless (flow_ret == GST_FLOW_OK);
GST_BUFFER_PTS (buf) = GST_SECOND;
flow_ret = gst_pad_chain (sinkpad, buf);
fail_unless (flow_ret == GST_FLOW_OK);
/* Acquire buffer from other thread. The acquire_buffer() will be blocked
* due to max-buffers 2 */
thread = g_thread_new (NULL, (GThreadFunc) alloc_thread, pool);
g_mutex_lock (&data.lock);
while (!data.blocked)
g_cond_wait (&data.cond, &data.lock);
g_mutex_unlock (&data.lock);
gst_pad_remove_probe (srcpad, block_id);
/* Then now acquire thread can be unblocked since queue will flush
* internal queue on flow error */
g_thread_join (thread);
gst_element_set_state (elem, GST_STATE_NULL);
gst_clear_buffer (&data.buffer);
gst_buffer_pool_set_active (pool, FALSE);
gst_object_unref (pool);
gst_object_unref (sinkpad);
gst_object_unref (srcpad);
gst_object_unref (elem);
g_mutex_clear (&data.lock);
g_cond_clear (&data.cond);
}
GST_END_TEST;
static Suite * static Suite *
queue_suite (void) queue_suite (void)
{ {
@ -1204,6 +1353,7 @@ queue_suite (void)
tcase_add_test (tc_chain, test_sticky_not_linked); tcase_add_test (tc_chain, test_sticky_not_linked);
tcase_add_test (tc_chain, test_time_level_buffer_list); tcase_add_test (tc_chain, test_time_level_buffer_list);
tcase_add_test (tc_chain, test_initial_events_nodelay); tcase_add_test (tc_chain, test_initial_events_nodelay);
tcase_add_test (tc_chain, test_flush_on_error);
return s; return s;
} }

View file

@ -715,6 +715,155 @@ GST_START_TEST (test_ready_paused_buffering_message)
GST_END_TEST; GST_END_TEST;
typedef struct
{
GstBuffer *buffer;
GMutex lock;
GCond cond;
gboolean blocked;
} FlushOnErrorData;
static GstPadProbeReturn
flush_on_error_block_probe (GstPad * pad, GstPadProbeInfo * info,
FlushOnErrorData * data)
{
g_mutex_lock (&data->lock);
data->blocked = TRUE;
g_cond_signal (&data->cond);
g_mutex_unlock (&data->lock);
return GST_PAD_PROBE_OK;
}
static GstPadProbeReturn
flush_on_error_probe (GstPad * pad, GstPadProbeInfo * info,
FlushOnErrorData * data)
{
if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info)))
return GST_PAD_PROBE_DROP;
g_mutex_lock (&data->lock);
data->buffer = GST_PAD_PROBE_INFO_BUFFER (info);
g_cond_signal (&data->cond);
g_mutex_unlock (&data->lock);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_ERROR;
return GST_PAD_PROBE_HANDLED;
}
static gpointer
alloc_thread (GstBufferPool * pool)
{
GstFlowReturn ret;
GstBuffer *buf;
/* This call will be blocked */
ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL);
fail_unless (ret == GST_FLOW_OK);
gst_buffer_unref (buf);
return NULL;
}
GST_START_TEST (test_flush_on_error)
{
GstElement *elem;
GstPad *sinkpad;
GstPad *srcpad;
GstSegment segment;
GstCaps *caps;
gboolean ret;
gulong block_id;
FlushOnErrorData data;
GstBufferPool *pool;
GstStructure *config;
GstBuffer *buf;
GstFlowReturn flow_ret;
GThread *thread;
data.buffer = NULL;
data.blocked = FALSE;
g_mutex_init (&data.lock);
g_cond_init (&data.cond);
/* Setup bufferpool with max-buffers 2 */
caps = gst_caps_new_empty_simple ("foo/x-bar");
pool = gst_buffer_pool_new ();
config = gst_buffer_pool_get_config (pool);
gst_buffer_pool_config_set_params (config, caps, 4, 0, 2);
gst_buffer_pool_set_config (pool, config);
gst_buffer_pool_set_active (pool, TRUE);
elem = gst_element_factory_make ("queue2", NULL);
gst_object_ref_sink (elem);
sinkpad = gst_element_get_static_pad (elem, "sink");
srcpad = gst_element_get_static_pad (elem, "src");
block_id = gst_pad_add_probe (srcpad,
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) flush_on_error_block_probe, &data, NULL);
gst_pad_add_probe (srcpad,
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) flush_on_error_probe, &data, NULL);
fail_unless (gst_element_set_state (elem,
GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
"could not set to playing");
ret = gst_pad_send_event (sinkpad,
gst_event_new_stream_start ("test-stream-start"));
fail_unless (ret);
ret = gst_pad_send_event (sinkpad, gst_event_new_caps (caps));
gst_caps_unref (caps);
fail_unless (ret);
gst_segment_init (&segment, GST_FORMAT_TIME);
ret = gst_pad_send_event (sinkpad, gst_event_new_segment (&segment));
fail_unless (ret);
flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL);
fail_unless (flow_ret == GST_FLOW_OK);
GST_BUFFER_PTS (buf) = 0;
flow_ret = gst_pad_chain (sinkpad, buf);
fail_unless (flow_ret == GST_FLOW_OK);
flow_ret = gst_buffer_pool_acquire_buffer (pool, &buf, NULL);
fail_unless (flow_ret == GST_FLOW_OK);
GST_BUFFER_PTS (buf) = GST_SECOND;
flow_ret = gst_pad_chain (sinkpad, buf);
fail_unless (flow_ret == GST_FLOW_OK);
/* Acquire buffer from other thread. The acquire_buffer() will be blocked
* due to max-buffers 2 */
thread = g_thread_new (NULL, (GThreadFunc) alloc_thread, pool);
g_mutex_lock (&data.lock);
while (!data.blocked)
g_cond_wait (&data.cond, &data.lock);
g_mutex_unlock (&data.lock);
gst_pad_remove_probe (srcpad, block_id);
/* Then now acquire thread can be unblocked since queue will flush
* internal queue on flow error */
g_thread_join (thread);
gst_element_set_state (elem, GST_STATE_NULL);
gst_clear_buffer (&data.buffer);
gst_buffer_pool_set_active (pool, FALSE);
gst_object_unref (pool);
gst_object_unref (sinkpad);
gst_object_unref (srcpad);
gst_object_unref (elem);
g_mutex_clear (&data.lock);
g_cond_clear (&data.cond);
}
GST_END_TEST;
static Suite * static Suite *
queue2_suite (void) queue2_suite (void)
{ {
@ -733,6 +882,7 @@ queue2_suite (void)
tcase_add_test (tc_chain, test_small_ring_buffer); tcase_add_test (tc_chain, test_small_ring_buffer);
tcase_add_test (tc_chain, test_bitrate_query); tcase_add_test (tc_chain, test_bitrate_query);
tcase_add_test (tc_chain, test_ready_paused_buffering_message); tcase_add_test (tc_chain, test_ready_paused_buffering_message);
tcase_add_test (tc_chain, test_flush_on_error);
return s; return s;
} }