appsrc: Fix flow return when buffer is dropped

Flow EOS on buffer drop (upstream leaky mode) was not
intended behavior. Appsrc should return OK instead.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5755>
This commit is contained in:
Seungha Yang 2023-12-02 00:32:31 +09:00 committed by Tim-Philipp Müller
parent 4356d4262e
commit a89f33c86c
2 changed files with 32 additions and 16 deletions

View file

@ -2732,7 +2732,7 @@ dropped:
gst_buffer_unref (buffer);
}
g_mutex_unlock (&priv->mutex);
return GST_FLOW_EOS;
return GST_FLOW_OK;
}
}

View file

@ -1075,6 +1075,7 @@ GST_START_TEST (test_appsrc_limits)
GstBuffer *buffer;
gulong probe_id;
guint64 current_level;
GstFlowReturn ret;
/* Test if the bytes limit works correctly with both leaky types */
h = gst_harness_new ("appsrc");
@ -1096,7 +1097,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* wait until the appsrc is blocked downstream */
while (!gst_pad_is_blocking (srcpad))
@ -1104,10 +1106,12 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The first buffer is not queued anymore but inside the pad probe */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1119,7 +1123,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The new buffer was dropped now, otherwise we would have 2 seconds queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1133,7 +1138,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1191,7 +1197,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* wait until the appsrc is blocked downstream */
while (!gst_pad_is_blocking (srcpad))
@ -1199,10 +1206,12 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The first buffer is not queued anymore but inside the pad probe */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1214,7 +1223,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The new buffer was dropped now, otherwise we would have 2 seconds queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1228,7 +1238,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1287,7 +1298,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* wait until the appsrc is blocked downstream */
while (!gst_pad_is_blocking (srcpad))
@ -1296,11 +1308,13 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The first buffer is not queued anymore but inside the pad probe */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1313,7 +1327,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The new buffer was dropped now, otherwise we would have more than 2 seconds queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);
@ -1328,7 +1343,8 @@ GST_START_TEST (test_appsrc_limits)
buffer = gst_buffer_new_and_alloc (100);
GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
GST_BUFFER_DURATION (buffer) = GST_SECOND;
gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
ret = gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
fail_unless_equals_int (ret, GST_FLOW_OK);
/* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
g_object_get (h->element, "current-level-bytes", &current_level, NULL);