gst/tcp/gstmultifdsink.*: Added more debugging info. Changed the way clients are removed from the lists. Fixed a bug ...

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_remove_client_link),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients),
(gst_multifdsink_chain), (gst_multifdsink_close):
* gst/tcp/gstmultifdsink.h:
Added more debugging info. Changed the way clients are
removed from the lists. Fixed a bug where a bad file descriptor
could cause many clients to be removed.
This commit is contained in:
Wim Taymans 2004-08-10 11:35:44 +00:00
parent 1e8be7e191
commit 3b5ba92cea
3 changed files with 127 additions and 59 deletions

View file

@ -1,3 +1,19 @@
2004-08-10 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_remove_client_link),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients),
(gst_multifdsink_chain), (gst_multifdsink_close):
* gst/tcp/gstmultifdsink.h:
Added more debugging info. Changed the way clients are
removed from the lists. Fixed a bug where a bad file descriptor
could cause many clients to be removed.
2004-08-06 Benjamin Otte <in7y118@public.uni-hamburg.de> 2004-08-06 Benjamin Otte <in7y118@public.uni-hamburg.de>
* gst/videotestsrc/gstvideotestsrc.c: (generate_capslist): * gst/videotestsrc/gstvideotestsrc.c: (generate_capslist):

View file

