gst/gst-types.defs (Buffer): Add some fields

Original commit message from CVS:
* gst/gst-types.defs (Buffer): Add some fields

* gst/gstbuffer.override: Move out from gst.override, add
tp_as_sequence support and reorganize the code

* testsuite/buffer.py (BufferTest): Add new tests
This commit is contained in:
Johan Dahlin 2004-05-04 14:20:28 +00:00
parent a2d17fd974
commit 45c2cf641d
9 changed files with 486 additions and 153 deletions

View file

@ -1,3 +1,12 @@
2004-05-04 Johan Dahlin <johan@gnome.org>
* gst/gst-types.defs (Buffer): Add some fields
* gst/gstbuffer.override: Move out from gst.override, add
tp_as_sequence support and reorganize the code
* testsuite/buffer.py (BufferTest): Add new tests
2004-05-03 Johan Dahlin <johan@gnome.org>
* gst/__init__.py: Remove pygtk import and import gobject

View file

@ -35,7 +35,7 @@ _gst_la_LIBADD = $(common_libadd)
_gst_la_LDFLAGS = $(common_ldflags) -export-symbols-regex init_gst
_gst_la_SOURCES = gst-argtypes.c gstmodule.c
nodist__gst_la_SOURCES = gst.c
GST_OVERRIDES = gst.override gstpad-handlers.override
GST_OVERRIDES = gst.override gstbuffer.override gstpad-handlers.override
GST_DEFS = gst.defs gst-types.defs
CLEANFILES = gst.c
EXTRA_DIST += $(GST_DEFS) $(GST_OVERRIDES)

View file

