add new tcp elements

Original commit message from CVS:
add new tcp elements
This commit is contained in:
Thomas Vander Stichele 2004-05-20 10:15:31 +00:00
parent 12be7d6207
commit 253fc51655
15 changed files with 2836 additions and 5 deletions

View file

@ -1,3 +1,22 @@
2004-05-20 Thomas Vander Stichele <thomas at apestaart dot org>
* gst/tcp/.cvsignore:
ignore enums
* gst/tcp/Makefile.am:
* gst/tcp/README:
* gst/tcp/gsttcp.c:
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcpclientsink.c:
* gst/tcp/gsttcpclientsink.h:
* gst/tcp/gsttcpclientsrc.c:
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpplugin.c:
* gst/tcp/gsttcpserversink.c:
* gst/tcp/gsttcpserversink.h:
* gst/tcp/gsttcpserversrc.c:
* gst/tcp/gsttcpserversrc.h:
add new tcp elements
2004-05-19 Wim Taymans <wim@fluendo.com> 2004-05-19 Wim Taymans <wim@fluendo.com>
* gst/law/mulaw-conversion.c: (mulaw_encode): * gst/law/mulaw-conversion.c: (mulaw_encode):

2
gst/tcp/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
gsttcp-enumtypes.c
gsttcp-enumtypes.h

View file

@ -1,11 +1,35 @@
plugin_LTLIBRARIES = libgsttcp.la plugin_LTLIBRARIES = libgsttcp.la
libgsttcp_la_SOURCES = gsttcpplugin.c gsttcpsrc.c gsttcpsink.c # variables used for enum/marshal generation
glib_enum_headers = gsttcp.h
glib_enum_define = GST_TCP_PROTOCOL
glib_enum_prefix = gst_tcp_protocol
include $(top_srcdir)/common/glib-gen.mak
built_sources = gsttcp-enumtypes.c
built_headers = gsttcp-enumtypes.h
BUILT_SOURCES = $(built_sources) $(built_headers)
libgsttcp_la_SOURCES = \
gsttcpplugin.c \
gsttcpsrc.c gsttcpsink.c \
$(built_sources) \
gsttcp.c \
gsttcpclientsrc.c gsttcpclientsink.c \
gsttcpserversrc.c gsttcpserversink.c
libgsttcp_la_CFLAGS = $(GST_CFLAGS) libgsttcp_la_CFLAGS = $(GST_CFLAGS)
libgsttcp_la_LIBADD = libgsttcp_la_LIBADD =
libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
noinst_HEADERS = gsttcpsink.h gsttcpsrc.h gsttcpplugin.h noinst_HEADERS = \
$(built_headers) \
gsttcpplugin.h \
gsttcpsrc.h gsttcpsink.h \
gsttcp.h \
gsttcpclientsrc.h gsttcpclientsink.h \
gsttcpserversrc.h gsttcpserversink.h
EXTRA_DIST = README CLEANFILES = $(BUILT_SOURCES)

View file

@ -1,6 +1,49 @@
This part of the documentation is for the new tcp elements:
- tcpclientsrc
- tcpclientsink
- tcpserversrc
- tcpserversink
which are created to replace the old tcpsrc/tcpsink
TESTS
-----
Use these tests to test functionality of the various tcp plugins
* server: nc -l -p 3000
client: nc localhost 3000
everything you type in the server is shown on the client
everything you type in the client is shown on the server
* server: nc -l -p 3000
client: gst-launch tcpclientsrc protocol=none port=3000 ! fdsink fd=2
everything you type in the server is shown on the client
* server: nc -l -p 3000
client: gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000
everything you type in the client is shown on the server
* server: gst-launch tcpserversrc protocol=none port=3000 ! fdsink fd=2
client: gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000
TODO
----
- implement DNS resolution
--------
This is the old documentation for the original tcpsrc/tcpsink elements.
* What is TCP src/sink? * What is TCP src/sink?
solution, like icecast or realaudio or whatever. But the future RTP plugins shall not do the actual transmission/reception of packets on the network themselve but the Application developer would be encouraged to use either the TCP or the UDP plugins for that. UDP would be used mostly but there could be situations where TCP would be the only available choice. For example streaming accross firewalls that do not allow UDP. solution, like icecast or realaudio or whatever.
But the future RTP plugins shall not do the actual transmission/reception
of packets on the network themselve but the Application developer would be
encouraged to use either the TCP or the UDP plugins for that. UDP would be
used mostly but there could be situations where TCP would be the only
available choice. For example streaming accross firewalls that do not
allow UDP.
* Shortcomings * Shortcomings

314
gst/tcp/gsttcp.c Normal file
View file

