oggdemux: do not send seek events from the streaming thread

This will usually deadlock, despite this patch being in master for
quite some time and working fine. Nevertheless, we deem it to be
not working, disregarding facts.

As such, we fix it by keeping track of seek events, and sending
them upstream from a separate thread. Buffers are then discarded
till we get a new segment with the expected seqnum.
This commit is contained in:
Vincent Penquerc'h 2015-03-05 17:42:53 +00:00
parent 5f3fc5db05
commit e77b60a73f
2 changed files with 113 additions and 30 deletions

View file

@ -1449,22 +1449,19 @@ gst_ogg_demux_seek_back_after_push_duration_check_unlock (GstOggDemux * ogg)
ogg->push_state = PUSH_PLAYING;
GST_PUSH_UNLOCK (ogg);
if (event) {
/* If there is one, perform it */
gst_ogg_demux_perform_seek_push (ogg, event);
} else {
/* If there wasn't, seek back at start to start normal playback */
/* If there is one, perform it. Otherwise, seek back at start to start
* normal playback */
if (!event) {
GST_INFO_OBJECT (ogg, "Seeking back to 0 after duration check");
event = gst_event_new_seek (1.0, GST_FORMAT_BYTES,
GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 1, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE);
if (!gst_pad_push_event (ogg->sinkpad, event)) {
GST_WARNING_OBJECT (ogg, "Failed seeking back to start");
return GST_FLOW_ERROR;
}
}
gst_event_replace (&ogg->seek_event, event);
GST_PUSH_UNLOCK (ogg);
g_mutex_lock (&ogg->seek_event_mutex);
g_cond_broadcast (&ogg->seek_event_cond);
g_mutex_unlock (&ogg->seek_event_mutex);
return GST_FLOW_OK;
}
@ -1594,7 +1591,6 @@ gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page)
GstClockTime t;
gint64 best = -1;
GstEvent *sevent;
int res;
gboolean close_enough;
float seek_quality;
@ -1791,13 +1787,11 @@ gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page)
GST_SEEK_TYPE_NONE, -1);
gst_event_set_seqnum (sevent, ogg->seqnum);
gst_event_replace (&ogg->seek_event, sevent);
GST_PUSH_UNLOCK (ogg);
res = gst_pad_push_event (ogg->sinkpad, sevent);
if (!res) {
/* We failed to send the seek event, notify the pipeline */
GST_ELEMENT_ERROR (ogg, RESOURCE, SEEK, (NULL), ("Failed to seek"));
return GST_FLOW_ERROR;
}
g_mutex_lock (&ogg->seek_event_mutex);
g_cond_broadcast (&ogg->seek_event_cond);
g_mutex_unlock (&ogg->seek_event_mutex);
return GST_FLOW_SKIP_PUSH;
}
@ -2313,12 +2307,21 @@ gst_ogg_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_ogg_demux_reset_streams (ogg);
}
}
if (!ogg->pullmode) {
if (ogg->seek_event_drop_till == gst_event_get_seqnum (event)) {
GST_DEBUG_OBJECT (ogg, "Got event seqnum %u, stopping dropping",
ogg->seek_event_drop_till);
ogg->seek_event_drop_till = 0;
}
}
GST_PUSH_UNLOCK (ogg);
} else {
GST_WARNING_OBJECT (ogg, "unexpected segment format: %s",
gst_format_get_name (segment.format));
}
}
gst_event_unref (event);
res = TRUE;
break;
@ -3536,7 +3539,6 @@ gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags)
/* In push mode, we get to the end of the stream to get the duration */
gint64 position;
GstEvent *sevent;
gboolean res;
/* A full Ogg page can be almost 64 KB. There's no guarantee that there'll be a
granpos there, but it's fairly likely */
@ -3551,16 +3553,11 @@ gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags)
/* do not read the last byte */
sevent = gst_event_new_seek (1.0, GST_FORMAT_BYTES, flags, GST_SEEK_TYPE_SET,
position, GST_SEEK_TYPE_SET, ogg->push_byte_length - 1);
res = gst_pad_push_event (ogg->sinkpad, sevent);
if (res) {
GST_DEBUG_OBJECT (ogg, "Seek succesful");
return TRUE;
} else {
GST_INFO_OBJECT (ogg, "Seek failed, duration will stay unknown");
ogg->push_state = PUSH_PLAYING;
ogg->push_disable_seeking = TRUE;
return FALSE;
}
gst_event_replace (&ogg->seek_event, sevent);
g_mutex_lock (&ogg->seek_event_mutex);
g_cond_broadcast (&ogg->seek_event_cond);
g_mutex_unlock (&ogg->seek_event_mutex);
return TRUE;
}
static gboolean
@ -3766,8 +3763,11 @@ gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, GstEvent * event)
start_type, best, GST_SEEK_TYPE_NONE, -1);
gst_event_set_seqnum (sevent, gst_event_get_seqnum (event));
gst_event_replace (&ogg->seek_event, sevent);
GST_PUSH_UNLOCK (ogg);
res = gst_pad_push_event (ogg->sinkpad, sevent);
g_mutex_lock (&ogg->seek_event_mutex);
g_cond_broadcast (&ogg->seek_event_cond);
g_mutex_unlock (&ogg->seek_event_mutex);
return res;
@ -4434,9 +4434,19 @@ gst_ogg_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GstOggDemux *ogg;
gint ret = 0;
GstFlowReturn result = GST_FLOW_OK;
gboolean drop;
ogg = GST_OGG_DEMUX (parent);
GST_PUSH_LOCK (ogg);
drop = (ogg->seek_event_drop_till > 0);
GST_PUSH_UNLOCK (ogg);
if (drop) {
GST_ERROR_OBJECT (ogg, "Dropping buffer because we have a pending seek");
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
GST_DEBUG_OBJECT (ogg, "enter");
result = gst_ogg_demux_submit_buffer (ogg, buffer);
if (result < 0) {
@ -4743,6 +4753,55 @@ pause:
}
}
/* The sink pad task function for push mode.
* It just sends any seek events queued by the streaming thread.
*/
static gpointer
gst_ogg_demux_loop_push (GstOggDemux * ogg)
{
GstEvent *event;
while (1) {
g_mutex_lock (&ogg->seek_event_mutex);
if (ogg->seek_event_thread_stop) {
g_mutex_unlock (&ogg->seek_event_mutex);
break;
}
g_cond_wait (&ogg->seek_event_cond, &ogg->seek_event_mutex);
if (ogg->seek_event_thread_stop) {
g_mutex_unlock (&ogg->seek_event_mutex);
break;
}
g_mutex_unlock (&ogg->seek_event_mutex);
GST_PUSH_LOCK (ogg);
event = ogg->seek_event;
ogg->seek_event = NULL;
if (event) {
ogg->seek_event_drop_till = gst_event_get_seqnum (event);
}
GST_PUSH_UNLOCK (ogg);
if (!event)
continue;
GST_ERROR ("Pushing event %" GST_PTR_FORMAT, event);
if (!gst_pad_push_event (ogg->sinkpad, event)) {
GST_WARNING_OBJECT (ogg, "Failed to push event");
GST_PUSH_LOCK (ogg);
if (!ogg->pullmode) {
ogg->push_state = PUSH_PLAYING;
ogg->push_disable_seeking = TRUE;
}
GST_PUSH_UNLOCK (ogg);
} else {
GST_ERROR ("Pushed event ok");
}
}
gst_object_unref (ogg);
return NULL;
}
static void
gst_ogg_demux_clear_chains (GstOggDemux * ogg)
{
@ -4815,6 +4874,22 @@ gst_ogg_demux_sink_activate_mode (GstPad * sinkpad, GstObject * parent,
case GST_PAD_MODE_PUSH:
ogg->pullmode = FALSE;
ogg->resync = FALSE;
if (active) {
ogg->seek_event_thread_stop = FALSE;
g_mutex_init (&ogg->seek_event_mutex);
g_cond_init (&ogg->seek_event_cond);
ogg->seek_event_thread = g_thread_new ("seek_event_thread",
(GThreadFunc) gst_ogg_demux_loop_push, gst_object_ref (ogg));
} else {
g_mutex_lock (&ogg->seek_event_mutex);
ogg->seek_event_thread_stop = TRUE;
g_cond_broadcast (&ogg->seek_event_cond);
g_mutex_unlock (&ogg->seek_event_mutex);
g_thread_join (ogg->seek_event_thread);
g_cond_clear (&ogg->seek_event_cond);
g_mutex_clear (&ogg->seek_event_mutex);
ogg->seek_event_thread = NULL;
}
res = TRUE;
break;
case GST_PAD_MODE_PULL:

View file

@ -200,6 +200,14 @@ struct _GstOggDemux
/* ogg stuff */
ogg_sync_state sync;
long chunk_size;
/* Seek events set up by the streaming thread in push mode */
GstEvent *seek_event;
GThread *seek_event_thread;
GMutex seek_event_mutex;
GCond seek_event_cond;
gboolean seek_event_thread_stop;
guint32 seek_event_drop_till;
};
struct _GstOggDemuxClass