rtpbin: fix shutdown crash in rtpbin

The key is to make sure the jitterbuffer is set to NULL *before* the
ptdemux.

The race that existed would basically happen when ptdemux had reached
READY, and the jitterbuffer would then push a buffer, triggering a new
pad with a new payloadtype being added and ghosted to the rtpbin itself.

However, the srcpad of the ptdemux would now be inactive, and all the
sticky-event pushed on it would be swallowed, not allowing any to reach
the ghost-pad. Then the buffer in-flight would come to the ghostpad,
and we would assert that a buffer arrived before the necessary
events.

By simply re-ordering the state-changes, we ensure that there will be
no buffer racing into the ptdemux while its state is being changed,
and the problem disappears completely.

Notice also that there is not point in disconnecting the signals on the
ptdemux before this point, since we need the push-thread to settle
down before we can do this in a non-racy way.
This commit is contained in:
Havard Graff 2019-12-19 23:48:09 +01:00 committed by GStreamer Merge Bot
parent 4155c59cc4
commit 8b96d8ee8d
2 changed files with 74 additions and 12 deletions

View file

@ -1859,10 +1859,19 @@ free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
GST_DEBUG_OBJECT (bin, "freeing stream %p", stream); GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
gst_element_set_locked_state (stream->buffer, TRUE);
if (stream->demux)
gst_element_set_locked_state (stream->demux, TRUE);
gst_element_set_state (stream->buffer, GST_STATE_NULL);
if (stream->demux)
gst_element_set_state (stream->demux, GST_STATE_NULL);
if (stream->demux) { if (stream->demux) {
g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig); g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig); g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig); g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
} }
if (stream->buffer_handlesync_sig) if (stream->buffer_handlesync_sig)
@ -1873,18 +1882,6 @@ free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig); g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
gst_object_unref (stream->buffer); gst_object_unref (stream->buffer);
if (stream->demux)
gst_element_set_locked_state (stream->demux, TRUE);
if (stream->demux)
gst_element_set_state (stream->demux, GST_STATE_NULL);
/* now remove this signal, we need this while going to NULL because it to
* do some cleanups */
if (stream->demux)
g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
if (stream->demux) if (stream->demux)
gst_bin_remove (GST_BIN_CAST (bin), stream->demux); gst_bin_remove (GST_BIN_CAST (bin), stream->demux);

View file

@ -22,6 +22,7 @@
#include <gst/check/gstcheck.h> #include <gst/check/gstcheck.h>
#include <gst/check/gsttestclock.h> #include <gst/check/gsttestclock.h>
#include <gst/check/gstharness.h>
#include <gst/rtp/gstrtpbuffer.h> #include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h> #include <gst/rtp/gstrtcpbuffer.h>
@ -902,6 +903,69 @@ GST_START_TEST (test_sender_eos)
GST_END_TEST; GST_END_TEST;
static GstBuffer *
generate_rtp_buffer (GstClockTime ts,
guint seqnum, guint32 rtp_ts, guint pt, guint ssrc)
{
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
GstBuffer *buf = gst_rtp_buffer_new_allocate (0, 0, 0);
GST_BUFFER_PTS (buf) = ts;
GST_BUFFER_DTS (buf) = ts;
gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
gst_rtp_buffer_set_payload_type (&rtp, pt);
gst_rtp_buffer_set_seq (&rtp, seqnum);
gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
gst_rtp_buffer_set_ssrc (&rtp, ssrc);
gst_rtp_buffer_unmap (&rtp);
return buf;
}
static GstCaps *
_request_pt_map (G_GNUC_UNUSED GstElement * rtpbin,
G_GNUC_UNUSED guint session_id, G_GNUC_UNUSED guint pt,
const GstCaps * caps)
{
return gst_caps_copy (caps);
}
static void
_pad_added (G_GNUC_UNUSED GstElement * rtpbin, GstPad * pad, GstHarness * h)
{
gst_harness_add_element_src_pad (h, pad);
}
GST_START_TEST (test_quick_shutdown)
{
for (guint r = 0; r < 1000; r++) {
guint i;
GstHarness *h = gst_harness_new_with_padnames ("rtpbin",
"recv_rtp_sink_0", NULL);
GstCaps *caps = gst_caps_new_simple ("application/x-rtp",
"clock-rate", G_TYPE_INT, 8000,
"payload", G_TYPE_INT, 100, NULL);
g_signal_connect (h->element, "request-pt-map",
G_CALLBACK (_request_pt_map), caps);
g_signal_connect (h->element, "pad-added", G_CALLBACK (_pad_added), h);
gst_harness_set_src_caps (h, gst_caps_copy (caps));
for (i = 0; i < 50; i++) {
gst_harness_push (h,
generate_rtp_buffer (i * GST_MSECOND * 20, i, i * 160, 100, 1234));
}
gst_harness_crank_single_clock_wait (h);
gst_caps_unref (caps);
gst_harness_teardown (h);
}
}
GST_END_TEST;
static Suite * static Suite *
rtpbin_suite (void) rtpbin_suite (void)
{ {
@ -919,6 +983,7 @@ rtpbin_suite (void)
tcase_add_test (tc_chain, test_aux_sender); tcase_add_test (tc_chain, test_aux_sender);
tcase_add_test (tc_chain, test_aux_receiver); tcase_add_test (tc_chain, test_aux_receiver);
tcase_add_test (tc_chain, test_sender_eos); tcase_add_test (tc_chain, test_sender_eos);
tcase_add_test (tc_chain, test_quick_shutdown);
return s; return s;
} }