mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-11 01:45:33 +00:00
netclock: Implement sending statistic bus messages and discont limits
Allow setting a GstBus on the network clock client via a new 'bus' object property. If a bus is set, the clock will output an element message containing statistics about new clock observations and the clock correlation. When the local clock is synchronised with the remote, limit the maximum jump in the clock at any point to be one average RTT to the server. Also, publish in the bus message whether we are synched with the remote or not.
This commit is contained in:
parent
4bc7d34f57
commit
1c9c9847fd
1 changed files with 138 additions and 8 deletions
|
@ -45,6 +45,10 @@
|
|||
*
|
||||
* A #GstNetClientClock is typically set on a #GstPipeline with
|
||||
* gst_pipeline_use_clock().
|
||||
*
|
||||
* If you set a #GstBus on the clock via the "bus" object property, it will
|
||||
* send @GST_MESSAGE_INFO messages with an attached #GstStructure containing
|
||||
* statistics about clock accuracy and network traffic.
|
||||
*/
|
||||
|
||||
#ifdef HAVE_CONFIG_H
|
||||
|
@ -69,7 +73,8 @@ enum
|
|||
PROP_0,
|
||||
PROP_ADDRESS,
|
||||
PROP_PORT,
|
||||
PROP_ROUNDTRIP_LIMIT
|
||||
PROP_ROUNDTRIP_LIMIT,
|
||||
PROP_BUS
|
||||
};
|
||||
|
||||
#define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \
|
||||
|
@ -89,6 +94,8 @@ struct _GstNetClientClockPrivate
|
|||
|
||||
gchar *address;
|
||||
gint port;
|
||||
|
||||
GstBus *bus;
|
||||
};
|
||||
|
||||
#define _do_init \
|
||||
|
@ -126,6 +133,10 @@ gst_net_client_clock_class_init (GstNetClientClockClass * klass)
|
|||
g_param_spec_int ("port", "port",
|
||||
"The port on which the remote server is listening", 0, G_MAXUINT16,
|
||||
DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
||||
g_object_class_install_property (gobject_class, PROP_BUS,
|
||||
g_param_spec_object ("bus", "bus",
|
||||
"A GstBus on which to send clock status information", GST_TYPE_BUS,
|
||||
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
||||
|
||||
/**
|
||||
* GstNetClientClock::round-trip-limit:
|
||||
|
@ -191,6 +202,11 @@ gst_net_client_clock_finalize (GObject * object)
|
|||
self->priv->socket = NULL;
|
||||
}
|
||||
|
||||
if (self->priv->bus != NULL) {
|
||||
gst_object_unref (self->priv->bus);
|
||||
self->priv->bus = NULL;
|
||||
}
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
|
@ -202,16 +218,29 @@ gst_net_client_clock_set_property (GObject * object, guint prop_id,
|
|||
|
||||
switch (prop_id) {
|
||||
case PROP_ADDRESS:
|
||||
GST_OBJECT_LOCK (self);
|
||||
g_free (self->priv->address);
|
||||
self->priv->address = g_value_dup_string (value);
|
||||
if (self->priv->address == NULL)
|
||||
self->priv->address = g_strdup (DEFAULT_ADDRESS);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
case PROP_PORT:
|
||||
GST_OBJECT_LOCK (self);
|
||||
self->priv->port = g_value_get_int (value);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
case PROP_ROUNDTRIP_LIMIT:
|
||||
GST_OBJECT_LOCK (self);
|
||||
self->priv->roundtrip_limit = g_value_get_uint64 (value);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
case PROP_BUS:
|
||||
GST_OBJECT_LOCK (self);
|
||||
if (self->priv->bus)
|
||||
gst_object_unref (self->priv->bus);
|
||||
self->priv->bus = g_value_dup_object (value);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
|
@ -227,13 +256,22 @@ gst_net_client_clock_get_property (GObject * object, guint prop_id,
|
|||
|
||||
switch (prop_id) {
|
||||
case PROP_ADDRESS:
|
||||
GST_OBJECT_LOCK (self);
|
||||
g_value_set_string (value, self->priv->address);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
case PROP_PORT:
|
||||
g_value_set_int (value, self->priv->port);
|
||||
break;
|
||||
case PROP_ROUNDTRIP_LIMIT:
|
||||
GST_OBJECT_LOCK (self);
|
||||
g_value_set_uint64 (value, self->priv->roundtrip_limit);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
case PROP_BUS:
|
||||
GST_OBJECT_LOCK (self);
|
||||
g_value_set_object (value, self->priv->bus);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
|
@ -250,7 +288,20 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
|
|||
GstClockTime local_avg;
|
||||
gdouble r_squared;
|
||||
GstClock *clock;
|
||||
GstClockTime rtt;
|
||||
GstClockTime rtt, rtt_limit;
|
||||
GstBus *bus = NULL;
|
||||
/* Use for discont tracking */
|
||||
GstClockTime time_before = 0;
|
||||
GstClockTime min_guess = 0;
|
||||
GstClockTimeDiff time_discont;
|
||||
gboolean synched;
|
||||
GstClockTime internal_time, external_time, rate_num, rate_den;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
rtt_limit = self->priv->roundtrip_limit;
|
||||
if (self->priv->bus)
|
||||
bus = gst_object_ref (self->priv->bus);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
if (local_2 < local_1) {
|
||||
GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
|
||||
|
@ -261,11 +312,10 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
|
|||
|
||||
rtt = GST_CLOCK_DIFF (local_1, local_2);
|
||||
|
||||
if ((self->priv->roundtrip_limit > 0) && (rtt > self->priv->roundtrip_limit)) {
|
||||
if ((rtt_limit > 0) && (rtt > rtt_limit)) {
|
||||
GST_LOG_OBJECT (self,
|
||||
"Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
|
||||
GST_TIME_FORMAT, GST_TIME_ARGS (rtt),
|
||||
GST_TIME_ARGS (self->priv->roundtrip_limit));
|
||||
GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
|
||||
goto bogus_observation;
|
||||
}
|
||||
|
||||
|
@ -294,8 +344,23 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
|
|||
|
||||
clock = GST_CLOCK_CAST (self);
|
||||
|
||||
if (gst_clock_add_observation (GST_CLOCK (self), local_avg, remote,
|
||||
&r_squared)) {
|
||||
/* Store what the clock produced as 'now' before this update */
|
||||
gst_clock_get_calibration (GST_CLOCK (self), &internal_time, &external_time,
|
||||
&rate_num, &rate_den);
|
||||
|
||||
min_guess =
|
||||
gst_clock_adjust_with_calibration (GST_CLOCK (self), local_1,
|
||||
internal_time, external_time, rate_num, rate_den);
|
||||
time_before =
|
||||
gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
|
||||
internal_time, external_time, rate_num, rate_den);
|
||||
|
||||
/* If the remote observation was within our min/max estimates, we're synched */
|
||||
synched = (GST_CLOCK_DIFF (remote, min_guess) < 0
|
||||
&& GST_CLOCK_DIFF (remote, time_before) > 0);
|
||||
|
||||
if (gst_clock_add_observation_unapplied (GST_CLOCK (self), local_avg, remote,
|
||||
&r_squared, &internal_time, &external_time, &rate_num, &rate_den)) {
|
||||
/* ghetto formula - shorter timeout for bad correlations */
|
||||
current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
|
||||
current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock));
|
||||
|
@ -303,12 +368,78 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
|
|||
current_timeout = 0;
|
||||
}
|
||||
|
||||
/* Now compare the difference (discont) in the clock
|
||||
* after this observation */
|
||||
time_discont = GST_CLOCK_DIFF (time_before,
|
||||
gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
|
||||
internal_time, external_time, rate_num, rate_den));
|
||||
|
||||
/* If we were in sync with the remote clock, clamp the allowed
|
||||
* discontinuity to within quarter of one RTT. In sync means our send/receive estimates
|
||||
* of remote time correctly windowed the actual remote time observation */
|
||||
if (synched && ABS (time_discont) > priv->rtt_avg / 4) {
|
||||
GstClockTimeDiff offset;
|
||||
GstClockTime max_discont = priv->rtt_avg / 4;
|
||||
GST_LOG_OBJECT (clock,
|
||||
"Too large a discont, clamping to 1/2 average RTT = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (max_discont));
|
||||
if (time_discont > 0) { /* Too large a forward step - add a -ve offset */
|
||||
offset = max_discont - time_discont;
|
||||
if (-offset > external_time)
|
||||
external_time = 0;
|
||||
else
|
||||
external_time += offset;
|
||||
} else { /* Too large a backward step - add a +ve offset */
|
||||
offset = -(max_discont + time_discont);
|
||||
external_time += offset;
|
||||
}
|
||||
|
||||
time_discont += offset;
|
||||
}
|
||||
|
||||
gst_clock_set_calibration (GST_CLOCK (self), internal_time, external_time,
|
||||
rate_num, rate_den);
|
||||
|
||||
if (bus) {
|
||||
GstStructure *s;
|
||||
GstMessage *msg;
|
||||
|
||||
|
||||
s = gst_structure_new ("gst-netclock-statistics",
|
||||
"synchronised", G_TYPE_BOOLEAN, synched,
|
||||
"rtt", G_TYPE_UINT64, rtt,
|
||||
"rtt-average", G_TYPE_UINT64, priv->rtt_avg,
|
||||
"local", G_TYPE_UINT64, local_avg,
|
||||
"remote", G_TYPE_UINT64, remote,
|
||||
"discontinuity", G_TYPE_INT64, time_discont,
|
||||
"remote-min-estimate", G_TYPE_UINT64, min_guess,
|
||||
"remote-max-estimate", G_TYPE_UINT64, time_before,
|
||||
"remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, min_guess),
|
||||
"remote-max-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, time_before),
|
||||
"request-send", G_TYPE_UINT64, local_1,
|
||||
"request-receive", G_TYPE_UINT64, local_2,
|
||||
"r-squared", G_TYPE_DOUBLE, r_squared,
|
||||
"timeout", G_TYPE_UINT64, current_timeout,
|
||||
"internal-time", G_TYPE_UINT64, internal_time,
|
||||
"external-time", G_TYPE_UINT64, external_time,
|
||||
"rate-num", G_TYPE_UINT64, rate_num,
|
||||
"rate-den", G_TYPE_UINT64, rate_den,
|
||||
"local-clock-offset", G_TYPE_INT64, GST_CLOCK_DIFF (internal_time,
|
||||
external_time), NULL);
|
||||
msg = gst_message_new_element (GST_OBJECT (self), s);
|
||||
gst_bus_post (bus, msg);
|
||||
}
|
||||
|
||||
GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
|
||||
self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
|
||||
|
||||
if (bus)
|
||||
gst_object_unref (bus);
|
||||
return;
|
||||
|
||||
bogus_observation:
|
||||
if (bus)
|
||||
gst_object_unref (bus);
|
||||
/* Schedule a new packet again soon */
|
||||
self->priv->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
|
||||
return;
|
||||
|
@ -402,7 +533,6 @@ gst_net_client_clock_thread (gpointer data)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
GST_INFO_OBJECT (self, "shutting down net client clock thread");
|
||||
return NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue