rtptwcc: changes to use rtp buffer arrival time and current time.

For TWCC we are more interested to track the arrival time (receive side)
and the current time (sender side) of the buffers rather than the
running time.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/927>
This commit is contained in:
Tulio Beloqui 2021-04-13 16:19:22 +02:00 committed by GStreamer Marge Bot
parent 0440cb12de
commit 266c2d0619
4 changed files with 126 additions and 47 deletions

View file

@ -2172,9 +2172,11 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
res =
gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
pinfo);
pinfo->arrival_time = GST_CLOCK_TIME_NONE;
} else {
GstBuffer *buffer = GST_BUFFER_CAST (data);
res = update_packet (&buffer, 0, pinfo);
pinfo->arrival_time = GST_BUFFER_DTS (buffer);
}
return res;

View file

@ -70,6 +70,7 @@ typedef struct {
* @address: address of the sender of the packet
* @current_time: current time according to the system clock
* @running_time: time of a packet as buffer running_time
* @arrival_time: time of arrival of a packet
* @ntpnstime: time of a packet NTP time in nanoseconds
* @header_len: number of overhead bytes per packet
* @bytes: bytes of the packet including lowlevel overhead
@ -92,6 +93,7 @@ typedef struct {
GSocketAddress *address;
GstClockTime current_time;
GstClockTime running_time;
GstClockTime arrival_time;
guint64 ntpnstime;
guint header_len;
guint bytes;

View file

@ -159,7 +159,11 @@ recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
{
memset (packet, 0, sizeof (RecvPacket));
packet->seqnum = seqnum;
packet->ts = pinfo->running_time;
if (GST_CLOCK_TIME_IS_VALID (pinfo->arrival_time))
packet->ts = pinfo->arrival_time;
else
packet->ts = pinfo->current_time;
}
static guint8
@ -784,7 +788,7 @@ rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)
GST_LOG ("Receive: twcc-seqnum: %u, pt: %u, marker: %d, ts: %"
GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker,
GST_TIME_ARGS (pinfo->running_time));
GST_TIME_ARGS (pinfo->arrival_time));
if (!pinfo->marker)
twcc->packet_count_no_marker++;
@ -841,7 +845,7 @@ static void
sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
{
packet->seqnum = seqnum;
packet->ts = pinfo->running_time;
packet->ts = pinfo->current_time;
packet->size = pinfo->payload_len;
packet->pt = pinfo->pt;
packet->remote_ts = GST_CLOCK_TIME_NONE;
@ -864,8 +868,9 @@ rtp_twcc_manager_send_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)
g_array_append_val (twcc->sent_packets, packet);
GST_LOG ("Send: twcc-seqnum: %u, pt: %u, marker: %d, ts: %" GST_TIME_FORMAT,
seqnum, pinfo->pt, pinfo->marker, GST_TIME_ARGS (pinfo->running_time));
GST_LOG ("Send: twcc-seqnum: %u, pt: %u, marker: %d, ts: %"
GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker,
GST_TIME_ARGS (pinfo->current_time));
}
static void

View file

