/* -*- Mode: C; c-basic-offset: 4 -*- */ /* gst-python * Copyright (C) 2002 David I. Lehn * Copyright (C) 2004 Johan Dahlin * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 02111-1307, USA. * * Author: David I. Lehn */ %% headers #ifdef HAVE_CONFIG_H # include #endif #include "common.h" #include #include #include #include #include #include #include extern gboolean pygst_data_from_pyobject (PyObject *object, GstData **data); PyObject *PyGstExc_LinkError = NULL; GSList *mainloops = NULL; void _pygst_main_quit(void) { if (!mainloops) g_error ("Quit more loops than there are"); else { GMainLoop *loop = mainloops->data; mainloops = g_slist_delete_link (mainloops, mainloops); g_main_loop_quit (loop); g_main_loop_unref (loop); } } void _pygst_main(void) { GMainLoop *loop; loop = g_main_loop_new (NULL, FALSE); mainloops = g_slist_prepend (mainloops, loop); g_main_loop_run (loop); } %% include gstbin.override gstbuffer.override gstcaps.override gstelement.override gstpad.override gststructure.override %% init %% modulename gst %% import gobject.GObject as PyGObject_Type %% ignore-glob _* *_copy *_error_quark *_free *_get_type *_private *_thyself *_valist gst_class_* gst_init* gst_interface_* gst_tag_list_get_* gst_value_* %% ignore gst_alloc_trace_list gst_alloc_trace_get gst_error_get_message gst_object_default_deep_notify gst_object_check_uniqueness gst_object_replace gst_parse_launchv gst_tag_list_add gst_tag_list_add_values gst_tag_list_add_valist_values gst_tag_list_copy_value gst_trace_read_tsc %% define GstTagList.keys noargs void tag_foreach_func_dict (const GstTagList *list, const gchar *tag, PyObject *dict) { int count; guint i; const GValue *gvalue; PyObject *value; gchar *key; count = gst_tag_list_get_tag_size(GST_TAG_LIST(list), tag); for (i = 0; i < count; i++) { gvalue = gst_tag_list_get_value_index(GST_TAG_LIST(list), tag, i); value = pyg_value_as_pyobject(gvalue, TRUE); key = g_strdup (tag); PyDict_SetItemString(dict, key, value); g_free (key); Py_DECREF(value); } } void tag_foreach_func_list (const GstTagList *list, const gchar *tag, PyObject *py_list) { int count; count = gst_tag_list_get_tag_size(GST_TAG_LIST(list), tag); if (count == 0) PyErr_SetString(PyExc_KeyError, tag); else if (count == 1) PyList_Append(py_list, PyString_FromString(tag)); #if 0 else if (count > 1) PyErr_SetString(PyExc_TypeError, "lists are currently unspported"); #endif } static PyObject* _wrap_gst_tag_list_keys(PyGObject *self) { PyObject *dict; dict = PyList_New(0); gst_tag_list_foreach(GST_TAG_LIST(self->obj), (GstTagForeachFunc)tag_foreach_func_list, (gpointer)dict); return dict; } %% override-slot GstTagList.tp_as_mapping static int _wrap_gst_tag_list_length(PyGObject *self) { return gst_structure_n_fields((GstStructure*)self->obj); } static PyObject * _wrap_gst_tag_list_subscript(PyGObject *self, PyObject *py_key) { PyObject *v = NULL; char *key = PyString_AsString(py_key); int count = gst_tag_list_get_tag_size(GST_TAG_LIST(self->obj), key); if (count == 0) { PyErr_SetObject(PyExc_KeyError, py_key); } else if (count == 1) { const GValue *gvalue; gvalue = gst_tag_list_get_value_index(GST_TAG_LIST(self->obj), key, 0); v = pyg_value_as_pyobject(gvalue, TRUE); } else { PyErr_SetString(PyExc_TypeError, "lists are currently unspported"); } if (v != NULL) Py_INCREF(v); return v; } static PySequenceMethods _wrap_gst_tag_list_tp_as_mapping = { (inquiry)_wrap_gst_tag_list_length, /* mp_length */ (binaryfunc)_wrap_gst_tag_list_subscript, /* mp_subscript */ NULL, }; %% define GstTagList.has_key args static PyObject* _wrap_gst_tag_list_has_key(PyGObject *self, PyObject *args) { gchar *key; const GValue *gvalue; if (!PyArg_ParseTuple(args, "s:GstTagList.has_key", &key)) return NULL; gvalue = gst_tag_list_get_value_index(GST_TAG_LIST(self->obj), key, 0); return PyInt_FromLong(gvalue != NULL); } %% define GstTagList.get static PyObject * _wrap_gst_tag_list_get(PyGObject *self, PyObject *args) { char *key; PyObject *failobj = Py_None; PyObject *val = NULL; const GValue *gvalue; if (!PyArg_ParseTuple(args, "s|O:GstTagList.get", &key, &failobj)) return NULL; gvalue = gst_tag_list_get_value_index(GST_TAG_LIST(self->obj), key, 0); if (gvalue != NULL) { int count = gst_tag_list_get_tag_size(GST_TAG_LIST(self->obj), key); if (count == 0) { PyErr_SetString(PyExc_KeyError, key); } else if (count == 1) { gvalue = gst_tag_list_get_value_index(GST_TAG_LIST(self->obj), key, 0); val = pyg_value_as_pyobject(gvalue, TRUE); } else { PyErr_SetString(PyExc_TypeError, "lists are currently unspported"); } } if (val == NULL) val = failobj; Py_INCREF(val); return val; } %% override gst_tag_list_foreach kwargs static gboolean pygst_tag_list_foreach_marshal(GstTagList *list, const gchar *tag, gpointer user_data) { PyGstCustomNotify *cunote = user_data; PyObject *py_list; PyObject *py_key, *retobj; gboolean retval = TRUE; PyGILState_STATE state; g_assert(cunote->func); state = pyg_gil_state_ensure(); py_list = pyg_boxed_new(GST_TYPE_TAG_LIST, list, TRUE, TRUE); py_key = Py_BuildValue("s", tag); if (cunote->data) retobj = PyEval_CallFunction(cunote->func, "(NNO)", py_list, py_key, cunote->data); else retobj = PyEval_CallFunction(cunote->func, "(NN)", py_list, py_key); if (PyErr_Occurred () || (retobj == NULL) || (retobj == Py_None)) { PyErr_Print (); retval = FALSE; } else if (retobj != Py_None) { retval = PyInt_AsLong(retobj); } Py_XDECREF(retobj); pyg_gil_state_release(state); return retval; } static PyObject * _wrap_gst_tag_list_foreach (PyGObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "foreach_function", "args", NULL }; PyObject *pyfunc, *pyarg = NULL; PyGstCustomNotify cunote; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O:GstTagList.foreach", kwlist, &pyfunc, &pyarg)) return NULL; if (!PyCallable_Check(pyfunc)) { PyErr_SetString(PyExc_TypeError, "foreach_function not callable"); return NULL; } cunote.func = pyfunc; cunote.data = pyarg; gst_tag_list_foreach(pyg_boxed_get(self, GstTagList), (GstTagForeachFunc)pygst_tag_list_foreach_marshal, &cunote); Py_INCREF(Py_None); return Py_None; } %% override gst_tag_list_get_value_index kwargs static PyObject * _wrap_gst_tag_list_get_value_index (PyGObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "tag", "index", NULL }; char *tag; int index; const GValue *gvalue; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "si:GstTagList.get_value_index", kwlist, &tag, &index)) return NULL; gvalue = gst_tag_list_get_value_index(pyg_boxed_get(self, GstTagList), tag, index); return pyg_value_as_pyobject(gvalue, FALSE); } %% override gst_type_find_factory_get_caps noargs static PyObject * _wrap_gst_type_find_factory_get_caps(PyGObject *self) { GstCaps *ret = (GstCaps*)gst_type_find_factory_get_caps(GST_TYPE_FIND_FACTORY(self->obj)); return pyg_boxed_new(GST_TYPE_CAPS, ret, TRUE, TRUE); } %% override gst_type_find_factory_get_caps noargs static PyObject * _wrap_gst_type_find_factory_get_caps(PyGObject *self) { GstCaps *ret = (GstCaps*)gst_type_find_factory_get_caps(GST_TYPE_FIND_FACTORY(self->obj)); return pyg_boxed_new(GST_TYPE_CAPS, ret, TRUE, TRUE); } %% override-attr GError.domain static PyObject * _wrap_gst_g_error__get_domain(PyGObject *self, void *closure) { return PyString_FromString(g_quark_to_string(((GError*)self->obj)->domain)); } %% override-slot GError.tp_str static PyObject * _wrap_gst_g_error_tp_str(PyGObject *self) { GError *error = (GError*)self->obj; return PyString_FromString(gst_error_get_message (error->domain, error->code)); } %% override gst_main noargs static PyObject * _wrap_gst_main(PyObject *self) { pyg_begin_allow_threads; _pygst_main(); pyg_end_allow_threads; if (PyErr_Occurred()) return NULL; Py_INCREF(Py_None); return Py_None; } %% override gst_main_quit args static PyObject * _wrap_gst_main_quit(PyObject *self) { _pygst_main_quit(); Py_INCREF(Py_None); return Py_None; } %% override gst_event_new_any kwargs static PyObject * _wrap_gst_event_new_any(PyObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "structure", NULL }; PyObject *py_structure; GstStructure *structure = NULL; GstEvent *event; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:event_new_any", kwlist, &py_structure)) return NULL; if (pyg_boxed_check(py_structure, GST_TYPE_STRUCTURE)) structure = pyg_boxed_get(py_structure, GstStructure); else { PyErr_SetString(PyExc_TypeError, "structure should be a GstStructure"); return NULL; } event = gst_event_new (GST_EVENT_ANY); if (!event) { PyErr_SetString(PyExc_RuntimeError, "could not create GstEvent object"); return NULL; } event->event_data.structure.structure = gst_structure_copy(structure); /* pyg_boxed_new handles NULL checking */ return pyg_boxed_new(GST_TYPE_EVENT, event, TRUE, TRUE); } %% override gst_event_any_get_structure noargs static PyObject * _wrap_gst_event_any_get_structure(PyObject *self) { GstStructure *ret; GstEvent *event; event = pyg_pointer_get(self, GstEvent); if (GST_EVENT_TYPE(event) == GST_EVENT_ANY) { ret = event->event_data.structure.structure; /* pyg_boxed_new handles NULL checking */ return pyg_boxed_new(GST_TYPE_STRUCTURE, ret, TRUE, TRUE); } else { Py_INCREF(Py_None); return Py_None; } } %% override gst_event_new_discontinuous kwargs static PyObject * _wrap_gst_event_new_discontinuous(PyObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "new_media", "format", "value", NULL }; int new_media = FALSE; PyObject *py_format = NULL; gint64 value = 0; GstFormat format; GstEvent *event; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "iOL:event_new_discontinuous", kwlist, &new_media, &py_format, &value)) { return NULL; } if (pyg_enum_get_value(GST_TYPE_FORMAT, py_format, (gint *) &format)) { return NULL; } event = gst_event_new_discontinuous (new_media, format, value, GST_FORMAT_UNDEFINED); return pyg_boxed_new(GST_TYPE_EVENT, event, TRUE, TRUE); } %% override gst_registry_pool_plugin_list noargs static PyObject * _wrap_gst_registry_pool_plugin_list(PyGObject *self) { GList *l, *plugins; PyObject *list; plugins = gst_registry_pool_plugin_list(); list = PyList_New(0); for (l = plugins; l; l = l->next) { GstPlugin *plugin = (GstPlugin*)l->data; PyList_Append(list, pyg_boxed_new(GST_TYPE_PLUGIN, plugin, TRUE, TRUE)); } g_list_free(plugins); return list; } %% override gst_registry_pool_feature_list static PyObject * _wrap_gst_registry_pool_feature_list(PyGObject *self, PyObject *args) { GList *l, *features; PyObject *pygtype, *list; GType gtype; if (!PyArg_ParseTuple(args, "O:registry_pool_feature_list", &pygtype)) return NULL; gtype = pyg_type_from_object (pygtype); if (!gtype) return NULL; features = gst_registry_pool_feature_list(gtype); list = PyList_New(0); for (l = features; l; l = l->next) { GstPluginFeature *feature = (GstPluginFeature*)l->data; PyList_Append(list, pygobject_new (G_OBJECT (feature))); } g_list_free(features); return list; } %% override gst_xml_new noargs extern PyObject * libxml_xmlDocPtrWrap(xmlDocPtr doc); extern PyObject * libxml_xmlNodePtrWrap(xmlNodePtr node); /* libxml2 available check */ static PyObject * _gst_get_libxml2_module(void) { PyObject *xml = NULL; xml = PyImport_ImportModule("libxml2"); if (!xml) { PyErr_Clear(); PyErr_SetString(PyExc_RuntimeError,"libxml2 bindings required"); return NULL; } return xml; } static int _wrap_gst_xml_new(PyGObject *self) { PyObject *xml = _gst_get_libxml2_module(); if(!xml) return -1; self->obj = (GObject *)gst_xml_new(); if (!self->obj) { PyErr_SetString(PyExc_RuntimeError, "could not create GstXML object"); return -1; } pygobject_register_wrapper((PyObject *)self); return 0; } %% override gst_xml_get_topelements noargs static PyObject * _wrap_gst_xml_get_topelements(PyGObject *self) { GList *l, *xml_elements; PyObject *py_list; py_list = PyList_New(0); xml_elements = gst_xml_get_topelements(GST_XML(self->obj)); for (l = xml_elements; l; l = l->next) { GstElement *element = (GstElement*)l->data; PyList_Append(py_list, pygobject_new(G_OBJECT(element))); } return py_list; } %% override gst_xml_parse_memory kwargs static PyObject * _wrap_gst_xml_parse_memory(PyGObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "buffer", "root", NULL }; int buffer_len, ret; char *root = NULL; guchar *buffer; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s#|s:GstXML.parse_memory", kwlist, &buffer, &buffer_len, &root)) return NULL; ret = gst_xml_parse_memory(GST_XML(self->obj), buffer, buffer_len, root); return PyBool_FromLong(ret); } %% override gst_tag_setter_get_list noargs static PyObject * _wrap_gst_tag_setter_get_list(PyGObject *self) { GstTagList *ret; ret = (GstTagList*)gst_tag_setter_get_list(GST_TAG_SETTER(self->obj)); /* pyg_boxed_new handles NULL checking */ return pyg_boxed_new(GST_TYPE_TAG_LIST, ret, TRUE, TRUE); } %% override gst_probe_new args static gboolean probe_handler_marshal(GstProbe *probe, GstData **data, gpointer user_data) { PyGILState_STATE state; PyObject *callback, *args; PyObject *ret; PyObject *py_user_data; gboolean res; gint len, i; g_return_val_if_fail(user_data != NULL, FALSE); state = pyg_gil_state_ensure(); py_user_data = (PyObject *) user_data; callback = PyTuple_GetItem(py_user_data, 0); args = Py_BuildValue("(NN)", pyg_boxed_new(GST_TYPE_PROBE, probe, TRUE, TRUE), pyg_boxed_new(GST_TYPE_DATA, *data, TRUE, TRUE)); len = PyTuple_Size(py_user_data); for (i = 1; i < len; ++i) { PyObject *tuple = args; args = PySequence_Concat(tuple, PyTuple_GetItem(py_user_data, i)); Py_DECREF(tuple); } ret = PyObject_CallObject(callback, args); if (!ret) { PyErr_Print(); res = FALSE; } else { res = PyObject_IsTrue(ret); Py_DECREF(ret); } pyg_gil_state_release(state); return res; } static int _wrap_gst_probe_new(PyGBoxed *self, PyObject *args, PyObject *kwargs) { PyObject *first, *callback, *cbargs = NULL, *data; gboolean single_shot; gint len; len = PyTuple_Size(args); self->gtype = GST_TYPE_PROBE; self->free_on_dealloc = FALSE; if (len < 2) { PyErr_SetString(PyExc_TypeError, "Probe requires at least 2 args"); return -1; } first = PySequence_GetSlice(args, 0, 2); if (!PyArg_ParseTuple(first, "iO:Probe", &single_shot, &callback)) { Py_DECREF(first); return -1; } Py_DECREF(first); if (!PyCallable_Check(callback)) { PyErr_SetString(PyExc_TypeError, "second argument not callable"); return -1; } cbargs = PySequence_GetSlice(args, 2, len); if (cbargs == NULL) return -1; data = Py_BuildValue("(ON)", callback, cbargs); if (data == NULL) return -1; self->boxed = gst_probe_new(single_shot, probe_handler_marshal, data); return 0; } %% override gst_probe_perform object static PyObject * _wrap_gst_probe_perform(PyGBoxed *self, PyObject *args, PyObject *kwargs) { PyObject *py_data; GstData *data = NULL; gint len; len = PyTuple_Size(args); if (len != 1) { PyErr_SetString(PyExc_TypeError, "perform requires 1 arg"); return NULL; } if (!PyArg_ParseTuple(args, "O:perform", &py_data)) { return NULL; } /* FIXME: GstBuffer and GstEvent are not really "subclasses" so we hardcode checks for them here by hand. Ugh. */ if (pyg_boxed_check(py_data, GST_TYPE_EVENT)) data = pyg_boxed_get(py_data, GstEvent); if (pyg_boxed_check(py_data, GST_TYPE_BUFFER)) data = pyg_boxed_get(py_data, GstBuffer); if (pyg_boxed_check(py_data, GST_TYPE_DATA)) data = pyg_boxed_get(py_data, GstData); if (!data) { PyErr_SetString(PyExc_TypeError, "arg 1 must be GstData"); return NULL; } return PyBool_FromLong(gst_probe_perform(self->boxed, &data)); }