From 1c9c9847fdfd901b62c76df4e99684731cf5a16e Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Sat, 10 Jan 2015 21:42:00 +1100 Subject: [PATCH] netclock: Implement sending statistic bus messages and discont limits Allow setting a GstBus on the network clock client via a new 'bus' object property. If a bus is set, the clock will output an element message containing statistics about new clock observations and the clock correlation. When the local clock is synchronised with the remote, limit the maximum jump in the clock at any point to be one average RTT to the server. Also, publish in the bus message whether we are synched with the remote or not. --- libs/gst/net/gstnetclientclock.c | 146 +++++++++++++++++++++++++++++-- 1 file changed, 138 insertions(+), 8 deletions(-) diff --git a/libs/gst/net/gstnetclientclock.c b/libs/gst/net/gstnetclientclock.c index dd1dafe7b7..d125e8987a 100644 --- a/libs/gst/net/gstnetclientclock.c +++ b/libs/gst/net/gstnetclientclock.c @@ -45,6 +45,10 @@ * * A #GstNetClientClock is typically set on a #GstPipeline with * gst_pipeline_use_clock(). + * + * If you set a #GstBus on the clock via the "bus" object property, it will + * send @GST_MESSAGE_INFO messages with an attached #GstStructure containing + * statistics about clock accuracy and network traffic. */ #ifdef HAVE_CONFIG_H @@ -69,7 +73,8 @@ enum PROP_0, PROP_ADDRESS, PROP_PORT, - PROP_ROUNDTRIP_LIMIT + PROP_ROUNDTRIP_LIMIT, + PROP_BUS }; #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \ @@ -89,6 +94,8 @@ struct _GstNetClientClockPrivate gchar *address; gint port; + + GstBus *bus; }; #define _do_init \ @@ -126,6 +133,10 @@ gst_net_client_clock_class_init (GstNetClientClockClass * klass) g_param_spec_int ("port", "port", "The port on which the remote server is listening", 0, G_MAXUINT16, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_BUS, + g_param_spec_object ("bus", "bus", + "A GstBus on which to send clock status information", GST_TYPE_BUS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstNetClientClock::round-trip-limit: @@ -191,6 +202,11 @@ gst_net_client_clock_finalize (GObject * object) self->priv->socket = NULL; } + if (self->priv->bus != NULL) { + gst_object_unref (self->priv->bus); + self->priv->bus = NULL; + } + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -202,16 +218,29 @@ gst_net_client_clock_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_ADDRESS: + GST_OBJECT_LOCK (self); g_free (self->priv->address); self->priv->address = g_value_dup_string (value); if (self->priv->address == NULL) self->priv->address = g_strdup (DEFAULT_ADDRESS); + GST_OBJECT_UNLOCK (self); break; case PROP_PORT: + GST_OBJECT_LOCK (self); self->priv->port = g_value_get_int (value); + GST_OBJECT_UNLOCK (self); break; case PROP_ROUNDTRIP_LIMIT: + GST_OBJECT_LOCK (self); self->priv->roundtrip_limit = g_value_get_uint64 (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_BUS: + GST_OBJECT_LOCK (self); + if (self->priv->bus) + gst_object_unref (self->priv->bus); + self->priv->bus = g_value_dup_object (value); + GST_OBJECT_UNLOCK (self); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -227,13 +256,22 @@ gst_net_client_clock_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_ADDRESS: + GST_OBJECT_LOCK (self); g_value_set_string (value, self->priv->address); + GST_OBJECT_UNLOCK (self); break; case PROP_PORT: g_value_set_int (value, self->priv->port); break; case PROP_ROUNDTRIP_LIMIT: + GST_OBJECT_LOCK (self); g_value_set_uint64 (value, self->priv->roundtrip_limit); + GST_OBJECT_UNLOCK (self); + break; + case PROP_BUS: + GST_OBJECT_LOCK (self); + g_value_set_object (value, self->priv->bus); + GST_OBJECT_UNLOCK (self); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -250,7 +288,20 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, GstClockTime local_avg; gdouble r_squared; GstClock *clock; - GstClockTime rtt; + GstClockTime rtt, rtt_limit; + GstBus *bus = NULL; + /* Use for discont tracking */ + GstClockTime time_before = 0; + GstClockTime min_guess = 0; + GstClockTimeDiff time_discont; + gboolean synched; + GstClockTime internal_time, external_time, rate_num, rate_den; + + GST_OBJECT_LOCK (self); + rtt_limit = self->priv->roundtrip_limit; + if (self->priv->bus) + bus = gst_object_ref (self->priv->bus); + GST_OBJECT_UNLOCK (self); if (local_2 < local_1) { GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT @@ -261,11 +312,10 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, rtt = GST_CLOCK_DIFF (local_1, local_2); - if ((self->priv->roundtrip_limit > 0) && (rtt > self->priv->roundtrip_limit)) { + if ((rtt_limit > 0) && (rtt > rtt_limit)) { GST_LOG_OBJECT (self, "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %" - GST_TIME_FORMAT, GST_TIME_ARGS (rtt), - GST_TIME_ARGS (self->priv->roundtrip_limit)); + GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit)); goto bogus_observation; } @@ -294,8 +344,23 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, clock = GST_CLOCK_CAST (self); - if (gst_clock_add_observation (GST_CLOCK (self), local_avg, remote, - &r_squared)) { + /* Store what the clock produced as 'now' before this update */ + gst_clock_get_calibration (GST_CLOCK (self), &internal_time, &external_time, + &rate_num, &rate_den); + + min_guess = + gst_clock_adjust_with_calibration (GST_CLOCK (self), local_1, + internal_time, external_time, rate_num, rate_den); + time_before = + gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2, + internal_time, external_time, rate_num, rate_den); + + /* If the remote observation was within our min/max estimates, we're synched */ + synched = (GST_CLOCK_DIFF (remote, min_guess) < 0 + && GST_CLOCK_DIFF (remote, time_before) > 0); + + if (gst_clock_add_observation_unapplied (GST_CLOCK (self), local_avg, remote, + &r_squared, &internal_time, &external_time, &rate_num, &rate_den)) { /* ghetto formula - shorter timeout for bad correlations */ current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND; current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock)); @@ -303,12 +368,78 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, current_timeout = 0; } + /* Now compare the difference (discont) in the clock + * after this observation */ + time_discont = GST_CLOCK_DIFF (time_before, + gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2, + internal_time, external_time, rate_num, rate_den)); + + /* If we were in sync with the remote clock, clamp the allowed + * discontinuity to within quarter of one RTT. In sync means our send/receive estimates + * of remote time correctly windowed the actual remote time observation */ + if (synched && ABS (time_discont) > priv->rtt_avg / 4) { + GstClockTimeDiff offset; + GstClockTime max_discont = priv->rtt_avg / 4; + GST_LOG_OBJECT (clock, + "Too large a discont, clamping to 1/2 average RTT = %" GST_TIME_FORMAT, + GST_TIME_ARGS (max_discont)); + if (time_discont > 0) { /* Too large a forward step - add a -ve offset */ + offset = max_discont - time_discont; + if (-offset > external_time) + external_time = 0; + else + external_time += offset; + } else { /* Too large a backward step - add a +ve offset */ + offset = -(max_discont + time_discont); + external_time += offset; + } + + time_discont += offset; + } + + gst_clock_set_calibration (GST_CLOCK (self), internal_time, external_time, + rate_num, rate_den); + + if (bus) { + GstStructure *s; + GstMessage *msg; + + + s = gst_structure_new ("gst-netclock-statistics", + "synchronised", G_TYPE_BOOLEAN, synched, + "rtt", G_TYPE_UINT64, rtt, + "rtt-average", G_TYPE_UINT64, priv->rtt_avg, + "local", G_TYPE_UINT64, local_avg, + "remote", G_TYPE_UINT64, remote, + "discontinuity", G_TYPE_INT64, time_discont, + "remote-min-estimate", G_TYPE_UINT64, min_guess, + "remote-max-estimate", G_TYPE_UINT64, time_before, + "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, min_guess), + "remote-max-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, time_before), + "request-send", G_TYPE_UINT64, local_1, + "request-receive", G_TYPE_UINT64, local_2, + "r-squared", G_TYPE_DOUBLE, r_squared, + "timeout", G_TYPE_UINT64, current_timeout, + "internal-time", G_TYPE_UINT64, internal_time, + "external-time", G_TYPE_UINT64, external_time, + "rate-num", G_TYPE_UINT64, rate_num, + "rate-den", G_TYPE_UINT64, rate_den, + "local-clock-offset", G_TYPE_INT64, GST_CLOCK_DIFF (internal_time, + external_time), NULL); + msg = gst_message_new_element (GST_OBJECT (self), s); + gst_bus_post (bus, msg); + } + GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout)); self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout; + if (bus) + gst_object_unref (bus); return; bogus_observation: + if (bus) + gst_object_unref (bus); /* Schedule a new packet again soon */ self->priv->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4); return; @@ -402,7 +533,6 @@ gst_net_client_clock_thread (gpointer data) } } } - GST_INFO_OBJECT (self, "shutting down net client clock thread"); return NULL; }