mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 04:01:08 +00:00
rtpmanager: Google Transport-Wide Congestion Control RTP Extension
Generating and parsing the RTCP-messages described in: https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01
This commit is contained in:
parent
9ba9837058
commit
1df706448c
10 changed files with 2697 additions and 26 deletions
|
@ -239,6 +239,7 @@ enum
|
|||
PROP_MAX_DROPOUT_TIME,
|
||||
PROP_MAX_MISORDER_TIME,
|
||||
PROP_STATS,
|
||||
PROP_TWCC_STATS,
|
||||
PROP_RTP_PROFILE,
|
||||
PROP_NTP_TIME_SOURCE,
|
||||
PROP_RTCP_SYNC_SEND_TIME
|
||||
|
@ -277,6 +278,8 @@ struct _GstRtpSessionPrivate
|
|||
guint recv_rtx_req_count;
|
||||
guint sent_rtx_req_count;
|
||||
|
||||
GstStructure *last_twcc_stats;
|
||||
|
||||
/*
|
||||
* This is the list of processed packets in the receive path when upstream
|
||||
* pushed a buffer list.
|
||||
|
@ -302,6 +305,8 @@ static GstClockTime gst_rtp_session_request_time (RTPSession * session,
|
|||
gpointer user_data);
|
||||
static void gst_rtp_session_notify_nack (RTPSession * sess,
|
||||
guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
|
||||
static void gst_rtp_session_notify_twcc (RTPSession * sess,
|
||||
GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data);
|
||||
static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
|
||||
static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
|
||||
gpointer user_data);
|
||||
|
@ -326,6 +331,7 @@ static RTPSessionCallbacks callbacks = {
|
|||
gst_rtp_session_request_key_unit,
|
||||
gst_rtp_session_request_time,
|
||||
gst_rtp_session_notify_nack,
|
||||
gst_rtp_session_notify_twcc,
|
||||
gst_rtp_session_reconfigure,
|
||||
gst_rtp_session_notify_early_rtcp
|
||||
};
|
||||
|
@ -754,6 +760,30 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
|
|||
"Various statistics", GST_TYPE_STRUCTURE,
|
||||
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
|
||||
|
||||
/**
|
||||
* GstRtpSession::twcc-stats:
|
||||
*
|
||||
* Various statistics derived from TWCC. This property returns a GstStructure
|
||||
* with name RTPTWCCStats with the following fields:
|
||||
*
|
||||
* "bitrate-sent" G_TYPE_UINT The actual sent bitrate of TWCC packets
|
||||
* "bitrate-recv" G_TYPE_UINT The estimated bitrate for the receiver.
|
||||
* "packets-sent" G_TYPE_UINT Number of packets sent
|
||||
* "packets-recv" G_TYPE_UINT Number of packets reported recevied
|
||||
* "packet-loss-pct" G_TYPE_DOUBLE Packetloss percentage, based on
|
||||
packets reported as lost from the recevier.
|
||||
* "avg-delta-of-delta", G_TYPE_INT64 In nanoseconds, a moving window
|
||||
average of the difference in inter-packet spacing between
|
||||
sender and receiver. A sudden increase in this number can indicate
|
||||
network congestion.
|
||||
*
|
||||
* Since: 1.18
|
||||
*/
|
||||
g_object_class_install_property (gobject_class, PROP_TWCC_STATS,
|
||||
g_param_spec_boxed ("twcc-stats", "TWCC Statistics",
|
||||
"Various statistics from TWCC", GST_TYPE_STRUCTURE,
|
||||
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
|
||||
|
||||
g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
|
||||
g_param_spec_enum ("rtp-profile", "RTP Profile",
|
||||
"RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
|
||||
|
@ -880,6 +910,8 @@ gst_rtp_session_finalize (GObject * object)
|
|||
g_cond_clear (&rtpsession->priv->cond);
|
||||
g_object_unref (rtpsession->priv->sysclock);
|
||||
g_object_unref (rtpsession->priv->session);
|
||||
if (rtpsession->priv->last_twcc_stats)
|
||||
gst_structure_free (rtpsession->priv->last_twcc_stats);
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||
}
|
||||
|
@ -1004,6 +1036,11 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
|
|||
case PROP_STATS:
|
||||
g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
|
||||
break;
|
||||
case PROP_TWCC_STATS:
|
||||
GST_RTP_SESSION_LOCK (rtpsession);
|
||||
g_value_set_boxed (value, priv->last_twcc_stats);
|
||||
GST_RTP_SESSION_UNLOCK (rtpsession);
|
||||
break;
|
||||
case PROP_RTP_PROFILE:
|
||||
g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
|
||||
break;
|
||||
|
@ -1563,12 +1600,15 @@ gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
|
|||
GST_DEBUG_OBJECT (rtpsession, "parsing caps");
|
||||
|
||||
s = gst_caps_get_structure (caps, 0);
|
||||
|
||||
if (!gst_structure_get_int (s, "payload", &payload))
|
||||
return;
|
||||
|
||||
if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
|
||||
return;
|
||||
|
||||
rtp_session_update_recv_caps_structure (rtpsession->priv->session, s);
|
||||
|
||||
g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
|
||||
gst_caps_ref (caps));
|
||||
}
|
||||
|
@ -2801,6 +2841,31 @@ gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
|
|||
}
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rtp_session_notify_twcc (RTPSession * sess,
|
||||
GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data)
|
||||
{
|
||||
GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
|
||||
GstEvent *event;
|
||||
GstPad *send_rtp_sink;
|
||||
|
||||
GST_RTP_SESSION_LOCK (rtpsession);
|
||||
if ((send_rtp_sink = rtpsession->send_rtp_sink))
|
||||
gst_object_ref (send_rtp_sink);
|
||||
if (rtpsession->priv->last_twcc_stats)
|
||||
gst_structure_free (rtpsession->priv->last_twcc_stats);
|
||||
rtpsession->priv->last_twcc_stats = twcc_stats;
|
||||
GST_RTP_SESSION_UNLOCK (rtpsession);
|
||||
|
||||
if (send_rtp_sink) {
|
||||
event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, twcc_packets);
|
||||
gst_pad_push_event (send_rtp_sink, event);
|
||||
gst_object_unref (send_rtp_sink);
|
||||
}
|
||||
|
||||
g_object_notify (G_OBJECT (rtpsession), "twcc-stats");
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
|
||||
{
|
||||
|
|
|
@ -14,6 +14,7 @@ rtpmanager_sources = [
|
|||
'rtpsource.c',
|
||||
'rtpstats.c',
|
||||
'rtptimerqueue.c',
|
||||
'rtptwcc.c',
|
||||
'gstrtpsession.c',
|
||||
'gstrtpfunnel.c',
|
||||
]
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#define GLIB_DISABLE_DEPRECATION_WARNINGS
|
||||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#include <gst/rtp/gstrtpbuffer.h>
|
||||
#include <gst/rtp/gstrtcpbuffer.h>
|
||||
|
@ -30,7 +31,7 @@
|
|||
|
||||
#include "rtpsession.h"
|
||||
|
||||
GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
|
||||
GST_DEBUG_CATEGORY (rtp_session_debug);
|
||||
#define GST_CAT_DEFAULT rtp_session_debug
|
||||
|
||||
/* signals and args */
|
||||
|
@ -115,6 +116,8 @@ enum
|
|||
(avg) = ((val) + (15 * (avg))) >> 4;
|
||||
|
||||
|
||||
#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
|
||||
|
||||
/* GObject vmethods */
|
||||
static void rtp_session_finalize (GObject * object);
|
||||
static void rtp_session_set_property (GObject * object, guint prop_id,
|
||||
|
@ -706,6 +709,9 @@ rtp_session_init (RTPSession * sess)
|
|||
sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
|
||||
|
||||
sess->is_doing_ptp = TRUE;
|
||||
|
||||
sess->twcc = rtp_twcc_manager_new (sess->mtu);
|
||||
sess->twcc_stats = rtp_twcc_stats_new ();
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -727,6 +733,9 @@ rtp_session_finalize (GObject * object)
|
|||
for (i = 0; i < 1; i++)
|
||||
g_hash_table_destroy (sess->ssrcs[i]);
|
||||
|
||||
rtp_twcc_manager_free (sess->twcc);
|
||||
rtp_twcc_stats_free (sess->twcc_stats);
|
||||
|
||||
g_mutex_clear (&sess->lock);
|
||||
|
||||
G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
|
||||
|
@ -847,6 +856,7 @@ rtp_session_set_property (GObject * object, guint prop_id,
|
|||
break;
|
||||
case PROP_RTCP_MTU:
|
||||
sess->mtu = g_value_get_uint (value);
|
||||
rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
|
||||
break;
|
||||
case PROP_SDES:
|
||||
rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
|
||||
|
@ -1206,6 +1216,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
|
|||
sess->callbacks.notify_nack = callbacks->notify_nack;
|
||||
sess->notify_nack_user_data = user_data;
|
||||
}
|
||||
if (callbacks->notify_twcc) {
|
||||
sess->callbacks.notify_twcc = callbacks->notify_twcc;
|
||||
sess->notify_twcc_user_data = user_data;
|
||||
}
|
||||
if (callbacks->reconfigure) {
|
||||
sess->callbacks.reconfigure = callbacks->reconfigure;
|
||||
sess->reconfigure_user_data = user_data;
|
||||
|
@ -2067,10 +2081,15 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
|
|||
pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
|
||||
pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
|
||||
pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
|
||||
pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
|
||||
/* copy available csrc */
|
||||
pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
|
||||
for (i = 0; i < pinfo->csrc_count; i++)
|
||||
pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
|
||||
|
||||
/* RTP header extensions */
|
||||
pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
|
||||
&pinfo->header_ext_bit_pattern);
|
||||
}
|
||||
gst_rtp_buffer_unmap (&rtp);
|
||||
}
|
||||
|
@ -2119,6 +2138,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
|
|||
pinfo->bytes = 0;
|
||||
pinfo->payload_len = 0;
|
||||
pinfo->packets = 0;
|
||||
pinfo->marker = FALSE;
|
||||
|
||||
if (is_list) {
|
||||
GstBufferList *list = GST_BUFFER_LIST_CAST (data);
|
||||
|
@ -2129,6 +2149,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
|
|||
GstBuffer *buffer = GST_BUFFER_CAST (data);
|
||||
res = update_packet (&buffer, 0, pinfo);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -2141,6 +2162,23 @@ clean_packet_info (RTPPacketInfo * pinfo)
|
|||
gst_mini_object_unref (pinfo->data);
|
||||
pinfo->data = NULL;
|
||||
}
|
||||
if (pinfo->header_ext)
|
||||
g_bytes_unref (pinfo->header_ext);
|
||||
}
|
||||
|
||||
static gint32
|
||||
packet_info_get_twcc_seqnum (RTPPacketInfo * pinfo, guint8 ext_id)
|
||||
{
|
||||
gint32 val = -1;
|
||||
gpointer data;
|
||||
guint size;
|
||||
|
||||
if (gst_rtp_buffer_get_extension_onebyte_header_from_bytes (pinfo->header_ext,
|
||||
pinfo->header_ext_bit_pattern, ext_id, 0, &data, &size)) {
|
||||
if (size == 2)
|
||||
val = GST_READ_UINT16_BE (data);
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
|
@ -2165,6 +2203,30 @@ source_update_active (RTPSession * sess, RTPSource * source,
|
|||
return TRUE;
|
||||
}
|
||||
|
||||
static void
|
||||
process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
|
||||
{
|
||||
gint32 twcc_seqnum;
|
||||
|
||||
if (sess->twcc_recv_ext_id == 0)
|
||||
return;
|
||||
|
||||
twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_recv_ext_id);
|
||||
if (twcc_seqnum == -1)
|
||||
return;
|
||||
|
||||
if (rtp_twcc_manager_recv_packet (sess->twcc, twcc_seqnum, pinfo)) {
|
||||
RTP_SESSION_UNLOCK (sess);
|
||||
|
||||
/* TODO: find a better rational for this number, and possibly tune it based
|
||||
on factors like framerate / bandwidth etc */
|
||||
if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
|
||||
GST_INFO ("Could not schedule TWCC straight away");
|
||||
}
|
||||
RTP_SESSION_LOCK (sess);
|
||||
}
|
||||
}
|
||||
|
||||
static gboolean
|
||||
source_update_sender (RTPSession * sess, RTPSource * source,
|
||||
gboolean prevsender)
|
||||
|
@ -2244,6 +2306,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
|
|||
|
||||
/* let source process the packet */
|
||||
result = rtp_source_process_rtp (source, &pinfo);
|
||||
process_twcc_packet (sess, &pinfo);
|
||||
|
||||
/* source became active */
|
||||
if (source_update_active (sess, source, prevactive))
|
||||
|
@ -2801,6 +2864,35 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
|
|||
}
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
|
||||
guint32 media_ssrc, guint8 * fci_data, guint fci_length)
|
||||
{
|
||||
GArray *twcc_packets;
|
||||
GstStructure *twcc_packets_s;
|
||||
GstStructure *twcc_stats_s;
|
||||
|
||||
twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
|
||||
fci_data, fci_length * sizeof (guint32));
|
||||
if (twcc_packets == NULL)
|
||||
return;
|
||||
|
||||
twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
|
||||
twcc_stats_s =
|
||||
rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
|
||||
|
||||
GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
|
||||
GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
|
||||
|
||||
g_array_unref (twcc_packets);
|
||||
|
||||
RTP_SESSION_UNLOCK (sess);
|
||||
if (sess->callbacks.notify_twcc)
|
||||
sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
|
||||
sess->notify_twcc_user_data);
|
||||
RTP_SESSION_LOCK (sess);
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
|
||||
RTPPacketInfo * pinfo, GstClockTime current_time)
|
||||
|
@ -2862,7 +2954,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
|
|||
|
||||
if ((src && src->internal) ||
|
||||
/* PSFB FIR puts the media ssrc inside the FCI */
|
||||
(type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
|
||||
(type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
|
||||
/* TWCC is for all sources, so a single media-ssrc is not enough */
|
||||
(type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
|
||||
switch (type) {
|
||||
case GST_RTCP_TYPE_PSFB:
|
||||
switch (fbtype) {
|
||||
|
@ -2890,6 +2984,10 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
|
|||
rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
|
||||
fci_data, fci_length, current_time);
|
||||
break;
|
||||
case GST_RTCP_RTPFB_TYPE_TWCC:
|
||||
rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
|
||||
fci_data, fci_length);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -3021,6 +3119,29 @@ invalid_packet:
|
|||
}
|
||||
}
|
||||
|
||||
static guint8
|
||||
_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
|
||||
{
|
||||
guint i;
|
||||
guint8 extmap_id = 0;
|
||||
guint n_fields = gst_structure_n_fields (s);
|
||||
|
||||
for (i = 0; i < n_fields; i++) {
|
||||
const gchar *field_name = gst_structure_nth_field_name (s, i);
|
||||
if (g_str_has_prefix (field_name, "extmap-")) {
|
||||
const gchar *str = gst_structure_get_string (s, field_name);
|
||||
if (str && g_strcmp0 (str, ext_name) == 0) {
|
||||
gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
|
||||
if (id > 0 && id < 15) {
|
||||
extmap_id = id;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return extmap_id;
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_session_update_send_caps:
|
||||
* @sess: an #RTPSession
|
||||
|
@ -3075,8 +3196,30 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
|
|||
} else {
|
||||
sess->internal_ssrc_from_caps_or_property = FALSE;
|
||||
}
|
||||
|
||||
sess->twcc_send_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
|
||||
if (sess->twcc_send_ext_id > 0) {
|
||||
GST_INFO ("TWCC enabled for send using extension id: %u",
|
||||
sess->twcc_send_ext_id);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
send_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
|
||||
{
|
||||
gint32 twcc_seqnum;
|
||||
|
||||
if (sess->twcc_send_ext_id == 0)
|
||||
return;
|
||||
|
||||
twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_send_ext_id);
|
||||
if (twcc_seqnum == -1)
|
||||
return;
|
||||
|
||||
rtp_twcc_manager_send_packet (sess->twcc, twcc_seqnum, pinfo);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* rtp_session_send_rtp:
|
||||
* @sess: an #RTPSession
|
||||
|
@ -3111,6 +3254,8 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
|
|||
current_time, running_time, -1))
|
||||
goto invalid_packet;
|
||||
|
||||
send_twcc_packet (sess, &pinfo);
|
||||
|
||||
source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
|
||||
if (created)
|
||||
on_new_sender_ssrc (sess, source);
|
||||
|
@ -3168,7 +3313,7 @@ invalid_packet:
|
|||
collision:
|
||||
{
|
||||
g_object_unref (source);
|
||||
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
|
||||
clean_packet_info (&pinfo);
|
||||
RTP_SESSION_UNLOCK (sess);
|
||||
GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
|
||||
pinfo.ssrc);
|
||||
|
@ -4114,6 +4259,37 @@ remove_closing_sources (const gchar * key, RTPSource * source,
|
|||
return FALSE;
|
||||
}
|
||||
|
||||
static void
|
||||
generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
|
||||
{
|
||||
RTPSession *sess = data->sess;
|
||||
GstBuffer *buf;
|
||||
|
||||
/* only generate RTCP for active internal sources */
|
||||
if (!source->internal || source->sent_bye)
|
||||
return;
|
||||
|
||||
/* ignore other sources when we do the timeout after a scheduled BYE */
|
||||
if (sess->scheduled_bye && !source->marked_bye)
|
||||
return;
|
||||
|
||||
/* skip if RTCP is disabled */
|
||||
if (source->disable_rtcp) {
|
||||
GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
|
||||
return;
|
||||
}
|
||||
|
||||
while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
|
||||
ReportOutput *output = g_slice_new (ReportOutput);
|
||||
output->source = g_object_ref (source);
|
||||
output->is_bye = FALSE;
|
||||
output->buffer = buf;
|
||||
/* queue the RTCP packet to push later */
|
||||
g_queue_push_tail (&data->output, output);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
|
||||
{
|
||||
|
@ -4338,6 +4514,9 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
|
|||
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
|
||||
(GHFunc) generate_rtcp, &data);
|
||||
|
||||
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
|
||||
(GHFunc) generate_twcc, &data);
|
||||
|
||||
/* update the generation for all the sources that have been reported */
|
||||
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
|
||||
(GHFunc) update_generation, &data);
|
||||
|
@ -4721,3 +4900,22 @@ no_source:
|
|||
return FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_session_update_recv_caps_structure:
|
||||
* @sess: an #RTPSession
|
||||
* @s: a #GstStructure from a #GstCaps
|
||||
*
|
||||
* Update the caps of the receiver in the rtp session.
|
||||
*/
|
||||
void
|
||||
rtp_session_update_recv_caps_structure (RTPSession * sess,
|
||||
const GstStructure * s)
|
||||
{
|
||||
guint8 ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
|
||||
if (ext_id > 0) {
|
||||
sess->twcc_recv_ext_id = ext_id;
|
||||
GST_INFO ("TWCC enabled for recv using extension id: %u",
|
||||
sess->twcc_recv_ext_id);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include <gst/gst.h>
|
||||
|
||||
#include "rtpsource.h"
|
||||
#include "rtptwcc.h"
|
||||
|
||||
typedef struct _RTPSession RTPSession;
|
||||
typedef struct _RTPSessionClass RTPSessionClass;
|
||||
|
@ -156,6 +157,15 @@ typedef GstClockTime (*RTPSessionRequestTime) (RTPSession *sess,
|
|||
typedef void (*RTPSessionNotifyNACK) (RTPSession *sess,
|
||||
guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
|
||||
|
||||
/**
|
||||
* RTPSessionNotifyTWCC:
|
||||
* @user_data: user data specified when registering
|
||||
*
|
||||
* Notifies of Transport-wide congestion control packets and stats.
|
||||
*/
|
||||
typedef void (*RTPSessionNotifyTWCC) (RTPSession *sess,
|
||||
GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data);
|
||||
|
||||
/**
|
||||
* RTPSessionReconfigure:
|
||||
* @sess: an #RTPSession
|
||||
|
@ -186,6 +196,7 @@ typedef void (*RTPSessionNotifyEarlyRTCP) (RTPSession *sess,
|
|||
* @RTPSessionRequestKeyUnit: callback for requesting a new key unit
|
||||
* @RTPSessionRequestTime: callback for requesting the current time
|
||||
* @RTPSessionNotifyNACK: callback for notifying NACK
|
||||
* @RTPSessionNotifyTWCC: callback for notifying TWCC
|
||||
* @RTPSessionReconfigure: callback for requesting reconfiguration
|
||||
* @RTPSessionNotifyEarlyRTCP: callback for notifying early RTCP
|
||||
*
|
||||
|
@ -203,6 +214,7 @@ typedef struct {
|
|||
RTPSessionRequestKeyUnit request_key_unit;
|
||||
RTPSessionRequestTime request_time;
|
||||
RTPSessionNotifyNACK notify_nack;
|
||||
RTPSessionNotifyTWCC notify_twcc;
|
||||
RTPSessionReconfigure reconfigure;
|
||||
RTPSessionNotifyEarlyRTCP notify_early_rtcp;
|
||||
} RTPSessionCallbacks;
|
||||
|
@ -280,6 +292,7 @@ struct _RTPSession {
|
|||
gpointer request_key_unit_user_data;
|
||||
gpointer request_time_user_data;
|
||||
gpointer notify_nack_user_data;
|
||||
gpointer notify_twcc_user_data;
|
||||
gpointer reconfigure_user_data;
|
||||
gpointer notify_early_rtcp_user_data;
|
||||
|
||||
|
@ -295,6 +308,12 @@ struct _RTPSession {
|
|||
GList *conflicting_addresses;
|
||||
|
||||
gboolean timestamp_sender_reports;
|
||||
|
||||
/* Transport-wide cc-extension */
|
||||
RTPTWCCManager *twcc;
|
||||
RTPTWCCStats *twcc_stats;
|
||||
guint8 twcc_recv_ext_id;
|
||||
guint8 twcc_send_ext_id;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -418,5 +437,7 @@ gboolean rtp_session_request_nack (RTPSession * sess,
|
|||
guint16 seqnum,
|
||||
GstClockTime max_delay);
|
||||
|
||||
void rtp_session_update_recv_caps_structure (RTPSession * sess, const GstStructure * s);
|
||||
|
||||
|
||||
#endif /* __RTP_SESSION_H__ */
|
||||
|
|
|
@ -920,8 +920,8 @@ push_packet (RTPSource * src, GstBuffer * buffer)
|
|||
return ret;
|
||||
}
|
||||
|
||||
static gint
|
||||
get_clock_rate (RTPSource * src, guint8 payload)
|
||||
static void
|
||||
fetch_clock_rate_from_payload (RTPSource * src, guint8 payload)
|
||||
{
|
||||
if (src->payload == -1) {
|
||||
/* first payload received, nothing was in the caps, lock on to this payload */
|
||||
|
@ -946,7 +946,6 @@ get_clock_rate (RTPSource * src, guint8 payload)
|
|||
src->clock_rate = clock_rate;
|
||||
gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate);
|
||||
}
|
||||
return src->clock_rate;
|
||||
}
|
||||
|
||||
/* Jitter is the variation in the delay of received packets in a flow. It is
|
||||
|
@ -960,26 +959,23 @@ calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo)
|
|||
GstClockTime running_time;
|
||||
guint32 rtparrival, transit, rtptime;
|
||||
gint32 diff;
|
||||
gint clock_rate;
|
||||
guint8 pt;
|
||||
|
||||
/* get arrival time */
|
||||
if ((running_time = pinfo->running_time) == GST_CLOCK_TIME_NONE)
|
||||
goto no_time;
|
||||
|
||||
pt = pinfo->pt;
|
||||
GST_LOG ("SSRC %08x got payload %d", src->ssrc, pinfo->pt);
|
||||
|
||||
GST_LOG ("SSRC %08x got payload %d", src->ssrc, pt);
|
||||
|
||||
/* get clockrate */
|
||||
if ((clock_rate = get_clock_rate (src, pt)) == -1)
|
||||
/* check if clock-rate is valid */
|
||||
if (src->clock_rate == -1)
|
||||
goto no_clock_rate;
|
||||
|
||||
rtptime = pinfo->rtptime;
|
||||
|
||||
/* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
|
||||
* care about the absolute value, just the difference. */
|
||||
rtparrival = gst_util_uint64_scale_int (running_time, clock_rate, GST_SECOND);
|
||||
rtparrival =
|
||||
gst_util_uint64_scale_int (running_time, src->clock_rate, GST_SECOND);
|
||||
|
||||
/* transit time is difference with RTP timestamp */
|
||||
transit = rtparrival - rtptime;
|
||||
|
@ -1002,7 +998,7 @@ calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo)
|
|||
src->stats.last_rtptime = rtparrival;
|
||||
|
||||
GST_LOG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %f",
|
||||
rtparrival, rtptime, clock_rate, diff, (src->stats.jitter) / 16.0);
|
||||
rtparrival, rtptime, src->clock_rate, diff, (src->stats.jitter) / 16.0);
|
||||
|
||||
return;
|
||||
|
||||
|
@ -1014,7 +1010,7 @@ no_time:
|
|||
}
|
||||
no_clock_rate:
|
||||
{
|
||||
GST_WARNING ("cannot get clock-rate for pt %d", pt);
|
||||
GST_WARNING ("cannot get clock-rate for pt %d", pinfo->pt);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -1265,6 +1261,8 @@ rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo)
|
|||
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
|
||||
g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR);
|
||||
|
||||
fetch_clock_rate_from_payload (src, pinfo->pt);
|
||||
|
||||
if (!update_receiver_stats (src, pinfo, TRUE))
|
||||
return GST_FLOW_OK;
|
||||
|
||||
|
@ -1553,7 +1551,7 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime,
|
|||
if (src->clock_rate == -1 && src->pt_set) {
|
||||
GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt,
|
||||
src->ssrc);
|
||||
get_clock_rate (src, src->pt);
|
||||
fetch_clock_rate_from_payload (src, src->pt);
|
||||
}
|
||||
|
||||
if (src->clock_rate != -1) {
|
||||
|
|
|
@ -19,7 +19,10 @@
|
|||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#define GLIB_DISABLE_DEPRECATION_WARNINGS
|
||||
|
||||
#include "rtpstats.h"
|
||||
#include "rtptwcc.h"
|
||||
|
||||
void
|
||||
gst_rtp_packet_rate_ctx_reset (RTPPacketRateCtx * ctx, gint32 clock_rate)
|
||||
|
@ -445,3 +448,230 @@ __g_socket_address_to_string (GSocketAddress * addr)
|
|||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
_append_structure_to_value_array (GValueArray * array, GstStructure * s)
|
||||
{
|
||||
GValue *val;
|
||||
g_value_array_append (array, NULL);
|
||||
val = g_value_array_get_nth (array, array->n_values - 1);
|
||||
g_value_init (val, GST_TYPE_STRUCTURE);
|
||||
g_value_take_boxed (val, s);
|
||||
}
|
||||
|
||||
static void
|
||||
_structure_take_value_array (GstStructure * s,
|
||||
const gchar * field_name, GValueArray * array)
|
||||
{
|
||||
GValue value = G_VALUE_INIT;
|
||||
g_value_init (&value, G_TYPE_VALUE_ARRAY);
|
||||
g_value_take_boxed (&value, array);
|
||||
gst_structure_take_value (s, field_name, &value);
|
||||
g_value_unset (&value);
|
||||
}
|
||||
|
||||
GstStructure *
|
||||
rtp_twcc_stats_get_packets_structure (GArray * twcc_packets)
|
||||
{
|
||||
GstStructure *ret = gst_structure_new_empty ("RTPTWCCPackets");
|
||||
GValueArray *array = g_value_array_new (0);
|
||||
guint i;
|
||||
|
||||
for (i = 0; i < twcc_packets->len; i++) {
|
||||
RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i);
|
||||
|
||||
GstStructure *pkt_s = gst_structure_new ("RTPTWCCPacket",
|
||||
"seqnum", G_TYPE_UINT, pkt->seqnum,
|
||||
"local-ts", G_TYPE_UINT64, pkt->local_ts,
|
||||
"remote-ts", G_TYPE_UINT64, pkt->remote_ts,
|
||||
"size", G_TYPE_UINT, pkt->size,
|
||||
"lost", G_TYPE_BOOLEAN, pkt->status == RTP_TWCC_PACKET_STATUS_NOT_RECV,
|
||||
NULL);
|
||||
_append_structure_to_value_array (array, pkt_s);
|
||||
}
|
||||
|
||||
_structure_take_value_array (ret, "packets", array);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_stats_calculate_stats (RTPTWCCStats * stats, GArray * twcc_packets)
|
||||
{
|
||||
guint packets_recv = 0;
|
||||
guint i;
|
||||
|
||||
for (i = 0; i < twcc_packets->len; i++) {
|
||||
RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i);
|
||||
|
||||
if (pkt->status != RTP_TWCC_PACKET_STATUS_NOT_RECV)
|
||||
packets_recv++;
|
||||
|
||||
if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts) &&
|
||||
GST_CLOCK_TIME_IS_VALID (stats->last_local_ts)) {
|
||||
pkt->local_delta = GST_CLOCK_DIFF (stats->last_local_ts, pkt->local_ts);
|
||||
}
|
||||
|
||||
if (GST_CLOCK_TIME_IS_VALID (pkt->remote_ts) &&
|
||||
GST_CLOCK_TIME_IS_VALID (stats->last_remote_ts)) {
|
||||
pkt->remote_delta =
|
||||
GST_CLOCK_DIFF (stats->last_remote_ts, pkt->remote_ts);
|
||||
}
|
||||
|
||||
if (GST_CLOCK_STIME_IS_VALID (pkt->local_delta) &&
|
||||
GST_CLOCK_STIME_IS_VALID (pkt->remote_delta)) {
|
||||
pkt->delta_delta = pkt->remote_delta - pkt->local_delta;
|
||||
}
|
||||
|
||||
stats->last_local_ts = pkt->local_ts;
|
||||
stats->last_remote_ts = pkt->remote_ts;
|
||||
}
|
||||
|
||||
stats->packets_sent = twcc_packets->len;
|
||||
stats->packets_recv = packets_recv;
|
||||
}
|
||||
|
||||
static gint
|
||||
_get_window_start_index (RTPTWCCStats * stats, GstClockTime duration,
|
||||
GstClockTime * local_duration, GstClockTime * remote_duration)
|
||||
{
|
||||
RTPTWCCPacket *last = NULL;
|
||||
guint i;
|
||||
|
||||
if (stats->packets->len < 2)
|
||||
return -1;
|
||||
|
||||
for (i = 0; i < stats->packets->len; i++) {
|
||||
guint start_index = stats->packets->len - 1 - i;
|
||||
RTPTWCCPacket *pkt =
|
||||
&g_array_index (stats->packets, RTPTWCCPacket, start_index);
|
||||
if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts)
|
||||
&& GST_CLOCK_TIME_IS_VALID (pkt->remote_ts)) {
|
||||
/* first find the last valid packet */
|
||||
if (last == NULL) {
|
||||
last = pkt;
|
||||
} else {
|
||||
/* and then get the duration in local ts */
|
||||
GstClockTimeDiff ld = GST_CLOCK_DIFF (pkt->local_ts, last->local_ts);
|
||||
if (ld >= duration) {
|
||||
*local_duration = ld;
|
||||
*remote_duration = GST_CLOCK_DIFF (pkt->remote_ts, last->remote_ts);
|
||||
return start_index;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_stats_calculate_windowed_stats (RTPTWCCStats * stats)
|
||||
{
|
||||
guint i;
|
||||
gint start_idx;
|
||||
guint bits_sent = 0;
|
||||
guint bits_recv = 0;
|
||||
guint packets_sent = 0;
|
||||
guint packets_recv = 0;
|
||||
guint packets_lost;
|
||||
GstClockTimeDiff delta_delta_sum = 0;
|
||||
guint delta_delta_count = 0;
|
||||
GstClockTime local_duration;
|
||||
GstClockTime remote_duration;
|
||||
|
||||
start_idx = _get_window_start_index (stats, stats->window_size,
|
||||
&local_duration, &remote_duration);
|
||||
if (start_idx == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* remove the old packets */
|
||||
if (start_idx > 0)
|
||||
g_array_remove_range (stats->packets, 0, start_idx);
|
||||
|
||||
packets_sent = stats->packets->len - 1;
|
||||
|
||||
for (i = 0; i < packets_sent; i++) {
|
||||
RTPTWCCPacket *pkt = &g_array_index (stats->packets, RTPTWCCPacket, i);
|
||||
|
||||
if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts)) {
|
||||
bits_sent += pkt->size * 8;
|
||||
}
|
||||
|
||||
if (GST_CLOCK_TIME_IS_VALID (pkt->remote_ts)) {
|
||||
bits_recv += pkt->size * 8;
|
||||
packets_recv++;
|
||||
}
|
||||
|
||||
if (GST_CLOCK_STIME_IS_VALID (pkt->delta_delta)) {
|
||||
delta_delta_sum += pkt->delta_delta;
|
||||
delta_delta_count++;
|
||||
}
|
||||
}
|
||||
|
||||
packets_lost = packets_sent - packets_recv;
|
||||
stats->packet_loss_pct = (packets_lost * 100) / (gfloat) packets_sent;
|
||||
|
||||
if (delta_delta_count) {
|
||||
GstClockTimeDiff avg_delta_of_delta = delta_delta_sum / delta_delta_count;
|
||||
if (GST_CLOCK_STIME_IS_VALID (stats->avg_delta_of_delta)) {
|
||||
stats->avg_delta_of_delta_change =
|
||||
(avg_delta_of_delta -
|
||||
stats->avg_delta_of_delta) / (250 * GST_USECOND);
|
||||
}
|
||||
stats->avg_delta_of_delta = avg_delta_of_delta;
|
||||
}
|
||||
|
||||
stats->bitrate_sent =
|
||||
gst_util_uint64_scale (bits_sent, GST_SECOND, local_duration);
|
||||
stats->bitrate_recv =
|
||||
gst_util_uint64_scale (bits_recv, GST_SECOND, remote_duration);
|
||||
|
||||
GST_DEBUG ("Got stats: bits_sent: %u, bits_recv: %u, packets_sent = %u, "
|
||||
"packets_recv: %u, packetlost_pct = %f, sent_bitrate = %u, "
|
||||
"recv_bitrate = %u, delta-delta-avg = %" GST_STIME_FORMAT ", "
|
||||
"delta-delta-change: %f", bits_sent, bits_recv, stats->packets_sent,
|
||||
packets_recv, stats->packet_loss_pct, stats->bitrate_sent,
|
||||
stats->bitrate_recv, GST_STIME_ARGS (stats->avg_delta_of_delta),
|
||||
stats->avg_delta_of_delta_change);
|
||||
}
|
||||
|
||||
RTPTWCCStats *
|
||||
rtp_twcc_stats_new (void)
|
||||
{
|
||||
RTPTWCCStats *stats = g_new0 (RTPTWCCStats, 1);
|
||||
stats->packets = g_array_new (FALSE, FALSE, sizeof (RTPTWCCPacket));
|
||||
stats->last_local_ts = GST_CLOCK_TIME_NONE;
|
||||
stats->last_remote_ts = GST_CLOCK_TIME_NONE;
|
||||
stats->avg_delta_of_delta = GST_CLOCK_STIME_NONE;
|
||||
stats->window_size = 300 * GST_MSECOND; /* FIXME: could be configurable? */
|
||||
return stats;
|
||||
}
|
||||
|
||||
void
|
||||
rtp_twcc_stats_free (RTPTWCCStats * stats)
|
||||
{
|
||||
g_array_unref (stats->packets);
|
||||
g_free (stats);
|
||||
}
|
||||
|
||||
static GstStructure *
|
||||
rtp_twcc_stats_get_stats_structure (RTPTWCCStats * stats)
|
||||
{
|
||||
return gst_structure_new ("RTPTWCCStats",
|
||||
"bitrate-sent", G_TYPE_UINT, stats->bitrate_sent,
|
||||
"bitrate-recv", G_TYPE_UINT, stats->bitrate_recv,
|
||||
"packets-sent", G_TYPE_UINT, stats->packets_sent,
|
||||
"packets-recv", G_TYPE_UINT, stats->packets_recv,
|
||||
"packet-loss-pct", G_TYPE_DOUBLE, stats->packet_loss_pct,
|
||||
"avg-delta-of-delta", G_TYPE_INT64, stats->avg_delta_of_delta, NULL);
|
||||
}
|
||||
|
||||
GstStructure *
|
||||
rtp_twcc_stats_process_packets (RTPTWCCStats * stats, GArray * twcc_packets)
|
||||
{
|
||||
rtp_twcc_stats_calculate_stats (stats, twcc_packets);
|
||||
g_array_append_vals (stats->packets, twcc_packets->data, twcc_packets->len);
|
||||
rtp_twcc_stats_calculate_windowed_stats (stats);
|
||||
return rtp_twcc_stats_get_stats_structure (stats);
|
||||
}
|
||||
|
|
|
@ -77,6 +77,10 @@ typedef struct {
|
|||
* @seqnum: the seqnum of the packet
|
||||
* @pt: the payload type of the packet
|
||||
* @rtptime: the RTP time of the packet
|
||||
* @marker: the marker bit
|
||||
*
|
||||
* @tw_seqnum_ext_id: the extension-header ID for transport-wide seqnums
|
||||
* @tw_seqnum: the transport-wide seqnum of the packet
|
||||
*
|
||||
* Structure holding information about the packet.
|
||||
*/
|
||||
|
@ -97,8 +101,11 @@ typedef struct {
|
|||
guint16 seqnum;
|
||||
guint8 pt;
|
||||
guint32 rtptime;
|
||||
gboolean marker;
|
||||
guint32 csrc_count;
|
||||
guint32 csrcs[16];
|
||||
GBytes *header_ext;
|
||||
guint16 header_ext_bit_pattern;
|
||||
} RTPPacketInfo;
|
||||
|
||||
/**
|
||||
|
@ -245,6 +252,27 @@ typedef struct {
|
|||
guint nacks_received;
|
||||
} RTPSessionStats;
|
||||
|
||||
/**
|
||||
* RTPTWCCStats:
|
||||
*
|
||||
* Stats kept for a session and used to produce TWCC stats.
|
||||
*/
|
||||
typedef struct {
|
||||
GArray *packets;
|
||||
GstClockTime window_size;
|
||||
GstClockTime last_local_ts;
|
||||
GstClockTime last_remote_ts;
|
||||
|
||||
guint bitrate_sent;
|
||||
guint bitrate_recv;
|
||||
guint packets_sent;
|
||||
guint packets_recv;
|
||||
gfloat packet_loss_pct;
|
||||
GstClockTimeDiff avg_delta_of_delta;
|
||||
gfloat avg_delta_of_delta_change;
|
||||
} RTPTWCCStats;
|
||||
|
||||
|
||||
void rtp_stats_init_defaults (RTPSessionStats *stats);
|
||||
|
||||
void rtp_stats_set_bandwidths (RTPSessionStats *stats,
|
||||
|
@ -264,4 +292,10 @@ void rtp_stats_set_min_interval (RTPSessionStats *stats,
|
|||
gboolean __g_socket_address_equal (GSocketAddress *a, GSocketAddress *b);
|
||||
gchar * __g_socket_address_to_string (GSocketAddress * addr);
|
||||
|
||||
RTPTWCCStats * rtp_twcc_stats_new (void);
|
||||
void rtp_twcc_stats_free (RTPTWCCStats * stats);
|
||||
GstStructure * rtp_twcc_stats_process_packets (RTPTWCCStats * stats,
|
||||
GArray * twcc_packets);
|
||||
GstStructure * rtp_twcc_stats_get_packets_structure (GArray * twcc_packets);
|
||||
|
||||
#endif /* __RTP_STATS_H__ */
|
||||
|
|
888
gst/rtpmanager/rtptwcc.c
Normal file
888
gst/rtpmanager/rtptwcc.c
Normal file
|
@ -0,0 +1,888 @@
|
|||
/* GStreamer
|
||||
* Copyright (C) 2019 Pexip (http://pexip.com/)
|
||||
* @author: Havard Graff <havard@pexip.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
#include "rtptwcc.h"
|
||||
#include <gst/rtp/gstrtcpbuffer.h>
|
||||
#include <gst/base/gstbitreader.h>
|
||||
#include <gst/base/gstbitwriter.h>
|
||||
|
||||
GST_DEBUG_CATEGORY_EXTERN (rtp_session_debug);
|
||||
#define GST_CAT_DEFAULT rtp_session_debug
|
||||
|
||||
#define REF_TIME_UNIT (64 * GST_MSECOND)
|
||||
#define DELTA_UNIT (250 * GST_USECOND)
|
||||
#define MAX_TS_DELTA (0xff * DELTA_UNIT)
|
||||
|
||||
struct _RTPTWCCManager
|
||||
{
|
||||
guint mtu;
|
||||
guint max_packets_per_rtcp;
|
||||
GArray *recv_packets;
|
||||
|
||||
guint8 fb_pkt_count;
|
||||
gint32 last_seqnum;
|
||||
|
||||
GArray *sent_packets;
|
||||
GArray *parsed_packets;
|
||||
GQueue *rtcp_buffers;
|
||||
|
||||
guint64 recv_sender_ssrc;
|
||||
guint64 recv_media_ssrc;
|
||||
|
||||
guint16 expected_recv_seqnum;
|
||||
|
||||
gboolean first_fci_parse;
|
||||
guint16 expected_parsed_seqnum;
|
||||
guint8 expected_parsed_fb_pkt_count;
|
||||
};
|
||||
|
||||
typedef enum
|
||||
{
|
||||
RTP_TWCC_CHUNK_TYPE_RUN_LENGTH = 0,
|
||||
RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR = 1,
|
||||
} RTPTWCCChunkType;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
guint8 base_seqnum[2];
|
||||
guint8 packet_count[2];
|
||||
guint8 base_time[3];
|
||||
guint8 fb_pkt_count[1];
|
||||
} RTPTWCCHeader;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
GstClockTime ts;
|
||||
guint16 seqnum;
|
||||
|
||||
gint64 delta;
|
||||
RTPTWCCPacketStatus status;
|
||||
guint16 missing_run;
|
||||
guint equal_run;
|
||||
} RecvPacket;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
GstClockTime ts;
|
||||
GstClockTime socket_ts;
|
||||
GstClockTime remote_ts;
|
||||
guint16 seqnum;
|
||||
guint size;
|
||||
gboolean lost;
|
||||
} SentPacket;
|
||||
|
||||
RTPTWCCManager *
|
||||
rtp_twcc_manager_new (guint mtu)
|
||||
{
|
||||
RTPTWCCManager *twcc = g_new0 (RTPTWCCManager, 1);
|
||||
|
||||
twcc->recv_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket));
|
||||
|
||||
twcc->sent_packets = g_array_new (FALSE, FALSE, sizeof (SentPacket));
|
||||
twcc->parsed_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket));
|
||||
|
||||
twcc->rtcp_buffers = g_queue_new ();
|
||||
|
||||
twcc->last_seqnum = -1;
|
||||
twcc->recv_media_ssrc = -1;
|
||||
twcc->recv_sender_ssrc = -1;
|
||||
|
||||
rtp_twcc_manager_set_mtu (twcc, mtu);
|
||||
|
||||
twcc->first_fci_parse = TRUE;
|
||||
|
||||
return twcc;
|
||||
}
|
||||
|
||||
void
|
||||
rtp_twcc_manager_free (RTPTWCCManager * twcc)
|
||||
{
|
||||
g_array_unref (twcc->recv_packets);
|
||||
g_array_unref (twcc->sent_packets);
|
||||
g_array_unref (twcc->parsed_packets);
|
||||
g_queue_free_full (twcc->rtcp_buffers, (GDestroyNotify) gst_buffer_unref);
|
||||
g_free (twcc);
|
||||
}
|
||||
|
||||
static void
|
||||
recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
|
||||
{
|
||||
memset (packet, 0, sizeof (RecvPacket));
|
||||
packet->seqnum = seqnum;
|
||||
packet->ts = pinfo->running_time;
|
||||
}
|
||||
|
||||
void
|
||||
rtp_twcc_manager_set_mtu (RTPTWCCManager * twcc, guint mtu)
|
||||
{
|
||||
twcc->mtu = mtu;
|
||||
|
||||
/* the absolute worst case is that 7 packets uses
|
||||
header (4 * 4 * 4) 32 bytes) and
|
||||
packet_chunk 2 bytes +
|
||||
recv_deltas (2 * 7) 14 bytes */
|
||||
twcc->max_packets_per_rtcp = ((twcc->mtu - 32) * 7) / (2 + 14);
|
||||
}
|
||||
|
||||
static gint
|
||||
_twcc_seqnum_sort (gconstpointer a, gconstpointer b)
|
||||
{
|
||||
gint32 seqa = ((RecvPacket *) a)->seqnum;
|
||||
gint32 seqb = ((RecvPacket *) b)->seqnum;
|
||||
gint res = seqa - seqb;
|
||||
if (res < -65000)
|
||||
res = 1;
|
||||
if (res > 65000)
|
||||
res = -1;
|
||||
return res;
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_write_recv_deltas (guint8 * fci_data, GArray * twcc_packets)
|
||||
{
|
||||
guint i;
|
||||
for (i = 0; i < twcc_packets->len; i++) {
|
||||
RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i);
|
||||
|
||||
if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) {
|
||||
GST_WRITE_UINT8 (fci_data, pkt->delta);
|
||||
fci_data += 1;
|
||||
} else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) {
|
||||
GST_WRITE_UINT16_BE (fci_data, pkt->delta);
|
||||
fci_data += 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_write_run_length_chunk (GArray * packet_chunks,
|
||||
RTPTWCCPacketStatus status, guint run_length)
|
||||
{
|
||||
guint written = 0;
|
||||
while (written < run_length) {
|
||||
GstBitWriter writer;
|
||||
guint16 data = 0;
|
||||
guint len = MIN (run_length - written, 8191);
|
||||
|
||||
GST_LOG ("Writing a run-lenght of %u with status %u", len, status);
|
||||
|
||||
gst_bit_writer_init_with_data (&writer, (guint8 *) & data, 2, FALSE);
|
||||
gst_bit_writer_put_bits_uint8 (&writer, RTP_TWCC_CHUNK_TYPE_RUN_LENGTH, 1);
|
||||
gst_bit_writer_put_bits_uint8 (&writer, status, 2);
|
||||
gst_bit_writer_put_bits_uint16 (&writer, len, 13);
|
||||
g_array_append_val (packet_chunks, data);
|
||||
written += len;
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct
|
||||
{
|
||||
GArray *packet_chunks;
|
||||
GstBitWriter writer;
|
||||
guint16 data;
|
||||
guint symbol_size;
|
||||
} ChunkBitWriter;
|
||||
|
||||
static void
|
||||
chunk_bit_writer_reset (ChunkBitWriter * writer)
|
||||
{
|
||||
writer->data = 0;
|
||||
gst_bit_writer_init_with_data (&writer->writer,
|
||||
(guint8 *) & writer->data, 2, FALSE);
|
||||
|
||||
gst_bit_writer_put_bits_uint8 (&writer->writer,
|
||||
RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR, 1);
|
||||
/* 1 for 2-bit symbol-size, 0 for 1-bit */
|
||||
gst_bit_writer_put_bits_uint8 (&writer->writer, writer->symbol_size - 1, 1);
|
||||
}
|
||||
|
||||
static void
|
||||
chunk_bit_writer_configure (ChunkBitWriter * writer, guint symbol_size)
|
||||
{
|
||||
writer->symbol_size = symbol_size;
|
||||
chunk_bit_writer_reset (writer);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
chunk_bit_writer_is_empty (ChunkBitWriter * writer)
|
||||
{
|
||||
return writer->writer.bit_size == 2;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
chunk_bit_writer_is_full (ChunkBitWriter * writer)
|
||||
{
|
||||
return writer->writer.bit_size == 16;
|
||||
}
|
||||
|
||||
static guint
|
||||
chunk_bit_writer_get_available_slots (ChunkBitWriter * writer)
|
||||
{
|
||||
return (16 - writer->writer.bit_size) / writer->symbol_size;
|
||||
}
|
||||
|
||||
static guint
|
||||
chunk_bit_writer_get_total_slots (ChunkBitWriter * writer)
|
||||
{
|
||||
return 14 / writer->symbol_size;
|
||||
}
|
||||
|
||||
static void
|
||||
chunk_bit_writer_flush (ChunkBitWriter * writer)
|
||||
{
|
||||
/* don't append a chunk if no bits have been written */
|
||||
if (!chunk_bit_writer_is_empty (writer)) {
|
||||
g_array_append_val (writer->packet_chunks, writer->data);
|
||||
chunk_bit_writer_reset (writer);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
chunk_bit_writer_init (ChunkBitWriter * writer,
|
||||
GArray * packet_chunks, guint symbol_size)
|
||||
{
|
||||
writer->packet_chunks = packet_chunks;
|
||||
chunk_bit_writer_configure (writer, symbol_size);
|
||||
}
|
||||
|
||||
static void
|
||||
chunk_bit_writer_write (ChunkBitWriter * writer, RTPTWCCPacketStatus status)
|
||||
{
|
||||
gst_bit_writer_put_bits_uint8 (&writer->writer, status, writer->symbol_size);
|
||||
if (chunk_bit_writer_is_full (writer)) {
|
||||
chunk_bit_writer_flush (writer);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_write_status_vector_chunk (ChunkBitWriter * writer, RecvPacket * pkt)
|
||||
{
|
||||
if (pkt->missing_run > 0) {
|
||||
guint available = chunk_bit_writer_get_available_slots (writer);
|
||||
guint total = chunk_bit_writer_get_total_slots (writer);
|
||||
if (pkt->missing_run > (available + total)) {
|
||||
/* here it is better to finish up the current status-chunk and then
|
||||
go for run-length */
|
||||
for (guint i = 0; i < available; i++) {
|
||||
chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV);
|
||||
}
|
||||
rtp_twcc_write_run_length_chunk (writer->packet_chunks,
|
||||
RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run - available);
|
||||
} else {
|
||||
for (guint i = 0; i < pkt->missing_run; i++) {
|
||||
chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
chunk_bit_writer_write (writer, pkt->status);
|
||||
}
|
||||
|
||||
typedef struct
|
||||
{
|
||||
RecvPacket *equal;
|
||||
} RunLengthHelper;
|
||||
|
||||
static void
|
||||
run_lenght_helper_update (RunLengthHelper * rlh, RecvPacket * pkt)
|
||||
{
|
||||
/* for missing packets we reset */
|
||||
if (pkt->missing_run > 0) {
|
||||
rlh->equal = NULL;
|
||||
}
|
||||
|
||||
/* all status equal run */
|
||||
if (rlh->equal == NULL) {
|
||||
rlh->equal = pkt;
|
||||
rlh->equal->equal_run = 0;
|
||||
}
|
||||
|
||||
if (rlh->equal->status == pkt->status) {
|
||||
rlh->equal->equal_run++;
|
||||
} else {
|
||||
rlh->equal = pkt;
|
||||
rlh->equal->equal_run = 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_write_chunks (GArray * packet_chunks,
|
||||
GArray * twcc_packets, guint symbol_size)
|
||||
{
|
||||
ChunkBitWriter writer;
|
||||
guint i;
|
||||
guint bits_per_chunks = 7 * symbol_size;
|
||||
|
||||
chunk_bit_writer_init (&writer, packet_chunks, symbol_size);
|
||||
|
||||
for (i = 0; i < twcc_packets->len; i++) {
|
||||
RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i);
|
||||
guint remaining_packets = twcc_packets->len - i;
|
||||
|
||||
/* we can only start a run-length chunk if the status-chunk is
|
||||
completed */
|
||||
if (chunk_bit_writer_is_empty (&writer)) {
|
||||
/* first write in any preceeding gaps, we use run-length
|
||||
if it would take up more than one chunk (14/7) */
|
||||
if (pkt->missing_run > bits_per_chunks) {
|
||||
rtp_twcc_write_run_length_chunk (packet_chunks,
|
||||
RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run);
|
||||
}
|
||||
|
||||
/* we have a run of the same status, write a run-length chunk and skip
|
||||
to the next point */
|
||||
if (pkt->missing_run == 0 &&
|
||||
(pkt->equal_run > bits_per_chunks ||
|
||||
pkt->equal_run == remaining_packets)) {
|
||||
rtp_twcc_write_run_length_chunk (packet_chunks,
|
||||
pkt->status, pkt->equal_run);
|
||||
i += pkt->equal_run - 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
GST_LOG ("i=%u: Writing a %u-bit vector of status: %u",
|
||||
i, symbol_size, pkt->status);
|
||||
rtp_twcc_write_status_vector_chunk (&writer, pkt);
|
||||
}
|
||||
chunk_bit_writer_flush (&writer);
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_manager_add_fci (RTPTWCCManager * twcc, GstRTCPPacket * packet)
|
||||
{
|
||||
RecvPacket *first, *last, *prev;
|
||||
guint16 packet_count;
|
||||
GstClockTime base_time;
|
||||
GstClockTime ts_rounded;
|
||||
guint i;
|
||||
GArray *packet_chunks = g_array_new (FALSE, FALSE, 2);
|
||||
RTPTWCCHeader header;
|
||||
guint header_size = sizeof (RTPTWCCHeader);
|
||||
guint packet_chunks_size;
|
||||
guint recv_deltas_size = 0;
|
||||
guint16 fci_length;
|
||||
guint16 fci_chunks;
|
||||
guint8 *fci_data;
|
||||
guint8 *fci_data_ptr;
|
||||
RunLengthHelper rlh = { NULL };
|
||||
guint symbol_size = 1;
|
||||
GstClockTimeDiff delta_ts;
|
||||
gint64 delta_ts_rounded;
|
||||
|
||||
g_array_sort (twcc->recv_packets, _twcc_seqnum_sort);
|
||||
|
||||
/* get first and last packet */
|
||||
first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
|
||||
last =
|
||||
&g_array_index (twcc->recv_packets, RecvPacket,
|
||||
twcc->recv_packets->len - 1);
|
||||
|
||||
packet_count = last->seqnum - first->seqnum + 1;
|
||||
base_time = first->ts / REF_TIME_UNIT;
|
||||
|
||||
GST_WRITE_UINT16_BE (header.base_seqnum, first->seqnum);
|
||||
GST_WRITE_UINT16_BE (header.packet_count, packet_count);
|
||||
GST_WRITE_UINT24_BE (header.base_time, base_time);
|
||||
GST_WRITE_UINT8 (header.fb_pkt_count, twcc->fb_pkt_count);
|
||||
|
||||
base_time *= REF_TIME_UNIT;
|
||||
ts_rounded = base_time;
|
||||
|
||||
GST_DEBUG ("Created TWCC feedback: base_seqnum: #%u, packet_count: %u, "
|
||||
"base_time %" GST_TIME_FORMAT " fb_pkt_count: %u",
|
||||
first->seqnum, packet_count, GST_TIME_ARGS (base_time),
|
||||
twcc->fb_pkt_count);
|
||||
|
||||
twcc->fb_pkt_count++;
|
||||
twcc->expected_recv_seqnum = first->seqnum + packet_count;
|
||||
|
||||
/* calculate all deltas and check for gaps etc */
|
||||
prev = first;
|
||||
for (i = 0; i < twcc->recv_packets->len; i++) {
|
||||
RecvPacket *pkt = &g_array_index (twcc->recv_packets, RecvPacket, i);
|
||||
if (i != 0) {
|
||||
pkt->missing_run = pkt->seqnum - prev->seqnum - 1;
|
||||
}
|
||||
|
||||
delta_ts = GST_CLOCK_DIFF (ts_rounded, pkt->ts);
|
||||
pkt->delta = delta_ts / DELTA_UNIT;
|
||||
delta_ts_rounded = pkt->delta * DELTA_UNIT;
|
||||
ts_rounded += delta_ts_rounded;
|
||||
|
||||
if (delta_ts_rounded < 0 || delta_ts_rounded > MAX_TS_DELTA) {
|
||||
pkt->status = RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA;
|
||||
recv_deltas_size += 2;
|
||||
symbol_size = 2;
|
||||
} else {
|
||||
pkt->status = RTP_TWCC_PACKET_STATUS_SMALL_DELTA;
|
||||
recv_deltas_size += 1;
|
||||
}
|
||||
run_lenght_helper_update (&rlh, pkt);
|
||||
|
||||
GST_LOG ("pkt: #%u, ts: %" GST_TIME_FORMAT
|
||||
" ts_rounded: %" GST_TIME_FORMAT
|
||||
" delta_ts: %" GST_STIME_FORMAT
|
||||
" delta_ts_rounded: %" GST_STIME_FORMAT
|
||||
" missing_run: %u, status: %u", pkt->seqnum,
|
||||
GST_TIME_ARGS (pkt->ts), GST_TIME_ARGS (ts_rounded),
|
||||
GST_STIME_ARGS (delta_ts), GST_STIME_ARGS (delta_ts_rounded),
|
||||
pkt->missing_run, pkt->status);
|
||||
prev = pkt;
|
||||
}
|
||||
|
||||
rtp_twcc_write_chunks (packet_chunks, twcc->recv_packets, symbol_size);
|
||||
|
||||
packet_chunks_size = packet_chunks->len * 2;
|
||||
fci_length = header_size + packet_chunks_size + recv_deltas_size;
|
||||
fci_chunks = (fci_length - 1) / sizeof (guint32) + 1;
|
||||
|
||||
if (!gst_rtcp_packet_fb_set_fci_length (packet, fci_chunks)) {
|
||||
GST_ERROR ("Could not fit: %u packets", packet_count);
|
||||
g_assert_not_reached ();
|
||||
}
|
||||
|
||||
fci_data = gst_rtcp_packet_fb_get_fci (packet);
|
||||
fci_data_ptr = fci_data;
|
||||
|
||||
memcpy (fci_data_ptr, &header, header_size);
|
||||
fci_data_ptr += header_size;
|
||||
|
||||
memcpy (fci_data_ptr, packet_chunks->data, packet_chunks_size);
|
||||
fci_data_ptr += packet_chunks_size;
|
||||
|
||||
rtp_twcc_write_recv_deltas (fci_data_ptr, twcc->recv_packets);
|
||||
|
||||
GST_MEMDUMP ("twcc-header:", (guint8 *) & header, header_size);
|
||||
GST_MEMDUMP ("packet-chunks:", (guint8 *) packet_chunks->data,
|
||||
packet_chunks_size);
|
||||
GST_MEMDUMP ("full fci:", fci_data, fci_length);
|
||||
|
||||
g_array_unref (packet_chunks);
|
||||
g_array_set_size (twcc->recv_packets, 0);
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_twcc_manager_create_feedback (RTPTWCCManager * twcc)
|
||||
{
|
||||
GstBuffer *buf;
|
||||
GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
|
||||
GstRTCPPacket packet;
|
||||
|
||||
buf = gst_rtcp_buffer_new (twcc->mtu);
|
||||
|
||||
gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp);
|
||||
|
||||
gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_RTPFB, &packet);
|
||||
|
||||
gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_RTPFB_TYPE_TWCC);
|
||||
if (twcc->recv_sender_ssrc != 1)
|
||||
gst_rtcp_packet_fb_set_sender_ssrc (&packet, twcc->recv_sender_ssrc);
|
||||
gst_rtcp_packet_fb_set_media_ssrc (&packet, twcc->recv_media_ssrc);
|
||||
|
||||
rtp_twcc_manager_add_fci (twcc, &packet);
|
||||
|
||||
gst_rtcp_buffer_unmap (&rtcp);
|
||||
|
||||
g_queue_push_tail (twcc->rtcp_buffers, buf);
|
||||
}
|
||||
|
||||
/* we have calculated a (very pessimistic) max-packets per RTCP feedback,
|
||||
so this is to make sure we don't exceed that */
|
||||
static gboolean
|
||||
_exceeds_max_packets (RTPTWCCManager * twcc, guint16 seqnum)
|
||||
{
|
||||
RecvPacket *first, *last;
|
||||
guint16 packet_count;
|
||||
|
||||
if (twcc->recv_packets->len == 0)
|
||||
return FALSE;
|
||||
|
||||
/* find the delta betwen first stored packet and this seqnum */
|
||||
first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
|
||||
packet_count = seqnum - first->seqnum + 1;
|
||||
if (packet_count > twcc->max_packets_per_rtcp)
|
||||
return TRUE;
|
||||
|
||||
/* then find the delta between last stored packet and this seqnum */
|
||||
last =
|
||||
&g_array_index (twcc->recv_packets, RecvPacket,
|
||||
twcc->recv_packets->len - 1);
|
||||
packet_count = seqnum - (last->seqnum + 1);
|
||||
if (packet_count > twcc->max_packets_per_rtcp)
|
||||
return TRUE;
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
/* in this case we could have lost the packet with the marker bit,
|
||||
so with a large (30) amount of packets, lost packets and still no marker,
|
||||
we send a feedback anyway */
|
||||
static gboolean
|
||||
_many_packets_some_lost (RTPTWCCManager * twcc, guint16 seqnum)
|
||||
{
|
||||
RecvPacket *first;
|
||||
guint16 packet_count;
|
||||
guint received_packets = twcc->recv_packets->len;
|
||||
if (received_packets == 0)
|
||||
return FALSE;
|
||||
|
||||
first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
|
||||
packet_count = seqnum - first->seqnum + 1;
|
||||
/* packet-count larger than recevied-packets means we have lost packets */
|
||||
if (packet_count >= 30 && packet_count > received_packets)
|
||||
return TRUE;
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
gboolean
|
||||
rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc,
|
||||
guint16 seqnum, RTPPacketInfo * pinfo)
|
||||
{
|
||||
gboolean send_feedback = FALSE;
|
||||
RecvPacket packet;
|
||||
gint32 diff;
|
||||
|
||||
/* if this packet would exceed the capacity of our MTU, we create a feedback
|
||||
with the current packets, and start over with this one */
|
||||
if (_exceeds_max_packets (twcc, seqnum)) {
|
||||
GST_INFO ("twcc-seqnum: %u would overflow max packets: %u, create feedback"
|
||||
" with current packets", seqnum, twcc->max_packets_per_rtcp);
|
||||
rtp_twcc_manager_create_feedback (twcc);
|
||||
send_feedback = TRUE;
|
||||
}
|
||||
|
||||
/* we can have multiple ssrcs here, so just pick the first one */
|
||||
if (twcc->recv_media_ssrc == -1)
|
||||
twcc->recv_media_ssrc = pinfo->ssrc;
|
||||
|
||||
/* check if we are reordered, and treat it as lost if we already sent
|
||||
a feedback msg with a higher seqnum. If the diff is huge, treat
|
||||
it as a restart of a stream */
|
||||
diff = (gint32) seqnum - (gint32) twcc->expected_recv_seqnum;
|
||||
if (twcc->fb_pkt_count > 0 && diff < 0 && diff > -1000) {
|
||||
GST_INFO ("Received out of order packet (%u after %u), treating as lost",
|
||||
seqnum, twcc->expected_recv_seqnum);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
/* store the packet for Transport-wide RTCP feedback message */
|
||||
recv_packet_init (&packet, seqnum, pinfo);
|
||||
g_array_append_val (twcc->recv_packets, packet);
|
||||
twcc->last_seqnum = seqnum;
|
||||
GST_LOG ("Receive: twcc-seqnum: %u, marker: %d, ts: %" GST_TIME_FORMAT,
|
||||
seqnum, pinfo->marker, GST_TIME_ARGS (pinfo->running_time));
|
||||
|
||||
if (pinfo->marker || _many_packets_some_lost (twcc, seqnum)) {
|
||||
rtp_twcc_manager_create_feedback (twcc);
|
||||
send_feedback = TRUE;
|
||||
}
|
||||
|
||||
return send_feedback;
|
||||
}
|
||||
|
||||
static void
|
||||
_change_rtcp_fb_sender_ssrc (GstBuffer * buf, guint32 sender_ssrc)
|
||||
{
|
||||
GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
|
||||
GstRTCPPacket packet;
|
||||
gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp);
|
||||
gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
|
||||
gst_rtcp_packet_fb_set_sender_ssrc (&packet, sender_ssrc);
|
||||
gst_rtcp_buffer_unmap (&rtcp);
|
||||
}
|
||||
|
||||
GstBuffer *
|
||||
rtp_twcc_manager_get_feedback (RTPTWCCManager * twcc, guint sender_ssrc)
|
||||
{
|
||||
GstBuffer *buf;
|
||||
buf = g_queue_pop_head (twcc->rtcp_buffers);
|
||||
|
||||
if (buf && twcc->recv_sender_ssrc != sender_ssrc) {
|
||||
_change_rtcp_fb_sender_ssrc (buf, sender_ssrc);
|
||||
twcc->recv_sender_ssrc = sender_ssrc;
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static void
|
||||
sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
|
||||
{
|
||||
packet->seqnum = seqnum;
|
||||
packet->ts = pinfo->running_time;
|
||||
packet->size = pinfo->payload_len;
|
||||
packet->remote_ts = GST_CLOCK_TIME_NONE;
|
||||
packet->socket_ts = GST_CLOCK_TIME_NONE;
|
||||
packet->lost = FALSE;
|
||||
}
|
||||
|
||||
void
|
||||
rtp_twcc_manager_send_packet (RTPTWCCManager * twcc,
|
||||
guint16 seqnum, RTPPacketInfo * pinfo)
|
||||
{
|
||||
SentPacket packet;
|
||||
sent_packet_init (&packet, seqnum, pinfo);
|
||||
g_array_append_val (twcc->sent_packets, packet);
|
||||
|
||||
GST_LOG ("Send: twcc-seqnum: %u, marker: %d, ts: %" GST_TIME_FORMAT,
|
||||
seqnum, pinfo->marker, GST_TIME_ARGS (pinfo->running_time));
|
||||
}
|
||||
|
||||
void
|
||||
rtp_twcc_manager_set_send_packet_ts (RTPTWCCManager * twcc,
|
||||
guint packet_id, GstClockTime ts)
|
||||
{
|
||||
SentPacket *pkt = NULL;
|
||||
pkt = &g_array_index (twcc->sent_packets, SentPacket, packet_id);
|
||||
if (pkt) {
|
||||
pkt->socket_ts = ts;
|
||||
GST_DEBUG ("assigning: pkt-id: %u to packet: %u", packet_id, pkt->seqnum);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
_add_twcc_packet (GArray * twcc_packets, guint16 seqnum, guint status)
|
||||
{
|
||||
RTPTWCCPacket packet;
|
||||
memset (&packet, 0, sizeof (RTPTWCCPacket));
|
||||
packet.local_ts = GST_CLOCK_TIME_NONE;
|
||||
packet.remote_ts = GST_CLOCK_TIME_NONE;
|
||||
packet.local_delta = GST_CLOCK_STIME_NONE;
|
||||
packet.remote_delta = GST_CLOCK_STIME_NONE;
|
||||
packet.delta_delta = GST_CLOCK_STIME_NONE;
|
||||
packet.seqnum = seqnum;
|
||||
packet.status = status;
|
||||
g_array_append_val (twcc_packets, packet);
|
||||
}
|
||||
|
||||
static guint
|
||||
_parse_run_length_chunk (GstBitReader * reader, GArray * twcc_packets,
|
||||
guint16 seqnum_offset, guint remaining_packets)
|
||||
{
|
||||
guint run_length;
|
||||
guint8 status_code;
|
||||
|
||||
gst_bit_reader_get_bits_uint8 (reader, &status_code, 2);
|
||||
|
||||
run_length = *(guint16 *) reader->data & ~0xE0; /* mask out the 3 last bits */
|
||||
run_length = MIN (remaining_packets, GST_READ_UINT16_BE (&run_length));
|
||||
|
||||
for (guint i = 0; i < run_length; i++) {
|
||||
_add_twcc_packet (twcc_packets, seqnum_offset + i, status_code);
|
||||
}
|
||||
|
||||
return run_length;
|
||||
}
|
||||
|
||||
static guint
|
||||
_parse_status_vector_chunk (GstBitReader * reader, GArray * twcc_packets,
|
||||
guint16 seqnum_offset, guint remaining_packets)
|
||||
{
|
||||
guint8 symbol_size;
|
||||
guint num_bits;
|
||||
|
||||
gst_bit_reader_get_bits_uint8 (reader, &symbol_size, 1);
|
||||
symbol_size += 1;
|
||||
num_bits = MIN (remaining_packets, 14 / symbol_size);
|
||||
|
||||
for (guint i = 0; i < num_bits; i++) {
|
||||
guint8 status_code;
|
||||
if (gst_bit_reader_get_bits_uint8 (reader, &status_code, symbol_size))
|
||||
_add_twcc_packet (twcc_packets, seqnum_offset + i, status_code);
|
||||
}
|
||||
|
||||
return num_bits;
|
||||
}
|
||||
|
||||
/* Remove all locally stored packets that has been reported
|
||||
back to us */
|
||||
static void
|
||||
_prune_sent_packets (RTPTWCCManager * twcc, GArray * twcc_packets)
|
||||
{
|
||||
SentPacket *first;
|
||||
RTPTWCCPacket *last;
|
||||
guint16 last_idx;
|
||||
|
||||
if (twcc_packets->len == 0 || twcc->sent_packets->len == 0)
|
||||
return;
|
||||
|
||||
first = &g_array_index (twcc->sent_packets, SentPacket, 0);
|
||||
last = &g_array_index (twcc_packets, RTPTWCCPacket, twcc_packets->len - 1);
|
||||
|
||||
last_idx = last->seqnum - first->seqnum;
|
||||
|
||||
if (last_idx >= twcc->sent_packets->len)
|
||||
g_array_remove_range (twcc->sent_packets, 0, last_idx);
|
||||
}
|
||||
|
||||
static void
|
||||
_check_for_lost_packets (RTPTWCCManager * twcc, GArray * twcc_packets,
|
||||
guint16 base_seqnum, guint16 packet_count, guint8 fb_pkt_count)
|
||||
{
|
||||
guint packets_lost;
|
||||
guint i;
|
||||
|
||||
/* first packet */
|
||||
if (twcc->first_fci_parse) {
|
||||
twcc->first_fci_parse = FALSE;
|
||||
goto done;
|
||||
}
|
||||
|
||||
/* we have gone backwards, don't reset the expectations,
|
||||
but process the packet nonetheless */
|
||||
if (fb_pkt_count < twcc->expected_parsed_fb_pkt_count) {
|
||||
GST_WARNING ("feedback packet count going backwards (%u < %u)",
|
||||
fb_pkt_count, twcc->expected_parsed_fb_pkt_count);
|
||||
return;
|
||||
}
|
||||
|
||||
/* we have jumped forwards, reset expectations, but don't trigger
|
||||
lost packets in case the missing fb-packet(s) arrive later */
|
||||
if (fb_pkt_count > twcc->expected_parsed_fb_pkt_count) {
|
||||
GST_WARNING ("feedback packet count jumped ahead (%u > %u)",
|
||||
fb_pkt_count, twcc->expected_parsed_fb_pkt_count);
|
||||
goto done;
|
||||
}
|
||||
|
||||
packets_lost = base_seqnum - twcc->expected_parsed_seqnum;
|
||||
for (i = 0; i < packets_lost; i++) {
|
||||
_add_twcc_packet (twcc_packets, twcc->expected_parsed_seqnum + i,
|
||||
RTP_TWCC_PACKET_STATUS_NOT_RECV);
|
||||
}
|
||||
|
||||
done:
|
||||
twcc->expected_parsed_seqnum = base_seqnum + packet_count;
|
||||
twcc->expected_parsed_fb_pkt_count = fb_pkt_count + 1;
|
||||
return;
|
||||
}
|
||||
|
||||
GArray *
|
||||
rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc,
|
||||
guint8 * fci_data, guint fci_length)
|
||||
{
|
||||
GArray *twcc_packets;
|
||||
guint16 base_seqnum;
|
||||
guint16 packet_count;
|
||||
GstClockTime base_time;
|
||||
GstClockTime ts_rounded;
|
||||
guint8 fb_pkt_count;
|
||||
guint packets_parsed = 0;
|
||||
guint fci_parsed;
|
||||
guint i;
|
||||
SentPacket *first_sent_pkt = NULL;
|
||||
|
||||
if (fci_length < 10) {
|
||||
GST_WARNING ("Malformed TWCC RTCP feedback packet");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
base_seqnum = GST_READ_UINT16_BE (&fci_data[0]);
|
||||
packet_count = GST_READ_UINT16_BE (&fci_data[2]);
|
||||
base_time = GST_READ_UINT24_BE (&fci_data[4]) * REF_TIME_UNIT;
|
||||
fb_pkt_count = fci_data[7];
|
||||
|
||||
GST_DEBUG ("Parsed TWCC feedback: base_seqnum: #%u, packet_count: %u, "
|
||||
"base_time %" GST_TIME_FORMAT " fb_pkt_count: %u",
|
||||
base_seqnum, packet_count, GST_TIME_ARGS (base_time), fb_pkt_count);
|
||||
|
||||
twcc_packets = g_array_sized_new (FALSE, FALSE,
|
||||
sizeof (RTPTWCCPacket), packet_count);
|
||||
|
||||
_check_for_lost_packets (twcc, twcc_packets,
|
||||
base_seqnum, packet_count, fb_pkt_count);
|
||||
|
||||
fci_parsed = 8;
|
||||
while (packets_parsed < packet_count && (fci_parsed + 1) < fci_length) {
|
||||
GstBitReader reader = GST_BIT_READER_INIT (&fci_data[fci_parsed], 2);
|
||||
guint8 chunk_type;
|
||||
guint seqnum_offset = base_seqnum + packets_parsed;
|
||||
guint remaining_packets = packet_count - packets_parsed;
|
||||
|
||||
gst_bit_reader_get_bits_uint8 (&reader, &chunk_type, 1);
|
||||
|
||||
if (chunk_type == RTP_TWCC_CHUNK_TYPE_RUN_LENGTH) {
|
||||
packets_parsed += _parse_run_length_chunk (&reader,
|
||||
twcc_packets, seqnum_offset, remaining_packets);
|
||||
} else {
|
||||
packets_parsed += _parse_status_vector_chunk (&reader,
|
||||
twcc_packets, seqnum_offset, remaining_packets);
|
||||
}
|
||||
fci_parsed += 2;
|
||||
}
|
||||
|
||||
if (twcc->sent_packets->len > 0)
|
||||
first_sent_pkt = &g_array_index (twcc->sent_packets, SentPacket, 0);
|
||||
|
||||
ts_rounded = base_time;
|
||||
for (i = 0; i < twcc_packets->len; i++) {
|
||||
RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i);
|
||||
gint16 delta = 0;
|
||||
GstClockTimeDiff delta_ts;
|
||||
|
||||
if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) {
|
||||
delta = fci_data[fci_parsed];
|
||||
fci_parsed += 1;
|
||||
} else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) {
|
||||
delta = GST_READ_UINT16_BE (&fci_data[fci_parsed]);
|
||||
fci_parsed += 2;
|
||||
}
|
||||
|
||||
if (fci_parsed > fci_length) {
|
||||
GST_WARNING ("Malformed TWCC RTCP feedback packet");
|
||||
g_array_set_size (twcc_packets, 0);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pkt->status != RTP_TWCC_PACKET_STATUS_NOT_RECV) {
|
||||
delta_ts = delta * DELTA_UNIT;
|
||||
ts_rounded += delta_ts;
|
||||
pkt->remote_ts = ts_rounded;
|
||||
|
||||
GST_LOG ("pkt: #%u, remote_ts: %" GST_TIME_FORMAT
|
||||
" delta_ts: %" GST_STIME_FORMAT
|
||||
" status: %u", pkt->seqnum,
|
||||
GST_TIME_ARGS (pkt->remote_ts), GST_STIME_ARGS (delta_ts),
|
||||
pkt->status);
|
||||
}
|
||||
|
||||
if (first_sent_pkt) {
|
||||
SentPacket *found = NULL;
|
||||
guint16 sent_idx = pkt->seqnum - first_sent_pkt->seqnum;
|
||||
if (sent_idx < twcc->sent_packets->len)
|
||||
found = &g_array_index (twcc->sent_packets, SentPacket, sent_idx);
|
||||
if (found && found->seqnum == pkt->seqnum) {
|
||||
if (GST_CLOCK_TIME_IS_VALID (found->socket_ts)) {
|
||||
pkt->local_ts = found->socket_ts;
|
||||
} else {
|
||||
pkt->local_ts = found->ts;
|
||||
}
|
||||
pkt->size = found->size;
|
||||
|
||||
GST_LOG ("matching pkt: #%u with local_ts: %" GST_TIME_FORMAT
|
||||
" size: %u", pkt->seqnum, GST_TIME_ARGS (pkt->local_ts), pkt->size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_prune_sent_packets (twcc, twcc_packets);
|
||||
|
||||
return twcc_packets;
|
||||
}
|
70
gst/rtpmanager/rtptwcc.h
Normal file
70
gst/rtpmanager/rtptwcc.h
Normal file
|
@ -0,0 +1,70 @@
|
|||
/* GStreamer
|
||||
* Copyright (C) 2019 Pexip (http://pexip.com/)
|
||||
* @author: Havard Graff <havard@pexip.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#ifndef __RTP_TWCC_H__
|
||||
#define __RTP_TWCC_H__
|
||||
|
||||
#include <gst/gst.h>
|
||||
#include <gst/rtp/rtp.h>
|
||||
#include "rtpstats.h"
|
||||
|
||||
typedef struct _RTPTWCCManager RTPTWCCManager;
|
||||
typedef struct _RTPTWCCPacket RTPTWCCPacket;
|
||||
typedef enum _RTPTWCCPacketStatus RTPTWCCPacketStatus;
|
||||
|
||||
enum _RTPTWCCPacketStatus
|
||||
{
|
||||
RTP_TWCC_PACKET_STATUS_NOT_RECV = 0,
|
||||
RTP_TWCC_PACKET_STATUS_SMALL_DELTA = 1,
|
||||
RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA = 2,
|
||||
};
|
||||
|
||||
struct _RTPTWCCPacket
|
||||
{
|
||||
GstClockTime local_ts;
|
||||
GstClockTime remote_ts;
|
||||
GstClockTimeDiff local_delta;
|
||||
GstClockTimeDiff remote_delta;
|
||||
GstClockTimeDiff delta_delta;
|
||||
RTPTWCCPacketStatus status;
|
||||
guint16 seqnum;
|
||||
guint size;
|
||||
};
|
||||
|
||||
RTPTWCCManager * rtp_twcc_manager_new (guint mtu);
|
||||
void rtp_twcc_manager_free (RTPTWCCManager * twcc);
|
||||
|
||||
void rtp_twcc_manager_set_mtu (RTPTWCCManager * twcc, guint mtu);
|
||||
|
||||
gboolean rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc,
|
||||
guint16 seqnum, RTPPacketInfo * pinfo);
|
||||
|
||||
void rtp_twcc_manager_send_packet (RTPTWCCManager * twcc,
|
||||
guint16 seqnum, RTPPacketInfo * pinfo);
|
||||
void rtp_twcc_manager_set_send_packet_ts (RTPTWCCManager * twcc,
|
||||
guint packet_id, GstClockTime ts);
|
||||
|
||||
GstBuffer * rtp_twcc_manager_get_feedback (RTPTWCCManager * twcc,
|
||||
guint32 sender_ssrc);
|
||||
|
||||
GArray * rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc,
|
||||
guint8 * fci_data, guint fci_length);
|
||||
|
||||
#endif /* __RTP_TWCC_H__ */
|
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue