ext/ogg/gstoggmux.c: Forward port rewrite of muxing strategy to 0.9 version of oggmux.

Original commit message from CVS:
* ext/ogg/gstoggmux.c: (gst_ogg_mux_request_new_pad),
(gst_ogg_mux_buffer_from_page), (gst_ogg_mux_push_buffer),
(gst_ogg_mux_dequeue_page), (gst_ogg_mux_pad_queue_page),
(gst_ogg_mux_send_headers), (gst_ogg_mux_collected):
Forward port rewrite of muxing strategy to 0.9 version of oggmux.
This makes us mux things correctly according to the ogg muxing
rules. Still not handling EOS correctly right now, though.
This commit is contained in:
Michael Smith 2005-11-08 16:37:32 +00:00
parent 9136b3d6aa
commit 6f1277ab13
2 changed files with 202 additions and 19 deletions

View file

@ -1,3 +1,13 @@
2005-11-08 Michael Smith <msmith@fluendo.com>
* ext/ogg/gstoggmux.c: (gst_ogg_mux_request_new_pad),
(gst_ogg_mux_buffer_from_page), (gst_ogg_mux_push_buffer),
(gst_ogg_mux_dequeue_page), (gst_ogg_mux_pad_queue_page),
(gst_ogg_mux_send_headers), (gst_ogg_mux_collected):
Forward port rewrite of muxing strategy to 0.9 version of oggmux.
This makes us mux things correctly according to the ogg muxing
rules. Still not handling EOS correctly right now, though.
2005-11-08 Tim-Philipp Müller <tim at centricular dot net>
* gst/audioconvert/gstaudioconvert.c:

View file

