From 3b5ba92ceaae7072997133f128add69aeb5dc7c5 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 10 Aug 2004 11:35:44 +0000 Subject: [PATCH] gst/tcp/gstmultifdsink.*: Added more debugging info. Changed the way clients are removed from the lists. Fixed a bug ... Original commit message from CVS: * gst/tcp/gstmultifdsink.c: (gst_multifdsink_add), (gst_multifdsink_remove), (gst_multifdsink_clear), (gst_multifdsink_remove_client_link), (gst_multifdsink_handle_client_read), (gst_multifdsink_client_queue_data), (gst_multifdsink_client_queue_buffer), (gst_multifdsink_handle_client_write), (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), (gst_multifdsink_chain), (gst_multifdsink_close): * gst/tcp/gstmultifdsink.h: Added more debugging info. Changed the way clients are removed from the lists. Fixed a bug where a bad file descriptor could cause many clients to be removed. --- ChangeLog | 16 ++++ gst/tcp/gstmultifdsink.c | 155 +++++++++++++++++++++++++-------------- gst/tcp/gstmultifdsink.h | 15 +++- 3 files changed, 127 insertions(+), 59 deletions(-) diff --git a/ChangeLog b/ChangeLog index 020046ae67..5f5643f754 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,19 @@ +2004-08-10 Wim Taymans + + * gst/tcp/gstmultifdsink.c: (gst_multifdsink_add), + (gst_multifdsink_remove), (gst_multifdsink_clear), + (gst_multifdsink_remove_client_link), + (gst_multifdsink_handle_client_read), + (gst_multifdsink_client_queue_data), + (gst_multifdsink_client_queue_buffer), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), + (gst_multifdsink_chain), (gst_multifdsink_close): + * gst/tcp/gstmultifdsink.h: + Added more debugging info. Changed the way clients are + removed from the lists. Fixed a bug where a bad file descriptor + could cause many clients to be removed. + 2004-08-06 Benjamin Otte * gst/videotestsrc/gstvideotestsrc.c: (generate_capslist): diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 9f9952e861..60eb000e75 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -125,8 +125,8 @@ static void gst_multifdsink_base_init (gpointer g_class); static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass); static void gst_multifdsink_init (GstMultiFdSink * multifdsink); -static void gst_multifdsink_client_remove (GstMultiFdSink * sink, - GstTCPClient * client); +static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink, + GList * link); static void gst_multifdsink_chain (GstPad * pad, GstData * _data); static GstElementStateReturn gst_multifdsink_change_state (GstElement * @@ -307,7 +307,7 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd) /* create client datastructure */ client = g_new0 (GstTCPClient, 1); client->fd = fd; - client->bad = FALSE; + client->status = GST_CLIENT_STATUS_OK; client->bufpos = -1; client->bufoffset = 0; client->sending = NULL; @@ -341,19 +341,21 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd) void gst_multifdsink_remove (GstMultiFdSink * sink, int fd) { - GList *clients; + GList *clients, *next; GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); g_mutex_lock (sink->clientslock); /* loop over the clients to find the one with the fd */ - for (clients = sink->clients; clients; clients = g_list_next (clients)) { + for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; client = (GstTCPClient *) clients->data; + next = g_list_next (clients); if (client->fd == fd) { - gst_multifdsink_client_remove (sink, client); + client->status = GST_CLIENT_STATUS_REMOVED; + gst_multifdsink_remove_client_link (sink, clients); break; } } @@ -363,14 +365,19 @@ gst_multifdsink_remove (GstMultiFdSink * sink, int fd) void gst_multifdsink_clear (GstMultiFdSink * sink) { + GList *clients, *next; + GST_DEBUG_OBJECT (sink, "clearing all clients"); g_mutex_lock (sink->clientslock); - while (sink->clients) { + for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; - client = (GstTCPClient *) sink->clients->data; - gst_multifdsink_client_remove (sink, client); + client = (GstTCPClient *) clients->data; + next = g_list_next (clients); + + client->status = GST_CLIENT_STATUS_REMOVED; + gst_multifdsink_remove_client_link (sink, clients); } g_mutex_unlock (sink->clientslock); } @@ -437,16 +444,51 @@ gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd) /* should be called with the clientslock held */ static void -gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) +gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link) { - int fd = client->fd; + int fd; GTimeVal now; + GstTCPClient *client = (GstTCPClient *) link->data; + + fd = client->fd; /* FIXME: if we keep track of ip we can log it here and signal */ GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); + + switch (client->status) { + case GST_CLIENT_STATUS_OK: + GST_WARNING_OBJECT (sink, "removing client %p with fd %d for no reason", + client, client->fd); + break; + case GST_CLIENT_STATUS_CLOSED: + GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close", + client, client->fd); + break; + case GST_CLIENT_STATUS_REMOVED: + GST_DEBUG_OBJECT (sink, + "removing client %p with fd %d because the app removed it", client, + client->fd); + break; + case GST_CLIENT_STATUS_SLOW: + GST_INFO_OBJECT (sink, + "removing client %p with fd %d because it was too slow", client, + client->fd); + break; + case GST_CLIENT_STATUS_ERROR: + GST_WARNING_OBJECT (sink, + "removing client %p with fd %d because of error", client, client->fd); + break; + default: + GST_WARNING_OBJECT (sink, + "removing client %p with fd %d with invalid reason", client, + client->fd); + break; + } + FD_CLR (fd, &sink->readfds); FD_CLR (fd, &sink->writefds); if (close (fd) != 0) { + /* this is not really an error */ GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); } SEND_COMMAND (sink, CONTROL_RESTART); @@ -454,21 +496,24 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); + /* unlock the mutex before signaling because the signal handler + * might query some properties */ g_mutex_unlock (sink->clientslock); g_signal_emit (G_OBJECT (sink), gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd); + /* lock again before we remove the client completely */ g_mutex_lock (sink->clientslock); - sink->clients = g_list_remove (sink->clients, client); + sink->clients = g_list_delete_link (sink->clients, link); g_free (client); } /* handle a read on a client fd, * which either indicates a close or should be ignored - * returns FALSE if the client has been closed. */ + * returns FALSE if some error occured or the client closed. */ static gboolean gst_multifdsink_handle_client_read (GstMultiFdSink * sink, GstTCPClient * client) @@ -479,7 +524,9 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink, fd = client->fd; if (ioctl (fd, FIONREAD, &avail) < 0) { - GST_WARNING_OBJECT (sink, "ioctl failed for fd %d", fd); + GST_WARNING_OBJECT (sink, "ioctl failed for fd %d: %s", + fd, g_strerror (errno)); + client->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; return ret; } @@ -492,9 +539,11 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink, if (avail == 0) { /* client sent close, so remove it */ GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd); + client->status = GST_CLIENT_STATUS_CLOSED; ret = FALSE; } else if (avail < 0) { GST_WARNING_OBJECT (sink, "avail < 0, removing on fd %d", fd); + client->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; } else { guint8 dummy[512]; @@ -514,10 +563,12 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink, if (nread < -1) { GST_WARNING_OBJECT (sink, "could not read bytes from fd %d: %s", fd, g_strerror (errno)); + client->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; } else if (nread == 0) { GST_WARNING_OBJECT (sink, "fd %d: gave 0 bytes in read, removing", fd); + client->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; } @@ -541,7 +592,7 @@ gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client, GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d", len, client->fd); - client->sending = g_list_append (client->sending, buf); + client->sending = g_slist_append (client->sending, buf); return TRUE; } @@ -589,7 +640,7 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink, } gst_buffer_ref (buffer); - client->sending = g_list_append (client->sending, buffer); + client->sending = g_slist_append (client->sending, buffer); return TRUE; } @@ -617,6 +668,8 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink, * * When the sending returns a partial buffer we stop sending more data as the next send * operation could block. + * + * This functions returns FALSE if some error occured. */ static gboolean gst_multifdsink_handle_client_write (GstMultiFdSink * sink, @@ -649,7 +702,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, * yet, send them out one by one */ if (!client->streamheader_sent) { if (sink->streamheader) { - GList *l; + GSList *l; for (l = sink->streamheader; l; l = l->next) { /* queue stream headers for sending */ @@ -719,6 +772,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, } else { GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d", fd); + client->status = GST_CLIENT_STATUS_ERROR; return FALSE; } } else { @@ -730,7 +784,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, more = FALSE; } else { /* complete buffer was written, we can proceed to the next one */ - client->sending = g_list_remove (client->sending, head); + client->sending = g_slist_remove (client->sending, head); gst_buffer_unref (head); /* make sure we start from byte 0 for the next buffer */ client->bufoffset = 0; @@ -808,9 +862,8 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) static void gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) { - GList *clients; + GList *clients, *next; gint queuelen; - GList *slow = NULL; gboolean need_signal = FALSE; gint max_buffer_usage; gint i; @@ -827,10 +880,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) /* then loop over the clients and update the positions */ max_buffer_usage = 0; - for (clients = sink->clients; clients; clients = g_list_next (clients)) { + for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; client = (GstTCPClient *) clients->data; + next = g_list_next (clients); client->bufpos++; GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", @@ -859,7 +913,8 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) client, client->fd); FD_CLR (client->fd, &sink->readfds); FD_CLR (client->fd, &sink->writefds); - slow = g_list_prepend (slow, client); + client->status = GST_CLIENT_STATUS_SLOW; + gst_multifdsink_remove_client_link (sink, clients); /* cannot send data to this client anymore. need to signal the select thread that * the fd_set changed */ need_signal = TRUE; @@ -876,15 +931,6 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) max_buffer_usage = client->bufpos; } } - /* remove crap clients */ - for (clients = slow; clients; clients = g_list_next (clients)) { - GstTCPClient *client; - - client = (GstTCPClient *) clients->data; - - gst_multifdsink_client_remove (sink, client); - } - g_list_free (slow); /* nobody is referencing buffers after max_buffer_usage so we can * remove them from the queue */ for (i = queuelen - 1; i > max_buffer_usage; i--) { @@ -920,7 +966,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) { int result; fd_set testreadfds, testwritefds; - GList *clients, *closed = NULL; + GList *clients, *next; gboolean try_again; GstMultiFdSinkClass *fclass; @@ -951,29 +997,35 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) GST_WARNING_OBJECT (sink, "select failed: %s", g_strerror (errno)); if (errno == EBADF) { /* ok, so one of the fds is invalid. We loop over them to find one - * that gives an error to the F_GETFL fcntl. - */ + * that gives an error to the F_GETFL fcntl. */ g_mutex_lock (sink->clientslock); - for (clients = sink->clients; clients; clients = g_list_next (clients)) { + for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; int fd; long flags; int res; client = (GstTCPClient *) clients->data; + next = g_list_next (clients); + fd = client->fd; res = fcntl (fd, F_GETFL, &flags); if (res == -1) { - GST_WARNING_OBJECT (sink, "fnctl failed for %d, marking as bad: %s", + GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s", fd, g_strerror (errno)); if (errno == EBADF) { - client->bad = TRUE; + client->status = GST_CLIENT_STATUS_ERROR; + gst_multifdsink_remove_client_link (sink, clients); } } } g_mutex_unlock (sink->clientslock); + /* after this, go back in the select loop as the read/writefds + * are not valid */ + try_again = TRUE; } else if (errno == EINTR) { + /* interrupted system call, just redo the select */ try_again = TRUE; } else { GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), @@ -1025,13 +1077,15 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) /* Check the reads */ g_mutex_lock (sink->clientslock); - for (clients = sink->clients; clients; clients = g_list_next (clients)) { + for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; int fd; client = (GstTCPClient *) clients->data; - if (client->bad) { - closed = g_list_prepend (closed, client); + next = g_list_next (clients); + + if (client->status != GST_CLIENT_STATUS_OK) { + gst_multifdsink_remove_client_link (sink, clients); continue; } @@ -1040,29 +1094,18 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) if (FD_ISSET (fd, &testreadfds)) { /* handle client read */ if (!gst_multifdsink_handle_client_read (sink, client)) { - closed = g_list_prepend (closed, client); + gst_multifdsink_remove_client_link (sink, clients); continue; } } if (FD_ISSET (fd, &testwritefds)) { /* handle client write */ if (!gst_multifdsink_handle_client_write (sink, client)) { - closed = g_list_prepend (closed, client); + gst_multifdsink_remove_client_link (sink, clients); continue; } } } - /* remove crappy clients */ - for (clients = closed; clients; clients = g_list_next (clients)) { - GstTCPClient *client; - - client = (GstTCPClient *) clients->data; - - GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close", - client, client->fd); - gst_multifdsink_client_remove (sink, client); - } - g_list_free (closed); g_mutex_unlock (sink->clientslock); } @@ -1104,7 +1147,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data) GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %d to streamheader", GST_BUFFER_SIZE (buf)); - sink->streamheader = g_list_append (sink->streamheader, buf); + sink->streamheader = g_slist_append (sink->streamheader, buf); return; } @@ -1239,12 +1282,12 @@ gst_multifdsink_close (GstMultiFdSink * this) close (WRITE_SOCKET (this)); if (this->streamheader) { - GList *l; + GSList *l; for (l = this->streamheader; l; l = l->next) { gst_buffer_unref (l->data); } - g_list_free (this->streamheader); + g_slist_free (this->streamheader); } if (fclass->close) diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 66558e400a..deeae5a025 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -75,15 +75,24 @@ typedef enum GST_RECOVER_POLICY_RESYNC_KEYFRAME, } GstRecoverPolicy; +typedef enum +{ + GST_CLIENT_STATUS_OK, + GST_CLIENT_STATUS_CLOSED, + GST_CLIENT_STATUS_REMOVED, + GST_CLIENT_STATUS_SLOW, + GST_CLIENT_STATUS_ERROR, +} GstClientStatus; + /* structure for a client * */ typedef struct { int fd; gint bufpos; /* position of this client in the global queue */ - gboolean bad; + GstClientStatus status; - GList *sending; /* the buffers we need to send */ + GSList *sending; /* the buffers we need to send */ gint bufoffset; /* offset in the first buffer */ gboolean discont; @@ -120,7 +129,7 @@ struct _GstMultiFdSink { int control_sock[2]; /* sockets for controlling the select call */ - GList *streamheader; /* GList of GstBuffers to use as streamheader */ + GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ GstTCPProtocolType protocol; guint mtu;