@ -161,6 +161,12 @@
(gtype-id "GST_TYPE_BUFFER")
(copy-func "gst_data_copy")
(release-func "gst_data_unref")
(fields
'("guint" "size")
'("guint" "maxsize")
'("guint64" "offset")
'("guint64" "offset_end")
)
)
(define-boxed Caps

View file

@ -39,6 +39,7 @@ static PyObject *_wrap_gst_element_factory_make(PyObject *self, PyObject *args,
%%
include
gstbuffer.override
gstpad-handlers.override
%%
init
@ -94,47 +95,6 @@ ignore
gst_tag_list_copy_value
gst_trace_read_tsc
%%
override gst_buffer_get_data
static PyObject*
_wrap_gst_buffer_get_data(PyObject *self)
{
GstBuffer *buf = pyg_boxed_get(self, GstBuffer);
return PyString_FromStringAndSize(GST_BUFFER_DATA(buf),
GST_BUFFER_SIZE(buf));
}
%%
override gst_buffer_set_data kwargs
static PyObject*
_wrap_gst_buffer_set_data(PyObject *self, PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = {"data", NULL};
PyObject *data;
GstBuffer *buf;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:GstBuffer:set_data", kwlist, &data))
{
return NULL;
}
if (!PyString_Check(data)) {
PyErr_SetString(PyExc_TypeError, "data should be a string");
return NULL;
}
buf = pyg_boxed_get(self, GstBuffer);
if (GST_BUFFER_FLAGS(buf) & GST_BUFFER_READONLY) {
PyErr_SetString(PyExc_TypeError, "set_data can't use a READONLY buffer");
return NULL;
}
GST_BUFFER_SIZE(buf) = PyString_Size(data);
GST_BUFFER_DATA(buf) = g_new0(char, GST_BUFFER_SIZE(buf));
memcpy(GST_BUFFER_DATA(buf),
PyString_AsString(data),
PyString_Size(data));
Py_INCREF(Py_None);
return Py_None;
}
%%
override gst_bin_iterate
static PyObject *
_wrap_gst_bin_iterate(PyGObject *self)
@ -464,18 +424,6 @@ _wrap_gst_element_unlink_many(PyObject *self, PyObject *args)
return PyInt_FromLong(1);
}
%%
override-slot GstBuffer.tp_getattr
PyObject *
_wrap_gst_buffer_tp_getattr(PyGObject *self, char *attr)
{
if (!strcmp(attr, "type"))
return pyg_type_wrapper_new(GST_DATA_TYPE(self->obj));
else if (!strcmp(attr, "flags"))
return PyInt_FromLong(GST_DATA_FLAGS(self->obj));
return Py_FindMethod(_PyGstBuffer_methods, (PyObject*)self, attr);
}
%%
override GstPad.get_negotiated_caps
static PyObject *
_wrap_gst_pad_get_negotiated_caps(PyGObject *self)
@ -520,41 +468,6 @@ static PySequenceMethods _wrap_gst_caps_tp_as_sequence = {
NULL,
};
%%
override gst_buffer_new kwargs
static int
_wrap_gst_buffer_new(PyGBoxed *self, PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = { "data", "buffer_size", NULL };
char *data = NULL;
int size;
int buf_size = 4096;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|z#s:GstBuffer.__init__", kwlist,
&data, &size, &buf_size))
return -1;
self->gtype = GST_TYPE_BUFFER;
self->free_on_dealloc = FALSE;
self->boxed = gst_buffer_new(); //_and_alloc(buf_size);
if (!self->boxed) {
PyErr_SetString(PyExc_RuntimeError, "could not create GstBuffer object");
return -1;
}
GST_BUFFER_SIZE (self->boxed) = size;
GST_BUFFER_DATA (self->boxed) = data;
#if 0
if (data)
memcpy(GST_BUFFER_DATA (self->boxed), data, size);
gst_buffer_set_data (self->boxed, data, size);
gst_buffer_ref (GST_BUFFER (self->boxed));
#endif
return 0;
}
%%
override-slot GstCaps.tp_str
static PyObject *
_wrap_gst_caps_tp_str(PyGObject *self)
@ -1078,57 +991,3 @@ _wrap_gst_element_tp_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
{
return _wrap_gst_element_factory_make(NULL, args, kwargs);
}
%%
override-slot GstBuffer.tp_as_buffer
static int
gst_buffer_getreadbuffer(PyGObject *self, int index, const void **ptr)
{
if ( index != 0 ) {
PyErr_SetString(PyExc_SystemError,
"accessing non-existent string segment");
return -1;
}
*ptr = GST_BUFFER_DATA(self->obj);
return GST_BUFFER_SIZE(self->obj);
}
static int
gst_buffer_getsegcount(PyGObject *self, int *lenp)
{
if (lenp)
*lenp = GST_BUFFER_SIZE(self->obj);
return 1;
}
static int
gst_buffer_getcharbuf(PyGObject *self, int index, const char **ptr)
{
if ( index != 0 ) {
PyErr_SetString(PyExc_SystemError,
"accessing non-existent string segment");
return -1;
}
*ptr = GST_BUFFER_DATA(self->obj);
return GST_BUFFER_SIZE(self->obj);
}
#if 0
static int
string_buffer_getwritebuf(PyStringObject *self, int index, const void **ptr)
{
PyErr_SetString(PyExc_TypeError,
"Cannot use string as modifiable buffer");
return -1;
}
#endif
static PyBufferProcs _wrap_gst_buffer_tp_as_buffer = {
(getreadbufferproc)gst_buffer_getreadbuffer,
(getwritebufferproc)NULL,
(getsegcountproc)gst_buffer_getsegcount,
(getcharbufferproc)gst_buffer_getcharbuf,
};

251
gst/gstbuffer.override Normal file
View file

