diff --git a/plugins/nle/nlesource.c b/plugins/nle/nlesource.c index 439dc3800a..36ad2779f9 100644 --- a/plugins/nle/nlesource.c +++ b/plugins/nle/nlesource.c @@ -60,6 +60,7 @@ struct _NleSourcePrivate GMutex seek_lock; GstEvent *seek_event; + guint32 flush_seqnum; gulong probeid; }; @@ -116,6 +117,24 @@ nle_source_class_init (NleSourceClass * klass) } +static GstPadProbeReturn +srcpad_probe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) +{ + GstEvent *event = info->data; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + GST_OBJECT_LOCK (source); + source->priv->flush_seqnum = GST_EVENT_SEQNUM (event); + GST_DEBUG_OBJECT (pad, "Seek seqnum: %d", source->priv->flush_seqnum); + GST_OBJECT_UNLOCK (source); + break; + default: + break; + } + + return GST_PAD_PROBE_OK; +} static void nle_source_init (NleSource * source) @@ -125,6 +144,10 @@ nle_source_init (NleSource * source) source->priv = nle_source_get_instance_private (source); g_mutex_init (&source->priv->seek_lock); + gst_pad_add_probe (NLE_OBJECT_SRC (source), + GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, (GstPadProbeCallback) srcpad_probe_cb, + source, NULL); + GST_DEBUG_OBJECT (source, "Setting GstBin async-handling to TRUE"); g_object_set (G_OBJECT (source), "async-handling", TRUE, NULL); } @@ -441,34 +464,49 @@ ghost_seek_pad (GstElement * source, gpointer user_data) GstEvent *seek_event = priv->seek_event; priv->seek_event = NULL; + GST_INFO_OBJECT (source, "Sending seek: %" GST_PTR_FORMAT, seek_event); + GST_OBJECT_LOCK (source); + priv->flush_seqnum = GST_EVENT_SEQNUM (seek_event); + GST_OBJECT_UNLOCK (source); if (!(gst_pad_send_event (priv->ghostedpad, seek_event))) GST_ELEMENT_ERROR (source, RESOURCE, SEEK, (NULL), ("Sending initial seek to upstream element failed")); } g_mutex_unlock (&priv->seek_lock); - - GST_OBJECT_LOCK (source); - if (priv->probeid) { - GST_DEBUG_OBJECT (source, "Removing blocking probe! %lu", priv->probeid); - priv->areblocked = FALSE; - gst_pad_remove_probe (priv->ghostedpad, priv->probeid); - priv->probeid = 0; - } - GST_OBJECT_UNLOCK (source); } static GstPadProbeReturn -pad_blocked_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) +pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) { + NleSourcePrivate *priv = source->priv; + GstPadProbeReturn res = GST_PAD_PROBE_OK; + GST_OBJECT_LOCK (source); - if (!source->priv->areblocked) { + if (!priv->areblocked) { GST_INFO_OBJECT (pad, "Blocked now, launching seek"); + priv->areblocked = TRUE; gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL); - source->priv->areblocked = TRUE; + GST_OBJECT_UNLOCK (source); + + return GST_PAD_PROBE_OK; + } + + if (priv->probeid && GST_EVENT_SEQNUM (info->data) == priv->flush_seqnum) { + GST_INFO_OBJECT (source, "Seeking now done: %" GST_PTR_FORMAT + " - %d ? %d", info->data, GST_EVENT_SEQNUM (info->data), + priv->flush_seqnum); + priv->flush_seqnum = GST_SEQNUM_INVALID; + priv->areblocked = FALSE; + priv->probeid = 0; + res = GST_PAD_PROBE_REMOVE; + } else { + res = GST_PAD_PROBE_DROP; + GST_DEBUG_OBJECT (source, "Dropping %" GST_PTR_FORMAT " - %d ? %d", + info->data, GST_EVENT_SEQNUM (info->data), priv->flush_seqnum); } GST_OBJECT_UNLOCK (source); - return GST_PAD_PROBE_OK; + return res; } static gboolean @@ -508,8 +546,9 @@ nle_source_prepare (NleObject * object) priv->ghostedpad = pad; GST_OBJECT_LOCK (source); priv->probeid = gst_pad_add_probe (pad, - GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, - (GstPadProbeCallback) pad_blocked_cb, source, NULL); + GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH | + GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, (GstPadProbeCallback) pad_brobe_cb, + source, NULL); GST_OBJECT_UNLOCK (source); gst_object_unref (pad); }