gstreamer/ext/ogg/gstoggdemux.c
Vincent Penquerc'h c956c5fd00 oggstream: only use information from skeleton if we have nothing better
The codec setup headers are a lot more likely to have correct information,
especially as it's easy to remux a skeleton in a file where streams don't
have the same parameters (I've even seen a file with two skeletons).

Still, this is useful in the case we have a codec we can't decode, so we
can at least (theoretically) convert granpos to time, so we discard this
information if the codec setup has already provided it.

This fixes playback on (at lesat) the original archive.org encoding of
"The Night of the Living Dead" (now replaced by another encoding).

https://bugzilla.gnome.org/show_bug.cgi?id=612443
2011-09-19 23:21:23 +01:00

4507 lines
130 KiB
C

/* GStreamer
* Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
*
* gstoggdemux.c: ogg stream demuxer
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
/**
* SECTION:element-oggdemux
* @see_also: <link linkend="gst-plugins-base-plugins-oggmux">oggmux</link>
*
* This element demuxes ogg files into their encoded audio and video components.
*
* <refsect2>
* <title>Example pipelines</title>
* |[
* gst-launch -v filesrc location=test.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink
* ]| Decodes the vorbis audio stored inside an ogg container.
* </refsect2>
*
* Last reviewed on 2006-12-30 (0.10.5)
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include <gst/gst-i18n-plugin.h>
#include <gst/tag/tag.h>
#include "gstoggdemux.h"
#define CHUNKSIZE (8500) /* this is out of vorbisfile */
/* we hope we get a granpos within this many bytes off the end */
#define DURATION_CHUNK_OFFSET (64*1024)
/* stop duration checks within this much of EOS */
#define EOS_AVOIDANCE_THRESHOLD 8192
#define GST_FLOW_LIMIT GST_FLOW_CUSTOM_ERROR
#define GST_FLOW_SKIP_PUSH GST_FLOW_CUSTOM_SUCCESS_1
#define GST_CHAIN_LOCK(ogg) g_mutex_lock((ogg)->chain_lock)
#define GST_CHAIN_UNLOCK(ogg) g_mutex_unlock((ogg)->chain_lock)
#define GST_PUSH_LOCK(ogg) \
do { \
GST_TRACE_OBJECT(ogg, "Push lock"); \
g_mutex_lock((ogg)->push_lock); \
} while(0)
#define GST_PUSH_UNLOCK(ogg) \
do { \
GST_TRACE_OBJECT(ogg, "Push unlock"); \
g_mutex_unlock((ogg)->push_lock); \
} while(0)
GST_DEBUG_CATEGORY (gst_ogg_demux_debug);
GST_DEBUG_CATEGORY (gst_ogg_demux_setup_debug);
#define GST_CAT_DEFAULT gst_ogg_demux_debug
static ogg_packet *
_ogg_packet_copy (const ogg_packet * packet)
{
ogg_packet *ret = g_slice_new (ogg_packet);
*ret = *packet;
ret->packet = g_memdup (packet->packet, packet->bytes);
return ret;
}
static void
_ogg_packet_free (ogg_packet * packet)
{
g_free (packet->packet);
g_slice_free (ogg_packet, packet);
}
static ogg_page *
gst_ogg_page_copy (ogg_page * page)
{
ogg_page *p = g_slice_new (ogg_page);
/* make a copy of the page */
p->header = g_memdup (page->header, page->header_len);
p->header_len = page->header_len;
p->body = g_memdup (page->body, page->body_len);
p->body_len = page->body_len;
return p;
}
static void
gst_ogg_page_free (ogg_page * page)
{
g_free (page->header);
g_free (page->body);
g_slice_free (ogg_page, page);
}
static gboolean gst_ogg_demux_collect_chain_info (GstOggDemux * ogg,
GstOggChain * chain);
static gboolean gst_ogg_demux_activate_chain (GstOggDemux * ogg,
GstOggChain * chain, GstEvent * event);
static void gst_ogg_pad_mark_discont (GstOggPad * pad);
static void gst_ogg_chain_mark_discont (GstOggChain * chain);
static gboolean gst_ogg_demux_perform_seek (GstOggDemux * ogg,
GstEvent * event);
static gboolean gst_ogg_demux_receive_event (GstElement * element,
GstEvent * event);
static void gst_ogg_pad_dispose (GObject * object);
static void gst_ogg_pad_finalize (GObject * object);
static const GstQueryType *gst_ogg_pad_query_types (GstPad * pad);
static gboolean gst_ogg_pad_src_query (GstPad * pad, GstQuery * query);
static gboolean gst_ogg_pad_event (GstPad * pad, GstEvent * event);
static GstCaps *gst_ogg_pad_getcaps (GstPad * pad);
static GstOggPad *gst_ogg_chain_get_stream (GstOggChain * chain,
guint32 serialno);
static GstFlowReturn gst_ogg_demux_combine_flows (GstOggDemux * ogg,
GstOggPad * pad, GstFlowReturn ret);
static void gst_ogg_demux_sync_streams (GstOggDemux * ogg);
GstCaps *gst_ogg_demux_set_header_on_caps (GstOggDemux * ogg,
GstCaps * caps, GList * headers);
static gboolean gst_ogg_demux_send_event (GstOggDemux * ogg, GstEvent * event);
static gboolean gst_ogg_demux_perform_seek_push (GstOggDemux * ogg,
GstEvent * event);
static gboolean gst_ogg_demux_check_duration_push (GstOggDemux * ogg,
GstSeekFlags flags, GstEvent * event);
GType gst_ogg_pad_get_type (void);
G_DEFINE_TYPE (GstOggPad, gst_ogg_pad, GST_TYPE_PAD);
static void
gst_ogg_pad_class_init (GstOggPadClass * klass)
{
GObjectClass *gobject_class;
gobject_class = (GObjectClass *) klass;
gobject_class->dispose = gst_ogg_pad_dispose;
gobject_class->finalize = gst_ogg_pad_finalize;
}
static void
gst_ogg_pad_init (GstOggPad * pad)
{
gst_pad_set_event_function (GST_PAD (pad),
GST_DEBUG_FUNCPTR (gst_ogg_pad_event));
gst_pad_set_getcaps_function (GST_PAD (pad),
GST_DEBUG_FUNCPTR (gst_ogg_pad_getcaps));
gst_pad_set_query_type_function (GST_PAD (pad),
GST_DEBUG_FUNCPTR (gst_ogg_pad_query_types));
gst_pad_set_query_function (GST_PAD (pad),
GST_DEBUG_FUNCPTR (gst_ogg_pad_src_query));
pad->mode = GST_OGG_PAD_MODE_INIT;
pad->current_granule = -1;
pad->keyframe_granule = -1;
pad->start_time = GST_CLOCK_TIME_NONE;
pad->last_stop = GST_CLOCK_TIME_NONE;
pad->have_type = FALSE;
pad->continued = NULL;
pad->map.headers = NULL;
pad->map.queued = NULL;
pad->map.granulerate_n = 0;
pad->map.granulerate_d = 0;
pad->map.granuleshift = -1;
}
static void
gst_ogg_pad_dispose (GObject * object)
{
GstOggPad *pad = GST_OGG_PAD (object);
pad->chain = NULL;
pad->ogg = NULL;
g_list_foreach (pad->map.headers, (GFunc) _ogg_packet_free, NULL);
g_list_free (pad->map.headers);
pad->map.headers = NULL;
g_list_foreach (pad->map.queued, (GFunc) _ogg_packet_free, NULL);
g_list_free (pad->map.queued);
pad->map.queued = NULL;
g_free (pad->map.index);
pad->map.index = NULL;
/* clear continued pages */
g_list_foreach (pad->continued, (GFunc) gst_ogg_page_free, NULL);
g_list_free (pad->continued);
pad->continued = NULL;
if (pad->map.caps) {
gst_caps_unref (pad->map.caps);
pad->map.caps = NULL;
}
if (pad->map.taglist) {
gst_tag_list_free (pad->map.taglist);
pad->map.taglist = NULL;
}
ogg_stream_reset (&pad->map.stream);
G_OBJECT_CLASS (gst_ogg_pad_parent_class)->dispose (object);
}
static void
gst_ogg_pad_finalize (GObject * object)
{
GstOggPad *pad = GST_OGG_PAD (object);
ogg_stream_clear (&pad->map.stream);
G_OBJECT_CLASS (gst_ogg_pad_parent_class)->finalize (object);
}
static const GstQueryType *
gst_ogg_pad_query_types (GstPad * pad)
{
static const GstQueryType query_types[] = {
GST_QUERY_DURATION,
GST_QUERY_SEEKING,
0
};
return query_types;
}
static GstCaps *
gst_ogg_pad_getcaps (GstPad * pad)
{
return gst_caps_ref (GST_PAD_CAPS (pad));
}
static gboolean
gst_ogg_pad_src_query (GstPad * pad, GstQuery * query)
{
gboolean res = TRUE;
GstOggDemux *ogg;
ogg = GST_OGG_DEMUX (gst_pad_get_parent (pad));
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_DURATION:
{
GstFormat format;
gint64 total_time = -1;
gst_query_parse_duration (query, &format, NULL);
/* can only get position in time */
if (format != GST_FORMAT_TIME)
goto wrong_format;
if (ogg->total_time != -1) {
/* we can return the total length */
total_time = ogg->total_time;
} else {
gint bitrate = ogg->bitrate;
/* try with length and bitrate */
if (bitrate > 0) {
GstQuery *uquery;
/* ask upstream for total length in bytes */
uquery = gst_query_new_duration (GST_FORMAT_BYTES);
if (gst_pad_peer_query (ogg->sinkpad, uquery)) {
gint64 length;
gst_query_parse_duration (uquery, NULL, &length);
/* estimate using the bitrate */
total_time =
gst_util_uint64_scale (length, 8 * GST_SECOND, bitrate);
GST_LOG_OBJECT (ogg,
"length: %" G_GINT64_FORMAT ", bitrate %d, total_time %"
GST_TIME_FORMAT, length, bitrate, GST_TIME_ARGS (total_time));
}
gst_query_unref (uquery);
}
}
gst_query_set_duration (query, GST_FORMAT_TIME, total_time);
break;
}
case GST_QUERY_SEEKING:
{
GstFormat format;
gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
if (format == GST_FORMAT_TIME) {
gboolean seekable = FALSE;
gint64 stop = -1;
if (ogg->pullmode) {
seekable = TRUE;
stop = ogg->total_time;
} else if (ogg->current_chain->streams->len) {
gint i;
seekable = FALSE;
for (i = 0; i < ogg->current_chain->streams->len; i++) {
GstOggPad *pad =
g_array_index (ogg->current_chain->streams, GstOggPad *, i);
seekable = TRUE;
if (pad->map.index != NULL && pad->map.n_index != 0) {
GstOggIndex *idx;
GstClockTime idx_time;
idx = &pad->map.index[pad->map.n_index - 1];
idx_time =
gst_util_uint64_scale (idx->timestamp, GST_SECOND,
pad->map.kp_denom);
if (stop == -1)
stop = idx_time;
else
stop = MAX (idx_time, stop);
} else {
stop = -1; /* we've no clue, sadly, without seeking */
}
}
}
gst_query_set_seeking (query, GST_FORMAT_TIME, seekable, 0, stop);
} else {
res = FALSE;
}
break;
}
default:
res = gst_pad_query_default (pad, query);
break;
}
done:
gst_object_unref (ogg);
return res;
/* ERRORS */
wrong_format:
{
GST_DEBUG_OBJECT (ogg, "only query duration on TIME is supported");
res = FALSE;
goto done;
}
}
static gboolean
gst_ogg_demux_receive_event (GstElement * element, GstEvent * event)
{
gboolean res;
GstOggDemux *ogg;
ogg = GST_OGG_DEMUX (element);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* now do the seek */
res = gst_ogg_demux_perform_seek (ogg, event);
gst_event_unref (event);
break;
default:
GST_DEBUG_OBJECT (ogg, "We only handle seek events here");
goto error;
}
return res;
/* ERRORS */
error:
{
GST_DEBUG_OBJECT (ogg, "error handling event");
gst_event_unref (event);
return FALSE;
}
}
static gboolean
gst_ogg_pad_event (GstPad * pad, GstEvent * event)
{
gboolean res;
GstOggDemux *ogg;
ogg = GST_OGG_DEMUX (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* now do the seek */
res = gst_ogg_demux_perform_seek (ogg, event);
gst_event_unref (event);
break;
default:
res = gst_pad_event_default (pad, event);
break;
}
gst_object_unref (ogg);
return res;
}
static void
gst_ogg_pad_reset (GstOggPad * pad)
{
ogg_stream_reset (&pad->map.stream);
GST_DEBUG_OBJECT (pad, "doing reset");
/* clear continued pages */
g_list_foreach (pad->continued, (GFunc) gst_ogg_page_free, NULL);
g_list_free (pad->continued);
pad->continued = NULL;
pad->last_ret = GST_FLOW_OK;
pad->last_stop = GST_CLOCK_TIME_NONE;
pad->current_granule = -1;
pad->keyframe_granule = -1;
pad->is_eos = FALSE;
}
/* queue data, basically takes the packet, puts it in a buffer and store the
* buffer in the queued list. */
static GstFlowReturn
gst_ogg_demux_queue_data (GstOggPad * pad, ogg_packet * packet)
{
#ifndef GST_DISABLE_GST_DEBUG
GstOggDemux *ogg = pad->ogg;
#endif
GST_DEBUG_OBJECT (ogg, "%p queueing data serial %08x",
pad, pad->map.serialno);
pad->map.queued = g_list_append (pad->map.queued, _ogg_packet_copy (packet));
/* we are ok now */
return GST_FLOW_OK;
}
static GstFlowReturn
gst_ogg_demux_chain_peer (GstOggPad * pad, ogg_packet * packet,
gboolean push_headers)
{
GstBuffer *buf = NULL;
GstFlowReturn ret, cret;
GstOggDemux *ogg = pad->ogg;
gint64 current_time;
GstOggChain *chain;
gint64 duration;
gint offset;
gint trim;
GstClockTime out_timestamp, out_duration;
guint64 out_offset, out_offset_end;
gboolean delta_unit = FALSE;
cret = GST_FLOW_OK;
GST_PUSH_LOCK (ogg);
if (!ogg->pullmode && ogg->push_state == PUSH_PLAYING
&& ogg->push_time_length == GST_CLOCK_TIME_NONE
&& !ogg->push_disable_seeking) {
if (!ogg->building_chain) {
/* we got all headers, now try to get duration */
if (!gst_ogg_demux_check_duration_push (ogg, GST_SEEK_FLAG_FLUSH, NULL)) {
GST_PUSH_UNLOCK (ogg);
return GST_FLOW_OK;
}
}
GST_PUSH_UNLOCK (ogg);
return GST_FLOW_OK;
}
GST_PUSH_UNLOCK (ogg);
GST_DEBUG_OBJECT (ogg,
"%p streaming to peer serial %08x", pad, pad->map.serialno);
if (pad->map.is_ogm) {
const guint8 *data;
long bytes;
data = packet->packet;
bytes = packet->bytes;
if (bytes < 1)
goto empty_packet;
if ((data[0] & 1) || (data[0] & 3 && pad->map.is_ogm_text)) {
/* We don't push header packets for OGM */
goto done;
}
offset = 1 + (((data[0] & 0xc0) >> 6) | ((data[0] & 0x02) << 1));
delta_unit = (((data[0] & 0x08) >> 3) == 0);
trim = 0;
/* Strip trailing \0 for subtitles */
if (pad->map.is_ogm_text) {
while (bytes && data[bytes - 1] == 0) {
trim++;
bytes--;
}
}
} else if (pad->map.is_vp8) {
if ((packet->bytes >= 7 && memcmp (packet->packet, "OVP80\2 ", 7) == 0) ||
packet->b_o_s ||
(packet->bytes >= 5 && memcmp (packet->packet, "OVP80", 5) == 0)) {
/* We don't push header packets for VP8 */
goto done;
}
offset = 0;
trim = 0;
} else {
offset = 0;
trim = 0;
}
/* get timing info for the packet */
if (gst_ogg_stream_packet_is_header (&pad->map, packet)) {
duration = 0;
GST_DEBUG_OBJECT (ogg, "packet is header");
} else {
duration = gst_ogg_stream_get_packet_duration (&pad->map, packet);
GST_DEBUG_OBJECT (ogg, "packet duration %" G_GUINT64_FORMAT, duration);
}
if (packet->b_o_s) {
out_timestamp = GST_CLOCK_TIME_NONE;
out_duration = GST_CLOCK_TIME_NONE;
out_offset = 0;
out_offset_end = -1;
} else {
if (packet->granulepos != -1) {
pad->current_granule = gst_ogg_stream_granulepos_to_granule (&pad->map,
packet->granulepos);
pad->keyframe_granule =
gst_ogg_stream_granulepos_to_key_granule (&pad->map,
packet->granulepos);
GST_DEBUG_OBJECT (ogg, "new granule %" G_GUINT64_FORMAT,
pad->current_granule);
} else if (ogg->segment.rate > 0.0 && pad->current_granule != -1) {
pad->current_granule += duration;
GST_DEBUG_OBJECT (ogg, "interpollating granule %" G_GUINT64_FORMAT,
pad->current_granule);
}
if (ogg->segment.rate < 0.0 && packet->granulepos == -1) {
/* negative rates, only set timestamp on the packets with a granulepos */
out_timestamp = -1;
out_duration = -1;
out_offset = -1;
out_offset_end = -1;
} else {
/* we only push buffers after we have a valid granule. This is done so that
* we nicely skip packets without a timestamp after a seek. This is ok
* because we base or seek on the packet after the page with the smaller
* timestamp. */
if (pad->current_granule == -1)
goto no_timestamp;
if (pad->map.is_ogm) {
out_timestamp = gst_ogg_stream_granule_to_time (&pad->map,
pad->current_granule);
out_duration = gst_util_uint64_scale (duration,
GST_SECOND * pad->map.granulerate_d, pad->map.granulerate_n);
} else if (pad->map.is_sparse) {
out_timestamp = gst_ogg_stream_granule_to_time (&pad->map,
pad->current_granule);
if (duration == GST_CLOCK_TIME_NONE) {
out_duration = GST_CLOCK_TIME_NONE;
} else {
out_duration = gst_util_uint64_scale (duration,
GST_SECOND * pad->map.granulerate_d, pad->map.granulerate_n);
}
} else {
out_timestamp = gst_ogg_stream_granule_to_time (&pad->map,
pad->current_granule - duration);
out_duration =
gst_ogg_stream_granule_to_time (&pad->map,
pad->current_granule) - out_timestamp;
}
out_offset_end =
gst_ogg_stream_granule_to_granulepos (&pad->map,
pad->current_granule, pad->keyframe_granule);
out_offset =
gst_ogg_stream_granule_to_time (&pad->map, pad->current_granule);
}
}
if (pad->map.is_ogm_text) {
/* check for invalid buffer sizes */
if (G_UNLIKELY (offset + trim >= packet->bytes))
goto empty_packet;
}
if (!pad->added)
goto not_added;
buf = gst_buffer_new_and_alloc (packet->bytes - offset - trim);
gst_buffer_set_caps (buf, GST_PAD_CAPS (pad));
/* set delta flag for OGM content */
if (delta_unit)
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy packet in buffer */
memcpy (buf->data, packet->packet + offset, packet->bytes - offset - trim);
GST_BUFFER_TIMESTAMP (buf) = out_timestamp;
GST_BUFFER_DURATION (buf) = out_duration;
GST_BUFFER_OFFSET (buf) = out_offset;
GST_BUFFER_OFFSET_END (buf) = out_offset_end;
/* Mark discont on the buffer */
if (pad->discont) {
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
pad->discont = FALSE;
}
pad->last_stop = ogg->segment.last_stop;
/* don't push the header packets when we are asked to skip them */
if (!packet->b_o_s || push_headers) {
ret = gst_pad_push (GST_PAD_CAST (pad), buf);
buf = NULL;
/* combine flows */
cret = gst_ogg_demux_combine_flows (ogg, pad, ret);
}
/* we're done with skeleton stuff */
if (pad->map.is_skeleton)
goto done;
/* check if valid granulepos, then we can calculate the current
* position. We know the granule for each packet but we only want to update
* the last_stop when we have a valid granulepos on the packet because else
* our time jumps around for the different streams. */
if (packet->granulepos < 0)
goto done;
/* convert to time */
current_time = gst_ogg_stream_get_end_time_for_granulepos (&pad->map,
packet->granulepos);
/* convert to stream time */
if ((chain = pad->chain)) {
gint64 chain_start = 0;
if (chain->segment_start != GST_CLOCK_TIME_NONE)
chain_start = chain->segment_start;
current_time = current_time - chain_start + chain->begin_time;
}
/* and store as the current position */
gst_segment_set_last_stop (&ogg->segment, GST_FORMAT_TIME, current_time);
GST_DEBUG_OBJECT (ogg, "ogg current time %" GST_TIME_FORMAT,
GST_TIME_ARGS (current_time));
/* check stream eos */
if ((ogg->segment.rate > 0.0 && ogg->segment.stop != GST_CLOCK_TIME_NONE &&
current_time > ogg->segment.stop) ||
(ogg->segment.rate < 0.0 && ogg->segment.start != GST_CLOCK_TIME_NONE &&
current_time < ogg->segment.start)) {
GST_DEBUG_OBJECT (ogg, "marking pad %p EOS", pad);
pad->is_eos = TRUE;
}
done:
if (buf)
gst_buffer_unref (buf);
/* return combined flow result */
return cret;
/* special cases */
empty_packet:
{
GST_DEBUG_OBJECT (ogg, "Skipping empty packet");
goto done;
}
no_timestamp:
{
GST_DEBUG_OBJECT (ogg, "skipping packet: no valid granule found yet");
goto done;
}
not_added:
{
GST_DEBUG_OBJECT (ogg, "pad not added yet");
goto done;
}
}
static guint64
gst_ogg_demux_collect_start_time (GstOggDemux * ogg, GstOggChain * chain)
{
gint i;
guint64 start_time = G_MAXUINT64;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
if (pad->map.is_skeleton)
continue;
/* can do this if the pad start time is not defined */
GST_DEBUG_OBJECT (ogg, "Pad %08x (%s) start time is %" GST_TIME_FORMAT,
pad->map.serialno, gst_ogg_stream_get_media_type (&pad->map),
GST_TIME_ARGS (pad->start_time));
if (pad->start_time == GST_CLOCK_TIME_NONE) {
if (!pad->map.is_sparse) {
start_time = G_MAXUINT64;
break;
}
} else {
start_time = MIN (start_time, pad->start_time);
}
}
return start_time;
}
static GstClockTime
gst_ogg_demux_collect_sync_time (GstOggDemux * ogg, GstOggChain * chain)
{
gint i;
GstClockTime sync_time = GST_CLOCK_TIME_NONE;
if (!chain) {
GST_WARNING_OBJECT (ogg, "No chain!");
return GST_CLOCK_TIME_NONE;
}
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
if (pad->map.is_sparse)
continue;
if (pad->push_sync_time == GST_CLOCK_TIME_NONE) {
sync_time = GST_CLOCK_TIME_NONE;
break;
} else {
if (sync_time == GST_CLOCK_TIME_NONE)
sync_time = pad->push_sync_time;
else
sync_time = MAX (sync_time, pad->push_sync_time);
}
}
return sync_time;
}
/* submit a packet to the oggpad, this function will run the
* typefind code for the pad if this is the first packet for this
* stream
*/
static GstFlowReturn
gst_ogg_pad_submit_packet (GstOggPad * pad, ogg_packet * packet)
{
gint64 granule;
GstFlowReturn ret = GST_FLOW_OK;
GstOggDemux *ogg = pad->ogg;
GST_DEBUG_OBJECT (ogg, "%p submit packet serial %08x",
pad, pad->map.serialno);
if (!pad->have_type) {
pad->have_type = gst_ogg_stream_setup_map (&pad->map, packet);
if (!pad->have_type) {
pad->map.caps = gst_caps_new_simple ("application/x-unknown", NULL);
}
if (pad->map.is_skeleton) {
GST_DEBUG_OBJECT (ogg, "we have a fishead");
/* copy values over to global ogg level */
ogg->basetime = pad->map.basetime;
ogg->prestime = pad->map.prestime;
/* use total time to update the total ogg time */
if (ogg->total_time == -1) {
ogg->total_time = pad->map.total_time;
} else if (pad->map.total_time > 0) {
ogg->total_time = MAX (ogg->total_time, pad->map.total_time);
}
}
if (pad->map.caps) {
gst_pad_set_caps (GST_PAD (pad), pad->map.caps);
} else {
GST_WARNING_OBJECT (ogg, "stream parser didn't create src pad caps");
}
}
if (pad->map.is_skeleton) {
guint32 serialno;
GstOggPad *skel_pad;
GstOggSkeleton type;
/* try to parse the serialno first */
if (gst_ogg_map_parse_fisbone (&pad->map, packet->packet, packet->bytes,
&serialno, &type)) {
GST_DEBUG_OBJECT (pad->ogg,
"got skeleton packet for stream 0x%08x", serialno);
skel_pad = gst_ogg_chain_get_stream (pad->chain, serialno);
if (skel_pad) {
switch (type) {
case GST_OGG_SKELETON_FISBONE:
/* parse the remainder of the fisbone in the pad with the serialno,
* note that we ignore the start_time as this is usually wrong for
* live streams */
gst_ogg_map_add_fisbone (&skel_pad->map, &pad->map, packet->packet,
packet->bytes, NULL);
break;
case GST_OGG_SKELETON_INDEX:
gst_ogg_map_add_index (&skel_pad->map, &pad->map, packet->packet,
packet->bytes);
/* use total time to update the total ogg time */
if (ogg->total_time == -1) {
ogg->total_time = skel_pad->map.total_time;
} else if (skel_pad->map.total_time > 0) {
ogg->total_time = MAX (ogg->total_time, skel_pad->map.total_time);
}
break;
default:
break;
}
} else {
GST_WARNING_OBJECT (pad->ogg,
"found skeleton fisbone for an unknown stream 0x%08x", serialno);
}
}
}
granule = gst_ogg_stream_granulepos_to_granule (&pad->map,
packet->granulepos);
if (granule != -1) {
GST_DEBUG_OBJECT (ogg, "%p has granulepos %" G_GINT64_FORMAT, pad, granule);
pad->current_granule = granule;
}
/* restart header packet count when seeing a b_o_s page;
* particularly useful following a seek or even following chain finding */
if (packet->b_o_s) {
GST_DEBUG_OBJECT (ogg, "b_o_s packet, resetting header packet count");
pad->map.n_header_packets_seen = 0;
if (!pad->map.have_headers) {
GST_DEBUG_OBJECT (ogg, "clearing header packets");
g_list_foreach (pad->map.headers, (GFunc) _ogg_packet_free, NULL);
g_list_free (pad->map.headers);
pad->map.headers = NULL;
}
}
/* Overload the value of b_o_s in ogg_packet with a flag whether or
* not this is a header packet. Maybe some day this could be cleaned
* up. */
packet->b_o_s = gst_ogg_stream_packet_is_header (&pad->map, packet);
if (!packet->b_o_s) {
GST_DEBUG ("found non-header packet");
pad->map.have_headers = TRUE;
if (pad->start_time == GST_CLOCK_TIME_NONE) {
gint64 duration = gst_ogg_stream_get_packet_duration (&pad->map, packet);
GST_DEBUG ("duration %" G_GINT64_FORMAT, duration);
if (duration != -1) {
pad->map.accumulated_granule += duration;
GST_DEBUG ("accumulated granule %" G_GINT64_FORMAT,
pad->map.accumulated_granule);
}
if (packet->granulepos != -1) {
ogg_int64_t start_granule;
gint64 granule;
granule = gst_ogg_stream_granulepos_to_granule (&pad->map,
packet->granulepos);
if (granule > pad->map.accumulated_granule)
start_granule = granule - pad->map.accumulated_granule;
else
start_granule = 0;
pad->start_time = gst_ogg_stream_granule_to_time (&pad->map,
start_granule);
GST_DEBUG_OBJECT (ogg,
"start time %" GST_TIME_FORMAT " (%" GST_TIME_FORMAT ") for %s",
GST_TIME_ARGS (pad->start_time), GST_TIME_ARGS (pad->start_time),
gst_ogg_stream_get_media_type (&pad->map));
} else {
packet->granulepos = gst_ogg_stream_granule_to_granulepos (&pad->map,
pad->map.accumulated_granule, pad->keyframe_granule);
}
}
} else {
/* look for tags in header packet (before inc header count) */
gst_ogg_stream_extract_tags (&pad->map, packet);
pad->map.n_header_packets_seen++;
if (!pad->map.have_headers) {
pad->map.headers =
g_list_append (pad->map.headers, _ogg_packet_copy (packet));
GST_DEBUG ("keeping header packet %d", pad->map.n_header_packets_seen);
}
}
/* we know the start_time of the pad data, see if we
* can activate the complete chain if this is a dynamic
* chain. We need all the headers too for this. */
if (pad->start_time != GST_CLOCK_TIME_NONE && pad->map.have_headers) {
GstOggChain *chain = pad->chain;
/* check if complete chain has start time */
if (chain == ogg->building_chain) {
GstEvent *event = NULL;
if (ogg->resync) {
guint64 start_time;
GST_DEBUG_OBJECT (ogg, "need to resync");
/* when we need to resync after a seek, we wait until we have received
* timestamps on all streams */
start_time = gst_ogg_demux_collect_start_time (ogg, chain);
if (start_time != G_MAXUINT64) {
gint64 segment_time;
GST_DEBUG_OBJECT (ogg, "start_time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (start_time));
if (chain->segment_start < start_time)
segment_time =
(start_time - chain->segment_start) + chain->begin_time;
else
segment_time = chain->begin_time;
/* create the newsegment event we are going to send out */
GST_PUSH_LOCK (ogg);
if (!ogg->pullmode && ogg->push_state == PUSH_LINEAR2) {
/* if we are fast forwarding to the actual seek target,
ensure previous frames are clipped */
GST_DEBUG_OBJECT (ogg,
"Resynced, starting segment at %" GST_TIME_FORMAT
", start_time %" GST_TIME_FORMAT,
GST_TIME_ARGS (ogg->push_seek_time_original_target),
GST_TIME_ARGS (start_time));
event =
gst_event_new_new_segment (FALSE, ogg->push_seek_rate,
GST_FORMAT_TIME, ogg->push_seek_time_original_target, -1,
ogg->push_seek_time_original_target);
ogg->push_state = PUSH_PLAYING;
} else {
event = gst_event_new_new_segment (FALSE, ogg->segment.rate,
GST_FORMAT_TIME, start_time, chain->segment_stop, segment_time);
}
GST_PUSH_UNLOCK (ogg);
ogg->resync = FALSE;
}
} else {
/* see if we have enough info to activate the chain, we have enough info
* when all streams have a valid start time. */
if (gst_ogg_demux_collect_chain_info (ogg, chain)) {
GST_DEBUG_OBJECT (ogg, "segment_start: %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain->segment_start));
GST_DEBUG_OBJECT (ogg, "segment_stop: %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain->segment_stop));
GST_DEBUG_OBJECT (ogg, "segment_time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain->begin_time));
/* create the newsegment event we are going to send out */
event = gst_event_new_new_segment (FALSE, ogg->segment.rate,
GST_FORMAT_TIME, chain->segment_start, chain->segment_stop,
chain->begin_time);
}
}
if (event) {
gst_event_set_seqnum (event, ogg->seqnum);
gst_ogg_demux_activate_chain (ogg, chain, event);
ogg->building_chain = NULL;
}
}
}
/* if we are building a chain, store buffer for when we activate
* it. This path is taken if we operate in streaming mode. */
if (ogg->building_chain) {
/* bos packets where stored in the header list so we can discard
* them here*/
if (!packet->b_o_s)
ret = gst_ogg_demux_queue_data (pad, packet);
}
/* else we are completely streaming to the peer */
else {
ret = gst_ogg_demux_chain_peer (pad, packet, !ogg->pullmode);
}
return ret;
}
/* flush at most @npackets from the stream layer. All packets if
* @npackets is 0;
*/
static GstFlowReturn
gst_ogg_pad_stream_out (GstOggPad * pad, gint npackets)
{
GstFlowReturn result = GST_FLOW_OK;
gboolean done = FALSE;
GstOggDemux *ogg;
ogg = pad->ogg;
while (!done) {
int ret;
ogg_packet packet;
ret = ogg_stream_packetout (&pad->map.stream, &packet);
switch (ret) {
case 0:
GST_LOG_OBJECT (ogg, "packetout done");
done = TRUE;
break;
case -1:
GST_LOG_OBJECT (ogg, "packetout discont");
if (!pad->map.is_sparse) {
gst_ogg_chain_mark_discont (pad->chain);
} else {
gst_ogg_pad_mark_discont (pad);
}
break;
case 1:
GST_LOG_OBJECT (ogg, "packetout gave packet of size %ld", packet.bytes);
result = gst_ogg_pad_submit_packet (pad, &packet);
/* not linked is not a problem, it's possible that we are still
* collecting headers and that we don't have exposed the pads yet */
if (result == GST_FLOW_NOT_LINKED)
break;
else if (result <= GST_FLOW_UNEXPECTED)
goto could_not_submit;
break;
default:
GST_WARNING_OBJECT (ogg,
"invalid return value %d for ogg_stream_packetout, resetting stream",
ret);
gst_ogg_pad_reset (pad);
break;
}
if (npackets > 0) {
npackets--;
done = (npackets == 0);
}
}
return result;
/* ERRORS */
could_not_submit:
{
GST_WARNING_OBJECT (ogg,
"could not submit packet for stream %08x, "
"error: %d", pad->map.serialno, result);
gst_ogg_pad_reset (pad);
return result;
}
}
static void
gst_ogg_demux_setup_bisection_bounds (GstOggDemux * ogg)
{
if (ogg->push_last_seek_time >= ogg->push_seek_time_target) {
GST_DEBUG_OBJECT (ogg, "We overshot by %" GST_TIME_FORMAT,
GST_TIME_ARGS (ogg->push_last_seek_time - ogg->push_seek_time_target));
ogg->push_offset1 = ogg->push_last_seek_offset;
ogg->push_time1 = ogg->push_last_seek_time;
} else {
GST_DEBUG_OBJECT (ogg, "We undershot by %" GST_TIME_FORMAT,
GST_TIME_ARGS (ogg->push_seek_time_target - ogg->push_last_seek_time));
ogg->push_offset0 = ogg->push_last_seek_offset;
ogg->push_time0 = ogg->push_last_seek_time;
}
}
static gint64
gst_ogg_demux_estimate_bisection_target (GstOggDemux * ogg)
{
gint64 best;
gint64 segment_bitrate;
/* we might not know the length of the stream in time,
so push_time1 might not be set */
GST_DEBUG_OBJECT (ogg,
"push time 1: %" GST_TIME_FORMAT ", dbytes %" G_GINT64_FORMAT,
GST_TIME_ARGS (ogg->push_time1), ogg->push_offset1 - ogg->push_offset0);
if (ogg->push_time1 == GST_CLOCK_TIME_NONE) {
GST_DEBUG_OBJECT (ogg,
"New segment to consider: bytes %" G_GINT64_FORMAT " %" G_GINT64_FORMAT
", time %" GST_TIME_FORMAT " (open ended)", ogg->push_offset0,
ogg->push_offset1, GST_TIME_ARGS (ogg->push_time0));
if (ogg->push_last_seek_time == ogg->push_start_time) {
/* if we're at start and don't know the end time, we can't estimate
bitrate, so get the nominal declared bitrate as a failsafe, or some
random constant which will be discarded after we made a (probably
dire) first guess */
segment_bitrate = (ogg->bitrate > 0 ? ogg->bitrate : 1000);
} else {
segment_bitrate =
gst_util_uint64_scale (ogg->push_last_seek_offset - 0,
8 * GST_SECOND, ogg->push_last_seek_time - ogg->push_start_time);
}
best =
ogg->push_offset0 +
gst_util_uint64_scale (ogg->push_seek_time_target - ogg->push_time0,
segment_bitrate, 8 * GST_SECOND);
} else {
GST_DEBUG_OBJECT (ogg,
"New segment to consider: bytes %" G_GINT64_FORMAT " %" G_GINT64_FORMAT
", time %" GST_TIME_FORMAT " %" GST_TIME_FORMAT, ogg->push_offset0,
ogg->push_offset1, GST_TIME_ARGS (ogg->push_time0),
GST_TIME_ARGS (ogg->push_time1));
if (ogg->push_time0 == ogg->push_time1) {
best = ogg->push_offset0;
} else {
segment_bitrate =
gst_util_uint64_scale (ogg->push_offset1 - ogg->push_offset0,
8 * GST_SECOND, ogg->push_time1 - ogg->push_time0);
GST_DEBUG_OBJECT (ogg,
"Local bitrate on the %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
" segment: %" G_GINT64_FORMAT, GST_TIME_ARGS (ogg->push_time0),
GST_TIME_ARGS (ogg->push_time1), segment_bitrate);
best =
ogg->push_offset0 +
gst_util_uint64_scale (ogg->push_seek_time_target - ogg->push_time0,
segment_bitrate, 8 * GST_SECOND);
}
}
/* offset by typical page size */
best -= CHUNKSIZE;
if (best < ogg->push_offset0)
best = ogg->push_offset0;
if (best < 0)
best = 0;
return best;
}
static void
gst_ogg_demux_record_keyframe_time (GstOggDemux * ogg, GstOggPad * pad,
ogg_int64_t granpos)
{
gint64 kf_granule;
GstClockTime kf_time;
kf_granule = gst_ogg_stream_granulepos_to_key_granule (&pad->map, granpos);
kf_time = gst_ogg_stream_granule_to_time (&pad->map, kf_granule);
pad->push_kf_time = kf_time;
}
/* returns the earliest keyframe time for all non sparse pads in the chain,
* if known, and GST_CLOCK_TIME_NONE if not */
static GstClockTime
gst_ogg_demux_get_earliest_keyframe_time (GstOggDemux * ogg)
{
GstClockTime t = GST_CLOCK_TIME_NONE;
GstOggChain *chain = ogg->building_chain;
int i;
if (!chain) {
GST_WARNING_OBJECT (ogg, "No chain!");
return GST_CLOCK_TIME_NONE;
}
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
if (pad->map.is_sparse)
continue;
if (pad->push_kf_time == GST_CLOCK_TIME_NONE)
return GST_CLOCK_TIME_NONE;
if (t == GST_CLOCK_TIME_NONE || pad->push_kf_time < t)
t = pad->push_kf_time;
}
return t;
}
/* MUST be called with the push lock locked, and will unlock it
regardless of return value. */
static GstFlowReturn
gst_ogg_demux_seek_back_after_push_duration_check_unlock (GstOggDemux * ogg)
{
GstEvent *event;
/* Get the delayed event, if any */
event = ogg->push_mode_seek_delayed_event;
ogg->push_mode_seek_delayed_event = NULL;
ogg->push_state = PUSH_PLAYING;
GST_PUSH_UNLOCK (ogg);
if (event) {
/* If there is one, perform it */
gst_ogg_demux_perform_seek_push (ogg, event);
} else {
/* If there wasn't, seek back at start to start normal playback */
GST_INFO_OBJECT (ogg, "Seeking back to 0 after duration check");
event = gst_event_new_seek (1.0, GST_FORMAT_BYTES,
GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 1, GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE);
if (!gst_pad_push_event (ogg->sinkpad, event)) {
GST_WARNING_OBJECT (ogg, "Failed seeking back to start");
return GST_FLOW_ERROR;
}
}
return GST_FLOW_OK;
}
static gboolean
gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page)
{
GstOggDemux *ogg = pad->ogg;
ogg_int64_t granpos = ogg_page_granulepos (page);
GST_PUSH_LOCK (ogg);
if (granpos >= 0) {
if (ogg->push_start_time == GST_CLOCK_TIME_NONE) {
ogg->push_start_time =
gst_ogg_stream_get_start_time_for_granulepos (&pad->map, granpos);
GST_DEBUG_OBJECT (ogg, "Stream start time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (ogg->push_start_time));
}
ogg->push_time_offset =
gst_ogg_stream_get_end_time_for_granulepos (&pad->map, granpos);
if (ogg->push_time_offset > 0) {
GST_DEBUG_OBJECT (ogg, "Bitrate since start: %" G_GUINT64_FORMAT,
gst_util_uint64_scale (ogg->push_byte_offset, 8 * GST_SECOND,
ogg->push_time_offset));
}
if (ogg->push_state == PUSH_DURATION) {
GstClockTime t =
gst_ogg_stream_get_end_time_for_granulepos (&pad->map, granpos);
if (ogg->total_time == GST_CLOCK_TIME_NONE || t > ogg->total_time) {
GST_DEBUG_OBJECT (ogg, "New total time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (t));
ogg->total_time = t;
ogg->push_time_length = t;
}
/* If we were determining the duration of the stream, we're now done,
and can get back to sending the original event we delayed.
We stop a bit before the end of the stream, as if we get a EOS
event and there is a queue2 upstream (such as when using playbin2),
it will pause the task *after* we come back from the EOS handler,
so we cannot prevent the pausing by issuing a seek. */
if (ogg->push_byte_offset + EOS_AVOIDANCE_THRESHOLD >=
ogg->push_byte_length) {
GstMessage *message;
GstFlowReturn res;
/* tell the pipeline we've just found out the duration */
GST_INFO_OBJECT (ogg, "New duration found: %" GST_TIME_FORMAT,
GST_TIME_ARGS (ogg->total_time));
message =
gst_message_new_duration (GST_OBJECT (ogg), GST_FORMAT_TIME,
ogg->total_time);
gst_element_post_message (GST_ELEMENT (ogg), message);
GST_DEBUG_OBJECT (ogg,
"We're close enough to the end, and we're scared "
"to get too close, seeking back to start");
res = gst_ogg_demux_seek_back_after_push_duration_check_unlock (ogg);
if (res != GST_FLOW_OK)
return res;
return GST_FLOW_SKIP_PUSH;
} else {
GST_PUSH_UNLOCK (ogg);
}
return GST_FLOW_SKIP_PUSH;
}
}
/* if we're seeking, look at time, and decide what to do */
if (ogg->push_state != PUSH_PLAYING && ogg->push_state != PUSH_LINEAR2) {
GstClockTime t;
gint64 best = -1;
GstEvent *sevent;
int res;
gboolean close_enough;
/* ignore -1 granpos when seeking, we want to sync on a real granpos */
if (granpos < 0) {
GST_PUSH_UNLOCK (ogg);
if (ogg_stream_pagein (&pad->map.stream, page) != 0)
goto choked;
return GST_FLOW_SKIP_PUSH;
}
t = gst_ogg_stream_get_end_time_for_granulepos (&pad->map, granpos);
if (ogg->push_state == PUSH_BISECT1 || ogg->push_state == PUSH_BISECT2) {
GstClockTime sync_time;
if (pad->push_sync_time == GST_CLOCK_TIME_NONE)
pad->push_sync_time = t;
GST_DEBUG_OBJECT (ogg, "Got timestamp %" GST_TIME_FORMAT " for %s",
GST_TIME_ARGS (t), gst_ogg_stream_get_media_type (&pad->map));
sync_time = gst_ogg_demux_collect_sync_time (ogg, ogg->building_chain);
if (sync_time == GST_CLOCK_TIME_NONE) {
GST_PUSH_UNLOCK (ogg);
GST_DEBUG_OBJECT (ogg,
"Not enough timing info collected for sync, waiting for more");
if (ogg_stream_pagein (&pad->map.stream, page) != 0)
goto choked;
return GST_FLOW_SKIP_PUSH;
}
ogg->push_last_seek_time = sync_time;
GST_DEBUG_OBJECT (ogg,
"Bisection just seeked at %" G_GINT64_FORMAT ", time %"
GST_TIME_FORMAT ", target was %" GST_TIME_FORMAT,
ogg->push_last_seek_offset,
GST_TIME_ARGS (ogg->push_last_seek_time),
GST_TIME_ARGS (ogg->push_seek_time_target));
if (ogg->push_time1 != GST_CLOCK_TIME_NONE) {
GST_DEBUG_OBJECT (ogg,
"Interval was %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT " (%"
G_GINT64_FORMAT "), time %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
" (%" GST_TIME_FORMAT ")", ogg->push_offset0, ogg->push_offset1,
ogg->push_offset1 - ogg->push_offset0,
GST_TIME_ARGS (ogg->push_time0), GST_TIME_ARGS (ogg->push_time1),
GST_TIME_ARGS (ogg->push_time1 - ogg->push_time0));
} else {
GST_DEBUG_OBJECT (ogg,
"Interval was %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT " (%"
G_GINT64_FORMAT "), time %" GST_TIME_FORMAT " - unknown",
ogg->push_offset0, ogg->push_offset1,
ogg->push_offset1 - ogg->push_offset0,
GST_TIME_ARGS (ogg->push_time0));
}
gst_ogg_demux_setup_bisection_bounds (ogg);
best = gst_ogg_demux_estimate_bisection_target (ogg);
if (ogg->push_seek_time_target == 0) {
GST_DEBUG_OBJECT (ogg, "Seeking to 0, deemed close enough");
close_enough = (ogg->push_last_seek_time == 0);
} else {
/* TODO: make this dependent on framerate ? */
GstClockTime threshold = GST_SECOND / 2;
/* We want to be within half a second before the target */
if (threshold > ogg->push_seek_time_target)
threshold = ogg->push_seek_time_target;
close_enough = ogg->push_last_seek_time < ogg->push_seek_time_target
&& ogg->push_last_seek_time >=
ogg->push_seek_time_target - threshold;
GST_DEBUG_OBJECT (ogg,
"testing if we're close enough: %" GST_TIME_FORMAT " <= %"
GST_TIME_FORMAT " < %" GST_TIME_FORMAT " ? %s",
GST_TIME_ARGS (ogg->push_seek_time_target - threshold),
GST_TIME_ARGS (ogg->push_last_seek_time),
GST_TIME_ARGS (ogg->push_seek_time_target),
close_enough ? "Yes" : "No");
}
if (close_enough || best == ogg->push_last_seek_offset) {
if (ogg->push_state == PUSH_BISECT1) {
/* we now know the time segment we'll have to search for
the second bisection */
ogg->push_time0 = ogg->push_start_time;
ogg->push_offset0 = 0;
GST_DEBUG_OBJECT (ogg,
"Seek to %" GST_TIME_FORMAT
" (%lx) done, now gathering pages for all non-sparse streams",
GST_TIME_ARGS (ogg->push_seek_time_target), (long) granpos);
ogg->push_state = PUSH_LINEAR1;
} else {
/* If we're asked for an accurate seek, we'll go forward till
we get to the original seek target time, else we'll just drop
here at the keyframe */
if (ogg->push_seek_flags & GST_SEEK_FLAG_ACCURATE) {
GST_INFO_OBJECT (ogg,
"Seek to keyframe at %" GST_TIME_FORMAT " done (we're at %"
GST_TIME_FORMAT "), skipping to original target (%"
GST_TIME_FORMAT ")",
GST_TIME_ARGS (ogg->push_seek_time_target),
GST_TIME_ARGS (sync_time),
GST_TIME_ARGS (ogg->push_seek_time_original_target));
ogg->push_state = PUSH_LINEAR2;
} else {
GST_DEBUG_OBJECT (ogg, "Seek to keyframe done, playing");
/* we're synced to the seek target, so flush stream and stuff
any queued pages into the stream so we start decoding there */
ogg->push_state = PUSH_PLAYING;
}
GST_INFO_OBJECT (ogg, "Bisection needed %d + %d steps",
ogg->push_bisection_steps[0], ogg->push_bisection_steps[1]);
}
}
} else if (ogg->push_state == PUSH_LINEAR1) {
if (pad->push_kf_time == GST_CLOCK_TIME_NONE) {
GstClockTime earliest_keyframe_time;
gst_ogg_demux_record_keyframe_time (ogg, pad, granpos);
GST_DEBUG_OBJECT (ogg,
"Previous keyframe for %s stream at %" GST_TIME_FORMAT,
gst_ogg_stream_get_media_type (&pad->map),
GST_TIME_ARGS (pad->push_kf_time));
earliest_keyframe_time = gst_ogg_demux_get_earliest_keyframe_time (ogg);
if (earliest_keyframe_time != GST_CLOCK_TIME_NONE) {
GST_DEBUG_OBJECT (ogg,
"All non sparse streams now have a previous keyframe time,"
"bisecting again to %" GST_TIME_FORMAT,
GST_TIME_ARGS (earliest_keyframe_time));
ogg->push_seek_time_target = earliest_keyframe_time;
ogg->push_state = PUSH_BISECT2;
best = gst_ogg_demux_estimate_bisection_target (ogg);
}
}
}
if (ogg->push_state == PUSH_BISECT1 || ogg->push_state == PUSH_BISECT2) {
gint i;
ogg_sync_reset (&ogg->sync);
for (i = 0; i < ogg->building_chain->streams->len; i++) {
GstOggPad *pad =
g_array_index (ogg->building_chain->streams, GstOggPad *, i);
pad->push_sync_time = GST_CLOCK_TIME_NONE;
ogg_stream_reset (&pad->map.stream);
}
GST_DEBUG_OBJECT (ogg,
"seeking to %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT, best,
(gint64) - 1);
/* do seek */
g_assert (best != -1);
ogg->push_bisection_steps[ogg->push_state == PUSH_BISECT2 ? 1 : 0]++;
sevent =
gst_event_new_seek (ogg->push_seek_rate, GST_FORMAT_BYTES,
ogg->push_seek_flags, GST_SEEK_TYPE_SET, best,
GST_SEEK_TYPE_NONE, -1);
GST_PUSH_UNLOCK (ogg);
res = gst_pad_push_event (ogg->sinkpad, sevent);
if (!res) {
/* We failed to send the seek event, notify the pipeline */
GST_ELEMENT_ERROR (ogg, RESOURCE, SEEK, (NULL), ("Failed to seek"));
return GST_FLOW_ERROR;
}
return GST_FLOW_SKIP_PUSH;
}
if (ogg->push_state != PUSH_PLAYING) {
GST_PUSH_UNLOCK (ogg);
return GST_FLOW_SKIP_PUSH;
}
}
GST_PUSH_UNLOCK (ogg);
return GST_FLOW_OK;
choked:
{
GST_WARNING_OBJECT (ogg,
"ogg stream choked on page (serial %08x), "
"resetting stream", pad->map.serialno);
gst_ogg_pad_reset (pad);
/* we continue to recover */
return GST_FLOW_SKIP_PUSH;
}
}
/* submit a page to an oggpad, this function will then submit all
* the packets in the page.
*/
static GstFlowReturn
gst_ogg_pad_submit_page (GstOggPad * pad, ogg_page * page)
{
GstFlowReturn result = GST_FLOW_OK;
GstOggDemux *ogg;
gboolean continued = FALSE;
ogg = pad->ogg;
/* for negative rates we read pages backwards and must therefore be careful
* with continued pages */
if (ogg->segment.rate < 0.0) {
gint npackets;
continued = ogg_page_continued (page);
/* number of completed packets in the page */
npackets = ogg_page_packets (page);
if (!continued) {
/* page is not continued so it contains at least one packet start. It's
* possible that no packet ends on this page (npackets == 0). In that
* case, the next (continued) page(s) we kept contain the remainder of the
* packets. We mark npackets=1 to make us start decoding the pages in the
* remainder of the algorithm. */
if (npackets == 0)
npackets = 1;
}
GST_LOG_OBJECT (ogg, "continued: %d, %d packets", continued, npackets);
if (npackets == 0) {
GST_LOG_OBJECT (ogg, "no decodable packets, we need a previous page");
goto done;
}
}
/* keep track of time in push mode */
if (!ogg->pullmode) {
result = gst_ogg_pad_handle_push_mode_state (pad, page);
if (result == GST_FLOW_SKIP_PUSH)
return GST_FLOW_OK;
if (result != GST_FLOW_OK)
return result;
}
if (ogg_stream_pagein (&pad->map.stream, page) != 0)
goto choked;
/* flush all packets in the stream layer, this might not give a packet if
* the page had no packets finishing on the page (npackets == 0). */
result = gst_ogg_pad_stream_out (pad, 0);
if (pad->continued) {
ogg_packet packet;
/* now send the continued pages to the stream layer */
while (pad->continued) {
ogg_page *p = (ogg_page *) pad->continued->data;
GST_LOG_OBJECT (ogg, "submitting continued page %p", p);
if (ogg_stream_pagein (&pad->map.stream, p) != 0)
goto choked;
pad->continued = g_list_delete_link (pad->continued, pad->continued);
/* free the page */
gst_ogg_page_free (p);
}
GST_LOG_OBJECT (ogg, "flushing last continued packet");
/* flush 1 continued packet in the stream layer */
result = gst_ogg_pad_stream_out (pad, 1);
/* flush all remaining packets, we pushed them in the previous round.
* We don't use _reset() because we still want to get the discont when
* we submit a next page. */
while (ogg_stream_packetout (&pad->map.stream, &packet) != 0);
}
done:
/* keep continued pages (only in reverse mode) */
if (continued) {
ogg_page *p = gst_ogg_page_copy (page);
GST_LOG_OBJECT (ogg, "keeping continued page %p", p);
pad->continued = g_list_prepend (pad->continued, p);
}
return result;
choked:
{
GST_WARNING_OBJECT (ogg,
"ogg stream choked on page (serial %08x), "
"resetting stream", pad->map.serialno);
gst_ogg_pad_reset (pad);
/* we continue to recover */
return GST_FLOW_OK;
}
}
static GstOggChain *
gst_ogg_chain_new (GstOggDemux * ogg)
{
GstOggChain *chain = g_slice_new0 (GstOggChain);
GST_DEBUG_OBJECT (ogg, "creating new chain %p", chain);
chain->ogg = ogg;
chain->offset = -1;
chain->bytes = -1;
chain->have_bos = FALSE;
chain->streams = g_array_new (FALSE, TRUE, sizeof (GstOggPad *));
chain->begin_time = GST_CLOCK_TIME_NONE;
chain->segment_start = GST_CLOCK_TIME_NONE;
chain->segment_stop = GST_CLOCK_TIME_NONE;
chain->total_time = GST_CLOCK_TIME_NONE;
return chain;
}
static void
gst_ogg_chain_free (GstOggChain * chain)
{
gint i;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
gst_object_unref (pad);
}
g_array_free (chain->streams, TRUE);
g_slice_free (GstOggChain, chain);
}
static void
gst_ogg_pad_mark_discont (GstOggPad * pad)
{
pad->discont = TRUE;
pad->map.last_size = 0;
}
static void
gst_ogg_chain_mark_discont (GstOggChain * chain)
{
gint i;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
gst_ogg_pad_mark_discont (pad);
}
}
static void
gst_ogg_chain_reset (GstOggChain * chain)
{
gint i;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
gst_ogg_pad_reset (pad);
}
}
static GstOggPad *
gst_ogg_chain_new_stream (GstOggChain * chain, guint32 serialno)
{
GstOggPad *ret;
GstTagList *list;
gchar *name;
GST_DEBUG_OBJECT (chain->ogg,
"creating new stream %08x in chain %p", serialno, chain);
ret = g_object_new (GST_TYPE_OGG_PAD, NULL);
/* we own this one */
gst_object_ref (ret);
gst_object_sink (ret);
GST_PAD_DIRECTION (ret) = GST_PAD_SRC;
gst_ogg_pad_mark_discont (ret);
ret->chain = chain;
ret->ogg = chain->ogg;
ret->map.serialno = serialno;
if (ogg_stream_init (&ret->map.stream, serialno) != 0)
goto init_failed;
name = g_strdup_printf ("serial_%08x", serialno);
gst_object_set_name (GST_OBJECT (ret), name);
g_free (name);
/* FIXME: either do something with it or remove it */
list = gst_tag_list_new ();
gst_tag_list_add (list, GST_TAG_MERGE_REPLACE, GST_TAG_SERIAL, serialno,
NULL);
gst_tag_list_free (list);
GST_DEBUG_OBJECT (chain->ogg,
"created new ogg src %p for stream with serial %08x", ret, serialno);
g_array_append_val (chain->streams, ret);
return ret;
/* ERRORS */
init_failed:
{
GST_ERROR ("Could not initialize ogg_stream struct for serial %08x",
serialno);
gst_object_unref (ret);
return NULL;
}
}
static GstOggPad *
gst_ogg_chain_get_stream (GstOggChain * chain, guint32 serialno)
{
gint i;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
if (pad->map.serialno == serialno)
return pad;
}
return NULL;
}
static gboolean
gst_ogg_chain_has_stream (GstOggChain * chain, guint32 serialno)
{
return gst_ogg_chain_get_stream (chain, serialno) != NULL;
}
/* signals and args */
enum
{
/* FILL ME */
LAST_SIGNAL
};
enum
{
ARG_0
/* FILL ME */
};
static GstStaticPadTemplate ogg_demux_src_template_factory =
GST_STATIC_PAD_TEMPLATE ("src_%d",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate ogg_demux_sink_template_factory =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/ogg; application/x-annodex")
);
static void gst_ogg_demux_finalize (GObject * object);
static GstFlowReturn gst_ogg_demux_read_chain (GstOggDemux * ogg,
GstOggChain ** chain);
static GstFlowReturn gst_ogg_demux_read_end_chain (GstOggDemux * ogg,
GstOggChain * chain);
static gboolean gst_ogg_demux_sink_event (GstPad * pad, GstEvent * event);
static void gst_ogg_demux_loop (GstOggPad * pad);
static GstFlowReturn gst_ogg_demux_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_ogg_demux_sink_activate (GstPad * sinkpad);
static gboolean gst_ogg_demux_sink_activate_pull (GstPad * sinkpad,
gboolean active);
static gboolean gst_ogg_demux_sink_activate_push (GstPad * sinkpad,
gboolean active);
static GstStateChangeReturn gst_ogg_demux_change_state (GstElement * element,
GstStateChange transition);
static void gst_ogg_print (GstOggDemux * demux);
GST_BOILERPLATE (GstOggDemux, gst_ogg_demux, GstElement, GST_TYPE_ELEMENT);
static void
gst_ogg_demux_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_set_details_simple (element_class,
"Ogg demuxer", "Codec/Demuxer",
"demux ogg streams (info about ogg: http://xiph.org)",
"Wim Taymans <wim@fluendo.com>");
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&ogg_demux_sink_template_factory));
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&ogg_demux_src_template_factory));
}
static void
gst_ogg_demux_class_init (GstOggDemuxClass * klass)
{
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gstelement_class->change_state = gst_ogg_demux_change_state;
gstelement_class->send_event = gst_ogg_demux_receive_event;
gobject_class->finalize = gst_ogg_demux_finalize;
}
static void
gst_ogg_demux_init (GstOggDemux * ogg, GstOggDemuxClass * g_class)
{
/* create the sink pad */
ogg->sinkpad =
gst_pad_new_from_static_template (&ogg_demux_sink_template_factory,
"sink");
gst_pad_set_event_function (ogg->sinkpad, gst_ogg_demux_sink_event);
gst_pad_set_chain_function (ogg->sinkpad, gst_ogg_demux_chain);
gst_pad_set_activate_function (ogg->sinkpad, gst_ogg_demux_sink_activate);
gst_pad_set_activatepull_function (ogg->sinkpad,
gst_ogg_demux_sink_activate_pull);
gst_pad_set_activatepush_function (ogg->sinkpad,
gst_ogg_demux_sink_activate_push);
gst_element_add_pad (GST_ELEMENT (ogg), ogg->sinkpad);
ogg->chain_lock = g_mutex_new ();
ogg->push_lock = g_mutex_new ();
ogg->chains = g_array_new (FALSE, TRUE, sizeof (GstOggChain *));
ogg->newsegment = NULL;
}
static void
gst_ogg_demux_finalize (GObject * object)
{
GstOggDemux *ogg;
ogg = GST_OGG_DEMUX (object);
g_array_free (ogg->chains, TRUE);
g_mutex_free (ogg->chain_lock);
g_mutex_free (ogg->push_lock);
ogg_sync_clear (&ogg->sync);
if (ogg->newsegment)
gst_event_unref (ogg->newsegment);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_ogg_demux_reset_streams (GstOggDemux * ogg)
{
GstOggChain *chain;
guint i;
chain = ogg->current_chain;
if (chain == NULL)
return;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *stream = g_array_index (chain->streams, GstOggPad *, i);
stream->start_time = -1;
stream->map.accumulated_granule = 0;
}
ogg->building_chain = chain;
GST_DEBUG_OBJECT (ogg, "Resetting current chain");
ogg->current_chain = NULL;
ogg->resync = TRUE;
}
static gboolean
gst_ogg_demux_sink_event (GstPad * pad, GstEvent * event)
{
gboolean res;
GstOggDemux *ogg;
ogg = GST_OGG_DEMUX (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
res = gst_ogg_demux_send_event (ogg, event);
break;
case GST_EVENT_FLUSH_STOP:
GST_DEBUG_OBJECT (ogg, "got a flush stop event");
ogg_sync_reset (&ogg->sync);
res = gst_ogg_demux_send_event (ogg, event);
if (ogg->pullmode || ogg->push_state != PUSH_DURATION) {
/* it's starting to feel reaaaally dirty :(
if we're on a spliced seek to get duration, don't reset streams,
we'll need them for the delayed seek */
gst_ogg_demux_reset_streams (ogg);
}
break;
case GST_EVENT_NEWSEGMENT:
GST_DEBUG_OBJECT (ogg, "got a new segment event");
{
gboolean update;
GstFormat format;
gdouble rate, arate;
gint64 start, stop, time;
gst_event_parse_new_segment_full (event, &update, &rate, &arate,
&format, &start, &stop, &time);
if (format == GST_FORMAT_BYTES) {
GST_PUSH_LOCK (ogg);
ogg->push_byte_offset = start;
ogg->push_last_seek_offset = start;
GST_PUSH_UNLOCK (ogg);
} else {
GST_WARNING_OBJECT (ogg, "unexpected segment format: %s",
gst_format_get_name (format));
}
}
gst_event_unref (event);
res = TRUE;
break;
case GST_EVENT_EOS:
{
GST_DEBUG_OBJECT (ogg, "got an EOS event");
#if 0
/* This would be what is needed (recover from EOS by going on to
the next step (issue the delayed seek)), but it does not work
if there is a queue2 upstream - see more details comment in
gst_ogg_pad_submit_page.
If I could find a way to bypass queue2 behavior, this should
be enabled. */
GST_PUSH_LOCK (ogg);
if (ogg->push_state == PUSH_DURATION) {
GST_DEBUG_OBJECT (ogg, "Got EOS while determining length");
res = gst_ogg_demux_seek_back_after_push_duration_check_unlock (ogg);
if (res != GST_FLOW_OK) {
GST_DEBUG_OBJECT (ogg, "Error seeking back after duration check: %d",
res);
}
break;
}
GST_PUSH_UNLOCK (ogg);
#endif
res = gst_ogg_demux_send_event (ogg, event);
if (ogg->current_chain == NULL) {
GST_ELEMENT_ERROR (ogg, STREAM, DEMUX, (NULL),
("can't get first chain"));
}
break;
}
default:
res = gst_ogg_demux_send_event (ogg, event);
break;
}
gst_object_unref (ogg);
return res;
}
/* submit the given buffer to the ogg sync */
static GstFlowReturn
gst_ogg_demux_submit_buffer (GstOggDemux * ogg, GstBuffer * buffer)
{
gint size;
guint8 *data;
gchar *oggbuffer;
GstFlowReturn ret = GST_FLOW_OK;
size = GST_BUFFER_SIZE (buffer);
data = GST_BUFFER_DATA (buffer);
GST_DEBUG_OBJECT (ogg, "submitting %u bytes", size);
if (G_UNLIKELY (size == 0))
goto done;
oggbuffer = ogg_sync_buffer (&ogg->sync, size);
if (G_UNLIKELY (oggbuffer == NULL))
goto no_buffer;
memcpy (oggbuffer, data, size);
if (G_UNLIKELY (ogg_sync_wrote (&ogg->sync, size) < 0))
goto write_failed;
if (!ogg->pullmode) {
GST_PUSH_LOCK (ogg);
ogg->push_byte_offset += size;
GST_PUSH_UNLOCK (ogg);
}
done:
gst_buffer_unref (buffer);
return ret;
/* ERRORS */
no_buffer:
{
GST_ELEMENT_ERROR (ogg, STREAM, DECODE,
(NULL), ("failed to get ogg sync buffer"));
ret = GST_FLOW_ERROR;
goto done;
}
write_failed:
{
GST_ELEMENT_ERROR (ogg, STREAM, DECODE,
(NULL), ("failed to write %d bytes to the sync buffer", size));
ret = GST_FLOW_ERROR;
goto done;
}
}
/* in random access mode this code updates the current read position
* and resets the ogg sync buffer so that the next read will happen
* from this new location.
*/
static void
gst_ogg_demux_seek (GstOggDemux * ogg, gint64 offset)
{
GST_LOG_OBJECT (ogg, "seeking to %" G_GINT64_FORMAT, offset);
ogg->offset = offset;
ogg->read_offset = offset;
ogg_sync_reset (&ogg->sync);
}
/* read more data from the current offset and submit to
* the ogg sync layer.
*/
static GstFlowReturn
gst_ogg_demux_get_data (GstOggDemux * ogg, gint64 end_offset)
{
GstFlowReturn ret;
GstBuffer *buffer;
GST_LOG_OBJECT (ogg,
"get data %" G_GINT64_FORMAT " %" G_GINT64_FORMAT " %" G_GINT64_FORMAT,
ogg->read_offset, ogg->length, end_offset);
if (end_offset > 0 && ogg->read_offset >= end_offset)
goto boundary_reached;
if (ogg->read_offset == ogg->length)
goto eos;
ret = gst_pad_pull_range (ogg->sinkpad, ogg->read_offset, CHUNKSIZE, &buffer);
if (ret != GST_FLOW_OK)
goto error;
ogg->read_offset += GST_BUFFER_SIZE (buffer);
ret = gst_ogg_demux_submit_buffer (ogg, buffer);
return ret;
/* ERROR */
boundary_reached:
{
GST_LOG_OBJECT (ogg, "reached boundary");
return GST_FLOW_LIMIT;
}
eos:
{
GST_LOG_OBJECT (ogg, "reached EOS");
return GST_FLOW_UNEXPECTED;
}
error:
{
GST_WARNING_OBJECT (ogg, "got %d (%s) from pull range", ret,
gst_flow_get_name (ret));
return ret;
}
}
/* Read the next page from the current offset.
* boundary: number of bytes ahead we allow looking for;
* -1 if no boundary
*
* @offset will contain the offset the next page starts at when this function
* returns GST_FLOW_OK.
*
* GST_FLOW_UNEXPECTED is returned on EOS.
*
* GST_FLOW_LIMIT is returned when we did not find a page before the
* boundary. If @boundary is -1, this is never returned.
*
* Any other error returned while retrieving data from the peer is returned as
* is.
*/
static GstFlowReturn
gst_ogg_demux_get_next_page (GstOggDemux * ogg, ogg_page * og,
gint64 boundary, gint64 * offset)
{
gint64 end_offset = -1;
GstFlowReturn ret;
GST_LOG_OBJECT (ogg,
"get next page, current offset %" G_GINT64_FORMAT ", bytes boundary %"
G_GINT64_FORMAT, ogg->offset, boundary);
if (boundary >= 0)
end_offset = ogg->offset + boundary;
while (TRUE) {
glong more;
if (end_offset > 0 && ogg->offset >= end_offset)
goto boundary_reached;
more = ogg_sync_pageseek (&ogg->sync, og);
GST_LOG_OBJECT (ogg, "pageseek gave %ld", more);
if (more < 0) {
/* skipped n bytes */
ogg->offset -= more;
GST_LOG_OBJECT (ogg, "skipped %ld bytes, offset %" G_GINT64_FORMAT,
more, ogg->offset);
} else if (more == 0) {
/* we need more data */
if (boundary == 0)
goto boundary_reached;
GST_LOG_OBJECT (ogg, "need more data");
ret = gst_ogg_demux_get_data (ogg, end_offset);
if (ret != GST_FLOW_OK)
break;
} else {
gint64 res_offset = ogg->offset;
/* got a page. Return the offset at the page beginning,
advance the internal offset past the page end */
if (offset)
*offset = res_offset;
ret = GST_FLOW_OK;
ogg->offset += more;
GST_LOG_OBJECT (ogg,
"got page at %" G_GINT64_FORMAT ", serial %08x, end at %"
G_GINT64_FORMAT ", granule %" G_GINT64_FORMAT, res_offset,
ogg_page_serialno (og), ogg->offset,
(gint64) ogg_page_granulepos (og));
break;
}
}
GST_LOG_OBJECT (ogg, "returning %d", ret);
return ret;
/* ERRORS */
boundary_reached:
{
GST_LOG_OBJECT (ogg,
"offset %" G_GINT64_FORMAT " >= end_offset %" G_GINT64_FORMAT,
ogg->offset, end_offset);
return GST_FLOW_LIMIT;
}
}
/* from the current offset, find the previous page, seeking backwards
* until we find the page.
*/
static GstFlowReturn
gst_ogg_demux_get_prev_page (GstOggDemux * ogg, ogg_page * og, gint64 * offset)
{
GstFlowReturn ret;
gint64 begin = ogg->offset;
gint64 end = begin;
gint64 cur_offset = -1;
GST_LOG_OBJECT (ogg, "getting page before %" G_GINT64_FORMAT, begin);
while (cur_offset == -1) {
begin -= CHUNKSIZE;
if (begin < 0)
begin = 0;
/* seek CHUNKSIZE back */
gst_ogg_demux_seek (ogg, begin);
/* now continue reading until we run out of data, if we find a page
* start, we save it. It might not be the final page as there could be
* another page after this one. */
while (ogg->offset < end) {
gint64 new_offset;
ret =
gst_ogg_demux_get_next_page (ogg, og, end - ogg->offset, &new_offset);
/* we hit the upper limit, offset contains the last page start */
if (ret == GST_FLOW_LIMIT) {
GST_LOG_OBJECT (ogg, "hit limit");
break;
}
/* something went wrong */
if (ret == GST_FLOW_UNEXPECTED) {
new_offset = 0;
GST_LOG_OBJECT (ogg, "got unexpected");
} else if (ret != GST_FLOW_OK) {
GST_LOG_OBJECT (ogg, "got error %d", ret);
return ret;
}
GST_LOG_OBJECT (ogg, "found page at %" G_GINT64_FORMAT, new_offset);
/* offset is next page start */
cur_offset = new_offset;
}
}
GST_LOG_OBJECT (ogg, "found previous page at %" G_GINT64_FORMAT, cur_offset);
/* we have the offset. Actually snork and hold the page now */
gst_ogg_demux_seek (ogg, cur_offset);
ret = gst_ogg_demux_get_next_page (ogg, og, -1, NULL);
if (ret != GST_FLOW_OK) {
GST_WARNING_OBJECT (ogg, "can't get last page at %" G_GINT64_FORMAT,
cur_offset);
/* this shouldn't be possible */
return ret;
}
if (offset)
*offset = cur_offset;
return ret;
}
static gboolean
gst_ogg_demux_deactivate_current_chain (GstOggDemux * ogg)
{
gint i;
GstOggChain *chain = ogg->current_chain;
if (chain == NULL)
return TRUE;
GST_DEBUG_OBJECT (ogg, "deactivating chain %p", chain);
/* send EOS on all the pads */
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
GstEvent *event;
if (!pad->added)
continue;
event = gst_event_new_eos ();
gst_event_set_seqnum (event, ogg->seqnum);
gst_pad_push_event (GST_PAD_CAST (pad), event);
GST_DEBUG_OBJECT (ogg, "removing pad %" GST_PTR_FORMAT, pad);
/* deactivate first */
gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
gst_element_remove_pad (GST_ELEMENT (ogg), GST_PAD_CAST (pad));
pad->added = FALSE;
}
/* With push mode seeking implemented, we can now seek back to the chain,
so we do not destroy it */
GST_DEBUG_OBJECT (ogg, "Resetting current chain");
ogg->current_chain = NULL;
return TRUE;
}
GstCaps *
gst_ogg_demux_set_header_on_caps (GstOggDemux * ogg, GstCaps * caps,
GList * headers)
{
GstStructure *structure;
GValue array = { 0 };
GST_LOG_OBJECT (ogg, "caps: %" GST_PTR_FORMAT, caps);
if (G_UNLIKELY (!caps))
return NULL;
if (G_UNLIKELY (!headers))
return NULL;
caps = gst_caps_make_writable (caps);
structure = gst_caps_get_structure (caps, 0);
g_value_init (&array, GST_TYPE_ARRAY);
while (headers) {
GValue value = { 0 };
GstBuffer *buffer;
ogg_packet *op = headers->data;
g_assert (op);
buffer = gst_buffer_new_and_alloc (op->bytes);
memcpy (GST_BUFFER_DATA (buffer), op->packet, op->bytes);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_IN_CAPS);
g_value_init (&value, GST_TYPE_BUFFER);
gst_value_take_buffer (&value, buffer);
gst_value_array_append_value (&array, &value);
g_value_unset (&value);
headers = headers->next;
}
gst_structure_set_value (structure, "streamheader", &array);
g_value_unset (&array);
GST_LOG_OBJECT (ogg, "here are the newly set caps: %" GST_PTR_FORMAT, caps);
return caps;
}
static void
gst_ogg_demux_push_queued_buffers (GstOggDemux * ogg, GstOggPad * pad)
{
GList *walk;
/* push queued packets */
for (walk = pad->map.queued; walk; walk = g_list_next (walk)) {
ogg_packet *p = walk->data;
gst_ogg_demux_chain_peer (pad, p, TRUE);
_ogg_packet_free (p);
}
/* and free the queued buffers */
g_list_free (pad->map.queued);
pad->map.queued = NULL;
}
static gboolean
gst_ogg_demux_activate_chain (GstOggDemux * ogg, GstOggChain * chain,
GstEvent * event)
{
gint i;
gint bitrate, idx_bitrate;
g_return_val_if_fail (chain != NULL, FALSE);
if (chain == ogg->current_chain) {
if (event)
gst_event_unref (event);
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
gst_ogg_demux_push_queued_buffers (ogg, pad);
}
return TRUE;
}
GST_DEBUG_OBJECT (ogg, "activating chain %p", chain);
bitrate = idx_bitrate = 0;
/* first add the pads */
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad;
pad = g_array_index (chain->streams, GstOggPad *, i);
if (pad->map.idx_bitrate)
idx_bitrate = MAX (idx_bitrate, pad->map.idx_bitrate);
bitrate += pad->map.bitrate;
/* mark discont */
gst_ogg_pad_mark_discont (pad);
pad->last_ret = GST_FLOW_OK;
if (pad->map.is_skeleton || pad->added || GST_PAD_CAPS (pad) == NULL)
continue;
GST_DEBUG_OBJECT (ogg, "adding pad %" GST_PTR_FORMAT, pad);
/* activate first */
gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
gst_element_add_pad (GST_ELEMENT (ogg), GST_PAD_CAST (pad));
pad->added = TRUE;
}
/* prefer the index bitrate over the ones encoded in the streams */
ogg->bitrate = (idx_bitrate ? idx_bitrate : bitrate);
/* after adding the new pads, remove the old pads */
gst_ogg_demux_deactivate_current_chain (ogg);
GST_DEBUG_OBJECT (ogg, "Setting current chain to %p", chain);
ogg->current_chain = chain;
/* we are finished now */
gst_element_no_more_pads (GST_ELEMENT (ogg));
/* FIXME, must be sent from the streaming thread */
if (event) {
gst_ogg_demux_send_event (ogg, event);
gst_element_found_tags (GST_ELEMENT_CAST (ogg),
gst_tag_list_new_full (GST_TAG_CONTAINER_FORMAT, "Ogg", NULL));
}
GST_DEBUG_OBJECT (ogg, "starting chain");
/* then send out any headers and queued packets */
for (i = 0; i < chain->streams->len; i++) {
GList *walk;
GstOggPad *pad;
pad = g_array_index (chain->streams, GstOggPad *, i);
/* FIXME also streaming thread */
if (pad->map.taglist) {
GST_DEBUG_OBJECT (ogg, "pushing tags");
gst_element_found_tags_for_pad (GST_ELEMENT_CAST (ogg),
GST_PAD_CAST (pad), pad->map.taglist);
pad->map.taglist = NULL;
}
/* Set headers on caps */
pad->map.caps =
gst_ogg_demux_set_header_on_caps (ogg, pad->map.caps, pad->map.headers);
gst_pad_set_caps (GST_PAD_CAST (pad), pad->map.caps);
GST_DEBUG_OBJECT (ogg, "pushing headers");
/* push headers */
for (walk = pad->map.headers; walk; walk = g_list_next (walk)) {
ogg_packet *p = walk->data;
gst_ogg_demux_chain_peer (pad, p, TRUE);
}
GST_DEBUG_OBJECT (ogg, "pushing queued buffers");
gst_ogg_demux_push_queued_buffers (ogg, pad);
}
return TRUE;
}
static gboolean
do_binary_search (GstOggDemux * ogg, GstOggChain * chain, gint64 begin,
gint64 end, gint64 begintime, gint64 endtime, gint64 target,
gint64 * offset)
{
gint64 best;
GstFlowReturn ret;
gint64 result = 0;
best = begin;
GST_DEBUG_OBJECT (ogg,
"chain offset %" G_GINT64_FORMAT ", end offset %" G_GINT64_FORMAT,
begin, end);
GST_DEBUG_OBJECT (ogg,
"chain begin time %" GST_TIME_FORMAT ", end time %" GST_TIME_FORMAT,
GST_TIME_ARGS (begintime), GST_TIME_ARGS (endtime));
GST_DEBUG_OBJECT (ogg, "target %" GST_TIME_FORMAT, GST_TIME_ARGS (target));
/* perform the seek */
while (begin < end) {
gint64 bisect;
if ((end - begin < CHUNKSIZE) || (endtime == begintime)) {
bisect = begin;
} else {
/* take a (pretty decent) guess, avoiding overflow */
gint64 rate = (end - begin) * GST_MSECOND / (endtime - begintime);
bisect = (target - begintime) / GST_MSECOND * rate + begin - CHUNKSIZE;
if (bisect <= begin)
bisect = begin;
GST_DEBUG_OBJECT (ogg, "Initial guess: %" G_GINT64_FORMAT, bisect);
}
gst_ogg_demux_seek (ogg, bisect);
while (begin < end) {
ogg_page og;
GST_DEBUG_OBJECT (ogg,
"after seek, bisect %" G_GINT64_FORMAT ", begin %" G_GINT64_FORMAT
", end %" G_GINT64_FORMAT, bisect, begin, end);
ret = gst_ogg_demux_get_next_page (ogg, &og, end - ogg->offset, &result);
GST_LOG_OBJECT (ogg, "looking for next page returned %" G_GINT64_FORMAT,
result);
if (ret == GST_FLOW_LIMIT) {
/* we hit the upper limit, go back a bit */
if (bisect <= begin + 1) {
end = begin; /* found it */
} else {
if (bisect == 0)
goto seek_error;
bisect -= CHUNKSIZE;
if (bisect <= begin)
bisect = begin + 1;
gst_ogg_demux_seek (ogg, bisect);
}
} else if (ret == GST_FLOW_OK) {
/* found offset of next ogg page */
gint64 granulepos;
GstClockTime granuletime;
GstOggPad *pad;
/* get the granulepos */
GST_LOG_OBJECT (ogg, "found next ogg page at %" G_GINT64_FORMAT,
result);
granulepos = ogg_page_granulepos (&og);
if (granulepos == -1) {
GST_LOG_OBJECT (ogg, "granulepos of next page is -1");
continue;
}
/* get the stream */
pad = gst_ogg_chain_get_stream (chain, ogg_page_serialno (&og));
if (pad == NULL || pad->map.is_skeleton)
continue;
/* convert granulepos to time */
granuletime = gst_ogg_stream_get_end_time_for_granulepos (&pad->map,
granulepos);
if (granuletime < pad->start_time)
continue;
GST_LOG_OBJECT (ogg, "granulepos %" G_GINT64_FORMAT " maps to time %"
GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (granuletime));
granuletime -= pad->start_time;
granuletime += chain->begin_time;
GST_DEBUG_OBJECT (ogg,
"found page with granule %" G_GINT64_FORMAT " and time %"
GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (granuletime));
if (granuletime < target) {
best = result; /* raw offset of packet with granulepos */
begin = ogg->offset; /* raw offset of next page */
begintime = granuletime;
bisect = begin; /* *not* begin + 1 */
} else {
if (bisect <= begin + 1) {
end = begin; /* found it */
} else {
if (end == ogg->offset) { /* we're pretty close - we'd be stuck in */
end = result;
bisect -= CHUNKSIZE; /* an endless loop otherwise. */
if (bisect <= begin)
bisect = begin + 1;
gst_ogg_demux_seek (ogg, bisect);
} else {
end = result;
endtime = granuletime;
break;
}
}
}
} else
goto seek_error;
}
}
GST_DEBUG_OBJECT (ogg, "seeking to %" G_GINT64_FORMAT, best);
gst_ogg_demux_seek (ogg, best);
*offset = best;
return TRUE;
/* ERRORS */
seek_error:
{
GST_DEBUG_OBJECT (ogg, "got a seek error");
return FALSE;
}
}
static gboolean
do_index_search (GstOggDemux * ogg, GstOggChain * chain, gint64 begin,
gint64 end, gint64 begintime, gint64 endtime, gint64 target,
gint64 * p_offset, gint64 * p_timestamp)
{
guint i;
guint64 timestamp, offset;
guint64 r_timestamp, r_offset;
gboolean result = FALSE;
target -= begintime;
r_offset = -1;
r_timestamp = -1;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
timestamp = target;
if (gst_ogg_map_search_index (&pad->map, TRUE, &timestamp, &offset)) {
GST_INFO ("found %" G_GUINT64_FORMAT " at offset %" G_GUINT64_FORMAT,
timestamp, offset);
if (r_offset == -1 || offset < r_offset) {
r_offset = offset;
r_timestamp = timestamp;
}
result |= TRUE;
}
}
if (p_timestamp)
*p_timestamp = r_timestamp;
if (p_offset)
*p_offset = r_offset;
return result;
}
/*
* do seek to time @position, return FALSE or chain and TRUE
*/
static gboolean
gst_ogg_demux_do_seek (GstOggDemux * ogg, GstSegment * segment,
gboolean accurate, gboolean keyframe, GstOggChain ** rchain)
{
guint64 position;
GstOggChain *chain = NULL;
gint64 begin, end;
gint64 begintime, endtime;
gint64 target, keytarget;
gint64 best;
gint64 total;
gint64 result = 0;
GstFlowReturn ret;
gint i, pending, len;
gboolean first_parsed_page = TRUE;
position = segment->last_stop;
/* first find the chain to search in */
total = ogg->total_time;
if (ogg->chains->len == 0)
goto no_chains;
for (i = ogg->chains->len - 1; i >= 0; i--) {
chain = g_array_index (ogg->chains, GstOggChain *, i);
total -= chain->total_time;
if (position >= total)
break;
}
/* first step, locate page containing the required data */
begin = chain->offset;
end = chain->end_offset;
begintime = chain->begin_time;
endtime = begintime + chain->total_time;
target = position - total + begintime;
if (!do_binary_search (ogg, chain, begin, end, begintime, endtime, target,
&best))
goto seek_error;
/* second step: find pages for all streams, we use the keyframe_granule to keep
* track of which ones we saw. If we have seen a page for each stream we can
* calculate the positions of each keyframe. */
GST_DEBUG_OBJECT (ogg, "find keyframes");
len = pending = chain->streams->len;
/* figure out where the keyframes are */
keytarget = target;
while (TRUE) {
ogg_page og;
gint64 granulepos;
GstOggPad *pad;
GstClockTime keyframe_time, granule_time;
ret = gst_ogg_demux_get_next_page (ogg, &og, end - ogg->offset, &result);
GST_LOG_OBJECT (ogg, "looking for next page returned %" G_GINT64_FORMAT,
result);
if (ret == GST_FLOW_LIMIT) {
GST_LOG_OBJECT (ogg, "reached limit");
break;
} else if (ret != GST_FLOW_OK)
goto seek_error;
/* get the stream */
pad = gst_ogg_chain_get_stream (chain, ogg_page_serialno (&og));
if (pad == NULL)
continue;
if (pad->map.is_skeleton)
goto next;
granulepos = ogg_page_granulepos (&og);
if (granulepos == -1 || granulepos == 0) {
GST_LOG_OBJECT (ogg, "granulepos of next page is -1");
continue;
}
/* we only do this the first time we pass here */
if (first_parsed_page) {
/* Now that we have a time reference from the page, we can check
* whether all streams still have pages from here on.
*
* This would be more elegant before the loop, but getting the page from
* there without breaking anything would be more costly */
granule_time = gst_ogg_stream_get_end_time_for_granulepos (&pad->map,
granulepos);
for (i = 0; i < len; i++) {
GstOggPad *stream = g_array_index (chain->streams, GstOggPad *, i);
if (stream == pad)
/* we already know we have at least one page (the current one)
* for this stream */
continue;
if (granule_time > stream->map.total_time)
/* we won't encounter any more pages of this stream, so we don't
* try finding a key frame for it */
pending--;
}
first_parsed_page = FALSE;
}
/* in reverse we want to go past the page with the lower timestamp */
if (segment->rate < 0.0) {
/* get time for this pad */
granule_time = gst_ogg_stream_get_end_time_for_granulepos (&pad->map,
granulepos);
GST_LOG_OBJECT (ogg,
"looking at page with ts %" GST_TIME_FORMAT ", target %"
GST_TIME_FORMAT, GST_TIME_ARGS (granule_time),
GST_TIME_ARGS (target));
if (granule_time < target)
continue;
}
/* we've seen this pad before */
if (pad->keyframe_granule != -1)
continue;
/* convert granule of this pad to the granule of the keyframe */
pad->keyframe_granule =
gst_ogg_stream_granulepos_to_key_granule (&pad->map, granulepos);
GST_LOG_OBJECT (ogg, "marking stream granule %" G_GINT64_FORMAT,
pad->keyframe_granule);
/* get time of the keyframe */
keyframe_time =
gst_ogg_stream_granule_to_time (&pad->map, pad->keyframe_granule);
GST_LOG_OBJECT (ogg,
"stream %08x granule time %" GST_TIME_FORMAT,
pad->map.serialno, GST_TIME_ARGS (keyframe_time));
/* collect smallest value */
if (keyframe_time != -1) {
keyframe_time += begintime;
if (keyframe_time < keytarget)
keytarget = keyframe_time;
}
next:
pending--;
if (pending == 0)
break;
}
/* for negative rates we will get to the keyframe backwards */
if (segment->rate < 0.0)
goto done;
if (keytarget != target) {
GST_LOG_OBJECT (ogg, "final seek to target %" GST_TIME_FORMAT,
GST_TIME_ARGS (keytarget));
/* last step, seek to the location of the keyframe */
if (!do_binary_search (ogg, chain, begin, end, begintime, endtime,
keytarget, &best))
goto seek_error;
} else {
/* seek back to previous position */
GST_LOG_OBJECT (ogg, "keyframe on target");
gst_ogg_demux_seek (ogg, best);
}
done:
if (keyframe) {
if (segment->rate > 0.0)
segment->time = keytarget;
segment->last_stop = keytarget - begintime;
}
*rchain = chain;
return TRUE;
no_chains:
{
GST_DEBUG_OBJECT (ogg, "no chains");
return FALSE;
}
seek_error:
{
GST_DEBUG_OBJECT (ogg, "got a seek error");
return FALSE;
}
}
/* does not take ownership of the event */
static gboolean
gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
{
GstOggChain *chain = NULL;
gboolean res;
gboolean flush, accurate, keyframe;
GstFormat format;
gdouble rate;
GstSeekFlags flags;
GstSeekType cur_type, stop_type;
gint64 cur, stop;
gboolean update;
guint32 seqnum;
GstEvent *tevent;
if (event) {
GST_DEBUG_OBJECT (ogg, "seek with event");
gst_event_parse_seek (event, &rate, &format, &flags,
&cur_type, &cur, &stop_type, &stop);
/* we can only seek on time */
if (format != GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (ogg, "can only seek on TIME");
goto error;
}
seqnum = gst_event_get_seqnum (event);
} else {
GST_DEBUG_OBJECT (ogg, "seek without event");
flags = 0;
rate = 1.0;
seqnum = gst_util_seqnum_next ();
}
GST_DEBUG_OBJECT (ogg, "seek, rate %g", rate);
flush = flags & GST_SEEK_FLAG_FLUSH;
accurate = flags & GST_SEEK_FLAG_ACCURATE;
keyframe = flags & GST_SEEK_FLAG_KEY_UNIT;
/* first step is to unlock the streaming thread if it is
* blocked in a chain call, we do this by starting the flush. because
* we cannot yet hold any streaming lock, we have to protect the chains
* with their own lock. */
if (flush) {
gint i;
tevent = gst_event_new_flush_start ();
gst_event_set_seqnum (tevent, seqnum);
gst_event_ref (tevent);
gst_pad_push_event (ogg->sinkpad, tevent);
GST_CHAIN_LOCK (ogg);
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
gint j;
for (j = 0; j < chain->streams->len; j++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, j);
gst_event_ref (tevent);
gst_pad_push_event (GST_PAD (pad), tevent);
}
}
GST_CHAIN_UNLOCK (ogg);
gst_event_unref (tevent);
} else {
gst_pad_pause_task (ogg->sinkpad);
}
/* now grab the stream lock so that streaming cannot continue, for
* non flushing seeks when the element is in PAUSED this could block
* forever. */
GST_PAD_STREAM_LOCK (ogg->sinkpad);
if (ogg->segment_running && !flush) {
/* create the segment event to close the current segment */
if ((chain = ogg->current_chain)) {
GstEvent *newseg;
gint64 chain_start = 0;
if (chain->segment_start != GST_CLOCK_TIME_NONE)
chain_start = chain->segment_start;
newseg = gst_event_new_new_segment (TRUE, ogg->segment.rate,
GST_FORMAT_TIME, ogg->segment.start + chain_start,
ogg->segment.last_stop + chain_start, ogg->segment.time);
/* set the seqnum of the running segment */
gst_event_set_seqnum (newseg, ogg->seqnum);
/* send segment on old chain, FIXME, must be sent from streaming thread. */
gst_ogg_demux_send_event (ogg, newseg);
}
}
if (event) {
gst_segment_set_seek (&ogg->segment, rate, format, flags,
cur_type, cur, stop_type, stop, &update);
}
GST_DEBUG_OBJECT (ogg, "segment positions set to %" GST_TIME_FORMAT "-%"
GST_TIME_FORMAT, GST_TIME_ARGS (ogg->segment.start),
GST_TIME_ARGS (ogg->segment.stop));
/* we need to stop flushing on the srcpad as we're going to use it
* next. We can do this as we have the STREAM lock now. */
if (flush) {
tevent = gst_event_new_flush_stop ();
gst_event_set_seqnum (tevent, seqnum);
gst_pad_push_event (ogg->sinkpad, tevent);
}
{
gint i;
/* reset all ogg streams now, need to do this from within the lock to
* make sure the streaming thread is not messing with the stream */
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
gst_ogg_chain_reset (chain);
}
}
/* for reverse we will already seek accurately */
res = gst_ogg_demux_do_seek (ogg, &ogg->segment, accurate, keyframe, &chain);
/* seek failed, make sure we continue the current chain */
if (!res) {
GST_DEBUG_OBJECT (ogg, "seek failed");
chain = ogg->current_chain;
} else {
GST_DEBUG_OBJECT (ogg, "seek success");
}
if (!chain)
goto no_chain;
/* now we have a new position, prepare for streaming again */
{
GstEvent *event;
gint64 stop;
gint64 start;
gint64 last_stop, begin_time;
/* we have to send the flush to the old chain, not the new one */
if (flush) {
tevent = gst_event_new_flush_stop ();
gst_event_set_seqnum (tevent, seqnum);
gst_ogg_demux_send_event (ogg, tevent);
}
/* we need this to see how far inside the chain we need to start */
if (chain->begin_time != GST_CLOCK_TIME_NONE)
begin_time = chain->begin_time;
else
begin_time = 0;
/* segment.start gives the start over all chains, we calculate the amount
* of time into this chain we need to start */
start = ogg->segment.start - begin_time;
if (chain->segment_start != GST_CLOCK_TIME_NONE)
start += chain->segment_start;
if ((stop = ogg->segment.stop) == -1)
stop = ogg->segment.duration;
/* segment.stop gives the stop time over all chains, calculate the amount of
* time we need to stop in this chain */
if (stop != -1) {
if (stop > begin_time)
stop -= begin_time;
else
stop = 0;
stop += chain->segment_start;
/* we must stop when this chain ends and switch to the next chain to play
* the remainder of the segment. */
stop = MIN (stop, chain->segment_stop);
}
last_stop = ogg->segment.last_stop;
if (chain->segment_start != GST_CLOCK_TIME_NONE)
last_stop += chain->segment_start;
/* create the segment event we are going to send out */
if (ogg->segment.rate >= 0.0)
event = gst_event_new_new_segment (FALSE, ogg->segment.rate,
ogg->segment.format, last_stop, stop, ogg->segment.time);
else
event = gst_event_new_new_segment (FALSE, ogg->segment.rate,
ogg->segment.format, start, last_stop, ogg->segment.time);
gst_event_set_seqnum (event, seqnum);
if (chain != ogg->current_chain) {
/* switch to different chain, send segment on new chain */
gst_ogg_demux_activate_chain (ogg, chain, event);
} else {
/* mark discont and send segment on current chain */
gst_ogg_chain_mark_discont (chain);
/* This event should be sent from the streaming thread (sink pad task) */
if (ogg->newsegment)
gst_event_unref (ogg->newsegment);
ogg->newsegment = event;
}
/* notify start of new segment */
if (ogg->segment.flags & GST_SEEK_FLAG_SEGMENT) {
GstMessage *message;
message = gst_message_new_segment_start (GST_OBJECT (ogg),
GST_FORMAT_TIME, ogg->segment.last_stop);
gst_message_set_seqnum (message, seqnum);
gst_element_post_message (GST_ELEMENT (ogg), message);
}
ogg->segment_running = TRUE;
ogg->seqnum = seqnum;
/* restart our task since it might have been stopped when we did the
* flush. */
gst_pad_start_task (ogg->sinkpad, (GstTaskFunction) gst_ogg_demux_loop,
ogg->sinkpad);
}
/* streaming can continue now */
GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
return res;
/* ERRORS */
error:
{
GST_DEBUG_OBJECT (ogg, "seek failed");
return FALSE;
}
no_chain:
{
GST_DEBUG_OBJECT (ogg, "no chain to seek in");
GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
return FALSE;
}
}
static gboolean
gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags)
{
/* In push mode, we get to the end of the stream to get the duration */
gint64 position;
GstEvent *sevent;
gboolean res;
/* A full Ogg page can be almost 64 KB. There's no guarantee that there'll be a
granpos there, but it's fairly likely */
position =
ogg->push_byte_length - DURATION_CHUNK_OFFSET - EOS_AVOIDANCE_THRESHOLD;
if (position < 0)
position = 0;
GST_DEBUG_OBJECT (ogg,
"Getting duration, seeking near the end, to %" G_GINT64_FORMAT, position);
ogg->push_state = PUSH_DURATION;
/* do not read the last byte */
sevent = gst_event_new_seek (1.0, GST_FORMAT_BYTES, flags, GST_SEEK_TYPE_SET,
position, GST_SEEK_TYPE_SET, ogg->push_byte_length - 1);
res = gst_pad_push_event (ogg->sinkpad, sevent);
if (res) {
GST_DEBUG_OBJECT (ogg, "Seek succesful");
return TRUE;
} else {
GST_INFO_OBJECT (ogg, "Seek failed, duration will stay unknown");
ogg->push_state = PUSH_PLAYING;
return FALSE;
}
}
static gboolean
gst_ogg_demux_check_duration_push (GstOggDemux * ogg, GstSeekFlags flags,
GstEvent * event)
{
if (ogg->push_byte_length < 0) {
GstPad *peer;
GST_DEBUG_OBJECT (ogg, "Trying to find byte/time length");
if ((peer = gst_pad_get_peer (ogg->sinkpad)) != NULL) {
GstFormat format = GST_FORMAT_BYTES;
gint64 length;
int res;
res = gst_pad_query_duration (peer, &format, &length);
if (res && length > 0) {
ogg->push_byte_length = length;
GST_DEBUG_OBJECT (ogg,
"File byte length %" G_GINT64_FORMAT, ogg->push_byte_length);
}
format = GST_FORMAT_TIME;
res = gst_pad_query_duration (peer, &format, &length);
gst_object_unref (peer);
if (res && length >= 0) {
ogg->push_time_length = length;
GST_DEBUG_OBJECT (ogg, "File time length %" GST_TIME_FORMAT,
GST_TIME_ARGS (ogg->push_time_length));
} else if (!ogg->push_disable_seeking) {
gboolean res;
res = gst_ogg_demux_get_duration_push (ogg, flags);
if (res) {
GST_DEBUG_OBJECT (ogg,
"File time length unknown, trying to determine");
ogg->push_mode_seek_delayed_event = NULL;
if (event) {
GST_DEBUG_OBJECT (ogg,
"Let me intercept this innocent looking seek request");
ogg->push_mode_seek_delayed_event = gst_event_copy (event);
}
return FALSE;
}
}
}
}
return TRUE;
}
static gboolean
gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, GstEvent * event)
{
gint bitrate;
gboolean res = TRUE;
GstFormat format;
gdouble rate;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
GstEvent *sevent;
GstOggChain *chain;
gint64 best, best_time;
gint i;
GST_DEBUG_OBJECT (ogg, "Push mode seek request received");
gst_event_parse_seek (event, &rate, &format, &flags,
&start_type, &start, &stop_type, &stop);
if (format != GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (ogg, "can only seek on TIME");
goto error;
}
if (start_type != GST_SEEK_TYPE_SET) {
GST_DEBUG_OBJECT (ogg, "can only seek to a SET target");
goto error;
}
if (!(flags & GST_SEEK_FLAG_FLUSH)) {
GST_DEBUG_OBJECT (ogg, "can only do flushing seeks");
goto error;
}
GST_DEBUG_OBJECT (ogg, "Push mode seek request: %" GST_TIME_FORMAT,
GST_TIME_ARGS (start));
chain = ogg->current_chain;
if (!chain) {
GST_WARNING_OBJECT (ogg, "No chain to seek on");
goto error;
}
/* start accessing push_* members */
GST_PUSH_LOCK (ogg);
/* not if we disabled seeking (chained streams) */
if (ogg->push_disable_seeking) {
GST_DEBUG_OBJECT (ogg, "Seeking disabled");
goto error_locked;
}
/* not when we're trying to work out duration */
if (ogg->push_state == PUSH_DURATION) {
GST_DEBUG_OBJECT (ogg, "Busy working out duration, try again later");
goto error_locked;
}
/* actually, not if we're doing any seeking already */
if (ogg->push_state != PUSH_PLAYING) {
GST_DEBUG_OBJECT (ogg, "Already doing some seeking, try again later");
goto error_locked;
}
/* on the first seek, get length if we can */
if (!gst_ogg_demux_check_duration_push (ogg, flags, event)) {
GST_PUSH_UNLOCK (ogg);
return FALSE;
}
if (do_index_search (ogg, chain, 0, -1, 0, -1, start, &best, &best_time)) {
/* the index gave some result */
GST_DEBUG_OBJECT (ogg,
"found offset %" G_GINT64_FORMAT " with time %" G_GUINT64_FORMAT,
best, best_time);
} else {
if (ogg->push_time_length > 0) {
/* if we know the time length, we know the full segment bitrate */
GST_DEBUG_OBJECT (ogg, "Using real file bitrate");
bitrate =
gst_util_uint64_scale (ogg->push_byte_length, 8 * GST_SECOND,
ogg->push_time_length);
} else if (ogg->push_time_offset > 0) {
/* get a first approximation using known bitrate to the current position */
GST_DEBUG_OBJECT (ogg, "Using file bitrate so far");
bitrate =
gst_util_uint64_scale (ogg->push_byte_offset, 8 * GST_SECOND,
ogg->push_time_offset);
} else if (ogg->bitrate > 0) {
/* nominal bitrate is better than nothing, even if it lies often */
GST_DEBUG_OBJECT (ogg, "Using nominal bitrate");
bitrate = ogg->bitrate;
} else {
/* meh */
GST_DEBUG_OBJECT (ogg,
"At stream start, and no nominal bitrate, using some random magic "
"number to seed");
/* the bisection, once started, should give us a better approximation */
bitrate = 1000;
}
best = gst_util_uint64_scale (start, bitrate, 8 * GST_SECOND);
}
/* offset by typical page length, and ensure our best guess is within
reasonable bounds */
best -= CHUNKSIZE;
if (best < 0)
best = 0;
if (ogg->push_byte_length > 0 && best >= ogg->push_byte_length)
best = ogg->push_byte_length - 1;
/* set up bisection search */
ogg->push_offset0 = 0;
ogg->push_offset1 = ogg->push_byte_length - 1;
ogg->push_time0 = ogg->push_start_time;
ogg->push_time1 = ogg->push_time_length;
ogg->push_seek_time_target = start;
ogg->push_seek_time_original_target = start;
ogg->push_state = PUSH_BISECT1;
/* reset pad push mode seeking state */
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
pad->push_kf_time = GST_CLOCK_TIME_NONE;
pad->push_sync_time = GST_CLOCK_TIME_NONE;
}
GST_DEBUG_OBJECT (ogg,
"Setting up bisection search for %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT
" (time %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT ")", ogg->push_offset0,
ogg->push_offset1, GST_TIME_ARGS (ogg->push_time0),
GST_TIME_ARGS (ogg->push_time1));
GST_DEBUG_OBJECT (ogg,
"Target time is %" GST_TIME_FORMAT ", best first guess is %"
G_GINT64_FORMAT, GST_TIME_ARGS (ogg->push_seek_time_target), best);
ogg->push_seek_rate = rate;
ogg->push_seek_flags = flags;
ogg->push_mode_seek_delayed_event = NULL;
ogg->push_bisection_steps[0] = 1;
ogg->push_bisection_steps[1] = 0;
sevent = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags,
start_type, best, GST_SEEK_TYPE_NONE, -1);
GST_PUSH_UNLOCK (ogg);
res = gst_pad_push_event (ogg->sinkpad, sevent);
return res;
/* ERRORS */
error:
{
GST_DEBUG_OBJECT (ogg, "seek failed");
return FALSE;
}
error_locked:
GST_PUSH_UNLOCK (ogg);
goto error;
}
static gboolean
gst_ogg_demux_perform_seek (GstOggDemux * ogg, GstEvent * event)
{
gboolean res;
if (ogg->pullmode) {
res = gst_ogg_demux_perform_seek_pull (ogg, event);
} else {
res = gst_ogg_demux_perform_seek_push (ogg, event);
}
return res;
}
/* finds each bitstream link one at a time using a bisection search
* (has to begin by knowing the offset of the lb's initial page).
* Recurses for each link so it can alloc the link storage after
* finding them all, then unroll and fill the cache at the same time
*/
static GstFlowReturn
gst_ogg_demux_bisect_forward_serialno (GstOggDemux * ogg,
gint64 begin, gint64 searched, gint64 end, GstOggChain * chain, glong m)
{
gint64 endsearched = end;
gint64 next = end;
ogg_page og;
GstFlowReturn ret;
gint64 offset;
GstOggChain *nextchain;
GST_LOG_OBJECT (ogg,
"bisect begin: %" G_GINT64_FORMAT ", searched: %" G_GINT64_FORMAT
", end %" G_GINT64_FORMAT ", chain: %p", begin, searched, end, chain);
/* the below guards against garbage seperating the last and
* first pages of two links. */
while (searched < endsearched) {
gint64 bisect;
if (endsearched - searched < CHUNKSIZE) {
bisect = searched;
} else {
bisect = (searched + endsearched) / 2;
}
gst_ogg_demux_seek (ogg, bisect);
ret = gst_ogg_demux_get_next_page (ogg, &og, -1, &offset);
if (ret == GST_FLOW_UNEXPECTED) {
endsearched = bisect;
} else if (ret == GST_FLOW_OK) {
guint32 serial = ogg_page_serialno (&og);
if (!gst_ogg_chain_has_stream (chain, serial)) {
endsearched = bisect;
next = offset;
} else {
searched = offset + og.header_len + og.body_len;
}
} else
return ret;
}
GST_LOG_OBJECT (ogg, "current chain ends at %" G_GINT64_FORMAT, searched);
chain->end_offset = searched;
ret = gst_ogg_demux_read_end_chain (ogg, chain);
if (ret != GST_FLOW_OK)
return ret;
GST_LOG_OBJECT (ogg, "found begin at %" G_GINT64_FORMAT, next);
gst_ogg_demux_seek (ogg, next);
ret = gst_ogg_demux_read_chain (ogg, &nextchain);
if (ret == GST_FLOW_UNEXPECTED) {
nextchain = NULL;
ret = GST_FLOW_OK;
GST_LOG_OBJECT (ogg, "no next chain");
} else if (ret != GST_FLOW_OK)
goto done;
if (searched < end && nextchain != NULL) {
ret = gst_ogg_demux_bisect_forward_serialno (ogg, next, ogg->offset,
end, nextchain, m + 1);
if (ret != GST_FLOW_OK)
goto done;
}
GST_LOG_OBJECT (ogg, "adding chain %p", chain);
g_array_insert_val (ogg->chains, 0, chain);
done:
return ret;
}
/* read a chain from the ogg file. This code will
* read all BOS pages and will create and return a GstOggChain
* structure with the results.
*
* This function will also read N pages from each stream in the
* chain and submit them to the decoders. When the decoder has
* decoded the first buffer, we know the timestamp of the first
* page in the chain.
*/
static GstFlowReturn
gst_ogg_demux_read_chain (GstOggDemux * ogg, GstOggChain ** res_chain)
{
GstFlowReturn ret;
GstOggChain *chain = NULL;
gint64 offset = ogg->offset;
ogg_page og;
gboolean done;
gint i;
GST_LOG_OBJECT (ogg, "reading chain at %" G_GINT64_FORMAT, offset);
/* first read the BOS pages, do typefind on them, create
* the decoders, send data to the decoders. */
while (TRUE) {
GstOggPad *pad;
guint32 serial;
ret = gst_ogg_demux_get_next_page (ogg, &og, -1, NULL);
if (ret != GST_FLOW_OK) {
if (ret == GST_FLOW_UNEXPECTED) {
GST_DEBUG_OBJECT (ogg, "Reached EOS, done reading end chain");
} else {
GST_WARNING_OBJECT (ogg, "problem reading BOS page: ret=%d", ret);
}
break;
}
if (!ogg_page_bos (&og)) {
GST_INFO_OBJECT (ogg, "page is not BOS page, all streams identified");
/* if we did not find a chain yet, assume this is a bogus stream and
* ignore it */
if (!chain) {
GST_WARNING_OBJECT (ogg, "No chain found, no Ogg data in stream ?");
ret = GST_FLOW_UNEXPECTED;
}
break;
}
if (chain == NULL) {
chain = gst_ogg_chain_new (ogg);
chain->offset = offset;
}
serial = ogg_page_serialno (&og);
if (gst_ogg_chain_get_stream (chain, serial) != NULL) {
GST_WARNING_OBJECT (ogg,
"found serial %08x BOS page twice, ignoring", serial);
continue;
}
pad = gst_ogg_chain_new_stream (chain, serial);
gst_ogg_pad_submit_page (pad, &og);
}
if (ret != GST_FLOW_OK || chain == NULL) {
if (ret == GST_FLOW_OK) {
GST_WARNING_OBJECT (ogg, "no chain was found");
ret = GST_FLOW_ERROR;
} else if (ret != GST_FLOW_UNEXPECTED) {
GST_WARNING_OBJECT (ogg, "failed to read chain");
} else {
GST_DEBUG_OBJECT (ogg, "done reading chains");
}
if (chain) {
gst_ogg_chain_free (chain);
}
if (res_chain)
*res_chain = NULL;
return ret;
}
chain->have_bos = TRUE;
GST_LOG_OBJECT (ogg, "read bos pages, init decoder now");
/* now read pages until we receive a buffer from each of the
* stream decoders, this will tell us the timestamp of the
* first packet in the chain then */
/* save the offset to the first non bos page in the chain: if searching for
* pad->first_time we read past the end of the chain, we'll seek back to this
* position
*/
offset = ogg->offset;
done = FALSE;
while (!done) {
guint32 serial;
gboolean known_serial = FALSE;
GstFlowReturn ret;
serial = ogg_page_serialno (&og);
done = TRUE;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
GST_LOG_OBJECT (ogg,
"serial %08x time %" GST_TIME_FORMAT,
pad->map.serialno, GST_TIME_ARGS (pad->start_time));
if (pad->map.serialno == serial) {
known_serial = TRUE;
/* submit the page now, this will fill in the start_time when the
* internal decoder finds it */
gst_ogg_pad_submit_page (pad, &og);
if (!pad->map.is_skeleton && pad->start_time == -1
&& ogg_page_eos (&og)) {
/* got EOS on a pad before we could find its start_time.
* We have no chance of finding a start_time for every pad so
* stop searching for the other start_time(s).
*/
done = TRUE;
break;
}
}
/* the timestamp will be filled in when we submit the pages */
if (!pad->map.is_sparse)
done &= (pad->start_time != GST_CLOCK_TIME_NONE);
GST_LOG_OBJECT (ogg, "done %08x now %d", pad->map.serialno, done);
}
/* we read a page not belonging to the current chain: seek back to the
* beginning of the chain
*/
if (!known_serial) {
GST_LOG_OBJECT (ogg, "unknown serial %08x", serial);
gst_ogg_demux_seek (ogg, offset);
break;
}
if (!done) {
ret = gst_ogg_demux_get_next_page (ogg, &og, -1, NULL);
if (ret != GST_FLOW_OK)
break;
}
}
GST_LOG_OBJECT (ogg, "done reading chain");
/* now we can fill in the missing info using queries */
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
if (pad->map.is_skeleton)
continue;
pad->mode = GST_OGG_PAD_MODE_STREAMING;
}
if (res_chain)
*res_chain = chain;
return GST_FLOW_OK;
}
/* read the last pages from the ogg stream to get the final
* page end_offsets.
*/
static GstFlowReturn
gst_ogg_demux_read_end_chain (GstOggDemux * ogg, GstOggChain * chain)
{
gint64 begin = chain->end_offset;
gint64 end = begin;
gint64 last_granule = -1;
GstOggPad *last_pad = NULL;
GstFlowReturn ret;
gboolean done = FALSE;
ogg_page og;
gint i;
while (!done) {
begin -= CHUNKSIZE;
if (begin < 0)
begin = 0;
gst_ogg_demux_seek (ogg, begin);
/* now continue reading until we run out of data, if we find a page
* start, we save it. It might not be the final page as there could be
* another page after this one. */
while (ogg->offset < end) {
ret = gst_ogg_demux_get_next_page (ogg, &og, end - ogg->offset, NULL);
if (ret == GST_FLOW_LIMIT)
break;
if (ret != GST_FLOW_OK)
return ret;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
if (pad->map.is_skeleton)
continue;
if (pad->map.serialno == ogg_page_serialno (&og)) {
gint64 granulepos = ogg_page_granulepos (&og);
if (granulepos != -1) {
last_granule = granulepos;
last_pad = pad;
done = TRUE;
}
break;
}
}
}
}
if (last_pad) {
chain->segment_stop =
gst_ogg_stream_get_end_time_for_granulepos (&last_pad->map,
last_granule);
} else {
chain->segment_stop = GST_CLOCK_TIME_NONE;
}
GST_INFO ("segment stop %" G_GUINT64_FORMAT, chain->segment_stop);
return GST_FLOW_OK;
}
/* find a pad with a given serial number
*/
static GstOggPad *
gst_ogg_demux_find_pad (GstOggDemux * ogg, guint32 serialno)
{
GstOggPad *pad;
gint i;
/* first look in building chain if any */
if (ogg->building_chain) {
pad = gst_ogg_chain_get_stream (ogg->building_chain, serialno);
if (pad)
return pad;
}
/* then look in current chain if any */
if (ogg->current_chain) {
pad = gst_ogg_chain_get_stream (ogg->current_chain, serialno);
if (pad)
return pad;
}
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
pad = gst_ogg_chain_get_stream (chain, serialno);
if (pad)
return pad;
}
return NULL;
}
/* find a chain with a given serial number
*/
static GstOggChain *
gst_ogg_demux_find_chain (GstOggDemux * ogg, guint32 serialno)
{
GstOggPad *pad;
pad = gst_ogg_demux_find_pad (ogg, serialno);
if (pad) {
return pad->chain;
}
return NULL;
}
/* returns TRUE if all streams have valid start time */
static gboolean
gst_ogg_demux_collect_chain_info (GstOggDemux * ogg, GstOggChain * chain)
{
gboolean res = TRUE;
chain->total_time = GST_CLOCK_TIME_NONE;
GST_DEBUG_OBJECT (ogg, "trying to collect chain info");
/* see if we have a start time on all streams */
chain->segment_start = gst_ogg_demux_collect_start_time (ogg, chain);
if (chain->segment_start == G_MAXUINT64) {
/* not yet, stream some more data */
res = FALSE;
} else if (chain->segment_stop != GST_CLOCK_TIME_NONE) {
/* we can calculate a total time */
chain->total_time = chain->segment_stop - chain->segment_start;
}
GST_DEBUG ("total time %" G_GUINT64_FORMAT, chain->total_time);
GST_DEBUG_OBJECT (ogg, "return %d", res);
return res;
}
static void
gst_ogg_demux_collect_info (GstOggDemux * ogg)
{
gint i;
/* collect all info */
ogg->total_time = 0;
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
chain->begin_time = ogg->total_time;
gst_ogg_demux_collect_chain_info (ogg, chain);
ogg->total_time += chain->total_time;
}
gst_segment_set_duration (&ogg->segment, GST_FORMAT_TIME, ogg->total_time);
}
/* find all the chains in the ogg file, this reads the first and
* last page of the ogg stream, if they match then the ogg file has
* just one chain, else we do a binary search for all chains.
*/
static GstFlowReturn
gst_ogg_demux_find_chains (GstOggDemux * ogg)
{
ogg_page og;
GstPad *peer;
GstFormat format;
gboolean res;
guint32 serialno;
GstOggChain *chain;
GstFlowReturn ret;
/* get peer to figure out length */
if ((peer = gst_pad_get_peer (ogg->sinkpad)) == NULL)
goto no_peer;
/* find length to read last page, we store this for later use. */
format = GST_FORMAT_BYTES;
res = gst_pad_query_duration (peer, &format, &ogg->length);
gst_object_unref (peer);
if (!res || ogg->length <= 0)
goto no_length;
GST_DEBUG_OBJECT (ogg, "file length %" G_GINT64_FORMAT, ogg->length);
/* read chain from offset 0, this is the first chain of the
* ogg file. */
gst_ogg_demux_seek (ogg, 0);
ret = gst_ogg_demux_read_chain (ogg, &chain);
if (ret != GST_FLOW_OK)
goto no_first_chain;
/* read page from end offset, we use this page to check if its serial
* number is contained in the first chain. If this is the case then
* this ogg is not a chained ogg and we can skip the scanning. */
gst_ogg_demux_seek (ogg, ogg->length);
ret = gst_ogg_demux_get_prev_page (ogg, &og, NULL);
if (ret != GST_FLOW_OK)
goto no_last_page;
serialno = ogg_page_serialno (&og);
if (!gst_ogg_chain_has_stream (chain, serialno)) {
/* the last page is not in the first stream, this means we should
* find all the chains in this chained ogg. */
ret =
gst_ogg_demux_bisect_forward_serialno (ogg, 0, 0, ogg->length, chain,
0);
} else {
/* we still call this function here but with an empty range so that
* we can reuse the setup code in this routine. */
ret =
gst_ogg_demux_bisect_forward_serialno (ogg, 0, ogg->length,
ogg->length, chain, 0);
}
if (ret != GST_FLOW_OK)
goto done;
/* all fine, collect and print */
gst_ogg_demux_collect_info (ogg);
/* dump our chains and streams */
gst_ogg_print (ogg);
done:
return ret;
/*** error cases ***/
no_peer:
{
GST_ELEMENT_ERROR (ogg, STREAM, DEMUX, (NULL), ("we don't have a peer"));
return GST_FLOW_NOT_LINKED;
}
no_length:
{
GST_ELEMENT_ERROR (ogg, STREAM, DEMUX, (NULL), ("can't get file length"));
return GST_FLOW_NOT_SUPPORTED;
}
no_first_chain:
{
GST_ELEMENT_ERROR (ogg, STREAM, DEMUX, (NULL), ("can't get first chain"));
return GST_FLOW_ERROR;
}
no_last_page:
{
GST_DEBUG_OBJECT (ogg, "can't get last page");
if (chain)
gst_ogg_chain_free (chain);
return ret;
}
}
static GstFlowReturn
gst_ogg_demux_handle_page (GstOggDemux * ogg, ogg_page * page)
{
GstOggPad *pad;
gint64 granule;
guint32 serialno;
GstFlowReturn result = GST_FLOW_OK;
serialno = ogg_page_serialno (page);
granule = ogg_page_granulepos (page);
GST_LOG_OBJECT (ogg,
"processing ogg page (serial %08x, "
"pageno %ld, granulepos %" G_GINT64_FORMAT ", bos %d)", serialno,
ogg_page_pageno (page), granule, ogg_page_bos (page));
if (ogg_page_bos (page)) {
GstOggChain *chain;
/* first page */
/* see if we know about the chain already */
chain = gst_ogg_demux_find_chain (ogg, serialno);
if (chain) {
GstEvent *event;
gint64 start = 0;
if (chain->segment_start != GST_CLOCK_TIME_NONE)
start = chain->segment_start;
/* create the newsegment event we are going to send out */
event = gst_event_new_new_segment (FALSE, ogg->segment.rate,
GST_FORMAT_TIME, start, chain->segment_stop, chain->begin_time);
gst_event_set_seqnum (event, ogg->seqnum);
GST_DEBUG_OBJECT (ogg,
"segment: start %" GST_TIME_FORMAT ", stop %" GST_TIME_FORMAT
", time %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
GST_TIME_ARGS (chain->segment_stop),
GST_TIME_ARGS (chain->begin_time));
/* activate it as it means we have a non-header, this will also deactivate
* the currently running chain. */
gst_ogg_demux_activate_chain (ogg, chain, event);
pad = gst_ogg_demux_find_pad (ogg, serialno);
} else {
GstClockTime chain_time;
gint64 current_time;
/* this can only happen in push mode */
if (ogg->pullmode)
goto unknown_chain;
current_time = ogg->segment.last_stop;
/* time of new chain is current time */
chain_time = current_time;
if (ogg->building_chain == NULL) {
GstOggChain *newchain;
newchain = gst_ogg_chain_new (ogg);
newchain->offset = 0;
/* set new chain begin time aligned with end time of old chain */
newchain->begin_time = chain_time;
GST_DEBUG_OBJECT (ogg, "new chain, begin time %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain_time));
/* and this is the one we are building now */
ogg->building_chain = newchain;
}
pad = gst_ogg_chain_new_stream (ogg->building_chain, serialno);
}
} else {
pad = gst_ogg_demux_find_pad (ogg, serialno);
}
if (pad) {
result = gst_ogg_pad_submit_page (pad, page);
} else {
GST_PUSH_LOCK (ogg);
if (!ogg->pullmode && !ogg->push_disable_seeking) {
/* no pad while probing for duration, we must have a chained stream,
and we don't support them, so back off */
GST_INFO_OBJECT (ogg, "We seem to have a chained stream, we won't seek");
if (ogg->push_state == PUSH_DURATION) {
GstFlowReturn res;
res = gst_ogg_demux_seek_back_after_push_duration_check_unlock (ogg);
if (res != GST_FLOW_OK)
return res;
}
/* only once we seeked back */
GST_PUSH_LOCK (ogg);
ogg->push_disable_seeking = TRUE;
} else {
GST_PUSH_UNLOCK (ogg);
/* no pad. This means an ogg page without bos has been seen for this
* serialno. we just ignore it but post a warning... */
GST_ELEMENT_WARNING (ogg, STREAM, DECODE,
(NULL), ("unknown ogg pad for serial %08x detected", serialno));
return GST_FLOW_OK;
}
GST_PUSH_UNLOCK (ogg);
}
return result;
/* ERRORS */
unknown_chain:
{
GST_ELEMENT_ERROR (ogg, STREAM, DECODE,
(NULL), ("unknown ogg chain for serial %08x detected", serialno));
return GST_FLOW_ERROR;
}
}
/* streaming mode, receive a buffer, parse it, create pads for
* the serialno, submit pages and packets to the oggpads
*/
static GstFlowReturn
gst_ogg_demux_chain (GstPad * pad, GstBuffer * buffer)
{
GstOggDemux *ogg;
gint ret = 0;
GstFlowReturn result = GST_FLOW_OK;
ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (pad));
GST_DEBUG_OBJECT (ogg, "enter");
result = gst_ogg_demux_submit_buffer (ogg, buffer);
if (result < 0) {
GST_DEBUG_OBJECT (ogg, "gst_ogg_demux_submit_buffer returned %d", result);
}
while (result == GST_FLOW_OK) {
ogg_page page;
ret = ogg_sync_pageout (&ogg->sync, &page);
if (ret == 0)
/* need more data */
break;
if (ret == -1) {
/* discontinuity in the pages */
GST_DEBUG_OBJECT (ogg, "discont in page found, continuing");
} else {
result = gst_ogg_demux_handle_page (ogg, &page);
if (result < 0) {
GST_DEBUG_OBJECT (ogg, "gst_ogg_demux_handle_page returned %d", result);
}
}
}
if (ret == 0 || result == GST_FLOW_OK) {
gst_ogg_demux_sync_streams (ogg);
}
GST_DEBUG_OBJECT (ogg, "leave with %d", result);
return result;
}
static gboolean
gst_ogg_demux_send_event (GstOggDemux * ogg, GstEvent * event)
{
GstOggChain *chain = ogg->current_chain;
gboolean res = TRUE;
if (!chain)
chain = ogg->building_chain;
if (chain) {
gint i;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i);
gst_event_ref (event);
GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
res &= gst_pad_push_event (GST_PAD (pad), event);
}
} else {
GST_WARNING_OBJECT (ogg, "No chain to forward event to");
}
gst_event_unref (event);
return res;
}
static GstFlowReturn
gst_ogg_demux_combine_flows (GstOggDemux * ogg, GstOggPad * pad,
GstFlowReturn ret)
{
GstOggChain *chain;
/* store the value */
pad->last_ret = ret;
/* any other error that is not-linked can be returned right
* away */
if (ret != GST_FLOW_NOT_LINKED)
goto done;
/* only return NOT_LINKED if all other pads returned NOT_LINKED */
chain = ogg->current_chain;
if (chain) {
gint i;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *opad = g_array_index (chain->streams, GstOggPad *, i);
ret = opad->last_ret;
/* some other return value (must be SUCCESS but we can return
* other values as well) */
if (ret != GST_FLOW_NOT_LINKED)
goto done;
}
/* if we get here, all other pads were unlinked and we return
* NOT_LINKED then */
}
done:
return ret;
}
/* returns TRUE if all streams in current chain reached EOS, FALSE otherwise */
static gboolean
gst_ogg_demux_check_eos (GstOggDemux * ogg)
{
GstOggChain *chain;
gboolean eos = TRUE;
chain = ogg->current_chain;
if (G_LIKELY (chain)) {
gint i;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *opad = g_array_index (chain->streams, GstOggPad *, i);
eos = eos && opad->is_eos;
}
} else {
eos = FALSE;
}
return eos;
}
static GstFlowReturn
gst_ogg_demux_loop_forward (GstOggDemux * ogg)
{
GstFlowReturn ret;
GstBuffer *buffer;
if (ogg->offset == ogg->length) {
GST_LOG_OBJECT (ogg, "no more data to pull %" G_GINT64_FORMAT
" == %" G_GINT64_FORMAT, ogg->offset, ogg->length);
ret = GST_FLOW_UNEXPECTED;
goto done;
}
GST_LOG_OBJECT (ogg, "pull data %" G_GINT64_FORMAT, ogg->offset);
ret = gst_pad_pull_range (ogg->sinkpad, ogg->offset, CHUNKSIZE, &buffer);
if (ret != GST_FLOW_OK) {
GST_LOG_OBJECT (ogg, "Failed pull_range");
goto done;
}
ogg->offset += GST_BUFFER_SIZE (buffer);
if (G_UNLIKELY (ogg->newsegment)) {
gst_ogg_demux_send_event (ogg, ogg->newsegment);
ogg->newsegment = NULL;
}
ret = gst_ogg_demux_chain (ogg->sinkpad, buffer);
if (ret != GST_FLOW_OK) {
GST_LOG_OBJECT (ogg, "Failed demux_chain");
goto done;
}
/* check for the end of the segment */
if (gst_ogg_demux_check_eos (ogg)) {
GST_LOG_OBJECT (ogg, "got EOS");
ret = GST_FLOW_UNEXPECTED;
goto done;
}
done:
return ret;
}
/* reverse mode.
*
* We read the pages backwards and send the packets forwards. The first packet
* in the page will be pushed with the DISCONT flag set.
*
* Special care has to be taken for continued pages, which we can only decode
* when we have the previous page(s).
*/
static GstFlowReturn
gst_ogg_demux_loop_reverse (GstOggDemux * ogg)
{
GstFlowReturn ret;
ogg_page page;
gint64 offset;
if (ogg->offset == 0) {
GST_LOG_OBJECT (ogg, "no more data to pull %" G_GINT64_FORMAT
" == 0", ogg->offset);
ret = GST_FLOW_UNEXPECTED;
goto done;
}
GST_LOG_OBJECT (ogg, "read page from %" G_GINT64_FORMAT, ogg->offset);
ret = gst_ogg_demux_get_prev_page (ogg, &page, &offset);
if (ret != GST_FLOW_OK)
goto done;
ogg->offset = offset;
if (G_UNLIKELY (ogg->newsegment)) {
gst_ogg_demux_send_event (ogg, ogg->newsegment);
ogg->newsegment = NULL;
}
ret = gst_ogg_demux_handle_page (ogg, &page);
if (ret != GST_FLOW_OK)
goto done;
/* check for the end of the segment */
if (gst_ogg_demux_check_eos (ogg)) {
GST_LOG_OBJECT (ogg, "got EOS");
ret = GST_FLOW_UNEXPECTED;
goto done;
}
done:
return ret;
}
static void
gst_ogg_demux_sync_streams (GstOggDemux * ogg)
{
GstClockTime cur;
GstOggChain *chain;
guint i;
chain = ogg->current_chain;
cur = ogg->segment.last_stop;
if (chain == NULL || cur == -1)
return;
for (i = 0; i < chain->streams->len; i++) {
GstOggPad *stream = g_array_index (chain->streams, GstOggPad *, i);
/* Theoretically, we should be doing this for all streams, but we're only
* doing it for known-to-be-sparse streams at the moment in order not to
* break things for wrongly-muxed streams (like we used to produce once) */
if (stream->map.is_sparse && stream->last_stop != GST_CLOCK_TIME_NONE) {
/* Does this stream lag? Random threshold of 2 seconds */
if (GST_CLOCK_DIFF (stream->last_stop, cur) > (2 * GST_SECOND)) {
GST_DEBUG_OBJECT (stream, "synchronizing stream with others by "
"advancing time from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->last_stop), GST_TIME_ARGS (cur));
stream->last_stop = cur;
/* advance stream time (FIXME: is this right, esp. time_pos?) */
gst_pad_push_event (GST_PAD_CAST (stream),
gst_event_new_new_segment (TRUE, ogg->segment.rate,
GST_FORMAT_TIME, stream->last_stop, -1, stream->last_stop));
}
}
}
}
/* random access code
*
* - first find all the chains and streams by scanning the file.
* - then get and chain buffers, just like the streaming case.
* - when seeking, we can use the chain info to perform the seek.
*/
static void
gst_ogg_demux_loop (GstOggPad * pad)
{
GstOggDemux *ogg;
GstFlowReturn ret;
GstEvent *event;
ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (pad));
if (ogg->need_chains) {
gboolean res;
/* this is the only place where we write chains and thus need to lock. */
GST_CHAIN_LOCK (ogg);
ret = gst_ogg_demux_find_chains (ogg);
GST_CHAIN_UNLOCK (ogg);
if (ret != GST_FLOW_OK)
goto chain_read_failed;
ogg->need_chains = FALSE;
GST_OBJECT_LOCK (ogg);
ogg->running = TRUE;
event = ogg->event;
ogg->event = NULL;
GST_OBJECT_UNLOCK (ogg);
/* and seek to configured positions without FLUSH */
res = gst_ogg_demux_perform_seek_pull (ogg, event);
if (event)
gst_event_unref (event);
if (!res)
goto seek_failed;
}
if (ogg->segment.rate >= 0.0)
ret = gst_ogg_demux_loop_forward (ogg);
else
ret = gst_ogg_demux_loop_reverse (ogg);
if (ret != GST_FLOW_OK)
goto pause;
gst_ogg_demux_sync_streams (ogg);
return;
/* ERRORS */
chain_read_failed:
{
/* error was posted */
goto pause;
}
seek_failed:
{
GST_ELEMENT_ERROR (ogg, STREAM, DEMUX, (NULL),
("failed to start demuxing ogg"));
ret = GST_FLOW_ERROR;
goto pause;
}
pause:
{
const gchar *reason = gst_flow_get_name (ret);
GstEvent *event = NULL;
GST_LOG_OBJECT (ogg, "pausing task, reason %s", reason);
ogg->segment_running = FALSE;
gst_pad_pause_task (ogg->sinkpad);
if (ret == GST_FLOW_UNEXPECTED) {
/* perform EOS logic */
if (ogg->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gint64 stop;
GstMessage *message;
/* for segment playback we need to post when (in stream time)
* we stopped, this is either stop (when set) or the duration. */
if ((stop = ogg->segment.stop) == -1)
stop = ogg->segment.duration;
GST_LOG_OBJECT (ogg, "Sending segment done, at end of segment");
message =
gst_message_new_segment_done (GST_OBJECT (ogg), GST_FORMAT_TIME,
stop);
gst_message_set_seqnum (message, ogg->seqnum);
gst_element_post_message (GST_ELEMENT (ogg), message);
} else {
/* normal playback, send EOS to all linked pads */
GST_LOG_OBJECT (ogg, "Sending EOS, at end of stream");
event = gst_event_new_eos ();
}
} else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
GST_ELEMENT_ERROR (ogg, STREAM, FAILED,
(_("Internal data stream error.")),
("stream stopped, reason %s", reason));
event = gst_event_new_eos ();
}
/* For wrong-state we still want to pause the task and stop
* but no error message or other things are necessary.
* wrong-state is no real error and will be caused by flushing,
* e.g. because of a flushing seek.
*/
if (event) {
gst_event_set_seqnum (event, ogg->seqnum);
gst_ogg_demux_send_event (ogg, event);
}
return;
}
}
static void
gst_ogg_demux_clear_chains (GstOggDemux * ogg)
{
gint i;
gst_ogg_demux_deactivate_current_chain (ogg);
GST_CHAIN_LOCK (ogg);
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
gst_ogg_chain_free (chain);
}
ogg->chains = g_array_set_size (ogg->chains, 0);
GST_CHAIN_UNLOCK (ogg);
}
/* this function is called when the pad is activated and should start
* processing data.
*
* We check if we can do random access to decide if we work push or
* pull based.
*/
static gboolean
gst_ogg_demux_sink_activate (GstPad * sinkpad)
{
if (gst_pad_check_pull_range (sinkpad)) {
GST_DEBUG_OBJECT (sinkpad, "activating pull");
return gst_pad_activate_pull (sinkpad, TRUE);
} else {
GST_DEBUG_OBJECT (sinkpad, "activating push");
return gst_pad_activate_push (sinkpad, TRUE);
}
}
/* this function gets called when we activate ourselves in push mode.
* We cannot seek (ourselves) in the stream */
static gboolean
gst_ogg_demux_sink_activate_push (GstPad * sinkpad, gboolean active)
{
GstOggDemux *ogg;
ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (sinkpad));
ogg->pullmode = FALSE;
ogg->resync = FALSE;
return TRUE;
}
/* this function gets called when we activate ourselves in pull mode.
* We can perform random access to the resource and we start a task
* to start reading */
static gboolean
gst_ogg_demux_sink_activate_pull (GstPad * sinkpad, gboolean active)
{
GstOggDemux *ogg;
ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (sinkpad));
if (active) {
ogg->need_chains = TRUE;
ogg->pullmode = TRUE;
return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_ogg_demux_loop,
sinkpad);
} else {
return gst_pad_stop_task (sinkpad);
}
}
static GstStateChangeReturn
gst_ogg_demux_change_state (GstElement * element, GstStateChange transition)
{
GstOggDemux *ogg;
GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
ogg = GST_OGG_DEMUX (element);
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
ogg->basetime = 0;
ogg_sync_init (&ogg->sync);
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
ogg_sync_reset (&ogg->sync);
ogg->running = FALSE;
ogg->bitrate = 0;
ogg->segment_running = FALSE;
ogg->total_time = -1;
GST_PUSH_LOCK (ogg);
ogg->push_byte_offset = 0;
ogg->push_byte_length = -1;
ogg->push_time_length = GST_CLOCK_TIME_NONE;
ogg->push_time_offset = GST_CLOCK_TIME_NONE;
ogg->push_disable_seeking = FALSE;
ogg->push_state = PUSH_PLAYING;
GST_PUSH_UNLOCK (ogg);
gst_segment_init (&ogg->segment, GST_FORMAT_TIME);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
default:
break;
}
result = parent_class->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_ogg_demux_clear_chains (ogg);
GST_OBJECT_LOCK (ogg);
ogg->running = FALSE;
ogg->segment_running = FALSE;
GST_OBJECT_UNLOCK (ogg);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
ogg_sync_clear (&ogg->sync);
break;
default:
break;
}
return result;
}
gboolean
gst_ogg_demux_plugin_init (GstPlugin * plugin)
{
GST_DEBUG_CATEGORY_INIT (gst_ogg_demux_debug, "oggdemux", 0, "ogg demuxer");
GST_DEBUG_CATEGORY_INIT (gst_ogg_demux_setup_debug, "oggdemux_setup", 0,
"ogg demuxer setup stage when parsing pipeline");
#ifdef ENABLE_NLS
GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE,
LOCALEDIR);
bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR);
bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8");
#endif
return gst_element_register (plugin, "oggdemux", GST_RANK_PRIMARY,
GST_TYPE_OGG_DEMUX);
}
/* prints all info about the element */
#undef GST_CAT_DEFAULT
#define GST_CAT_DEFAULT gst_ogg_demux_setup_debug
#ifdef GST_DISABLE_GST_DEBUG
static void
gst_ogg_print (GstOggDemux * ogg)
{
/* NOP */
}
#else /* !GST_DISABLE_GST_DEBUG */
static void
gst_ogg_print (GstOggDemux * ogg)
{
guint j, i;
GST_INFO_OBJECT (ogg, "%u chains", ogg->chains->len);
GST_INFO_OBJECT (ogg, " total time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (ogg->total_time));
for (i = 0; i < ogg->chains->len; i++) {
GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
GST_INFO_OBJECT (ogg, " chain %d (%u streams):", i, chain->streams->len);
GST_INFO_OBJECT (ogg,
" offset: %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT, chain->offset,
chain->end_offset);
GST_INFO_OBJECT (ogg, " begin time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain->begin_time));
GST_INFO_OBJECT (ogg, " total time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain->total_time));
GST_INFO_OBJECT (ogg, " segment start: %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain->segment_start));
GST_INFO_OBJECT (ogg, " segment stop: %" GST_TIME_FORMAT,
GST_TIME_ARGS (chain->segment_stop));
for (j = 0; j < chain->streams->len; j++) {
GstOggPad *stream = g_array_index (chain->streams, GstOggPad *, j);
GST_INFO_OBJECT (ogg, " stream %08x: %s", stream->map.serialno,
gst_ogg_stream_get_media_type (&stream->map));
GST_INFO_OBJECT (ogg, " start time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->start_time));
}
}
}
#endif /* GST_DISABLE_GST_DEBUG */