@ -0,0 +1,251 @@
/* -*- Mode: C; ; c-file-style: "python" -*- */
/* gst-python
* Copyright (C) 2002 David I. Lehn
* Copyright (C) 2004 Johan Dahlin
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Johan Dahlin <johan@gnome.org>
*/
%%
headers
static int gst_buffer_getreadbuffer (PyGObject *self,
int index,
const void **ptr);
static int gst_buffer_length (PyGObject *self);
static int gst_buffer_getwritebuf (PyGObject *self,
int index,
const void **ptr);
static int gst_buffer_getsegcount (PyGObject *self,
int *lenp);
static int gst_buffer_getcharbuf (PyGObject *self,
int index,
const char **ptr);
%%
override gst_buffer_new kwargs
static int
_wrap_gst_buffer_new(PyGBoxed *self, PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = { "data", "buffer_size", NULL };
char *data = NULL;
int size;
int buf_size = -1;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|z#i:GstBuffer.__init__", kwlist,
&data, &size, &buf_size))
return -1;
self->gtype = GST_TYPE_BUFFER;
self->free_on_dealloc = FALSE;
if (buf_size != -1)
self->boxed = gst_buffer_new_and_alloc(buf_size);
else
self->boxed = gst_buffer_new();
if (!self->boxed) {
PyErr_SetString(PyExc_RuntimeError, "could not create GstBuffer object");
return -1;
}
if (data == NULL)
return 0;
if (buf_size != -1 && buf_size != size) {
PyErr_Format(PyExc_TypeError, "data must be of length %d, not %d", size, buf_size);
return -1;
}
GST_BUFFER_DATA (self->boxed) = data;
GST_BUFFER_SIZE (self->boxed) = size;
return 0;
}
%%
override gst_buffer_get_data
static PyObject*
_wrap_gst_buffer_get_data(PyObject *self)
{
GstBuffer *buf = pyg_boxed_get(self, GstBuffer);
return PyString_FromStringAndSize(GST_BUFFER_DATA(buf),
GST_BUFFER_SIZE(buf));
}
%%
override gst_buffer_set_data kwargs
static PyObject*
_wrap_gst_buffer_set_data(PyObject *self, PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = {"data", NULL};
PyObject *data;
GstBuffer *buf;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:GstBuffer:set_data", kwlist, &data))
{
return NULL;
}
if (!PyString_Check(data)) {
PyErr_SetString(PyExc_TypeError, "data should be a string");
return NULL;
}
buf = pyg_boxed_get(self, GstBuffer);
if (GST_BUFFER_FLAGS(buf) & GST_BUFFER_READONLY) {
PyErr_SetString(PyExc_TypeError, "set_data can't use a READONLY buffer");
return NULL;
}
GST_BUFFER_SIZE(buf) = PyString_Size(data);
GST_BUFFER_DATA(buf) = g_new0(char, GST_BUFFER_SIZE(buf));
memcpy(GST_BUFFER_DATA(buf),
PyString_AsString(data),
PyString_Size(data));
Py_INCREF(Py_None);
return Py_None;
}
#if 0
override-slot GstBuffer.tp_getattr
PyObject *
_wrap_gst_buffer_tp_getattr(PyGObject *self, char *attr)
{
/* We have some GstData methods since it's not a subclass */
if (!strcmp(attr, "type"))
return pyg_type_wrapper_new(GST_DATA_TYPE(self->obj));
else if (!strcmp(attr, "flags"))
return PyInt_FromLong(GST_DATA_FLAGS(self->obj));
#if 0
else if (!strcmp(attr, "size"))
return PyInt_FromLong(GST_BUFFER_SIZE(self->obj));
#endif
else if (!strcmp(attr, "maxsize"))
return PyInt_FromLong(GST_BUFFER_MAXSIZE(self->obj));
/* XXX: timestamp and duration */
else if (!strcmp(attr, "offset"))
return PyInt_FromLong(GST_BUFFER_OFFSET(self->obj));
else if (!strcmp(attr, "offset_end"))
return PyInt_FromLong(GST_BUFFER_OFFSET_END(self->obj));
return Py_FindMethod(_PyGstBuffer_methods, (PyObject*)self, attr);
}
#endif
%%
override-attr GstBuffer.size
static PyObject *
_wrap_gst_buffer__get_size(PyGObject *self, void *closure)
{
return PyInt_FromLong(GST_BUFFER_SIZE(self->obj));
}
%%
override-attr GstBuffer.maxsize
static PyObject *
_wrap_gst_buffer__get_maxsize(PyGObject *self, void *closure)
{
return PyInt_FromLong(GST_BUFFER_MAXSIZE(self->obj));
}
%%
override-attr GstBuffer.offset
static PyObject *
_wrap_gst_buffer__get_offset(PyGObject *self, void *closure)
{
return PyInt_FromLong(GST_BUFFER_OFFSET(self->obj));
}
%%
override-attr GstBuffer.offset_end
static PyObject *
_wrap_gst_buffer__get_offset_end(PyGObject *self, void *closure)
{
return PyInt_FromLong(GST_BUFFER_OFFSET_END(self->obj));
}
%%
override-slot GstBuffer.tp_str
static PyObject *
_wrap_gst_buffer_tp_str(PyGObject *self)
{
return PyString_FromStringAndSize(GST_BUFFER_DATA(self->obj),
GST_BUFFER_SIZE(self->obj));
}
%%
override-slot GstBuffer.tp_as_buffer
static PyBufferProcs _wrap_gst_buffer_tp_as_buffer = {
(getreadbufferproc)gst_buffer_getreadbuffer, /* bf_getreadbuffer */
(getwritebufferproc)gst_buffer_getwritebuf, /* bf_getwritebuffer */
(getsegcountproc)gst_buffer_getsegcount, /* bf_getsegcount */
(getcharbufferproc)gst_buffer_getcharbuf, /* bf_getcharbuffer */
};
static int
gst_buffer_getreadbuffer(PyGObject *self, int index, const void **ptr)
{
if ( index != 0 ) {
PyErr_SetString(PyExc_SystemError,
"accessing non-existent GstBuffer segment");
return -1;
}
*ptr = GST_BUFFER_DATA(self->obj);
return GST_BUFFER_SIZE(self->obj);
}
static int
gst_buffer_getsegcount(PyGObject *self, int *lenp)
{
if (lenp)
*lenp = GST_BUFFER_SIZE(self->obj);
return 1;
}
static int
gst_buffer_getcharbuf(PyGObject *self, int index, const char **ptr)
{
if ( index != 0 ) {
PyErr_SetString(PyExc_SystemError,
"accessing non-existent GstBuffer segment");
return -1;
}
*ptr = GST_BUFFER_DATA(self->obj);
return GST_BUFFER_SIZE(self->obj);
}
static int
gst_buffer_getwritebuf(PyGObject *self, int index, const void **ptr)
{
PyErr_SetString(PyExc_TypeError,
"Cannot use GstBuffer as modifiable buffer");
return -1;
}
%%
override-slot GstBuffer.tp_as_sequence
static PySequenceMethods _wrap_gst_buffer_tp_as_sequence = {
(inquiry)gst_buffer_length, /* sq_length */
NULL, /* sq_concat */
NULL, /* sq_repeat */
NULL, /* sq_item */
NULL, /* sq_slice */
NULL, /* sq_ass_item */
NULL, /* sq_ass_slice */
NULL, /* sq_contains */
NULL, /* sq_inplace_concat */
NULL, /* sq_inplace_repeat */
};
static int
gst_buffer_length(PyGObject *self)
{
return GST_BUFFER_SIZE(self->obj);
}

