A TCP plugin could be needed by many, including wtay himself cause he is sitting behind a firewall blocking UDP and h...

Original commit message from CVS:
A TCP plugin could be needed by many, including wtay himself cause he is sitting behind a firewall blocking UDP and he can't hear or see me. :) Shamefully most of the code is from udpsrc/sink. Still timestamping/clock does'nt work. :(
This commit is contained in:
Zeeshan Ali 2003-07-17 23:04:46 +00:00
parent 8bd20d2b8d
commit e05e44f6d5
10 changed files with 1137 additions and 0 deletions

12
gst/tcp/Makefile.am Normal file
View file

@ -0,0 +1,12 @@
plugindir = $(libdir)/gstreamer-@GST_MAJORMINOR@
plugin_LTLIBRARIES = libgsttcp.la
libgsttcp_la_SOURCES = gsttcp.c gsttcpsrc.c gsttcpsink.c
libgsttcp_la_CFLAGS = $(GST_CFLAGS)
libgsttcp_la_LIBADD =
libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
noinst_HEADERS = gsttcpsink.h gsttcpsrc.h gsttcp.h
EXTRA_DIST = README

20
gst/tcp/README Normal file
View file

@ -0,0 +1,20 @@
* What is TCP src/sink?
solution, like icecast or realaudio or whatever. But the future RTP plugins shall not do the actual transmission/reception of packets on the network themselve but the Application developer would be encouraged to use either the TCP or the UDP plugins for that. UDP would be used mostly but there could be situations where TCP would be the only available choice. For example streaming accross firewalls that do not allow UDP.
* Shortcomings
Even given our modest ambitions, the current code doesn't handle
caps negotiation robustly.
* Todo
The caps nego should do bi-directional negotiation.
Perhaps this plugin can be the example of how to do caps negotiation
via a point-to-point protocol.
12 Sep 2001
Wim Taymans <wim.taymans@chello.be>
Joshua N Pritikin <vishnu@pobox.com>
Zeeshan Ali <zak147@yahoo.com>

52
gst/tcp/gsttcp.c Normal file
View file

@ -0,0 +1,52 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "gsttcpsrc.h"
#include "gsttcpsink.h"
/* elementfactory information */
extern GstElementDetails gst_tcpsrc_details;
extern GstElementDetails gst_tcpsink_details;
static gboolean
plugin_init (GModule *module, GstPlugin *plugin)
{
GstElementFactory *src, *sink;
/* create an elementfactory for the tcpsrc element */
sink = gst_element_factory_new ("tcpsink",GST_TYPE_TCPSINK,
&gst_tcpsink_details);
g_return_val_if_fail (sink != NULL, FALSE);
gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (sink));
src = gst_element_factory_new ("tcpsrc",GST_TYPE_TCPSRC,
&gst_tcpsrc_details);
g_return_val_if_fail (src != NULL, FALSE);
gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (src));
return TRUE;
}
GstPluginDesc plugin_desc = {
GST_VERSION_MAJOR,
GST_VERSION_MINOR,
"tcp",
plugin_init
};

35
gst/tcp/gsttcp.h Normal file
View file

@ -0,0 +1,35 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCP_H__
#define __GST_TCP_H__
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
typedef enum {
CONTROL_ZERO,
CONTROL_NONE,
CONTROL_TCP,
CONTROL_TCP
} Gst_TCP_Control;
#endif

52
gst/tcp/gsttcpplugin.c Normal file
View file

