matroskademux: implement push mode seeking

This commit is contained in:
Mark Nauwelaerts 2010-04-14 11:53:46 +02:00
parent e79f7beba6
commit a6bb8338fd
2 changed files with 325 additions and 32 deletions

View file

@ -175,6 +175,8 @@ static GstCaps
/* stream methods */
static void gst_matroska_demux_reset (GstElement * element);
static gboolean perform_seek_to_offset (GstMatroskaDemux * demux,
guint64 offset);
GType gst_matroska_demux_get_type (void);
GST_BOILERPLATE (GstMatroskaDemux, gst_matroska_demux, GstEbmlRead,
@ -408,6 +410,14 @@ gst_matroska_demux_reset (GstElement * element)
demux->offset = 0;
demux->cluster_time = GST_CLOCK_TIME_NONE;
demux->cluster_offset = 0;
demux->index_offset = 0;
demux->seekable = FALSE;
demux->need_newsegment = FALSE;
demux->building_index = FALSE;
if (demux->seek_event) {
gst_event_unref (demux->seek_event);
demux->seek_event = NULL;
}
demux->from_offset = -1;
demux->to_offset = G_MAXINT64;
@ -1969,19 +1979,25 @@ gst_matroska_demux_query (GstMatroskaDemux * demux, GstPad * pad,
break;
}
case GST_QUERY_SEEKING:{
case GST_QUERY_SEEKING:
{
GstFormat fmt;
res = TRUE;
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
if (fmt == GST_FORMAT_TIME) {
gboolean seekable;
if (fmt != GST_FORMAT_TIME || !demux->index) {
gst_query_set_seeking (query, fmt, FALSE, -1, -1);
} else {
gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0,
demux->segment.duration);
if (demux->streaming) {
/* assuming we'll be able to get an index ... */
seekable = demux->seekable;
} else {
seekable = !!demux->index;
}
gst_query_set_seeking (query, GST_FORMAT_TIME, seekable,
0, demux->segment.duration);
res = TRUE;
}
break;
}
default:
@ -2164,12 +2180,30 @@ gst_matroska_demux_get_seek_track (GstMatroskaDemux * demux,
return track;
}
static void
gst_matroska_demux_reset_streams (GstMatroskaDemux * demux, GstClockTime time,
gboolean full)
{
gint i;
GST_DEBUG_OBJECT (demux, "resetting stream state");
g_assert (demux->src->len == demux->num_streams);
for (i = 0; i < demux->src->len; i++) {
GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i);
context->pos = time;
context->set_discont = TRUE;
context->eos = FALSE;
context->from_time = GST_CLOCK_TIME_NONE;
if (full)
context->last_flow = GST_FLOW_OK;
}
}
static gboolean
gst_matroska_demux_move_to_entry (GstMatroskaDemux * demux,
GstMatroskaIndex * entry, gboolean reset)
{
gint i;
GST_OBJECT_LOCK (demux);
/* seek (relative to matroska segment) */
@ -2185,15 +2219,7 @@ gst_matroska_demux_move_to_entry (GstMatroskaDemux * demux,
entry->block, GST_TIME_ARGS (entry->time));
/* update the time */
g_assert (demux->src->len == demux->num_streams);
for (i = 0; i < demux->src->len; i++) {
GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i);
context->pos = entry->time;
context->set_discont = TRUE;
context->last_flow = GST_FLOW_OK;
context->eos = FALSE;
context->from_time = GST_CLOCK_TIME_NONE;
}
gst_matroska_demux_reset_streams (demux, entry->time, TRUE);
demux->segment.last_stop = entry->time;
demux->seek_block = entry->block;
demux->last_stop_end = GST_CLOCK_TIME_NONE;
@ -2230,12 +2256,6 @@ gst_matroska_demux_handle_seek_event (GstMatroskaDemux * demux,
GstSegment seeksegment = { 0, };
gboolean update;
/* no seeking until we are (safely) ready */
if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
GST_DEBUG_OBJECT (demux, "not ready for seeking yet");
return FALSE;
}
if (pad)
track = gst_pad_get_element_private (pad);
@ -2274,6 +2294,14 @@ gst_matroska_demux_handle_seek_event (GstMatroskaDemux * demux,
GST_DEBUG_OBJECT (demux, "Seek position looks sane");
GST_OBJECT_UNLOCK (demux);
if (demux->streaming) {
/* need to seek to cluster start to pick up cluster time */
/* upstream takes care of flushing and all that
* ... and newsegment event handling takes care of the rest */
return perform_seek_to_offset (demux,
entry->pos + demux->ebml_segment_start);
}
flush = !!(flags & GST_SEEK_FLAG_FLUSH);
keyunit = !!(flags & GST_SEEK_FLAG_KEY_UNIT);
@ -2364,6 +2392,90 @@ seek_error:
}
}
/*
* Handle whether we can perform the seek event or if we have to let the chain
* function handle seeks to build the seek indexes first.
*/
static gboolean
gst_matroska_demux_handle_seek_push (GstMatroskaDemux * demux, GstPad * pad,
GstEvent * event)
{
GstSeekFlags flags;
GstSeekType cur_type, stop_type;
GstFormat format;
gdouble rate;
gint64 cur, stop;
gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
&stop_type, &stop);
/* sanity checks */
/* we can only seek on time */
if (format != GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (demux, "Can only seek on TIME");
return FALSE;
}
if (stop_type != GST_SEEK_TYPE_NONE && stop != GST_CLOCK_TIME_NONE) {
GST_DEBUG_OBJECT (demux, "Seek end-time not supported in streaming mode");
return FALSE;
}
if (!(flags & GST_SEEK_FLAG_FLUSH)) {
GST_DEBUG_OBJECT (demux,
"Non-flushing seek not supported in streaming mode");
return FALSE;
}
if (flags & GST_SEEK_FLAG_SEGMENT) {
GST_DEBUG_OBJECT (demux, "Segment seek not supported in streaming mode");
return FALSE;
}
/* check for having parsed index already */
if (!demux->index_parsed) {
guint64 offset;
gboolean building_index;
if (!demux->index_offset) {
GST_DEBUG_OBJECT (demux, "no index (location); no seek in push mode");
return FALSE;
}
GST_OBJECT_LOCK (demux);
/* handle the seek event in the chain function */
demux->state = GST_MATROSKA_DEMUX_STATE_SEEK;
/* no more seek can be issued until state reset to _DATA */
/* copy the event */
if (demux->seek_event)
gst_event_unref (demux->seek_event);
demux->seek_event = gst_event_ref (event);
/* set the building_index flag so that only one thread can setup the
* structures for index seeking. */
building_index = demux->building_index;
if (!building_index) {
demux->building_index = TRUE;
offset = demux->index_offset;
}
GST_OBJECT_UNLOCK (demux);
if (!building_index) {
/* seek to the first subindex or legacy index */
GST_INFO_OBJECT (demux, "Seeking to Cues at %" G_GUINT64_FORMAT, offset);
return perform_seek_to_offset (demux, offset);
}
/* well, we are handling it already */
return TRUE;
}
/* delegate to tweaked regular seek */
return gst_matroska_demux_handle_seek_event (demux, pad, event);
}
static gboolean
gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event)
{
@ -2372,7 +2484,15 @@ gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
res = gst_matroska_demux_handle_seek_event (demux, pad, event);
/* no seeking until we are (safely) ready */
if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
GST_DEBUG_OBJECT (demux, "not ready for seeking yet");
return FALSE;
}
if (!demux->streaming)
res = gst_matroska_demux_handle_seek_event (demux, pad, event);
else
res = gst_matroska_demux_handle_seek_push (demux, pad, event);
gst_event_unref (event);
break;
@ -4464,6 +4584,22 @@ gst_matroska_demux_parse_blockgroup_or_simpleblock (GstMatroskaDemux * demux,
lace_time = GST_CLOCK_TIME_NONE;
}
/* need to refresh segment info ASAP */
if (GST_CLOCK_TIME_IS_VALID (lace_time) && demux->need_newsegment) {
GST_DEBUG_OBJECT (demux,
"generating segment starting at %" GST_TIME_FORMAT,
GST_TIME_ARGS (lace_time));
/* pretend we seeked here */
gst_segment_set_seek (&demux->segment, demux->segment.rate,
GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET, lace_time,
GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE, NULL);
/* now convey our segment notion downstream */
gst_matroska_demux_send_event (demux, gst_event_new_new_segment (FALSE,
demux->segment.rate, demux->segment.format, demux->segment.start,
demux->segment.stop, demux->segment.start));
demux->need_newsegment = FALSE;
}
if (block_duration) {
if (stream->timecodescale == 1.0)
duration = block_duration * demux->time_scale;
@ -4902,6 +5038,16 @@ gst_matroska_demux_parse_contents_seekentry (GstMatroskaDemux * demux)
break;
}
/* only pick up index location when streaming */
if (demux->streaming) {
if (seek_id == GST_MATROSKA_ID_CUES) {
demux->index_offset = seek_pos + demux->ebml_segment_start;
GST_DEBUG_OBJECT (demux, "Cues located at offset %" G_GUINT64_FORMAT,
demux->index_offset);
}
break;
}
/* seek */
if (gst_ebml_read_seek (ebml, seek_pos + demux->ebml_segment_start) !=
GST_FLOW_OK)
@ -5404,6 +5550,66 @@ pause:
}
}
/*
* Create and push a flushing seek event upstream
*/
static gboolean
perform_seek_to_offset (GstMatroskaDemux * demux, guint64 offset)
{
GstEvent *event;
gboolean res = 0;
GST_DEBUG_OBJECT (demux, "Seeking to %" G_GUINT64_FORMAT, offset);
event =
gst_event_new_seek (1.0, GST_FORMAT_BYTES,
GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
GST_SEEK_TYPE_NONE, -1);
res = gst_pad_push_event (demux->sinkpad, event);
/* newsegment event will update offset */
return res;
}
static void
gst_matroska_demux_check_seekability (GstMatroskaDemux * demux)
{
GstQuery *query;
gboolean seekable = FALSE;
gint64 start = -1, stop = -1;
query = gst_query_new_seeking (GST_FORMAT_BYTES);
if (!gst_pad_peer_query (demux->sinkpad, query)) {
GST_DEBUG_OBJECT (demux, "seeking query failed");
goto done;
}
gst_query_parse_seeking (query, NULL, &seekable, &start, &stop);
/* try harder to query upstream size if we didn't get it the first time */
if (seekable && stop == -1) {
GstFormat fmt = GST_FORMAT_BYTES;
GST_DEBUG_OBJECT (demux, "doing duration query to fix up unset stop");
gst_pad_query_peer_duration (demux->sinkpad, &fmt, &stop);
}
/* if upstream doesn't know the size, it's likely that it's not seekable in
* practice even if it technically may be seekable */
if (seekable && (start != 0 || stop <= start)) {
GST_DEBUG_OBJECT (demux, "seekable but unknown start/stop -> disable");
seekable = FALSE;
}
done:
GST_INFO_OBJECT (demux, "seekable: %d (%" G_GUINT64_FORMAT " - %"
G_GUINT64_FORMAT ")", seekable, start, stop);
demux->seekable = seekable;
gst_query_unref (query);
}
static inline void
gst_matroska_demux_take (GstMatroskaDemux * demux, guint bytes)
{
@ -5538,6 +5744,12 @@ gst_matroska_demux_chain (GstPad * pad, GstBuffer * buffer)
guint64 length;
const gchar *name;
if (G_UNLIKELY (GST_BUFFER_IS_DISCONT (buffer))) {
GST_DEBUG_OBJECT (demux, "got DISCONT");
gst_adapter_clear (demux->adapter);
gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, FALSE);
}
gst_adapter_push (demux->adapter, buffer);
buffer = NULL;
@ -5577,6 +5789,7 @@ next:
if (ret != GST_FLOW_OK)
goto parse_failed;
demux->state = GST_MATROSKA_DEMUX_STATE_HEADER;
gst_matroska_demux_check_seekability (demux);
break;
default:
goto invalid_header;
@ -5585,6 +5798,7 @@ next:
break;
case GST_MATROSKA_DEMUX_STATE_HEADER:
case GST_MATROSKA_DEMUX_STATE_DATA:
case GST_MATROSKA_DEMUX_STATE_SEEK:
switch (id) {
case GST_MATROSKA_ID_SEGMENT:
/* eat segment prefix */
@ -5678,11 +5892,38 @@ next:
name = "Cues";
goto skip;
case GST_MATROSKA_ID_SEEKHEAD:
name = "SeekHead";
goto skip;
gst_matroska_demux_take (demux, length + needed);
DEBUG_ELEMENT_START (demux, ebml, "SeekHead");
if ((ret = gst_ebml_read_master (ebml, &id)) == GST_FLOW_OK)
ret = gst_matroska_demux_parse_contents (demux);
if (ret != GST_FLOW_OK)
goto exit;
break;
case GST_MATROSKA_ID_CUES:
name = "Cues";
goto skip;
if (demux->index_parsed) {
gst_matroska_demux_flush (demux, needed + length);
break;
}
gst_matroska_demux_take (demux, length + needed);
ret = gst_matroska_demux_parse_index (demux);
if (demux->state == GST_MATROSKA_DEMUX_STATE_SEEK) {
GstEvent *event;
GST_OBJECT_LOCK (demux);
event = demux->seek_event;
demux->seek_event = NULL;
GST_OBJECT_UNLOCK (demux);
g_assert (event);
/* unlikely to fail, since we managed to seek to this point */
if (!gst_matroska_demux_handle_seek_event (demux, NULL, event))
goto seek_failed;
/* resume data handling, main thread clear to seek again */
GST_OBJECT_LOCK (demux);
demux->state = GST_MATROSKA_DEMUX_STATE_DATA;
GST_OBJECT_UNLOCK (demux);
}
break;
default:
name = "Unknown";
skip:
@ -5719,6 +5960,11 @@ invalid_header:
GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Invalid header"));
return GST_FLOW_ERROR;
}
seek_failed:
{
GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Failed to seek"));
return GST_FLOW_ERROR;
}
}
static gboolean
@ -5749,7 +5995,33 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
"received format %d newsegment %" GST_SEGMENT_FORMAT, format,
&segment);
/* chain will send initial newsegment after pads have been added */
if (demux->state < GST_MATROSKA_DEMUX_STATE_DATA) {
GST_DEBUG_OBJECT (demux, "still starting");
goto exit;
}
/* we only expect a BYTE segment, e.g. following a seek */
if (format != GST_FORMAT_BYTES) {
GST_DEBUG_OBJECT (demux, "unsupported segment format, ignoring");
goto exit;
}
GST_DEBUG_OBJECT (demux, "clearing segment state");
/* clear current segment leftover */
gst_adapter_clear (demux->adapter);
/* and some streaming setup */
demux->offset = start;
/* do not know where we are;
* need to come across a cluster and generate newsegment */
demux->segment.last_stop = GST_CLOCK_TIME_NONE;
demux->cluster_time = GST_CLOCK_TIME_NONE;
demux->cluster_offset = 0;
demux->need_newsegment = TRUE;
/* but keep some of the upstream segment */
demux->segment.rate = rate;
exit:
/* chain will send initial newsegment after pads have been added,
* or otherwise come up with one */
GST_DEBUG_OBJECT (demux, "eating event");
gst_event_unref (event);
res = TRUE;
@ -5769,6 +6041,15 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
}
break;
}
case GST_EVENT_FLUSH_STOP:
{
gst_adapter_clear (demux->adapter);
gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, TRUE);
demux->segment.last_stop = GST_CLOCK_TIME_NONE;
demux->cluster_time = GST_CLOCK_TIME_NONE;
demux->cluster_offset = 0;
/* fall-through */
}
default:
res = gst_pad_event_default (pad, event);
break;
@ -5780,11 +6061,15 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
static gboolean
gst_matroska_demux_sink_activate (GstPad * sinkpad)
{
GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (sinkpad));
if (gst_pad_check_pull_range (sinkpad)) {
GST_DEBUG ("going to pull mode");
demux->streaming = FALSE;
return gst_pad_activate_pull (sinkpad, TRUE);
} else {
GST_DEBUG ("going to push (streaming) mode");
demux->streaming = TRUE;
return gst_pad_activate_push (sinkpad, TRUE);
}

View file

@ -44,7 +44,8 @@ G_BEGIN_DECLS
typedef enum {
GST_MATROSKA_DEMUX_STATE_START,
GST_MATROSKA_DEMUX_STATE_HEADER,
GST_MATROSKA_DEMUX_STATE_DATA
GST_MATROSKA_DEMUX_STATE_DATA,
GST_MATROSKA_DEMUX_STATE_SEEK
} GstMatroskaDemuxState;
typedef struct _GstMatroskaDemux {
@ -70,6 +71,7 @@ typedef struct _GstMatroskaDemux {
gint64 created;
/* state */
gboolean streaming;
GstMatroskaDemuxState state;
guint level_up;
guint64 seek_block;
@ -105,6 +107,12 @@ typedef struct _GstMatroskaDemux {
/* some state saving */
GstClockTime cluster_time;
guint64 cluster_offset;
/* index stuff */
gboolean seekable;
gboolean building_index;
guint64 index_offset;
GstEvent *seek_event;
gboolean need_newsegment;
/* reverse playback */
GArray *seek_index;