@ -0,0 +1,314 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* gsttcp.c: TCP functions
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <glib.h>
#include <gst/gst.h>
#include <gst/gst-i18n-plugin.h>
#include <gst/dataprotocol/dataprotocol.h>
/* resolve host to IP address, throwing errors if it fails */
/* host can already be an IP address */
/* returns a newly allocated gchar * with the dotted ip address */
gchar *
gst_tcp_host_to_ip (GstElement * element, const gchar * host)
{
struct hostent *hostinfo;
char **addrs;
gchar *ip;
struct in_addr addr;
/* first check if it already is an IP address */
if (inet_aton (host, &addr)) {
return g_strdup (host);
}
/* FIXME: could do a localhost check here */
/* perform a name lookup */
hostinfo = gethostbyname (host);
if (!hostinfo) {
GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
("Could not find IP address for host \"%s\".", host));
return NULL;
}
if (hostinfo->h_addrtype != AF_INET) {
GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
("host \"%s\" is not an IP host", host));
return NULL;
}
addrs = hostinfo->h_addr_list;
/* There could be more than one IP address, but we just return the first */
ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
return ip;
}
/* write buffer to given socket incrementally.
* Returns number of bytes written.
*/
gint
gst_tcp_socket_write (int socket, const void *buf, size_t count)
{
size_t bytes_written = 0;
while (bytes_written < count) {
size_t wrote = write (socket, buf + bytes_written,
count - bytes_written);
if (wrote <= 0) {
return bytes_written;
}
bytes_written += wrote;
}
if (bytes_written < 0)
GST_DEBUG ("error while writing");
else
GST_DEBUG ("wrote %d bytes succesfully", bytes_written);
return bytes_written;
}
/* read number of bytes from a socket into a given buffer incrementally.
* Returns number of bytes read.
*/
gint
gst_tcp_socket_read (int socket, void *buf, size_t count)
{
size_t bytes_read = 0;
while (bytes_read < count) {
size_t ret = read (socket, buf + bytes_read,
count - bytes_read);
if (ret <= 0) {
return bytes_read;
}
bytes_read += ret;
}
if (bytes_read < 0)
GST_DEBUG ("error while reading");
else
GST_DEBUG ("read %d bytes succesfully", bytes_read);
return bytes_read;
}
/* read the gdp buffer header from the given socket
* returns a GstData,
* representing the new GstBuffer to read data into, or an EOS event
*/
GstData *
gst_tcp_gdp_read_header (GstElement * this, int socket)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
guint8 *header = NULL;
size_t ret;
GstBuffer *buffer;
header = g_malloc (header_length);
readsize = header_length;
GST_DEBUG_OBJECT (this, "Reading %d bytes for buffer packet header",
readsize);
ret = read (socket, header, readsize);
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG ("blocking read returns 0, EOS");
gst_element_set_eos (GST_ELEMENT (this));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
if (ret != readsize) {
g_warning ("Wanted %d bytes, got %d bytes", readsize, ret);
}
g_assert (ret == readsize);
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate"));
g_free (header);
return NULL;
}
GST_DEBUG_OBJECT (this, "validated buffer packet header");
buffer = gst_dp_buffer_from_header (header_length, header);
GST_DEBUG_OBJECT (this, "created new buffer %p from packet header", buffer);
return GST_DATA (buffer);
}
/* read the GDP caps packet from the given socket
* returns the caps, or NULL in case of an error */
GstCaps *
gst_tcp_gdp_read_caps (GstElement * this, int socket)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
guint8 *header = NULL;
guint8 *payload = NULL;
size_t ret;
GstCaps *caps;
gchar *string;
header = g_malloc (header_length);
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
ret = read (socket, header, readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
g_assert (ret == readsize);
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate"));
g_free (header);
return NULL;
}
readsize = gst_dp_header_payload_length (header);
payload = g_malloc (readsize);
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
ret = read (socket, payload, readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
g_free (payload);
return NULL;
}
if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Header read doesn't describe CAPS payload"));
g_free (header);
g_free (payload);
return NULL;
}
g_assert (ret == readsize);
if (!gst_dp_validate_payload (readsize, header, payload)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate"));
g_free (header);
g_free (payload);
return NULL;
}
caps = gst_dp_caps_from_packet (header_length, header, payload);
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
g_free (header);
g_free (payload);
g_free (string);
return caps;
}
/* write a GDP header to the socket. Return false if fails. */
gboolean
gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer,
gboolean fatal, const gchar * host, int port)
{
guint length;
guint8 *header;
size_t wrote;
if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) {
if (fatal)
GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
("Could not create GDP header from buffer"));
return FALSE;
}
GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
wrote = gst_tcp_socket_write (socket, header, length);
if (wrote != length) {
if (fatal)
GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), host, port),
("Only %d of %d bytes written: %s",
wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
return FALSE;
}
return TRUE;
}
/* write GDP header and payload to the given socket for the given caps.
* Return false if fails. */
gboolean
gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
gboolean fatal, const char *host, int port)
{
guint length;
guint8 *header;
guint8 *payload;
size_t wrote;
if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
if (fatal)
GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
("Could not create GDP packet from caps"));
return FALSE;
}
GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
wrote = gst_tcp_socket_write (socket, header, length);
if (wrote != length) {
if (fatal)
GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
(_("Error while sending gdp header data to \"%s:%d\"."), host, port),
("Only %d of %d bytes written: %s",
wrote, length, g_strerror (errno)));
return FALSE;
}
length = gst_dp_header_payload_length (header);
GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
wrote = gst_tcp_socket_write (socket, payload, length);
if (wrote != length) {
if (fatal)
GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
(_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
("Only %d of %d bytes written: %s",
wrote, length, g_strerror (errno)));
return FALSE;
}
return TRUE;
}

51
gst/tcp/gsttcp.h Normal file
View file

