Ported tcp plugins to 0.9.

Original commit message from CVS:
* configure.ac:
* gst/tcp/Makefile.am:
* gst/tcp/README:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_get_type),
(gst_multifdsink_base_init), (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_remove_client_link),
(is_sync_frame), (gst_multifdsink_handle_client_write),
(gst_multifdsink_render), (gst_multifdsink_start),
(gst_multifdsink_stop), (gst_multifdsink_change_state):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp.c: (gst_tcp_host_to_ip),
(gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps),
(gst_tcp_gdp_write_buffer), (gst_tcp_gdp_write_caps):
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcpclientsink.c: (gst_tcpclientsink_class_init),
(gst_tcpclientsink_init), (gst_tcpclientsink_setcaps),
(gst_tcpclientsink_render), (gst_tcpclientsink_start),
(gst_tcpclientsink_stop), (gst_tcpclientsink_change_state):
* gst/tcp/gsttcpclientsink.h:
* gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
(gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
(gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
(gst_tcpclientsrc_create), (gst_tcpclientsrc_start),
(gst_tcpclientsrc_stop), (gst_tcpclientsrc_unlock):
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpplugin.c: (plugin_init):
* gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init):
* gst/tcp/gsttcpserversink.h:
* gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
(gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
(gst_tcpserversrc_init), (gst_tcpserversrc_finalize),
(gst_tcpserversrc_create), (gst_tcpserversrc_start),
(gst_tcpserversrc_stop):
* gst/tcp/gsttcpserversrc.h:
* gst/tcp/gsttcpsink.c:
* gst/tcp/gsttcpsink.h:
* gst/tcp/gsttcpsrc.c:
* gst/tcp/gsttcpsrc.h:
Ported tcp plugins to 0.9.
This commit is contained in:
Wim Taymans 2005-07-05 10:21:40 +00:00
parent 207c8ee79a
commit e9de36e38c
21 changed files with 857 additions and 2002 deletions

View file

@ -1,3 +1,46 @@
2005-07-05 Wim Taymans <wim@fluendo.com>
* configure.ac:
* gst/tcp/Makefile.am:
* gst/tcp/README:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_get_type),
(gst_multifdsink_base_init), (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_remove_client_link),
(is_sync_frame), (gst_multifdsink_handle_client_write),
(gst_multifdsink_render), (gst_multifdsink_start),
(gst_multifdsink_stop), (gst_multifdsink_change_state):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp.c: (gst_tcp_host_to_ip),
(gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps),
(gst_tcp_gdp_write_buffer), (gst_tcp_gdp_write_caps):
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcpclientsink.c: (gst_tcpclientsink_class_init),
(gst_tcpclientsink_init), (gst_tcpclientsink_setcaps),
(gst_tcpclientsink_render), (gst_tcpclientsink_start),
(gst_tcpclientsink_stop), (gst_tcpclientsink_change_state):
* gst/tcp/gsttcpclientsink.h:
* gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
(gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
(gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
(gst_tcpclientsrc_create), (gst_tcpclientsrc_start),
(gst_tcpclientsrc_stop), (gst_tcpclientsrc_unlock):
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpplugin.c: (plugin_init):
* gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init):
* gst/tcp/gsttcpserversink.h:
* gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
(gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
(gst_tcpserversrc_init), (gst_tcpserversrc_finalize),
(gst_tcpserversrc_create), (gst_tcpserversrc_start),
(gst_tcpserversrc_stop):
* gst/tcp/gsttcpserversrc.h:
* gst/tcp/gsttcpsink.c:
* gst/tcp/gsttcpsink.h:
* gst/tcp/gsttcpsrc.c:
* gst/tcp/gsttcpsrc.h:
Ported tcp plugins to 0.9.
2005-07-05 Andy Wingo <wingo@pobox.com>
* gst/playback/gstplaybasebin.c (fill_buffer):

View file

@ -237,6 +237,16 @@ fi
AC_SUBST(GST_CONTROL_LIBS)
dnl check for gstreamer-dataprotocol; uninstalled is selected preferentially
PKG_CHECK_MODULES(GST_GDP, gstreamer-dataprotocol-$GST_MAJORMINOR >= $GST_REQ,
HAVE_GST_GDP="yes", HAVE_GST_GDP="no")
if test "x$HAVE_GST_GDP" = "xno"; then
AC_MSG_ERROR(no GStreamer Dataprotocol Libs found)
fi
AC_SUBST(GST_GDP_LIBS)
PKG_CHECK_MODULES(GST_BASE, gstreamer-base-$GST_MAJORMINOR >= $GST_REQ,
HAVE_GST_BASE="yes", HAVE_GST_BASE="no")
@ -375,6 +385,7 @@ GST_PLUGINS_ALL="\
playback \
sine \
subparse \
tcp \
typefind \
videotestsrc \
videorate \
@ -883,6 +894,7 @@ gst/ffmpegcolorspace/Makefile
gst/playback/Makefile
gst/sine/Makefile
gst/subparse/Makefile
gst/tcp/Makefile
gst/typefind/Makefile
gst/videotestsrc/Makefile
gst/videorate/Makefile

View file

@ -14,7 +14,6 @@ BUILT_SOURCES = $(built_sources) $(built_headers)
libgsttcp_la_SOURCES = \
gsttcpplugin.c \
gsttcpsrc.c gsttcpsink.c \
gsttcp.c \
gstfdset.c \
gstmultifdsink.c \
@ -27,11 +26,10 @@ nodist_libgsttcp_la_SOURCES = \
# remove ENABLE_NEW when dataprotocol is stable
libgsttcp_la_CFLAGS = $(GST_CFLAGS) -DGST_ENABLE_NEW
libgsttcp_la_LIBADD =
libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_GDP_LIBS)
noinst_HEADERS = \
gsttcpplugin.h \
gsttcpsrc.h gsttcpsink.h \
gsttcp.h \
gstfdset.h \
gstmultifdsink.h \

View file

@ -4,8 +4,6 @@ This part of the documentation is for the new tcp elements:
- tcpserversrc
- tcpserversink
which are created to replace the old tcpsrc/tcpsink
TESTS
-----
Use these tests to test functionality of the various tcp plugins
@ -31,33 +29,3 @@ TODO
----
- implement DNS resolution
--------
This is the old documentation for the original tcpsrc/tcpsink elements.
* What is TCP src/sink?
solution, like icecast or realaudio or whatever.
But the future RTP plugins shall not do the actual transmission/reception
of packets on the network themselve but the Application developer would be
encouraged to use either the TCP or the UDP plugins for that. UDP would be
used mostly but there could be situations where TCP would be the only
available choice. For example streaming accross firewalls that do not
allow UDP.
* Shortcomings
Even given our modest ambitions, the current code doesn't handle
caps negotiation robustly.
* Todo
The caps nego should do bi-directional negotiation.
Perhaps this plugin can be the example of how to do caps negotiation
via a point-to-point protocol.
12 Sep 2001
Wim Taymans <wim.taymans@chello.be>
Joshua N Pritikin <vishnu@pobox.com>
Zeeshan Ali <zak147@yahoo.com>

View file

@ -66,6 +66,11 @@ GST_ELEMENT_DETAILS ("MultiFd sink",
"Thomas Vander Stichele <thomas at apestaart dot org>, "
"Wim Taymans <wim@fluendo.com>");
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY (multifdsink_debug);
#define GST_CAT_DEFAULT (multifdsink_debug)
@ -215,7 +220,8 @@ static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
GList * link);
static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
static GstFlowReturn gst_multifdsink_render (GstBaseSink * bsink,
GstBuffer * buf);
static GstElementStateReturn gst_multifdsink_change_state (GstElement *
element);
@ -250,7 +256,7 @@ gst_multifdsink_get_type (void)
};
multifdsink_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstMultiFdSink",
g_type_register_static (GST_TYPE_BASESINK, "GstMultiFdSink",
&multifdsink_info, 0);
}
return multifdsink_type;
@ -261,6 +267,9 @@ gst_multifdsink_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&sinktemplate));
gst_element_class_set_details (element_class, &gst_multifdsink_details);
}
@ -269,11 +278,16 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstBaseSinkClass *gstbasesink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gstbasesink_class = (GstBaseSinkClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
parent_class = g_type_class_ref (GST_TYPE_BASESINK);
gobject_class->set_property = gst_multifdsink_set_property;
gobject_class->get_property = gst_multifdsink_get_property;
g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
@ -375,11 +389,10 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
gobject_class->set_property = gst_multifdsink_set_property;
gobject_class->get_property = gst_multifdsink_get_property;
gstelement_class->change_state = gst_multifdsink_change_state;
gstbasesink_class->render = gst_multifdsink_render;
klass->add = gst_multifdsink_add;
klass->remove = gst_multifdsink_remove;
klass->clear = gst_multifdsink_clear;
@ -391,11 +404,6 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
static void
gst_multifdsink_init (GstMultiFdSink * this)
{
/* create the sink pad */
this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
gst_pad_set_chain_function (this->sinkpad, gst_multifdsink_chain);
GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
this->protocol = DEFAULT_PROTOCOL;
@ -636,7 +644,7 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
/* free client buffers */
g_slist_foreach (client->sending, (GFunc) gst_data_unref, NULL);
g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (client->sending);
client->sending = NULL;
@ -777,9 +785,9 @@ gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client,
static gboolean
is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) {
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
return FALSE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) {
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
return TRUE;
}
return FALSE;
@ -934,7 +942,8 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
/* when using GDP, first check if we have queued caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
if (!client->caps_sent) {
const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad));
const GstCaps *caps =
GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (sink)));
/* queue caps for sending */
res = gst_multifdsink_client_queue_caps (sink, client, caps);
@ -1443,32 +1452,28 @@ gst_multifdsink_thread (GstMultiFdSink * sink)
return NULL;
}
static void
gst_multifdsink_chain (GstPad * pad, GstData * _data)
static GstFlowReturn
gst_multifdsink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstBuffer *buf = GST_BUFFER (_data);
GstMultiFdSink *sink;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
sink = GST_MULTIFDSINK (GST_OBJECT_PARENT (pad));
g_return_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN));
sink = GST_MULTIFDSINK (bsink);
if (GST_IS_EVENT (buf)) {
g_warning ("FIXME: handle events");
return;
}
/* since we keep this buffer out of the scope of this method */
gst_buffer_ref (buf);
g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN),
GST_FLOW_ERROR);
GST_LOG_OBJECT (sink, "received buffer %p", buf);
/* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
* it means we're getting new streamheader buffers, and we should clear
* the old ones */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) &&
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new IN_CAPS buffers, clearing old streamheader");
g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL);
g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (sink->streamheader);
sink->streamheader = NULL;
}
@ -1478,13 +1483,13 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
* After that we return, since we only send these out when we get
* non IN_CAPS buffers so we properly keep track of clients that got
* streamheaders. */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
sink->previous_buffer_in_caps = TRUE;
GST_DEBUG_OBJECT (sink,
"appending IN_CAPS buffer with length %d to streamheader",
GST_BUFFER_SIZE (buf));
sink->streamheader = g_slist_append (sink->streamheader, buf);
return;
return GST_FLOW_OK;
}
sink->previous_buffer_in_caps = FALSE;
@ -1492,6 +1497,8 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
gst_multifdsink_queue_buffer (sink, buf);
sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
return GST_FLOW_OK;
}
static void
@ -1617,21 +1624,24 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
/* create a socket for sending to remote machine */
static gboolean
gst_multifdsink_init_send (GstMultiFdSink * this)
gst_multifdsink_start (GstBaseSink * bsink)
{
GstMultiFdSinkClass *fclass;
int control_socket[2];
GstMultiFdSink *this;
if (GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
return TRUE;
this = GST_MULTIFDSINK (bsink);
fclass = GST_MULTIFDSINK_GET_CLASS (this);
GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
this->fdset = gst_fdset_new (this->mode);
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ERROR_SYSTEM);
return FALSE;
}
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
goto socket_pair;
READ_SOCKET (this).fd = control_socket[0];
WRITE_SOCKET (this).fd = control_socket[1];
@ -1653,16 +1663,31 @@ gst_multifdsink_init_send (GstMultiFdSink * this)
this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread,
this, TRUE, NULL);
GST_FLAG_SET (this, GST_MULTIFDSINK_OPEN);
return TRUE;
/* ERRORS */
socket_pair:
{
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ERROR_SYSTEM);
return FALSE;
}
}
static void
gst_multifdsink_close (GstMultiFdSink * this)
static gboolean
gst_multifdsink_stop (GstBaseSink * bsink)
{
GstMultiFdSinkClass *fclass;
GstMultiFdSink *this;
this = GST_MULTIFDSINK (bsink);
fclass = GST_MULTIFDSINK_GET_CLASS (this);
if (!GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
return TRUE;
this->running = FALSE;
SEND_COMMAND (this, CONTROL_STOP);
@ -1678,7 +1703,7 @@ gst_multifdsink_close (GstMultiFdSink * this)
close (WRITE_SOCKET (this).fd);
if (this->streamheader) {
g_slist_foreach (this->streamheader, (GFunc) gst_data_unref, NULL);
g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (this->streamheader);
this->streamheader = NULL;
}
@ -1691,46 +1716,55 @@ gst_multifdsink_close (GstMultiFdSink * this)
gst_fdset_free (this->fdset);
this->fdset = NULL;
}
GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
return TRUE;
}
static GstElementStateReturn
gst_multifdsink_change_state (GstElement * element)
{
GstMultiFdSink *sink;
gint transition;
GstElementStateReturn ret;
g_return_val_if_fail (GST_IS_MULTIFDSINK (element), GST_STATE_FAILURE);
sink = GST_MULTIFDSINK (element);
/* we disallow changing the state from the streaming thread */
if (g_thread_self () == sink->thread)
return GST_STATE_FAILURE;
switch (GST_STATE_TRANSITION (element)) {
transition = GST_STATE_TRANSITION (element);
switch (transition) {
case GST_STATE_NULL_TO_READY:
if (!GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
if (!gst_multifdsink_init_send (sink))
return GST_STATE_FAILURE;
GST_FLAG_SET (sink, GST_MULTIFDSINK_OPEN);
}
if (!gst_multifdsink_start (GST_BASESINK (sink)))
goto start_failed;
break;
case GST_STATE_READY_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
switch (transition) {
case GST_STATE_PLAYING_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_READY:
break;
case GST_STATE_READY_TO_NULL:
if (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
gst_multifdsink_close (GST_MULTIFDSINK (element));
GST_FLAG_UNSET (sink, GST_MULTIFDSINK_OPEN);
}
gst_multifdsink_stop (GST_BASESINK (sink));
break;
}
return ret;
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
/* ERRORS */
start_failed:
{
return GST_STATE_FAILURE;
}
}

View file

@ -22,12 +22,10 @@
#ifndef __GST_MULTIFDSINK_H__
#define __GST_MULTIFDSINK_H__
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
#include "gsttcp.h"
#include "gstfdset.h"
@ -119,10 +117,7 @@ typedef struct {
} GstTCPClient;
struct _GstMultiFdSink {
GstElement element;
/* pad */
GstPad *sinkpad;
GstBaseSink element;
guint64 bytes_to_serve; /* how much bytes we must serve */
guint64 bytes_served; /* how much bytes have we served */
@ -161,7 +156,7 @@ struct _GstMultiFdSink {
};
struct _GstMultiFdSinkClass {
GstElementClass parent_class;
GstBaseSinkClass parent_class;
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
@ -187,10 +182,6 @@ void gst_multifdsink_remove (GstMultiFdSink *sink, int fd);
void gst_multifdsink_clear (GstMultiFdSink *sink);
GValueArray* gst_multifdsink_get_stats (GstMultiFdSink *sink, int fd);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_MULTIFDSINK_H__ */

View file

@ -56,35 +56,42 @@ gst_tcp_host_to_ip (GstElement * element, const gchar * host)
struct in_addr addr;
GST_DEBUG_OBJECT (element, "resolving host %s", host);
/* first check if it already is an IP address */
if (inet_aton (host, &addr)) {
ip = g_strdup (host);
goto beach;
}
/* FIXME: could do a localhost check here */
/* perform a name lookup */
hostinfo = gethostbyname (host);
if (!hostinfo) {
GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
("Could not find IP address for host \"%s\".", host));
return NULL;
}
if (!(hostinfo = gethostbyname (host)))
goto resolve_error;
if (hostinfo->h_addrtype != AF_INET) {
GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
("host \"%s\" is not an IP host", host));
return NULL;
}
if (hostinfo->h_addrtype != AF_INET)
goto not_ip;
addrs = hostinfo->h_addr_list;
/* There could be more than one IP address, but we just return the first */
ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
beach:
GST_DEBUG_OBJECT (element, "resolved to IP %s", ip);
return ip;
resolve_error:
{
GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
("Could not find IP address for host \"%s\".", host));
return NULL;
}
not_ip:
{
GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
("host \"%s\" is not an IP host", host));
return NULL;
}
}
/* write buffer to given socket incrementally.
@ -149,15 +156,14 @@ gst_tcp_socket_close (int *socket)
*socket = -1;
}
/* read the gdp buffer header from the given socket
/* read a buffer from the given socket
* returns:
* - a GstData representing a GstBuffer in which data should be read
* - a GstData representing a GstEvent
* - a GstBuffer in which data should be read
* - NULL, indicating a connection close or an error, to be handled with
* EOS
*/
GstData *
gst_tcp_gdp_read_header (GstElement * this, int socket)
GstBuffer *
gst_tcp_gdp_read_buffer (GstElement * this, int socket)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
@ -169,36 +175,51 @@ gst_tcp_gdp_read_header (GstElement * this, int socket)
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
ret = gst_tcp_socket_read (socket, header, readsize);
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG ("blocking read returns 0, returning NULL");
g_free (header);
return NULL;
}
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
return NULL;
}
if (ret != readsize) {
g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
}
g_assert (ret == readsize);
if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
goto read_error;
if (ret != readsize)
goto short_read;
if (!gst_dp_validate_header (header_length, header))
goto validate_error;
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate"));
g_free (header);
return NULL;
}
GST_LOG_OBJECT (this, "validated buffer packet header");
buffer = gst_dp_buffer_from_header (header_length, header);
g_free (header);
GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
return GST_DATA (buffer);
return buffer;
/* ERRORS */
read_error:
{
if (ret == 0) {
/* if we read 0 bytes, and we're blocking, we hit eos */
GST_DEBUG ("blocking read returns 0, returning NULL");
g_free (header);
return NULL;
} else {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
return NULL;
}
}
short_read:
{
GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
return NULL;
}
validate_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate"));
g_free (header);
return NULL;
}
}
/* read the GDP caps packet from the given socket
@ -218,56 +239,29 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket)
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
ret = gst_tcp_socket_read (socket, header, readsize);
if (ret < 0) {
g_free (header);
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
if (ret == 0) {
GST_WARNING_OBJECT (this, "read returned EOF");
return NULL;
}
if (ret != readsize) {
GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes",
readsize, ret);
return NULL;
}
if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
goto read_error;
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate"));
g_free (header);
return NULL;
}
if (ret != readsize)
goto short_read;
if (!gst_dp_validate_header (header_length, header))
goto validate_error;
readsize = gst_dp_header_payload_length (header);
payload = g_malloc (readsize);
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
ret = gst_tcp_socket_read (socket, payload, readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
g_free (payload);
return NULL;
}
if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Header read doesn't describe CAPS payload"));
g_free (header);
g_free (payload);
return NULL;
}
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0)
goto socket_read_error;
if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
goto is_not_caps;
g_assert (ret == readsize);
if (!gst_dp_validate_payload (readsize, header, payload)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate"));
g_free (header);
g_free (payload);
return NULL;
}
if (!gst_dp_validate_payload (readsize, header, payload))
goto packet_validate_error;
caps = gst_dp_caps_from_packet (header_length, header, payload);
string = gst_caps_to_string (caps);
@ -278,28 +272,89 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket)
g_free (payload);
return caps;
/* ERRORS */
read_error:
{
if (ret < 0) {
g_free (header);
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
if (ret == 0) {
GST_WARNING_OBJECT (this, "read returned EOF");
return NULL;
}
}
short_read:
{
GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes",
readsize, ret);
return NULL;
}
validate_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate"));
g_free (header);
return NULL;
}
socket_read_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
g_free (payload);
return NULL;
}
is_not_caps:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Header read doesn't describe CAPS payload"));
g_free (header);
g_free (payload);
return NULL;
}
packet_validate_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate"));
g_free (header);
g_free (payload);
return NULL;
}
}
/* write a GDP header to the socket. Return false if fails. */
gboolean
gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer,
gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer,
gboolean fatal, const gchar * host, int port)
{
guint length;
guint8 *header;
size_t wrote;
if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) {
if (!gst_dp_header_from_buffer (buffer, 0, &length, &header))
goto create_error;
GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
wrote = gst_tcp_socket_write (socket, header, length);
g_free (header);
if (wrote != length)
goto write_error;
return TRUE;
/* ERRORS */
create_error:
{
if (fatal)
GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
("Could not create GDP header from buffer"));
return FALSE;
}
GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
wrote = gst_tcp_socket_write (socket, header, length);
g_free (header);
if (wrote != length) {
write_error:
{
if (fatal)
GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), host, port),
@ -307,8 +362,6 @@ gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer,
wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
return FALSE;
}
return TRUE;
}
/* write GDP header and payload to the given socket for the given caps.
@ -322,15 +375,36 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
guint8 *payload;
size_t wrote;
if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload))
goto create_error;
GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
wrote = gst_tcp_socket_write (socket, header, length);
if (wrote != length)
goto write_header_error;
length = gst_dp_header_payload_length (header);
g_free (header);
GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
wrote = gst_tcp_socket_write (socket, payload, length);
g_free (payload);
if (wrote != length)
goto write_payload_error;
return TRUE;
/* ERRORS */
create_error:
{
if (fatal)
GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
("Could not create GDP packet from caps"));
return FALSE;
}
GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
wrote = gst_tcp_socket_write (socket, header, length);
if (wrote != length) {
write_header_error:
{
g_free (header);
g_free (payload);
if (fatal)
@ -340,13 +414,8 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
wrote, length, g_strerror (errno)));
return FALSE;
}
length = gst_dp_header_payload_length (header);
g_free (header);
GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
wrote = gst_tcp_socket_write (socket, payload, length);
g_free (payload);
if (wrote != length) {
write_payload_error:
{
if (fatal)
GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
(_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
@ -354,5 +423,4 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
wrote, length, g_strerror (errno)));
return FALSE;
}
return TRUE;
}

