diff --git a/libs/bytestream/gstbytestream2.c b/libs/bytestream/gstbytestream2.c index 13bab3cd66..d3887b8b46 100644 --- a/libs/bytestream/gstbytestream2.c +++ b/libs/bytestream/gstbytestream2.c @@ -5,7 +5,10 @@ #include #include "gstbytestream2.h" -static void gst_bytestream2_print_status(GstByteStream2 *bs); +//#define bs_print(format,args...) g_print(format, ## args) +#define bs_print(format,args...) + +//static void gst_bytestream2_print_status(GstByteStream2 *bs); guint8 *gst_bytestream2_assemble(GstByteStream2 *bs, guint32 len); /** @@ -22,25 +25,44 @@ gst_bytestream2_new (GstPad *pad) GstByteStream2 *bs = g_new (GstByteStream2, 1); bs->pad = pad; - bs->flushptr = 0LL; - bs->size = 0LL; - - bs->curbuf = NULL; - bs->curbufavail = 0; bs->buflist = NULL; - bs->listcount = 0; + bs->headbufavail = 0; bs->listavail = 0; return bs; } -// 0 ..... ---------|----|---.......---|----------- ..... N -// f -// ^tail ^head -// cba -// \------la-------/ -// \ ..... -----------size-------------/ + +// HOW THIS WORKS: +// +// The fundamental structure is a singly-linked list of buffers. The +// buffer on the front is the oldest, and thus the first to read data +// from. The number of bytes left to be read in this buffer is stored +// in bs->headbufavail. The number of bytes available in the entire +// list (including the head buffer) is in bs->listavail. +// +// When a request is made for data (peek), _fill_bytes is called with +// the number of bytes needed, but only if the listavail indicates +// that there aren't already enough. This calls _get_next_buf until +// the listavail is sufficient to satisfy the demand. +// +// _get_next_buf pulls a buffer from the pad the bytestream is attached +// to, and shoves it in the list. There are actually two things it can +// do. If there's already a buffer in the list, and the _is_span_fast() +// test returns true, it will merge it with that last buffer. Otherwise +// it will simply tack it onto the end of the list. +// +// The _peek itself first checks the simple case of the request fitting +// within the head buffer, and if so creates a subbuffer and returns. +// Otherwise, it creates a new buffer and allocates space for the request +// and calls _assemble to fill it. We know we have to copy because this +// case only happens when the _merge wasn't feasible during _get_next_buf. +// +// The _flush method repeatedly inspects the head buffer and flushes as +// much data from it as it needs to, up to the size of the buffer. If +// the flush decimates the buffer, it's stripped, unref'd, and removed. + // get the next buffer // if the buffer can be merged with the head buffer, do so @@ -50,68 +72,51 @@ gst_bytestream2_get_next_buf(GstByteStream2 *bs) { GstBuffer *nextbuf, *lastbuf; GSList *end; - g_print("get_next_buf: pulling buffer\n"); + bs_print("get_next_buf: pulling buffer\n"); nextbuf = gst_pad_pull(bs->pad); - g_print("get_next_buf: got buffer of %d bytes\n",GST_BUFFER_SIZE(nextbuf)); + bs_print("get_next_buf: got buffer of %d bytes\n",GST_BUFFER_SIZE(nextbuf)); - // first check to see if there's a curbuf - if (bs->curbuf == NULL) { - g_print("get_next_buf: no curbuf, filling\n"); - // there isn't, let's fill it - bs->curbuf = nextbuf; - bs->curbufavail = GST_BUFFER_SIZE(nextbuf); + // first see if there are any buffers in the list at all + if (bs->buflist) { + bs_print("gst_next_buf: there is at least one buffer in the list\n"); + // now find the end of the list + end = g_slist_last(bs->buflist); + // get the buffer that's there + lastbuf = GST_BUFFER(end->data); - } else { - // there is, first check to see if there's a list of buffers at all - if (bs->buflist) { - g_print("gst_next_buf: there's a buflist, search for the end\n"); - // now find the end of the list - end = g_slist_last(bs->buflist); - // get the buffer that's there - lastbuf = GST_BUFFER(end->data); + // see if we can marge cheaply + if (gst_buffer_is_span_fast(lastbuf,nextbuf)) { + bs_print("get_next_buf: merging new buffer with last buf on list\n"); + // it is, let's merge them (this is really an append, but...) + end->data = gst_buffer_merge(lastbuf,nextbuf); + // add to the length of the list + bs->listavail += GST_BUFFER_SIZE(nextbuf); - // see if we can marge cheaply - if (gst_buffer_is_span_fast(lastbuf,nextbuf)) { - g_print("get_next_buf: merging new buffer with last buf on list\n"); - // it is, let's merge them - end->data = gst_buffer_merge(lastbuf,nextbuf); - // add to the length of the list, but not buffer count - bs->listavail += GST_BUFFER_SIZE(nextbuf); - // we can ditch the nextbuf then - gst_buffer_unref(nextbuf); - - // if we can't, we just append this buffer - } else { - g_print("get_next_buf: adding new buffer to the end of the list\n"); - end = g_slist_append(end,nextbuf); - // also need to increment length of list and buffer count - bs->listcount++; - bs->listavail += GST_BUFFER_SIZE(nextbuf); + // have to check to see if we merged with the head buffer + if (end == bs->buflist) { + bs->headbufavail += GST_BUFFER_SIZE(nextbuf); } - // if there are no buffers in the list + // we can ditch the nextbuf then + gst_buffer_unref(nextbuf); + + // if we can't, we just append this buffer } else { - g_print("get_next_buf: buflist is empty\n"); - // first see if we can merge with curbuf - if (gst_buffer_is_span_fast(bs->curbuf,nextbuf)) { - g_print("get_next_buf: merging new buffer with curbuf\n"); - // it is, merge them - bs->curbuf = gst_buffer_merge(bs->curbuf,nextbuf); - // add to the length of curbuf that's available - bs->curbufavail += GST_BUFFER_SIZE(nextbuf); - // we can unref nextbuf now - gst_buffer_unref(nextbuf); - - // instead we tack this onto the (empty) list - } else { - g_print("get_next_buf: adding new buffer to list\n"); - // put this on the end of the list - bs->buflist = g_slist_append(bs->buflist,nextbuf); - // and increment the number of bytes in the list - bs->listcount++; - bs->listavail += GST_BUFFER_SIZE(nextbuf); - } + bs_print("get_next_buf: adding new buffer to the end of the list\n"); + end = g_slist_append(end,nextbuf); + // also need to increment length of list and buffer count + bs->listavail += GST_BUFFER_SIZE(nextbuf); } + + // if there are no buffers in the list + } else { + bs_print("get_next_buf: buflist is empty, adding new buffer to list\n"); + // put this on the end of the list + bs->buflist = g_slist_append(bs->buflist,nextbuf); + // and increment the number of bytes in the list + bs->listavail = GST_BUFFER_SIZE(nextbuf); + // set the head buffer avail to the size + bs->headbufavail = GST_BUFFER_SIZE(nextbuf); } return TRUE; @@ -120,12 +125,9 @@ gst_bytestream2_get_next_buf(GstByteStream2 *bs) { static gboolean gst_bytestream2_fill_bytes(GstByteStream2 *bs, guint32 len) { -// GSList *walk; -// GstBuffer *buf; - // as long as we don't have enough, we get more buffers - while ((bs->curbufavail + bs->listavail) < len) { - g_print("fill_bytes: there are %d bytes in curbuf and %d in the list, we need %d\n",bs->curbufavail,bs->listavail,len); + while (bs->listavail < len) { + bs_print("fill_bytes: there are %d bytes in the list, we need %d\n",bs->listavail,len); gst_bytestream2_get_next_buf(bs); } @@ -135,35 +137,40 @@ gst_bytestream2_fill_bytes(GstByteStream2 *bs, guint32 len) { GstBuffer * gst_bytestream2_peek (GstByteStream2 *bs, guint32 len) { - GstBuffer *retbuf = NULL; + GstBuffer *headbuf, *retbuf = NULL; g_return_val_if_fail(bs != NULL, NULL); g_return_val_if_fail(len > 0, NULL); - g_print("peek: asking for %d bytes\n",len); + bs_print("peek: asking for %d bytes\n",len); // make sure we have enough - g_print("peek: there are %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail); + bs_print("peek: there are %d bytes in the list\n",bs->listavail); if (len > bs->listavail) { gst_bytestream2_fill_bytes(bs,len); - g_print("peek: there are now %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail); + bs_print("peek: there are now %d bytes in the list\n",bs->listavail); } + gst_bytestream2_print_status(bs); + + // extract the head buffer + headbuf = GST_BUFFER(bs->buflist->data); // if the requested bytes are in the current buffer - g_print("peek: curbufavail is %d\n",bs->curbufavail); - if (len <= bs->curbufavail) { - g_print("peek: there are enough bytes in curbuf (need %d, have %d)\n",len,bs->curbufavail); - // create a sub-buffer of the curbuf - retbuf = gst_buffer_create_sub(bs->curbuf, GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail, len); + bs_print("peek: headbufavail is %d\n",bs->headbufavail); + if (len <= bs->headbufavail) { + bs_print("peek: there are enough bytes in headbuf (need %d, have %d)\n",len,bs->headbufavail); + // create a sub-buffer of the headbuf + retbuf = gst_buffer_create_sub(headbuf, GST_BUFFER_SIZE(headbuf) - bs->headbufavail, len); // otherwise we need to figure out how to assemble one } else { - g_print("peek: current buffer is not big enough for len %d\n",len); + bs_print("peek: current buffer is not big enough for len %d\n",len); retbuf = gst_buffer_new(); GST_BUFFER_SIZE(retbuf) = len; GST_BUFFER_DATA(retbuf) = gst_bytestream2_assemble(bs,len); - GST_BUFFER_OFFSET(retbuf) = GST_BUFFER_OFFSET(bs->curbuf) + (GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail); + if (GST_BUFFER_OFFSET(headbuf) != -1) + GST_BUFFER_OFFSET(retbuf) = GST_BUFFER_OFFSET(headbuf) + (GST_BUFFER_SIZE(headbuf) - bs->headbufavail); } return retbuf; @@ -178,22 +185,23 @@ gst_bytestream2_assemble(GstByteStream2 *bs, guint32 len) GstBuffer *buf; // copy the data from the curbuf - g_print("copying %d bytes from curbuf at %d to *data\n",bs->curbufavail, - GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail); - memcpy(data,GST_BUFFER_DATA(bs->curbuf) + GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail, - bs->curbufavail); - copied += bs->curbufavail; + buf = GST_BUFFER(bs->buflist->data); + bs_print("assemble: copying %d bytes from curbuf at %d to *data\n",bs->headbufavail, + GST_BUFFER_SIZE(buf) - bs->headbufavail); + memcpy(data,GST_BUFFER_DATA(buf) + GST_BUFFER_SIZE(buf) - bs->headbufavail, + bs->headbufavail); + copied += bs->headbufavail; // asumption is made that the buffers all exist in the list - walk = bs->buflist; + walk = g_slist_next(bs->buflist); while (copied < len) { buf = GST_BUFFER(walk->data); if (GST_BUFFER_SIZE(buf) < (len-copied)) { - g_print("coping %d bytes from buf to output offset %d\n",GST_BUFFER_SIZE(buf),copied); + bs_print("assemble: copying %d bytes from buf to output offset %d\n",GST_BUFFER_SIZE(buf),copied); memcpy(data+copied,GST_BUFFER_DATA(buf),GST_BUFFER_SIZE(buf)); copied += GST_BUFFER_SIZE(buf); } else { - g_print("coping %d bytes from buf to output offset %d\n",len-copied,copied); + bs_print("assemble: copying %d bytes from buf to output offset %d\n",len-copied,copied); memcpy(data+copied,GST_BUFFER_DATA(buf),len-copied); copied = len; } @@ -206,61 +214,55 @@ gst_bytestream2_assemble(GstByteStream2 *bs, guint32 len) gboolean gst_bytestream2_flush(GstByteStream2 *bs, guint32 len) { - GSList *walk; + GstBuffer *headbuf; - g_print("flush: flushing %d bytes\n",len); + bs_print("flush: flushing %d bytes\n",len); - // if the flush is totally in the curbuf, we can just trim those bytes - // note that if len == curbufavail, this doesn't trigger because we must refill curbuf - if (len < bs->curbufavail) { - g_print("trimming %d bytes from curbuf[avail]\n",len); - bs->curbufavail -= len; - - // otherwise we have to flush at least one full buffer - } else { - // we can unref the curbuf and trim that many bytes off - gst_buffer_unref(bs->curbuf); - len -= bs->curbufavail; - g_print("unreffed curbuf, leaving %d bytes still to flush \n",len); - - // repeat until we've flushed enough data - walk = bs->buflist; - do { - g_print("flush: there are %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail); - // if the list is empty, so is curbuf - if (bs->buflist == NULL) { - g_print("buffer list is totally empty, pulling a new buffer\n"); - gst_bytestream2_get_next_buf(bs); - // else we can move a buffer down into curbuf - } else { - g_print("still some buffers in the list, retrieving from there\n"); - // retrieve the next buffer - bs->curbuf = GST_BUFFER(bs->buflist->data); - bs->curbufavail = GST_BUFFER_SIZE(bs->curbuf); - // pull it off the list - bs->buflist = g_slist_delete_link(bs->buflist,bs->buflist); - bs->listavail -= GST_BUFFER_SIZE(bs->curbuf); - } - g_print("next buffer in list is at offset %d, is %d bytes long\n",GST_BUFFER_OFFSET(bs->curbuf), -GST_BUFFER_SIZE(bs->curbuf)); - - // figure out how much of it (if any) is left - if (len < GST_BUFFER_SIZE(bs->curbuf)) { - g_print("removing first %d bytes from the new curbuf\n",len); - // the buffer is bigger than the remaining bytes to be flushed - bs->curbufavail = GST_BUFFER_SIZE(bs->curbuf) - len; - len = 0; - } else { - g_print("buffer is totally contained in flush region, unreffing\n"); - // the buffer is only part of the total, unref it - len -= GST_BUFFER_SIZE(bs->curbuf); - gst_buffer_unref(bs->curbuf); - bs->curbuf = NULL; - bs->curbufavail = 0; - } - } while ((len > 0) || (bs->curbuf == NULL)); + // make sure we have enough + bs_print("flush: there are %d bytes in the list\n",bs->listavail); + if (len > bs->listavail) { + gst_bytestream2_fill_bytes(bs,len); + bs_print("flush: there are now %d bytes in the list\n",bs->listavail); } -} + + // repeat until we've flushed enough data + while (len > 0) { + headbuf = GST_BUFFER(bs->buflist->data); + + bs_print("flush: analyzing buffer that's %d bytes long, offset %d\n",GST_BUFFER_SIZE(headbuf),GST_BUFFER_OFFSET(headbuf)); + + // if there's enough to complete the flush + if (bs->headbufavail > len) { + // just trim it off + bs_print("flush: trimming %d bytes off end of headbuf\n",len); + bs->headbufavail -= len; + bs->listavail -= len; + len = 0; + + // otherwise we have to trim the whole buffer + } else { + bs_print("flush: removing head buffer completely\n"); + // remove it from the list + bs->buflist = g_slist_delete_link(bs->buflist,bs->buflist); + // trim it from the avail size + bs->listavail -= bs->headbufavail; + // record that we've trimmed this many bytes + len -= bs->headbufavail; + // unref it + gst_buffer_unref(headbuf); + + // record the new headbufavail + if (bs->buflist) { + bs->headbufavail = GST_BUFFER_SIZE(GST_BUFFER(bs->buflist->data)); + bs_print("flush: next headbuf is %d bytes\n",bs->headbufavail); + } else { + bs_print("flush: no more bytes at all\n"); + } + } + + bs_print("flush: bottom of while(), len is now %d\n",len); + } +} GstBuffer * gst_bytestream2_read(GstByteStream2 *bs, guint32 len) @@ -270,19 +272,18 @@ gst_bytestream2_read(GstByteStream2 *bs, guint32 len) return buf; } -static void +void gst_bytestream2_print_status(GstByteStream2 *bs) { GSList *walk; GstBuffer *buf; - g_print("flush pointer is at %d\n",bs->flushptr); - - g_print("list has %d bytes available\n",bs->listavail); + bs_print("STATUS: head buffer has %d bytes available\n",bs->headbufavail); + bs_print("STATUS: list has %d bytes available\n",bs->listavail); walk = bs->buflist; while (walk) { buf = GST_BUFFER(walk->data); walk = g_slist_next(walk); - g_print("buffer starts at %d and is %d bytes long\n",GST_BUFFER_OFFSET(buf),GST_BUFFER_SIZE(buf)); + bs_print("STATUS: buffer starts at %d and is %d bytes long\n",GST_BUFFER_OFFSET(buf),GST_BUFFER_SIZE(buf)); } } diff --git a/libs/bytestream/gstbytestream2.h b/libs/bytestream/gstbytestream2.h index cb32e3ef5e..11fcaa135d 100644 --- a/libs/bytestream/gstbytestream2.h +++ b/libs/bytestream/gstbytestream2.h @@ -12,15 +12,8 @@ typedef struct _GstByteStream2 GstByteStream2; struct _GstByteStream2 { GstPad *pad; - guint64 readptr; - guint64 flushptr; - guint64 size; - - GstBuffer *curbuf; - guint32 curbufavail; - GSList *buflist; - gint listcount; + guint32 headbufavail; guint32 listavail; }; @@ -30,5 +23,6 @@ GstBuffer * gst_bytestream2_read (GstByteStream2 *bs, guint32 len); GstBuffer * gst_bytestream2_peek (GstByteStream2 *bs, guint32 len); gboolean gst_bytestream2_flush (GstByteStream2 *bs, guint32 len); guint8 * gst_bytestream2_peek_bytes (GstByteStream2 *bs, guint32 len); +void gst_bytestream2_print_status(GstByteStream2 *bs); #endif /* __GST_BYTESTREAM2_H__ */