From 870c00c76a1fcbc65423280abe440678ba8ce5a0 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 11 Aug 2004 15:58:48 +0000 Subject: [PATCH] gst/tcp/: Abstracted away the select call, implemented poll (yes we ran into the 1024 limit in production). Original commit message from CVS: * gst/tcp/Makefile.am: * gst/tcp/gstfdset.c: (gst_fdset_mode_get_type), (nearest_pow), (ensure_size), (gst_fdset_new), (gst_fdset_free), (gst_fdset_set_mode), (gst_fdset_get_mode), (gst_fdset_add_fd), (gst_fdset_remove_fd), (gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read), (gst_fdset_fd_has_closed), (gst_fdset_fd_has_error), (gst_fdset_fd_can_read), (gst_fdset_fd_can_write), (gst_fdset_wait): * gst/tcp/gstfdset.h: * gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type), (gst_multifdsink_class_init), (gst_multifdsink_init), (gst_multifdsink_add), (gst_multifdsink_remove), (gst_multifdsink_clear), (gst_multifdsink_get_stats), (gst_multifdsink_remove_client_link), (gst_multifdsink_handle_client_read), (gst_multifdsink_client_queue_data), (gst_multifdsink_client_queue_caps), (gst_multifdsink_client_queue_buffer), (gst_multifdsink_handle_client_write), (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), (gst_multifdsink_set_property), (gst_multifdsink_get_property), (gst_multifdsink_init_send), (gst_multifdsink_close): * gst/tcp/gstmultifdsink.h: * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init), (gst_tcpserversink_init), (gst_tcpserversink_handle_server_read), (gst_tcpserversink_handle_wait), (gst_tcpserversink_init_send), (gst_tcpserversink_close): * gst/tcp/gsttcpserversink.h: Abstracted away the select call, implemented poll (yes we ran into the 1024 limit in production). --- ChangeLog | 34 ++++ gst/tcp/Makefile.am | 2 + gst/tcp/gstfdset.c | 368 +++++++++++++++++++++++++++++++++++++ gst/tcp/gstfdset.h | 69 +++++++ gst/tcp/gstmultifdsink.c | 186 ++++++++++--------- gst/tcp/gstmultifdsink.h | 28 +-- gst/tcp/gsttcpserversink.c | 46 ++--- gst/tcp/gsttcpserversink.h | 2 +- 8 files changed, 611 insertions(+), 124 deletions(-) create mode 100644 gst/tcp/gstfdset.c create mode 100644 gst/tcp/gstfdset.h diff --git a/ChangeLog b/ChangeLog index ed9aa4ff70..391506a200 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,37 @@ +2004-08-11 Wim Taymans + + * gst/tcp/Makefile.am: + * gst/tcp/gstfdset.c: (gst_fdset_mode_get_type), (nearest_pow), + (ensure_size), (gst_fdset_new), (gst_fdset_free), + (gst_fdset_set_mode), (gst_fdset_get_mode), (gst_fdset_add_fd), + (gst_fdset_remove_fd), (gst_fdset_fd_ctl_write), + (gst_fdset_fd_ctl_read), (gst_fdset_fd_has_closed), + (gst_fdset_fd_has_error), (gst_fdset_fd_can_read), + (gst_fdset_fd_can_write), (gst_fdset_wait): + * gst/tcp/gstfdset.h: + * gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type), + (gst_multifdsink_class_init), (gst_multifdsink_init), + (gst_multifdsink_add), (gst_multifdsink_remove), + (gst_multifdsink_clear), (gst_multifdsink_get_stats), + (gst_multifdsink_remove_client_link), + (gst_multifdsink_handle_client_read), + (gst_multifdsink_client_queue_data), + (gst_multifdsink_client_queue_caps), + (gst_multifdsink_client_queue_buffer), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), + (gst_multifdsink_handle_clients), (gst_multifdsink_set_property), + (gst_multifdsink_get_property), (gst_multifdsink_init_send), + (gst_multifdsink_close): + * gst/tcp/gstmultifdsink.h: + * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init), + (gst_tcpserversink_init), (gst_tcpserversink_handle_server_read), + (gst_tcpserversink_handle_wait), (gst_tcpserversink_init_send), + (gst_tcpserversink_close): + * gst/tcp/gsttcpserversink.h: + Abstracted away the select call, implemented poll (yes we ran into + the 1024 limit in production). + 2004-08-11 Thomas Vander Stichele * gst/tcp/gsttcp.c: diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index dad81b878b..82ccdc2d48 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -16,6 +16,7 @@ libgsttcp_la_SOURCES = \ gsttcpplugin.c \ gsttcpsrc.c gsttcpsink.c \ gsttcp.c \ + gstfdset.c \ gstmultifdsink.c \ gsttcpclientsrc.c gsttcpclientsink.c \ gsttcpserversrc.c gsttcpserversink.c @@ -32,6 +33,7 @@ noinst_HEADERS = \ gsttcpplugin.h \ gsttcpsrc.h gsttcpsink.h \ gsttcp.h \ + gstfdset.h \ gstmultifdsink.h \ gsttcpclientsrc.h gsttcpclientsink.h \ gsttcpserversrc.h gsttcpserversink.h diff --git a/gst/tcp/gstfdset.c b/gst/tcp/gstfdset.c new file mode 100644 index 0000000000..fddfe4e342 --- /dev/null +++ b/gst/tcp/gstfdset.c @@ -0,0 +1,368 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * Copyright (C) <2004> Wim Taymans + * + * gsttcpfdset.h: fdset datastructure + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#define MIN_POLLFDS 64 +#define INIT_POLLFDS MIN_POLLFDS + +#include + +#include "gstfdset.h" + +GType +gst_fdset_mode_get_type (void) +{ + static GType fdset_mode_type = 0; + static GEnumValue fdset_mode[] = { + {GST_FDSET_MODE_SELECT, "GST_FDSET_MODE_SELECT", "Select"}, + {GST_FDSET_MODE_POLL, "GST_FDSET_MODE_POLL", "Poll"}, + {GST_FDSET_MODE_EPOLL, "GST_FDSET_MODE_EPOLL", "EPoll"}, + {0, NULL, NULL}, + }; + + if (!fdset_mode_type) { + fdset_mode_type = g_enum_register_static ("GstFDSetModeType", fdset_mode); + } + return fdset_mode_type; +} + +struct _GstFDSet +{ + GstFDSetMode mode; + + /* for poll */ + struct pollfd *pollfds; + gint last_pollfds; + gint size; + gint free; + + /* for select */ + fd_set readfds, writefds; /* input */ + fd_set testreadfds, testwritefds; /* output */ +}; + +static gint +nearest_pow (gint num) +{ + gint n = 1; + + while (n < num) + n <<= 1; + + return n; +} + +static void +ensure_size (GstFDSet * set, gint len) +{ + guint need = len * sizeof (struct pollfd); + + if (need > set->size) { + need = nearest_pow (need); + need = MAX (need, MIN_POLLFDS * sizeof (struct pollfd)); + + set->pollfds = g_realloc (set->pollfds, need); + + set->size = need; + } +} + +GstFDSet * +gst_fdset_new (GstFDSetMode mode) +{ + GstFDSet *nset; + + nset = g_new0 (GstFDSet, 1); + nset->mode = mode; + + switch (mode) { + case GST_FDSET_MODE_SELECT: + FD_ZERO (&nset->readfds); + FD_ZERO (&nset->writefds); + break; + case GST_FDSET_MODE_POLL: + nset->pollfds = NULL; + nset->free = 0; + nset->last_pollfds = 0; + ensure_size (nset, MIN_POLLFDS); + break; + case GST_FDSET_MODE_EPOLL: + g_warning ("implement me"); + break; + default: + break; + } + return nset; +} + +void +gst_fdset_free (GstFDSet * set) +{ + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + break; + case GST_FDSET_MODE_POLL: + g_free (set->pollfds); + break; + case GST_FDSET_MODE_EPOLL: + g_warning ("implement me"); + break; + default: + break; + } + g_free (set); +} + + +void +gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode) +{ + g_warning ("implement me"); +} + +GstFDSetMode +gst_fdset_get_mode (GstFDSet * set) +{ + return set->mode; +} + +void +gst_fdset_add_fd (GstFDSet * set, GstFD * fd) +{ + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + /* nothing */ + break; + case GST_FDSET_MODE_POLL: + { + struct pollfd *nfd; + gint idx; + + ensure_size (set, set->last_pollfds + 1); + + idx = set->free; + if (idx == -1) { + /* find free space */ + while (idx < set->last_pollfds) { + idx++; + if (set->pollfds[idx].fd == -1) + break; + } + } + nfd = &set->pollfds[idx]; + + nfd->fd = fd->fd; + nfd->events = 0; + nfd->revents = 0; + + /* see if we have one fd more */ + set->last_pollfds = MAX (idx + 1, set->last_pollfds); + fd->idx = idx; + set->free = -1; + break; + } + case GST_FDSET_MODE_EPOLL: + break; + } +} + +void +gst_fdset_remove_fd (GstFDSet * set, GstFD * fd) +{ + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + /* nothing */ + FD_CLR (fd->fd, &set->writefds); + FD_CLR (fd->fd, &set->readfds); + break; + case GST_FDSET_MODE_POLL: + { + set->pollfds[fd->idx].fd = -1; + set->pollfds[fd->idx].events = 0; + set->pollfds[fd->idx].revents = 0; + + /* if we removed the last fd, we can lower the last_pollfds */ + if (fd->idx + 1 == set->last_pollfds) { + set->last_pollfds--; + } + fd->idx = -1; + + if (set->free == -1) { + set->free = fd->idx; + } else { + set->free = MIN (set->free, fd->idx); + } + break; + } + case GST_FDSET_MODE_EPOLL: + break; + } +} + +void +gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active) +{ + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + if (active) + FD_SET (fd->fd, &set->writefds); + else + FD_CLR (fd->fd, &set->writefds); + break; + case GST_FDSET_MODE_POLL: + { + set->pollfds[fd->idx].events = (active ? POLLOUT : 0); + break; + } + case GST_FDSET_MODE_EPOLL: + break; + } +} + +void +gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active) +{ + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + if (active) + FD_SET (fd->fd, &set->readfds); + else + FD_CLR (fd->fd, &set->readfds); + break; + case GST_FDSET_MODE_POLL: + { + set->pollfds[fd->idx].events = (active ? (POLLIN | POLLPRI) : 0); + break; + } + case GST_FDSET_MODE_EPOLL: + break; + } +} + +gboolean +gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd) +{ + gboolean res = FALSE; + + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + res = FALSE; + break; + case GST_FDSET_MODE_POLL: + res = (set->pollfds[fd->idx].revents & POLLHUP) != 0; + break; + case GST_FDSET_MODE_EPOLL: + break; + } + return res; +} + +gboolean +gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd) +{ + gboolean res = FALSE; + + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + res = FALSE; + break; + case GST_FDSET_MODE_POLL: + res = (set->pollfds[fd->idx].revents & (POLLERR | POLLNVAL)) != 0; + break; + case GST_FDSET_MODE_EPOLL: + break; + } + return res; +} + +gboolean +gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd) +{ + gboolean res = FALSE; + + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + res = FD_ISSET (fd->fd, &set->testreadfds); + break; + case GST_FDSET_MODE_POLL: + res = (set->pollfds[fd->idx].revents & (POLLIN | POLLPRI)) != 0; + break; + case GST_FDSET_MODE_EPOLL: + break; + } + return res; +} + +gboolean +gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd) +{ + gboolean res = FALSE; + + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + res = FD_ISSET (fd->fd, &set->testwritefds); + break; + case GST_FDSET_MODE_POLL: + res = (set->pollfds[fd->idx].revents & POLLOUT) != 0; + break; + case GST_FDSET_MODE_EPOLL: + break; + } + return res; +} + +int +gst_fdset_wait (GstFDSet * set, int timeout) +{ + int res = -1; + + switch (set->mode) { + case GST_FDSET_MODE_SELECT: + { + struct timeval tv; + struct timeval *tvptr = NULL; + + set->testreadfds = set->readfds; + set->testwritefds = set->writefds; + + if (timeout > 0) { + tv.tv_sec = timeout / 1000; + tv.tv_usec = timeout % 1000; + + tvptr = &tv; + } + res = + select (FD_SETSIZE, &set->testreadfds, &set->testwritefds, + (fd_set *) 0, tvptr); + break; + } + case GST_FDSET_MODE_POLL: + /* we do not make a copy here. The polfds could change while + * executing this call but even if this should happen and cause + * problems, we can recover from it */ + res = poll (set->pollfds, set->last_pollfds, timeout); + break; + case GST_FDSET_MODE_EPOLL: + break; + } + + return res; +} diff --git a/gst/tcp/gstfdset.h b/gst/tcp/gstfdset.h new file mode 100644 index 0000000000..c3475a783f --- /dev/null +++ b/gst/tcp/gstfdset.h @@ -0,0 +1,69 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * Copyright (C) <2004> Wim Taymans + * + * gstfdset.h: fdset datastructure + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_FDSET_H__ +#define __GST_FDSET_H__ + +#include + +G_BEGIN_DECLS + +typedef struct _GstFDSet GstFDSet; + +typedef struct { + int fd; + gint idx; + gpointer data; +} GstFD; + +typedef enum { + GST_FDSET_MODE_SELECT, + GST_FDSET_MODE_POLL, + GST_FDSET_MODE_EPOLL +} GstFDSetMode; + +#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type()) +GType gst_fdset_mode_get_type (void); + + +GstFDSet* gst_fdset_new (GstFDSetMode mode); +void gst_fdset_free (GstFDSet *set); + +void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode); +GstFDSetMode gst_fdset_get_mode (GstFDSet *set); + +void gst_fdset_add_fd (GstFDSet *set, GstFD *fd); +void gst_fdset_remove_fd (GstFDSet *set, GstFD *fd); + +void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active); +void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active); + +gboolean gst_fdset_fd_has_closed (GstFDSet *set, GstFD *fd); +gboolean gst_fdset_fd_has_error (GstFDSet *set, GstFD *fd); +gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd); +gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd); + +int gst_fdset_wait (GstFDSet *set, int timeout); + +G_END_DECLS + +#endif /* __GST_FDSET_H__ */ diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 20057ed47b..133296b3bb 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -24,6 +24,10 @@ #include #include +#include +#include +#include +#include #ifdef HAVE_FIONREAD_IN_SYS_FILIO #include @@ -32,6 +36,8 @@ #include "gstmultifdsink.h" #include "gsttcp-marshal.h" +#define NOT_IMPLEMENTED 0 + /* the select call is also performed on the control sockets, that way * we can send special commands to unblock or restart the select call */ #define CONTROL_RESTART 'R' /* restart the select call */ @@ -39,15 +45,16 @@ #define CONTROL_SOCKETS(sink) sink->control_sock #define WRITE_SOCKET(sink) sink->control_sock[1] #define READ_SOCKET(sink) sink->control_sock[0] + #define SEND_COMMAND(sink, command) \ G_STMT_START { \ unsigned char c; c = command; \ - write (WRITE_SOCKET(sink), &c, 1); \ + write (WRITE_SOCKET(sink).fd, &c, 1); \ } G_STMT_END #define READ_COMMAND(sink, command, res) \ G_STMT_START { \ - res = read(READ_SOCKET(sink), &command, 1); \ + res = read(READ_SOCKET(sink).fd, &command, 1); \ } G_STMT_END /* elementfactory information */ @@ -79,6 +86,7 @@ enum /* this is really arbitrary choosen */ #define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE +#define DEFAULT_MODE GST_FDSET_MODE_POLL #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 #define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS @@ -91,6 +99,7 @@ enum { ARG_0, ARG_PROTOCOL, + ARG_MODE, ARG_BUFFERS_QUEUED, ARG_BYTES_QUEUED, ARG_TIME_QUEUED, @@ -132,6 +141,7 @@ gst_recover_policy_get_type (void) return recover_policy_type; } +#if NOT_IMPLEMENTED #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) static GType gst_unit_type_get_type (void) @@ -149,6 +159,7 @@ gst_unit_type_get_type (void) } return unit_type_type; } +#endif #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type()) static GType @@ -241,6 +252,10 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) g_object_class_install_property (gobject_class, ARG_PROTOCOL, g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MODE, + g_param_spec_enum ("mode", "Mode", + "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE, + DEFAULT_MODE, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, g_param_spec_int ("buffers-max", "Buffers max", @@ -251,6 +266,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) "Recover client when going over this limit (-1 = no limit)", -1, G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); +#if NOT_IMPLEMENTED g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", @@ -263,11 +279,13 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) g_param_spec_int ("units-soft-max", "Units soft max", "Recover client when going over this limit (-1 = no limit)", -1, G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE)); +#endif g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED, g_param_spec_uint ("buffers-queued", "Buffers queued", "Number of buffers currently queued", 0, G_MAXUINT, 0, G_PARAM_READABLE)); +#if NOT_IMPLEMENTED g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED, g_param_spec_uint ("bytes-queued", "Bytes queued", "Number of bytes currently queued", 0, G_MAXUINT, 0, @@ -276,6 +294,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) g_param_spec_uint64 ("time-queued", "Time queued", "Number of time currently queued", 0, G_MAXUINT64, 0, G_PARAM_READABLE)); +#endif g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY, g_param_spec_enum ("recover-policy", "Recover Policy", @@ -346,6 +365,7 @@ gst_multifdsink_init (GstMultiFdSink * this) GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN); this->protocol = DEFAULT_PROTOCOL; + this->mode = DEFAULT_MODE; this->clientslock = g_mutex_new (); this->clients = NULL; @@ -359,18 +379,6 @@ gst_multifdsink_init (GstMultiFdSink * this) this->timeout = DEFAULT_TIMEOUT; } -static void -gst_multifdsink_debug_fdset (GstMultiFdSink * sink, fd_set * testfds) -{ - int fd; - - for (fd = 0; fd < FD_SETSIZE; fd++) { - if (FD_ISSET (fd, testfds)) { - GST_LOG_OBJECT (sink, "fd %d", fd); - } - } -} - void gst_multifdsink_add (GstMultiFdSink * sink, int fd) { @@ -381,7 +389,7 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd) /* create client datastructure */ client = g_new0 (GstTCPClient, 1); - client->fd = fd; + client->fd.fd = fd; client->status = GST_CLIENT_STATUS_OK; client->bufpos = -1; client->bufoffset = 0; @@ -404,7 +412,9 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd) /* set the socket to non blocking */ fcntl (fd, F_SETFL, O_NONBLOCK); /* we always read from a client */ - FD_SET (fd, &sink->readfds); + gst_fdset_add_fd (sink->fdset, &client->fd); + gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE); + SEND_COMMAND (sink, CONTROL_RESTART); g_mutex_unlock (sink->clientslock); @@ -428,9 +438,10 @@ gst_multifdsink_remove (GstMultiFdSink * sink, int fd) client = (GstTCPClient *) clients->data; next = g_list_next (clients); - if (client->fd == fd) { + if (client->fd.fd == fd) { client->status = GST_CLIENT_STATUS_REMOVED; gst_multifdsink_remove_client_link (sink, clients); + SEND_COMMAND (sink, CONTROL_RESTART); break; } } @@ -454,6 +465,7 @@ gst_multifdsink_clear (GstMultiFdSink * sink) client->status = GST_CLIENT_STATUS_REMOVED; gst_multifdsink_remove_client_link (sink, clients); } + SEND_COMMAND (sink, CONTROL_RESTART); g_mutex_unlock (sink->clientslock); } @@ -470,7 +482,7 @@ gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd) client = (GstTCPClient *) clients->data; - if (client->fd == fd) { + if (client->fd.fd == fd) { GValue value = { 0 }; guint64 interval; @@ -525,48 +537,42 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link) GTimeVal now; GstTCPClient *client = (GstTCPClient *) link->data; - fd = client->fd; + fd = client->fd.fd; /* FIXME: if we keep track of ip we can log it here and signal */ - GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); - switch (client->status) { case GST_CLIENT_STATUS_OK: GST_WARNING_OBJECT (sink, "removing client %p with fd %d for no reason", - client, client->fd); + client, fd); break; case GST_CLIENT_STATUS_CLOSED: GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close", - client, client->fd); + client, fd); break; case GST_CLIENT_STATUS_REMOVED: GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because the app removed it", client, - client->fd); + fd); break; case GST_CLIENT_STATUS_SLOW: GST_INFO_OBJECT (sink, - "removing client %p with fd %d because it was too slow", client, - client->fd); + "removing client %p with fd %d because it was too slow", client, fd); break; case GST_CLIENT_STATUS_ERROR: GST_WARNING_OBJECT (sink, - "removing client %p with fd %d because of error", client, client->fd); + "removing client %p with fd %d because of error", client, fd); break; default: GST_WARNING_OBJECT (sink, - "removing client %p with fd %d with invalid reason", client, - client->fd); + "removing client %p with fd %d with invalid reason", client, fd); break; } - FD_CLR (fd, &sink->readfds); - FD_CLR (fd, &sink->writefds); + gst_fdset_remove_fd (sink->fdset, &client->fd); if (close (fd) != 0) { /* this is not really an error */ GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); } - SEND_COMMAND (sink, CONTROL_RESTART); g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); @@ -596,7 +602,7 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink, int avail, fd; gboolean ret; - fd = client->fd; + fd = client->fd.fd; if (ioctl (fd, FIONREAD, &avail) < 0) { GST_WARNING_OBJECT (sink, "ioctl failed for fd %d: %s", @@ -665,7 +671,7 @@ gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client, GST_BUFFER_SIZE (buf) = len; GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d", - len, client->fd); + len, client->fd.fd); client->sending = g_slist_append (client->sending, buf); @@ -683,7 +689,7 @@ gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client, string = gst_caps_to_string (caps); GST_DEBUG_OBJECT (sink, "Queueing caps %s for fd %d through GDP", string, - client->fd); + client->fd.fd); g_free (string); if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) { @@ -708,7 +714,7 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink, if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) { GST_DEBUG_OBJECT (sink, - "could not create header, removing client on fd %d", client->fd); + "could not create header, removing client on fd %d", client->fd.fd); return FALSE; } gst_multifdsink_client_queue_data (sink, client, header, len); @@ -750,7 +756,7 @@ static gboolean gst_multifdsink_handle_client_write (GstMultiFdSink * sink, GstTCPClient * client) { - int fd = client->fd; + int fd = client->fd.fd; gboolean more; gboolean res; GstClockTime now; @@ -803,7 +809,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, if (client->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ - FD_CLR (fd, &sink->writefds); + gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE); return TRUE; } else { /* client can pick a buffer from the global queue */ @@ -885,7 +891,7 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) GST_WARNING_OBJECT (sink, "client %p with fd %d is lagging at %d, recover using policy %d", client, - client->fd, client->bufpos, sink->recover_policy); + client->fd.fd, client->bufpos, sink->recover_policy); switch (sink->recover_policy) { case GST_RECOVER_POLICY_NONE: @@ -961,7 +967,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) client->bufpos++; GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", - client, client->fd, client->bufpos); + client, client->fd.fd, client->bufpos); /* check soft max if needed, recover client */ if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) { gint newpos; @@ -971,10 +977,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) client->bufpos = newpos; client->discont = TRUE; GST_WARNING_OBJECT (sink, "client %p with fd %d position reset to %d", - client, client->fd, client->bufpos); + client, client->fd.fd, client->bufpos); } else { GST_WARNING_OBJECT (sink, - "client %p with fd %d not recovering position", client, client->fd); + "client %p with fd %d not recovering position", client, + client->fd.fd); } } /* check hard max and timeout, remove client */ @@ -983,20 +990,18 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) && now - client->last_activity_time > sink->timeout)) { /* remove client */ GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing", - client, client->fd); - FD_CLR (client->fd, &sink->readfds); - FD_CLR (client->fd, &sink->writefds); + client, client->fd.fd); + /* remove the client, the fd set will be cleared and the select thread will + * be signaled */ client->status = GST_CLIENT_STATUS_SLOW; gst_multifdsink_remove_client_link (sink, clients); - /* cannot send data to this client anymore. need to signal the select thread that - * the fd_set changed */ - need_signal = TRUE; /* set client to invalid position while being removed */ client->bufpos = -1; + need_signal = TRUE; } else if (client->bufpos == 0) { /* can send data to this client now. need to signal the select thread that * the fd_set changed */ - FD_SET (client->fd, &sink->writefds); + gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE); need_signal = TRUE; } /* keep track of maximum buffer usage */ @@ -1038,7 +1043,6 @@ static void gst_multifdsink_handle_clients (GstMultiFdSink * sink) { int result; - fd_set testreadfds, testwritefds; GList *clients, *next; gboolean try_again; GstMultiFdSinkClass *fclass; @@ -1054,20 +1058,11 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) * - server socket input (ie, new client connections) * - client socket input (ie, clients saying goodbye) * - client socket output (ie, client reads) */ - testwritefds = sink->writefds; - testreadfds = sink->readfds; - - GST_LOG_OBJECT (sink, "doing select on server + client fds for reads"); - gst_multifdsink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "doing select on client fds for writes"); - gst_multifdsink_debug_fdset (sink, &testwritefds); - - result = - select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, NULL); + result = gst_fdset_wait (sink->fdset, -1); /* < 0 is an error, 0 just means a timeout happened */ if (result < 0) { - GST_WARNING_OBJECT (sink, "select failed: %s", g_strerror (errno)); + GST_WARNING_OBJECT (sink, "wait failed: %s", g_strerror (errno)); if (errno == EBADF) { /* ok, so one of the fds is invalid. We loop over them to find one * that gives an error to the F_GETFL fcntl. */ @@ -1081,7 +1076,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) client = (GstTCPClient *) clients->data; next = g_list_next (clients); - fd = client->fd; + fd = client->fd.fd; res = fcntl (fd, F_GETFL, &flags); if (res == -1) { @@ -1106,34 +1101,35 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) return; } } else { - GST_LOG_OBJECT (sink, "%d sockets had action", result); - GST_LOG_OBJECT (sink, "done select on server/client fds for reads"); - gst_multifdsink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "done select on client fds for writes"); - gst_multifdsink_debug_fdset (sink, &testwritefds); - + GST_INFO_OBJECT (sink, "wait done: %d", result); /* read all commands */ - if (FD_ISSET (READ_SOCKET (sink), &testreadfds)) { + if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) { + GST_INFO_OBJECT (sink, "have a command"); while (TRUE) { gchar command; int res; READ_COMMAND (sink, command, res); if (res < 0) { + GST_INFO_OBJECT (sink, "no more commands"); /* no more commands */ break; } switch (command) { case CONTROL_RESTART: + GST_INFO_OBJECT (sink, "restart"); /* need to restart the select call as the fd_set changed */ try_again = TRUE; break; + /* need to restart the select call as the fd_set changed */ case CONTROL_STOP: + GST_INFO_OBJECT (sink, "stop"); /* stop this function */ stop = TRUE; break; default: + GST_WARNING_OBJECT (sink, "unkown"); g_warning ("multifdsink: unknown control message received"); break; } @@ -1145,14 +1141,13 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) } } while (try_again); - if (fclass->select) - fclass->select (sink, &testreadfds, &testwritefds); + if (fclass->wait) + fclass->wait (sink, sink->fdset); /* Check the reads */ g_mutex_lock (sink->clientslock); for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; - int fd; client = (GstTCPClient *) clients->data; next = g_list_next (clients); @@ -1162,16 +1157,24 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) continue; } - fd = client->fd; - - if (FD_ISSET (fd, &testreadfds)) { + if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) { + client->status = GST_CLIENT_STATUS_CLOSED; + gst_multifdsink_remove_client_link (sink, clients); + continue; + } + if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) { + client->status = GST_CLIENT_STATUS_ERROR; + gst_multifdsink_remove_client_link (sink, clients); + continue; + } + if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) { /* handle client read */ if (!gst_multifdsink_handle_client_read (sink, client)) { gst_multifdsink_remove_client_link (sink, clients); continue; } } - if (FD_ISSET (fd, &testwritefds)) { + if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) { /* handle client write */ if (!gst_multifdsink_handle_client_write (sink, client)) { gst_multifdsink_remove_client_link (sink, clients); @@ -1243,6 +1246,9 @@ gst_multifdsink_set_property (GObject * object, guint prop_id, case ARG_PROTOCOL: multifdsink->protocol = g_value_get_enum (value); break; + case ARG_MODE: + multifdsink->mode = g_value_get_enum (value); + break; case ARG_BUFFERS_MAX: multifdsink->units_max = g_value_get_int (value); break; @@ -1284,6 +1290,9 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value, case ARG_PROTOCOL: g_value_set_enum (value, multifdsink->protocol); break; + case ARG_MODE: + g_value_set_enum (value, multifdsink->mode); + break; case ARG_BUFFERS_MAX: g_value_set_int (value, multifdsink->units_max); break; @@ -1333,20 +1342,26 @@ static gboolean gst_multifdsink_init_send (GstMultiFdSink * this) { GstMultiFdSinkClass *fclass; + int control_socket[2]; fclass = GST_MULTIFDSINK_GET_CLASS (this); - FD_ZERO (&this->readfds); - FD_ZERO (&this->writefds); + GST_INFO_OBJECT (this, "starting in mode %d", this->mode); + this->fdset = gst_fdset_new (this->mode); - if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (this)) < 0) { + if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) { GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL), GST_ERROR_SYSTEM); return FALSE; } - FD_SET (READ_SOCKET (this), &this->readfds); - fcntl (READ_SOCKET (this), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK); + READ_SOCKET (this).fd = control_socket[0]; + WRITE_SOCKET (this).fd = control_socket[1]; + + gst_fdset_add_fd (this->fdset, &READ_SOCKET (this)); + gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE); + + fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK); + fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK); this->streamheader = NULL; this->bytes_to_serve = 0; @@ -1375,8 +1390,9 @@ gst_multifdsink_close (GstMultiFdSink * this) SEND_COMMAND (this, CONTROL_STOP); g_thread_join (this->thread); - close (READ_SOCKET (this)); - close (WRITE_SOCKET (this)); + close (READ_SOCKET (this).fd); + close (WRITE_SOCKET (this).fd); + gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this)); if (this->streamheader) { GSList *l; @@ -1389,6 +1405,8 @@ gst_multifdsink_close (GstMultiFdSink * this) if (fclass->close) fclass->close (this); + + gst_fdset_free (this->fdset); } static GstElementStateReturn diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 03ca73bd91..840ac70e18 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -29,20 +29,8 @@ extern "C" { #endif /* __cplusplus */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include "gsttcp.h" +#include "gstfdset.h" #define GST_TYPE_MULTIFDSINK \ (gst_multifdsink_get_type()) @@ -94,7 +82,8 @@ typedef enum /* structure for a client * */ typedef struct { - int fd; + GstFD fd; + gint bufpos; /* position of this client in the global queue */ GstClientStatus status; @@ -131,10 +120,13 @@ struct _GstMultiFdSink { GMutex *clientslock; /* lock to protect the clients list */ GList *clients; /* list of clients we are serving */ - fd_set readfds; /* all the client file descriptors that we can read from */ - fd_set writefds; /* all the client file descriptors that we can write to */ + GstFDSetMode mode; + GstFDSet *fdset; - int control_sock[2]; /* sockets for controlling the select call */ + //fd_set readfds; /* all the client file descriptors that we can read from */ + //fd_set writefds; /* all the client file descriptors that we can write to */ + + GstFD control_sock[2];/* sockets for controlling the select call */ GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ GstTCPProtocolType protocol; @@ -168,7 +160,7 @@ struct _GstMultiFdSinkClass { /* vtable */ gboolean (*init) (GstMultiFdSink *sink); - gboolean (*select) (GstMultiFdSink *sink, fd_set *readfds, fd_set *writefds); + gboolean (*wait) (GstMultiFdSink *sink, GstFDSet *set); gboolean (*close) (GstMultiFdSink *sink); /* signals */ diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index b4499836cf..d378631a3c 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -57,8 +57,8 @@ static void gst_tcpserversink_base_init (gpointer g_class); static void gst_tcpserversink_class_init (GstTCPServerSink * klass); static void gst_tcpserversink_init (GstTCPServerSink * tcpserversink); -static gboolean gst_tcpserversink_handle_select (GstMultiFdSink * sink, - fd_set * readfds, fd_set * writefds); +static gboolean gst_tcpserversink_handle_wait (GstMultiFdSink * sink, + GstFDSet * set); static gboolean gst_tcpserversink_init_send (GstMultiFdSink * this); static gboolean gst_tcpserversink_close (GstMultiFdSink * this); @@ -129,7 +129,7 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass) gobject_class->get_property = gst_tcpserversink_get_property; gstmultifdsink_class->init = gst_tcpserversink_init_send; - gstmultifdsink_class->select = gst_tcpserversink_handle_select; + gstmultifdsink_class->wait = gst_tcpserversink_handle_wait; gstmultifdsink_class->close = gst_tcpserversink_close; GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink"); @@ -142,7 +142,7 @@ gst_tcpserversink_init (GstTCPServerSink * this) /* should support as minimum 576 for IPV4 and 1500 for IPV6 */ /* this->mtu = 1500; */ - this->server_sock_fd = -1; + this->server_sock.fd = -1; } /* handle a read request on the server, @@ -156,7 +156,7 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) int client_address_len; client_sock_fd = - accept (sink->server_sock_fd, (struct sockaddr *) &client_address, + accept (sink->server_sock.fd, (struct sockaddr *) &client_address, &client_address_len); if (client_sock_fd == -1) { GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL), @@ -173,12 +173,11 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) } static gboolean -gst_tcpserversink_handle_select (GstMultiFdSink * sink, - fd_set * readfds, fd_set * writefds) +gst_tcpserversink_handle_wait (GstMultiFdSink * sink, GstFDSet * set) { GstTCPServerSink *this = GST_TCPSERVERSINK (sink); - if (FD_ISSET (this->server_sock_fd, readfds)) { + if (gst_fdset_fd_can_read (set, &this->server_sock)) { /* handle new client connection on server socket */ if (!gst_tcpserversink_handle_server_read (this)) { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), @@ -245,22 +244,22 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent) GstTCPServerSink *this = GST_TCPSERVERSINK (parent); /* create sending server socket */ - if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { + if ((this->server_sock.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM); return FALSE; } GST_DEBUG_OBJECT (this, "opened sending server socket with fd %d", - this->server_sock_fd); + this->server_sock.fd); /* make address reusable */ - if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret, + if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof (int)) < 0) { GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); return FALSE; } /* keep connection alive; avoids SIGPIPE during write */ - if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_KEEPALIVE, &ret, + if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, &ret, sizeof (int)) < 0) { GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); @@ -275,7 +274,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent) /* bind it */ GST_DEBUG_OBJECT (this, "binding server socket to address"); - ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin, + ret = bind (this->server_sock.fd, (struct sockaddr *) &this->server_sin, sizeof (this->server_sin)); if (ret) { @@ -289,20 +288,23 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent) } /* set the server socket to nonblocking */ - fcntl (this->server_sock_fd, F_SETFL, O_NONBLOCK); + fcntl (this->server_sock.fd, F_SETFL, O_NONBLOCK); GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", - this->server_sock_fd, TCP_BACKLOG); - if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) { + this->server_sock.fd, TCP_BACKLOG); + if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) { GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), ("Could not listen on server socket: %s", g_strerror (errno))); return FALSE; } GST_DEBUG_OBJECT (this, "listened on server socket %d, returning from connection setup", - this->server_sock_fd); + this->server_sock.fd); - FD_SET (this->server_sock_fd, &parent->readfds); + gst_fdset_add_fd (parent->fdset, &this->server_sock); + gst_fdset_fd_ctl_read (parent->fdset, &this->server_sock, TRUE); + + //FD_SET (this->server_sock_fd, &parent->readfds); return TRUE; } @@ -312,9 +314,11 @@ gst_tcpserversink_close (GstMultiFdSink * parent) { GstTCPServerSink *this = GST_TCPSERVERSINK (parent); - if (this->server_sock_fd != -1) { - close (this->server_sock_fd); - this->server_sock_fd = -1; + if (this->server_sock.fd != -1) { + close (this->server_sock.fd); + this->server_sock.fd = -1; + + gst_fdset_remove_fd (parent->fdset, &this->server_sock); } return TRUE; } diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h index 92a1847801..6b4848abec 100644 --- a/gst/tcp/gsttcpserversink.h +++ b/gst/tcp/gsttcpserversink.h @@ -73,7 +73,7 @@ struct _GstTCPServerSink { struct sockaddr_in server_sin; /* socket */ - int server_sock_fd; + GstFD server_sock; }; struct _GstTCPServerSinkClass {