View file

@ -46,11 +46,13 @@ gint gst_tcp_socket_read (int socket, void *buf, size_t count);
void gst_tcp_socket_close (int *socket);
GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket);
GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket);
GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket);
gboolean gst_tcp_gdp_write_header (GstElement *this, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_caps (GstElement *this, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_caps (GstElement *elem, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
G_END_DECLS

View file

@ -58,10 +58,9 @@ static void gst_tcpclientsink_class_init (GstTCPClientSink * klass);
static void gst_tcpclientsink_init (GstTCPClientSink * tcpclientsink);
static void gst_tcpclientsink_finalize (GObject * gobject);
static void gst_tcpclientsink_set_clock (GstElement * element,
GstClock * clock);
static void gst_tcpclientsink_chain (GstPad * pad, GstData * _data);
static gboolean gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps);
static GstFlowReturn gst_tcpclientsink_render (GstBaseSink * bsink,
GstBuffer * buf);
static GstElementStateReturn gst_tcpclientsink_change_state (GstElement *
element);
@ -115,11 +114,17 @@ gst_tcpclientsink_class_init (GstTCPClientSink * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstBaseSinkClass *gstbasesink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gstbasesink_class = (GstBaseSinkClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
parent_class = g_type_class_ref (GST_TYPE_BASESINK);
gobject_class->set_property = gst_tcpclientsink_set_property;
gobject_class->get_property = gst_tcpclientsink_get_property;
gobject_class->finalize = gst_tcpclientsink_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
@ -131,44 +136,24 @@ gst_tcpclientsink_class_init (GstTCPClientSink * klass)
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpclientsink_set_property;
gobject_class->get_property = gst_tcpclientsink_get_property;
gobject_class->finalize = gst_tcpclientsink_finalize;
gstelement_class->change_state = gst_tcpclientsink_change_state;
gstelement_class->set_clock = gst_tcpclientsink_set_clock;
gstbasesink_class->set_caps = gst_tcpclientsink_setcaps;
gstbasesink_class->render = gst_tcpclientsink_render;
GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
}
static void
gst_tcpclientsink_set_clock (GstElement * element, GstClock * clock)
{
GstTCPClientSink *tcpclientsink;
tcpclientsink = GST_TCPCLIENTSINK (element);
tcpclientsink->clock = clock;
}
static void
gst_tcpclientsink_init (GstTCPClientSink * this)
{
/* create the sink pad */
this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
gst_pad_set_chain_function (this->sinkpad, gst_tcpclientsink_chain);
this->host = g_strdup (TCP_DEFAULT_HOST);
this->port = TCP_DEFAULT_PORT;
/* should support as minimum 576 for IPV4 and 1500 for IPV6 */
/* this->mtu = 1500; */
this->sock_fd = -1;
this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
this->clock = NULL;
}
static void
@ -179,24 +164,12 @@ gst_tcpclientsink_finalize (GObject * gobject)
g_free (this->host);
}
static void
gst_tcpclientsink_chain (GstPad * pad, GstData * _data)
static gboolean
gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps)
{
size_t wrote = 0;
GstBuffer *buf = GST_BUFFER (_data);
GstTCPClientSink *sink;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
sink = GST_TCPCLIENTSINK (GST_OBJECT_PARENT (pad));
g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN));
if (GST_IS_EVENT (buf)) {
g_warning ("FIXME: handl events");
return;
}
sink = GST_TCPCLIENTSINK (bsink);
/* write the buffer header if we have one */
switch (sink->protocol) {
@ -209,44 +182,85 @@ gst_tcpclientsink_chain (GstPad * pad, GstData * _data)
const GstCaps *caps;
gchar *string;
caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
caps = GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (bsink)));
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
TRUE, sink->host, sink->port)) {
g_free (string);
return;
}
g_free (string);
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
TRUE, sink->host, sink->port))
goto gdp_write_error;
sink->caps_sent = TRUE;
}
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), sink->sock_fd, buf,
TRUE, sink->host, sink->port))
return;
break;
default:
g_warning ("Unhandled protocol type");
break;
}
GST_LOG_OBJECT (sink, "writing %d bytes for buffer data",
GST_BUFFER_SIZE (buf));
wrote =
gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf),
GST_BUFFER_SIZE (buf));
return TRUE;
if (wrote < GST_BUFFER_SIZE (buf)) {
/* ERRORS */
gdp_write_error:
{
return FALSE;
}
}
static GstFlowReturn
gst_tcpclientsink_render (GstBaseSink * bsink, GstBuffer * buf)
{
size_t wrote = 0;
GstTCPClientSink *sink;
gint size;
sink = GST_TCPCLIENTSINK (bsink);
g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN),
GST_FLOW_WRONG_STATE);
size = GST_BUFFER_SIZE (buf);
GST_LOG_OBJECT (sink, "writing %d bytes for buffer data", size);
/* write the buffer header if we have one */
switch (sink->protocol) {
case GST_TCP_PROTOCOL_TYPE_NONE:
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf,
TRUE, sink->host, sink->port))
goto gdp_write_error;
break;
default:
break;
}
/* write buffer data */
wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size);
if (wrote < size)
goto write_error;
sink->data_written += wrote;
return GST_FLOW_OK;
/* ERRORS */
gdp_write_error:
{
return FALSE;
}
write_error:
{
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
("Only %d of %d bytes written: %s",
wrote, GST_BUFFER_SIZE (buf), g_strerror (errno)));
return GST_FLOW_ERROR;
}
sink->data_written += wrote;
gst_buffer_unref (buf);
/* FIXME: emit signal ? */
}
static void
@ -311,11 +325,14 @@ gst_tcpclientsink_get_property (GObject * object, guint prop_id, GValue * value,
/* create a socket for sending to remote machine */
static gboolean
gst_tcpclientsink_init_send (GstTCPClientSink * this)
gst_tcpclientsink_start (GstTCPClientSink * this)
{
int ret;
gchar *ip;
if (GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN))
return TRUE;
/* reset caps_sent flag */
this->caps_sent = FALSE;
@ -373,34 +390,53 @@ gst_tcpclientsink_init_send (GstTCPClientSink * this)
return TRUE;
}
static void
gst_tcpclientsink_close (GstTCPClientSink * this)
static gboolean
gst_tcpclientsink_stop (GstTCPClientSink * this)
{
if (!GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN))
return TRUE;
if (this->sock_fd != -1) {
close (this->sock_fd);
this->sock_fd = -1;
}
GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
return TRUE;
}
static GstElementStateReturn
gst_tcpclientsink_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPCLIENTSINK (element), GST_STATE_FAILURE);
GstTCPClientSink *sink;
gint transition;
GstElementStateReturn res;
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN))
gst_tcpclientsink_close (GST_TCPCLIENTSINK (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN)) {
if (!gst_tcpclientsink_init_send (GST_TCPCLIENTSINK (element)))
return GST_STATE_FAILURE;
}
sink = GST_TCPCLIENTSINK (element);
transition = GST_STATE_TRANSITION (element);
switch (transition) {
case GST_STATE_NULL_TO_READY:
case GST_STATE_READY_TO_PAUSED:
if (!gst_tcpclientsink_start (GST_TCPCLIENTSINK (element)))
goto start_failure;
break;
default:
break;
}
res = GST_ELEMENT_CLASS (parent_class)->change_state (element);
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
switch (transition) {
case GST_STATE_READY_TO_NULL:
gst_tcpclientsink_stop (GST_TCPCLIENTSINK (element));
default:
break;
}
return res;
return GST_STATE_SUCCESS;
start_failure:
{
return GST_STATE_FAILURE;
}
}