@ -125,8 +125,8 @@ static void gst_multifdsink_base_init (gpointer g_class);
static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass); static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
static void gst_multifdsink_init (GstMultiFdSink * multifdsink); static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
static void gst_multifdsink_client_remove (GstMultiFdSink * sink, static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
GstTCPClient * client); GList * link);
static void gst_multifdsink_chain (GstPad * pad, GstData * _data); static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
static GstElementStateReturn gst_multifdsink_change_state (GstElement * static GstElementStateReturn gst_multifdsink_change_state (GstElement *
@ -307,7 +307,7 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
/* create client datastructure */ /* create client datastructure */
client = g_new0 (GstTCPClient, 1); client = g_new0 (GstTCPClient, 1);
client->fd = fd; client->fd = fd;
client->bad = FALSE; client->status = GST_CLIENT_STATUS_OK;
client->bufpos = -1; client->bufpos = -1;
client->bufoffset = 0; client->bufoffset = 0;
client->sending = NULL; client->sending = NULL;
@ -341,19 +341,21 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
void void
gst_multifdsink_remove (GstMultiFdSink * sink, int fd) gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
{ {
GList *clients; GList *clients, *next;
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
/* loop over the clients to find the one with the fd */ /* loop over the clients to find the one with the fd */
for (clients = sink->clients; clients; clients = g_list_next (clients)) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients);
if (client->fd == fd) { if (client->fd == fd) {
gst_multifdsink_client_remove (sink, client); client->status = GST_CLIENT_STATUS_REMOVED;
gst_multifdsink_remove_client_link (sink, clients);
break; break;
} }
} }
@ -363,14 +365,19 @@ gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
void void
gst_multifdsink_clear (GstMultiFdSink * sink) gst_multifdsink_clear (GstMultiFdSink * sink)
{ {
GList *clients, *next;
GST_DEBUG_OBJECT (sink, "clearing all clients"); GST_DEBUG_OBJECT (sink, "clearing all clients");
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
while (sink->clients) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
client = (GstTCPClient *) sink->clients->data; client = (GstTCPClient *) clients->data;
gst_multifdsink_client_remove (sink, client); next = g_list_next (clients);
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multifdsink_remove_client_link (sink, clients);
} }
g_mutex_unlock (sink->clientslock); g_mutex_unlock (sink->clientslock);
} }
@ -437,16 +444,51 @@ gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd)
/* should be called with the clientslock held */ /* should be called with the clientslock held */
static void static void
gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
{ {
int fd = client->fd; int fd;
GTimeVal now; GTimeVal now;
GstTCPClient *client = (GstTCPClient *) link->data;
fd = client->fd;
/* FIXME: if we keep track of ip we can log it here and signal */ /* FIXME: if we keep track of ip we can log it here and signal */
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
switch (client->status) {
case GST_CLIENT_STATUS_OK:
GST_WARNING_OBJECT (sink, "removing client %p with fd %d for no reason",
client, client->fd);
break;
case GST_CLIENT_STATUS_CLOSED:
GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close",
client, client->fd);
break;
case GST_CLIENT_STATUS_REMOVED:
GST_DEBUG_OBJECT (sink,
"removing client %p with fd %d because the app removed it", client,
client->fd);
break;
case GST_CLIENT_STATUS_SLOW:
GST_INFO_OBJECT (sink,
"removing client %p with fd %d because it was too slow", client,
client->fd);
break;
case GST_CLIENT_STATUS_ERROR:
GST_WARNING_OBJECT (sink,
"removing client %p with fd %d because of error", client, client->fd);
break;
default:
GST_WARNING_OBJECT (sink,
"removing client %p with fd %d with invalid reason", client,
client->fd);
break;
}
FD_CLR (fd, &sink->readfds); FD_CLR (fd, &sink->readfds);
FD_CLR (fd, &sink->writefds); FD_CLR (fd, &sink->writefds);
if (close (fd) != 0) { if (close (fd) != 0) {
/* this is not really an error */
GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
} }
SEND_COMMAND (sink, CONTROL_RESTART); SEND_COMMAND (sink, CONTROL_RESTART);
@ -454,21 +496,24 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
g_get_current_time (&now); g_get_current_time (&now);
client->disconnect_time = GST_TIMEVAL_TO_TIME (now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
/* unlock the mutex before signaling because the signal handler
* might query some properties */
g_mutex_unlock (sink->clientslock); g_mutex_unlock (sink->clientslock);
g_signal_emit (G_OBJECT (sink), g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd); gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd);
/* lock again before we remove the client completely */
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
sink->clients = g_list_remove (sink->clients, client); sink->clients = g_list_delete_link (sink->clients, link);
g_free (client); g_free (client);
} }
/* handle a read on a client fd, /* handle a read on a client fd,
* which either indicates a close or should be ignored * which either indicates a close or should be ignored
* returns FALSE if the client has been closed. */ * returns FALSE if some error occured or the client closed. */
static gboolean static gboolean
gst_multifdsink_handle_client_read (GstMultiFdSink * sink, gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
GstTCPClient * client) GstTCPClient * client)
@ -479,7 +524,9 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
fd = client->fd; fd = client->fd;
if (ioctl (fd, FIONREAD, &avail) < 0) { if (ioctl (fd, FIONREAD, &avail) < 0) {
GST_WARNING_OBJECT (sink, "ioctl failed for fd %d", fd); GST_WARNING_OBJECT (sink, "ioctl failed for fd %d: %s",
fd, g_strerror (errno));
client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE; ret = FALSE;
return ret; return ret;
} }
@ -492,9 +539,11 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
if (avail == 0) { if (avail == 0) {
/* client sent close, so remove it */ /* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd); GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
client->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE; ret = FALSE;
} else if (avail < 0) { } else if (avail < 0) {
GST_WARNING_OBJECT (sink, "avail < 0, removing on fd %d", fd); GST_WARNING_OBJECT (sink, "avail < 0, removing on fd %d", fd);
client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE; ret = FALSE;
} else { } else {
guint8 dummy[512]; guint8 dummy[512];
@ -514,10 +563,12 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
if (nread < -1) { if (nread < -1) {
GST_WARNING_OBJECT (sink, "could not read bytes from fd %d: %s", GST_WARNING_OBJECT (sink, "could not read bytes from fd %d: %s",
fd, g_strerror (errno)); fd, g_strerror (errno));
client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE; ret = FALSE;
break; break;
} else if (nread == 0) { } else if (nread == 0) {
GST_WARNING_OBJECT (sink, "fd %d: gave 0 bytes in read, removing", fd); GST_WARNING_OBJECT (sink, "fd %d: gave 0 bytes in read, removing", fd);
client->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE; ret = FALSE;
break; break;
} }
@ -541,7 +592,7 @@ gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client,
GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d", GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d",
len, client->fd); len, client->fd);
client->sending = g_list_append (client->sending, buf); client->sending = g_slist_append (client->sending, buf);
return TRUE; return TRUE;
} }
@ -589,7 +640,7 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
} }
gst_buffer_ref (buffer); gst_buffer_ref (buffer);
client->sending = g_list_append (client->sending, buffer); client->sending = g_slist_append (client->sending, buffer);
return TRUE; return TRUE;
} }
@ -617,6 +668,8 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
* *
* When the sending returns a partial buffer we stop sending more data as the next send * When the sending returns a partial buffer we stop sending more data as the next send
* operation could block. * operation could block.
*
* This functions returns FALSE if some error occured.
*/ */
static gboolean static gboolean
gst_multifdsink_handle_client_write (GstMultiFdSink * sink, gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
@ -649,7 +702,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
* yet, send them out one by one */ * yet, send them out one by one */
if (!client->streamheader_sent) { if (!client->streamheader_sent) {
if (sink->streamheader) { if (sink->streamheader) {
GList *l; GSList *l;
for (l = sink->streamheader; l; l = l->next) { for (l = sink->streamheader; l; l = l->next) {
/* queue stream headers for sending */ /* queue stream headers for sending */
@ -719,6 +772,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
} else { } else {
GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d", GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d",
fd); fd);
client->status = GST_CLIENT_STATUS_ERROR;
return FALSE; return FALSE;
} }
} else { } else {
@ -730,7 +784,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
more = FALSE; more = FALSE;
} else { } else {
/* complete buffer was written, we can proceed to the next one */ /* complete buffer was written, we can proceed to the next one */
client->sending = g_list_remove (client->sending, head); client->sending = g_slist_remove (client->sending, head);
gst_buffer_unref (head); gst_buffer_unref (head);
/* make sure we start from byte 0 for the next buffer */ /* make sure we start from byte 0 for the next buffer */
client->bufoffset = 0; client->bufoffset = 0;
@ -808,9 +862,8 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
static void static void
gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
{ {
GList *clients; GList *clients, *next;
gint queuelen; gint queuelen;
GList *slow = NULL;
gboolean need_signal = FALSE; gboolean need_signal = FALSE;
gint max_buffer_usage; gint max_buffer_usage;
gint i; gint i;
@ -827,10 +880,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
/* then loop over the clients and update the positions */ /* then loop over the clients and update the positions */
max_buffer_usage = 0; max_buffer_usage = 0;
for (clients = sink->clients; clients; clients = g_list_next (clients)) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients);
client->bufpos++; client->bufpos++;
GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
@ -859,7 +913,8 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
client, client->fd); client, client->fd);
FD_CLR (client->fd, &sink->readfds); FD_CLR (client->fd, &sink->readfds);
FD_CLR (client->fd, &sink->writefds); FD_CLR (client->fd, &sink->writefds);
slow = g_list_prepend (slow, client); client->status = GST_CLIENT_STATUS_SLOW;
gst_multifdsink_remove_client_link (sink, clients);
/* cannot send data to this client anymore. need to signal the select thread that /* cannot send data to this client anymore. need to signal the select thread that
* the fd_set changed */ * the fd_set changed */
need_signal = TRUE; need_signal = TRUE;
@ -876,15 +931,6 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
max_buffer_usage = client->bufpos; max_buffer_usage = client->bufpos;
} }
} }
/* remove crap clients */
for (clients = slow; clients; clients = g_list_next (clients)) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
gst_multifdsink_client_remove (sink, client);
}
g_list_free (slow);
/* nobody is referencing buffers after max_buffer_usage so we can /* nobody is referencing buffers after max_buffer_usage so we can
* remove them from the queue */ * remove them from the queue */
for (i = queuelen - 1; i > max_buffer_usage; i--) { for (i = queuelen - 1; i > max_buffer_usage; i--) {
@ -920,7 +966,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
{ {
int result; int result;
fd_set testreadfds, testwritefds; fd_set testreadfds, testwritefds;
GList *clients, *closed = NULL; GList *clients, *next;
gboolean try_again; gboolean try_again;
GstMultiFdSinkClass *fclass; GstMultiFdSinkClass *fclass;
@ -951,29 +997,35 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
GST_WARNING_OBJECT (sink, "select failed: %s", g_strerror (errno)); GST_WARNING_OBJECT (sink, "select failed: %s", g_strerror (errno));
if (errno == EBADF) { if (errno == EBADF) {
/* ok, so one of the fds is invalid. We loop over them to find one /* ok, so one of the fds is invalid. We loop over them to find one
* that gives an error to the F_GETFL fcntl. * that gives an error to the F_GETFL fcntl. */
*/
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
for (clients = sink->clients; clients; clients = g_list_next (clients)) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
int fd; int fd;
long flags; long flags;
int res; int res;
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients);
fd = client->fd; fd = client->fd;
res = fcntl (fd, F_GETFL, &flags); res = fcntl (fd, F_GETFL, &flags);
if (res == -1) { if (res == -1) {
GST_WARNING_OBJECT (sink, "fnctl failed for %d, marking as bad: %s", GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s",
fd, g_strerror (errno)); fd, g_strerror (errno));
if (errno == EBADF) { if (errno == EBADF) {
client->bad = TRUE; client->status = GST_CLIENT_STATUS_ERROR;
gst_multifdsink_remove_client_link (sink, clients);
} }
} }
} }
g_mutex_unlock (sink->clientslock); g_mutex_unlock (sink->clientslock);
/* after this, go back in the select loop as the read/writefds
* are not valid */
try_again = TRUE;
} else if (errno == EINTR) { } else if (errno == EINTR) {
/* interrupted system call, just redo the select */
try_again = TRUE; try_again = TRUE;
} else { } else {
GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
@ -1025,13 +1077,15 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
/* Check the reads */ /* Check the reads */
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
for (clients = sink->clients; clients; clients = g_list_next (clients)) { for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client; GstTCPClient *client;
int fd; int fd;
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
if (client->bad) { next = g_list_next (clients);
closed = g_list_prepend (closed, client);
if (client->status != GST_CLIENT_STATUS_OK) {
gst_multifdsink_remove_client_link (sink, clients);
continue; continue;
} }
@ -1040,29 +1094,18 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
if (FD_ISSET (fd, &testreadfds)) { if (FD_ISSET (fd, &testreadfds)) {
/* handle client read */ /* handle client read */
if (!gst_multifdsink_handle_client_read (sink, client)) { if (!gst_multifdsink_handle_client_read (sink, client)) {
closed = g_list_prepend (closed, client); gst_multifdsink_remove_client_link (sink, clients);
continue; continue;
} }
} }
if (FD_ISSET (fd, &testwritefds)) { if (FD_ISSET (fd, &testwritefds)) {
/* handle client write */ /* handle client write */
if (!gst_multifdsink_handle_client_write (sink, client)) { if (!gst_multifdsink_handle_client_write (sink, client)) {
closed = g_list_prepend (closed, client); gst_multifdsink_remove_client_link (sink, clients);
continue; continue;
} }
} }
} }
/* remove crappy clients */
for (clients = closed; clients; clients = g_list_next (clients)) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close",
client, client->fd);
gst_multifdsink_client_remove (sink, client);
}
g_list_free (closed);
g_mutex_unlock (sink->clientslock); g_mutex_unlock (sink->clientslock);
} }
@ -1104,7 +1147,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
GST_DEBUG_OBJECT (sink, GST_DEBUG_OBJECT (sink,
"appending IN_CAPS buffer with length %d to streamheader", "appending IN_CAPS buffer with length %d to streamheader",
GST_BUFFER_SIZE (buf)); GST_BUFFER_SIZE (buf));
sink->streamheader = g_list_append (sink->streamheader, buf); sink->streamheader = g_slist_append (sink->streamheader, buf);
return; return;
} }
@ -1239,12 +1282,12 @@ gst_multifdsink_close (GstMultiFdSink * this)
close (WRITE_SOCKET (this)); close (WRITE_SOCKET (this));
if (this->streamheader) { if (this->streamheader) {
GList *l; GSList *l;
for (l = this->streamheader; l; l = l->next) { for (l = this->streamheader; l; l = l->next) {
gst_buffer_unref (l->data); gst_buffer_unref (l->data);
} }
g_list_free (this->streamheader); g_slist_free (this->streamheader);
} }
if (fclass->close) if (fclass->close)

