nlesource: Wait for the seek to actualy happen before removing the probe

Make sure that an event resulting from the seek happens before removing
the pad probe, dropping anything while it is not the case.

This guarantees that the seek happens before `nlesource` outputs
anything. This was not necessary as with decodebin or usual source
flushing seeks lead to synchronous flush_start/flush_stop and we could
safely assume that once the seek is sent, it was happenning.
With nested `nlecomposition` this assumption is simply not true as
in the composition seeks are basically cached and happen later in
the composition updating thread.

This fixes races where we ended up removing the blocking probe before
the seek actually started to be executed in the nlecomposition
nested inside an nlesource which leaded to data from *before* the seek
to be outputed which means we could display wrong frames,
and it was leading to interesting deadlocks.
This commit is contained in:
Thibault Saunier 2019-06-22 23:49:50 -04:00
parent e97c90fa40
commit e5045b33d5

View file

@ -60,6 +60,7 @@ struct _NleSourcePrivate
GMutex seek_lock; GMutex seek_lock;
GstEvent *seek_event; GstEvent *seek_event;
guint32 flush_seqnum;
gulong probeid; gulong probeid;
}; };
@ -116,6 +117,24 @@ nle_source_class_init (NleSourceClass * klass)
} }
static GstPadProbeReturn
srcpad_probe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source)
{
GstEvent *event = info->data;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
GST_OBJECT_LOCK (source);
source->priv->flush_seqnum = GST_EVENT_SEQNUM (event);
GST_DEBUG_OBJECT (pad, "Seek seqnum: %d", source->priv->flush_seqnum);
GST_OBJECT_UNLOCK (source);
break;
default:
break;
}
return GST_PAD_PROBE_OK;
}
static void static void
nle_source_init (NleSource * source) nle_source_init (NleSource * source)
@ -125,6 +144,10 @@ nle_source_init (NleSource * source)
source->priv = nle_source_get_instance_private (source); source->priv = nle_source_get_instance_private (source);
g_mutex_init (&source->priv->seek_lock); g_mutex_init (&source->priv->seek_lock);
gst_pad_add_probe (NLE_OBJECT_SRC (source),
GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, (GstPadProbeCallback) srcpad_probe_cb,
source, NULL);
GST_DEBUG_OBJECT (source, "Setting GstBin async-handling to TRUE"); GST_DEBUG_OBJECT (source, "Setting GstBin async-handling to TRUE");
g_object_set (G_OBJECT (source), "async-handling", TRUE, NULL); g_object_set (G_OBJECT (source), "async-handling", TRUE, NULL);
} }
@ -441,34 +464,49 @@ ghost_seek_pad (GstElement * source, gpointer user_data)
GstEvent *seek_event = priv->seek_event; GstEvent *seek_event = priv->seek_event;
priv->seek_event = NULL; priv->seek_event = NULL;
GST_INFO_OBJECT (source, "Sending seek: %" GST_PTR_FORMAT, seek_event);
GST_OBJECT_LOCK (source);
priv->flush_seqnum = GST_EVENT_SEQNUM (seek_event);
GST_OBJECT_UNLOCK (source);
if (!(gst_pad_send_event (priv->ghostedpad, seek_event))) if (!(gst_pad_send_event (priv->ghostedpad, seek_event)))
GST_ELEMENT_ERROR (source, RESOURCE, SEEK, GST_ELEMENT_ERROR (source, RESOURCE, SEEK,
(NULL), ("Sending initial seek to upstream element failed")); (NULL), ("Sending initial seek to upstream element failed"));
} }
g_mutex_unlock (&priv->seek_lock); g_mutex_unlock (&priv->seek_lock);
GST_OBJECT_LOCK (source);
if (priv->probeid) {
GST_DEBUG_OBJECT (source, "Removing blocking probe! %lu", priv->probeid);
priv->areblocked = FALSE;
gst_pad_remove_probe (priv->ghostedpad, priv->probeid);
priv->probeid = 0;
}
GST_OBJECT_UNLOCK (source);
} }
static GstPadProbeReturn static GstPadProbeReturn
pad_blocked_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source)
{ {
NleSourcePrivate *priv = source->priv;
GstPadProbeReturn res = GST_PAD_PROBE_OK;
GST_OBJECT_LOCK (source); GST_OBJECT_LOCK (source);
if (!source->priv->areblocked) { if (!priv->areblocked) {
GST_INFO_OBJECT (pad, "Blocked now, launching seek"); GST_INFO_OBJECT (pad, "Blocked now, launching seek");
priv->areblocked = TRUE;
gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL); gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL);
source->priv->areblocked = TRUE; GST_OBJECT_UNLOCK (source);
return GST_PAD_PROBE_OK;
}
if (priv->probeid && GST_EVENT_SEQNUM (info->data) == priv->flush_seqnum) {
GST_INFO_OBJECT (source, "Seeking now done: %" GST_PTR_FORMAT
" - %d ? %d", info->data, GST_EVENT_SEQNUM (info->data),
priv->flush_seqnum);
priv->flush_seqnum = GST_SEQNUM_INVALID;
priv->areblocked = FALSE;
priv->probeid = 0;
res = GST_PAD_PROBE_REMOVE;
} else {
res = GST_PAD_PROBE_DROP;
GST_DEBUG_OBJECT (source, "Dropping %" GST_PTR_FORMAT " - %d ? %d",
info->data, GST_EVENT_SEQNUM (info->data), priv->flush_seqnum);
} }
GST_OBJECT_UNLOCK (source); GST_OBJECT_UNLOCK (source);
return GST_PAD_PROBE_OK; return res;
} }
static gboolean static gboolean
@ -508,8 +546,9 @@ nle_source_prepare (NleObject * object)
priv->ghostedpad = pad; priv->ghostedpad = pad;
GST_OBJECT_LOCK (source); GST_OBJECT_LOCK (source);
priv->probeid = gst_pad_add_probe (pad, priv->probeid = gst_pad_add_probe (pad,
GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
(GstPadProbeCallback) pad_blocked_cb, source, NULL); GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, (GstPadProbeCallback) pad_brobe_cb,
source, NULL);
GST_OBJECT_UNLOCK (source); GST_OBJECT_UNLOCK (source);
gst_object_unref (pad); gst_object_unref (pad);
} }