mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-03-07 12:21:17 +00:00
gst/tcp/gsttcpclientsrc.c: Make interruptable -- code stolen from fdsrc. Get caps in create() instead of start() so i...
Original commit message from CVS: 2005-09-28 Andy Wingo <wingo@pobox.com> * gst/tcp/gsttcpclientsrc.c: Make interruptable -- code stolen from fdsrc. Get caps in create() instead of start() so it can be interrupted. Interruption somewhat untested. * gst/tcp/gsttcp.c (gst_tcp_read_buffer, gst_tcp_socket_read): Proper EOS handling.
This commit is contained in:
parent
c2c41e9f01
commit
cd5ad0ec01
4 changed files with 96 additions and 27 deletions
|
@ -1,3 +1,12 @@
|
|||
2005-09-28 Andy Wingo <wingo@pobox.com>
|
||||
|
||||
* gst/tcp/gsttcpclientsrc.c: Make interruptable -- code stolen
|
||||
from fdsrc. Get caps in create() instead of start() so it can be
|
||||
interrupted. Interruption somewhat untested.
|
||||
|
||||
* gst/tcp/gsttcp.c (gst_tcp_read_buffer, gst_tcp_socket_read):
|
||||
Proper EOS handling.
|
||||
|
||||
2005-09-27 Andy Wingo <wingo@pobox.com>
|
||||
|
||||
* gst/tcp/gsttcpclientsrc.c: Cleaned up.
|
||||
|
|
|
@ -161,6 +161,9 @@ gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
|
|||
if (ioctl (socket, FIONREAD, &num_to_read) < 0)
|
||||
goto ioctl_error;
|
||||
|
||||
if (num_to_read == 0)
|
||||
goto got_eos;
|
||||
|
||||
/* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
|
||||
|
||||
num_to_read = MIN (num_to_read, count - bytes_read);
|
||||
|
@ -196,6 +199,11 @@ ioctl_error:
|
|||
("ioctl failed: %s", g_strerror (errno)));
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
got_eos:
|
||||
{
|
||||
GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
|
||||
return GST_FLOW_WRONG_STATE;
|
||||
}
|
||||
read_error:
|
||||
{
|
||||
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
|
||||
|
@ -254,6 +262,9 @@ gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
|
|||
if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
|
||||
goto ioctl_error;
|
||||
|
||||
if (readsize == 0)
|
||||
goto got_eos;
|
||||
|
||||
/* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */
|
||||
|
||||
*buf = gst_buffer_new_and_alloc (readsize);
|
||||
|
@ -288,6 +299,11 @@ ioctl_error:
|
|||
("ioctl failed: %s", g_strerror (errno)));
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
got_eos:
|
||||
{
|
||||
GST_DEBUG_OBJECT (this, "Got EOS on socket stream");
|
||||
return GST_FLOW_WRONG_STATE;
|
||||
}
|
||||
read_error:
|
||||
{
|
||||
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
|
||||
|
|
|
@ -29,6 +29,25 @@
|
|||
#include <string.h> /* memset */
|
||||
#include <unistd.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
|
||||
/* control stuff stolen from fdsrc */
|
||||
#define CONTROL_STOP 'S' /* stop the select call */
|
||||
#define CONTROL_SOCKETS(src) src->control_fds
|
||||
#define WRITE_SOCKET(src) src->control_fds[1]
|
||||
#define READ_SOCKET(src) src->control_fds[0]
|
||||
|
||||
#define SEND_COMMAND(src, command) \
|
||||
G_STMT_START { \
|
||||
unsigned char c; c = command; \
|
||||
write (WRITE_SOCKET(src), &c, 1); \
|
||||
} G_STMT_END
|
||||
|
||||
#define READ_COMMAND(src, command, res) \
|
||||
G_STMT_START { \
|
||||
res = read(READ_SOCKET(src), &command, 1); \
|
||||
} G_STMT_END
|
||||
|
||||
|
||||
GST_DEBUG_CATEGORY (tcpclientsrc_debug);
|
||||
|
@ -136,6 +155,9 @@ gst_tcpclientsrc_init (GstTCPClientSrc * this, GstTCPClientSrcClass * g_class)
|
|||
this->caps = NULL;
|
||||
this->curoffset = 0;
|
||||
|
||||
READ_SOCKET (this) = -1;
|
||||
WRITE_SOCKET (this) = -1;
|
||||
|
||||
gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
|
||||
|
||||
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
|
||||
|
@ -184,12 +206,28 @@ gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
/* read the buffer header if we're using a protocol */
|
||||
switch (src->protocol) {
|
||||
case GST_TCP_PROTOCOL_NONE:
|
||||
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1, outbuf);
|
||||
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd,
|
||||
READ_SOCKET (src), outbuf);
|
||||
break;
|
||||
|
||||
case GST_TCP_PROTOCOL_GDP:
|
||||
ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1,
|
||||
outbuf);
|
||||
/* get the caps if we're using GDP */
|
||||
if (!src->caps_received) {
|
||||
GstCaps *caps;
|
||||
|
||||
GST_DEBUG_OBJECT (src, "getting caps through GDP");
|
||||
ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd,
|
||||
READ_SOCKET (src), &caps);
|
||||
|
||||
if (ret != GST_FLOW_OK)
|
||||
goto no_caps;
|
||||
|
||||
src->caps_received = TRUE;
|
||||
src->caps = caps;
|
||||
}
|
||||
|
||||
ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd,
|
||||
READ_SOCKET (src), outbuf);
|
||||
break;
|
||||
default:
|
||||
/* need to assert as buf == NULL */
|
||||
|
@ -217,6 +255,12 @@ wrong_state:
|
|||
GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
|
||||
return GST_FLOW_WRONG_STATE;
|
||||
}
|
||||
no_caps:
|
||||
{
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Could not read caps through GDP"));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -278,6 +322,13 @@ gst_tcpclientsrc_start (GstBaseSrc * bsrc)
|
|||
gchar *ip;
|
||||
GstTCPClientSrc *src = GST_TCPCLIENTSRC (bsrc);
|
||||
|
||||
/* create the control sockets before anything */
|
||||
if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
|
||||
goto socket_pair;
|
||||
|
||||
fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
|
||||
fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
|
||||
|
||||
/* create receiving client socket */
|
||||
GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
|
||||
src->host, src->port);
|
||||
|
@ -323,25 +374,14 @@ gst_tcpclientsrc_start (GstBaseSrc * bsrc)
|
|||
}
|
||||
}
|
||||
|
||||
/* get the caps if we're using GDP */
|
||||
if (src->protocol == GST_TCP_PROTOCOL_GDP) {
|
||||
/* if we haven't received caps yet, we should get them first */
|
||||
if (!src->caps_received) {
|
||||
GstFlowReturn fret;
|
||||
GstCaps *caps;
|
||||
|
||||
GST_DEBUG_OBJECT (src, "getting caps through GDP");
|
||||
fret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd, -1, &caps);
|
||||
|
||||
if (fret != GST_FLOW_OK)
|
||||
goto no_caps;
|
||||
|
||||
src->caps_received = TRUE;
|
||||
src->caps = caps;
|
||||
}
|
||||
}
|
||||
return TRUE;
|
||||
|
||||
socket_pair:
|
||||
{
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
|
||||
GST_ERROR_SYSTEM);
|
||||
return FALSE;
|
||||
}
|
||||
no_socket:
|
||||
{
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
|
||||
|
@ -352,13 +392,6 @@ name_resolv:
|
|||
gst_tcpclientsrc_stop (GST_BASE_SRC (src));
|
||||
return FALSE;
|
||||
}
|
||||
no_caps:
|
||||
{
|
||||
gst_tcpclientsrc_stop (GST_BASE_SRC (src));
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Could not read caps through GDP"));
|
||||
return FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
/* close the socket and associated resources
|
||||
|
@ -383,11 +416,21 @@ gst_tcpclientsrc_stop (GstBaseSrc * bsrc)
|
|||
}
|
||||
GST_FLAG_UNSET (src, GST_TCPCLIENTSRC_OPEN);
|
||||
|
||||
close (READ_SOCKET (src));
|
||||
close (WRITE_SOCKET (src));
|
||||
READ_SOCKET (src) = -1;
|
||||
WRITE_SOCKET (src) = -1;
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/* will be called only between calls to start() and stop() */
|
||||
static gboolean
|
||||
gst_tcpclientsrc_unlock (GstBaseSrc * bsrc)
|
||||
{
|
||||
GstTCPClientSrc *src = GST_TCPCLIENTSRC (bsrc);
|
||||
|
||||
SEND_COMMAND (src, CONTROL_STOP);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ struct _GstTCPClientSrc {
|
|||
|
||||
/* socket */
|
||||
int sock_fd;
|
||||
int control_fds[2];
|
||||
|
||||
/* number of bytes we've gotten */
|
||||
off_t curoffset;
|
||||
|
|
Loading…
Reference in a new issue