From db5150a23a7eb99033a89045be4a3d3f59b201f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Tue, 22 Jun 2010 13:33:32 -0400 Subject: [PATCH] rtpsource: Retain RTCP Feedback packets for a specified amount of time --- gst/rtpmanager/rtpsession.c | 25 ++++++++++++++++++- gst/rtpmanager/rtpsession.h | 1 + gst/rtpmanager/rtpsource.c | 49 ++++++++++++++++++++++++++++++++++++- gst/rtpmanager/rtpsource.h | 13 +++++++++- 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 5938d55802..ec40115756 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -58,6 +58,7 @@ enum #define DEFAULT_NUM_ACTIVE_SOURCES 0 #define DEFAULT_SOURCES NULL #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND) +#define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND) enum { @@ -75,6 +76,7 @@ enum PROP_SOURCES, PROP_FAVOR_NEW, PROP_RTCP_MIN_INTERVAL, + PROP_RTCP_FEEDBACK_RETENTION_WINDOW, PROP_LAST }; @@ -390,6 +392,15 @@ rtp_session_class_init (RTPSessionClass * klass) 0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_RTCP_FEEDBACK_RETENTION_WINDOW, + g_param_spec_uint64 ("rtcp-feedback-retention-window", + "RTCP Feedback retention window", + "Duration during which RTCP Feedback packets are retained (in ns)", + 0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + klass->get_source_by_ssrc = GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); @@ -444,6 +455,7 @@ rtp_session_init (RTPSession * sess) sess->first_rtcp = TRUE; sess->allow_early = TRUE; + sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW; GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc); } @@ -2005,12 +2017,22 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, GST_BUFFER_TIMESTAMP (fci) = arrival->running_time; } + RTP_SESSION_UNLOCK (sess); g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, type, fbtype, sender_ssrc, media_ssrc, fci); + RTP_SESSION_LOCK (sess); if (fci) gst_buffer_unref (fci); } + + if (sess->rtcp_feedback_retention_window) { + RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (media_ssrc)); + + if (src) + rtp_source_retain_rtcp_packet (src, packet, arrival->running_time); + } } /** @@ -2829,7 +2851,8 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, /* check for outdated collisions */ GST_DEBUG ("Timing out collisions"); rtp_source_timeout (sess->source, current_time, - data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT); + data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT, + running_time - sess->rtcp_feedback_retention_window); if (sess->change_ssrc) { GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc); diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index b63bbcc0a2..6ae318e1f5 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -202,6 +202,7 @@ struct _RTPSession { gboolean change_ssrc; gboolean favor_new; + GstClockTime rtcp_feedback_retention_window; }; /** diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 4a3af168a7..423d2b65d2 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -238,6 +238,8 @@ rtp_source_init (RTPSource * src) src->seqnum_base = -1; src->last_rtptime = -1; + src->retained_feedback = g_queue_new (); + rtp_source_reset (src); } @@ -262,6 +264,10 @@ rtp_source_finalize (GObject * object) g_list_foreach (src->conflicting_addresses, (GFunc) g_free, NULL); g_list_free (src->conflicting_addresses); + while ((buffer = g_queue_pop_head (src->retained_feedback))) + gst_buffer_unref (buffer); + g_queue_free (src->retained_feedback); + G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object); } @@ -1719,14 +1725,18 @@ rtp_source_add_conflicting_address (RTPSource * src, * @src: The #RTPSource * @current_time: The current time * @collision_timeout: The amount of time after which a collision is timed out + * @feedback_retention_window: The running time before which retained feedback + * packets have to be discarded * * This is processed on each RTCP interval. It times out old collisions. + * It also times out old retained feedback packets */ void rtp_source_timeout (RTPSource * src, GstClockTime current_time, - GstClockTime collision_timeout) + GstClockTime collision_timeout, GstClockTime feedback_retention_window) { GList *item; + GstRTCPPacket *pkt; item = g_list_first (src->conflicting_addresses); while (item) { @@ -1744,4 +1754,41 @@ rtp_source_timeout (RTPSource * src, GstClockTime current_time, } item = next_item; } + + /* Time out AVPF packets that are older than the desired length */ + while ((pkt = g_queue_peek_tail (src->retained_feedback)) && + GST_BUFFER_TIMESTAMP (pkt) < feedback_retention_window) + gst_buffer_unref (g_queue_pop_tail (src->retained_feedback)); +} + +static gint +compare_buffers (gconstpointer a, gconstpointer b, gpointer user_data) +{ + const GstBuffer *bufa = a; + const GstBuffer *bufb = b; + + return GST_BUFFER_TIMESTAMP (bufa) - GST_BUFFER_TIMESTAMP (bufb); +} + +void +rtp_source_retain_rtcp_packet (RTPSource * src, GstRTCPPacket * packet, + GstClockTime running_time) +{ + GstBuffer *buffer; + + buffer = gst_buffer_create_sub (packet->buffer, packet->offset, + (gst_rtcp_packet_get_length (packet) + 1) * 4); + + GST_BUFFER_TIMESTAMP (buffer) = running_time; + + g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL); +} + +gboolean +rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data) +{ + if (g_queue_find_custom (src->retained_feedback, data, func)) + return TRUE; + else + return FALSE; } diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index 94c22b3f5a..fadbe3068a 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -170,6 +170,8 @@ struct _RTPSource { RTPReceiverReport last_rr; GList *conflicting_addresses; + + GQueue *retained_feedback; }; struct _RTPSourceClass { @@ -248,7 +250,16 @@ void rtp_source_add_conflicting_address (RTPSource * src, void rtp_source_timeout (RTPSource * src, GstClockTime current_time, - GstClockTime collision_timeout); + GstClockTime collision_timeout, + GstClockTime feedback_retention_window); + +void rtp_source_retain_rtcp_packet (RTPSource * src, + GstRTCPPacket *pkt, + GstClockTime running_time); + +gboolean rtp_source_has_retained (RTPSource * src, + GCompareFunc func, + gconstpointer data); #endif /* __RTP_SOURCE_H__ */