gst/tcp/: Updated for new gsttcp API.

Original commit message from CVS:
2005-09-27  Andy Wingo  <wingo@pobox.com>

* gst/tcp/gsttcpserversrc.c:
* gst/tcp/gsttcpclientsrc.c: Updated for new gsttcp API.

* gst/tcp/gsttcp.h:
* gst/tcp/gsttcp.c (gst_tcp_read_buffer): New function, factored
out of tcpclientsrc.c. Cancellable.
(gst_tcp_socket_read): Made private, cancellable, with better
diagnostics. Also the FIONREAD ioctl takes a int*, not a size_t*.
(gst_tcp_gdp_read_buffer): Made cancellable, actually returns the
whole buffer, and better diagnostics.
(gst_tcp_gdp_read_caps): Same.

* gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time.
This commit is contained in:
Andy Wingo 2005-09-27 16:37:12 +00:00
parent 9bea690fe1
commit 21881814bc
5 changed files with 332 additions and 304 deletions

View file

@ -1,5 +1,17 @@
2005-09-27 Andy Wingo <wingo@pobox.com> 2005-09-27 Andy Wingo <wingo@pobox.com>
* gst/tcp/gsttcpserversrc.c:
* gst/tcp/gsttcpclientsrc.c: Updated for new gsttcp API.
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcp.c (gst_tcp_read_buffer): New function, factored
out of tcpclientsrc.c. Cancellable.
(gst_tcp_socket_read): Made private, cancellable, with better
diagnostics. Also the FIONREAD ioctl takes a int*, not a size_t*.
(gst_tcp_gdp_read_buffer): Made cancellable, actually returns the
whole buffer, and better diagnostics.
(gst_tcp_gdp_read_caps): Same.
* gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time. * gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time.
2005-09-26 Andy Wingo <wingo@pobox.com> 2005-09-26 Andy Wingo <wingo@pobox.com>

View file

