baseparse: modify reverse playback handling

... so as to allow the push-mode case to provide data to subclass
on a buffer by buffer basis (as in regular forward case), rather
than all buffers of a fragment chucked together.

Also refactor buffer handling some more, and add some debug.
This commit is contained in:
Mark Nauwelaerts 2012-02-14 19:33:46 +01:00
parent 2363490ef5
commit 193e1cf16e

View file

@ -314,6 +314,7 @@ struct _GstBaseParsePrivate
/* reverse playback */
GSList *buffers_pending;
GSList *buffers_head;
GSList *buffers_queued;
GSList *buffers_send;
GstClockTime last_ts;
@ -435,8 +436,11 @@ static gint64 gst_base_parse_find_offset (GstBaseParse * parse,
static GstFlowReturn gst_base_parse_locate_time (GstBaseParse * parse,
GstClockTime * _time, gint64 * _offset);
static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse,
gboolean push_only);
static GstFlowReturn gst_base_parse_start_fragment (GstBaseParse * parse);
static GstFlowReturn gst_base_parse_finish_fragment (GstBaseParse * parse,
gboolean prev_head);
static inline GstFlowReturn gst_base_parse_check_sync (GstBaseParse * parse);
static gboolean gst_base_parse_is_seekable (GstBaseParse * parse);
@ -450,6 +454,9 @@ gst_base_parse_clear_queues (GstBaseParse * parse)
NULL);
g_slist_free (parse->priv->buffers_pending);
parse->priv->buffers_pending = NULL;
g_slist_foreach (parse->priv->buffers_head, (GFunc) gst_buffer_unref, NULL);
g_slist_free (parse->priv->buffers_head);
parse->priv->buffers_head = NULL;
g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL);
g_slist_free (parse->priv->buffers_send);
parse->priv->buffers_send = NULL;
@ -1049,7 +1056,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
if (in_segment->rate > 0.0)
gst_base_parse_drain (parse);
else
gst_base_parse_process_fragment (parse, FALSE);
gst_base_parse_finish_fragment (parse, FALSE);
gst_adapter_clear (parse->priv->adapter);
parse->priv->offset = offset;
@ -1088,7 +1095,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
if (parse->segment.rate > 0.0)
gst_base_parse_drain (parse);
else
gst_base_parse_process_fragment (parse, FALSE);
gst_base_parse_finish_fragment (parse, TRUE);
/* If we STILL have zero frames processed, fire an error */
if (parse->priv->framecount == 0) {
@ -1734,7 +1741,9 @@ gst_base_parse_unprepare_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
gst_base_parse_frame_update (parse, frame, NULL);
}
/* takes ownership of @buffer */
/* Wraps buffer in a frame and dispatches to subclass.
* Also manages data skipping and offset handling (including adapter flushing).
* Takes ownership of @buffer */
static GstFlowReturn
gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
gint * skip, gint * flushed)
@ -1745,6 +1754,12 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
g_return_val_if_fail (skip != NULL || flushed != NULL, GST_FLOW_ERROR);
GST_LOG_OBJECT (parse,
"handling buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT
", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
/* track what is being flushed during this single round of frame processing */
parse->priv->flushed = 0;
*skip = 0;
@ -1759,15 +1774,16 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
ret = klass->handle_frame (parse, frame, skip);
gst_base_parse_unprepare_frame (parse, frame);
if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
gst_adapter_clear (parse->priv->adapter);
}
*flushed = parse->priv->flushed;
GST_LOG_OBJECT (parse, "handle_frame skipped %d, flushed %d",
*skip, *flushed);
if (ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret);
goto exit;
}
/* subclass can only do one of these, or semantics are too unclear */
g_assert (*skip == 0 || *flushed == 0);
@ -1779,13 +1795,41 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
parse->priv->prev_frame = NULL;
}
/* track skipping */
if (*skip > 0) {
GstClockTime timestamp;
GstBuffer *outbuf;
GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", *skip);
if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
/* reverse playback, and no frames found yet, so we are skipping
* the leading part of a fragment, which may form the tail of
* fragment coming later, hopefully subclass skips efficiently ... */
timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
outbuf = gst_adapter_take_buffer (parse->priv->adapter, *skip);
outbuf = gst_buffer_make_writable (outbuf);
GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
parse->priv->buffers_head =
g_slist_prepend (parse->priv->buffers_head, outbuf);
outbuf = NULL;
} else {
gst_adapter_flush (parse->priv->adapter, *skip);
}
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->offset += *skip;
parse->priv->discont = TRUE;
/* check for indefinite skipping */
if (ret == GST_FLOW_OK)
ret = gst_base_parse_check_sync (parse);
}
parse->priv->offset += *flushed;
#ifndef GST_DISABLE_GST_DEBUG
if (ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret);
exit:
if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
gst_adapter_clear (parse->priv->adapter);
}
#endif
return ret;
}
@ -2281,7 +2325,34 @@ gst_base_parse_send_buffers (GstBaseParse * parse)
return ret;
}
/* gst_base_parse_process_fragment:
/* gst_base_parse_start_fragment:
*
* Prepares for processing a reverse playback (forward) fragment
* by (re)setting proper state variables.
*/
static GstFlowReturn
gst_base_parse_start_fragment (GstBaseParse * parse)
{
GST_LOG_OBJECT (parse, "starting fragment");
/* invalidate so no fall-back timestamping is performed;
* ok if taken from subclass or upstream */
parse->priv->next_ts = GST_CLOCK_TIME_NONE;
parse->priv->prev_ts = GST_CLOCK_TIME_NONE;
/* prevent it hanging around stop all the time */
parse->segment.position = GST_CLOCK_TIME_NONE;
/* mark next run */
parse->priv->discont = TRUE;
/* head of previous fragment is now pending tail of current fragment */
parse->priv->buffers_pending = parse->priv->buffers_head;
parse->priv->buffers_head = NULL;
return GST_FLOW_OK;
}
/* gst_base_parse_finish_fragment:
*
* Processes a reverse playback (forward) fragment:
* - append head of last fragment that was skipped to current fragment data
@ -2290,40 +2361,35 @@ gst_base_parse_send_buffers (GstBaseParse * parse)
* - push queued data
*/
static GstFlowReturn
gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only)
gst_base_parse_finish_fragment (GstBaseParse * parse, gboolean prev_head)
{
GstBuffer *buf;
GstFlowReturn ret = GST_FLOW_OK;
gboolean seen_key = FALSE, seen_delta = FALSE;
if (push_only)
goto push;
GST_LOG_OBJECT (parse, "finishing fragment");
/* restore order */
parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending);
while (parse->priv->buffers_pending) {
buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data);
GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
gst_buffer_get_size (buf));
gst_adapter_push (parse->priv->adapter, buf);
if (prev_head) {
GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
gst_buffer_get_size (buf));
gst_adapter_push (parse->priv->adapter, buf);
} else {
GST_LOG_OBJECT (parse, "discarding head buffer");
gst_buffer_unref (buf);
}
parse->priv->buffers_pending =
g_slist_delete_link (parse->priv->buffers_pending,
parse->priv->buffers_pending);
}
/* invalidate so no fall-back timestamping is performed;
* ok if taken from subclass or upstream */
parse->priv->next_ts = GST_CLOCK_TIME_NONE;
/* prevent it hanging around stop all the time */
parse->segment.position = GST_CLOCK_TIME_NONE;
/* mark next run */
parse->priv->discont = TRUE;
/* chain looks for frames and queues resulting ones (in stead of pushing) */
/* initial skipped data is added to buffers_pending */
gst_base_parse_drain (parse);
push:
if (parse->priv->buffers_send) {
buf = GST_BUFFER_CAST (parse->priv->buffers_send->data);
seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
@ -2416,7 +2482,6 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GstBaseParseClass *bclass;
GstBaseParse *parse;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *outbuf = NULL;
GstBuffer *tmpbuf = NULL;
guint fsize = 1;
gint skip = -1;
@ -2516,14 +2581,13 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
return ret;
}
/* upstream feeding us in reverse playback;
* gather each fragment, then process it in single run */
* finish previous fragment and start new upon DISCONT */
if (parse->segment.rate < 0.0) {
if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment");
ret = gst_base_parse_process_fragment (parse, FALSE);
ret = gst_base_parse_finish_fragment (parse, TRUE);
gst_base_parse_start_fragment (parse);
}
gst_adapter_push (parse->priv->adapter, buffer);
return ret;
}
gst_adapter_push (parse->priv->adapter, buffer);
}
@ -2577,38 +2641,17 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush);
tmpbuf = NULL;
/* probably already implicitly unmapped due to adapter operation,
* but for good measure ... */
gst_adapter_unmap (parse->priv->adapter);
if (ret != GST_FLOW_OK) {
goto done;
}
if (skip > 0) {
GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
/* reverse playback, and no frames found yet, so we are skipping
* the leading part of a fragment, which may form the tail of
* fragment coming later, hopefully subclass skips efficiently ... */
timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
outbuf = gst_adapter_take_buffer (parse->priv->adapter, skip);
outbuf = gst_buffer_make_writable (outbuf);
GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
parse->priv->buffers_pending =
g_slist_prepend (parse->priv->buffers_pending, outbuf);
outbuf = NULL;
} else {
gst_adapter_flush (parse->priv->adapter, skip);
}
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->offset += skip;
parse->priv->discont = TRUE;
} else if (!flush) {
if (skip == 0 && flush == 0) {
GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, "
"breaking to get more data");
goto done;
}
if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
goto done;
}
}
/* Grab lock to prevent a race with FLUSH_START handler */
@ -2752,8 +2795,9 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse)
/* offset will increase again as fragment is processed/parsed */
parse->priv->last_offset = offset;
gst_base_parse_start_fragment (parse);
gst_adapter_push (parse->priv->adapter, buffer);
ret = gst_base_parse_process_fragment (parse, FALSE);
ret = gst_base_parse_finish_fragment (parse, TRUE);
if (ret != GST_FLOW_OK)
goto exit;
@ -2771,7 +2815,7 @@ static GstFlowReturn
gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
gboolean full)
{
GstBuffer *buffer, *outbuf;
GstBuffer *buffer;
GstFlowReturn ret = GST_FLOW_OK;
guint fsize, min_size;
gint flushed = 0;
@ -2789,14 +2833,18 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
while (TRUE) {
min_size = MAX (parse->priv->min_frame_size, fsize);
GST_LOG_OBJECT (parse, "reading buffer size %u", min_size);
ret = gst_base_parse_pull_range (parse, min_size, &buffer);
if (ret != GST_FLOW_OK)
goto done;
/* if we got a short read, inform subclass we are draining leftover
* and no more is to be expected */
if (gst_buffer_get_size (buffer) < min_size)
if (gst_buffer_get_size (buffer) < min_size) {
GST_LOG_OBJECT (parse, "... but did not get that; marked draining");
parse->priv->drain = TRUE;
}
if (parse->priv->detecting) {
ret = klass->detect (parse, buffer);
@ -2822,34 +2870,11 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
/* Else handle this buffer normally */
}
/* might need it later on */
gst_buffer_ref (buffer);
ret = gst_base_parse_handle_buffer (parse, buffer, &skip, &flushed);
if (ret != GST_FLOW_OK)
break;
if (skip > 0) {
GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
if (full && parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
/* reverse playback, and no frames found yet, so we are skipping
* the leading part of a fragment, which may form the tail of
* fragment coming later, hopefully subclass skips efficiently ... */
outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0, skip);
parse->priv->buffers_pending =
g_slist_prepend (parse->priv->buffers_pending, outbuf);
outbuf = NULL;
}
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->offset += skip;
parse->priv->discont = TRUE;
} else {
/* default to reasonable increase */
fsize += 64 * 1024;
}
/* no longer needed */
gst_buffer_unref (buffer);
/* changed offset means something happened,
/* something flushed means something happened,
* and we should bail out of this loop so as not to occupy
* the task thread indefinitely */
if (flushed) {
@ -2863,10 +2888,13 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
ret = GST_FLOW_EOS;
break;
}
parse->priv->drain = FALSE;
if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
goto done;
/* otherwise, get some more data
* note that is checked this does not happen indefinitely */
if (!skip) {
GST_LOG_OBJECT (parse, "getting some more data");
fsize += 64 * 1024;
}
parse->priv->drain = FALSE;
}
done:
@ -2905,7 +2933,7 @@ gst_base_parse_loop (GstPad * pad)
parse->segment.position >= parse->segment.stop) {
GST_DEBUG_OBJECT (parse, "downstream has reached end of segment");
/* push what was accumulated during loop run */
gst_base_parse_process_fragment (parse, TRUE);
gst_base_parse_finish_fragment (parse, FALSE);
/* force previous fragment */
parse->priv->offset = -1;
ret = GST_FLOW_OK;