From e5045b33d5d9171b14ea6aad7de2b606a07209b6 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Sat, 22 Jun 2019 23:49:50 -0400 Subject: [PATCH] nlesource: Wait for the seek to actualy happen before removing the probe Make sure that an event resulting from the seek happens before removing the pad probe, dropping anything while it is not the case. This guarantees that the seek happens before `nlesource` outputs anything. This was not necessary as with decodebin or usual source flushing seeks lead to synchronous flush_start/flush_stop and we could safely assume that once the seek is sent, it was happenning. With nested `nlecomposition` this assumption is simply not true as in the composition seeks are basically cached and happen later in the composition updating thread. This fixes races where we ended up removing the blocking probe before the seek actually started to be executed in the nlecomposition nested inside an nlesource which leaded to data from *before* the seek to be outputed which means we could display wrong frames, and it was leading to interesting deadlocks. --- plugins/nle/nlesource.c | 69 ++++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 15 deletions(-) diff --git a/plugins/nle/nlesource.c b/plugins/nle/nlesource.c index 439dc3800a..36ad2779f9 100644 --- a/plugins/nle/nlesource.c +++ b/plugins/nle/nlesource.c @@ -60,6 +60,7 @@ struct _NleSourcePrivate GMutex seek_lock; GstEvent *seek_event; + guint32 flush_seqnum; gulong probeid; }; @@ -116,6 +117,24 @@ nle_source_class_init (NleSourceClass * klass) } +static GstPadProbeReturn +srcpad_probe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) +{ + GstEvent *event = info->data; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + GST_OBJECT_LOCK (source); + source->priv->flush_seqnum = GST_EVENT_SEQNUM (event); + GST_DEBUG_OBJECT (pad, "Seek seqnum: %d", source->priv->flush_seqnum); + GST_OBJECT_UNLOCK (source); + break; + default: + break; + } + + return GST_PAD_PROBE_OK; +} static void nle_source_init (NleSource * source) @@ -125,6 +144,10 @@ nle_source_init (NleSource * source) source->priv = nle_source_get_instance_private (source); g_mutex_init (&source->priv->seek_lock); + gst_pad_add_probe (NLE_OBJECT_SRC (source), + GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, (GstPadProbeCallback) srcpad_probe_cb, + source, NULL); + GST_DEBUG_OBJECT (source, "Setting GstBin async-handling to TRUE"); g_object_set (G_OBJECT (source), "async-handling", TRUE, NULL); } @@ -441,34 +464,49 @@ ghost_seek_pad (GstElement * source, gpointer user_data) GstEvent *seek_event = priv->seek_event; priv->seek_event = NULL; + GST_INFO_OBJECT (source, "Sending seek: %" GST_PTR_FORMAT, seek_event); + GST_OBJECT_LOCK (source); + priv->flush_seqnum = GST_EVENT_SEQNUM (seek_event); + GST_OBJECT_UNLOCK (source); if (!(gst_pad_send_event (priv->ghostedpad, seek_event))) GST_ELEMENT_ERROR (source, RESOURCE, SEEK, (NULL), ("Sending initial seek to upstream element failed")); } g_mutex_unlock (&priv->seek_lock); - - GST_OBJECT_LOCK (source); - if (priv->probeid) { - GST_DEBUG_OBJECT (source, "Removing blocking probe! %lu", priv->probeid); - priv->areblocked = FALSE; - gst_pad_remove_probe (priv->ghostedpad, priv->probeid); - priv->probeid = 0; - } - GST_OBJECT_UNLOCK (source); } static GstPadProbeReturn -pad_blocked_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) +pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) { + NleSourcePrivate *priv = source->priv; + GstPadProbeReturn res = GST_PAD_PROBE_OK; + GST_OBJECT_LOCK (source); - if (!source->priv->areblocked) { + if (!priv->areblocked) { GST_INFO_OBJECT (pad, "Blocked now, launching seek"); + priv->areblocked = TRUE; gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL); - source->priv->areblocked = TRUE; + GST_OBJECT_UNLOCK (source); + + return GST_PAD_PROBE_OK; + } + + if (priv->probeid && GST_EVENT_SEQNUM (info->data) == priv->flush_seqnum) { + GST_INFO_OBJECT (source, "Seeking now done: %" GST_PTR_FORMAT + " - %d ? %d", info->data, GST_EVENT_SEQNUM (info->data), + priv->flush_seqnum); + priv->flush_seqnum = GST_SEQNUM_INVALID; + priv->areblocked = FALSE; + priv->probeid = 0; + res = GST_PAD_PROBE_REMOVE; + } else { + res = GST_PAD_PROBE_DROP; + GST_DEBUG_OBJECT (source, "Dropping %" GST_PTR_FORMAT " - %d ? %d", + info->data, GST_EVENT_SEQNUM (info->data), priv->flush_seqnum); } GST_OBJECT_UNLOCK (source); - return GST_PAD_PROBE_OK; + return res; } static gboolean @@ -508,8 +546,9 @@ nle_source_prepare (NleObject * object) priv->ghostedpad = pad; GST_OBJECT_LOCK (source); priv->probeid = gst_pad_add_probe (pad, - GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, - (GstPadProbeCallback) pad_blocked_cb, source, NULL); + GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH | + GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, (GstPadProbeCallback) pad_brobe_cb, + source, NULL); GST_OBJECT_UNLOCK (source); gst_object_unref (pad); }