@ -55,7 +55,8 @@ generate_caps (void)
static GstBuffer *
generate_test_buffer_full (GstClockTime ts,
guint seqnum, guint32 rtp_ts, guint ssrc,
gboolean marker_bit, guint8 twcc_ext_id, guint16 twcc_seqnum)
gboolean marker_bit, guint8 payload_type, guint8 twcc_ext_id,
guint16 twcc_seqnum)
{
GstBuffer *buf;
guint8 *payload;
@ -67,7 +68,7 @@ generate_test_buffer_full (GstClockTime ts,
GST_BUFFER_DTS (buf) = ts;
gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT);
gst_rtp_buffer_set_payload_type (&rtp, payload_type);
gst_rtp_buffer_set_seq (&rtp, seqnum);
gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
gst_rtp_buffer_set_ssrc (&rtp, ssrc);
@ -93,7 +94,7 @@ static GstBuffer *
generate_test_buffer (guint seqnum, guint ssrc)
{
return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, 0, 0);
seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, TEST_BUF_PT, 0, 0);
}
static GstBuffer *
@ -101,19 +102,26 @@ generate_twcc_recv_buffer (guint seqnum,
GstClockTime arrival_time, gboolean marker_bit)
{
return generate_test_buffer_full (arrival_time, seqnum,
seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit, TEST_BUF_PT,
TEST_TWCC_EXT_ID, seqnum);
}
static GstBuffer *
generate_twcc_send_buffer_full (guint seqnum, gboolean marker_bit,
guint ssrc, guint8 payload_type)
{
return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, marker_bit,
payload_type, TEST_TWCC_EXT_ID, seqnum);
}
static GstBuffer *
generate_twcc_send_buffer (guint seqnum, gboolean marker_bit)
{
return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
seqnum, seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
TEST_TWCC_EXT_ID, seqnum);
return generate_twcc_send_buffer_full (seqnum, marker_bit, TEST_BUF_SSRC,
TEST_BUF_PT);
}
typedef struct
{
GstHarness *send_rtp_h;
@ -2569,7 +2577,8 @@ generate_stepped_ts_buffer (guint i, gboolean stepped)
GST_TIME_ARGS (gst_util_uint64_scale_int (GST_SECOND, ts,
TEST_BUF_CLOCK_RATE)), i);
buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE, 0, 0);
buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE,
TEST_BUF_PT, 0, 0);
return buf;
}
@ -2716,8 +2725,12 @@ GST_START_TEST (test_twcc_header_and_run_length)
for (i = 0; i < td->num_packets; i++) {
gboolean last_packet = i == (td->num_packets - 1);
buf = generate_twcc_recv_buffer (i + td->base_seqnum,
td->base_time + i * td->duration, last_packet);
GstClockTime now = gst_clock_get_time (GST_CLOCK_CAST (h->testclock));
GstClockTime ts = td->base_time + i * td->duration;
if (ts > now)
gst_test_clock_set_time (h->testclock, ts);
buf = generate_twcc_recv_buffer (i + td->base_seqnum, ts, last_packet);
res = session_harness_recv_rtp (h, buf);
fail_unless_equals_int (GST_FLOW_OK, res);
}
@ -2869,6 +2882,32 @@ G_STMT_START { \
twcc_verify_packets_to_event (packets, event); \
} G_STMT_END
#define twcc_verify_stats(h, bitrate_sent, bitrate_recv, pkts_sent, pkts_recv, loss_pct, avg_dod) \
G_STMT_START { \
GstStructure *twcc_stats; \
guint stats_bitrate_sent; \
guint stats_bitrate_recv; \
guint stats_packets_sent; \
guint stats_packets_recv; \
gdouble stats_loss_pct; \
GstClockTimeDiff stats_avg_dod; \
twcc_stats = session_harness_get_last_twcc_stats (h); \
fail_unless (gst_structure_get (twcc_stats, \
"bitrate-sent", G_TYPE_UINT, &stats_bitrate_sent, \
"bitrate-recv", G_TYPE_UINT, &stats_bitrate_recv, \
"packets-sent", G_TYPE_UINT, &stats_packets_sent, \
"packets-recv", G_TYPE_UINT, &stats_packets_recv, \
"packet-loss-pct", G_TYPE_DOUBLE, &stats_loss_pct, \
"avg-delta-of-delta", G_TYPE_INT64, &stats_avg_dod, NULL)); \
fail_unless_equals_int (bitrate_sent, stats_bitrate_sent); \
fail_unless_equals_int (bitrate_recv, stats_bitrate_recv); \
fail_unless_equals_int (pkts_sent, stats_packets_sent); \
fail_unless_equals_int (pkts_recv, stats_packets_recv); \
fail_unless_equals_float (loss_pct, stats_loss_pct); \
fail_unless_equals_int64 (avg_dod, stats_avg_dod); \
gst_structure_free (twcc_stats); \
} G_STMT_END
GST_START_TEST (test_twcc_1_bit_status_vector)
{
SessionHarness *h0 = session_harness_new ();
@ -3776,7 +3815,7 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
guint8 exp_fci0[] = {
0x01, 0x00, /* base sequence number: 256 */
0x00, 0x01, /* packet status count: 1 */
0x00, 0x00, 0x01, /* reference time: 0 */
0x00, 0x00, 0x01, /* reference time: 1 */
0x00, /* feedback packet count: 00 */
/* packet chunks: */
0x20, 0x01, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
@ -3787,8 +3826,8 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
guint8 exp_fci1[] = {
0x01, 0x01, /* base sequence number: 257 */
0x00, 0x01, /* packet status count: 1 */
0x00, 0x00, 0x01, /* reference time: 0 */
0x01, /* feedback packet count: 0 */
0x00, 0x00, 0x01, /* reference time: 1 */
0x01, /* feedback packet count: 1 */
/* packet chunks: */
0x20, 0x01, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
0x01, /* 1 recv-delta */
@ -3799,15 +3838,15 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
/* Push packets to get the feedback packet count wrap limit */
for (i = 0; i < 255; i++) {
GstClockTime ts = i * 250 * GST_USECOND;
gst_test_clock_set_time (h->testclock, ts);
fail_unless_equals_int (GST_FLOW_OK,
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (i, i * 250 * GST_USECOND, TRUE)));
/* ignore the twcc for these ones */
gst_buffer_unref (session_harness_produce_twcc (h));
generate_twcc_recv_buffer (i, ts, TRUE)));
}
/* push pkt #256 to jump ahead and force the overflow */
gst_test_clock_set_time (h->testclock, 256 * 250 * GST_USECOND);
fail_unless_equals_int (GST_FLOW_OK,
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (256, 256 * 250 * GST_USECOND, TRUE)));
@ -3817,11 +3856,16 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (255, 255 * 250 * GST_USECOND, TRUE)));
/* push pkt #257 to verify fci is correct */
gst_test_clock_set_time (h->testclock, 257 * 250 * GST_USECOND);
fail_unless_equals_int (GST_FLOW_OK,
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (257, 257 * 250 * GST_USECOND, TRUE)));
/* ignore the twcc for the first 255 packets */
for (i = 0; i < 255; i++)
gst_buffer_unref (session_harness_produce_twcc (h));
/* we expect a fci for pkt #256 */
buf = session_harness_produce_twcc (h);
@ -3943,6 +3987,7 @@ GST_START_TEST (test_twcc_send_and_recv)
buf = generate_twcc_send_buffer (seq, slice == num_slices - 1);
res = session_harness_send_rtp (h_send, buf);
fail_unless_equals_int (GST_FLOW_OK, res);
session_harness_advance_and_crank (h_send, TEST_BUF_DURATION);
/* get the buffer ready for the network */
buf = session_harness_pull_send_rtp (h_send);
@ -3958,30 +4003,9 @@ GST_START_TEST (test_twcc_send_and_recv)
/* sender receives the TWCC packet */
session_harness_recv_rtcp (h_send, buf);
if (frame > 0) {
GstStructure *twcc_stats;
guint bitrate_sent;
guint bitrate_recv;
guint packets_sent;
guint packets_recv;
gdouble packet_loss_pct;
GstClockTimeDiff avg_delta_of_delta;
twcc_stats = session_harness_get_last_twcc_stats (h_send);
fail_unless (gst_structure_get (twcc_stats,
"bitrate-sent", G_TYPE_UINT, &bitrate_sent,
"bitrate-recv", G_TYPE_UINT, &bitrate_recv,
"packets-sent", G_TYPE_UINT, &packets_sent,
"packets-recv", G_TYPE_UINT, &packets_recv,
"packet-loss-pct", G_TYPE_DOUBLE, &packet_loss_pct,
"avg-delta-of-delta", G_TYPE_INT64, &avg_delta_of_delta, NULL));
fail_unless_equals_int (TEST_BUF_BPS, bitrate_sent);
fail_unless_equals_int (TEST_BUF_BPS, bitrate_recv);
fail_unless_equals_int (num_slices, packets_sent);
fail_unless_equals_int (num_slices, packets_recv);
fail_unless_equals_float (0.0f, packet_loss_pct);
fail_unless_equals_int64 (0, avg_delta_of_delta);
gst_structure_free (twcc_stats);
}
if (frame > 0)
twcc_verify_stats (h_send, TEST_BUF_BPS, TEST_BUF_BPS, num_slices,
num_slices, 0.0f, 0);
}
session_harness_free (h_send);
@ -3990,6 +4014,51 @@ GST_START_TEST (test_twcc_send_and_recv)
GST_END_TEST;
GST_START_TEST (test_twcc_multiple_payloads_below_window)
{
SessionHarness *h_send = session_harness_new ();
SessionHarness *h_recv = session_harness_new ();
guint i;
GstBuffer *buffers[] = {
generate_twcc_send_buffer_full (0, FALSE, 0xabc, 98),
generate_twcc_send_buffer_full (0, FALSE, 0xdef, 111),
generate_twcc_send_buffer_full (1, FALSE, 0xdef, 111),
generate_twcc_send_buffer_full (2, FALSE, 0xdef, 111),
generate_twcc_send_buffer_full (1, TRUE, 0xabc, 98),
};
/* enable twcc */
session_harness_set_twcc_recv_ext_id (h_recv, TEST_TWCC_EXT_ID);
session_harness_set_twcc_send_ext_id (h_send, TEST_TWCC_EXT_ID);
for (i = 0; i < G_N_ELEMENTS (buffers); i++) {
GstBuffer *buf = buffers[i];
GstFlowReturn res;
/* from payloder to rtpbin */
res = session_harness_send_rtp (h_send, buf);
fail_unless_equals_int (GST_FLOW_OK, res);
buf = session_harness_pull_send_rtp (h_send);
session_harness_advance_and_crank (h_send, TEST_BUF_DURATION);
/* buffer arrives at the receiver */
res = session_harness_recv_rtp (h_recv, buf);
fail_unless_equals_int (GST_FLOW_OK, res);
}
/* sender receives the TWCC packet from the receiver */
session_harness_recv_rtcp (h_send, session_harness_produce_twcc (h_recv));
twcc_verify_stats (h_send, 0, 0, 5, 5, 0.0f, GST_CLOCK_STIME_NONE);
session_harness_free (h_send);
session_harness_free (h_recv);
}
GST_END_TEST;
typedef struct
{
GstClockTime interval;
@ -4281,6 +4350,7 @@ rtpsession_suite (void)
tcase_add_test (tc_chain, test_twcc_recv_rtcp_reordered);
tcase_add_test (tc_chain, test_twcc_no_exthdr_in_buffer);
tcase_add_test (tc_chain, test_twcc_send_and_recv);
tcase_add_test (tc_chain, test_twcc_multiple_payloads_below_window);
tcase_add_loop_test (tc_chain, test_twcc_feedback_interval, 0,
G_N_ELEMENTS (test_twcc_feedback_interval_ctx));
tcase_add_test (tc_chain, test_twcc_feedback_count_wrap);