diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 1654244284..e79dfb6914 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -2731,6 +2731,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src)) return; + if (src) + g_object_ref (src); + fci_data = gst_rtcp_packet_fb_get_fci (packet); fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32); @@ -2798,6 +2801,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, break; } } + + if (src) + g_object_unref (src); } /** diff --git a/tests/check/elements/rtpsession.c b/tests/check/elements/rtpsession.c index 345588ad16..82243bd065 100644 --- a/tests/check/elements/rtpsession.c +++ b/tests/check/elements/rtpsession.c @@ -823,6 +823,88 @@ GST_START_TEST (test_illegal_rtcp_fb_packet) GST_END_TEST; +typedef struct +{ + GCond *cond; + GMutex *mutex; + gboolean fired; +} FeedbackRTCPCallbackData; + +static void +feedback_rtcp_cb (GstElement * element, guint fbtype, guint fmt, + guint sender_ssrc, guint media_ssrc, GstBuffer * fci, + FeedbackRTCPCallbackData * cb_data) +{ + g_mutex_lock (cb_data->mutex); + cb_data->fired = TRUE; + g_cond_wait (cb_data->cond, cb_data->mutex); + g_mutex_unlock (cb_data->mutex); +} + +static void * +send_feedback_rtcp (SessionHarness * h) +{ + GstRTCPPacket packet; + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstBuffer *buffer = gst_rtcp_buffer_new (1000); + + fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp)); + fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_PSFB, &packet)); + gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_PSFB_TYPE_PLI); + gst_rtcp_packet_fb_set_fci_length (&packet, 0); + gst_rtcp_packet_fb_set_media_ssrc (&packet, 0xABE2B0B); + gst_rtcp_packet_fb_set_media_ssrc (&packet, 0xDEADBEEF); + gst_rtcp_buffer_unmap (&rtcp); + fail_unless_equals_int (GST_FLOW_OK, session_harness_recv_rtcp (h, buffer)); + + return NULL; +} + +GST_START_TEST (test_feedback_rtcp_race) +{ + SessionHarness *h = session_harness_new (); + + GCond cond; + GMutex mutex; + FeedbackRTCPCallbackData cb_data; + GThread *send_rtcp_thread; + + g_cond_init (&cond); + g_mutex_init (&mutex); + cb_data.cond = &cond; + cb_data.mutex = &mutex; + cb_data.fired = FALSE; + g_signal_connect (h->internal_session, "on-feedback-rtcp", + G_CALLBACK (feedback_rtcp_cb), &cb_data); + + /* Push RTP buffer making external source with SSRC=0xDEADBEEF */ + fail_unless_equals_int (GST_FLOW_OK, session_harness_recv_rtp (h, + generate_test_buffer (0, 0xDEADBEEF))); + + /* Push feedback RTCP with media SSRC=0xDEADBEEF */ + send_rtcp_thread = g_thread_new (NULL, (GThreadFunc) send_feedback_rtcp, h); + + /* Waiting for feedback RTCP callback to fire */ + while (!cb_data.fired) + g_usleep (G_USEC_PER_SEC / 100); + + /* While send_rtcp_thread thread is waiting for our signal + advance the clock by 30sec triggering removal of 0xDEADBEEF, + as if the source was inactive for too long */ + session_harness_advance_and_crank (h, GST_SECOND * 30); + gst_buffer_unref (session_harness_pull_rtcp (h)); + + /* Let send_rtcp_thread finish */ + g_mutex_lock (&mutex); + g_cond_signal (&cond); + g_mutex_unlock (&mutex); + g_thread_join (send_rtcp_thread); + + session_harness_free (h); +} + +GST_END_TEST; + static Suite * rtpsession_suite (void) { @@ -837,9 +919,8 @@ rtpsession_suite (void) tcase_add_test (tc_chain, test_receive_rtcp_app_packet); tcase_add_test (tc_chain, test_dont_lock_on_stats); tcase_add_test (tc_chain, test_ignore_suspicious_bye); - - tcase_add_test (tc_chain, test_illegal_rtcp_fb_packet); + tcase_add_test (tc_chain, test_feedback_rtcp_race); return s; }