tcpclientsrc: Port to GIO

This commit is contained in:
Sebastian Dröge 2012-01-11 15:09:46 +01:00
parent 5c91ca3256
commit 075ec8b4e4
3 changed files with 117 additions and 94 deletions

View file

@ -23,9 +23,9 @@ libgsttcp_la_SOURCES = \
nodist_libgsttcp_la_SOURCES = \ nodist_libgsttcp_la_SOURCES = \
$(built_sources) $(built_sources)
libgsttcp_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) libgsttcp_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(GIO_CFLAGS)
libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS) libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS) $(GIO_LIBS)
libgsttcp_la_LIBTOOLFLAGS = --tag=disable-static libgsttcp_la_LIBTOOLFLAGS = --tag=disable-static
noinst_HEADERS = \ noinst_HEADERS = \

View file

@ -1,6 +1,8 @@
/* GStreamer /* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
* Copyright (C) <2011> Collabora Ltd.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
@ -38,13 +40,8 @@
#endif #endif
#include <gst/gst-i18n-plugin.h> #include <gst/gst-i18n-plugin.h>
#include "gsttcp.h"
#include "gsttcpclientsrc.h" #include "gsttcpclientsrc.h"
#include <string.h> /* memset */ #include "gsttcp.h"
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug); GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
#define GST_CAT_DEFAULT tcpclientsrc_debug #define GST_CAT_DEFAULT tcpclientsrc_debug
@ -137,8 +134,8 @@ gst_tcp_client_src_init (GstTCPClientSrc * this)
{ {
this->port = TCP_DEFAULT_PORT; this->port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST); this->host = g_strdup (TCP_DEFAULT_HOST);
this->sock_fd.fd = -1; this->socket = NULL;
this->caps = NULL; this->cancellable = g_cancellable_new ();
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN); GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
} }
@ -148,7 +145,14 @@ gst_tcp_client_src_finalize (GObject * gobject)
{ {
GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject); GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject);
if (this->cancellable)
g_object_unref (this->cancellable);
this->cancellable = NULL;
if (this->socket)
g_object_unref (this->socket);
this->socket = NULL;
g_free (this->host); g_free (this->host);
this->host = NULL;
G_OBJECT_CLASS (parent_class)->finalize (gobject); G_OBJECT_CLASS (parent_class)->finalize (gobject);
} }
@ -161,15 +165,7 @@ gst_tcp_client_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
src = GST_TCP_CLIENT_SRC (bsrc); src = GST_TCP_CLIENT_SRC (bsrc);
if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN)) caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
else if (src->caps && filter)
caps =
gst_caps_intersect_full (filter, src->caps, GST_CAPS_INTERSECT_FIRST);
else if (src->caps)
caps = gst_caps_copy (src->caps);
else
caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps); GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps);
g_assert (GST_IS_CAPS (caps)); g_assert (GST_IS_CAPS (caps));
@ -181,6 +177,9 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{ {
GstTCPClientSrc *src; GstTCPClientSrc *src;
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
gssize rret;
GError *err = NULL;
guint8 *data;
src = GST_TCP_CLIENT_SRC (psrc); src = GST_TCP_CLIENT_SRC (psrc);
@ -190,10 +189,34 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_LOG_OBJECT (src, "asked for a buffer"); GST_LOG_OBJECT (src, "asked for a buffer");
/* read the buffer header */ /* read the buffer header */
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd, *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
src->fdset, outbuf); data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
rret =
g_socket_receive (src->socket, (gchar *) data, MAX_READ_SIZE,
src->cancellable, &err);
if (rret == 0) {
GST_DEBUG_OBJECT (src, "Connection closed");
ret = GST_FLOW_EOS;
gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else if (ret < 0) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
ret = GST_FLOW_WRONG_STATE;
GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
} else {
ret = GST_FLOW_ERROR;
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Failed to read from socket: %s", err->message));
}
gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else {
ret = GST_FLOW_OK;
gst_buffer_unmap (*outbuf, data, rret);
if (ret == GST_FLOW_OK) {
GST_LOG_OBJECT (src, GST_LOG_OBJECT (src,
"Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %" "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
@ -203,6 +226,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
} }
g_clear_error (&err);
return ret; return ret;
@ -251,7 +275,6 @@ gst_tcp_client_src_get_property (GObject * object, guint prop_id,
case PROP_PORT: case PROP_PORT:
g_value_set_int (value, tcpclientsrc->port); g_value_set_int (value, tcpclientsrc->port);
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -262,79 +285,90 @@ gst_tcp_client_src_get_property (GObject * object, guint prop_id,
static gboolean static gboolean
gst_tcp_client_src_start (GstBaseSrc * bsrc) gst_tcp_client_src_start (GstBaseSrc * bsrc)
{ {
int ret;
gchar *ip;
GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
GError *err = NULL;
if ((src->fdset = gst_poll_new (TRUE)) == NULL) GInetAddress *addr;
goto socket_pair; GSocketAddress *saddr;
/* create receiving client socket */ /* create receiving client socket */
GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d", GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
src->host, src->port); src->host, src->port);
if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) src->socket =
g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_TCP, &err);
if (!src->socket)
goto no_socket; goto no_socket;
GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d", GST_DEBUG_OBJECT (src, "opened receiving client socket");
src->sock_fd.fd);
GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN); GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
/* look up name if we need to */ /* look up name if we need to */
if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host))) addr = g_inet_address_new_from_string (src->host);
goto name_resolv; if (!addr) {
GResolver *resolver = g_resolver_get_default ();
GList *results;
GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip); results =
g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
if (!results)
goto name_resolve;
addr = G_INET_ADDRESS (g_object_ref (results->data));
g_resolver_free_addresses (results);
g_object_unref (resolver);
}
#ifndef GST_DISABLE_GST_DEBUG
{
gchar *ip = g_inet_address_to_string (addr);
GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
g_free (ip);
}
#endif
/* connect to server */ /* connect to server */
memset (&src->server_sin, 0, sizeof (src->server_sin)); saddr = g_inet_socket_address_new (addr, src->port);
src->server_sin.sin_family = AF_INET; /* network socket */ if (!g_socket_connect (src->socket, saddr, src->cancellable, &err))
src->server_sin.sin_port = htons (src->port); /* on port */
src->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */
g_free (ip);
GST_DEBUG_OBJECT (src, "connecting to server");
ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin,
sizeof (src->server_sin));
if (ret)
goto connect_failed; goto connect_failed;
/* add the socket to the poll */ g_object_unref (saddr);
gst_poll_add_fd (src->fdset, &src->sock_fd); g_object_unref (addr);
gst_poll_fd_ctl_read (src->fdset, &src->sock_fd, TRUE);
return TRUE; return TRUE;
socket_pair:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ERROR_SYSTEM);
return FALSE;
}
no_socket: no_socket:
{ {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Failed to create socket: %s", err->message));
g_clear_error (&err);
return FALSE; return FALSE;
} }
name_resolv: name_resolve:
{ {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_DEBUG_OBJECT (src, "Cancelled name resolval");
} else {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Failed to resolve host '%s': %s", src->host, err->message));
}
g_clear_error (&err);
gst_tcp_client_src_stop (GST_BASE_SRC (src)); gst_tcp_client_src_stop (GST_BASE_SRC (src));
return FALSE; return FALSE;
} }
connect_failed: connect_failed:
{ {
gst_tcp_client_src_stop (GST_BASE_SRC (src)); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
switch (errno) { GST_DEBUG_OBJECT (src, "Cancelled connecting");
case ECONNREFUSED: } else {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
(_("Connection to %s:%d refused."), src->host, src->port), (NULL)); ("Failed to connect to host '%s:%d': %s", src->host, src->port,
break; err->message));
default:
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("connect to %s:%d failed: %s", src->host, src->port,
g_strerror (errno)));
break;
} }
g_clear_error (&err);
g_object_unref (saddr);
g_object_unref (addr);
gst_tcp_client_src_stop (GST_BASE_SRC (src));
return FALSE; return FALSE;
} }
} }
@ -346,22 +380,21 @@ static gboolean
gst_tcp_client_src_stop (GstBaseSrc * bsrc) gst_tcp_client_src_stop (GstBaseSrc * bsrc)
{ {
GstTCPClientSrc *src; GstTCPClientSrc *src;
GError *err = NULL;
src = GST_TCP_CLIENT_SRC (bsrc); src = GST_TCP_CLIENT_SRC (bsrc);
GST_DEBUG_OBJECT (src, "closing socket"); if (src->socket) {
GST_DEBUG_OBJECT (src, "closing socket");
if (src->fdset != NULL) { if (!g_socket_close (src->socket, &err)) {
gst_poll_free (src->fdset); GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
src->fdset = NULL; g_clear_error (&err);
}
g_object_unref (src->socket);
src->socket = NULL;
} }
gst_tcp_socket_close (&src->sock_fd);
src->caps_received = FALSE;
if (src->caps) {
gst_caps_unref (src->caps);
src->caps = NULL;
}
GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN); GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
return TRUE; return TRUE;
@ -374,7 +407,7 @@ gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
GST_DEBUG_OBJECT (src, "set to flushing"); GST_DEBUG_OBJECT (src, "set to flushing");
gst_poll_set_flushing (src->fdset, TRUE); g_cancellable_cancel (src->cancellable);
return TRUE; return TRUE;
} }
@ -386,7 +419,7 @@ gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc)
GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unset flushing"); GST_DEBUG_OBJECT (src, "unset flushing");
gst_poll_set_flushing (src->fdset, FALSE); g_cancellable_reset (src->cancellable);
return TRUE; return TRUE;
} }

