mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-09-24 04:50:16 +00:00
56f25a4653
Original commit message from CVS: * gst/common.h: cleanup * gst/gst-types.defs: * gst/gst.defs: Updated defs file to current gstreamer core * gst/gst.override: Added useless function (at least from python bindings) and little correction on _wrap_gst_xml_get_topelements() * gst/gstbin.override: * gst/gstbuffer.override: Fix memleak in gst.Buffer.set_caps() * gst/gstevent.override: Added wrapper for remaining gst_event_parse_*() * gst/gstlibs.override: Wrapped more gst.Controller methods * gst/gstmodule.c: (init_gst): new gst_init() Added atexit(gst_deinit) * gst/gstpad.override: Fix memleak in gst.Pad.set_caps() * gst/gstquery.override: add gst.Query.parse_segment() * gst/libs.defs: Updated to current gst-libs * gst/pygstminiobject.c: (pygstminiobject_register_wrapper), (pygstminiobject_new), (pygstminiobject_dealloc): Added debug * testsuite/Makefile.am: * testsuite/common.py: * testsuite/gstpython.supp: * testsuite/python.supp: * testsuite/test_bin.py: * testsuite/test_buffer.py: * testsuite/test_element.py: * testsuite/test_event.py: * testsuite/test_ghostpad.py: * testsuite/test_iterator.py: * testsuite/test_message.py: * testsuite/test_pipeline.py: Proper valgrind testing, Updated tests to new API
115 lines
4.1 KiB
Python
115 lines
4.1 KiB
Python
# -*- Mode: Python -*-
|
|
# vi:si:et:sw=4:sts=4:ts=4
|
|
#
|
|
# gst-python - Python bindings for GStreamer
|
|
# Copyright (C) 2002 David I. Lehn
|
|
# Copyright (C) 2004 Johan Dahlin
|
|
# Copyright (C) 2005 Edward Hervey
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
|
|
import os
|
|
import sys
|
|
from common import gst, unittest, testhelper, TestCase
|
|
|
|
class EventTest(TestCase):
    """Send events to the sink of a running ``fakesrc ! fakesink`` pipeline."""

    def setUp(self):
        """Build the pipeline, grab its sink and start it playing.

        The pipeline is stored on the instance so that it stays referenced
        for the duration of the test and can be shut down in tearDown().
        """
        self.pipeline = gst.parse_launch('fakesrc ! fakesink name=sink')
        self.sink = self.pipeline.get_by_name('sink')
        self.pipeline.set_state(gst.STATE_PLAYING)
        # Chained up last, exactly as before, so the base-class bookkeeping
        # runs after the fixture objects have been created.
        TestCase.setUp(self)

    def tearDown(self):
        """Stop the pipeline and release the fixture objects.

        An element must be brought back to the NULL state before its last
        reference goes away; previously the pipeline was left in PLAYING.
        """
        self.pipeline.set_state(gst.STATE_NULL)
        # Drop our references before the base class runs its own checks.
        del self.sink
        del self.pipeline
        TestCase.tearDown(self)

    def testEventSeek(self):
        # A flushing seek in byte format at rate 1.0; both position types
        # are SEEK_TYPE_NONE.
        event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
                                   gst.SEEK_TYPE_NONE, 0,
                                   gst.SEEK_TYPE_NONE, 0)
        # assertTrue instead of a bare assert: it is not stripped under -O.
        self.assertTrue(event)
        self.sink.send_event(event)

    def testWrongEvent(self):
        # send_event() must reject anything that is not an event.
        not_an_event = gst.Buffer()
        self.assertRaises(TypeError, self.sink.send_event, not_an_event)
        number = 1
        self.assertRaises(TypeError, self.sink.send_event, number)
|
|
|
|
|
|
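# FIXME: these tests are disabled. They still call gst.event_new_seek() with two arguments (EventTest.testEventSeek passes seven) and must be ported before they can be re-enabled.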
|
|
#class EventFileSrcTest(unittest.TestCase):
|
|
# # FIXME: properly create temp files
|
|
# filename = '/tmp/gst-python-test-file'
|
|
# def setUp(self):
|
|
# if os.path.exists(self.filename):
|
|
# os.remove(self.filename)
|
|
# open(self.filename, 'w').write(''.join(map(str, range(10))))
|
|
#
|
|
# self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
|
|
# self.source = self.pipeline.get_by_name('source')
|
|
# self.sink = self.pipeline.get_by_name('sink')
|
|
# self.sink.connect('handoff', self.handoff_cb)
|
|
# self.bus = self.pipeline.get_bus()
|
|
# self.pipeline.set_state(gst.STATE_PLAYING)
|
|
#
|
|
# def tearDown(self):
|
|
# assert self.pipeline.set_state(gst.STATE_PLAYING)
|
|
# if os.path.exists(self.filename):
|
|
# os.remove(self.filename)
|
|
#
|
|
# def handoff_cb(self, element, buffer, pad):
|
|
# self.handoffs.append(str(buffer))
|
|
#
|
|
# def playAndIter(self):
|
|
# self.handoffs = []
|
|
# assert self.pipeline.set_state(gst.STATE_PLAYING)
|
|
# while 42:
|
|
# msg = self.bus.pop()
|
|
# if msg and msg.type == gst.MESSAGE_EOS:
|
|
# break
|
|
# assert self.pipeline.set_state(gst.STATE_PAUSED)
|
|
# handoffs = self.handoffs
|
|
# self.handoffs = []
|
|
# return handoffs
|
|
#
|
|
# def sink_seek(self, offset, method=gst.SEEK_METHOD_SET):
|
|
# method |= (gst.SEEK_FLAG_FLUSH | gst.FORMAT_BYTES)
|
|
# self.source.send_event(gst.event_new_seek(method, offset))
|
|
# self.source.send_event(gst.Event(gst.EVENT_FLUSH))
|
|
# self.sink.send_event(gst.event_new_seek(method, offset))
|
|
# self.sink.send_event(gst.Event(gst.EVENT_FLUSH))
|
|
#
|
|
# def testSimple(self):
|
|
# handoffs = self.playAndIter()
|
|
# assert handoffs == map(str, range(10))
|
|
#
|
|
# def testSeekCur(self):
|
|
# self.sink_seek(8)
|
|
#
|
|
# #print self.playAndIter()
|
|
|
|
class TestEmit(TestCase):
    """Check that the 'event' signal hands a gst.Event to Python handlers."""

    def testEmit(self):
        emitter = testhelper.get_object()
        emitter.connect('event', self._event_cb)

        # Emission triggered from the C helper first ...
        testhelper.emit_event(emitter)

        # ... then an emission made directly from Python.
        emitter.emit('event', gst.event_new_eos())

    def _event_cb(self, obj, event):
        # Signal handler: whatever the emitter was, the payload must be
        # wrapped as a gst.Event.
        assert isinstance(event, gst.Event)
|
|
|
|
|
|
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|