multifdsink: use refcount to count host/port duplicates

Instead of adding multiple client structures for the same host/port pair, use a
refcount.
Add a send-duplicates feature that allows you to disable sending multiple copies
of the same packet to the same host when it was added multiple times. The
send-duplicates property is by default set to TRUE for backwards compatibility
although it is very likely that this is not desired behaviour.
This commit is contained in:
Wim Taymans 2010-08-20 15:35:27 +02:00
parent e4f8144bbf
commit 6660817af1
2 changed files with 166 additions and 84 deletions

View file

@ -79,6 +79,7 @@ enum
#define DEFAULT_TTL_MC 1 #define DEFAULT_TTL_MC 1
#define DEFAULT_LOOP TRUE #define DEFAULT_LOOP TRUE
#define DEFAULT_QOS_DSCP -1 #define DEFAULT_QOS_DSCP -1
#define DEFAULT_SEND_DUPLICATES TRUE
enum enum
{ {
@ -94,6 +95,7 @@ enum
PROP_TTL_MC, PROP_TTL_MC,
PROP_LOOP, PROP_LOOP,
PROP_QOS_DSCP, PROP_QOS_DSCP,
PROP_SEND_DUPLICATES,
PROP_LAST PROP_LAST
}; };
@ -131,8 +133,6 @@ static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
gboolean lock); gboolean lock);
static void free_client (GstUDPClient * client);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 }; static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };
@ -201,7 +201,13 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
* @port: the port of the client to add * @port: the port of the client to add
* *
* Add a client with destination @host and @port to the list of * Add a client with destination @host and @port to the list of
* clients. * clients. When the same host/port pair is added multiple times, the
* send-duplicates property defines if the packets are sent multiple times to
* the same host/port pair or not.
*
* When a host/port pair is added multiple times, an equal amount of remove
* calls must be performed to actually remove the host/port pair from the list
* of destinations.
*/ */
gst_multiudpsink_signals[SIGNAL_ADD] = gst_multiudpsink_signals[SIGNAL_ADD] =
g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
@ -322,6 +328,19 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
g_param_spec_int ("qos-dscp", "QoS diff srv code point", g_param_spec_int ("qos-dscp", "QoS diff srv code point",
"Quality of Service, differentiated services code point (-1 default)", "Quality of Service, differentiated services code point (-1 default)",
-1, 63, DEFAULT_QOS_DSCP, G_PARAM_READWRITE)); -1, 63, DEFAULT_QOS_DSCP, G_PARAM_READWRITE));
/**
* GstMultiUDPSink::send-duplicates
*
* When a host/port pair is added mutliple times, send the packet to the host
* multiple times as well.
*
* Since: 0.10.26
*/
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
g_param_spec_boolean ("send-duplicates", "Send Duplicates",
"When a distination/port pair is added multiple times, send packets "
"multiple times as well", DEFAULT_SEND_DUPLICATES,
G_PARAM_READWRITE));
gstelement_class->change_state = gst_multiudpsink_change_state; gstelement_class->change_state = gst_multiudpsink_change_state;
@ -354,6 +373,36 @@ gst_multiudpsink_init (GstMultiUDPSink * sink)
sink->loop = DEFAULT_LOOP; sink->loop = DEFAULT_LOOP;
sink->qos_dscp = DEFAULT_QOS_DSCP; sink->qos_dscp = DEFAULT_QOS_DSCP;
sink->ss_family = DEFAULT_FAMILY; sink->ss_family = DEFAULT_FAMILY;
sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
}
static GstUDPClient *
create_client (GstMultiUDPSink * sink, const gchar * host, gint port)
{
GstUDPClient *client;
client = g_slice_new0 (GstUDPClient);
client->refcount = 1;
client->host = g_strdup (host);
client->port = port;
return client;
}
static void
free_client (GstUDPClient * client)
{
g_free (client->host);
g_slice_free (GstUDPClient, client);
}
static gint
client_compare (GstUDPClient * a, GstUDPClient * b)
{
if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
return 0;
return 1;
} }
static void static void
@ -444,38 +493,43 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
for (clients = sink->clients; clients; clients = g_list_next (clients)) { for (clients = sink->clients; clients; clients = g_list_next (clients)) {
GstUDPClient *client; GstUDPClient *client;
gint count;
client = (GstUDPClient *) clients->data; client = (GstUDPClient *) clients->data;
no_clients++; no_clients++;
GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
while (TRUE) { count = sink->send_duplicates ? client->refcount : 1;
len = gst_udp_get_sockaddr_length (&client->theiraddr);
ret = sendto (*client->sock, while (count--) {
while (TRUE) {
len = gst_udp_get_sockaddr_length (&client->theiraddr);
ret = sendto (*client->sock,
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
(char *) data, (char *) data,
#else #else
data, data,
#endif #endif
size, 0, (struct sockaddr *) &client->theiraddr, len); size, 0, (struct sockaddr *) &client->theiraddr, len);
if (ret < 0) { if (ret < 0) {
/* some error, just warn, it's likely recoverable and we don't want to /* some error, just warn, it's likely recoverable and we don't want to
* break streaming. We break so that we stop retrying for this client. */ * break streaming. We break so that we stop retrying for this client. */
if (!socket_error_is_ignorable ()) { if (!socket_error_is_ignorable ()) {
gchar *errormessage = socket_last_error_message (); gchar *errormessage = socket_last_error_message ();
GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client,
socket_last_error_code (), errormessage); socket_last_error_code (), errormessage);
g_free (errormessage); g_free (errormessage);
break;
}
} else {
num++;
client->bytes_sent += ret;
client->packets_sent++;
sink->bytes_served += ret;
break; break;
} }
} else {
num++;
client->bytes_sent += ret;
client->packets_sent++;
sink->bytes_served += ret;
break;
} }
} }
} }
@ -535,26 +589,31 @@ gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list)
for (clients = sink->clients; clients; clients = g_list_next (clients)) { for (clients = sink->clients; clients; clients = g_list_next (clients)) {
GstUDPClient *client; GstUDPClient *client;
gint count;
client = (GstUDPClient *) clients->data; client = (GstUDPClient *) clients->data;
no_clients++; no_clients++;
GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
while (TRUE) { count = sink->send_duplicates ? client->refcount : 1;
msg.msg_name = (void *) &client->theiraddr;
msg.msg_namelen = sizeof (client->theiraddr);
ret = sendmsg (*client->sock, &msg, 0);
if (ret < 0) { while (count--) {
if (!socket_error_is_ignorable ()) { while (TRUE) {
msg.msg_name = (void *) &client->theiraddr;
msg.msg_namelen = sizeof (client->theiraddr);
ret = sendmsg (*client->sock, &msg, 0);
if (ret < 0) {
if (!socket_error_is_ignorable ()) {
break;
}
} else {
num++;
client->bytes_sent += ret;
client->packets_sent++;
sink->bytes_served += ret;
break; break;
} }
} else {
num++;
client->bytes_sent += ret;
client->packets_sent++;
sink->bytes_served += ret;
break;
} }
} }
} }
@ -619,13 +678,17 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
clients = sink->clients; clients = sink->clients;
while (clients) { while (clients) {
GstUDPClient *client; GstUDPClient *client;
gint count;
client = (GstUDPClient *) clients->data; client = (GstUDPClient *) clients->data;
clients = g_list_next (clients); clients = g_list_next (clients);
g_string_append_printf (str, "%s:%d%s", client->host, client->port, count = client->refcount;
(clients ? "," : "")); while (count--) {
g_string_append_printf (str, "%s:%d%s", client->host, client->port,
(clients || count > 1 ? "," : ""));
}
} }
g_mutex_unlock (sink->client_lock); g_mutex_unlock (sink->client_lock);
@ -702,6 +765,9 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id,
udpsink->qos_dscp = g_value_get_int (value); udpsink->qos_dscp = g_value_get_int (value);
gst_multiudpsink_setup_qos_dscp (udpsink); gst_multiudpsink_setup_qos_dscp (udpsink);
break; break;
case PROP_SEND_DUPLICATES:
udpsink->send_duplicates = g_value_get_boolean (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -751,6 +817,9 @@ gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_QOS_DSCP: case PROP_QOS_DSCP:
g_value_set_int (value, udpsink->qos_dscp); g_value_set_int (value, udpsink->qos_dscp);
break; break;
case PROP_SEND_DUPLICATES:
g_value_set_boolean (value, udpsink->send_duplicates);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -921,27 +990,45 @@ gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
gint port, gboolean lock) gint port, gboolean lock)
{ {
GstUDPClient *client; GstUDPClient *client;
GstUDPClient udpclient;
GTimeVal now; GTimeVal now;
GList *find;
udpclient.host = (gchar *) host;
udpclient.port = port;
GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port); GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
client = g_new0 (GstUDPClient, 1);
client->host = g_strdup (host);
client->port = port;
client->sock = &sink->sock;
if (gst_udp_get_addr (host, port, &client->theiraddr) < 0)
goto getaddrinfo_error;
g_get_current_time (&now);
client->connect_time = GST_TIMEVAL_TO_TIME (now);
if (*client->sock > 0) {
gst_multiudpsink_configure_client (sink, client);
}
if (lock) if (lock)
g_mutex_lock (sink->client_lock); g_mutex_lock (sink->client_lock);
sink->clients = g_list_prepend (sink->clients, client);
find = g_list_find_custom (sink->clients, &udpclient,
(GCompareFunc) client_compare);
if (find) {
client = (GstUDPClient *) find->data;
GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
client->refcount, host, port);
client->refcount++;
} else {
client = create_client (sink, host, port);
client->sock = &sink->sock;
if (gst_udp_get_addr (host, port, &client->theiraddr) < 0)
goto getaddrinfo_error;
g_get_current_time (&now);
client->connect_time = GST_TIMEVAL_TO_TIME (now);
if (*client->sock > 0) {
gst_multiudpsink_configure_client (sink, client);
}
GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
sink->clients = g_list_prepend (sink->clients, client);
}
if (lock) if (lock)
g_mutex_unlock (sink->client_lock); g_mutex_unlock (sink->client_lock);
@ -957,8 +1044,9 @@ getaddrinfo_error:
GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host, GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
port); port);
GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?"); GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?");
g_free (client->host); free_client (client);
g_free (client); if (lock)
g_mutex_unlock (sink->client_lock);
return; return;
} }
} }
@ -969,22 +1057,6 @@ gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
gst_multiudpsink_add_internal (sink, host, port, TRUE); gst_multiudpsink_add_internal (sink, host, port, TRUE);
} }
static gint
client_compare (GstUDPClient * a, GstUDPClient * b)
{
if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
return 0;
return 1;
}
static void
free_client (GstUDPClient * client)
{
g_free (client->host);
g_free (client);
}
void void
gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port) gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
{ {
@ -1002,26 +1074,32 @@ gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
if (!find) if (!find)
goto not_found; goto not_found;
GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
client = (GstUDPClient *) find->data; client = (GstUDPClient *) find->data;
g_get_current_time (&now); GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
client->disconnect_time = GST_TIMEVAL_TO_TIME (now); client->refcount, host, port);
if (*(client->sock) != -1 && sink->auto_multicast client->refcount--;
&& gst_udp_is_multicast (&client->theiraddr)) if (client->refcount == 0) {
gst_udp_leave_group (*(client->sock), &client->theiraddr); GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
/* Unlock to emit signal before we delete the actual client */ g_get_current_time (&now);
g_mutex_unlock (sink->client_lock); client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
g_signal_emit (G_OBJECT (sink),
gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
g_mutex_lock (sink->client_lock);
sink->clients = g_list_delete_link (sink->clients, find); if (*(client->sock) != -1 && sink->auto_multicast
&& gst_udp_is_multicast (&client->theiraddr))
gst_udp_leave_group (*(client->sock), &client->theiraddr);
free_client (client); /* Unlock to emit signal before we delete the actual client */
g_mutex_unlock (sink->client_lock);
g_signal_emit (G_OBJECT (sink),
gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
g_mutex_lock (sink->client_lock);
sink->clients = g_list_delete_link (sink->clients, find);
free_client (client);
}
g_mutex_unlock (sink->client_lock); g_mutex_unlock (sink->client_lock);
return; return;

View file

@ -38,6 +38,8 @@ typedef struct _GstMultiUDPSink GstMultiUDPSink;
typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass; typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass;
typedef struct { typedef struct {
gint refcount;
int *sock; int *sock;
struct sockaddr_storage theiraddr; struct sockaddr_storage theiraddr;
@ -76,6 +78,8 @@ struct _GstMultiUDPSink {
gboolean loop; gboolean loop;
gint qos_dscp; gint qos_dscp;
guint16 ss_family; guint16 ss_family;
gboolean send_duplicates;
}; };
struct _GstMultiUDPSinkClass { struct _GstMultiUDPSinkClass {