From d2ed932d25b83d907f5728ffcd5c55aaddad925b Mon Sep 17 00:00:00 2001 From: Erik Walthinsen Date: Thu, 20 Sep 2001 20:48:49 +0000 Subject: [PATCH] Totally new bytestream code, works well for non-spannable buffers, there's some refcounting problem still when used w... Original commit message from CVS: Totally new bytestream code, works well for non-spannable buffers, there's some refcounting problem still when used with filesrc (spannable buffers). To test, set the bytesperread or blocksize parameter of disksrc or filesrc (resp), attach it to gstbstest (in this dir) with byte_size and count parameters set, and pipe to fakesink or something. gstbstest will exit(1) after count buffers have been read from the bytestream. disksrc location= bytesperread= ! gstbstest byte_size= count= ! fakesink --- libs/bytestream/Makefile.am | 9 +- libs/bytestream/gstbstest.c | 274 +++++++++++++++++++++++++++++ libs/bytestream/gstbytestream2.c | 288 +++++++++++++++++++++++++++++++ libs/bytestream/gstbytestream2.h | 34 ++++ 4 files changed, 601 insertions(+), 4 deletions(-) create mode 100644 libs/bytestream/gstbstest.c create mode 100644 libs/bytestream/gstbytestream2.c create mode 100644 libs/bytestream/gstbytestream2.h diff --git a/libs/bytestream/Makefile.am b/libs/bytestream/Makefile.am index a858c91a49..9eb695bc33 100644 --- a/libs/bytestream/Makefile.am +++ b/libs/bytestream/Makefile.am @@ -1,10 +1,11 @@ filterdir = $(libdir)/gst -filter_LTLIBRARIES = libgstbytestream.la +filter_LTLIBRARIES = libgstbytestream.la libgstbstest.la -libgstbytestream_la_SOURCES = gstbytestream.c +libgstbytestream_la_SOURCES = gstbytestream.c gstbytestream2.c +libgstbstest_la_SOURCES = gstbstest.c libgstbytestreamincludedir = $(includedir)/gst/libs/bytestream -libgstbytestreaminclude_HEADERS = gstbytestream.h +libgstbytestreaminclude_HEADERS = gstbytestream.h gstbytestream2.h -CFLAGS += -O2 $(FOMIT_FRAME_POINTER) -funroll-all-loops -finline-functions -ffast-math +# CFLAGS += -O2 $(FOMIT_FRAME_POINTER) -funroll-all-loops -finline-functions -ffast-math diff --git a/libs/bytestream/gstbstest.c b/libs/bytestream/gstbstest.c new file mode 100644 index 0000000000..7d7ce44322 --- /dev/null +++ b/libs/bytestream/gstbstest.c @@ -0,0 +1,274 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen + * 2000 Wim Taymans + * + * gstidentity.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include +#include "gstbytestream2.h" + +#define GST_TYPE_IDENTITY \ + (gst_identity_get_type()) +#define GST_IDENTITY(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IDENTITY,GstIdentity)) +#define GST_IDENTITY_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IDENTITY,GstIdentityClass)) +#define GST_IS_IDENTITY(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IDENTITY)) +#define GST_IS_IDENTITY_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IDENTITY)) + +typedef struct _GstIdentity GstIdentity; +typedef struct _GstIdentityClass GstIdentityClass; + +struct _GstIdentity { + GstElement element; + + GstPad *sinkpad; + GstPad *srcpad; + + GstByteStream2 *bs; + gint byte_size; + gint count; +}; + +struct _GstIdentityClass { + GstElementClass parent_class; +}; + +GType gst_identity_get_type(void); + + +GstElementDetails gst_identity_details = { + "ByteStream2Test", + "Filter", + "Test for the GstByteStream2 code", + VERSION, + "Erik Walthinsen ", + "(C) 2001", +}; + + +/* Identity signals and args */ +enum { + /* FILL ME */ + LAST_SIGNAL +}; + +enum { + ARG_0, + ARG_BYTE_SIZE, + ARG_COUNT, +}; + + +static void gst_identity_class_init (GstIdentityClass *klass); +static void gst_identity_init (GstIdentity *identity); + +static void gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); +static void gst_identity_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); + +static void gst_identity_loop (GstElement *element); + +static GstElementClass *parent_class = NULL; +// static guint gst_identity_signals[LAST_SIGNAL] = { 0 }; + +GType +gst_identity_get_type (void) +{ + static GType identity_type = 0; + + if (!identity_type) { + static const GTypeInfo identity_info = { + sizeof(GstIdentityClass), NULL, + NULL, + (GClassInitFunc)gst_identity_class_init, + NULL, + NULL, + sizeof(GstIdentity), + 0, + (GInstanceInitFunc)gst_identity_init, + }; + identity_type = g_type_register_static (GST_TYPE_ELEMENT, "GstBSTest", &identity_info, 0); + } + return identity_type; +} + +static void +gst_identity_class_init (GstIdentityClass *klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass*)klass; + + parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTE_SIZE, + g_param_spec_uint ("byte_size", "byte_size", "byte_size", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_COUNT, + g_param_spec_uint ("count", "count", "count", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_identity_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_identity_get_property); +} + +static GstPadNegotiateReturn +gst_identity_negotiate_src (GstPad *pad, GstCaps **caps, gpointer *data) +{ + GstIdentity *identity; + + identity = GST_IDENTITY (gst_pad_get_parent (pad)); + + return gst_pad_negotiate_proxy (pad, identity->sinkpad, caps); +} + +static GstPadNegotiateReturn +gst_identity_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data) +{ + GstIdentity *identity; + + identity = GST_IDENTITY (gst_pad_get_parent (pad)); + + return gst_pad_negotiate_proxy (pad, identity->srcpad, caps); +} + +static void +gst_identity_init (GstIdentity *identity) +{ + identity->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); + gst_element_add_pad (GST_ELEMENT (identity), identity->sinkpad); + gst_pad_set_negotiate_function (identity->sinkpad, gst_identity_negotiate_sink); + + identity->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_element_add_pad (GST_ELEMENT (identity), identity->srcpad); + gst_pad_set_negotiate_function (identity->srcpad, gst_identity_negotiate_src); + + gst_element_set_loop_function (GST_ELEMENT (identity), gst_identity_loop); + + identity->byte_size = 384; + identity->count = 5; + + identity->bs = gst_bytestream2_new(identity->sinkpad); +} + +static void +gst_identity_loop (GstElement *element) +{ + GstIdentity *identity; + GstBuffer *buf; + int i; + + g_return_if_fail (element != NULL); + g_return_if_fail (GST_IS_IDENTITY (element)); + + identity = GST_IDENTITY (element); + + do { + g_print("\n"); + + for (i=0;icount;i++) { + g_print("bstest: getting a buffer of %d bytes\n",identity->byte_size); + buf = gst_bytestream2_read(identity->bs,identity->byte_size); + if (!buf) g_print("BUFFER IS BOGUS\n"); + g_print("pushing the buffer, %d bytes at %d\n",GST_BUFFER_SIZE(buf),GST_BUFFER_OFFSET(buf)); + gst_pad_push(identity->srcpad,buf); + g_print("\n"); + } + + exit(1); + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); +} + +static void +gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) +{ + GstIdentity *identity; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail (GST_IS_IDENTITY (object)); + + identity = GST_IDENTITY (object); + + switch (prop_id) { + case ARG_BYTE_SIZE: + identity->byte_size = g_value_get_uint (value); + break; + case ARG_COUNT: + identity->count = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void gst_identity_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { + GstIdentity *identity; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail (GST_IS_IDENTITY (object)); + + identity = GST_IDENTITY (object); + + switch (prop_id) { + case ARG_BYTE_SIZE: + g_value_set_uint (value, identity->byte_size); + break; + case ARG_COUNT: + g_value_set_uint (value, identity->count); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +plugin_init (GModule *module, GstPlugin *plugin) +{ + GstElementFactory *factory; + + // we need gstbytestream + if (!gst_library_load ("gstbytestream")) { + g_print("can't load bytestream\n"); + return FALSE; + } + + /* We need to create an ElementFactory for each element we provide. + * This consists of the name of the element, the GType identifier, + * and a pointer to the details structure at the top of the file. + */ + factory = gst_elementfactory_new("gstbstest", GST_TYPE_IDENTITY, &gst_identity_details); + g_return_val_if_fail(factory != NULL, FALSE); + + /* The very last thing is to register the elementfactory with the plugin. */ + gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory)); + + return TRUE; +} + +GstPluginDesc plugin_desc = { + GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "gstbstest", + plugin_init +}; + diff --git a/libs/bytestream/gstbytestream2.c b/libs/bytestream/gstbytestream2.c new file mode 100644 index 0000000000..13bab3cd66 --- /dev/null +++ b/libs/bytestream/gstbytestream2.c @@ -0,0 +1,288 @@ +#include +#include +#include + +#include +#include "gstbytestream2.h" + +static void gst_bytestream2_print_status(GstByteStream2 *bs); +guint8 *gst_bytestream2_assemble(GstByteStream2 *bs, guint32 len); + +/** + * gst_bytestream_new: + * @pad: the pad to attach the bytestream to + * + * creates a bytestream from the given pad + * + * Returns: a new #GstByteStream object + */ +GstByteStream2* +gst_bytestream2_new (GstPad *pad) +{ + GstByteStream2 *bs = g_new (GstByteStream2, 1); + + bs->pad = pad; + bs->flushptr = 0LL; + bs->size = 0LL; + + bs->curbuf = NULL; + bs->curbufavail = 0; + + bs->buflist = NULL; + bs->listcount = 0; + bs->listavail = 0; + + return bs; +} + +// 0 ..... ---------|----|---.......---|----------- ..... N +// f +// ^tail ^head +// cba +// \------la-------/ +// \ ..... -----------size-------------/ + +// get the next buffer +// if the buffer can be merged with the head buffer, do so +// else add it onto the head of the +static gboolean +gst_bytestream2_get_next_buf(GstByteStream2 *bs) { + GstBuffer *nextbuf, *lastbuf; + GSList *end; + + g_print("get_next_buf: pulling buffer\n"); + nextbuf = gst_pad_pull(bs->pad); + g_print("get_next_buf: got buffer of %d bytes\n",GST_BUFFER_SIZE(nextbuf)); + + // first check to see if there's a curbuf + if (bs->curbuf == NULL) { + g_print("get_next_buf: no curbuf, filling\n"); + // there isn't, let's fill it + bs->curbuf = nextbuf; + bs->curbufavail = GST_BUFFER_SIZE(nextbuf); + + } else { + // there is, first check to see if there's a list of buffers at all + if (bs->buflist) { + g_print("gst_next_buf: there's a buflist, search for the end\n"); + // now find the end of the list + end = g_slist_last(bs->buflist); + // get the buffer that's there + lastbuf = GST_BUFFER(end->data); + + // see if we can marge cheaply + if (gst_buffer_is_span_fast(lastbuf,nextbuf)) { + g_print("get_next_buf: merging new buffer with last buf on list\n"); + // it is, let's merge them + end->data = gst_buffer_merge(lastbuf,nextbuf); + // add to the length of the list, but not buffer count + bs->listavail += GST_BUFFER_SIZE(nextbuf); + // we can ditch the nextbuf then + gst_buffer_unref(nextbuf); + + // if we can't, we just append this buffer + } else { + g_print("get_next_buf: adding new buffer to the end of the list\n"); + end = g_slist_append(end,nextbuf); + // also need to increment length of list and buffer count + bs->listcount++; + bs->listavail += GST_BUFFER_SIZE(nextbuf); + } + + // if there are no buffers in the list + } else { + g_print("get_next_buf: buflist is empty\n"); + // first see if we can merge with curbuf + if (gst_buffer_is_span_fast(bs->curbuf,nextbuf)) { + g_print("get_next_buf: merging new buffer with curbuf\n"); + // it is, merge them + bs->curbuf = gst_buffer_merge(bs->curbuf,nextbuf); + // add to the length of curbuf that's available + bs->curbufavail += GST_BUFFER_SIZE(nextbuf); + // we can unref nextbuf now + gst_buffer_unref(nextbuf); + + // instead we tack this onto the (empty) list + } else { + g_print("get_next_buf: adding new buffer to list\n"); + // put this on the end of the list + bs->buflist = g_slist_append(bs->buflist,nextbuf); + // and increment the number of bytes in the list + bs->listcount++; + bs->listavail += GST_BUFFER_SIZE(nextbuf); + } + } + } + + return TRUE; +} + + +static gboolean +gst_bytestream2_fill_bytes(GstByteStream2 *bs, guint32 len) { +// GSList *walk; +// GstBuffer *buf; + + // as long as we don't have enough, we get more buffers + while ((bs->curbufavail + bs->listavail) < len) { + g_print("fill_bytes: there are %d bytes in curbuf and %d in the list, we need %d\n",bs->curbufavail,bs->listavail,len); + gst_bytestream2_get_next_buf(bs); + } + + return TRUE; +} + + +GstBuffer * +gst_bytestream2_peek (GstByteStream2 *bs, guint32 len) { + GstBuffer *retbuf = NULL; + + g_return_val_if_fail(bs != NULL, NULL); + g_return_val_if_fail(len > 0, NULL); + + g_print("peek: asking for %d bytes\n",len); + + // make sure we have enough + g_print("peek: there are %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail); + if (len > bs->listavail) { + gst_bytestream2_fill_bytes(bs,len); + g_print("peek: there are now %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail); + } + + // if the requested bytes are in the current buffer + g_print("peek: curbufavail is %d\n",bs->curbufavail); + if (len <= bs->curbufavail) { + g_print("peek: there are enough bytes in curbuf (need %d, have %d)\n",len,bs->curbufavail); + // create a sub-buffer of the curbuf + retbuf = gst_buffer_create_sub(bs->curbuf, GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail, len); + + // otherwise we need to figure out how to assemble one + } else { + g_print("peek: current buffer is not big enough for len %d\n",len); + + retbuf = gst_buffer_new(); + GST_BUFFER_SIZE(retbuf) = len; + GST_BUFFER_DATA(retbuf) = gst_bytestream2_assemble(bs,len); + GST_BUFFER_OFFSET(retbuf) = GST_BUFFER_OFFSET(bs->curbuf) + (GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail); + } + + return retbuf; +} + +guint8 * +gst_bytestream2_assemble(GstByteStream2 *bs, guint32 len) +{ + guint8 *data = g_malloc(len); + GSList *walk; + guint32 copied = 0; + GstBuffer *buf; + + // copy the data from the curbuf + g_print("copying %d bytes from curbuf at %d to *data\n",bs->curbufavail, + GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail); + memcpy(data,GST_BUFFER_DATA(bs->curbuf) + GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail, + bs->curbufavail); + copied += bs->curbufavail; + + // asumption is made that the buffers all exist in the list + walk = bs->buflist; + while (copied < len) { + buf = GST_BUFFER(walk->data); + if (GST_BUFFER_SIZE(buf) < (len-copied)) { + g_print("coping %d bytes from buf to output offset %d\n",GST_BUFFER_SIZE(buf),copied); + memcpy(data+copied,GST_BUFFER_DATA(buf),GST_BUFFER_SIZE(buf)); + copied += GST_BUFFER_SIZE(buf); + } else { + g_print("coping %d bytes from buf to output offset %d\n",len-copied,copied); + memcpy(data+copied,GST_BUFFER_DATA(buf),len-copied); + copied = len; + } + walk = g_slist_next(walk); + } + + return data; +} + +gboolean +gst_bytestream2_flush(GstByteStream2 *bs, guint32 len) +{ + GSList *walk; + + g_print("flush: flushing %d bytes\n",len); + + // if the flush is totally in the curbuf, we can just trim those bytes + // note that if len == curbufavail, this doesn't trigger because we must refill curbuf + if (len < bs->curbufavail) { + g_print("trimming %d bytes from curbuf[avail]\n",len); + bs->curbufavail -= len; + + // otherwise we have to flush at least one full buffer + } else { + // we can unref the curbuf and trim that many bytes off + gst_buffer_unref(bs->curbuf); + len -= bs->curbufavail; + g_print("unreffed curbuf, leaving %d bytes still to flush \n",len); + + // repeat until we've flushed enough data + walk = bs->buflist; + do { + g_print("flush: there are %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail); + // if the list is empty, so is curbuf + if (bs->buflist == NULL) { + g_print("buffer list is totally empty, pulling a new buffer\n"); + gst_bytestream2_get_next_buf(bs); + // else we can move a buffer down into curbuf + } else { + g_print("still some buffers in the list, retrieving from there\n"); + // retrieve the next buffer + bs->curbuf = GST_BUFFER(bs->buflist->data); + bs->curbufavail = GST_BUFFER_SIZE(bs->curbuf); + // pull it off the list + bs->buflist = g_slist_delete_link(bs->buflist,bs->buflist); + bs->listavail -= GST_BUFFER_SIZE(bs->curbuf); + } + g_print("next buffer in list is at offset %d, is %d bytes long\n",GST_BUFFER_OFFSET(bs->curbuf), +GST_BUFFER_SIZE(bs->curbuf)); + + // figure out how much of it (if any) is left + if (len < GST_BUFFER_SIZE(bs->curbuf)) { + g_print("removing first %d bytes from the new curbuf\n",len); + // the buffer is bigger than the remaining bytes to be flushed + bs->curbufavail = GST_BUFFER_SIZE(bs->curbuf) - len; + len = 0; + } else { + g_print("buffer is totally contained in flush region, unreffing\n"); + // the buffer is only part of the total, unref it + len -= GST_BUFFER_SIZE(bs->curbuf); + gst_buffer_unref(bs->curbuf); + bs->curbuf = NULL; + bs->curbufavail = 0; + } + } while ((len > 0) || (bs->curbuf == NULL)); + } +} + +GstBuffer * +gst_bytestream2_read(GstByteStream2 *bs, guint32 len) +{ + GstBuffer *buf = gst_bytestream2_peek(bs,len); + gst_bytestream2_flush(bs,len); + return buf; +} + +static void +gst_bytestream2_print_status(GstByteStream2 *bs) { + GSList *walk; + GstBuffer *buf; + + g_print("flush pointer is at %d\n",bs->flushptr); + + g_print("list has %d bytes available\n",bs->listavail); + walk = bs->buflist; + while (walk) { + buf = GST_BUFFER(walk->data); + walk = g_slist_next(walk); + + g_print("buffer starts at %d and is %d bytes long\n",GST_BUFFER_OFFSET(buf),GST_BUFFER_SIZE(buf)); + } +} diff --git a/libs/bytestream/gstbytestream2.h b/libs/bytestream/gstbytestream2.h new file mode 100644 index 0000000000..cb32e3ef5e --- /dev/null +++ b/libs/bytestream/gstbytestream2.h @@ -0,0 +1,34 @@ +#ifndef __GST_BYTESTREAM2_H__ +#define __GST_BYTESTREAM2_H__ + +#include + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +typedef struct _GstByteStream2 GstByteStream2; + +struct _GstByteStream2 { + GstPad *pad; + + guint64 readptr; + guint64 flushptr; + guint64 size; + + GstBuffer *curbuf; + guint32 curbufavail; + + GSList *buflist; + gint listcount; + guint32 listavail; +}; + +GstByteStream2 * gst_bytestream2_new (GstPad *pad); + +GstBuffer * gst_bytestream2_read (GstByteStream2 *bs, guint32 len); +GstBuffer * gst_bytestream2_peek (GstByteStream2 *bs, guint32 len); +gboolean gst_bytestream2_flush (GstByteStream2 *bs, guint32 len); +guint8 * gst_bytestream2_peek_bytes (GstByteStream2 *bs, guint32 len); + +#endif /* __GST_BYTESTREAM2_H__ */