View file

@ -23,11 +23,11 @@
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include "gsttcp.h"
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
#include <stdio.h>
#include <stdlib.h>
@ -65,10 +65,7 @@ typedef enum {
} GstTCPClientSinkFlags;
struct _GstTCPClientSink {
GstElement element;
/* pad */
GstPad *sinkpad;
GstBaseSink element;
/* server information */
int port;
@ -81,21 +78,14 @@ struct _GstTCPClientSink {
size_t data_written; /* how much bytes have we written ? */
GstTCPProtocolType protocol; /* used with the protocol enum */
gboolean caps_sent; /* whether or not we sent caps already */
guint mtu;
GstClock *clock;
};
struct _GstTCPClientSinkClass {
GstElementClass parent_class;
GstBaseSinkClass parent_class;
};
GType gst_tcpclientsink_get_type(void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_TCPCLIENTSINK_H__ */

View file

@ -47,6 +47,11 @@ GST_ELEMENT_DETAILS ("TCP Client source",
"Receive data as a client over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
/* TCPClientSrc signals and args */
enum
{
@ -66,17 +71,18 @@ static void gst_tcpclientsrc_class_init (GstTCPClientSrc * klass);
static void gst_tcpclientsrc_init (GstTCPClientSrc * tcpclientsrc);
static void gst_tcpclientsrc_finalize (GObject * gobject);
static GstCaps *gst_tcpclientsrc_getcaps (GstPad * pad);
static GstCaps *gst_tcpclientsrc_getcaps (GstBaseSrc * psrc);
static GstData *gst_tcpclientsrc_get (GstPad * pad);
static GstElementStateReturn gst_tcpclientsrc_change_state (GstElement *
element);
static GstFlowReturn gst_tcpclientsrc_create (GstPushSrc * psrc,
GstBuffer ** outbuf);
static gboolean gst_tcpclientsrc_stop (GstBaseSrc * bsrc);
static gboolean gst_tcpclientsrc_start (GstBaseSrc * bsrc);
static gboolean gst_tcpclientsrc_unlock (GstBaseSrc * bsrc);
static void gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpclientsrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock);
static GstElementClass *parent_class = NULL;
@ -87,7 +93,6 @@ gst_tcpclientsrc_get_type (void)
{
static GType tcpclientsrc_type = 0;
if (!tcpclientsrc_type) {
static const GTypeInfo tcpclientsrc_info = {
sizeof (GstTCPClientSrcClass),
@ -103,7 +108,7 @@ gst_tcpclientsrc_get_type (void)
};
tcpclientsrc_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSrc",
g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPClientSrc",
&tcpclientsrc_info, 0);
}
return tcpclientsrc_type;
@ -114,6 +119,9 @@ gst_tcpclientsrc_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&srctemplate));
gst_element_class_set_details (element_class, &gst_tcpclientsrc_details);
}
@ -122,11 +130,19 @@ gst_tcpclientsrc_class_init (GstTCPClientSrc * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstBaseSrcClass *gstbasesrc_class;
GstPushSrcClass *gstpushsrc_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gstbasesrc_class = (GstBaseSrcClass *) klass;
gstpushsrc_class = (GstPushSrcClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
gobject_class->set_property = gst_tcpclientsrc_set_property;
gobject_class->get_property = gst_tcpclientsrc_get_property;
gobject_class->finalize = gst_tcpclientsrc_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host",
@ -140,43 +156,28 @@ gst_tcpclientsrc_class_init (GstTCPClientSrc * klass)
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpclientsrc_set_property;
gobject_class->get_property = gst_tcpclientsrc_get_property;
gobject_class->finalize = gst_tcpclientsrc_finalize;
gstbasesrc_class->get_caps = gst_tcpclientsrc_getcaps;
gstbasesrc_class->start = gst_tcpclientsrc_start;
gstbasesrc_class->stop = gst_tcpclientsrc_stop;
gstbasesrc_class->unlock = gst_tcpclientsrc_unlock;
gstelement_class->change_state = gst_tcpclientsrc_change_state;
gstelement_class->set_clock = gst_tcpclientsrc_set_clock;
gstpushsrc_class->create = gst_tcpclientsrc_create;
GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
"TCP Client Source");
}
static void
gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock)
{
GstTCPClientSrc *tcpclientsrc;
tcpclientsrc = GST_TCPCLIENTSRC (element);
tcpclientsrc->clock = clock;
}
static void
gst_tcpclientsrc_init (GstTCPClientSrc * this)
{
/* create the src pad */
this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
gst_pad_set_get_function (this->srcpad, gst_tcpclientsrc_get);
gst_pad_set_getcaps_function (this->srcpad, gst_tcpclientsrc_getcaps);
this->port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST);
this->clock = NULL;
this->sock_fd = -1;
this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
this->curoffset = 0;
this->caps = NULL;
this->curoffset = 0;
gst_base_src_set_live (GST_BASESRC (this), TRUE);
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
}
@ -190,12 +191,12 @@ gst_tcpclientsrc_finalize (GObject * gobject)
}
static GstCaps *
gst_tcpclientsrc_getcaps (GstPad * pad)
gst_tcpclientsrc_getcaps (GstBaseSrc * bsrc)
{
GstTCPClientSrc *src;
GstCaps *caps = NULL;
src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
src = GST_TCPCLIENTSRC (bsrc);
if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN))
caps = gst_caps_new_any ();
@ -208,78 +209,21 @@ gst_tcpclientsrc_getcaps (GstPad * pad)
return caps;
}
/* close the socket and associated resources
* unset OPEN flag
* used both to recover from errors and go to NULL state */
static void
gst_tcpclientsrc_close (GstTCPClientSrc * this)
{
GST_DEBUG_OBJECT (this, "closing socket");
if (this->sock_fd != -1) {
close (this->sock_fd);
this->sock_fd = -1;
}
this->caps_received = FALSE;
if (this->caps) {
gst_caps_free (this->caps);
this->caps = NULL;
}
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
}
/* close socket and related items and return an EOS GstData
* called from _get */
static GstData *
gst_tcpclientsrc_eos (GstTCPClientSrc * src)
{
GST_DEBUG_OBJECT (src, "going to EOS");
gst_element_set_eos (GST_ELEMENT (src));
gst_tcpclientsrc_close (src);
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
static GstData *
gst_tcpclientsrc_get (GstPad * pad)
static GstFlowReturn
gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstTCPClientSrc *src;
size_t readsize;
int ret;
GstData *data = NULL;
GstBuffer *buf = NULL;
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) {
GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
return NULL;
}
src = GST_TCPCLIENTSRC (psrc);
if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN))
goto wrong_state;
GST_LOG_OBJECT (src, "asked for a buffer");
/* try to negotiate here */
if (!gst_pad_is_negotiated (pad)) {
if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) {
GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return gst_tcpclientsrc_eos (src);
}
}
/* if we have a left over buffer after a discont, return that */
if (src->buffer_after_discont) {
buf = src->buffer_after_discont;
GST_LOG_OBJECT (src,
"Returning buffer after discont of size %d, ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
src->buffer_after_discont = NULL;
return GST_DATA (buf);
}
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
fd_set testfds;
@ -288,101 +232,51 @@ gst_tcpclientsrc_get (GstPad * pad)
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (src->sock_fd, &testfds);
ret = select (src->sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0, 0);
/* no action (0) is an error too in our case */
if (ret <= 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return gst_tcpclientsrc_eos (src);
}
if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0)
goto select_error;
/* ask how much is available for reading on the socket */
ret = ioctl (src->sock_fd, FIONREAD, &readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return gst_tcpclientsrc_eos (src);
}
if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0)
goto ioctl_error;
GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
buf = gst_buffer_new_and_alloc (readsize);
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
return gst_tcpclientsrc_eos (src);
}
if (GST_IS_EVENT (data)) {
/* if we got back an EOS event, then we should go into eos ourselves */
if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
gst_event_unref (data);
return gst_tcpclientsrc_eos (src);
}
return data;
}
buf = GST_BUFFER (data);
case GST_TCP_PROTOCOL_TYPE_GDP:
if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd)))
goto hit_eos;
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
buf);
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break;
default:
g_warning ("Unhandled protocol type");
/* need to assert as buf == NULL */
g_assert ("Unhandled protocol type");
break;
}
GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize);
ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return gst_tcpclientsrc_eos (src);
}
if ((ret =
gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf),
readsize)) < 0)
goto read_error;
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
gst_buffer_unref (buf);
return gst_tcpclientsrc_eos (src);
}
if (ret == 0)
goto zero_read;
readsize = ret;
GST_BUFFER_SIZE (buf) = readsize;
GST_BUFFER_MAXSIZE (buf) = readsize;
/* FIXME: we could decide to set OFFSET and OFFSET_END for non-protocol
* streams to mean the bytes processed */
/* if this is our first buffer, we need to send a discont with the
* given timestamp or the current offset, and store the buffer for
* the next iteration through the get loop */
if (src->send_discont) {
GstClockTime timestamp;
GstEvent *event;
src->send_discont = FALSE;
src->buffer_after_discont = buf;
/* if the timestamp is valid, send a timed discont
* taking into account the incoming buffer's timestamps */
timestamp = GST_BUFFER_TIMESTAMP (buf);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
GST_DEBUG_OBJECT (src,
"sending discontinuous with timestamp %" GST_TIME_FORMAT,
GST_TIME_ARGS (timestamp));
event =
gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, timestamp, NULL);
return GST_DATA (event);
}
/* otherwise, send an offset discont */
GST_DEBUG_OBJECT (src, "sending discontinuous with offset %d",
src->curoffset);
event =
gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset,
NULL);
return GST_DATA (event);
}
src->curoffset += readsize;
GST_LOG_OBJECT (src,
"Returning buffer from _get of size %d, ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
@ -390,7 +284,47 @@ gst_tcpclientsrc_get (GstPad * pad)
GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
return GST_DATA (buf);
gst_buffer_set_caps (buf, src->caps);
*outbuf = buf;
return GST_FLOW_OK;
/* ERRORS */
wrong_state:
{
GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
return GST_FLOW_WRONG_STATE;
}
select_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
ioctl_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
hit_eos:
{
return GST_FLOW_WRONG_STATE;
}
read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
zero_read:
{
GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
gst_buffer_unref (buf);
return GST_FLOW_WRONG_STATE;
}
}
static void
@ -453,109 +387,125 @@ gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value,
/* create a socket for connecting to remote server */
static gboolean
gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
gst_tcpclientsrc_start (GstBaseSrc * bsrc)
{
int ret;
gchar *ip;
GstTCPClientSrc *src = GST_TCPCLIENTSRC (bsrc);
/* create receiving client socket */
GST_DEBUG_OBJECT (this, "opening receiving client socket to %s:%d",
this->host, this->port);
if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
this->sock_fd);
GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
src->host, src->port);
if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
goto no_socket;
GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
src->sock_fd);
GST_FLAG_SET (src, GST_TCPCLIENTSRC_OPEN);
/* look up name if we need to */
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
if (!ip) {
gst_tcpclientsrc_close (this);
return FALSE;
}
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
goto name_resolv;
GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
/* connect to server */
memset (&this->server_sin, 0, sizeof (this->server_sin));
this->server_sin.sin_family = AF_INET; /* network socket */
this->server_sin.sin_port = htons (this->port); /* on port */
this->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */
memset (&src->server_sin, 0, sizeof (src->server_sin));
src->server_sin.sin_family = AF_INET; /* network socket */
src->server_sin.sin_port = htons (src->port); /* on port */
src->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */
g_free (ip);
GST_DEBUG_OBJECT (this, "connecting to server");
ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
GST_DEBUG_OBJECT (src, "connecting to server");
ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin,
sizeof (src->server_sin));
if (ret) {
gst_tcpclientsrc_close (this);
gst_tcpclientsrc_stop (GST_BASESRC (src));
switch (errno) {
case ECONNREFUSED:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
(_("Connection to %s:%d refused."), this->host, this->port),
(NULL));
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
(_("Connection to %s:%d refused."), src->host, src->port), (NULL));
return FALSE;
break;
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("connect to %s:%d failed: %s", this->host, this->port,
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("connect to %s:%d failed: %s", src->host, src->port,
g_strerror (errno)));
return FALSE;
break;
}
}
this->send_discont = TRUE;
this->buffer_after_discont = NULL;
/* get the caps if we're using GDP */
if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
if (src->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
/* if we haven't received caps yet, we should get them first */
if (!this->caps_received) {
if (!src->caps_received) {
GstCaps *caps;
GST_DEBUG_OBJECT (this, "getting caps through GDP");
if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) {
gst_tcpclientsrc_close (this);
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return FALSE;
}
if (!GST_IS_CAPS (caps)) {
gst_tcpclientsrc_close (this);
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return FALSE;
}
GST_DEBUG_OBJECT (this, "Received caps through GDP: %" GST_PTR_FORMAT,
GST_DEBUG_OBJECT (src, "getting caps through GDP");
if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd)))
goto no_caps;
if (!GST_IS_CAPS (caps))
goto no_caps;
GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT,
caps);
this->caps_received = TRUE;
this->caps = caps;
src->caps_received = TRUE;
src->caps = caps;
}
}
return TRUE;
no_socket:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
name_resolv:
{
gst_tcpclientsrc_stop (GST_BASESRC (src));
return FALSE;
}
no_caps:
{
gst_tcpclientsrc_stop (GST_BASESRC (src));
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return FALSE;
}
}
static GstElementStateReturn
gst_tcpclientsrc_change_state (GstElement * element)
/* close the socket and associated resources
* unset OPEN flag
* used both to recover from errors and go to NULL state */
static gboolean
gst_tcpclientsrc_stop (GstBaseSrc * bsrc)
{
g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
GstTCPClientSrc *src;
/* if open and going to NULL, close it */
if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
GST_STATE_PENDING (element) == GST_STATE_NULL) {
gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
src = GST_TCPCLIENTSRC (bsrc);
GST_DEBUG_OBJECT (src, "closing socket");
if (src->sock_fd != -1) {
close (src->sock_fd);
src->sock_fd = -1;
}
/* if closed and going to a state higher than NULL, open it */
if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
GST_STATE_PENDING (element) > GST_STATE_NULL) {
if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
return GST_STATE_FAILURE;
src->caps_received = FALSE;
if (src->caps) {
gst_caps_unref (src->caps);
src->caps = NULL;
}
GST_FLAG_UNSET (src, GST_TCPCLIENTSRC_OPEN);
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
return TRUE;
}
static gboolean
gst_tcpclientsrc_unlock (GstBaseSrc * bsrc)
{
return TRUE;
}

