Totally new bytestream code, works well for non-spannable buffers, there's some refcounting problem still when used w...

Original commit message from CVS:
Totally new bytestream code, works well for non-spannable buffers, there's
some refcounting problem still when used with filesrc (spannable buffers).
To test, set the bytesperread or blocksize parameter of disksrc or filesrc
(resp), attach it to gstbstest (in this dir) with byte_size and count
parameters set, and pipe to fakesink or something.  gstbstest will exit(1)
after count buffers have been read from the bytestream.

disksrc location= bytesperread= ! gstbstest byte_size= count= ! fakesink
This commit is contained in:
Erik Walthinsen 2001-09-20 20:48:49 +00:00
parent d1ba7c212d
commit d2ed932d25
4 changed files with 601 additions and 4 deletions

View file

@ -1,10 +1,11 @@
filterdir = $(libdir)/gst
filter_LTLIBRARIES = libgstbytestream.la
filter_LTLIBRARIES = libgstbytestream.la libgstbstest.la
libgstbytestream_la_SOURCES = gstbytestream.c
libgstbytestream_la_SOURCES = gstbytestream.c gstbytestream2.c
libgstbstest_la_SOURCES = gstbstest.c
libgstbytestreamincludedir = $(includedir)/gst/libs/bytestream
libgstbytestreaminclude_HEADERS = gstbytestream.h
libgstbytestreaminclude_HEADERS = gstbytestream.h gstbytestream2.h
CFLAGS += -O2 $(FOMIT_FRAME_POINTER) -funroll-all-loops -finline-functions -ffast-math
# CFLAGS += -O2 $(FOMIT_FRAME_POINTER) -funroll-all-loops -finline-functions -ffast-math

274
libs/bytestream/gstbstest.c Normal file
View file

