diff --git a/libs/gst/net/gstnetclientclock.c b/libs/gst/net/gstnetclientclock.c index 691c414a8a..de9db2b9e7 100644 --- a/libs/gst/net/gstnetclientclock.c +++ b/libs/gst/net/gstnetclientclock.c @@ -248,70 +248,11 @@ bogus_observation: } } -typedef struct -{ - GSource source; - GstNetClientClock *clock; - gboolean *p_timeout; -} GstNetClientClockTimeoutSource; - -static gboolean -gst_net_client_clock_timeout_source_prepare (GSource * s, gint * p_timeout) -{ - GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s; - GstClockTime expiration_time = source->clock->priv->timeout_expiration; - GstClockTime now = gst_util_get_timestamp (); - - if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) { - *p_timeout = 0; - return TRUE; - } - - *p_timeout = (expiration_time - now) / GST_MSECOND; - GST_TRACE_OBJECT (source->clock, "time out in %d ms please", *p_timeout); - return FALSE; -} - -static gboolean -gst_net_client_clock_timeout_source_check (GSource * s) -{ - GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s; - - return (gst_util_get_timestamp () >= source->clock->priv->timeout_expiration); -} - -static gboolean -gst_net_client_clock_timeout_source_dispatch (GSource * s, GSourceFunc cb, - gpointer data) -{ - GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s; - - GST_TRACE_OBJECT (source->clock, "timed out"); - *source->p_timeout = TRUE; - return TRUE; -} - -static gboolean -gst_net_client_clock_socket_cb (GSocket * socket, GIOCondition condition, - gpointer user_data) -{ - GIOCondition *p_cond = user_data; - - GST_TRACE ("socket %p I/O condition: 0x%02x", socket, condition); - *p_cond = condition; - return TRUE; -} - static gpointer gst_net_client_clock_thread (gpointer data) { GstNetClientClock *self = data; GstNetTimePacket *packet; - GMainContext *ctx; - GSourceFuncs funcs = { NULL, }; - GSource *source; - GIOCondition cond; - gboolean timeout; GSocket *socket = self->priv->socket; GError *err = NULL; GstClock *clock = data; @@ -321,99 +262,82 @@ gst_net_client_clock_thread (gpointer data) g_socket_set_blocking (socket, TRUE); g_socket_set_timeout (socket, 0); - ctx = g_main_context_new (); - - source = g_socket_create_source (socket, G_IO_IN, self->priv->cancel); - g_source_set_name (source, "GStreamer net client clock thread socket"); - g_source_set_callback (source, (GSourceFunc) gst_net_client_clock_socket_cb, - &cond, NULL); - g_source_attach (source, ctx); - g_source_unref (source); - - /* GSocket only support second granularity for timeouts, so roll our own - * timeout source (so we don't have to create a new source whenever the - * timeout changes, as we would have to do with the default timeout source) */ - funcs.prepare = gst_net_client_clock_timeout_source_prepare; - funcs.check = gst_net_client_clock_timeout_source_check; - funcs.dispatch = gst_net_client_clock_timeout_source_dispatch; - funcs.finalize = NULL; - source = g_source_new (&funcs, sizeof (GstNetClientClockTimeoutSource)); - ((GstNetClientClockTimeoutSource *) source)->clock = self; - ((GstNetClientClockTimeoutSource *) source)->p_timeout = &timeout; - g_source_set_name (source, "GStreamer net client clock timeout"); - g_source_attach (source, ctx); - g_source_unref (source); - while (!g_cancellable_is_cancelled (self->priv->cancel)) { - cond = 0; - timeout = FALSE; - g_main_context_iteration (ctx, TRUE); + GstClockTime expiration_time = self->priv->timeout_expiration; + GstClockTime now = gst_util_get_timestamp (); + gint64 socket_timeout; - if (g_cancellable_is_cancelled (self->priv->cancel)) - break; - - if (timeout) { - /* timed out, let's send another packet */ - GST_DEBUG_OBJECT (self, "timed out"); - - packet = gst_net_time_packet_new (NULL); - - packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self)); - - GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT, - GST_TIME_ARGS (packet->local_time)); - - gst_net_time_packet_send (packet, self->priv->socket, - self->priv->servaddr, NULL); - - g_free (packet); - - /* reset timeout (but are expecting a response sooner anyway) */ - self->priv->timeout_expiration = - gst_util_get_timestamp () + gst_clock_get_timeout (clock); - continue; + if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) { + socket_timeout = 0; + } else { + socket_timeout = (expiration_time - now) / GST_USECOND; } - /* got data to read? */ - if ((cond & G_IO_IN)) { + GST_TRACE_OBJECT (self, "time out in %d microsecs please", socket_timeout); + + if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout, + self->priv->cancel, &err)) { + /* cancelled, timeout or error */ + if (err->code == G_IO_ERROR_CANCELLED) { + GST_INFO_OBJECT (self, "cancelled"); + g_clear_error (&err); + break; + } else if (err->code == G_IO_ERROR_TIMED_OUT) { + /* timed out, let's send another packet */ + GST_DEBUG_OBJECT (self, "timed out"); + + packet = gst_net_time_packet_new (NULL); + + packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self)); + + GST_DEBUG_OBJECT (self, + "sending packet, local time = %" GST_TIME_FORMAT, + GST_TIME_ARGS (packet->local_time)); + + gst_net_time_packet_send (packet, self->priv->socket, + self->priv->servaddr, NULL); + + g_free (packet); + + /* reset timeout (but are expecting a response sooner anyway) */ + self->priv->timeout_expiration = + gst_util_get_timestamp () + gst_clock_get_timeout (clock); + } else { + GST_DEBUG_OBJECT (self, "socket error: %s", err->message); + g_usleep (G_USEC_PER_SEC / 10); /* throttle */ + } + g_clear_error (&err); + } else { GstClockTime new_local; + /* got packet */ + new_local = gst_clock_get_internal_time (GST_CLOCK (self)); packet = gst_net_time_packet_receive (socket, NULL, &err); - if (err != NULL) { + if (packet != NULL) { + GST_LOG_OBJECT (self, "got packet back"); + GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT, + GST_TIME_ARGS (packet->local_time)); + GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT, + GST_TIME_ARGS (packet->remote_time)); + GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_local)); + + /* observe_times will reset the timeout */ + gst_net_client_clock_observe_times (self, packet->local_time, + packet->remote_time, new_local); + + g_free (packet); + } else if (err != NULL) { GST_WARNING_OBJECT (self, "receive error: %s", err->message); - g_error_free (err); - err = NULL; - continue; + g_clear_error (&err); } - - GST_LOG_OBJECT (self, "got packet back"); - GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT, - GST_TIME_ARGS (packet->local_time)); - GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT, - GST_TIME_ARGS (packet->remote_time)); - GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT, - GST_TIME_ARGS (new_local)); - - /* observe_times will reset the timeout */ - gst_net_client_clock_observe_times (self, packet->local_time, - packet->remote_time, new_local); - - g_free (packet); - continue; - } - - if ((cond & (G_IO_ERR | G_IO_HUP))) { - GST_DEBUG_OBJECT (self, "socket error?! %s", g_strerror (errno)); - g_usleep (G_USEC_PER_SEC / 10); - continue; } } GST_INFO_OBJECT (self, "shutting down net client clock thread"); - g_main_context_unref (ctx); return NULL; }