@ -30,6 +30,11 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netdb.h> #include <netdb.h>
#include <unistd.h> #include <unistd.h>
#include <sys/ioctl.h>
#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
#endif
#include <glib.h> #include <glib.h>
#include <gst/gst.h> #include <gst/gst.h>
@ -125,27 +130,84 @@ gst_tcp_socket_write (int socket, const void *buf, size_t count)
* = 0: EOF * = 0: EOF
* > 0: bytes read * > 0: bytes read
*/ */
gint static GstFlowReturn
gst_tcp_socket_read (int socket, void *buf, size_t count) gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
int cancel_fd)
{ {
size_t bytes_read = 0; fd_set testfds;
int maxfdp1;
ssize_t n;
size_t bytes_read;
int num_to_read;
bytes_read = 0;
while (bytes_read < count) { while (bytes_read < count) {
ssize_t ret = read (socket, buf + bytes_read, /* do a blocking select on the socket */
count - bytes_read); FD_ZERO (&testfds);
FD_SET (socket, &testfds);
if (cancel_fd >= 0)
FD_SET (cancel_fd, &testfds);
maxfdp1 = MAX (socket, cancel_fd) + 1;
if (ret < 0) /* no action (0) is an error too in our case */
GST_WARNING ("error while reading: %s", g_strerror (errno)); if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
if (ret <= 0) goto select_error;
return bytes_read;
bytes_read += ret; if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
goto cancelled;
/* ask how much is available for reading on the socket */
if (ioctl (socket, FIONREAD, &num_to_read) < 0)
goto ioctl_error;
/* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
num_to_read = MIN (num_to_read, count - bytes_read);
n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read);
if (n < 0)
goto read_error;
if (n < num_to_read)
goto short_read;
bytes_read += num_to_read;
} }
if (bytes_read < 0) return GST_FLOW_OK;
GST_WARNING ("error while reading: %s", g_strerror (errno));
else /* ERRORS */
GST_LOG ("read %d bytes succesfully", bytes_read); select_error:
return bytes_read; {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
cancelled:
{
GST_DEBUG_OBJECT (this, "Select was cancelled");
return GST_FLOW_WRONG_STATE;
}
ioctl_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
read_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("read failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
short_read:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("short read: wanted %d bytes, got %d", num_to_read, n));
return GST_FLOW_ERROR;
}
} }
/* close the socket and reset the fd. Used to clean up after errors. */ /* close the socket and reset the fd. Used to clean up after errors. */
@ -162,165 +224,246 @@ gst_tcp_socket_close (int *socket)
* - NULL, indicating a connection close or an error, to be handled with * - NULL, indicating a connection close or an error, to be handled with
* EOS * EOS
*/ */
GstBuffer * GstFlowReturn
gst_tcp_gdp_read_buffer (GstElement * this, int socket) gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
GstBuffer ** buf)
{ {
size_t header_length = GST_DP_HEADER_LENGTH; fd_set testfds;
size_t readsize; int ret;
guint8 *header = NULL; int maxfdp1;
ssize_t ret; ssize_t bytes_read;
GstBuffer *buffer; int readsize;
header = g_malloc (header_length); *buf = NULL;
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize); /* do a blocking select on the socket */
if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0) FD_ZERO (&testfds);
FD_SET (socket, &testfds);
if (cancel_fd >= 0)
FD_SET (cancel_fd, &testfds);
maxfdp1 = MAX (socket, cancel_fd) + 1;
/* no action (0) is an error too in our case */
if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
goto select_error;
if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
goto cancelled;
/* ask how much is available for reading on the socket */
if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
goto ioctl_error;
/* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */
*buf = gst_buffer_new_and_alloc (readsize);
bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize);
if (bytes_read < 0)
goto read_error; goto read_error;
if (ret != readsize) if (bytes_read < readsize)
/* but mom, you promised to give me readsize bytes! */
goto short_read; goto short_read;
if (!gst_dp_validate_header (header_length, header)) GST_DEBUG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (buf));
goto validate_error; return GST_FLOW_OK;
GST_LOG_OBJECT (this, "validated buffer packet header");
buffer = gst_dp_buffer_from_header (header_length, header);
g_free (header);
GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
return buffer;
/* ERRORS */ /* ERRORS */
select_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
cancelled:
{
GST_DEBUG_OBJECT (this, "Select was cancelled");
return GST_FLOW_WRONG_STATE;
}
ioctl_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
read_error: read_error:
{ {
if (ret == 0) { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
/* if we read 0 bytes, and we're blocking, we hit eos */ ("read failed: %s", g_strerror (errno)));
GST_DEBUG ("blocking read returns 0, returning NULL"); gst_buffer_unref (*buf);
g_free (header); *buf = NULL;
return NULL; return GST_FLOW_ERROR;
} else {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
return NULL;
}
} }
short_read: short_read:
{ {
GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret); GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret); ("short read: wanted %d bytes, got %d", readsize, bytes_read));
return NULL; gst_buffer_unref (*buf);
*buf = NULL;
return GST_FLOW_ERROR;
}
}
/* read a buffer from the given socket
* returns:
* - a GstBuffer in which data should be read
* - NULL, indicating a connection close or an error, to be handled with
* EOS
*/
GstFlowReturn
gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
GstBuffer ** buf)
{
GstFlowReturn ret;
guint8 *header = NULL;
GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header",
GST_DP_HEADER_LENGTH);
*buf = NULL;
header = g_malloc (GST_DP_HEADER_LENGTH);
ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
cancel_fd);
if (ret != GST_FLOW_OK)
goto header_read_error;
if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
goto validate_error;
if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER)
goto is_not_buffer;
GST_LOG_OBJECT (this, "validated buffer packet header");
*buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header);
g_free (header);
ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
GST_BUFFER_SIZE (*buf), cancel_fd);
if (ret != GST_FLOW_OK)
goto data_read_error;
return GST_FLOW_OK;
/* ERRORS */
header_read_error:
{
g_free (header);
return ret;
} }
validate_error: validate_error:
{ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate")); ("GDP buffer packet header does not validate"));
g_free (header); g_free (header);
return NULL; return GST_FLOW_ERROR;
}
is_not_buffer:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP packet contains something that is not a buffer"));
g_free (header);
return GST_FLOW_ERROR;
}
data_read_error:
{
gst_buffer_unref (*buf);
*buf = NULL;
return ret;
} }
} }
/* read the GDP caps packet from the given socket GstFlowReturn
* returns the caps, or NULL in case of an error */ gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
GstCaps * GstCaps ** caps)
gst_tcp_gdp_read_caps (GstElement * this, int socket)
{ {
size_t header_length = GST_DP_HEADER_LENGTH; GstFlowReturn ret;
size_t readsize;
guint8 *header = NULL; guint8 *header = NULL;
guint8 *payload = NULL; guint8 *payload = NULL;
ssize_t ret; size_t payload_length;
GstCaps *caps;
gchar *string;
header = g_malloc (header_length); GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header",
readsize = header_length; GST_DP_HEADER_LENGTH);
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize); *caps = NULL;
if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0) header = g_malloc (GST_DP_HEADER_LENGTH);
goto read_error;
if (ret != readsize) ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
goto short_read; cancel_fd);
if (!gst_dp_validate_header (header_length, header)) if (ret != GST_FLOW_OK)
goto validate_error; goto header_read_error;
readsize = gst_dp_header_payload_length (header); if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
payload = g_malloc (readsize); goto header_validate_error;
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0)
goto socket_read_error;
if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
goto is_not_caps; goto is_not_caps;
g_assert (ret == readsize); GST_LOG_OBJECT (this, "validated caps packet header");
if (!gst_dp_validate_payload (readsize, header, payload)) payload_length = gst_dp_header_payload_length (header);
goto packet_validate_error; payload = g_malloc (payload_length);
caps = gst_dp_caps_from_packet (header_length, header, payload); GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload",
string = gst_caps_to_string (caps); payload_length);
GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
g_free (string); ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
if (ret != GST_FLOW_OK)
goto payload_read_error;
if (!gst_dp_validate_payload (payload_length, header, payload))
goto payload_validate_error;
*caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload);
GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps);
g_free (header); g_free (header);
g_free (payload); g_free (payload);
return caps; return GST_FLOW_OK;
/* ERRORS */ /* ERRORS */
read_error: header_read_error:
{ {
if (ret < 0) { g_free (header);
g_free (header); return ret;
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return NULL;
}
if (ret == 0) {
GST_WARNING_OBJECT (this, "read returned EOF");
return NULL;
}
} }
short_read: header_validate_error:
{
GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes",
readsize, ret);
return NULL;
}
validate_error:
{ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate")); ("GDP caps packet header does not validate"));
g_free (header); g_free (header);
return NULL; return GST_FLOW_ERROR;
}
socket_read_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
g_free (payload);
return NULL;
} }
is_not_caps: is_not_caps:
{ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Header read doesn't describe CAPS payload")); ("GDP packet contains something that is not a caps"));
g_free (header);
return GST_FLOW_ERROR;
}
payload_read_error:
{
g_free (header); g_free (header);
g_free (payload); g_free (payload);
return NULL; return ret;
} }
packet_validate_error: payload_validate_error:
{ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate")); ("GDP caps packet payload does not validate"));
g_free (header); g_free (header);
g_free (payload); g_free (payload);
return NULL; return GST_FLOW_ERROR;
} }
} }

