appsink: Fix race condition on caps handling

Background:

Whenever a caps event is received by appsink, the caps are stored in the
same internal queue as buffers. Only when enough buffers have been
popped from the queue to reach the caps, `priv->sample` gets its caps
updated to match, so that they are correct for the following buffers.

Note that as far as upstream elements are concerned, the caps of appsink
are updated immediately when the CAPS event is sent. Samples pulled from
appsink retain the old caps until a later buffer -- one that was sent by
upstream elements after the new caps -- is pulled.

The race condition:

When a flush is received, appsink clears the entire internal queue. The
caps of `priv->sample` are not updated as part of this process, and
instead remain as those of the sample that was last pulled by the user.

This leaves open a race condition where:
1. Upstream sends a new caps event, and possibly some buffers for the
   new caps.
2. Upstream sends a flush (possibly from a different thread).
3. Upstream sends a new buffer for the new caps. Since as far as
   upstream is concerned, appsink caps are the new caps already, no new
   CAPS event is sent.
4. The appsink user pulls a sample, having not pulled before enough
   samples to reach the buffers sent in step 1.

Bug: the pulled sample has the old caps instead of the new caps.

Fixing the race condition:

To avoid this problem, when a buffer is received after a flush,
`priv->sample`'s caps should be updated with the current caps before the
buffer is added to the internal queue.

Interestingly, before this patch, appsink already had code for this, in
gst_app_sink_render_common():

    /* queue holding caps event might have been FLUSHed,
     * but caps state still present in pad caps */
    if (G_UNLIKELY (!priv->last_caps &&
            gst_pad_has_current_caps (GST_BASE_SINK_PAD (psink)))) {
      priv->last_caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (psink));
      gst_sample_set_caps (priv->sample, priv->last_caps);
      GST_DEBUG_OBJECT (appsink, "activating pad caps %" GST_PTR_FORMAT,
          priv->last_caps);
    }

This code assumes `priv->last_caps` is reset when a flush is received,
which makes sense, but unfortunately, there was no code in the flush
code path resetting it.

This patch adds such code, therefore fixing the race condition. A unit
test demonstrating the bug and testing its behavior with the fix has
also been added.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2442>
This commit is contained in:
Alicia Boya García 2022-05-13 13:31:55 +02:00 committed by Tim-Philipp Müller
parent 9132a1a3c8
commit 9b0fad5f0c
2 changed files with 83 additions and 0 deletions

View file

@ -731,6 +731,7 @@ gst_app_sink_flush_unlocked (GstAppSink * appsink)
gst_mini_object_unref (obj); gst_mini_object_unref (obj);
priv->num_buffers = 0; priv->num_buffers = 0;
priv->num_events = 0; priv->num_events = 0;
gst_caps_replace (&priv->last_caps, NULL);
g_cond_signal (&priv->cond); g_cond_signal (&priv->cond);
} }

View file

@ -987,6 +987,87 @@ GST_START_TEST (test_reverse_stepping)
GST_END_TEST; GST_END_TEST;
static void
push_caps_with_type (gint caps_type)
{
GstCaps *caps;
caps =
gst_caps_new_simple ("application/x-gst-check", "type", G_TYPE_INT,
caps_type, NULL);
fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_caps (caps)));
gst_caps_unref (caps);
}
static void
push_buffer_with_number (gint buffer_number)
{
GstBuffer *buffer;
buffer = gst_buffer_new_and_alloc (sizeof (gint));
gst_buffer_fill (buffer, 0, &buffer_number, sizeof (gint));
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
static void
pull_and_check_sample (GstElement * appsink, gint expected_buffer_number,
gint expected_caps_type)
{
GstSample *sample;
GstCaps *caps;
GstBuffer *buffer;
GstStructure *structure;
gint actual_caps_type;
sample = gst_app_sink_pull_sample (GST_APP_SINK (appsink));
caps = gst_sample_get_caps (sample);
fail_unless (structure = gst_caps_get_structure (caps, 0));
fail_unless (gst_structure_get_int (structure, "type", &actual_caps_type));
assert_equals_int (actual_caps_type, expected_caps_type);
buffer = gst_sample_get_buffer (sample);
gst_check_buffer_data (buffer, &expected_buffer_number, sizeof (gint));
gst_sample_unref (sample);
}
GST_START_TEST (test_caps_before_flush_race_condition)
{
GstElement *sink;
GstSegment segment;
sink = setup_appsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
// Push a series of buffers, and at the end, a new caps event.
push_caps_with_type (1);
push_buffer_with_number (10);
push_buffer_with_number (11);
push_caps_with_type (2);
pull_and_check_sample (sink, 10, 1);
// Then, let a flush happen.
fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_start ()));
fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_stop (TRUE)));
// Sinks downgrade state to PAUSED after a flush, let's up it to PLAYING again to avoid gst_pad_push becoming blocking.
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
// A segment must be sent after a flush.
gst_segment_init (&segment, GST_FORMAT_TIME);
fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment)));
// Send a buffer now, and check that when pulled by the appsink user, it didn't come with the wrong old caps.
push_buffer_with_number (20);
pull_and_check_sample (sink, 20, 2);
cleanup_appsink (sink);
}
GST_END_TEST;
static Suite * static Suite *
appsink_suite (void) appsink_suite (void)
{ {
@ -1012,6 +1093,7 @@ appsink_suite (void)
tcase_add_test (tc_chain, test_event_signals); tcase_add_test (tc_chain, test_event_signals);
tcase_add_test (tc_chain, test_event_paused); tcase_add_test (tc_chain, test_event_paused);
tcase_add_test (tc_chain, test_reverse_stepping); tcase_add_test (tc_chain, test_reverse_stepping);
tcase_add_test (tc_chain, test_caps_before_flush_race_condition);
return s; return s;
} }