@ -0,0 +1,52 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "gsttcpsrc.h"
#include "gsttcpsink.h"
/* elementfactory information */
extern GstElementDetails gst_tcpsrc_details;
extern GstElementDetails gst_tcpsink_details;
static gboolean
plugin_init (GModule *module, GstPlugin *plugin)
{
GstElementFactory *src, *sink;
/* create an elementfactory for the tcpsrc element */
sink = gst_element_factory_new ("tcpsink",GST_TYPE_TCPSINK,
&gst_tcpsink_details);
g_return_val_if_fail (sink != NULL, FALSE);
gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (sink));
src = gst_element_factory_new ("tcpsrc",GST_TYPE_TCPSRC,
&gst_tcpsrc_details);
g_return_val_if_fail (src != NULL, FALSE);
gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (src));
return TRUE;
}
GstPluginDesc plugin_desc = {
GST_VERSION_MAJOR,
GST_VERSION_MINOR,
"tcp",
plugin_init
};

35
gst/tcp/gsttcpplugin.h Normal file
View file

@ -0,0 +1,35 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCP_H__
#define __GST_TCP_H__
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
typedef enum {
CONTROL_ZERO,
CONTROL_NONE,
CONTROL_TCP,
CONTROL_TCP
} Gst_TCP_Control;
#endif

363
gst/tcp/gsttcpsink.c Normal file
View file

