use streamheader

Original commit message from CVS:
use streamheader
This commit is contained in:
Thomas Vander Stichele 2004-06-08 10:44:59 +00:00
parent ecf7f33ec0
commit dfadb5df47
4 changed files with 130 additions and 51 deletions

View file

@ -1,3 +1,13 @@
2004-06-08 Thomas Vander Stichele <thomas at apestaart dot org>
* gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get):
* gst/tcp/gsttcpserversink.c: (gst_tcpserversink_client_remove),
(gst_tcpserversink_handle_client_read), (gst_tcp_buffer_write),
(gst_tcpserversink_handle_client_write), (gst_tcpserversink_chain),
(gst_tcpserversink_init_send), (gst_tcpserversink_close):
* gst/tcp/gsttcpserversink.h:
take streamheader into account
2004-06-08 Thomas Vander Stichele <thomas at apestaart dot org>
* gst/level/Makefile.am:

View file

@ -231,6 +231,7 @@ gst_tcpclientsrc_get (GstPad * pad)
("ioctl failed: %s", g_strerror (errno)));
return NULL;
}
GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
buf = gst_buffer_new_and_alloc (readsize);
break;
case GST_TCP_PROTOCOL_TYPE_GDP:

View file

@ -229,8 +229,24 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
return TRUE;
}
static void
gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd)
{
/* FIXME: if we keep track of ip we can log it here and signal */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
if (close (fd) != 0) {
GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
}
FD_CLR (fd, &sink->clientfds);
FD_CLR (fd, &sink->caps_sent);
FD_CLR (fd, &sink->streamheader_sent);
g_signal_emit (G_OBJECT (sink),
gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
}
/* handle a read on a client fd,
* which either indicates a close or should be ignored */
* which either indicates a close or should be ignored
* returns FALSE if the client has been closed. */
static gboolean
gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
{
@ -241,17 +257,9 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
ioctl (fd, FIONREAD, &nread);
if (nread == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
if (close (fd) != 0) {
GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL),
("error closing fd %d: %s", fd, g_strerror (errno)));
return FALSE;
}
FD_CLR (fd, &sink->clientfds);
FD_CLR (fd, &sink->caps_sent);
/* FIXME: we need to keep track of IP info so we can signal it here */
g_signal_emit (G_OBJECT (sink),
gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
gst_tcpserversink_client_remove (sink, fd);
return FALSE;
} else {
/* FIXME: we should probably just Read 'n' Drop */
g_warning ("Don't know what to do with %d bytes to read", nread);
@ -259,40 +267,25 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
return TRUE;
}
/* handle a write on a client fd,
* which indicates a read request from a client */
static gboolean
gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
GstPad * pad, GstBuffer * buf)
/* Write a buffer to the given fd for the given element using the given
* protocol.
* Return number of buffer bytes written.
*/
static gint
gst_tcp_buffer_write (GstBuffer * buf, int fd, GstElement * element,
GstTCPProtocolType protocol)
{
gint wrote = 0;
/* write the buffer header if we have one */
switch (sink->protocol) {
switch (protocol) {
case GST_TCP_PROTOCOL_TYPE_NONE:
break;
case GST_TCP_PROTOCOL_TYPE_GDP:
/* if we haven't sent caps yet, send them first */
if (!FD_ISSET (fd, &(sink->caps_sent))) {
const GstCaps *caps;
gchar *string;
caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
fd);
/* FIXME: fix this again so that write_caps is non-fatal for multiple clients; also use a fd, host, port struct */
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
"unknown", 0)) {
g_free (string);
return FALSE;
}
g_free (string);
FD_SET (fd, &(sink->caps_sent));
}
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), fd, buf, FALSE,
"unknown", 0))
return FALSE;
GST_LOG_OBJECT (element, "Sending buffer header through GDP");
if (!gst_tcp_gdp_write_header (element, fd, buf, FALSE, "unknown", 0))
return 0;
break;
default:
g_warning ("Unhandled protocol type");
@ -300,11 +293,9 @@ gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
}
/* serve data to client */
GST_LOG_OBJECT (sink, "serving data buffer of size %d to client on fd %d",
GST_LOG_OBJECT (element, "serving data buffer of size %d to client on fd %d",
GST_BUFFER_SIZE (buf), fd);
int wrote = 0;
wrote =
gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
@ -312,24 +303,78 @@ gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
/* FIXME: keep track of client ip and port and so on */
/*
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
(_("Error while sending data to \"%s:%d\"."),
sink->host, sink->port),
("Only %d of %d bytes written: %s",
bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno)));
*/
/* FIXME: there should be a better way to report problems, since we
want to continue for other clients and just drop this particular one */
GST_DEBUG_OBJECT (sink, "Write failed: %d of %d bytes written", wrote,
GST_DEBUG_OBJECT (element, "Write failed: %d of %d bytes written", wrote,
GST_BUFFER_SIZE (buf));
}
return wrote;
}
/* handle a write on a client fd,
* which indicates a read request from a client */
static gboolean
gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
GstPad * pad, GstBuffer * buf)
{
gint wrote = 0;
/* when using GDP, first check if we have sent caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
if (!FD_ISSET (fd, &(sink->caps_sent))) {
const GstCaps *caps;
gchar *string;
caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
fd);
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
"unknown", 0)) {
GST_DEBUG_OBJECT (sink, "Failed sending caps, removing client");
gst_tcpserversink_client_remove (sink, fd);
g_free (string);
return FALSE;
}
g_free (string);
FD_SET (fd, &(sink->caps_sent));
}
}
/* if we have streamheader buffers, and haven't sent them to this client
* yet, send them out one by one */
if (!FD_ISSET (fd, &(sink->streamheader_sent))) {
if (sink->streamheader) {
GList *l;
for (l = sink->streamheader; l; l = l->next) {
wrote = gst_tcp_buffer_write (l->data, fd, GST_ELEMENT (sink),
sink->protocol);
if (wrote < GST_BUFFER_SIZE (l->data)) {
GST_DEBUG_OBJECT (sink,
"Failed sending streamheader, removing client");
gst_tcpserversink_client_remove (sink, fd);
}
}
}
FD_SET (fd, &(sink->streamheader_sent));
}
/* now we write the data buffer */
wrote = gst_tcp_buffer_write (buf, fd, GST_ELEMENT (sink), sink->protocol);
if (wrote < GST_BUFFER_SIZE (buf)) {
gst_tcpserversink_client_remove (sink, fd);
/* write failed, so drop the client */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
if (close (fd) != 0) {
GST_DEBUG_OBJECT (sink, "error closing fd %d after failed write: %s",
fd, g_strerror (errno));
}
FD_CLR (fd, &sink->clientfds);
FD_CLR (fd, &sink->caps_sent);
g_signal_emit (G_OBJECT (sink),
gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
return FALSE;
}
return TRUE;
@ -354,10 +399,23 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data)
g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPSERVERSINK_OPEN));
if (GST_IS_EVENT (buf)) {
g_warning ("FIXME: handl events");
g_warning ("FIXME: handle events");
return;
}
/* if the incoming buffer is marked as IN CAPS, then we assume for now
* it's a streamheader that needs to be sent to each new client, so we
* put it on our internal list of streamheader buffers.
* After that we return, since we only send these out when we get
* non IN_CAPS buffers so we properly keep track of clients that got
* streamheaders. */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
GST_DEBUG_OBJECT (sink,
"appending IN_CAPS buffer with length %d to streamheader",
GST_BUFFER_SIZE (buf));
sink->streamheader = g_list_append (sink->streamheader, buf);
return;
}
/* if the incoming buffer has a duration, we can use that as the timeout
* value; otherwise, we block */
timeout.tv_sec = 0;
@ -428,7 +486,6 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data)
gst_buffer_unref (buf);
/* FIXME: emit signal ? */
}
static void
@ -551,8 +608,10 @@ gst_tcpserversink_init_send (GstTCPServerSink * this)
FD_ZERO (&this->clientfds);
FD_ZERO (&this->caps_sent);
FD_ZERO (&this->streamheader_sent);
FD_SET (this->server_sock_fd, &this->clientfds);
GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN);
this->streamheader = NULL;
this->data_written = 0;
@ -567,6 +626,14 @@ gst_tcpserversink_close (GstTCPServerSink * this)
this->server_sock_fd = -1;
}
if (this->streamheader) {
GList *l;
for (l = sink->streamheader; l; l = l->next) {
gst_buffer_unref (l->data);
}
g_list_free (this->streamheader);
}
GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN);
}

View file

@ -86,6 +86,7 @@ struct _GstTCPServerSink {
fd_set streamheader_sent; /* all the client file descriptors that have had
* streamheader sent */
GList *streamheader; /* GList of GstBuffers to use as streamheader */
GstTCPProtocolType protocol;
guint mtu;
GstClock *clock;