rtpmanager: Port to GIO

This commit is contained in:
Sebastian Dröge 2012-01-17 13:08:42 +01:00
parent a3496b14ea
commit cb789e32ad
5 changed files with 127 additions and 63 deletions

View file

@ -1189,27 +1189,24 @@ check_collision (RTPSession * sess, RTPSource * source,
RTPArrivalStats * arrival, gboolean rtp)
{
/* If we have no arrival address, we can't do collision checking */
if (!arrival->have_address)
if (!arrival->address)
return FALSE;
if (sess->source != source) {
GstNetAddress *from;
gboolean have_from;
GSocketAddress *from;
/* This is not our local source, but lets check if two remote
* source collide
*/
if (rtp) {
from = &source->rtp_from;
have_from = source->have_rtp_from;
from = source->rtp_from;
} else {
from = &source->rtcp_from;
have_from = source->have_rtcp_from;
from = source->rtcp_from;
}
if (have_from) {
if (gst_net_address_equal (from, &arrival->address)) {
if (from) {
if (__g_socket_address_equal (from, arrival->address)) {
/* Address is the same */
return FALSE;
} else {
@ -1217,14 +1214,17 @@ check_collision (RTPSession * sess, RTPSource * source,
rtp_source_get_ssrc (source));
if (sess->favor_new) {
if (rtp_source_find_conflicting_address (source,
&arrival->address, arrival->current_time)) {
gchar buf1[40];
gst_net_address_to_string (&arrival->address, buf1, 40);
arrival->address, arrival->current_time)) {
gchar *buf1;
buf1 = __g_socket_address_to_string (arrival->address);
GST_LOG ("Known conflict on %x for %s, dropping packet",
rtp_source_get_ssrc (source), buf1);
g_free (buf1);
return TRUE;
} else {
gchar buf1[40], buf2[40];
gchar *buf1, *buf2;
/* Current address is not a known conflict, lets assume this is
* a new source. Save old address in possible conflict list
@ -1232,16 +1232,21 @@ check_collision (RTPSession * sess, RTPSource * source,
rtp_source_add_conflicting_address (source, from,
arrival->current_time);
gst_net_address_to_string (from, buf1, 40);
gst_net_address_to_string (&arrival->address, buf2, 40);
buf1 = __g_socket_address_to_string (from);
buf2 = __g_socket_address_to_string (arrival->address);
GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
" saving old as known conflict",
rtp_source_get_ssrc (source), buf1, buf2);
if (rtp)
rtp_source_set_rtp_from (source, &arrival->address);
rtp_source_set_rtp_from (source, arrival->address);
else
rtp_source_set_rtcp_from (source, &arrival->address);
rtp_source_set_rtcp_from (source, arrival->address);
g_free (buf1);
g_free (buf2);
return FALSE;
}
} else {
@ -1252,9 +1257,9 @@ check_collision (RTPSession * sess, RTPSource * source,
} else {
/* We don't already have a from address for RTP, just set it */
if (rtp)
rtp_source_set_rtp_from (source, &arrival->address);
rtp_source_set_rtp_from (source, arrival->address);
else
rtp_source_set_rtcp_from (source, &arrival->address);
rtp_source_set_rtcp_from (source, arrival->address);
return FALSE;
}
@ -1272,11 +1277,11 @@ check_collision (RTPSession * sess, RTPSource * source,
if (inactivity_period > 1 * GST_SECOND) {
/* Use new network address */
if (rtp) {
g_assert (source->have_rtp_from);
rtp_source_set_rtp_from (source, &arrival->address);
g_assert (source->rtp_from);
rtp_source_set_rtp_from (source, arrival->address);
} else {
g_assert (source->have_rtcp_from);
rtp_source_set_rtcp_from (source, &arrival->address);
g_assert (source->rtcp_from);
rtp_source_set_rtcp_from (source, arrival->address);
}
return FALSE;
}
@ -1284,7 +1289,7 @@ check_collision (RTPSession * sess, RTPSource * source,
} else {
/* This is sending with our ssrc, is it an address we already know */
if (rtp_source_find_conflicting_address (source, &arrival->address,
if (rtp_source_find_conflicting_address (source, arrival->address,
arrival->current_time)) {
/* Its a known conflict, its probably a loop, not a collision
* lets just drop the incoming packet
@ -1293,7 +1298,7 @@ check_collision (RTPSession * sess, RTPSource * source,
} else {
/* Its a new collision, lets change our SSRC */
rtp_source_add_conflicting_address (source, &arrival->address,
rtp_source_add_conflicting_address (source, arrival->address,
arrival->current_time);
GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
@ -1333,11 +1338,11 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
source->probation = 0;
/* store from address, if any */
if (arrival->have_address) {
if (arrival->address) {
if (rtp)
rtp_source_set_rtp_from (source, &arrival->address);
rtp_source_set_rtp_from (source, arrival->address);
else
rtp_source_set_rtcp_from (source, &arrival->address);
rtp_source_set_rtcp_from (source, arrival->address);
}
/* configure a callback on the source */
@ -1649,11 +1654,12 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
/* for netbuffer we can store the IP address to check for collisions */
meta = gst_buffer_get_net_address_meta (buffer);
if (arrival->address)
g_object_unref (arrival->address);
if (meta) {
arrival->have_address = TRUE;
memcpy (&arrival->address, &meta->naddr, sizeof (GstNetAddress));
arrival->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
} else {
arrival->have_address = FALSE;
arrival->address = NULL;
}
}
@ -2373,6 +2379,9 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
sess->stats.avg_rtcp_packet_size, arrival.bytes);
RTP_SESSION_UNLOCK (sess);
if (arrival.address)
g_object_unref (arrival.address);
/* notify caller of sr packets in the callback */
if (do_sync && sess->callbacks.sync_rtcp) {
/* make writable, we might want to change the buffer */

View file

@ -243,6 +243,13 @@ rtp_source_init (RTPSource * src)
rtp_source_reset (src);
}
static void
rtp_conflicting_address_free (RTPConflictingAddress * addr)
{
g_object_unref (addr->address);
g_free (addr);
}
static void
rtp_source_finalize (GObject * object)
{
@ -261,13 +268,19 @@ rtp_source_finalize (GObject * object)
gst_caps_replace (&src->caps, NULL);
g_list_foreach (src->conflicting_addresses, (GFunc) g_free, NULL);
g_list_foreach (src->conflicting_addresses,
(GFunc) rtp_conflicting_address_free, NULL);
g_list_free (src->conflicting_addresses);
while ((buffer = g_queue_pop_head (src->retained_feedback)))
gst_buffer_unref (buffer);
g_queue_free (src->retained_feedback);
if (src->rtp_from)
g_object_unref (src->rtp_from);
if (src->rtcp_from)
g_object_unref (src->rtcp_from);
G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object);
}
@ -277,7 +290,7 @@ rtp_source_create_stats (RTPSource * src)
GstStructure *s;
gboolean is_sender = src->is_sender;
gboolean internal = src->internal;
gchar address_str[GST_NETADDRESS_MAX_LEN];
gchar *address_str;
gboolean have_rb;
guint8 fractionlost = 0;
gint32 packetslost = 0;
@ -306,15 +319,15 @@ rtp_source_create_stats (RTPSource * src)
"clock-rate", G_TYPE_INT, src->clock_rate, NULL);
/* add address and port */
if (src->have_rtp_from) {
gst_net_address_to_string (&src->rtp_from, address_str,
sizeof (address_str));
if (src->rtp_from) {
address_str = __g_socket_address_to_string (src->rtp_from);
gst_structure_set (s, "rtp-from", G_TYPE_STRING, address_str, NULL);
g_free (address_str);
}
if (src->have_rtcp_from) {
gst_net_address_to_string (&src->rtcp_from, address_str,
sizeof (address_str));
if (src->rtcp_from) {
address_str = __g_socket_address_to_string (src->rtcp_from);
gst_structure_set (s, "rtcp-from", G_TYPE_STRING, address_str, NULL);
g_free (address_str);
}
gst_structure_set (s,
@ -805,12 +818,13 @@ rtp_source_get_sdes_string (RTPSource * src, GstRTCPSDESType type)
* collistion checking.
*/
void
rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address)
rtp_source_set_rtp_from (RTPSource * src, GSocketAddress * address)
{
g_return_if_fail (RTP_IS_SOURCE (src));
src->have_rtp_from = TRUE;
memcpy (&src->rtp_from, address, sizeof (GstNetAddress));
if (src->rtp_from)
g_object_unref (src->rtp_from);
src->rtp_from = G_SOCKET_ADDRESS (g_object_ref (address));
}
/**
@ -822,12 +836,13 @@ rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address)
* collistion checking.
*/
void
rtp_source_set_rtcp_from (RTPSource * src, GstNetAddress * address)
rtp_source_set_rtcp_from (RTPSource * src, GSocketAddress * address)
{
g_return_if_fail (RTP_IS_SOURCE (src));
src->have_rtcp_from = TRUE;
memcpy (&src->rtcp_from, address, sizeof (GstNetAddress));
if (src->rtcp_from)
g_object_unref (src->rtcp_from);
src->rtcp_from = G_SOCKET_ADDRESS (g_object_ref (address));
}
static GstFlowReturn
@ -1700,7 +1715,7 @@ rtp_source_get_last_rb (RTPSource * src, guint8 * fractionlost,
* Returns: TRUE if it was a known conflict, FALSE otherwise
*/
gboolean
rtp_source_find_conflicting_address (RTPSource * src, GstNetAddress * address,
rtp_source_find_conflicting_address (RTPSource * src, GSocketAddress * address,
GstClockTime time)
{
GList *item;
@ -1709,7 +1724,7 @@ rtp_source_find_conflicting_address (RTPSource * src, GstNetAddress * address,
item; item = g_list_next (item)) {
RTPConflictingAddress *known_conflict = item->data;
if (gst_net_address_equal (address, &known_conflict->address)) {
if (__g_socket_address_equal (address, known_conflict->address)) {
known_conflict->time = time;
return TRUE;
}
@ -1728,13 +1743,13 @@ rtp_source_find_conflicting_address (RTPSource * src, GstNetAddress * address,
*/
void
rtp_source_add_conflicting_address (RTPSource * src,
GstNetAddress * address, GstClockTime time)
GSocketAddress * address, GstClockTime time)
{
RTPConflictingAddress *new_conflict;
new_conflict = g_new0 (RTPConflictingAddress, 1);
memcpy (&new_conflict->address, address, sizeof (GstNetAddress));
new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address));
new_conflict->time = time;
src->conflicting_addresses = g_list_prepend (src->conflicting_addresses,
@ -1765,12 +1780,14 @@ rtp_source_timeout (RTPSource * src, GstClockTime current_time,
GList *next_item = g_list_next (item);
if (known_conflict->time < current_time - collision_timeout) {
gchar buf[40];
gchar *buf;
src->conflicting_addresses =
g_list_delete_link (src->conflicting_addresses, item);
gst_net_address_to_string (&known_conflict->address, buf, 40);
buf = __g_socket_address_to_string (known_conflict->address);
GST_DEBUG ("collision %p timed out: %s", known_conflict, buf);
g_free (buf);
g_object_unref (known_conflict->address);
g_free (known_conflict);
}
item = next_item;

View file

@ -23,6 +23,7 @@
#include <gst/gst.h>
#include <gst/rtp/gstrtcpbuffer.h>
#include <gst/net/gstnetaddressmeta.h>
#include <gio/gio.h>
#include "rtpstats.h"
@ -102,14 +103,14 @@ typedef struct {
/**
* RTPConflictingAddress:
* @address: #GstNetAddress which conflicted
* @address: #GSocketAddress which conflicted
* @last_conflict_time: time when the last conflict was seen
*
* This structure is used to account for addresses that have conflicted to find
* loops.
*/
typedef struct {
GstNetAddress address;
GSocketAddress *address;
GstClockTime time;
} RTPConflictingAddress;
@ -138,10 +139,8 @@ struct _RTPSource {
gboolean received_bye;
gchar *bye_reason;
gboolean have_rtp_from;
GstNetAddress rtp_from;
gboolean have_rtcp_from;
GstNetAddress rtcp_from;
GSocketAddress *rtp_from;
GSocketAddress *rtcp_from;
gint payload;
GstCaps *caps;
@ -213,8 +212,8 @@ const GstStructure *
gboolean rtp_source_set_sdes_struct (RTPSource * src, GstStructure *sdes);
/* handling network address */
void rtp_source_set_rtp_from (RTPSource *src, GstNetAddress *address);
void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *address);
void rtp_source_set_rtp_from (RTPSource *src, GSocketAddress *address);
void rtp_source_set_rtcp_from (RTPSource *src, GSocketAddress *address);
/* handling RTP */
GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival);
@ -246,11 +245,11 @@ gboolean rtp_source_get_last_rb (RTPSource *src, guint8 *fraction
void rtp_source_reset (RTPSource * src);
gboolean rtp_source_find_conflicting_address (RTPSource * src,
GstNetAddress *address,
GSocketAddress *address,
GstClockTime time);
void rtp_source_add_conflicting_address (RTPSource * src,
GstNetAddress *address,
GSocketAddress *address,
GstClockTime time);
void rtp_source_timeout (RTPSource * src,

View file

@ -292,3 +292,37 @@ rtp_stats_set_min_interval (RTPSessionStats * stats, gdouble min_interval)
{
stats->min_interval = min_interval;
}
gboolean
__g_socket_address_equal (GSocketAddress * a, GSocketAddress * b)
{
GInetSocketAddress *ia, *ib;
GInetAddress *iaa, *iab;
ia = G_INET_SOCKET_ADDRESS (a);
ib = G_INET_SOCKET_ADDRESS (b);
if (g_inet_socket_address_get_port (ia) !=
g_inet_socket_address_get_port (ib))
return FALSE;
iaa = g_inet_socket_address_get_address (ia);
iab = g_inet_socket_address_get_address (ib);
return g_inet_address_equal (iaa, iab);
}
gchar *
__g_socket_address_to_string (GSocketAddress * addr)
{
GInetSocketAddress *ia;
gchar *ret, *tmp;
ia = G_INET_SOCKET_ADDRESS (addr);
tmp = g_inet_address_to_string (g_inet_socket_address_get_address (ia));
ret = g_strdup_printf ("%s:%u", tmp, g_inet_socket_address_get_port (ia));
g_free (tmp);
return ret;
}

View file

@ -22,6 +22,7 @@
#include <gst/gst.h>
#include <gst/net/gstnetaddressmeta.h>
#include <gio/gio.h>
/**
* RTPSenderReport:
@ -70,8 +71,7 @@ typedef struct {
GstClockTime current_time;
GstClockTime running_time;
guint64 ntpnstime;
gboolean have_address;
GstNetAddress address;
GSocketAddress *address;
guint bytes;
guint payload_len;
} RTPArrivalStats;
@ -200,4 +200,9 @@ gint64 rtp_stats_get_packets_lost (const RTPSourceStats *stats
void rtp_stats_set_min_interval (RTPSessionStats *stats,
gdouble min_interval);
gboolean __g_socket_address_equal (GSocketAddress *a, GSocketAddress *b);
gchar * __g_socket_address_to_string (GSocketAddress * addr);
#endif /* __RTP_STATS_H__ */