baseparse: support reverse playback

... in pull mode or upstream driven.
This commit is contained in:
Mark Nauwelaerts 2010-09-29 16:12:42 +02:00 committed by Tim-Philipp Müller
parent 304b3c78dd
commit 0f5f06d5fa

View file

@ -269,6 +269,12 @@ struct _GstBaseParsePrivate
/* seek events are temporarily kept to match them with newsegments */
GSList *pending_seeks;
/* reverse playback */
GSList *buffers_pending;
GSList *buffers_queued;
GstClockTime last_ts;
gint64 last_offset;
/* property */
/* number of initial frames to discard */
gint skip;
@ -377,6 +383,24 @@ static void gst_base_parse_drain (GstBaseParse * parse);
static void gst_base_parse_post_bitrates (GstBaseParse * parse,
gboolean post_min, gboolean post_avg, gboolean post_max);
static gint64 gst_base_parse_find_offset (GstBaseParse * parse,
GstClockTime time, gboolean before, GstClockTime * _ts);
static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse,
gboolean push_only);
static void
gst_base_parse_clear_queues (GstBaseParse * parse)
{
g_slist_foreach (parse->priv->buffers_queued, (GFunc) gst_buffer_unref, NULL);
g_slist_free (parse->priv->buffers_queued);
parse->priv->buffers_queued = NULL;
g_slist_foreach (parse->priv->buffers_pending, (GFunc) gst_buffer_unref,
NULL);
g_slist_free (parse->priv->buffers_pending);
parse->priv->buffers_pending = NULL;
}
static void
gst_base_parse_finalize (GObject * object)
{
@ -410,6 +434,8 @@ gst_base_parse_finalize (GObject * object)
parse->priv->index = NULL;
}
gst_base_parse_clear_queues (parse);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -540,6 +566,9 @@ gst_base_parse_reset (GstBaseParse * parse)
parse->priv->idx_interval = 0;
parse->priv->exact_position = TRUE;
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
parse->priv->last_offset = 0;
if (parse->pending_segment)
gst_event_unref (parse->pending_segment);
@ -829,11 +858,15 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
/* but finish the current segment */
GST_DEBUG_OBJECT (parse, "draining current segment");
gst_base_parse_drain (parse);
if (parse->segment.rate > 0.0)
gst_base_parse_drain (parse);
else
gst_base_parse_process_fragment (parse, FALSE);
gst_adapter_clear (parse->adapter);
parse->priv->offset = offset;
parse->priv->sync_offset = offset;
parse->priv->next_ts = next_ts;
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
parse->priv->discont = TRUE;
break;
}
@ -849,12 +882,17 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
case GST_EVENT_FLUSH_STOP:
gst_adapter_clear (parse->adapter);
gst_base_parse_clear_queues (parse);
parse->priv->flushing = FALSE;
parse->priv->discont = TRUE;
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
break;
case GST_EVENT_EOS:
gst_base_parse_drain (parse);
if (parse->segment.rate > 0.0)
gst_base_parse_drain (parse);
else
gst_base_parse_process_fragment (parse, FALSE);
/* If we STILL have zero frames processed, fire an error */
if (parse->priv->framecount == 0) {
@ -1402,7 +1440,7 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer)
gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad));
/* segment adjustment magic; only if we are running the whole show */
if (!parse->priv->passthrough &&
if (!parse->priv->passthrough && parse->segment.rate > 0.0 &&
(parse->priv->pad_mode == GST_ACTIVATE_PULL ||
parse->priv->upstream_seekable)) {
/* segment times are typically estimates,
@ -1537,14 +1575,22 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer)
gst_buffer_unref (buffer);
ret = GST_FLOW_OK;
} else if (ret == GST_FLOW_OK) {
if (G_LIKELY (!parse->priv->skip)) {
ret = gst_pad_push (parse->srcpad, buffer);
GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %s",
GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret));
if (parse->segment.rate > 0.0) {
if (G_LIKELY (!parse->priv->skip)) {
ret = gst_pad_push (parse->srcpad, buffer);
GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %s",
GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret));
} else {
GST_DEBUG_OBJECT (parse, "initial frame (%d bytes) discarded",
GST_BUFFER_SIZE (buffer));
parse->priv->skip--;
}
} else {
GST_DEBUG_OBJECT (parse, "initial frame (%d bytes) discarded",
GST_LOG_OBJECT (parse, "frame (%d bytes) queued for now: %d",
GST_BUFFER_SIZE (buffer));
parse->priv->skip--;
parse->priv->buffers_queued =
g_slist_prepend (parse->priv->buffers_queued, buffer);
ret = GST_FLOW_OK;
}
} else {
gst_buffer_unref (buffer);
@ -1602,6 +1648,118 @@ gst_base_parse_drain (GstBaseParse * parse)
parse->priv->drain = FALSE;
}
/**
* gst_base_parse_process_fragment:
* @parse: #GstBaseParse.
*
* Processes a reverse playback (forward) fragment:
* - append head of last fragment that was skipped to current fragment data
* - drain the resulting current fragment data (i.e. repeated chain)
* - add time/duration (if needed) to frames queued by chain
* - push queued data
*/
static GstFlowReturn
gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only)
{
GstBuffer *buf;
GstFlowReturn ret = GST_FLOW_OK;
GSList *send = NULL;
if (push_only)
goto push;
/* restore order */
parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending);
while (parse->priv->buffers_pending) {
buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data);
GST_LOG_OBJECT (parse, "adding pending buffer (size %d)",
GST_BUFFER_SIZE (buf));
gst_adapter_push (parse->adapter, buf);
parse->priv->buffers_pending =
g_slist_delete_link (parse->priv->buffers_pending,
parse->priv->buffers_pending);
}
/* invalidate so no fall-back timestamping is performed;
* ok if taken from subclass or upstream */
parse->priv->next_ts = GST_CLOCK_TIME_NONE;
/* prevent it hanging around stop all the time */
parse->segment.last_stop = GST_CLOCK_TIME_NONE;
/* mark next run */
parse->priv->discont = TRUE;
/* chain looks for frames and queues resulting ones (in stead of pushing) */
/* initial skipped data is added to buffers_pending */
gst_base_parse_drain (parse);
push:
/* add metadata (if needed to queued buffers */
GST_LOG_OBJECT (parse, "last timestamp: %" GST_TIME_FORMAT,
GST_TIME_ARGS (parse->priv->last_ts));
while (parse->priv->buffers_queued) {
buf = GST_BUFFER_CAST (parse->priv->buffers_queued->data);
/* no touching if upstream or parsing provided time */
if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
GST_LOG_OBJECT (parse, "buffer has time %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
} else if (GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts) &&
GST_BUFFER_DURATION_IS_VALID (buf)) {
if (G_LIKELY (GST_BUFFER_DURATION (buf) <= parse->priv->last_ts))
parse->priv->last_ts -= GST_BUFFER_DURATION (buf);
else
parse->priv->last_ts = 0;
GST_BUFFER_TIMESTAMP (buf) = parse->priv->last_ts;
GST_LOG_OBJECT (parse, "applied time %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
} else {
/* no idea, very bad */
GST_WARNING_OBJECT (parse, "could not determine time for buffer");
}
/* reverse order for ascending sending */
send = g_slist_prepend (send, buf);
parse->priv->buffers_queued =
g_slist_delete_link (parse->priv->buffers_queued,
parse->priv->buffers_queued);
}
/* send buffers */
while (send) {
buf = GST_BUFFER_CAST (send->data);
GST_LOG_OBJECT (parse, "pushing buffer %p, timestamp %"
GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
", offset %" G_GINT64_FORMAT, buf,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf));
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts)))
parse->priv->last_ts = GST_BUFFER_TIMESTAMP (buf);
/* iterate output queue an push downstream */
ret = gst_pad_push (parse->srcpad, buf);
send = g_slist_delete_link (send, send);
/* clear any leftover if error */
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
while (send) {
buf = GST_BUFFER_CAST (send->data);
gst_buffer_unref (buf);
send = g_slist_delete_link (send, send);
}
}
}
/* any trailing unused no longer usable (ideally none) */
if (G_UNLIKELY (gst_adapter_available (parse->adapter))) {
GST_DEBUG_OBJECT (parse, "discarding %d trailing bytes",
gst_adapter_available (parse->adapter));
gst_adapter_clear (parse->adapter);
}
return ret;
}
/* small helper that checks whether we have been trying to resync too long */
static inline GstFlowReturn
gst_base_parse_check_sync (GstBaseParse * parse)
@ -1647,8 +1805,18 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer)
if (G_UNLIKELY (parse->priv->passthrough)) {
buffer = gst_buffer_make_metadata_writable (buffer);
return gst_base_parse_push_buffer (parse, buffer);
} else
}
/* upstream feeding us in reverse playback;
* gather each fragment, then process it in single run */
if (parse->segment.rate < 0.0) {
if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment");
ret = gst_base_parse_process_fragment (parse, FALSE);
}
gst_adapter_push (parse->adapter, buffer);
return ret;
}
gst_adapter_push (parse->adapter, buffer);
}
/* Parse and push as many frames as possible */
@ -1701,21 +1869,30 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer)
}
break;
}
if (skip == -1) {
/* subclass didn't touch this value. By default we skip 1 byte */
skip = 1;
}
if (skip > 0) {
GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
gst_adapter_flush (parse->adapter, skip);
if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
/* reverse playback, and no frames found yet, so we are skipping
* the leading part of a fragment, which may form the tail of
* fragment coming later, hopefully subclass skips efficiently ... */
timestamp = gst_adapter_prev_timestamp (parse->adapter, NULL);
outbuf = gst_adapter_take_buffer (parse->adapter, skip);
outbuf = gst_buffer_make_metadata_writable (outbuf);
GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
parse->priv->buffers_pending =
g_slist_prepend (parse->priv->buffers_pending, outbuf);
outbuf = NULL;
} else {
gst_adapter_flush (parse->adapter, skip);
}
parse->priv->offset += skip;
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->discont = TRUE;
} else if (skip == -1) {
/* subclass didn't touch this value. By default we skip 1 byte */
GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
gst_adapter_flush (parse->adapter, 1);
parse->priv->offset++;
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->discont = TRUE;
}
/* There is a possibility that subclass set the skip value to zero.
This means that it has probably found a frame but wants to ask
@ -1849,6 +2026,62 @@ gst_base_parse_pull_range (GstBaseParse * parse, guint size,
return GST_FLOW_OK;
}
static GstFlowReturn
gst_base_parse_handle_previous_fragment (GstBaseParse * parse)
{
gint64 offset = 0;
GstClockTime ts = 0;
GstBuffer *buffer;
GstFlowReturn ret;
GST_DEBUG_OBJECT (parse, "fragment ended; last_ts = %" GST_TIME_FORMAT
", last_offset = %" G_GINT64_FORMAT, GST_TIME_ARGS (parse->priv->last_ts),
parse->priv->last_offset);
if (!parse->priv->last_offset || parse->priv->last_ts <= parse->segment.start) {
GST_DEBUG_OBJECT (parse, "past start of segment %" GST_TIME_FORMAT,
GST_TIME_ARGS (parse->segment.start));
ret = GST_FLOW_UNEXPECTED;
goto exit;
}
/* last fragment started at last_offset / last_ts;
* seek back 10s capped at 1MB */
if (parse->priv->last_ts >= 10 * GST_SECOND)
ts = parse->priv->last_ts - 10 * GST_SECOND;
/* if we are exact now, we will be more so going backwards */
if (parse->priv->exact_position) {
offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL);
} else {
GstFormat dstformat = GST_FORMAT_BYTES;
if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts,
&dstformat, &offset)) {
GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based");
}
}
offset = CLAMP (offset, parse->priv->last_offset - 1024 * 1024,
parse->priv->last_offset - 1024);
offset = MAX (0, offset);
GST_DEBUG_OBJECT (parse, "next fragment from offset %" G_GINT64_FORMAT,
offset);
parse->priv->offset = offset;
ret = gst_base_parse_pull_range (parse, parse->priv->last_offset - offset,
&buffer);
if (ret != GST_FLOW_OK)
goto exit;
gst_adapter_push (parse->adapter, buffer);
ret = gst_base_parse_process_fragment (parse, FALSE);
if (ret != GST_FLOW_OK)
goto exit;
exit:
return ret;
}
/**
* gst_base_parse_loop:
* @pad: GstPad
@ -1861,13 +2094,23 @@ gst_base_parse_loop (GstPad * pad)
GstBaseParse *parse;
GstBaseParseClass *klass;
GstBuffer *buffer, *outbuf;
gboolean ret = FALSE;
GstFlowReturn ret = FALSE;
guint fsize = 0, min_size;
gint skip = 0;
parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
klass = GST_BASE_PARSE_GET_CLASS (parse);
/* reverse playback:
* first fragment (closest to stop time) is handled normally below,
* then we pull in fragments going backwards */
if (parse->segment.rate < 0.0) {
if (GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts)) {
ret = gst_base_parse_handle_previous_fragment (parse);
goto done;
}
}
while (TRUE) {
GST_BASE_PARSE_LOCK (parse);
@ -1875,11 +2118,8 @@ gst_base_parse_loop (GstPad * pad)
GST_BASE_PARSE_UNLOCK (parse);
ret = gst_base_parse_pull_range (parse, min_size, &buffer);
if (ret == GST_FLOW_UNEXPECTED)
goto eos;
else if (ret != GST_FLOW_OK)
goto pause;
if (ret != GST_FLOW_OK)
goto done;
if (parse->priv->discont) {
GST_DEBUG_OBJECT (parse, "marking DISCONT");
@ -1897,18 +2137,23 @@ gst_base_parse_loop (GstPad * pad)
break;
}
parse->priv->drain = FALSE;
if (skip == -1)
skip = 1;
if (skip > 0) {
GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
/* reverse playback, and no frames found yet, so we are skipping
* the leading part of a fragment, which may form the tail of
* fragment coming later, hopefully subclass skips efficiently ... */
outbuf = gst_buffer_create_sub (buffer, 0, skip);
parse->priv->buffers_pending =
g_slist_prepend (parse->priv->buffers_pending, outbuf);
outbuf = NULL;
}
parse->priv->offset += skip;
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->discont = TRUE;
} else if (skip == -1) {
GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
parse->priv->offset++;
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->discont = TRUE;
}
/* skip == 0 should imply subclass set min_size to need more data ... */
GST_DEBUG_OBJECT (parse, "finding sync...");
@ -1925,11 +2170,8 @@ gst_base_parse_loop (GstPad * pad)
} else {
gst_buffer_unref (buffer);
ret = gst_base_parse_pull_range (parse, fsize, &outbuf);
if (ret == GST_FLOW_UNEXPECTED)
goto eos;
else if (ret != GST_FLOW_OK)
goto pause;
if (ret != GST_FLOW_OK)
goto done;
if (GST_BUFFER_SIZE (outbuf) < fsize)
goto eos;
}
@ -1943,10 +2185,21 @@ gst_base_parse_loop (GstPad * pad)
/* This always unrefs the outbuf, even if error occurs */
ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf);
if (ret != GST_FLOW_OK)
goto pause;
/* eat expected eos signalling past segment in reverse playback */
if (parse->segment.rate < 0.0 && ret == GST_FLOW_UNEXPECTED &&
parse->segment.last_stop >= parse->segment.stop) {
GST_DEBUG_OBJECT (parse, "downstream has reached end of segment");
/* push what was accumulated during loop run */
gst_base_parse_process_fragment (parse, TRUE);
ret = GST_FLOW_OK;
}
done:
if (ret == GST_FLOW_UNEXPECTED)
goto eos;
else if (ret != GST_FLOW_OK)
goto pause;
gst_object_unref (parse);
return;
@ -2599,13 +2852,13 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
gst_event_parse_seek (event, &rate, &format, &flags,
&cur_type, &cur, &stop_type, &stop);
GST_DEBUG_OBJECT (parse, "seek to format %s, "
GST_DEBUG_OBJECT (parse, "seek to format %s, rate %f, "
"start type %d at %" GST_TIME_FORMAT ", end type %d at %"
GST_TIME_FORMAT, gst_format_get_name (format),
GST_TIME_FORMAT, gst_format_get_name (format), rate,
cur_type, GST_TIME_ARGS (cur), stop_type, GST_TIME_ARGS (stop));
/* no negative rates yet */
if (rate < 0.0)
if (rate < 0.0 && parse->priv->pad_mode == GST_ACTIVATE_PUSH)
goto negative_rate;
if (cur_type != GST_SEEK_TYPE_SET ||
@ -2671,8 +2924,8 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
}
GST_DEBUG_OBJECT (parse,
"seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur,
seekpos);
"seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT,
start_ts, seekpos);
GST_DEBUG_OBJECT (parse,
"seek stop %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT,
seeksegment.stop, seekstop);
@ -2706,6 +2959,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
if (flush) {
GST_DEBUG_OBJECT (parse, "sending flush stop");
gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop ());
gst_base_parse_clear_queues (parse);
} else {
if (parse->close_segment)
gst_event_unref (parse->close_segment);
@ -2734,22 +2988,25 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
/* This will be sent later in _loop() */
parse->pending_segment =
gst_event_new_new_segment (FALSE, parse->segment.rate,
parse->segment.format, parse->segment.last_stop, parse->segment.stop,
parse->segment.last_stop);
parse->segment.format, parse->segment.start, parse->segment.stop,
parse->segment.start);
GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
"start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
", pos = %" GST_TIME_FORMAT, format,
GST_TIME_ARGS (parse->segment.last_stop),
GST_TIME_ARGS (stop), GST_TIME_ARGS (parse->segment.last_stop));
GST_TIME_ARGS (parse->segment.start),
GST_TIME_ARGS (parse->segment.stop),
GST_TIME_ARGS (parse->segment.start));
/* mark discont if we are going to stream from another position. */
if (seekpos != parse->priv->offset) {
GST_DEBUG_OBJECT (parse,
"mark DISCONT, we did a seek to another position");
parse->priv->offset = seekpos;
parse->priv->last_offset = seekpos;
parse->priv->discont = TRUE;
parse->priv->next_ts = start_ts;
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
parse->priv->sync_offset = seekpos;
parse->priv->exact_position = accurate;
}