diff --git a/libs/gst/base/gstbaseparse.c b/libs/gst/base/gstbaseparse.c index 02a4b99d20..5677001cbc 100644 --- a/libs/gst/base/gstbaseparse.c +++ b/libs/gst/base/gstbaseparse.c @@ -50,14 +50,16 @@ * * Set-up phase * - * GstBaseParse class calls @set_sink_caps to inform the subclass about - * incoming sinkpad caps. Subclass should set the srcpad caps accordingly. - * - * * GstBaseParse calls @start to inform subclass that data processing is * about to start now. * * + * GstBaseParse class calls @set_sink_caps to inform the subclass about + * incoming sinkpad caps. Subclass could already set the srcpad caps + * accordingly, but this might be delayed until calling + * gst_base_parse_finish_frame() with a non-queued frame. + * + * * At least at this point subclass needs to tell the GstBaseParse class * how big data chunks it wants to receive (min_frame_size). It can do * this with gst_base_parse_set_min_frame_size(). @@ -78,34 +80,39 @@ * * * A buffer of (at least) min_frame_size bytes is passed to subclass with - * @check_valid_frame. Subclass checks the contents and returns TRUE - * if the buffer contains a valid frame. It also needs to set the - * @framesize according to the detected frame size. If buffer didn't - * contain a valid frame, this call must return FALSE and optionally - * set the @skipsize value to inform base class that how many bytes - * it needs to skip in order to find a valid frame. @framesize can always - * indicate a new minimum for current frame parsing. Indicating G_MAXUINT - * for requested amount means subclass simply needs best available - * subsequent data. In push mode this amounts to an additional input buffer - * (thus minimal additional latency), in pull mode this amounts to some - * arbitrary reasonable buffer size increase. The passed buffer - * is read-only. Note that @check_valid_frame might receive any small - * amount of input data when leftover data is being drained (e.g. at EOS). - * - * - * After valid frame is found, it will be passed again to subclass with - * @parse_frame call. Now subclass is responsible for parsing the - * frame contents and setting the caps, and buffer metadata (e.g. - * buffer timestamp and duration, or keyframe if applicable). + * @handle_frame. Subclass checks the contents and can optionally + * return GST_FLOW_OK along with an amount of data to be skipped to find + * a valid frame (which will result in a subsequent DISCONT). + * If, otherwise, the buffer does not hold a complete frame, + * @handle_frame can merely return and will be called again when additional + * data is available. In push mode this amounts to an + * additional input buffer (thus minimal additional latency), in pull mode + * this amounts to some arbitrary reasonable buffer size increase. + * Of course, gst_base_parse_set_min_size() could also be used if a very + * specific known amount of additional data is required. + * If, however, the buffer holds a complete valid frame, it can pass + * the size of this frame to gst_base_parse_finish_frame(). + * If acting as a converter, it can also merely indicate consumed input data + * while simultaneously providing custom output data. + * Note that baseclass performs some processing (such as tracking + * overall consumed data rate versus duration) for each finished frame, + * but other state is only updated upon each call to @handle_frame + * (such as tracking upstream input timestamp). + * + * Subclass is also responsible for setting the buffer metadata + * (e.g. buffer timestamp and duration, or keyframe if applicable). * (although the latter can also be done by GstBaseParse if it is * appropriately configured, see below). Frame is provided with * timestamp derived from upstream (as much as generally possible), * duration obtained from configuration (see below), and offset * if meaningful (in pull mode). + * + * Note that @check_valid_frame might receive any small + * amount of input data when leftover data is being drained (e.g. at EOS). * * - * Finally the buffer can be pushed downstream and the parsing loop starts - * over again. Just prior to actually pushing the buffer in question, + * As part of finish frame processing, + * just prior to actually pushing the buffer in question, * it is passed to @pre_push_frame which gives subclass yet one * last chance to examine buffer metadata, or to send some custom (tag) * events, or to perform custom (segment) filtering. @@ -154,12 +161,9 @@ * done with gst_base_parse_set_min_frame_size() function. * * - * Examine data chunks passed to subclass with @check_valid_frame - * and tell if they contain a valid frame - * - * - * Set the caps and timestamp to frame that is passed to subclass with - * @parse_frame function. + * Examine data chunks passed to subclass with @handle_frame and pass + * proper frame(s) to gst_base_parse_finish_frame(), and setting src pad + * caps and timestamps on frame. * * Provide conversion functions * @@ -262,6 +266,7 @@ struct _GstBaseParsePrivate GstClockTime frame_duration; gboolean seen_keyframe; gboolean is_video; + gint flushed; guint64 framecount; guint64 bytecount; @@ -317,8 +322,14 @@ struct _GstBaseParsePrivate /* Newsegment event to be sent after SEEK */ GstEvent *pending_segment; - /* push mode helper frame */ - GstBaseParseFrame frame; + /* frame previously passed to subclass (might be re-used) */ + GstBaseParseFrame *prev_frame; + /* offset corresponding to above frame */ + gint64 prev_offset; + /* whether we are merely scanning for a frame */ + gboolean scanning; + /* ... and resulting frame, if any */ + GstBaseParseFrame *scanned_frame; /* TRUE if we're still detecting the format, i.e. * if ::detect() is still called for future buffers */ @@ -405,8 +416,6 @@ static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer); static void gst_base_parse_loop (GstPad * pad); -static gboolean gst_base_parse_check_frame (GstBaseParse * parse, - GstBaseParseFrame * frame, guint * framesize, gint * skipsize); static GstFlowReturn gst_base_parse_parse_frame (GstBaseParse * parse, GstBaseParseFrame * frame); @@ -510,8 +519,6 @@ gst_base_parse_class_init (GstBaseParseClass * klass) #endif /* Default handlers */ - klass->check_valid_frame = gst_base_parse_check_frame; - klass->parse_frame = gst_base_parse_parse_frame; klass->src_event = gst_base_parse_src_eventfunc; klass->convert = gst_base_parse_convert_default; @@ -748,9 +755,10 @@ gst_base_parse_reset (GstBaseParse * parse) gst_adapter_clear (parse->priv->adapter); /* we know it is not alloc'ed, but maybe other stuff to free, some day ... */ - parse->priv->frame._private_flags |= - GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; - gst_base_parse_frame_free (&parse->priv->frame); + if (parse->priv->prev_frame) { + gst_base_parse_frame_free (parse->priv->prev_frame); + parse->priv->prev_frame = NULL; + } g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL); g_list_free (parse->priv->detect_buffers); @@ -759,27 +767,6 @@ gst_base_parse_reset (GstBaseParse * parse) GST_OBJECT_UNLOCK (parse); } -/* gst_base_parse_check_frame: - * @parse: #GstBaseParse. - * @buffer: GstBuffer. - * @framesize: This will be set to tell the found frame size in bytes. - * @skipsize: Output parameter that tells how much data needs to be skipped - * in order to find the following frame header. - * - * Default callback for check_valid_frame. - * - * Returns: Always TRUE. - */ -static gboolean -gst_base_parse_check_frame (GstBaseParse * parse, - GstBaseParseFrame * frame, guint * framesize, gint * skipsize) -{ - *framesize = gst_buffer_get_size (frame->buffer); - *skipsize = 0; - return TRUE; -} - - /* gst_base_parse_parse_frame: * @parse: #GstBaseParse. * @buffer: #GstBuffer. @@ -1091,9 +1078,10 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) parse->priv->flushing = FALSE; parse->priv->discont = TRUE; parse->priv->last_ts = GST_CLOCK_TIME_NONE; - parse->priv->frame._private_flags |= - GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; - gst_base_parse_frame_free (&parse->priv->frame); + if (parse->priv->prev_frame) { + gst_base_parse_frame_free (parse->priv->prev_frame); + parse->priv->prev_frame = NULL; + } break; case GST_EVENT_EOS: @@ -1688,6 +1676,120 @@ gst_base_parse_queue_frame (GstBaseParse * parse, GstBaseParseFrame * frame) } } +/* makes sure that @buf is properly prepared and decorated for passing + * to baseclass, and an equally setup frame is returned setup with @buf. + * Takes ownership of @buf. */ +static GstBaseParseFrame * +gst_base_parse_prepare_frame (GstBaseParse * parse, GstBuffer * buffer) +{ + GstBaseParseFrame *frame = NULL; + + buffer = gst_buffer_make_writable (buffer); + + GST_LOG_OBJECT (parse, + "preparing frame at offset %" G_GUINT64_FORMAT + " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, + GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer), + gst_buffer_get_size (buffer)); + + if (parse->priv->discont) { + GST_DEBUG_OBJECT (parse, "marking DISCONT"); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + parse->priv->discont = FALSE; + } + + GST_BUFFER_OFFSET (buffer) = parse->priv->offset; + + if (parse->priv->prev_frame) { + if (parse->priv->prev_offset == parse->priv->offset) { + frame = parse->priv->prev_frame; + } else { + gst_base_parse_frame_free (parse->priv->prev_frame); + } + parse->priv->prev_frame = NULL; + } + + if (!frame) { + frame = gst_base_parse_frame_new (buffer, 0, 0); + } + + /* also ensure to update state flags */ + gst_base_parse_frame_update (parse, frame, buffer); + gst_buffer_unref (buffer); + + frame->offset = parse->priv->prev_offset = parse->priv->offset; + + /* use default handler to provide initial (upstream) metadata */ + gst_base_parse_parse_frame (parse, frame); + + return frame; +} + +static void +gst_base_parse_unprepare_frame (GstBaseParse * parse, GstBaseParseFrame * frame) +{ + g_assert (parse->priv->prev_frame == NULL); + + parse->priv->prev_frame = frame; + gst_base_parse_frame_update (parse, frame, NULL); +} + +/* takes ownership of @buffer */ +static GstFlowReturn +gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer, + gint * skip, gint * flushed) +{ + GstBaseParseClass *klass = GST_BASE_PARSE_GET_CLASS (parse); + GstBaseParseFrame *frame; + GstFlowReturn ret; + + g_return_val_if_fail (skip != NULL || flushed != NULL, GST_FLOW_ERROR); + + /* track what is being flushed during this single round of frame processing */ + parse->priv->flushed = 0; + *skip = 0; + + /* make it easy for _finish_frame to pick up input data */ + if (parse->priv->pad_mode == GST_PAD_MODE_PULL) { + gst_buffer_ref (buffer); + gst_adapter_push (parse->priv->adapter, buffer); + } + + frame = gst_base_parse_prepare_frame (parse, buffer); + ret = klass->handle_frame (parse, frame, skip); + gst_base_parse_unprepare_frame (parse, frame); + + if (parse->priv->pad_mode == GST_PAD_MODE_PULL) { + gst_adapter_clear (parse->priv->adapter); + } + + *flushed = parse->priv->flushed; + + GST_LOG_OBJECT (parse, "handle_frame skipped %d, flushed %d", + *skip, *flushed); + + /* subclass can only do one of these, or semantics are too unclear */ + g_assert (*skip == 0 || *flushed == 0); + + /* if it did something, clear frame state, + * though this should also trigger the offset check anyway */ + if (*skip != 0 || *flushed != 0) { + GST_LOG_OBJECT (parse, "clearing prev frame"); + gst_base_parse_frame_free (parse->priv->prev_frame); + parse->priv->prev_frame = NULL; + } + + parse->priv->offset += *flushed; + +#ifndef GST_DISABLE_GST_DEBUG + if (ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret); + } +#endif + + return ret; +} + /* gst_base_parse_handle_and_push_buffer: * @parse: #GstBaseParse. * @klass: #GstBaseParseClass. @@ -1702,48 +1804,25 @@ gst_base_parse_queue_frame (GstBaseParse * parse, GstBaseParseFrame * frame) */ static GstFlowReturn gst_base_parse_handle_and_push_frame (GstBaseParse * parse, - GstBaseParseClass * klass, GstBaseParseFrame * frame) + GstBaseParseFrame * frame) { - GstFlowReturn ret; gint64 offset; GstBuffer *buffer; g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR); - buffer = frame->buffer; - - if (parse->priv->discont) { - GST_DEBUG_OBJECT (parse, "marking DISCONT"); - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); - parse->priv->discont = FALSE; - } - /* some one-time start-up */ if (G_UNLIKELY (!parse->priv->framecount)) { gst_base_parse_check_seekability (parse); gst_base_parse_check_upstream (parse); } - GST_LOG_OBJECT (parse, - "parsing frame at offset %" G_GUINT64_FORMAT - " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, - GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer), - gst_buffer_get_size (buffer)); - - /* use default handler to provide initial (upstream) metadata */ - gst_base_parse_parse_frame (parse, frame); - - /* store offset as it might get overwritten */ - offset = GST_BUFFER_OFFSET (buffer); - ret = klass->parse_frame (parse, frame); - /* sync */ buffer = frame->buffer; - /* subclass must play nice */ - g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); + offset = frame->offset; /* check if subclass/format can provide ts. * If so, that allows and enables extra seek and duration determining options */ - if (G_UNLIKELY (parse->priv->first_frame_offset < 0 && ret == GST_FLOW_OK)) { + if (G_UNLIKELY (parse->priv->first_frame_offset < 0)) { if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) && parse->priv->has_timing_info && parse->priv->pad_mode == GST_PAD_MODE_PULL) { parse->priv->first_frame_offset = offset; @@ -1787,19 +1866,6 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse, GST_BUFFER_TIMESTAMP (buffer), !GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT), FALSE); - /* First buffers are dropped, this means that the subclass needs more - * frames to decide on the format and queues them internally */ - /* convert internal flow to OK and mark discont for the next buffer. */ - if (ret == GST_BASE_PARSE_FLOW_DROPPED) { - gst_base_parse_frame_free (frame); - return GST_FLOW_OK; - } else if (ret == GST_BASE_PARSE_FLOW_QUEUED) { - gst_base_parse_queue_frame (parse, frame); - return GST_FLOW_OK; - } else if (ret != GST_FLOW_OK) { - return ret; - } - /* All OK, push queued frames if there are any */ if (G_UNLIKELY (!g_queue_is_empty (&parse->priv->queued_frames))) { GstBaseParseFrame *queued_frame; @@ -1815,12 +1881,11 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse, /** * gst_base_parse_push_frame: * @parse: #GstBaseParse. - * @frame: (transfer full): a #GstBaseParseFrame + * @frame: (transfer none): a #GstBaseParseFrame * - * Pushes the frame downstream, sends any pending events and - * does some timestamp and segment handling. Takes ownership - * of @frame and will clear it (if it was initialised with - * gst_base_parse_frame_init()) or free it. + * Pushes the frame's buffer downstream, sends any pending events and + * does some timestamp and segment handling. Takes ownership of + * frame's buffer, though caller retains ownership of @frame. * * This must be called with sinkpad STREAM_LOCK held. * @@ -1852,8 +1917,7 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); /* update stats */ - size = gst_buffer_get_size (buffer); - parse->priv->bytecount += size; + parse->priv->bytecount += frame->size; if (G_LIKELY (!(frame->flags & GST_BASE_PARSE_FRAME_FLAG_NO_FRAME))) { parse->priv->framecount++; if (GST_BUFFER_DURATION_IS_VALID (buffer)) { @@ -1968,12 +2032,20 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) } /* take final ownership of frame buffer */ - buffer = frame->buffer; - frame->buffer = NULL; + if (frame->out_buffer) { + buffer = frame->out_buffer; + frame->out_buffer = NULL; + gst_buffer_replace (&frame->buffer, NULL); + } else { + buffer = frame->buffer; + frame->buffer = NULL; + } /* subclass must play nice */ g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); + size = gst_buffer_get_size (buffer); + parse->priv->seen_keyframe |= parse->priv->is_video && !GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT); @@ -2035,8 +2107,6 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) parse->segment.position < last_stop) parse->segment.position = last_stop; - gst_base_parse_frame_free (frame); - return ret; /* ERRORS */ @@ -2047,6 +2117,93 @@ no_caps: } } +/** + * gst_base_parse_finish_frame: + * @parse: a #GstBaseParse + * @frame: a #GstBaseParseFrame + * @size: consumed input data represented by frame + * + * Collects parsed data and pushes this downstream. + * Source pad caps must be set when this is called. + * + * If @frame's out_buffer is set, that will be used as subsequent frame data. + * Otherwise, @size samples will be taken from the input and used for output, + * and the output's metadata (timestamps etc) will be taken as (optionally) + * set by the subclass on @frame's (input) buffer. + * + * Note that the latter buffer is invalidated by this call, whereas the + * caller retains ownership of @frame. + * + * Returns: a #GstFlowReturn that should be escalated to caller (of caller) + * + * Since: 0.11.1 + */ +GstFlowReturn +gst_base_parse_finish_frame (GstBaseParse * parse, GstBaseParseFrame * frame, + gint size) +{ + GstFlowReturn ret = GST_FLOW_OK; + + g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (frame->buffer != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (size > 0 || frame->out_buffer, GST_FLOW_ERROR); + g_return_val_if_fail (gst_adapter_available (parse->priv->adapter) >= size, + GST_FLOW_ERROR); + + GST_LOG_OBJECT (parse, "finished frame at offset %" G_GUINT64_FORMAT ", " + "flushing size %d", frame->offset, size); + + if (parse->priv->scanning && frame->buffer) { + if (!parse->priv->scanned_frame) { + parse->priv->scanned_frame = gst_base_parse_frame_copy (frame); + } + goto exit; + } + + parse->priv->flushed += size; + + /* either PUSH or PULL mode arranges for adapter data */ + /* ensure output buffer */ + if (!frame->out_buffer) { + GstBuffer *src, *dest; + + frame->out_buffer = gst_adapter_take_buffer (parse->priv->adapter, size); + dest = frame->out_buffer; + src = frame->buffer; + GST_BUFFER_PTS (dest) = GST_BUFFER_PTS (src); + GST_BUFFER_DTS (dest) = GST_BUFFER_DTS (src); + GST_BUFFER_OFFSET (dest) = GST_BUFFER_OFFSET (src); + GST_BUFFER_DURATION (dest) = GST_BUFFER_DURATION (src); + GST_BUFFER_OFFSET_END (dest) = GST_BUFFER_OFFSET_END (src); + GST_MINI_OBJECT_FLAGS (dest) = GST_MINI_OBJECT_FLAGS (src); + } else { + gst_adapter_flush (parse->priv->adapter, size); + } + + /* use as input for subsequent processing */ + gst_buffer_replace (&frame->buffer, frame->out_buffer); + gst_buffer_unref (frame->out_buffer); + frame->out_buffer = NULL; + + /* subclass might queue frames/data internally if it needs more + * frames to decide on the format, or might request us to queue here. */ + if (frame->flags & GST_BASE_PARSE_FRAME_FLAG_DROP) { + gst_buffer_replace (&frame->buffer, NULL); + goto exit; + } else if (frame->flags & GST_BASE_PARSE_FRAME_FLAG_QUEUE) { + GstBaseParseFrame *copy; + + copy = gst_base_parse_frame_copy (frame); + copy->flags &= ~GST_BASE_PARSE_FRAME_FLAG_QUEUE; + gst_base_parse_queue_frame (parse, copy); + goto exit; + } + + ret = gst_base_parse_handle_and_push_frame (parse, frame); + +exit: + return ret; +} /* gst_base_parse_drain: * @@ -2265,9 +2422,8 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) guint fsize = 1; gint skip = -1; const guint8 *data; - guint old_min_size = 0, min_size, av; + guint min_size, av; GstClockTime timestamp; - GstBaseParseFrame *frame; parse = GST_BASE_PARSE (parent); bclass = GST_BASE_PARSE_GET_CLASS (parse); @@ -2347,16 +2503,18 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) /* And now handle the current buffer if detection worked */ } - frame = &parse->priv->frame; - if (G_LIKELY (buffer)) { GST_LOG_OBJECT (parse, "buffer size: %" G_GSIZE_FORMAT ", offset = %" G_GINT64_FORMAT, gst_buffer_get_size (buffer), GST_BUFFER_OFFSET (buffer)); if (G_UNLIKELY (parse->priv->passthrough)) { - gst_base_parse_frame_init (frame); - frame->buffer = gst_buffer_make_writable (buffer); - return gst_base_parse_push_frame (parse, frame); + GstBaseParseFrame frame; + + gst_base_parse_frame_init (&frame); + frame.buffer = gst_buffer_make_writable (buffer); + ret = gst_base_parse_push_frame (parse, &frame); + gst_base_parse_frame_free (&frame); + return ret; } /* upstream feeding us in reverse playback; * gather each fragment, then process it in single run */ @@ -2371,25 +2529,11 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) gst_adapter_push (parse->priv->adapter, buffer); } - if (G_UNLIKELY (buffer && - GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) { - frame->_private_flags |= GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; - gst_base_parse_frame_free (frame); - } - /* Parse and push as many frames as possible */ /* Stop either when adapter is empty or we are flushing */ while (!parse->priv->flushing) { - gboolean res; + gint flush = 0; - /* maintain frame state for a single frame parsing round across _chain calls, - * so only init when needed */ - if (!frame->_private_flags) - gst_base_parse_frame_init (frame); - - tmpbuf = gst_buffer_new (); - - old_min_size = 0; /* Synchronization loop */ for (;;) { /* note: if subclass indicates MAX fsize, @@ -2397,16 +2541,10 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) min_size = MAX (parse->priv->min_frame_size, fsize); av = gst_adapter_available (parse->priv->adapter); - /* loop safety check */ - if (G_UNLIKELY (old_min_size >= min_size)) - goto invalid_min; - old_min_size = min_size; - if (G_UNLIKELY (parse->priv->drain)) { min_size = av; GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size); if (G_UNLIKELY (!min_size)) { - gst_buffer_unref (tmpbuf); goto done; } } @@ -2415,42 +2553,33 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) if (av < min_size) { GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)", av); - gst_buffer_unref (tmpbuf); goto done; } + /* move along with upstream timestamp (if any), + * but interpolate in between */ + timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL); + if (GST_CLOCK_TIME_IS_VALID (timestamp) && + (parse->priv->prev_ts != timestamp)) { + parse->priv->prev_ts = parse->priv->next_ts = timestamp; + } + /* always pass all available data */ data = gst_adapter_map (parse->priv->adapter, av); + /* arrange for actual data to be copied if subclass tries to, + * since what is passed is tied to the adapter */ + tmpbuf = gst_buffer_new (); gst_buffer_take_memory (tmpbuf, -1, - gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, - (gpointer) data, NULL, av, 0, av)); - GST_BUFFER_OFFSET (tmpbuf) = parse->priv->offset; + gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY | + GST_MEMORY_FLAG_NO_SHARE, (gpointer) data, NULL, av, 0, av)); - if (parse->priv->discont) { - GST_DEBUG_OBJECT (parse, "marking DISCONT"); - GST_BUFFER_FLAG_SET (tmpbuf, GST_BUFFER_FLAG_DISCONT); - } + /* keep the adapter mapped, so keep track of what has to be flushed */ + ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush); + tmpbuf = NULL; - skip = -1; - gst_base_parse_frame_update (parse, frame, tmpbuf); - res = bclass->check_valid_frame (parse, frame, &fsize, &skip); gst_adapter_unmap (parse->priv->adapter); - gst_buffer_replace (&frame->buffer, NULL); - gst_buffer_remove_memory_range (tmpbuf, 0, -1); - if (res) { - if (gst_adapter_available (parse->priv->adapter) < fsize) { - GST_DEBUG_OBJECT (parse, "found valid frame but not enough data" - " available (only %" G_GSIZE_FORMAT " bytes)", - gst_adapter_available (parse->priv->adapter)); - gst_buffer_unref (tmpbuf); - goto done; - } - GST_LOG_OBJECT (parse, "valid frame of size %d at pos %d", fsize, skip); - break; - } - if (skip == -1) { - /* subclass didn't touch this value. By default we skip 1 byte */ - skip = 1; + if (ret != GST_FLOW_OK) { + goto done; } if (skip > 0) { GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); @@ -2468,28 +2597,18 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) } else { gst_adapter_flush (parse->priv->adapter, skip); } - parse->priv->offset += skip; if (!parse->priv->discont) parse->priv->sync_offset = parse->priv->offset; + parse->priv->offset += skip; parse->priv->discont = TRUE; - /* something changed least; nullify loop check */ - old_min_size = 0; - } - /* skip == 0 should imply subclass set min_size to need more data; - * we check this shortly */ - if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) { - gst_buffer_unref (tmpbuf); + } else if (!flush) { + GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, " + "breaking to get more data"); + goto done; + } + if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) { goto done; } - } - gst_buffer_unref (tmpbuf); - tmpbuf = NULL; - - if (skip > 0) { - /* Subclass found the sync, but still wants to skip some data */ - GST_LOG_OBJECT (parse, "skipping %d bytes", skip); - gst_adapter_flush (parse->priv->adapter, skip); - parse->priv->offset += skip; } /* Grab lock to prevent a race with FLUSH_START handler */ @@ -2501,47 +2620,11 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_PAD_STREAM_UNLOCK (parse->srcpad); break; } - - /* move along with upstream timestamp (if any), - * but interpolate in between */ - timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL); - if (GST_CLOCK_TIME_IS_VALID (timestamp) && - (parse->priv->prev_ts != timestamp)) { - parse->priv->prev_ts = parse->priv->next_ts = timestamp; - } - - /* FIXME: Would it be more efficient to make a subbuffer instead? */ - outbuf = gst_adapter_take_buffer (parse->priv->adapter, fsize); - outbuf = gst_buffer_make_writable (outbuf); - - /* Subclass may want to know the data offset */ - GST_BUFFER_OFFSET (outbuf) = parse->priv->offset; - parse->priv->offset += fsize; - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; - GST_BUFFER_DURATION (outbuf) = GST_CLOCK_TIME_NONE; - - frame->buffer = outbuf; - ret = gst_base_parse_handle_and_push_frame (parse, bclass, frame); - GST_PAD_STREAM_UNLOCK (parse->srcpad); - - if (ret != GST_FLOW_OK) { - GST_LOG_OBJECT (parse, "push returned %d", ret); - break; - } } done: GST_LOG_OBJECT (parse, "chain leaving"); return ret; - - /* ERRORS */ -invalid_min: - { - GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL), - ("min_size evolution %d -> %d; breaking to avoid looping", - old_min_size, min_size)); - return GST_FLOW_ERROR; - } } /* pull @size bytes at current offset, @@ -2686,15 +2769,14 @@ exit: * ajusts sync, drain and offset going along */ static GstFlowReturn gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, - GstBaseParseFrame * frame, gboolean full) + gboolean full) { GstBuffer *buffer, *outbuf; GstFlowReturn ret = GST_FLOW_OK; - guint fsize = 1, min_size, old_min_size = 0; + guint fsize, min_size; + gint flushed = 0; gint skip = 0; - g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR); - GST_LOG_OBJECT (parse, "scanning for frame at offset %" G_GUINT64_FORMAT " (%#" G_GINT64_MODIFIER "x)", parse->priv->offset, parse->priv->offset); @@ -2705,23 +2787,12 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, fsize = 64 * 1024; while (TRUE) { - gboolean res; - min_size = MAX (parse->priv->min_frame_size, fsize); - /* loop safety check */ - if (G_UNLIKELY (old_min_size >= min_size)) - goto invalid_min; - old_min_size = min_size; ret = gst_base_parse_pull_range (parse, min_size, &buffer); if (ret != GST_FLOW_OK) goto done; - if (parse->priv->discont) { - GST_DEBUG_OBJECT (parse, "marking DISCONT"); - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); - } - /* if we got a short read, inform subclass we are draining leftover * and no more is to be expected */ if (gst_buffer_get_size (buffer) < min_size) @@ -2751,18 +2822,12 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, /* Else handle this buffer normally */ } - skip = -1; - gst_base_parse_frame_update (parse, frame, buffer); - res = klass->check_valid_frame (parse, frame, &fsize, &skip); - gst_buffer_replace (&frame->buffer, NULL); - if (res) { - parse->priv->drain = FALSE; - GST_LOG_OBJECT (parse, "valid frame of size %d at pos %d", fsize, skip); + /* might need it later on */ + gst_buffer_ref (buffer); + ret = gst_base_parse_handle_buffer (parse, buffer, &skip, &flushed); + if (ret != GST_FLOW_OK) break; - } - parse->priv->drain = FALSE; - if (skip == -1) - skip = 1; + if (skip > 0) { GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); if (full && parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { @@ -2774,61 +2839,38 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, g_slist_prepend (parse->priv->buffers_pending, outbuf); outbuf = NULL; } - parse->priv->offset += skip; if (!parse->priv->discont) parse->priv->sync_offset = parse->priv->offset; + parse->priv->offset += skip; parse->priv->discont = TRUE; - /* something changed at least; nullify loop check */ - if (fsize == G_MAXUINT) - fsize = old_min_size + 64 * 1024; - old_min_size = 0; + } else { + /* default to reasonable increase */ + fsize += 64 * 1024; } - /* skip == 0 should imply subclass set min_size to need more data; - * we check this shortly */ - GST_DEBUG_OBJECT (parse, "finding sync..."); + /* no longer needed */ gst_buffer_unref (buffer); + /* changed offset means something happened, + * and we should bail out of this loop so as not to occupy + * the task thread indefinitely */ + if (flushed) { + GST_LOG_OBJECT (parse, "frame finished, breaking loop"); + break; + } + /* nothing flushed, no skip and draining, so nothing left to do */ + if (!skip && parse->priv->drain) { + GST_LOG_OBJECT (parse, "no activity or result when draining; " + "breaking loop and marking EOS"); + ret = GST_FLOW_EOS; + break; + } + parse->priv->drain = FALSE; if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) { goto done; } } - /* Does the subclass want to skip too? */ - if (skip > 0) - parse->priv->offset += skip; - else if (skip < 0) - skip = 0; - - if (fsize + skip <= gst_buffer_get_size (buffer)) { - outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, skip, fsize); - GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer) + skip; - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; - gst_buffer_unref (buffer); - } else { - gst_buffer_unref (buffer); - ret = gst_base_parse_pull_range (parse, fsize, &outbuf); - if (ret != GST_FLOW_OK) - goto done; - if (gst_buffer_get_size (outbuf) < fsize) { - gst_buffer_unref (outbuf); - ret = GST_FLOW_EOS; - } - } - - parse->priv->offset += fsize; - - frame->buffer = outbuf; - done: return ret; - - /* ERRORS */ -invalid_min: - { - GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL), - ("min_size evolution %d -> %d; breaking to avoid looping", - old_min_size, min_size)); - return GST_FLOW_ERROR; - } } /* Loop that is used in pull mode to retrieve data from upstream */ @@ -2838,7 +2880,6 @@ gst_base_parse_loop (GstPad * pad) GstBaseParse *parse; GstBaseParseClass *klass; GstFlowReturn ret = GST_FLOW_OK; - GstBaseParseFrame frame; parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); klass = GST_BASE_PARSE_GET_CLASS (parse); @@ -2855,14 +2896,10 @@ gst_base_parse_loop (GstPad * pad) } } - gst_base_parse_frame_init (&frame); - ret = gst_base_parse_scan_frame (parse, klass, &frame, TRUE); + ret = gst_base_parse_scan_frame (parse, klass, TRUE); if (ret != GST_FLOW_OK) goto done; - /* This always cleans up frame, even if error occurs */ - ret = gst_base_parse_handle_and_push_frame (parse, klass, &frame); - /* eat expected eos signalling past segment in reverse playback */ if (parse->segment.rate < 0.0 && ret == GST_FLOW_EOS && parse->segment.position >= parse->segment.stop) { @@ -3460,7 +3497,7 @@ gst_base_parse_find_frame (GstBaseParse * parse, gint64 * pos, gboolean orig_drain, orig_discont; GstFlowReturn ret = GST_FLOW_OK; GstBuffer *buf = NULL; - GstBaseParseFrame frame; + GstBaseParseFrame *sframe = NULL; g_return_val_if_fail (pos != NULL, GST_FLOW_ERROR); g_return_val_if_fail (time != NULL, GST_FLOW_ERROR); @@ -3479,37 +3516,36 @@ gst_base_parse_find_frame (GstBaseParse * parse, gint64 * pos, GST_DEBUG_OBJECT (parse, "scanning for frame starting at %" G_GINT64_FORMAT " (%#" G_GINT64_MODIFIER "x)", *pos, *pos); - gst_base_parse_frame_init (&frame); - /* jump elsewhere and locate next frame */ parse->priv->offset = *pos; - ret = gst_base_parse_scan_frame (parse, klass, &frame, FALSE); - if (ret != GST_FLOW_OK) + /* mark as scanning so frames don't get processed all the way */ + parse->priv->scanning = TRUE; + ret = gst_base_parse_scan_frame (parse, klass, FALSE); + parse->priv->scanning = FALSE; + /* retrieve frame found during scan */ + sframe = parse->priv->scanned_frame; + parse->priv->scanned_frame = NULL; + + if (ret != GST_FLOW_OK || !sframe) goto done; - buf = frame.buffer; - GST_LOG_OBJECT (parse, - "peek parsing frame at offset %" G_GUINT64_FORMAT - " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, - GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET (buf), - gst_buffer_get_size (buf)); - /* get offset first, subclass parsing might dump other stuff in there */ - *pos = GST_BUFFER_OFFSET (buf); - ret = klass->parse_frame (parse, &frame); - buf = frame.buffer; + *pos = sframe->offset; + buf = sframe->buffer; + g_assert (buf); /* but it should provide proper time */ *time = GST_BUFFER_TIMESTAMP (buf); *duration = GST_BUFFER_DURATION (buf); - gst_base_parse_frame_free (&frame); - GST_LOG_OBJECT (parse, "frame with time %" GST_TIME_FORMAT " at offset %" G_GINT64_FORMAT, GST_TIME_ARGS (*time), *pos); done: + if (sframe) + gst_base_parse_frame_free (sframe); + /* restore state */ parse->priv->offset = orig_offset; parse->priv->discont = orig_discont; diff --git a/libs/gst/base/gstbaseparse.h b/libs/gst/base/gstbaseparse.h index fcdb391bfc..3ba117120a 100644 --- a/libs/gst/base/gstbaseparse.h +++ b/libs/gst/base/gstbaseparse.h @@ -65,16 +65,6 @@ G_BEGIN_DECLS */ #define GST_BASE_PARSE_FLOW_DROPPED GST_FLOW_CUSTOM_SUCCESS -/** - * GST_BASE_PARSE_FLOW_QUEUED: - * - * A #GstFlowReturn that can be returned from parse frame to indicate that - * the buffer will be queued to be pushed when the next OK - * - * Since: 0.10.33 - */ -#define GST_BASE_PARSE_FLOW_QUEUED GST_FLOW_CUSTOM_SUCCESS_1 - /* not public API, use accessor macros below */ #define GST_BASE_PARSE_FLAG_LOST_SYNC (1 << 0) #define GST_BASE_PARSE_FLAG_DRAINING (1 << 1) @@ -110,6 +100,11 @@ G_BEGIN_DECLS * @GST_BASE_PARSE_FRAME_FLAG_CLIP: @pre_push_frame can set this to indicate * that regular segment clipping can still be performed (as opposed to * any custom one having been done). + * @GST_BASE_PARSE_FRAME_FLAG_DROP: indicates to @finish_frame that the + * the frame should be dropped (and might be handled internall by subclass) + * @GST_BASE_PARSE_FRAME_FLAG_QUEUE: indicates to @finish_frame that the + * the frame should be queued for now and processed fully later + * when the first non-queued frame is finished * * Flags to be used in a #GstBaseParseFrame. * @@ -118,13 +113,17 @@ G_BEGIN_DECLS typedef enum { GST_BASE_PARSE_FRAME_FLAG_NONE = 0, GST_BASE_PARSE_FRAME_FLAG_NO_FRAME = (1 << 0), - GST_BASE_PARSE_FRAME_FLAG_CLIP = (1 << 1) + GST_BASE_PARSE_FRAME_FLAG_CLIP = (1 << 1), + GST_BASE_PARSE_FRAME_FLAG_DROP = (1 << 2), + GST_BASE_PARSE_FRAME_FLAG_QUEUE = (1 << 3) } GstBaseParseFrameFlags; /** * GstBaseParseFrame: - * @buffer: data to check for valid frame or parsed frame. - * Subclass is allowed to replace this buffer. + * @buffer: input data to be parsed for frames. + * @out_buffer: (optional) (replacement) output data. + * @offset: media specific offset of input frame + * Note that a converter may have a different one on the frame's buffer. * @overhead: subclass can set this to indicates the metadata overhead * for the given frame, which is then used to enable more accurate bitrate * computations. If this is -1, it is assumed that this frame should be @@ -145,9 +144,12 @@ typedef enum { */ typedef struct { GstBuffer * buffer; + GstBuffer * out_buffer; guint flags; + guint64 offset; gint overhead; /*< private >*/ + gint size; guint _gst_reserved_i[2]; gpointer _gst_reserved_p[2]; guint _private_flags; @@ -192,12 +194,17 @@ struct _GstBaseParse { * Allows closing external resources. * @set_sink_caps: allows the subclass to be notified of the actual caps set. * @get_sink_caps: allows the subclass to do its own sink get caps if needed. - * @check_valid_frame: Check if the given piece of data contains a valid - * frame. - * @parse_frame: Parse the already checked frame. Subclass need to - * set the buffer timestamp, duration, caps and possibly - * other necessary metadata. This is called with srcpad's - * STREAM_LOCK held. + * @handle_frame: Parses the input data into valid frames as defined by subclass + * which should be passed to gst_base_parse_finish_frame(). + * The frame's input buffer is guaranteed writable, + * whereas the input frame ownership is held by caller + * (so subclass should make a copy if it needs to hang on). + * Input buffer (data) is equally managed by baseclass and should also be + * copied (e.g. gst_buffer_copy_region()) when needed. + * Time metadata will already be set as much as possible by baseclass + * according to upstream information and/or subclass settings, + * though subclass may still set buffer timestamp and duration + * if desired. * @convert: Optional. * Convert between formats. * @event: Optional. @@ -234,14 +241,10 @@ struct _GstBaseParseClass { gboolean (*set_sink_caps) (GstBaseParse * parse, GstCaps * caps); - gboolean (*check_valid_frame) (GstBaseParse * parse, + GstFlowReturn (*handle_frame) (GstBaseParse * parse, GstBaseParseFrame * frame, - guint * framesize, gint * skipsize); - GstFlowReturn (*parse_frame) (GstBaseParse * parse, - GstBaseParseFrame * frame); - GstFlowReturn (*pre_push_frame) (GstBaseParse * parse, GstBaseParseFrame * frame); @@ -282,6 +285,10 @@ void gst_base_parse_frame_free (GstBaseParseFrame * frame); GstFlowReturn gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame); +GstFlowReturn gst_base_parse_finish_frame (GstBaseParse * parse, + GstBaseParseFrame * frame, + gint size); + void gst_base_parse_set_duration (GstBaseParse * parse, GstFormat fmt, gint64 duration,