diff --git a/gst/udp/gstdynudpsink.c b/gst/udp/gstdynudpsink.c index a9e1cf9a2b..abcd6f438b 100644 --- a/gst/udp/gstdynudpsink.c +++ b/gst/udp/gstdynudpsink.c @@ -2,6 +2,8 @@ * Copyright (C) <2005> Philippe Khalaf * Copyright (C) <2005> Nokia Corporation * Copyright (C) <2006> Joni Valtanen + * Copyright (C) <2012> Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -25,17 +27,6 @@ #include "gstudp-marshal.h" #include "gstdynudpsink.h" -#include -#include -#ifdef HAVE_UNISTD_H -#include -#endif -#include -#include -#ifdef HAVE_SYS_TIME_H -#include -#endif -#include #include GST_DEBUG_CATEGORY_STATIC (dynudpsink_debug); @@ -58,33 +49,26 @@ enum LAST_SIGNAL }; -#define UDP_DEFAULT_SOCKFD -1 -#define UDP_DEFAULT_CLOSEFD TRUE +#define UDP_DEFAULT_SOCKET NULL +#define UDP_DEFAULT_CLOSE_SOCKET TRUE +#define UDP_DEFAULT_FAMILY G_SOCKET_FAMILY_IPV4 enum { PROP_0, - PROP_SOCKFD, - PROP_CLOSEFD + PROP_SOCKET, + PROP_CLOSE_SOCKET, + PROP_FAMILY }; -#define CLOSE_IF_REQUESTED(udpctx) \ -G_STMT_START { \ - if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \ - CLOSE_SOCKET(udpctx->sock); \ - if (udpctx->sock == udpctx->sockfd) \ - udpctx->sockfd = UDP_DEFAULT_SOCKFD; \ - } \ - udpctx->sock = -1; \ -} G_STMT_END - static void gst_dynudpsink_finalize (GObject * object); static GstFlowReturn gst_dynudpsink_render (GstBaseSink * sink, GstBuffer * buffer); -static void gst_dynudpsink_close (GstDynUDPSink * sink); -static GstStateChangeReturn gst_dynudpsink_change_state (GstElement * element, - GstStateChange transition); +static gboolean gst_dynudpsink_stop (GstBaseSink * bsink); +static gboolean gst_dynudpsink_start (GstBaseSink * bsink); +static gboolean gst_dynudpsink_unlock (GstBaseSink * bsink); +static gboolean gst_dynudpsink_unlock_stop (GstBaseSink * bsink); static void gst_dynudpsink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -119,17 +103,21 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass) NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2, G_TYPE_STRING, G_TYPE_INT); - g_object_class_install_property (gobject_class, PROP_SOCKFD, - g_param_spec_int ("sockfd", "socket handle", - "Socket to use for UDP sending. (-1 == allocate)", - -1, G_MAXINT16, UDP_DEFAULT_SOCKFD, + g_object_class_install_property (gobject_class, PROP_SOCKET, + g_param_spec_object ("socket", "Socket", + "Socket to use for UDP sending. (NULL == allocate)", + G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET, + g_param_spec_boolean ("close-socket", "Close socket", + "Close socket if passed as property on state change", + UDP_DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_CLOSEFD, - g_param_spec_boolean ("closefd", "Close sockfd", - "Close sockfd if passed as property on state change", - UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - gstelement_class->change_state = gst_dynudpsink_change_state; + g_object_class_install_property (gobject_class, PROP_FAMILY, + g_param_spec_enum ("family", "Socket family", + "Use IPv4 or IPv6", + G_TYPE_SOCKET_FAMILY, UDP_DEFAULT_FAMILY, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sink_template)); @@ -140,6 +128,10 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass) "Philippe Khalaf "); gstbasesink_class->render = gst_dynudpsink_render; + gstbasesink_class->start = gst_dynudpsink_start; + gstbasesink_class->stop = gst_dynudpsink_stop; + gstbasesink_class->unlock = gst_dynudpsink_unlock; + gstbasesink_class->unlock_stop = gst_dynudpsink_unlock_stop; GST_DEBUG_CATEGORY_INIT (dynudpsink_debug, "dynudpsink", 0, "UDP sink"); } @@ -147,44 +139,47 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass) static void gst_dynudpsink_init (GstDynUDPSink * sink) { - WSA_STARTUP (sink); + sink->socket = UDP_DEFAULT_SOCKET; + sink->close_socket = UDP_DEFAULT_CLOSE_SOCKET; + sink->external_socket = FALSE; + sink->family = G_SOCKET_FAMILY_IPV4; - sink->sockfd = UDP_DEFAULT_SOCKFD; - sink->closefd = UDP_DEFAULT_CLOSEFD; - sink->externalfd = FALSE; - - sink->sock = -1; + sink->used_socket = NULL; + sink->cancellable = g_cancellable_new (); } static void gst_dynudpsink_finalize (GObject * object) { - GstDynUDPSink *udpsink; + GstDynUDPSink *sink; - udpsink = GST_DYNUDPSINK (object); + sink = GST_DYNUDPSINK (object); - if (udpsink->sockfd >= 0 && udpsink->closefd) - CLOSE_SOCKET (udpsink->sockfd); + if (sink->cancellable) + g_object_unref (sink->cancellable); + sink->cancellable = NULL; + + if (sink->socket) + g_object_unref (sink->socket); + sink->socket = NULL; + + if (sink->used_socket) + g_object_unref (sink->used_socket); + sink->used_socket = NULL; G_OBJECT_CLASS (parent_class)->finalize (object); - - WSA_CLEANUP (object); } static GstFlowReturn gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstDynUDPSink *sink; - gint ret; + gssize ret; gsize size; guint8 *data; GstNetAddressMeta *meta; - struct sockaddr_in theiraddr; - guint16 destport; - guint32 destaddr; - const guint8 *destaddr_bytes; - - memset (&theiraddr, 0, sizeof (theiraddr)); + GSocketAddress *addr; + GError *err = NULL; meta = gst_buffer_get_net_address_meta (buffer); @@ -195,46 +190,51 @@ gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) sink = GST_DYNUDPSINK (bsink); + /* let's get the address from the metadata */ + addr = meta->addr; + + if (g_socket_address_get_family (addr) != sink->family) + goto invalid_family; + data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ); GST_DEBUG ("about to send %" G_GSIZE_FORMAT " bytes", size); - /* let's get the address from the metaata */ - destaddr_bytes = - g_inet_address_to_bytes (g_inet_socket_address_get_address - (G_INET_SOCKET_ADDRESS (meta->addr))); - destaddr = GST_READ_UINT32_BE (destaddr_bytes); - destport = - g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (meta->addr)); +#ifndef GST_DISABLE_GST_DEBUG + { + gchar *host; - GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %d port %d", size, - destaddr, destport); - - theiraddr.sin_family = AF_INET; - theiraddr.sin_addr.s_addr = destaddr; - theiraddr.sin_port = destport; -#ifdef G_OS_WIN32 - ret = sendto (sink->sock, (char *) data, size, 0, -#else - ret = sendto (sink->sock, data, size, 0, + host = + g_inet_address_to_string (g_inet_socket_address_get_address + (G_INET_SOCKET_ADDRESS (addr))); + GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %s port %d", size, + host, g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr))); + g_free (host); + } #endif - (struct sockaddr *) &theiraddr, sizeof (theiraddr)); + ret = + g_socket_send_to (sink->used_socket, addr, (gchar *) data, size, + sink->cancellable, &err); gst_buffer_unmap (buffer, data, size); - if (ret < 0) { - if (errno != EINTR && errno != EAGAIN) { - goto send_error; - } - } + if (ret < 0) + goto send_error; - GST_DEBUG ("sent %" G_GSIZE_FORMAT " bytes", size); + GST_DEBUG ("sent %" G_GSSIZE_FORMAT " bytes", ret); return GST_FLOW_OK; send_error: { - GST_DEBUG ("got send error %s (%d)", g_strerror (errno), errno); + GST_DEBUG ("got send error %s", err->message); + g_clear_error (&err); + return GST_FLOW_ERROR; + } +invalid_family: + { + GST_DEBUG ("invalid family (got %d, expected %d)", + g_socket_address_get_family (addr), sink->family); return GST_FLOW_ERROR; } } @@ -248,17 +248,28 @@ gst_dynudpsink_set_property (GObject * object, guint prop_id, udpsink = GST_DYNUDPSINK (object); switch (prop_id) { - case PROP_SOCKFD: - if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock && - udpsink->closefd) - CLOSE_SOCKET (udpsink->sockfd); - udpsink->sockfd = g_value_get_int (value); - GST_DEBUG ("setting SOCKFD to %d", udpsink->sockfd); - break; - case PROP_CLOSEFD: - udpsink->closefd = g_value_get_boolean (value); - break; + case PROP_SOCKET: + if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket && + udpsink->close_socket) { + GError *err = NULL; + if (!g_socket_close (udpsink->socket, &err)) { + GST_ERROR ("failed to close socket %p: %s", udpsink->socket, + err->message); + g_clear_error (&err); + } + } + if (udpsink->socket) + g_object_unref (udpsink->socket); + udpsink->socket = g_value_dup_object (value); + GST_DEBUG ("setting socket to %p", udpsink->socket); + break; + case PROP_CLOSE_SOCKET: + udpsink->close_socket = g_value_get_boolean (value); + break; + case PROP_FAMILY: + udpsink->family = g_value_get_enum (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -274,11 +285,14 @@ gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value, udpsink = GST_DYNUDPSINK (object); switch (prop_id) { - case PROP_SOCKFD: - g_value_set_int (value, udpsink->sockfd); + case PROP_SOCKET: + g_value_set_object (value, udpsink->socket); break; - case PROP_CLOSEFD: - g_value_set_boolean (value, udpsink->closefd); + case PROP_CLOSE_SOCKET: + g_value_set_boolean (value, udpsink->close_socket); + break; + case PROP_FAMILY: + g_value_set_enum (value, udpsink->family); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -286,87 +300,90 @@ gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value, } } - /* create a socket for sending to remote machine */ static gboolean -gst_dynudpsink_init_send (GstDynUDPSink * sink) +gst_dynudpsink_start (GstBaseSink * bsink) { - guint bc_val; + GstDynUDPSink *udpsink; + GError *err = NULL; - if (sink->sockfd == -1) { + udpsink = GST_DYNUDPSINK (bsink); + + if (udpsink->socket == NULL) { /* create sender socket if none available */ - if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0) + if ((udpsink->used_socket = + g_socket_new (udpsink->family, + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) goto no_socket; - bc_val = 1; - if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, - sizeof (bc_val)) < 0) - goto no_broadcast; - - sink->externalfd = TRUE; + g_socket_set_broadcast (udpsink->used_socket, TRUE); + udpsink->external_socket = FALSE; } else { - sink->sock = sink->sockfd; - sink->externalfd = TRUE; + udpsink->used_socket = G_SOCKET (g_object_ref (udpsink->socket)); + udpsink->external_socket = TRUE; } + return TRUE; /* ERRORS */ no_socket: { - perror ("socket"); - return FALSE; - } -no_broadcast: - { - perror ("setsockopt"); - CLOSE_IF_REQUESTED (sink); + GST_ERROR_OBJECT (udpsink, "Failed to create socket: %s", err->message); + g_clear_error (&err); return FALSE; } } -GValueArray * +GstStructure * gst_dynudpsink_get_stats (GstDynUDPSink * sink, const gchar * host, gint port) { return NULL; } -static void -gst_dynudpsink_close (GstDynUDPSink * sink) +static gboolean +gst_dynudpsink_stop (GstBaseSink * bsink) { - CLOSE_IF_REQUESTED (sink); + GstDynUDPSink *udpsink; + + udpsink = GST_DYNUDPSINK (bsink); + + if (udpsink->used_socket) { + if (udpsink->close_socket || !udpsink->external_socket) { + GError *err = NULL; + + if (!g_socket_close (udpsink->used_socket, &err)) { + GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message); + g_clear_error (&err); + } + } + + g_object_unref (udpsink->used_socket); + udpsink->used_socket = NULL; + } + + return TRUE; } -static GstStateChangeReturn -gst_dynudpsink_change_state (GstElement * element, GstStateChange transition) +static gboolean +gst_dynudpsink_unlock (GstBaseSink * bsink) { - GstStateChangeReturn ret; - GstDynUDPSink *sink; + GstDynUDPSink *udpsink; - sink = GST_DYNUDPSINK (element); + udpsink = GST_DYNUDPSINK (bsink); - switch (transition) { - case GST_STATE_CHANGE_READY_TO_PAUSED: - if (!gst_dynudpsink_init_send (sink)) - goto no_init; - break; - default: - break; - } + g_cancellable_cancel (udpsink->cancellable); - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_dynudpsink_close (sink); - break; - default: - break; - } - return ret; - - /* ERRORS */ -no_init: - { - return GST_STATE_CHANGE_FAILURE; - } + return TRUE; +} + +static gboolean +gst_dynudpsink_unlock_stop (GstBaseSink * bsink) +{ + GstDynUDPSink *udpsink; + + udpsink = GST_DYNUDPSINK (bsink); + + g_cancellable_reset (udpsink->cancellable); + + return TRUE; } diff --git a/gst/udp/gstdynudpsink.h b/gst/udp/gstdynudpsink.h index d46dfdf05d..a59d8750e6 100644 --- a/gst/udp/gstdynudpsink.h +++ b/gst/udp/gstdynudpsink.h @@ -22,11 +22,11 @@ #include #include +#include G_BEGIN_DECLS #include "gstudpnetutils.h" - #include "gstudp.h" #define GST_TYPE_DYNUDPSINK (gst_dynudpsink_get_type()) @@ -45,26 +45,28 @@ struct _GstDynUDPSink { GstBaseSink parent; /* properties */ - gint sockfd; - gboolean closefd; + GSocket *socket; + gboolean close_socket; + GSocketFamily family; /* the socket in use */ - int sock; - gboolean externalfd; + GSocket *used_socket; + gboolean external_socket; + GCancellable *cancellable; }; struct _GstDynUDPSinkClass { GstBaseSinkClass parent_class; /* element methods */ - GValueArray* (*get_stats) (GstDynUDPSink *sink, const gchar *host, gint port); + GstStructure* (*get_stats) (GstDynUDPSink *sink, const gchar *host, gint port); /* signals */ }; GType gst_dynudpsink_get_type(void); -GValueArray* gst_dynudpsink_get_stats (GstDynUDPSink *sink, const gchar *host, gint port); +GstStructure* gst_dynudpsink_get_stats (GstDynUDPSink *sink, const gchar *host, gint port); G_END_DECLS