testsuite/common.py: provide a default setUp/tearDown

Original commit message from CVS:

* testsuite/common.py:
provide a default setUp/tearDown
* testsuite/test_bin.py:
* testsuite/test_bus.py:
* testsuite/test_element.py:
* testsuite/test_pipeline.py:
adding/updating tests and gc tracking
This commit is contained in:
Thomas Vander Stichele 2005-09-28 15:48:10 +00:00
parent 9ba2f7fbbe
commit de81816a93
6 changed files with 150 additions and 29 deletions

View file

@ -1,3 +1,13 @@
2005-09-28 Thomas Vander Stichele <thomas at apestaart dot org>
* testsuite/common.py:
provide a default setUp/tearDown
* testsuite/test_bin.py:
* testsuite/test_bus.py:
* testsuite/test_element.py:
* testsuite/test_pipeline.py:
adding/updating tests and gc tracking
2005-09-28 Edward Hervey <edward@fluendo.com>
* gst/gst.override:

View file

@ -112,6 +112,9 @@ def run_silent(function, *args, **kwargs):
return output
class TestCase(unittest.TestCase):
_types = [gst.Element, gst.Pad, gst.Bus]
def gccollect(self):
# run the garbage collector
ret = 0
@ -126,20 +129,33 @@ class TestCase(unittest.TestCase):
def gctrack(self):
# store all gst objects in the gc in a tracking dict
# call before doing any allocation in your test, from setUp
gst.debug('tracking gc GstObjects')
gst.debug('tracking gc GstObjects for types %r' % self._types)
self.gccollect()
self._tracked = {}
for c in [gst.Element, gst.Pad]:
for c in self._types:
self._tracked[c] = [o for o in gc.get_objects() if isinstance(o, c)]
def gcverify(self):
# verify no new gst objects got added to the gc
# call after doing all cleanup in your test, from tearDown
gst.debug('verifying gc GstObjects')
gst.debug('verifying gc GstObjects for types %r' % self._types)
new = []
for c in [gst.Element, gst.Pad]:
for c in self._types:
objs = [o for o in gc.get_objects() if isinstance(o, c)]
new.extend([o for o in objs if o not in self._tracked[c]])
self.failIf(new, new)
del self._tracked
def setUp(self):
"""
Override me by chaining up to me at the start of your setUp.
"""
self.gctrack()
def tearDown(self):
"""
Override me by chaining up to me at the end of your tearDown.
"""
self.gccollect()
self.gcverify()

View file

