diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 051140f0ad..d43938b3fb 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -121,6 +121,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id, static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay); +static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess, + GstClockTime deadline); static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; @@ -3581,13 +3583,36 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) { GstRTCPBuffer *rtcp = &data->rtcpbuf; GstRTCPPacket *packet = &data->packet; - guint32 *nacks; - guint n_nacks, i; + guint16 *nacks; + GstClockTime *nack_deadlines; + guint n_nacks, i = 0; + guint nacked_seqnums = 0; + guint16 n_fb_nacks = 0; guint8 *fci_data; if (!source->send_nack) return; + nacks = rtp_source_get_nacks (source, &n_nacks); + nack_deadlines = rtp_source_get_nack_deadlines (source, NULL); + GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks, + GST_TIME_ARGS (data->current_time)); + + /* cleanup expired nacks */ + for (i = 0; i < n_nacks; i++) { + GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i], + GST_TIME_ARGS (nack_deadlines[i])); + if (nack_deadlines[i] >= data->current_time) + break; + } + if (i) { + GST_WARNING ("Removing %u expired NACKS", i); + rtp_source_clear_nacks (source, i); + n_nacks -= i; + if (n_nacks == 0) + return; + } + if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet)) /* exit because the packet is full, will put next request in a * further packet */ @@ -3597,21 +3622,46 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc); gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc); - nacks = rtp_source_get_nacks (source, &n_nacks); - GST_DEBUG ("%u NACKs", n_nacks); - if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks)) + if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) { + gst_rtcp_packet_remove (packet); + GST_WARNING ("no nacks fit in the packet"); return; - - fci_data = gst_rtcp_packet_fb_get_fci (packet); - for (i = 0; i < n_nacks; i++) { - GST_WRITE_UINT32_BE (fci_data, nacks[i]); - fci_data += 4; - data->nacked_seqnums++; } - rtp_source_clear_nacks (source); + fci_data = gst_rtcp_packet_fb_get_fci (packet); + for (i = 0; i < n_nacks; i = nacked_seqnums) { + guint16 seqnum = nacks[i]; + guint16 blp = 0; + guint j; + + if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1)) + break; + + n_fb_nacks++; + nacked_seqnums++; + + for (j = i + 1; j < n_nacks; j++) { + gint diff; + + diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]); + GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff); + if (diff > 16) + break; + + blp |= 1 << (diff - 1); + nacked_seqnums++; + } + + GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp); + fci_data += 4; + } + + data->nacked_seqnums += nacked_seqnums; + rtp_source_clear_nacks (source, nacked_seqnums); data->may_suppress = FALSE; - source->stats.sent_nack_count += n_nacks; + source->stats.sent_nack_count += n_fb_nacks; + + GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks); } /* perform cleanup of sources that timed out */ @@ -4037,6 +4087,28 @@ update_generation (const gchar * key, RTPSource * source, ReportData * data) } } +static void +schedule_remaining_nacks (const gchar * key, RTPSource * source, + ReportData * data) +{ + RTPSession *sess = data->sess; + GstClockTime *nack_deadlines; + GstClockTime deadline; + guint n_nacks; + + if (!source->send_nack) + return; + + /* the scheduling is entirely based on available bandwidth, just take the + * biggest seqnum, which will have the largest deadline to request early + * RTCP. */ + nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks); + deadline = nack_deadlines[n_nacks - 1]; + RTP_SESSION_UNLOCK (sess); + rtp_session_send_rtcp_with_deadline (sess, deadline); + RTP_SESSION_LOCK (sess); +} + static gboolean rtp_session_are_all_sources_bye (RTPSession * sess) { @@ -4238,6 +4310,12 @@ done: if (all_empty) GST_ERROR ("generated empty RTCP messages for all the sources"); + /* schedule remaining nacks */ + RTP_SESSION_LOCK (sess); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) schedule_remaining_nacks, &data); + RTP_SESSION_UNLOCK (sess); + return result; } @@ -4409,6 +4487,35 @@ end: return ret; } +static gboolean +rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now, + GstClockTime max_delay) +{ + /* notify the application that we intend to send early RTCP */ + if (sess->callbacks.notify_early_rtcp) + sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data); + + return rtp_session_request_early_rtcp (sess, now, max_delay); +} + +static gboolean +rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline) +{ + GstClockTime now, max_delay; + + if (!sess->callbacks.send_rtcp) + return FALSE; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); + + if (deadline < now) + return FALSE; + + max_delay = deadline - now; + + return rtp_session_send_rtcp_internal (sess, now, max_delay); +} + static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) { @@ -4419,11 +4526,7 @@ rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) now = sess->callbacks.request_time (sess, sess->request_time_user_data); - /* notify the application that we intend to send early RTCP */ - if (sess->callbacks.notify_early_rtcp) - sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data); - - return rtp_session_request_early_rtcp (sess, now, max_delay); + return rtp_session_send_rtcp_internal (sess, now, max_delay); } gboolean @@ -4479,17 +4582,24 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum, GstClockTime max_delay) { RTPSource *source; + GstClockTime now; + + if (!sess->callbacks.send_rtcp) + return FALSE; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); RTP_SESSION_LOCK (sess); source = find_source (sess, ssrc); if (source == NULL) goto no_source; - GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum); - rtp_source_register_nack (source, seqnum); + GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT, + ssrc, seqnum, GST_TIME_ARGS (now + max_delay)); + rtp_source_register_nack (source, seqnum, now + max_delay); RTP_SESSION_UNLOCK (sess); - if (!rtp_session_send_rtcp (sess, max_delay)) { + if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) { GST_DEBUG ("NACK not sent early, sending with next regular RTCP"); } diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 52fe81d7c2..4d90795e6b 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -311,7 +311,8 @@ rtp_source_init (RTPSource * src) src->seqnum_offset = -1; src->retained_feedback = g_queue_new (); - src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32)); + src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16)); + src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime)); src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal); @@ -351,6 +352,7 @@ rtp_source_finalize (GObject * object) g_queue_free (src->retained_feedback); g_array_free (src->nacks, TRUE); + g_array_free (src->nack_deadlines, TRUE); if (src->rtp_from) g_object_unref (src->rtp_from); @@ -1933,47 +1935,46 @@ rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data) * rtp_source_register_nack: * @src: The #RTPSource * @seqnum: a seqnum + * @deadline: the deadline before which RTX is still possible * * Register that @seqnum has not been received from @src. */ void -rtp_source_register_nack (RTPSource * src, guint16 seqnum) +rtp_source_register_nack (RTPSource * src, guint16 seqnum, + GstClockTime deadline) { - guint i, len; - guint32 dword = seqnum << 16; - gint diff = 16; + gint i; + guint len; + gint diff = -1; + guint16 tseq; len = src->nacks->len; - for (i = 0; i < len; i++) { - guint32 tdword; - guint16 tseq; - - tdword = g_array_index (src->nacks, guint32, i); - tseq = tdword >> 16; - + for (i = len - 1; i >= 0; i--) { + tseq = g_array_index (src->nacks, guint16, i); diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum); - if (diff < 16) + + GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len); + + if (diff >= 0) break; } - /* we already have this seqnum */ - if (diff == 0) - return; - /* it comes before the recorded seqnum, FIXME, we could merge it - * if not to far away */ - if (diff < 0) { - GST_DEBUG ("insert NACK #%u at %u", seqnum, i); - g_array_insert_val (src->nacks, i, dword); - } else if (diff < 16) { - /* we can merge it */ - dword = g_array_index (src->nacks, guint32, i); - dword |= 1 << (diff - 1); - GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i, - dword >> 16, dword); - g_array_index (src->nacks, guint32, i) = dword; + + if (diff == 0) { + GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_index (src->nack_deadlines, GstClockTime, i) = deadline; + } else if (i == len - 1) { + GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_append_val (src->nacks, seqnum); + g_array_append_val (src->nack_deadlines, deadline); } else { - GST_DEBUG ("append NACK #%u", seqnum); - g_array_append_val (src->nacks, dword); + GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_insert_val (src->nacks, i + 1, seqnum); + g_array_insert_val (src->nack_deadlines, i + 1, deadline); } + src->send_nack = TRUE; } @@ -1986,18 +1987,51 @@ rtp_source_register_nack (RTPSource * src, guint16 seqnum) * * Returns: an array of @n_nacks seqnum values. */ -guint32 * +guint16 * rtp_source_get_nacks (RTPSource * src, guint * n_nacks) { if (n_nacks) *n_nacks = src->nacks->len; - return (guint32 *) src->nacks->data; + return (guint16 *) src->nacks->data; } -void -rtp_source_clear_nacks (RTPSource * src) +/** + * rtp_source_get_nacks: + * @src: The #RTPSource + * @n_nacks: result number of nacks + * + * Get the registered NACKS deadlines. + * + * Returns: an array of @n_nacks deadline values. + */ +GstClockTime * +rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks) { - g_array_set_size (src->nacks, 0); - src->send_nack = FALSE; + if (n_nacks) + *n_nacks = src->nack_deadlines->len; + + return (GstClockTime *) src->nack_deadlines->data; +} + +/** + * rtp_source_clear_nacks: + * @src: The #RTPSource + * @n_nacks: number of nacks + * + * Remove @n_nacks oldest NACKS form array. + */ +void +rtp_source_clear_nacks (RTPSource * src, guint n_nacks) +{ + g_return_if_fail (n_nacks <= src->nacks->len); + + if (src->nacks->len == n_nacks) { + g_array_set_size (src->nacks, 0); + g_array_set_size (src->nack_deadlines, 0); + src->send_nack = FALSE; + } else { + g_array_remove_range (src->nacks, 0, n_nacks); + g_array_remove_range (src->nack_deadlines, 0, n_nacks); + } } diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index e292074d54..dff7b313f8 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -198,6 +198,7 @@ struct _RTPSource { gboolean send_nack; GArray *nacks; + GArray *nack_deadlines; gboolean pt_set; guint8 pt; @@ -301,8 +302,10 @@ gboolean rtp_source_has_retained (RTPSource * src, gconstpointer data); void rtp_source_register_nack (RTPSource * src, - guint16 seqnum); -guint32 * rtp_source_get_nacks (RTPSource * src, guint *n_nacks); -void rtp_source_clear_nacks (RTPSource * src); + guint16 seqnum, + GstClockTime deadline); +guint16 * rtp_source_get_nacks (RTPSource * src, guint *n_nacks); +GstClockTime * rtp_source_get_nack_deadlines (RTPSource * src, guint *n_nacks); +void rtp_source_clear_nacks (RTPSource * src, guint n_nacks); #endif /* __RTP_SOURCE_H__ */