From e05e44f6d57ba3ba24945c3c8ccca6aa5ab8ac71 Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Date: Thu, 17 Jul 2003 23:04:46 +0000 Subject: [PATCH] A TCP plugin could be needed by many, including wtay himself cause he is sitting behind a firewall blocking UDP and h... Original commit message from CVS: A TCP plugin could be needed by many, including wtay himself cause he is sitting behind a firewall blocking UDP and he can't hear or see me. :) Shamefully most of the code is from udpsrc/sink. Still timestamping/clock does'nt work. :( --- gst/tcp/Makefile.am | 12 ++ gst/tcp/README | 20 +++ gst/tcp/gsttcp.c | 52 ++++++ gst/tcp/gsttcp.h | 35 ++++ gst/tcp/gsttcpplugin.c | 52 ++++++ gst/tcp/gsttcpplugin.h | 35 ++++ gst/tcp/gsttcpsink.c | 363 ++++++++++++++++++++++++++++++++++++++ gst/tcp/gsttcpsink.h | 95 ++++++++++ gst/tcp/gsttcpsrc.c | 383 +++++++++++++++++++++++++++++++++++++++++ gst/tcp/gsttcpsrc.h | 90 ++++++++++ 10 files changed, 1137 insertions(+) create mode 100644 gst/tcp/Makefile.am create mode 100644 gst/tcp/README create mode 100644 gst/tcp/gsttcp.c create mode 100644 gst/tcp/gsttcp.h create mode 100644 gst/tcp/gsttcpplugin.c create mode 100644 gst/tcp/gsttcpplugin.h create mode 100644 gst/tcp/gsttcpsink.c create mode 100644 gst/tcp/gsttcpsink.h create mode 100644 gst/tcp/gsttcpsrc.c create mode 100644 gst/tcp/gsttcpsrc.h diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am new file mode 100644 index 0000000000..0230cf80fc --- /dev/null +++ b/gst/tcp/Makefile.am @@ -0,0 +1,12 @@ +plugindir = $(libdir)/gstreamer-@GST_MAJORMINOR@ + +plugin_LTLIBRARIES = libgsttcp.la + +libgsttcp_la_SOURCES = gsttcp.c gsttcpsrc.c gsttcpsink.c +libgsttcp_la_CFLAGS = $(GST_CFLAGS) +libgsttcp_la_LIBADD = +libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) + +noinst_HEADERS = gsttcpsink.h gsttcpsrc.h gsttcp.h + +EXTRA_DIST = README diff --git a/gst/tcp/README b/gst/tcp/README new file mode 100644 index 0000000000..72972ab3c9 --- /dev/null +++ b/gst/tcp/README @@ -0,0 +1,20 @@ +* What is TCP src/sink? + +solution, like icecast or realaudio or whatever. But the future RTP plugins shall not do the actual transmission/reception of packets on the network themselve but the Application developer would be encouraged to use either the TCP or the UDP plugins for that. UDP would be used mostly but there could be situations where TCP would be the only available choice. For example streaming accross firewalls that do not allow UDP. + +* Shortcomings + +Even given our modest ambitions, the current code doesn't handle +caps negotiation robustly. + +* Todo + +The caps nego should do bi-directional negotiation. + +Perhaps this plugin can be the example of how to do caps negotiation +via a point-to-point protocol. + +12 Sep 2001 +Wim Taymans +Joshua N Pritikin +Zeeshan Ali diff --git a/gst/tcp/gsttcp.c b/gst/tcp/gsttcp.c new file mode 100644 index 0000000000..1b7f57aeed --- /dev/null +++ b/gst/tcp/gsttcp.c @@ -0,0 +1,52 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#include "gsttcpsrc.h" +#include "gsttcpsink.h" + +/* elementfactory information */ +extern GstElementDetails gst_tcpsrc_details; +extern GstElementDetails gst_tcpsink_details; + +static gboolean +plugin_init (GModule *module, GstPlugin *plugin) +{ + GstElementFactory *src, *sink; + + /* create an elementfactory for the tcpsrc element */ + sink = gst_element_factory_new ("tcpsink",GST_TYPE_TCPSINK, + &gst_tcpsink_details); + g_return_val_if_fail (sink != NULL, FALSE); + gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (sink)); + + src = gst_element_factory_new ("tcpsrc",GST_TYPE_TCPSRC, + &gst_tcpsrc_details); + g_return_val_if_fail (src != NULL, FALSE); + gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (src)); + + return TRUE; +} + +GstPluginDesc plugin_desc = { + GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "tcp", + plugin_init +}; diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h new file mode 100644 index 0000000000..38377d81d6 --- /dev/null +++ b/gst/tcp/gsttcp.h @@ -0,0 +1,35 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_TCP_H__ +#define __GST_TCP_H__ + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +typedef enum { + CONTROL_ZERO, + CONTROL_NONE, + CONTROL_TCP, + CONTROL_TCP +} Gst_TCP_Control; + +#endif diff --git a/gst/tcp/gsttcpplugin.c b/gst/tcp/gsttcpplugin.c new file mode 100644 index 0000000000..1b7f57aeed --- /dev/null +++ b/gst/tcp/gsttcpplugin.c @@ -0,0 +1,52 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#include "gsttcpsrc.h" +#include "gsttcpsink.h" + +/* elementfactory information */ +extern GstElementDetails gst_tcpsrc_details; +extern GstElementDetails gst_tcpsink_details; + +static gboolean +plugin_init (GModule *module, GstPlugin *plugin) +{ + GstElementFactory *src, *sink; + + /* create an elementfactory for the tcpsrc element */ + sink = gst_element_factory_new ("tcpsink",GST_TYPE_TCPSINK, + &gst_tcpsink_details); + g_return_val_if_fail (sink != NULL, FALSE); + gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (sink)); + + src = gst_element_factory_new ("tcpsrc",GST_TYPE_TCPSRC, + &gst_tcpsrc_details); + g_return_val_if_fail (src != NULL, FALSE); + gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (src)); + + return TRUE; +} + +GstPluginDesc plugin_desc = { + GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "tcp", + plugin_init +}; diff --git a/gst/tcp/gsttcpplugin.h b/gst/tcp/gsttcpplugin.h new file mode 100644 index 0000000000..38377d81d6 --- /dev/null +++ b/gst/tcp/gsttcpplugin.h @@ -0,0 +1,35 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_TCP_H__ +#define __GST_TCP_H__ + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +typedef enum { + CONTROL_ZERO, + CONTROL_NONE, + CONTROL_TCP, + CONTROL_TCP +} Gst_TCP_Control; + +#endif diff --git a/gst/tcp/gsttcpsink.c b/gst/tcp/gsttcpsink.c new file mode 100644 index 0000000000..dcbbe41a72 --- /dev/null +++ b/gst/tcp/gsttcpsink.c @@ -0,0 +1,363 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#include "gsttcpsink.h" + +#define TCP_DEFAULT_HOST "localhost" +#define TCP_DEFAULT_PORT 4953 + +/* elementfactory information */ +GstElementDetails gst_tcpsink_details = { + "TCP packet sender", + "Sink/Network", + "LGPL", + "Send data over the network via TCP", + VERSION, + "Zeeshan Ali ", + "(C) 2003", +}; + +/* TCPSink signals and args */ +enum { + FRAME_ENCODED, + /* FILL ME */ + LAST_SIGNAL +}; + +enum { + ARG_0, + ARG_HOST, + ARG_PORT, + /* FILL ME */ +}; + +static void gst_tcpsink_class_init (GstTCPSink *klass); +static void gst_tcpsink_init (GstTCPSink *tcpsink); + +static void gst_tcpsink_set_clock (GstElement *element, GstClock *clock); + +static void gst_tcpsink_chain (GstPad *pad,GstBuffer *buf); +static GstElementStateReturn gst_tcpsink_change_state (GstElement *element); + +static void gst_tcpsink_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec); +static void gst_tcpsink_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec); + + +static GstElementClass *parent_class = NULL; +/*static guint gst_tcpsink_signals[LAST_SIGNAL] = { 0 }; */ + +GType +gst_tcpsink_get_type (void) +{ + static GType tcpsink_type = 0; + + + if (!tcpsink_type) { + static const GTypeInfo tcpsink_info = { + sizeof(GstTCPSinkClass), + NULL, + NULL, + (GClassInitFunc)gst_tcpsink_class_init, + NULL, + NULL, + sizeof(GstTCPSink), + 0, + (GInstanceInitFunc)gst_tcpsink_init, + NULL + }; + tcpsink_type = g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSink", &tcpsink_info, 0); + } + return tcpsink_type; +} + +static void +gst_tcpsink_class_init (GstTCPSink *klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass*) klass; + gstelement_class = (GstElementClass*) klass; + + parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, + g_param_spec_string ("host", "host", "The host/IP to send the packets to", + TCP_DEFAULT_HOST, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, + g_param_spec_int ("port", "port", "The port to send the packets to", + 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE)); + + gobject_class->set_property = gst_tcpsink_set_property; + gobject_class->get_property = gst_tcpsink_get_property; + + gstelement_class->change_state = gst_tcpsink_change_state; + gstelement_class->set_clock = gst_tcpsink_set_clock; +} + + +static GstPadLinkReturn +gst_tcpsink_sinkconnect (GstPad *pad, GstCaps *caps) +{ + GstTCPSink *tcpsink; + struct sockaddr_in serv_addr; + struct in_addr addr; + struct hostent *he; + int fd; + FILE *f; +#ifndef GST_DISABLE_LOADSAVE + xmlDocPtr doc; + + tcpsink = GST_TCPSINK (gst_pad_get_parent (pad)); + + memset (&serv_addr, 0, sizeof(serv_addr)); + + /* if its an IP address */ + if (inet_aton (tcpsink->host, &addr)) { + memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr)); + } + + /* we dont need to lookup for localhost */ + else if (strcmp (tcpsink->host, TCP_DEFAULT_HOST) == 0) { + if (inet_aton ("127.0.0.1", &addr)) { + memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr)); + } + } + + /* if its a hostname */ + else if ((he = gethostbyname (tcpsink->host))) { + memmove (&(serv_addr.sin_addr), he->h_addr, he->h_length); + } + + else { + perror("hostname lookup error?"); + return GST_PAD_LINK_REFUSED; + } + + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(tcpsink->port+1); + + doc = xmlNewDoc ("1.0"); + doc->xmlRootNode = xmlNewDocNode (doc, NULL, "NewCaps", NULL); + + gst_caps_save_thyself (caps, doc->xmlRootNode); + + if ((fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { + perror("socket"); + return GST_PAD_LINK_REFUSED; + } + + if (connect(fd, (struct sockaddr *)&serv_addr, sizeof (serv_addr)) != 0) { + g_printerr ("tcpsink: connect to %s port %d failed: %s\n", + tcpsink->host, tcpsink->port+1, g_strerror(errno)); + return GST_PAD_LINK_REFUSED; + } + + f = fdopen (dup (fd), "wb"); + + xmlDocDump(f, doc); + fclose (f); + close (fd); +#endif + + return GST_PAD_LINK_OK; +} + +static void +gst_tcpsink_set_clock (GstElement *element, GstClock *clock) +{ + GstTCPSink *tcpsink; + + tcpsink = GST_TCPSINK (element); + + tcpsink->clock = clock; +} + +static void +gst_tcpsink_init (GstTCPSink *tcpsink) +{ + /* create the sink and src pads */ + tcpsink->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); + gst_element_add_pad (GST_ELEMENT (tcpsink), tcpsink->sinkpad); + gst_pad_set_chain_function (tcpsink->sinkpad, gst_tcpsink_chain); + gst_pad_set_link_function (tcpsink->sinkpad, gst_tcpsink_sinkconnect); + + tcpsink->host = g_strdup (TCP_DEFAULT_HOST); + tcpsink->port = TCP_DEFAULT_PORT; + + tcpsink->clock = NULL; +} + +static void +gst_tcpsink_chain (GstPad *pad, GstBuffer *buf) +{ + GstTCPSink *tcpsink; + + g_return_if_fail (pad != NULL); + g_return_if_fail (GST_IS_PAD (pad)); + g_return_if_fail (buf != NULL); + + tcpsink = GST_TCPSINK (GST_OBJECT_PARENT (pad)); + + if (tcpsink->clock) { + GstClockID id = gst_clock_new_single_shot_id (tcpsink->clock, GST_BUFFER_TIMESTAMP (buf)); + + GST_DEBUG (0, "tcpsink: clock wait: %" G_GUINT64_FORMAT "\n", GST_BUFFER_TIMESTAMP (buf)); + gst_element_clock_wait (GST_ELEMENT (tcpsink), id, NULL); + gst_clock_id_free (id); + } + + if (write (tcpsink->sock, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)) <= 0) + { + perror("write"); + } + + gst_buffer_unref(buf); +} + +static void +gst_tcpsink_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) +{ + GstTCPSink *tcpsink; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail(GST_IS_TCPSINK(object)); + tcpsink = GST_TCPSINK(object); + + switch (prop_id) { + case ARG_HOST: + if (tcpsink->host != NULL) g_free(tcpsink->host); + if (g_value_get_string (value) == NULL) + tcpsink->host = NULL; + else + tcpsink->host = g_strdup (g_value_get_string (value)); + break; + case ARG_PORT: + tcpsink->port = g_value_get_int (value); + break; + default: + break; + } +} + +static void +gst_tcpsink_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) +{ + GstTCPSink *tcpsink; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail(GST_IS_TCPSINK(object)); + tcpsink = GST_TCPSINK(object); + + switch (prop_id) { + case ARG_HOST: + g_value_set_string (value, tcpsink->host); + break; + case ARG_PORT: + g_value_set_int (value, tcpsink->port); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + + +/* create a socket for sending to remote machine */ +static gboolean +gst_tcpsink_init_send (GstTCPSink *sink) +{ + struct hostent *he; + struct in_addr addr; + + bzero (&sink->theiraddr, sizeof (sink->theiraddr)); + sink->theiraddr.sin_family = AF_INET; /* host byte order */ + sink->theiraddr.sin_port = htons (sink->port); /* short, network byte order */ + + /* if its an IP address */ + if (inet_aton (sink->host, &addr)) { + memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr)); + } + + /* we dont need to lookup for localhost */ + else if (strcmp (sink->host, TCP_DEFAULT_HOST) == 0) { + if (inet_aton ("127.0.0.1", &addr)) { + memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr)); + } + } + + /* if its a hostname */ + else if ((he = gethostbyname (sink->host))) { + memmove (&(sink->theiraddr.sin_addr), he->h_addr, he->h_length); + } + + else { + perror("hostname lookup error?"); + return FALSE; + } + + if ((sink->sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { + perror("socket"); + return FALSE; + } + + if (connect (sink->sock, (struct sockaddr *)&(sink->theiraddr), sizeof (sink->theiraddr)) != 0) { + perror("stream connect"); + return FALSE; + } + + g_print ("tcpsink: connected to %s port %d\n", sink->host, sink->port); + + GST_FLAG_SET (sink, GST_TCPSINK_OPEN); + + return TRUE; +} + +static void +gst_tcpsink_close (GstTCPSink *sink) +{ + close (sink->sock); + + GST_FLAG_UNSET (sink, GST_TCPSINK_OPEN); +} + +static GstElementStateReturn +gst_tcpsink_change_state (GstElement *element) +{ + g_return_val_if_fail (GST_IS_TCPSINK (element), GST_STATE_FAILURE); + + if (GST_STATE_PENDING (element) == GST_STATE_NULL) { + if (GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) + gst_tcpsink_close (GST_TCPSINK (element)); + } else { + if (!GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) { + if (!gst_tcpsink_init_send (GST_TCPSINK (element))) + return GST_STATE_FAILURE; + } + } + + if (GST_ELEMENT_CLASS (parent_class)->change_state) + return GST_ELEMENT_CLASS (parent_class)->change_state (element); + + return GST_STATE_SUCCESS; +} + diff --git a/gst/tcp/gsttcpsink.h b/gst/tcp/gsttcpsink.h new file mode 100644 index 0000000000..844ae5b735 --- /dev/null +++ b/gst/tcp/gsttcpsink.h @@ -0,0 +1,95 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_TCPSINK_H__ +#define __GST_TCPSINK_H__ + + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define GST_TYPE_TCPSINK \ + (gst_tcpsink_get_type()) +#define GST_TCPSINK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSINK,GstTCPSink)) +#define GST_TCPSINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSINK,GstTCPSink)) +#define GST_IS_TCPSINK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSINK)) +#define GST_IS_TCPSINK_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSINK)) + +typedef struct _GstTCPSink GstTCPSink; +typedef struct _GstTCPSinkClass GstTCPSinkClass; + +typedef enum { + GST_TCPSINK_OPEN = GST_ELEMENT_FLAG_LAST, + + GST_TCPSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2, +} GstTCPSinkFlags; + +struct _GstTCPSink { + GstElement element; + + /* pads */ + GstPad *sinkpad,*srcpad; + + int sock; + struct sockaddr_in theiraddr; + + gint port; + gchar *host; + + guint mtu; + + GstClock *clock; +}; + +struct _GstTCPSinkClass { + GstElementClass parent_class; +}; + +GType gst_tcpsink_get_type(void); + + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + + +#endif /* __GST_TCPSINK_H__ */ diff --git a/gst/tcp/gsttcpsrc.c b/gst/tcp/gsttcpsrc.c new file mode 100644 index 0000000000..45f9663961 --- /dev/null +++ b/gst/tcp/gsttcpsrc.c @@ -0,0 +1,383 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#include "gsttcpsrc.h" + +#define TCP_DEFAULT_PORT 4953 + +/* elementfactory information */ +GstElementDetails gst_tcpsrc_details = { + "TCP packet receiver", + "Source/Network", + "LGPL", + "Receive data over the network via TCP", + VERSION, + "Zeeshan Ali ", + "(C) 2003", +}; + +/* TCPSrc signals and args */ +enum { + /* FILL ME */ + LAST_SIGNAL +}; + +enum { + ARG_0, + ARG_PORT, + /* FILL ME */ +}; + +static void gst_tcpsrc_class_init (GstTCPSrc *klass); +static void gst_tcpsrc_init (GstTCPSrc *tcpsrc); + +static GstBuffer* gst_tcpsrc_get (GstPad *pad); +static GstElementStateReturn + gst_tcpsrc_change_state (GstElement *element); + +static void gst_tcpsrc_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec); +static void gst_tcpsrc_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec); +static void gst_tcpsrc_set_clock (GstElement *element, GstClock *clock); + +static GstElementClass *parent_class = NULL; +/*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */ + +GType +gst_tcpsrc_get_type (void) +{ + static GType tcpsrc_type = 0; + + + if (!tcpsrc_type) { + static const GTypeInfo tcpsrc_info = { + sizeof(GstTCPSrcClass), + NULL, + NULL, + (GClassInitFunc)gst_tcpsrc_class_init, + NULL, + NULL, + sizeof(GstTCPSrc), + 0, + (GInstanceInitFunc)gst_tcpsrc_init, + NULL + }; + tcpsrc_type = g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0); + } + return tcpsrc_type; +} + +static void +gst_tcpsrc_class_init (GstTCPSrc *klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass*) klass; + gstelement_class = (GstElementClass*) klass; + + parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, + g_param_spec_int ("port", "port", "The port to receive the packets from", + 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE)); + + gobject_class->set_property = gst_tcpsrc_set_property; + gobject_class->get_property = gst_tcpsrc_get_property; + + gstelement_class->change_state = gst_tcpsrc_change_state; + gstelement_class->set_clock = gst_tcpsrc_set_clock; +} + +static void +gst_tcpsrc_set_clock (GstElement *element, GstClock *clock) +{ + GstTCPSrc *tcpsrc; + + tcpsrc = GST_TCPSRC (element); + + tcpsrc->clock = clock; +} + +static void +gst_tcpsrc_init (GstTCPSrc *tcpsrc) +{ + /* create the src and src pads */ + tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad); + gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get); + + tcpsrc->port = TCP_DEFAULT_PORT; + tcpsrc->clock = NULL; + tcpsrc->sock = -1; + tcpsrc->control_sock = -1; + + GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN); + GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF); + GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED); +} + +static GstBuffer* +gst_tcpsrc_get (GstPad *pad) +{ + GstTCPSrc *tcpsrc; + GstBuffer *outbuf; + socklen_t len; + gint numbytes; + fd_set read_fds; + guint max_sock; + int ret, client_sock; + struct sockaddr client_addr; + + g_return_val_if_fail (pad != NULL, NULL); + g_return_val_if_fail (GST_IS_PAD (pad), NULL); + + tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad)); + + FD_ZERO (&read_fds); + FD_SET (tcpsrc->sock, &read_fds); + FD_SET (tcpsrc->control_sock, &read_fds); + + max_sock = MAX(tcpsrc->sock, tcpsrc->control_sock); + + if (select (max_sock+1, &read_fds, NULL, NULL, NULL) > 0) { + if ((tcpsrc->control_sock != -1) && + FD_ISSET (tcpsrc->control_sock, &read_fds)) { +#ifndef GST_DISABLE_LOADSAVE + guchar *buf; + xmlDocPtr doc; + GstCaps *caps; + + buf = g_malloc (1024*10); + + len = sizeof (struct sockaddr); + client_sock = accept (tcpsrc->control_sock, &client_addr, &len); + + if (client_sock <= 0) { + perror ("control_sock accept"); + } + + else if ((ret = read (client_sock, buf, 1024*10)) <= 0) { + perror ("control_sock read"); + } + + else { + buf[ret] = '\0'; + doc = xmlParseMemory(buf, ret); + caps = gst_caps_load_thyself(doc->xmlRootNode); + + /* foward the connect, we don't signal back the result here... */ + gst_pad_proxy_link (tcpsrc->srcpad, caps); + } + + g_free (buf); +#endif + outbuf = NULL; + } + else { + outbuf = gst_buffer_new (); + GST_BUFFER_DATA (outbuf) = g_malloc (24000); + GST_BUFFER_SIZE (outbuf) = 24000; + + if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) { + if (tcpsrc->clock) { + GstClockTime current_time; + GstEvent *discont; + + current_time = gst_clock_get_time (tcpsrc->clock); + + GST_BUFFER_TIMESTAMP (outbuf) = current_time; + + discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, + current_time, NULL); + + gst_pad_push (tcpsrc->srcpad, GST_BUFFER (discont)); + } + + GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF); + } + + else { + GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; + } + + if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) { + g_print ("accepting stream..\n"); + tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len); + g_print ("accepted stream.\n"); + + if (tcpsrc->client_sock <= 0) { + perror ("accept"); + } + + else { + GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED); + } + } + + numbytes = + read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf), GST_BUFFER_SIZE (outbuf)); + + if (numbytes > 0) { + GST_BUFFER_SIZE (outbuf) = numbytes; + } + + else { + perror ("read"); + gst_buffer_unref (outbuf); + outbuf = NULL; + close (tcpsrc->client_sock); + GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED); + } + } + } + + else { + perror ("select"); + outbuf = NULL; + } + + return outbuf; +} + + +static void +gst_tcpsrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) +{ + GstTCPSrc *tcpsrc; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail(GST_IS_TCPSRC(object)); + tcpsrc = GST_TCPSRC(object); + + switch (prop_id) { + case ARG_PORT: + tcpsrc->port = g_value_get_int (value); + break; + default: + break; + } +} + +static void +gst_tcpsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) +{ + GstTCPSrc *tcpsrc; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail(GST_IS_TCPSRC(object)); + tcpsrc = GST_TCPSRC(object); + + switch (prop_id) { + case ARG_PORT: + g_value_set_int (value, tcpsrc->port); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +/* create a socket for sending to remote machine */ +static gboolean +gst_tcpsrc_init_receive (GstTCPSrc *src) +{ + bzero (&src->myaddr, sizeof (src->myaddr)); + src->myaddr.sin_family = AF_INET; /* host byte order */ + src->myaddr.sin_port = htons (src->port); /* short, network byte order */ + src->myaddr.sin_addr.s_addr = INADDR_ANY; + + if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) { + perror("stream_socket"); + return FALSE; + } + + if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) { + perror("control_socket"); + return FALSE; + } + + if (bind (src->sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) { + perror("stream_sock bind"); + return FALSE; + } + + src->myaddr.sin_port = htons (src->port+1); /* short, network byte order */ + + if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) { + perror("control bind"); + return FALSE; + } + + if (listen (src->sock, 5) == -1) { + perror("stream_sock listen"); + return FALSE; + } + + if (listen (src->control_sock, 5) == -1) { + perror("control listen"); + return FALSE; + } + + fcntl (src->sock, F_SETFL, O_NONBLOCK); + fcntl (src->control_sock, F_SETFL, O_NONBLOCK); + + GST_FLAG_SET (src, GST_TCPSRC_OPEN); + + return TRUE; +} + +static void +gst_tcpsrc_close (GstTCPSrc *src) +{ + if (src->sock != -1) { + close (src->sock); + src->sock = -1; + } + if (src->control_sock != -1) { + close (src->control_sock); + src->control_sock = -1; + } + + GST_FLAG_UNSET (src, GST_TCPSRC_OPEN); +} + +static GstElementStateReturn +gst_tcpsrc_change_state (GstElement *element) +{ + g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE); + + if (GST_STATE_PENDING (element) == GST_STATE_NULL) { + if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) + gst_tcpsrc_close (GST_TCPSRC (element)); + } else { + if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) { + if (!gst_tcpsrc_init_receive (GST_TCPSRC (element))) + return GST_STATE_FAILURE; + } + } + + if (GST_ELEMENT_CLASS (parent_class)->change_state) + return GST_ELEMENT_CLASS (parent_class)->change_state (element); + + return GST_STATE_SUCCESS; +} + diff --git a/gst/tcp/gsttcpsrc.h b/gst/tcp/gsttcpsrc.h new file mode 100644 index 0000000000..bc29a18f8e --- /dev/null +++ b/gst/tcp/gsttcpsrc.h @@ -0,0 +1,90 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_TCPSRC_H__ +#define __GST_TCPSRC_H__ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#define GST_TYPE_TCPSRC \ + (gst_tcpsrc_get_type()) +#define GST_TCPSRC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSRC,GstTCPSrc)) +#define GST_TCPSRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSRC,GstTCPSrc)) +#define GST_IS_TCPSRC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSRC)) +#define GST_IS_TCPSRC_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSRC)) + +typedef struct _GstTCPSrc GstTCPSrc; +typedef struct _GstTCPSrcClass GstTCPSrcClass; + +typedef enum { + GST_TCPSRC_OPEN = GST_ELEMENT_FLAG_LAST, + GST_TCPSRC_1ST_BUF, + GST_TCPSRC_CONNECTED, + + GST_TCPSRC_FLAG_LAST, +} GstTCPSrcFlags; + +struct _GstTCPSrc { + GstElement element; + + /* pads */ + GstPad *sinkpad,*srcpad; + + int port; + int sock; + int client_sock; + int control_sock; + + struct sockaddr_in myaddr; + GstClock *clock; +}; + +struct _GstTCPSrcClass { + GstElementClass parent_class; +}; + +GType gst_tcpsrc_get_type(void); + + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + + +#endif /* __GST_TCPSRC_H__ */