From d4474d8ad19edf642f7bdda55373dd470e3d4ba2 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 29 Jun 2004 10:28:29 +0000 Subject: [PATCH] gst/tcp/gstmultifdsink.c: Fix wrong GList iteration that could crash the server when more then 2 clients disconnect a... Original commit message from CVS: * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type), (gst_multifdsink_class_init), (gst_multifdsink_add), (gst_multifdsink_remove), (gst_multifdsink_clear), (gst_multifdsink_client_remove), (gst_multifdsink_handle_client_read), (gst_multifdsink_client_queue_data), (gst_multifdsink_client_queue_caps), (gst_multifdsink_client_queue_buffer), (gst_multifdsink_handle_client_write), (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), (gst_multifdsink_thread), (gst_multifdsink_init_send), (gst_multifdsink_close): Fix wrong GList iteration that could crash the server when more then 2 clients disconnect at the same time. Read all the pending commands in one batch to recover from command storms under very heavy load. --- ChangeLog | 19 +++++++ gst/tcp/gstmultifdsink.c | 108 ++++++++++++++++++++++++++------------- 2 files changed, 92 insertions(+), 35 deletions(-) diff --git a/ChangeLog b/ChangeLog index b411337d24..bc2406e6e5 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,22 @@ +2004-06-29 Wim Taymans + + * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type), + (gst_multifdsink_class_init), (gst_multifdsink_add), + (gst_multifdsink_remove), (gst_multifdsink_clear), + (gst_multifdsink_client_remove), + (gst_multifdsink_handle_client_read), + (gst_multifdsink_client_queue_data), + (gst_multifdsink_client_queue_caps), + (gst_multifdsink_client_queue_buffer), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), + (gst_multifdsink_handle_clients), (gst_multifdsink_thread), + (gst_multifdsink_init_send), (gst_multifdsink_close): + Fix wrong GList iteration that could crash the server when + more then 2 clients disconnect at the same time. Read all the + pending commands in one batch to recover from command storms under + very heavy load. + 2004-06-28 Wim Taymans * gst/videomixer/videomixer.c: (gst_videomixer_pad_get_type), diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 58415cd24d..b77f8ff7f9 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -43,9 +43,9 @@ G_STMT_START { \ write (WRITE_SOCKET(sink), &c, 1); \ } G_STMT_END -#define READ_COMMAND(sink, command) \ +#define READ_COMMAND(sink, command, res) \ G_STMT_START { \ - read(READ_SOCKET(sink), &command, 1); \ + res = read(READ_SOCKET(sink), &command, 1); \ } G_STMT_END /* elementfactory information */ @@ -280,14 +280,16 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd) client->sending = NULL; g_mutex_lock (sink->clientslock); - sink->clients = g_list_prepend (sink->clients, client); - g_mutex_unlock (sink->clientslock); - /* we always read from a client */ - FD_SET (fd, &sink->readfds); + sink->clients = g_list_prepend (sink->clients, client); /* set the socket to non blocking */ fcntl (fd, F_SETFL, O_NONBLOCK); + /* we always read from a client */ + FD_SET (fd, &sink->readfds); + SEND_COMMAND (sink, CONTROL_RESTART); + + g_mutex_unlock (sink->clientslock); g_signal_emit (G_OBJECT (sink), gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, NULL, fd); @@ -316,13 +318,11 @@ gst_multifdsink_remove (GstMultiFdSink * sink, int fd) void gst_multifdsink_clear (GstMultiFdSink * sink) { - GList *clients; - g_mutex_lock (sink->clientslock); - for (clients = sink->clients; clients; clients = g_list_next (clients)) { + while (sink->clients) { GstTCPClient *client; - client = (GstTCPClient *) clients->data; + client = (GstTCPClient *) sink->clients->data; gst_multifdsink_client_remove (sink, client); } g_mutex_unlock (sink->clientslock); @@ -335,11 +335,12 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) /* FIXME: if we keep track of ip we can log it here and signal */ GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); + FD_CLR (fd, &sink->readfds); + FD_CLR (fd, &sink->writefds); if (close (fd) != 0) { GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); } - FD_CLR (fd, &sink->readfds); - FD_CLR (fd, &sink->writefds); + SEND_COMMAND (sink, CONTROL_RESTART); sink->clients = g_list_remove (sink->clients, client); @@ -356,20 +357,34 @@ static gboolean gst_multifdsink_handle_client_read (GstMultiFdSink * sink, GstTCPClient * client) { - int nread, fd; + int avail, fd; fd = client->fd; - GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd); + ioctl (fd, FIONREAD, &avail); - ioctl (fd, FIONREAD, &nread); - if (nread == 0) { + GST_LOG_OBJECT (sink, "select reports client read on fd %d of %d bytes", + fd, avail); + + if (avail == 0) { /* client sent close, so remove it */ GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd); return FALSE; } else { - /* FIXME: we should probably just Read 'n' Drop */ - g_warning ("Don't know what to do with %d bytes to read", nread); + guint8 dummy[512]; + gint nread; + + /* just Read 'n' Drop */ + do { + nread = read (fd, dummy, 512); + if (nread < -1) { + GST_DEBUG_OBJECT (sink, "could not read bytes from fd %d: %s", + fd, g_strerror (errno)); + break; + } + avail -= nread; + } + while (avail > 0); } return TRUE; } @@ -386,6 +401,7 @@ gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client, GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d", len, client->fd); + client->sending = g_list_append (client->sending, buf); return TRUE; @@ -588,8 +604,9 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) /* FIXME: implement recover procedure here, like moving the position to * the next keyframe, dropping buffers back to the beginning of the queue, * stuff like that... */ - GST_WARNING_OBJECT (sink, "client %p with fd %d is lagging", - client, client->fd); + GST_WARNING_OBJECT (sink, + "client %p with fd %d is lagging, recover using policy %d", client, + client->fd, sink->recover_policy); switch (sink->recover_policy) { case GST_RECOVER_POLICY_NONE: /* do nothing, client will catch up or get kicked out when it reaches @@ -667,6 +684,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) if (newpos != client->bufpos) { client->bufpos = newpos; client->discont = TRUE; + GST_WARNING_OBJECT (sink, "client %p with fd %d position reset to %d", + client, client->fd, client->bufpos); + } else { + GST_WARNING_OBJECT (sink, + "client %p with fd %d not recovering position", client, client->fd); } } /* check hard max, remove client */ @@ -697,7 +719,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) for (clients = slow; clients; clients = g_list_next (clients)) { GstTCPClient *client; - client = (GstTCPClient *) slow->data; + client = (GstTCPClient *) clients->data; gst_multifdsink_client_remove (sink, client); } @@ -744,6 +766,8 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) fclass = GST_MULTIFDSINK_GET_CLASS (sink); do { + gboolean stop = FALSE; + try_again = FALSE; /* check for: @@ -774,24 +798,36 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) GST_LOG_OBJECT (sink, "done select on client fds for writes"); gst_multifdsink_debug_fdset (sink, &testwritefds); + /* read all commands */ if (FD_ISSET (READ_SOCKET (sink), &testreadfds)) { - gchar command; + while (TRUE) { + gchar command; + int res; - READ_COMMAND (sink, command); + READ_COMMAND (sink, command, res); + if (res < 0) { + /* no more commands */ + break; + } - switch (command) { - case CONTROL_RESTART: - /* need to restart the select call as the fd_set changed */ - try_again = TRUE; - break; - case CONTROL_STOP: - /* stop this function */ - return; - default: - g_warning ("multifdsink: unknown control message received"); - break; + switch (command) { + case CONTROL_RESTART: + /* need to restart the select call as the fd_set changed */ + try_again = TRUE; + break; + case CONTROL_STOP: + /* stop this function */ + stop = TRUE; + break; + default: + g_warning ("multifdsink: unknown control message received"); + break; + } } } + if (stop) { + return; + } } while (try_again); if (fclass->select) @@ -825,7 +861,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) for (clients = error; clients; clients = g_list_next (clients)) { GstTCPClient *client; - client = (GstTCPClient *) error->data; + client = (GstTCPClient *) clients->data; GST_LOG_OBJECT (sink, "removing client %p with fd %d with errors", client, client->fd); @@ -958,6 +994,8 @@ gst_multifdsink_init_send (GstMultiFdSink * this) perror ("creating socket pair"); } FD_SET (READ_SOCKET (this), &this->readfds); + fcntl (READ_SOCKET (this), F_SETFL, O_NONBLOCK); + fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK); this->streamheader = NULL; this->data_written = 0;