oggmux: refactor how EOS is determined

This decreases the number of buffers held on each pad by one,
eliminating next_buffer.  Simplifies the logic by relying solely
on CollectPads to let us know when a pad is in EOS.  As a side
benefit, the collect pads related code is structured more like
other CollectPad users.

The previous code would occasionally mark the wrong pad as EOS,
causing the code to get in a state where all the streams were
finished, but EOS hadn't been sent to the source pad.
This commit is contained in:
David Schleef 2011-06-07 21:30:18 -07:00
parent efdd32580e
commit ea0d666d11
2 changed files with 46 additions and 115 deletions

View file

@ -739,24 +739,18 @@ gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPadData * first,
/* if the first pad doesn't contain anything or is even NULL, return /* if the first pad doesn't contain anything or is even NULL, return
* the second pad as best candidate and vice versa */ * the second pad as best candidate and vice versa */
if (first == NULL || (first->buffer == NULL && first->next_buffer == NULL)) if (first == NULL)
return 1; return 1;
if (second == NULL || (second->buffer == NULL && second->next_buffer == NULL)) if (second == NULL)
return -1; return -1;
/* no timestamp on first buffer, it must go first */ /* no timestamp on first buffer, it must go first */
if (first->buffer) firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
else
firsttime = GST_BUFFER_TIMESTAMP (first->next_buffer);
if (firsttime == GST_CLOCK_TIME_NONE) if (firsttime == GST_CLOCK_TIME_NONE)
return -1; return -1;
/* no timestamp on second buffer, it must go first */ /* no timestamp on second buffer, it must go first */
if (second->buffer) secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
else
secondtime = GST_BUFFER_TIMESTAMP (second->next_buffer);
if (secondtime == GST_CLOCK_TIME_NONE) if (secondtime == GST_CLOCK_TIME_NONE)
return 1; return 1;
@ -898,14 +892,16 @@ no_granule:
* *
* returns a pointer to an oggpad that holds the best buffer, or * returns a pointer to an oggpad that holds the best buffer, or
* NULL when no pad was usable. "best" means the buffer marked * NULL when no pad was usable. "best" means the buffer marked
* with the lowest timestamp. If best->buffer == NULL then nothing * with the lowest timestamp. If best->buffer == NULL then either
* should be done until more data arrives */ * we're at EOS (popped = FALSE), or a buffer got dropped, so retry. */
static GstOggPadData * static GstOggPadData *
gst_ogg_mux_queue_pads (GstOggMux * ogg_mux) gst_ogg_mux_queue_pads (GstOggMux * ogg_mux, gboolean * popped)
{ {
GstOggPadData *bestpad = NULL, *still_hungry = NULL; GstOggPadData *bestpad = NULL;
GSList *walk; GSList *walk;
*popped = FALSE;
/* try to make sure we have a buffer from each usable pad first */ /* try to make sure we have a buffer from each usable pad first */
walk = ogg_mux->collect->data; walk = ogg_mux->collect->data;
while (walk) { while (walk) {
@ -923,19 +919,13 @@ gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
if (pad->buffer == NULL) { if (pad->buffer == NULL) {
GstBuffer *buf; GstBuffer *buf;
/* shift the buffer along if needed (it's okay if next_buffer is NULL) */
if (pad->buffer == NULL) {
GST_LOG_OBJECT (data->pad, "shifting buffer %" GST_PTR_FORMAT,
pad->next_buffer);
pad->buffer = pad->next_buffer;
pad->next_buffer = NULL;
}
buf = gst_collect_pads_pop (ogg_mux->collect, data); buf = gst_collect_pads_pop (ogg_mux->collect, data);
GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf); GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf);
/* On EOS we get a NULL buffer */ /* On EOS we get a NULL buffer */
if (buf != NULL) { if (buf != NULL) {
*popped = TRUE;
if (ogg_mux->delta_pad == NULL && if (ogg_mux->delta_pad == NULL &&
GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))
ogg_mux->delta_pad = pad; ogg_mux->delta_pad = pad;
@ -1008,52 +998,24 @@ gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
if (G_UNLIKELY (!buf)) if (G_UNLIKELY (!buf))
GST_DEBUG_OBJECT (data->pad, "buffer clipped"); GST_DEBUG_OBJECT (data->pad, "buffer clipped");
} }
} else {
GST_DEBUG_OBJECT (data->pad, "EOS on pad");
if (!pad->eos) {
ogg_page page;
/* it's no longer active */
ogg_mux->active_pads--;
/* Just gone to EOS. Flush existing page(s) */
pad->eos = TRUE;
while (ogg_stream_flush (&pad->map.stream, &page)) {
/* Place page into the per-pad queue */
gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
/* increment the page number counter */
pad->pageno++;
/* mark other pages as delta */
pad->first_delta = TRUE;
}
}
} }
pad->next_buffer = buf; pad->buffer = buf;
} }
/* we should have a buffer now, see if it is the best pad to /* we should have a buffer now, see if it is the best pad to
* pull on */ * pull on */
if (pad->buffer || pad->next_buffer) { if (pad->buffer) {
if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) { if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) {
GST_LOG_OBJECT (data->pad, GST_LOG_OBJECT (data->pad,
"new best pad, with buffers %" GST_PTR_FORMAT "new best pad, with buffer %" GST_PTR_FORMAT, pad->buffer);
" and %" GST_PTR_FORMAT, pad->buffer, pad->next_buffer);
bestpad = pad; bestpad = pad;
} }
} else if (!pad->eos) {
GST_LOG_OBJECT (data->pad, "hungry pad");
still_hungry = pad;
} }
} }
if (still_hungry) return bestpad;
/* drop back into collectpads... */
return still_hungry;
else
return bestpad;
} }
static GList * static GList *
@ -1104,8 +1066,7 @@ gst_ogg_mux_get_headers (GstOggPadData * pad)
pad->always_flush_page = TRUE; pad->always_flush_page = TRUE;
} else if (gst_structure_has_name (structure, "video/x-dirac")) { } else if (gst_structure_has_name (structure, "video/x-dirac")) {
res = g_list_append (res, pad->buffer); res = g_list_append (res, pad->buffer);
pad->buffer = pad->next_buffer; pad->buffer = NULL;
pad->next_buffer = NULL;
pad->always_flush_page = TRUE; pad->always_flush_page = TRUE;
} else { } else {
GST_LOG_OBJECT (thepad, "caps don't have streamheader"); GST_LOG_OBJECT (thepad, "caps don't have streamheader");
@ -1189,7 +1150,7 @@ gst_ogg_mux_send_headers (GstOggMux * mux)
GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad)); GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad));
/* if the pad has no buffer, we don't care */ /* if the pad has no buffer, we don't care */
if (pad->buffer == NULL && pad->next_buffer == NULL) if (pad->buffer == NULL)
continue; continue;
/* now figure out the headers */ /* now figure out the headers */
@ -1225,9 +1186,6 @@ gst_ogg_mux_send_headers (GstOggMux * mux)
} else if (pad->buffer) { } else if (pad->buffer) {
buf = pad->buffer; buf = pad->buffer;
gst_buffer_ref (buf); gst_buffer_ref (buf);
} else if (pad->next_buffer) {
buf = pad->next_buffer;
gst_buffer_ref (buf);
} else { } else {
/* fixme -- should be caught in the previous list traversal. */ /* fixme -- should be caught in the previous list traversal. */
GST_OBJECT_LOCK (thepad); GST_OBJECT_LOCK (thepad);
@ -1398,16 +1356,18 @@ gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPadData * best)
gboolean delta_unit; gboolean delta_unit;
gint64 granulepos = 0; gint64 granulepos = 0;
GstClockTime timestamp, gp_time; GstClockTime timestamp, gp_time;
GstBuffer *next_buf;
GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT
", currently pulling from %" GST_PTR_FORMAT, best->collect.pad, ", currently pulling from %" GST_PTR_FORMAT, best->collect.pad,
ogg_mux->pulling); ogg_mux->pulling);
/* best->buffer is non-NULL, either the pad is EOS's or there is a next next_buf = gst_collect_pads_peek (ogg_mux->collect, &best->collect);
* buffer */ if (next_buf) {
if (best->next_buffer == NULL && !best->eos) { best->eos = FALSE;
GST_WARNING_OBJECT (ogg_mux, "no subsequent buffer and EOS not reached"); gst_buffer_unref (next_buf);
return GST_FLOW_WRONG_STATE; } else {
best->eos = TRUE;
} }
/* if we were already pulling from one pad, but the new "best" buffer is /* if we were already pulling from one pad, but the new "best" buffer is
@ -1512,7 +1472,7 @@ gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPadData * best)
GST_GP_CAST (packet.granulepos), (gint64) packet.packetno, GST_GP_CAST (packet.granulepos), (gint64) packet.packetno,
packet.bytes); packet.bytes);
packet.e_o_s = (pad->eos ? 1 : 0); packet.e_o_s = best->eos ? 1 : 0;
tmpbuf = NULL; tmpbuf = NULL;
/* we flush when we see a new keyframe */ /* we flush when we see a new keyframe */
@ -1694,23 +1654,20 @@ static gboolean
all_pads_eos (GstCollectPads * pads) all_pads_eos (GstCollectPads * pads)
{ {
GSList *walk; GSList *walk;
gboolean alleos = TRUE;
walk = pads->data; walk = pads->data;
while (walk) { while (walk) {
GstBuffer *buf; GstOggPadData *oggpad = (GstOggPadData *) walk->data;
GstCollectData *data = (GstCollectData *) walk->data;
buf = gst_collect_pads_peek (pads, data); GST_DEBUG ("oggpad %p eos %d", oggpad, oggpad->eos);
if (buf) {
alleos = FALSE; if (oggpad->eos == FALSE)
gst_buffer_unref (buf); return FALSE;
goto beach;
} walk = g_slist_next (walk);
walk = walk->next;
} }
beach:
return alleos; return TRUE;
} }
/* This function is called when there is data on all pads. /* This function is called when there is data on all pads.
@ -1728,45 +1685,26 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
{ {
GstOggPadData *best; GstOggPadData *best;
GstFlowReturn ret; GstFlowReturn ret;
gint activebefore; gboolean popped;
GST_LOG_OBJECT (ogg_mux, "collected"); GST_LOG_OBJECT (ogg_mux, "collected");
activebefore = ogg_mux->active_pads;
/* queue buffers on all pads; find a buffer with the lowest timestamp */ /* queue buffers on all pads; find a buffer with the lowest timestamp */
best = gst_ogg_mux_queue_pads (ogg_mux); best = gst_ogg_mux_queue_pads (ogg_mux, &popped);
if (best && !best->buffer) {
GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
return GST_FLOW_OK;
}
if (!best) { if (popped)
return GST_FLOW_WRONG_STATE; return GST_FLOW_OK;
if (best == NULL || best->buffer == NULL) {
/* This is not supposed to happen */
return GST_FLOW_ERROR;
} }
ret = gst_ogg_mux_process_best_pad (ogg_mux, best); ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
if (ogg_mux->active_pads < activebefore) { if (best->eos && all_pads_eos (pads)) {
/* If the active pad count went down, this mean at least one pad has gone gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
* EOS. Since CollectPads only calls _collected() once when all pads are return GST_FLOW_UNEXPECTED;
* EOS, and our code doesn't _pop() from all pads we need to check that by
* peeking on all pads, else we won't be called again and the muxing will
* not terminate (push out EOS). */
/* if all the pads have been removed, flush all pending data */
if ((ret == GST_FLOW_OK) && all_pads_eos (pads)) {
GST_LOG_OBJECT (ogg_mux, "no pads remaining, flushing data");
do {
best = gst_ogg_mux_queue_pads (ogg_mux);
if (best)
ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
} while ((ret == GST_FLOW_OK) && (best != NULL));
GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
}
} }
return ret; return ret;
@ -1870,10 +1808,6 @@ gst_ogg_mux_clear_collectpads (GstCollectPads * collect)
gst_buffer_unref (oggpad->buffer); gst_buffer_unref (oggpad->buffer);
oggpad->buffer = NULL; oggpad->buffer = NULL;
} }
if (oggpad->next_buffer) {
gst_buffer_unref (oggpad->next_buffer);
oggpad->next_buffer = NULL;
}
gst_segment_init (&oggpad->segment, GST_FORMAT_TIME); gst_segment_init (&oggpad->segment, GST_FORMAT_TIME);
} }

View file

@ -55,10 +55,7 @@ typedef struct
GstSegment segment; GstSegment segment;
/* These two buffers make a very simple queue - they enter as 'next_buffer'
* and (usually) leave as 'buffer', except at EOS, when buffer will be NULL */
GstBuffer *buffer; /* the first waiting buffer for the pad */ GstBuffer *buffer; /* the first waiting buffer for the pad */
GstBuffer *next_buffer; /* the second waiting buffer for the pad */
gint64 packetno; /* number of next packet */ gint64 packetno; /* number of next packet */
gint64 pageno; /* number of next page */ gint64 pageno; /* number of next page */