From a89f33c86c8ba6731a8177721de90b6e07dbc25c Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Sat, 2 Dec 2023 00:32:31 +0900 Subject: [PATCH] appsrc: Fix flow return when buffer is dropped Flow EOS on buffer drop (upstream leaky mode) was not intended behavior. Appsrc should return OK instead. Part-of: --- .../gst-libs/gst/app/gstappsrc.c | 2 +- .../tests/check/elements/appsrc.c | 46 +++++++++++++------ 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c b/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c index 913fd40af5..8577f61a6c 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c +++ b/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c @@ -2732,7 +2732,7 @@ dropped: gst_buffer_unref (buffer); } g_mutex_unlock (&priv->mutex); - return GST_FLOW_EOS; + return GST_FLOW_OK; } } diff --git a/subprojects/gst-plugins-base/tests/check/elements/appsrc.c b/subprojects/gst-plugins-base/tests/check/elements/appsrc.c index cb306fb36e..a19b0f9f71 100644 --- a/subprojects/gst-plugins-base/tests/check/elements/appsrc.c +++ b/subprojects/gst-plugins-base/tests/check/elements/appsrc.c @@ -1075,6 +1075,7 @@ GST_START_TEST (test_appsrc_limits) GstBuffer *buffer; gulong probe_id; guint64 current_level; + GstFlowReturn ret; /* Test if the bytes limit works correctly with both leaky types */ h = gst_harness_new ("appsrc"); @@ -1096,7 +1097,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 0 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* wait until the appsrc is blocked downstream */ while (!gst_pad_is_blocking (srcpad)) @@ -1104,10 +1106,12 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 1 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 2 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The first buffer is not queued anymore but inside the pad probe */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1119,7 +1123,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The new buffer was dropped now, otherwise we would have 2 seconds queued */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1133,7 +1138,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1191,7 +1197,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 0 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* wait until the appsrc is blocked downstream */ while (!gst_pad_is_blocking (srcpad)) @@ -1199,10 +1206,12 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 1 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 2 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The first buffer is not queued anymore but inside the pad probe */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1214,7 +1223,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The new buffer was dropped now, otherwise we would have 2 seconds queued */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1228,7 +1238,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1287,7 +1298,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 0 * GST_SECOND; GST_BUFFER_DURATION (buffer) = GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* wait until the appsrc is blocked downstream */ while (!gst_pad_is_blocking (srcpad)) @@ -1296,11 +1308,13 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 1 * GST_SECOND; GST_BUFFER_DURATION (buffer) = GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 2 * GST_SECOND; GST_BUFFER_DURATION (buffer) = GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The first buffer is not queued anymore but inside the pad probe */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1313,7 +1327,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; GST_BUFFER_DURATION (buffer) = GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The new buffer was dropped now, otherwise we would have more than 2 seconds queued */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL); @@ -1328,7 +1343,8 @@ GST_START_TEST (test_appsrc_limits) buffer = gst_buffer_new_and_alloc (100); GST_BUFFER_PTS (buffer) = 4 * GST_SECOND; GST_BUFFER_DURATION (buffer) = GST_SECOND; - gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer); + fail_unless_equals_int (ret, GST_FLOW_OK); /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);