View file

@ -2,9 +2,66 @@ import sys
from common import gst, unittest
class BufferTest(unittest.TestCase):
def testBuffer(self):
self.buffer = gst.Buffer('test')
assert str(buffer(self.buffer)) == 'test'
def testBufferBuffer(self):
buf = gst.Buffer('test')
assert str(buffer(buf)) == 'test'
def testBufferStr(self):
buffer = gst.Buffer('test')
assert str(buffer) == 'test'
def testBufferBadConstructor(self):
self.assertRaises(TypeError, gst.Buffer, 'test', 0)
def testBufferStrNull(self):
test_string = 't\0e\0s\0t\0'
buffer = gst.Buffer(test_string)
assert str(buffer) == test_string
def testBufferSize(self):
test_string = 'a little string'
buffer = gst.Buffer(test_string)
assert len(buffer) == len(test_string)
assert hasattr(buffer, 'size')
assert buffer.size == len(buffer)
def testBufferMaxSize(self):
buffer = gst.Buffer(buffer_size=16)
assert hasattr(buffer, 'maxsize')
assert buffer.maxsize == 16
def testBufferCreateSub(self):
s = ''
for i in range(64):
s += '%02d' % i
buffer = gst.Buffer(s)
assert len(buffer) == 128
sub = buffer.create_sub(16, 16)
assert sub.maxsize == 16
assert sub.offset == -1, sub.offset
def testBufferMerge(self):
buffer1 = gst.Buffer('foo')
buffer2 = gst.Buffer('bar')
merged_buffer = buffer1.merge(buffer2)
assert str(merged_buffer) == 'foobar'
def testBufferJoin(self):
buffer1 = gst.Buffer('foo')
buffer2 = gst.Buffer('bar')
joined_buffer = buffer1.merge(buffer2)
assert str(joined_buffer) == 'foobar'
def testBufferSpan(self):
buffer1 = gst.Buffer('foo')
buffer2 = gst.Buffer('bar')
spaned_buffer = buffer1.span(0L, buffer2, 6L)
assert str(spaned_buffer) == 'foobar'
if __name__ == "__main__":
unittest.main()

