rtpsession: Keep local conflicting addresses in the session

As we now replace the local RTPSource on a conflict, it's no longer possible
to keep local conflicts in the RTPSource, so they instead need to be kept
in the RTPSession.

Also fix the rtpcollision test to generate multiple collisions instead of
one by change the address, as otherwise we detected that it was a single one.
This commit is contained in:
Olivier Crête 2014-05-03 18:30:20 -04:00
parent 9ea8b03946
commit 2e54d38dd0
5 changed files with 135 additions and 63 deletions

View file

@ -100,11 +100,6 @@ enum
(avg) = ((val) + (15 * (avg))) >> 4;
/* The number RTCP intervals after which to timeout entries in the
* collision table
*/
#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
/* GObject vmethods */
static void rtp_session_finalize (GObject * object);
static void rtp_session_set_property (GObject * object, guint prop_id,
@ -546,6 +541,9 @@ rtp_session_finalize (GObject * object)
gst_structure_free (sess->sdes);
g_list_free_full (sess->conflicting_addresses,
(GDestroyNotify) rtp_conflicting_address_free);
for (i = 0; i < 32; i++)
g_hash_table_destroy (sess->ssrcs[i]);
@ -1207,6 +1205,43 @@ static RTPSourceCallbacks callbacks = {
(RTPSourceClockRate) source_clock_rate,
};
/**
* rtp_session_find_conflicting_address:
* @session: The session the packet came in
* @address: address to check for
* @time: The time when the packet that is possibly in conflict arrived
*
* Checks if an address which has a conflict is already known. If it is
* a known conflict, remember the time
*
* Returns: TRUE if it was a known conflict, FALSE otherwise
*/
static gboolean
rtp_session_find_conflicting_address (RTPSession * session,
GSocketAddress * address, GstClockTime time)
{
return find_conflicting_address (session->conflicting_addresses, address,
time);
}
/**
* rtp_session_add_conflicting_address:
* @session: The session the packet came in
* @address: address to remember
* @time: The time when the packet that is in conflict arrived
*
* Adds a new conflict address
*/
static void
rtp_session_add_conflicting_address (RTPSession * sess,
GSocketAddress * address, GstClockTime time)
{
sess->conflicting_addresses =
add_conflicting_address (sess->conflicting_addresses, address, time);
}
static gboolean
check_collision (RTPSession * sess, RTPSource * source,
RTPPacketInfo * pinfo, gboolean rtp)
@ -1292,7 +1327,7 @@ check_collision (RTPSession * sess, RTPSource * source,
*/
} else {
/* This is sending with our ssrc, is it an address we already know */
if (rtp_source_find_conflicting_address (source, pinfo->address,
if (rtp_session_find_conflicting_address (sess, pinfo->address,
pinfo->current_time)) {
/* Its a known conflict, its probably a loop, not a collision
* lets just drop the incoming packet
@ -1300,7 +1335,7 @@ check_collision (RTPSession * sess, RTPSource * source,
GST_DEBUG ("Our packets are being looped back to us, dropping");
} else {
/* Its a new collision, lets change our SSRC */
rtp_source_add_conflicting_address (source, pinfo->address,
rtp_session_add_conflicting_address (sess, pinfo->address,
pinfo->current_time);
GST_DEBUG ("Collision for SSRC %x", ssrc);
@ -3179,8 +3214,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
if (source->internal) {
GST_DEBUG ("Timing out collisions for %x", source->ssrc);
rtp_source_timeout (source, data->current_time,
/* "a relatively long time" -- RFC 3550 section 8.2 */
RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
data->running_time - sess->rtcp_feedback_retention_window);
}
@ -3622,6 +3655,9 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
g_object_unref (source);
}
sess->conflicting_addresses =
timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
/* Make a local copy of the hashtable. We need to do this because the
* cleanup stage below releases the session lock. */
table_copy = g_hash_table_new_full (NULL, NULL, NULL,

View file

@ -204,6 +204,7 @@ typedef struct {
* @callbacks: callbacks
* @user_data: user data passed in callbacks
* @stats: session statistics
* @conflicting_addresses: GList of conflicting addresses
*
* The RTP session manager object
*/
@ -269,6 +270,8 @@ struct _RTPSession {
gboolean last_keyframe_all_headers;
gboolean is_doing_ptp;
GList *conflicting_addresses;
};
/**

View file

@ -267,11 +267,11 @@ rtp_source_init (RTPSource * src)
rtp_source_reset (src);
}
static void
void
rtp_conflicting_address_free (RTPConflictingAddress * addr)
{
g_object_unref (addr->address);
g_free (addr);
g_slice_free (RTPConflictingAddress, addr);
}
static void
@ -292,10 +292,8 @@ rtp_source_finalize (GObject * object)
gst_caps_replace (&src->caps, NULL);
g_list_foreach (src->conflicting_addresses,
(GFunc) rtp_conflicting_address_free, NULL);
g_list_free (src->conflicting_addresses);
g_list_free_full (src->conflicting_addresses,
(GDestroyNotify) rtp_conflicting_address_free);
while ((buffer = g_queue_pop_head (src->retained_feedback)))
gst_buffer_unref (buffer);
g_queue_free (src->retained_feedback);
@ -1600,6 +1598,67 @@ rtp_source_get_last_rb (RTPSource * src, guint8 * fractionlost,
return TRUE;
}
gboolean
find_conflicting_address (GList * conflicting_addresses,
GSocketAddress * address, GstClockTime time)
{
GList *item;
for (item = conflicting_addresses; item; item = g_list_next (item)) {
RTPConflictingAddress *known_conflict = item->data;
if (__g_socket_address_equal (address, known_conflict->address)) {
known_conflict->time = time;
return TRUE;
}
}
return FALSE;
}
GList *
add_conflicting_address (GList * conflicting_addresses,
GSocketAddress * address, GstClockTime time)
{
RTPConflictingAddress *new_conflict;
new_conflict = g_slice_new (RTPConflictingAddress);
new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address));
new_conflict->time = time;
return g_list_prepend (conflicting_addresses, new_conflict);
}
GList *
timeout_conflicting_addresses (GList * conflicting_addresses,
GstClockTime current_time)
{
GList *item;
/* "a relatively long time" -- RFC 3550 section 8.2 */
const GstClockTime collision_timeout =
RTP_STATS_MIN_INTERVAL * GST_SECOND * 10;
item = g_list_first (conflicting_addresses);
while (item) {
RTPConflictingAddress *known_conflict = item->data;
GList *next_item = g_list_next (item);
if (known_conflict->time < current_time - collision_timeout) {
gchar *buf;
conflicting_addresses = g_list_delete_link (conflicting_addresses, item);
buf = __g_socket_address_to_string (known_conflict->address);
GST_DEBUG ("collision %p timed out: %s", known_conflict, buf);
g_free (buf);
rtp_conflicting_address_free (known_conflict);
}
item = next_item;
}
return conflicting_addresses;
}
/**
* rtp_source_find_conflicting_address:
* @src: The source the packet came in
@ -1615,19 +1674,7 @@ gboolean
rtp_source_find_conflicting_address (RTPSource * src, GSocketAddress * address,
GstClockTime time)
{
GList *item;
for (item = g_list_first (src->conflicting_addresses);
item; item = g_list_next (item)) {
RTPConflictingAddress *known_conflict = item->data;
if (__g_socket_address_equal (address, known_conflict->address)) {
known_conflict->time = time;
return TRUE;
}
}
return FALSE;
return find_conflicting_address (src->conflicting_addresses, address, time);
}
/**
@ -1642,22 +1689,14 @@ void
rtp_source_add_conflicting_address (RTPSource * src,
GSocketAddress * address, GstClockTime time)
{
RTPConflictingAddress *new_conflict;
new_conflict = g_new0 (RTPConflictingAddress, 1);
new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address));
new_conflict->time = time;
src->conflicting_addresses = g_list_prepend (src->conflicting_addresses,
new_conflict);
src->conflicting_addresses =
add_conflicting_address (src->conflicting_addresses, address, time);
}
/**
* rtp_source_timeout:
* @src: The #RTPSource
* @current_time: The current time
* @collision_timeout: The amount of time after which a collision is timed out
* @feedback_retention_window: The running time before which retained feedback
* packets have to be discarded
*
@ -1666,29 +1705,12 @@ rtp_source_add_conflicting_address (RTPSource * src,
*/
void
rtp_source_timeout (RTPSource * src, GstClockTime current_time,
GstClockTime collision_timeout, GstClockTime feedback_retention_window)
GstClockTime feedback_retention_window)
{
GList *item;
GstRTCPPacket *pkt;
item = g_list_first (src->conflicting_addresses);
while (item) {
RTPConflictingAddress *known_conflict = item->data;
GList *next_item = g_list_next (item);
if (known_conflict->time < current_time - collision_timeout) {
gchar *buf;
src->conflicting_addresses =
g_list_delete_link (src->conflicting_addresses, item);
buf = __g_socket_address_to_string (known_conflict->address);
GST_DEBUG ("collision %p timed out: %s", known_conflict, buf);
g_free (buf);
g_object_unref (known_conflict->address);
g_free (known_conflict);
}
item = next_item;
}
src->conflicting_addresses =
timeout_conflicting_addresses (src->conflicting_addresses, current_time);
/* Time out AVPF packets that are older than the desired length */
while ((pkt = g_queue_peek_tail (src->retained_feedback)) &&

View file

@ -266,9 +266,20 @@ void rtp_source_add_conflicting_address (RTPSource * src,
GSocketAddress *address,
GstClockTime time);
gboolean find_conflicting_address (GList * conflicting_address,
GSocketAddress * address,
GstClockTime time);
GList * add_conflicting_address (GList * conflicting_addresses,
GSocketAddress * address,
GstClockTime time);
GList * timeout_conflicting_addresses (GList * conflicting_addresses,
GstClockTime current_time);
void rtp_conflicting_address_free (RTPConflictingAddress * addr);
void rtp_source_timeout (RTPSource * src,
GstClockTime current_time,
GstClockTime collision_timeout,
GstClockTime feedback_retention_window);
void rtp_source_retain_rtcp_packet (RTPSource * src,

View file

@ -76,10 +76,10 @@ message_received (GstBus * bus, GstMessage * message, GstPipeline * bin)
}
static GstBuffer *
create_rtcp_app (guint32 ssrc)
create_rtcp_app (guint32 ssrc, guint count)
{
GInetAddress *inet_addr_0;
guint16 port = 5678;
guint16 port = 5678 + count;
GSocketAddress *socket_addr_0;
GstBuffer *rtcp_buffer;
GstRTCPPacket *rtcp_packet = NULL;
@ -140,7 +140,7 @@ rtpsession_sinkpad_probe (GstPad * pad, GstPadProbeInfo * info,
* (note that after being marked as collied the rtpsession ignores
* all non bye packets)
*/
rtcp_buffer = create_rtcp_app (ssrc);
rtcp_buffer = create_rtcp_app (ssrc, nb_ssrc_changes);
/* push collied packet on recv_rtcp_sink */
gst_pad_push (srcpad, rtcp_buffer);
@ -313,7 +313,7 @@ rtpsession_sinkpad_probe2 (GstPad * pad, GstPadProbeInfo * info,
* all non bye packets)
*/
if (i == 2) {
GstBuffer *rtcp_buffer = create_rtcp_app (rtx_ssrc_before);
GstBuffer *rtcp_buffer = create_rtcp_app (rtx_ssrc_before, 0);
/* push collied packet on recv_rtcp_sink */
gst_pad_push (srcpad, rtcp_buffer);