From e77b60a73f7a879e67d6f8c6ca69479118eb87a8 Mon Sep 17 00:00:00 2001 From: Vincent Penquerc'h Date: Thu, 5 Mar 2015 17:42:53 +0000 Subject: [PATCH] oggdemux: do not send seek events from the streaming thread This will usually deadlock, despite this patch being in master for quite some time and working fine. Nevertheless, we deem it to be not working, disregarding facts. As such, we fix it by keeping track of seek events, and sending them upstream from a separate thread. Buffers are then discarded till we get a new segment with the expected seqnum. --- ext/ogg/gstoggdemux.c | 135 ++++++++++++++++++++++++++++++++---------- ext/ogg/gstoggdemux.h | 8 +++ 2 files changed, 113 insertions(+), 30 deletions(-) diff --git a/ext/ogg/gstoggdemux.c b/ext/ogg/gstoggdemux.c index e1b913203f..88d8d14ba0 100644 --- a/ext/ogg/gstoggdemux.c +++ b/ext/ogg/gstoggdemux.c @@ -1449,22 +1449,19 @@ gst_ogg_demux_seek_back_after_push_duration_check_unlock (GstOggDemux * ogg) ogg->push_state = PUSH_PLAYING; - GST_PUSH_UNLOCK (ogg); - - if (event) { - /* If there is one, perform it */ - gst_ogg_demux_perform_seek_push (ogg, event); - } else { - /* If there wasn't, seek back at start to start normal playback */ + /* If there is one, perform it. Otherwise, seek back at start to start + * normal playback */ + if (!event) { GST_INFO_OBJECT (ogg, "Seeking back to 0 after duration check"); event = gst_event_new_seek (1.0, GST_FORMAT_BYTES, GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, 1, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE); - if (!gst_pad_push_event (ogg->sinkpad, event)) { - GST_WARNING_OBJECT (ogg, "Failed seeking back to start"); - return GST_FLOW_ERROR; - } } + gst_event_replace (&ogg->seek_event, event); + GST_PUSH_UNLOCK (ogg); + g_mutex_lock (&ogg->seek_event_mutex); + g_cond_broadcast (&ogg->seek_event_cond); + g_mutex_unlock (&ogg->seek_event_mutex); return GST_FLOW_OK; } @@ -1594,7 +1591,6 @@ gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page) GstClockTime t; gint64 best = -1; GstEvent *sevent; - int res; gboolean close_enough; float seek_quality; @@ -1791,13 +1787,11 @@ gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page) GST_SEEK_TYPE_NONE, -1); gst_event_set_seqnum (sevent, ogg->seqnum); + gst_event_replace (&ogg->seek_event, sevent); GST_PUSH_UNLOCK (ogg); - res = gst_pad_push_event (ogg->sinkpad, sevent); - if (!res) { - /* We failed to send the seek event, notify the pipeline */ - GST_ELEMENT_ERROR (ogg, RESOURCE, SEEK, (NULL), ("Failed to seek")); - return GST_FLOW_ERROR; - } + g_mutex_lock (&ogg->seek_event_mutex); + g_cond_broadcast (&ogg->seek_event_cond); + g_mutex_unlock (&ogg->seek_event_mutex); return GST_FLOW_SKIP_PUSH; } @@ -2313,12 +2307,21 @@ gst_ogg_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_ogg_demux_reset_streams (ogg); } } + + if (!ogg->pullmode) { + if (ogg->seek_event_drop_till == gst_event_get_seqnum (event)) { + GST_DEBUG_OBJECT (ogg, "Got event seqnum %u, stopping dropping", + ogg->seek_event_drop_till); + ogg->seek_event_drop_till = 0; + } + } GST_PUSH_UNLOCK (ogg); } else { GST_WARNING_OBJECT (ogg, "unexpected segment format: %s", gst_format_get_name (segment.format)); } } + gst_event_unref (event); res = TRUE; break; @@ -3536,7 +3539,6 @@ gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags) /* In push mode, we get to the end of the stream to get the duration */ gint64 position; GstEvent *sevent; - gboolean res; /* A full Ogg page can be almost 64 KB. There's no guarantee that there'll be a granpos there, but it's fairly likely */ @@ -3551,16 +3553,11 @@ gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags) /* do not read the last byte */ sevent = gst_event_new_seek (1.0, GST_FORMAT_BYTES, flags, GST_SEEK_TYPE_SET, position, GST_SEEK_TYPE_SET, ogg->push_byte_length - 1); - res = gst_pad_push_event (ogg->sinkpad, sevent); - if (res) { - GST_DEBUG_OBJECT (ogg, "Seek succesful"); - return TRUE; - } else { - GST_INFO_OBJECT (ogg, "Seek failed, duration will stay unknown"); - ogg->push_state = PUSH_PLAYING; - ogg->push_disable_seeking = TRUE; - return FALSE; - } + gst_event_replace (&ogg->seek_event, sevent); + g_mutex_lock (&ogg->seek_event_mutex); + g_cond_broadcast (&ogg->seek_event_cond); + g_mutex_unlock (&ogg->seek_event_mutex); + return TRUE; } static gboolean @@ -3766,8 +3763,11 @@ gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, GstEvent * event) start_type, best, GST_SEEK_TYPE_NONE, -1); gst_event_set_seqnum (sevent, gst_event_get_seqnum (event)); + gst_event_replace (&ogg->seek_event, sevent); GST_PUSH_UNLOCK (ogg); - res = gst_pad_push_event (ogg->sinkpad, sevent); + g_mutex_lock (&ogg->seek_event_mutex); + g_cond_broadcast (&ogg->seek_event_cond); + g_mutex_unlock (&ogg->seek_event_mutex); return res; @@ -4434,9 +4434,19 @@ gst_ogg_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstOggDemux *ogg; gint ret = 0; GstFlowReturn result = GST_FLOW_OK; + gboolean drop; ogg = GST_OGG_DEMUX (parent); + GST_PUSH_LOCK (ogg); + drop = (ogg->seek_event_drop_till > 0); + GST_PUSH_UNLOCK (ogg); + if (drop) { + GST_ERROR_OBJECT (ogg, "Dropping buffer because we have a pending seek"); + gst_buffer_unref (buffer); + return GST_FLOW_OK; + } + GST_DEBUG_OBJECT (ogg, "enter"); result = gst_ogg_demux_submit_buffer (ogg, buffer); if (result < 0) { @@ -4743,6 +4753,55 @@ pause: } } +/* The sink pad task function for push mode. + * It just sends any seek events queued by the streaming thread. + */ +static gpointer +gst_ogg_demux_loop_push (GstOggDemux * ogg) +{ + GstEvent *event; + + while (1) { + g_mutex_lock (&ogg->seek_event_mutex); + if (ogg->seek_event_thread_stop) { + g_mutex_unlock (&ogg->seek_event_mutex); + break; + } + g_cond_wait (&ogg->seek_event_cond, &ogg->seek_event_mutex); + if (ogg->seek_event_thread_stop) { + g_mutex_unlock (&ogg->seek_event_mutex); + break; + } + g_mutex_unlock (&ogg->seek_event_mutex); + + GST_PUSH_LOCK (ogg); + event = ogg->seek_event; + ogg->seek_event = NULL; + if (event) { + ogg->seek_event_drop_till = gst_event_get_seqnum (event); + } + GST_PUSH_UNLOCK (ogg); + + if (!event) + continue; + + GST_ERROR ("Pushing event %" GST_PTR_FORMAT, event); + if (!gst_pad_push_event (ogg->sinkpad, event)) { + GST_WARNING_OBJECT (ogg, "Failed to push event"); + GST_PUSH_LOCK (ogg); + if (!ogg->pullmode) { + ogg->push_state = PUSH_PLAYING; + ogg->push_disable_seeking = TRUE; + } + GST_PUSH_UNLOCK (ogg); + } else { + GST_ERROR ("Pushed event ok"); + } + } + gst_object_unref (ogg); + return NULL; +} + static void gst_ogg_demux_clear_chains (GstOggDemux * ogg) { @@ -4815,6 +4874,22 @@ gst_ogg_demux_sink_activate_mode (GstPad * sinkpad, GstObject * parent, case GST_PAD_MODE_PUSH: ogg->pullmode = FALSE; ogg->resync = FALSE; + if (active) { + ogg->seek_event_thread_stop = FALSE; + g_mutex_init (&ogg->seek_event_mutex); + g_cond_init (&ogg->seek_event_cond); + ogg->seek_event_thread = g_thread_new ("seek_event_thread", + (GThreadFunc) gst_ogg_demux_loop_push, gst_object_ref (ogg)); + } else { + g_mutex_lock (&ogg->seek_event_mutex); + ogg->seek_event_thread_stop = TRUE; + g_cond_broadcast (&ogg->seek_event_cond); + g_mutex_unlock (&ogg->seek_event_mutex); + g_thread_join (ogg->seek_event_thread); + g_cond_clear (&ogg->seek_event_cond); + g_mutex_clear (&ogg->seek_event_mutex); + ogg->seek_event_thread = NULL; + } res = TRUE; break; case GST_PAD_MODE_PULL: diff --git a/ext/ogg/gstoggdemux.h b/ext/ogg/gstoggdemux.h index bb7948a606..742392f143 100644 --- a/ext/ogg/gstoggdemux.h +++ b/ext/ogg/gstoggdemux.h @@ -200,6 +200,14 @@ struct _GstOggDemux /* ogg stuff */ ogg_sync_state sync; long chunk_size; + + /* Seek events set up by the streaming thread in push mode */ + GstEvent *seek_event; + GThread *seek_event_thread; + GMutex seek_event_mutex; + GCond seek_event_cond; + gboolean seek_event_thread_stop; + guint32 seek_event_drop_till; }; struct _GstOggDemuxClass