From d29c7826aba7ba3233e03412e5482b235e70fcd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 11 Jan 2012 16:06:22 +0100 Subject: [PATCH] tcpserversrc: Port to GIO --- gst/tcp/gsttcpserversrc.c | 312 +++++++++++++++++++++----------------- gst/tcp/gsttcpserversrc.h | 27 +--- 2 files changed, 182 insertions(+), 157 deletions(-) diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c index 3c4b5b1cd3..42a89ed703 100644 --- a/gst/tcp/gsttcpserversrc.c +++ b/gst/tcp/gsttcpserversrc.c @@ -1,6 +1,8 @@ /* GStreamer * Copyright (C) <1999> Erik Walthinsen * Copyright (C) <2004> Thomas Vander Stichele + * Copyright (C) <2011> Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -40,11 +42,6 @@ #include #include "gsttcp.h" #include "gsttcpserversrc.h" -#include /* memset */ -#include -#include -#include - GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug); #define GST_CAT_DEFAULT tcpserversrc_debug @@ -52,13 +49,13 @@ GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug); #define TCP_DEFAULT_LISTEN_HOST NULL /* listen on all interfaces */ #define TCP_BACKLOG 1 /* client connection queue */ +#define MAX_READ_SIZE 4 * 1024 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); - enum { PROP_0, @@ -69,12 +66,12 @@ enum #define gst_tcp_server_src_parent_class parent_class G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC); - static void gst_tcp_server_src_finalize (GObject * gobject); static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc); static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc); static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc); +static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc); static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** buf); @@ -119,6 +116,7 @@ gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass) gstbasesrc_class->start = gst_tcp_server_src_start; gstbasesrc_class->stop = gst_tcp_server_src_stop; gstbasesrc_class->unlock = gst_tcp_server_src_unlock; + gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop; gstpush_src_class->create = gst_tcp_server_src_create; @@ -131,8 +129,9 @@ gst_tcp_server_src_init (GstTCPServerSrc * src) { src->server_port = TCP_DEFAULT_PORT; src->host = g_strdup (TCP_DEFAULT_HOST); - src->server_sock_fd.fd = -1; - src->client_sock_fd.fd = -1; + src->server_socket = NULL; + src->client_socket = NULL; + src->cancellable = g_cancellable_new (); GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); } @@ -142,7 +141,18 @@ gst_tcp_server_src_finalize (GObject * gobject) { GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject); + if (src->cancellable) + g_object_unref (src->cancellable); + src->cancellable = NULL; + if (src->server_socket) + g_object_unref (src->server_socket); + src->server_socket = NULL; + if (src->client_socket) + g_object_unref (src->client_socket); + src->client_socket = NULL; + g_free (src->host); + src->host = NULL; G_OBJECT_CLASS (parent_class)->finalize (gobject); } @@ -152,51 +162,56 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstTCPServerSrc *src; GstFlowReturn ret = GST_FLOW_OK; + gssize rret; + GError *err = NULL; + guint8 *data; src = GST_TCP_SERVER_SRC (psrc); if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN)) goto wrong_state; -restart: - if (src->client_sock_fd.fd >= 0) { - /* if we have a client, wait for read */ - gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE); - gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE); - } else { - /* else wait on server socket for connections */ - gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE); - } - - /* no action (0) is an error too in our case */ - if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) { - if (ret == -1 && errno == EBUSY) - goto select_cancelled; - else - goto select_error; - } - - /* if we have no client socket we can accept one now */ - if (src->client_sock_fd.fd < 0) { - if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) { - if ((src->client_sock_fd.fd = - accept (src->server_sock_fd.fd, - (struct sockaddr *) &src->client_sin, - &src->client_sin_len)) == -1) - goto accept_error; - - gst_poll_add_fd (src->fdset, &src->client_sock_fd); - } - /* and restart now to poll the socket. */ - goto restart; + if (!src->client_socket) { + /* wait on server socket for connections */ + src->client_socket = + g_socket_accept (src->server_socket, src->cancellable, &err); + if (!src->client_socket) + goto accept_error; + /* now read from the socket. */ } + /* if we have a client, wait for read */ GST_LOG_OBJECT (src, "asked for a buffer"); - ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd, - src->fdset, outbuf); + /* read the buffer header */ + *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE); + data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE); + rret = + g_socket_receive (src->client_socket, (gchar *) data, MAX_READ_SIZE, + src->cancellable, &err); + + if (rret == 0) { + GST_DEBUG_OBJECT (src, "Connection closed"); + ret = GST_FLOW_EOS; + gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE); + gst_buffer_unref (*outbuf); + *outbuf = NULL; + } else if (ret < 0) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + ret = GST_FLOW_WRONG_STATE; + GST_DEBUG_OBJECT (src, "Cancelled reading from socket"); + } else { + ret = GST_FLOW_ERROR; + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Failed to read from socket: %s", err->message)); + } + gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE); + gst_buffer_unref (*outbuf); + *outbuf = NULL; + } else { + ret = GST_FLOW_OK; + gst_buffer_unmap (*outbuf, data, rret); - if (ret == GST_FLOW_OK) { GST_LOG_OBJECT (src, "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT @@ -206,6 +221,7 @@ restart: GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); } + g_clear_error (&err); return ret; @@ -214,21 +230,15 @@ wrong_state: GST_DEBUG_OBJECT (src, "connection to closed, cannot read data"); return GST_FLOW_WRONG_STATE; } -select_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Select error: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -select_cancelled: - { - GST_DEBUG_OBJECT (src, "select canceled"); - return GST_FLOW_WRONG_STATE; - } accept_error: { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), - ("Could not accept client on server socket: %s", g_strerror (errno))); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (src, "Cancelled accepting of client"); + } else { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), + ("Failed to accept client: %s", err->message)); + } + g_clear_error (&err); return GST_FLOW_ERROR; } } @@ -271,7 +281,6 @@ gst_tcp_server_src_get_property (GObject * object, guint prop_id, case PROP_PORT: g_value_set_int (value, tcpserversrc->server_port); break; - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -282,104 +291,108 @@ gst_tcp_server_src_get_property (GObject * object, guint prop_id, static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc) { - int ret; GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - - /* reset caps_received flag */ - src->caps_received = FALSE; + GError *err = NULL; + GInetAddress *addr; + GSocketAddress *saddr; /* create the server listener socket */ - if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) - goto socket_error; + src->server_socket = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, &err); + if (!src->server_socket) + goto no_socket; - GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d", - src->server_sock_fd.fd); + GST_DEBUG_OBJECT (src, "opened receiving server socket"); - /* make address reusable */ - ret = 1; - if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret, - sizeof (int)) < 0) - goto sock_opt; + /* look up name if we need to */ + addr = g_inet_address_new_from_string (src->host); + if (!addr) { + GResolver *resolver = g_resolver_get_default (); + GList *results; - /* name the socket */ - memset (&src->server_sin, 0, sizeof (src->server_sin)); - src->server_sin.sin_family = AF_INET; /* network socket */ - src->server_sin.sin_port = htons (src->server_port); /* on port */ - if (src->host) { - gchar *host; + results = + g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err); + if (!results) + goto name_resolve; + addr = G_INET_ADDRESS (g_object_ref (results->data)); - if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host))) - goto host_error; - src->server_sin.sin_addr.s_addr = inet_addr (host); - g_free (host); - } else - src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); + g_resolver_free_addresses (results); + g_object_unref (resolver); + } +#ifndef GST_DISABLE_GST_DEBUG + { + gchar *ip = g_inet_address_to_string (addr); + + GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip); + g_free (ip); + } +#endif /* bind it */ + saddr = g_inet_socket_address_new (addr, src->server_port); GST_DEBUG_OBJECT (src, "binding server socket to address"); - if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin, - sizeof (src->server_sin))) < 0) - goto bind_error; + if (!g_socket_bind (src->server_socket, saddr, TRUE, &err)) + goto bind_failed; - GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d", - src->server_sock_fd.fd, TCP_BACKLOG); + GST_DEBUG_OBJECT (src, "listening on server socket"); - if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1) - goto listen_error; + g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG); - /* create an fdset to keep track of our file descriptors */ - if ((src->fdset = gst_poll_new (TRUE)) == NULL) - goto socket_pair; - - gst_poll_add_fd (src->fdset, &src->server_sock_fd); - - GST_DEBUG_OBJECT (src, "received client"); + if (!g_socket_listen (src->server_socket, &err)) + goto listen_failed; GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN); return TRUE; /* ERRORS */ -socket_error: +no_socket: { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } -sock_opt: - { - GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), - ("Could not setsockopt: %s", g_strerror (errno))); - gst_tcp_socket_close (&src->server_sock_fd); - return FALSE; - } -host_error: - { - gst_tcp_socket_close (&src->server_sock_fd); - return FALSE; - } -bind_error: - { - gst_tcp_socket_close (&src->server_sock_fd); - switch (errno) { - default: - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), - ("bind failed: %s", g_strerror (errno))); - break; - } - return FALSE; - } -listen_error: - { - gst_tcp_socket_close (&src->server_sock_fd); GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), - ("Could not listen on server socket: %s", g_strerror (errno))); + ("Failed to create socket: %s", err->message)); + g_clear_error (&err); return FALSE; } -socket_pair: +name_resolve: { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - GST_ERROR_SYSTEM); - gst_tcp_socket_close (&src->server_sock_fd); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (src, "Cancelled name resolval"); + } else { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), + ("Failed to resolve host '%s': %s", src->host, err->message)); + } + g_clear_error (&err); + gst_tcp_server_src_stop (GST_BASE_SRC (src)); + return FALSE; + } +bind_failed: + { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (src, "Cancelled binding"); + } else { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), + ("Failed to bind on host '%s:%d': %s", src->host, src->server_port, + err->message)); + } + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (addr); + gst_tcp_server_src_stop (GST_BASE_SRC (src)); + return FALSE; + } +listen_failed: + { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (src, "Cancelled listening"); + } else { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), + ("Failed to listen on host '%s:%d': %s", src->host, src->server_port, + err->message)); + } + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (addr); + gst_tcp_server_src_stop (GST_BASE_SRC (src)); return FALSE; } } @@ -388,12 +401,29 @@ static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc) { GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); + GError *err = NULL; - gst_poll_free (src->fdset); - src->fdset = NULL; + if (src->client_socket) { + GST_DEBUG_OBJECT (src, "closing socket"); - gst_tcp_socket_close (&src->server_sock_fd); - gst_tcp_socket_close (&src->client_sock_fd); + if (!g_socket_close (src->client_socket, &err)) { + GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message); + g_clear_error (&err); + } + g_object_unref (src->client_socket); + src->client_socket = NULL; + } + + if (src->server_socket) { + GST_DEBUG_OBJECT (src, "closing socket"); + + if (!g_socket_close (src->server_socket, &err)) { + GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message); + g_clear_error (&err); + } + g_object_unref (src->server_socket); + src->server_socket = NULL; + } GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); @@ -406,7 +436,17 @@ gst_tcp_server_src_unlock (GstBaseSrc * bsrc) { GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - gst_poll_set_flushing (src->fdset, TRUE); + g_cancellable_cancel (src->cancellable); + + return TRUE; +} + +static gboolean +gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc) +{ + GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); + + g_cancellable_reset (src->cancellable); return TRUE; } diff --git a/gst/tcp/gsttcpserversrc.h b/gst/tcp/gsttcpserversrc.h index f5e80e269c..326c4a5865 100644 --- a/gst/tcp/gsttcpserversrc.h +++ b/gst/tcp/gsttcpserversrc.h @@ -24,20 +24,12 @@ #include #include +#include G_END_DECLS -#include -#include -#include -#include -#include -#include -#include #include "gsttcp.h" -#include - #define GST_TYPE_TCP_SERVER_SRC \ (gst_tcp_server_src_get_type()) #define GST_TCP_SERVER_SRC(obj) \ @@ -53,9 +45,9 @@ typedef struct _GstTCPServerSrc GstTCPServerSrc; typedef struct _GstTCPServerSrcClass GstTCPServerSrcClass; typedef enum { - GST_TCP_SERVER_SRC_OPEN = (GST_ELEMENT_FLAG_LAST << 0), + GST_TCP_SERVER_SRC_OPEN = (GST_BASE_SRC_FLAG_LAST << 0), - GST_TCP_SERVER_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) + GST_TCP_SERVER_SRC_FLAG_LAST = (GST_BASE_SRC_FLAG_LAST << 2) } GstTCPServerSrcFlags; struct _GstTCPServerSrc { @@ -64,17 +56,10 @@ struct _GstTCPServerSrc { /* server information */ int server_port; gchar *host; - struct sockaddr_in server_sin; - GstPollFD server_sock_fd; - /* client information */ - struct sockaddr_in client_sin; - socklen_t client_sin_len; - GstPollFD client_sock_fd; - - GstPoll *fdset; - - gboolean caps_received; /* if we have received caps yet */ + GCancellable *cancellable; + GSocket *server_socket; + GSocket *client_socket; }; struct _GstTCPServerSrcClass {