diff --git a/libs/gst/net/gstnetclientclock.c b/libs/gst/net/gstnetclientclock.c index dd1dafe7b7..d125e8987a 100644 --- a/libs/gst/net/gstnetclientclock.c +++ b/libs/gst/net/gstnetclientclock.c @@ -45,6 +45,10 @@ * * A #GstNetClientClock is typically set on a #GstPipeline with * gst_pipeline_use_clock(). + * + * If you set a #GstBus on the clock via the "bus" object property, it will + * send @GST_MESSAGE_INFO messages with an attached #GstStructure containing + * statistics about clock accuracy and network traffic. */ #ifdef HAVE_CONFIG_H @@ -69,7 +73,8 @@ enum PROP_0, PROP_ADDRESS, PROP_PORT, - PROP_ROUNDTRIP_LIMIT + PROP_ROUNDTRIP_LIMIT, + PROP_BUS }; #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \ @@ -89,6 +94,8 @@ struct _GstNetClientClockPrivate gchar *address; gint port; + + GstBus *bus; }; #define _do_init \ @@ -126,6 +133,10 @@ gst_net_client_clock_class_init (GstNetClientClockClass * klass) g_param_spec_int ("port", "port", "The port on which the remote server is listening", 0, G_MAXUINT16, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_BUS, + g_param_spec_object ("bus", "bus", + "A GstBus on which to send clock status information", GST_TYPE_BUS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstNetClientClock::round-trip-limit: @@ -191,6 +202,11 @@ gst_net_client_clock_finalize (GObject * object) self->priv->socket = NULL; } + if (self->priv->bus != NULL) { + gst_object_unref (self->priv->bus); + self->priv->bus = NULL; + } + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -202,16 +218,29 @@ gst_net_client_clock_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_ADDRESS: + GST_OBJECT_LOCK (self); g_free (self->priv->address); self->priv->address = g_value_dup_string (value); if (self->priv->address == NULL) self->priv->address = g_strdup (DEFAULT_ADDRESS); + GST_OBJECT_UNLOCK (self); break; case PROP_PORT: + GST_OBJECT_LOCK (self); self->priv->port = g_value_get_int (value); + GST_OBJECT_UNLOCK (self); break; case PROP_ROUNDTRIP_LIMIT: + GST_OBJECT_LOCK (self); self->priv->roundtrip_limit = g_value_get_uint64 (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_BUS: + GST_OBJECT_LOCK (self); + if (self->priv->bus) + gst_object_unref (self->priv->bus); + self->priv->bus = g_value_dup_object (value); + GST_OBJECT_UNLOCK (self); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -227,13 +256,22 @@ gst_net_client_clock_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_ADDRESS: + GST_OBJECT_LOCK (self); g_value_set_string (value, self->priv->address); + GST_OBJECT_UNLOCK (self); break; case PROP_PORT: g_value_set_int (value, self->priv->port); break; case PROP_ROUNDTRIP_LIMIT: + GST_OBJECT_LOCK (self); g_value_set_uint64 (value, self->priv->roundtrip_limit); + GST_OBJECT_UNLOCK (self); + break; + case PROP_BUS: + GST_OBJECT_LOCK (self); + g_value_set_object (value, self->priv->bus); + GST_OBJECT_UNLOCK (self); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -250,7 +288,20 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, GstClockTime local_avg; gdouble r_squared; GstClock *clock; - GstClockTime rtt; + GstClockTime rtt, rtt_limit; + GstBus *bus = NULL; + /* Use for discont tracking */ + GstClockTime time_before = 0; + GstClockTime min_guess = 0; + GstClockTimeDiff time_discont; + gboolean synched; + GstClockTime internal_time, external_time, rate_num, rate_den; + + GST_OBJECT_LOCK (self); + rtt_limit = self->priv->roundtrip_limit; + if (self->priv->bus) + bus = gst_object_ref (self->priv->bus); + GST_OBJECT_UNLOCK (self); if (local_2 < local_1) { GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT @@ -261,11 +312,10 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, rtt = GST_CLOCK_DIFF (local_1, local_2); - if ((self->priv->roundtrip_limit > 0) && (rtt > self->priv->roundtrip_limit)) { + if ((rtt_limit > 0) && (rtt > rtt_limit)) { GST_LOG_OBJECT (self, "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %" - GST_TIME_FORMAT, GST_TIME_ARGS (rtt), - GST_TIME_ARGS (self->priv->roundtrip_limit)); + GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit)); goto bogus_observation; } @@ -294,8 +344,23 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, clock = GST_CLOCK_CAST (self); - if (gst_clock_add_observation (GST_CLOCK (self), local_avg, remote, - &r_squared)) { + /* Store what the clock produced as 'now' before this update */ + gst_clock_get_calibration (GST_CLOCK (self), &internal_time, &external_time, + &rate_num, &rate_den); + + min_guess = + gst_clock_adjust_with_calibration (GST_CLOCK (self), local_1, + internal_time, external_time, rate_num, rate_den); + time_before = + gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2, + internal_time, external_time, rate_num, rate_den); + + /* If the remote observation was within our min/max estimates, we're synched */ + synched = (GST_CLOCK_DIFF (remote, min_guess) < 0 + && GST_CLOCK_DIFF (remote, time_before) > 0); + + if (gst_clock_add_observation_unapplied (GST_CLOCK (self), local_avg, remote, + &r_squared, &internal_time, &external_time, &rate_num, &rate_den)) { /* ghetto formula - shorter timeout for bad correlations */ current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND; current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock)); @@ -303,12 +368,78 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, current_timeout = 0; } + /* Now compare the difference (discont) in the clock + * after this observation */ + time_discont = GST_CLOCK_DIFF (time_before, + gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2, + internal_time, external_time, rate_num, rate_den)); + + /* If we were in sync with the remote clock, clamp the allowed + * discontinuity to within quarter of one RTT. In sync means our send/receive estimates + * of remote time correctly windowed the actual remote time observation */ + if (synched && ABS (time_discont) > priv->rtt_avg / 4) { + GstClockTimeDiff offset; + GstClockTime max_discont = priv->rtt_avg / 4; + GST_LOG_OBJECT (clock, + "Too large a discont, clamping to 1/2 average RTT = %" GST_TIME_FORMAT, + GST_TIME_ARGS (max_discont)); + if (time_discont > 0) { /* Too large a forward step - add a -ve offset */ + offset = max_discont - time_discont; + if (-offset > external_time) + external_time = 0; + else + external_time += offset; + } else { /* Too large a backward step - add a +ve offset */ + offset = -(max_discont + time_discont); + external_time += offset; + } + + time_discont += offset; + } + + gst_clock_set_calibration (GST_CLOCK (self), internal_time, external_time, + rate_num, rate_den); + + if (bus) { + GstStructure *s; + GstMessage *msg; + + + s = gst_structure_new ("gst-netclock-statistics", + "synchronised", G_TYPE_BOOLEAN, synched, + "rtt", G_TYPE_UINT64, rtt, + "rtt-average", G_TYPE_UINT64, priv->rtt_avg, + "local", G_TYPE_UINT64, local_avg, + "remote", G_TYPE_UINT64, remote, + "discontinuity", G_TYPE_INT64, time_discont, + "remote-min-estimate", G_TYPE_UINT64, min_guess, + "remote-max-estimate", G_TYPE_UINT64, time_before, + "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, min_guess), + "remote-max-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, time_before), + "request-send", G_TYPE_UINT64, local_1, + "request-receive", G_TYPE_UINT64, local_2, + "r-squared", G_TYPE_DOUBLE, r_squared, + "timeout", G_TYPE_UINT64, current_timeout, + "internal-time", G_TYPE_UINT64, internal_time, + "external-time", G_TYPE_UINT64, external_time, + "rate-num", G_TYPE_UINT64, rate_num, + "rate-den", G_TYPE_UINT64, rate_den, + "local-clock-offset", G_TYPE_INT64, GST_CLOCK_DIFF (internal_time, + external_time), NULL); + msg = gst_message_new_element (GST_OBJECT (self), s); + gst_bus_post (bus, msg); + } + GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout)); self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout; + if (bus) + gst_object_unref (bus); return; bogus_observation: + if (bus) + gst_object_unref (bus); /* Schedule a new packet again soon */ self->priv->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4); return; @@ -402,7 +533,6 @@ gst_net_client_clock_thread (gpointer data) } } } - GST_INFO_OBJECT (self, "shutting down net client clock thread"); return NULL; }