mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-11 01:45:33 +00:00
tcp: Only read as much as is currently available from the socket
This commit is contained in:
parent
c2438ba868
commit
a649fe2d61
3 changed files with 108 additions and 8 deletions
|
@ -1160,11 +1160,17 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
|
|||
* to write to us except for closing the socket, I guess it's because we
|
||||
* like to listen to our customers. */
|
||||
do {
|
||||
gssize navail;
|
||||
|
||||
GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read",
|
||||
client->socket);
|
||||
|
||||
navail = g_socket_get_available_bytes (client->socket);
|
||||
if (navail < 0)
|
||||
break;
|
||||
|
||||
nread =
|
||||
g_socket_receive (client->socket, dummy, sizeof (dummy),
|
||||
g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)),
|
||||
sink->cancellable, &err);
|
||||
if (first && nread == 0) {
|
||||
/* client sent close, so remove it */
|
||||
|
|
|
@ -180,6 +180,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
gssize rret;
|
||||
GError *err = NULL;
|
||||
guint8 *data;
|
||||
gssize avail, read;
|
||||
|
||||
src = GST_TCP_CLIENT_SRC (psrc);
|
||||
|
||||
|
@ -189,16 +190,48 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
GST_LOG_OBJECT (src, "asked for a buffer");
|
||||
|
||||
/* read the buffer header */
|
||||
*outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
|
||||
avail = g_socket_get_available_bytes (src->socket);
|
||||
if (avail < 0) {
|
||||
goto get_available_error;
|
||||
} else if (avail == 0) {
|
||||
GIOCondition condition;
|
||||
|
||||
if (!g_socket_condition_wait (src->socket,
|
||||
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
|
||||
goto select_error;
|
||||
|
||||
condition =
|
||||
g_socket_condition_check (src->socket,
|
||||
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
|
||||
|
||||
if ((condition & G_IO_ERR)) {
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Socket in error state"));
|
||||
*outbuf = NULL;
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto done;
|
||||
} else if ((condition & G_IO_HUP)) {
|
||||
GST_DEBUG_OBJECT (src, "Connection closed");
|
||||
*outbuf = NULL;
|
||||
ret = GST_FLOW_EOS;
|
||||
goto done;
|
||||
}
|
||||
avail = g_socket_get_available_bytes (src->socket);
|
||||
if (avail <= 0)
|
||||
goto get_available_error;
|
||||
}
|
||||
|
||||
read = MIN (avail, MAX_READ_SIZE);
|
||||
*outbuf = gst_buffer_new_and_alloc (read);
|
||||
data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
|
||||
rret =
|
||||
g_socket_receive (src->socket, (gchar *) data, MAX_READ_SIZE,
|
||||
g_socket_receive (src->socket, (gchar *) data, read,
|
||||
src->cancellable, &err);
|
||||
|
||||
if (rret == 0) {
|
||||
GST_DEBUG_OBJECT (src, "Connection closed");
|
||||
ret = GST_FLOW_EOS;
|
||||
gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
|
||||
gst_buffer_unmap (*outbuf, data, read);
|
||||
gst_buffer_unref (*outbuf);
|
||||
*outbuf = NULL;
|
||||
} else if (ret < 0) {
|
||||
|
@ -210,7 +243,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Failed to read from socket: %s", err->message));
|
||||
}
|
||||
gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
|
||||
gst_buffer_unmap (*outbuf, data, read);
|
||||
gst_buffer_unref (*outbuf);
|
||||
*outbuf = NULL;
|
||||
} else {
|
||||
|
@ -228,8 +261,22 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
}
|
||||
g_clear_error (&err);
|
||||
|
||||
done:
|
||||
return ret;
|
||||
|
||||
select_error:
|
||||
{
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Select failed: %s", err->message));
|
||||
g_clear_error (&err);
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
get_available_error:
|
||||
{
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Select to get available bytes from socket"));
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
wrong_state:
|
||||
{
|
||||
GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
|
||||
|
|
|
@ -162,7 +162,8 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
{
|
||||
GstTCPServerSrc *src;
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
gssize rret;
|
||||
gssize rret, avail;
|
||||
gsize read;
|
||||
GError *err = NULL;
|
||||
guint8 *data;
|
||||
|
||||
|
@ -184,10 +185,42 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
GST_LOG_OBJECT (src, "asked for a buffer");
|
||||
|
||||
/* read the buffer header */
|
||||
*outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
|
||||
avail = g_socket_get_available_bytes (src->client_socket);
|
||||
if (avail < 0) {
|
||||
goto get_available_error;
|
||||
} else if (avail == 0) {
|
||||
GIOCondition condition;
|
||||
|
||||
if (!g_socket_condition_wait (src->client_socket,
|
||||
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
|
||||
goto select_error;
|
||||
|
||||
condition =
|
||||
g_socket_condition_check (src->client_socket,
|
||||
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
|
||||
|
||||
if ((condition & G_IO_ERR)) {
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Socket in error state"));
|
||||
*outbuf = NULL;
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto done;
|
||||
} else if ((condition & G_IO_HUP)) {
|
||||
GST_DEBUG_OBJECT (src, "Connection closed");
|
||||
*outbuf = NULL;
|
||||
ret = GST_FLOW_EOS;
|
||||
goto done;
|
||||
}
|
||||
avail = g_socket_get_available_bytes (src->client_socket);
|
||||
if (avail <= 0)
|
||||
goto get_available_error;
|
||||
}
|
||||
|
||||
read = MIN (avail, MAX_READ_SIZE);
|
||||
*outbuf = gst_buffer_new_and_alloc (read);
|
||||
data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
|
||||
rret =
|
||||
g_socket_receive (src->client_socket, (gchar *) data, MAX_READ_SIZE,
|
||||
g_socket_receive (src->client_socket, (gchar *) data, read,
|
||||
src->cancellable, &err);
|
||||
|
||||
if (rret == 0) {
|
||||
|
@ -223,6 +256,7 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
}
|
||||
g_clear_error (&err);
|
||||
|
||||
done:
|
||||
return ret;
|
||||
|
||||
wrong_state:
|
||||
|
@ -241,6 +275,19 @@ accept_error:
|
|||
g_clear_error (&err);
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
select_error:
|
||||
{
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Select failed: %s", err->message));
|
||||
g_clear_error (&err);
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
get_available_error:
|
||||
{
|
||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||
("Select to get available bytes from socket"));
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
Loading…
Reference in a new issue