@ -0,0 +1,51 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* gsttcp.h: helper functions
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCP_HELP_H__
#define __GST_TCP_HELP_H__
#include "gsttcp-enumtypes.h"
#include <gst/gst.h>
#include <gst/dataprotocol/dataprotocol.h>
G_BEGIN_DECLS
typedef enum
{
GST_TCP_PROTOCOL_TYPE_NONE,
GST_TCP_PROTOCOL_TYPE_GDP
} GstTCPProtocolType;
gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
gint gst_tcp_socket_read (int socket, void *buf, size_t count);
GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
gboolean gst_tcp_gdp_write_header (GstElement *this, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_caps (GstElement *this, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
G_END_DECLS
#endif /* __GST_TCP_HELP_H__ */

392
gst/tcp/gsttcpclientsink.c Normal file
View file

@ -0,0 +1,392 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst-i18n-plugin.h>
#include <gst/dataprotocol/dataprotocol.h>
#include "gsttcp.h"
#include "gsttcpclientsink.h"
#define TCP_DEFAULT_HOST "localhost"
#define TCP_DEFAULT_PORT 4953
/* elementfactory information */
static GstElementDetails gst_tcpclientsink_details =
GST_ELEMENT_DETAILS ("TCP Client sink",
"Sink/Network",
"Send data as a client over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
/* TCPClientSink signals and args */
enum
{
FRAME_ENCODED,
/* FILL ME */
LAST_SIGNAL
};
GST_DEBUG_CATEGORY (tcpclientsink_debug);
#define GST_CAT_DEFAULT (tcpclientsink_debug)
enum
{
ARG_0,
ARG_HOST,
ARG_PORT,
ARG_PROTOCOL
/* FILL ME */
};
static void gst_tcpclientsink_base_init (gpointer g_class);
static void gst_tcpclientsink_class_init (GstTCPClientSink * klass);
static void gst_tcpclientsink_init (GstTCPClientSink * tcpclientsink);
static void gst_tcpclientsink_set_clock (GstElement * element,
GstClock * clock);
static void gst_tcpclientsink_chain (GstPad * pad, GstData * _data);
static GstElementStateReturn gst_tcpclientsink_change_state (GstElement *
element);
static void gst_tcpclientsink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpclientsink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpclientsink_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpclientsink_get_type (void)
{
static GType tcpclientsink_type = 0;
if (!tcpclientsink_type) {
static const GTypeInfo tcpclientsink_info = {
sizeof (GstTCPClientSinkClass),
gst_tcpclientsink_base_init,
NULL,
(GClassInitFunc) gst_tcpclientsink_class_init,
NULL,
NULL,
sizeof (GstTCPClientSink),
0,
(GInstanceInitFunc) gst_tcpclientsink_init,
NULL
};
tcpclientsink_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSink",
&tcpclientsink_info, 0);
}
return tcpclientsink_type;
}
static void
gst_tcpclientsink_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_set_details (element_class, &gst_tcpclientsink_details);
}
static void
gst_tcpclientsink_class_init (GstTCPClientSink * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
TCP_DEFAULT_HOST, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "Port", "The port to send the packets to",
0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_GDP,
G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpclientsink_set_property;
gobject_class->get_property = gst_tcpclientsink_get_property;
gstelement_class->change_state = gst_tcpclientsink_change_state;
gstelement_class->set_clock = gst_tcpclientsink_set_clock;
GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
}
static void
gst_tcpclientsink_set_clock (GstElement * element, GstClock * clock)
{
GstTCPClientSink *tcpclientsink;
tcpclientsink = GST_TCPCLIENTSINK (element);
tcpclientsink->clock = clock;
}
static void
gst_tcpclientsink_init (GstTCPClientSink * this)
{
/* create the sink pad */
this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
gst_pad_set_chain_function (this->sinkpad, gst_tcpclientsink_chain);
this->host = g_strdup (TCP_DEFAULT_HOST);
this->port = TCP_DEFAULT_PORT;
/* should support as minimum 576 for IPV4 and 1500 for IPV6 */
/* this->mtu = 1500; */
this->sock_fd = -1;
this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
this->clock = NULL;
}
static void
gst_tcpclientsink_chain (GstPad * pad, GstData * _data)
{
size_t wrote = 0;
GstBuffer *buf = GST_BUFFER (_data);
GstTCPClientSink *sink;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
sink = GST_TCPCLIENTSINK (GST_OBJECT_PARENT (pad));
g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN));
if (GST_IS_EVENT (buf)) {
g_warning ("FIXME: handl events");
return;
}
/* write the buffer header if we have one */
switch (sink->protocol) {
case GST_TCP_PROTOCOL_TYPE_NONE:
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
/* if we haven't send caps yet, send them first */
if (!sink->caps_sent) {
const GstCaps *caps;
gchar *string;
caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
TRUE, sink->host, sink->port)) {
g_free (string);
return;
}
g_free (string);
sink->caps_sent = TRUE;
}
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), sink->sock_fd, buf,
TRUE, sink->host, sink->port))
return;
break;
default:
g_warning ("Unhandled protocol type");
break;
}
GST_LOG_OBJECT (sink, "writing %d bytes for buffer data",
GST_BUFFER_SIZE (buf));
wrote =
gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf),
GST_BUFFER_SIZE (buf));
if (wrote < GST_BUFFER_SIZE (buf)) {
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
("Only %d of %d bytes written: %s",
wrote, GST_BUFFER_SIZE (buf), g_strerror (errno)));
}
sink->data_written += wrote;
gst_buffer_unref (buf);
/* FIXME: emit signal ? */
}
static void
gst_tcpclientsink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstTCPClientSink *tcpclientsink;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_TCPCLIENTSINK (object));
tcpclientsink = GST_TCPCLIENTSINK (object);
switch (prop_id) {
case ARG_HOST:
if (tcpclientsink->host != NULL)
g_free (tcpclientsink->host);
if (g_value_get_string (value) == NULL)
tcpclientsink->host = NULL;
else
tcpclientsink->host = g_strdup (g_value_get_string (value));
break;
case ARG_PORT:
tcpclientsink->port = g_value_get_int (value);
break;
case ARG_PROTOCOL:
tcpclientsink->protocol = g_value_get_enum (value);
break;
default:
break;
}
}
static void
gst_tcpclientsink_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstTCPClientSink *tcpclientsink;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_TCPCLIENTSINK (object));
tcpclientsink = GST_TCPCLIENTSINK (object);
switch (prop_id) {
case ARG_HOST:
g_value_set_string (value, tcpclientsink->host);
break;
case ARG_PORT:
g_value_set_int (value, tcpclientsink->port);
break;
case ARG_PROTOCOL:
g_value_set_enum (value, tcpclientsink->protocol);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_tcpclientsink_init_send (GstTCPClientSink * this)
{
int ret;
gchar *ip;
/* reset caps_sent flag */
this->caps_sent = FALSE;
/* create sending client socket */
GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
this->port);
if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened sending client socket with fd %d",
this->sock_fd);
/* look up name if we need to */
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
if (!ip)
return FALSE;
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
/* connect to server */
memset (&this->server_sin, 0, sizeof (this->server_sin));
this->server_sin.sin_family = AF_INET; /* network socket */
this->server_sin.sin_port = htons (this->port); /* on port */
this->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */
GST_DEBUG_OBJECT (this, "connecting to server");
ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
if (ret) {
switch (errno) {
case ECONNREFUSED:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE,
(_("Connection to %s:%d refused."), this->host, this->port),
(NULL));
return FALSE;
break;
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("connect to %s:%d failed: %s", this->host, this->port,
g_strerror (errno)));
return FALSE;
break;
}
}
GST_FLAG_SET (this, GST_TCPCLIENTSINK_OPEN);
this->data_written = 0;
return TRUE;
}
static void
gst_tcpclientsink_close (GstTCPClientSink * this)
{
if (this->sock_fd != -1) {
close (this->sock_fd);
this->sock_fd = -1;
}
GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
}
static GstElementStateReturn
gst_tcpclientsink_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPCLIENTSINK (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN))
gst_tcpclientsink_close (GST_TCPCLIENTSINK (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN)) {
if (!gst_tcpclientsink_init_send (GST_TCPCLIENTSINK (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

101
gst/tcp/gsttcpclientsink.h Normal file
View file

@ -0,0 +1,101 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPCLIENTSINK_H__
#define __GST_TCPCLIENTSINK_H__
#include <gst/gst.h>
#include "gsttcp.h"
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include "gsttcp.h"
#define GST_TYPE_TCPCLIENTSINK \
(gst_tcpclientsink_get_type())
#define GST_TCPCLIENTSINK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPCLIENTSINK,GstTCPClientSink))
#define GST_TCPCLIENTSINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPCLIENTSINK,GstTCPClientSink))
#define GST_IS_TCPCLIENTSINK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPCLIENTSINK))
#define GST_IS_TCPCLIENTSINK_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPCLIENTSINK))
typedef struct _GstTCPClientSink GstTCPClientSink;
typedef struct _GstTCPClientSinkClass GstTCPClientSinkClass;
typedef enum {
GST_TCPCLIENTSINK_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPCLIENTSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2,
} GstTCPClientSinkFlags;
struct _GstTCPClientSink {
GstElement element;
/* pad */
GstPad *sinkpad;
/* server information */
int port;
gchar *host;
struct sockaddr_in server_sin;
/* socket */
int sock_fd;
size_t data_written; /* how much bytes have we written ? */
GstTCPProtocolType protocol; /* used with the protocol enum */
gboolean caps_sent; /* whether or not we sent caps already */
guint mtu;
GstClock *clock;
};
struct _GstTCPClientSinkClass {
GstElementClass parent_class;
};
GType gst_tcpclientsink_get_type(void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPCLIENTSINK_H__ */

472
gst/tcp/gsttcpclientsrc.c Normal file
View file

@ -0,0 +1,472 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst-i18n-plugin.h>
#include "gsttcp.h"
#include "gsttcpclientsrc.h"
#include <string.h> /* memset */
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
GST_DEBUG_CATEGORY (tcpclientsrc_debug);
#define GST_CAT_DEFAULT tcpclientsrc_debug
#define TCP_DEFAULT_PORT 4953
#define TCP_DEFAULT_HOST "localhost"
#define MAX_READ_SIZE 4 * 1024
/* elementfactory information */
static GstElementDetails gst_tcpclientsrc_details =
GST_ELEMENT_DETAILS ("TCP Client source",
"Source/Network",
"Receive data as a client over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
/* TCPClientSrc signals and args */
enum
{
LAST_SIGNAL
};
enum
{
ARG_0,
ARG_PORT,
ARG_HOST,
ARG_PROTOCOL
};
static void gst_tcpclientsrc_base_init (gpointer g_class);
static void gst_tcpclientsrc_class_init (GstTCPClientSrc * klass);
static void gst_tcpclientsrc_init (GstTCPClientSrc * tcpclientsrc);
static GstData *gst_tcpclientsrc_get (GstPad * pad);
static GstElementStateReturn gst_tcpclientsrc_change_state (GstElement *
element);
static void gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpclientsrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpclientsrc_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpclientsrc_get_type (void)
{
static GType tcpclientsrc_type = 0;
if (!tcpclientsrc_type) {
static const GTypeInfo tcpclientsrc_info = {
sizeof (GstTCPClientSrcClass),
gst_tcpclientsrc_base_init,
NULL,
(GClassInitFunc) gst_tcpclientsrc_class_init,
NULL,
NULL,
sizeof (GstTCPClientSrc),
0,
(GInstanceInitFunc) gst_tcpclientsrc_init,
NULL
};
tcpclientsrc_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSrc",
&tcpclientsrc_info, 0);
}
return tcpclientsrc_type;
}
static void
gst_tcpclientsrc_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_set_details (element_class, &gst_tcpclientsrc_details);
}
static void
gst_tcpclientsrc_class_init (GstTCPClientSrc * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host",
"The host IP address to receive packets from", TCP_DEFAULT_HOST,
G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "Port", "The port to receive packets from", 0,
32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_GDP,
G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpclientsrc_set_property;
gobject_class->get_property = gst_tcpclientsrc_get_property;
gstelement_class->change_state = gst_tcpclientsrc_change_state;
gstelement_class->set_clock = gst_tcpclientsrc_set_clock;
GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
"TCP Client Source");
}
static void
gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock)
{
GstTCPClientSrc *tcpclientsrc;
tcpclientsrc = GST_TCPCLIENTSRC (element);
tcpclientsrc->clock = clock;
}
static void
gst_tcpclientsrc_init (GstTCPClientSrc * this)
{
/* create the src pad */
this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
gst_pad_set_get_function (this->srcpad, gst_tcpclientsrc_get);
this->port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST);
this->clock = NULL;
this->sock_fd = -1;
this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
this->curoffset = 0;
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
}
static GstData *
gst_tcpclientsrc_get (GstPad * pad)
{
GstTCPClientSrc *src;
size_t readsize;
int ret;
GstData *data = NULL;
GstBuffer *buf = NULL;
GstCaps *caps;
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN), NULL);
/* if we have a left over buffer after a discont, return that */
if (src->buffer_after_discont) {
buf = src->buffer_after_discont;
GST_LOG_OBJECT (src,
"Returning buffer after discont of size %d with timestamp %"
GST_TIME_FORMAT " and duration %" GST_TIME_FORMAT,
GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
src->buffer_after_discont = NULL;
return GST_DATA (buf);
}
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
fd_set testfds;
case GST_TCP_PROTOCOL_TYPE_NONE:
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (src->sock_fd, &testfds);
ret = select (src->sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0, 0);
/* no action (0) is an error too in our case */
if (ret <= 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return NULL;
}
/* ask how much is available for reading on the socket */
ret = ioctl (src->sock_fd, FIONREAD, &readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return NULL;
}
buf = gst_buffer_new_and_alloc (readsize);
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
/* if we haven't received caps yet, we should get them first */
if (!src->caps_received) {
gchar *string;
if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd))) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return NULL;
}
src->caps_received = TRUE;
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
g_free (string);
if (!gst_pad_try_set_caps (pad, caps)) {
g_warning ("Could not set caps");
return NULL;
}
}
/* now receive the buffer header */
if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read data header through GDP"));
return NULL;
}
if (GST_IS_EVENT (data))
return data;
buf = GST_BUFFER (data);
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
buf);
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break;
default:
g_warning ("Unhandled protocol type");
break;
}
GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG ("blocking read returns 0, EOS");
gst_buffer_unref (buf);
gst_element_set_eos (GST_ELEMENT (src));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
readsize = ret;
GST_BUFFER_SIZE (buf) = readsize;
GST_BUFFER_MAXSIZE (buf) = readsize;
GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
/* if this is our first buffer, we need to send a discont with the
* given timestamp or the current offset, and store the buffer for
* the next iteration through the get loop */
if (src->send_discont) {
GstClockTime timestamp;
GstEvent *event;
src->send_discont = FALSE;
src->buffer_after_discont = buf;
/* if the timestamp is valid, send a timed discont
* taking into account the incoming buffer's timestamps */
timestamp = GST_BUFFER_TIMESTAMP (buf);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
GST_DEBUG_OBJECT (src,
"sending discontinuous with timestamp %" GST_TIME_FORMAT,
GST_TIME_ARGS (timestamp));
event =
gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, timestamp, NULL);
return GST_DATA (event);
}
/* otherwise, send an offset discont */
GST_DEBUG_OBJECT (src, "sending discontinuous with offset %d",
src->curoffset);
event =
gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset,
NULL);
return GST_DATA (event);
}
src->curoffset += readsize;
GST_LOG_OBJECT (src,
"Returning buffer of size %d with timestamp %" GST_TIME_FORMAT
" and duration %" GST_TIME_FORMAT, readsize,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
return GST_DATA (buf);
}
static void
gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstTCPClientSrc *tcpclientsrc;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
tcpclientsrc = GST_TCPCLIENTSRC (object);
switch (prop_id) {
case ARG_PORT:
tcpclientsrc->port = g_value_get_int (value);
break;
case ARG_HOST:
/* FIXME: create a setter and handle changes correctly */
g_free (tcpclientsrc->host);
tcpclientsrc->host = g_strdup (g_value_get_string (value));
break;
case ARG_PROTOCOL:
tcpclientsrc->protocol = g_value_get_enum (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstTCPClientSrc *tcpclientsrc;
g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
tcpclientsrc = GST_TCPCLIENTSRC (object);
switch (prop_id) {
case ARG_PORT:
g_value_set_int (value, tcpclientsrc->port);
break;
case ARG_HOST:
g_value_set_string (value, tcpclientsrc->host);
break;
case ARG_PROTOCOL:
g_value_set_enum (value, tcpclientsrc->protocol);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for connecting to remote server */
static gboolean
gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
{
int ret;
gchar *ip;
/* create receiving client socket */
GST_DEBUG_OBJECT (this, "opening receiving client socket to %s:%d",
this->host, this->port);
if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
this->sock_fd);
/* look up name if we need to */
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
if (!ip)
return FALSE;
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
/* connect to server */
memset (&this->server_sin, 0, sizeof (this->server_sin));
this->server_sin.sin_family = AF_INET; /* network socket */
this->server_sin.sin_port = htons (this->port); /* on port */
this->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */
GST_DEBUG_OBJECT (this, "connecting to server");
ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
if (ret) {
switch (errno) {
case ECONNREFUSED:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
(_("Connection to %s:%d refused."), this->host, this->port),
(NULL));
return FALSE;
break;
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("connect to %s:%d failed: %s", this->host, this->port,
g_strerror (errno)));
return FALSE;
break;
}
}
this->send_discont = TRUE;
this->buffer_after_discont = NULL;
GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
return TRUE;
}
static void
gst_tcpclientsrc_close (GstTCPClientSrc * this)
{
if (this->sock_fd != -1) {
close (this->sock_fd);
this->sock_fd = -1;
}
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
}
static GstElementStateReturn
gst_tcpclientsrc_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN))
gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN)) {
if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

90
gst/tcp/gsttcpclientsrc.h Normal file
View file

@ -0,0 +1,90 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPCLIENTSRC_H__
#define __GST_TCPCLIENTSRC_H__
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <netdb.h> /* sockaddr_in */
#include "gsttcp.h"
#define GST_TYPE_TCPCLIENTSRC \
(gst_tcpclientsrc_get_type())
#define GST_TCPCLIENTSRC(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPCLIENTSRC,GstTCPClientSrc))
#define GST_TCPCLIENTSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPCLIENTSRC,GstTCPClientSrc))
#define GST_IS_TCPCLIENTSRC(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPCLIENTSRC))
#define GST_IS_TCPCLIENTSRC_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPCLIENTSRC))
typedef struct _GstTCPClientSrc GstTCPClientSrc;
typedef struct _GstTCPClientSrcClass GstTCPClientSrcClass;
typedef enum {
GST_TCPCLIENTSRC_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPCLIENTSRC_FLAG_LAST,
} GstTCPClientSrcFlags;
struct _GstTCPClientSrc {
GstElement element;
/* pad */
GstPad *srcpad;
/* server information */
int port;
gchar *host;
struct sockaddr_in server_sin;
/* socket */
int sock_fd;
/* number of bytes we've gotten */
off_t curoffset;
GstTCPProtocolType protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
GstClock *clock;
gboolean send_discont; /* TRUE when we need to send a discont */
GstBuffer *buffer_after_discont; /* temporary storage for buffer */
};
struct _GstTCPClientSrcClass {
GstElementClass parent_class;
};
GType gst_tcpclientsrc_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPCLIENTSRC_H__ */

View file

@ -23,10 +23,17 @@
#include "gsttcpsrc.h" #include "gsttcpsrc.h"
#include "gsttcpsink.h" #include "gsttcpsink.h"
#include "gsttcpclientsrc.h"
#include "gsttcpclientsink.h"
#include "gsttcpserversrc.h"
#include "gsttcpserversink.h"
static gboolean static gboolean
plugin_init (GstPlugin * plugin) plugin_init (GstPlugin * plugin)
{ {
if (!gst_library_load ("gstdataprotocol"))
return FALSE;
if (!gst_element_register (plugin, "tcpsink", GST_RANK_NONE, if (!gst_element_register (plugin, "tcpsink", GST_RANK_NONE,
GST_TYPE_TCPSINK)) GST_TYPE_TCPSINK))
return FALSE; return FALSE;
@ -34,6 +41,19 @@ plugin_init (GstPlugin * plugin)
if (!gst_element_register (plugin, "tcpsrc", GST_RANK_NONE, GST_TYPE_TCPSRC)) if (!gst_element_register (plugin, "tcpsrc", GST_RANK_NONE, GST_TYPE_TCPSRC))
return FALSE; return FALSE;
if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE,
GST_TYPE_TCPCLIENTSINK))
return FALSE;
if (!gst_element_register (plugin, "tcpclientsrc", GST_RANK_NONE,
GST_TYPE_TCPCLIENTSRC))
return FALSE;
if (!gst_element_register (plugin, "tcpserversink", GST_RANK_NONE,
GST_TYPE_TCPSERVERSINK))
return FALSE;
if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE,
GST_TYPE_TCPSERVERSRC))
return FALSE;
return TRUE; return TRUE;
} }

