diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index c29427243b..fd4ddb0fb7 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -100,11 +100,6 @@ enum (avg) = ((val) + (15 * (avg))) >> 4; -/* The number RTCP intervals after which to timeout entries in the - * collision table - */ -#define RTCP_INTERVAL_COLLISION_TIMEOUT 10 - /* GObject vmethods */ static void rtp_session_finalize (GObject * object); static void rtp_session_set_property (GObject * object, guint prop_id, @@ -546,6 +541,9 @@ rtp_session_finalize (GObject * object) gst_structure_free (sess->sdes); + g_list_free_full (sess->conflicting_addresses, + (GDestroyNotify) rtp_conflicting_address_free); + for (i = 0; i < 32; i++) g_hash_table_destroy (sess->ssrcs[i]); @@ -1207,6 +1205,43 @@ static RTPSourceCallbacks callbacks = { (RTPSourceClockRate) source_clock_rate, }; + +/** + * rtp_session_find_conflicting_address: + * @session: The session the packet came in + * @address: address to check for + * @time: The time when the packet that is possibly in conflict arrived + * + * Checks if an address which has a conflict is already known. If it is + * a known conflict, remember the time + * + * Returns: TRUE if it was a known conflict, FALSE otherwise + */ +static gboolean +rtp_session_find_conflicting_address (RTPSession * session, + GSocketAddress * address, GstClockTime time) +{ + return find_conflicting_address (session->conflicting_addresses, address, + time); +} + +/** + * rtp_session_add_conflicting_address: + * @session: The session the packet came in + * @address: address to remember + * @time: The time when the packet that is in conflict arrived + * + * Adds a new conflict address + */ +static void +rtp_session_add_conflicting_address (RTPSession * sess, + GSocketAddress * address, GstClockTime time) +{ + sess->conflicting_addresses = + add_conflicting_address (sess->conflicting_addresses, address, time); +} + + static gboolean check_collision (RTPSession * sess, RTPSource * source, RTPPacketInfo * pinfo, gboolean rtp) @@ -1292,7 +1327,7 @@ check_collision (RTPSession * sess, RTPSource * source, */ } else { /* This is sending with our ssrc, is it an address we already know */ - if (rtp_source_find_conflicting_address (source, pinfo->address, + if (rtp_session_find_conflicting_address (sess, pinfo->address, pinfo->current_time)) { /* Its a known conflict, its probably a loop, not a collision * lets just drop the incoming packet @@ -1300,7 +1335,7 @@ check_collision (RTPSession * sess, RTPSource * source, GST_DEBUG ("Our packets are being looped back to us, dropping"); } else { /* Its a new collision, lets change our SSRC */ - rtp_source_add_conflicting_address (source, pinfo->address, + rtp_session_add_conflicting_address (sess, pinfo->address, pinfo->current_time); GST_DEBUG ("Collision for SSRC %x", ssrc); @@ -3179,8 +3214,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) if (source->internal) { GST_DEBUG ("Timing out collisions for %x", source->ssrc); rtp_source_timeout (source, data->current_time, - /* "a relatively long time" -- RFC 3550 section 8.2 */ - RTP_STATS_MIN_INTERVAL * GST_SECOND * 10, data->running_time - sess->rtcp_feedback_retention_window); } @@ -3622,6 +3655,9 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, g_object_unref (source); } + sess->conflicting_addresses = + timeout_conflicting_addresses (sess->conflicting_addresses, current_time); + /* Make a local copy of the hashtable. We need to do this because the * cleanup stage below releases the session lock. */ table_copy = g_hash_table_new_full (NULL, NULL, NULL, diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 87388aec50..b567ee46a5 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -204,6 +204,7 @@ typedef struct { * @callbacks: callbacks * @user_data: user data passed in callbacks * @stats: session statistics + * @conflicting_addresses: GList of conflicting addresses * * The RTP session manager object */ @@ -269,6 +270,8 @@ struct _RTPSession { gboolean last_keyframe_all_headers; gboolean is_doing_ptp; + + GList *conflicting_addresses; }; /** diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 9a56955c6a..d47c85fb3e 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -267,11 +267,11 @@ rtp_source_init (RTPSource * src) rtp_source_reset (src); } -static void +void rtp_conflicting_address_free (RTPConflictingAddress * addr) { g_object_unref (addr->address); - g_free (addr); + g_slice_free (RTPConflictingAddress, addr); } static void @@ -292,10 +292,8 @@ rtp_source_finalize (GObject * object) gst_caps_replace (&src->caps, NULL); - g_list_foreach (src->conflicting_addresses, - (GFunc) rtp_conflicting_address_free, NULL); - g_list_free (src->conflicting_addresses); - + g_list_free_full (src->conflicting_addresses, + (GDestroyNotify) rtp_conflicting_address_free); while ((buffer = g_queue_pop_head (src->retained_feedback))) gst_buffer_unref (buffer); g_queue_free (src->retained_feedback); @@ -1600,6 +1598,67 @@ rtp_source_get_last_rb (RTPSource * src, guint8 * fractionlost, return TRUE; } +gboolean +find_conflicting_address (GList * conflicting_addresses, + GSocketAddress * address, GstClockTime time) +{ + GList *item; + + for (item = conflicting_addresses; item; item = g_list_next (item)) { + RTPConflictingAddress *known_conflict = item->data; + + if (__g_socket_address_equal (address, known_conflict->address)) { + known_conflict->time = time; + return TRUE; + } + } + + return FALSE; +} + +GList * +add_conflicting_address (GList * conflicting_addresses, + GSocketAddress * address, GstClockTime time) +{ + RTPConflictingAddress *new_conflict; + + new_conflict = g_slice_new (RTPConflictingAddress); + + new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address)); + new_conflict->time = time; + + return g_list_prepend (conflicting_addresses, new_conflict); +} + +GList * +timeout_conflicting_addresses (GList * conflicting_addresses, + GstClockTime current_time) +{ + GList *item; + /* "a relatively long time" -- RFC 3550 section 8.2 */ + const GstClockTime collision_timeout = + RTP_STATS_MIN_INTERVAL * GST_SECOND * 10; + + item = g_list_first (conflicting_addresses); + while (item) { + RTPConflictingAddress *known_conflict = item->data; + GList *next_item = g_list_next (item); + + if (known_conflict->time < current_time - collision_timeout) { + gchar *buf; + + conflicting_addresses = g_list_delete_link (conflicting_addresses, item); + buf = __g_socket_address_to_string (known_conflict->address); + GST_DEBUG ("collision %p timed out: %s", known_conflict, buf); + g_free (buf); + rtp_conflicting_address_free (known_conflict); + } + item = next_item; + } + + return conflicting_addresses; +} + /** * rtp_source_find_conflicting_address: * @src: The source the packet came in @@ -1615,19 +1674,7 @@ gboolean rtp_source_find_conflicting_address (RTPSource * src, GSocketAddress * address, GstClockTime time) { - GList *item; - - for (item = g_list_first (src->conflicting_addresses); - item; item = g_list_next (item)) { - RTPConflictingAddress *known_conflict = item->data; - - if (__g_socket_address_equal (address, known_conflict->address)) { - known_conflict->time = time; - return TRUE; - } - } - - return FALSE; + return find_conflicting_address (src->conflicting_addresses, address, time); } /** @@ -1642,22 +1689,14 @@ void rtp_source_add_conflicting_address (RTPSource * src, GSocketAddress * address, GstClockTime time) { - RTPConflictingAddress *new_conflict; - - new_conflict = g_new0 (RTPConflictingAddress, 1); - - new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address)); - new_conflict->time = time; - - src->conflicting_addresses = g_list_prepend (src->conflicting_addresses, - new_conflict); + src->conflicting_addresses = + add_conflicting_address (src->conflicting_addresses, address, time); } /** * rtp_source_timeout: * @src: The #RTPSource * @current_time: The current time - * @collision_timeout: The amount of time after which a collision is timed out * @feedback_retention_window: The running time before which retained feedback * packets have to be discarded * @@ -1666,29 +1705,12 @@ rtp_source_add_conflicting_address (RTPSource * src, */ void rtp_source_timeout (RTPSource * src, GstClockTime current_time, - GstClockTime collision_timeout, GstClockTime feedback_retention_window) + GstClockTime feedback_retention_window) { - GList *item; GstRTCPPacket *pkt; - item = g_list_first (src->conflicting_addresses); - while (item) { - RTPConflictingAddress *known_conflict = item->data; - GList *next_item = g_list_next (item); - - if (known_conflict->time < current_time - collision_timeout) { - gchar *buf; - - src->conflicting_addresses = - g_list_delete_link (src->conflicting_addresses, item); - buf = __g_socket_address_to_string (known_conflict->address); - GST_DEBUG ("collision %p timed out: %s", known_conflict, buf); - g_free (buf); - g_object_unref (known_conflict->address); - g_free (known_conflict); - } - item = next_item; - } + src->conflicting_addresses = + timeout_conflicting_addresses (src->conflicting_addresses, current_time); /* Time out AVPF packets that are older than the desired length */ while ((pkt = g_queue_peek_tail (src->retained_feedback)) && diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index 9af7d8acea..ed4adc9a91 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -266,9 +266,20 @@ void rtp_source_add_conflicting_address (RTPSource * src, GSocketAddress *address, GstClockTime time); +gboolean find_conflicting_address (GList * conflicting_address, + GSocketAddress * address, + GstClockTime time); + +GList * add_conflicting_address (GList * conflicting_addresses, + GSocketAddress * address, + GstClockTime time); +GList * timeout_conflicting_addresses (GList * conflicting_addresses, + GstClockTime current_time); + +void rtp_conflicting_address_free (RTPConflictingAddress * addr); + void rtp_source_timeout (RTPSource * src, GstClockTime current_time, - GstClockTime collision_timeout, GstClockTime feedback_retention_window); void rtp_source_retain_rtcp_packet (RTPSource * src, diff --git a/tests/check/elements/rtpcollision.c b/tests/check/elements/rtpcollision.c index 6c09ef19eb..e9528f9a5a 100644 --- a/tests/check/elements/rtpcollision.c +++ b/tests/check/elements/rtpcollision.c @@ -76,10 +76,10 @@ message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) } static GstBuffer * -create_rtcp_app (guint32 ssrc) +create_rtcp_app (guint32 ssrc, guint count) { GInetAddress *inet_addr_0; - guint16 port = 5678; + guint16 port = 5678 + count; GSocketAddress *socket_addr_0; GstBuffer *rtcp_buffer; GstRTCPPacket *rtcp_packet = NULL; @@ -140,7 +140,7 @@ rtpsession_sinkpad_probe (GstPad * pad, GstPadProbeInfo * info, * (note that after being marked as collied the rtpsession ignores * all non bye packets) */ - rtcp_buffer = create_rtcp_app (ssrc); + rtcp_buffer = create_rtcp_app (ssrc, nb_ssrc_changes); /* push collied packet on recv_rtcp_sink */ gst_pad_push (srcpad, rtcp_buffer); @@ -313,7 +313,7 @@ rtpsession_sinkpad_probe2 (GstPad * pad, GstPadProbeInfo * info, * all non bye packets) */ if (i == 2) { - GstBuffer *rtcp_buffer = create_rtcp_app (rtx_ssrc_before); + GstBuffer *rtcp_buffer = create_rtcp_app (rtx_ssrc_before, 0); /* push collied packet on recv_rtcp_sink */ gst_pad_push (srcpad, rtcp_buffer);