rtpsession: Send as many nack seqnum as possible

In order to do that, we now split the nacks registration from the actual
FB nack packet construction. We then try and add as many FB Nacks as
possible into the active packets and leave the remaining seqnums in the
RTPSource. In order to avoid sending outdated NACK later on, we save the
seqnum calculated deadline and cleanup the outdated seqnums before the
next RTCP send.

Fixes #583
This commit is contained in:
Nicolas Dufresne 2019-03-25 13:42:25 -04:00 committed by Nicolas Dufresne
parent 74a74bfc99
commit 6bb53e75fb
3 changed files with 207 additions and 60 deletions

View file

@ -121,6 +121,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id,
static gboolean rtp_session_send_rtcp (RTPSession * sess,
GstClockTime max_delay);
static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
GstClockTime deadline);
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
@ -3581,13 +3583,36 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data)
{
GstRTCPBuffer *rtcp = &data->rtcpbuf;
GstRTCPPacket *packet = &data->packet;
guint32 *nacks;
guint n_nacks, i;
guint16 *nacks;
GstClockTime *nack_deadlines;
guint n_nacks, i = 0;
guint nacked_seqnums = 0;
guint16 n_fb_nacks = 0;
guint8 *fci_data;
if (!source->send_nack)
return;
nacks = rtp_source_get_nacks (source, &n_nacks);
nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
GST_TIME_ARGS (data->current_time));
/* cleanup expired nacks */
for (i = 0; i < n_nacks; i++) {
GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
GST_TIME_ARGS (nack_deadlines[i]));
if (nack_deadlines[i] >= data->current_time)
break;
}
if (i) {
GST_WARNING ("Removing %u expired NACKS", i);
rtp_source_clear_nacks (source, i);
n_nacks -= i;
if (n_nacks == 0)
return;
}
if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
/* exit because the packet is full, will put next request in a
* further packet */
@ -3597,21 +3622,46 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data)
gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
nacks = rtp_source_get_nacks (source, &n_nacks);
GST_DEBUG ("%u NACKs", n_nacks);
if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
gst_rtcp_packet_remove (packet);
GST_WARNING ("no nacks fit in the packet");
return;
fci_data = gst_rtcp_packet_fb_get_fci (packet);
for (i = 0; i < n_nacks; i++) {
GST_WRITE_UINT32_BE (fci_data, nacks[i]);
fci_data += 4;
data->nacked_seqnums++;
}
rtp_source_clear_nacks (source);
fci_data = gst_rtcp_packet_fb_get_fci (packet);
for (i = 0; i < n_nacks; i = nacked_seqnums) {
guint16 seqnum = nacks[i];
guint16 blp = 0;
guint j;
if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
break;
n_fb_nacks++;
nacked_seqnums++;
for (j = i + 1; j < n_nacks; j++) {
gint diff;
diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
if (diff > 16)
break;
blp |= 1 << (diff - 1);
nacked_seqnums++;
}
GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
fci_data += 4;
}
data->nacked_seqnums += nacked_seqnums;
rtp_source_clear_nacks (source, nacked_seqnums);
data->may_suppress = FALSE;
source->stats.sent_nack_count += n_nacks;
source->stats.sent_nack_count += n_fb_nacks;
GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
}
/* perform cleanup of sources that timed out */
@ -4037,6 +4087,28 @@ update_generation (const gchar * key, RTPSource * source, ReportData * data)
}
}
static void
schedule_remaining_nacks (const gchar * key, RTPSource * source,
ReportData * data)
{
RTPSession *sess = data->sess;
GstClockTime *nack_deadlines;
GstClockTime deadline;
guint n_nacks;
if (!source->send_nack)
return;
/* the scheduling is entirely based on available bandwidth, just take the
* biggest seqnum, which will have the largest deadline to request early
* RTCP. */
nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
deadline = nack_deadlines[n_nacks - 1];
RTP_SESSION_UNLOCK (sess);
rtp_session_send_rtcp_with_deadline (sess, deadline);
RTP_SESSION_LOCK (sess);
}
static gboolean
rtp_session_are_all_sources_bye (RTPSession * sess)
{
@ -4238,6 +4310,12 @@ done:
if (all_empty)
GST_ERROR ("generated empty RTCP messages for all the sources");
/* schedule remaining nacks */
RTP_SESSION_LOCK (sess);
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) schedule_remaining_nacks, &data);
RTP_SESSION_UNLOCK (sess);
return result;
}
@ -4409,6 +4487,35 @@ end:
return ret;
}
static gboolean
rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
GstClockTime max_delay)
{
/* notify the application that we intend to send early RTCP */
if (sess->callbacks.notify_early_rtcp)
sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
return rtp_session_request_early_rtcp (sess, now, max_delay);
}
static gboolean
rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
{
GstClockTime now, max_delay;
if (!sess->callbacks.send_rtcp)
return FALSE;
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
if (deadline < now)
return FALSE;
max_delay = deadline - now;
return rtp_session_send_rtcp_internal (sess, now, max_delay);
}
static gboolean
rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
{
@ -4419,11 +4526,7 @@ rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
/* notify the application that we intend to send early RTCP */
if (sess->callbacks.notify_early_rtcp)
sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
return rtp_session_request_early_rtcp (sess, now, max_delay);
return rtp_session_send_rtcp_internal (sess, now, max_delay);
}
gboolean
@ -4479,17 +4582,24 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
GstClockTime max_delay)
{
RTPSource *source;
GstClockTime now;
if (!sess->callbacks.send_rtcp)
return FALSE;
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
RTP_SESSION_LOCK (sess);
source = find_source (sess, ssrc);
if (source == NULL)
goto no_source;
GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
rtp_source_register_nack (source, seqnum);
GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
rtp_source_register_nack (source, seqnum, now + max_delay);
RTP_SESSION_UNLOCK (sess);
if (!rtp_session_send_rtcp (sess, max_delay)) {
if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
}

View file

@ -311,7 +311,8 @@ rtp_source_init (RTPSource * src)
src->seqnum_offset = -1;
src->retained_feedback = g_queue_new ();
src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32));
src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16));
src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime));
src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal);
@ -351,6 +352,7 @@ rtp_source_finalize (GObject * object)
g_queue_free (src->retained_feedback);
g_array_free (src->nacks, TRUE);
g_array_free (src->nack_deadlines, TRUE);
if (src->rtp_from)
g_object_unref (src->rtp_from);
@ -1933,47 +1935,46 @@ rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data)
* rtp_source_register_nack:
* @src: The #RTPSource
* @seqnum: a seqnum
* @deadline: the deadline before which RTX is still possible
*
* Register that @seqnum has not been received from @src.
*/
void
rtp_source_register_nack (RTPSource * src, guint16 seqnum)
rtp_source_register_nack (RTPSource * src, guint16 seqnum,
GstClockTime deadline)
{
guint i, len;
guint32 dword = seqnum << 16;
gint diff = 16;
gint i;
guint len;
gint diff = -1;
guint16 tseq;
len = src->nacks->len;
for (i = 0; i < len; i++) {
guint32 tdword;
guint16 tseq;
tdword = g_array_index (src->nacks, guint32, i);
tseq = tdword >> 16;
for (i = len - 1; i >= 0; i--) {
tseq = g_array_index (src->nacks, guint16, i);
diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
if (diff < 16)
GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len);
if (diff >= 0)
break;
}
/* we already have this seqnum */
if (diff == 0)
return;
/* it comes before the recorded seqnum, FIXME, we could merge it
* if not to far away */
if (diff < 0) {
GST_DEBUG ("insert NACK #%u at %u", seqnum, i);
g_array_insert_val (src->nacks, i, dword);
} else if (diff < 16) {
/* we can merge it */
dword = g_array_index (src->nacks, guint32, i);
dword |= 1 << (diff - 1);
GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i,
dword >> 16, dword);
g_array_index (src->nacks, guint32, i) = dword;
if (diff == 0) {
GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum,
GST_TIME_ARGS (deadline));
g_array_index (src->nack_deadlines, GstClockTime, i) = deadline;
} else if (i == len - 1) {
GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
GST_TIME_ARGS (deadline));
g_array_append_val (src->nacks, seqnum);
g_array_append_val (src->nack_deadlines, deadline);
} else {
GST_DEBUG ("append NACK #%u", seqnum);
g_array_append_val (src->nacks, dword);
GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
GST_TIME_ARGS (deadline));
g_array_insert_val (src->nacks, i + 1, seqnum);
g_array_insert_val (src->nack_deadlines, i + 1, deadline);
}
src->send_nack = TRUE;
}
@ -1986,18 +1987,51 @@ rtp_source_register_nack (RTPSource * src, guint16 seqnum)
*
* Returns: an array of @n_nacks seqnum values.
*/
guint32 *
guint16 *
rtp_source_get_nacks (RTPSource * src, guint * n_nacks)
{
if (n_nacks)
*n_nacks = src->nacks->len;
return (guint32 *) src->nacks->data;
return (guint16 *) src->nacks->data;
}
void
rtp_source_clear_nacks (RTPSource * src)
/**
* rtp_source_get_nacks:
* @src: The #RTPSource
* @n_nacks: result number of nacks
*
* Get the registered NACKS deadlines.
*
* Returns: an array of @n_nacks deadline values.
*/
GstClockTime *
rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks)
{
g_array_set_size (src->nacks, 0);
src->send_nack = FALSE;
if (n_nacks)
*n_nacks = src->nack_deadlines->len;
return (GstClockTime *) src->nack_deadlines->data;
}
/**
* rtp_source_clear_nacks:
* @src: The #RTPSource
* @n_nacks: number of nacks
*
* Remove @n_nacks oldest NACKS form array.
*/
void
rtp_source_clear_nacks (RTPSource * src, guint n_nacks)
{
g_return_if_fail (n_nacks <= src->nacks->len);
if (src->nacks->len == n_nacks) {
g_array_set_size (src->nacks, 0);
g_array_set_size (src->nack_deadlines, 0);
src->send_nack = FALSE;
} else {
g_array_remove_range (src->nacks, 0, n_nacks);
g_array_remove_range (src->nack_deadlines, 0, n_nacks);
}
}

View file

@ -198,6 +198,7 @@ struct _RTPSource {
gboolean send_nack;
GArray *nacks;
GArray *nack_deadlines;
gboolean pt_set;
guint8 pt;
@ -301,8 +302,10 @@ gboolean rtp_source_has_retained (RTPSource * src,
gconstpointer data);
void rtp_source_register_nack (RTPSource * src,
guint16 seqnum);
guint32 * rtp_source_get_nacks (RTPSource * src, guint *n_nacks);
void rtp_source_clear_nacks (RTPSource * src);
guint16 seqnum,
GstClockTime deadline);
guint16 * rtp_source_get_nacks (RTPSource * src, guint *n_nacks);
GstClockTime * rtp_source_get_nack_deadlines (RTPSource * src, guint *n_nacks);
void rtp_source_clear_nacks (RTPSource * src, guint n_nacks);
#endif /* __RTP_SOURCE_H__ */