rtpsession: Send EOS if all internal sources sent bye

The ones which are not internal should not matter, and we should
wait for all sources to have sent their BYEs.

And add unit test

https://bugzilla.gnome.org/show_bug.cgi?id=773218
This commit is contained in:
Olivier Crête 2017-07-04 17:42:25 -04:00
parent 7e7e52caa0
commit 96e71b0286
3 changed files with 205 additions and 1 deletions

View file

@ -3958,7 +3958,7 @@ rtp_session_are_all_sources_bye (RTPSession * sess)
g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
if (!src->marked_bye)
if (src->internal && !src->sent_bye)
return FALSE;
}

View file

@ -533,6 +533,10 @@ elements_rtpbin_buffer_list_LDADD = $(GST_PLUGINS_BASE_LIBS) \
$(GST_BASE_LIBS) $(GST_LIBS) $(GST_CHECK_LIBS) $(LDADD)
elements_rtpbin_buffer_list_SOURCES = elements/rtpbin_buffer_list.c
elements_rtpbin_LDADD = $(GST_PLUGINS_BASE_LIBS) \
-lgstrtp-$(GST_API_VERSION) \
$(GST_BASE_LIBS) $(GST_LIBS) $(GST_CHECK_LIBS) $(LDADD)
elements_rtph261_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(AM_CFLAGS)
elements_rtph261_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(GST_BASE_LIBS) $(LDADD)

View file

