/* -*- Mode: C; ; c-file-style: "python" -*- */ /* gst-python * Copyright (C) 2002 David I. Lehn * Copyright (C) 2004 Johan Dahlin * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 02111-1307, USA. * * Author: Johan Dahlin */ %% headers static Py_ssize_t gst_buffer_getreadbuffer (PyObject *self, Py_ssize_t index, void **ptr); static Py_ssize_t gst_buffer_getwritebuf (PyObject *self, Py_ssize_t index, void **ptr); static Py_ssize_t gst_buffer_getsegcount (PyObject *self, Py_ssize_t *lenp); #if PY_VERSION_HEX >= 0x02050000 static Py_ssize_t gst_buffer_getcharbuf (PyObject *self, Py_ssize_t index, char **ptr); #else static Py_ssize_t gst_buffer_getcharbuf (PyObject *self, Py_ssize_t index, const char **ptr); #endif %% override gst_buffer_new kwargs static int _wrap_gst_buffer_new(PyGstMiniObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "data", "buffer_size", NULL }; char *data = NULL; int size = 0; int buf_size = -1; GST_INFO("self:%p", self); if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|z#i:GstBuffer.__init__", kwlist, &data, &size, &buf_size)) return -1; if (size < 0) { PyErr_SetString(PyExc_TypeError, "buffer size must be >= 0"); return -1; } if (buf_size < 0) buf_size = size; if (buf_size < size) { PyErr_SetString(PyExc_TypeError, "buffer size must be >= data size"); return -1; } self->obj = GST_MINI_OBJECT(gst_buffer_new_and_alloc(buf_size)); GST_INFO ("pyo:%p pyr:%"G_GSSIZE_FORMAT" minio:%p minir:%d", self, ((PyObject*)self)->ob_refcnt, self->obj, GST_MINI_OBJECT_REFCOUNT_VALUE(self->obj)); if (!self->obj) { PyErr_SetString(PyExc_RuntimeError, "could not create GstBuffer object"); return -1; } pygstminiobject_register_wrapper((PyObject *) self); if (data == NULL) return 0; memcpy (GST_BUFFER_DATA (self->obj), data, size); GST_BUFFER_SIZE (self->obj) = size; return 0; } %% override-slot GstBuffer.tp_str static PyObject * _wrap_gst_buffer_tp_str (PyGstMiniObject *self) { GstBuffer *buf; g_assert (self); buf = pyg_boxed_get (self, GstBuffer); g_assert (buf); return PyString_FromStringAndSize((const gchar*) GST_BUFFER_DATA(buf), (gint) GST_BUFFER_SIZE(buf)); } %% override-slot GstBuffer.tp_repr static PyObject * _wrap_gst_buffer_tp_repr (PyGstMiniObject *self) { GstBuffer *buf; guchar *data; gchar *repr; gint size = 0; PyObject *ret; g_assert (self); buf = GST_BUFFER(self->obj); g_assert (buf); size = GST_BUFFER_SIZE (buf); if (size == 0) { repr = g_strdup_printf ("", buf, size); } else { data = GST_BUFFER_DATA (buf); repr = g_strdup_printf ( "", buf, size, *data, size > 0 ? *(data + 1) : 0, size > 1 ? *(data + 2) : 0, size > 2 ? *(data + 3) : 0 ); } ret = PyString_FromStringAndSize(repr, strlen (repr)); g_free (repr); return ret; } %% override-slot GstBuffer.tp_as_buffer static PyBufferProcs _wrap_gst_buffer_tp_as_buffer = { gst_buffer_getreadbuffer, /* bf_getreadbuffer */ gst_buffer_getwritebuf, /* bf_getwritebuffer */ gst_buffer_getsegcount, /* bf_getsegcount */ gst_buffer_getcharbuf, /* bf_getcharbuffer */ }; static Py_ssize_t gst_buffer_getreadbuffer(PyObject *self, Py_ssize_t index, void **ptr) { GstBuffer *buf = pyg_boxed_get(self, GstBuffer); if ( index != 0 ) { PyErr_SetString(PyExc_SystemError, "accessing non-existent GstBuffer segment"); return -1; } *ptr = GST_BUFFER_DATA(buf); return GST_BUFFER_SIZE(buf); } static Py_ssize_t gst_buffer_getsegcount(PyObject *self, Py_ssize_t *lenp) { GstBuffer *buf = pyg_boxed_get(self, GstBuffer); if (lenp) *lenp = GST_BUFFER_SIZE(buf); return 1; } /* Need a version that has const char ** for Python 2.4 */ #if PY_VERSION_HEX >= 0x02050000 static Py_ssize_t gst_buffer_getcharbuf (PyObject *self, Py_ssize_t index, char **ptr) #else static Py_ssize_t gst_buffer_getcharbuf (PyObject *self, Py_ssize_t index, const char **ptr) #endif { return gst_buffer_getreadbuffer (self, index, (void **) ptr); } static Py_ssize_t gst_buffer_getwritebuf(PyObject *self, Py_ssize_t index, void **ptr) { GstBuffer *buf = pyg_boxed_get(self, GstBuffer); if ( index != 0 ) { PyErr_SetString(PyExc_SystemError, "accessing non-existent GstBuffer segment"); return -1; } if (!gst_buffer_is_writable (buf)) { PyErr_SetString(PyExc_TypeError, "buffer is not writable"); return -1; } *ptr = GST_BUFFER_DATA(buf); return GST_BUFFER_SIZE(buf); } %% override-slot GstBuffer.tp_as_sequence /* FIXME: should buffer parts be buffers or strings? */ static Py_ssize_t pygst_buffer_length(PyObject *self) { return GST_BUFFER_SIZE(pygobject_get (self)); } static PyObject * pygst_buffer_slice(PyObject *self, Py_ssize_t start, Py_ssize_t end) { GstBuffer *buf = GST_BUFFER (pygobject_get (self)); if (start < 0) start = 0; if (end < 0) end = 0; if (end > GST_BUFFER_SIZE(buf)) end = GST_BUFFER_SIZE(buf); if (end <= start) { PyErr_SetString(PyExc_IndexError, "buffer index out of range"); return NULL; } return PyString_FromStringAndSize((gchar *) GST_BUFFER_DATA (buf) + start, end - start); } static PyObject * pygst_buffer_item(PyObject *self, Py_ssize_t index) { return pygst_buffer_slice (self, index, index + 1); } static int pygst_buffer_ass_slice (PyObject *self, Py_ssize_t start, Py_ssize_t end, PyObject *val) { GstBuffer *buf = GST_BUFFER (pygobject_get (self)); const void *data; Py_ssize_t len; if (!gst_buffer_is_writable (buf)) { PyErr_SetString(PyExc_TypeError, "buffer is not writable"); return -1; } /* FIXME: policy? */ if (start < 0 || end <= start || end > GST_BUFFER_SIZE (buf)) { PyErr_SetString(PyExc_IndexError, "buffer index out of range"); return -1; } end -= start; if (PyObject_AsReadBuffer(val, &data, &len)) return -1; if (len > end) len = end; memcpy (GST_BUFFER_DATA (buf) + start, data, len); return 0; } static int pygst_buffer_ass_item (PyObject *self, Py_ssize_t index, PyObject *val) { GstBuffer *buf = GST_BUFFER (pygobject_get (self)); const void *data; Py_ssize_t len; if (!gst_buffer_is_writable (buf)) { PyErr_SetString(PyExc_TypeError, "buffer is not writable"); return -1; } if (index < 0 || index > GST_BUFFER_SIZE (buf)) { PyErr_SetString(PyExc_IndexError, "buffer index out of range"); return -1; } if (PyObject_AsReadBuffer(val, &data, &len)) return -1; /* FIXME: how do we handle this? */ if (len > GST_BUFFER_SIZE (buf) - index) len = GST_BUFFER_SIZE (buf) - index; memcpy (GST_BUFFER_DATA (buf) + index, data, len); return 0; } static PySequenceMethods _wrap_gst_buffer_tp_as_sequence = { pygst_buffer_length, /* sq_length */ NULL, /* sq_concat */ NULL, /* sq_repeat */ pygst_buffer_item, /* sq_item */ pygst_buffer_slice, /* sq_slice */ pygst_buffer_ass_item, /* sq_ass_item */ pygst_buffer_ass_slice, /* sq_ass_slice */ NULL, /* sq_contains */ NULL, /* sq_inplace_concat */ NULL, /* sq_inplace_repeat */ }; %% define GstBuffer.copy_on_write static PyObject * _wrap_gst_buffer_copy_on_write (PyObject *self) { GstBuffer *buf = pyg_boxed_get(self, GstBuffer); GST_INFO("INCREF"); if (gst_buffer_is_writable (buf)) { Py_INCREF (self); return self; } buf = gst_buffer_copy (buf); self = pyg_boxed_new (GST_TYPE_BUFFER, buf, FALSE, TRUE); return self; } %% define GstBuffer.flag_is_set static PyObject * _wrap_gst_buffer_flag_is_set(PyObject *self, PyObject *args) { int flag; PyObject *retval; GstBuffer *buf; if (!PyArg_ParseTuple(args, "i:GstBuffer.flag_is_set", &flag)) return NULL; buf = pyg_boxed_get(self, GstBuffer); g_assert(GST_IS_BUFFER(buf)); retval = GST_BUFFER_FLAG_IS_SET(buf, flag) ? Py_True : Py_False; Py_INCREF(retval); return retval; } %% define GstBuffer.flag_set static PyObject * _wrap_gst_buffer_flag_set(PyObject *self, PyObject *args) { int flag; GstBuffer *buf; if (!PyArg_ParseTuple(args, "i:GstBuffer.set", &flag)) return NULL; buf = pyg_boxed_get(self, GstBuffer); g_assert(GST_IS_BUFFER(buf)); GST_BUFFER_FLAG_SET(buf, flag); Py_INCREF(Py_None); return Py_None; } %% define GstBuffer.flag_unset static PyObject * _wrap_gst_buffer_flag_unset(PyObject *self, PyObject *args) { int flag; GstBuffer *buf; if (!PyArg_ParseTuple(args, "i:GstBuffer.unset", &flag)) return NULL; buf = pyg_boxed_get(self, GstBuffer); g_assert(GST_IS_BUFFER(buf)); GST_BUFFER_FLAG_UNSET(buf, flag); Py_INCREF(Py_None); return Py_None; } %% override-attr GstBuffer.data static PyObject * _wrap_gst_buffer__get_data(PyObject *self, void *closure) { GstBuffer *buf; g_assert (self); buf = pyg_boxed_get (self, GstBuffer); g_assert (buf); return PyString_FromStringAndSize((const gchar*) GST_BUFFER_DATA(buf), (gint) GST_BUFFER_SIZE(buf)); } %% override-attr GstBuffer.timestamp static PyObject * _wrap_gst_buffer__get_timestamp(PyObject *self, void *closure) { guint64 ret; ret = GST_BUFFER(pygstminiobject_get(self))->timestamp; return PyLong_FromUnsignedLongLong(ret); } static int _wrap_gst_buffer__set_timestamp(PyGstMiniObject *self, PyObject *value, void *closure) { guint64 val; if (PyInt_CheckExact(value)) val = PyInt_AsUnsignedLongLongMask(value); else val = PyLong_AsUnsignedLongLong(value); if (PyErr_Occurred()) return -1; GST_BUFFER(self->obj)->timestamp = val; return 0; } %% override-attr GstBuffer.duration static PyObject * _wrap_gst_buffer__get_duration(PyObject *self, void *closure) { guint64 ret; ret = GST_BUFFER(pygstminiobject_get(self))->duration; return PyLong_FromUnsignedLongLong(ret); } static int _wrap_gst_buffer__set_duration(PyGstMiniObject *self, PyObject *value, void *closure) { guint64 val; if (PyInt_CheckExact(value)) val = PyInt_AsUnsignedLongLongMask(value); else val = PyLong_AsUnsignedLongLong(value); if (PyErr_Occurred()) return -1; GST_BUFFER(self->obj)->duration = val; return 0; } %% override-attr GstBuffer.offset static PyObject * _wrap_gst_buffer__get_offset (PyObject *self, void *closure) { guint64 ret; GstMiniObject *miniobject; g_assert (self); miniobject = pygstminiobject_get (self); g_assert (miniobject); ret = GST_BUFFER_OFFSET (GST_BUFFER (miniobject)); return PyLong_FromUnsignedLongLong (ret); } static int _wrap_gst_buffer__set_offset (PyGstMiniObject *self, PyObject *value, void *closure) { guint64 val; g_assert (self); if (PyInt_CheckExact (value)) val = PyInt_AsUnsignedLongLongMask (value); else val = PyLong_AsUnsignedLongLong (value); if (PyErr_Occurred()) return -1; GST_BUFFER_OFFSET (GST_BUFFER (self->obj)) = val; return 0; } %% override-attr GstBuffer.offset_end static PyObject * _wrap_gst_buffer__get_offset_end (PyObject *self, void *closure) { guint64 ret; GstMiniObject *miniobject; g_assert (self); miniobject = pygstminiobject_get (self); g_assert (miniobject); ret = GST_BUFFER_OFFSET_END (GST_BUFFER (miniobject)); return PyLong_FromUnsignedLongLong (ret); } static int _wrap_gst_buffer__set_offset_end (PyGstMiniObject *self, PyObject *value, void *closure) { guint64 val; g_assert (self); if (PyInt_CheckExact (value)) val = PyInt_AsUnsignedLongLongMask (value); else val = PyLong_AsUnsignedLongLong (value); if (PyErr_Occurred ()) return -1; GST_BUFFER_OFFSET_END (GST_BUFFER (self->obj)) = val; return 0; } %% override gst_buffer_stamp kwargs static PyObject * _wrap_gst_buffer_stamp (PyGstMiniObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "src", NULL }; PyGstMiniObject *srcobj; GstBuffer *dest, *src; if (!PyArg_ParseTupleAndKeywords (args, kwargs, "O:GstBuffer.stamp", kwlist, &srcobj)) return NULL; dest = GST_BUFFER(pygstminiobject_get(self)); src = GST_BUFFER(pygstminiobject_get(srcobj)); gst_buffer_stamp (dest, (const GstBuffer*) src); Py_INCREF(Py_None); return Py_None; } %% override-attr GstBuffer.caps static PyObject * _wrap_gst_buffer__get_caps (PyObject *self, void *closure) { GstMiniObject *miniobject; GstCaps *ret; miniobject = pygstminiobject_get (self); g_assert (miniobject); pyg_begin_allow_threads; ret = gst_buffer_get_caps(GST_BUFFER(miniobject)); pyg_end_allow_threads; return pyg_boxed_new (GST_TYPE_CAPS, ret, FALSE, TRUE); } static int _wrap_gst_buffer__set_caps (PyGstMiniObject *self, PyObject *value, void *closure) { GstCaps *caps; g_assert (self); caps = pygst_caps_from_pyobject (value, NULL); if (PyErr_Occurred()) return -1; pyg_begin_allow_threads; gst_buffer_set_caps(GST_BUFFER(self->obj), caps); gst_caps_unref (caps); pyg_end_allow_threads; return 0; } %% override gst_buffer_set_caps kwargs static PyObject * _wrap_gst_buffer_set_caps(PyGstMiniObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "caps", NULL }; PyObject *py_caps; GstCaps *caps; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:GstBuffer.set_caps", kwlist, &py_caps)) return NULL; caps = pygst_caps_from_pyobject (py_caps, NULL); if (PyErr_Occurred()) return NULL; pyg_begin_allow_threads; gst_buffer_set_caps(GST_BUFFER(self->obj), caps); gst_caps_unref (caps); pyg_end_allow_threads; Py_INCREF(Py_None); return Py_None; }