/* GStreamer * * Copyright (C) 2013 Collabora Ltd. * @author Julien Isorce * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include #include #include #include /* For ease of programming we use globals to keep refs for our floating * src and sink pads we create; otherwise we always have to do get_pad, * get_peer, and then remove references in every test function */ static GstPad *srcpad, *sinkpad; /* we also have a list of src buffers */ static GList *inbuffers = NULL; static GMainLoop *main_loop; #define RTP_CAPS_STRING \ "application/x-rtp, " \ "media = (string)audio, " \ "payload = (int) 0, " \ "clock-rate = (int) 8000, " \ "encoding-name = (string)PCMU" #define RTP_FRAME_SIZE 20 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-rtp") ); static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-rtp") ); static void setup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive, gint num_buffers) { GstClock *clock; GstBuffer *buffer; GstPad *sendsrcpad; GstPad *receivesinkpad; gboolean ret = FALSE; /* a 20 sample audio block (2,5 ms) generated with * gst-launch audiotestsrc wave=silence blocksize=40 num-buffers=3 ! * "audio/x-raw,channels=1,rate=8000" ! mulawenc ! rtppcmupay ! * fakesink dump=1 */ guint8 in[] = { /* first 4 bytes are rtp-header, next 4 bytes are timestamp */ 0x80, 0x80, 0x1c, 0x24, 0x46, 0xcd, 0xb7, 0x11, 0x3c, 0x3a, 0x7c, 0x5b, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }; GstClockTime ts = G_GUINT64_CONSTANT (0); GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000); gint i; /* we need a clock here */ clock = gst_system_clock_obtain (); gst_element_set_clock (rtprtxsend, clock); gst_object_unref (clock); srcpad = gst_check_setup_src_pad (rtprtxsend, &srctemplate); sendsrcpad = gst_element_get_static_pad (rtprtxsend, "src"); ret = gst_pad_set_active (srcpad, TRUE); fail_if (ret == FALSE); sinkpad = gst_check_setup_sink_pad (rtprtxreceive, &sinktemplate); receivesinkpad = gst_element_get_static_pad (rtprtxreceive, "sink"); ret = gst_pad_set_active (sinkpad, TRUE); fail_if (ret == FALSE); fail_if (gst_pad_link (sendsrcpad, receivesinkpad) != GST_PAD_LINK_OK); ret = gst_pad_set_active (sendsrcpad, TRUE); fail_if (ret == FALSE); ret = gst_pad_set_active (receivesinkpad, TRUE); fail_if (ret == FALSE); gst_object_unref (sendsrcpad); gst_object_unref (receivesinkpad); for (i = 0; i < num_buffers; i++) { buffer = gst_buffer_new_and_alloc (sizeof (in)); gst_buffer_fill (buffer, 0, in, sizeof (in)); GST_BUFFER_DTS (buffer) = ts; GST_BUFFER_PTS (buffer) = ts; GST_BUFFER_DURATION (buffer) = tso; GST_DEBUG ("created buffer: %p", buffer); /*if (!i) GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); */ inbuffers = g_list_append (inbuffers, buffer); /* hackish way to update the rtp header */ in[1] = 0x00; in[3]++; /* seqnumber */ in[7] += RTP_FRAME_SIZE; /* inc. timestamp with framesize */ ts += tso; } } static GstStateChangeReturn start_rtprtx (GstElement * element) { GstStateChangeReturn ret; GstClockTime now; GstClock *clock; clock = gst_element_get_clock (element); if (clock) { now = gst_clock_get_time (clock); gst_object_unref (clock); gst_element_set_base_time (element, now); } ret = gst_element_set_state (element, GST_STATE_PLAYING); ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE); ret = gst_element_get_state (element, NULL, NULL, GST_CLOCK_TIME_NONE); ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE); return ret; } static void cleanup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive) { GST_DEBUG ("cleanup_rtprtx"); g_list_free (inbuffers); inbuffers = NULL; gst_pad_set_active (srcpad, FALSE); gst_check_teardown_src_pad (rtprtxsend); gst_check_teardown_element (rtprtxsend); gst_pad_set_active (sinkpad, FALSE); gst_check_teardown_sink_pad (rtprtxreceive); gst_check_teardown_element (rtprtxreceive); } static void check_rtprtx_results (GstElement * rtprtxsend, GstElement * rtprtxreceive, gint num_buffers) { guint nbrtxrequests = 0; guint nbrtxpackets = 0; g_object_get (G_OBJECT (rtprtxsend), "num-rtx-requests", &nbrtxrequests, NULL); fail_unless_equals_int (nbrtxrequests, 3); g_object_get (G_OBJECT (rtprtxsend), "num-rtx-packets", &nbrtxpackets, NULL); fail_unless_equals_int (nbrtxpackets, 3); g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-requests", &nbrtxrequests, NULL); fail_unless_equals_int (nbrtxrequests, 3); g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &nbrtxpackets, NULL); fail_unless_equals_int (nbrtxpackets, 3); g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets", &nbrtxpackets, NULL); fail_unless_equals_int (nbrtxpackets, 3); } GST_START_TEST (test_push_forward_seq) { GstElement *rtprtxsend; GstElement *rtprtxreceive; const guint num_buffers = 4; GList *node; gint i = 0; GstCaps *caps = NULL; rtprtxsend = gst_check_setup_element ("rtprtxsend"); rtprtxreceive = gst_check_setup_element ("rtprtxreceive"); setup_rtprtx (rtprtxsend, rtprtxreceive, num_buffers); GST_DEBUG ("setup_rtprtx"); fail_unless (start_rtprtx (rtprtxsend) == GST_STATE_CHANGE_SUCCESS, "could not set to playing"); fail_unless (start_rtprtx (rtprtxreceive) == GST_STATE_CHANGE_SUCCESS, "could not set to playing"); caps = gst_caps_from_string (RTP_CAPS_STRING); gst_check_setup_events (srcpad, rtprtxsend, caps, GST_FORMAT_TIME); gst_caps_unref (caps); g_object_set (rtprtxsend, "rtx-payload-type", 97, NULL); g_object_set (rtprtxreceive, "rtx-payload-types", "97", NULL); /* push buffers: 0,1,2, */ for (node = inbuffers; node; node = g_list_next (node)) { GstEvent *event = NULL; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; GstBuffer *buffer = (GstBuffer *) node->data; fail_unless (gst_pad_push (srcpad, buffer) == GST_FLOW_OK); if (i < 3) { gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, gst_structure_new ("GstRTPRetransmissionRequest", "seqnum", G_TYPE_UINT, (guint) gst_rtp_buffer_get_seq (&rtp), "ssrc", G_TYPE_UINT, (guint) gst_rtp_buffer_get_ssrc (&rtp), "payload-type", G_TYPE_UINT, (guint) gst_rtp_buffer_get_payload_type (&rtp), NULL)); fail_unless (gst_pad_push_event (sinkpad, event)); gst_rtp_buffer_unmap (&rtp); } gst_buffer_unref (buffer); ++i; } /* check the buffer list */ check_rtprtx_results (rtprtxsend, rtprtxreceive, num_buffers); /* cleanup */ cleanup_rtprtx (rtprtxsend, rtprtxreceive); } GST_END_TEST; static void message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) { GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT, GST_MESSAGE_SRC (message), message); switch (message->type) { case GST_MESSAGE_EOS: g_main_loop_quit (main_loop); break; case GST_MESSAGE_WARNING:{ GError *gerror; gchar *debug; gst_message_parse_warning (message, &gerror, &debug); gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); g_error_free (gerror); g_free (debug); break; } case GST_MESSAGE_ERROR:{ GError *gerror; gchar *debug; gst_message_parse_error (message, &gerror, &debug); gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); g_error_free (gerror); g_free (debug); g_main_loop_quit (main_loop); break; } default: break; } } typedef struct { guint count; guint nb_packets; guint drop_every_n_packets; } RTXSendData; typedef struct { guint nb_packets; guint seqnum_offset; guint seqnum_prev; } RTXReceiveData; static GstPadProbeReturn rtprtxsend_srcpad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstPadProbeReturn ret = GST_PAD_PROBE_OK; if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { GstBuffer *buffer = GST_BUFFER (info->data); RTXSendData *rtxdata = (RTXSendData *) user_data; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; guint payload_type = 0; gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); payload_type = gst_rtp_buffer_get_payload_type (&rtp); /* main stream packets */ if (payload_type == 96) { /* count packets of the main stream */ ++rtxdata->nb_packets; /* drop some packets */ if (rtxdata->count < rtxdata->drop_every_n_packets) { ++rtxdata->count; } else { /* drop a packet every 'rtxdata->count' packets */ rtxdata->count = 1; ret = GST_PAD_PROBE_DROP; } } else { /* retransmission packets */ } gst_rtp_buffer_unmap (&rtp); } return ret; } static GstPadProbeReturn rtprtxreceive_srcpad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { GstBuffer *buffer = GST_BUFFER (info->data); RTXReceiveData *rtxdata = (RTXReceiveData *) user_data; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; guint seqnum = 0; guint i = 0; gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); seqnum = gst_rtp_buffer_get_seq (&rtp); /* check if there is a dropped packet */ if (seqnum > rtxdata->seqnum_prev + rtxdata->seqnum_offset) { GstPad *peerpad = gst_pad_get_peer (pad); /* ask retransmission of missing packet */ for (i = rtxdata->seqnum_prev + rtxdata->seqnum_offset; i < seqnum; i += rtxdata->seqnum_offset) { GstEvent *event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, gst_structure_new ("GstRTPRetransmissionRequest", "seqnum", G_TYPE_UINT, i, "ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp), "payload-type", G_TYPE_UINT, gst_rtp_buffer_get_payload_type (&rtp), NULL)); gst_pad_push_event (peerpad, event); } gst_object_unref (peerpad); rtxdata->seqnum_prev = seqnum; } else if (seqnum == rtxdata->seqnum_prev + rtxdata->seqnum_offset) { /* also update previous seqnum in this case */ rtxdata->seqnum_prev = seqnum; } gst_rtp_buffer_unmap (&rtp); ++rtxdata->nb_packets; } return GST_PAD_PROBE_OK; } static void start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader, GstElement * rtprtxsend, GstElement * rtprtxreceive, RTXSendData * send_rtxdata, RTXReceiveData * receive_rtxdata, guint drop_every_n_packets) { GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE; guint nbrtxrequests = 0; guint nbrtxpackets = 0; guint nb_expected_requests = 0; GST_INFO ("starting test"); g_object_set (rtppayloader, "pt", 96, NULL); g_object_set (rtppayloader, "seqnum-offset", 1, NULL); g_object_set (rtprtxsend, "rtx-payload-type", 99, NULL); g_object_set (rtprtxreceive, "rtx-payload-types", "99:111:125", NULL); send_rtxdata->count = 1; send_rtxdata->nb_packets = 0; send_rtxdata->drop_every_n_packets = drop_every_n_packets; receive_rtxdata->nb_packets = 0; receive_rtxdata->seqnum_offset = 0; receive_rtxdata->seqnum_prev = 0; /* retrieve offset before going to paused */ g_object_get (G_OBJECT (rtppayloader), "seqnum-offset", &receive_rtxdata->seqnum_offset, NULL); /* prepare playing */ state_res = gst_element_set_state (bin, GST_STATE_PAUSED); ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); /* wait for completion */ state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE); ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); /* retrieve seqnum_prev here to make sure it has been reseted */ g_object_get (G_OBJECT (rtppayloader), "seqnum", &receive_rtxdata->seqnum_prev, NULL); /* run pipeline */ state_res = gst_element_set_state (bin, GST_STATE_PLAYING); ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); GST_INFO ("running main loop"); g_main_loop_run (main_loop); /* check results */ if (send_rtxdata->nb_packets % drop_every_n_packets == 0) { /* special case because the last buffer will be dropped * so the receiver cannot know if it has been dropped (no next packet) */ nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets - 1; fail_unless_equals_int (send_rtxdata->nb_packets, receive_rtxdata->nb_packets + 1); } else { nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets; fail_unless_equals_int (send_rtxdata->nb_packets, receive_rtxdata->nb_packets); } g_object_get (G_OBJECT (rtprtxsend), "num-rtx-requests", &nbrtxrequests, NULL); fail_unless_equals_int (nbrtxrequests, nb_expected_requests); g_object_get (G_OBJECT (rtprtxsend), "num-rtx-packets", &nbrtxpackets, NULL); fail_unless_equals_int (nbrtxpackets, nb_expected_requests); g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-requests", &nbrtxrequests, NULL); fail_unless_equals_int (nbrtxrequests, nb_expected_requests); g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &nbrtxpackets, NULL); fail_unless_equals_int (nbrtxpackets, nb_expected_requests); g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets", &nbrtxpackets, NULL); fail_unless_equals_int (nbrtxpackets, nb_expected_requests); state_res = gst_element_set_state (bin, GST_STATE_NULL); ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); } /* This test build the pipeline videotestsrc ! rtpvrawpay ! rtprtxsend ! rtprtxreceive ! fakesink * and drop some buffer between rtprtxsend and rtprtxreceive * Then it checks that every dropped packet has been re-sent and it checks that * not too much requests has been sent. */ GST_START_TEST (test_drop_one_sender) { GstElement *bin, *src, *rtppayloader, *rtprtxsend, *rtprtxreceive, *sink; GstBus *bus; gboolean res; GstPad *srcpad, *sinkpad; GstStreamConsistency *chk_1, *chk_2, *chk_3; gint num_buffers = 20; guint drop_every_n_packets = 0; RTXSendData send_rtxdata; RTXReceiveData receive_rtxdata; GST_INFO ("preparing test"); /* build pipeline */ bin = gst_pipeline_new ("pipeline"); bus = gst_element_get_bus (bin); gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH); src = gst_element_factory_make ("videotestsrc", "src"); g_object_set (src, "num-buffers", num_buffers, NULL); rtppayloader = gst_element_factory_make ("rtpvrawpay", "rtppayloader"); rtprtxsend = gst_element_factory_make ("rtprtxsend", "rtprtxsend"); rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive"); sink = gst_element_factory_make ("fakesink", "sink"); gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, rtprtxreceive, sink, NULL); res = gst_element_link (src, rtppayloader); fail_unless (res == TRUE, NULL); res = gst_element_link (rtppayloader, rtprtxsend); fail_unless (res == TRUE, NULL); res = gst_element_link (rtprtxsend, rtprtxreceive); fail_unless (res == TRUE, NULL); res = gst_element_link (rtprtxreceive, sink); fail_unless (res == TRUE, NULL); /* create consistency checkers for the pads */ srcpad = gst_element_get_static_pad (rtppayloader, "src"); chk_1 = gst_consistency_checker_new (srcpad); gst_object_unref (srcpad); srcpad = gst_element_get_static_pad (rtprtxsend, "src"); gst_pad_add_probe (srcpad, (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH), (GstPadProbeCallback) rtprtxsend_srcpad_probe, &send_rtxdata, NULL); sinkpad = gst_pad_get_peer (srcpad); fail_if (sinkpad == NULL); chk_2 = gst_consistency_checker_new (sinkpad); gst_object_unref (sinkpad); gst_object_unref (srcpad); srcpad = gst_element_get_static_pad (rtprtxreceive, "src"); gst_pad_add_probe (srcpad, (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH), (GstPadProbeCallback) rtprtxreceive_srcpad_probe, &receive_rtxdata, NULL); sinkpad = gst_pad_get_peer (srcpad); fail_if (sinkpad == NULL); chk_3 = gst_consistency_checker_new (sinkpad); gst_object_unref (sinkpad); gst_object_unref (srcpad); main_loop = g_main_loop_new (NULL, FALSE); g_signal_connect (bus, "message::error", (GCallback) message_received, bin); g_signal_connect (bus, "message::warning", (GCallback) message_received, bin); g_signal_connect (bus, "message::eos", (GCallback) message_received, bin); for (drop_every_n_packets = 2; drop_every_n_packets < 10; drop_every_n_packets++) { start_test_drop_and_check_results (bin, rtppayloader, rtprtxsend, rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets); } /* cleanup */ g_main_loop_unref (main_loop); gst_consistency_checker_free (chk_1); gst_consistency_checker_free (chk_2); gst_consistency_checker_free (chk_3); gst_bus_remove_signal_watch (bus); gst_object_unref (bus); gst_object_unref (bin); } GST_END_TEST; GMutex lock_eos; static gint nb_sources; static gint nb_eos; static void message_received_multiple (GstBus * bus, GstMessage * message, GstPipeline * bin) { GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT, GST_MESSAGE_SRC (message), message); switch (message->type) { case GST_MESSAGE_EOS: g_main_loop_quit (main_loop); break; case GST_MESSAGE_WARNING:{ GError *gerror; gchar *debug; gst_message_parse_warning (message, &gerror, &debug); gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); g_error_free (gerror); g_free (debug); break; } case GST_MESSAGE_ERROR:{ GError *gerror; gchar *debug; gst_message_parse_error (message, &gerror, &debug); gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); g_error_free (gerror); g_free (debug); g_main_loop_quit (main_loop); break; } default: break; } } typedef struct { guint count; guint nb_packets; guint drop_every_n_packets; guint payload_type_master; } RTXSendMultipleData; /* drop some packets */ static GstPadProbeReturn rtprtxsend_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstPadProbeReturn ret = GST_PAD_PROBE_OK; if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { GstBuffer *buffer = GST_BUFFER (info->data); RTXSendMultipleData *rtxdata = (RTXSendMultipleData *) user_data; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; guint payload_type = 0; gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); payload_type = gst_rtp_buffer_get_payload_type (&rtp); /* main stream packets */ if (payload_type == rtxdata->payload_type_master) { /* count packets of the main stream */ ++rtxdata->nb_packets; /* drop some packets */ if (rtxdata->count < rtxdata->drop_every_n_packets) { ++rtxdata->count; } else { /* drop a packet every 'rtxdata->count' packets */ rtxdata->count = 1; ret = GST_PAD_PROBE_DROP; } } else { /* retransmission packets */ } gst_rtp_buffer_unmap (&rtp); } return ret; } /* make sure every sources has sent all their buffers */ static GstPadProbeReturn source_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstPadProbeReturn ret = GST_PAD_PROBE_OK; if (info->type == (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_PUSH)) { GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { g_mutex_lock (&lock_eos); ++nb_eos; if (nb_eos < nb_sources) ret = GST_PAD_PROBE_DROP; g_mutex_unlock (&lock_eos); } } return ret; } /* add one branch videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. */ static RTXSendMultipleData * add_sender (GstElement * bin, const gchar * src_name, const gchar * payloader_name, guint payload_type_master, guint payload_type_aux) { GstElement *src = NULL; GstElement *rtppayloader = NULL; GstElement *rtprtxsend = NULL; GstElement *queue = NULL; GstElement *funnel = NULL; GstPad *srcpad = NULL; gboolean res = FALSE; RTXSendMultipleData *send_rtxdata = g_slice_new0 (RTXSendMultipleData); send_rtxdata->count = 1; send_rtxdata->nb_packets = 0; send_rtxdata->drop_every_n_packets = 0; send_rtxdata->payload_type_master = payload_type_master; src = gst_element_factory_make (src_name, NULL); rtppayloader = gst_element_factory_make (payloader_name, NULL); rtprtxsend = gst_element_factory_make ("rtprtxsend", NULL); queue = gst_element_factory_make ("queue", NULL); funnel = gst_bin_get_by_name (GST_BIN (bin), "funnel"); g_object_set (src, "num-buffers", 25, NULL); g_object_set (rtppayloader, "pt", payload_type_master, NULL); g_object_set (rtppayloader, "seqnum-offset", 1, NULL); g_object_set (rtprtxsend, "rtx-payload-type", payload_type_aux, NULL); /* we want that every drop packet be resent fast */ g_object_set (queue, "max-size-buffers", 1, NULL); g_object_set (queue, "flush-on-eos", FALSE, NULL); gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, queue, NULL); res = gst_element_link (src, rtppayloader); fail_unless (res == TRUE, NULL); res = gst_element_link (rtppayloader, rtprtxsend); fail_unless (res == TRUE, NULL); res = gst_element_link (rtprtxsend, queue); fail_unless (res == TRUE, NULL); res = gst_element_link (queue, funnel); fail_unless (res == TRUE, NULL); gst_object_unref (funnel); /* to drop some packets */ srcpad = gst_element_get_static_pad (rtprtxsend, "src"); gst_pad_add_probe (srcpad, (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH), (GstPadProbeCallback) rtprtxsend_srcpad_probe_multiple, send_rtxdata, NULL); gst_object_unref (srcpad); /* to make sure every sources has sent all their buffers */ srcpad = gst_element_get_static_pad (queue, "src"); gst_pad_add_probe (srcpad, (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_PUSH), (GstPadProbeCallback) source_srcpad_probe_multiple, NULL, NULL); gst_object_unref (srcpad); ++nb_sources; return send_rtxdata; } typedef struct { GHashTable *ssrc_to_nb_packets_map; GHashTable *ssrc_to_seqnum_offset_map; guint seqnum_offset; } RTXReceiveMultipleData; static GstPadProbeReturn rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { GstBuffer *buffer = GST_BUFFER (info->data); RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; guint ssrc = 0; guint seqnum = 0; gpointer seqnum_prev = 0; guint nb_packets = 0; gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); ssrc = gst_rtp_buffer_get_ssrc (&rtp); seqnum = gst_rtp_buffer_get_seq (&rtp); if (!g_hash_table_lookup_extended (rtxdata->ssrc_to_seqnum_offset_map, GUINT_TO_POINTER (ssrc), NULL, &seqnum_prev)) { /*In our test we take care to never drop the first buffer */ g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map, GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum)); g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map, GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (1)); return GST_PAD_PROBE_OK; } /* check if there is a dropped packet * (in our test every packet arrived in increasing order) */ if (seqnum > GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) { GstPad *peerpad = gst_pad_get_peer (pad); guint i = 0; /* ask retransmission of missing packets */ for (i = GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset; i < seqnum; i += rtxdata->seqnum_offset) { GstEvent *event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, gst_structure_new ("GstRTPRetransmissionRequest", "seqnum", G_TYPE_UINT, i, "ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp), "payload-type", G_TYPE_UINT, gst_rtp_buffer_get_payload_type (&rtp), NULL)); gst_pad_push_event (peerpad, event); } gst_object_unref (peerpad); g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map, GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum)); } else if (seqnum == GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) { /* also update previous seqnum in this case */ g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map, GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum)); } else { /* receive retransmited packet */ } gst_rtp_buffer_unmap (&rtp); nb_packets = GPOINTER_TO_UINT (g_hash_table_lookup (rtxdata->ssrc_to_nb_packets_map, GUINT_TO_POINTER (ssrc))); g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map, GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (++nb_packets)); } return GST_PAD_PROBE_OK; } static void reset_rtx_send_data (RTXSendMultipleData * send_rtxdata, gpointer data) { send_rtxdata->count = 1; send_rtxdata->nb_packets = 0; send_rtxdata->drop_every_n_packets = *(guint *) data; } /* compute number of all packets sent by all sender */ static void compute_total_packets_sent (RTXSendMultipleData * send_rtxdata, gpointer data) { guint *sum = (guint *) data; *sum += send_rtxdata->nb_packets; } /* compute number of all packets received by rtprtxreceive::src pad */ static void compute_total_packets_received (gpointer key, gpointer value, gpointer data) { guint *sum = (guint *) data; *sum += GPOINTER_TO_UINT (value); } static void start_test_drop_multiple_and_check_results (GstElement * bin, GList * send_rtxdata_list, RTXReceiveMultipleData * receive_rtxdata, guint drop_every_n_packets) { GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE; GstElement *rtprtxreceive = gst_bin_get_by_name (GST_BIN (bin), "rtprtxreceive"); guint sum_all_packets_sent = 0; guint sum_rtx_packets_sent = 0; guint sum_all_packets_received = 0; guint sum_rtx_packets_received = 0; guint sum_rtx_assoc_packets_received = 0; guint sum_rtx_dropped_packets_received = 0; gdouble error_sent_recv = 0; GstIterator *itr_elements = NULL; gboolean done = FALSE; GValue item = { 0 }; GstElement *element = NULL; gchar *name = NULL; GST_INFO ("starting test"); g_hash_table_remove_all (receive_rtxdata->ssrc_to_nb_packets_map); g_hash_table_remove_all (receive_rtxdata->ssrc_to_seqnum_offset_map); g_list_foreach (send_rtxdata_list, (GFunc) reset_rtx_send_data, &drop_every_n_packets); /* run pipeline */ state_res = gst_element_set_state (bin, GST_STATE_PLAYING); ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE); ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); GST_INFO ("running main loop"); g_main_loop_run (main_loop); /* check results */ itr_elements = gst_bin_iterate_elements (GST_BIN (bin)); done = FALSE; while (!done) { switch (gst_iterator_next (itr_elements, &item)) { case GST_ITERATOR_OK: element = GST_ELEMENT (g_value_get_object (&item)); name = gst_element_get_name (element); if (g_str_has_prefix (name, "rtprtxsend") > 0) { guint nb_packets = 0; g_object_get (G_OBJECT (element), "num-rtx-packets", &nb_packets, NULL); sum_rtx_packets_sent += nb_packets; } g_free (name); g_value_reset (&item); break; case GST_ITERATOR_RESYNC: gst_iterator_resync (itr_elements); break; case GST_ITERATOR_ERROR: done = TRUE; break; case GST_ITERATOR_DONE: done = TRUE; break; } } g_value_unset (&item); gst_iterator_free (itr_elements); /* compute number of all packets sent by all sender */ g_list_foreach (send_rtxdata_list, (GFunc) compute_total_packets_sent, &sum_all_packets_sent); /* compute number of all packets received by rtprtxreceive::src pad */ g_hash_table_foreach (receive_rtxdata->ssrc_to_nb_packets_map, compute_total_packets_received, (gpointer) & sum_all_packets_received); /* check that we sent as many packets as received */ /* when eos is received by sources we cannot ensure that every packets * will be received by sinks (maybe queue flush ?) */ fail_if (sum_all_packets_sent < sum_all_packets_received); /* some packet are not received, I still have to figure out why * but I suspect it comes from pipeline setup/shutdown */ if (sum_all_packets_sent != sum_all_packets_received) { error_sent_recv = 1 - sum_all_packets_received / (gdouble) sum_all_packets_sent; fail_if (error_sent_recv > 0.30); /* it should be 0% */ } /* retrieve number of retransmit packets received by rtprtxreceive */ g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &sum_rtx_packets_received, NULL); /* some of rtx packet are not received because the receiver avoids * collision (= requests that have the same seqnum) */ fail_if (sum_rtx_packets_sent < sum_rtx_packets_received); g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets", &sum_rtx_assoc_packets_received, NULL); sum_rtx_dropped_packets_received = sum_rtx_packets_received - sum_rtx_assoc_packets_received; fail_unless_equals_int (sum_rtx_packets_sent, sum_rtx_assoc_packets_received + sum_rtx_dropped_packets_received); gst_object_unref (rtprtxreceive); state_res = gst_element_set_state (bin, GST_STATE_NULL); ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); } static void free_rtx_send_data (gpointer data) { g_slice_free (RTXSendMultipleData, data); } /* This test build the pipeline funnel name=funnel * videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. * videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. * N * funnel. ! rtprtxreceive ! fakesink * and drop some buffer just after each rtprtxsend * Then it checks that every dropped packet has been re-sent and it checks * that not too much requests has been sent. */ GST_START_TEST (test_drop_multiple_sender) { GstElement *bin, *funnel, *rtprtxreceive, *sink; GstBus *bus; gboolean res; GstPad *srcpad; guint drop_every_n_packets = 0; GList *send_rtxdata_list = NULL; RTXReceiveMultipleData receive_rtxdata; GST_INFO ("preparing test"); receive_rtxdata.ssrc_to_nb_packets_map = g_hash_table_new (g_direct_hash, g_direct_equal); receive_rtxdata.ssrc_to_seqnum_offset_map = g_hash_table_new (g_direct_hash, g_direct_equal); receive_rtxdata.seqnum_offset = 1; /* build pipeline */ bin = gst_pipeline_new ("pipeline"); bus = gst_element_get_bus (bin); gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH); funnel = gst_element_factory_make ("funnel", "funnel"); rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive"); sink = gst_element_factory_make ("fakesink", "sink"); g_object_set (sink, "sync", TRUE, NULL); g_object_set (sink, "qos", FALSE, NULL); gst_bin_add_many (GST_BIN (bin), funnel, rtprtxreceive, sink, NULL); nb_sources = 0; g_mutex_init (&lock_eos); send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", "rtpvrawpay", 96, 121)); send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", "rtpvrawpay", 97, 122)); send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", "rtpvrawpay", 98, 123)); send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", "rtpvrawpay", 99, 124)); res = gst_element_link (funnel, rtprtxreceive); fail_unless (res == TRUE, NULL); res = gst_element_link (rtprtxreceive, sink); fail_unless (res == TRUE, NULL); srcpad = gst_element_get_static_pad (rtprtxreceive, "src"); gst_pad_add_probe (srcpad, (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH), (GstPadProbeCallback) rtprtxreceive_srcpad_probe_multiple, &receive_rtxdata, NULL); gst_object_unref (srcpad); main_loop = g_main_loop_new (NULL, FALSE); g_signal_connect (bus, "message::error", (GCallback) message_received_multiple, bin); g_signal_connect (bus, "message::warning", (GCallback) message_received_multiple, bin); g_signal_connect (bus, "message::eos", (GCallback) message_received_multiple, bin); for (drop_every_n_packets = 2; drop_every_n_packets < 10; drop_every_n_packets++) { g_object_set (rtprtxreceive, "rtx-payload-types", "121:122:123:124", NULL); nb_eos = 0; start_test_drop_multiple_and_check_results (bin, send_rtxdata_list, &receive_rtxdata, drop_every_n_packets); } /* cleanup */ g_main_loop_unref (main_loop); g_list_free_full (send_rtxdata_list, free_rtx_send_data); g_hash_table_destroy (receive_rtxdata.ssrc_to_nb_packets_map); g_hash_table_destroy (receive_rtxdata.ssrc_to_seqnum_offset_map); gst_bus_remove_signal_watch (bus); gst_object_unref (bus); gst_object_unref (bin); g_mutex_clear (&lock_eos); } GST_END_TEST; struct GenerateTestBuffersData { GstElement *src, *capsfilter, *payloader, *sink; GMutex mutex; GCond cond; GList *buffers; gint num_buffers; guint last_seqnum; }; static void fakesink_handoff (GstElement * sink, GstBuffer * buf, GstPad * pad, gpointer user_data) { struct GenerateTestBuffersData *data = user_data; g_mutex_lock (&data->mutex); if (data->num_buffers > 0) data->buffers = g_list_append (data->buffers, gst_buffer_ref (buf)); /* if we have collected enough buffers, unblock the main thread to stop */ if (--data->num_buffers <= 0) g_cond_signal (&data->cond); if (data->num_buffers == 0) g_object_get (data->payloader, "seqnum", &data->last_seqnum, NULL); g_mutex_unlock (&data->mutex); } static GList * generate_test_buffers (const gint num_buffers, guint ssrc, guint * payload_type) { GstElement *bin; GstCaps *videotestsrc_caps; gboolean res; struct GenerateTestBuffersData data; fail_unless (num_buffers > 0); g_mutex_init (&data.mutex); g_cond_init (&data.cond); data.buffers = NULL; data.num_buffers = num_buffers; bin = gst_pipeline_new (NULL); data.src = gst_element_factory_make ("videotestsrc", NULL); data.capsfilter = gst_element_factory_make ("capsfilter", NULL); data.payloader = gst_element_factory_make ("rtpvrawpay", NULL); data.sink = gst_element_factory_make ("fakesink", NULL); /* small frame size will cause vrawpay to generate exactly one rtp packet * per video frame, which we need for the max-size-time test */ videotestsrc_caps = gst_caps_from_string ("video/x-raw,format=I420,width=10,height=10,framerate=30/1"); g_object_set (data.src, "do-timestamp", TRUE, NULL); g_object_set (data.capsfilter, "caps", videotestsrc_caps, NULL); g_object_set (data.payloader, "seqnum-offset", 1, "ssrc", ssrc, NULL); g_object_set (data.sink, "signal-handoffs", TRUE, NULL); g_signal_connect (data.sink, "handoff", (GCallback) fakesink_handoff, &data); gst_caps_unref (videotestsrc_caps); gst_bin_add_many (GST_BIN (bin), data.src, data.capsfilter, data.payloader, data.sink, NULL); res = gst_element_link_many (data.src, data.capsfilter, data.payloader, data.sink, NULL); fail_unless_equals_int (res, TRUE); g_mutex_lock (&data.mutex); ASSERT_SET_STATE (bin, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); while (data.num_buffers > 0) g_cond_wait (&data.cond, &data.mutex); g_mutex_unlock (&data.mutex); g_object_get (data.payloader, "pt", payload_type, NULL); ASSERT_SET_STATE (bin, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); fail_unless_equals_int (g_list_length (data.buffers), num_buffers); fail_unless_equals_int (num_buffers, data.last_seqnum); g_mutex_clear (&data.mutex); g_cond_clear (&data.cond); gst_object_unref (bin); return data.buffers; } static GstEvent * create_rtx_event (guint seqnum, guint ssrc, guint payload_type) { return gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, gst_structure_new ("GstRTPRetransmissionRequest", "seqnum", G_TYPE_UINT, seqnum, "ssrc", G_TYPE_UINT, ssrc, "payload-type", G_TYPE_UINT, payload_type, NULL)); } static void test_rtxsender_packet_retention (gboolean test_with_time) { const gint num_buffers = test_with_time ? 30 : 10; const gint half_buffers = num_buffers / 2; const guint ssrc = 1234567; const guint rtx_payload_type = 99; GList *in_buffers, *node; guint payload_type; GstElement *rtxsend; GstPad *srcpad, *sinkpad; GstCaps *caps; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; gint i, j; gboolean res; /* generate test data */ in_buffers = generate_test_buffers (num_buffers, ssrc, &payload_type); /* clear the global buffers list, which we are going to use later */ gst_check_drop_buffers (); /* setup element & pads */ rtxsend = gst_check_setup_element ("rtprtxsend"); /* in both cases we want the rtxsend queue to store 'half_buffers' * amount of buffers at most. In max-size-packets mode, it's trivial. * In max-size-time mode, we specify almost half a second, which is * the equivalent of 15 frames in a 30fps video stream */ g_object_set (rtxsend, "max-size-packets", test_with_time ? 0 : half_buffers, "max-size-time", test_with_time ? 499 : 0, "rtx-payload-type", rtx_payload_type, NULL); srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate); fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE); sinkpad = gst_check_setup_sink_pad (rtxsend, &sinktemplate); fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE); ASSERT_SET_STATE (rtxsend, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); caps = gst_caps_from_string ("application/x-rtp, " "media = (string)video, payload = (int)96, " "clock-rate = (int)90000, encoding-name = (string)RAW"); gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME); gst_caps_unref (caps); /* now push all buffers and request retransmission every time for all of them */ node = in_buffers; for (i = 1; i <= num_buffers; i++) { GstBuffer *buffer = GST_BUFFER (node->data); /* verify that the original packets are correct */ res = gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); fail_unless_equals_int (res, TRUE); fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc); fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), payload_type); fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i); gst_rtp_buffer_unmap (&rtp); /* retransmit all the previous ones */ for (j = 1; j < i; j++) { res = gst_pad_push_event (sinkpad, create_rtx_event (j, ssrc, payload_type)); fail_unless_equals_int (res, TRUE); } /* push this one, triggering the retransmit in rtxsend's chain() function */ gst_pad_push (srcpad, gst_buffer_ref (buffer)); node = g_list_next (node); } /* verify the result. buffers should be in this order (numbers are seqnums): * 1, 1rtx, 2, 1rtx, 2rtx, 3, ... , 9, 5rtx, 6rtx, 7rtx, 8rtx, 9rtx, 10 */ { GstRTPBuffer orig_rtp = GST_RTP_BUFFER_INIT; gint expected_rtx_requests, expected_rtx_packets; gint real_rtx_requests, real_rtx_packets; /* verify statistics first */ expected_rtx_packets = half_buffers * half_buffers + ((half_buffers - 1) / 2.0f) * half_buffers; for (i = 1, expected_rtx_requests = 0; i < num_buffers; i++) expected_rtx_requests += i; g_object_get (rtxsend, "num-rtx-requests", &real_rtx_requests, "num-rtx-packets", &real_rtx_packets, NULL); fail_unless_equals_int (expected_rtx_requests, real_rtx_requests); fail_unless_equals_int (expected_rtx_packets, real_rtx_packets); /* and the number of actual buffers that we were pushed out of rtxsend */ fail_unless_equals_int (g_list_length (buffers), num_buffers + expected_rtx_packets); node = buffers; for (i = 1; i <= num_buffers; i++) { /* verify the retransmission packets */ for (j = MAX (i - half_buffers, 1); j < i; j++) { GST_INFO ("checking %d, %d", i, j); res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp); fail_unless_equals_int (res, TRUE); fail_if (gst_rtp_buffer_get_ssrc (&rtp) == ssrc); fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), rtx_payload_type); fail_unless_equals_int (GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp)), j); /* j == rtx seqnum */ /* open the original packet for this rtx packet and verify timestamps */ res = gst_rtp_buffer_map (GST_BUFFER (g_list_nth_data (in_buffers, j - 1)), GST_MAP_READ, &orig_rtp); fail_unless_equals_int (res, TRUE); fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&orig_rtp), gst_rtp_buffer_get_timestamp (&rtp)); gst_rtp_buffer_unmap (&orig_rtp); gst_rtp_buffer_unmap (&rtp); node = g_list_next (node); } /* verify the normal rtp flow packet */ res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp); fail_unless_equals_int (res, TRUE); fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc); fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), payload_type); fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i); gst_rtp_buffer_unmap (&rtp); node = g_list_next (node); } } g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref); gst_check_drop_buffers (); gst_check_teardown_src_pad (rtxsend); gst_check_teardown_sink_pad (rtxsend); gst_check_teardown_element (rtxsend); } GST_START_TEST (test_rtxsender_max_size_packets) { test_rtxsender_packet_retention (FALSE); } GST_END_TEST; GST_START_TEST (test_rtxsender_max_size_time) { test_rtxsender_packet_retention (TRUE); } GST_END_TEST; static Suite * rtprtx_suite (void) { Suite *s = suite_create ("rtprtx"); TCase *tc_chain = tcase_create ("general"); tcase_set_timeout (tc_chain, 10); suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_push_forward_seq); tcase_add_test (tc_chain, test_drop_one_sender); tcase_add_test (tc_chain, test_drop_multiple_sender); tcase_add_test (tc_chain, test_rtxsender_max_size_packets); tcase_add_test (tc_chain, test_rtxsender_max_size_time); return s; } GST_CHECK_MAIN (rtprtx);