From 7c9a5e86fe3a504811aad9af7c86b64d7ea5a8d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Wed, 8 Jul 2020 17:28:31 -0400 Subject: [PATCH] rtpfunnel: Also forward custom sticky event This is useful to track metadata about each group of packets Also include a unit test Part-of: --- gst/rtpmanager/gstrtpfunnel.c | 16 +++- tests/check/elements/rtpfunnel.c | 148 +++++++++++++++++++++++++------ 2 files changed, 133 insertions(+), 31 deletions(-) diff --git a/gst/rtpmanager/gstrtpfunnel.c b/gst/rtpmanager/gstrtpfunnel.c index 7638b34973..6bd86b034e 100644 --- a/gst/rtpmanager/gstrtpfunnel.c +++ b/gst/rtpmanager/gstrtpfunnel.c @@ -189,18 +189,28 @@ done: static void gst_rtp_funnel_forward_segment (GstRtpFunnel * funnel, GstPad * pad) { - GstEvent *segment; + GstEvent *event; + guint i; if (pad == funnel->current_pad) { goto done; } - segment = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0); - if (segment && !gst_pad_push_event (funnel->srcpad, segment)) { + event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0); + if (event && !gst_pad_push_event (funnel->srcpad, event)) { GST_ERROR_OBJECT (funnel, "Could not push segment"); goto done; } + for (i = 0;; i++) { + event = gst_pad_get_sticky_event (pad, GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, + i); + if (event == NULL) + break; + if (!gst_pad_push_event (funnel->srcpad, event)) + GST_ERROR_OBJECT (funnel, "Could not push custom event"); + } + funnel->current_pad = pad; done: diff --git a/tests/check/elements/rtpfunnel.c b/tests/check/elements/rtpfunnel.c index 5224dce8d6..1b22f7c14f 100644 --- a/tests/check/elements/rtpfunnel.c +++ b/tests/check/elements/rtpfunnel.c @@ -127,6 +127,125 @@ GST_START_TEST (rtpfunnel_common_ts_offset) GST_END_TEST; +static GstBuffer * +generate_test_buffer (guint seqnum, guint ssrc, guint8 twcc_ext_id) +{ + GstBuffer *buf; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + buf = gst_rtp_buffer_new_allocate (0, 0, 0); + GST_BUFFER_PTS (buf) = seqnum * 20 * GST_MSECOND; + GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf); + + gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp); + gst_rtp_buffer_set_payload_type (&rtp, 100); + gst_rtp_buffer_set_seq (&rtp, seqnum); + gst_rtp_buffer_set_timestamp (&rtp, seqnum * 160); + gst_rtp_buffer_set_ssrc (&rtp, ssrc); + + if (twcc_ext_id > 0) { + guint16 data; + GST_WRITE_UINT16_BE (&data, seqnum); + gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id, + &data, sizeof (guint16)); + } + + gst_rtp_buffer_unmap (&rtp); + + return buf; +} + +GST_START_TEST (rtpfunnel_custom_sticky) +{ + GstHarness *h, *h0, *h1; + GstEvent *event; + const GstStructure *s; + const gchar *value = NULL; + + h = gst_harness_new_with_padnames ("rtpfunnel", NULL, "src"); + + /* request a sinkpad, with some caps */ + h0 = gst_harness_new_with_element (h->element, "sink_0", NULL); + gst_harness_set_src_caps_str (h0, "application/x-rtp, " "ssrc=(uint)123"); + + /* request a second sinkpad, also with caps */ + h1 = gst_harness_new_with_element (h->element, "sink_1", NULL); + gst_harness_set_src_caps_str (h1, "application/x-rtp, " "ssrc=(uint)456"); + + while ((event = gst_harness_try_pull_event (h))) + gst_event_unref (event); + + fail_unless (gst_harness_push_event (h0, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, + gst_structure_new ("test", "key", G_TYPE_STRING, "value0", + NULL)))); + + fail_unless (gst_harness_push_event (h1, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, + gst_structure_new ("test", "key", G_TYPE_STRING, "value1", + NULL)))); + + /* Send a buffer through first pad, expect the event to be the first one */ + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h0, generate_test_buffer (500, 123, 0))); + for (;;) { + event = gst_harness_pull_event (h); + if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY) + break; + gst_event_unref (event); + } + s = gst_event_get_structure (event); + fail_unless (s); + fail_unless (gst_structure_has_name (s, "test")); + value = gst_structure_get_string (s, "key"); + fail_unless_equals_string (value, "value0"); + gst_event_unref (event); + gst_buffer_unref (gst_harness_pull (h)); + + /* Send a buffer through second pad, expect the event to be the second one + */ + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h1, generate_test_buffer (500, 123, 0))); + for (;;) { + event = gst_harness_pull_event (h); + if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY) + break; + gst_event_unref (event); + } + s = gst_event_get_structure (event); + fail_unless (s); + fail_unless (gst_structure_has_name (s, "test")); + value = gst_structure_get_string (s, "key"); + fail_unless_equals_string (value, "value1"); + gst_event_unref (event); + gst_buffer_unref (gst_harness_pull (h)); + + /* Send a buffer through first pad, expect the event to again be the first + * one + */ + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h0, generate_test_buffer (500, 123, 5))); + for (;;) { + event = gst_harness_pull_event (h); + if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY) + break; + gst_event_unref (event); + } + s = gst_event_get_structure (event); + fail_unless (s); + fail_unless (gst_structure_has_name (s, "test")); + value = gst_structure_get_string (s, "key"); + fail_unless_equals_string (value, "value0"); + gst_event_unref (event); + gst_buffer_unref (gst_harness_pull (h)); + + gst_harness_teardown (h); + gst_harness_teardown (h0); + gst_harness_teardown (h1); +} + +GST_END_TEST; + GST_START_TEST (rtpfunnel_stress) { GstHarness *h = gst_harness_new_with_padnames ("rtpfunnel", @@ -224,34 +343,6 @@ GST_START_TEST (rtpfunnel_twcc_caps) GST_END_TEST; -static GstBuffer * -generate_test_buffer (guint seqnum, guint ssrc, guint8 twcc_ext_id) -{ - GstBuffer *buf; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; - - buf = gst_rtp_buffer_new_allocate (0, 0, 0); - GST_BUFFER_PTS (buf) = seqnum * 20 * GST_MSECOND; - GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf); - - gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp); - gst_rtp_buffer_set_payload_type (&rtp, 100); - gst_rtp_buffer_set_seq (&rtp, seqnum); - gst_rtp_buffer_set_timestamp (&rtp, seqnum * 160); - gst_rtp_buffer_set_ssrc (&rtp, ssrc); - - if (twcc_ext_id > 0) { - guint16 data; - GST_WRITE_UINT16_BE (&data, seqnum); - gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id, - &data, sizeof (guint16)); - } - - gst_rtp_buffer_unmap (&rtp); - - return buf; -} - static gint32 get_twcc_seqnum (GstBuffer * buf, guint8 ext_id) { @@ -408,6 +499,7 @@ rtpfunnel_suite (void) tcase_add_test (tc_chain, rtpfunnel_ssrc_demuxing); tcase_add_test (tc_chain, rtpfunnel_ssrc_downstream_not_leaking_through); tcase_add_test (tc_chain, rtpfunnel_common_ts_offset); + tcase_add_test (tc_chain, rtpfunnel_custom_sticky); tcase_add_test (tc_chain, rtpfunnel_stress);