View file

@ -23,16 +23,16 @@
#define __GST_TCPCLIENTSRC_H__
#include <gst/gst.h>
#include <gst/base/gstpushsrc.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
#include <netdb.h> /* sockaddr_in */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> /* sockaddr_in */
#include <unistd.h>
#include "gsttcp.h"
#define GST_TYPE_TCPCLIENTSRC \
@ -56,10 +56,7 @@ typedef enum {
} GstTCPClientSrcFlags;
struct _GstTCPClientSrc {
GstElement element;
/* pad */
GstPad *srcpad;
GstPushSrc element;
/* server information */
int port;
@ -75,21 +72,14 @@ struct _GstTCPClientSrc {
GstTCPProtocolType protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
GstCaps *caps;
GstClock *clock;
gboolean send_discont; /* TRUE when we need to send a discont */
GstBuffer *buffer_after_discont; /* temporary storage for buffer */
};
struct _GstTCPClientSrcClass {
GstElementClass parent_class;
GstPushSrcClass parent_class;
};
GType gst_tcpclientsrc_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_TCPCLIENTSRC_H__ */

View file

@ -21,8 +21,7 @@
#include "config.h"
#endif
#include "gsttcpsrc.h"
#include "gsttcpsink.h"
#include <gst/dataprotocol/dataprotocol.h>
#include "gsttcpclientsrc.h"
#include "gsttcpclientsink.h"
#include "gsttcpserversrc.h"
@ -34,12 +33,7 @@ GST_DEBUG_CATEGORY (tcp_debug);
static gboolean
plugin_init (GstPlugin * plugin)
{
if (!gst_element_register (plugin, "tcpsink", GST_RANK_NONE,
GST_TYPE_TCPSINK))
return FALSE;
if (!gst_element_register (plugin, "tcpsrc", GST_RANK_NONE, GST_TYPE_TCPSRC))
return FALSE;
gst_dp_init ();
if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE,
GST_TYPE_TCPCLIENTSINK))

