tcpserversrc: Port to GIO

This commit is contained in:
Sebastian Dröge 2012-01-11 16:06:22 +01:00
parent 2a2acedde0
commit d29c7826ab
2 changed files with 182 additions and 157 deletions

View file

@ -1,6 +1,8 @@
/* GStreamer /* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
* Copyright (C) <2011> Collabora Ltd.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
@ -40,11 +42,6 @@
#include <gst/gst-i18n-plugin.h> #include <gst/gst-i18n-plugin.h>
#include "gsttcp.h" #include "gsttcp.h"
#include "gsttcpserversrc.h" #include "gsttcpserversrc.h"
#include <string.h> /* memset */
#include <unistd.h>
#include <sys/ioctl.h>
#include <fcntl.h>
GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug); GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
#define GST_CAT_DEFAULT tcpserversrc_debug #define GST_CAT_DEFAULT tcpserversrc_debug
@ -52,13 +49,13 @@ GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
#define TCP_DEFAULT_LISTEN_HOST NULL /* listen on all interfaces */ #define TCP_DEFAULT_LISTEN_HOST NULL /* listen on all interfaces */
#define TCP_BACKLOG 1 /* client connection queue */ #define TCP_BACKLOG 1 /* client connection queue */
#define MAX_READ_SIZE 4 * 1024
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC, GST_PAD_SRC,
GST_PAD_ALWAYS, GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY); GST_STATIC_CAPS_ANY);
enum enum
{ {
PROP_0, PROP_0,
@ -69,12 +66,12 @@ enum
#define gst_tcp_server_src_parent_class parent_class #define gst_tcp_server_src_parent_class parent_class
G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC); G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC);
static void gst_tcp_server_src_finalize (GObject * gobject); static void gst_tcp_server_src_finalize (GObject * gobject);
static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc); static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc); static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc); static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc);
static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc, static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
GstBuffer ** buf); GstBuffer ** buf);
@ -119,6 +116,7 @@ gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
gstbasesrc_class->start = gst_tcp_server_src_start; gstbasesrc_class->start = gst_tcp_server_src_start;
gstbasesrc_class->stop = gst_tcp_server_src_stop; gstbasesrc_class->stop = gst_tcp_server_src_stop;
gstbasesrc_class->unlock = gst_tcp_server_src_unlock; gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop;
gstpush_src_class->create = gst_tcp_server_src_create; gstpush_src_class->create = gst_tcp_server_src_create;
@ -131,8 +129,9 @@ gst_tcp_server_src_init (GstTCPServerSrc * src)
{ {
src->server_port = TCP_DEFAULT_PORT; src->server_port = TCP_DEFAULT_PORT;
src->host = g_strdup (TCP_DEFAULT_HOST); src->host = g_strdup (TCP_DEFAULT_HOST);
src->server_sock_fd.fd = -1; src->server_socket = NULL;
src->client_sock_fd.fd = -1; src->client_socket = NULL;
src->cancellable = g_cancellable_new ();
GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
} }
@ -142,7 +141,18 @@ gst_tcp_server_src_finalize (GObject * gobject)
{ {
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject); GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
if (src->cancellable)
g_object_unref (src->cancellable);
src->cancellable = NULL;
if (src->server_socket)
g_object_unref (src->server_socket);
src->server_socket = NULL;
if (src->client_socket)
g_object_unref (src->client_socket);
src->client_socket = NULL;
g_free (src->host); g_free (src->host);
src->host = NULL;
G_OBJECT_CLASS (parent_class)->finalize (gobject); G_OBJECT_CLASS (parent_class)->finalize (gobject);
} }
@ -152,51 +162,56 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{ {
GstTCPServerSrc *src; GstTCPServerSrc *src;
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
gssize rret;
GError *err = NULL;
guint8 *data;
src = GST_TCP_SERVER_SRC (psrc); src = GST_TCP_SERVER_SRC (psrc);
if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN)) if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
goto wrong_state; goto wrong_state;
restart: if (!src->client_socket) {
if (src->client_sock_fd.fd >= 0) { /* wait on server socket for connections */
/* if we have a client, wait for read */ src->client_socket =
gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE); g_socket_accept (src->server_socket, src->cancellable, &err);
gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE); if (!src->client_socket)
} else {
/* else wait on server socket for connections */
gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE);
}
/* no action (0) is an error too in our case */
if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) {
if (ret == -1 && errno == EBUSY)
goto select_cancelled;
else
goto select_error;
}
/* if we have no client socket we can accept one now */
if (src->client_sock_fd.fd < 0) {
if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) {
if ((src->client_sock_fd.fd =
accept (src->server_sock_fd.fd,
(struct sockaddr *) &src->client_sin,
&src->client_sin_len)) == -1)
goto accept_error; goto accept_error;
/* now read from the socket. */
gst_poll_add_fd (src->fdset, &src->client_sock_fd);
}
/* and restart now to poll the socket. */
goto restart;
} }
/* if we have a client, wait for read */
GST_LOG_OBJECT (src, "asked for a buffer"); GST_LOG_OBJECT (src, "asked for a buffer");
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd, /* read the buffer header */
src->fdset, outbuf); *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
rret =
g_socket_receive (src->client_socket, (gchar *) data, MAX_READ_SIZE,
src->cancellable, &err);
if (rret == 0) {
GST_DEBUG_OBJECT (src, "Connection closed");
ret = GST_FLOW_EOS;
gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else if (ret < 0) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
ret = GST_FLOW_WRONG_STATE;
GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
} else {
ret = GST_FLOW_ERROR;
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Failed to read from socket: %s", err->message));
}
gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else {
ret = GST_FLOW_OK;
gst_buffer_unmap (*outbuf, data, rret);
if (ret == GST_FLOW_OK) {
GST_LOG_OBJECT (src, GST_LOG_OBJECT (src,
"Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %" "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
@ -206,6 +221,7 @@ restart:
GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
} }
g_clear_error (&err);
return ret; return ret;
@ -214,21 +230,15 @@ wrong_state:
GST_DEBUG_OBJECT (src, "connection to closed, cannot read data"); GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
return GST_FLOW_WRONG_STATE; return GST_FLOW_WRONG_STATE;
} }
select_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Select error: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
select_cancelled:
{
GST_DEBUG_OBJECT (src, "select canceled");
return GST_FLOW_WRONG_STATE;
}
accept_error: accept_error:
{ {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_DEBUG_OBJECT (src, "Cancelled accepting of client");
} else {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Could not accept client on server socket: %s", g_strerror (errno))); ("Failed to accept client: %s", err->message));
}
g_clear_error (&err);
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
} }
} }
@ -271,7 +281,6 @@ gst_tcp_server_src_get_property (GObject * object, guint prop_id,
case PROP_PORT: case PROP_PORT:
g_value_set_int (value, tcpserversrc->server_port); g_value_set_int (value, tcpserversrc->server_port);
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -282,104 +291,108 @@ gst_tcp_server_src_get_property (GObject * object, guint prop_id,
static gboolean static gboolean
gst_tcp_server_src_start (GstBaseSrc * bsrc) gst_tcp_server_src_start (GstBaseSrc * bsrc)
{ {
int ret;
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
GError *err = NULL;
/* reset caps_received flag */ GInetAddress *addr;
src->caps_received = FALSE; GSocketAddress *saddr;
/* create the server listener socket */ /* create the server listener socket */
if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) src->server_socket = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
goto socket_error; G_SOCKET_PROTOCOL_TCP, &err);
if (!src->server_socket)
goto no_socket;
GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d", GST_DEBUG_OBJECT (src, "opened receiving server socket");
src->server_sock_fd.fd);
/* make address reusable */ /* look up name if we need to */
ret = 1; addr = g_inet_address_new_from_string (src->host);
if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret, if (!addr) {
sizeof (int)) < 0) GResolver *resolver = g_resolver_get_default ();
goto sock_opt; GList *results;
/* name the socket */ results =
memset (&src->server_sin, 0, sizeof (src->server_sin)); g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
src->server_sin.sin_family = AF_INET; /* network socket */ if (!results)
src->server_sin.sin_port = htons (src->server_port); /* on port */ goto name_resolve;
if (src->host) { addr = G_INET_ADDRESS (g_object_ref (results->data));
gchar *host;
if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host))) g_resolver_free_addresses (results);
goto host_error; g_object_unref (resolver);
src->server_sin.sin_addr.s_addr = inet_addr (host); }
g_free (host); #ifndef GST_DISABLE_GST_DEBUG
} else {
src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); gchar *ip = g_inet_address_to_string (addr);
GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
g_free (ip);
}
#endif
/* bind it */ /* bind it */
saddr = g_inet_socket_address_new (addr, src->server_port);
GST_DEBUG_OBJECT (src, "binding server socket to address"); GST_DEBUG_OBJECT (src, "binding server socket to address");
if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin, if (!g_socket_bind (src->server_socket, saddr, TRUE, &err))
sizeof (src->server_sin))) < 0) goto bind_failed;
goto bind_error;
GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d", GST_DEBUG_OBJECT (src, "listening on server socket");
src->server_sock_fd.fd, TCP_BACKLOG);
if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1) g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG);
goto listen_error;
/* create an fdset to keep track of our file descriptors */ if (!g_socket_listen (src->server_socket, &err))
if ((src->fdset = gst_poll_new (TRUE)) == NULL) goto listen_failed;
goto socket_pair;
gst_poll_add_fd (src->fdset, &src->server_sock_fd);
GST_DEBUG_OBJECT (src, "received client");
GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN); GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
return TRUE; return TRUE;
/* ERRORS */ /* ERRORS */
socket_error: no_socket:
{ {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
sock_opt:
{
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
gst_tcp_socket_close (&src->server_sock_fd);
return FALSE;
}
host_error:
{
gst_tcp_socket_close (&src->server_sock_fd);
return FALSE;
}
bind_error:
{
gst_tcp_socket_close (&src->server_sock_fd);
switch (errno) {
default:
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("bind failed: %s", g_strerror (errno))); ("Failed to create socket: %s", err->message));
break; g_clear_error (&err);
}
return FALSE; return FALSE;
} }
listen_error: name_resolve:
{ {
gst_tcp_socket_close (&src->server_sock_fd); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_DEBUG_OBJECT (src, "Cancelled name resolval");
} else {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno))); ("Failed to resolve host '%s': %s", src->host, err->message));
}
g_clear_error (&err);
gst_tcp_server_src_stop (GST_BASE_SRC (src));
return FALSE; return FALSE;
} }
socket_pair: bind_failed:
{ {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_ERROR_SYSTEM); GST_DEBUG_OBJECT (src, "Cancelled binding");
gst_tcp_socket_close (&src->server_sock_fd); } else {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Failed to bind on host '%s:%d': %s", src->host, src->server_port,
err->message));
}
g_clear_error (&err);
g_object_unref (saddr);
g_object_unref (addr);
gst_tcp_server_src_stop (GST_BASE_SRC (src));
return FALSE;
}
listen_failed:
{
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_DEBUG_OBJECT (src, "Cancelled listening");
} else {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Failed to listen on host '%s:%d': %s", src->host, src->server_port,
err->message));
}
g_clear_error (&err);
g_object_unref (saddr);
g_object_unref (addr);
gst_tcp_server_src_stop (GST_BASE_SRC (src));
return FALSE; return FALSE;
} }
} }
@ -388,12 +401,29 @@ static gboolean
gst_tcp_server_src_stop (GstBaseSrc * bsrc) gst_tcp_server_src_stop (GstBaseSrc * bsrc)
{ {
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
GError *err = NULL;
gst_poll_free (src->fdset); if (src->client_socket) {
src->fdset = NULL; GST_DEBUG_OBJECT (src, "closing socket");
gst_tcp_socket_close (&src->server_sock_fd); if (!g_socket_close (src->client_socket, &err)) {
gst_tcp_socket_close (&src->client_sock_fd); GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
g_clear_error (&err);
}
g_object_unref (src->client_socket);
src->client_socket = NULL;
}
if (src->server_socket) {
GST_DEBUG_OBJECT (src, "closing socket");
if (!g_socket_close (src->server_socket, &err)) {
GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
g_clear_error (&err);
}
g_object_unref (src->server_socket);
src->server_socket = NULL;
}
GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
@ -406,7 +436,17 @@ gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
{ {
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
gst_poll_set_flushing (src->fdset, TRUE); g_cancellable_cancel (src->cancellable);
return TRUE;
}
static gboolean
gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc)
{
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
g_cancellable_reset (src->cancellable);
return TRUE; return TRUE;
} }

View file

@ -24,20 +24,12 @@
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/base/gstpushsrc.h> #include <gst/base/gstpushsrc.h>
#include <gio/gio.h>
G_END_DECLS G_END_DECLS
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "gsttcp.h" #include "gsttcp.h"
#include <fcntl.h>
#define GST_TYPE_TCP_SERVER_SRC \ #define GST_TYPE_TCP_SERVER_SRC \
(gst_tcp_server_src_get_type()) (gst_tcp_server_src_get_type())
#define GST_TCP_SERVER_SRC(obj) \ #define GST_TCP_SERVER_SRC(obj) \
@ -53,9 +45,9 @@ typedef struct _GstTCPServerSrc GstTCPServerSrc;
typedef struct _GstTCPServerSrcClass GstTCPServerSrcClass; typedef struct _GstTCPServerSrcClass GstTCPServerSrcClass;
typedef enum { typedef enum {
GST_TCP_SERVER_SRC_OPEN = (GST_ELEMENT_FLAG_LAST << 0), GST_TCP_SERVER_SRC_OPEN = (GST_BASE_SRC_FLAG_LAST << 0),
GST_TCP_SERVER_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) GST_TCP_SERVER_SRC_FLAG_LAST = (GST_BASE_SRC_FLAG_LAST << 2)
} GstTCPServerSrcFlags; } GstTCPServerSrcFlags;
struct _GstTCPServerSrc { struct _GstTCPServerSrc {
@ -64,17 +56,10 @@ struct _GstTCPServerSrc {
/* server information */ /* server information */
int server_port; int server_port;
gchar *host; gchar *host;
struct sockaddr_in server_sin;
GstPollFD server_sock_fd;
/* client information */ GCancellable *cancellable;
struct sockaddr_in client_sin; GSocket *server_socket;
socklen_t client_sin_len; GSocket *client_socket;
GstPollFD client_sock_fd;
GstPoll *fdset;
gboolean caps_received; /* if we have received caps yet */
}; };
struct _GstTCPServerSrcClass { struct _GstTCPServerSrcClass {