@ -0,0 +1,363 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "gsttcpsink.h"
#define TCP_DEFAULT_HOST "localhost"
#define TCP_DEFAULT_PORT 4953
/* elementfactory information */
GstElementDetails gst_tcpsink_details = {
"TCP packet sender",
"Sink/Network",
"LGPL",
"Send data over the network via TCP",
VERSION,
"Zeeshan Ali <zak147@yahoo.com>",
"(C) 2003",
};
/* TCPSink signals and args */
enum {
FRAME_ENCODED,
/* FILL ME */
LAST_SIGNAL
};
enum {
ARG_0,
ARG_HOST,
ARG_PORT,
/* FILL ME */
};
static void gst_tcpsink_class_init (GstTCPSink *klass);
static void gst_tcpsink_init (GstTCPSink *tcpsink);
static void gst_tcpsink_set_clock (GstElement *element, GstClock *clock);
static void gst_tcpsink_chain (GstPad *pad,GstBuffer *buf);
static GstElementStateReturn gst_tcpsink_change_state (GstElement *element);
static void gst_tcpsink_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static void gst_tcpsink_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpsink_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpsink_get_type (void)
{
static GType tcpsink_type = 0;
if (!tcpsink_type) {
static const GTypeInfo tcpsink_info = {
sizeof(GstTCPSinkClass),
NULL,
NULL,
(GClassInitFunc)gst_tcpsink_class_init,
NULL,
NULL,
sizeof(GstTCPSink),
0,
(GInstanceInitFunc)gst_tcpsink_init,
NULL
};
tcpsink_type = g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSink", &tcpsink_info, 0);
}
return tcpsink_type;
}
static void
gst_tcpsink_class_init (GstTCPSink *klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass*) klass;
gstelement_class = (GstElementClass*) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "host", "The host/IP to send the packets to",
TCP_DEFAULT_HOST, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "port", "The port to send the packets to",
0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpsink_set_property;
gobject_class->get_property = gst_tcpsink_get_property;
gstelement_class->change_state = gst_tcpsink_change_state;
gstelement_class->set_clock = gst_tcpsink_set_clock;
}
static GstPadLinkReturn
gst_tcpsink_sinkconnect (GstPad *pad, GstCaps *caps)
{
GstTCPSink *tcpsink;
struct sockaddr_in serv_addr;
struct in_addr addr;
struct hostent *he;
int fd;
FILE *f;
#ifndef GST_DISABLE_LOADSAVE
xmlDocPtr doc;
tcpsink = GST_TCPSINK (gst_pad_get_parent (pad));
memset (&serv_addr, 0, sizeof(serv_addr));
/* if its an IP address */
if (inet_aton (tcpsink->host, &addr)) {
memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
}
/* we dont need to lookup for localhost */
else if (strcmp (tcpsink->host, TCP_DEFAULT_HOST) == 0) {
if (inet_aton ("127.0.0.1", &addr)) {
memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
}
}
/* if its a hostname */
else if ((he = gethostbyname (tcpsink->host))) {
memmove (&(serv_addr.sin_addr), he->h_addr, he->h_length);
}
else {
perror("hostname lookup error?");
return GST_PAD_LINK_REFUSED;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(tcpsink->port+1);
doc = xmlNewDoc ("1.0");
doc->xmlRootNode = xmlNewDocNode (doc, NULL, "NewCaps", NULL);
gst_caps_save_thyself (caps, doc->xmlRootNode);
if ((fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
perror("socket");
return GST_PAD_LINK_REFUSED;
}
if (connect(fd, (struct sockaddr *)&serv_addr, sizeof (serv_addr)) != 0) {
g_printerr ("tcpsink: connect to %s port %d failed: %s\n",
tcpsink->host, tcpsink->port+1, g_strerror(errno));
return GST_PAD_LINK_REFUSED;
}
f = fdopen (dup (fd), "wb");
xmlDocDump(f, doc);
fclose (f);
close (fd);
#endif
return GST_PAD_LINK_OK;
}
static void
gst_tcpsink_set_clock (GstElement *element, GstClock *clock)
{
GstTCPSink *tcpsink;
tcpsink = GST_TCPSINK (element);
tcpsink->clock = clock;
}
static void
gst_tcpsink_init (GstTCPSink *tcpsink)
{
/* create the sink and src pads */
tcpsink->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (tcpsink), tcpsink->sinkpad);
gst_pad_set_chain_function (tcpsink->sinkpad, gst_tcpsink_chain);
gst_pad_set_link_function (tcpsink->sinkpad, gst_tcpsink_sinkconnect);
tcpsink->host = g_strdup (TCP_DEFAULT_HOST);
tcpsink->port = TCP_DEFAULT_PORT;
tcpsink->clock = NULL;
}
static void
gst_tcpsink_chain (GstPad *pad, GstBuffer *buf)
{
GstTCPSink *tcpsink;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
tcpsink = GST_TCPSINK (GST_OBJECT_PARENT (pad));
if (tcpsink->clock) {
GstClockID id = gst_clock_new_single_shot_id (tcpsink->clock, GST_BUFFER_TIMESTAMP (buf));
GST_DEBUG (0, "tcpsink: clock wait: %" G_GUINT64_FORMAT "\n", GST_BUFFER_TIMESTAMP (buf));
gst_element_clock_wait (GST_ELEMENT (tcpsink), id, NULL);
gst_clock_id_free (id);
}
if (write (tcpsink->sock, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)) <= 0)
{
perror("write");
}
gst_buffer_unref(buf);
}
static void
gst_tcpsink_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
{
GstTCPSink *tcpsink;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail(GST_IS_TCPSINK(object));
tcpsink = GST_TCPSINK(object);
switch (prop_id) {
case ARG_HOST:
if (tcpsink->host != NULL) g_free(tcpsink->host);
if (g_value_get_string (value) == NULL)
tcpsink->host = NULL;
else
tcpsink->host = g_strdup (g_value_get_string (value));
break;
case ARG_PORT:
tcpsink->port = g_value_get_int (value);
break;
default:
break;
}
}
static void
gst_tcpsink_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
{
GstTCPSink *tcpsink;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail(GST_IS_TCPSINK(object));
tcpsink = GST_TCPSINK(object);
switch (prop_id) {
case ARG_HOST:
g_value_set_string (value, tcpsink->host);
break;
case ARG_PORT:
g_value_set_int (value, tcpsink->port);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_tcpsink_init_send (GstTCPSink *sink)
{
struct hostent *he;
struct in_addr addr;
bzero (&sink->theiraddr, sizeof (sink->theiraddr));
sink->theiraddr.sin_family = AF_INET; /* host byte order */
sink->theiraddr.sin_port = htons (sink->port); /* short, network byte order */
/* if its an IP address */
if (inet_aton (sink->host, &addr)) {
memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
}
/* we dont need to lookup for localhost */
else if (strcmp (sink->host, TCP_DEFAULT_HOST) == 0) {
if (inet_aton ("127.0.0.1", &addr)) {
memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
}
}
/* if its a hostname */
else if ((he = gethostbyname (sink->host))) {
memmove (&(sink->theiraddr.sin_addr), he->h_addr, he->h_length);
}
else {
perror("hostname lookup error?");
return FALSE;
}
if ((sink->sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
perror("socket");
return FALSE;
}
if (connect (sink->sock, (struct sockaddr *)&(sink->theiraddr), sizeof (sink->theiraddr)) != 0) {
perror("stream connect");
return FALSE;
}
g_print ("tcpsink: connected to %s port %d\n", sink->host, sink->port);
GST_FLAG_SET (sink, GST_TCPSINK_OPEN);
return TRUE;
}
static void
gst_tcpsink_close (GstTCPSink *sink)
{
close (sink->sock);
GST_FLAG_UNSET (sink, GST_TCPSINK_OPEN);
}
static GstElementStateReturn
gst_tcpsink_change_state (GstElement *element)
{
g_return_val_if_fail (GST_IS_TCPSINK (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN))
gst_tcpsink_close (GST_TCPSINK (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) {
if (!gst_tcpsink_init_send (GST_TCPSINK (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

95
gst/tcp/gsttcpsink.h Normal file
View file

@ -0,0 +1,95 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPSINK_H__
#define __GST_TCPSINK_H__
#include <config.h>
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <arpa/inet.h>
#define GST_TYPE_TCPSINK \
(gst_tcpsink_get_type())
#define GST_TCPSINK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSINK,GstTCPSink))
#define GST_TCPSINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSINK,GstTCPSink))
#define GST_IS_TCPSINK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSINK))
#define GST_IS_TCPSINK_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSINK))
typedef struct _GstTCPSink GstTCPSink;
typedef struct _GstTCPSinkClass GstTCPSinkClass;
typedef enum {
GST_TCPSINK_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2,
} GstTCPSinkFlags;
struct _GstTCPSink {
GstElement element;
/* pads */
GstPad *sinkpad,*srcpad;
int sock;
struct sockaddr_in theiraddr;
gint port;
gchar *host;
guint mtu;
GstClock *clock;
};
struct _GstTCPSinkClass {
GstElementClass parent_class;
};
GType gst_tcpsink_get_type(void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPSINK_H__ */

383
gst/tcp/gsttcpsrc.c Normal file
View file

@ -0,0 +1,383 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "gsttcpsrc.h"
#define TCP_DEFAULT_PORT 4953
/* elementfactory information */
GstElementDetails gst_tcpsrc_details = {
"TCP packet receiver",
"Source/Network",
"LGPL",
"Receive data over the network via TCP",
VERSION,
"Zeeshan Ali <zak147@yahoo.com>",
"(C) 2003",
};
/* TCPSrc signals and args */
enum {
/* FILL ME */
LAST_SIGNAL
};
enum {
ARG_0,
ARG_PORT,
/* FILL ME */
};
static void gst_tcpsrc_class_init (GstTCPSrc *klass);
static void gst_tcpsrc_init (GstTCPSrc *tcpsrc);
static GstBuffer* gst_tcpsrc_get (GstPad *pad);
static GstElementStateReturn
gst_tcpsrc_change_state (GstElement *element);
static void gst_tcpsrc_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static void gst_tcpsrc_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec);
static void gst_tcpsrc_set_clock (GstElement *element, GstClock *clock);
static GstElementClass *parent_class = NULL;
/*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_tcpsrc_get_type (void)
{
static GType tcpsrc_type = 0;
if (!tcpsrc_type) {
static const GTypeInfo tcpsrc_info = {
sizeof(GstTCPSrcClass),
NULL,
NULL,
(GClassInitFunc)gst_tcpsrc_class_init,
NULL,
NULL,
sizeof(GstTCPSrc),
0,
(GInstanceInitFunc)gst_tcpsrc_init,
NULL
};
tcpsrc_type = g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0);
}
return tcpsrc_type;
}
static void
gst_tcpsrc_class_init (GstTCPSrc *klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass*) klass;
gstelement_class = (GstElementClass*) klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
g_param_spec_int ("port", "port", "The port to receive the packets from",
0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
gobject_class->set_property = gst_tcpsrc_set_property;
gobject_class->get_property = gst_tcpsrc_get_property;
gstelement_class->change_state = gst_tcpsrc_change_state;
gstelement_class->set_clock = gst_tcpsrc_set_clock;
}
static void
gst_tcpsrc_set_clock (GstElement *element, GstClock *clock)
{
GstTCPSrc *tcpsrc;
tcpsrc = GST_TCPSRC (element);
tcpsrc->clock = clock;
}
static void
gst_tcpsrc_init (GstTCPSrc *tcpsrc)
{
/* create the src and src pads */
tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad);
gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get);
tcpsrc->port = TCP_DEFAULT_PORT;
tcpsrc->clock = NULL;
tcpsrc->sock = -1;
tcpsrc->control_sock = -1;
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN);
GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF);
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
}
static GstBuffer*
gst_tcpsrc_get (GstPad *pad)
{
GstTCPSrc *tcpsrc;
GstBuffer *outbuf;
socklen_t len;
gint numbytes;
fd_set read_fds;
guint max_sock;
int ret, client_sock;
struct sockaddr client_addr;
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad));
FD_ZERO (&read_fds);
FD_SET (tcpsrc->sock, &read_fds);
FD_SET (tcpsrc->control_sock, &read_fds);
max_sock = MAX(tcpsrc->sock, tcpsrc->control_sock);
if (select (max_sock+1, &read_fds, NULL, NULL, NULL) > 0) {
if ((tcpsrc->control_sock != -1) &&
FD_ISSET (tcpsrc->control_sock, &read_fds)) {
#ifndef GST_DISABLE_LOADSAVE
guchar *buf;
xmlDocPtr doc;
GstCaps *caps;
buf = g_malloc (1024*10);
len = sizeof (struct sockaddr);
client_sock = accept (tcpsrc->control_sock, &client_addr, &len);
if (client_sock <= 0) {
perror ("control_sock accept");
}
else if ((ret = read (client_sock, buf, 1024*10)) <= 0) {
perror ("control_sock read");
}
else {
buf[ret] = '\0';
doc = xmlParseMemory(buf, ret);
caps = gst_caps_load_thyself(doc->xmlRootNode);
/* foward the connect, we don't signal back the result here... */
gst_pad_proxy_link (tcpsrc->srcpad, caps);
}
g_free (buf);
#endif
outbuf = NULL;
}
else {
outbuf = gst_buffer_new ();
GST_BUFFER_DATA (outbuf) = g_malloc (24000);
GST_BUFFER_SIZE (outbuf) = 24000;
if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) {
if (tcpsrc->clock) {
GstClockTime current_time;
GstEvent *discont;
current_time = gst_clock_get_time (tcpsrc->clock);
GST_BUFFER_TIMESTAMP (outbuf) = current_time;
discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME,
current_time, NULL);
gst_pad_push (tcpsrc->srcpad, GST_BUFFER (discont));
}
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF);
}
else {
GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
}
if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) {
g_print ("accepting stream..\n");
tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len);
g_print ("accepted stream.\n");
if (tcpsrc->client_sock <= 0) {
perror ("accept");
}
else {
GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED);
}
}
numbytes =
read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf), GST_BUFFER_SIZE (outbuf));
if (numbytes > 0) {
GST_BUFFER_SIZE (outbuf) = numbytes;
}
else {
perror ("read");
gst_buffer_unref (outbuf);
outbuf = NULL;
close (tcpsrc->client_sock);
GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
}
}
}
else {
perror ("select");
outbuf = NULL;
}
return outbuf;
}
static void
gst_tcpsrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
{
GstTCPSrc *tcpsrc;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail(GST_IS_TCPSRC(object));
tcpsrc = GST_TCPSRC(object);
switch (prop_id) {
case ARG_PORT:
tcpsrc->port = g_value_get_int (value);
break;
default:
break;
}
}
static void
gst_tcpsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
{
GstTCPSrc *tcpsrc;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail(GST_IS_TCPSRC(object));
tcpsrc = GST_TCPSRC(object);
switch (prop_id) {
case ARG_PORT:
g_value_set_int (value, tcpsrc->port);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_tcpsrc_init_receive (GstTCPSrc *src)
{
bzero (&src->myaddr, sizeof (src->myaddr));
src->myaddr.sin_family = AF_INET; /* host byte order */
src->myaddr.sin_port = htons (src->port); /* short, network byte order */
src->myaddr.sin_addr.s_addr = INADDR_ANY;
if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
perror("stream_socket");
return FALSE;
}
if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
perror("control_socket");
return FALSE;
}
if (bind (src->sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) {
perror("stream_sock bind");
return FALSE;
}
src->myaddr.sin_port = htons (src->port+1); /* short, network byte order */
if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) {
perror("control bind");
return FALSE;
}
if (listen (src->sock, 5) == -1) {
perror("stream_sock listen");
return FALSE;
}
if (listen (src->control_sock, 5) == -1) {
perror("control listen");
return FALSE;
}
fcntl (src->sock, F_SETFL, O_NONBLOCK);
fcntl (src->control_sock, F_SETFL, O_NONBLOCK);
GST_FLAG_SET (src, GST_TCPSRC_OPEN);
return TRUE;
}
static void
gst_tcpsrc_close (GstTCPSrc *src)
{
if (src->sock != -1) {
close (src->sock);
src->sock = -1;
}
if (src->control_sock != -1) {
close (src->control_sock);
src->control_sock = -1;
}
GST_FLAG_UNSET (src, GST_TCPSRC_OPEN);
}
static GstElementStateReturn
gst_tcpsrc_change_state (GstElement *element)
{
g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN))
gst_tcpsrc_close (GST_TCPSRC (element));
} else {
if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) {
if (!gst_tcpsrc_init_receive (GST_TCPSRC (element)))
return GST_STATE_FAILURE;
}
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element);
return GST_STATE_SUCCESS;
}

