baseparse: proper and more extended segment and seek handling

That is, loop pause handling, segment seek support, newsegment for gaps, etc
This commit is contained in:
Mark Nauwelaerts 2010-09-21 13:57:10 +02:00
parent ec195ab2e5
commit 174d2d46fc

View file

@ -1323,17 +1323,73 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer)
gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad));
/* segment times are typically estimates,
* actual frame data might lead subclass to different timestamps,
* so override segment start from what is supplied there */
if (G_UNLIKELY (parse->pending_segment && !parse->priv->passthrough &&
GST_CLOCK_TIME_IS_VALID (last_start))) {
gst_event_unref (parse->pending_segment);
/* stop time possibly lost this way,
* but unlikely and not really supported */
parse->pending_segment =
gst_event_new_new_segment (FALSE, parse->segment.rate,
parse->segment.format, last_start, -1, last_start);
/* segment adjustment magic; only if we are running the whole show */
if (!parse->priv->passthrough &&
(parse->priv->pad_mode == GST_ACTIVATE_PULL ||
parse->priv->upstream_seekable)) {
/* segment times are typically estimates,
* actual frame data might lead subclass to different timestamps,
* so override segment start from what is supplied there */
if (G_UNLIKELY (parse->pending_segment &&
GST_CLOCK_TIME_IS_VALID (last_start))) {
gst_event_unref (parse->pending_segment);
parse->segment.start =
MIN ((guint64) last_start, (guint64) parse->segment.stop);
GST_DEBUG_OBJECT (parse,
"adjusting pending segment start to %" GST_TIME_FORMAT,
GST_TIME_ARGS (parse->segment.start));
parse->pending_segment =
gst_event_new_new_segment (FALSE, parse->segment.rate,
parse->segment.format, parse->segment.start, parse->segment.stop,
parse->segment.start);
}
/* handle gaps, e.g. non-zero start-time, in as much not handled by above */
if (GST_CLOCK_TIME_IS_VALID (parse->segment.last_stop) &&
GST_CLOCK_TIME_IS_VALID (last_start)) {
GstClockTimeDiff diff;
/* only send newsegments with increasing start times,
* otherwise if these go back and forth downstream (sinks) increase
* accumulated time and running_time */
diff = GST_CLOCK_DIFF (parse->segment.last_stop, last_start);
if (G_UNLIKELY (diff > 2 * GST_SECOND && last_start > parse->segment.start
&& (!GST_CLOCK_TIME_IS_VALID (parse->segment.stop) ||
last_start < parse->segment.stop))) {
GST_DEBUG_OBJECT (parse,
"Gap of %" G_GINT64_FORMAT " ns detected in stream "
"(%" GST_TIME_FORMAT " -> %" GST_TIME_FORMAT "). "
"Sending updated NEWSEGMENT events", diff,
GST_TIME_ARGS (parse->segment.last_stop),
GST_TIME_ARGS (last_start));
if (G_UNLIKELY (parse->pending_segment)) {
gst_event_unref (parse->pending_segment);
parse->segment.start = last_start;
parse->pending_segment =
gst_event_new_new_segment (FALSE, parse->segment.rate,
parse->segment.format, parse->segment.start, parse->segment.stop,
parse->segment.start);
} else {
/* send newsegment events such that the gap is not accounted in
* accum time, hence running_time */
/* close ahead of gap */
gst_pad_push_event (parse->srcpad,
gst_event_new_new_segment (TRUE, parse->segment.rate,
parse->segment.format, parse->segment.last_stop,
parse->segment.last_stop, parse->segment.last_stop));
/* skip gap */
gst_pad_push_event (parse->srcpad,
gst_event_new_new_segment (FALSE, parse->segment.rate,
parse->segment.format, last_start, parse->segment.stop,
last_start));
}
/* align segment view with downstream,
* prevents double-counting accum when closing segment */
gst_segment_set_newsegment (&parse->segment, FALSE,
parse->segment.rate, parse->segment.format, last_start,
parse->segment.stop, last_start);
parse->segment.last_stop = last_start;
}
}
}
/* and should then also be linked downstream, so safe to send some events */
@ -1405,10 +1461,17 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer)
gst_buffer_unref (buffer);
GST_LOG_OBJECT (parse, "frame (%d bytes) not pushed: %s",
GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret));
/* if we are not sufficiently in control, let upstream decide on EOS */
if (ret == GST_FLOW_UNEXPECTED &&
(parse->priv->passthrough ||
(parse->priv->pad_mode == GST_ACTIVATE_PUSH &&
!parse->priv->upstream_seekable)))
ret = GST_FLOW_OK;
}
/* Update current running segment position */
if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE)
if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE &&
parse->segment.last_stop < last_stop)
gst_segment_set_last_stop (&parse->segment, GST_FORMAT_TIME, last_stop);
return ret;
@ -1729,7 +1792,7 @@ gst_base_parse_loop (GstPad * pad)
if (ret == GST_FLOW_UNEXPECTED)
goto eos;
else if (ret != GST_FLOW_OK)
goto need_pause;
goto pause;
if (parse->priv->discont) {
GST_DEBUG_OBJECT (parse, "marking DISCONT");
@ -1779,7 +1842,7 @@ gst_base_parse_loop (GstPad * pad)
if (ret == GST_FLOW_UNEXPECTED)
goto eos;
else if (ret != GST_FLOW_OK)
goto need_pause;
goto pause;
if (GST_BUFFER_SIZE (outbuf) < fsize)
goto eos;
}
@ -1793,34 +1856,67 @@ gst_base_parse_loop (GstPad * pad)
/* This always unrefs the outbuf, even if error occurs */
ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf);
if (ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (parse, "flow: %s", gst_flow_get_name (ret));
if (ret == GST_FLOW_UNEXPECTED) {
gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
} else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
("streaming task paused, reason: %s", gst_flow_get_name (ret)));
gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
}
goto need_pause;
}
if (ret != GST_FLOW_OK)
goto pause;
done:
gst_object_unref (parse);
return;
need_pause:
{
GST_LOG_OBJECT (parse, "pausing task %d", ret);
gst_pad_pause_task (pad);
gst_object_unref (parse);
return;
}
/* ERRORS */
eos:
{
GST_LOG_OBJECT (parse, "sending eos");
gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
goto need_pause;
ret = GST_FLOW_UNEXPECTED;
GST_DEBUG_OBJECT (parse, "eos");
/* fall-through */
}
pause:
{
gboolean push_eos = FALSE;
GST_DEBUG_OBJECT (parse, "pausing task, reason %s",
gst_flow_get_name (ret));
gst_pad_pause_task (parse->sinkpad);
if (ret == GST_FLOW_UNEXPECTED) {
/* handle end-of-stream/segment */
if (parse->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gint64 stop;
if ((stop = parse->segment.stop) == -1)
stop = parse->segment.duration;
GST_DEBUG_OBJECT (parse, "sending segment_done");
gst_element_post_message
(GST_ELEMENT_CAST (parse),
gst_message_new_segment_done (GST_OBJECT_CAST (parse),
GST_FORMAT_TIME, stop));
} else {
/* If we STILL have zero frames processed, fire an error */
if (parse->priv->framecount == 0) {
GST_ELEMENT_ERROR (parse, STREAM, WRONG_TYPE,
("No valid frames found before end of stream"), (NULL));
}
push_eos = TRUE;
}
} else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
/* for fatal errors we post an error message, wrong-state is
* not fatal because it happens due to flushes and only means
* that we should stop now. */
GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
("streaming stopped, reason %s", gst_flow_get_name (ret)));
push_eos = TRUE;
}
if (push_eos) {
/* newsegment before eos */
if (parse->pending_segment) {
gst_pad_push_event (parse->srcpad, parse->pending_segment);
parse->pending_segment = NULL;
}
gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
}
gst_object_unref (parse);
}
}
@ -2368,7 +2464,8 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
if (rate < 0.0)
goto negative_rate;
if (cur_type != GST_SEEK_TYPE_SET)
if (cur_type != GST_SEEK_TYPE_SET ||
(stop_type != GST_SEEK_TYPE_SET && stop_type != GST_SEEK_TYPE_NONE))
goto wrong_type;
/* For any format other than TIME, see if upstream handles
@ -2384,14 +2481,6 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
}
}
/* too much estimating going on to support this sensibly,
* and no eos/end-of-segment loop handling either ... */
if ((stop_type == GST_SEEK_TYPE_SET && stop != GST_CLOCK_TIME_NONE) ||
(stop_type != GST_SEEK_TYPE_NONE && stop_type != GST_SEEK_TYPE_SET) ||
(flags & GST_SEEK_FLAG_SEGMENT))
goto wrong_type;
stop = -1;
/* get flush flag */
flush = flags & GST_SEEK_FLAG_FLUSH;
@ -2403,17 +2492,10 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
gst_segment_set_seek (&seeksegment, rate, format, flags,
cur_type, cur, stop_type, stop, &update);
/* figure out the last position we need to play. If it's configured (stop !=
* -1), use that, else we play until the total duration of the file */
if ((stop = seeksegment.stop) == -1)
stop = seeksegment.duration;
dstformat = GST_FORMAT_BYTES;
if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.last_stop,
&dstformat, &seekpos)) {
GST_DEBUG_OBJECT (parse, "conversion failed");
return FALSE;
}
&dstformat, &seekpos))
goto convert_failed;
GST_DEBUG_OBJECT (parse,
"seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur,
@ -2476,8 +2558,8 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
/* This will be sent later in _loop() */
parse->pending_segment =
gst_event_new_new_segment (FALSE, parse->segment.rate,
parse->segment.format,
parse->segment.last_stop, stop, parse->segment.last_stop);
parse->segment.format, parse->segment.last_stop, parse->segment.stop,
parse->segment.last_stop);
GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
"start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
@ -2502,12 +2584,20 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
GST_PAD_STREAM_UNLOCK (parse->sinkpad);
} else {
GstEvent *new_event;
gint64 stop_pos;
/* The only thing we need to do in PUSH-mode is to send the
seek event (in bytes) to upstream. Segment / flush handling happens
in corresponding src event handlers */
GST_DEBUG_OBJECT (parse, "seek in PUSH mode");
/* already converted start, need same for stop */
dstformat = GST_FORMAT_BYTES;
if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.start,
&dstformat, &stop_pos))
goto convert_failed;
new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flush,
GST_SEEK_TYPE_SET, seekpos, stop_type, stop);
GST_SEEK_TYPE_SET, seekpos, stop_type, stop_pos);
res = gst_pad_push_event (parse->sinkpad, new_event);
}
@ -2528,6 +2618,12 @@ wrong_type:
res = FALSE;
goto done;
}
convert_failed:
{
GST_DEBUG_OBJECT (parse, "conversion TIME to BYTES failed.");
res = FALSE;
goto done;
}
}
/**