@ -21,7 +21,8 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from common import gobject, gst, unittest
import sys
from common import gobject, gst, unittest, TestCase
# see
# http://www.sicem.biz/personal/lgs/docs/gobject-python/gobject-tutorial.html
@ -38,6 +39,9 @@ class MyBin(gst.Bin):
def do_change_state(self, state_change):
if state_change == gst.STATE_CHANGE_PAUSED_TO_PLAYING:
self._state_changed = True
# FIXME: it seems a vmethod increases the refcount without unreffing
# print self.__gstrefcount__
# print self.__grefcount__
# chain up to parent
return gst.Bin.do_change_state(self, state_change)
@ -45,10 +49,28 @@ class MyBin(gst.Bin):
# we need to register the type for PyGTK < 2.8
gobject.type_register(MyBin)
class BinSubclassTest(unittest.TestCase):
# FIXME: fix leak in vmethods before removing overriding fixture
class BinSubclassTest(TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def testStateChange(self):
bin = MyBin("mybin")
self.assertEquals(bin.__gstrefcount__, 1)
self.assertEquals(sys.getrefcount(bin), 3)
self.assertEquals(bin.get_name(), "mybin")
self.assertEquals(bin.__gstrefcount__, 1)
# test get_state with no timeout
(ret, state, pending) = bin.get_state(None)
self.failIfEqual(ret, gst.STATE_CHANGE_FAILURE)
self.assertEquals(bin.__gstrefcount__, 1)
# set to playing
bin.set_state(gst.STATE_PLAYING)
self.failUnless(bin._state_changed)
@ -70,5 +92,8 @@ class BinSubclassTest(unittest.TestCase):
(ret, state, pending) = bin.get_state(timeout=0.1)
# back to NULL
bin.set_state(gst.STATE_NULL)
if __name__ == "__main__":
unittest.main()

View file

@ -18,29 +18,55 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from common import gst, unittest
from common import gst, unittest, TestCase
import gobject
class BusAddWatchTest(unittest.TestCase):
class BusAddWatchTest(TestCase):
# FIXME: one last remaining ref
def tearDown(self):
pass
def testGoodConstructor(self):
loop = gobject.MainLoop()
pipeline = gst.parse_launch("fakesrc ! fakesink")
bus = pipeline.get_bus()
self.assertEquals(bus.__gstrefcount__, 2)
self.assertEquals(pipeline.__gstrefcount__, 1)
watch_id = bus.add_watch(gst.MESSAGE_ANY, self._message_received, pipeline, loop, "one")
self.assertEquals(bus.__gstrefcount__, 3)
self.assertEquals(pipeline.__gstrefcount__, 1)
pipeline.set_state(gst.STATE_PLAYING)
loop.run()
pipeline.set_state(gst.STATE_PAUSED)
loop.run()
pipeline.set_state(gst.STATE_READY)
loop.run()
pipeline.set_state(gst.STATE_NULL)
self.gccollect()
self.assertEquals(bus.__gstrefcount__, 3)
self.assertEquals(pipeline.__gstrefcount__, 1)
self.failUnless(gobject.source_remove(watch_id))
self.gccollect()
self.assertEquals(bus.__gstrefcount__, 2)
self.assertEquals(pipeline.__gstrefcount__, 1)
del pipeline
self.gccollect()
# flush the bus
bus.set_flushing(True)
bus.set_flushing(False)
pipeline.set_state(gst.STATE_PAUSED)
loop.run()
self.gccollect()
# FIXME: refcount is still 2
# self.assertEquals(bus.__gstrefcount__, 1)
def _message_received(self, bus, message, pipeline, loop, id):
self.failUnless(isinstance(bus, gst.Bus))

View file

@ -30,7 +30,7 @@ class TestElement(gst.Bin):
def break_it_down(self):
self.debug('Hammer Time')
class ElementTest(unittest.TestCase):
class ElementTest(TestCase):
name = 'fakesink'
alias = 'sink'
@ -52,8 +52,13 @@ class FakeSinkTest(ElementTest):
name = 'fakesink'
alias = 'sink'
def setUp(self):
ElementTest.setUp(self)
self.element = gst.element_factory_make('fakesink', 'sink')
def tearDown(self):
del self.element
ElementTest.tearDown(self)
def checkError(self, old_state, state, name):
assert self.element.get_state() == gst.STATE_NULL
assert self.element.set_state(old_state)
@ -166,7 +171,7 @@ class ElementName(unittest.TestCase):
class QueryTest(TestCase):
def setUp(self):
self.gctrack()
TestCase.setUp(self)
self.pipeline = gst.parse_launch('fakesrc name=source ! fakesink')
self.assertEquals(self.pipeline.__gstrefcount__, 1)
@ -178,8 +183,7 @@ class QueryTest(TestCase):
def tearDown(self):
del self.pipeline
del self.element
self.gccollect()
self.gcverify()
TestCase.tearDown(self)
def testQuery(self):
gst.debug('querying fakesrc in FORMAT_BYTES')
@ -195,15 +199,14 @@ class QueryTest(TestCase):
assert not res
self.gccollect()
class QueueTest(unittest.TestCase):
class QueueTest(TestCase):
def testConstruct(self):
queue = gst.element_factory_make('queue')
assert isinstance(queue, gst.Queue)
assert queue.get_name() == 'queue0'
self.assertEquals(queue.__gstrefcount__, 1)
class DebugTest(unittest.TestCase):
class DebugTest(TestCase):
def testDebug(self):
e = gst.element_factory_make('fakesrc')
e.error('I am an error string')

View file

@ -46,9 +46,6 @@ class TestConstruction(TestCase):
def testParseLaunch(self):
pipeline = gst.parse_launch('fakesrc ! fakesink')
del pipeline
pass
class Pipeline(TestCase):
def setUp(self):
@ -76,9 +73,22 @@ class Pipeline(TestCase):
self.pipeline.set_state(gst.STATE_NULL)
self.assertEqual(self.pipeline.get_state(None)[1], gst.STATE_NULL)
class Bus(TestCase):
def testGet(self):
pipeline = gst.Pipeline('test')
self.assertEquals(pipeline.__gstrefcount__, 1)
bus = pipeline.get_bus()
self.assertEquals(pipeline.__gstrefcount__, 1)
# one for python and one for the pipeline
self.assertEquals(bus.__gstrefcount__, 2)
del pipeline
self.failUnless(self.gccollect())
self.assertEquals(bus.__gstrefcount__, 1)
class PipelineAndBus(TestCase):
def setUp(self):
self.gctrack()
TestCase.setUp(self)
self.pipeline = gst.Pipeline('test-pipeline')
self.pipeline.set_property('play-timeout', 0L)
source = gst.element_factory_make('fakesrc', 'source')
@ -87,16 +97,35 @@ class PipelineAndBus(TestCase):
gst.element_link_many(source, sink)
self.bus = self.pipeline.get_bus()
self.bus.add_watch(gst.MESSAGE_ANY, self._message_received)
self.assertEquals(self.bus.__gstrefcount__, 2)
self.handler = self.bus.add_watch(
gst.MESSAGE_ANY, self._message_received)
self.assertEquals(self.bus.__gstrefcount__, 3)
self.loop = gobject.MainLoop()
def tearDown(self):
# FIXME: fix the refcount issues with the bus/pipeline
#del self.pipeline
#del self.bus
# flush the bus to be able to assert on the pipeline refcount
self.bus.set_flushing(True)
self.gccollect()
#self.gcverify()
self.assertEquals(self.pipeline.__gstrefcount__, 1)
# one for the pipeline, two for the snake
# three for the watch now shake shake shake but don't you
self.assertEquals(self.bus.__gstrefcount__, 3)
del self.pipeline
self.gccollect()
self.assertEquals(self.bus.__gstrefcount__, 2)
self.failUnless(gobject.source_remove(self.handler))
self.assertEquals(self.bus.__gstrefcount__, 1)
del self.bus
self.gccollect()
TestCase.tearDown(self)
def _message_received(self, bus, message):
gst.debug('received message: %s, %s' % (
@ -106,17 +135,29 @@ class PipelineAndBus(TestCase):
old, new = message.parse_state_changed()
gst.debug('%r state change from %r to %r' % (
message.src.get_path_string(), old, new))
if message.src == self.pipeline and new == gst.STATE_PLAYING:
if message.src == self.pipeline and new == self.final:
self.loop.quit()
return True
def testPlaying(self):
self.final = gst.STATE_PLAYING
ret = self.pipeline.set_state_async(gst.STATE_PLAYING)
self.assertEquals(ret, gst.STATE_CHANGE_ASYNC)
# go into a main loop to wait for messages
self.loop.run()
# we go to READY so we get messages; going to NULL would set
# the bus flushing
self.final = gst.STATE_READY
ret = self.pipeline.set_state_async(gst.STATE_READY)
self.assertEquals(ret, gst.STATE_CHANGE_ASYNC)
self.loop.run()
# FIXME: not setting to NULL causes a deadlock; we might want to
# fix this in the bindings
self.pipeline.set_state(gst.STATE_NULL)
if __name__ == "__main__":
unittest.main()