@ -39,6 +39,8 @@ GST_DEBUG_CATEGORY_STATIC (gst_ogg_mux_debug);
#define GST_IS_OGG_MUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OGG_MUX))
#define GST_IS_OGG_MUX_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OGG_MUX))
#define GST_GP_FORMAT "[gp %8" G_GINT64_FORMAT "]"
typedef struct _GstOggMux GstOggMux;
typedef struct _GstOggMuxClass GstOggMuxClass;
@ -63,11 +65,15 @@ typedef struct
guint64 duration; /* duration of current page */
gboolean eos;
gint64 offset;
GstClockTime timestamp; /* timestamp for granulepos of last complete
packet on this page */
GstOggPadState state; /* state of the pad */
GList *headers;
GQueue *pagebuffers; /* List of pages in buffers ready for pushing */
gboolean new_page; /* starting a new page */
gboolean first_delta; /* was the first packet in the page a delta */
gboolean prev_delta; /* was the previous buffer a delta frame */
@ -351,6 +357,11 @@ gst_ogg_mux_request_new_pad (GstElement * element,
oggpad->new_page = TRUE;
oggpad->first_delta = FALSE;
oggpad->prev_delta = FALSE;
/* TODO: delete this queue (and the things contained within) later,
* possibly when doing gst_collectpads_remove_pad() (which we don't seem
* to do at all?)
*/
oggpad->pagebuffers = g_queue_new ();
}
}
@ -483,7 +494,10 @@ gst_ogg_mux_buffer_from_page (GstOggMux * mux, ogg_page * page, gboolean delta)
GST_BUFFER_TIMESTAMP (buffer) = mux->next_ts;
GST_BUFFER_OFFSET (buffer) = mux->offset;
mux->offset += GST_BUFFER_SIZE (buffer);
GST_BUFFER_OFFSET_END (buffer) = mux->offset;
/* Here we set granulepos as our OFFSET_END to give easy direct access to
* this value later. Before we push it, we reset this to OFFSET + SIZE
* (see gst_ogg_mux_push_buffer). */
GST_BUFFER_OFFSET_END (buffer) = ogg_page_granulepos (page);
if (delta)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
@ -491,14 +505,130 @@ gst_ogg_mux_buffer_from_page (GstOggMux * mux, ogg_page * page, gboolean delta)
}
static GstFlowReturn
gst_ogg_mux_push_page (GstOggMux * mux, ogg_page * page, gboolean delta)
gst_ogg_mux_push_buffer (GstOggMux * mux, GstBuffer * buffer)
{
GstBuffer *buffer;
GST_BUFFER_OFFSET_END (buffer) = GST_BUFFER_OFFSET (buffer) +
GST_BUFFER_SIZE (buffer);
return gst_pad_push (mux->srcpad, buffer);
}
/* if all queues have at least one page, dequeue the page with the lowest
* timestamp */
static gboolean
gst_ogg_mux_dequeue_page (GstOggMux * mux, GstFlowReturn * flowret)
{
GSList *walk;
GstOggPad *opad = NULL; /* "oldest" pad */
GstClockTime oldest = GST_CLOCK_TIME_NONE;
GstBuffer *buf = NULL;
gboolean ret = FALSE;
*flowret = GST_FLOW_OK;
walk = mux->collect->data;
while (walk) {
GstOggPad *pad = (GstOggPad *) walk->data;
/* We need each queue to either be at EOS, or have one or more pages
* available with a set granulepos (i.e. not -1), otherwise we don't have
* enough data yet to determine which stream needs to go next for correct
* time ordering. */
if (pad->pagebuffers->length == 0) {
if (pad->eos) {
GST_LOG_OBJECT (pad, "pad is EOS, skipping for dequeue decision");
} else {
GST_LOG_OBJECT (pad, "no pages in this queue, can't dequeue");
return FALSE;
}
} else {
/* We then need to check for a non-negative granulepos */
int i;
gboolean valid = FALSE;
for (i = 0; i < pad->pagebuffers->length; i++) {
buf = g_queue_peek_nth (pad->pagebuffers, i);
/* Here we check the OFFSET_END, which is actually temporarily the
* granulepos value for this buffer */
if (GST_BUFFER_OFFSET_END (buf) != -1) {
valid = TRUE;
break;
}
}
if (!valid) {
GST_LOG_OBJECT (pad, "No page timestamps in queue, can't dequeue");
return FALSE;
}
}
walk = g_slist_next (walk);
}
walk = mux->collect->data;
while (walk) {
GstOggPad *pad = (GstOggPad *) walk->data;
/* any page with a granulepos of -1 can be pushed immediately.
* TODO: it CAN be, but it seems silly to do so? */
buf = g_queue_peek_head (pad->pagebuffers);
while (buf && GST_BUFFER_OFFSET_END (buf) == -1) {
GST_LOG_OBJECT (pad, GST_GP_FORMAT " pushing page", -1);
g_queue_pop_head (pad->pagebuffers);
*flowret = gst_ogg_mux_push_buffer (mux, buf);
buf = g_queue_peek_head (pad->pagebuffers);
ret = TRUE;
}
if (buf) {
/* if no oldest buffer yet, take this one */
if (oldest == GST_CLOCK_TIME_NONE) {
oldest = GST_BUFFER_TIMESTAMP (buf);
opad = pad;
} else {
/* if we have an oldest, compare with this one */
if (GST_BUFFER_TIMESTAMP (buf) < oldest) {
oldest = GST_BUFFER_TIMESTAMP (buf);
opad = pad;
}
}
}
walk = g_slist_next (walk);
}
if (oldest != GST_CLOCK_TIME_NONE) {
g_assert (opad);
buf = g_queue_pop_head (opad->pagebuffers);
GST_LOG_OBJECT (opad,
GST_GP_FORMAT " pushing oldest page (time %" GST_TIME_FORMAT ")",
GST_BUFFER_OFFSET_END (buf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
*flowret = gst_ogg_mux_push_buffer (mux, buf);
ret = TRUE;
}
return ret;
}
/* put the given page on a per-pad queue, timestamping it correctly.
* after that, dequeue and push as many pages as possible
* before calling me, make sure that the the pad's timestamp matches
* the page's granulepos */
static GstFlowReturn
gst_ogg_mux_pad_queue_page (GstOggMux * mux, GstOggPad * pad, ogg_page * page,
gboolean delta)
{
GstBuffer *buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
GstFlowReturn ret;
buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
/* take the timestamp of the last completed packet on this page */
GST_BUFFER_TIMESTAMP (buffer) = pad->timestamp;
g_queue_push_tail (pad->pagebuffers, buffer);
GST_LOG_OBJECT (pad, GST_GP_FORMAT " queued buffer page (time %" GST_TIME_FORMAT "), %d page buffers queued", ogg_page_granulepos (page), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), pad->pagebuffers->length); /* no g_queue_get_length in 2.2 */
ret = gst_pad_push (mux->srcpad, buffer);
while (gst_ogg_mux_dequeue_page (mux, &ret)) {
if (ret != GST_FLOW_OK)
break;
}
return ret;
}
@ -549,7 +679,8 @@ gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPad * old, GstOggPad * new)
}
/* make sure a buffer is queued on all pads, returns a pointer to an oggpad
* that holds the best buffer or NULL when no pad was usable */
* that holds the best buffer or NULL when no pad was usable.
* "best" means the buffer marked with the lowest timestamp */
static GstOggPad *
gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
{
@ -876,7 +1007,7 @@ gst_ogg_mux_send_headers (GstOggMux * mux)
hwalk = hwalk->next;
if ((ret = gst_pad_push (mux->srcpad, buf)) != GST_FLOW_OK)
if ((ret = gst_ogg_mux_push_buffer (mux, buf)) != GST_FLOW_OK)
break;
}
g_list_free (hbufs);
@ -889,16 +1020,21 @@ gst_ogg_mux_send_headers (GstOggMux * mux)
* basic idea:
*
* 1) find a pad to pull on, this is done by looking at the buffers
* to decide which one should be muxed first.
* to decide which one to use, we use the 'oldest' one first.
* 2) store the selected pad and keep on pulling until we fill a
* complete ogg page or the ogg page is filled above the max-delay
* threshold. This is needed because the ogg spec says that
* you should fill a complete page with data from the same logical
* stream. When the page is filled, go back to 1).
* 3) before filling a packet, read ahead one more buffer to see if this
* 3) before filling a page, read ahead one more buffer to see if this
* packet is the last of the stream. We need to do this because the ogg
* spec mandates that the last packet should have the EOS flag set before
* sending it to ogg.
* sending it to ogg. FIXME: Apparently we're allowed to send empty 'nil'
* pages with the EOS flag set for EOS, so we could do this.
* 4) pages get queued on a per-pad queue. Every time a page is queued, a
* dequeue is called, which will dequeue the oldest page on any pad, provided
* that ALL pads have at least one marked page in the queue (TODO: or that
* pad is at EOS?)
*/
static GstFlowReturn
gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
@ -906,9 +1042,12 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
GstOggPad *best;
gboolean delta_unit;
GstFlowReturn ret;
gint64 granulepos = 0;
GstClockTime timestamp;
GST_DEBUG ("collected");
/* queue buffers on all pads; find a buffer with the lowest timestamp */
best = gst_ogg_mux_queue_pads (ogg_mux);
if (best && !best->buffer)
return GST_FLOW_OK;
@ -920,8 +1059,9 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
return GST_FLOW_WRONG_STATE;
}
/* we're pulling a pad and there is a better one, see if we need
* to flush the current page */
/* if we were already pulling from one pad, but the new "best" buffer is
* from another pad, we need to check if we have reason to flush a page
* for the pad we were pulling from before */
if (ogg_mux->pulling && best &&
ogg_mux->pulling != best && ogg_mux->pulling->buffer) {
GstOggPad *pad = ogg_mux->pulling;
@ -935,7 +1075,9 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
ogg_page page;
while (ogg_stream_flush (&pad->stream, &page)) {
ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
/* Place page into the per-pad queue */
ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
pad->first_delta);
/* increment the page number counter */
pad->pageno++;
/* mark other pages as delta */
@ -965,7 +1107,7 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
}
/* we are pulling from a pad, continue to do so until a page
* has been filled and pushed */
* has been filled and queued */
if (ogg_mux->pulling != NULL) {
ogg_packet packet;
ogg_page page;
@ -1018,10 +1160,11 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
}
}
/* force flush */
/* flush the currently built page if neccesary */
if (force_flush) {
while (ogg_stream_flush (&pad->stream, &page)) {
ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
pad->first_delta);
/* increment the page number counter */
pad->pageno++;
/* mark other pages as delta */
@ -1068,6 +1211,9 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
ogg_stream_packetin (&pad->stream, &packet);
granulepos = GST_BUFFER_OFFSET_END (pad->buffer);
timestamp = GST_BUFFER_TIMESTAMP (pad->buffer);
/* don't need the old buffer anymore */
gst_buffer_unref (pad->buffer);
/* store new readahead buffer */
@ -1076,18 +1222,36 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
/* let ogg write out the pages now. The packet we got could end
* up in more than one page so we need to write them all */
if (ogg_stream_pageout (&pad->stream, &page) > 0) {
if (ogg_page_granulepos (&page) == granulepos) {
/* the packet we streamed in finishes on the page,
* because the page's granulepos is the granulepos of the last
* packet completed on that page,
* so update the timestamp that we will give to the page */
pad->timestamp = timestamp;
GST_DEBUG_OBJECT (pad, "Timestamp of pad is %" GST_TIME_FORMAT
", granulepos is %lld", GST_TIME_ARGS (timestamp), granulepos);
}
/* push the page */
ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
pad->pageno++;
/* mark next pages as delta */
pad->first_delta = TRUE;
/* use an inner loop her to flush the remaining pages and
/* use an inner loop here to flush the remaining pages and
* mark them as delta frames as well */
while (ogg_stream_pageout (&pad->stream, &page) > 0) {
if (ogg_page_granulepos (&page) == granulepos) {
/* the page has taken up the new packet completely, which means
* the packet ends the page and we can update the timestamp
* before pushing out */
pad->timestamp = timestamp;
}
/* we have a complete page now, we can push the page
* and make sure to pull on a new pad the next time around */
ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
pad->first_delta);
/* increment the page number counter */
pad->pageno++;
}
@ -1098,6 +1262,15 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
* pad for pulling in the next iteration */
ogg_mux->pulling = NULL;
}
/* Update the timestamp, if neccesary, since and future page will have at
* least this timestamp.
*/
if (pad->timestamp < timestamp) {
pad->timestamp = timestamp;
GST_DEBUG_OBJECT (pad, "Updated timestamp of pad to %" GST_TIME_FORMAT,
GST_TIME_ARGS (timestamp));
}
}
return GST_FLOW_OK;