View file

@ -42,13 +42,15 @@ typedef enum
gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host); gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
gint gst_tcp_socket_write (int socket, const void *buf, size_t count); gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
gint gst_tcp_socket_read (int socket, void *buf, size_t count);
void gst_tcp_socket_close (int *socket); void gst_tcp_socket_close (int *socket);
GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket); GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket);
GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket); GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps);
GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd);
gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port); gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port); gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);

View file

@ -212,9 +212,7 @@ static GstFlowReturn
gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{ {
GstTCPClientSrc *src; GstTCPClientSrc *src;
size_t readsize; GstFlowReturn ret;
int ret;
GstBuffer *buf = NULL;
src = GST_TCPCLIENTSRC (psrc); src = GST_TCPCLIENTSRC (psrc);
@ -225,35 +223,13 @@ gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
/* read the buffer header if we're using a protocol */ /* read the buffer header if we're using a protocol */
switch (src->protocol) { switch (src->protocol) {
fd_set testfds;
case GST_TCP_PROTOCOL_NONE: case GST_TCP_PROTOCOL_NONE:
/* do a blocking select on the socket */ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1, outbuf);
FD_ZERO (&testfds);
FD_SET (src->sock_fd, &testfds);
/* no action (0) is an error too in our case */
if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0)
goto select_error;
/* ask how much is available for reading on the socket */
if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0)
goto ioctl_error;
GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
buf = gst_buffer_new_and_alloc (readsize);
break; break;
case GST_TCP_PROTOCOL_GDP: case GST_TCP_PROTOCOL_GDP:
if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd))) ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1,
goto hit_eos; outbuf);
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
buf);
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break; break;
default: default:
/* need to assert as buf == NULL */ /* need to assert as buf == NULL */
@ -261,67 +237,24 @@ gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
break; break;
} }
GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize); if (ret == GST_FLOW_OK) {
if ((ret = GST_LOG_OBJECT (src,
gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), "Returning buffer from _get of size %d, ts %"
readsize)) < 0) GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
goto read_error; ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
GST_BUFFER_SIZE (*outbuf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
/* if we read 0 bytes, and we're blocking, we hit eos */ gst_buffer_set_caps (*outbuf, src->caps);
if (ret == 0) }
goto zero_read;
readsize = ret; return ret;
GST_BUFFER_SIZE (buf) = readsize;
src->curoffset += readsize;
GST_LOG_OBJECT (src,
"Returning buffer from _get of size %d, ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
gst_buffer_set_caps (buf, src->caps);
*outbuf = buf;
return GST_FLOW_OK;
/* ERRORS */
wrong_state: wrong_state:
{ {
GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data"); GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
return GST_FLOW_WRONG_STATE;
}
select_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
ioctl_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("ioctl failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
hit_eos:
{
return GST_FLOW_WRONG_STATE;
}
read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
zero_read:
{
GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
gst_buffer_unref (buf);
return GST_FLOW_WRONG_STATE; return GST_FLOW_WRONG_STATE;
} }
} }
@ -330,10 +263,7 @@ static void
gst_tcpclientsrc_set_property (GObject * object, guint prop_id, gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec) const GValue * value, GParamSpec * pspec)
{ {
GstTCPClientSrc *tcpclientsrc; GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object);
g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
tcpclientsrc = GST_TCPCLIENTSRC (object);
switch (prop_id) { switch (prop_id) {
case ARG_HOST: case ARG_HOST:
@ -361,10 +291,7 @@ static void
gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value, gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec) GParamSpec * pspec)
{ {
GstTCPClientSrc *tcpclientsrc; GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object);
g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
tcpclientsrc = GST_TCPCLIENTSRC (object);
switch (prop_id) { switch (prop_id) {
case ARG_HOST: case ARG_HOST:
@ -440,18 +367,15 @@ gst_tcpclientsrc_start (GstBaseSrc * bsrc)
if (src->protocol == GST_TCP_PROTOCOL_GDP) { if (src->protocol == GST_TCP_PROTOCOL_GDP) {
/* if we haven't received caps yet, we should get them first */ /* if we haven't received caps yet, we should get them first */
if (!src->caps_received) { if (!src->caps_received) {
GstFlowReturn fret;
GstCaps *caps; GstCaps *caps;
GST_DEBUG_OBJECT (src, "getting caps through GDP"); GST_DEBUG_OBJECT (src, "getting caps through GDP");
if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd))) fret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd, -1, &caps);
goto no_caps;
if (!GST_IS_CAPS (caps)) if (fret != GST_FLOW_OK)
goto no_caps; goto no_caps;
GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT,
caps);
src->caps_received = TRUE; src->caps_received = TRUE;
src->caps = caps; src->caps = caps;
} }

View file

@ -186,46 +186,30 @@ static GstFlowReturn
gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{ {
GstTCPServerSrc *src; GstTCPServerSrc *src;
size_t readsize; GstFlowReturn ret;
int ret;
GstBuffer *buf = NULL;
GstCaps *caps;
src = GST_TCPSERVERSRC (psrc); src = GST_TCPSERVERSRC (psrc);
g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), if (!GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN))
GST_FLOW_ERROR); goto wrong_state;
GST_LOG_OBJECT (src, "asked for a buffer");
/* read the buffer header if we're using a protocol */
switch (src->protocol) { switch (src->protocol) {
case GST_TCP_PROTOCOL_NONE: case GST_TCP_PROTOCOL_NONE:
{ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1,
fd_set testfds; outbuf);
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (src->client_sock_fd, &testfds);
/* no action (0) is an error too in our case */
if ((ret =
select (src->client_sock_fd + 1, &testfds, (fd_set *) 0,
(fd_set *) 0, 0)) <= 0)
goto select_error;
/* ask how much is available for reading on the socket */
if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0)
goto ioctl_error;
buf = gst_buffer_new_and_alloc (readsize);
break; break;
}
case GST_TCP_PROTOCOL_GDP: case GST_TCP_PROTOCOL_GDP:
/* if we haven't received caps yet, we should get them first */
if (!src->caps_received) { if (!src->caps_received) {
GstCaps *caps;
gchar *string; gchar *string;
if (!(caps = ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd, -1,
gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd))) &caps);
if (ret != GST_FLOW_OK)
goto gdp_caps_read_error; goto gdp_caps_read_error;
src->caps_received = TRUE; src->caps_received = TRUE;
@ -236,83 +220,46 @@ gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps); gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
} }
/* now receive the buffer header */ ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1,
if (!(buf = outbuf);
gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd)))
goto gdp_buffer_read_error;
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p", if (ret == GST_FLOW_OK)
buf); gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break; break;
default: default:
g_warning ("Unhandled protocol type"); /* need to assert as buf == NULL */
g_assert ("Unhandled protocol type");
break; break;
} }
GST_LOG_OBJECT (src, "Reading %d bytes", readsize); if (ret == GST_FLOW_OK) {
if ((ret = GST_LOG_OBJECT (src,
gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf), "Returning buffer from _get of size %d, ts %"
readsize)) < 0) GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
goto read_error; ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
GST_BUFFER_SIZE (*outbuf),
/* if we read 0 bytes, and we're blocking, we hit eos */ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
if (ret == 0) GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
goto hit_eos; GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
readsize = ret;
GST_LOG_OBJECT (src, "Read %d bytes", readsize);
GST_BUFFER_SIZE (buf) = readsize;
GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
src->curoffset += readsize;
*outbuf = buf;
return GST_FLOW_OK;
/* ERROR */
select_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("select failed: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
} }
ioctl_error:
return ret;
wrong_state:
{ {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
("ioctl failed: %s", g_strerror (errno))); return GST_FLOW_WRONG_STATE;
return GST_FLOW_ERROR;
} }
gdp_caps_read_error: gdp_caps_read_error:
{ {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read caps through GDP")); ("Could not read caps through GDP"));
return GST_FLOW_ERROR; return ret;
}
gdp_buffer_read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read buffer header through GDP"));
return GST_FLOW_ERROR;
}
read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
hit_eos:
{
GST_DEBUG ("blocking read returns 0, EOS");
gst_buffer_unref (buf);
return GST_FLOW_WRONG_STATE;
} }
} }
static void static void
gst_tcpserversrc_set_property (GObject * object, guint prop_id, gst_tcpserversrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec) const GValue * value, GParamSpec * pspec)