mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-22 15:18:21 +00:00
rtpssrcdemux: fix "data flow before segment event" crash
This crash could happen at any time a RTP and RTCP buffer arrived simultaneously in ssrcdemux. The problem was that sticky-event arriving while the rtp and rtcp pads were being set up could arrive just too late to be included in the initial forwarding. The fix checks if the stickies have been sent on the srcpad about to be pushed on, and if not sends them. It also blocks any stickes from being forwarded *prior* to this happening, to avoid them arriving on the srcpad multiple times. Since the test loops 1000 times, this will make running under valgrind take forever, so use the RUNNING_ON_VALGRIND variable to detect we are running under valgrind, and reduce the loop-count to 2 in that case. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/992>
This commit is contained in:
parent
de3a3882e9
commit
26c94af2ea
2 changed files with 125 additions and 20 deletions
|
@ -83,6 +83,10 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
|
|||
#define INTERNAL_STREAM_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock))
|
||||
#define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
|
||||
|
||||
#define GST_PAD_FLAG_STICKIES_SENT (GST_PAD_FLAG_LAST << 0)
|
||||
#define GST_PAD_STICKIES_SENT(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_STICKIES_SENT))
|
||||
#define GST_PAD_SET_STICKIES_SENT(pad) (GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_STICKIES_SENT))
|
||||
|
||||
typedef enum
|
||||
{
|
||||
RTP_PAD,
|
||||
|
@ -244,13 +248,11 @@ forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
|
|||
GstEvent *newevent;
|
||||
|
||||
newevent = add_ssrc_and_ref (*event, data->ssrc);
|
||||
|
||||
gst_pad_push_event (data->pad, newevent);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/* With internal stream lock held */
|
||||
static void
|
||||
forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
|
||||
PadType padtype)
|
||||
|
@ -318,9 +320,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
|
|||
dpads->rtp_pad = rtp_pad;
|
||||
dpads->rtcp_pad = rtcp_pad;
|
||||
|
||||
gst_pad_set_element_private (rtp_pad, dpads);
|
||||
gst_pad_set_element_private (rtcp_pad, dpads);
|
||||
|
||||
GST_OBJECT_LOCK (demux);
|
||||
demux->srcpads = g_slist_prepend (demux->srcpads, dpads);
|
||||
GST_OBJECT_UNLOCK (demux);
|
||||
|
@ -338,9 +337,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
|
|||
gst_pad_use_fixed_caps (rtcp_pad);
|
||||
gst_pad_set_active (rtcp_pad, TRUE);
|
||||
|
||||
forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD);
|
||||
forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD);
|
||||
|
||||
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
|
||||
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
|
||||
|
||||
|
@ -606,6 +602,13 @@ forward_event (GstPad * pad, gpointer user_data)
|
|||
GSList *walk = NULL;
|
||||
GstEvent *newevent = NULL;
|
||||
|
||||
/* special case for EOS */
|
||||
if (GST_EVENT_TYPE (fdata->event) == GST_EVENT_EOS)
|
||||
GST_PAD_SET_STICKIES_SENT (pad);
|
||||
|
||||
if (GST_EVENT_IS_STICKY (fdata->event) && !GST_PAD_STICKIES_SENT (pad))
|
||||
return FALSE;
|
||||
|
||||
GST_OBJECT_LOCK (fdata->demux);
|
||||
for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
|
||||
GstRtpSsrcDemuxPads *dpads = (GstRtpSsrcDemuxPads *) walk->data;
|
||||
|
@ -668,6 +671,11 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
|
|||
if (srcpad == NULL)
|
||||
goto create_failed;
|
||||
|
||||
if (!GST_PAD_STICKIES_SENT (srcpad)) {
|
||||
forward_initial_events (demux, ssrc, srcpad, RTP_PAD);
|
||||
GST_PAD_SET_STICKIES_SENT (srcpad);
|
||||
}
|
||||
|
||||
/* push to srcpad */
|
||||
ret = gst_pad_push (srcpad, buf);
|
||||
|
||||
|
@ -758,6 +766,11 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
|
|||
if (srcpad == NULL)
|
||||
goto create_failed;
|
||||
|
||||
if (!GST_PAD_STICKIES_SENT (srcpad)) {
|
||||
forward_initial_events (demux, ssrc, srcpad, RTCP_PAD);
|
||||
GST_PAD_SET_STICKIES_SENT (srcpad);
|
||||
}
|
||||
|
||||
/* push to srcpad */
|
||||
ret = gst_pad_push (srcpad, buf);
|
||||
|
||||
|
@ -944,17 +957,12 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
|
|||
if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
|
||||
gboolean live;
|
||||
GstClockTime min_latency, max_latency;
|
||||
GstRtpSsrcDemuxPads *dpads;
|
||||
|
||||
dpads = gst_pad_get_element_private (pad);
|
||||
|
||||
gst_query_parse_latency (query, &live, &min_latency, &max_latency);
|
||||
|
||||
GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
|
||||
GST_DEBUG_OBJECT (pad, "peer min latency %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (min_latency));
|
||||
|
||||
GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", dpads->ssrc);
|
||||
|
||||
gst_query_set_latency (query, live, min_latency, max_latency);
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -20,10 +20,22 @@
|
|||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
#ifdef HAVE_CONFIG_H
|
||||
#include "config.h"
|
||||
#endif
|
||||
|
||||
#include <gst/rtp/gstrtpbuffer.h>
|
||||
#include <gst/rtp/gstrtcpbuffer.h>
|
||||
|
||||
#include <gst/check/gstcheck.h>
|
||||
#include <gst/check/gstharness.h>
|
||||
|
||||
#ifdef HAVE_VALGRIND
|
||||
# include <valgrind/valgrind.h>
|
||||
#else
|
||||
# define RUNNING_ON_VALGRIND 0
|
||||
#endif
|
||||
|
||||
#define TEST_BUF_CLOCK_RATE 8000
|
||||
#define TEST_BUF_PT 0
|
||||
#define TEST_BUF_SSRC 0x01BADBAD
|
||||
|
@ -68,6 +80,7 @@ create_buffer (guint seq_num, guint32 ssrc)
|
|||
return buf;
|
||||
}
|
||||
|
||||
|
||||
typedef struct
|
||||
{
|
||||
GstHarness *rtp_sink;
|
||||
|
@ -158,11 +171,7 @@ GST_START_TEST (test_event_forwarding)
|
|||
gst_harness_push_event (ctx.rtcp_sink, gst_event_new_eos ());
|
||||
|
||||
g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 0);
|
||||
g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 2);
|
||||
|
||||
event = gst_harness_pull_event (ctx.rtcp_src);
|
||||
g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START);
|
||||
gst_event_unref (event);
|
||||
g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 1);
|
||||
|
||||
event = gst_harness_pull_event (ctx.rtcp_src);
|
||||
g_assert_cmpint (event->type, ==, GST_EVENT_EOS);
|
||||
|
@ -278,7 +287,7 @@ GST_START_TEST (test_rtpssrcdemux_max_streams)
|
|||
GST_END_TEST;
|
||||
|
||||
static void
|
||||
new_rtcp_ssrc_pad_found (GstElement * element, G_GNUC_UNUSED guint ssrc,
|
||||
new_rtcp_ssrc_pad_found (GstElement * element, guint ssrc,
|
||||
G_GNUC_UNUSED GstPad * rtp_pad, GSList ** src_h)
|
||||
{
|
||||
GstHarness *h;
|
||||
|
@ -355,7 +364,94 @@ GST_START_TEST (test_rtpssrcdemux_invalid_rtcp)
|
|||
|
||||
GST_END_TEST;
|
||||
|
||||
static GstBuffer *
|
||||
generate_rtcp_sr_buffer (guint ssrc)
|
||||
{
|
||||
GstBuffer *buf;
|
||||
GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
|
||||
GstRTCPPacket packet;
|
||||
|
||||
buf = gst_rtcp_buffer_new (1000);
|
||||
fail_unless (gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp));
|
||||
fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_SR, &packet));
|
||||
gst_rtcp_packet_sr_set_sender_info (&packet, ssrc, 0, 0, 1, 1);
|
||||
gst_rtcp_buffer_unmap (&rtcp);
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct
|
||||
{
|
||||
GstHarness *rtp_h;
|
||||
GstHarness *rtcp_h;
|
||||
} SimulCtx;
|
||||
|
||||
static void
|
||||
_simul_ctx_new_ssrc_pad_cb (GstElement * element, guint ssrc,
|
||||
GstPad * rtp_pad, SimulCtx * ctx)
|
||||
{
|
||||
GstPad *rtcp_pad;
|
||||
gchar *name;
|
||||
|
||||
gst_harness_add_element_src_pad (ctx->rtp_h, rtp_pad);
|
||||
|
||||
name = g_strdup_printf ("rtcp_src_%u", ssrc);
|
||||
rtcp_pad = gst_element_get_static_pad (element, name);
|
||||
gst_harness_add_element_src_pad (ctx->rtcp_h, rtcp_pad);
|
||||
gst_object_unref (rtcp_pad);
|
||||
g_free (name);
|
||||
}
|
||||
|
||||
static gpointer
|
||||
_simul_ctx_push_rtp_buffers (gpointer user_data)
|
||||
{
|
||||
SimulCtx *ctx = user_data;
|
||||
|
||||
gst_harness_set_src_caps_str (ctx->rtp_h, "application/x-rtp");
|
||||
gst_harness_push (ctx->rtp_h, create_buffer (0, 1111));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static gpointer
|
||||
_simul_ctx_push_rtcp_buffers (gpointer user_data)
|
||||
{
|
||||
SimulCtx *ctx = user_data;
|
||||
|
||||
g_usleep (10);
|
||||
gst_harness_set_src_caps_str (ctx->rtcp_h, "application/x-rtcp");
|
||||
gst_harness_push (ctx->rtcp_h, generate_rtcp_sr_buffer (1111));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
GST_START_TEST (test_rtp_and_rtcp_arrives_simultaneously)
|
||||
{
|
||||
guint r;
|
||||
guint repeats = 1000;
|
||||
if (RUNNING_ON_VALGRIND)
|
||||
repeats = 2;
|
||||
|
||||
for (r = 0; r < repeats; r++) {
|
||||
SimulCtx ctx;
|
||||
GThread *t0, *t1;
|
||||
|
||||
ctx.rtp_h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
|
||||
ctx.rtcp_h =
|
||||
gst_harness_new_with_element (ctx.rtp_h->element, "rtcp_sink", NULL);
|
||||
|
||||
g_signal_connect (ctx.rtp_h->element,
|
||||
"new-ssrc-pad", (GCallback) _simul_ctx_new_ssrc_pad_cb, &ctx);
|
||||
|
||||
t0 = g_thread_new ("push rtp", _simul_ctx_push_rtp_buffers, &ctx);
|
||||
t1 = g_thread_new ("push rtcp", _simul_ctx_push_rtcp_buffers, &ctx);
|
||||
|
||||
g_thread_join (t0);
|
||||
g_thread_join (t1);
|
||||
|
||||
gst_harness_teardown (ctx.rtp_h);
|
||||
gst_harness_teardown (ctx.rtcp_h);
|
||||
}
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
static Suite *
|
||||
rtpssrcdemux_suite (void)
|
||||
|
@ -370,6 +466,7 @@ rtpssrcdemux_suite (void)
|
|||
tcase_add_test (tc_chain, test_rtpssrcdemux_rtcp_app);
|
||||
tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtp);
|
||||
tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtcp);
|
||||
tcase_add_test (tc_chain, test_rtp_and_rtcp_arrives_simultaneously);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue