mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-23 23:58:17 +00:00
netclientclock: simplify by using g_socket_condition_timed_wait()
No need to use a custom main context and custom timeout sources, just use g_socket_condition_timed_wait() instead, which was added for exactly this case. Also seems to help with the unit test deadlocking with glib 2.33.x https://bugzilla.gnome.org/show_bug.cgi?id=681575
This commit is contained in:
parent
690a20a54d
commit
fe082cbe24
1 changed files with 60 additions and 136 deletions
|
@ -248,70 +248,11 @@ bogus_observation:
|
|||
}
|
||||
}
|
||||
|
||||
typedef struct
|
||||
{
|
||||
GSource source;
|
||||
GstNetClientClock *clock;
|
||||
gboolean *p_timeout;
|
||||
} GstNetClientClockTimeoutSource;
|
||||
|
||||
static gboolean
|
||||
gst_net_client_clock_timeout_source_prepare (GSource * s, gint * p_timeout)
|
||||
{
|
||||
GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
|
||||
GstClockTime expiration_time = source->clock->priv->timeout_expiration;
|
||||
GstClockTime now = gst_util_get_timestamp ();
|
||||
|
||||
if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
|
||||
*p_timeout = 0;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
*p_timeout = (expiration_time - now) / GST_MSECOND;
|
||||
GST_TRACE_OBJECT (source->clock, "time out in %d ms please", *p_timeout);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_net_client_clock_timeout_source_check (GSource * s)
|
||||
{
|
||||
GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
|
||||
|
||||
return (gst_util_get_timestamp () >= source->clock->priv->timeout_expiration);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_net_client_clock_timeout_source_dispatch (GSource * s, GSourceFunc cb,
|
||||
gpointer data)
|
||||
{
|
||||
GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
|
||||
|
||||
GST_TRACE_OBJECT (source->clock, "timed out");
|
||||
*source->p_timeout = TRUE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_net_client_clock_socket_cb (GSocket * socket, GIOCondition condition,
|
||||
gpointer user_data)
|
||||
{
|
||||
GIOCondition *p_cond = user_data;
|
||||
|
||||
GST_TRACE ("socket %p I/O condition: 0x%02x", socket, condition);
|
||||
*p_cond = condition;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gpointer
|
||||
gst_net_client_clock_thread (gpointer data)
|
||||
{
|
||||
GstNetClientClock *self = data;
|
||||
GstNetTimePacket *packet;
|
||||
GMainContext *ctx;
|
||||
GSourceFuncs funcs = { NULL, };
|
||||
GSource *source;
|
||||
GIOCondition cond;
|
||||
gboolean timeout;
|
||||
GSocket *socket = self->priv->socket;
|
||||
GError *err = NULL;
|
||||
GstClock *clock = data;
|
||||
|
@ -321,99 +262,82 @@ gst_net_client_clock_thread (gpointer data)
|
|||
g_socket_set_blocking (socket, TRUE);
|
||||
g_socket_set_timeout (socket, 0);
|
||||
|
||||
ctx = g_main_context_new ();
|
||||
|
||||
source = g_socket_create_source (socket, G_IO_IN, self->priv->cancel);
|
||||
g_source_set_name (source, "GStreamer net client clock thread socket");
|
||||
g_source_set_callback (source, (GSourceFunc) gst_net_client_clock_socket_cb,
|
||||
&cond, NULL);
|
||||
g_source_attach (source, ctx);
|
||||
g_source_unref (source);
|
||||
|
||||
/* GSocket only support second granularity for timeouts, so roll our own
|
||||
* timeout source (so we don't have to create a new source whenever the
|
||||
* timeout changes, as we would have to do with the default timeout source) */
|
||||
funcs.prepare = gst_net_client_clock_timeout_source_prepare;
|
||||
funcs.check = gst_net_client_clock_timeout_source_check;
|
||||
funcs.dispatch = gst_net_client_clock_timeout_source_dispatch;
|
||||
funcs.finalize = NULL;
|
||||
source = g_source_new (&funcs, sizeof (GstNetClientClockTimeoutSource));
|
||||
((GstNetClientClockTimeoutSource *) source)->clock = self;
|
||||
((GstNetClientClockTimeoutSource *) source)->p_timeout = &timeout;
|
||||
g_source_set_name (source, "GStreamer net client clock timeout");
|
||||
g_source_attach (source, ctx);
|
||||
g_source_unref (source);
|
||||
|
||||
while (!g_cancellable_is_cancelled (self->priv->cancel)) {
|
||||
cond = 0;
|
||||
timeout = FALSE;
|
||||
g_main_context_iteration (ctx, TRUE);
|
||||
GstClockTime expiration_time = self->priv->timeout_expiration;
|
||||
GstClockTime now = gst_util_get_timestamp ();
|
||||
gint64 socket_timeout;
|
||||
|
||||
if (g_cancellable_is_cancelled (self->priv->cancel))
|
||||
break;
|
||||
|
||||
if (timeout) {
|
||||
/* timed out, let's send another packet */
|
||||
GST_DEBUG_OBJECT (self, "timed out");
|
||||
|
||||
packet = gst_net_time_packet_new (NULL);
|
||||
|
||||
packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
|
||||
|
||||
GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (packet->local_time));
|
||||
|
||||
gst_net_time_packet_send (packet, self->priv->socket,
|
||||
self->priv->servaddr, NULL);
|
||||
|
||||
g_free (packet);
|
||||
|
||||
/* reset timeout (but are expecting a response sooner anyway) */
|
||||
self->priv->timeout_expiration =
|
||||
gst_util_get_timestamp () + gst_clock_get_timeout (clock);
|
||||
continue;
|
||||
if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
|
||||
socket_timeout = 0;
|
||||
} else {
|
||||
socket_timeout = (expiration_time - now) / GST_USECOND;
|
||||
}
|
||||
|
||||
/* got data to read? */
|
||||
if ((cond & G_IO_IN)) {
|
||||
GST_TRACE_OBJECT (self, "time out in %d microsecs please", socket_timeout);
|
||||
|
||||
if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
|
||||
self->priv->cancel, &err)) {
|
||||
/* cancelled, timeout or error */
|
||||
if (err->code == G_IO_ERROR_CANCELLED) {
|
||||
GST_INFO_OBJECT (self, "cancelled");
|
||||
g_clear_error (&err);
|
||||
break;
|
||||
} else if (err->code == G_IO_ERROR_TIMED_OUT) {
|
||||
/* timed out, let's send another packet */
|
||||
GST_DEBUG_OBJECT (self, "timed out");
|
||||
|
||||
packet = gst_net_time_packet_new (NULL);
|
||||
|
||||
packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
|
||||
|
||||
GST_DEBUG_OBJECT (self,
|
||||
"sending packet, local time = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (packet->local_time));
|
||||
|
||||
gst_net_time_packet_send (packet, self->priv->socket,
|
||||
self->priv->servaddr, NULL);
|
||||
|
||||
g_free (packet);
|
||||
|
||||
/* reset timeout (but are expecting a response sooner anyway) */
|
||||
self->priv->timeout_expiration =
|
||||
gst_util_get_timestamp () + gst_clock_get_timeout (clock);
|
||||
} else {
|
||||
GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
|
||||
g_usleep (G_USEC_PER_SEC / 10); /* throttle */
|
||||
}
|
||||
g_clear_error (&err);
|
||||
} else {
|
||||
GstClockTime new_local;
|
||||
|
||||
/* got packet */
|
||||
|
||||
new_local = gst_clock_get_internal_time (GST_CLOCK (self));
|
||||
|
||||
packet = gst_net_time_packet_receive (socket, NULL, &err);
|
||||
|
||||
if (err != NULL) {
|
||||
if (packet != NULL) {
|
||||
GST_LOG_OBJECT (self, "got packet back");
|
||||
GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (packet->local_time));
|
||||
GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (packet->remote_time));
|
||||
GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (new_local));
|
||||
|
||||
/* observe_times will reset the timeout */
|
||||
gst_net_client_clock_observe_times (self, packet->local_time,
|
||||
packet->remote_time, new_local);
|
||||
|
||||
g_free (packet);
|
||||
} else if (err != NULL) {
|
||||
GST_WARNING_OBJECT (self, "receive error: %s", err->message);
|
||||
g_error_free (err);
|
||||
err = NULL;
|
||||
continue;
|
||||
g_clear_error (&err);
|
||||
}
|
||||
|
||||
GST_LOG_OBJECT (self, "got packet back");
|
||||
GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (packet->local_time));
|
||||
GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (packet->remote_time));
|
||||
GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (new_local));
|
||||
|
||||
/* observe_times will reset the timeout */
|
||||
gst_net_client_clock_observe_times (self, packet->local_time,
|
||||
packet->remote_time, new_local);
|
||||
|
||||
g_free (packet);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((cond & (G_IO_ERR | G_IO_HUP))) {
|
||||
GST_DEBUG_OBJECT (self, "socket error?! %s", g_strerror (errno));
|
||||
g_usleep (G_USEC_PER_SEC / 10);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
GST_INFO_OBJECT (self, "shutting down net client clock thread");
|
||||
g_main_context_unref (ctx);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue