From a6bb8338fdf3e04e1caba95637309e0b9ad08f51 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 14 Apr 2010 11:53:46 +0200 Subject: [PATCH] matroskademux: implement push mode seeking --- gst/matroska/matroska-demux.c | 347 +++++++++++++++++++++++++++++++--- gst/matroska/matroska-demux.h | 10 +- 2 files changed, 325 insertions(+), 32 deletions(-) diff --git a/gst/matroska/matroska-demux.c b/gst/matroska/matroska-demux.c index 59b2cb1a5b..7feda07ae4 100644 --- a/gst/matroska/matroska-demux.c +++ b/gst/matroska/matroska-demux.c @@ -175,6 +175,8 @@ static GstCaps /* stream methods */ static void gst_matroska_demux_reset (GstElement * element); +static gboolean perform_seek_to_offset (GstMatroskaDemux * demux, + guint64 offset); GType gst_matroska_demux_get_type (void); GST_BOILERPLATE (GstMatroskaDemux, gst_matroska_demux, GstEbmlRead, @@ -408,6 +410,14 @@ gst_matroska_demux_reset (GstElement * element) demux->offset = 0; demux->cluster_time = GST_CLOCK_TIME_NONE; demux->cluster_offset = 0; + demux->index_offset = 0; + demux->seekable = FALSE; + demux->need_newsegment = FALSE; + demux->building_index = FALSE; + if (demux->seek_event) { + gst_event_unref (demux->seek_event); + demux->seek_event = NULL; + } demux->from_offset = -1; demux->to_offset = G_MAXINT64; @@ -1969,19 +1979,25 @@ gst_matroska_demux_query (GstMatroskaDemux * demux, GstPad * pad, break; } - case GST_QUERY_SEEKING:{ + case GST_QUERY_SEEKING: + { GstFormat fmt; - res = TRUE; gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL); + if (fmt == GST_FORMAT_TIME) { + gboolean seekable; - if (fmt != GST_FORMAT_TIME || !demux->index) { - gst_query_set_seeking (query, fmt, FALSE, -1, -1); - } else { - gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0, - demux->segment.duration); + if (demux->streaming) { + /* assuming we'll be able to get an index ... */ + seekable = demux->seekable; + } else { + seekable = !!demux->index; + } + + gst_query_set_seeking (query, GST_FORMAT_TIME, seekable, + 0, demux->segment.duration); + res = TRUE; } - break; } default: @@ -2164,12 +2180,30 @@ gst_matroska_demux_get_seek_track (GstMatroskaDemux * demux, return track; } +static void +gst_matroska_demux_reset_streams (GstMatroskaDemux * demux, GstClockTime time, + gboolean full) +{ + gint i; + + GST_DEBUG_OBJECT (demux, "resetting stream state"); + + g_assert (demux->src->len == demux->num_streams); + for (i = 0; i < demux->src->len; i++) { + GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i); + context->pos = time; + context->set_discont = TRUE; + context->eos = FALSE; + context->from_time = GST_CLOCK_TIME_NONE; + if (full) + context->last_flow = GST_FLOW_OK; + } +} + static gboolean gst_matroska_demux_move_to_entry (GstMatroskaDemux * demux, GstMatroskaIndex * entry, gboolean reset) { - gint i; - GST_OBJECT_LOCK (demux); /* seek (relative to matroska segment) */ @@ -2185,15 +2219,7 @@ gst_matroska_demux_move_to_entry (GstMatroskaDemux * demux, entry->block, GST_TIME_ARGS (entry->time)); /* update the time */ - g_assert (demux->src->len == demux->num_streams); - for (i = 0; i < demux->src->len; i++) { - GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i); - context->pos = entry->time; - context->set_discont = TRUE; - context->last_flow = GST_FLOW_OK; - context->eos = FALSE; - context->from_time = GST_CLOCK_TIME_NONE; - } + gst_matroska_demux_reset_streams (demux, entry->time, TRUE); demux->segment.last_stop = entry->time; demux->seek_block = entry->block; demux->last_stop_end = GST_CLOCK_TIME_NONE; @@ -2230,12 +2256,6 @@ gst_matroska_demux_handle_seek_event (GstMatroskaDemux * demux, GstSegment seeksegment = { 0, }; gboolean update; - /* no seeking until we are (safely) ready */ - if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) { - GST_DEBUG_OBJECT (demux, "not ready for seeking yet"); - return FALSE; - } - if (pad) track = gst_pad_get_element_private (pad); @@ -2274,6 +2294,14 @@ gst_matroska_demux_handle_seek_event (GstMatroskaDemux * demux, GST_DEBUG_OBJECT (demux, "Seek position looks sane"); GST_OBJECT_UNLOCK (demux); + if (demux->streaming) { + /* need to seek to cluster start to pick up cluster time */ + /* upstream takes care of flushing and all that + * ... and newsegment event handling takes care of the rest */ + return perform_seek_to_offset (demux, + entry->pos + demux->ebml_segment_start); + } + flush = !!(flags & GST_SEEK_FLAG_FLUSH); keyunit = !!(flags & GST_SEEK_FLAG_KEY_UNIT); @@ -2364,6 +2392,90 @@ seek_error: } } +/* + * Handle whether we can perform the seek event or if we have to let the chain + * function handle seeks to build the seek indexes first. + */ +static gboolean +gst_matroska_demux_handle_seek_push (GstMatroskaDemux * demux, GstPad * pad, + GstEvent * event) +{ + GstSeekFlags flags; + GstSeekType cur_type, stop_type; + GstFormat format; + gdouble rate; + gint64 cur, stop; + + gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur, + &stop_type, &stop); + + /* sanity checks */ + + /* we can only seek on time */ + if (format != GST_FORMAT_TIME) { + GST_DEBUG_OBJECT (demux, "Can only seek on TIME"); + return FALSE; + } + + if (stop_type != GST_SEEK_TYPE_NONE && stop != GST_CLOCK_TIME_NONE) { + GST_DEBUG_OBJECT (demux, "Seek end-time not supported in streaming mode"); + return FALSE; + } + + if (!(flags & GST_SEEK_FLAG_FLUSH)) { + GST_DEBUG_OBJECT (demux, + "Non-flushing seek not supported in streaming mode"); + return FALSE; + } + + if (flags & GST_SEEK_FLAG_SEGMENT) { + GST_DEBUG_OBJECT (demux, "Segment seek not supported in streaming mode"); + return FALSE; + } + + /* check for having parsed index already */ + if (!demux->index_parsed) { + guint64 offset; + gboolean building_index; + + if (!demux->index_offset) { + GST_DEBUG_OBJECT (demux, "no index (location); no seek in push mode"); + return FALSE; + } + + GST_OBJECT_LOCK (demux); + /* handle the seek event in the chain function */ + demux->state = GST_MATROSKA_DEMUX_STATE_SEEK; + /* no more seek can be issued until state reset to _DATA */ + + /* copy the event */ + if (demux->seek_event) + gst_event_unref (demux->seek_event); + demux->seek_event = gst_event_ref (event); + + /* set the building_index flag so that only one thread can setup the + * structures for index seeking. */ + building_index = demux->building_index; + if (!building_index) { + demux->building_index = TRUE; + offset = demux->index_offset; + } + GST_OBJECT_UNLOCK (demux); + + if (!building_index) { + /* seek to the first subindex or legacy index */ + GST_INFO_OBJECT (demux, "Seeking to Cues at %" G_GUINT64_FORMAT, offset); + return perform_seek_to_offset (demux, offset); + } + + /* well, we are handling it already */ + return TRUE; + } + + /* delegate to tweaked regular seek */ + return gst_matroska_demux_handle_seek_event (demux, pad, event); +} + static gboolean gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event) { @@ -2372,7 +2484,15 @@ gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: - res = gst_matroska_demux_handle_seek_event (demux, pad, event); + /* no seeking until we are (safely) ready */ + if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) { + GST_DEBUG_OBJECT (demux, "not ready for seeking yet"); + return FALSE; + } + if (!demux->streaming) + res = gst_matroska_demux_handle_seek_event (demux, pad, event); + else + res = gst_matroska_demux_handle_seek_push (demux, pad, event); gst_event_unref (event); break; @@ -4464,6 +4584,22 @@ gst_matroska_demux_parse_blockgroup_or_simpleblock (GstMatroskaDemux * demux, lace_time = GST_CLOCK_TIME_NONE; } + /* need to refresh segment info ASAP */ + if (GST_CLOCK_TIME_IS_VALID (lace_time) && demux->need_newsegment) { + GST_DEBUG_OBJECT (demux, + "generating segment starting at %" GST_TIME_FORMAT, + GST_TIME_ARGS (lace_time)); + /* pretend we seeked here */ + gst_segment_set_seek (&demux->segment, demux->segment.rate, + GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET, lace_time, + GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE, NULL); + /* now convey our segment notion downstream */ + gst_matroska_demux_send_event (demux, gst_event_new_new_segment (FALSE, + demux->segment.rate, demux->segment.format, demux->segment.start, + demux->segment.stop, demux->segment.start)); + demux->need_newsegment = FALSE; + } + if (block_duration) { if (stream->timecodescale == 1.0) duration = block_duration * demux->time_scale; @@ -4902,6 +5038,16 @@ gst_matroska_demux_parse_contents_seekentry (GstMatroskaDemux * demux) break; } + /* only pick up index location when streaming */ + if (demux->streaming) { + if (seek_id == GST_MATROSKA_ID_CUES) { + demux->index_offset = seek_pos + demux->ebml_segment_start; + GST_DEBUG_OBJECT (demux, "Cues located at offset %" G_GUINT64_FORMAT, + demux->index_offset); + } + break; + } + /* seek */ if (gst_ebml_read_seek (ebml, seek_pos + demux->ebml_segment_start) != GST_FLOW_OK) @@ -5404,6 +5550,66 @@ pause: } } +/* + * Create and push a flushing seek event upstream + */ +static gboolean +perform_seek_to_offset (GstMatroskaDemux * demux, guint64 offset) +{ + GstEvent *event; + gboolean res = 0; + + GST_DEBUG_OBJECT (demux, "Seeking to %" G_GUINT64_FORMAT, offset); + + event = + gst_event_new_seek (1.0, GST_FORMAT_BYTES, + GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset, + GST_SEEK_TYPE_NONE, -1); + + res = gst_pad_push_event (demux->sinkpad, event); + + /* newsegment event will update offset */ + return res; +} + +static void +gst_matroska_demux_check_seekability (GstMatroskaDemux * demux) +{ + GstQuery *query; + gboolean seekable = FALSE; + gint64 start = -1, stop = -1; + + query = gst_query_new_seeking (GST_FORMAT_BYTES); + if (!gst_pad_peer_query (demux->sinkpad, query)) { + GST_DEBUG_OBJECT (demux, "seeking query failed"); + goto done; + } + + gst_query_parse_seeking (query, NULL, &seekable, &start, &stop); + + /* try harder to query upstream size if we didn't get it the first time */ + if (seekable && stop == -1) { + GstFormat fmt = GST_FORMAT_BYTES; + + GST_DEBUG_OBJECT (demux, "doing duration query to fix up unset stop"); + gst_pad_query_peer_duration (demux->sinkpad, &fmt, &stop); + } + + /* if upstream doesn't know the size, it's likely that it's not seekable in + * practice even if it technically may be seekable */ + if (seekable && (start != 0 || stop <= start)) { + GST_DEBUG_OBJECT (demux, "seekable but unknown start/stop -> disable"); + seekable = FALSE; + } + +done: + GST_INFO_OBJECT (demux, "seekable: %d (%" G_GUINT64_FORMAT " - %" + G_GUINT64_FORMAT ")", seekable, start, stop); + demux->seekable = seekable; + + gst_query_unref (query); +} + static inline void gst_matroska_demux_take (GstMatroskaDemux * demux, guint bytes) { @@ -5538,6 +5744,12 @@ gst_matroska_demux_chain (GstPad * pad, GstBuffer * buffer) guint64 length; const gchar *name; + if (G_UNLIKELY (GST_BUFFER_IS_DISCONT (buffer))) { + GST_DEBUG_OBJECT (demux, "got DISCONT"); + gst_adapter_clear (demux->adapter); + gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, FALSE); + } + gst_adapter_push (demux->adapter, buffer); buffer = NULL; @@ -5577,6 +5789,7 @@ next: if (ret != GST_FLOW_OK) goto parse_failed; demux->state = GST_MATROSKA_DEMUX_STATE_HEADER; + gst_matroska_demux_check_seekability (demux); break; default: goto invalid_header; @@ -5585,6 +5798,7 @@ next: break; case GST_MATROSKA_DEMUX_STATE_HEADER: case GST_MATROSKA_DEMUX_STATE_DATA: + case GST_MATROSKA_DEMUX_STATE_SEEK: switch (id) { case GST_MATROSKA_ID_SEGMENT: /* eat segment prefix */ @@ -5678,11 +5892,38 @@ next: name = "Cues"; goto skip; case GST_MATROSKA_ID_SEEKHEAD: - name = "SeekHead"; - goto skip; + gst_matroska_demux_take (demux, length + needed); + DEBUG_ELEMENT_START (demux, ebml, "SeekHead"); + if ((ret = gst_ebml_read_master (ebml, &id)) == GST_FLOW_OK) + ret = gst_matroska_demux_parse_contents (demux); + if (ret != GST_FLOW_OK) + goto exit; + break; case GST_MATROSKA_ID_CUES: - name = "Cues"; - goto skip; + if (demux->index_parsed) { + gst_matroska_demux_flush (demux, needed + length); + break; + } + gst_matroska_demux_take (demux, length + needed); + ret = gst_matroska_demux_parse_index (demux); + if (demux->state == GST_MATROSKA_DEMUX_STATE_SEEK) { + GstEvent *event; + + GST_OBJECT_LOCK (demux); + event = demux->seek_event; + demux->seek_event = NULL; + GST_OBJECT_UNLOCK (demux); + + g_assert (event); + /* unlikely to fail, since we managed to seek to this point */ + if (!gst_matroska_demux_handle_seek_event (demux, NULL, event)) + goto seek_failed; + /* resume data handling, main thread clear to seek again */ + GST_OBJECT_LOCK (demux); + demux->state = GST_MATROSKA_DEMUX_STATE_DATA; + GST_OBJECT_UNLOCK (demux); + } + break; default: name = "Unknown"; skip: @@ -5719,6 +5960,11 @@ invalid_header: GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Invalid header")); return GST_FLOW_ERROR; } +seek_failed: + { + GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Failed to seek")); + return GST_FLOW_ERROR; + } } static gboolean @@ -5749,7 +5995,33 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event) "received format %d newsegment %" GST_SEGMENT_FORMAT, format, &segment); - /* chain will send initial newsegment after pads have been added */ + if (demux->state < GST_MATROSKA_DEMUX_STATE_DATA) { + GST_DEBUG_OBJECT (demux, "still starting"); + goto exit; + } + + /* we only expect a BYTE segment, e.g. following a seek */ + if (format != GST_FORMAT_BYTES) { + GST_DEBUG_OBJECT (demux, "unsupported segment format, ignoring"); + goto exit; + } + + GST_DEBUG_OBJECT (demux, "clearing segment state"); + /* clear current segment leftover */ + gst_adapter_clear (demux->adapter); + /* and some streaming setup */ + demux->offset = start; + /* do not know where we are; + * need to come across a cluster and generate newsegment */ + demux->segment.last_stop = GST_CLOCK_TIME_NONE; + demux->cluster_time = GST_CLOCK_TIME_NONE; + demux->cluster_offset = 0; + demux->need_newsegment = TRUE; + /* but keep some of the upstream segment */ + demux->segment.rate = rate; + exit: + /* chain will send initial newsegment after pads have been added, + * or otherwise come up with one */ GST_DEBUG_OBJECT (demux, "eating event"); gst_event_unref (event); res = TRUE; @@ -5769,6 +6041,15 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event) } break; } + case GST_EVENT_FLUSH_STOP: + { + gst_adapter_clear (demux->adapter); + gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, TRUE); + demux->segment.last_stop = GST_CLOCK_TIME_NONE; + demux->cluster_time = GST_CLOCK_TIME_NONE; + demux->cluster_offset = 0; + /* fall-through */ + } default: res = gst_pad_event_default (pad, event); break; @@ -5780,11 +6061,15 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event) static gboolean gst_matroska_demux_sink_activate (GstPad * sinkpad) { + GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (sinkpad)); + if (gst_pad_check_pull_range (sinkpad)) { GST_DEBUG ("going to pull mode"); + demux->streaming = FALSE; return gst_pad_activate_pull (sinkpad, TRUE); } else { GST_DEBUG ("going to push (streaming) mode"); + demux->streaming = TRUE; return gst_pad_activate_push (sinkpad, TRUE); } diff --git a/gst/matroska/matroska-demux.h b/gst/matroska/matroska-demux.h index 24d72eebf9..6c14d79aae 100644 --- a/gst/matroska/matroska-demux.h +++ b/gst/matroska/matroska-demux.h @@ -44,7 +44,8 @@ G_BEGIN_DECLS typedef enum { GST_MATROSKA_DEMUX_STATE_START, GST_MATROSKA_DEMUX_STATE_HEADER, - GST_MATROSKA_DEMUX_STATE_DATA + GST_MATROSKA_DEMUX_STATE_DATA, + GST_MATROSKA_DEMUX_STATE_SEEK } GstMatroskaDemuxState; typedef struct _GstMatroskaDemux { @@ -70,6 +71,7 @@ typedef struct _GstMatroskaDemux { gint64 created; /* state */ + gboolean streaming; GstMatroskaDemuxState state; guint level_up; guint64 seek_block; @@ -105,6 +107,12 @@ typedef struct _GstMatroskaDemux { /* some state saving */ GstClockTime cluster_time; guint64 cluster_offset; + /* index stuff */ + gboolean seekable; + gboolean building_index; + guint64 index_offset; + GstEvent *seek_event; + gboolean need_newsegment; /* reverse playback */ GArray *seek_index;