oggdemux: Move seeking in pull mode to the streaming thread

Flushing and teering down the streaming thread from the seeking thread
and simply letting the streaming thread handle the seek event in its
loop function.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/639
This commit is contained in:
Thibault Saunier 2019-08-29 11:16:39 -04:00 committed by Thibault Saunier
parent 41bad88d6d
commit 51e49ab96b
2 changed files with 87 additions and 55 deletions

View file

@ -3516,7 +3516,7 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
{
GstOggChain *chain = NULL;
gboolean res;
gboolean flush, accurate, keyframe;
gboolean accurate, keyframe;
GstFormat format;
gdouble rate;
GstSeekFlags flags;
@ -3524,7 +3524,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
gint64 start, stop;
gboolean update;
guint32 seqnum;
GstEvent *tevent;
if (event) {
GST_DEBUG_OBJECT (ogg, "seek with event");
@ -3548,41 +3547,10 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
GST_DEBUG_OBJECT (ogg, "seek, rate %g", rate);
flush = flags & GST_SEEK_FLAG_FLUSH;
accurate = flags & GST_SEEK_FLAG_ACCURATE;
keyframe = flags & GST_SEEK_FLAG_KEY_UNIT;
/* first step is to unlock the streaming thread if it is
* blocked in a chain call, we do this by starting the flush. because
* we cannot yet hold any streaming lock, we have to protect the chains
* with their own lock. */
if (flush) {
gint i;
tevent = gst_event_new_flush_start ();
gst_event_set_seqnum (tevent, seqnum);
gst_event_ref (tevent);
gst_pad_push_event (ogg->sinkpad, tevent);
GST_CHAIN_LOCK (ogg);
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
gint j;
for (j = 0; j < chain->streams->len; j++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, j);
gst_event_ref (tevent);
gst_pad_push_event (GST_PAD (pad), tevent);
}
}
GST_CHAIN_UNLOCK (ogg);
gst_event_unref (tevent);
} else {
gst_pad_pause_task (ogg->sinkpad);
}
gst_pad_pause_task (ogg->sinkpad);
/* now grab the stream lock so that streaming cannot continue, for
* non flushing seeks when the element is in PAUSED this could block
@ -3598,14 +3566,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
GST_TIME_FORMAT, GST_TIME_ARGS (ogg->segment.start),
GST_TIME_ARGS (ogg->segment.stop));
/* we need to stop flushing on the srcpad as we're going to use it
* next. We can do this as we have the STREAM lock now. */
if (flush) {
tevent = gst_event_new_flush_stop (TRUE);
gst_event_set_seqnum (tevent, seqnum);
gst_pad_push_event (ogg->sinkpad, tevent);
}
{
gint i;
@ -3640,13 +3600,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
gint64 position, begin_time;
GstSegment segment;
/* we have to send the flush to the old chain, not the new one */
if (flush) {
tevent = gst_event_new_flush_stop (TRUE);
gst_event_set_seqnum (tevent, seqnum);
gst_ogg_demux_send_event (ogg, tevent);
}
/* we need this to see how far inside the chain we need to start */
if (chain->begin_time != GST_CLOCK_TIME_NONE)
begin_time = chain->begin_time;
@ -3725,19 +3678,24 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
/* streaming can continue now */
GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
done:
if (event)
gst_event_unref (event);
return res;
/* ERRORS */
error:
{
GST_DEBUG_OBJECT (ogg, "seek failed");
return FALSE;
res = FALSE;
goto done;
}
no_chain:
{
GST_DEBUG_OBJECT (ogg, "no chain to seek in");
GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
return FALSE;
res = FALSE;
goto done;
}
}
@ -3992,13 +3950,80 @@ error_locked:
goto error;
}
static gboolean
gst_ogg_demux_setup_seek_pull (GstOggDemux * ogg, GstEvent * event)
{
gboolean flush;
GstSeekFlags flags;
GstEvent *tevent;
guint32 seqnum = gst_event_get_seqnum (event);
GST_DEBUG_OBJECT (ogg, "Scheduling seek: %" GST_PTR_FORMAT, event);
gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
flush = flags & GST_SEEK_FLAG_FLUSH;
/* first step is to unlock the streaming thread if it is
* blocked in a chain call, we do this by starting the flush. because
* we cannot yet hold any streaming lock, we have to protect the chains
* with their own lock. */
if (flush) {
gint i;
tevent = gst_event_new_flush_start ();
gst_event_set_seqnum (tevent, seqnum);
gst_event_ref (tevent);
gst_pad_push_event (ogg->sinkpad, tevent);
GST_CHAIN_LOCK (ogg);
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
gint j;
for (j = 0; j < chain->streams->len; j++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, j);
gst_event_ref (tevent);
gst_pad_push_event (GST_PAD (pad), tevent);
}
}
GST_CHAIN_UNLOCK (ogg);
gst_event_unref (tevent);
}
gst_pad_pause_task (ogg->sinkpad);
/* now grab the stream lock so that streaming cannot continue, for
* non flushing seeks when the element is in PAUSED this could block
* forever. */
GST_PAD_STREAM_LOCK (ogg->sinkpad);
/* we need to stop flushing on the sinkpad as we're going to use it
* next. We can do this as we have the STREAM lock now. */
if (flush) {
tevent = gst_event_new_flush_stop (TRUE);
gst_event_set_seqnum (tevent, seqnum);
gst_pad_push_event (ogg->sinkpad, gst_event_ref (tevent));
gst_ogg_demux_send_event (ogg, tevent);
}
gst_event_replace (&ogg->seek_event, event);
gst_pad_start_task (ogg->sinkpad, (GstTaskFunction) gst_ogg_demux_loop,
ogg->sinkpad, NULL);
GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
return TRUE;
}
static gboolean
gst_ogg_demux_perform_seek (GstOggDemux * ogg, GstEvent * event)
{
gboolean res;
if (ogg->pullmode) {
res = gst_ogg_demux_perform_seek_pull (ogg, event);
res = gst_ogg_demux_setup_seek_pull (ogg, event);
} else {
res = gst_ogg_demux_perform_seek_push (ogg, event);
}
@ -4862,12 +4887,15 @@ static void
gst_ogg_demux_loop (GstOggPad * pad)
{
GstOggDemux *ogg;
gboolean res;
GstFlowReturn ret;
GstEvent *seek;
ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (pad));
seek = ogg->seek_event;
ogg->seek_event = NULL;
if (ogg->need_chains) {
gboolean res;
/* this is the only place where we write chains and thus need to lock. */
GST_CHAIN_LOCK (ogg);
@ -4883,8 +4911,12 @@ gst_ogg_demux_loop (GstOggPad * pad)
GST_OBJECT_UNLOCK (ogg);
/* and seek to configured positions without FLUSH */
res = gst_ogg_demux_perform_seek_pull (ogg, NULL);
res = gst_ogg_demux_perform_seek_pull (ogg, seek);
if (!res)
goto seek_failed;
} else if (seek) {
res = gst_ogg_demux_perform_seek_pull (ogg, seek);
if (!res)
goto seek_failed;
}

View file

@ -203,7 +203,7 @@ struct _GstOggDemux
ogg_sync_state sync;
long chunk_size;
/* Seek events set up by the streaming thread in push mode */
/* Seek events set up by the streaming thread */
GstEvent *seek_event;
GThread *seek_event_thread;
GMutex seek_event_mutex;