wavparse: Fix SEEK event handling in push mode, and SEEKABLY query handling

Standard pull mode loop based SEEK handling fails in push mode,
so convert the SEEK event appropriately and dispatch to upstream.
Also cater for NEWSEGMENT event handling, and properly inform
downstream and application of SEEKABLE capabilities, depending
on scheduling mode and upstream.
This commit is contained in:
Mark Nauwelaerts 2009-02-27 13:29:41 +01:00
parent 1846e0af0f
commit 3310a540e3

View file

@ -77,6 +77,7 @@ static gboolean gst_wavparse_pad_convert (GstPad * pad,
gint64 src_value, GstFormat * dest_format, gint64 * dest_value); gint64 src_value, GstFormat * dest_format, gint64 * dest_value);
static GstFlowReturn gst_wavparse_chain (GstPad * pad, GstBuffer * buf); static GstFlowReturn gst_wavparse_chain (GstPad * pad, GstBuffer * buf);
static gboolean gst_wavparse_sink_event (GstPad * pad, GstEvent * event);
static void gst_wavparse_loop (GstPad * pad); static void gst_wavparse_loop (GstPad * pad);
static gboolean gst_wavparse_srcpad_event (GstPad * pad, GstEvent * event); static gboolean gst_wavparse_srcpad_event (GstPad * pad, GstEvent * event);
@ -193,6 +194,8 @@ gst_wavparse_init (GstWavParse * wavparse, GstWavParseClass * g_class)
GST_DEBUG_FUNCPTR (gst_wavparse_sink_activate_pull)); GST_DEBUG_FUNCPTR (gst_wavparse_sink_activate_pull));
gst_pad_set_chain_function (wavparse->sinkpad, gst_pad_set_chain_function (wavparse->sinkpad,
GST_DEBUG_FUNCPTR (gst_wavparse_chain)); GST_DEBUG_FUNCPTR (gst_wavparse_chain));
gst_pad_set_event_function (wavparse->sinkpad,
GST_DEBUG_FUNCPTR (gst_wavparse_sink_event));
gst_element_add_pad (GST_ELEMENT_CAST (wavparse), wavparse->sinkpad); gst_element_add_pad (GST_ELEMENT_CAST (wavparse), wavparse->sinkpad);
/* src, will be created later */ /* src, will be created later */
@ -725,8 +728,35 @@ gst_wavparse_stream_init (GstWavParse * wav)
return GST_FLOW_OK; return GST_FLOW_OK;
} }
/* This function is used to perform seeks on the element in static gboolean
* pull mode. gst_wavparse_time_to_bytepos (GstWavParse * wav, gint64 ts, gint64 * bytepos)
{
/* -1 always maps to -1 */
if (ts == -1) {
*bytepos = -1;
return TRUE;
}
/* 0 always maps to 0 */
if (ts == 0) {
*bytepos = 0;
return TRUE;
}
if (wav->bps > 0) {
*bytepos = uint64_ceiling_scale (ts, (guint64) wav->bps, GST_SECOND);
return TRUE;
} else if (wav->fact) {
guint64 bps =
gst_util_uint64_scale_int (wav->datasize, wav->rate, wav->fact);
*bytepos = uint64_ceiling_scale (ts, bps, GST_SECOND);
return TRUE;
}
return FALSE;
}
/* This function is used to perform seeks on the element.
* *
* It also works when event is NULL, in which case it will just * It also works when event is NULL, in which case it will just
* start from the last configured segment. This technique is * start from the last configured segment. This technique is
@ -783,6 +813,48 @@ gst_wavparse_perform_seek (GstWavParse * wav, GstEvent * event)
stop_type = GST_SEEK_TYPE_SET; stop_type = GST_SEEK_TYPE_SET;
} }
/* in push mode, we must delegate to upstream */
if (wav->streaming) {
gboolean res = FALSE;
/* if streaming not yet started; only prepare initial newsegment */
if (!event || wav->state != GST_WAVPARSE_DATA) {
if (wav->start_segment)
gst_event_unref (wav->start_segment);
wav->start_segment =
gst_event_new_new_segment (FALSE, wav->segment.rate,
wav->segment.format, wav->segment.last_stop, wav->segment.duration,
wav->segment.last_stop);
res = TRUE;
} else {
/* convert seek positions to byte positions in data sections */
if (format == GST_FORMAT_TIME) {
/* should not fail */
if (!gst_wavparse_time_to_bytepos (wav, cur, &cur))
goto no_position;
if (!gst_wavparse_time_to_bytepos (wav, stop, &stop))
goto no_position;
}
/* mind sample boundary and header */
if (cur >= 0) {
cur -= (cur % wav->bytes_per_sample);
cur += wav->datastart;
}
if (stop >= 0) {
stop -= (stop % wav->bytes_per_sample);
stop += wav->datastart;
}
GST_DEBUG_OBJECT (wav, "Pushing BYTE seek rate %g, "
"start %" G_GINT64_FORMAT ", stop %" G_GINT64_FORMAT, rate, cur,
stop);
/* BYTE seek event */
event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, cur_type, cur,
stop_type, stop);
res = gst_pad_push_event (wav->sinkpad, event);
}
return res;
}
/* get flush flag */ /* get flush flag */
flush = flags & GST_SEEK_FLAG_FLUSH; flush = flags & GST_SEEK_FLAG_FLUSH;
@ -832,16 +904,8 @@ gst_wavparse_perform_seek (GstWavParse * wav, GstEvent * event)
/* bring offset to bytes, if the bps is 0, we have the segment in BYTES and /* bring offset to bytes, if the bps is 0, we have the segment in BYTES and
* we can just copy the last_stop. If not, we use the bps to convert TIME to * we can just copy the last_stop. If not, we use the bps to convert TIME to
* bytes. */ * bytes. */
if (wav->bps > 0) if (!gst_wavparse_time_to_bytepos (wav, seeksegment.last_stop,
wav->offset = (gint64 *) & wav->offset))
uint64_ceiling_scale (seeksegment.last_stop, (guint64) wav->bps,
GST_SECOND);
else if (wav->fact) {
guint64 bps =
gst_util_uint64_scale_int (wav->datasize, wav->rate, wav->fact);
wav->offset =
uint64_ceiling_scale (seeksegment.last_stop, bps, GST_SECOND);
} else
wav->offset = seeksegment.last_stop; wav->offset = seeksegment.last_stop;
GST_LOG_OBJECT (wav, "offset=%" G_GUINT64_FORMAT, wav->offset); GST_LOG_OBJECT (wav, "offset=%" G_GUINT64_FORMAT, wav->offset);
wav->offset -= (wav->offset % wav->bytes_per_sample); wav->offset -= (wav->offset % wav->bytes_per_sample);
@ -854,14 +918,7 @@ gst_wavparse_perform_seek (GstWavParse * wav, GstEvent * event)
} }
if (stop_type != GST_SEEK_TYPE_NONE) { if (stop_type != GST_SEEK_TYPE_NONE) {
if (wav->bps > 0) if (!gst_wavparse_time_to_bytepos (wav, stop, (gint64 *) & wav->end_offset))
wav->end_offset =
uint64_ceiling_scale (stop, (guint64) wav->bps, GST_SECOND);
else if (wav->fact) {
guint64 bps =
gst_util_uint64_scale_int (wav->datasize, wav->rate, wav->fact);
wav->end_offset = uint64_ceiling_scale (stop, bps, GST_SECOND);
} else
wav->end_offset = stop; wav->end_offset = stop;
GST_LOG_OBJECT (wav, "end_offset=%" G_GUINT64_FORMAT, wav->end_offset); GST_LOG_OBJECT (wav, "end_offset=%" G_GUINT64_FORMAT, wav->end_offset);
wav->end_offset -= (wav->end_offset % wav->bytes_per_sample); wav->end_offset -= (wav->end_offset % wav->bytes_per_sample);
@ -962,6 +1019,12 @@ no_format:
GST_DEBUG_OBJECT (wav, "unsupported format given, seek aborted."); GST_DEBUG_OBJECT (wav, "unsupported format given, seek aborted.");
return FALSE; return FALSE;
} }
no_position:
{
GST_DEBUG_OBJECT (wav,
"Could not determine byte position for desired time");
return FALSE;
}
} }
/* /*
@ -1678,6 +1741,32 @@ iterate_adapter:
if (wav->streaming) { if (wav->streaming) {
guint avail = gst_adapter_available (wav->adapter); guint avail = gst_adapter_available (wav->adapter);
guint extra;
/* flush some bytes if evil upstream sends segment that starts
* before data or does is not send sample aligned segment */
if (G_LIKELY (wav->offset >= wav->datastart)) {
extra = (wav->offset - wav->datastart) % wav->bytes_per_sample;
} else {
extra = wav->datastart - wav->offset;
}
if (G_UNLIKELY (extra)) {
extra = wav->bytes_per_sample - extra;
if (extra <= avail) {
GST_DEBUG_OBJECT (wav, "flushing %d bytes to sample boundary", extra);
gst_adapter_flush (wav->adapter, extra);
wav->offset += extra;
wav->dataleft -= extra;
goto iterate_adapter;
} else {
GST_DEBUG_OBJECT (wav, "flushing %d bytes", avail);
gst_adapter_clear (wav->adapter);
wav->offset += avail;
wav->dataleft -= avail;
return GST_FLOW_OK;
}
}
if (avail < desired) { if (avail < desired) {
GST_LOG_OBJECT (wav, "Got only %d bytes of data from the sinkpad", avail); GST_LOG_OBJECT (wav, "Got only %d bytes of data from the sinkpad", avail);
@ -1927,6 +2016,8 @@ gst_wavparse_chain (GstPad * pad, GstBuffer * buf)
/* fall-through */ /* fall-through */
case GST_WAVPARSE_DATA: case GST_WAVPARSE_DATA:
if (buf && GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DISCONT))
wav->discont = TRUE;
if ((ret = gst_wavparse_stream_data (wav)) != GST_FLOW_OK) if ((ret = gst_wavparse_stream_data (wav)) != GST_FLOW_OK)
goto done; goto done;
break; break;
@ -1937,6 +2028,132 @@ done:
return ret; return ret;
} }
static GstFlowReturn
gst_wavparse_flush_data (GstWavParse * wav)
{
GstFlowReturn ret = GST_FLOW_OK;
guint av;
if ((av = gst_adapter_available (wav->adapter)) > 0) {
wav->dataleft = av;
wav->end_offset = wav->offset + av;
ret = gst_wavparse_stream_data (wav);
}
return ret;
}
static gboolean
gst_wavparse_sink_event (GstPad * pad, GstEvent * event)
{
GstWavParse *wav = GST_WAVPARSE (GST_PAD_PARENT (pad));
gboolean ret = TRUE;
GST_LOG_OBJECT (wav, "handling %s event", GST_EVENT_TYPE_NAME (event));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT:
{
GstFormat format;
gdouble rate, arate;
gint64 start, stop, time, offset = 0, end_offset = -1;
gboolean update;
GstSegment segment;
/* some debug output */
gst_segment_init (&segment, GST_FORMAT_UNDEFINED);
gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
&start, &stop, &time);
gst_segment_set_newsegment_full (&segment, update, rate, arate, format,
start, stop, time);
GST_DEBUG_OBJECT (wav,
"received format %d newsegment %" GST_SEGMENT_FORMAT, format,
&segment);
if (wav->state != GST_WAVPARSE_DATA) {
GST_DEBUG_OBJECT (wav, "still starting, eating event");
goto exit;
}
/* now we are either committed to TIME or BYTE format,
* and we only expect a BYTE segment, e.g. following a seek */
if (format == GST_FORMAT_BYTES) {
if (start > 0) {
offset = start;
start -= wav->datastart;
start = MAX (start, 0);
}
if (stop > 0) {
end_offset = stop;
stop -= wav->datastart;
stop = MAX (stop, 0);
}
if (wav->segment.format == GST_FORMAT_TIME) {
guint64 bps = wav->bps;
/* operating in format TIME, so we can convert */
if (!bps && wav->fact)
bps =
gst_util_uint64_scale_int (wav->datasize, wav->rate, wav->fact);
if (bps) {
if (start >= 0)
start =
uint64_ceiling_scale (start, GST_SECOND, (guint64) wav->bps);
if (stop >= 0)
stop =
uint64_ceiling_scale (stop, GST_SECOND, (guint64) wav->bps);
}
}
} else {
GST_DEBUG_OBJECT (wav, "unsupported segment format, ignoring");
goto exit;
}
/* accept upstream's notion of segment and distribute along */
gst_segment_set_newsegment_full (&wav->segment, update, rate, arate,
wav->segment.format, start, stop, start);
/* also store the newsegment event for the streaming thread */
if (wav->start_segment)
gst_event_unref (wav->start_segment);
wav->start_segment =
gst_event_new_new_segment_full (update, rate, arate,
wav->segment.format, start, stop, start);
GST_DEBUG_OBJECT (wav, "Pushing newseg update %d, rate %g, "
"applied rate %g, format %d, start %" G_GINT64_FORMAT ", "
"stop %" G_GINT64_FORMAT, update, rate, arate, wav->segment.format,
start, stop);
/* stream leftover data in current segment */
gst_wavparse_flush_data (wav);
/* and set up streaming thread for next one */
wav->offset = offset;
wav->end_offset = end_offset;
if (wav->end_offset > 0) {
wav->dataleft = wav->end_offset - wav->offset;
} else {
/* infinity; upstream will EOS when done */
wav->dataleft = G_MAXUINT64;
}
exit:
gst_event_unref (event);
break;
}
case GST_EVENT_EOS:
/* stream leftover data in current segment */
gst_wavparse_flush_data (wav);
/* fall-through */
case GST_EVENT_FLUSH_STOP:
gst_adapter_clear (wav->adapter);
wav->discont = TRUE;
/* fall-through */
default:
ret = gst_pad_event_default (wav->sinkpad, event);
break;
}
return ret;
}
#if 0 #if 0
/* convert and query stuff */ /* convert and query stuff */
static const GstFormat * static const GstFormat *
@ -2089,6 +2306,8 @@ gst_wavparse_pad_query (GstPad * pad, GstQuery * query)
return FALSE; return FALSE;
} }
GST_LOG_OBJECT (pad, "%s query", GST_QUERY_TYPE_NAME (query));
switch (GST_QUERY_TYPE (query)) { switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION: case GST_QUERY_POSITION:
{ {
@ -2152,19 +2371,30 @@ gst_wavparse_pad_query (GstPad * pad, GstQuery * query)
} }
case GST_QUERY_SEEKING:{ case GST_QUERY_SEEKING:{
GstFormat fmt; GstFormat fmt;
gboolean seekable = FALSE;
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL); gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
if (fmt == GST_FORMAT_TIME) { if (fmt == wav->segment.format) {
gboolean seekable = TRUE;
if ((wav->bps == 0) && !wav->fact) {
seekable = FALSE;
} else if (!gst_wavparse_calculate_duration (wav)) {
seekable = FALSE;
}
gst_query_set_seeking (query, GST_FORMAT_TIME, seekable,
0, wav->duration);
res = TRUE; res = TRUE;
if (wav->streaming) {
GstQuery *q;
q = gst_query_new_seeking (GST_FORMAT_BYTES);
if ((res = gst_pad_peer_query (wav->sinkpad, q))) {
gst_query_parse_seeking (q, &fmt, &seekable, NULL, NULL);
GST_LOG_OBJECT (wav, "upstream BYTE seekable %d", seekable);
}
gst_query_unref (q);
} else {
GST_LOG_OBJECT (wav, "looping => seekable");
seekable = TRUE;
res = TRUE;
}
} else if (fmt == GST_FORMAT_TIME) {
res = TRUE;
}
if (res) {
gst_query_set_seeking (query, fmt, seekable, 0, wav->segment.duration);
} }
break; break;
} }