@ -21,6 +21,10 @@
*/
#include <gst/check/gstcheck.h>
#include <gst/check/gsttestclock.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>
GST_START_TEST (test_pads)
{
@ -689,6 +693,201 @@ GST_START_TEST (test_aux_receiver)
GST_END_TEST;
GST_START_TEST (test_sender_eos)
{
GstElement *rtpsession;
GstBuffer *rtp_buffer;
GstBuffer *rtcp_buffer;
GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
GstRTCPBuffer rtcpbuf = GST_RTCP_BUFFER_INIT;
GstRTCPPacket rtcppacket;
static GstStaticPadTemplate recv_tmpl =
GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS,
GST_STATIC_CAPS ("ANY"));
GstPad *send_rtp_sink;
GstPad *recv_rtcp_sink;
GstCaps *caps;
GstSegment segment;
GstPad *rtp_sink, *rtcp_sink;
GstClock *clock;
GstTestClock *tclock;
GstStructure *s;
guint ssrc = 1;
guint32 ssrc_in, packet_count, octet_count;
gboolean got_bye = FALSE;
clock = gst_test_clock_new ();
gst_system_clock_set_default (clock);
tclock = GST_TEST_CLOCK (clock);
gst_test_clock_set_time (tclock, 0);
rtpsession = gst_element_factory_make ("rtpsession", NULL);
send_rtp_sink = gst_element_get_request_pad (rtpsession, "send_rtp_sink");
recv_rtcp_sink = gst_element_get_request_pad (rtpsession, "recv_rtcp_sink");
rtp_sink = gst_check_setup_sink_pad_by_name (rtpsession, &recv_tmpl,
"send_rtp_src");
rtcp_sink = gst_check_setup_sink_pad_by_name (rtpsession, &recv_tmpl,
"send_rtcp_src");
gst_pad_set_active (rtp_sink, TRUE);
gst_pad_set_active (rtcp_sink, TRUE);
gst_element_set_state (rtpsession, GST_STATE_PLAYING);
/* Send initial events */
gst_segment_init (&segment, GST_FORMAT_TIME);
fail_unless (gst_pad_send_event (send_rtp_sink,
gst_event_new_stream_start ("id")));
fail_unless (gst_pad_send_event (send_rtp_sink,
gst_event_new_segment (&segment)));
fail_unless (gst_pad_send_event (recv_rtcp_sink,
gst_event_new_stream_start ("id")));
fail_unless (gst_pad_send_event (recv_rtcp_sink,
gst_event_new_segment (&segment)));
/* Get the suggested SSRC from the rtpsession */
caps = gst_pad_query_caps (send_rtp_sink, NULL);
s = gst_caps_get_structure (caps, 0);
gst_structure_get (s, "ssrc", G_TYPE_UINT, &ssrc, NULL);
gst_caps_unref (caps);
/* Send a RTP packet */
rtp_buffer = gst_rtp_buffer_new_allocate (10, 0, 0);
gst_rtp_buffer_map (rtp_buffer, GST_MAP_READWRITE, &rtpbuf);
gst_rtp_buffer_set_ssrc (&rtpbuf, 1);
gst_rtp_buffer_set_seq (&rtpbuf, 0);
gst_rtp_buffer_unmap (&rtpbuf);
fail_unless (gst_pad_chain (send_rtp_sink, rtp_buffer) == GST_FLOW_OK);
/* Make sure it went through */
fail_unless_equals_int (g_list_length (buffers), 1);
fail_unless_equals_pointer (buffers->data, rtp_buffer);
gst_check_drop_buffers ();
/* Advance time and send a packet to prevent source sender timeout */
gst_test_clock_set_time (tclock, 1 * GST_SECOND);
/* Just send a send packet to prevent timeout */
rtp_buffer = gst_rtp_buffer_new_allocate (10, 0, 0);
gst_rtp_buffer_map (rtp_buffer, GST_MAP_READWRITE, &rtpbuf);
gst_rtp_buffer_set_ssrc (&rtpbuf, 1);
gst_rtp_buffer_set_seq (&rtpbuf, 1);
gst_rtp_buffer_set_timestamp (&rtpbuf, 10);
gst_rtp_buffer_unmap (&rtpbuf);
fail_unless (gst_pad_chain (send_rtp_sink, rtp_buffer) == GST_FLOW_OK);
/* Make sure it went through */
fail_unless_equals_int (g_list_length (buffers), 1);
fail_unless_equals_pointer (buffers->data, rtp_buffer);
gst_check_drop_buffers ();
/* Advance clock twice and we shoudl have one RTCP packet at least */
gst_test_clock_crank (tclock);
gst_test_clock_crank (tclock);
g_mutex_lock (&check_mutex);
while (buffers == NULL)
g_cond_wait (&check_cond, &check_mutex);
fail_unless (gst_rtcp_buffer_map (buffers->data, GST_MAP_READ, &rtcpbuf));
fail_unless (gst_rtcp_buffer_get_first_packet (&rtcpbuf, &rtcppacket));
fail_unless_equals_int (gst_rtcp_packet_get_type (&rtcppacket),
GST_RTCP_TYPE_SR);
gst_rtcp_packet_sr_get_sender_info (&rtcppacket, &ssrc_in, NULL, NULL,
&packet_count, &octet_count);
fail_unless_equals_int (packet_count, 2);
fail_unless_equals_int (octet_count, 20);
fail_unless (gst_rtcp_packet_move_to_next (&rtcppacket));
fail_unless_equals_int (gst_rtcp_packet_get_type (&rtcppacket),
GST_RTCP_TYPE_SDES);
gst_rtcp_buffer_unmap (&rtcpbuf);
gst_check_drop_buffers ();
g_mutex_unlock (&check_mutex);
/* Create and send a valid RTCP reply packet */
rtcp_buffer = gst_rtcp_buffer_new (1500);
gst_rtcp_buffer_map (rtcp_buffer, GST_MAP_READWRITE, &rtcpbuf);
gst_rtcp_buffer_add_packet (&rtcpbuf, GST_RTCP_TYPE_RR, &rtcppacket);
gst_rtcp_packet_rr_set_ssrc (&rtcppacket, ssrc + 1);
gst_rtcp_packet_add_rb (&rtcppacket, ssrc, 0, 0, 0, 0, 0, 0);
gst_rtcp_buffer_add_packet (&rtcpbuf, GST_RTCP_TYPE_SDES, &rtcppacket);
gst_rtcp_packet_sdes_add_item (&rtcppacket, ssrc + 1);
gst_rtcp_packet_sdes_add_entry (&rtcppacket, GST_RTCP_SDES_CNAME, 3,
(guint8 *) "a@a");
gst_rtcp_packet_sdes_add_entry (&rtcppacket, GST_RTCP_SDES_NAME, 2,
(guint8 *) "aa");
gst_rtcp_packet_sdes_add_entry (&rtcppacket, GST_RTCP_SDES_END, 0,
(guint8 *) "");
gst_rtcp_buffer_unmap (&rtcpbuf);
fail_unless (gst_pad_chain (recv_rtcp_sink, rtcp_buffer) == GST_FLOW_OK);
/* Send a EOS to trigger sending a BYE message */
fail_unless (gst_pad_send_event (send_rtp_sink, gst_event_new_eos ()));
/* Crank to process EOS and wait for BYE */
for (;;) {
gst_test_clock_crank (tclock);
g_mutex_lock (&check_mutex);
while (buffers == NULL)
g_cond_wait (&check_cond, &check_mutex);
fail_unless (gst_rtcp_buffer_map (g_list_last (buffers)->data, GST_MAP_READ,
&rtcpbuf));
fail_unless (gst_rtcp_buffer_get_first_packet (&rtcpbuf, &rtcppacket));
while (gst_rtcp_packet_move_to_next (&rtcppacket)) {
if (gst_rtcp_packet_get_type (&rtcppacket) == GST_RTCP_TYPE_BYE) {
got_bye = TRUE;
break;
}
}
g_mutex_unlock (&check_mutex);
gst_rtcp_buffer_unmap (&rtcpbuf);
if (got_bye)
break;
}
gst_check_drop_buffers ();
fail_unless (GST_PAD_IS_EOS (rtp_sink));
fail_unless (GST_PAD_IS_EOS (rtcp_sink));
gst_pad_set_active (rtp_sink, FALSE);
gst_pad_set_active (rtcp_sink, FALSE);
gst_check_teardown_pad_by_name (rtpsession, "send_rtp_src");
gst_check_teardown_pad_by_name (rtpsession, "send_rtcp_src");
gst_element_release_request_pad (rtpsession, send_rtp_sink);
gst_object_unref (send_rtp_sink);
gst_element_release_request_pad (rtpsession, recv_rtcp_sink);
gst_object_unref (recv_rtcp_sink);
gst_check_teardown_element (rtpsession);
gst_system_clock_set_default (NULL);
gst_object_unref (clock);
}
GST_END_TEST;
static Suite *
rtpbin_suite (void)
{
@ -705,6 +904,7 @@ rtpbin_suite (void)
tcase_add_test (tc_chain, test_decoder);
tcase_add_test (tc_chain, test_aux_sender);
tcase_add_test (tc_chain, test_aux_receiver);
tcase_add_test (tc_chain, test_sender_eos);
return s;
}