@ -0,0 +1,274 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
* gstidentity.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include <gst/gst.h>
#include "gstbytestream2.h"
#define GST_TYPE_IDENTITY \
(gst_identity_get_type())
#define GST_IDENTITY(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IDENTITY,GstIdentity))
#define GST_IDENTITY_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IDENTITY,GstIdentityClass))
#define GST_IS_IDENTITY(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IDENTITY))
#define GST_IS_IDENTITY_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IDENTITY))
typedef struct _GstIdentity GstIdentity;
typedef struct _GstIdentityClass GstIdentityClass;
struct _GstIdentity {
GstElement element;
GstPad *sinkpad;
GstPad *srcpad;
GstByteStream2 *bs;
gint byte_size;
gint count;
};
struct _GstIdentityClass {
GstElementClass parent_class;
};
GType gst_identity_get_type(void);
GstElementDetails gst_identity_details = {
"ByteStream2Test",
"Filter",
"Test for the GstByteStream2 code",
VERSION,
"Erik Walthinsen <omega@temple-baptist.com>",
"(C) 2001",
};
/* Identity signals and args */
enum {
/* FILL ME */
LAST_SIGNAL
};
enum {
ARG_0,
ARG_BYTE_SIZE,
ARG_COUNT,
};
static void gst_identity_class_init (GstIdentityClass *klass);
static void gst_identity_init (GstIdentity *identity);
static void gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_identity_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static void gst_identity_loop (GstElement *element);
static GstElementClass *parent_class = NULL;
// static guint gst_identity_signals[LAST_SIGNAL] = { 0 };
GType
gst_identity_get_type (void)
{
static GType identity_type = 0;
if (!identity_type) {
static const GTypeInfo identity_info = {
sizeof(GstIdentityClass), NULL,
NULL,
(GClassInitFunc)gst_identity_class_init,
NULL,
NULL,
sizeof(GstIdentity),
0,
(GInstanceInitFunc)gst_identity_init,
};
identity_type = g_type_register_static (GST_TYPE_ELEMENT, "GstBSTest", &identity_info, 0);
}
return identity_type;
}
static void
gst_identity_class_init (GstIdentityClass *klass)
{
GObjectClass *gobject_class;
gobject_class = (GObjectClass*)klass;
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTE_SIZE,
g_param_spec_uint ("byte_size", "byte_size", "byte_size",
0, G_MAXUINT, 0, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_COUNT,
g_param_spec_uint ("count", "count", "count",
0, G_MAXUINT, 0, G_PARAM_READWRITE));
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_identity_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_identity_get_property);
}
static GstPadNegotiateReturn
gst_identity_negotiate_src (GstPad *pad, GstCaps **caps, gpointer *data)
{
GstIdentity *identity;
identity = GST_IDENTITY (gst_pad_get_parent (pad));
return gst_pad_negotiate_proxy (pad, identity->sinkpad, caps);
}
static GstPadNegotiateReturn
gst_identity_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data)
{
GstIdentity *identity;
identity = GST_IDENTITY (gst_pad_get_parent (pad));
return gst_pad_negotiate_proxy (pad, identity->srcpad, caps);
}
static void
gst_identity_init (GstIdentity *identity)
{
identity->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (identity), identity->sinkpad);
gst_pad_set_negotiate_function (identity->sinkpad, gst_identity_negotiate_sink);
identity->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_element_add_pad (GST_ELEMENT (identity), identity->srcpad);
gst_pad_set_negotiate_function (identity->srcpad, gst_identity_negotiate_src);
gst_element_set_loop_function (GST_ELEMENT (identity), gst_identity_loop);
identity->byte_size = 384;
identity->count = 5;
identity->bs = gst_bytestream2_new(identity->sinkpad);
}
static void
gst_identity_loop (GstElement *element)
{
GstIdentity *identity;
GstBuffer *buf;
int i;
g_return_if_fail (element != NULL);
g_return_if_fail (GST_IS_IDENTITY (element));
identity = GST_IDENTITY (element);
do {
g_print("\n");
for (i=0;i<identity->count;i++) {
g_print("bstest: getting a buffer of %d bytes\n",identity->byte_size);
buf = gst_bytestream2_read(identity->bs,identity->byte_size);
if (!buf) g_print("BUFFER IS BOGUS\n");
g_print("pushing the buffer, %d bytes at %d\n",GST_BUFFER_SIZE(buf),GST_BUFFER_OFFSET(buf));
gst_pad_push(identity->srcpad,buf);
g_print("\n");
}
exit(1);
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
}
static void
gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
{
GstIdentity *identity;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_IDENTITY (object));
identity = GST_IDENTITY (object);
switch (prop_id) {
case ARG_BYTE_SIZE:
identity->byte_size = g_value_get_uint (value);
break;
case ARG_COUNT:
identity->count = g_value_get_uint (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void gst_identity_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) {
GstIdentity *identity;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_IDENTITY (object));
identity = GST_IDENTITY (object);
switch (prop_id) {
case ARG_BYTE_SIZE:
g_value_set_uint (value, identity->byte_size);
break;
case ARG_COUNT:
g_value_set_uint (value, identity->count);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static gboolean
plugin_init (GModule *module, GstPlugin *plugin)
{
GstElementFactory *factory;
// we need gstbytestream
if (!gst_library_load ("gstbytestream")) {
g_print("can't load bytestream\n");
return FALSE;
}
/* We need to create an ElementFactory for each element we provide.
* This consists of the name of the element, the GType identifier,
* and a pointer to the details structure at the top of the file.
*/
factory = gst_elementfactory_new("gstbstest", GST_TYPE_IDENTITY, &gst_identity_details);
g_return_val_if_fail(factory != NULL, FALSE);
/* The very last thing is to register the elementfactory with the plugin. */
gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
return TRUE;
}
GstPluginDesc plugin_desc = {
GST_VERSION_MAJOR,
GST_VERSION_MINOR,
"gstbstest",
plugin_init
};

View file

@ -0,0 +1,288 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <gst/gstinfo.h>
#include "gstbytestream2.h"
static void gst_bytestream2_print_status(GstByteStream2 *bs);
guint8 *gst_bytestream2_assemble(GstByteStream2 *bs, guint32 len);
/**
* gst_bytestream_new:
* @pad: the pad to attach the bytestream to
*
* creates a bytestream from the given pad
*
* Returns: a new #GstByteStream object
*/
GstByteStream2*
gst_bytestream2_new (GstPad *pad)
{
GstByteStream2 *bs = g_new (GstByteStream2, 1);
bs->pad = pad;
bs->flushptr = 0LL;
bs->size = 0LL;
bs->curbuf = NULL;
bs->curbufavail = 0;
bs->buflist = NULL;
bs->listcount = 0;
bs->listavail = 0;
return bs;
}
// 0 ..... ---------|----|---.......---|----------- ..... N
// f
// ^tail ^head
// cba
// \------la-------/
// \ ..... -----------size-------------/
// get the next buffer
// if the buffer can be merged with the head buffer, do so
// else add it onto the head of the
static gboolean
gst_bytestream2_get_next_buf(GstByteStream2 *bs) {
GstBuffer *nextbuf, *lastbuf;
GSList *end;
g_print("get_next_buf: pulling buffer\n");
nextbuf = gst_pad_pull(bs->pad);
g_print("get_next_buf: got buffer of %d bytes\n",GST_BUFFER_SIZE(nextbuf));
// first check to see if there's a curbuf
if (bs->curbuf == NULL) {
g_print("get_next_buf: no curbuf, filling\n");
// there isn't, let's fill it
bs->curbuf = nextbuf;
bs->curbufavail = GST_BUFFER_SIZE(nextbuf);
} else {
// there is, first check to see if there's a list of buffers at all
if (bs->buflist) {
g_print("gst_next_buf: there's a buflist, search for the end\n");
// now find the end of the list
end = g_slist_last(bs->buflist);
// get the buffer that's there
lastbuf = GST_BUFFER(end->data);
// see if we can marge cheaply
if (gst_buffer_is_span_fast(lastbuf,nextbuf)) {
g_print("get_next_buf: merging new buffer with last buf on list\n");
// it is, let's merge them
end->data = gst_buffer_merge(lastbuf,nextbuf);
// add to the length of the list, but not buffer count
bs->listavail += GST_BUFFER_SIZE(nextbuf);
// we can ditch the nextbuf then
gst_buffer_unref(nextbuf);
// if we can't, we just append this buffer
} else {
g_print("get_next_buf: adding new buffer to the end of the list\n");
end = g_slist_append(end,nextbuf);
// also need to increment length of list and buffer count
bs->listcount++;
bs->listavail += GST_BUFFER_SIZE(nextbuf);
}
// if there are no buffers in the list
} else {
g_print("get_next_buf: buflist is empty\n");
// first see if we can merge with curbuf
if (gst_buffer_is_span_fast(bs->curbuf,nextbuf)) {
g_print("get_next_buf: merging new buffer with curbuf\n");
// it is, merge them
bs->curbuf = gst_buffer_merge(bs->curbuf,nextbuf);
// add to the length of curbuf that's available
bs->curbufavail += GST_BUFFER_SIZE(nextbuf);
// we can unref nextbuf now
gst_buffer_unref(nextbuf);
// instead we tack this onto the (empty) list
} else {
g_print("get_next_buf: adding new buffer to list\n");
// put this on the end of the list
bs->buflist = g_slist_append(bs->buflist,nextbuf);
// and increment the number of bytes in the list
bs->listcount++;
bs->listavail += GST_BUFFER_SIZE(nextbuf);
}
}
}
return TRUE;
}
static gboolean
gst_bytestream2_fill_bytes(GstByteStream2 *bs, guint32 len) {
// GSList *walk;
// GstBuffer *buf;
// as long as we don't have enough, we get more buffers
while ((bs->curbufavail + bs->listavail) < len) {
g_print("fill_bytes: there are %d bytes in curbuf and %d in the list, we need %d\n",bs->curbufavail,bs->listavail,len);
gst_bytestream2_get_next_buf(bs);
}
return TRUE;
}
GstBuffer *
gst_bytestream2_peek (GstByteStream2 *bs, guint32 len) {
GstBuffer *retbuf = NULL;
g_return_val_if_fail(bs != NULL, NULL);
g_return_val_if_fail(len > 0, NULL);
g_print("peek: asking for %d bytes\n",len);
// make sure we have enough
g_print("peek: there are %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail);
if (len > bs->listavail) {
gst_bytestream2_fill_bytes(bs,len);
g_print("peek: there are now %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail);
}
// if the requested bytes are in the current buffer
g_print("peek: curbufavail is %d\n",bs->curbufavail);
if (len <= bs->curbufavail) {
g_print("peek: there are enough bytes in curbuf (need %d, have %d)\n",len,bs->curbufavail);
// create a sub-buffer of the curbuf
retbuf = gst_buffer_create_sub(bs->curbuf, GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail, len);
// otherwise we need to figure out how to assemble one
} else {
g_print("peek: current buffer is not big enough for len %d\n",len);
retbuf = gst_buffer_new();
GST_BUFFER_SIZE(retbuf) = len;
GST_BUFFER_DATA(retbuf) = gst_bytestream2_assemble(bs,len);
GST_BUFFER_OFFSET(retbuf) = GST_BUFFER_OFFSET(bs->curbuf) + (GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail);
}
return retbuf;
}
guint8 *
gst_bytestream2_assemble(GstByteStream2 *bs, guint32 len)
{
guint8 *data = g_malloc(len);
GSList *walk;
guint32 copied = 0;
GstBuffer *buf;
// copy the data from the curbuf
g_print("copying %d bytes from curbuf at %d to *data\n",bs->curbufavail,
GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail);
memcpy(data,GST_BUFFER_DATA(bs->curbuf) + GST_BUFFER_SIZE(bs->curbuf) - bs->curbufavail,
bs->curbufavail);
copied += bs->curbufavail;
// asumption is made that the buffers all exist in the list
walk = bs->buflist;
while (copied < len) {
buf = GST_BUFFER(walk->data);
if (GST_BUFFER_SIZE(buf) < (len-copied)) {
g_print("coping %d bytes from buf to output offset %d\n",GST_BUFFER_SIZE(buf),copied);
memcpy(data+copied,GST_BUFFER_DATA(buf),GST_BUFFER_SIZE(buf));
copied += GST_BUFFER_SIZE(buf);
} else {
g_print("coping %d bytes from buf to output offset %d\n",len-copied,copied);
memcpy(data+copied,GST_BUFFER_DATA(buf),len-copied);
copied = len;
}
walk = g_slist_next(walk);
}
return data;
}
gboolean
gst_bytestream2_flush(GstByteStream2 *bs, guint32 len)
{
GSList *walk;
g_print("flush: flushing %d bytes\n",len);
// if the flush is totally in the curbuf, we can just trim those bytes
// note that if len == curbufavail, this doesn't trigger because we must refill curbuf
if (len < bs->curbufavail) {
g_print("trimming %d bytes from curbuf[avail]\n",len);
bs->curbufavail -= len;
// otherwise we have to flush at least one full buffer
} else {
// we can unref the curbuf and trim that many bytes off
gst_buffer_unref(bs->curbuf);
len -= bs->curbufavail;
g_print("unreffed curbuf, leaving %d bytes still to flush \n",len);
// repeat until we've flushed enough data
walk = bs->buflist;
do {
g_print("flush: there are %d in curbuf and %d in the list\n",bs->curbufavail,bs->listavail);
// if the list is empty, so is curbuf
if (bs->buflist == NULL) {
g_print("buffer list is totally empty, pulling a new buffer\n");
gst_bytestream2_get_next_buf(bs);
// else we can move a buffer down into curbuf
} else {
g_print("still some buffers in the list, retrieving from there\n");
// retrieve the next buffer
bs->curbuf = GST_BUFFER(bs->buflist->data);
bs->curbufavail = GST_BUFFER_SIZE(bs->curbuf);
// pull it off the list
bs->buflist = g_slist_delete_link(bs->buflist,bs->buflist);
bs->listavail -= GST_BUFFER_SIZE(bs->curbuf);
}
g_print("next buffer in list is at offset %d, is %d bytes long\n",GST_BUFFER_OFFSET(bs->curbuf),
GST_BUFFER_SIZE(bs->curbuf));
// figure out how much of it (if any) is left
if (len < GST_BUFFER_SIZE(bs->curbuf)) {
g_print("removing first %d bytes from the new curbuf\n",len);
// the buffer is bigger than the remaining bytes to be flushed
bs->curbufavail = GST_BUFFER_SIZE(bs->curbuf) - len;
len = 0;
} else {
g_print("buffer is totally contained in flush region, unreffing\n");
// the buffer is only part of the total, unref it
len -= GST_BUFFER_SIZE(bs->curbuf);
gst_buffer_unref(bs->curbuf);
bs->curbuf = NULL;
bs->curbufavail = 0;
}
} while ((len > 0) || (bs->curbuf == NULL));
}
}
GstBuffer *
gst_bytestream2_read(GstByteStream2 *bs, guint32 len)
{
GstBuffer *buf = gst_bytestream2_peek(bs,len);
gst_bytestream2_flush(bs,len);
return buf;
}
static void
gst_bytestream2_print_status(GstByteStream2 *bs) {
GSList *walk;
GstBuffer *buf;
g_print("flush pointer is at %d\n",bs->flushptr);
g_print("list has %d bytes available\n",bs->listavail);
walk = bs->buflist;
while (walk) {
buf = GST_BUFFER(walk->data);
walk = g_slist_next(walk);
g_print("buffer starts at %d and is %d bytes long\n",GST_BUFFER_OFFSET(buf),GST_BUFFER_SIZE(buf));
}
}

View file

@ -0,0 +1,34 @@
#ifndef __GST_BYTESTREAM2_H__
#define __GST_BYTESTREAM2_H__
#include <gst/gstpad.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
typedef struct _GstByteStream2 GstByteStream2;
struct _GstByteStream2 {
GstPad *pad;
guint64 readptr;
guint64 flushptr;
guint64 size;
GstBuffer *curbuf;
guint32 curbufavail;
GSList *buflist;
gint listcount;
guint32 listavail;
};
GstByteStream2 * gst_bytestream2_new (GstPad *pad);
GstBuffer * gst_bytestream2_read (GstByteStream2 *bs, guint32 len);
GstBuffer * gst_bytestream2_peek (GstByteStream2 *bs, guint32 len);
gboolean gst_bytestream2_flush (GstByteStream2 *bs, guint32 len);
guint8 * gst_bytestream2_peek_bytes (GstByteStream2 *bs, guint32 len);
#endif /* __GST_BYTESTREAM2_H__ */