545
gst/tcp/gsttcpserversink.c Normal file
View file

@ -0,0 +1,545 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst-i18n-plugin.h>
#include <sys/ioctl.h>
#include "gsttcpserversink.h"
#define TCP_DEFAULT_HOST "127.0.0.1"
#define TCP_DEFAULT_PORT 4953
#define TCP_BACKLOG 5
/* elementfactory information */
static GstElementDetails gst_tcpserversink_details =
GST_ELEMENT_DETAILS ("TCP Server sink",
"Sink/Network",
"Send data as a server over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
/* TCPServerSink signals and args */
enum
{
FRAME_ENCODED,
/* FILL ME */
LAST_SIGNAL
};
GST_DEBUG_CATEGORY (tcpserversink_debug);
#define GST_CAT_DEFAULT (tcpserversink_debug)
enum
{
ARG_0,
ARG_HOST,
ARG_PORT,
/* FILL ME */
};
static void gst_tcpserversink_base_init (gpointer g_class);
static void gst_tcpserversink_class_init (GstTCPServerSink * klass);
static void gst_tcpserversink_init (GstTCPServerSink * tcpserversink);
static void gst_tcpserversink_set_clock (GstElement * element,
GstClock * clock);
static void gst_tcpserversink_chain (GstPad * pad, GstData * _data);
static GstElementStateReturn gst_tcpserversink_change_state (GstElement *
element);
static void gst_tcpserversink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpserversink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpserversink_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpserversink_get_type (void)
{
static GType tcpserversink_type = 0;
if (!tcpserversink_type) {
static const GTypeInfo tcpserversink_info = {
sizeof (GstTCPServerSinkClass),
gst_tcpserversink_base_init,
NULL,
(GClassInitFunc) gst_tcpserversink_class_init,
NULL,
NULL,
sizeof (GstTCPServerSink),
0,
(GInstanceInitFunc) gst_tcpserversink_init,
NULL
};
tcpserversink_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSink",
&tcpserversink_info, 0);
}
return tcpserversink_type;
}
static void
gst_tcpserversink_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_set_details (element_class, &gst_tcpserversink_details);
}
static void
gst_tcpserversink_class_init (GstTCPServerSink * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "host", "The host/IP to send the packets to",
TCP_DEFAULT_HOST, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "port", "The port to send the packets to",
0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpserversink_set_property;
gobject_class->get_property = gst_tcpserversink_get_property;
gstelement_class->change_state = gst_tcpserversink_change_state;
gstelement_class->set_clock = gst_tcpserversink_set_clock;
GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink");
}
static void
gst_tcpserversink_set_clock (GstElement * element, GstClock * clock)
{
GstTCPServerSink *tcpserversink;
tcpserversink = GST_TCPSERVERSINK (element);
tcpserversink->clock = clock;
}
static void
gst_tcpserversink_init (GstTCPServerSink * this)
{
/* create the sink pad */
this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
gst_pad_set_chain_function (this->sinkpad, gst_tcpserversink_chain);
this->server_port = TCP_DEFAULT_PORT;
/* should support as minimum 576 for IPV4 and 1500 for IPV6 */
/* this->mtu = 1500; */
this->server_sock_fd = -1;
GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN);
this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
this->clock = NULL;
}
static void
gst_tcpserversink_debug_fdset (GstTCPServerSink * sink, fd_set * testfds)
{
int fd;
for (fd = 0; fd < FD_SETSIZE; fd++) {
if (FD_ISSET (fd, testfds)) {
GST_LOG_OBJECT (sink, "fd %d", fd);
}
}
}
/* handle a read request on the server,
* which indicates a new client connection */
static gboolean
gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
{
/* new client */
int client_sock_fd;
struct sockaddr_in client_address;
int client_address_len;
client_sock_fd =
accept (sink->server_sock_fd, (struct sockaddr *) &client_address,
&client_address_len);
if (client_sock_fd == -1) {
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
("Could not accept client on server socket: %s", g_strerror (errno)));
return FALSE;
}
FD_SET (client_sock_fd, &(sink->clientfds));
GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d",
inet_ntoa (client_address.sin_addr), client_sock_fd);
return TRUE;
}
/* handle a read on a client fd,
* which either indicates a close or should be ignored */
static gboolean
gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
{
int nread;
GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd);
ioctl (fd, FIONREAD, &nread);
if (nread == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
if (close (fd) != 0) {
GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL),
("error closing fd %d: %s", fd, g_strerror (errno)));
return FALSE;
}
FD_CLR (fd, &sink->clientfds);
FD_CLR (fd, &sink->caps_sent);
} else {
/* FIXME: we should probably just Read 'n' Drop */
g_warning ("Don't know what to do with %d bytes to read", nread);
}
return TRUE;
}
/* handle a write on a client fd,
* which indicates a read request from a client */
static gboolean
gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
GstPad * pad, GstBuffer * buf)
{
/* write the buffer header if we have one */
switch (sink->protocol) {
case GST_TCP_PROTOCOL_TYPE_NONE:
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
/* if we haven't send caps yet, send them first */
if (!FD_ISSET (fd, &(sink->caps_sent))) {
const GstCaps *caps;
gchar *string;
caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
fd);
/* FIXME: fix this again so that write_caps is non-fatal for multiple clients; also use a fd, host, port struct */
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
"unknown", 0)) {
g_free (string);
return FALSE;
}
g_free (string);
FD_SET (fd, &(sink->caps_sent));
}
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), fd, buf, FALSE,
"unknown", 0))
return FALSE;
break;
default:
g_warning ("Unhandled protocol type");
break;
}
/* serve data to client */
GST_LOG_OBJECT (sink, "serving data buffer of size %d to client on fd %d",
GST_BUFFER_SIZE (buf), fd);
int wrote = 0;
wrote =
gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
if (wrote < GST_BUFFER_SIZE (buf)) {
/* FIXME: keep track of client ip and port and so on */
/*
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
("Only %d of %d bytes written: %s",
bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno)));
*/
/* FIXME: there should be a better way to report problems, since we
want to continue for other clients and just drop this particular one */
g_warning ("Write failed: %d of %d written", wrote, GST_BUFFER_SIZE (buf));
}
return TRUE;
}
static void
gst_tcpserversink_chain (GstPad * pad, GstData * _data)
{
int result;
int fd;
fd_set testreadfds, testwritefds;
struct timeval timeout;
struct timeval *timeoutp;
GstBuffer *buf = GST_BUFFER (_data);
GstTCPServerSink *sink;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
sink = GST_TCPSERVERSINK (GST_OBJECT_PARENT (pad));
g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPSERVERSINK_OPEN));
if (GST_IS_EVENT (buf)) {
g_warning ("FIXME: handl events");
return;
}
/* if the incoming buffer has a duration, we can use that as the timeout
* value; otherwise, we block */
timeout.tv_sec = 0;
timeout.tv_usec = 0;
timeoutp = NULL;
GST_LOG_OBJECT (sink, "incoming buffer duration: %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (buf))) {
GST_TIME_TO_TIMEVAL (GST_BUFFER_DURATION (buf), timeout);
timeoutp = &timeout;
GST_LOG_OBJECT (sink, "select will be with timeout %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
GST_LOG_OBJECT (sink, "select will be with timeout %d.%d",
timeout.tv_sec, timeout.tv_usec);
}
/* check for:
* - server socket input (ie, new client connections)
* - client socket input (ie, clients saying goodbye)
* - client socket output (ie, client reads) */
testwritefds = sink->clientfds;
testreadfds = sink->clientfds;
FD_SET (sink->server_sock_fd, &testreadfds);
GST_LOG_OBJECT (sink, "doing select on server + client fds for reads");
gst_tcpserversink_debug_fdset (sink, &testreadfds);
GST_LOG_OBJECT (sink, "doing select on client fds for writes");
gst_tcpserversink_debug_fdset (sink, &testwritefds);
result = select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0,
timeoutp);
/* < 0 is an error, 0 just means a timeout happened */
if (result < 0) {
GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return;
}
GST_LOG_OBJECT (sink, "%d sockets had action", result);
GST_LOG_OBJECT (sink, "done select on server/client fds for reads");
gst_tcpserversink_debug_fdset (sink, &testreadfds);
GST_LOG_OBJECT (sink, "done select on client fds for writes");
gst_tcpserversink_debug_fdset (sink, &testwritefds);
/* Check the reads */
for (fd = 0; fd < FD_SETSIZE; fd++) {
if (FD_ISSET (fd, &testreadfds)) {
if (fd == sink->server_sock_fd) {
/* handle new client connection on server socket */
if (!gst_tcpserversink_handle_server_read (sink))
return;
} else {
/* handle client read */
if (!gst_tcpserversink_handle_client_read (sink, fd))
return;
}
}
}
/* Check the writes */
for (fd = 0; fd < FD_SETSIZE; fd++) {
if (FD_ISSET (fd, &testwritefds)) {
if (!gst_tcpserversink_handle_client_write (sink, fd, pad, buf))
return;
}
}
sink->data_written += GST_BUFFER_SIZE (buf);
gst_buffer_unref (buf);
/* FIXME: emit signal ? */
}
static void
gst_tcpserversink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstTCPServerSink *tcpserversink;
g_return_if_fail (GST_IS_TCPSERVERSINK (object));
tcpserversink = GST_TCPSERVERSINK (object);
switch (prop_id) {
case ARG_HOST:
if (tcpserversink->host != NULL)
g_free (tcpserversink->host);
if (g_value_get_string (value) == NULL)
tcpserversink->host = NULL;
else
tcpserversink->host = g_strdup (g_value_get_string (value));
break;
case ARG_PORT:
tcpserversink->server_port = g_value_get_int (value);
break;
default:
break;
}
}
static void
gst_tcpserversink_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstTCPServerSink *tcpserversink;
g_return_if_fail (GST_IS_TCPSERVERSINK (object));
tcpserversink = GST_TCPSERVERSINK (object);
switch (prop_id) {
case ARG_HOST:
g_value_set_string (value, tcpserversink->host);
break;
case ARG_PORT:
g_value_set_int (value, tcpserversink->server_port);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_tcpserversink_init_send (GstTCPServerSink * this)
{
int ret;
/* create sending server socket */
if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened sending server socket with fd %d",
this->server_sock_fd);
/* make address reusable */
if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
sizeof (int)) < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
}
/* keep connection alive; avoids SIGPIPE during write */
if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_KEEPALIVE, &ret,
sizeof (int)) < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
}
/* name the socket */
memset (&this->server_sin, 0, sizeof (this->server_sin));
this->server_sin.sin_family = AF_INET; /* network socket */
this->server_sin.sin_port = htons (this->server_port); /* on port */
this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); /* for hosts */
/* bind it */
GST_DEBUG_OBJECT (this, "binding server socket to address");
ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
if (ret) {
switch (errno) {
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("bind failed: %s", g_strerror (errno)));
return FALSE;
break;
}
}
/* set the server socket to nonblocking */
fcntl (this->server_sock_fd, F_SETFL, O_NONBLOCK);
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
this->server_sock_fd, TCP_BACKLOG);
if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
GST_DEBUG_OBJECT (this,
"listened on server socket %d, returning from connection setup",
this->server_sock_fd);
FD_ZERO (&this->clientfds);
FD_ZERO (&this->caps_sent);
FD_SET (this->server_sock_fd, &this->clientfds);
GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN);
this->data_written = 0;
return TRUE;
}
static void
gst_tcpserversink_close (GstTCPServerSink * this)
{
if (this->server_sock_fd != -1) {
close (this->server_sock_fd);
this->server_sock_fd = -1;
}
GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN);
}
static GstElementStateReturn
gst_tcpserversink_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPSERVERSINK (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPSERVERSINK_OPEN))
gst_tcpserversink_close (GST_TCPSERVERSINK (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSINK_OPEN)) {
if (!gst_tcpserversink_init_send (GST_TCPSERVERSINK (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

103
gst/tcp/gsttcpserversink.h Normal file
View file

@ -0,0 +1,103 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPSERVERSINK_H__
#define __GST_TCPSERVERSINK_H__
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include "gsttcp.h"
#define GST_TYPE_TCPSERVERSINK \
(gst_tcpserversink_get_type())
#define GST_TCPSERVERSINK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSERVERSINK,GstTCPServerSink))
#define GST_TCPSERVERSINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSERVERSINK,GstTCPServerSink))
#define GST_IS_TCPSERVERSINK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSERVERSINK))
#define GST_IS_TCPSERVERSINK_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSERVERSINK))
typedef struct _GstTCPServerSink GstTCPServerSink;
typedef struct _GstTCPServerSinkClass GstTCPServerSinkClass;
typedef enum {
GST_TCPSERVERSINK_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPSERVERSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2,
} GstTCPServerSinkFlags;
struct _GstTCPServerSink {
GstElement element;
/* pad */
GstPad *sinkpad;
/* server information */
int server_port;
gchar *host;
struct sockaddr_in server_sin;
/* socket */
int server_sock_fd;
size_t data_written; /* how much bytes have we written ? */
fd_set clientfds; /* all the client file descriptors that are open */
fd_set caps_sent; /* all the client file descriptors that have had caps sent */
GstTCPProtocolType protocol;
guint mtu;
GstClock *clock;
};
struct _GstTCPServerSinkClass {
GstElementClass parent_class;
};
GType gst_tcpserversink_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPSERVERSINK_H__ */

557
gst/tcp/gsttcpserversrc.c Normal file
View file

@ -0,0 +1,557 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst-i18n-plugin.h>
#include "gsttcp.h"
#include "gsttcpserversrc.h"
#include <unistd.h>
#include <sys/ioctl.h>
GST_DEBUG_CATEGORY (tcpserversrc_debug);
#define GST_CAT_DEFAULT tcpserversrc_debug
#define TCP_DEFAULT_PORT 4953
#define TCP_DEFAULT_HOST NULL /* listen on all interfaces */
#define TCP_BACKLOG 1 /* client connection queue */
/* elementfactory information */
static GstElementDetails gst_tcpserversrc_details =
GST_ELEMENT_DETAILS ("TCP Server source",
"Source/Network",
"Receive data as a server over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
/* TCPServerSrc signals and args */
enum
{
LAST_SIGNAL
};
enum
{
ARG_0,
ARG_PORT,
ARG_HOST,
ARG_PROTOCOL
};
static void gst_tcpserversrc_base_init (gpointer g_class);
static void gst_tcpserversrc_class_init (GstTCPServerSrc * klass);
static void gst_tcpserversrc_init (GstTCPServerSrc * tcpserversrc);
static GstData *gst_tcpserversrc_get (GstPad * pad);
static GstElementStateReturn gst_tcpserversrc_change_state (GstElement *
element);
static void gst_tcpserversrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpserversrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpserversrc_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpserversrc_get_type (void)
{
static GType tcpserversrc_type = 0;
if (!tcpserversrc_type) {
static const GTypeInfo tcpserversrc_info = {
sizeof (GstTCPServerSrcClass),
gst_tcpserversrc_base_init,
NULL,
(GClassInitFunc) gst_tcpserversrc_class_init,
NULL,
NULL,
sizeof (GstTCPServerSrc),
0,
(GInstanceInitFunc) gst_tcpserversrc_init,
NULL
};
tcpserversrc_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSrc",
&tcpserversrc_info, 0);
}
return tcpserversrc_type;
}
static void
gst_tcpserversrc_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_set_details (element_class, &gst_tcpserversrc_details);
}
static void
gst_tcpserversrc_class_init (GstTCPServerSrc * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "Port", "The port to listen to",
0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_GDP,
G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host", "The hostname to listen",
TCP_DEFAULT_HOST, G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpserversrc_set_property;
gobject_class->get_property = gst_tcpserversrc_get_property;
gstelement_class->change_state = gst_tcpserversrc_change_state;
gstelement_class->set_clock = gst_tcpserversrc_set_clock;
GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
"TCP Server Source");
}
static void
gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock)
{
GstTCPServerSrc *tcpserversrc;
tcpserversrc = GST_TCPSERVERSRC (element);
tcpserversrc->clock = clock;
}
static void
gst_tcpserversrc_init (GstTCPServerSrc * this)
{
/* create the src pad */
this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
gst_pad_set_get_function (this->srcpad, gst_tcpserversrc_get);
this->server_port = TCP_DEFAULT_PORT;
this->host = TCP_DEFAULT_HOST;
this->clock = NULL;
this->server_sock_fd = -1;
this->client_sock_fd = -1;
this->curoffset = 0;
this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
}
/* read the gdp caps packet from the socket */
static GstCaps *
gst_tcpserversrc_gdp_read_caps (GstTCPServerSrc * this)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
guint8 *header = NULL;
guint8 *payload = NULL;
size_t ret;
GstCaps *caps;
gchar *string;
header = g_malloc (header_length);
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
ret = read (this->client_sock_fd, header, readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
g_assert (ret == readsize);
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate"));
g_free (header);
return NULL;
}
readsize = gst_dp_header_payload_length (header);
payload = g_malloc (readsize);
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
ret = read (this->client_sock_fd, payload, readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
return NULL;
}
g_assert (ret == readsize);
if (!gst_dp_validate_payload (readsize, header, payload)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate"));
g_free (header);
g_free (payload);
return NULL;
}
caps = gst_dp_caps_from_packet (header_length, header, payload);
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
g_free (header);
g_free (payload);
g_free (string);
return caps;
}
/* read the gdp buffer header from the socket
* returns a GstData,
* representing the new GstBuffer to read data into, or an EOS event
*/
static GstData *
gst_tcpserversrc_gdp_read_header (GstTCPServerSrc * this)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
guint8 *header = NULL;
size_t ret;
GstBuffer *buffer;
header = g_malloc (header_length);
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
ret = read (this->client_sock_fd, header, readsize);
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG ("blocking read returns 0, EOS");
gst_element_set_eos (GST_ELEMENT (this));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
if (ret != readsize) {
g_warning ("Wanted %d bytes, got %d bytes", readsize, ret);
}
g_assert (ret == readsize);
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate"));
g_free (header);
return NULL;
}
GST_LOG_OBJECT (this, "validated buffer packet header");
buffer = gst_dp_buffer_from_header (header_length, header);
GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
return GST_DATA (buffer);
}
static GstData *
gst_tcpserversrc_get (GstPad * pad)
{
GstTCPServerSrc *src;
size_t readsize;
int ret;
GstData *data = NULL;
GstBuffer *buf = NULL;
GstCaps *caps;
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
src = GST_TCPSERVERSRC (GST_OBJECT_PARENT (pad));
g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), NULL);
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
fd_set testfds;
case GST_TCP_PROTOCOL_TYPE_NONE:
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (src->client_sock_fd, &testfds);
ret =
select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0,
0);
/* no action (0) is an error too in our case */
if (ret <= 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return NULL;
}
/* ask how much is available for reading on the socket */
ret = ioctl (src->client_sock_fd, FIONREAD, &readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return NULL;
}
buf = gst_buffer_new_and_alloc (readsize);
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
/* if we haven't received caps yet, we should get them first */
if (!src->caps_received) {
gchar *string;
if (!(caps = gst_tcpserversrc_gdp_read_caps (src))) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return NULL;
}
src->caps_received = TRUE;
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
g_free (string);
if (!gst_pad_try_set_caps (pad, caps)) {
g_warning ("Could not set caps");
return NULL;
}
}
/* now receive the buffer header */
if (!(data = gst_tcpserversrc_gdp_read_header (src))) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read data header through GDP"));
return NULL;
}
if (GST_IS_EVENT (data))
return data;
buf = GST_BUFFER (data);
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
buf);
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break;
default:
g_warning ("Unhandled protocol type");
break;
}
GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
ret =
gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG ("blocking read returns 0, EOS");
gst_buffer_unref (buf);
gst_element_set_eos (GST_ELEMENT (src));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
readsize = ret;
GST_LOG_OBJECT (src, "Read %d bytes", readsize);
GST_BUFFER_SIZE (buf) = readsize;
GST_BUFFER_MAXSIZE (buf) = readsize;
GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
src->curoffset += readsize;
return GST_DATA (buf);
}
static void
gst_tcpserversrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstTCPServerSrc *tcpserversrc;
g_return_if_fail (GST_IS_TCPSERVERSRC (object));
tcpserversrc = GST_TCPSERVERSRC (object);
switch (prop_id) {
case ARG_PORT:
tcpserversrc->server_port = g_value_get_int (value);
break;
case ARG_PROTOCOL:
tcpserversrc->protocol = g_value_get_enum (value);
break;
case ARG_HOST:
if (tcpserversrc->host)
g_free (tcpserversrc->host);
tcpserversrc->host = g_strdup (g_value_get_string (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_tcpserversrc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstTCPServerSrc *tcpserversrc;
g_return_if_fail (GST_IS_TCPSERVERSRC (object));
tcpserversrc = GST_TCPSERVERSRC (object);
switch (prop_id) {
case ARG_PORT:
g_value_set_int (value, tcpserversrc->server_port);
break;
case ARG_PROTOCOL:
g_value_set_enum (value, tcpserversrc->protocol);
break;
case ARG_HOST:
g_value_set_string (value, tcpserversrc->host);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* set up server */
static gboolean
gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
{
int ret;
/* reset caps_received flag */
this->caps_received = FALSE;
/* create the server listener socket */
if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened receiving server socket with fd %d",
this->server_sock_fd);
/* make address reusable */
if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
sizeof (int)) < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
}
/* name the socket */
memset (&this->server_sin, 0, sizeof (this->server_sin));
this->server_sin.sin_family = AF_INET; /* network socket */
this->server_sin.sin_port = htons (this->server_port); /* on port */
if (this->host) {
gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
if (!host)
return FALSE;
this->server_sin.sin_addr.s_addr = inet_addr (host);
g_free (host);
} else
this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
/* bind it */
GST_DEBUG_OBJECT (this, "binding server socket to address");
ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
if (ret) {
switch (errno) {
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("bind failed: %s", g_strerror (errno)));
return FALSE;
break;
}
}
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
this->server_sock_fd, TCP_BACKLOG);
if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
/* FIXME: maybe we should think about moving actual client accepting
somewhere else */
GST_DEBUG_OBJECT (this, "waiting for client");
this->client_sock_fd =
accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
&this->client_sin_len);
if (this->client_sock_fd == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not accept client on server socket: %s", g_strerror (errno)));
return FALSE;
}
GST_DEBUG_OBJECT (this, "received client");
GST_FLAG_SET (this, GST_TCPSERVERSRC_OPEN);
return TRUE;
}
static void
gst_tcpserversrc_close (GstTCPServerSrc * this)
{
if (this->server_sock_fd != -1) {
close (this->server_sock_fd);
this->server_sock_fd = -1;
}
if (this->client_sock_fd != -1) {
close (this->client_sock_fd);
this->client_sock_fd = -1;
}
GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
}
static GstElementStateReturn
gst_tcpserversrc_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPSERVERSRC (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN))
gst_tcpserversrc_close (GST_TCPSERVERSRC (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN)) {
if (!gst_tcpserversrc_init_receive (GST_TCPSERVERSRC (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

98
gst/tcp/gsttcpserversrc.h Normal file
View file

@ -0,0 +1,98 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPSERVERSRC_H__
#define __GST_TCPSERVERSRC_H__
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "gsttcp.h"
#include <fcntl.h>
#define GST_TYPE_TCPSERVERSRC \
(gst_tcpserversrc_get_type())
#define GST_TCPSERVERSRC(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSERVERSRC,GstTCPServerSrc))
#define GST_TCPSERVERSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSERVERSRC,GstTCPServerSrc))
#define GST_IS_TCPSERVERSRC(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSERVERSRC))
#define GST_IS_TCPSERVERSRC_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSERVERSRC))
typedef struct _GstTCPServerSrc GstTCPServerSrc;
typedef struct _GstTCPServerSrcClass GstTCPServerSrcClass;
typedef enum {
GST_TCPSERVERSRC_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPSERVERSRC_FLAG_LAST,
} GstTCPServerSrcFlags;
struct _GstTCPServerSrc {
GstElement element;
/* pad */
GstPad *srcpad;
/* server information */
int server_port;
gchar *host;
struct sockaddr_in server_sin;
int server_sock_fd;
/* client information */
struct sockaddr_in client_sin;
socklen_t client_sin_len;
int client_sock_fd;
/* number of bytes we've gotten */
off_t curoffset;
GstTCPProtocolType protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
GstClock *clock;
};
struct _GstTCPServerSrcClass {
GstElementClass parent_class;
};
GType gst_tcpserversrc_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPSERVERSRC_H__ */