From 6660817af14d6cf740cc296af6751e9665f928b8 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 20 Aug 2010 15:35:27 +0200 Subject: [PATCH] multifdsink: use refcount to count host/port duplicates Instead of adding multiple client structures for the same host/port pair, use a refcount. Add a send-duplicates feature that allows you to disable sending multiple copies of the same packet to the same host when it was added multiple times. The send-duplicates property is by default set to TRUE for backwards compatibility although it is very likely that this is not desired behaviour. --- gst/udp/gstmultiudpsink.c | 246 +++++++++++++++++++++++++------------- gst/udp/gstmultiudpsink.h | 4 + 2 files changed, 166 insertions(+), 84 deletions(-) diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c index c1befdad58..f06673bbf3 100644 --- a/gst/udp/gstmultiudpsink.c +++ b/gst/udp/gstmultiudpsink.c @@ -79,6 +79,7 @@ enum #define DEFAULT_TTL_MC 1 #define DEFAULT_LOOP TRUE #define DEFAULT_QOS_DSCP -1 +#define DEFAULT_SEND_DUPLICATES TRUE enum { @@ -94,6 +95,7 @@ enum PROP_TTL_MC, PROP_LOOP, PROP_QOS_DSCP, + PROP_SEND_DUPLICATES, PROP_LAST }; @@ -131,8 +133,6 @@ static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink, static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock); -static void free_client (GstUDPClient * client); - static GstElementClass *parent_class = NULL; static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 }; @@ -201,7 +201,13 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) * @port: the port of the client to add * * Add a client with destination @host and @port to the list of - * clients. + * clients. When the same host/port pair is added multiple times, the + * send-duplicates property defines if the packets are sent multiple times to + * the same host/port pair or not. + * + * When a host/port pair is added multiple times, an equal amount of remove + * calls must be performed to actually remove the host/port pair from the list + * of destinations. */ gst_multiudpsink_signals[SIGNAL_ADD] = g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, @@ -322,6 +328,19 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) g_param_spec_int ("qos-dscp", "QoS diff srv code point", "Quality of Service, differentiated services code point (-1 default)", -1, 63, DEFAULT_QOS_DSCP, G_PARAM_READWRITE)); + /** + * GstMultiUDPSink::send-duplicates + * + * When a host/port pair is added mutliple times, send the packet to the host + * multiple times as well. + * + * Since: 0.10.26 + */ + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES, + g_param_spec_boolean ("send-duplicates", "Send Duplicates", + "When a distination/port pair is added multiple times, send packets " + "multiple times as well", DEFAULT_SEND_DUPLICATES, + G_PARAM_READWRITE)); gstelement_class->change_state = gst_multiudpsink_change_state; @@ -354,6 +373,36 @@ gst_multiudpsink_init (GstMultiUDPSink * sink) sink->loop = DEFAULT_LOOP; sink->qos_dscp = DEFAULT_QOS_DSCP; sink->ss_family = DEFAULT_FAMILY; + sink->send_duplicates = DEFAULT_SEND_DUPLICATES; +} + +static GstUDPClient * +create_client (GstMultiUDPSink * sink, const gchar * host, gint port) +{ + GstUDPClient *client; + + client = g_slice_new0 (GstUDPClient); + client->refcount = 1; + client->host = g_strdup (host); + client->port = port; + + return client; +} + +static void +free_client (GstUDPClient * client) +{ + g_free (client->host); + g_slice_free (GstUDPClient, client); +} + +static gint +client_compare (GstUDPClient * a, GstUDPClient * b) +{ + if ((a->port == b->port) && (strcmp (a->host, b->host) == 0)) + return 0; + + return 1; } static void @@ -444,38 +493,43 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) for (clients = sink->clients; clients; clients = g_list_next (clients)) { GstUDPClient *client; + gint count; client = (GstUDPClient *) clients->data; no_clients++; GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); - while (TRUE) { - len = gst_udp_get_sockaddr_length (&client->theiraddr); + count = sink->send_duplicates ? client->refcount : 1; - ret = sendto (*client->sock, + while (count--) { + while (TRUE) { + len = gst_udp_get_sockaddr_length (&client->theiraddr); + + ret = sendto (*client->sock, #ifdef G_OS_WIN32 - (char *) data, + (char *) data, #else - data, + data, #endif - size, 0, (struct sockaddr *) &client->theiraddr, len); + size, 0, (struct sockaddr *) &client->theiraddr, len); - if (ret < 0) { - /* some error, just warn, it's likely recoverable and we don't want to - * break streaming. We break so that we stop retrying for this client. */ - if (!socket_error_is_ignorable ()) { - gchar *errormessage = socket_last_error_message (); - GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, - socket_last_error_code (), errormessage); - g_free (errormessage); + if (ret < 0) { + /* some error, just warn, it's likely recoverable and we don't want to + * break streaming. We break so that we stop retrying for this client. */ + if (!socket_error_is_ignorable ()) { + gchar *errormessage = socket_last_error_message (); + GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, + socket_last_error_code (), errormessage); + g_free (errormessage); + break; + } + } else { + num++; + client->bytes_sent += ret; + client->packets_sent++; + sink->bytes_served += ret; break; } - } else { - num++; - client->bytes_sent += ret; - client->packets_sent++; - sink->bytes_served += ret; - break; } } } @@ -535,26 +589,31 @@ gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list) for (clients = sink->clients; clients; clients = g_list_next (clients)) { GstUDPClient *client; + gint count; client = (GstUDPClient *) clients->data; no_clients++; GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); - while (TRUE) { - msg.msg_name = (void *) &client->theiraddr; - msg.msg_namelen = sizeof (client->theiraddr); - ret = sendmsg (*client->sock, &msg, 0); + count = sink->send_duplicates ? client->refcount : 1; - if (ret < 0) { - if (!socket_error_is_ignorable ()) { + while (count--) { + while (TRUE) { + msg.msg_name = (void *) &client->theiraddr; + msg.msg_namelen = sizeof (client->theiraddr); + ret = sendmsg (*client->sock, &msg, 0); + + if (ret < 0) { + if (!socket_error_is_ignorable ()) { + break; + } + } else { + num++; + client->bytes_sent += ret; + client->packets_sent++; + sink->bytes_served += ret; break; } - } else { - num++; - client->bytes_sent += ret; - client->packets_sent++; - sink->bytes_served += ret; - break; } } } @@ -619,13 +678,17 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink) clients = sink->clients; while (clients) { GstUDPClient *client; + gint count; client = (GstUDPClient *) clients->data; clients = g_list_next (clients); - g_string_append_printf (str, "%s:%d%s", client->host, client->port, - (clients ? "," : "")); + count = client->refcount; + while (count--) { + g_string_append_printf (str, "%s:%d%s", client->host, client->port, + (clients || count > 1 ? "," : "")); + } } g_mutex_unlock (sink->client_lock); @@ -702,6 +765,9 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id, udpsink->qos_dscp = g_value_get_int (value); gst_multiudpsink_setup_qos_dscp (udpsink); break; + case PROP_SEND_DUPLICATES: + udpsink->send_duplicates = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -751,6 +817,9 @@ gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_QOS_DSCP: g_value_set_int (value, udpsink->qos_dscp); break; + case PROP_SEND_DUPLICATES: + g_value_set_boolean (value, udpsink->send_duplicates); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -921,27 +990,45 @@ gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host, gint port, gboolean lock) { GstUDPClient *client; + GstUDPClient udpclient; GTimeVal now; + GList *find; + + udpclient.host = (gchar *) host; + udpclient.port = port; GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port); - client = g_new0 (GstUDPClient, 1); - client->host = g_strdup (host); - client->port = port; - client->sock = &sink->sock; - - if (gst_udp_get_addr (host, port, &client->theiraddr) < 0) - goto getaddrinfo_error; - - g_get_current_time (&now); - client->connect_time = GST_TIMEVAL_TO_TIME (now); - - if (*client->sock > 0) { - gst_multiudpsink_configure_client (sink, client); - } if (lock) g_mutex_lock (sink->client_lock); - sink->clients = g_list_prepend (sink->clients, client); + + find = g_list_find_custom (sink->clients, &udpclient, + (GCompareFunc) client_compare); + if (find) { + client = (GstUDPClient *) find->data; + + GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d", + client->refcount, host, port); + client->refcount++; + } else { + client = create_client (sink, host, port); + + client->sock = &sink->sock; + + if (gst_udp_get_addr (host, port, &client->theiraddr) < 0) + goto getaddrinfo_error; + + g_get_current_time (&now); + client->connect_time = GST_TIMEVAL_TO_TIME (now); + + if (*client->sock > 0) { + gst_multiudpsink_configure_client (sink, client); + } + + GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port); + sink->clients = g_list_prepend (sink->clients, client); + } + if (lock) g_mutex_unlock (sink->client_lock); @@ -957,8 +1044,9 @@ getaddrinfo_error: GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host, port); GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?"); - g_free (client->host); - g_free (client); + free_client (client); + if (lock) + g_mutex_unlock (sink->client_lock); return; } } @@ -969,22 +1057,6 @@ gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port) gst_multiudpsink_add_internal (sink, host, port, TRUE); } -static gint -client_compare (GstUDPClient * a, GstUDPClient * b) -{ - if ((a->port == b->port) && (strcmp (a->host, b->host) == 0)) - return 0; - - return 1; -} - -static void -free_client (GstUDPClient * client) -{ - g_free (client->host); - g_free (client); -} - void gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port) { @@ -1002,26 +1074,32 @@ gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port) if (!find) goto not_found; - GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port); - client = (GstUDPClient *) find->data; - g_get_current_time (&now); - client->disconnect_time = GST_TIMEVAL_TO_TIME (now); + GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d", + client->refcount, host, port); - if (*(client->sock) != -1 && sink->auto_multicast - && gst_udp_is_multicast (&client->theiraddr)) - gst_udp_leave_group (*(client->sock), &client->theiraddr); + client->refcount--; + if (client->refcount == 0) { + GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port); - /* Unlock to emit signal before we delete the actual client */ - g_mutex_unlock (sink->client_lock); - g_signal_emit (G_OBJECT (sink), - gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port); - g_mutex_lock (sink->client_lock); + g_get_current_time (&now); + client->disconnect_time = GST_TIMEVAL_TO_TIME (now); - sink->clients = g_list_delete_link (sink->clients, find); + if (*(client->sock) != -1 && sink->auto_multicast + && gst_udp_is_multicast (&client->theiraddr)) + gst_udp_leave_group (*(client->sock), &client->theiraddr); - free_client (client); + /* Unlock to emit signal before we delete the actual client */ + g_mutex_unlock (sink->client_lock); + g_signal_emit (G_OBJECT (sink), + gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port); + g_mutex_lock (sink->client_lock); + + sink->clients = g_list_delete_link (sink->clients, find); + + free_client (client); + } g_mutex_unlock (sink->client_lock); return; diff --git a/gst/udp/gstmultiudpsink.h b/gst/udp/gstmultiudpsink.h index ce9794ed23..6d4da77e95 100644 --- a/gst/udp/gstmultiudpsink.h +++ b/gst/udp/gstmultiudpsink.h @@ -38,6 +38,8 @@ typedef struct _GstMultiUDPSink GstMultiUDPSink; typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass; typedef struct { + gint refcount; + int *sock; struct sockaddr_storage theiraddr; @@ -76,6 +78,8 @@ struct _GstMultiUDPSink { gboolean loop; gint qos_dscp; guint16 ss_family; + + gboolean send_duplicates; }; struct _GstMultiUDPSinkClass {