View file

@ -119,6 +119,10 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass)
parent_class = g_type_class_ref (GST_TYPE_MULTIFDSINK);
gobject_class->set_property = gst_tcpserversink_set_property;
gobject_class->get_property = gst_tcpserversink_get_property;
gobject_class->finalize = gst_tcpserversink_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "host", "The host/IP to send the packets to",
TCP_DEFAULT_HOST, G_PARAM_READWRITE));
@ -126,10 +130,6 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass)
g_param_spec_int ("port", "port", "The port to send the packets to",
0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpserversink_set_property;
gobject_class->get_property = gst_tcpserversink_get_property;
gobject_class->finalize = gst_tcpserversink_finalize;
gstmultifdsink_class->init = gst_tcpserversink_init_send;
gstmultifdsink_class->wait = gst_tcpserversink_handle_wait;
gstmultifdsink_class->close = gst_tcpserversink_close;

View file

@ -25,9 +25,7 @@
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
#include <stdio.h>
#include <stdlib.h>
@ -82,10 +80,6 @@ struct _GstTCPServerSinkClass {
GType gst_tcpserversink_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_TCPSERVERSINK_H__ */

View file

@ -46,6 +46,12 @@ GST_ELEMENT_DETAILS ("TCP Server source",
"Receive data as a server over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
/* TCPServerSrc signals and args */
enum
{
@ -65,15 +71,15 @@ static void gst_tcpserversrc_class_init (GstTCPServerSrc * klass);
static void gst_tcpserversrc_init (GstTCPServerSrc * tcpserversrc);
static void gst_tcpserversrc_finalize (GObject * gobject);
static GstData *gst_tcpserversrc_get (GstPad * pad);
static GstElementStateReturn gst_tcpserversrc_change_state (GstElement *
element);
static gboolean gst_tcpserversrc_start (GstBaseSrc * bsrc);
static gboolean gst_tcpserversrc_stop (GstBaseSrc * bsrc);
static GstFlowReturn gst_tcpserversrc_create (GstPushSrc * psrc,
GstBuffer ** buf);
static void gst_tcpserversrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpserversrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock);
static GstElementClass *parent_class = NULL;
@ -100,7 +106,7 @@ gst_tcpserversrc_get_type (void)
};
tcpserversrc_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSrc",
g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPServerSrc",
&tcpserversrc_info, 0);
}
return tcpserversrc_type;
@ -111,6 +117,9 @@ gst_tcpserversrc_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&srctemplate));
gst_element_class_set_details (element_class, &gst_tcpserversrc_details);
}
@ -119,11 +128,19 @@ gst_tcpserversrc_class_init (GstTCPServerSrc * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstBaseSrcClass *gstbasesrc_class;
GstPushSrcClass *gstpushsrc_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gstbasesrc_class = (GstBaseSrcClass *) klass;
gstpushsrc_class = (GstPushSrcClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
gobject_class->set_property = gst_tcpserversrc_set_property;
gobject_class->get_property = gst_tcpserversrc_get_property;
gobject_class->finalize = gst_tcpserversrc_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host", "The hostname to listen as",
@ -136,241 +153,98 @@ gst_tcpserversrc_class_init (GstTCPServerSrc * klass)
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpserversrc_set_property;
gobject_class->get_property = gst_tcpserversrc_get_property;
gobject_class->finalize = gst_tcpserversrc_finalize;
gstbasesrc_class->start = gst_tcpserversrc_start;
gstbasesrc_class->stop = gst_tcpserversrc_stop;
gstelement_class->change_state = gst_tcpserversrc_change_state;
gstelement_class->set_clock = gst_tcpserversrc_set_clock;
gstpushsrc_class->create = gst_tcpserversrc_create;
GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
"TCP Server Source");
}
static void
gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock)
gst_tcpserversrc_init (GstTCPServerSrc * src)
{
GstTCPServerSrc *tcpserversrc;
src->server_port = TCP_DEFAULT_PORT;
src->host = g_strdup (TCP_DEFAULT_HOST);
src->server_sock_fd = -1;
src->client_sock_fd = -1;
src->curoffset = 0;
src->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
tcpserversrc = GST_TCPSERVERSRC (element);
tcpserversrc->clock = clock;
}
static void
gst_tcpserversrc_init (GstTCPServerSrc * this)
{
/* create the src pad */
this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
gst_pad_set_get_function (this->srcpad, gst_tcpserversrc_get);
this->server_port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST);
this->clock = NULL;
this->server_sock_fd = -1;
this->client_sock_fd = -1;
this->curoffset = 0;
this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN);
}
static void
gst_tcpserversrc_finalize (GObject * gobject)
{
GstTCPServerSrc *this = GST_TCPSERVERSRC (gobject);
GstTCPServerSrc *src = GST_TCPSERVERSRC (gobject);
g_free (this->host);
g_free (src->host);
}
/* read the gdp caps packet from the socket */
static GstCaps *
gst_tcpserversrc_gdp_read_caps (GstTCPServerSrc * this)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
guint8 *header = NULL;
guint8 *payload = NULL;
ssize_t ret;
GstCaps *caps;
gchar *string;
header = g_malloc (header_length);
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
ret = read (this->client_sock_fd, header, readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
return NULL;
}
g_assert (ret == readsize);
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate"));
g_free (header);
return NULL;
}
readsize = gst_dp_header_payload_length (header);
payload = g_malloc (readsize);
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
ret = read (this->client_sock_fd, payload, readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
g_free (payload);
return NULL;
}
g_assert (ret == readsize);
if (!gst_dp_validate_payload (readsize, header, payload)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate"));
g_free (header);
g_free (payload);
return NULL;
}
caps = gst_dp_caps_from_packet (header_length, header, payload);
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
g_free (string);
g_free (header);
g_free (payload);
return caps;
}
/* read the gdp buffer header from the socket
* returns a GstData,
* representing the new GstBuffer to read data into, or an EOS event
*/
static GstData *
gst_tcpserversrc_gdp_read_header (GstTCPServerSrc * this)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
guint8 *header = NULL;
ssize_t ret;
GstBuffer *buffer;
header = g_malloc (header_length);
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
ret = read (this->client_sock_fd, header, readsize);
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG ("blocking read returns 0, EOS");
gst_element_set_eos (GST_ELEMENT (this));
g_free (header);
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if (ret < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
return NULL;
}
if (ret != readsize) {
g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
}
g_assert (ret == readsize);
if (!gst_dp_validate_header (header_length, header)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate"));
g_free (header);
return NULL;
}
GST_LOG_OBJECT (this, "validated buffer packet header");
buffer = gst_dp_buffer_from_header (header_length, header);
GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
return GST_DATA (buffer);
}
static GstData *
gst_tcpserversrc_get (GstPad * pad)
static GstFlowReturn
gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstTCPServerSrc *src;
size_t readsize;
int ret;
GstData *data = NULL;
GstBuffer *buf = NULL;
GstCaps *caps;
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
src = GST_TCPSERVERSRC (GST_OBJECT_PARENT (pad));
g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), NULL);
src = GST_TCPSERVERSRC (psrc);
g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN),
GST_FLOW_ERROR);
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
case GST_TCP_PROTOCOL_TYPE_NONE:
{
fd_set testfds;
case GST_TCP_PROTOCOL_TYPE_NONE:
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (src->client_sock_fd, &testfds);
ret =
select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0,
0);
/* no action (0) is an error too in our case */
if (ret <= 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if ((ret =
select (src->client_sock_fd + 1, &testfds, (fd_set *) 0,
(fd_set *) 0, 0)) <= 0)
goto select_error;
/* ask how much is available for reading on the socket */
ret = ioctl (src->client_sock_fd, FIONREAD, &readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0)
goto ioctl_error;
buf = gst_buffer_new_and_alloc (readsize);
break;
}
case GST_TCP_PROTOCOL_TYPE_GDP:
/* if we haven't received caps yet, we should get them first */
if (!src->caps_received) {
gchar *string;
if (!(caps = gst_tcpserversrc_gdp_read_caps (src))) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if (!(caps =
gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd)))
goto gdp_caps_read_error;
src->caps_received = TRUE;
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
g_free (string);
if (!gst_pad_try_set_caps (pad, caps)) {
g_warning ("Could not set caps");
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
gst_pad_set_caps (GST_BASESRC_PAD (psrc), caps);
}
/* now receive the buffer header */
if (!(data = gst_tcpserversrc_gdp_read_header (src))) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read data header through GDP"));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if (GST_IS_EVENT (data))
return data;
buf = GST_BUFFER (data);
if (!(buf =
gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd)))
goto gdp_buffer_read_error;
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
buf);
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break;
@ -380,31 +254,63 @@ gst_tcpserversrc_get (GstPad * pad)
}
GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
ret =
gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
readsize);
if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if ((ret =
gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
readsize)) < 0)
goto read_error;
/* if we read 0 bytes, and we're blocking, we hit eos */
if (ret == 0) {
GST_DEBUG ("blocking read returns 0, EOS");
gst_buffer_unref (buf);
gst_element_set_eos (GST_ELEMENT (src));
return GST_DATA (gst_event_new (GST_EVENT_EOS));
}
if (ret == 0)
goto hit_eos;
readsize = ret;
GST_LOG_OBJECT (src, "Read %d bytes", readsize);
GST_BUFFER_SIZE (buf) = readsize;
GST_BUFFER_MAXSIZE (buf) = readsize;
GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
src->curoffset += readsize;
return GST_DATA (buf);
*outbuf = buf;
return GST_FLOW_OK;
/* ERROR */
select_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
ioctl_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
gdp_caps_read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
return GST_FLOW_ERROR;
}
gdp_buffer_read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read buffer header through GDP"));
return GST_FLOW_ERROR;
}
read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
hit_eos:
{
GST_DEBUG ("blocking read returns 0, EOS");
gst_buffer_unref (buf);
return GST_FLOW_WRONG_STATE;
}
}
@ -467,121 +373,125 @@ gst_tcpserversrc_get_property (GObject * object, guint prop_id, GValue * value,
/* set up server */
static gboolean
gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
gst_tcpserversrc_start (GstBaseSrc * bsrc)
{
int ret;
GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc);
/* reset caps_received flag */
this->caps_received = FALSE;
src->caps_received = FALSE;
/* create the server listener socket */
if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened receiving server socket with fd %d",
this->server_sock_fd);
if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
goto socket_error;
GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
src->server_sock_fd);
/* make address reusable */
ret = 1;
if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
sizeof (int)) < 0) {
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
}
if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
sizeof (int)) < 0)
goto sock_opt;
/* name the socket */
memset (&this->server_sin, 0, sizeof (this->server_sin));
this->server_sin.sin_family = AF_INET; /* network socket */
this->server_sin.sin_port = htons (this->server_port); /* on port */
if (this->host) {
gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
memset (&src->server_sin, 0, sizeof (src->server_sin));
src->server_sin.sin_family = AF_INET; /* network socket */
src->server_sin.sin_port = htons (src->server_port); /* on port */
if (src->host) {
gchar *host;
if (!host) {
gst_tcp_socket_close (&this->server_sock_fd);
return FALSE;
}
this->server_sin.sin_addr.s_addr = inet_addr (host);
if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
goto host_error;
src->server_sin.sin_addr.s_addr = inet_addr (host);
g_free (host);
} else
this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
/* bind it */
GST_DEBUG_OBJECT (this, "binding server socket to address");
ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
GST_DEBUG_OBJECT (src, "binding server socket to address");
if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
sizeof (src->server_sin))) < 0)
goto bind_error;
if (ret) {
gst_tcp_socket_close (&this->server_sock_fd);
switch (errno) {
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("bind failed: %s", g_strerror (errno)));
return FALSE;
break;
}
}
GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
src->server_sock_fd, TCP_BACKLOG);
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
this->server_sock_fd, TCP_BACKLOG);
if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
gst_tcp_socket_close (&this->server_sock_fd);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
goto listen_error;
/* FIXME: maybe we should think about moving actual client accepting
somewhere else */
GST_DEBUG_OBJECT (this, "waiting for client");
this->client_sock_fd =
accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
&this->client_sin_len);
if (this->client_sock_fd == -1) {
gst_tcp_socket_close (&this->server_sock_fd);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
GST_DEBUG_OBJECT (src, "waiting for client");
if ((src->client_sock_fd =
accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
&src->client_sin_len)) == -1)
goto accept_error;
GST_DEBUG_OBJECT (src, "received client");
GST_FLAG_SET (src, GST_TCPSERVERSRC_OPEN);
return TRUE;
/* ERRORS */
socket_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
sock_opt:
{
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
}
host_error:
{
gst_tcp_socket_close (&src->server_sock_fd);
return FALSE;
}
bind_error:
{
gst_tcp_socket_close (&src->server_sock_fd);
switch (errno) {
default:
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("bind failed: %s", g_strerror (errno)));
break;
}
return FALSE;
}
listen_error:
{
gst_tcp_socket_close (&src->server_sock_fd);
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
accept_error:
{
gst_tcp_socket_close (&src->server_sock_fd);
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Could not accept client on server socket: %s", g_strerror (errno)));
return FALSE;
}
GST_DEBUG_OBJECT (this, "received client");
}
static gboolean
gst_tcpserversrc_stop (GstBaseSrc * bsrc)
{
GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc);
if (src->server_sock_fd != -1) {
close (src->server_sock_fd);
src->server_sock_fd = -1;
}
if (src->client_sock_fd != -1) {
close (src->client_sock_fd);
src->client_sock_fd = -1;
}
GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN);
GST_FLAG_SET (this, GST_TCPSERVERSRC_OPEN);
return TRUE;
}
static void
gst_tcpserversrc_close (GstTCPServerSrc * this)
{
if (this->server_sock_fd != -1) {
close (this->server_sock_fd);
this->server_sock_fd = -1;
}
if (this->client_sock_fd != -1) {
close (this->client_sock_fd);
this->client_sock_fd = -1;
}
GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
}
static GstElementStateReturn
gst_tcpserversrc_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPSERVERSRC (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN))
gst_tcpserversrc_close (GST_TCPSERVERSRC (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN)) {
if (!gst_tcpserversrc_init_receive (GST_TCPSERVERSRC (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

View file

@ -23,10 +23,9 @@
#define __GST_TCPSERVERSRC_H__
#include <gst/gst.h>
#include <gst/base/gstpushsrc.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_END_DECLS
#include <errno.h>
#include <string.h>
@ -60,10 +59,7 @@ typedef enum {
} GstTCPServerSrcFlags;
struct _GstTCPServerSrc {
GstElement element;
/* pad */
GstPad *srcpad;
GstPushSrc element;
/* server information */
int server_port;
@ -81,18 +77,14 @@ struct _GstTCPServerSrc {
GstTCPProtocolType protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
GstClock *clock;
};
struct _GstTCPServerSrcClass {
GstElementClass parent_class;
GstPushSrcClass parent_class;
};
GType gst_tcpserversrc_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_BEGIN_DECLS
#endif /* __GST_TCPSERVERSRC_H__ */

View file

@ -1,425 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gsttcpsink.h"
#define TCP_DEFAULT_HOST "localhost"
#define TCP_DEFAULT_PORT 4953
/* elementfactory information */
static GstElementDetails gst_tcpsink_details =
GST_ELEMENT_DETAILS ("TCP packet sender",
"Sink/Network",
"Send data over the network via TCP",
"Zeeshan Ali <zak147@yahoo.com>");
/* TCPSink signals and args */
enum
{
FRAME_ENCODED,
/* FILL ME */
LAST_SIGNAL
};
enum
{
ARG_0,
ARG_HOST,
ARG_PORT,
ARG_CONTROL,
ARG_MTU
/* FILL ME */
};
#define GST_TYPE_TCPSINK_CONTROL (gst_tcpsink_control_get_type())
static GType
gst_tcpsink_control_get_type (void)
{
static GType tcpsink_control_type = 0;
static GEnumValue tcpsink_control[] = {
{CONTROL_NONE, "1", "none"},
{CONTROL_TCP, "2", "tcp"},
{CONTROL_ZERO, NULL, NULL}
};
if (!tcpsink_control_type) {
tcpsink_control_type =
g_enum_register_static ("GstTCPSinkControl", tcpsink_control);
}
return tcpsink_control_type;
}
static void gst_tcpsink_base_init (gpointer g_class);
static void gst_tcpsink_class_init (GstTCPSink * klass);
static void gst_tcpsink_init (GstTCPSink * tcpsink);
static void gst_tcpsink_set_clock (GstElement * element, GstClock * clock);
static void gst_tcpsink_chain (GstPad * pad, GstData * _data);
static GstElementStateReturn gst_tcpsink_change_state (GstElement * element);
static void gst_tcpsink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpsink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpsink_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpsink_get_type (void)
{
static GType tcpsink_type = 0;
if (!tcpsink_type) {
static const GTypeInfo tcpsink_info = {
sizeof (GstTCPSinkClass),
gst_tcpsink_base_init,
NULL,
(GClassInitFunc) gst_tcpsink_class_init,
NULL,
NULL,
sizeof (GstTCPSink),
0,
(GInstanceInitFunc) gst_tcpsink_init,
NULL
};
tcpsink_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSink", &tcpsink_info,
0);
}
return tcpsink_type;
}
static void
gst_tcpsink_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_set_details (element_class, &gst_tcpsink_details);
}
static void
gst_tcpsink_class_init (GstTCPSink * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "host", "The host/IP to send the packets to",
TCP_DEFAULT_HOST, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "port", "The port to send the packets to",
0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_CONTROL,
g_param_spec_enum ("control", "control", "The type of control",
GST_TYPE_TCPSINK_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_MTU, g_param_spec_int ("mtu", "mtu", "mtu", G_MININT, G_MAXINT, 0, G_PARAM_READWRITE)); /* CHECKME */
gobject_class->set_property = gst_tcpsink_set_property;
gobject_class->get_property = gst_tcpsink_get_property;
gstelement_class->change_state = gst_tcpsink_change_state;
gstelement_class->set_clock = gst_tcpsink_set_clock;
}
static GstPadLinkReturn
gst_tcpsink_sink_link (GstPad * pad, const GstCaps * caps)
{
GstTCPSink *tcpsink;
#ifndef GST_DISABLE_LOADSAVE
struct sockaddr_in serv_addr;
struct in_addr addr;
struct hostent *he;
int fd;
FILE *f;
xmlDocPtr doc;
#endif
tcpsink = GST_TCPSINK (gst_pad_get_parent (pad));
switch (tcpsink->control) {
#ifndef GST_DISABLE_LOADSAVE
case CONTROL_TCP:
memset (&serv_addr, 0, sizeof (serv_addr));
/* if its an IP address */
if (inet_aton (tcpsink->host, &addr)) {
memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
}
/* we dont need to lookup for localhost */
else if (strcmp (tcpsink->host, TCP_DEFAULT_HOST) == 0) {
if (inet_aton ("127.0.0.1", &addr)) {
memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
}
}
/* if its a hostname */
else if ((he = gethostbyname (tcpsink->host))) {
memmove (&(serv_addr.sin_addr), he->h_addr, he->h_length);
}
else {
perror ("hostname lookup error?");
return GST_PAD_LINK_REFUSED;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons (tcpsink->port + 1);
doc = xmlNewDoc ("1.0");
doc->xmlRootNode = xmlNewDocNode (doc, NULL, "NewCaps", NULL);
gst_caps_save_thyself (caps, doc->xmlRootNode);
if ((fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
perror ("socket");
return GST_PAD_LINK_REFUSED;
}
if (connect (fd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) != 0) {
g_printerr ("tcpsink: connect to %s port %d failed: %s\n",
tcpsink->host, tcpsink->port + 1, g_strerror (errno));
return GST_PAD_LINK_REFUSED;
}
f = fdopen (dup (fd), "wb");
xmlDocDump (f, doc);
fclose (f);
close (fd);
break;
#endif
case CONTROL_NONE:
return GST_PAD_LINK_OK;
break;
default:
return GST_PAD_LINK_REFUSED;
break;
}
return GST_PAD_LINK_OK;
}
static void
gst_tcpsink_set_clock (GstElement * element, GstClock * clock)
{
GstTCPSink *tcpsink;
tcpsink = GST_TCPSINK (element);
tcpsink->clock = clock;
}
static void
gst_tcpsink_init (GstTCPSink * tcpsink)
{
/* create the sink and src pads */
tcpsink->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (tcpsink), tcpsink->sinkpad);
gst_pad_set_chain_function (tcpsink->sinkpad, gst_tcpsink_chain);
gst_pad_set_link_function (tcpsink->sinkpad, gst_tcpsink_sink_link);
tcpsink->host = g_strdup (TCP_DEFAULT_HOST);
tcpsink->port = TCP_DEFAULT_PORT;
tcpsink->control = CONTROL_TCP;
/* should support as minimum 576 for IPV4 and 1500 for IPV6 */
tcpsink->mtu = 1500;
tcpsink->clock = NULL;
}
static void
gst_tcpsink_chain (GstPad * pad, GstData * _data)
{
GstBuffer *buf = GST_BUFFER (_data);
GstTCPSink *tcpsink;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
tcpsink = GST_TCPSINK (GST_OBJECT_PARENT (pad));
if (tcpsink->clock && GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
gst_element_wait (GST_ELEMENT (tcpsink), GST_BUFFER_TIMESTAMP (buf));
}
if (write (tcpsink->sock, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)) <= 0) {
perror ("write");
}
gst_buffer_unref (buf);
}
static void
gst_tcpsink_set_property (GObject * object, guint prop_id, const GValue * value,
GParamSpec * pspec)
{
GstTCPSink *tcpsink;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_TCPSINK (object));
tcpsink = GST_TCPSINK (object);
switch (prop_id) {
case ARG_HOST:
if (tcpsink->host != NULL)
g_free (tcpsink->host);
if (g_value_get_string (value) == NULL)
tcpsink->host = NULL;
else
tcpsink->host = g_strdup (g_value_get_string (value));
break;
case ARG_PORT:
tcpsink->port = g_value_get_int (value);
break;
case ARG_CONTROL:
tcpsink->control = g_value_get_enum (value);
break;
case ARG_MTU:
tcpsink->mtu = g_value_get_int (value);
break;
default:
break;
}
}
static void
gst_tcpsink_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstTCPSink *tcpsink;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_TCPSINK (object));
tcpsink = GST_TCPSINK (object);
switch (prop_id) {
case ARG_HOST:
g_value_set_string (value, tcpsink->host);
break;
case ARG_PORT:
g_value_set_int (value, tcpsink->port);
break;
case ARG_CONTROL:
g_value_set_enum (value, tcpsink->control);
break;
case ARG_MTU:
g_value_set_int (value, tcpsink->mtu);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_tcpsink_init_send (GstTCPSink * sink)
{
struct hostent *he;
struct in_addr addr;
memset (&sink->theiraddr, 0, sizeof (sink->theiraddr));
sink->theiraddr.sin_family = AF_INET; /* host byte order */
sink->theiraddr.sin_port = htons (sink->port); /* short, network byte order */
/* if its an IP address */
if (inet_aton (sink->host, &addr)) {
memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
}
/* we dont need to lookup for localhost */
else if (strcmp (sink->host, TCP_DEFAULT_HOST) == 0) {
if (inet_aton ("127.0.0.1", &addr)) {
memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
}
}
/* if its a hostname */
else if ((he = gethostbyname (sink->host))) {
memmove (&(sink->theiraddr.sin_addr), he->h_addr, he->h_length);
}
else {
perror ("hostname lookup error?");
return FALSE;
}
if ((sink->sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
perror ("socket");
return FALSE;
}
if (connect (sink->sock, (struct sockaddr *) &(sink->theiraddr),
sizeof (sink->theiraddr)) != 0) {
perror ("stream connect");
return FALSE;
}
GST_FLAG_SET (sink, GST_TCPSINK_OPEN);
return TRUE;
}
static void
gst_tcpsink_close (GstTCPSink * sink)
{
close (sink->sock);
GST_FLAG_UNSET (sink, GST_TCPSINK_OPEN);
}
static GstElementStateReturn
gst_tcpsink_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPSINK (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN))
gst_tcpsink_close (GST_TCPSINK (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) {
if (!gst_tcpsink_init_send (GST_TCPSINK (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

View file

@ -1,96 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPSINK_H__
#define __GST_TCPSINK_H__
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include "gsttcpplugin.h"
#define GST_TYPE_TCPSINK \
(gst_tcpsink_get_type())
#define GST_TCPSINK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSINK,GstTCPSink))
#define GST_TCPSINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSINK,GstTCPSink))
#define GST_IS_TCPSINK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSINK))
#define GST_IS_TCPSINK_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSINK))
typedef struct _GstTCPSink GstTCPSink;
typedef struct _GstTCPSinkClass GstTCPSinkClass;
typedef enum {
GST_TCPSINK_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2,
} GstTCPSinkFlags;
struct _GstTCPSink {
GstElement element;
/* pads */
GstPad *sinkpad,*srcpad;
int sock;
struct sockaddr_in theiraddr;
Gst_TCP_Control control;
gint port;
gchar *host;
guint mtu;
GstClock *clock;
};
struct _GstTCPSinkClass {
GstElementClass parent_class;
};
GType gst_tcpsink_get_type(void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPSINK_H__ */

View file

@ -1,504 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gsttcpsrc.h"
#include <unistd.h>
#define TCP_DEFAULT_PORT 4953
/* elementfactory information */
static GstElementDetails gst_tcpsrc_details =
GST_ELEMENT_DETAILS ("TCP packet receiver",
"Source/Network",
"Receive data over the network via TCP",
"Zeeshan Ali <zak147@yahoo.com>");
/* TCPSrc signals and args */
enum
{
/* FILL ME */
LAST_SIGNAL
};
enum
{
ARG_0,
ARG_PORT,
ARG_CONTROL
/* ARG_SOCKET_OPTIONS,*/
/* FILL ME */
};
#define GST_TYPE_TCPSRC_CONTROL (gst_tcpsrc_control_get_type())
static GType
gst_tcpsrc_control_get_type (void)
{
static GType tcpsrc_control_type = 0;
static GEnumValue tcpsrc_control[] = {
{CONTROL_NONE, "1", "none"},
{CONTROL_TCP, "2", "tcp"},
{CONTROL_ZERO, NULL, NULL}
};
if (!tcpsrc_control_type) {
tcpsrc_control_type =
g_enum_register_static ("GstTCPSrcControl", tcpsrc_control);
}
return tcpsrc_control_type;
}
static void gst_tcpsrc_base_init (gpointer g_class);
static void gst_tcpsrc_class_init (GstTCPSrc * klass);
static void gst_tcpsrc_init (GstTCPSrc * tcpsrc);
static GstData *gst_tcpsrc_get (GstPad * pad);
static GstElementStateReturn gst_tcpsrc_change_state (GstElement * element);
static void gst_tcpsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpsrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_tcpsrc_set_clock (GstElement * element, GstClock * clock);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpsrc_get_type (void)
{
static GType tcpsrc_type = 0;
if (!tcpsrc_type) {
static const GTypeInfo tcpsrc_info = {
sizeof (GstTCPSrcClass),
gst_tcpsrc_base_init,
NULL,
(GClassInitFunc) gst_tcpsrc_class_init,
NULL,
NULL,
sizeof (GstTCPSrc),
0,
(GInstanceInitFunc) gst_tcpsrc_init,
NULL
};
tcpsrc_type =
g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0);
}
return tcpsrc_type;
}
static void
gst_tcpsrc_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_set_details (element_class, &gst_tcpsrc_details);
}
static void
gst_tcpsrc_class_init (GstTCPSrc * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "port", "The port to receive the packets from",
0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_CONTROL,
g_param_spec_enum ("control", "control", "The type of control",
GST_TYPE_TCPSRC_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
/*
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SOCKET_OPTIONS,
g_param_spec_boolean ("socketop", "socketop", "Enable or disable socket options REUSEADDR and KEEPALIVE",
FALSE, G_PARAM_READWRITE));
*/
gobject_class->set_property = gst_tcpsrc_set_property;
gobject_class->get_property = gst_tcpsrc_get_property;
gstelement_class->change_state = gst_tcpsrc_change_state;
gstelement_class->set_clock = gst_tcpsrc_set_clock;
}
static void
gst_tcpsrc_set_clock (GstElement * element, GstClock * clock)
{
GstTCPSrc *tcpsrc;
tcpsrc = GST_TCPSRC (element);
tcpsrc->clock = clock;
}
static void
gst_tcpsrc_init (GstTCPSrc * tcpsrc)
{
/* create the src and src pads */
tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad);
gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get);
tcpsrc->port = TCP_DEFAULT_PORT;
tcpsrc->control = CONTROL_TCP;
tcpsrc->clock = NULL;
tcpsrc->sock = -1;
tcpsrc->control_sock = -1;
tcpsrc->client_sock = -1;
/*tcpsrc->socket_options = FALSE; */
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN);
GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF);
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
}
static GstData *
gst_tcpsrc_get (GstPad * pad)
{
GstTCPSrc *tcpsrc;
GstBuffer *outbuf;
socklen_t len;
gint numbytes;
fd_set read_fds;
guint max_sock;
#ifndef GST_DISABLE_LOADSAVE
int ret, client_sock;
#endif
struct sockaddr client_addr;
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad));
FD_ZERO (&read_fds);
FD_SET (tcpsrc->sock, &read_fds);
max_sock = tcpsrc->sock;
if (tcpsrc->control_sock >= 0) {
FD_SET (tcpsrc->control_sock, &read_fds);
max_sock = MAX (tcpsrc->sock, tcpsrc->control_sock);
}
/* Add to FD_SET client socket, when connection has been established */
if (tcpsrc->client_sock >= 0) {
FD_SET (tcpsrc->client_sock, &read_fds);
max_sock = MAX (tcpsrc->client_sock, max_sock);
}
if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) > 0) {
if ((tcpsrc->control_sock != -1)
&& FD_ISSET (tcpsrc->control_sock, &read_fds)) {
guchar *buf = NULL;
#ifndef GST_DISABLE_LOADSAVE
xmlDocPtr doc;
GstCaps *caps;
#endif
switch (tcpsrc->control) {
case CONTROL_TCP:
#ifndef GST_DISABLE_LOADSAVE
buf = g_malloc (1024 * 10);
len = sizeof (struct sockaddr);
client_sock = accept (tcpsrc->control_sock, &client_addr, &len);
if (client_sock <= 0) {
perror ("control_sock accept");
}
else if ((ret = read (client_sock, buf, 1024 * 10)) <= 0) {
perror ("control_sock read");
}
else {
buf[ret] = '\0';
doc = xmlParseMemory (buf, ret);
caps = gst_caps_load_thyself (doc->xmlRootNode);
/* foward the connect, we don't signal back the result here... */
gst_pad_try_set_caps (tcpsrc->srcpad, caps);
}
g_free (buf);
#endif
break;
case CONTROL_NONE:
default:
g_free (buf);
return NULL;
break;
}
outbuf = NULL;
} else {
outbuf = gst_buffer_new ();
GST_BUFFER_DATA (outbuf) = g_malloc (24000);
GST_BUFFER_SIZE (outbuf) = 24000;
if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) {
if (tcpsrc->clock) {
GstClockTime current_time;
GstEvent *discont;
current_time = gst_clock_get_time (tcpsrc->clock);
GST_BUFFER_TIMESTAMP (outbuf) = current_time;
discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME,
current_time, NULL);
gst_pad_push (tcpsrc->srcpad, GST_DATA (discont));
}
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF);
}
else {
GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
}
if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) {
tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len);
if (tcpsrc->client_sock <= 0) {
perror ("accept");
}
else {
GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED);
}
}
numbytes =
read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf),
GST_BUFFER_SIZE (outbuf));
if (numbytes > 0) {
GST_BUFFER_SIZE (outbuf) = numbytes;
}
else {
if (numbytes == -1) {
perror ("read");
} else
g_print ("End of Stream reached\n");
gst_buffer_unref (outbuf);
outbuf = NULL;
close (tcpsrc->client_sock);
tcpsrc->client_sock = -1;
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
}
}
}
else {
perror ("select");
outbuf = NULL;
}
return GST_DATA (outbuf);
}
static void
gst_tcpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
GParamSpec * pspec)
{
GstTCPSrc *tcpsrc;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_TCPSRC (object));
tcpsrc = GST_TCPSRC (object);
switch (prop_id) {
case ARG_PORT:
tcpsrc->port = g_value_get_int (value);
break;
case ARG_CONTROL:
tcpsrc->control = g_value_get_enum (value);
break;
/* case ARG_SOCKET_OPTIONS:
tcpsrc->socket_options = g_value_get_boolean(value);
break; */
default:
break;
}
}
static void
gst_tcpsrc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstTCPSrc *tcpsrc;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_TCPSRC (object));
tcpsrc = GST_TCPSRC (object);
switch (prop_id) {
case ARG_PORT:
g_value_set_int (value, tcpsrc->port);
break;
case ARG_CONTROL:
g_value_set_enum (value, tcpsrc->control);
break;
/* case ARG_SOCKET_OPTIONS:
g_value_set_boolean(value,tcpsrc->socket_options);
break;*/
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_tcpsrc_init_receive (GstTCPSrc * src)
{
guint val = 0;
memset (&src->myaddr, 0, sizeof (src->myaddr));
src->myaddr.sin_family = AF_INET; /* host byte order */
src->myaddr.sin_port = htons (src->port); /* short, network byte order */
src->myaddr.sin_addr.s_addr = INADDR_ANY;
if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
perror ("stream_socket");
return FALSE;
}
/* if (src->socket_options)
{*/
g_print ("Socket Options SO_REUSEADDR, SO_KEEPALIVE\n");
/* Sock Options */
val = 1;
/* allow local address reuse */
if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (int)) < 0)
perror ("setsockopt()");
val = 1;
/* periodically test if connection still alive */
if (setsockopt (src->sock, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof (int)) < 0)
perror ("setsockopt()");
/* Sock Options */
/* } */
if (bind (src->sock, (struct sockaddr *) &src->myaddr,
sizeof (src->myaddr)) == -1) {
perror ("stream_sock bind");
return FALSE;
}
if (listen (src->sock, 5) == -1) {
perror ("stream_sock listen");
return FALSE;
}
fcntl (src->sock, F_SETFL, O_NONBLOCK);
switch (src->control) {
case CONTROL_TCP:
if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
perror ("control_socket");
return FALSE;
}
src->myaddr.sin_port = htons (src->port + 1);
if (bind (src->control_sock, (struct sockaddr *) &src->myaddr,
sizeof (src->myaddr)) == -1) {
perror ("control bind");
return FALSE;
}
if (listen (src->control_sock, 5) == -1) {
perror ("control listen");
return FALSE;
}
fcntl (src->control_sock, F_SETFL, O_NONBLOCK);
case CONTROL_NONE:
GST_FLAG_SET (src, GST_TCPSRC_OPEN);
return TRUE;
break;
default:
return FALSE;
break;
}
GST_FLAG_SET (src, GST_TCPSRC_OPEN);
return TRUE;
}
static void
gst_tcpsrc_close (GstTCPSrc * src)
{
if (src->sock != -1) {
close (src->sock);
src->sock = -1;
}
if (src->control_sock != -1) {
close (src->control_sock);
src->control_sock = -1;
}
if (src->client_sock != -1) {
close (src->client_sock);
src->client_sock = -1;
}
GST_FLAG_UNSET (src, GST_TCPSRC_OPEN);
}
static GstElementStateReturn
gst_tcpsrc_change_state (GstElement * element)
{
g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN))
gst_tcpsrc_close (GST_TCPSRC (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) {
if (!gst_tcpsrc_init_receive (GST_TCPSRC (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

View file

@ -1,92 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPSRC_H__
#define __GST_TCPSRC_H__
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "gsttcpplugin.h"
#include <fcntl.h>
#define GST_TYPE_TCPSRC \
(gst_tcpsrc_get_type())
#define GST_TCPSRC(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSRC,GstTCPSrc))
#define GST_TCPSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSRC,GstTCPSrc))
#define GST_IS_TCPSRC(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSRC))
#define GST_IS_TCPSRC_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSRC))
typedef struct _GstTCPSrc GstTCPSrc;
typedef struct _GstTCPSrcClass GstTCPSrcClass;
typedef enum {
GST_TCPSRC_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPSRC_1ST_BUF,
GST_TCPSRC_CONNECTED,
GST_TCPSRC_FLAG_LAST,
} GstTCPSrcFlags;
struct _GstTCPSrc {
GstElement element;
/* pads */
GstPad *sinkpad,*srcpad;
int port;
int sock;
int client_sock;
int control_sock;
/* gboolean socket_options;*/
Gst_TCP_Control control;
struct sockaddr_in myaddr;
GstClock *clock;
};
struct _GstTCPSrcClass {
GstElementClass parent_class;
};
GType gst_tcpsrc_get_type(void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPSRC_H__ */