dynudpsink: Port to GIO

This commit is contained in:
Sebastian Dröge 2012-01-17 09:32:27 +01:00
parent 7f74fc9ef6
commit 7f0239b19b
2 changed files with 174 additions and 155 deletions

View file

@ -2,6 +2,8 @@
* Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
* Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
* Copyright (C) <2006> Joni Valtanen <joni.valtanen@movial.fi>
* Copyright (C) <2012> Collabora Ltd.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@ -25,17 +27,6 @@
#include "gstudp-marshal.h"
#include "gstdynudpsink.h"
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#include <string.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <sys/types.h>
#include <gst/net/gstnetaddressmeta.h>
GST_DEBUG_CATEGORY_STATIC (dynudpsink_debug);
@ -58,33 +49,26 @@ enum
LAST_SIGNAL
};
#define UDP_DEFAULT_SOCKFD -1
#define UDP_DEFAULT_CLOSEFD TRUE
#define UDP_DEFAULT_SOCKET NULL
#define UDP_DEFAULT_CLOSE_SOCKET TRUE
#define UDP_DEFAULT_FAMILY G_SOCKET_FAMILY_IPV4
enum
{
PROP_0,
PROP_SOCKFD,
PROP_CLOSEFD
PROP_SOCKET,
PROP_CLOSE_SOCKET,
PROP_FAMILY
};
#define CLOSE_IF_REQUESTED(udpctx) \
G_STMT_START { \
if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
CLOSE_SOCKET(udpctx->sock); \
if (udpctx->sock == udpctx->sockfd) \
udpctx->sockfd = UDP_DEFAULT_SOCKFD; \
} \
udpctx->sock = -1; \
} G_STMT_END
static void gst_dynudpsink_finalize (GObject * object);
static GstFlowReturn gst_dynudpsink_render (GstBaseSink * sink,
GstBuffer * buffer);
static void gst_dynudpsink_close (GstDynUDPSink * sink);
static GstStateChangeReturn gst_dynudpsink_change_state (GstElement * element,
GstStateChange transition);
static gboolean gst_dynudpsink_stop (GstBaseSink * bsink);
static gboolean gst_dynudpsink_start (GstBaseSink * bsink);
static gboolean gst_dynudpsink_unlock (GstBaseSink * bsink);
static gboolean gst_dynudpsink_unlock_stop (GstBaseSink * bsink);
static void gst_dynudpsink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@ -119,17 +103,21 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2,
G_TYPE_STRING, G_TYPE_INT);
g_object_class_install_property (gobject_class, PROP_SOCKFD,
g_param_spec_int ("sockfd", "socket handle",
"Socket to use for UDP sending. (-1 == allocate)",
-1, G_MAXINT16, UDP_DEFAULT_SOCKFD,
g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket", "Socket",
"Socket to use for UDP sending. (NULL == allocate)",
G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
g_param_spec_boolean ("close-socket", "Close socket",
"Close socket if passed as property on state change",
UDP_DEFAULT_CLOSE_SOCKET,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_CLOSEFD,
g_param_spec_boolean ("closefd", "Close sockfd",
"Close sockfd if passed as property on state change",
UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state = gst_dynudpsink_change_state;
g_object_class_install_property (gobject_class, PROP_FAMILY,
g_param_spec_enum ("family", "Socket family",
"Use IPv4 or IPv6",
G_TYPE_SOCKET_FAMILY, UDP_DEFAULT_FAMILY,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sink_template));
@ -140,6 +128,10 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
"Philippe Khalaf <burger@speedy.org>");
gstbasesink_class->render = gst_dynudpsink_render;
gstbasesink_class->start = gst_dynudpsink_start;
gstbasesink_class->stop = gst_dynudpsink_stop;
gstbasesink_class->unlock = gst_dynudpsink_unlock;
gstbasesink_class->unlock_stop = gst_dynudpsink_unlock_stop;
GST_DEBUG_CATEGORY_INIT (dynudpsink_debug, "dynudpsink", 0, "UDP sink");
}
@ -147,44 +139,47 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
static void
gst_dynudpsink_init (GstDynUDPSink * sink)
{
WSA_STARTUP (sink);
sink->socket = UDP_DEFAULT_SOCKET;
sink->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
sink->external_socket = FALSE;
sink->family = G_SOCKET_FAMILY_IPV4;
sink->sockfd = UDP_DEFAULT_SOCKFD;
sink->closefd = UDP_DEFAULT_CLOSEFD;
sink->externalfd = FALSE;
sink->sock = -1;
sink->used_socket = NULL;
sink->cancellable = g_cancellable_new ();
}
static void
gst_dynudpsink_finalize (GObject * object)
{
GstDynUDPSink *udpsink;
GstDynUDPSink *sink;
udpsink = GST_DYNUDPSINK (object);
sink = GST_DYNUDPSINK (object);
if (udpsink->sockfd >= 0 && udpsink->closefd)
CLOSE_SOCKET (udpsink->sockfd);
if (sink->cancellable)
g_object_unref (sink->cancellable);
sink->cancellable = NULL;
if (sink->socket)
g_object_unref (sink->socket);
sink->socket = NULL;
if (sink->used_socket)
g_object_unref (sink->used_socket);
sink->used_socket = NULL;
G_OBJECT_CLASS (parent_class)->finalize (object);
WSA_CLEANUP (object);
}
static GstFlowReturn
gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
GstDynUDPSink *sink;
gint ret;
gssize ret;
gsize size;
guint8 *data;
GstNetAddressMeta *meta;
struct sockaddr_in theiraddr;
guint16 destport;
guint32 destaddr;
const guint8 *destaddr_bytes;
memset (&theiraddr, 0, sizeof (theiraddr));
GSocketAddress *addr;
GError *err = NULL;
meta = gst_buffer_get_net_address_meta (buffer);
@ -195,46 +190,51 @@ gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
sink = GST_DYNUDPSINK (bsink);
/* let's get the address from the metadata */
addr = meta->addr;
if (g_socket_address_get_family (addr) != sink->family)
goto invalid_family;
data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
GST_DEBUG ("about to send %" G_GSIZE_FORMAT " bytes", size);
/* let's get the address from the metaata */
destaddr_bytes =
g_inet_address_to_bytes (g_inet_socket_address_get_address
(G_INET_SOCKET_ADDRESS (meta->addr)));
destaddr = GST_READ_UINT32_BE (destaddr_bytes);
destport =
g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (meta->addr));
#ifndef GST_DISABLE_GST_DEBUG
{
gchar *host;
GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %d port %d", size,
destaddr, destport);
theiraddr.sin_family = AF_INET;
theiraddr.sin_addr.s_addr = destaddr;
theiraddr.sin_port = destport;
#ifdef G_OS_WIN32
ret = sendto (sink->sock, (char *) data, size, 0,
#else
ret = sendto (sink->sock, data, size, 0,
host =
g_inet_address_to_string (g_inet_socket_address_get_address
(G_INET_SOCKET_ADDRESS (addr)));
GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %s port %d", size,
host, g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)));
g_free (host);
}
#endif
(struct sockaddr *) &theiraddr, sizeof (theiraddr));
ret =
g_socket_send_to (sink->used_socket, addr, (gchar *) data, size,
sink->cancellable, &err);
gst_buffer_unmap (buffer, data, size);
if (ret < 0) {
if (errno != EINTR && errno != EAGAIN) {
goto send_error;
}
}
if (ret < 0)
goto send_error;
GST_DEBUG ("sent %" G_GSIZE_FORMAT " bytes", size);
GST_DEBUG ("sent %" G_GSSIZE_FORMAT " bytes", ret);
return GST_FLOW_OK;
send_error:
{
GST_DEBUG ("got send error %s (%d)", g_strerror (errno), errno);
GST_DEBUG ("got send error %s", err->message);
g_clear_error (&err);
return GST_FLOW_ERROR;
}
invalid_family:
{
GST_DEBUG ("invalid family (got %d, expected %d)",
g_socket_address_get_family (addr), sink->family);
return GST_FLOW_ERROR;
}
}
@ -248,17 +248,28 @@ gst_dynudpsink_set_property (GObject * object, guint prop_id,
udpsink = GST_DYNUDPSINK (object);
switch (prop_id) {
case PROP_SOCKFD:
if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock &&
udpsink->closefd)
CLOSE_SOCKET (udpsink->sockfd);
udpsink->sockfd = g_value_get_int (value);
GST_DEBUG ("setting SOCKFD to %d", udpsink->sockfd);
break;
case PROP_CLOSEFD:
udpsink->closefd = g_value_get_boolean (value);
break;
case PROP_SOCKET:
if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
udpsink->close_socket) {
GError *err = NULL;
if (!g_socket_close (udpsink->socket, &err)) {
GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
err->message);
g_clear_error (&err);
}
}
if (udpsink->socket)
g_object_unref (udpsink->socket);
udpsink->socket = g_value_dup_object (value);
GST_DEBUG ("setting socket to %p", udpsink->socket);
break;
case PROP_CLOSE_SOCKET:
udpsink->close_socket = g_value_get_boolean (value);
break;
case PROP_FAMILY:
udpsink->family = g_value_get_enum (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -274,11 +285,14 @@ gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value,
udpsink = GST_DYNUDPSINK (object);
switch (prop_id) {
case PROP_SOCKFD:
g_value_set_int (value, udpsink->sockfd);
case PROP_SOCKET:
g_value_set_object (value, udpsink->socket);
break;
case PROP_CLOSEFD:
g_value_set_boolean (value, udpsink->closefd);
case PROP_CLOSE_SOCKET:
g_value_set_boolean (value, udpsink->close_socket);
break;
case PROP_FAMILY:
g_value_set_enum (value, udpsink->family);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -286,87 +300,90 @@ gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value,
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_dynudpsink_init_send (GstDynUDPSink * sink)
gst_dynudpsink_start (GstBaseSink * bsink)
{
guint bc_val;
GstDynUDPSink *udpsink;
GError *err = NULL;
if (sink->sockfd == -1) {
udpsink = GST_DYNUDPSINK (bsink);
if (udpsink->socket == NULL) {
/* create sender socket if none available */
if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
if ((udpsink->used_socket =
g_socket_new (udpsink->family,
G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
goto no_socket;
bc_val = 1;
if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
sizeof (bc_val)) < 0)
goto no_broadcast;
sink->externalfd = TRUE;
g_socket_set_broadcast (udpsink->used_socket, TRUE);
udpsink->external_socket = FALSE;
} else {
sink->sock = sink->sockfd;
sink->externalfd = TRUE;
udpsink->used_socket = G_SOCKET (g_object_ref (udpsink->socket));
udpsink->external_socket = TRUE;
}
return TRUE;
/* ERRORS */
no_socket:
{
perror ("socket");
return FALSE;
}
no_broadcast:
{
perror ("setsockopt");
CLOSE_IF_REQUESTED (sink);
GST_ERROR_OBJECT (udpsink, "Failed to create socket: %s", err->message);
g_clear_error (&err);
return FALSE;
}
}
GValueArray *
GstStructure *
gst_dynudpsink_get_stats (GstDynUDPSink * sink, const gchar * host, gint port)
{
return NULL;
}
static void
gst_dynudpsink_close (GstDynUDPSink * sink)
static gboolean
gst_dynudpsink_stop (GstBaseSink * bsink)
{
CLOSE_IF_REQUESTED (sink);
GstDynUDPSink *udpsink;
udpsink = GST_DYNUDPSINK (bsink);
if (udpsink->used_socket) {
if (udpsink->close_socket || !udpsink->external_socket) {
GError *err = NULL;
if (!g_socket_close (udpsink->used_socket, &err)) {
GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
g_clear_error (&err);
}
}
g_object_unref (udpsink->used_socket);
udpsink->used_socket = NULL;
}
return TRUE;
}
static GstStateChangeReturn
gst_dynudpsink_change_state (GstElement * element, GstStateChange transition)
static gboolean
gst_dynudpsink_unlock (GstBaseSink * bsink)
{
GstStateChangeReturn ret;
GstDynUDPSink *sink;
GstDynUDPSink *udpsink;
sink = GST_DYNUDPSINK (element);
udpsink = GST_DYNUDPSINK (bsink);
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (!gst_dynudpsink_init_send (sink))
goto no_init;
break;
default:
break;
}
g_cancellable_cancel (udpsink->cancellable);
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_dynudpsink_close (sink);
break;
default:
break;
}
return ret;
/* ERRORS */
no_init:
{
return GST_STATE_CHANGE_FAILURE;
}
return TRUE;
}
static gboolean
gst_dynudpsink_unlock_stop (GstBaseSink * bsink)
{
GstDynUDPSink *udpsink;
udpsink = GST_DYNUDPSINK (bsink);
g_cancellable_reset (udpsink->cancellable);
return TRUE;
}

View file

@ -22,11 +22,11 @@
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <gio/gio.h>
G_BEGIN_DECLS
#include "gstudpnetutils.h"
#include "gstudp.h"
#define GST_TYPE_DYNUDPSINK (gst_dynudpsink_get_type())
@ -45,26 +45,28 @@ struct _GstDynUDPSink {
GstBaseSink parent;
/* properties */
gint sockfd;
gboolean closefd;
GSocket *socket;
gboolean close_socket;
GSocketFamily family;
/* the socket in use */
int sock;
gboolean externalfd;
GSocket *used_socket;
gboolean external_socket;
GCancellable *cancellable;
};
struct _GstDynUDPSinkClass {
GstBaseSinkClass parent_class;
/* element methods */
GValueArray* (*get_stats) (GstDynUDPSink *sink, const gchar *host, gint port);
GstStructure* (*get_stats) (GstDynUDPSink *sink, const gchar *host, gint port);
/* signals */
};
GType gst_dynudpsink_get_type(void);
GValueArray* gst_dynudpsink_get_stats (GstDynUDPSink *sink, const gchar *host, gint port);
GstStructure* gst_dynudpsink_get_stats (GstDynUDPSink *sink, const gchar *host, gint port);
G_END_DECLS