diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index ce5694de1b..8ce9275fff 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -239,6 +239,7 @@ enum PROP_MAX_DROPOUT_TIME, PROP_MAX_MISORDER_TIME, PROP_STATS, + PROP_TWCC_STATS, PROP_RTP_PROFILE, PROP_NTP_TIME_SOURCE, PROP_RTCP_SYNC_SEND_TIME @@ -277,6 +278,8 @@ struct _GstRtpSessionPrivate guint recv_rtx_req_count; guint sent_rtx_req_count; + GstStructure *last_twcc_stats; + /* * This is the list of processed packets in the receive path when upstream * pushed a buffer list. @@ -302,6 +305,8 @@ static GstClockTime gst_rtp_session_request_time (RTPSession * session, gpointer user_data); static void gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data); +static void gst_rtp_session_notify_twcc (RTPSession * sess, + GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data); static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data); static void gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data); @@ -326,6 +331,7 @@ static RTPSessionCallbacks callbacks = { gst_rtp_session_request_key_unit, gst_rtp_session_request_time, gst_rtp_session_notify_nack, + gst_rtp_session_notify_twcc, gst_rtp_session_reconfigure, gst_rtp_session_notify_early_rtcp }; @@ -754,6 +760,30 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) "Various statistics", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpSession::twcc-stats: + * + * Various statistics derived from TWCC. This property returns a GstStructure + * with name RTPTWCCStats with the following fields: + * + * "bitrate-sent" G_TYPE_UINT The actual sent bitrate of TWCC packets + * "bitrate-recv" G_TYPE_UINT The estimated bitrate for the receiver. + * "packets-sent" G_TYPE_UINT Number of packets sent + * "packets-recv" G_TYPE_UINT Number of packets reported recevied + * "packet-loss-pct" G_TYPE_DOUBLE Packetloss percentage, based on + packets reported as lost from the recevier. + * "avg-delta-of-delta", G_TYPE_INT64 In nanoseconds, a moving window + average of the difference in inter-packet spacing between + sender and receiver. A sudden increase in this number can indicate + network congestion. + * + * Since: 1.18 + */ + g_object_class_install_property (gobject_class, PROP_TWCC_STATS, + g_param_spec_boxed ("twcc-stats", "TWCC Statistics", + "Various statistics from TWCC", GST_TYPE_STRUCTURE, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RTP_PROFILE, g_param_spec_enum ("rtp-profile", "RTP Profile", "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE, @@ -880,6 +910,8 @@ gst_rtp_session_finalize (GObject * object) g_cond_clear (&rtpsession->priv->cond); g_object_unref (rtpsession->priv->sysclock); g_object_unref (rtpsession->priv->session); + if (rtpsession->priv->last_twcc_stats) + gst_structure_free (rtpsession->priv->last_twcc_stats); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -1004,6 +1036,11 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, case PROP_STATS: g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession)); break; + case PROP_TWCC_STATS: + GST_RTP_SESSION_LOCK (rtpsession); + g_value_set_boxed (value, priv->last_twcc_stats); + GST_RTP_SESSION_UNLOCK (rtpsession); + break; case PROP_RTP_PROFILE: g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value); break; @@ -1563,12 +1600,15 @@ gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps) GST_DEBUG_OBJECT (rtpsession, "parsing caps"); s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "payload", &payload)) return; if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload))) return; + rtp_session_update_recv_caps_structure (rtpsession->priv->session, s); + g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), gst_caps_ref (caps)); } @@ -2801,6 +2841,31 @@ gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum, } } +static void +gst_rtp_session_notify_twcc (RTPSession * sess, + GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data) +{ + GstRtpSession *rtpsession = GST_RTP_SESSION (user_data); + GstEvent *event; + GstPad *send_rtp_sink; + + GST_RTP_SESSION_LOCK (rtpsession); + if ((send_rtp_sink = rtpsession->send_rtp_sink)) + gst_object_ref (send_rtp_sink); + if (rtpsession->priv->last_twcc_stats) + gst_structure_free (rtpsession->priv->last_twcc_stats); + rtpsession->priv->last_twcc_stats = twcc_stats; + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (send_rtp_sink) { + event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, twcc_packets); + gst_pad_push_event (send_rtp_sink, event); + gst_object_unref (send_rtp_sink); + } + + g_object_notify (G_OBJECT (rtpsession), "twcc-stats"); +} + static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data) { diff --git a/gst/rtpmanager/meson.build b/gst/rtpmanager/meson.build index 221c8e3e5f..118a1e1ea8 100644 --- a/gst/rtpmanager/meson.build +++ b/gst/rtpmanager/meson.build @@ -14,6 +14,7 @@ rtpmanager_sources = [ 'rtpsource.c', 'rtpstats.c', 'rtptimerqueue.c', + 'rtptwcc.c', 'gstrtpsession.c', 'gstrtpfunnel.c', ] diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index bfd96ca8ff..b96d3cdfdf 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -22,6 +22,7 @@ #define GLIB_DISABLE_DEPRECATION_WARNINGS #include +#include #include #include @@ -30,7 +31,7 @@ #include "rtpsession.h" -GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); +GST_DEBUG_CATEGORY (rtp_session_debug); #define GST_CAT_DEFAULT rtp_session_debug /* signals and args */ @@ -115,6 +116,8 @@ enum (avg) = ((val) + (15 * (avg))) >> 4; +#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" + /* GObject vmethods */ static void rtp_session_finalize (GObject * object); static void rtp_session_set_property (GObject * object, guint prop_id, @@ -706,6 +709,9 @@ rtp_session_init (RTPSession * sess) sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP; sess->is_doing_ptp = TRUE; + + sess->twcc = rtp_twcc_manager_new (sess->mtu); + sess->twcc_stats = rtp_twcc_stats_new (); } static void @@ -727,6 +733,9 @@ rtp_session_finalize (GObject * object) for (i = 0; i < 1; i++) g_hash_table_destroy (sess->ssrcs[i]); + rtp_twcc_manager_free (sess->twcc); + rtp_twcc_stats_free (sess->twcc_stats); + g_mutex_clear (&sess->lock); G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); @@ -847,6 +856,7 @@ rtp_session_set_property (GObject * object, guint prop_id, break; case PROP_RTCP_MTU: sess->mtu = g_value_get_uint (value); + rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu); break; case PROP_SDES: rtp_session_set_sdes_struct (sess, g_value_get_boxed (value)); @@ -1206,6 +1216,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.notify_nack = callbacks->notify_nack; sess->notify_nack_user_data = user_data; } + if (callbacks->notify_twcc) { + sess->callbacks.notify_twcc = callbacks->notify_twcc; + sess->notify_twcc_user_data = user_data; + } if (callbacks->reconfigure) { sess->callbacks.reconfigure = callbacks->reconfigure; sess->reconfigure_user_data = user_data; @@ -2067,10 +2081,15 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo) pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp); pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp); pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp); + pinfo->marker = gst_rtp_buffer_get_marker (&rtp); /* copy available csrc */ pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp); for (i = 0; i < pinfo->csrc_count; i++) pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i); + + /* RTP header extensions */ + pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp, + &pinfo->header_ext_bit_pattern); } gst_rtp_buffer_unmap (&rtp); } @@ -2119,6 +2138,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, pinfo->bytes = 0; pinfo->payload_len = 0; pinfo->packets = 0; + pinfo->marker = FALSE; if (is_list) { GstBufferList *list = GST_BUFFER_LIST_CAST (data); @@ -2129,6 +2149,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, GstBuffer *buffer = GST_BUFFER_CAST (data); res = update_packet (&buffer, 0, pinfo); } + return res; } @@ -2141,6 +2162,23 @@ clean_packet_info (RTPPacketInfo * pinfo) gst_mini_object_unref (pinfo->data); pinfo->data = NULL; } + if (pinfo->header_ext) + g_bytes_unref (pinfo->header_ext); +} + +static gint32 +packet_info_get_twcc_seqnum (RTPPacketInfo * pinfo, guint8 ext_id) +{ + gint32 val = -1; + gpointer data; + guint size; + + if (gst_rtp_buffer_get_extension_onebyte_header_from_bytes (pinfo->header_ext, + pinfo->header_ext_bit_pattern, ext_id, 0, &data, &size)) { + if (size == 2) + val = GST_READ_UINT16_BE (data); + } + return val; } static gboolean @@ -2165,6 +2203,30 @@ source_update_active (RTPSession * sess, RTPSource * source, return TRUE; } +static void +process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo) +{ + gint32 twcc_seqnum; + + if (sess->twcc_recv_ext_id == 0) + return; + + twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_recv_ext_id); + if (twcc_seqnum == -1) + return; + + if (rtp_twcc_manager_recv_packet (sess->twcc, twcc_seqnum, pinfo)) { + RTP_SESSION_UNLOCK (sess); + + /* TODO: find a better rational for this number, and possibly tune it based + on factors like framerate / bandwidth etc */ + if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) { + GST_INFO ("Could not schedule TWCC straight away"); + } + RTP_SESSION_LOCK (sess); + } +} + static gboolean source_update_sender (RTPSession * sess, RTPSource * source, gboolean prevsender) @@ -2244,6 +2306,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, /* let source process the packet */ result = rtp_source_process_rtp (source, &pinfo); + process_twcc_packet (sess, &pinfo); /* source became active */ if (source_update_active (sess, source, prevactive)) @@ -2801,6 +2864,35 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc, } } +static void +rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc, + guint32 media_ssrc, guint8 * fci_data, guint fci_length) +{ + GArray *twcc_packets; + GstStructure *twcc_packets_s; + GstStructure *twcc_stats_s; + + twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc, + fci_data, fci_length * sizeof (guint32)); + if (twcc_packets == NULL) + return; + + twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets); + twcc_stats_s = + rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets); + + GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s); + GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s); + + g_array_unref (twcc_packets); + + RTP_SESSION_UNLOCK (sess); + if (sess->callbacks.notify_twcc) + sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s, + sess->notify_twcc_user_data); + RTP_SESSION_LOCK (sess); +} + static void rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, RTPPacketInfo * pinfo, GstClockTime current_time) @@ -2862,7 +2954,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, if ((src && src->internal) || /* PSFB FIR puts the media ssrc inside the FCI */ - (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) { + (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) || + /* TWCC is for all sources, so a single media-ssrc is not enough */ + (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) { switch (type) { case GST_RTCP_TYPE_PSFB: switch (fbtype) { @@ -2890,6 +2984,10 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, rtp_session_process_nack (sess, sender_ssrc, media_ssrc, fci_data, fci_length, current_time); break; + case GST_RTCP_RTPFB_TYPE_TWCC: + rtp_session_process_twcc (sess, sender_ssrc, media_ssrc, + fci_data, fci_length); + break; default: break; } @@ -3021,6 +3119,29 @@ invalid_packet: } } +static guint8 +_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name) +{ + guint i; + guint8 extmap_id = 0; + guint n_fields = gst_structure_n_fields (s); + + for (i = 0; i < n_fields; i++) { + const gchar *field_name = gst_structure_nth_field_name (s, i); + if (g_str_has_prefix (field_name, "extmap-")) { + const gchar *str = gst_structure_get_string (s, field_name); + if (str && g_strcmp0 (str, ext_name) == 0) { + gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10); + if (id > 0 && id < 15) { + extmap_id = id; + break; + } + } + } + } + return extmap_id; +} + /** * rtp_session_update_send_caps: * @sess: an #RTPSession @@ -3075,8 +3196,30 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) } else { sess->internal_ssrc_from_caps_or_property = FALSE; } + + sess->twcc_send_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR); + if (sess->twcc_send_ext_id > 0) { + GST_INFO ("TWCC enabled for send using extension id: %u", + sess->twcc_send_ext_id); + } } +static void +send_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo) +{ + gint32 twcc_seqnum; + + if (sess->twcc_send_ext_id == 0) + return; + + twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_send_ext_id); + if (twcc_seqnum == -1) + return; + + rtp_twcc_manager_send_packet (sess->twcc, twcc_seqnum, pinfo); +} + + /** * rtp_session_send_rtp: * @sess: an #RTPSession @@ -3111,6 +3254,8 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, current_time, running_time, -1)) goto invalid_packet; + send_twcc_packet (sess, &pinfo); + source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time); if (created) on_new_sender_ssrc (sess, source); @@ -3168,7 +3313,7 @@ invalid_packet: collision: { g_object_unref (source); - gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + clean_packet_info (&pinfo); RTP_SESSION_UNLOCK (sess); GST_WARNING ("non-internal source with same ssrc %08x, drop packet", pinfo.ssrc); @@ -4114,6 +4259,37 @@ remove_closing_sources (const gchar * key, RTPSource * source, return FALSE; } +static void +generate_twcc (const gchar * key, RTPSource * source, ReportData * data) +{ + RTPSession *sess = data->sess; + GstBuffer *buf; + + /* only generate RTCP for active internal sources */ + if (!source->internal || source->sent_bye) + return; + + /* ignore other sources when we do the timeout after a scheduled BYE */ + if (sess->scheduled_bye && !source->marked_bye) + return; + + /* skip if RTCP is disabled */ + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + return; + } + + while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) { + ReportOutput *output = g_slice_new (ReportOutput); + output->source = g_object_ref (source); + output->is_bye = FALSE; + output->buffer = buf; + /* queue the RTCP packet to push later */ + g_queue_push_tail (&data->output, output); + } +} + + static void generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) { @@ -4338,6 +4514,9 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) generate_rtcp, &data); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) generate_twcc, &data); + /* update the generation for all the sources that have been reported */ g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) update_generation, &data); @@ -4721,3 +4900,22 @@ no_source: return FALSE; } } + +/** + * rtp_session_update_recv_caps_structure: + * @sess: an #RTPSession + * @s: a #GstStructure from a #GstCaps + * + * Update the caps of the receiver in the rtp session. + */ +void +rtp_session_update_recv_caps_structure (RTPSession * sess, + const GstStructure * s) +{ + guint8 ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR); + if (ext_id > 0) { + sess->twcc_recv_ext_id = ext_id; + GST_INFO ("TWCC enabled for recv using extension id: %u", + sess->twcc_recv_ext_id); + } +} diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 6aa28ca2e0..949fcc49b8 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -23,6 +23,7 @@ #include #include "rtpsource.h" +#include "rtptwcc.h" typedef struct _RTPSession RTPSession; typedef struct _RTPSessionClass RTPSessionClass; @@ -156,6 +157,15 @@ typedef GstClockTime (*RTPSessionRequestTime) (RTPSession *sess, typedef void (*RTPSessionNotifyNACK) (RTPSession *sess, guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data); +/** + * RTPSessionNotifyTWCC: + * @user_data: user data specified when registering + * + * Notifies of Transport-wide congestion control packets and stats. + */ +typedef void (*RTPSessionNotifyTWCC) (RTPSession *sess, + GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data); + /** * RTPSessionReconfigure: * @sess: an #RTPSession @@ -186,6 +196,7 @@ typedef void (*RTPSessionNotifyEarlyRTCP) (RTPSession *sess, * @RTPSessionRequestKeyUnit: callback for requesting a new key unit * @RTPSessionRequestTime: callback for requesting the current time * @RTPSessionNotifyNACK: callback for notifying NACK + * @RTPSessionNotifyTWCC: callback for notifying TWCC * @RTPSessionReconfigure: callback for requesting reconfiguration * @RTPSessionNotifyEarlyRTCP: callback for notifying early RTCP * @@ -203,6 +214,7 @@ typedef struct { RTPSessionRequestKeyUnit request_key_unit; RTPSessionRequestTime request_time; RTPSessionNotifyNACK notify_nack; + RTPSessionNotifyTWCC notify_twcc; RTPSessionReconfigure reconfigure; RTPSessionNotifyEarlyRTCP notify_early_rtcp; } RTPSessionCallbacks; @@ -280,6 +292,7 @@ struct _RTPSession { gpointer request_key_unit_user_data; gpointer request_time_user_data; gpointer notify_nack_user_data; + gpointer notify_twcc_user_data; gpointer reconfigure_user_data; gpointer notify_early_rtcp_user_data; @@ -295,6 +308,12 @@ struct _RTPSession { GList *conflicting_addresses; gboolean timestamp_sender_reports; + + /* Transport-wide cc-extension */ + RTPTWCCManager *twcc; + RTPTWCCStats *twcc_stats; + guint8 twcc_recv_ext_id; + guint8 twcc_send_ext_id; }; /** @@ -418,5 +437,7 @@ gboolean rtp_session_request_nack (RTPSession * sess, guint16 seqnum, GstClockTime max_delay); +void rtp_session_update_recv_caps_structure (RTPSession * sess, const GstStructure * s); + #endif /* __RTP_SESSION_H__ */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 6937c3adf1..a25875acac 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -920,8 +920,8 @@ push_packet (RTPSource * src, GstBuffer * buffer) return ret; } -static gint -get_clock_rate (RTPSource * src, guint8 payload) +static void +fetch_clock_rate_from_payload (RTPSource * src, guint8 payload) { if (src->payload == -1) { /* first payload received, nothing was in the caps, lock on to this payload */ @@ -946,7 +946,6 @@ get_clock_rate (RTPSource * src, guint8 payload) src->clock_rate = clock_rate; gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate); } - return src->clock_rate; } /* Jitter is the variation in the delay of received packets in a flow. It is @@ -960,26 +959,23 @@ calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo) GstClockTime running_time; guint32 rtparrival, transit, rtptime; gint32 diff; - gint clock_rate; - guint8 pt; /* get arrival time */ if ((running_time = pinfo->running_time) == GST_CLOCK_TIME_NONE) goto no_time; - pt = pinfo->pt; + GST_LOG ("SSRC %08x got payload %d", src->ssrc, pinfo->pt); - GST_LOG ("SSRC %08x got payload %d", src->ssrc, pt); - - /* get clockrate */ - if ((clock_rate = get_clock_rate (src, pt)) == -1) + /* check if clock-rate is valid */ + if (src->clock_rate == -1) goto no_clock_rate; rtptime = pinfo->rtptime; /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't * care about the absolute value, just the difference. */ - rtparrival = gst_util_uint64_scale_int (running_time, clock_rate, GST_SECOND); + rtparrival = + gst_util_uint64_scale_int (running_time, src->clock_rate, GST_SECOND); /* transit time is difference with RTP timestamp */ transit = rtparrival - rtptime; @@ -1002,7 +998,7 @@ calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo) src->stats.last_rtptime = rtparrival; GST_LOG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %f", - rtparrival, rtptime, clock_rate, diff, (src->stats.jitter) / 16.0); + rtparrival, rtptime, src->clock_rate, diff, (src->stats.jitter) / 16.0); return; @@ -1014,7 +1010,7 @@ no_time: } no_clock_rate: { - GST_WARNING ("cannot get clock-rate for pt %d", pt); + GST_WARNING ("cannot get clock-rate for pt %d", pinfo->pt); return; } } @@ -1265,6 +1261,8 @@ rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo) g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR); + fetch_clock_rate_from_payload (src, pinfo->pt); + if (!update_receiver_stats (src, pinfo, TRUE)) return GST_FLOW_OK; @@ -1553,7 +1551,7 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime, if (src->clock_rate == -1 && src->pt_set) { GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt, src->ssrc); - get_clock_rate (src, src->pt); + fetch_clock_rate_from_payload (src, src->pt); } if (src->clock_rate != -1) { diff --git a/gst/rtpmanager/rtpstats.c b/gst/rtpmanager/rtpstats.c index c07ac9b1fa..88e5f07d07 100644 --- a/gst/rtpmanager/rtpstats.c +++ b/gst/rtpmanager/rtpstats.c @@ -19,7 +19,10 @@ * Boston, MA 02110-1301, USA. */ +#define GLIB_DISABLE_DEPRECATION_WARNINGS + #include "rtpstats.h" +#include "rtptwcc.h" void gst_rtp_packet_rate_ctx_reset (RTPPacketRateCtx * ctx, gint32 clock_rate) @@ -445,3 +448,230 @@ __g_socket_address_to_string (GSocketAddress * addr) return ret; } + +static void +_append_structure_to_value_array (GValueArray * array, GstStructure * s) +{ + GValue *val; + g_value_array_append (array, NULL); + val = g_value_array_get_nth (array, array->n_values - 1); + g_value_init (val, GST_TYPE_STRUCTURE); + g_value_take_boxed (val, s); +} + +static void +_structure_take_value_array (GstStructure * s, + const gchar * field_name, GValueArray * array) +{ + GValue value = G_VALUE_INIT; + g_value_init (&value, G_TYPE_VALUE_ARRAY); + g_value_take_boxed (&value, array); + gst_structure_take_value (s, field_name, &value); + g_value_unset (&value); +} + +GstStructure * +rtp_twcc_stats_get_packets_structure (GArray * twcc_packets) +{ + GstStructure *ret = gst_structure_new_empty ("RTPTWCCPackets"); + GValueArray *array = g_value_array_new (0); + guint i; + + for (i = 0; i < twcc_packets->len; i++) { + RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i); + + GstStructure *pkt_s = gst_structure_new ("RTPTWCCPacket", + "seqnum", G_TYPE_UINT, pkt->seqnum, + "local-ts", G_TYPE_UINT64, pkt->local_ts, + "remote-ts", G_TYPE_UINT64, pkt->remote_ts, + "size", G_TYPE_UINT, pkt->size, + "lost", G_TYPE_BOOLEAN, pkt->status == RTP_TWCC_PACKET_STATUS_NOT_RECV, + NULL); + _append_structure_to_value_array (array, pkt_s); + } + + _structure_take_value_array (ret, "packets", array); + return ret; +} + +static void +rtp_twcc_stats_calculate_stats (RTPTWCCStats * stats, GArray * twcc_packets) +{ + guint packets_recv = 0; + guint i; + + for (i = 0; i < twcc_packets->len; i++) { + RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i); + + if (pkt->status != RTP_TWCC_PACKET_STATUS_NOT_RECV) + packets_recv++; + + if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts) && + GST_CLOCK_TIME_IS_VALID (stats->last_local_ts)) { + pkt->local_delta = GST_CLOCK_DIFF (stats->last_local_ts, pkt->local_ts); + } + + if (GST_CLOCK_TIME_IS_VALID (pkt->remote_ts) && + GST_CLOCK_TIME_IS_VALID (stats->last_remote_ts)) { + pkt->remote_delta = + GST_CLOCK_DIFF (stats->last_remote_ts, pkt->remote_ts); + } + + if (GST_CLOCK_STIME_IS_VALID (pkt->local_delta) && + GST_CLOCK_STIME_IS_VALID (pkt->remote_delta)) { + pkt->delta_delta = pkt->remote_delta - pkt->local_delta; + } + + stats->last_local_ts = pkt->local_ts; + stats->last_remote_ts = pkt->remote_ts; + } + + stats->packets_sent = twcc_packets->len; + stats->packets_recv = packets_recv; +} + +static gint +_get_window_start_index (RTPTWCCStats * stats, GstClockTime duration, + GstClockTime * local_duration, GstClockTime * remote_duration) +{ + RTPTWCCPacket *last = NULL; + guint i; + + if (stats->packets->len < 2) + return -1; + + for (i = 0; i < stats->packets->len; i++) { + guint start_index = stats->packets->len - 1 - i; + RTPTWCCPacket *pkt = + &g_array_index (stats->packets, RTPTWCCPacket, start_index); + if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts) + && GST_CLOCK_TIME_IS_VALID (pkt->remote_ts)) { + /* first find the last valid packet */ + if (last == NULL) { + last = pkt; + } else { + /* and then get the duration in local ts */ + GstClockTimeDiff ld = GST_CLOCK_DIFF (pkt->local_ts, last->local_ts); + if (ld >= duration) { + *local_duration = ld; + *remote_duration = GST_CLOCK_DIFF (pkt->remote_ts, last->remote_ts); + return start_index; + } + } + } + } + + return -1; +} + +static void +rtp_twcc_stats_calculate_windowed_stats (RTPTWCCStats * stats) +{ + guint i; + gint start_idx; + guint bits_sent = 0; + guint bits_recv = 0; + guint packets_sent = 0; + guint packets_recv = 0; + guint packets_lost; + GstClockTimeDiff delta_delta_sum = 0; + guint delta_delta_count = 0; + GstClockTime local_duration; + GstClockTime remote_duration; + + start_idx = _get_window_start_index (stats, stats->window_size, + &local_duration, &remote_duration); + if (start_idx == -1) { + return; + } + + /* remove the old packets */ + if (start_idx > 0) + g_array_remove_range (stats->packets, 0, start_idx); + + packets_sent = stats->packets->len - 1; + + for (i = 0; i < packets_sent; i++) { + RTPTWCCPacket *pkt = &g_array_index (stats->packets, RTPTWCCPacket, i); + + if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts)) { + bits_sent += pkt->size * 8; + } + + if (GST_CLOCK_TIME_IS_VALID (pkt->remote_ts)) { + bits_recv += pkt->size * 8; + packets_recv++; + } + + if (GST_CLOCK_STIME_IS_VALID (pkt->delta_delta)) { + delta_delta_sum += pkt->delta_delta; + delta_delta_count++; + } + } + + packets_lost = packets_sent - packets_recv; + stats->packet_loss_pct = (packets_lost * 100) / (gfloat) packets_sent; + + if (delta_delta_count) { + GstClockTimeDiff avg_delta_of_delta = delta_delta_sum / delta_delta_count; + if (GST_CLOCK_STIME_IS_VALID (stats->avg_delta_of_delta)) { + stats->avg_delta_of_delta_change = + (avg_delta_of_delta - + stats->avg_delta_of_delta) / (250 * GST_USECOND); + } + stats->avg_delta_of_delta = avg_delta_of_delta; + } + + stats->bitrate_sent = + gst_util_uint64_scale (bits_sent, GST_SECOND, local_duration); + stats->bitrate_recv = + gst_util_uint64_scale (bits_recv, GST_SECOND, remote_duration); + + GST_DEBUG ("Got stats: bits_sent: %u, bits_recv: %u, packets_sent = %u, " + "packets_recv: %u, packetlost_pct = %f, sent_bitrate = %u, " + "recv_bitrate = %u, delta-delta-avg = %" GST_STIME_FORMAT ", " + "delta-delta-change: %f", bits_sent, bits_recv, stats->packets_sent, + packets_recv, stats->packet_loss_pct, stats->bitrate_sent, + stats->bitrate_recv, GST_STIME_ARGS (stats->avg_delta_of_delta), + stats->avg_delta_of_delta_change); +} + +RTPTWCCStats * +rtp_twcc_stats_new (void) +{ + RTPTWCCStats *stats = g_new0 (RTPTWCCStats, 1); + stats->packets = g_array_new (FALSE, FALSE, sizeof (RTPTWCCPacket)); + stats->last_local_ts = GST_CLOCK_TIME_NONE; + stats->last_remote_ts = GST_CLOCK_TIME_NONE; + stats->avg_delta_of_delta = GST_CLOCK_STIME_NONE; + stats->window_size = 300 * GST_MSECOND; /* FIXME: could be configurable? */ + return stats; +} + +void +rtp_twcc_stats_free (RTPTWCCStats * stats) +{ + g_array_unref (stats->packets); + g_free (stats); +} + +static GstStructure * +rtp_twcc_stats_get_stats_structure (RTPTWCCStats * stats) +{ + return gst_structure_new ("RTPTWCCStats", + "bitrate-sent", G_TYPE_UINT, stats->bitrate_sent, + "bitrate-recv", G_TYPE_UINT, stats->bitrate_recv, + "packets-sent", G_TYPE_UINT, stats->packets_sent, + "packets-recv", G_TYPE_UINT, stats->packets_recv, + "packet-loss-pct", G_TYPE_DOUBLE, stats->packet_loss_pct, + "avg-delta-of-delta", G_TYPE_INT64, stats->avg_delta_of_delta, NULL); +} + +GstStructure * +rtp_twcc_stats_process_packets (RTPTWCCStats * stats, GArray * twcc_packets) +{ + rtp_twcc_stats_calculate_stats (stats, twcc_packets); + g_array_append_vals (stats->packets, twcc_packets->data, twcc_packets->len); + rtp_twcc_stats_calculate_windowed_stats (stats); + return rtp_twcc_stats_get_stats_structure (stats); +} diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h index bd3a54efed..776651f444 100644 --- a/gst/rtpmanager/rtpstats.h +++ b/gst/rtpmanager/rtpstats.h @@ -77,6 +77,10 @@ typedef struct { * @seqnum: the seqnum of the packet * @pt: the payload type of the packet * @rtptime: the RTP time of the packet + * @marker: the marker bit + * + * @tw_seqnum_ext_id: the extension-header ID for transport-wide seqnums + * @tw_seqnum: the transport-wide seqnum of the packet * * Structure holding information about the packet. */ @@ -97,8 +101,11 @@ typedef struct { guint16 seqnum; guint8 pt; guint32 rtptime; + gboolean marker; guint32 csrc_count; guint32 csrcs[16]; + GBytes *header_ext; + guint16 header_ext_bit_pattern; } RTPPacketInfo; /** @@ -245,6 +252,27 @@ typedef struct { guint nacks_received; } RTPSessionStats; +/** + * RTPTWCCStats: + * + * Stats kept for a session and used to produce TWCC stats. + */ +typedef struct { + GArray *packets; + GstClockTime window_size; + GstClockTime last_local_ts; + GstClockTime last_remote_ts; + + guint bitrate_sent; + guint bitrate_recv; + guint packets_sent; + guint packets_recv; + gfloat packet_loss_pct; + GstClockTimeDiff avg_delta_of_delta; + gfloat avg_delta_of_delta_change; +} RTPTWCCStats; + + void rtp_stats_init_defaults (RTPSessionStats *stats); void rtp_stats_set_bandwidths (RTPSessionStats *stats, @@ -264,4 +292,10 @@ void rtp_stats_set_min_interval (RTPSessionStats *stats, gboolean __g_socket_address_equal (GSocketAddress *a, GSocketAddress *b); gchar * __g_socket_address_to_string (GSocketAddress * addr); +RTPTWCCStats * rtp_twcc_stats_new (void); +void rtp_twcc_stats_free (RTPTWCCStats * stats); +GstStructure * rtp_twcc_stats_process_packets (RTPTWCCStats * stats, + GArray * twcc_packets); +GstStructure * rtp_twcc_stats_get_packets_structure (GArray * twcc_packets); + #endif /* __RTP_STATS_H__ */ diff --git a/gst/rtpmanager/rtptwcc.c b/gst/rtpmanager/rtptwcc.c new file mode 100644 index 0000000000..0d7b44605e --- /dev/null +++ b/gst/rtpmanager/rtptwcc.c @@ -0,0 +1,888 @@ +/* GStreamer + * Copyright (C) 2019 Pexip (http://pexip.com/) + * @author: Havard Graff + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#include "rtptwcc.h" +#include +#include +#include + +GST_DEBUG_CATEGORY_EXTERN (rtp_session_debug); +#define GST_CAT_DEFAULT rtp_session_debug + +#define REF_TIME_UNIT (64 * GST_MSECOND) +#define DELTA_UNIT (250 * GST_USECOND) +#define MAX_TS_DELTA (0xff * DELTA_UNIT) + +struct _RTPTWCCManager +{ + guint mtu; + guint max_packets_per_rtcp; + GArray *recv_packets; + + guint8 fb_pkt_count; + gint32 last_seqnum; + + GArray *sent_packets; + GArray *parsed_packets; + GQueue *rtcp_buffers; + + guint64 recv_sender_ssrc; + guint64 recv_media_ssrc; + + guint16 expected_recv_seqnum; + + gboolean first_fci_parse; + guint16 expected_parsed_seqnum; + guint8 expected_parsed_fb_pkt_count; +}; + +typedef enum +{ + RTP_TWCC_CHUNK_TYPE_RUN_LENGTH = 0, + RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR = 1, +} RTPTWCCChunkType; + +typedef struct +{ + guint8 base_seqnum[2]; + guint8 packet_count[2]; + guint8 base_time[3]; + guint8 fb_pkt_count[1]; +} RTPTWCCHeader; + +typedef struct +{ + GstClockTime ts; + guint16 seqnum; + + gint64 delta; + RTPTWCCPacketStatus status; + guint16 missing_run; + guint equal_run; +} RecvPacket; + +typedef struct +{ + GstClockTime ts; + GstClockTime socket_ts; + GstClockTime remote_ts; + guint16 seqnum; + guint size; + gboolean lost; +} SentPacket; + +RTPTWCCManager * +rtp_twcc_manager_new (guint mtu) +{ + RTPTWCCManager *twcc = g_new0 (RTPTWCCManager, 1); + + twcc->recv_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket)); + + twcc->sent_packets = g_array_new (FALSE, FALSE, sizeof (SentPacket)); + twcc->parsed_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket)); + + twcc->rtcp_buffers = g_queue_new (); + + twcc->last_seqnum = -1; + twcc->recv_media_ssrc = -1; + twcc->recv_sender_ssrc = -1; + + rtp_twcc_manager_set_mtu (twcc, mtu); + + twcc->first_fci_parse = TRUE; + + return twcc; +} + +void +rtp_twcc_manager_free (RTPTWCCManager * twcc) +{ + g_array_unref (twcc->recv_packets); + g_array_unref (twcc->sent_packets); + g_array_unref (twcc->parsed_packets); + g_queue_free_full (twcc->rtcp_buffers, (GDestroyNotify) gst_buffer_unref); + g_free (twcc); +} + +static void +recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo) +{ + memset (packet, 0, sizeof (RecvPacket)); + packet->seqnum = seqnum; + packet->ts = pinfo->running_time; +} + +void +rtp_twcc_manager_set_mtu (RTPTWCCManager * twcc, guint mtu) +{ + twcc->mtu = mtu; + + /* the absolute worst case is that 7 packets uses + header (4 * 4 * 4) 32 bytes) and + packet_chunk 2 bytes + + recv_deltas (2 * 7) 14 bytes */ + twcc->max_packets_per_rtcp = ((twcc->mtu - 32) * 7) / (2 + 14); +} + +static gint +_twcc_seqnum_sort (gconstpointer a, gconstpointer b) +{ + gint32 seqa = ((RecvPacket *) a)->seqnum; + gint32 seqb = ((RecvPacket *) b)->seqnum; + gint res = seqa - seqb; + if (res < -65000) + res = 1; + if (res > 65000) + res = -1; + return res; +} + +static void +rtp_twcc_write_recv_deltas (guint8 * fci_data, GArray * twcc_packets) +{ + guint i; + for (i = 0; i < twcc_packets->len; i++) { + RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i); + + if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) { + GST_WRITE_UINT8 (fci_data, pkt->delta); + fci_data += 1; + } else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) { + GST_WRITE_UINT16_BE (fci_data, pkt->delta); + fci_data += 2; + } + } +} + +static void +rtp_twcc_write_run_length_chunk (GArray * packet_chunks, + RTPTWCCPacketStatus status, guint run_length) +{ + guint written = 0; + while (written < run_length) { + GstBitWriter writer; + guint16 data = 0; + guint len = MIN (run_length - written, 8191); + + GST_LOG ("Writing a run-lenght of %u with status %u", len, status); + + gst_bit_writer_init_with_data (&writer, (guint8 *) & data, 2, FALSE); + gst_bit_writer_put_bits_uint8 (&writer, RTP_TWCC_CHUNK_TYPE_RUN_LENGTH, 1); + gst_bit_writer_put_bits_uint8 (&writer, status, 2); + gst_bit_writer_put_bits_uint16 (&writer, len, 13); + g_array_append_val (packet_chunks, data); + written += len; + } +} + +typedef struct +{ + GArray *packet_chunks; + GstBitWriter writer; + guint16 data; + guint symbol_size; +} ChunkBitWriter; + +static void +chunk_bit_writer_reset (ChunkBitWriter * writer) +{ + writer->data = 0; + gst_bit_writer_init_with_data (&writer->writer, + (guint8 *) & writer->data, 2, FALSE); + + gst_bit_writer_put_bits_uint8 (&writer->writer, + RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR, 1); + /* 1 for 2-bit symbol-size, 0 for 1-bit */ + gst_bit_writer_put_bits_uint8 (&writer->writer, writer->symbol_size - 1, 1); +} + +static void +chunk_bit_writer_configure (ChunkBitWriter * writer, guint symbol_size) +{ + writer->symbol_size = symbol_size; + chunk_bit_writer_reset (writer); +} + +static gboolean +chunk_bit_writer_is_empty (ChunkBitWriter * writer) +{ + return writer->writer.bit_size == 2; +} + +static gboolean +chunk_bit_writer_is_full (ChunkBitWriter * writer) +{ + return writer->writer.bit_size == 16; +} + +static guint +chunk_bit_writer_get_available_slots (ChunkBitWriter * writer) +{ + return (16 - writer->writer.bit_size) / writer->symbol_size; +} + +static guint +chunk_bit_writer_get_total_slots (ChunkBitWriter * writer) +{ + return 14 / writer->symbol_size; +} + +static void +chunk_bit_writer_flush (ChunkBitWriter * writer) +{ + /* don't append a chunk if no bits have been written */ + if (!chunk_bit_writer_is_empty (writer)) { + g_array_append_val (writer->packet_chunks, writer->data); + chunk_bit_writer_reset (writer); + } +} + +static void +chunk_bit_writer_init (ChunkBitWriter * writer, + GArray * packet_chunks, guint symbol_size) +{ + writer->packet_chunks = packet_chunks; + chunk_bit_writer_configure (writer, symbol_size); +} + +static void +chunk_bit_writer_write (ChunkBitWriter * writer, RTPTWCCPacketStatus status) +{ + gst_bit_writer_put_bits_uint8 (&writer->writer, status, writer->symbol_size); + if (chunk_bit_writer_is_full (writer)) { + chunk_bit_writer_flush (writer); + } +} + +static void +rtp_twcc_write_status_vector_chunk (ChunkBitWriter * writer, RecvPacket * pkt) +{ + if (pkt->missing_run > 0) { + guint available = chunk_bit_writer_get_available_slots (writer); + guint total = chunk_bit_writer_get_total_slots (writer); + if (pkt->missing_run > (available + total)) { + /* here it is better to finish up the current status-chunk and then + go for run-length */ + for (guint i = 0; i < available; i++) { + chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV); + } + rtp_twcc_write_run_length_chunk (writer->packet_chunks, + RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run - available); + } else { + for (guint i = 0; i < pkt->missing_run; i++) { + chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV); + } + } + } + + chunk_bit_writer_write (writer, pkt->status); +} + +typedef struct +{ + RecvPacket *equal; +} RunLengthHelper; + +static void +run_lenght_helper_update (RunLengthHelper * rlh, RecvPacket * pkt) +{ + /* for missing packets we reset */ + if (pkt->missing_run > 0) { + rlh->equal = NULL; + } + + /* all status equal run */ + if (rlh->equal == NULL) { + rlh->equal = pkt; + rlh->equal->equal_run = 0; + } + + if (rlh->equal->status == pkt->status) { + rlh->equal->equal_run++; + } else { + rlh->equal = pkt; + rlh->equal->equal_run = 1; + } +} + +static void +rtp_twcc_write_chunks (GArray * packet_chunks, + GArray * twcc_packets, guint symbol_size) +{ + ChunkBitWriter writer; + guint i; + guint bits_per_chunks = 7 * symbol_size; + + chunk_bit_writer_init (&writer, packet_chunks, symbol_size); + + for (i = 0; i < twcc_packets->len; i++) { + RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i); + guint remaining_packets = twcc_packets->len - i; + + /* we can only start a run-length chunk if the status-chunk is + completed */ + if (chunk_bit_writer_is_empty (&writer)) { + /* first write in any preceeding gaps, we use run-length + if it would take up more than one chunk (14/7) */ + if (pkt->missing_run > bits_per_chunks) { + rtp_twcc_write_run_length_chunk (packet_chunks, + RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run); + } + + /* we have a run of the same status, write a run-length chunk and skip + to the next point */ + if (pkt->missing_run == 0 && + (pkt->equal_run > bits_per_chunks || + pkt->equal_run == remaining_packets)) { + rtp_twcc_write_run_length_chunk (packet_chunks, + pkt->status, pkt->equal_run); + i += pkt->equal_run - 1; + continue; + } + } + + GST_LOG ("i=%u: Writing a %u-bit vector of status: %u", + i, symbol_size, pkt->status); + rtp_twcc_write_status_vector_chunk (&writer, pkt); + } + chunk_bit_writer_flush (&writer); +} + +static void +rtp_twcc_manager_add_fci (RTPTWCCManager * twcc, GstRTCPPacket * packet) +{ + RecvPacket *first, *last, *prev; + guint16 packet_count; + GstClockTime base_time; + GstClockTime ts_rounded; + guint i; + GArray *packet_chunks = g_array_new (FALSE, FALSE, 2); + RTPTWCCHeader header; + guint header_size = sizeof (RTPTWCCHeader); + guint packet_chunks_size; + guint recv_deltas_size = 0; + guint16 fci_length; + guint16 fci_chunks; + guint8 *fci_data; + guint8 *fci_data_ptr; + RunLengthHelper rlh = { NULL }; + guint symbol_size = 1; + GstClockTimeDiff delta_ts; + gint64 delta_ts_rounded; + + g_array_sort (twcc->recv_packets, _twcc_seqnum_sort); + + /* get first and last packet */ + first = &g_array_index (twcc->recv_packets, RecvPacket, 0); + last = + &g_array_index (twcc->recv_packets, RecvPacket, + twcc->recv_packets->len - 1); + + packet_count = last->seqnum - first->seqnum + 1; + base_time = first->ts / REF_TIME_UNIT; + + GST_WRITE_UINT16_BE (header.base_seqnum, first->seqnum); + GST_WRITE_UINT16_BE (header.packet_count, packet_count); + GST_WRITE_UINT24_BE (header.base_time, base_time); + GST_WRITE_UINT8 (header.fb_pkt_count, twcc->fb_pkt_count); + + base_time *= REF_TIME_UNIT; + ts_rounded = base_time; + + GST_DEBUG ("Created TWCC feedback: base_seqnum: #%u, packet_count: %u, " + "base_time %" GST_TIME_FORMAT " fb_pkt_count: %u", + first->seqnum, packet_count, GST_TIME_ARGS (base_time), + twcc->fb_pkt_count); + + twcc->fb_pkt_count++; + twcc->expected_recv_seqnum = first->seqnum + packet_count; + + /* calculate all deltas and check for gaps etc */ + prev = first; + for (i = 0; i < twcc->recv_packets->len; i++) { + RecvPacket *pkt = &g_array_index (twcc->recv_packets, RecvPacket, i); + if (i != 0) { + pkt->missing_run = pkt->seqnum - prev->seqnum - 1; + } + + delta_ts = GST_CLOCK_DIFF (ts_rounded, pkt->ts); + pkt->delta = delta_ts / DELTA_UNIT; + delta_ts_rounded = pkt->delta * DELTA_UNIT; + ts_rounded += delta_ts_rounded; + + if (delta_ts_rounded < 0 || delta_ts_rounded > MAX_TS_DELTA) { + pkt->status = RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA; + recv_deltas_size += 2; + symbol_size = 2; + } else { + pkt->status = RTP_TWCC_PACKET_STATUS_SMALL_DELTA; + recv_deltas_size += 1; + } + run_lenght_helper_update (&rlh, pkt); + + GST_LOG ("pkt: #%u, ts: %" GST_TIME_FORMAT + " ts_rounded: %" GST_TIME_FORMAT + " delta_ts: %" GST_STIME_FORMAT + " delta_ts_rounded: %" GST_STIME_FORMAT + " missing_run: %u, status: %u", pkt->seqnum, + GST_TIME_ARGS (pkt->ts), GST_TIME_ARGS (ts_rounded), + GST_STIME_ARGS (delta_ts), GST_STIME_ARGS (delta_ts_rounded), + pkt->missing_run, pkt->status); + prev = pkt; + } + + rtp_twcc_write_chunks (packet_chunks, twcc->recv_packets, symbol_size); + + packet_chunks_size = packet_chunks->len * 2; + fci_length = header_size + packet_chunks_size + recv_deltas_size; + fci_chunks = (fci_length - 1) / sizeof (guint32) + 1; + + if (!gst_rtcp_packet_fb_set_fci_length (packet, fci_chunks)) { + GST_ERROR ("Could not fit: %u packets", packet_count); + g_assert_not_reached (); + } + + fci_data = gst_rtcp_packet_fb_get_fci (packet); + fci_data_ptr = fci_data; + + memcpy (fci_data_ptr, &header, header_size); + fci_data_ptr += header_size; + + memcpy (fci_data_ptr, packet_chunks->data, packet_chunks_size); + fci_data_ptr += packet_chunks_size; + + rtp_twcc_write_recv_deltas (fci_data_ptr, twcc->recv_packets); + + GST_MEMDUMP ("twcc-header:", (guint8 *) & header, header_size); + GST_MEMDUMP ("packet-chunks:", (guint8 *) packet_chunks->data, + packet_chunks_size); + GST_MEMDUMP ("full fci:", fci_data, fci_length); + + g_array_unref (packet_chunks); + g_array_set_size (twcc->recv_packets, 0); +} + +static void +rtp_twcc_manager_create_feedback (RTPTWCCManager * twcc) +{ + GstBuffer *buf; + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstRTCPPacket packet; + + buf = gst_rtcp_buffer_new (twcc->mtu); + + gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp); + + gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_RTPFB, &packet); + + gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_RTPFB_TYPE_TWCC); + if (twcc->recv_sender_ssrc != 1) + gst_rtcp_packet_fb_set_sender_ssrc (&packet, twcc->recv_sender_ssrc); + gst_rtcp_packet_fb_set_media_ssrc (&packet, twcc->recv_media_ssrc); + + rtp_twcc_manager_add_fci (twcc, &packet); + + gst_rtcp_buffer_unmap (&rtcp); + + g_queue_push_tail (twcc->rtcp_buffers, buf); +} + +/* we have calculated a (very pessimistic) max-packets per RTCP feedback, + so this is to make sure we don't exceed that */ +static gboolean +_exceeds_max_packets (RTPTWCCManager * twcc, guint16 seqnum) +{ + RecvPacket *first, *last; + guint16 packet_count; + + if (twcc->recv_packets->len == 0) + return FALSE; + + /* find the delta betwen first stored packet and this seqnum */ + first = &g_array_index (twcc->recv_packets, RecvPacket, 0); + packet_count = seqnum - first->seqnum + 1; + if (packet_count > twcc->max_packets_per_rtcp) + return TRUE; + + /* then find the delta between last stored packet and this seqnum */ + last = + &g_array_index (twcc->recv_packets, RecvPacket, + twcc->recv_packets->len - 1); + packet_count = seqnum - (last->seqnum + 1); + if (packet_count > twcc->max_packets_per_rtcp) + return TRUE; + + return FALSE; +} + +/* in this case we could have lost the packet with the marker bit, + so with a large (30) amount of packets, lost packets and still no marker, + we send a feedback anyway */ +static gboolean +_many_packets_some_lost (RTPTWCCManager * twcc, guint16 seqnum) +{ + RecvPacket *first; + guint16 packet_count; + guint received_packets = twcc->recv_packets->len; + if (received_packets == 0) + return FALSE; + + first = &g_array_index (twcc->recv_packets, RecvPacket, 0); + packet_count = seqnum - first->seqnum + 1; + /* packet-count larger than recevied-packets means we have lost packets */ + if (packet_count >= 30 && packet_count > received_packets) + return TRUE; + + return FALSE; +} + +gboolean +rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc, + guint16 seqnum, RTPPacketInfo * pinfo) +{ + gboolean send_feedback = FALSE; + RecvPacket packet; + gint32 diff; + + /* if this packet would exceed the capacity of our MTU, we create a feedback + with the current packets, and start over with this one */ + if (_exceeds_max_packets (twcc, seqnum)) { + GST_INFO ("twcc-seqnum: %u would overflow max packets: %u, create feedback" + " with current packets", seqnum, twcc->max_packets_per_rtcp); + rtp_twcc_manager_create_feedback (twcc); + send_feedback = TRUE; + } + + /* we can have multiple ssrcs here, so just pick the first one */ + if (twcc->recv_media_ssrc == -1) + twcc->recv_media_ssrc = pinfo->ssrc; + + /* check if we are reordered, and treat it as lost if we already sent + a feedback msg with a higher seqnum. If the diff is huge, treat + it as a restart of a stream */ + diff = (gint32) seqnum - (gint32) twcc->expected_recv_seqnum; + if (twcc->fb_pkt_count > 0 && diff < 0 && diff > -1000) { + GST_INFO ("Received out of order packet (%u after %u), treating as lost", + seqnum, twcc->expected_recv_seqnum); + return FALSE; + } + + /* store the packet for Transport-wide RTCP feedback message */ + recv_packet_init (&packet, seqnum, pinfo); + g_array_append_val (twcc->recv_packets, packet); + twcc->last_seqnum = seqnum; + GST_LOG ("Receive: twcc-seqnum: %u, marker: %d, ts: %" GST_TIME_FORMAT, + seqnum, pinfo->marker, GST_TIME_ARGS (pinfo->running_time)); + + if (pinfo->marker || _many_packets_some_lost (twcc, seqnum)) { + rtp_twcc_manager_create_feedback (twcc); + send_feedback = TRUE; + } + + return send_feedback; +} + +static void +_change_rtcp_fb_sender_ssrc (GstBuffer * buf, guint32 sender_ssrc) +{ + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstRTCPPacket packet; + gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp); + gst_rtcp_buffer_get_first_packet (&rtcp, &packet); + gst_rtcp_packet_fb_set_sender_ssrc (&packet, sender_ssrc); + gst_rtcp_buffer_unmap (&rtcp); +} + +GstBuffer * +rtp_twcc_manager_get_feedback (RTPTWCCManager * twcc, guint sender_ssrc) +{ + GstBuffer *buf; + buf = g_queue_pop_head (twcc->rtcp_buffers); + + if (buf && twcc->recv_sender_ssrc != sender_ssrc) { + _change_rtcp_fb_sender_ssrc (buf, sender_ssrc); + twcc->recv_sender_ssrc = sender_ssrc; + } + + return buf; +} + +static void +sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo) +{ + packet->seqnum = seqnum; + packet->ts = pinfo->running_time; + packet->size = pinfo->payload_len; + packet->remote_ts = GST_CLOCK_TIME_NONE; + packet->socket_ts = GST_CLOCK_TIME_NONE; + packet->lost = FALSE; +} + +void +rtp_twcc_manager_send_packet (RTPTWCCManager * twcc, + guint16 seqnum, RTPPacketInfo * pinfo) +{ + SentPacket packet; + sent_packet_init (&packet, seqnum, pinfo); + g_array_append_val (twcc->sent_packets, packet); + + GST_LOG ("Send: twcc-seqnum: %u, marker: %d, ts: %" GST_TIME_FORMAT, + seqnum, pinfo->marker, GST_TIME_ARGS (pinfo->running_time)); +} + +void +rtp_twcc_manager_set_send_packet_ts (RTPTWCCManager * twcc, + guint packet_id, GstClockTime ts) +{ + SentPacket *pkt = NULL; + pkt = &g_array_index (twcc->sent_packets, SentPacket, packet_id); + if (pkt) { + pkt->socket_ts = ts; + GST_DEBUG ("assigning: pkt-id: %u to packet: %u", packet_id, pkt->seqnum); + } +} + +static void +_add_twcc_packet (GArray * twcc_packets, guint16 seqnum, guint status) +{ + RTPTWCCPacket packet; + memset (&packet, 0, sizeof (RTPTWCCPacket)); + packet.local_ts = GST_CLOCK_TIME_NONE; + packet.remote_ts = GST_CLOCK_TIME_NONE; + packet.local_delta = GST_CLOCK_STIME_NONE; + packet.remote_delta = GST_CLOCK_STIME_NONE; + packet.delta_delta = GST_CLOCK_STIME_NONE; + packet.seqnum = seqnum; + packet.status = status; + g_array_append_val (twcc_packets, packet); +} + +static guint +_parse_run_length_chunk (GstBitReader * reader, GArray * twcc_packets, + guint16 seqnum_offset, guint remaining_packets) +{ + guint run_length; + guint8 status_code; + + gst_bit_reader_get_bits_uint8 (reader, &status_code, 2); + + run_length = *(guint16 *) reader->data & ~0xE0; /* mask out the 3 last bits */ + run_length = MIN (remaining_packets, GST_READ_UINT16_BE (&run_length)); + + for (guint i = 0; i < run_length; i++) { + _add_twcc_packet (twcc_packets, seqnum_offset + i, status_code); + } + + return run_length; +} + +static guint +_parse_status_vector_chunk (GstBitReader * reader, GArray * twcc_packets, + guint16 seqnum_offset, guint remaining_packets) +{ + guint8 symbol_size; + guint num_bits; + + gst_bit_reader_get_bits_uint8 (reader, &symbol_size, 1); + symbol_size += 1; + num_bits = MIN (remaining_packets, 14 / symbol_size); + + for (guint i = 0; i < num_bits; i++) { + guint8 status_code; + if (gst_bit_reader_get_bits_uint8 (reader, &status_code, symbol_size)) + _add_twcc_packet (twcc_packets, seqnum_offset + i, status_code); + } + + return num_bits; +} + +/* Remove all locally stored packets that has been reported + back to us */ +static void +_prune_sent_packets (RTPTWCCManager * twcc, GArray * twcc_packets) +{ + SentPacket *first; + RTPTWCCPacket *last; + guint16 last_idx; + + if (twcc_packets->len == 0 || twcc->sent_packets->len == 0) + return; + + first = &g_array_index (twcc->sent_packets, SentPacket, 0); + last = &g_array_index (twcc_packets, RTPTWCCPacket, twcc_packets->len - 1); + + last_idx = last->seqnum - first->seqnum; + + if (last_idx >= twcc->sent_packets->len) + g_array_remove_range (twcc->sent_packets, 0, last_idx); +} + +static void +_check_for_lost_packets (RTPTWCCManager * twcc, GArray * twcc_packets, + guint16 base_seqnum, guint16 packet_count, guint8 fb_pkt_count) +{ + guint packets_lost; + guint i; + + /* first packet */ + if (twcc->first_fci_parse) { + twcc->first_fci_parse = FALSE; + goto done; + } + + /* we have gone backwards, don't reset the expectations, + but process the packet nonetheless */ + if (fb_pkt_count < twcc->expected_parsed_fb_pkt_count) { + GST_WARNING ("feedback packet count going backwards (%u < %u)", + fb_pkt_count, twcc->expected_parsed_fb_pkt_count); + return; + } + + /* we have jumped forwards, reset expectations, but don't trigger + lost packets in case the missing fb-packet(s) arrive later */ + if (fb_pkt_count > twcc->expected_parsed_fb_pkt_count) { + GST_WARNING ("feedback packet count jumped ahead (%u > %u)", + fb_pkt_count, twcc->expected_parsed_fb_pkt_count); + goto done; + } + + packets_lost = base_seqnum - twcc->expected_parsed_seqnum; + for (i = 0; i < packets_lost; i++) { + _add_twcc_packet (twcc_packets, twcc->expected_parsed_seqnum + i, + RTP_TWCC_PACKET_STATUS_NOT_RECV); + } + +done: + twcc->expected_parsed_seqnum = base_seqnum + packet_count; + twcc->expected_parsed_fb_pkt_count = fb_pkt_count + 1; + return; +} + +GArray * +rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc, + guint8 * fci_data, guint fci_length) +{ + GArray *twcc_packets; + guint16 base_seqnum; + guint16 packet_count; + GstClockTime base_time; + GstClockTime ts_rounded; + guint8 fb_pkt_count; + guint packets_parsed = 0; + guint fci_parsed; + guint i; + SentPacket *first_sent_pkt = NULL; + + if (fci_length < 10) { + GST_WARNING ("Malformed TWCC RTCP feedback packet"); + return NULL; + } + + base_seqnum = GST_READ_UINT16_BE (&fci_data[0]); + packet_count = GST_READ_UINT16_BE (&fci_data[2]); + base_time = GST_READ_UINT24_BE (&fci_data[4]) * REF_TIME_UNIT; + fb_pkt_count = fci_data[7]; + + GST_DEBUG ("Parsed TWCC feedback: base_seqnum: #%u, packet_count: %u, " + "base_time %" GST_TIME_FORMAT " fb_pkt_count: %u", + base_seqnum, packet_count, GST_TIME_ARGS (base_time), fb_pkt_count); + + twcc_packets = g_array_sized_new (FALSE, FALSE, + sizeof (RTPTWCCPacket), packet_count); + + _check_for_lost_packets (twcc, twcc_packets, + base_seqnum, packet_count, fb_pkt_count); + + fci_parsed = 8; + while (packets_parsed < packet_count && (fci_parsed + 1) < fci_length) { + GstBitReader reader = GST_BIT_READER_INIT (&fci_data[fci_parsed], 2); + guint8 chunk_type; + guint seqnum_offset = base_seqnum + packets_parsed; + guint remaining_packets = packet_count - packets_parsed; + + gst_bit_reader_get_bits_uint8 (&reader, &chunk_type, 1); + + if (chunk_type == RTP_TWCC_CHUNK_TYPE_RUN_LENGTH) { + packets_parsed += _parse_run_length_chunk (&reader, + twcc_packets, seqnum_offset, remaining_packets); + } else { + packets_parsed += _parse_status_vector_chunk (&reader, + twcc_packets, seqnum_offset, remaining_packets); + } + fci_parsed += 2; + } + + if (twcc->sent_packets->len > 0) + first_sent_pkt = &g_array_index (twcc->sent_packets, SentPacket, 0); + + ts_rounded = base_time; + for (i = 0; i < twcc_packets->len; i++) { + RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i); + gint16 delta = 0; + GstClockTimeDiff delta_ts; + + if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) { + delta = fci_data[fci_parsed]; + fci_parsed += 1; + } else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) { + delta = GST_READ_UINT16_BE (&fci_data[fci_parsed]); + fci_parsed += 2; + } + + if (fci_parsed > fci_length) { + GST_WARNING ("Malformed TWCC RTCP feedback packet"); + g_array_set_size (twcc_packets, 0); + break; + } + + if (pkt->status != RTP_TWCC_PACKET_STATUS_NOT_RECV) { + delta_ts = delta * DELTA_UNIT; + ts_rounded += delta_ts; + pkt->remote_ts = ts_rounded; + + GST_LOG ("pkt: #%u, remote_ts: %" GST_TIME_FORMAT + " delta_ts: %" GST_STIME_FORMAT + " status: %u", pkt->seqnum, + GST_TIME_ARGS (pkt->remote_ts), GST_STIME_ARGS (delta_ts), + pkt->status); + } + + if (first_sent_pkt) { + SentPacket *found = NULL; + guint16 sent_idx = pkt->seqnum - first_sent_pkt->seqnum; + if (sent_idx < twcc->sent_packets->len) + found = &g_array_index (twcc->sent_packets, SentPacket, sent_idx); + if (found && found->seqnum == pkt->seqnum) { + if (GST_CLOCK_TIME_IS_VALID (found->socket_ts)) { + pkt->local_ts = found->socket_ts; + } else { + pkt->local_ts = found->ts; + } + pkt->size = found->size; + + GST_LOG ("matching pkt: #%u with local_ts: %" GST_TIME_FORMAT + " size: %u", pkt->seqnum, GST_TIME_ARGS (pkt->local_ts), pkt->size); + } + } + } + + _prune_sent_packets (twcc, twcc_packets); + + return twcc_packets; +} diff --git a/gst/rtpmanager/rtptwcc.h b/gst/rtpmanager/rtptwcc.h new file mode 100644 index 0000000000..39f9d58e0a --- /dev/null +++ b/gst/rtpmanager/rtptwcc.h @@ -0,0 +1,70 @@ +/* GStreamer + * Copyright (C) 2019 Pexip (http://pexip.com/) + * @author: Havard Graff + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __RTP_TWCC_H__ +#define __RTP_TWCC_H__ + +#include +#include +#include "rtpstats.h" + +typedef struct _RTPTWCCManager RTPTWCCManager; +typedef struct _RTPTWCCPacket RTPTWCCPacket; +typedef enum _RTPTWCCPacketStatus RTPTWCCPacketStatus; + +enum _RTPTWCCPacketStatus +{ + RTP_TWCC_PACKET_STATUS_NOT_RECV = 0, + RTP_TWCC_PACKET_STATUS_SMALL_DELTA = 1, + RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA = 2, +}; + +struct _RTPTWCCPacket +{ + GstClockTime local_ts; + GstClockTime remote_ts; + GstClockTimeDiff local_delta; + GstClockTimeDiff remote_delta; + GstClockTimeDiff delta_delta; + RTPTWCCPacketStatus status; + guint16 seqnum; + guint size; +}; + +RTPTWCCManager * rtp_twcc_manager_new (guint mtu); +void rtp_twcc_manager_free (RTPTWCCManager * twcc); + +void rtp_twcc_manager_set_mtu (RTPTWCCManager * twcc, guint mtu); + +gboolean rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc, + guint16 seqnum, RTPPacketInfo * pinfo); + +void rtp_twcc_manager_send_packet (RTPTWCCManager * twcc, + guint16 seqnum, RTPPacketInfo * pinfo); +void rtp_twcc_manager_set_send_packet_ts (RTPTWCCManager * twcc, + guint packet_id, GstClockTime ts); + +GstBuffer * rtp_twcc_manager_get_feedback (RTPTWCCManager * twcc, + guint32 sender_ssrc); + +GArray * rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc, + guint8 * fci_data, guint fci_length); + +#endif /* __RTP_TWCC_H__ */ diff --git a/tests/check/elements/rtpsession.c b/tests/check/elements/rtpsession.c index 1f150b52a1..da1a1fb071 100644 --- a/tests/check/elements/rtpsession.c +++ b/tests/check/elements/rtpsession.c @@ -37,9 +37,13 @@ #define TEST_BUF_SSRC 0x01BADBAD #define TEST_BUF_MS 20 #define TEST_BUF_DURATION (TEST_BUF_MS * GST_MSECOND) -#define TEST_BUF_SIZE (64000 * TEST_BUF_MS / 1000) +#define TEST_BUF_BPS 512000 +#define TEST_BUF_SIZE (TEST_BUF_BPS * TEST_BUF_MS / (1000 * 8)) #define TEST_RTP_TS_DURATION (TEST_BUF_CLOCK_RATE * TEST_BUF_MS / 1000) +#define TEST_TWCC_EXT_ID 5 +#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" + static GstCaps * generate_caps (void) { @@ -49,8 +53,9 @@ generate_caps (void) } static GstBuffer * -generate_test_buffer_full (GstClockTime dts, - guint seq_num, guint32 rtp_ts, guint ssrc) +generate_test_buffer_full (GstClockTime ts, + guint seqnum, guint32 rtp_ts, guint ssrc, + gboolean marker_bit, guint8 twcc_ext_id, guint16 twcc_seqnum) { GstBuffer *buf; guint8 *payload; @@ -58,30 +63,57 @@ generate_test_buffer_full (GstClockTime dts, GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; buf = gst_rtp_buffer_new_allocate (TEST_BUF_SIZE, 0, 0); - GST_BUFFER_DTS (buf) = dts; + GST_BUFFER_PTS (buf) = ts; + GST_BUFFER_DTS (buf) = ts; gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp); gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT); - gst_rtp_buffer_set_seq (&rtp, seq_num); + gst_rtp_buffer_set_seq (&rtp, seqnum); gst_rtp_buffer_set_timestamp (&rtp, rtp_ts); gst_rtp_buffer_set_ssrc (&rtp, ssrc); + gst_rtp_buffer_set_marker (&rtp, marker_bit); payload = gst_rtp_buffer_get_payload (&rtp); for (i = 0; i < TEST_BUF_SIZE; i++) payload[i] = 0xff; + if (twcc_ext_id > 0) { + guint8 twcc_seqnum_be[2]; + GST_WRITE_UINT16_BE (twcc_seqnum_be, twcc_seqnum); + gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id, + twcc_seqnum_be, sizeof (twcc_seqnum_be)); + } + gst_rtp_buffer_unmap (&rtp); return buf; } static GstBuffer * -generate_test_buffer (guint seq_num, guint ssrc) +generate_test_buffer (guint seqnum, guint ssrc) { - return generate_test_buffer_full (seq_num * TEST_BUF_DURATION, - seq_num, seq_num * TEST_RTP_TS_DURATION, ssrc); + return generate_test_buffer_full (seqnum * TEST_BUF_DURATION, + seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, 0, 0); } +static GstBuffer * +generate_twcc_recv_buffer (guint seqnum, + GstClockTime arrival_time, gboolean marker_bit) +{ + return generate_test_buffer_full (arrival_time, seqnum, + seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit, + TEST_TWCC_EXT_ID, seqnum); +} + +static GstBuffer * +generate_twcc_send_buffer (guint seqnum, gboolean marker_bit) +{ + return generate_test_buffer_full (seqnum * TEST_BUF_DURATION, + seqnum, seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit, + TEST_TWCC_EXT_ID, seqnum); +} + + typedef struct { GstHarness *send_rtp_h; @@ -94,6 +126,8 @@ typedef struct GstCaps *caps; gboolean running; + GMutex lock; + GstStructure *last_twcc_stats; } SessionHarness; static GstCaps * @@ -103,11 +137,38 @@ _pt_map_requested (GstElement * element, guint pt, gpointer data) return gst_caps_copy (h->caps); } +static void +_notify_twcc_stats (GParamSpec * spec G_GNUC_UNUSED, + GObject * object G_GNUC_UNUSED, gpointer data) +{ + SessionHarness *h = data; + GstStructure *stats; + g_object_get (h->session, "twcc-stats", &stats, NULL); + + g_mutex_lock (&h->lock); + if (h->last_twcc_stats) + gst_structure_free (h->last_twcc_stats); + h->last_twcc_stats = stats; + g_mutex_unlock (&h->lock); +} + +static GstStructure * +session_harness_get_last_twcc_stats (SessionHarness * h) +{ + GstStructure *ret = NULL; + g_mutex_lock (&h->lock); + if (h->last_twcc_stats) + ret = gst_structure_copy (h->last_twcc_stats); + g_mutex_unlock (&h->lock); + return ret; +} + static SessionHarness * session_harness_new (void) { SessionHarness *h = g_new0 (SessionHarness, 1); h->caps = generate_caps (); + g_mutex_init (&h->lock); h->testclock = GST_TEST_CLOCK_CAST (gst_test_clock_new ()); gst_system_clock_set_default (GST_CLOCK_CAST (h->testclock)); @@ -130,6 +191,9 @@ session_harness_new (void) g_signal_connect (h->session, "request-pt-map", (GCallback) _pt_map_requested, h); + g_signal_connect (h->session, "notify::twcc-stats", + (GCallback) _notify_twcc_stats, h); + g_object_get (h->session, "internal-session", &h->internal_session, NULL); return h; @@ -147,6 +211,11 @@ session_harness_free (SessionHarness * h) gst_harness_teardown (h->recv_rtp_h); gst_harness_teardown (h->send_rtp_h); + g_mutex_clear (&h->lock); + + if (h->last_twcc_stats) + gst_structure_free (h->last_twcc_stats); + g_object_unref (h->internal_session); gst_object_unref (h->session); g_free (h); @@ -158,6 +227,12 @@ session_harness_send_rtp (SessionHarness * h, GstBuffer * buf) return gst_harness_push (h->send_rtp_h, buf); } +static GstBuffer * +session_harness_pull_send_rtp (SessionHarness * h) +{ + return gst_harness_pull (h->send_rtp_h); +} + static GstFlowReturn session_harness_recv_rtp (SessionHarness * h, GstBuffer * buf) { @@ -254,6 +329,29 @@ session_harness_rtp_retransmission_request (SessionHarness * h, gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, s)); } +static void +_add_twcc_field_to_caps (GstCaps * caps, guint8 ext_id) +{ + gchar *name = g_strdup_printf ("extmap-%u", ext_id); + gst_caps_set_simple (caps, name, G_TYPE_STRING, TWCC_EXTMAP_STR, NULL); + g_free (name); +} + +static void +session_harness_set_twcc_recv_ext_id (SessionHarness * h, guint8 ext_id) +{ + _add_twcc_field_to_caps (h->caps, ext_id); + g_signal_emit_by_name (h->session, "clear-pt-map"); +} + +static void +session_harness_set_twcc_send_ext_id (SessionHarness * h, guint8 ext_id) +{ + GstCaps *caps = gst_caps_copy (h->caps); + _add_twcc_field_to_caps (caps, ext_id); + gst_harness_set_src_caps (h->send_rtp_h, caps); +} + GST_START_TEST (test_multiple_ssrc_rr) { SessionHarness *h = session_harness_new (); @@ -2399,7 +2497,7 @@ generate_stepped_ts_buffer (guint i, gboolean stepped) GST_TIME_ARGS (gst_util_uint64_scale_int (GST_SECOND, ts, TEST_BUF_CLOCK_RATE)), i); - buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA); + buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE, 0, 0); return buf; } @@ -2463,6 +2561,1054 @@ GST_START_TEST (test_stepped_packet_rate) GST_END_TEST; + +/********************* TWCC-tests *********************/ + +static GstRTCPFBType +_gst_buffer_get_rtcp_fbtype (GstBuffer * buf) +{ + GstRTCPFBType ret = GST_RTCP_FB_TYPE_INVALID; + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstRTCPPacket packet; + + if (!gst_rtcp_buffer_validate_reduced (buf)) + return ret; + + if (!gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp)) + return ret; + + if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) + goto done; + + if (GST_RTCP_TYPE_RTPFB != gst_rtcp_packet_get_type (&packet)) + goto done; + + ret = gst_rtcp_packet_fb_get_type (&packet); + +done: + gst_rtcp_buffer_unmap (&rtcp); + return ret; +} + +static GstBuffer * +session_harness_pull_twcc_rtcp (SessionHarness * h) +{ + GstBuffer *ret = NULL; + + while (ret == NULL) { + GstBuffer *buf = session_harness_pull_rtcp (h); + if (GST_RTCP_RTPFB_TYPE_TWCC == _gst_buffer_get_rtcp_fbtype (buf)) { + ret = buf; + } else { + gst_buffer_unref (buf); + } + } + return ret; +} + +typedef struct +{ + guint16 base_seqnum; + guint16 num_packets; + GstClockTime base_time; + GstClockTime duration; +} TWCCTestData; + +static TWCCTestData twcc_header_and_run_lenght_test_data[] = { + {0, 10, 0, 33 * GST_MSECOND}, + {65530, 12, 37 * 64 * GST_MSECOND, 10 * GST_MSECOND}, /* seqnum wrap */ + {99, 200, 1024 * 64 * GST_MSECOND, 10 * GST_MSECOND}, /* many packets */ + {20000, 23, 0, 250 * GST_USECOND}, /* minimal duration */ + {56000, 15, 1000 * 64 * GST_MSECOND, 10 * GST_MSECOND}, /* timestamp offset */ +}; + +GST_START_TEST (test_twcc_header_and_run_length) +{ + SessionHarness *h = session_harness_new (); + gint i; + GstFlowReturn res; + GstBuffer *buf; + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstRTCPPacket packet; + guint8 *fci_data; + guint16 run_length; + + TWCCTestData *td = &twcc_header_and_run_lenght_test_data[__i__]; + + /* enable twcc */ + session_harness_set_twcc_recv_ext_id (h, TEST_TWCC_EXT_ID); + + /* receive some buffers */ + for (i = 0; i < td->num_packets; i++) { + gboolean last_packet = i == (td->num_packets - 1); + + buf = generate_twcc_recv_buffer (i + td->base_seqnum, + td->base_time + i * td->duration, last_packet); + res = session_harness_recv_rtp (h, buf); + fail_unless_equals_int (GST_FLOW_OK, res); + } + + session_harness_produce_rtcp (h, 1); + buf = session_harness_pull_twcc_rtcp (h); + fail_unless (buf); + + gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp); + fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)); + + fci_data = gst_rtcp_packet_fb_get_fci (&packet); + + /* base seqnum */ + fail_unless_equals_int (td->base_seqnum, GST_READ_UINT16_BE (&fci_data[0])); + + /* packet count */ + fail_unless_equals_int (td->num_packets, GST_READ_UINT16_BE (&fci_data[2])); + + /* reference time (in 64ms units) */ + fail_unless_equals_int (td->base_time, + GST_READ_UINT24_BE (&fci_data[4]) * 64 * GST_MSECOND); + + /* feedback packet number */ + fail_unless_equals_int (0, fci_data[7]); + + /* run-length coding */ + fail_unless_equals_int (0, fci_data[8] & 0x80); + + /* status: small-delta */ + fail_unless_equals_int (0x20, fci_data[8] & 0x60); + + /* packets in run_length */ + run_length = *((guint16 *) & fci_data[8]); + run_length = run_length & ~0xE0; /* mask out the 3 last bits */ + fail_unless_equals_int (td->num_packets, GST_READ_UINT16_BE (&run_length)); + + /* first recv-delta always 0 */ + fail_unless_equals_int (0, fci_data[10]); + + /* following recv-delta equal to duration (in 250us units) */ + fail_unless_equals_clocktime (td->duration, fci_data[11] * 250 * GST_USECOND); + + gst_rtcp_buffer_unmap (&rtcp); + gst_buffer_unref (buf); + + session_harness_free (h); +} + +GST_END_TEST; + +typedef struct +{ + guint16 seqnum; + GstClockTime timestamp; + gboolean marker; +} TWCCPacket; + +#define twcc_push_packets(h, packets) \ +G_STMT_START { \ + guint i; \ + session_harness_set_twcc_recv_ext_id ((h), TEST_TWCC_EXT_ID); \ + for (i = 0; i < G_N_ELEMENTS ((packets)); i++) { \ + TWCCPacket *twcc_pkt = &(packets)[i]; \ + fail_unless_equals_int (GST_FLOW_OK, \ + session_harness_recv_rtp ((h), \ + generate_twcc_recv_buffer (twcc_pkt->seqnum, \ + twcc_pkt->timestamp, twcc_pkt->marker))); \ + } \ +} G_STMT_END + +#define twcc_verify_fci(buf, exp_fci) \ +G_STMT_START { \ + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; \ + GstRTCPPacket packet; \ + guint8 *fci_data; \ + guint16 fci_length; \ + fail_unless (gst_rtcp_buffer_validate_reduced (buf)); \ + gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp); \ + fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)); \ + fail_unless_equals_int (GST_RTCP_TYPE_RTPFB, \ + gst_rtcp_packet_get_type (&packet)); \ + fail_unless_equals_int (GST_RTCP_RTPFB_TYPE_TWCC, \ + gst_rtcp_packet_fb_get_type (&packet)); \ + fci_data = gst_rtcp_packet_fb_get_fci (&packet); \ + fci_length = gst_rtcp_packet_fb_get_fci_length (&packet) * sizeof (guint32); \ + fail_unless_equals_int (fci_length, sizeof (exp_fci)); \ + fail_unless_equals_int (0, memcmp (fci_data, (exp_fci), fci_length)); \ + gst_rtcp_buffer_unmap (&rtcp); \ +} G_STMT_END + +#define twcc_verify_packets_to_fci(h, packets, exp_fci) \ +G_STMT_START { \ + GstBuffer *buf; \ + twcc_push_packets (h, packets); \ + session_harness_produce_rtcp ((h), 1); \ + buf = session_harness_pull_twcc_rtcp ((h)); \ + twcc_verify_fci (buf, exp_fci); \ + gst_buffer_unref (buf); \ +} G_STMT_END + +#define twcc_verify_packets_to_event(packets, event) \ +G_STMT_START { \ + guint i; \ + guint j = 0; \ + GValueArray *packets_array = g_value_get_boxed ( \ + gst_structure_get_value (gst_event_get_structure ((event)), "packets")); \ + for (i = 0; i < packets_array->n_values; i++) { \ + TWCCPacket *twcc_pkt; \ + GstClockTime ts; \ + guint seqnum; \ + gboolean lost; \ + const GstStructure *pkt_s = \ + gst_value_get_structure (g_value_array_get_nth (packets_array, i)); \ + fail_unless (gst_structure_get_boolean (pkt_s, "lost", &lost)); \ + if (lost) \ + continue; \ + fail_unless (gst_structure_get_clock_time (pkt_s, "remote-ts", &ts)); \ + fail_unless (gst_structure_get_uint (pkt_s, "seqnum", &seqnum)); \ + twcc_pkt = &(packets)[j++]; \ + fail_unless_equals_int (twcc_pkt->seqnum, seqnum); \ + fail_unless_equals_clocktime (twcc_pkt->timestamp, ts); \ + } \ + gst_event_unref (event); \ +} G_STMT_END + +#define twcc_verify_packets_to_packets(send_h, recv_h, packets) \ +G_STMT_START { \ + guint i; \ + GstEvent *event; \ + twcc_push_packets ((recv_h), packets); \ + session_harness_produce_rtcp ((recv_h), 1); \ + session_harness_recv_rtcp ((send_h), \ + session_harness_pull_twcc_rtcp ((recv_h))); \ + for (i = 0; i < 2; i++) \ + gst_event_unref (gst_harness_pull_upstream_event ((send_h)->send_rtp_h)); \ + event = gst_harness_pull_upstream_event ((send_h)->send_rtp_h); \ + twcc_verify_packets_to_event (packets, event); \ +} G_STMT_END + +GST_START_TEST (test_twcc_1_bit_status_vector) +{ + SessionHarness *h0 = session_harness_new (); + SessionHarness *h1 = session_harness_new (); + + TWCCPacket packets[] = { + {10, 0 * GST_MSECOND, FALSE}, + {12, 12 * GST_MSECOND, FALSE}, + {14, 14 * GST_MSECOND, FALSE}, + {15, 15 * GST_MSECOND, FALSE}, + {17, 17 * GST_MSECOND, FALSE}, + {20, 20 * GST_MSECOND, FALSE}, + {21, 21 * GST_MSECOND, FALSE}, + {23, 23 * GST_MSECOND, TRUE}, + }; + + guint8 exp_fci[] = { + 0x00, 0x0a, /* base sequence number: 10 */ + 0x00, 0x0e, /* packet status count: 14 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x00, /* feedback packet count: 0 */ + 0xab, 0x4d, /* packet chunk: 1 0 1 0 1 0 1 1 | 0 1 0 0 1 1 0 1 */ + 0x00, /* recv delta: +0:00:00.000000000 */ + 0x30, /* recv delta: +0:00:00.012000000 */ + 0x08, /* recv delta: +0:00:00.002000000 */ + 0x04, /* recv delta: +0:00:00.001000000 */ + 0x08, /* recv delta: +0:00:00.002000000 */ + 0x0c, /* recv delta: +0:00:00.003000000 */ + 0x04, /* recv delta: +0:00:00.001000000 */ + 0x08, /* recv delta: +0:00:00.002000000 */ + 0x00, 0x00, /* padding */ + }; + + /* check we get the expected fci */ + twcc_verify_packets_to_fci (h0, packets, exp_fci); + + /* and check we can parse this back to the original packets */ + twcc_verify_packets_to_packets (h1, h1, packets); + + session_harness_free (h0); + session_harness_free (h1); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_2_bit_status_vector) +{ + SessionHarness *h0 = session_harness_new (); + SessionHarness *h1 = session_harness_new (); + + TWCCPacket packets[] = { + {5, 5 * 64 * GST_MSECOND, FALSE}, + {7, 7 * 64 * GST_MSECOND, FALSE}, + {8, 8 * 64 * GST_MSECOND, FALSE}, + {11, 12 * 64 * GST_MSECOND, TRUE}, + }; + + guint8 exp_fci[] = { + 0x00, 0x05, /* base sequence number: 5 */ + 0x00, 0x07, /* packet status count: 7 */ + 0x00, 0x00, 0x05, /* reference time: 5 */ + 0x00, /* feedback packet count: 0 */ + 0xd2, 0x82, /* packet chunk: 1 1 0 1 0 0 1 0 | 1 0 0 0 0 0 1 0 */ + /* normal, missing, large, large, missing, missing, large */ + 0x00, /* recv delta: +0:00:00.000000000 */ + 0x02, 0x00, /* recv delta: +0:00:00.128000000 */ + 0x01, 0x00, /* recv delta: +0:00:00.064000000 */ + 0x04, 0x00, /* recv delta: +0:00:00.256000000 */ + 0x00, 0x00, 0x00, /* padding */ + }; + + twcc_verify_packets_to_fci (h0, packets, exp_fci); + + twcc_verify_packets_to_packets (h1, h1, packets); + + session_harness_free (h0); + session_harness_free (h1); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_various_gaps) +{ + SessionHarness *h = session_harness_new (); + guint16 seq = 1 + __i__; + + TWCCPacket packets[] = { + {0, 0 * 250 * GST_USECOND, FALSE}, + {seq, seq * 250 * GST_USECOND, TRUE}, + }; + + twcc_verify_packets_to_packets (h, h, packets); + + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_negative_delta) +{ + SessionHarness *h0 = session_harness_new (); + SessionHarness *h1 = session_harness_new (); + + TWCCPacket packets[] = { + {0, 0 * 250 * GST_USECOND, FALSE}, + {1, 2 * 250 * GST_USECOND, FALSE}, + {2, 1 * 250 * GST_USECOND, FALSE}, + {3, 3 * 250 * GST_USECOND, TRUE}, + }; + + guint8 exp_fci[] = { + 0x00, 0x00, /* base sequence number: 0 */ + 0x00, 0x04, /* packet status count: 4 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x00, /* feedback packet count: 0 */ + 0xd6, 0x40, /* packet chunk: 1 1 0 1 0 1 1 0 | 0 1 0 0 0 0 0 0 */ + 0x00, /* recv delta: +0:00:00.000000000 */ + 0x02, /* recv delta: +0:00:00.000500000 */ + 0xff, 0xff, /* recv delta: -0:00:00.000250000 */ + 0x02, /* recv delta: +0:00:00.000500000 */ + 0x00, /* padding */ + }; + + twcc_verify_packets_to_fci (h0, packets, exp_fci); + + twcc_verify_packets_to_packets (h1, h1, packets); + + session_harness_free (h0); + session_harness_free (h1); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_seqnum_wrap) +{ + SessionHarness *h0 = session_harness_new (); + SessionHarness *h1 = session_harness_new (); + + TWCCPacket packets[] = { + {65534, 0 * 250 * GST_USECOND, FALSE}, + {65535, 1 * 250 * GST_USECOND, FALSE}, + {0, 2 * 250 * GST_USECOND, FALSE}, + {1, 3 * 250 * GST_USECOND, TRUE}, + }; + + guint8 exp_fci[] = { + 0xff, 0xfe, /* base sequence number: 65534 */ + 0x00, 0x04, /* packet status count: 4 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x00, /* feedback packet count: 0 */ + 0x20, 0x04, /* packet chunk: 0 0 1 0 0 0 0 0 | 0 0 0 0 0 1 0 0 */ + 0x00, /* recv delta: +0:00:00.000000000 */ + 0x01, /* recv delta: +0:00:00.000250000 */ + 0x01, /* recv delta: +0:00:00.000250000 */ + 0x01, /* recv delta: +0:00:00.000250000 */ + 0x00, 0x00, /* padding */ + }; + + twcc_verify_packets_to_fci (h0, packets, exp_fci); + + twcc_verify_packets_to_packets (h1, h1, packets); + + session_harness_free (h0); + session_harness_free (h1); +} + +GST_END_TEST; + + +GST_START_TEST (test_twcc_double_packets) +{ + SessionHarness *h = session_harness_new (); + + TWCCPacket packets0[] = { + {11, 11 * GST_MSECOND, FALSE}, + {12, 12 * GST_MSECOND, TRUE}, + }; + + TWCCPacket packets1[] = { + {13, 13 * GST_MSECOND, FALSE}, + {14, 14 * GST_MSECOND, FALSE}, + {15, 15 * GST_MSECOND, TRUE}, + }; + + guint8 exp_fci0[] = { + 0x00, 0x0b, /* base sequence number: 11 */ + 0x00, 0x02, /* packet status count: 14 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x00, /* feedback packet count: 0 */ + 0x20, 0x02, /* packet chunk: 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0 */ + 0x2c, 0x04, /* recv deltas */ + }; + + guint8 exp_fci1[] = { + 0x00, 0x0d, /* base sequence number: 13 */ + 0x00, 0x03, /* packet status count: 3 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x01, /* feedback packet count: 1 */ + 0x20, 0x03, /* packet chunk: 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 1 */ + 0x34, 0x04, 0x04, /* recv deltas */ + 0x00, 0x00, 0x00, /* padding */ + }; + + twcc_verify_packets_to_fci (h, packets0, exp_fci0); + twcc_verify_packets_to_fci (h, packets1, exp_fci1); + + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_huge_seqnum_gap) +{ + SessionHarness *h = session_harness_new (); + GstBuffer *buf; + + TWCCPacket packets[] = { + {9, 4 * 32 * GST_MSECOND, FALSE}, + {10, 5 * 32 * GST_MSECOND, FALSE}, + {30011, 6 * 32 * GST_MSECOND, FALSE}, + {30012, 7 * 32 * GST_MSECOND, FALSE}, + {30013, 8 * 32 * GST_MSECOND, TRUE}, + }; + + guint8 exp_fci0[] = { + 0x00, 0x09, /* base sequence number: 9 */ + 0x00, 0x02, /* packet status count: 2 */ + 0x00, 0x00, 0x02, /* reference time: 2 * 64ms */ + 0x00, /* feedback packet count: 0 */ + /* packet chunks: */ + 0x20, 0x02, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0 */ + 0x00, 0x80, /* recv deltas, +0, +32ms */ + }; + + guint8 exp_fci1[] = { + 0x75, 0x3b, /* base sequence number: 300011 */ + 0x00, 0x03, /* packet status count: 3 */ + 0x00, 0x00, 0x03, /* reference time: 3 * 64ms */ + 0x01, /* feedback packet count: 1 */ + /* packet chunks: */ + 0x20, 0x03, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 1 */ + 0x00, 0x80, 0x80, /* recv deltas, +4, +32ms, +32ms */ + 0x00, 0x00, 0x00, /* padding */ + }; + + /* The sequence-number does a huge leap. Instead of encoding this as + a massive run-lenght sequence, like so */ +#if 0 + guint8 exp_fci[] = { + 0x00, 0x09, /* base sequence number: 9 */ + 0x75, 0x35, /* packet status count: 30005 */ + 0x00, 0x00, 0x02, /* reference time: 2 */ + 0x00, /* feedback packet count: 0 */ + /* packet chunks: */ + 0xb0, 0x00, /* 1 bit 2 there, 12 lost: 1 0 1 1 0 0 0 0 | 0 0 0 0 0 0 0 0 */ + 0x1f, 0xff, /* run-length: 8191 lost: 0 0 0 1 1 1 1 1 | 1 1 1 1 1 1 1 1 */ + 0x1f, 0xff, /* run-length: 8191 lost: 0 0 0 1 1 1 1 1 | 1 1 1 1 1 1 1 1 */ + 0x1f, 0xff, /* run-length: 8191 lost: 0 0 0 1 1 1 1 1 | 1 1 1 1 1 1 1 1 */ + 0x15, 0x27, /* run-length: 5415 lost: 0 0 0 1 0 1 0 1 | 0 0 1 0 0 1 1 1 */ + /* 12 + 8191 + 8191 + 8191 + 5415 = 30000 lost packets */ + 0xb8, 0x00, /* 1 bit 3 there : 1 0 1 1 1 0 0 0 | 0 0 0 0 0 0 0 0 */ + + 0x00, 0x80, 0x80, 0x80, 0x80, /* recv deltas */ + 0x00, 0x00, 0x00, /* padding */ + }; +#endif + + /* ...just send two feedback-packets, with + the second one starting from the new sequence-number. */ + twcc_push_packets (h, packets); + + session_harness_produce_rtcp (h, 1); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci0); + gst_buffer_unref (buf); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci1); + gst_buffer_unref (buf); + + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_duplicate_seqnums) +{ + SessionHarness *h = session_harness_new (); + GstBuffer *buf; + + /* A duplicate seqnum can be interpreted as a gap of 65536 packets. + Whatever the cause might be, we will follow the behavior of reordered + packets, and drop it */ + TWCCPacket packets[] = { + {1, 4 * 32 * GST_MSECOND, FALSE}, + {2, 5 * 32 * GST_MSECOND, FALSE}, + {2, 6 * 32 * GST_MSECOND, FALSE}, + {3, 7 * 32 * GST_MSECOND, TRUE}, + }; + + guint8 exp_fci0[] = { + 0x00, 0x01, /* base sequence number: 1 */ + 0x00, 0x02, /* packet status count: 2 */ + 0x00, 0x00, 0x02, /* reference time: 2 * 64ms */ + 0x00, /* feedback packet count: 0 */ + /* packet chunks: */ + 0x20, 0x02, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0 */ + 0x00, 0x80, /* recv deltas: +0, +32ms */ + }; + + guint8 exp_fci1[] = { + 0x00, 0x03, /* base sequence number: 3 */ + 0x00, 0x01, /* packet status count: 1 */ + 0x00, 0x00, 0x03, /* reference time: 3 * 64ms */ + 0x01, /* feedback packet count: 1 */ + /* packet chunks: */ + 0x20, 0x01, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */ + 0x80, /* recv deltas: +32ms */ + 0x00, /* padding */ + }; + + twcc_push_packets (h, packets); + + session_harness_produce_rtcp (h, 1); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci0); + gst_buffer_unref (buf); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci1); + gst_buffer_unref (buf); + + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_multiple_markers) +{ + SessionHarness *h = session_harness_new (); + GstBuffer *buf; + + /* for this test, notice how the first recv-delta should relate back to + the reference-time, which is 0 in this case. The packets are incrementing + in timestamps equal to the smallest unit for TWCC (250 microseconds) */ + TWCCPacket packets[] = { + {1, 1 * 250 * GST_USECOND, FALSE}, + {2, 2 * 250 * GST_USECOND, FALSE}, + {3, 3 * 250 * GST_USECOND, TRUE}, + {4, 4 * 250 * GST_USECOND, FALSE}, + {5, 5 * 250 * GST_USECOND, TRUE}, + {6, 6 * 250 * GST_USECOND, FALSE}, + {7, 7 * 250 * GST_USECOND, FALSE}, + {8, 8 * 250 * GST_USECOND, FALSE}, + {9, 9 * 250 * GST_USECOND, TRUE}, + }; + + guint8 exp_fci0[] = { + 0x00, 0x01, /* base sequence number: 1 */ + 0x00, 0x03, /* packet status count: 3 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x00, /* feedback packet count: 0 */ + /* packet chunks: */ + 0x20, 0x03, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 1 */ + 0x01, 0x01, 0x01, /* recv deltas, +1, +1, +1 */ + 0x00, 0x00, 0x00, /* padding */ + }; + + guint8 exp_fci1[] = { + 0x00, 0x04, /* base sequence number: 4 */ + 0x00, 0x02, /* packet status count: 2 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x01, /* feedback packet count: 1 */ + /* packet chunks: */ + 0x20, 0x02, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0 */ + 0x04, 0x01, /* recv deltas, +4, +1, +1 */ + }; + + guint8 exp_fci2[] = { + 0x00, 0x06, /* base sequence number: 6 */ + 0x00, 0x04, /* packet status count: 4 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x02, /* feedback packet count: 2 */ + /* packet chunks: */ + 0x20, 0x04, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 1 0 0 */ + 0x06, 0x01, 0x01, 0x01, /* recv deltas, +6, +1, +1, +1 */ + 0x00, 0x00, + }; + + twcc_push_packets (h, packets); + + /* we should get 1 SR/RR, and then 3x TWCC packets */ + session_harness_produce_rtcp (h, 1); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci0); + gst_buffer_unref (buf); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci1); + gst_buffer_unref (buf); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci2); + gst_buffer_unref (buf); + + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_no_marker_and_gaps) +{ + SessionHarness *h = session_harness_new (); + guint i; + + g_object_set (h->internal_session, "probation", 1, NULL); + + /* Push packets with gaps and no marker bit. This should not prevent + the feedback packets from being sent at all. */ + for (i = 0; i < 80; i += 10) { + TWCCPacket packets[] = { {i, i * 250 * GST_USECOND, FALSE} + }; + twcc_push_packets (h, packets); + } + + /* verify we did receive some feedback for these packets */ + session_harness_produce_rtcp (h, 1); + for (i = 0; i < 2; i++) { + gst_buffer_unref (session_harness_pull_twcc_rtcp (h)); + } + + session_harness_free (h); +} + +GST_END_TEST; + +static GstBuffer * +generate_twcc_feedback_rtcp (guint8 * fci_data, guint16 fci_length) +{ + GstRTCPPacket packet; + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstBuffer *buffer = gst_rtcp_buffer_new (1000); + guint8 *fci; + + fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp)); + fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_RTPFB, + &packet)); + gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_RTPFB_TYPE_TWCC); + gst_rtcp_packet_fb_set_fci_length (&packet, fci_length); + fci = gst_rtcp_packet_fb_get_fci (&packet); + memcpy (fci, fci_data, fci_length); + gst_rtcp_packet_fb_set_sender_ssrc (&packet, TEST_BUF_SSRC); + gst_rtcp_packet_fb_set_media_ssrc (&packet, 0); + gst_rtcp_buffer_unmap (&rtcp); + + return buffer; +} + +GST_START_TEST (test_twcc_bad_rtcp) +{ + SessionHarness *h = session_harness_new (); + guint i; + GstBuffer *buf; + GstEvent *event; + GValueArray *packets_array; + + guint8 fci[] = { + 0xff, 0xff, /* base sequence number: max */ + 0xff, 0xff, /* packet status count: max */ + 0xff, 0xff, 0xff, /* reference time: max */ + 0xff, /* feedback packet count: max */ + 0x3f, 0xff, /* packet chunk: run-length, max */ + 0x00, /* only 1 recv-delta */ + }; + + buf = generate_twcc_feedback_rtcp (fci, sizeof (fci)); + session_harness_recv_rtcp (h, buf); + + /* two reconfigure events */ + for (i = 0; i < 2; i++) + gst_event_unref (gst_harness_pull_upstream_event (h->send_rtp_h)); + + event = gst_harness_pull_upstream_event (h->send_rtp_h); + packets_array = + g_value_get_boxed (gst_structure_get_value (gst_event_get_structure + (event), "packets")); + + /* this ends up with 0 packets, due to completely invalid data */ + fail_unless_equals_int (packets_array->n_values, 0); + + gst_event_unref (event); + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_delta_ts_rounding) +{ + SessionHarness *h = session_harness_new (); + guint i, j = 0; + GstEvent *event; + GstBuffer *buf; + GValueArray *packets_array; + + TWCCPacket packets[] = { + {2002, 9 * GST_SECOND + 366458177, FALSE} + , + {2003, 9 * GST_SECOND + 366497068, FALSE} + , + {2017, 9 * GST_SECOND + 366929482, FALSE} + , + {2019, 9 * GST_SECOND + 391595309, FALSE} + , + {2020, 9 * GST_SECOND + 426883507, FALSE} + , + {2025, 9 * GST_SECOND + 427021638, TRUE} + , + }; + + TWCCPacket exp_packets[] = { + {2002, 9 * GST_SECOND + 366250000, FALSE} + , + {2003, 9 * GST_SECOND + 366250000, FALSE} + , + {2017, 9 * GST_SECOND + 366750000, FALSE} + , + {2019, 9 * GST_SECOND + 391500000, FALSE} + , + {2020, 9 * GST_SECOND + 426750000, FALSE} + , + {2025, 9 * GST_SECOND + 427000000, TRUE} + , + }; + + guint8 exp_fci[] = { + 0x07, 0xd2, /* base sequence number: 2002 */ + 0x00, 0x18, /* packet status count: 24 */ + 0x00, 0x00, 0x92, /* reference time: 0:00:09.344000000 */ + 0x00, /* feedback packet count: 0 */ + 0xb0, 0x00, /* packet chunk: 1 0 1 1 0 0 0 0 | 0 0 0 0 0 0 0 0 */ + 0x96, 0x10, /* packet chunk: 1 0 0 1 0 1 1 0 | 0 0 0 1 0 0 0 0 */ + 0x59, /* recv delta: 0:00:00.022250000 abs: 0:00:09.366250000 */ + 0x00, /* recv delta: 0:00:00.000000000 abs: 0:00:09.366250000 */ + 0x02, /* recv delta: 0:00:00.000500000 abs: 0:00:09.366750000 */ + 0x63, /* recv delta: 0:00:00.024750000 abs: 0:00:09.391500000 */ + 0x8d, /* recv delta: 0:00:00.035250000 abs: 0:00:09.426750000 */ + 0x01, /* recv delta: 0:00:00.000250000 abs: 0:00:09.427000000 */ + 0x00, 0x00, /* padding */ + }; + + twcc_push_packets (h, packets); + session_harness_produce_rtcp (h, 1); + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci); + + session_harness_recv_rtcp (h, buf); + for (i = 0; i < 2; i++) + gst_event_unref (gst_harness_pull_upstream_event (h->send_rtp_h)); + event = gst_harness_pull_upstream_event (h->send_rtp_h); + + packets_array = + g_value_get_boxed (gst_structure_get_value (gst_event_get_structure + (event), "packets")); + for (i = 0; i < packets_array->n_values; i++) { + TWCCPacket *twcc_pkt; + const GstStructure *pkt_s = + gst_value_get_structure (g_value_array_get_nth (packets_array, i)); + GstClockTime ts; + guint seqnum; + gboolean lost; + fail_unless (gst_structure_get_boolean (pkt_s, "lost", &lost)); + if (lost) + continue; + twcc_pkt = &exp_packets[j++]; + + fail_unless (gst_structure_get_clock_time (pkt_s, "remote-ts", &ts)); + fail_unless (gst_structure_get_uint (pkt_s, "seqnum", &seqnum)); + + fail_unless_equals_int (twcc_pkt->seqnum, seqnum); + fail_unless_equals_clocktime (twcc_pkt->timestamp, ts); + } + + gst_event_unref (event); + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_double_gap) +{ + SessionHarness *h0 = session_harness_new (); + SessionHarness *h1 = session_harness_new (); + + TWCCPacket packets[] = { + {1202, 5 * GST_SECOND + 717000000, FALSE} + , + {1215, 5 * GST_SECOND + 760250000, FALSE} + , + {1221, 5 * GST_SECOND + 775500000, TRUE} + , + }; + + guint8 exp_fci[] = { + 0x04, 0xb2, /* base sequence number: 1202 */ + 0x00, 0x14, /* packet status count: 20 */ + 0x00, 0x00, 0x59, /* reference time: 0:00:05.696000000 */ + 0x00, /* feedback packet count: 0 */ + 0xa0, 0x01, /* packet chunk: 1 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */ + 0x81, 0x00, /* packet chunk: 1 0 0 0 0 0 0 1 | 0 0 0 0 0 0 0 0 */ + 0x54, /* recv delta: +0:00:00.021000000 */ + 0xad, /* recv delta: +0:00:00.043250000 */ + 0x3d, /* recv delta: +0:00:00.015250000 */ + 0x00, /* padding */ + }; + + twcc_verify_packets_to_fci (h0, packets, exp_fci); + + twcc_verify_packets_to_packets (h1, h1, packets); + + session_harness_free (h0); + session_harness_free (h1); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_recv_packets_reordered) +{ + SessionHarness *h = session_harness_new (); + GstBuffer *buf; + + /* a reordered seqence, with marker-bits for #3 and #4 */ + TWCCPacket packets[] = { + {1, 1 * 250 * GST_USECOND, FALSE} + , + {3, 2 * 250 * GST_USECOND, TRUE} + , + {2, 3 * 250 * GST_USECOND, FALSE} + , + {4, 4 * 250 * GST_USECOND, TRUE} + , + }; + + /* first we expect #2 to be reported lost */ + guint8 exp_fci0[] = { + 0x00, 0x01, /* base sequence number: 1 */ + 0x00, 0x03, /* packet status count: 3 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x00, /* feedback packet count: 0 */ + /* packet chunks: */ + 0xa8, 0x00, /* 1 0 1 0 1 0 0 0 | 0 0 0 0 0 0 0 0 */ + 0x01, 0x01, /* recv deltas, +1, +1 */ + }; + + /* and then when 2 actually arrives, it is already reported lost, + so we will not re-report it, but drop it */ + guint8 exp_fci1[] = { + 0x00, 0x04, /* base sequence number: 4 */ + 0x00, 0x01, /* packet status count: 1 */ + 0x00, 0x00, 0x00, /* reference time: 0 */ + 0x01, /* feedback packet count: 1 */ + /* packet chunks: */ + 0x20, 0x01, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */ + 0x04, /* recv deltas, +4 */ + 0x00, /* padding */ + }; + + twcc_push_packets (h, packets); + + session_harness_produce_rtcp (h, 1); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci0); + gst_buffer_unref (buf); + + buf = session_harness_pull_twcc_rtcp (h); + twcc_verify_fci (buf, exp_fci1); + gst_buffer_unref (buf); + + session_harness_free (h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_recv_rtcp_reordered) +{ + SessionHarness *send_h = session_harness_new (); + SessionHarness *recv_h = session_harness_new (); + GstBuffer *buf[4]; + GstEvent *event; + guint i; + + /* three frames, two packets each */ + TWCCPacket packets[] = { + {1, 1 * GST_SECOND, FALSE} + , + {2, 2 * GST_SECOND, TRUE} + , + {3, 3 * GST_SECOND, FALSE} + , + {4, 4 * GST_SECOND, TRUE} + , + {5, 5 * GST_SECOND, FALSE} + , + {6, 6 * GST_SECOND, TRUE} + , + {7, 7 * GST_SECOND, FALSE} + , + {8, 8 * GST_SECOND, TRUE} + , + }; + +/* + TWCCPacket expected_packets0[] = { + {1, 1 * 250 * GST_USECOND, FALSE}, + {2, 2 * 250 * GST_USECOND, TRUE}, + }; +*/ + twcc_push_packets (recv_h, packets); + + session_harness_produce_rtcp (recv_h, 1); + + buf[0] = session_harness_pull_twcc_rtcp (recv_h); + buf[1] = session_harness_pull_twcc_rtcp (recv_h); + buf[2] = session_harness_pull_twcc_rtcp (recv_h); + buf[3] = session_harness_pull_twcc_rtcp (recv_h); + + /* reorder the twcc-feedback */ + session_harness_recv_rtcp (send_h, buf[0]); + session_harness_recv_rtcp (send_h, buf[2]); + session_harness_recv_rtcp (send_h, buf[1]); + session_harness_recv_rtcp (send_h, buf[3]); + + for (i = 0; i < 2; i++) + gst_event_unref (gst_harness_pull_upstream_event (send_h->send_rtp_h)); + + event = gst_harness_pull_upstream_event (send_h->send_rtp_h); + twcc_verify_packets_to_event (&packets[0 * 2], event); + + event = gst_harness_pull_upstream_event (send_h->send_rtp_h); + twcc_verify_packets_to_event (&packets[2 * 2], event); + + event = gst_harness_pull_upstream_event (send_h->send_rtp_h); + twcc_verify_packets_to_event (&packets[1 * 2], event); + + event = gst_harness_pull_upstream_event (send_h->send_rtp_h); + twcc_verify_packets_to_event (&packets[3 * 2], event); + + session_harness_free (send_h); + session_harness_free (recv_h); +} + +GST_END_TEST; + +GST_START_TEST (test_twcc_send_and_recv) +{ + SessionHarness *h_send = session_harness_new (); + SessionHarness *h_recv = session_harness_new (); + guint frame; + const guint num_frames = 2; + const guint num_slices = 15; + + /* enable twcc */ + session_harness_set_twcc_recv_ext_id (h_recv, TEST_TWCC_EXT_ID); + session_harness_set_twcc_send_ext_id (h_send, TEST_TWCC_EXT_ID); + + for (frame = 0; frame < num_frames; frame++) { + GstBuffer *buf; + for (guint slice = 0; slice < num_slices; slice++) { + GstFlowReturn res; + guint seq = frame * num_slices + slice; + + /* from payloder to rtpbin */ + buf = generate_twcc_send_buffer (seq, slice == num_slices - 1); + res = session_harness_send_rtp (h_send, buf); + fail_unless_equals_int (GST_FLOW_OK, res); + + /* get the buffer ready for the network */ + buf = session_harness_pull_send_rtp (h_send); + + /* buffer arrives at the receiver */ + res = session_harness_recv_rtp (h_recv, buf); + fail_unless_equals_int (GST_FLOW_OK, res); + } + + /* receiver sends a TWCC packet to the sender */ + session_harness_produce_rtcp (h_recv, 1); + buf = session_harness_pull_twcc_rtcp (h_recv); + /* sender receives the TWCC packet */ + session_harness_recv_rtcp (h_send, buf); + + if (frame > 0) { + GstStructure *twcc_stats; + guint bitrate_sent; + guint bitrate_recv; + guint packets_sent; + guint packets_recv; + gdouble packet_loss_pct; + GstClockTimeDiff avg_delta_of_delta; + twcc_stats = session_harness_get_last_twcc_stats (h_send); + fail_unless (gst_structure_get (twcc_stats, + "bitrate-sent", G_TYPE_UINT, &bitrate_sent, + "bitrate-recv", G_TYPE_UINT, &bitrate_recv, + "packets-sent", G_TYPE_UINT, &packets_sent, + "packets-recv", G_TYPE_UINT, &packets_recv, + "packet-loss-pct", G_TYPE_DOUBLE, &packet_loss_pct, + "avg-delta-of-delta", G_TYPE_INT64, &avg_delta_of_delta, NULL)); + fail_unless_equals_int (TEST_BUF_BPS, bitrate_sent); + fail_unless_equals_int (TEST_BUF_BPS, bitrate_recv); + fail_unless_equals_int (num_slices, packets_sent); + fail_unless_equals_int (num_slices, packets_recv); + fail_unless_equals_float (0.0f, packet_loss_pct); + fail_unless_equals_int64 (0, avg_delta_of_delta); + gst_structure_free (twcc_stats); + } + } + + session_harness_free (h_send); + session_harness_free (h_recv); +} + +GST_END_TEST; + static Suite * rtpsession_suite (void) { @@ -2503,6 +3649,26 @@ rtpsession_suite (void) tcase_add_test (tc_chain, test_packet_rate); tcase_add_test (tc_chain, test_stepped_packet_rate); + /* twcc */ + tcase_add_loop_test (tc_chain, test_twcc_header_and_run_length, + 0, G_N_ELEMENTS (twcc_header_and_run_lenght_test_data)); + tcase_add_test (tc_chain, test_twcc_1_bit_status_vector); + tcase_add_test (tc_chain, test_twcc_2_bit_status_vector); + tcase_add_loop_test (tc_chain, test_twcc_various_gaps, 0, 50); + tcase_add_test (tc_chain, test_twcc_negative_delta); + tcase_add_test (tc_chain, test_twcc_seqnum_wrap); + tcase_add_test (tc_chain, test_twcc_huge_seqnum_gap); + tcase_add_test (tc_chain, test_twcc_double_packets); + tcase_add_test (tc_chain, test_twcc_duplicate_seqnums); + tcase_add_test (tc_chain, test_twcc_multiple_markers); + tcase_add_test (tc_chain, test_twcc_no_marker_and_gaps); + tcase_add_test (tc_chain, test_twcc_bad_rtcp); + tcase_add_test (tc_chain, test_twcc_delta_ts_rounding); + tcase_add_test (tc_chain, test_twcc_double_gap); + tcase_add_test (tc_chain, test_twcc_recv_packets_reordered); + tcase_add_test (tc_chain, test_twcc_recv_rtcp_reordered); + tcase_add_test (tc_chain, test_twcc_send_and_recv); + return s; }