gst/tcp/gstmultifdsink.c: Fix wrong GList iteration that could crash the server when more then 2 clients disconnect a...

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type),
(gst_multifdsink_class_init), (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_caps),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_thread),
(gst_multifdsink_init_send), (gst_multifdsink_close):
Fix wrong GList iteration that could crash the server when
more then 2 clients disconnect at the same time. Read all the
pending commands in one batch to recover from command storms under
very heavy load.
This commit is contained in:
Wim Taymans 2004-06-29 10:28:29 +00:00
parent 4f89d3a37b
commit d4474d8ad1
2 changed files with 92 additions and 35 deletions

View file

@ -1,3 +1,22 @@
2004-06-29 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type),
(gst_multifdsink_class_init), (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_caps),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_thread),
(gst_multifdsink_init_send), (gst_multifdsink_close):
Fix wrong GList iteration that could crash the server when
more then 2 clients disconnect at the same time. Read all the
pending commands in one batch to recover from command storms under
very heavy load.
2004-06-28 Wim Taymans <wim@fluendo.com>
* gst/videomixer/videomixer.c: (gst_videomixer_pad_get_type),

View file

@ -43,9 +43,9 @@ G_STMT_START { \
write (WRITE_SOCKET(sink), &c, 1); \
} G_STMT_END
#define READ_COMMAND(sink, command) \
#define READ_COMMAND(sink, command, res) \
G_STMT_START { \
read(READ_SOCKET(sink), &command, 1); \
res = read(READ_SOCKET(sink), &command, 1); \
} G_STMT_END
/* elementfactory information */
@ -280,14 +280,16 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
client->sending = NULL;
g_mutex_lock (sink->clientslock);
sink->clients = g_list_prepend (sink->clients, client);
g_mutex_unlock (sink->clientslock);
/* we always read from a client */
FD_SET (fd, &sink->readfds);
sink->clients = g_list_prepend (sink->clients, client);
/* set the socket to non blocking */
fcntl (fd, F_SETFL, O_NONBLOCK);
/* we always read from a client */
FD_SET (fd, &sink->readfds);
SEND_COMMAND (sink, CONTROL_RESTART);
g_mutex_unlock (sink->clientslock);
g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, NULL, fd);
@ -316,13 +318,11 @@ gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
void
gst_multifdsink_clear (GstMultiFdSink * sink)
{
GList *clients;
g_mutex_lock (sink->clientslock);
for (clients = sink->clients; clients; clients = g_list_next (clients)) {
while (sink->clients) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
client = (GstTCPClient *) sink->clients->data;
gst_multifdsink_client_remove (sink, client);
}
g_mutex_unlock (sink->clientslock);
@ -335,11 +335,12 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
/* FIXME: if we keep track of ip we can log it here and signal */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
FD_CLR (fd, &sink->readfds);
FD_CLR (fd, &sink->writefds);
if (close (fd) != 0) {
GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
}
FD_CLR (fd, &sink->readfds);
FD_CLR (fd, &sink->writefds);
SEND_COMMAND (sink, CONTROL_RESTART);
sink->clients = g_list_remove (sink->clients, client);
@ -356,20 +357,34 @@ static gboolean
gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
GstTCPClient * client)
{
int nread, fd;
int avail, fd;
fd = client->fd;
GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd);
ioctl (fd, FIONREAD, &avail);
ioctl (fd, FIONREAD, &nread);
if (nread == 0) {
GST_LOG_OBJECT (sink, "select reports client read on fd %d of %d bytes",
fd, avail);
if (avail == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
return FALSE;
} else {
/* FIXME: we should probably just Read 'n' Drop */
g_warning ("Don't know what to do with %d bytes to read", nread);
guint8 dummy[512];
gint nread;
/* just Read 'n' Drop */
do {
nread = read (fd, dummy, 512);
if (nread < -1) {
GST_DEBUG_OBJECT (sink, "could not read bytes from fd %d: %s",
fd, g_strerror (errno));
break;
}
avail -= nread;
}
while (avail > 0);
}
return TRUE;
}
@ -386,6 +401,7 @@ gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client,
GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d",
len, client->fd);
client->sending = g_list_append (client->sending, buf);
return TRUE;
@ -588,8 +604,9 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
/* FIXME: implement recover procedure here, like moving the position to
* the next keyframe, dropping buffers back to the beginning of the queue,
* stuff like that... */
GST_WARNING_OBJECT (sink, "client %p with fd %d is lagging",
client, client->fd);
GST_WARNING_OBJECT (sink,
"client %p with fd %d is lagging, recover using policy %d", client,
client->fd, sink->recover_policy);
switch (sink->recover_policy) {
case GST_RECOVER_POLICY_NONE:
/* do nothing, client will catch up or get kicked out when it reaches
@ -667,6 +684,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
if (newpos != client->bufpos) {
client->bufpos = newpos;
client->discont = TRUE;
GST_WARNING_OBJECT (sink, "client %p with fd %d position reset to %d",
client, client->fd, client->bufpos);
} else {
GST_WARNING_OBJECT (sink,
"client %p with fd %d not recovering position", client, client->fd);
}
}
/* check hard max, remove client */
@ -697,7 +719,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
for (clients = slow; clients; clients = g_list_next (clients)) {
GstTCPClient *client;
client = (GstTCPClient *) slow->data;
client = (GstTCPClient *) clients->data;
gst_multifdsink_client_remove (sink, client);
}
@ -744,6 +766,8 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
fclass = GST_MULTIFDSINK_GET_CLASS (sink);
do {
gboolean stop = FALSE;
try_again = FALSE;
/* check for:
@ -774,24 +798,36 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
GST_LOG_OBJECT (sink, "done select on client fds for writes");
gst_multifdsink_debug_fdset (sink, &testwritefds);
/* read all commands */
if (FD_ISSET (READ_SOCKET (sink), &testreadfds)) {
gchar command;
while (TRUE) {
gchar command;
int res;
READ_COMMAND (sink, command);
READ_COMMAND (sink, command, res);
if (res < 0) {
/* no more commands */
break;
}
switch (command) {
case CONTROL_RESTART:
/* need to restart the select call as the fd_set changed */
try_again = TRUE;
break;
case CONTROL_STOP:
/* stop this function */
return;
default:
g_warning ("multifdsink: unknown control message received");
break;
switch (command) {
case CONTROL_RESTART:
/* need to restart the select call as the fd_set changed */
try_again = TRUE;
break;
case CONTROL_STOP:
/* stop this function */
stop = TRUE;
break;
default:
g_warning ("multifdsink: unknown control message received");
break;
}
}
}
if (stop) {
return;
}
} while (try_again);
if (fclass->select)
@ -825,7 +861,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
for (clients = error; clients; clients = g_list_next (clients)) {
GstTCPClient *client;
client = (GstTCPClient *) error->data;
client = (GstTCPClient *) clients->data;
GST_LOG_OBJECT (sink, "removing client %p with fd %d with errors", client,
client->fd);
@ -958,6 +994,8 @@ gst_multifdsink_init_send (GstMultiFdSink * this)
perror ("creating socket pair");
}
FD_SET (READ_SOCKET (this), &this->readfds);
fcntl (READ_SOCKET (this), F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK);
this->streamheader = NULL;
this->data_written = 0;