90
gst/tcp/gsttcpsrc.h Normal file
View file

@ -0,0 +1,90 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCPSRC_H__
#define __GST_TCPSRC_H__
#include <config.h>
#include <gst/gst.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#define GST_TYPE_TCPSRC \
(gst_tcpsrc_get_type())
#define GST_TCPSRC(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSRC,GstTCPSrc))
#define GST_TCPSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSRC,GstTCPSrc))
#define GST_IS_TCPSRC(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSRC))
#define GST_IS_TCPSRC_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSRC))
typedef struct _GstTCPSrc GstTCPSrc;
typedef struct _GstTCPSrcClass GstTCPSrcClass;
typedef enum {
GST_TCPSRC_OPEN = GST_ELEMENT_FLAG_LAST,
GST_TCPSRC_1ST_BUF,
GST_TCPSRC_CONNECTED,
GST_TCPSRC_FLAG_LAST,
} GstTCPSrcFlags;
struct _GstTCPSrc {
GstElement element;
/* pads */
GstPad *sinkpad,*srcpad;
int port;
int sock;
int client_sock;
int control_sock;
struct sockaddr_in myaddr;
GstClock *clock;
};
struct _GstTCPSrcClass {
GstElementClass parent_class;
};
GType gst_tcpsrc_get_type(void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCPSRC_H__ */