View file

@ -18,6 +18,12 @@ class ElementTest(unittest.TestCase):
assert element is not None, 'element is None'
assert isinstance(element, gst.Element)
assert element.get_name() == self.alias
def testGoodConstructor2(self):
element = gst.element_factory_make(self.name, self.alias)
assert element is not None, 'element is None'
assert isinstance(element, gst.Element)
assert element.get_name() == self.alias
class FakeSinkTest(ElementTest):
FAKESINK_STATE_ERROR_NONE = "0"
@ -75,14 +81,55 @@ class FakeSinkTest(ElementTest):
def testStateErrorReadyNull(self):
self.checkError(gst.STATE_READY, gst.STATE_NULL,
self.FAKESINK_STATE_ERROR_READY_NULL)
def checkStateChange(self, old, new):
def state_change_cb(element, old_s, new_s):
assert isinstance(element, gst.Element)
assert element == self.element
assert old_s == old
assert new_s == new
assert self.element.set_state(old)
assert self.element.get_state() == old
self.element.connect('state-change', state_change_cb)
assert self.element.set_state(new)
assert self.element.get_state() == new
def testStateChangeNullReady(self):
self.checkStateChange(gst.STATE_NULL, gst.STATE_READY)
def testStateChangeReadyPaused(self):
self.checkStateChange(gst.STATE_READY, gst.STATE_PAUSED)
def testStateChangePausedPlaying(self):
self.checkStateChange(gst.STATE_PAUSED, gst.STATE_PLAYING)
def testStateChangePlayingPaused(self):
self.checkStateChange(gst.STATE_PLAYING, gst.STATE_PAUSED)
def testStateChangePausedReady(self):
self.checkStateChange(gst.STATE_PAUSED, gst.STATE_READY)
def testStateChangeReadyNull(self):
self.checkStateChange(gst.STATE_READY, gst.STATE_NULL)
class NonExistentTest(ElementTest):
name = 'this-element-does-not-exist'
alias = 'no-alias'
def testGoodConstructor(self):
pass
testGoodConstructor = lambda s: None
testGoodConstructor2 = lambda s: None
class FileSrcTest(ElementTest):
name = 'filesrc'
alias = 'source'
class FileSinkTest(ElementTest):
name = 'filesink'
alias = 'sink'
class ElementName(unittest.TestCase):
def testElementStateGetName(self):
get_name = gst.element_state_get_name

View file

