mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-24 17:20:36 +00:00
a51073c7de
Commit #1018aa made rtprtxsend handle buffer lists, breaking the test which probes for buffers, but not buffer lists. Use a utility function to run the probe callback on each buffer in the list in turn and remove any buffers that are dropped.
1598 lines
53 KiB
C
1598 lines
53 KiB
C
/* GStreamer
|
|
*
|
|
* Copyright (C) 2013 Collabora Ltd.
|
|
* @author Julien Isorce <julien.isorce@collabora.co.uk>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#include <gst/check/gstcheck.h>
|
|
#include <gst/check/gstconsistencychecker.h>
|
|
|
|
#include <gst/rtp/gstrtpbuffer.h>
|
|
|
|
/* For ease of programming we use globals to keep refs for our floating
|
|
* src and sink pads we create; otherwise we always have to do get_pad,
|
|
* get_peer, and then remove references in every test function */
|
|
static GstPad *srcpad, *sinkpad;
|
|
/* we also have a list of src buffers */
|
|
static GList *inbuffers = NULL;
|
|
|
|
#define RTP_CAPS_STRING \
|
|
"application/x-rtp, " \
|
|
"media = (string)audio, " \
|
|
"payload = (int) 0, " \
|
|
"clock-rate = (int) 8000, " \
|
|
"ssrc = (uint) 42, " \
|
|
"encoding-name = (string)PCMU"
|
|
|
|
#define RTP_FRAME_SIZE 20
|
|
|
|
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
|
|
GST_PAD_SINK,
|
|
GST_PAD_ALWAYS,
|
|
GST_STATIC_CAPS ("application/x-rtp")
|
|
);
|
|
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
|
|
GST_PAD_SRC,
|
|
GST_PAD_ALWAYS,
|
|
GST_STATIC_CAPS ("application/x-rtp")
|
|
);
|
|
|
|
static void
|
|
setup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive,
|
|
gint num_buffers)
|
|
{
|
|
GstBuffer *buffer;
|
|
GstPad *sendsrcpad;
|
|
GstPad *receivesinkpad;
|
|
gboolean ret = FALSE;
|
|
|
|
/* a 20 sample audio block (2,5 ms) generated with
|
|
* gst-launch audiotestsrc wave=silence blocksize=40 num-buffers=3 !
|
|
* "audio/x-raw,channels=1,rate=8000" ! mulawenc ! rtppcmupay !
|
|
* fakesink dump=1
|
|
*/
|
|
guint8 in[] = { /* first 4 bytes are rtp-header, next 4 bytes are timestamp */
|
|
0x80, 0x80, 0x1c, 0x24, 0x46, 0xcd, 0xb7, 0x11, 0x3c, 0x3a, 0x7c, 0x5b,
|
|
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
|
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
|
|
};
|
|
GstClockTime ts = G_GUINT64_CONSTANT (0);
|
|
GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
|
|
gint i;
|
|
|
|
srcpad = gst_check_setup_src_pad (rtprtxsend, &srctemplate);
|
|
sendsrcpad = gst_element_get_static_pad (rtprtxsend, "src");
|
|
ret = gst_pad_set_active (srcpad, TRUE);
|
|
fail_if (ret == FALSE);
|
|
|
|
sinkpad = gst_check_setup_sink_pad (rtprtxreceive, &sinktemplate);
|
|
receivesinkpad = gst_element_get_static_pad (rtprtxreceive, "sink");
|
|
ret = gst_pad_set_active (sinkpad, TRUE);
|
|
fail_if (ret == FALSE);
|
|
|
|
fail_if (gst_pad_link (sendsrcpad, receivesinkpad) != GST_PAD_LINK_OK);
|
|
|
|
ret = gst_pad_set_active (sendsrcpad, TRUE);
|
|
fail_if (ret == FALSE);
|
|
ret = gst_pad_set_active (receivesinkpad, TRUE);
|
|
fail_if (ret == FALSE);
|
|
|
|
gst_object_unref (sendsrcpad);
|
|
gst_object_unref (receivesinkpad);
|
|
|
|
for (i = 0; i < num_buffers; i++) {
|
|
buffer = gst_buffer_new_and_alloc (sizeof (in));
|
|
gst_buffer_fill (buffer, 0, in, sizeof (in));
|
|
GST_BUFFER_DTS (buffer) = ts;
|
|
GST_BUFFER_PTS (buffer) = ts;
|
|
GST_BUFFER_DURATION (buffer) = tso;
|
|
GST_DEBUG ("created buffer: %p", buffer);
|
|
|
|
/*if (!i)
|
|
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); */
|
|
|
|
inbuffers = g_list_append (inbuffers, buffer);
|
|
|
|
/* hackish way to update the rtp header */
|
|
in[1] = 0x00;
|
|
in[3]++; /* seqnumber */
|
|
in[7] += RTP_FRAME_SIZE; /* inc. timestamp with framesize */
|
|
ts += tso;
|
|
}
|
|
}
|
|
|
|
static GstStateChangeReturn
|
|
start_rtprtx (GstElement * element)
|
|
{
|
|
GstStateChangeReturn ret;
|
|
|
|
ret = gst_element_set_state (element, GST_STATE_PLAYING);
|
|
ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE);
|
|
|
|
ret = gst_element_get_state (element, NULL, NULL, GST_CLOCK_TIME_NONE);
|
|
ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE);
|
|
|
|
return ret;
|
|
}
|
|
|
|
static void
|
|
cleanup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive)
|
|
{
|
|
GST_DEBUG ("cleanup_rtprtx");
|
|
|
|
g_list_free (inbuffers);
|
|
inbuffers = NULL;
|
|
|
|
gst_pad_set_active (srcpad, FALSE);
|
|
gst_check_teardown_src_pad (rtprtxsend);
|
|
gst_check_teardown_element (rtprtxsend);
|
|
|
|
gst_pad_set_active (sinkpad, FALSE);
|
|
gst_check_teardown_sink_pad (rtprtxreceive);
|
|
gst_check_teardown_element (rtprtxreceive);
|
|
}
|
|
|
|
static void
|
|
check_rtprtx_results (GstElement * rtprtxsend, GstElement * rtprtxreceive,
|
|
gint num_buffers)
|
|
{
|
|
guint nbrtxrequests = 0;
|
|
guint nbrtxpackets = 0;
|
|
|
|
g_object_get (G_OBJECT (rtprtxsend), "num-rtx-requests", &nbrtxrequests,
|
|
NULL);
|
|
fail_unless_equals_int (nbrtxrequests, 3);
|
|
|
|
g_object_get (G_OBJECT (rtprtxsend), "num-rtx-packets", &nbrtxpackets, NULL);
|
|
fail_unless_equals_int (nbrtxpackets, 3);
|
|
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-requests", &nbrtxrequests,
|
|
NULL);
|
|
fail_unless_equals_int (nbrtxrequests, 3);
|
|
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &nbrtxpackets,
|
|
NULL);
|
|
fail_unless_equals_int (nbrtxpackets, 3);
|
|
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets",
|
|
&nbrtxpackets, NULL);
|
|
fail_unless_equals_int (nbrtxpackets, 3);
|
|
}
|
|
|
|
|
|
GST_START_TEST (test_push_forward_seq)
|
|
{
|
|
GstElement *rtprtxsend;
|
|
GstElement *rtprtxreceive;
|
|
const guint num_buffers = 4;
|
|
GList *node;
|
|
gint i = 0;
|
|
GstCaps *caps = NULL;
|
|
GstStructure *pt_map;
|
|
|
|
rtprtxsend = gst_check_setup_element ("rtprtxsend");
|
|
rtprtxreceive = gst_check_setup_element ("rtprtxreceive");
|
|
setup_rtprtx (rtprtxsend, rtprtxreceive, num_buffers);
|
|
GST_DEBUG ("setup_rtprtx");
|
|
|
|
fail_unless (start_rtprtx (rtprtxsend)
|
|
== GST_STATE_CHANGE_SUCCESS, "could not set to playing");
|
|
|
|
fail_unless (start_rtprtx (rtprtxreceive)
|
|
== GST_STATE_CHANGE_SUCCESS, "could not set to playing");
|
|
|
|
caps = gst_caps_from_string (RTP_CAPS_STRING);
|
|
gst_check_setup_events (srcpad, rtprtxsend, caps, GST_FORMAT_TIME);
|
|
gst_caps_unref (caps);
|
|
|
|
pt_map = gst_structure_new ("application/x-rtp-pt-map",
|
|
"0", G_TYPE_UINT, 97, NULL);
|
|
g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
|
|
g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
|
|
gst_structure_free (pt_map);
|
|
|
|
/* push buffers: 0,1,2, */
|
|
for (node = inbuffers; node; node = g_list_next (node)) {
|
|
GstEvent *event = NULL;
|
|
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
|
|
GstBuffer *buffer = (GstBuffer *) node->data;
|
|
GList *last_out_buffer;
|
|
guint64 end_time;
|
|
gboolean res;
|
|
|
|
gst_buffer_ref (buffer);
|
|
fail_unless_equals_int (gst_pad_push (srcpad, buffer), GST_FLOW_OK);
|
|
|
|
if (i < 3) {
|
|
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
|
|
event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
|
|
gst_structure_new ("GstRTPRetransmissionRequest",
|
|
"seqnum", G_TYPE_UINT, (guint) gst_rtp_buffer_get_seq (&rtp),
|
|
"ssrc", G_TYPE_UINT, (guint) gst_rtp_buffer_get_ssrc (&rtp),
|
|
"payload-type", G_TYPE_UINT,
|
|
(guint) gst_rtp_buffer_get_payload_type (&rtp), NULL));
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
|
|
/* synchronize with the chain() function of the "sinkpad"
|
|
* to make sure that rtxsend has pushed the rtx buffer out
|
|
* before continuing */
|
|
last_out_buffer = g_list_last (buffers);
|
|
g_mutex_lock (&check_mutex);
|
|
fail_unless (gst_pad_push_event (sinkpad, event));
|
|
end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
|
|
do
|
|
res = g_cond_wait_until (&check_cond, &check_mutex, end_time);
|
|
while (res == TRUE && last_out_buffer == g_list_last (buffers));
|
|
g_mutex_unlock (&check_mutex);
|
|
}
|
|
gst_buffer_unref (buffer);
|
|
++i;
|
|
}
|
|
|
|
/* check the buffer list */
|
|
check_rtprtx_results (rtprtxsend, rtprtxreceive, num_buffers);
|
|
|
|
/* cleanup */
|
|
cleanup_rtprtx (rtprtxsend, rtprtxreceive);
|
|
}
|
|
|
|
GST_END_TEST;
|
|
|
|
static void
|
|
message_received (GstBus * bus, GstMessage * message, gboolean * eos)
|
|
{
|
|
GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT,
|
|
GST_MESSAGE_SRC (message), message);
|
|
|
|
switch (message->type) {
|
|
case GST_MESSAGE_EOS:
|
|
*eos = TRUE;
|
|
break;
|
|
case GST_MESSAGE_WARNING:{
|
|
GError *gerror;
|
|
gchar *debug;
|
|
|
|
gst_message_parse_warning (message, &gerror, &debug);
|
|
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
|
|
g_error_free (gerror);
|
|
g_free (debug);
|
|
break;
|
|
}
|
|
case GST_MESSAGE_ERROR:{
|
|
GError *gerror;
|
|
gchar *debug;
|
|
|
|
gst_message_parse_error (message, &gerror, &debug);
|
|
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
|
|
fail ("Error: %s / %s", gerror->message, debug);
|
|
g_error_free (gerror);
|
|
g_free (debug);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
typedef struct
|
|
{
|
|
guint count;
|
|
guint nb_packets;
|
|
guint drop_every_n_packets;
|
|
} RTXSendData;
|
|
|
|
typedef struct
|
|
{
|
|
guint nb_packets;
|
|
guint seqnum_offset;
|
|
guint seqnum_prev;
|
|
} RTXReceiveData;
|
|
|
|
static GstPadProbeReturn
|
|
do_buffer_list_as_buffers_probe (GstPad * pad, GstPadProbeInfo * info,
|
|
gpointer user_data, GstPadProbeCallback callback)
|
|
{
|
|
/* Iterate the buffer list, removing any items that we're
|
|
* told to drop and creating a new bufferlist. If all buffers
|
|
* are dropped, return DROP.
|
|
*/
|
|
guint i, len;
|
|
GstBufferList *list;
|
|
GstBufferList *outlist;
|
|
GstPadProbeInfo buf_info = *info;
|
|
|
|
GST_INFO_OBJECT (pad, "probing each buffer in list individually");
|
|
|
|
list = gst_pad_probe_info_get_buffer_list (info);
|
|
|
|
g_return_val_if_fail (list != NULL, GST_PAD_PROBE_REMOVE);
|
|
|
|
len = gst_buffer_list_length (list);
|
|
outlist = gst_buffer_list_new_sized (len);
|
|
|
|
buf_info.type = GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH;
|
|
for (i = 0; i < len; i++) {
|
|
GstBuffer *buffer = gst_buffer_list_get (list, i);
|
|
GstPadProbeReturn ret;
|
|
buf_info.data = buffer;
|
|
ret = callback (pad, &buf_info, user_data);
|
|
/* If the buffer wasn't dropped, add it to the output list */
|
|
if (ret != GST_PAD_PROBE_DROP)
|
|
gst_buffer_list_insert (outlist, -1, gst_buffer_ref (buffer));
|
|
}
|
|
|
|
len = gst_buffer_list_length (outlist);
|
|
if (len == 0) {
|
|
/* Everything was discarded, drop our outlist */
|
|
gst_buffer_list_unref (outlist);
|
|
return GST_PAD_PROBE_DROP;
|
|
}
|
|
|
|
/* Replace the original buffer list with the modified one */
|
|
gst_buffer_list_unref (list);
|
|
info->data = outlist;
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
rtprtxsend_srcpad_probe (GstPad * pad, GstPadProbeInfo * info,
|
|
gpointer user_data)
|
|
{
|
|
GstPadProbeReturn ret = GST_PAD_PROBE_OK;
|
|
|
|
GST_LOG_OBJECT (pad, "here");
|
|
if (info->type == (GST_PAD_PROBE_TYPE_BUFFER_LIST | GST_PAD_PROBE_TYPE_PUSH))
|
|
return do_buffer_list_as_buffers_probe (pad, info, user_data,
|
|
rtprtxsend_srcpad_probe);
|
|
|
|
if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) {
|
|
GstBuffer *buffer = GST_BUFFER (info->data);
|
|
RTXSendData *rtxdata = (RTXSendData *) user_data;
|
|
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
|
|
guint payload_type = 0;
|
|
|
|
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
|
|
payload_type = gst_rtp_buffer_get_payload_type (&rtp);
|
|
|
|
/* main stream packets */
|
|
if (payload_type == 96) {
|
|
/* count packets of the main stream */
|
|
++rtxdata->nb_packets;
|
|
/* drop some packets */
|
|
if (rtxdata->count < rtxdata->drop_every_n_packets) {
|
|
++rtxdata->count;
|
|
} else {
|
|
/* drop a packet every 'rtxdata->count' packets */
|
|
rtxdata->count = 1;
|
|
ret = GST_PAD_PROBE_DROP;
|
|
}
|
|
} else {
|
|
/* retransmission packets */
|
|
}
|
|
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
rtprtxreceive_srcpad_probe (GstPad * pad, GstPadProbeInfo * info,
|
|
gpointer user_data)
|
|
{
|
|
if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) {
|
|
GstBuffer *buffer = GST_BUFFER (info->data);
|
|
RTXReceiveData *rtxdata = (RTXReceiveData *) user_data;
|
|
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
|
|
guint seqnum = 0;
|
|
guint i = 0;
|
|
|
|
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
|
|
seqnum = gst_rtp_buffer_get_seq (&rtp);
|
|
|
|
/* check if there is a dropped packet */
|
|
if (seqnum > rtxdata->seqnum_prev + rtxdata->seqnum_offset) {
|
|
GstPad *peerpad = gst_pad_get_peer (pad);
|
|
|
|
/* ask retransmission of missing packet */
|
|
for (i = rtxdata->seqnum_prev + rtxdata->seqnum_offset; i < seqnum;
|
|
i += rtxdata->seqnum_offset) {
|
|
GstEvent *event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
|
|
gst_structure_new ("GstRTPRetransmissionRequest",
|
|
"seqnum", G_TYPE_UINT, i,
|
|
"ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp),
|
|
"payload-type", G_TYPE_UINT,
|
|
gst_rtp_buffer_get_payload_type (&rtp),
|
|
NULL));
|
|
gst_pad_push_event (peerpad, event);
|
|
}
|
|
gst_object_unref (peerpad);
|
|
|
|
rtxdata->seqnum_prev = seqnum;
|
|
} else if (seqnum == rtxdata->seqnum_prev + rtxdata->seqnum_offset) {
|
|
/* also update previous seqnum in this case */
|
|
rtxdata->seqnum_prev = seqnum;
|
|
}
|
|
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
|
|
++rtxdata->nb_packets;
|
|
}
|
|
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
static void
|
|
start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader,
|
|
GstElement * rtprtxsend, GstElement * rtprtxreceive,
|
|
RTXSendData * send_rtxdata, RTXReceiveData * receive_rtxdata,
|
|
guint drop_every_n_packets, gboolean * eos)
|
|
{
|
|
GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE;
|
|
guint nbrtxrequests = 0;
|
|
guint nbrtxpackets = 0;
|
|
guint nb_expected_requests = 0;
|
|
GstStructure *pt_map;
|
|
|
|
GST_INFO ("starting test");
|
|
|
|
pt_map = gst_structure_new ("application/x-rtp-pt-map",
|
|
"96", G_TYPE_UINT, 99, NULL);
|
|
g_object_set (rtppayloader, "pt", 96, NULL);
|
|
g_object_set (rtppayloader, "seqnum-offset", 1, NULL);
|
|
g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
|
|
g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
|
|
gst_structure_free (pt_map);
|
|
|
|
send_rtxdata->count = 1;
|
|
send_rtxdata->nb_packets = 0;
|
|
send_rtxdata->drop_every_n_packets = drop_every_n_packets;
|
|
|
|
receive_rtxdata->nb_packets = 0;
|
|
receive_rtxdata->seqnum_offset = 0;
|
|
receive_rtxdata->seqnum_prev = 0;
|
|
|
|
*eos = FALSE;
|
|
|
|
/* retrieve offset before going to paused */
|
|
g_object_get (G_OBJECT (rtppayloader), "seqnum-offset",
|
|
&receive_rtxdata->seqnum_offset, NULL);
|
|
|
|
/* prepare playing */
|
|
state_res = gst_element_set_state (bin, GST_STATE_PAUSED);
|
|
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
|
|
|
|
/* wait for completion */
|
|
state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE);
|
|
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
|
|
|
|
/* retrieve seqnum_prev here to make sure it has been reseted */
|
|
g_object_get (G_OBJECT (rtppayloader), "seqnum",
|
|
&receive_rtxdata->seqnum_prev, NULL);
|
|
|
|
/* run pipeline */
|
|
state_res = gst_element_set_state (bin, GST_STATE_PLAYING);
|
|
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
|
|
|
|
GST_INFO ("running main loop");
|
|
while (!*eos)
|
|
g_main_context_iteration (NULL, TRUE);
|
|
|
|
/* check results */
|
|
|
|
if (send_rtxdata->nb_packets % drop_every_n_packets == 0) {
|
|
/* special case because the last buffer will be dropped
|
|
* so the receiver cannot know if it has been dropped (no next packet)
|
|
*/
|
|
nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets - 1;
|
|
fail_unless_equals_int (send_rtxdata->nb_packets,
|
|
receive_rtxdata->nb_packets + 1);
|
|
} else {
|
|
nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets;
|
|
fail_unless_equals_int (send_rtxdata->nb_packets,
|
|
receive_rtxdata->nb_packets);
|
|
}
|
|
|
|
g_object_get (G_OBJECT (rtprtxsend), "num-rtx-requests", &nbrtxrequests,
|
|
NULL);
|
|
fail_unless_equals_int (nbrtxrequests, nb_expected_requests);
|
|
|
|
g_object_get (G_OBJECT (rtprtxsend), "num-rtx-packets", &nbrtxpackets, NULL);
|
|
fail_unless_equals_int (nbrtxpackets, nb_expected_requests);
|
|
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-requests", &nbrtxrequests,
|
|
NULL);
|
|
fail_unless_equals_int (nbrtxrequests, nb_expected_requests);
|
|
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &nbrtxpackets,
|
|
NULL);
|
|
fail_unless_equals_int (nbrtxpackets, nb_expected_requests);
|
|
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets",
|
|
&nbrtxpackets, NULL);
|
|
fail_unless_equals_int (nbrtxpackets, nb_expected_requests);
|
|
|
|
state_res = gst_element_set_state (bin, GST_STATE_NULL);
|
|
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
|
|
}
|
|
|
|
/* This test build the pipeline videotestsrc ! rtpvrawpay ! rtprtxsend ! rtprtxreceive ! fakesink
|
|
* and drop some buffer between rtprtxsend and rtprtxreceive
|
|
* Then it checks that every dropped packet has been re-sent and it checks that
|
|
* not too much requests has been sent.
|
|
*/
|
|
GST_START_TEST (test_drop_one_sender)
|
|
{
|
|
GstElement *bin, *src, *rtppayloader, *rtprtxsend, *rtprtxreceive, *sink;
|
|
GstBus *bus;
|
|
gboolean res;
|
|
GstPad *srcpad, *sinkpad;
|
|
GstStreamConsistency *chk_1, *chk_2, *chk_3;
|
|
gint num_buffers = 20;
|
|
guint drop_every_n_packets = 0;
|
|
RTXSendData send_rtxdata;
|
|
RTXReceiveData receive_rtxdata;
|
|
gboolean eos = FALSE;
|
|
|
|
GST_INFO ("preparing test");
|
|
|
|
/* build pipeline */
|
|
bin = gst_pipeline_new ("pipeline");
|
|
bus = gst_element_get_bus (bin);
|
|
gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);
|
|
|
|
src = gst_element_factory_make ("videotestsrc", "src");
|
|
g_object_set (src, "num-buffers", num_buffers, NULL);
|
|
rtppayloader = gst_element_factory_make ("rtpvrawpay", "rtppayloader");
|
|
rtprtxsend = gst_element_factory_make ("rtprtxsend", "rtprtxsend");
|
|
rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive");
|
|
sink = gst_element_factory_make ("fakesink", "sink");
|
|
gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, rtprtxreceive,
|
|
sink, NULL);
|
|
|
|
res = gst_element_link (src, rtppayloader);
|
|
fail_unless (res == TRUE, NULL);
|
|
res = gst_element_link (rtppayloader, rtprtxsend);
|
|
fail_unless (res == TRUE, NULL);
|
|
res = gst_element_link (rtprtxsend, rtprtxreceive);
|
|
fail_unless (res == TRUE, NULL);
|
|
res = gst_element_link (rtprtxreceive, sink);
|
|
fail_unless (res == TRUE, NULL);
|
|
|
|
/* create consistency checkers for the pads */
|
|
|
|
srcpad = gst_element_get_static_pad (rtppayloader, "src");
|
|
chk_1 = gst_consistency_checker_new (srcpad);
|
|
gst_object_unref (srcpad);
|
|
|
|
srcpad = gst_element_get_static_pad (rtprtxsend, "src");
|
|
gst_pad_add_probe (srcpad,
|
|
(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
|
|
GST_PAD_PROBE_TYPE_PUSH),
|
|
(GstPadProbeCallback) rtprtxsend_srcpad_probe, &send_rtxdata, NULL);
|
|
sinkpad = gst_pad_get_peer (srcpad);
|
|
fail_if (sinkpad == NULL);
|
|
chk_2 = gst_consistency_checker_new (sinkpad);
|
|
gst_object_unref (sinkpad);
|
|
gst_object_unref (srcpad);
|
|
|
|
srcpad = gst_element_get_static_pad (rtprtxreceive, "src");
|
|
gst_pad_add_probe (srcpad,
|
|
(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH),
|
|
(GstPadProbeCallback) rtprtxreceive_srcpad_probe, &receive_rtxdata, NULL);
|
|
sinkpad = gst_pad_get_peer (srcpad);
|
|
fail_if (sinkpad == NULL);
|
|
chk_3 = gst_consistency_checker_new (sinkpad);
|
|
gst_object_unref (sinkpad);
|
|
gst_object_unref (srcpad);
|
|
|
|
g_signal_connect (bus, "message::error", (GCallback) message_received, NULL);
|
|
g_signal_connect (bus, "message::warning", (GCallback) message_received,
|
|
NULL);
|
|
g_signal_connect (bus, "message::eos", (GCallback) message_received, &eos);
|
|
|
|
for (drop_every_n_packets = 2; drop_every_n_packets < 10;
|
|
drop_every_n_packets++) {
|
|
start_test_drop_and_check_results (bin, rtppayloader, rtprtxsend,
|
|
rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets,
|
|
&eos);
|
|
}
|
|
|
|
/* cleanup */
|
|
gst_consistency_checker_free (chk_1);
|
|
gst_consistency_checker_free (chk_2);
|
|
gst_consistency_checker_free (chk_3);
|
|
gst_bus_remove_signal_watch (bus);
|
|
gst_object_unref (bus);
|
|
gst_object_unref (bin);
|
|
}
|
|
|
|
GST_END_TEST;
|
|
|
|
static void
|
|
message_received_multiple (GstBus * bus, GstMessage * message, gpointer data)
|
|
{
|
|
GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT,
|
|
GST_MESSAGE_SRC (message), message);
|
|
|
|
switch (message->type) {
|
|
case GST_MESSAGE_WARNING:{
|
|
GError *gerror;
|
|
gchar *debug;
|
|
|
|
gst_message_parse_warning (message, &gerror, &debug);
|
|
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
|
|
g_error_free (gerror);
|
|
g_free (debug);
|
|
break;
|
|
}
|
|
case GST_MESSAGE_ERROR:{
|
|
GError *gerror;
|
|
gchar *debug;
|
|
|
|
gst_message_parse_error (message, &gerror, &debug);
|
|
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
|
|
fail ("Error: %s / %s", gerror->message, debug);
|
|
g_error_free (gerror);
|
|
g_free (debug);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
typedef struct
|
|
{
|
|
guint count;
|
|
guint nb_packets;
|
|
guint drop_every_n_packets;
|
|
guint payload_type_master;
|
|
guint total_packets;
|
|
} RTXSendMultipleData;
|
|
|
|
/* drop some packets */
|
|
static GstPadProbeReturn
|
|
rtprtxsend_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info,
|
|
gpointer user_data)
|
|
{
|
|
GstPadProbeReturn ret = GST_PAD_PROBE_OK;
|
|
|
|
if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) {
|
|
GstBuffer *buffer = GST_BUFFER (info->data);
|
|
RTXSendMultipleData *rtxdata = (RTXSendMultipleData *) user_data;
|
|
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
|
|
guint payload_type = 0;
|
|
|
|
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
|
|
payload_type = gst_rtp_buffer_get_payload_type (&rtp);
|
|
|
|
/* main stream packets */
|
|
if (payload_type == rtxdata->payload_type_master) {
|
|
/* count packets of the main stream */
|
|
++rtxdata->nb_packets;
|
|
/* drop some packets */
|
|
/* but make sure we never drop the last one, otherwise there
|
|
* will be nothing to trigger a retransmission.
|
|
*/
|
|
if (rtxdata->count < rtxdata->drop_every_n_packets ||
|
|
rtxdata->nb_packets == rtxdata->total_packets) {
|
|
++rtxdata->count;
|
|
} else {
|
|
/* drop a packet every 'rtxdata->count' packets */
|
|
rtxdata->count = 1;
|
|
ret = GST_PAD_PROBE_DROP;
|
|
}
|
|
} else {
|
|
/* retransmission packets */
|
|
}
|
|
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
/* make sure every sources has sent all their buffers */
|
|
static GstPadProbeReturn
|
|
source_srcpad_probe_multiple_drop_eos (GstPad * pad, GstPadProbeInfo * info,
|
|
gpointer user_data)
|
|
{
|
|
GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
|
|
|
|
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS)
|
|
return GST_PAD_PROBE_DROP;
|
|
else
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
typedef struct
|
|
{
|
|
GHashTable *ssrc_to_nb_packets_map;
|
|
GHashTable *ssrc_to_seqnum_offset_map;
|
|
guint seqnum_offset;
|
|
|
|
gint to_send;
|
|
volatile gint dropped_requests;
|
|
volatile gint received;
|
|
gboolean request_passed;
|
|
} RTXReceiveMultipleData;
|
|
|
|
/* add one branch videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. */
|
|
static RTXSendMultipleData *
|
|
add_sender (GstElement * bin, const gchar * src_name,
|
|
const gchar * payloader_name, guint payload_type_master,
|
|
guint payload_type_aux, RTXReceiveMultipleData * rtxdata)
|
|
{
|
|
GstElement *src = NULL;
|
|
GstCaps *caps;
|
|
GstElement *rtppayloader = NULL;
|
|
GstElement *rtprtxsend = NULL;
|
|
GstElement *queue = NULL;
|
|
GstElement *funnel = NULL;
|
|
GstPad *srcpad = NULL;
|
|
gboolean res = FALSE;
|
|
RTXSendMultipleData *send_rtxdata = g_slice_new0 (RTXSendMultipleData);
|
|
gchar *pt_master;
|
|
GstStructure *pt_map;
|
|
|
|
send_rtxdata->count = 1;
|
|
send_rtxdata->nb_packets = 0;
|
|
send_rtxdata->drop_every_n_packets = 0;
|
|
send_rtxdata->payload_type_master = payload_type_master;
|
|
send_rtxdata->total_packets = 25;
|
|
rtxdata->to_send += send_rtxdata->total_packets;
|
|
|
|
src = gst_element_factory_make (src_name, NULL);
|
|
rtppayloader = gst_element_factory_make (payloader_name, NULL);
|
|
rtprtxsend = gst_element_factory_make ("rtprtxsend", NULL);
|
|
queue = gst_element_factory_make ("queue", NULL);
|
|
funnel = gst_bin_get_by_name (GST_BIN (bin), "funnel");
|
|
|
|
pt_master = g_strdup_printf ("%" G_GUINT32_FORMAT, payload_type_master);
|
|
pt_map = gst_structure_new ("application/x-rtp-pt-map",
|
|
pt_master, G_TYPE_UINT, payload_type_aux, NULL);
|
|
g_free (pt_master);
|
|
|
|
g_object_set (src, "num-buffers", send_rtxdata->total_packets, NULL);
|
|
g_object_set (src, "is-live", TRUE, NULL);
|
|
g_object_set (rtppayloader, "pt", payload_type_master, NULL);
|
|
g_object_set (rtppayloader, "seqnum-offset", 1, NULL);
|
|
g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
|
|
/* we want that every drop packet be resent fast */
|
|
g_object_set (queue, "max-size-buffers", 1, NULL);
|
|
g_object_set (queue, "flush-on-eos", FALSE, NULL);
|
|
|
|
gst_structure_free (pt_map);
|
|
|
|
gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, queue, NULL);
|
|
|
|
/* Make sure we have one buffer per frame, makes it easier to count! */
|
|
caps =
|
|
gst_caps_from_string ("video/x-raw, width=20, height=10, framerate=30/1");
|
|
res = gst_element_link_filtered (src, rtppayloader, caps);
|
|
gst_caps_unref (caps);
|
|
fail_unless (res == TRUE, NULL);
|
|
res = gst_element_link (rtppayloader, rtprtxsend);
|
|
fail_unless (res == TRUE, NULL);
|
|
res = gst_element_link (rtprtxsend, queue);
|
|
fail_unless (res == TRUE, NULL);
|
|
res = gst_element_link (queue, funnel);
|
|
fail_unless (res == TRUE, NULL);
|
|
gst_object_unref (funnel);
|
|
|
|
/* to drop some packets */
|
|
srcpad = gst_element_get_static_pad (rtprtxsend, "src");
|
|
gst_pad_add_probe (srcpad,
|
|
(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH),
|
|
(GstPadProbeCallback) rtprtxsend_srcpad_probe_multiple, send_rtxdata,
|
|
NULL);
|
|
gst_object_unref (srcpad);
|
|
|
|
/* to make sure every sources has sent all their buffers */
|
|
srcpad = gst_element_get_static_pad (src, "src");
|
|
gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
|
|
(GstPadProbeCallback) source_srcpad_probe_multiple_drop_eos, NULL, NULL);
|
|
gst_object_unref (srcpad);
|
|
|
|
return send_rtxdata;
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
rtprtxreceive_sinkpad_probe_check_drop (GstPad * pad, GstPadProbeInfo * info,
|
|
gpointer user_data)
|
|
{
|
|
GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
|
|
RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data;
|
|
|
|
if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_UPSTREAM &&
|
|
gst_event_get_structure (event) != NULL &&
|
|
gst_structure_has_name (gst_event_get_structure (event),
|
|
"GstRTPRetransmissionRequest"))
|
|
rtxdata->request_passed = TRUE;
|
|
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
static gboolean
|
|
check_finished (RTXReceiveMultipleData * rtxdata)
|
|
{
|
|
return (g_atomic_int_get (&rtxdata->received) >= (rtxdata->to_send -
|
|
g_atomic_int_get (&rtxdata->dropped_requests)));
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info,
|
|
gpointer user_data)
|
|
{
|
|
if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) {
|
|
GstBuffer *buffer = GST_BUFFER (info->data);
|
|
RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data;
|
|
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
|
|
guint ssrc = 0;
|
|
guint seqnum = 0;
|
|
gpointer seqnum_prev = 0;
|
|
guint nb_packets = 0;
|
|
|
|
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
|
|
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
|
|
seqnum = gst_rtp_buffer_get_seq (&rtp);
|
|
|
|
g_atomic_int_inc (&rtxdata->received);
|
|
if (check_finished (rtxdata))
|
|
g_main_context_wakeup (NULL);
|
|
|
|
if (!g_hash_table_lookup_extended (rtxdata->ssrc_to_seqnum_offset_map,
|
|
GUINT_TO_POINTER (ssrc), NULL, &seqnum_prev)) {
|
|
/*In our test we take care to never drop the first buffer */
|
|
g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map,
|
|
GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum));
|
|
g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map,
|
|
GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (1));
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
|
|
/* check if there is a dropped packet
|
|
* (in our test every packet arrived in increasing order) */
|
|
if (seqnum > GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) {
|
|
GstPad *peerpad = gst_pad_get_peer (pad);
|
|
guint i = 0;
|
|
|
|
/* ask retransmission of missing packets */
|
|
for (i = GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset;
|
|
i < seqnum; i += rtxdata->seqnum_offset) {
|
|
GstEvent *event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
|
|
gst_structure_new ("GstRTPRetransmissionRequest",
|
|
"seqnum", G_TYPE_UINT, i,
|
|
"ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp),
|
|
"payload-type", G_TYPE_UINT,
|
|
gst_rtp_buffer_get_payload_type (&rtp),
|
|
NULL));
|
|
rtxdata->request_passed = FALSE;
|
|
gst_pad_push_event (peerpad, event);
|
|
if (!rtxdata->request_passed) {
|
|
g_atomic_int_inc (&rtxdata->dropped_requests);
|
|
if (check_finished (rtxdata))
|
|
g_main_context_wakeup (NULL);
|
|
}
|
|
}
|
|
gst_object_unref (peerpad);
|
|
|
|
g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map,
|
|
GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum));
|
|
} else if (seqnum ==
|
|
GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) {
|
|
/* also update previous seqnum in this case */
|
|
g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map,
|
|
GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum));
|
|
} else {
|
|
/* receive retransmited packet */
|
|
}
|
|
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
|
|
nb_packets =
|
|
GPOINTER_TO_UINT (g_hash_table_lookup (rtxdata->ssrc_to_nb_packets_map,
|
|
GUINT_TO_POINTER (ssrc)));
|
|
g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map,
|
|
GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (++nb_packets));
|
|
}
|
|
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
static void
|
|
reset_rtx_send_data (RTXSendMultipleData * send_rtxdata, gpointer data)
|
|
{
|
|
send_rtxdata->count = 1;
|
|
send_rtxdata->nb_packets = 0;
|
|
send_rtxdata->drop_every_n_packets = *(guint *) data;
|
|
}
|
|
|
|
/* compute number of all packets sent by all sender */
|
|
static void
|
|
compute_total_packets_sent (RTXSendMultipleData * send_rtxdata, gpointer data)
|
|
{
|
|
guint *sum = (guint *) data;
|
|
*sum += send_rtxdata->nb_packets;
|
|
}
|
|
|
|
/* compute number of all packets received by rtprtxreceive::src pad */
|
|
static void
|
|
compute_total_packets_received (gpointer key, gpointer value, gpointer data)
|
|
{
|
|
guint *sum = (guint *) data;
|
|
*sum += GPOINTER_TO_UINT (value);
|
|
}
|
|
|
|
static void
|
|
start_test_drop_multiple_and_check_results (GstElement * bin,
|
|
GList * send_rtxdata_list, RTXReceiveMultipleData * receive_rtxdata,
|
|
guint drop_every_n_packets)
|
|
{
|
|
GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE;
|
|
GstElement *rtprtxreceive =
|
|
gst_bin_get_by_name (GST_BIN (bin), "rtprtxreceive");
|
|
guint sum_all_packets_sent = 0;
|
|
guint sum_rtx_packets_sent = 0;
|
|
guint sum_all_packets_received = 0;
|
|
guint sum_rtx_packets_received = 0;
|
|
guint sum_rtx_assoc_packets_received = 0;
|
|
guint sum_rtx_dropped_packets_received = 0;
|
|
gdouble error_sent_recv = 0;
|
|
GstIterator *itr_elements = NULL;
|
|
gboolean done = FALSE;
|
|
GValue item = { 0 };
|
|
GstElement *element = NULL;
|
|
gchar *name = NULL;
|
|
|
|
GST_INFO ("starting test");
|
|
|
|
g_atomic_int_set (&receive_rtxdata->received, 0);
|
|
g_atomic_int_set (&receive_rtxdata->dropped_requests, 0);
|
|
|
|
g_hash_table_remove_all (receive_rtxdata->ssrc_to_nb_packets_map);
|
|
g_hash_table_remove_all (receive_rtxdata->ssrc_to_seqnum_offset_map);
|
|
|
|
g_list_foreach (send_rtxdata_list, (GFunc) reset_rtx_send_data,
|
|
&drop_every_n_packets);
|
|
|
|
/* run pipeline */
|
|
state_res = gst_element_set_state (bin, GST_STATE_PLAYING);
|
|
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
|
|
|
|
state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE);
|
|
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
|
|
|
|
GST_INFO ("running main loop");
|
|
while (!check_finished (receive_rtxdata))
|
|
g_main_context_iteration (NULL, TRUE);
|
|
|
|
/* check results */
|
|
itr_elements = gst_bin_iterate_elements (GST_BIN (bin));
|
|
done = FALSE;
|
|
while (!done) {
|
|
switch (gst_iterator_next (itr_elements, &item)) {
|
|
case GST_ITERATOR_OK:
|
|
element = GST_ELEMENT (g_value_get_object (&item));
|
|
name = gst_element_get_name (element);
|
|
if (g_str_has_prefix (name, "rtprtxsend") > 0) {
|
|
guint nb_packets = 0;
|
|
g_object_get (G_OBJECT (element), "num-rtx-packets", &nb_packets,
|
|
NULL);
|
|
sum_rtx_packets_sent += nb_packets;
|
|
}
|
|
g_free (name);
|
|
g_value_reset (&item);
|
|
break;
|
|
case GST_ITERATOR_RESYNC:
|
|
gst_iterator_resync (itr_elements);
|
|
break;
|
|
case GST_ITERATOR_ERROR:
|
|
done = TRUE;
|
|
break;
|
|
case GST_ITERATOR_DONE:
|
|
done = TRUE;
|
|
break;
|
|
}
|
|
}
|
|
g_value_unset (&item);
|
|
gst_iterator_free (itr_elements);
|
|
|
|
/* compute number of all packets sent by all sender */
|
|
g_list_foreach (send_rtxdata_list, (GFunc) compute_total_packets_sent,
|
|
&sum_all_packets_sent);
|
|
|
|
/* compute number of all packets received by rtprtxreceive::src pad */
|
|
g_hash_table_foreach (receive_rtxdata->ssrc_to_nb_packets_map,
|
|
compute_total_packets_received, (gpointer) & sum_all_packets_received);
|
|
|
|
sum_all_packets_received +=
|
|
g_atomic_int_get (&receive_rtxdata->dropped_requests);
|
|
fail_if (sum_all_packets_sent < sum_all_packets_received);
|
|
|
|
/* some packet are not received, I still have to figure out why
|
|
* but I suspect it comes from pipeline setup/shutdown
|
|
*/
|
|
if (sum_all_packets_sent != sum_all_packets_received) {
|
|
error_sent_recv =
|
|
1 - sum_all_packets_received / (gdouble) sum_all_packets_sent;
|
|
fail_if (error_sent_recv > 0.30);
|
|
/* it should be 0% */
|
|
}
|
|
|
|
/* retrieve number of retransmit packets received by rtprtxreceive */
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets",
|
|
&sum_rtx_packets_received, NULL);
|
|
|
|
/* some of rtx packet are not received because the receiver avoids
|
|
* collision (= requests that have the same seqnum)
|
|
*/
|
|
fail_if (sum_rtx_packets_sent < sum_rtx_packets_received);
|
|
g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets",
|
|
&sum_rtx_assoc_packets_received, NULL);
|
|
sum_rtx_dropped_packets_received =
|
|
sum_rtx_packets_received - sum_rtx_assoc_packets_received;
|
|
fail_unless_equals_int (sum_rtx_packets_sent,
|
|
sum_rtx_assoc_packets_received + sum_rtx_dropped_packets_received);
|
|
|
|
gst_object_unref (rtprtxreceive);
|
|
state_res = gst_element_set_state (bin, GST_STATE_NULL);
|
|
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
|
|
}
|
|
|
|
static void
|
|
free_rtx_send_data (gpointer data)
|
|
{
|
|
g_slice_free (RTXSendMultipleData, data);
|
|
}
|
|
|
|
/* This test build the pipeline funnel name=funnel
|
|
* videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel.
|
|
* videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel.
|
|
* N
|
|
* funnel. ! rtprtxreceive ! fakesink
|
|
* and drop some buffer just after each rtprtxsend
|
|
* Then it checks that every dropped packet has been re-sent and it checks
|
|
* that not too much requests has been sent.
|
|
*/
|
|
GST_START_TEST (test_drop_multiple_sender)
|
|
{
|
|
GstElement *bin, *funnel, *rtprtxreceive, *sink;
|
|
GstBus *bus;
|
|
gboolean res;
|
|
GstPad *srcpad, *sinkpad;
|
|
guint drop_every_n_packets = 0;
|
|
GList *send_rtxdata_list = NULL;
|
|
RTXReceiveMultipleData receive_rtxdata = { NULL };
|
|
GstStructure *pt_map;
|
|
|
|
GST_INFO ("preparing test");
|
|
|
|
receive_rtxdata.ssrc_to_nb_packets_map =
|
|
g_hash_table_new (g_direct_hash, g_direct_equal);
|
|
receive_rtxdata.ssrc_to_seqnum_offset_map =
|
|
g_hash_table_new (g_direct_hash, g_direct_equal);
|
|
receive_rtxdata.seqnum_offset = 1;
|
|
|
|
/* build pipeline */
|
|
bin = gst_pipeline_new ("pipeline");
|
|
bus = gst_element_get_bus (bin);
|
|
gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);
|
|
|
|
funnel = gst_element_factory_make ("funnel", "funnel");
|
|
rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive");
|
|
sink = gst_element_factory_make ("fakesink", "sink");
|
|
g_object_set (sink, "sync", TRUE, NULL);
|
|
g_object_set (sink, "qos", FALSE, NULL);
|
|
gst_bin_add_many (GST_BIN (bin), funnel, rtprtxreceive, sink, NULL);
|
|
|
|
send_rtxdata_list =
|
|
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
|
|
"rtpvrawpay", 96, 121, &receive_rtxdata));
|
|
send_rtxdata_list =
|
|
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
|
|
"rtpvrawpay", 97, 122, &receive_rtxdata));
|
|
send_rtxdata_list =
|
|
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
|
|
"rtpvrawpay", 98, 123, &receive_rtxdata));
|
|
send_rtxdata_list =
|
|
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
|
|
"rtpvrawpay", 99, 124, &receive_rtxdata));
|
|
|
|
pt_map = gst_structure_new ("application/x-rtp-pt-map",
|
|
"96", G_TYPE_UINT, 121, "97", G_TYPE_UINT, 122,
|
|
"98", G_TYPE_UINT, 123, "99", G_TYPE_UINT, 124, NULL);
|
|
g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
|
|
gst_structure_free (pt_map);
|
|
|
|
res = gst_element_link (funnel, rtprtxreceive);
|
|
fail_unless (res == TRUE, NULL);
|
|
res = gst_element_link (rtprtxreceive, sink);
|
|
fail_unless (res == TRUE, NULL);
|
|
|
|
srcpad = gst_element_get_static_pad (rtprtxreceive, "src");
|
|
gst_pad_add_probe (srcpad,
|
|
(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH),
|
|
(GstPadProbeCallback) rtprtxreceive_srcpad_probe_multiple,
|
|
&receive_rtxdata, NULL);
|
|
gst_object_unref (srcpad);
|
|
|
|
sinkpad = gst_element_get_static_pad (rtprtxreceive, "sink");
|
|
gst_pad_add_probe (sinkpad,
|
|
GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
|
|
(GstPadProbeCallback) rtprtxreceive_sinkpad_probe_check_drop,
|
|
&receive_rtxdata, NULL);
|
|
gst_object_unref (sinkpad);
|
|
|
|
g_signal_connect (bus, "message::error",
|
|
(GCallback) message_received_multiple, NULL);
|
|
g_signal_connect (bus, "message::warning",
|
|
(GCallback) message_received_multiple, NULL);
|
|
|
|
for (drop_every_n_packets = 2; drop_every_n_packets < 10;
|
|
drop_every_n_packets++) {
|
|
start_test_drop_multiple_and_check_results (bin, send_rtxdata_list,
|
|
&receive_rtxdata, drop_every_n_packets);
|
|
}
|
|
|
|
/* cleanup */
|
|
|
|
g_list_free_full (send_rtxdata_list, free_rtx_send_data);
|
|
g_hash_table_destroy (receive_rtxdata.ssrc_to_nb_packets_map);
|
|
g_hash_table_destroy (receive_rtxdata.ssrc_to_seqnum_offset_map);
|
|
|
|
gst_bus_remove_signal_watch (bus);
|
|
gst_object_unref (bus);
|
|
gst_object_unref (bin);
|
|
}
|
|
|
|
GST_END_TEST;
|
|
|
|
struct GenerateTestBuffersData
|
|
{
|
|
GstElement *src, *capsfilter, *payloader, *sink;
|
|
GMutex mutex;
|
|
GCond cond;
|
|
GList *buffers;
|
|
gint num_buffers;
|
|
guint last_seqnum;
|
|
};
|
|
|
|
static void
|
|
fakesink_handoff (GstElement * sink, GstBuffer * buf, GstPad * pad,
|
|
gpointer user_data)
|
|
{
|
|
struct GenerateTestBuffersData *data = user_data;
|
|
|
|
g_mutex_lock (&data->mutex);
|
|
|
|
if (data->num_buffers > 0)
|
|
data->buffers = g_list_append (data->buffers, gst_buffer_ref (buf));
|
|
|
|
/* if we have collected enough buffers, unblock the main thread to stop */
|
|
if (--data->num_buffers <= 0)
|
|
g_cond_signal (&data->cond);
|
|
|
|
if (data->num_buffers == 0)
|
|
g_object_get (data->payloader, "seqnum", &data->last_seqnum, NULL);
|
|
|
|
g_mutex_unlock (&data->mutex);
|
|
}
|
|
|
|
static GList *
|
|
generate_test_buffers (const gint num_buffers, guint ssrc, guint * payload_type)
|
|
{
|
|
GstElement *bin;
|
|
GstCaps *videotestsrc_caps;
|
|
gboolean res;
|
|
struct GenerateTestBuffersData data;
|
|
|
|
fail_unless (num_buffers > 0);
|
|
|
|
g_mutex_init (&data.mutex);
|
|
g_cond_init (&data.cond);
|
|
data.buffers = NULL;
|
|
data.num_buffers = num_buffers;
|
|
|
|
bin = gst_pipeline_new (NULL);
|
|
data.src = gst_element_factory_make ("videotestsrc", NULL);
|
|
data.capsfilter = gst_element_factory_make ("capsfilter", NULL);
|
|
data.payloader = gst_element_factory_make ("rtpvrawpay", NULL);
|
|
data.sink = gst_element_factory_make ("fakesink", NULL);
|
|
|
|
/* small frame size will cause vrawpay to generate exactly one rtp packet
|
|
* per video frame, which we need for the max-size-time test */
|
|
videotestsrc_caps =
|
|
gst_caps_from_string
|
|
("video/x-raw,format=I420,width=10,height=10,framerate=30/1");
|
|
|
|
g_object_set (data.src, "do-timestamp", TRUE, NULL);
|
|
g_object_set (data.capsfilter, "caps", videotestsrc_caps, NULL);
|
|
g_object_set (data.payloader, "seqnum-offset", 1, "ssrc", ssrc, NULL);
|
|
g_object_set (data.sink, "signal-handoffs", TRUE, NULL);
|
|
g_signal_connect (data.sink, "handoff", (GCallback) fakesink_handoff, &data);
|
|
|
|
gst_caps_unref (videotestsrc_caps);
|
|
|
|
gst_bin_add_many (GST_BIN (bin), data.src, data.capsfilter, data.payloader,
|
|
data.sink, NULL);
|
|
res = gst_element_link_many (data.src, data.capsfilter, data.payloader,
|
|
data.sink, NULL);
|
|
fail_unless_equals_int (res, TRUE);
|
|
|
|
g_mutex_lock (&data.mutex);
|
|
ASSERT_SET_STATE (bin, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
|
|
while (data.num_buffers > 0)
|
|
g_cond_wait (&data.cond, &data.mutex);
|
|
g_mutex_unlock (&data.mutex);
|
|
|
|
g_object_get (data.payloader, "pt", payload_type, NULL);
|
|
|
|
ASSERT_SET_STATE (bin, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
|
|
|
|
fail_unless_equals_int (g_list_length (data.buffers), num_buffers);
|
|
fail_unless_equals_int (num_buffers, data.last_seqnum);
|
|
|
|
g_mutex_clear (&data.mutex);
|
|
g_cond_clear (&data.cond);
|
|
gst_object_unref (bin);
|
|
|
|
return data.buffers;
|
|
}
|
|
|
|
static GstEvent *
|
|
create_rtx_event (guint seqnum, guint ssrc, guint payload_type)
|
|
{
|
|
return gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
|
|
gst_structure_new ("GstRTPRetransmissionRequest",
|
|
"seqnum", G_TYPE_UINT, seqnum,
|
|
"ssrc", G_TYPE_UINT, ssrc,
|
|
"payload-type", G_TYPE_UINT, payload_type, NULL));
|
|
}
|
|
|
|
static void
|
|
test_rtxsender_packet_retention (gboolean test_with_time)
|
|
{
|
|
const gint num_buffers = test_with_time ? 30 : 10;
|
|
const gint half_buffers = num_buffers / 2;
|
|
const guint ssrc = 1234567;
|
|
const guint rtx_ssrc = 7654321;
|
|
const guint rtx_payload_type = 99;
|
|
GstStructure *pt_map;
|
|
GstStructure *ssrc_map;
|
|
GList *in_buffers, *node;
|
|
guint payload_type;
|
|
GstElement *rtxsend;
|
|
GstPad *srcpad, *sinkpad;
|
|
GstCaps *caps;
|
|
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
|
|
gint i, j;
|
|
gboolean res;
|
|
|
|
/* generate test data */
|
|
in_buffers = generate_test_buffers (num_buffers, ssrc, &payload_type);
|
|
|
|
/* clear the global buffers list, which we are going to use later */
|
|
gst_check_drop_buffers ();
|
|
|
|
/* setup element & pads */
|
|
rtxsend = gst_check_setup_element ("rtprtxsend");
|
|
|
|
pt_map = gst_structure_new ("application/x-rtp-pt-map",
|
|
"96", G_TYPE_UINT, rtx_payload_type, NULL);
|
|
ssrc_map = gst_structure_new ("application/x-rtp-ssrc-map",
|
|
"1234567", G_TYPE_UINT, rtx_ssrc, NULL);
|
|
|
|
/* in both cases we want the rtxsend queue to store 'half_buffers'
|
|
* amount of buffers at most. In max-size-packets mode, it's trivial.
|
|
* In max-size-time mode, we specify almost half a second, which is
|
|
* the equivalent of 15 frames in a 30fps video stream */
|
|
g_object_set (rtxsend,
|
|
"max-size-packets", test_with_time ? 0 : half_buffers,
|
|
"max-size-time", test_with_time ? 499 : 0,
|
|
"payload-type-map", pt_map, "ssrc-map", ssrc_map, NULL);
|
|
gst_structure_free (pt_map);
|
|
gst_structure_free (ssrc_map);
|
|
|
|
srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate);
|
|
fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE);
|
|
|
|
sinkpad = gst_check_setup_sink_pad (rtxsend, &sinktemplate);
|
|
fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE);
|
|
|
|
ASSERT_SET_STATE (rtxsend, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
|
|
|
caps = gst_caps_from_string ("application/x-rtp, "
|
|
"media = (string)video, payload = (int)96, "
|
|
"ssrc = (uint)1234567, clock-rate = (int)90000, "
|
|
"encoding-name = (string)RAW");
|
|
gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME);
|
|
gst_caps_unref (caps);
|
|
|
|
/* now push all buffers and request retransmission every time for all of them */
|
|
node = in_buffers;
|
|
for (i = 1; i <= num_buffers; i++) {
|
|
GstBuffer *buffer = GST_BUFFER (node->data);
|
|
|
|
/* verify that the original packets are correct */
|
|
res = gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
|
|
fail_unless_equals_int (res, TRUE);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
|
|
payload_type);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
|
|
/* retransmit all the previous ones */
|
|
for (j = 1; j < i; j++) {
|
|
/* synchronize with the chain() function of the "sinkpad"
|
|
* to make sure that rtxsend has pushed the rtx buffer out
|
|
* before continuing */
|
|
GList *last_out_buffer = g_list_last (buffers);
|
|
g_mutex_lock (&check_mutex);
|
|
fail_unless_equals_int (gst_pad_push_event (sinkpad,
|
|
create_rtx_event (j, ssrc, payload_type)), TRUE);
|
|
/* wait for the rtx packet only if we expect the element
|
|
* to actually retransmit something */
|
|
if (j >= MAX (i - half_buffers, 1)) {
|
|
guint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
|
|
|
|
while (last_out_buffer == g_list_last (buffers))
|
|
fail_unless (g_cond_wait_until (&check_cond, &check_mutex, end_time));
|
|
}
|
|
g_mutex_unlock (&check_mutex);
|
|
}
|
|
|
|
/* push this one */
|
|
gst_pad_push (srcpad, gst_buffer_ref (buffer));
|
|
node = g_list_next (node);
|
|
}
|
|
|
|
/* verify the result. buffers should be in this order (numbers are seqnums):
|
|
* 1, 1rtx, 2, 1rtx, 2rtx, 3, ... , 9, 5rtx, 6rtx, 7rtx, 8rtx, 9rtx, 10 */
|
|
{
|
|
GstRTPBuffer orig_rtp = GST_RTP_BUFFER_INIT;
|
|
gint expected_rtx_requests, expected_rtx_packets;
|
|
gint real_rtx_requests, real_rtx_packets;
|
|
|
|
/* verify statistics first */
|
|
expected_rtx_packets = half_buffers * half_buffers +
|
|
((half_buffers - 1) / 2.0f) * half_buffers;
|
|
for (i = 1, expected_rtx_requests = 0; i < num_buffers; i++)
|
|
expected_rtx_requests += i;
|
|
|
|
g_object_get (rtxsend, "num-rtx-requests", &real_rtx_requests,
|
|
"num-rtx-packets", &real_rtx_packets, NULL);
|
|
fail_unless_equals_int (expected_rtx_requests, real_rtx_requests);
|
|
fail_unless_equals_int (expected_rtx_packets, real_rtx_packets);
|
|
|
|
/* and the number of actual buffers that we were pushed out of rtxsend */
|
|
fail_unless_equals_int (g_list_length (buffers),
|
|
num_buffers + expected_rtx_packets);
|
|
|
|
node = buffers;
|
|
for (i = 1; i <= num_buffers; i++) {
|
|
/* verify the retransmission packets */
|
|
for (j = MAX (i - half_buffers, 1); j < i; j++) {
|
|
GST_INFO ("checking %d, %d", i, j);
|
|
|
|
res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
|
|
fail_unless_equals_int (res, TRUE);
|
|
|
|
fail_if (gst_rtp_buffer_get_ssrc (&rtp) == ssrc);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), rtx_ssrc);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
|
|
rtx_payload_type);
|
|
fail_unless_equals_int (GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp)), j); /* j == rtx seqnum */
|
|
|
|
/* open the original packet for this rtx packet and verify timestamps */
|
|
res = gst_rtp_buffer_map (GST_BUFFER (g_list_nth_data (in_buffers,
|
|
j - 1)), GST_MAP_READ, &orig_rtp);
|
|
fail_unless_equals_int (res, TRUE);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&orig_rtp),
|
|
gst_rtp_buffer_get_timestamp (&rtp));
|
|
gst_rtp_buffer_unmap (&orig_rtp);
|
|
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
node = g_list_next (node);
|
|
}
|
|
|
|
/* verify the normal rtp flow packet */
|
|
res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
|
|
fail_unless_equals_int (res, TRUE);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
|
|
payload_type);
|
|
fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
|
|
gst_rtp_buffer_unmap (&rtp);
|
|
node = g_list_next (node);
|
|
}
|
|
}
|
|
|
|
g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref);
|
|
gst_check_drop_buffers ();
|
|
|
|
gst_check_teardown_src_pad (rtxsend);
|
|
gst_check_teardown_sink_pad (rtxsend);
|
|
gst_check_teardown_element (rtxsend);
|
|
}
|
|
|
|
GST_START_TEST (test_rtxsender_max_size_packets)
|
|
{
|
|
test_rtxsender_packet_retention (FALSE);
|
|
}
|
|
|
|
GST_END_TEST;
|
|
|
|
GST_START_TEST (test_rtxsender_max_size_time)
|
|
{
|
|
test_rtxsender_packet_retention (TRUE);
|
|
}
|
|
|
|
GST_END_TEST;
|
|
|
|
static void
|
|
compare_rtp_packets (GstBuffer * a, GstBuffer * b)
|
|
{
|
|
GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
|
|
GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
|
|
|
|
gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
|
|
gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
|
|
|
|
fail_unless_equals_int (gst_rtp_buffer_get_header_len (&rtp_a),
|
|
gst_rtp_buffer_get_header_len (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_version (&rtp_a),
|
|
gst_rtp_buffer_get_version (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp_a),
|
|
gst_rtp_buffer_get_ssrc (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp_a),
|
|
gst_rtp_buffer_get_seq (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_csrc_count (&rtp_a),
|
|
gst_rtp_buffer_get_csrc_count (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_marker (&rtp_a),
|
|
gst_rtp_buffer_get_marker (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp_a),
|
|
gst_rtp_buffer_get_payload_type (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&rtp_a),
|
|
gst_rtp_buffer_get_timestamp (&rtp_b));
|
|
fail_unless_equals_int (gst_rtp_buffer_get_extension (&rtp_a),
|
|
gst_rtp_buffer_get_extension (&rtp_b));
|
|
|
|
fail_unless_equals_int (gst_rtp_buffer_get_payload_len (&rtp_a),
|
|
gst_rtp_buffer_get_payload_len (&rtp_b));
|
|
fail_unless_equals_int (memcmp (gst_rtp_buffer_get_payload (&rtp_a),
|
|
gst_rtp_buffer_get_payload (&rtp_b),
|
|
gst_rtp_buffer_get_payload_len (&rtp_a)), 0);
|
|
|
|
gst_rtp_buffer_unmap (&rtp_a);
|
|
gst_rtp_buffer_unmap (&rtp_b);
|
|
}
|
|
|
|
GST_START_TEST (test_rtxreceive_data_reconstruction)
|
|
{
|
|
const guint ssrc = 1234567;
|
|
GList *in_buffers;
|
|
guint payload_type;
|
|
GstElement *rtxsend, *rtxrecv;
|
|
GstPad *srcpad, *sinkpad;
|
|
GstCaps *caps;
|
|
GstBuffer *buffer;
|
|
GstStructure *pt_map;
|
|
|
|
/* generate test data */
|
|
in_buffers = generate_test_buffers (1, ssrc, &payload_type);
|
|
|
|
/* clear the global buffers list, which we are going to use later */
|
|
gst_check_drop_buffers ();
|
|
|
|
/* setup element & pads */
|
|
rtxsend = gst_check_setup_element ("rtprtxsend");
|
|
rtxrecv = gst_check_setup_element ("rtprtxreceive");
|
|
|
|
pt_map = gst_structure_new ("application/x-rtp-pt-map",
|
|
"96", G_TYPE_UINT, 99, NULL);
|
|
g_object_set (rtxsend, "payload-type-map", pt_map, NULL);
|
|
g_object_set (rtxrecv, "payload-type-map", pt_map, NULL);
|
|
gst_structure_free (pt_map);
|
|
|
|
fail_unless_equals_int (gst_element_link (rtxsend, rtxrecv), TRUE);
|
|
|
|
srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate);
|
|
fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE);
|
|
|
|
sinkpad = gst_check_setup_sink_pad (rtxrecv, &sinktemplate);
|
|
fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE);
|
|
|
|
ASSERT_SET_STATE (rtxsend, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
|
ASSERT_SET_STATE (rtxrecv, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
|
|
|
|
caps = gst_caps_from_string ("application/x-rtp, "
|
|
"media = (string)video, payload = (int)96, "
|
|
"ssrc = (uint)1234567, clock-rate = (int)90000, "
|
|
"encoding-name = (string)RAW");
|
|
gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME);
|
|
gst_caps_unref (caps);
|
|
|
|
/* push buffer */
|
|
buffer = gst_buffer_ref (GST_BUFFER (in_buffers->data));
|
|
fail_unless_equals_int (gst_pad_push (srcpad, buffer), GST_FLOW_OK);
|
|
|
|
/* push retransmission request */
|
|
{
|
|
GList *last_out_buffer;
|
|
guint64 end_time;
|
|
gboolean res;
|
|
|
|
/* synchronize with the chain() function of the "sinkpad"
|
|
* to make sure that rtxsend has pushed the rtx buffer out
|
|
* before continuing */
|
|
last_out_buffer = g_list_last (buffers);
|
|
g_mutex_lock (&check_mutex);
|
|
fail_unless_equals_int (gst_pad_push_event (sinkpad,
|
|
create_rtx_event (1, ssrc, payload_type)), TRUE);
|
|
end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
|
|
do
|
|
res = g_cond_wait_until (&check_cond, &check_mutex, end_time);
|
|
while (res == TRUE && last_out_buffer == g_list_last (buffers));
|
|
fail_unless_equals_int (res, TRUE);
|
|
g_mutex_unlock (&check_mutex);
|
|
}
|
|
|
|
/* verify */
|
|
fail_unless_equals_int (g_list_length (buffers), 2);
|
|
compare_rtp_packets (GST_BUFFER (buffers->data),
|
|
GST_BUFFER (buffers->next->data));
|
|
|
|
/* cleanup */
|
|
g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref);
|
|
gst_check_drop_buffers ();
|
|
|
|
gst_check_teardown_src_pad (rtxsend);
|
|
gst_check_teardown_sink_pad (rtxrecv);
|
|
gst_element_unlink (rtxsend, rtxrecv);
|
|
gst_check_teardown_element (rtxsend);
|
|
gst_check_teardown_element (rtxrecv);
|
|
}
|
|
|
|
GST_END_TEST;
|
|
|
|
static Suite *
|
|
rtprtx_suite (void)
|
|
{
|
|
Suite *s = suite_create ("rtprtx");
|
|
TCase *tc_chain = tcase_create ("general");
|
|
|
|
tcase_set_timeout (tc_chain, 120);
|
|
|
|
suite_add_tcase (s, tc_chain);
|
|
|
|
tcase_add_test (tc_chain, test_push_forward_seq);
|
|
tcase_add_test (tc_chain, test_drop_one_sender);
|
|
tcase_add_test (tc_chain, test_drop_multiple_sender);
|
|
tcase_add_test (tc_chain, test_rtxsender_max_size_packets);
|
|
tcase_add_test (tc_chain, test_rtxsender_max_size_time);
|
|
tcase_add_test (tc_chain, test_rtxreceive_data_reconstruction);
|
|
|
|
return s;
|
|
}
|
|
|
|
GST_CHECK_MAIN (rtprtx);
|