From bb38a849e57393b67ff72f05ac48c44eba02430c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 17 Jan 2012 11:18:15 +0100 Subject: [PATCH] udpsink/multiudpsink: Port to GIO --- gst/udp/gstmultiudpsink.c | 739 +++++++++++++++----------------------- gst/udp/gstmultiudpsink.h | 23 +- gst/udp/gstudpsink.c | 64 ++-- gst/udp/gstudpsink.h | 6 +- 4 files changed, 337 insertions(+), 495 deletions(-) diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c index 970b4aef0b..3871a6e1d8 100644 --- a/gst/udp/gstmultiudpsink.c +++ b/gst/udp/gstmultiudpsink.c @@ -1,6 +1,8 @@ /* GStreamer * Copyright (C) <2007> Wim Taymans * Copyright (C) <2009> Jarkko Palviainen + * Copyright (C) <2012> Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -33,12 +35,6 @@ #include "gstudp-marshal.h" #include "gstmultiudpsink.h" -#include -#include -#ifdef HAVE_UNISTD_H -#include -#endif -#include #include #include "gst/glib-compat-private.h" @@ -70,11 +66,11 @@ enum LAST_SIGNAL }; -#define DEFAULT_SOCKFD -1 -#define DEFAULT_CLOSEFD TRUE -#define DEFAULT_SOCK -1 +#define DEFAULT_SOCKET NULL +#define DEFAULT_CLOSE_SOCKET TRUE +#define DEFAULT_USED_SOCKET NULL #define DEFAULT_CLIENTS NULL -#define DEFAULT_FAMILY 0 +#define DEFAULT_FAMILY G_SOCKET_FAMILY_IPV6 /* FIXME, this should be disabled by default, we don't need to join a multicast * group for sending, if this socket is also used for receiving, it should * be configured in the element that does the receive. */ @@ -91,9 +87,9 @@ enum PROP_0, PROP_BYTES_TO_SERVE, PROP_BYTES_SERVED, - PROP_SOCKFD, - PROP_CLOSEFD, - PROP_SOCK, + PROP_SOCKET, + PROP_CLOSE_SOCKET, + PROP_USED_SOCKET, PROP_CLIENTS, PROP_AUTO_MULTICAST, PROP_TTL, @@ -105,28 +101,15 @@ enum PROP_LAST }; -#define CLOSE_IF_REQUESTED(udpctx) \ -G_STMT_START { \ - if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \ - CLOSE_SOCKET(udpctx->sock); \ - if (udpctx->sock == udpctx->sockfd) \ - udpctx->sockfd = DEFAULT_SOCKFD; \ - } \ - udpctx->sock = DEFAULT_SOCK; \ -} G_STMT_END - static void gst_multiudpsink_finalize (GObject * object); static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink, GstBuffer * buffer); -#if 0 -#ifndef G_OS_WIN32 /* sendmsg() is not available on Windows */ -static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink, - GstBufferList * list); -#endif -#endif -static GstStateChangeReturn gst_multiudpsink_change_state (GstElement * - element, GstStateChange transition); + +static gboolean gst_multiudpsink_start (GstBaseSink * bsink); +static gboolean gst_multiudpsink_stop (GstBaseSink * bsink); +static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink); +static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink); static void gst_multiudpsink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -154,8 +137,6 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) gstelement_class = (GstElementClass *) klass; gstbasesink_class = (GstBaseSinkClass *) klass; - parent_class = g_type_class_peek_parent (klass); - gobject_class->set_property = gst_multiudpsink_set_property; gobject_class->get_property = gst_multiudpsink_get_property; gobject_class->finalize = gst_multiudpsink_finalize; @@ -215,7 +196,7 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) * * Get the statistics of the client with destination @host and @port. * - * Returns: a GValueArray of uint64: bytes_sent, packets_sent, + * Returns: a GstStructure: bytes_sent, packets_sent, * connect_time (in epoch seconds), disconnect_time (in epoch seconds) */ gst_multiudpsink_signals[SIGNAL_GET_STATS] = @@ -261,20 +242,18 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) g_param_spec_uint64 ("bytes-served", "Bytes served", "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SOCKFD, - g_param_spec_int ("sockfd", "Socket Handle", - "Socket to use for UDP sending. (-1 == allocate)", - -1, G_MAXINT, DEFAULT_SOCKFD, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_CLOSEFD, - g_param_spec_boolean ("closefd", "Close sockfd", - "Close sockfd if passed as property on state change", - DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SOCK, - g_param_spec_int ("sock", "Socket Handle", - "Socket currently in use for UDP sending. (-1 == no socket)", - -1, G_MAXINT, DEFAULT_SOCK, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SOCKET, + g_param_spec_object ("socket", "Socket Handle", + "Socket to use for UDP sending. (NULL == allocate)", + G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET, + g_param_spec_boolean ("close-socket", "Close socket", + "Close socket if passed as property on state change", + DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USED_SOCKET, + g_param_spec_object ("used-socket", "Used Socket Handle", + "Socket currently in use for UDP sending. (NULL == no socket)", + G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_CLIENTS, g_param_spec_string ("clients", "Clients", "A comma separated list of host:port pairs with destinations", @@ -322,8 +301,6 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT, DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - gstelement_class->change_state = gst_multiudpsink_change_state; - gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sink_template)); @@ -333,11 +310,10 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) "Wim Taymans "); gstbasesink_class->render = gst_multiudpsink_render; -#if 0 -#ifndef G_OS_WIN32 - gstbasesink_class->render_list = gst_multiudpsink_render_list; -#endif -#endif + gstbasesink_class->start = gst_multiudpsink_start; + gstbasesink_class->stop = gst_multiudpsink_stop; + gstbasesink_class->unlock = gst_multiudpsink_unlock; + gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop; klass->add = gst_multiudpsink_add; klass->remove = gst_multiudpsink_remove; klass->clear = gst_multiudpsink_clear; @@ -350,38 +326,74 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) static void gst_multiudpsink_init (GstMultiUDPSink * sink) { - WSA_STARTUP (sink); - sink->client_lock = g_mutex_new (); - sink->sock = DEFAULT_SOCK; - sink->sockfd = DEFAULT_SOCKFD; - sink->closefd = DEFAULT_CLOSEFD; - sink->externalfd = (sink->sockfd != -1); + sink->socket = DEFAULT_SOCKET; + sink->used_socket = DEFAULT_USED_SOCKET; + sink->close_socket = DEFAULT_CLOSE_SOCKET; + sink->external_socket = (sink->socket != NULL); sink->auto_multicast = DEFAULT_AUTO_MULTICAST; sink->ttl = DEFAULT_TTL; sink->ttl_mc = DEFAULT_TTL_MC; sink->loop = DEFAULT_LOOP; sink->qos_dscp = DEFAULT_QOS_DSCP; - sink->ss_family = DEFAULT_FAMILY; + sink->family = DEFAULT_FAMILY; sink->send_duplicates = DEFAULT_SEND_DUPLICATES; + + sink->cancellable = g_cancellable_new (); } static GstUDPClient * create_client (GstMultiUDPSink * sink, const gchar * host, gint port) { GstUDPClient *client; + GInetAddress *addr; + GResolver *resolver; + GError *err = NULL; + + addr = g_inet_address_new_from_string (host); + if (!addr) { + GList *results; + + resolver = g_resolver_get_default (); + results = + g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err); + if (!results) + goto name_resolve; + addr = G_INET_ADDRESS (g_object_ref (results->data)); + + g_resolver_free_addresses (results); + g_object_unref (resolver); + } +#ifndef GST_DISABLE_GST_DEBUG + { + gchar *ip = g_inet_address_to_string (addr); + + GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip); + g_free (ip); + } +#endif client = g_slice_new0 (GstUDPClient); client->refcount = 1; client->host = g_strdup (host); client->port = port; + client->addr = g_inet_socket_address_new (addr, port); + g_object_unref (addr); return client; + +name_resolve: + { + g_object_unref (resolver); + + return NULL; + } } static void free_client (GstUDPClient * client) { + g_object_unref (client->addr); g_free (client->host); g_slice_free (GstUDPClient, client); } @@ -405,167 +417,45 @@ gst_multiudpsink_finalize (GObject * object) g_list_foreach (sink->clients, (GFunc) free_client, NULL); g_list_free (sink->clients); - if (sink->sockfd >= 0 && sink->closefd) - CLOSE_SOCKET (sink->sockfd); + if (sink->socket) + g_object_unref (sink->socket); + sink->socket = NULL; + + if (sink->used_socket) + g_object_unref (sink->used_socket); + sink->used_socket = NULL; + + if (sink->cancellable) + g_object_unref (sink->cancellable); + sink->cancellable = NULL; g_mutex_free (sink->client_lock); - WSA_CLEANUP (object); - G_OBJECT_CLASS (parent_class)->finalize (object); } -static gboolean -socket_error_is_ignorable (void) -{ -#ifdef G_OS_WIN32 - /* Windows doesn't seem to have an EAGAIN for sockets */ - return WSAGetLastError () == WSAEINTR; -#else - return errno == EINTR || errno == EAGAIN; -#endif -} - -static int -socket_last_error_code (void) -{ -#ifdef G_OS_WIN32 - return WSAGetLastError (); -#else - return errno; -#endif -} - -static gchar * -socket_last_error_message (void) -{ -#ifdef G_OS_WIN32 - int errorcode = WSAGetLastError (); - wchar_t buf[1024]; - DWORD result = - FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, errorcode, 0, (LPSTR) buf, sizeof (buf) / sizeof (wchar_t), NULL); - if (FAILED (result)) { - return g_strdup ("failed to get error message from system"); - } else { - gchar *res = - g_convert ((gchar *) buf, -1, "UTF-16", "UTF-8", NULL, NULL, NULL); - /* g_convert() internally calls windows functions which reset the - windows error code, so fix it up again like this */ - WSASetLastError (errorcode); - return res; - } -#else - return g_strdup (g_strerror (errno)); -#endif -} - -#ifdef G_OS_WIN32 -/* version without sendmsg */ -static GstFlowReturn -gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) -{ - GstMultiUDPSink *sink; - gint ret, num = 0, no_clients = 0; - gsize size; - guint8 *data; - GList *clients; - gint len; - - sink = GST_MULTIUDPSINK (bsink); - - data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ); - - if (size > UDP_MAX_SIZE) { - GST_WARNING ("Attempting to send a UDP packet larger than maximum " - "size (%d > %d)", size, UDP_MAX_SIZE); - } - - sink->bytes_to_serve += size; - - /* grab lock while iterating and sending to clients, this should be - * fast as UDP never blocks */ - g_mutex_lock (sink->client_lock); - GST_LOG_OBJECT (bsink, "about to send %d bytes", size); - - for (clients = sink->clients; clients; clients = g_list_next (clients)) { - GstUDPClient *client; - gint count; - - client = (GstUDPClient *) clients->data; - no_clients++; - GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); - - count = sink->send_duplicates ? client->refcount : 1; - - while (count--) { - while (TRUE) { - len = gst_udp_get_sockaddr_length (&client->theiraddr); - - ret = sendto (*client->sock, -#ifdef G_OS_WIN32 - (char *) data, -#else - data, -#endif - size, 0, (struct sockaddr *) &client->theiraddr, len); - - if (ret < 0) { - /* some error, just warn, it's likely recoverable and we don't want to - * break streaming. We break so that we stop retrying for this client. */ - if (!socket_error_is_ignorable ()) { - gchar *errormessage = socket_last_error_message (); - GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, - socket_last_error_code (), errormessage); - g_free (errormessage); - break; - } - } else { - num++; - client->bytes_sent += ret; - client->packets_sent++; - sink->bytes_served += ret; - break; - } - } - } - } - g_mutex_unlock (sink->client_lock); - - gst_buffer_unmap (buffer, data, size); - - GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num, - no_clients); - - return GST_FLOW_OK; -} -#else /* !G_OS_WIN32 */ -/* version with sendmsg */ static GstFlowReturn gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstMultiUDPSink *sink; GList *clients; - gint ret, size = 0, num = 0, no_clients = 0; - struct iovec *iov; - struct msghdr msg = { 0 }; + GOutputVector *vec; guint n_mem, i; gpointer bdata; - gsize bsize; + gsize bsize, size; GstMemory *mem; + gint num, no_clients; + GError *err = NULL; sink = GST_MULTIUDPSINK (bsink); - msg.msg_iovlen = 0; - size = 0; - n_mem = gst_buffer_n_memory (buffer); if (n_mem == 0) goto no_data; - iov = (struct iovec *) g_malloc (n_mem * sizeof (struct iovec)); - msg.msg_iov = iov; + vec = g_new (GOutputVector, n_mem); + size = 0; for (i = 0; i < n_mem; i++) { mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ); bdata = gst_memory_map (mem, &bsize, NULL, GST_MAP_READ); @@ -575,9 +465,8 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) "size (%" G_GSIZE_FORMAT " > %d)", bsize, UDP_MAX_SIZE); } - msg.msg_iov[msg.msg_iovlen].iov_len = bsize; - msg.msg_iov[msg.msg_iovlen].iov_base = bdata; - msg.msg_iovlen++; + vec[i].buffer = bdata; + vec[i].size = bsize; size += bsize; } @@ -589,6 +478,8 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) g_mutex_lock (sink->client_lock); GST_LOG_OBJECT (bsink, "about to send %d bytes", size); + no_clients = 0; + num = 0; for (clients = sink->clients; clients; clients = g_list_next (clients)) { GstUDPClient *client; gint count; @@ -600,28 +491,19 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) count = sink->send_duplicates ? client->refcount : 1; while (count--) { - while (TRUE) { - msg.msg_name = (void *) &client->theiraddr; - msg.msg_namelen = sizeof (client->theiraddr); - ret = sendmsg (*client->sock, &msg, 0); + gssize ret; - if (ret < 0) { - if (!socket_error_is_ignorable ()) { - gchar *errormessage = socket_last_error_message (); - GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, - socket_last_error_code (), errormessage); - g_free (errormessage); - break; - break; - } - } else { - num++; - client->bytes_sent += ret; - client->packets_sent++; - sink->bytes_served += ret; - break; - } - } + ret = + g_socket_send_message (sink->used_socket, client->addr, vec, n_mem, + NULL, 0, 0, sink->cancellable, &err); + + if (ret < 0) + goto send_error; + + num++; + client->bytes_sent += ret; + client->packets_sent++; + sink->bytes_served += ret; } } g_mutex_unlock (sink->client_lock); @@ -630,12 +512,12 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) for (i = 0; i < n_mem; i++) { mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ); - bsize = msg.msg_iov[i].iov_len; - bdata = msg.msg_iov[i].iov_base; + bdata = (guint8 *) vec[i].buffer; + bsize = vec[i].size; gst_memory_unmap (mem, bdata, bsize); } - g_free (iov); + g_free (vec); GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num, no_clients); @@ -646,17 +528,13 @@ no_data: { return GST_FLOW_OK; } +send_error: + { + GST_DEBUG ("got send error %s", err->message); + g_clear_error (&err); + return GST_FLOW_ERROR; + } } -#endif - -#if 0 -/* DISABLED, core sends buffers to our render one by one, we can't really do - * much better */ -static GstFlowReturn -gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list) -{ -} -#endif static void gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink, @@ -672,13 +550,13 @@ gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink, gst_multiudpsink_clear_internal (sink, FALSE); for (i = 0; clients[i]; i++) { gchar *host, *p; - gint port = 0; + gint64 port = 0; host = clients[i]; p = strstr (clients[i], ":"); if (p != NULL) { *p = '\0'; - port = atoi (p + 1); + port = g_ascii_strtoll (p + 1, NULL, 10); } if (port != 0) gst_multiudpsink_add_internal (sink, host, port, FALSE); @@ -720,33 +598,35 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink) static void gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink) { - gint tos; - /* don't touch on -1 */ if (sink->qos_dscp < 0) return; - if (sink->sock < 0) + if (sink->used_socket == NULL) return; - GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp); +#ifdef IP_TOS + { + gint tos; + gint fd; - /* Extract and shift 6 bits of DSFIELD */ - tos = (sink->qos_dscp & 0x3f) << 2; + fd = g_socket_get_fd (sink->used_socket); - if (setsockopt (sink->sock, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) { - gchar *errormessage = socket_last_error_message (); - GST_ERROR_OBJECT (sink, "could not set TOS: %s", errormessage); - g_free (errormessage); - } + GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp); + + /* Extract and shift 6 bits of DSFIELD */ + tos = (sink->qos_dscp & 0x3f) << 2; + + if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) { + GST_ERROR_OBJECT (sink, "could not set TOS: %s", g_strerror (errno)); + } #ifdef IPV6_TCLASS - if (setsockopt (sink->sock, IPPROTO_IPV6, IPV6_TCLASS, &tos, - sizeof (tos)) < 0) { - gchar *errormessage = socket_last_error_message (); - GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", errormessage); - g_free (errormessage); + if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) { + GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", g_strerror (errno)); + } } #endif +#endif } static void @@ -758,15 +638,24 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id, udpsink = GST_MULTIUDPSINK (object); switch (prop_id) { - case PROP_SOCKFD: - if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock && - udpsink->closefd) - CLOSE_SOCKET (udpsink->sockfd); - udpsink->sockfd = g_value_get_int (value); - GST_DEBUG_OBJECT (udpsink, "setting SOCKFD to %d", udpsink->sockfd); + case PROP_SOCKET: + if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket && + udpsink->close_socket) { + GError *err = NULL; + + if (!g_socket_close (udpsink->socket, &err)) { + GST_ERROR ("failed to close socket %p: %s", udpsink->socket, + err->message); + g_clear_error (&err); + } + } + if (udpsink->socket) + g_object_unref (udpsink->socket); + udpsink->socket = g_value_dup_object (value); + GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket); break; - case PROP_CLOSEFD: - udpsink->closefd = g_value_get_boolean (value); + case PROP_CLOSE_SOCKET: + udpsink->close_socket = g_value_get_boolean (value); break; case PROP_CLIENTS: gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value)); @@ -814,14 +703,14 @@ gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_BYTES_SERVED: g_value_set_uint64 (value, udpsink->bytes_served); break; - case PROP_SOCKFD: - g_value_set_int (value, udpsink->sockfd); + case PROP_SOCKET: + g_value_set_object (value, udpsink->socket); break; - case PROP_CLOSEFD: - g_value_set_boolean (value, udpsink->closefd); + case PROP_CLOSE_SOCKET: + g_value_set_boolean (value, udpsink->close_socket); break; - case PROP_SOCK: - g_value_set_int (value, udpsink->sock); + case PROP_USED_SOCKET: + g_value_set_object (value, udpsink->used_socket); break; case PROP_CLIENTS: g_value_take_string (value, @@ -858,133 +747,113 @@ static gboolean gst_multiudpsink_configure_client (GstMultiUDPSink * sink, GstUDPClient * client) { + GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr); + GInetAddress *addr = g_inet_socket_address_get_address (saddr); + GError *err = NULL; + GST_DEBUG_OBJECT (sink, "configuring client %p", client); - if (gst_udp_is_multicast (&client->theiraddr)) { + if (g_inet_address_get_is_multicast (addr)) { GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client); if (sink->auto_multicast) { GST_DEBUG_OBJECT (sink, "autojoining group"); - if (gst_udp_join_group (*(client->sock), &client->theiraddr, NULL) - != 0) + if (!g_socket_join_multicast_group (sink->used_socket, addr, FALSE, NULL, + &err)) goto join_group_failed; } GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop); - if (gst_udp_set_loop (sink->sock, sink->ss_family, sink->loop) != 0) - goto loop_failed; + g_socket_set_multicast_loopback (sink->used_socket, sink->loop); GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc); - if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl_mc, TRUE) != 0) - goto ttl_failed; + g_socket_set_multicast_ttl (sink->used_socket, sink->ttl_mc); } else { GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl); - if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl, FALSE) != 0) - goto ttl_failed; + g_socket_set_ttl (sink->used_socket, sink->ttl); } return TRUE; /* ERRORS */ join_group_failed: { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); + gst_multiudpsink_stop (GST_BASE_SINK (sink)); GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not join multicast group (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -ttl_failed: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); - GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not set TTL socket option (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -loop_failed: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); - GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not set loopback socket option (%d): %s", - errorcode, errormessage)); - g_free (errormessage); + ("Could not join multicast group: %s", err->message)); + g_clear_error (&err); return FALSE; } } /* create a socket for sending to remote machine */ static gboolean -gst_multiudpsink_init_send (GstMultiUDPSink * sink) +gst_multiudpsink_start (GstBaseSink * bsink) { - guint bc_val; + GstMultiUDPSink *sink; GList *clients; GstUDPClient *client; - int sndsize, ret; - socklen_t len; + GError *err = NULL; - if (sink->sockfd == -1) { + sink = GST_MULTIUDPSINK (bsink); + + if (sink->socket == NULL) { GST_DEBUG_OBJECT (sink, "creating sockets"); /* create sender socket try IP6, fall back to IP4 */ - sink->ss_family = AF_INET6; - if ((sink->sock = socket (AF_INET6, SOCK_DGRAM, 0)) == -1) { - sink->ss_family = AF_INET; - if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1) + sink->family = G_SOCKET_FAMILY_IPV6; + if ((sink->used_socket = + g_socket_new (G_SOCKET_FAMILY_IPV6, + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) { + sink->family = G_SOCKET_FAMILY_IPV4; + if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4, + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) goto no_socket; } GST_DEBUG_OBJECT (sink, "have socket"); - sink->externalfd = FALSE; + sink->external_socket = FALSE; } else { - struct sockaddr_storage myaddr; - GST_DEBUG_OBJECT (sink, "using configured socket"); - /* we use the configured socket, try to get some info about it */ - len = sizeof (myaddr); - if (getsockname (sink->sockfd, (struct sockaddr *) &myaddr, &len) < 0) - goto getsockname_error; - - sink->ss_family = myaddr.ss_family; /* we use the configured socket */ - sink->sock = sink->sockfd; - sink->externalfd = TRUE; + sink->used_socket = G_SOCKET (g_object_ref (sink->socket)); + sink->family = g_socket_get_family (sink->used_socket); + sink->external_socket = TRUE; } - len = sizeof (sndsize); - if (sink->buffer_size != 0) { - sndsize = sink->buffer_size; +#ifdef SO_SNDBUF - GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize); - /* set buffer size, Note that on Linux this is typically limited to a - * maximum of around 100K. Also a minimum of 128 bytes is required on - * Linux. */ - ret = - setsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize, - len); - if (ret != 0) { - GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL), - ("Could not create a buffer of requested %d bytes, %d: %s (%d)", - sndsize, ret, g_strerror (errno), errno)); + { + socklen_t len; + gint sndsize, ret; + + len = sizeof (sndsize); + if (sink->buffer_size != 0) { + sndsize = sink->buffer_size; + + GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize); + /* set buffer size, Note that on Linux this is typically limited to a + * maximum of around 100K. Also a minimum of 128 bytes is required on + * Linux. */ + ret = + setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, + SO_SNDBUF, (void *) &sndsize, len); + if (ret != 0) { + GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL), + ("Could not create a buffer of requested %d bytes, %d: %s", + sndsize, ret, g_strerror (errno))); + } } + + /* read the value of the receive buffer. Note that on linux this returns 2x the + * value we set because the kernel allocates extra memory for metadata. + * The default on Linux is about 100K (which is about 50K without metadata) */ + ret = + getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, SO_SNDBUF, + (void *) &sndsize, &len); + if (ret == 0) + GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize); + else + GST_DEBUG_OBJECT (sink, "could not get udp buffer size"); } +#endif - /* read the value of the receive buffer. Note that on linux this returns 2x the - * value we set because the kernel allocates extra memory for metadata. - * The default on Linux is about 100K (which is about 50K without metadata) */ - ret = - getsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize, &len); - if (ret == 0) - GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize); - else - GST_DEBUG_OBJECT (sink, "could not get udp buffer size"); - - - bc_val = 1; - if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, - sizeof (bc_val)) < 0) - goto no_broadcast; + g_socket_set_broadcast (sink->used_socket, TRUE); sink->bytes_to_serve = 0; sink->bytes_served = 0; @@ -1004,39 +873,35 @@ gst_multiudpsink_init_send (GstMultiUDPSink * sink) /* ERRORS */ no_socket: { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL), - ("Could not create socket (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -getsockname_error: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL), - ("Could not getsockname (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -no_broadcast: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); - GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not set broadcast socket option (%d): %s", - errorcode, errormessage)); - g_free (errormessage); + ("Could not create socket: %s", err->message)); + g_clear_error (&err); return FALSE; } } -static void -gst_multiudpsink_close (GstMultiUDPSink * sink) +static gboolean +gst_multiudpsink_stop (GstBaseSink * bsink) { - CLOSE_IF_REQUESTED (sink); + GstMultiUDPSink *udpsink; + + udpsink = GST_MULTIUDPSINK (bsink); + + if (udpsink->used_socket) { + if (udpsink->close_socket || !udpsink->external_socket) { + GError *err = NULL; + + if (!g_socket_close (udpsink->used_socket, &err)) { + GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message); + g_clear_error (&err); + } + } + + g_object_unref (udpsink->used_socket); + udpsink->used_socket = NULL; + } + + return TRUE; } static void @@ -1066,18 +931,14 @@ gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host, client->refcount++; } else { client = create_client (sink, host, port); - - client->sock = &sink->sock; - - if (gst_udp_get_addr (host, port, &client->theiraddr) < 0) - goto getaddrinfo_error; + if (!client) + goto error; g_get_current_time (&now); client->connect_time = GST_TIMEVAL_TO_TIME (now); - if (*client->sock > 0) { + if (sink->used_socket) gst_multiudpsink_configure_client (sink, client); - } GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port); sink->clients = g_list_prepend (sink->clients, client); @@ -1093,12 +954,10 @@ gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host, return; /* ERRORS */ -getaddrinfo_error: +error: { GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host, port); - GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?"); - free_client (client); if (lock) g_mutex_unlock (sink->client_lock); return; @@ -1135,14 +994,25 @@ gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port) client->refcount--; if (client->refcount == 0) { + GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr); + GInetAddress *addr = g_inet_socket_address_get_address (saddr); + GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port); g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); - if (*(client->sock) != -1 && sink->auto_multicast - && gst_udp_is_multicast (&client->theiraddr)) - gst_udp_leave_group (*(client->sock), &client->theiraddr); + if (sink->used_socket && sink->auto_multicast + && g_inet_address_get_is_multicast (addr)) { + GError *err = NULL; + + if (!g_socket_leave_multicast_group (sink->used_socket, addr, FALSE, NULL, + &err)) { + GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s", + err->message); + g_clear_error (&err); + } + } /* Unlock to emit signal before we delete the actual client */ g_mutex_unlock (sink->client_lock); @@ -1189,15 +1059,14 @@ gst_multiudpsink_clear (GstMultiUDPSink * sink) gst_multiudpsink_clear_internal (sink, TRUE); } -GValueArray * +GstStructure * gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host, gint port) { GstUDPClient *client; - GValueArray *result = NULL; + GstStructure *result = NULL; GstUDPClient udpclient; GList *find; - GValue value = { 0 }; udpclient.host = (gchar *) host; udpclient.port = port; @@ -1213,29 +1082,13 @@ gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host, client = (GstUDPClient *) find->data; - /* Result is a value array of (bytes_sent, packets_sent, - * connect_time, disconnect_time), all as uint64 */ - result = g_value_array_new (4); + result = gst_structure_new_empty ("multiudpsink-stats"); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->bytes_sent); - result = g_value_array_append (result, &value); - g_value_unset (&value); - - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->packets_sent); - result = g_value_array_append (result, &value); - g_value_unset (&value); - - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->connect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); - - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->disconnect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); + gst_structure_set (result, + "bytes-sent", G_TYPE_UINT64, client->bytes_sent, + "packets-sent", G_TYPE_UINT64, client->packets_sent, + "connect-time", G_TYPE_UINT64, client->connect_time, + "disconnect-time", G_TYPE_UINT64, client->disconnect_time, NULL); g_mutex_unlock (sink->client_lock); @@ -1249,42 +1102,30 @@ not_found: host, port); /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may * confuse/break python bindings */ - return g_value_array_new (0); + return gst_structure_new_empty ("multiudpsink-stats"); } } -static GstStateChangeReturn -gst_multiudpsink_change_state (GstElement * element, GstStateChange transition) +static gboolean +gst_multiudpsink_unlock (GstBaseSink * bsink) { - GstStateChangeReturn ret; GstMultiUDPSink *sink; - sink = GST_MULTIUDPSINK (element); + sink = GST_MULTIUDPSINK (bsink); - switch (transition) { - case GST_STATE_CHANGE_READY_TO_PAUSED: - if (!gst_multiudpsink_init_send (sink)) - goto no_init; - break; - default: - break; - } + g_cancellable_cancel (sink->cancellable); - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_multiudpsink_close (sink); - break; - default: - break; - } - return ret; - - /* ERRORS */ -no_init: - { - /* _init_send() posted specific error already */ - return GST_STATE_CHANGE_FAILURE; - } + return TRUE; +} + +static gboolean +gst_multiudpsink_unlock_stop (GstBaseSink * bsink) +{ + GstMultiUDPSink *sink; + + sink = GST_MULTIUDPSINK (bsink); + + g_cancellable_reset (sink->cancellable); + + return TRUE; } diff --git a/gst/udp/gstmultiudpsink.h b/gst/udp/gstmultiudpsink.h index c31dbad83a..aa5f2e9ebf 100644 --- a/gst/udp/gstmultiudpsink.h +++ b/gst/udp/gstmultiudpsink.h @@ -1,5 +1,5 @@ /* GStreamer - * Copyright (C) <2005> Wim Taymand + * Copyright (C) <2005> Wim Taymans * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -22,6 +22,7 @@ #include #include +#include G_BEGIN_DECLS @@ -40,10 +41,7 @@ typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass; typedef struct { gint refcount; - int *sock; - - struct sockaddr_storage theiraddr; - + GSocketAddress *addr; gchar *host; gint port; @@ -59,7 +57,8 @@ typedef struct { struct _GstMultiUDPSink { GstBaseSink parent; - int sock; + GSocket *used_socket; + GCancellable *cancellable; GMutex *client_lock; GList *clients; @@ -67,17 +66,17 @@ struct _GstMultiUDPSink { /* properties */ guint64 bytes_to_serve; guint64 bytes_served; - int sockfd; - gboolean closefd; + GSocket *socket; + gboolean close_socket; - gboolean externalfd; + gboolean external_socket; gboolean auto_multicast; gint ttl; gint ttl_mc; gboolean loop; gint qos_dscp; - guint16 ss_family; + GSocketFamily family; gboolean send_duplicates; gint buffer_size; @@ -90,7 +89,7 @@ struct _GstMultiUDPSinkClass { void (*add) (GstMultiUDPSink *sink, const gchar *host, gint port); void (*remove) (GstMultiUDPSink *sink, const gchar *host, gint port); void (*clear) (GstMultiUDPSink *sink); - GValueArray* (*get_stats) (GstMultiUDPSink *sink, const gchar *host, gint port); + GstStructure* (*get_stats) (GstMultiUDPSink *sink, const gchar *host, gint port); /* signals */ void (*client_added) (GstElement *element, const gchar *host, gint port); @@ -102,7 +101,7 @@ GType gst_multiudpsink_get_type(void); void gst_multiudpsink_add (GstMultiUDPSink *sink, const gchar *host, gint port); void gst_multiudpsink_remove (GstMultiUDPSink *sink, const gchar *host, gint port); void gst_multiudpsink_clear (GstMultiUDPSink *sink); -GValueArray* gst_multiudpsink_get_stats (GstMultiUDPSink *sink, const gchar *host, gint port); +GstStructure* gst_multiudpsink_get_stats (GstMultiUDPSink *sink, const gchar *host, gint port); G_END_DECLS diff --git a/gst/udp/gstudpsink.c b/gst/udp/gstudpsink.c index c336ccfc2e..915c82ba20 100644 --- a/gst/udp/gstudpsink.c +++ b/gst/udp/gstudpsink.c @@ -1,5 +1,7 @@ /* GStreamer * Copyright (C) <2005> Wim Taymans + * Copyright (C) <2012> Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -35,14 +37,6 @@ #endif #include "gstudpsink.h" -#include -#include -#ifdef HAVE_UNISTD_H -#include -#endif -#include -#include - #define UDP_DEFAULT_HOST "localhost" #define UDP_DEFAULT_PORT 4951 @@ -108,17 +102,22 @@ gst_udpsink_class_init (GstUDPSinkClass * klass) static void gst_udpsink_init (GstUDPSink * udpsink) { - gst_udp_uri_init (&udpsink->uri, UDP_DEFAULT_HOST, UDP_DEFAULT_PORT); + udpsink->host = g_strdup (UDP_DEFAULT_HOST); + udpsink->port = UDP_DEFAULT_PORT; + udpsink->uri = g_strdup_printf ("udp://%s:%d", udpsink->host, udpsink->port); - gst_multiudpsink_add (GST_MULTIUDPSINK (udpsink), udpsink->uri.host, - udpsink->uri.port); + gst_multiudpsink_add (GST_MULTIUDPSINK (udpsink), udpsink->host, + udpsink->port); } static void gst_udpsink_finalize (GstUDPSink * udpsink) { - gst_udp_uri_free (&udpsink->uri); - g_free (udpsink->uristr); + g_free (udpsink->host); + udpsink->host = NULL; + + g_free (udpsink->uri); + udpsink->uri = NULL; G_OBJECT_CLASS (parent_class)->finalize ((GObject *) udpsink); } @@ -126,14 +125,15 @@ gst_udpsink_finalize (GstUDPSink * udpsink) static gboolean gst_udpsink_set_uri (GstUDPSink * sink, const gchar * uri, GError ** error) { - gst_multiudpsink_remove (GST_MULTIUDPSINK (sink), sink->uri.host, - sink->uri.port); + gst_multiudpsink_remove (GST_MULTIUDPSINK (sink), sink->host, sink->port); - if (gst_udp_parse_uri (uri, &sink->uri.host, &sink->uri.port) < 0) + if (!gst_udp_parse_uri (uri, &sink->host, &sink->port)) goto wrong_uri; - gst_multiudpsink_add (GST_MULTIUDPSINK (sink), sink->uri.host, - sink->uri.port); + g_free (sink->uri); + sink->uri = g_strdup (uri); + + gst_multiudpsink_add (GST_MULTIUDPSINK (sink), sink->host, sink->port); return TRUE; @@ -158,7 +158,7 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value, /* remove old host */ gst_multiudpsink_remove (GST_MULTIUDPSINK (udpsink), - udpsink->uri.host, udpsink->uri.port); + udpsink->host, udpsink->port); switch (prop_id) { case PROP_HOST: @@ -166,15 +166,18 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value, const gchar *host; host = g_value_get_string (value); - - if (host) - gst_udp_uri_update (&udpsink->uri, host, -1); - else - gst_udp_uri_update (&udpsink->uri, UDP_DEFAULT_HOST, -1); + g_free (udpsink->host); + udpsink->host = g_strdup (host); + g_free (udpsink->uri); + udpsink->uri = + g_strdup_printf ("udp://%s:%d", udpsink->host, udpsink->port); break; } case PROP_PORT: - gst_udp_uri_update (&udpsink->uri, NULL, g_value_get_int (value)); + udpsink->port = g_value_get_int (value); + g_free (udpsink->uri); + udpsink->uri = + g_strdup_printf ("udp://%s:%d", udpsink->host, udpsink->port); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -182,7 +185,7 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value, } /* add new host */ gst_multiudpsink_add (GST_MULTIUDPSINK (udpsink), - udpsink->uri.host, udpsink->uri.port); + udpsink->host, udpsink->port); } static void @@ -195,10 +198,10 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value, switch (prop_id) { case PROP_HOST: - g_value_set_string (value, udpsink->uri.host); + g_value_set_string (value, udpsink->host); break; case PROP_PORT: - g_value_set_int (value, udpsink->uri.port); + g_value_set_int (value, udpsink->port); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -227,10 +230,7 @@ gst_udpsink_uri_get_uri (GstURIHandler * handler) { GstUDPSink *sink = GST_UDPSINK (handler); - g_free (sink->uristr); - sink->uristr = gst_udp_uri_string (&sink->uri); - - return g_strdup (sink->uristr); + return g_strdup (sink->uri); } static gboolean diff --git a/gst/udp/gstudpsink.h b/gst/udp/gstudpsink.h index d45045e023..b04fdda92c 100644 --- a/gst/udp/gstudpsink.h +++ b/gst/udp/gstudpsink.h @@ -41,8 +41,10 @@ typedef struct _GstUDPSinkClass GstUDPSinkClass; struct _GstUDPSink { GstMultiUDPSink parent; - GstUDPUri uri; - gchar *uristr; + gchar *host; + guint16 port; + + gchar *uri; }; struct _GstUDPSinkClass {