View file

@ -25,16 +25,10 @@
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/base/gstpushsrc.h> #include <gst/base/gstpushsrc.h>
#include <gio/gio.h>
G_BEGIN_DECLS G_BEGIN_DECLS
#include <netdb.h> /* sockaddr_in */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> /* sockaddr_in */
#include <unistd.h>
#include "gsttcp.h"
#define GST_TYPE_TCP_CLIENT_SRC \ #define GST_TYPE_TCP_CLIENT_SRC \
(gst_tcp_client_src_get_type()) (gst_tcp_client_src_get_type())
#define GST_TCP_CLIENT_SRC(obj) \ #define GST_TCP_CLIENT_SRC(obj) \
@ -50,9 +44,9 @@ typedef struct _GstTCPClientSrc GstTCPClientSrc;
typedef struct _GstTCPClientSrcClass GstTCPClientSrcClass; typedef struct _GstTCPClientSrcClass GstTCPClientSrcClass;
typedef enum { typedef enum {
GST_TCP_CLIENT_SRC_OPEN = (GST_ELEMENT_FLAG_LAST << 0), GST_TCP_CLIENT_SRC_OPEN = (GST_BASE_SRC_FLAG_LAST << 0),
GST_TCP_CLIENT_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) GST_TCP_CLIENT_SRC_FLAG_LAST = (GST_BASE_SRC_FLAG_LAST << 2)
} GstTCPClientSrcFlags; } GstTCPClientSrcFlags;
struct _GstTCPClientSrc { struct _GstTCPClientSrc {
@ -61,14 +55,10 @@ struct _GstTCPClientSrc {
/* server information */ /* server information */
int port; int port;
gchar *host; gchar *host;
struct sockaddr_in server_sin;
/* socket */ /* socket */
GstPollFD sock_fd; GSocket *socket;
GstPoll *fdset; GCancellable *cancellable;
gboolean caps_received; /* if we have received caps yet */
GstCaps *caps;
}; };
struct _GstTCPClientSrcClass { struct _GstTCPClientSrcClass {