View file

@ -75,15 +75,24 @@ typedef enum
GST_RECOVER_POLICY_RESYNC_KEYFRAME, GST_RECOVER_POLICY_RESYNC_KEYFRAME,
} GstRecoverPolicy; } GstRecoverPolicy;
typedef enum
{
GST_CLIENT_STATUS_OK,
GST_CLIENT_STATUS_CLOSED,
GST_CLIENT_STATUS_REMOVED,
GST_CLIENT_STATUS_SLOW,
GST_CLIENT_STATUS_ERROR,
} GstClientStatus;
/* structure for a client /* structure for a client
* */ * */
typedef struct { typedef struct {
int fd; int fd;
gint bufpos; /* position of this client in the global queue */ gint bufpos; /* position of this client in the global queue */
gboolean bad; GstClientStatus status;
GList *sending; /* the buffers we need to send */ GSList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */ gint bufoffset; /* offset in the first buffer */
gboolean discont; gboolean discont;
@ -120,7 +129,7 @@ struct _GstMultiFdSink {
int control_sock[2]; /* sockets for controlling the select call */ int control_sock[2]; /* sockets for controlling the select call */
GList *streamheader; /* GList of GstBuffers to use as streamheader */ GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
GstTCPProtocolType protocol; GstTCPProtocolType protocol;
guint mtu; guint mtu;