@ -2,9 +2,66 @@ import sys
from common import gst, unittest
class BufferTest(unittest.TestCase):
def testBuffer(self):
self.buffer = gst.Buffer('test')
assert str(buffer(self.buffer)) == 'test'
def testBufferBuffer(self):
buf = gst.Buffer('test')
assert str(buffer(buf)) == 'test'
def testBufferStr(self):
buffer = gst.Buffer('test')
assert str(buffer) == 'test'
def testBufferBadConstructor(self):
self.assertRaises(TypeError, gst.Buffer, 'test', 0)
def testBufferStrNull(self):
test_string = 't\0e\0s\0t\0'
buffer = gst.Buffer(test_string)
assert str(buffer) == test_string
def testBufferSize(self):
test_string = 'a little string'
buffer = gst.Buffer(test_string)
assert len(buffer) == len(test_string)
assert hasattr(buffer, 'size')
assert buffer.size == len(buffer)
def testBufferMaxSize(self):
buffer = gst.Buffer(buffer_size=16)
assert hasattr(buffer, 'maxsize')
assert buffer.maxsize == 16
def testBufferCreateSub(self):
s = ''
for i in range(64):
s += '%02d' % i
buffer = gst.Buffer(s)
assert len(buffer) == 128
sub = buffer.create_sub(16, 16)
assert sub.maxsize == 16
assert sub.offset == -1, sub.offset
def testBufferMerge(self):
buffer1 = gst.Buffer('foo')
buffer2 = gst.Buffer('bar')
merged_buffer = buffer1.merge(buffer2)
assert str(merged_buffer) == 'foobar'
def testBufferJoin(self):
buffer1 = gst.Buffer('foo')
buffer2 = gst.Buffer('bar')
joined_buffer = buffer1.merge(buffer2)
assert str(joined_buffer) == 'foobar'
def testBufferSpan(self):
buffer1 = gst.Buffer('foo')
buffer2 = gst.Buffer('bar')
spaned_buffer = buffer1.span(0L, buffer2, 6L)
assert str(spaned_buffer) == 'foobar'
if __name__ == "__main__":
unittest.main()

View file

@ -18,6 +18,12 @@ class ElementTest(unittest.TestCase):
assert element is not None, 'element is None'
assert isinstance(element, gst.Element)
assert element.get_name() == self.alias
def testGoodConstructor2(self):
element = gst.element_factory_make(self.name, self.alias)
assert element is not None, 'element is None'
assert isinstance(element, gst.Element)
assert element.get_name() == self.alias
class FakeSinkTest(ElementTest):
FAKESINK_STATE_ERROR_NONE = "0"
@ -75,14 +81,55 @@ class FakeSinkTest(ElementTest):
def testStateErrorReadyNull(self):
self.checkError(gst.STATE_READY, gst.STATE_NULL,
self.FAKESINK_STATE_ERROR_READY_NULL)
def checkStateChange(self, old, new):
def state_change_cb(element, old_s, new_s):
assert isinstance(element, gst.Element)
assert element == self.element
assert old_s == old
assert new_s == new
assert self.element.set_state(old)
assert self.element.get_state() == old
self.element.connect('state-change', state_change_cb)
assert self.element.set_state(new)
assert self.element.get_state() == new
def testStateChangeNullReady(self):
self.checkStateChange(gst.STATE_NULL, gst.STATE_READY)
def testStateChangeReadyPaused(self):
self.checkStateChange(gst.STATE_READY, gst.STATE_PAUSED)
def testStateChangePausedPlaying(self):
self.checkStateChange(gst.STATE_PAUSED, gst.STATE_PLAYING)
def testStateChangePlayingPaused(self):
self.checkStateChange(gst.STATE_PLAYING, gst.STATE_PAUSED)
def testStateChangePausedReady(self):
self.checkStateChange(gst.STATE_PAUSED, gst.STATE_READY)
def testStateChangeReadyNull(self):
self.checkStateChange(gst.STATE_READY, gst.STATE_NULL)
class NonExistentTest(ElementTest):
name = 'this-element-does-not-exist'
alias = 'no-alias'
def testGoodConstructor(self):
pass
testGoodConstructor = lambda s: None
testGoodConstructor2 = lambda s: None
class FileSrcTest(ElementTest):
name = 'filesrc'
alias = 'source'
class FileSinkTest(ElementTest):
name = 'filesink'
alias = 'sink'
class ElementName(unittest.TestCase):
def testElementStateGetName(self):
get_name = gst.element_state_get_name