From 83113506938e5c8ae2e5b458b62dabb1ec5c0711 Mon Sep 17 00:00:00 2001 From: Peter Kjellerstedt Date: Wed, 27 Feb 2008 18:27:59 +0000 Subject: [PATCH] libs/gst/net/: Massive code removal and cleanups because of GstPoll. Original commit message from CVS: Patch by: Peter Kjellerstedt * libs/gst/net/gstnetclientclock.c: (gst_net_client_clock_init), (gst_net_client_clock_finalize), (gst_net_client_clock_do_select), (gst_net_client_clock_thread), (gst_net_client_clock_start), (gst_net_client_clock_stop), (gst_net_client_clock_new): * libs/gst/net/gstnetclientclock.h: * libs/gst/net/gstnettimeprovider.c: (gst_net_time_provider_init), (gst_net_time_provider_finalize), (gst_net_time_provider_thread), (gst_net_time_provider_start), (gst_net_time_provider_stop), (gst_net_time_provider_new): * libs/gst/net/gstnettimeprovider.h: Massive code removal and cleanups because of GstPoll. Fixes #505417. --- ChangeLog | 17 ++++ libs/gst/net/gstnetclientclock.c | 162 ++++++++---------------------- libs/gst/net/gstnetclientclock.h | 11 +- libs/gst/net/gstnettimeprovider.c | 143 +++++++------------------- libs/gst/net/gstnettimeprovider.h | 12 ++- 5 files changed, 111 insertions(+), 234 deletions(-) diff --git a/ChangeLog b/ChangeLog index f2f22881db..a2c606451d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,20 @@ +2008-02-27 Wim Taymans + + Patch by: Peter Kjellerstedt + + * libs/gst/net/gstnetclientclock.c: (gst_net_client_clock_init), + (gst_net_client_clock_finalize), (gst_net_client_clock_do_select), + (gst_net_client_clock_thread), (gst_net_client_clock_start), + (gst_net_client_clock_stop), (gst_net_client_clock_new): + * libs/gst/net/gstnetclientclock.h: + * libs/gst/net/gstnettimeprovider.c: (gst_net_time_provider_init), + (gst_net_time_provider_finalize), (gst_net_time_provider_thread), + (gst_net_time_provider_start), (gst_net_time_provider_stop), + (gst_net_time_provider_new): + * libs/gst/net/gstnettimeprovider.h: + Massive code removal and cleanups because of GstPoll. + Fixes #505417. + 2008-02-27 Wim Taymans * configure.ac: diff --git a/libs/gst/net/gstnetclientclock.c b/libs/gst/net/gstnetclientclock.c index 687dc48fb2..bd30deba3b 100644 --- a/libs/gst/net/gstnetclientclock.c +++ b/libs/gst/net/gstnetclientclock.c @@ -64,25 +64,6 @@ GST_DEBUG_CATEGORY_STATIC (ncc_debug); #define GST_CAT_DEFAULT (ncc_debug) -/* the select call is also performed on the control sockets, that way - * we can send special commands to unblock or restart the select call */ -#define CONTROL_RESTART 'R' /* restart the select call */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(self) self->control_sock -#define WRITE_SOCKET(self) self->control_sock[1] -#define READ_SOCKET(self) self->control_sock[0] - -#define SEND_COMMAND(self, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (WRITE_SOCKET(self), &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(self, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(self), &command, 1); \ -} G_STMT_END - #define DEFAULT_ADDRESS "127.0.0.1" #define DEFAULT_PORT 5637 #define DEFAULT_TIMEOUT GST_SECOND @@ -177,13 +158,10 @@ gst_net_client_clock_init (GstNetClientClock * self, clock->timeout = DEFAULT_TIMEOUT; - self->sock = -1; + self->sock.fd = -1; self->thread = NULL; self->servaddr = NULL; - - READ_SOCKET (self) = -1; - WRITE_SOCKET (self) = -1; } static void @@ -196,11 +174,9 @@ gst_net_client_clock_finalize (GObject * object) g_assert (self->thread == NULL); } - if (READ_SOCKET (self) != -1) { - close (READ_SOCKET (self)); - close (WRITE_SOCKET (self)); - READ_SOCKET (self) = -1; - WRITE_SOCKET (self) = -1; + if (self->fdset) { + gst_poll_free (self->fdset); + self->fdset = NULL; } g_free (self->address); @@ -298,46 +274,26 @@ bogus_observation: } static gint -gst_net_client_clock_do_select (GstNetClientClock * self, fd_set * readfds) +gst_net_client_clock_do_select (GstNetClientClock * self) { - gint max_sock; - gint ret; - while (TRUE) { - FD_ZERO (readfds); - FD_SET (self->sock, readfds); - FD_SET (READ_SOCKET (self), readfds); - max_sock = MAX (self->sock, READ_SOCKET (self)); + GstClockTime diff; + gint ret; GST_LOG_OBJECT (self, "doing select"); - { - GstClockTime diff; - GTimeVal tv, *ptv = &tv; - diff = gst_clock_get_internal_time (GST_CLOCK (self)); - GST_TIME_TO_TIMEVAL (self->current_timeout, tv); + diff = gst_clock_get_internal_time (GST_CLOCK (self)); + ret = gst_poll_wait (self->fdset, self->current_timeout); + diff = gst_clock_get_internal_time (GST_CLOCK (self)) - diff; -#ifdef G_OS_WIN32 - if (((max_sock + 1) != READ_SOCKET (self)) || - ((max_sock + 1) != WRITE_SOCKET (self))) { - ret = - select (max_sock + 1, readfds, NULL, NULL, (struct timeval *) ptv); - } else { - ret = 1; - } -#else - ret = select (max_sock + 1, readfds, NULL, NULL, (struct timeval *) ptv); -#endif - diff = gst_clock_get_internal_time (GST_CLOCK (self)) - diff; + if (diff > self->current_timeout) + self->current_timeout = 0; + else + self->current_timeout -= diff; - if (diff > self->current_timeout) - self->current_timeout = 0; - else - self->current_timeout -= diff; - } GST_LOG_OBJECT (self, "select returned %d", ret); - if (ret < 0) { + if (ret < 0 && errno != EBUSY) { if (errno != EAGAIN && errno != EINTR) goto select_error; else @@ -367,42 +323,16 @@ gst_net_client_clock_thread (gpointer data) GstNetClientClock *self = data; struct sockaddr_in tmpaddr; socklen_t len; - fd_set read_fds; GstNetTimePacket *packet; gint ret; GstClock *clock = data; while (TRUE) { - ret = gst_net_client_clock_do_select (self, &read_fds); + ret = gst_net_client_clock_do_select (self); - if (FD_ISSET (READ_SOCKET (self), &read_fds)) { - /* got control message */ - while (TRUE) { - gchar command; - int res; - - READ_COMMAND (self, command, res); - if (res <= 0) { - GST_LOG_OBJECT (self, "no more commands"); - break; - } - - GST_LOG_OBJECT (self, "control message: '%c'", command); - switch (command) { - case CONTROL_STOP: - /* break out of the select loop */ - GST_LOG_OBJECT (self, "stop"); - goto stopped; - default: - GST_WARNING_OBJECT (self, "unknown message: '%c'", command); - g_warning ("netclientclock: unknown control message received"); - continue; - } - - g_assert_not_reached (); - } - - continue; + if (ret < 0 && errno == EBUSY) { + GST_LOG_OBJECT (self, "stop"); + goto stopped; } else if (ret == 0) { /* timed out, let's send another packet */ GST_DEBUG_OBJECT (self, "timed out"); @@ -413,7 +343,7 @@ gst_net_client_clock_thread (gpointer data) GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT, GST_TIME_ARGS (packet->local_time)); - gst_net_time_packet_send (packet, self->sock, + gst_net_time_packet_send (packet, self->sock.fd, (struct sockaddr *) self->servaddr, sizeof (struct sockaddr_in)); g_free (packet); @@ -421,12 +351,12 @@ gst_net_client_clock_thread (gpointer data) /* reset timeout */ self->current_timeout = clock->timeout; continue; - } else if (FD_ISSET (self->sock, &read_fds)) { + } else if (gst_poll_fd_can_read (self->fdset, &self->sock)) { /* got data in */ GstClockTime new_local = gst_clock_get_internal_time (GST_CLOCK (self)); len = sizeof (struct sockaddr); - packet = gst_net_time_packet_receive (self->sock, + packet = gst_net_time_packet_receive (self->sock.fd, (struct sockaddr *) &tmpaddr, &len); if (!packet) @@ -488,10 +418,10 @@ gst_net_client_clock_start (GstNetClientClock * self) if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0) goto no_socket; - self->sock = ret; + self->sock.fd = ret; len = sizeof (myaddr); - ret = getsockname (self->sock, (struct sockaddr *) &myaddr, &len); + ret = getsockname (self->sock.fd, (struct sockaddr *) &myaddr, &len); if (ret < 0) goto getsockname_error; @@ -511,6 +441,9 @@ gst_net_client_clock_start (GstNetClientClock * self) GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address, self->port); + gst_poll_add_fd (self->fdset, &self->sock); + gst_poll_fd_ctl_read (self->fdset, &self->sock, TRUE); + self->thread = g_thread_create (gst_net_client_clock_thread, self, TRUE, &error); if (!self->thread) @@ -529,23 +462,24 @@ getsockname_error: { GST_ERROR_OBJECT (self, "getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno); - close (self->sock); - self->sock = -1; + close (self->sock.fd); + self->sock.fd = -1; return FALSE; } bad_address: { GST_ERROR_OBJECT (self, "inet_aton failed %d: %s (%d)", ret, g_strerror (errno), errno); - close (self->sock); - self->sock = -1; + close (self->sock.fd); + self->sock.fd = -1; return FALSE; } no_thread: { GST_ERROR_OBJECT (self, "could not create thread: %s", error->message); - close (self->sock); - self->sock = -1; + gst_poll_remove_fd (self->fdset, &self->sock); + close (self->sock.fd); + self->sock.fd = -1; g_free (self->servaddr); self->servaddr = NULL; g_error_free (error); @@ -556,13 +490,14 @@ no_thread: static void gst_net_client_clock_stop (GstNetClientClock * self) { - SEND_COMMAND (self, CONTROL_STOP); + gst_poll_set_flushing (self->fdset, TRUE); g_thread_join (self->thread); self->thread = NULL; - if (self->sock != -1) { - close (self->sock); - self->sock = -1; + if (self->sock.fd != -1) { + gst_poll_remove_fd (self->fdset, &self->sock); + close (self->sock.fd); + self->sock.fd = -1; } } @@ -586,7 +521,6 @@ gst_net_client_clock_new (gchar * name, const gchar * remote_address, { GstNetClientClock *ret; GstClockTime internal; - gint iret; g_return_val_if_fail (remote_address != NULL, NULL); g_return_val_if_fail (remote_port > 0, NULL); @@ -614,18 +548,8 @@ gst_net_client_clock_new (gchar * name, const gchar * remote_address, g_warning ("unable to set the base time, expect sync problems!"); } -#ifdef G_OS_WIN32 - GST_DEBUG_OBJECT (ret, "creating pipe"); - if ((iret = _pipe (CONTROL_SOCKETS (ret), 4096, _O_BINARY)) < 0) - goto no_socket_pair; -#else - GST_DEBUG_OBJECT (ret, "creating socket pair"); - if ((iret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (ret))) < 0) - goto no_socket_pair; - - fcntl (READ_SOCKET (ret), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (ret), F_SETFL, O_NONBLOCK); -#endif + if ((ret->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL) + goto no_fdset; if (!gst_net_client_clock_start (ret)) goto failed_start; @@ -633,9 +557,9 @@ gst_net_client_clock_new (gchar * name, const gchar * remote_address, /* all systems go, cap'n */ return (GstClock *) ret; -no_socket_pair: +no_fdset: { - GST_ERROR_OBJECT (ret, "no socket pair %d: %s (%d)", iret, + GST_ERROR_OBJECT (ret, "could not create an fdset: %s (%d)", g_strerror (errno), errno); gst_object_unref (ret); return NULL; diff --git a/libs/gst/net/gstnetclientclock.h b/libs/gst/net/gstnetclientclock.h index 4d46815c16..ce3f50412f 100644 --- a/libs/gst/net/gstnetclientclock.h +++ b/libs/gst/net/gstnetclientclock.h @@ -72,10 +72,13 @@ struct _GstNetClientClock { /*< protected >*/ gchar *address; gint port; - + /*< private >*/ - int sock; - int control_sock[2]; + /* the size of _gst_reserved1 and sock must equal three ints since this used + * to be int sock and int control_sock[2] */ + int _gst_reserved1; + GstPollFD sock; + GstPoll *fdset; GstClockTime current_timeout; @@ -84,7 +87,7 @@ struct _GstNetClientClock { GThread *thread; /*< private >*/ - gpointer _gst_reserved[GST_PADDING]; + gpointer _gst_reserved[GST_PADDING - 1]; }; struct _GstNetClientClockClass { diff --git a/libs/gst/net/gstnettimeprovider.c b/libs/gst/net/gstnettimeprovider.c index 36e04d3721..1d8afd8902 100644 --- a/libs/gst/net/gstnettimeprovider.c +++ b/libs/gst/net/gstnettimeprovider.c @@ -63,29 +63,10 @@ GST_DEBUG_CATEGORY_STATIC (ntp_debug); #define GST_CAT_DEFAULT (ntp_debug) -/* the select call is also performed on the control sockets, that way - * we can send special commands to unblock or restart the select call */ -#define CONTROL_RESTART 'R' /* restart the select call */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(self) self->control_sock -#define WRITE_SOCKET(self) self->control_sock[1] -#define READ_SOCKET(self) self->control_sock[0] - #ifdef G_OS_WIN32 #define close(sock) closesocket(sock) #endif -#define SEND_COMMAND(self, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (WRITE_SOCKET(self), &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(self, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(self), &command, 1); \ -} G_STMT_END - #define DEFAULT_ADDRESS "0.0.0.0" #define DEFAULT_PORT 5637 @@ -187,13 +168,10 @@ gst_net_time_provider_init (GstNetTimeProvider * self, #endif self->port = DEFAULT_PORT; - self->sock = -1; + self->sock.fd = -1; self->address = g_strdup (DEFAULT_ADDRESS); self->thread = NULL; self->active.active = TRUE; - - READ_SOCKET (self) = -1; - WRITE_SOCKET (self) = -1; } static void @@ -206,11 +184,9 @@ gst_net_time_provider_finalize (GObject * object) g_assert (self->thread == NULL); } - if (READ_SOCKET (self) != -1) { - close (READ_SOCKET (self)); - close (WRITE_SOCKET (self)); - READ_SOCKET (self) = -1; - WRITE_SOCKET (self) = -1; + if (self->fdset) { + gst_poll_free (self->fdset); + self->fdset = NULL; } g_free (self->address); @@ -233,67 +209,27 @@ gst_net_time_provider_thread (gpointer data) GstNetTimeProvider *self = data; struct sockaddr_in tmpaddr; socklen_t len; - fd_set read_fds; - guint max_sock; GstNetTimePacket *packet; gint ret; while (TRUE) { - FD_ZERO (&read_fds); - FD_SET (self->sock, &read_fds); - FD_SET (READ_SOCKET (self), &read_fds); - max_sock = MAX (self->sock, READ_SOCKET (self)); - GST_LOG_OBJECT (self, "doing select"); -#ifdef G_OS_WIN32 - if (((max_sock + 1) != READ_SOCKET (self)) || - ((max_sock + 1) != WRITE_SOCKET (self))) { - ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL); - } else { - ret = 1; - } -#else - ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL); -#endif + ret = gst_poll_wait (self->fdset, GST_CLOCK_TIME_NONE); GST_LOG_OBJECT (self, "select returned %d", ret); if (ret <= 0) { - if (errno != EAGAIN && errno != EINTR) + if (errno == EBUSY) { + GST_LOG_OBJECT (self, "stop"); + goto stopped; + } else if (errno != EAGAIN && errno != EINTR) goto select_error; else continue; - } else if (FD_ISSET (READ_SOCKET (self), &read_fds)) { - /* got control message */ - while (TRUE) { - gchar command; - int res; - - READ_COMMAND (self, command, res); - if (res <= 0) { - GST_LOG_OBJECT (self, "no more commands"); - break; - } - - switch (command) { - case CONTROL_STOP: - /* break out of the select loop */ - GST_LOG_OBJECT (self, "stop"); - goto stopped; - default: - GST_WARNING_OBJECT (self, "unkown"); - g_warning ("nettimeprovider: unknown control message received"); - continue; - } - - g_assert_not_reached (); - } - - continue; } else { /* got data in */ len = sizeof (struct sockaddr); - packet = gst_net_time_packet_receive (self->sock, + packet = gst_net_time_packet_receive (self->sock.fd, (struct sockaddr *) &tmpaddr, &len); if (!packet) @@ -304,7 +240,7 @@ gst_net_time_provider_thread (gpointer data) packet->remote_time = gst_clock_get_time (self->clock); /* ignore errors */ - gst_net_time_packet_send (packet, self->sock, + gst_net_time_packet_send (packet, self->sock.fd, (struct sockaddr *) &tmpaddr, len); } @@ -412,10 +348,10 @@ gst_net_time_provider_start (GstNetTimeProvider * self) if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0) goto no_socket; - self->sock = ret; + self->sock.fd = ret; ru = 1; - ret = setsockopt (self->sock, SOL_SOCKET, SO_REUSEADDR, &ru, sizeof (ru)); + ret = setsockopt (self->sock.fd, SOL_SOCKET, SO_REUSEADDR, &ru, sizeof (ru)); if (ret < 0) goto setsockopt_error; @@ -427,12 +363,12 @@ gst_net_time_provider_start (GstNetTimeProvider * self) inet_aton (self->address, &my_addr.sin_addr); GST_DEBUG_OBJECT (self, "binding on port %d", self->port); - ret = bind (self->sock, (struct sockaddr *) &my_addr, sizeof (my_addr)); + ret = bind (self->sock.fd, (struct sockaddr *) &my_addr, sizeof (my_addr)); if (ret < 0) goto bind_error; len = sizeof (my_addr); - ret = getsockname (self->sock, (struct sockaddr *) &my_addr, &len); + ret = getsockname (self->sock.fd, (struct sockaddr *) &my_addr, &len); if (ret < 0) goto getsockname_error; @@ -445,6 +381,9 @@ gst_net_time_provider_start (GstNetTimeProvider * self) g_object_notify (G_OBJECT (self), "port"); } + gst_poll_add_fd (self->fdset, &self->sock); + gst_poll_fd_ctl_read (self->fdset, &self->sock, TRUE); + self->thread = g_thread_create (gst_net_time_provider_thread, self, TRUE, &error); if (!self->thread) @@ -461,32 +400,33 @@ no_socket: } setsockopt_error: { - close (self->sock); - self->sock = -1; + close (self->sock.fd); + self->sock.fd = -1; GST_ERROR_OBJECT (self, "setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno); return FALSE; } bind_error: { - close (self->sock); - self->sock = -1; + close (self->sock.fd); + self->sock.fd = -1; GST_ERROR_OBJECT (self, "bind failed %d: %s (%d)", ret, g_strerror (errno), errno); return FALSE; } getsockname_error: { - close (self->sock); - self->sock = -1; + close (self->sock.fd); + self->sock.fd = -1; GST_ERROR_OBJECT (self, "getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno); return FALSE; } no_thread: { - close (self->sock); - self->sock = -1; + gst_poll_remove_fd (self->fdset, &self->sock); + close (self->sock.fd); + self->sock.fd = -1; GST_ERROR_OBJECT (self, "could not create thread: %s", error->message); g_error_free (error); return FALSE; @@ -496,13 +436,14 @@ no_thread: static void gst_net_time_provider_stop (GstNetTimeProvider * self) { - SEND_COMMAND (self, CONTROL_STOP); + gst_poll_set_flushing (self->fdset, TRUE); g_thread_join (self->thread); self->thread = NULL; - if (self->sock != -1) { - close (self->sock); - self->sock = -1; + if (self->sock.fd != -1) { + gst_poll_remove_fd (self->fdset, &self->sock); + close (self->sock.fd); + self->sock.fd = -1; } } @@ -521,7 +462,6 @@ GstNetTimeProvider * gst_net_time_provider_new (GstClock * clock, const gchar * address, gint port) { GstNetTimeProvider *ret; - gint iret; g_return_val_if_fail (clock && GST_IS_CLOCK (clock), NULL); g_return_val_if_fail (port >= 0 && port <= G_MAXUINT16, NULL); @@ -529,18 +469,8 @@ gst_net_time_provider_new (GstClock * clock, const gchar * address, gint port) ret = g_object_new (GST_TYPE_NET_TIME_PROVIDER, "clock", clock, "address", address, "port", port, NULL); -#ifdef G_OS_WIN32 - GST_DEBUG_OBJECT (ret, "creating pipe"); - if ((iret = _pipe (CONTROL_SOCKETS (ret), 4096, _O_BINARY)) < 0) - goto no_socket_pair; -#else - GST_DEBUG_OBJECT (ret, "creating socket pair"); - if ((iret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (ret))) < 0) - goto no_socket_pair; - - fcntl (READ_SOCKET (ret), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (ret), F_SETFL, O_NONBLOCK); -#endif + if ((ret->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL) + goto no_fdset; if (!gst_net_time_provider_start (ret)) goto failed_start; @@ -548,9 +478,9 @@ gst_net_time_provider_new (GstClock * clock, const gchar * address, gint port) /* all systems go, cap'n */ return ret; -no_socket_pair: +no_fdset: { - GST_ERROR_OBJECT (ret, "no socket pair %d: %s (%d)", iret, + GST_ERROR_OBJECT (ret, "could not create an fdset: %s (%d)", g_strerror (errno), errno); gst_object_unref (ret); return NULL; @@ -561,5 +491,4 @@ failed_start: gst_object_unref (ret); return NULL; } - } diff --git a/libs/gst/net/gstnettimeprovider.h b/libs/gst/net/gstnettimeprovider.h index a1b4ae2aa0..11ca45e80f 100644 --- a/libs/gst/net/gstnettimeprovider.h +++ b/libs/gst/net/gstnettimeprovider.h @@ -71,8 +71,12 @@ struct _GstNetTimeProvider { gchar *address; int port; - int sock; - int control_sock[2]; + /* the size of _gst_reserved2 and sock must equal three ints since this used + * to be int sock and int control_sock[2]. This relies on the fact that + * GstPollFD happens to be two ints */ + int _gst_reserved2; + GstPollFD sock; + GstPoll *fdset; GThread *thread; @@ -83,9 +87,9 @@ struct _GstNetTimeProvider { /* has to be a gint, we use atomic ops here */ gint active; } active; - + /*< private >*/ - gpointer _gst_reserved[GST_PADDING - 1]; + gpointer _gst_reserved[GST_PADDING - 2]; }; struct _GstNetTimeProviderClass {