Improve seek error-resilience.

Original commit message from CVS:
Improve seek error-resilience.
Better error handling generally.
This commit is contained in:
Michael Smith 2005-10-05 11:25:51 +00:00
parent 261c0d4a51
commit 72ba426c00
2 changed files with 204 additions and 60 deletions

View file

@ -1,3 +1,15 @@
2005-10-05 Michael Smith <msmith@fluendo.com>
* gst/realmedia/rmdemux.c: (gst_rmdemux_sink_event),
(gst_rmdemux_src_event), (gst_rmdemux_validate_offset),
(find_seek_offset_bytes), (find_seek_offset_time),
(gst_rmdemux_perform_seek), (gst_rmdemux_src_query),
(gst_rmdemux_loop), (gst_rmdemux_fourcc_isplausible),
(gst_rmdemux_chain), (gst_rmdemux_send_event),
(gst_rmdemux_add_stream), (gst_rmdemux_parse_packet):
Improve seeking error-resilience.
General improvements in error handling.
2005-10-03 Thomas Vander Stichele <thomas at apestaart dot org>
* configure.ac:

View file

@ -3,6 +3,7 @@
* Copyright (C) <2003> David A. Schleef <ds@schleef.org>
* Copyright (C) <2004> Stephane Loeuillet <gstreamer@leroutier.net>
* Copyright (C) <2005> Owen Fraser-Green <owen@discobabe.net>
* Copyright (C) <2005> Michael Smith <fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@ -144,8 +145,8 @@ static void gst_rmdemux_parse_data (GstRMDemux * rmdemux, const void *data,
int length);
static void gst_rmdemux_parse_cont (GstRMDemux * rmdemux, const void *data,
int length);
static void gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data,
guint16 version, guint16 length);
static GstFlowReturn gst_rmdemux_parse_packet (GstRMDemux * rmdemux,
const void *data, guint16 version, guint16 length);
static void gst_rmdemux_parse_indx_data (GstRMDemux * rmdemux, const void *data,
int length);
@ -372,9 +373,134 @@ done_unref:
return ret;
}
/* Validate that this looks like a reasonable point to seek to */
static gboolean
gst_rmdemux_validate_offset (GstRMDemux * rmdemux)
{
GstBuffer *buffer;
GstFlowReturn flowret;
guint16 version, length;
gboolean ret = TRUE;
flowret = gst_pad_pull_range (rmdemux->sinkpad, rmdemux->offset, 4, &buffer);
if (flowret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (rmdemux, "Failed to pull data at offset %d",
rmdemux->offset);
return FALSE;
}
// TODO: Can we also be seeking to a 'DATA' chunk header? Check this.
// Also, for the case we currently handle, can we check any more? It's pretty
// sucky to not be validating a little more heavily than this...
/* This should now be the start of a data packet header. That begins with
* a 2-byte 'version' field, which has to be 0 or 1, then a length. I'm not
* certain what values are valid for length, but it must always be at least
* 4 bytes, and we can check that it won't take us past our known total size
*/
version = RMDEMUX_GUINT16_GET (GST_BUFFER_DATA (buffer));
if (version != 0 && version != 1) {
GST_DEBUG_OBJECT (rmdemux, "Expected version 0 or 1, got %d",
(int) version);
ret = FALSE;
}
length = RMDEMUX_GUINT16_GET (GST_BUFFER_DATA (buffer) + 2);
// TODO: Also check against total stream length
if (length < 4) {
GST_DEBUG_OBJECT (rmdemux, "Expected length >= 4, got %d", (int) length);
ret = FALSE;
}
if (ret) {
rmdemux->offset += 4;
gst_adapter_clear (rmdemux->adapter);
gst_adapter_push (rmdemux->adapter, buffer);
} else {
GST_WARNING_OBJECT (rmdemux, "Failed to validate seek offset at %d",
rmdemux->offset);
}
return ret;
}
static gboolean
find_seek_offset_bytes (GstRMDemux * rmdemux, guint target)
{
int i, n;
gboolean ret = FALSE;
for (n = 0; n < rmdemux->n_streams; n++) {
GstRMDemuxStream *stream;
stream = rmdemux->streams[n];
/* Search backwards through this stream's index until we find the first
* timestamp before our target time */
for (i = stream->index_length - 1; i >= 0; i--) {
if (stream->index[i].offset <= target) {
/* Set the seek_offset for the stream so we don't bother parsing it
* until we've passed that point */
stream->seek_offset = stream->index[i].offset;
rmdemux->offset = stream->index[i].offset;
ret = TRUE;
break;
}
}
}
return ret;
}
static gboolean
find_seek_offset_time (GstRMDemux * rmdemux, GstClockTime time)
{
int i, n;
gboolean ret = FALSE;
GstClockTime earliest = GST_CLOCK_TIME_NONE;
for (n = 0; n < rmdemux->n_streams; n++) {
GstRMDemuxStream *stream;
stream = rmdemux->streams[n];
/* Search backwards through this stream's index until we find the first
* timestamp before our target time */
for (i = stream->index_length - 1; i >= 0; i--) {
if (stream->index[i].timestamp <= time) {
/* Set the seek_offset for the stream so we don't bother parsing it
* until we've passed that point */
stream->seek_offset = stream->index[i].offset;
/* If it's also the earliest timestamp we've seen of all streams, then
* that's our target!
*/
if (earliest == GST_CLOCK_TIME_NONE ||
stream->index[i].timestamp < earliest) {
earliest = stream->index[i].timestamp;
rmdemux->offset = stream->index[i].offset;
GST_DEBUG_OBJECT (rmdemux,
"We're looking for %" GST_TIME_FORMAT
" and we found that stream %d has the latest index at %"
GST_TIME_FORMAT, GST_TIME_ARGS (rmdemux->segment_start), n,
GST_TIME_ARGS (earliest));
}
ret = TRUE;
break;
}
}
}
return ret;
}
static gboolean
gst_rmdemux_perform_seek (GstRMDemux * rmdemux, gboolean flush)
{
gboolean validated;
gboolean ret = TRUE;
/* nothing configured, play complete file */
if (rmdemux->segment_start == GST_CLOCK_TIME_NONE)
rmdemux->segment_start = 0;
@ -402,56 +528,56 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, gboolean flush)
gst_pad_pause_task (rmdemux->sinkpad);
}
GST_LOG_OBJECT (rmdemux, "Done starting flushes");
/* now grab the stream lock so that streaming cannot continue, for
* non flushing seeks when the element is in PAUSED this could block
* forever. */
GST_STREAM_LOCK (rmdemux->sinkpad);
GST_LOG_OBJECT (rmdemux, "Took streamlock");
/* we need to stop flushing on the sinkpad as we're going to use it
* next. We can do this as we have the STREAM lock now. */
gst_pad_push_event (rmdemux->sinkpad, gst_event_new_flush_stop ());
/* Find the new offset */
/* Get the video stream */
{
int i, n;
GST_LOG_OBJECT (rmdemux, "Pushed FLUSH_STOP event");
rmdemux->offset = 0;
GstClockTime tmp_time = rmdemux->segment_stop;
/* Find the last offset which occurs before the seek time */
for (n = 0; n < rmdemux->n_streams; n++) {
GstRMDemuxStream *stream;
stream = rmdemux->streams[n];
for (i = stream->index_length - 1; i >= 0; i--) {
if (stream->index[i].timestamp < rmdemux->segment_start) {
/* Set the seek_offset for the stream so we don't bother parsing it
* until we've passed that point */
stream->seek_offset = stream->index[i].offset;
if (tmp_time > stream->index[i].timestamp) {
tmp_time = stream->index[i].timestamp;
rmdemux->offset = stream->index[i].offset;
GST_DEBUG_OBJECT (rmdemux,
"We're looking for %" GST_TIME_FORMAT
" and we found that stream %d has the latest index at %"
GST_TIME_FORMAT, GST_TIME_ARGS (rmdemux->segment_start), n,
GST_TIME_ARGS (tmp_time));
}
break;
}
}
}
/* For each stream, find the first index offset equal to or before our seek
* target. Of these, find the smallest offset. That's where we seek to.
*
* Then we pull 4 bytes from that offset, validate that we've seeked to a
* DATA chunk (with the DATA fourcc).
* If that fails, restart, with the seek target set to one less than the
* offset we just tried. If we run out of places to try, treat that as a fatal
* error.
*/
if (!find_seek_offset_time (rmdemux, rmdemux->segment_start)) {
GST_LOG_OBJECT (rmdemux, "Failed to find seek offset by time");
ret = FALSE;
goto done;
}
GST_LOG_OBJECT (rmdemux, "Validating offset %u", rmdemux->offset);
validated = gst_rmdemux_validate_offset (rmdemux);
while (!validated) {
GST_INFO_OBJECT (rmdemux, "Failed to validate offset at %u",
rmdemux->offset);
if (!find_seek_offset_bytes (rmdemux, rmdemux->offset - 1)) {
ret = FALSE;
goto done;
}
validated = gst_rmdemux_validate_offset (rmdemux);
}
GST_LOG_OBJECT (rmdemux, "Found final offset. Excellent!");
/* now we have a new position, prepare for streaming again */
{
GstEvent *event;
/* Reset the demuxer state */
rmdemux->state = RMDEMUX_STATE_DATA_PACKET;
gst_adapter_clear (rmdemux->adapter);
rmdemux->cur_timestamp = GST_CLOCK_TIME_NONE;
if (flush)
@ -479,10 +605,12 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, gboolean flush)
rmdemux->sinkpad);
}
done:
/* streaming can continue now */
GST_STREAM_UNLOCK (rmdemux->sinkpad);
return TRUE;
return ret;
}
@ -665,6 +793,7 @@ gst_rmdemux_loop (GstPad * pad)
size = rmdemux->avg_packet_size;
break;
case RMDEMUX_STATE_EOS:
GST_LOG_OBJECT (rmdemux, "At EOS, pausing task");
goto need_pause;
default:
GST_LOG_OBJECT (rmdemux, "Default: requires %d bytes (state is %d)",
@ -785,14 +914,14 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
rmdemux->size = RMDEMUX_GUINT32_GET (data + 4) - HEADER_SIZE;
rmdemux->object_version = RMDEMUX_GUINT16_GET (data + 8);
/* Sanity-check, if this is after a seek we might have bad data.
* We assume that the FOURCC is printable ASCII */
/* Sanity-check. We assume that the FOURCC is printable ASCII */
if (!gst_rmdemux_fourcc_isplausible (rmdemux->object_id)) {
/* Failed. Remain in HEADER state, try again... We flush only the
* actual FOURCC, not the entire header, because we could need to
* resync anywhere at all... really, this should never happen. */
GST_WARNING_OBJECT (rmdemux,
"Bogus looking header, unprintable FOURCC");
/* Failed. Remain in HEADER state, try again... We flush only
* the actual FOURCC, not the entire header, because we could
* need to resync anywhere at all... really, this should never
* happen. */
GST_WARNING_OBJECT (rmdemux, "Bogus looking header, unprintable "
"FOURCC");
gst_adapter_flush (rmdemux->adapter, 4);
break;
@ -971,14 +1100,15 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
data = gst_adapter_peek (rmdemux->adapter, 4);
length = RMDEMUX_GUINT16_GET (data + 2);
if (length == 0) {
gst_adapter_flush (rmdemux->adapter, 2);
if (length < 4) {
/* Invalid, just drop it */
gst_adapter_flush (rmdemux->adapter, 4);
} else {
if (gst_adapter_available (rmdemux->adapter) < length)
goto unlock;
data = gst_adapter_peek (rmdemux->adapter, length);
gst_rmdemux_parse_packet (rmdemux, data + 4, version, length);
gst_rmdemux_parse_packet (rmdemux, data + 4, version, length - 4);
rmdemux->chunk_index++;
gst_adapter_flush (rmdemux->adapter, length);
@ -990,9 +1120,11 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
/* Stream done */
gst_adapter_flush (rmdemux->adapter, 2);
if (rmdemux->data_offset == 0)
if (rmdemux->data_offset == 0) {
GST_LOG_OBJECT (rmdemux,
"No further data, internal demux state EOS");
rmdemux->state = RMDEMUX_STATE_EOS;
else
} else
rmdemux->state = RMDEMUX_STATE_HEADER;
}
break;
@ -1105,9 +1237,8 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
stream->pad =
gst_pad_new_from_template (gst_static_pad_template_get
(&gst_rmdemux_audiosrc_template), name);
GST_LOG_OBJECT (rmdemux, "Created audio pad \"%s\"", name);
g_free (name);
GST_LOG_OBJECT (rmdemux, "Created audio pad \"%s\"",
gst_pad_get_name (stream->pad));
switch (stream->fourcc) {
/* Older RealAudio Codecs */
case GST_RM_AUD_14_4:
@ -1543,7 +1674,7 @@ gst_rmdemux_parse_cont (GstRMDemux * rmdemux, const void *data, int length)
g_free (title);
}
static void
static GstFlowReturn
gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data,
guint16 version, guint16 length)
{
@ -1559,12 +1690,14 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data,
"Parsing a packet for stream=%d, timestamp=" GST_TIME_FORMAT
", version=%d", id, GST_TIME_ARGS (rmdemux->cur_timestamp), version);
// TODO: This is skipping over either 2 or 3 bytes (version dependent)
// without even reading it. What are these for?
if (version == 0) {
data += 8;
packet_size = length - 12;
packet_size = length - 8;
} else {
data += 9;
packet_size = length - 13;
packet_size = length - 9;
}
stream = gst_rmdemux_get_stream_by_id (rmdemux, id);
@ -1572,28 +1705,27 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data,
if (!stream) {
GST_WARNING_OBJECT (rmdemux, "No stream for stream id %d in parsing "
"data packet", id);
return;
return GST_FLOW_OK;
}
if ((rmdemux->offset + packet_size) > stream->seek_offset) {
if ((rmdemux->offset + packet_size) > stream->seek_offset &&
stream && stream->pad && GST_PAD_IS_USABLE (stream->pad)) {
if (gst_pad_alloc_buffer (stream->pad, GST_BUFFER_OFFSET_NONE,
packet_size, stream->caps, &buffer) != GST_FLOW_OK) {
GST_WARNING_OBJECT (rmdemux, "failed to alloc src buffer for stream %d",
id);
return;
return GST_FLOW_ERROR;
}
memcpy (GST_BUFFER_DATA (buffer), (guint8 *) data, packet_size);
GST_BUFFER_TIMESTAMP (buffer) = rmdemux->cur_timestamp;
if (stream && stream->pad && GST_PAD_IS_USABLE (stream->pad)) {
GST_DEBUG_OBJECT (rmdemux, "Pushing buffer of size %d to pad",
packet_size);
gst_pad_push (stream->pad, buffer);
}
GST_DEBUG_OBJECT (rmdemux, "Pushing buffer of size %d to pad", packet_size);
return gst_pad_push (stream->pad, buffer);
} else {
GST_DEBUG_OBJECT (rmdemux,
"Stream %d is skipping: seek_offset=%d, offset=%d, packet_size",
stream->id, stream->seek_offset, rmdemux->offset, packet_size);
return GST_FLOW_OK;
}
}