mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-18 14:26:43 +00:00
pyges : Add a test suite with three test cases for the bindings
This commit is contained in:
parent
61e851d658
commit
cea06907c9
5 changed files with 259 additions and 0 deletions
|
@ -39,3 +39,6 @@ CLEANFILES = ges.c
|
|||
--prefix pyges $<) > gen-$*.c \
|
||||
&& cp gen-$*.c $*.c \
|
||||
&& rm -f gen-$*.c
|
||||
|
||||
check:
|
||||
@PYTHONPATH=$(top_srcdir):$(PYTHONPATH) $(PYTHON) $(srcdir)/testsuite/runtests.py
|
||||
|
|
129
bindings/python/testsuite/common.py
Normal file
129
bindings/python/testsuite/common.py
Normal file
|
@ -0,0 +1,129 @@
|
|||
# -*- Mode: Python -*-
|
||||
# vi:si:et:sw=4:sts=4:ts=4
|
||||
#
|
||||
# gst-python - Python bindings for GStreamer
|
||||
# Copyright (C) 2002 David I. Lehn
|
||||
# Copyright (C) 2004 Johan Dahlin
|
||||
# Copyright (C) 2005 Edward Hervey
|
||||
#
|
||||
# This library is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation; either
|
||||
# version 2.1 of the License, or (at your option) any later version.
|
||||
#
|
||||
# This library is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public
|
||||
# License along with this library; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
|
||||
import os
|
||||
import sys
|
||||
import gc
|
||||
import unittest
|
||||
import gst
|
||||
|
||||
import gobject
|
||||
try:
|
||||
gobject.threads_init()
|
||||
except:
|
||||
print "WARNING: gobject doesn't have threads_init, no threadsafety"
|
||||
|
||||
def disable_stderr():
|
||||
global _stderr
|
||||
_stderr = file('/tmp/stderr', 'w+')
|
||||
sys.stderr = os.fdopen(os.dup(2), 'w')
|
||||
os.close(2)
|
||||
os.dup(_stderr.fileno())
|
||||
|
||||
def enable_stderr():
|
||||
global _stderr
|
||||
|
||||
os.close(2)
|
||||
os.dup(sys.stderr.fileno())
|
||||
_stderr.seek(0, 0)
|
||||
data = _stderr.read()
|
||||
_stderr.close()
|
||||
os.remove('/tmp/stderr')
|
||||
return data
|
||||
|
||||
def run_silent(function, *args, **kwargs):
|
||||
disable_stderr()
|
||||
|
||||
try:
|
||||
function(*args, **kwargs)
|
||||
except Exception, exc:
|
||||
enable_stderr()
|
||||
raise exc
|
||||
|
||||
output = enable_stderr()
|
||||
|
||||
return output
|
||||
|
||||
class TestCase(unittest.TestCase):
|
||||
|
||||
_types = [gst.Object, gst.MiniObject]
|
||||
|
||||
def gccollect(self):
|
||||
# run the garbage collector
|
||||
ret = 0
|
||||
gst.debug('garbage collecting')
|
||||
while True:
|
||||
c = gc.collect()
|
||||
ret += c
|
||||
if c == 0: break
|
||||
gst.debug('done garbage collecting, %d objects' % ret)
|
||||
return ret
|
||||
|
||||
def gctrack(self):
|
||||
# store all gst objects in the gc in a tracking dict
|
||||
# call before doing any allocation in your test, from setUp
|
||||
gst.debug('tracking gc GstObjects for types %r' % self._types)
|
||||
self.gccollect()
|
||||
self._tracked = {}
|
||||
for c in self._types:
|
||||
self._tracked[c] = [o for o in gc.get_objects() if isinstance(o, c)]
|
||||
|
||||
def gcverify(self):
|
||||
# verify no new gst objects got added to the gc
|
||||
# call after doing all cleanup in your test, from tearDown
|
||||
gst.debug('verifying gc GstObjects for types %r' % self._types)
|
||||
new = []
|
||||
for c in self._types:
|
||||
objs = [o for o in gc.get_objects() if isinstance(o, c)]
|
||||
new.extend([o for o in objs if o not in self._tracked[c]])
|
||||
|
||||
print new
|
||||
self.failIf(new, new)
|
||||
#self.failIf(new, ["%r:%d" % (type(o), id(o)) for o in new])
|
||||
del self._tracked
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
Override me by chaining up to me at the start of your setUp.
|
||||
"""
|
||||
# Using private variables is BAD ! this variable changed name in
|
||||
# python 2.5
|
||||
try:
|
||||
methodName = self.__testMethodName
|
||||
except:
|
||||
methodName = self._testMethodName
|
||||
gst.debug('%s.%s' % (self.__class__.__name__, methodName))
|
||||
self.gctrack()
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
Override me by chaining up to me at the end of your tearDown.
|
||||
"""
|
||||
# Using private variables is BAD ! this variable changed name in
|
||||
# python 2.5
|
||||
try:
|
||||
methodName = self.__testMethodName
|
||||
except:
|
||||
methodName = self._testMethodName
|
||||
gst.debug('%s.%s' % (self.__class__.__name__, methodName))
|
||||
self.gccollect()
|
||||
self.gcverify()
|
38
bindings/python/testsuite/runtests.py
Normal file
38
bindings/python/testsuite/runtests.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
import glob
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
SKIP_FILES = ['common', 'runtests']
|
||||
|
||||
os.environ['LC_MESSAGES'] = 'C'
|
||||
|
||||
def gettestnames(which):
|
||||
if not which:
|
||||
dir = os.path.split(os.path.abspath(__file__))[0]
|
||||
which = [os.path.basename(p) for p in glob.glob('%s/test_*.py' % dir)]
|
||||
print which
|
||||
|
||||
names = map(lambda x: x[:-3], which)
|
||||
for f in SKIP_FILES:
|
||||
if f in names:
|
||||
names.remove(f)
|
||||
return names
|
||||
|
||||
suite = unittest.TestSuite()
|
||||
loader = unittest.TestLoader()
|
||||
|
||||
for name in gettestnames(sys.argv[1:]):
|
||||
suite.addTest(loader.loadTestsFromName(name))
|
||||
|
||||
descriptions = 1
|
||||
verbosity = 1
|
||||
if os.environ.has_key('VERBOSE'):
|
||||
descriptions = 2
|
||||
verbosity = 2
|
||||
|
||||
testRunner = unittest.TextTestRunner(descriptions=descriptions,
|
||||
verbosity=verbosity)
|
||||
result = testRunner.run(suite)
|
||||
if result.failures or result.errors:
|
||||
sys.exit(1)
|
39
bindings/python/testsuite/test_timeline.py
Normal file
39
bindings/python/testsuite/test_timeline.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
import glib
|
||||
import gst
|
||||
|
||||
from common import TestCase
|
||||
from gst import ges
|
||||
|
||||
class Timeline(TestCase):
|
||||
|
||||
def testTimeline(self):
|
||||
|
||||
tl = ges.timeline_new_audio_video()
|
||||
lyr = ges.SimpleTimelineLayer()
|
||||
src = ges.TimelineTestSource()
|
||||
pip = ges.TimelinePipeline()
|
||||
bus = pip.get_bus()
|
||||
self.mainloop = glib.MainLoop()
|
||||
|
||||
#Let's add the layer to the timeline, and the source to the layer.
|
||||
tl.add_layer(lyr)
|
||||
src.set_duration(long(gst.SECOND * 10))
|
||||
src.set_vpattern("Random (television snow)")
|
||||
lyr.add_object(src, -1)
|
||||
|
||||
pip.add_timeline(tl)
|
||||
bus.set_sync_handler(self.bus_handler)
|
||||
|
||||
self.pipeline = pip
|
||||
self.layer = lyr
|
||||
|
||||
#Mainloop is finished, tear down.
|
||||
self.pipeline = None
|
||||
|
||||
|
||||
def bus_handler(self, unused_bus, message):
|
||||
if message.type == gst.MESSAGE_ERROR:
|
||||
print "ERROR"
|
||||
elif message.type == gst.MESSAGE_EOS:
|
||||
print "Done"
|
||||
return gst.BUS_PASS
|
50
bindings/python/testsuite/test_transition.py
Normal file
50
bindings/python/testsuite/test_transition.py
Normal file
|
@ -0,0 +1,50 @@
|
|||
import glib
|
||||
import gst
|
||||
|
||||
from common import TestCase
|
||||
from gst import ges
|
||||
|
||||
class Timeline(TestCase):
|
||||
|
||||
def testTimeline(self):
|
||||
|
||||
tl = ges.timeline_new_audio_video()
|
||||
lyr = ges.SimpleTimelineLayer()
|
||||
src = ges.TimelineTestSource()
|
||||
src2 = ges.TimelineTestSource()
|
||||
tr = ges.TimelineStandardTransition("crossfade")
|
||||
pip = ges.TimelinePipeline()
|
||||
bus = pip.get_bus()
|
||||
self.mainloop = glib.MainLoop()
|
||||
|
||||
# Let's add the layer to the timeline, and the sources
|
||||
# and transition to the layer.
|
||||
|
||||
tl.add_layer(lyr)
|
||||
src.set_duration(long(gst.SECOND * 10))
|
||||
src2.set_duration(long(gst.SECOND * 10))
|
||||
src.set_vpattern("Random (television snow)")
|
||||
tr.set_duration(long(gst.SECOND * 10))
|
||||
|
||||
lyr.add_object(src, -1)
|
||||
lyr.add_object(tr, -1)
|
||||
lyr.add_object(src2, -1)
|
||||
|
||||
pip.add_timeline(tl)
|
||||
bus.set_sync_handler(self.bus_handler)
|
||||
|
||||
self.pipeline = pip
|
||||
self.layer = lyr
|
||||
|
||||
#Mainloop is finished, tear down.
|
||||
self.pipeline = None
|
||||
|
||||
|
||||
def bus_handler(self, unused_bus, message):
|
||||
if message.type == gst.MESSAGE_ERROR:
|
||||
print "ERROR"
|
||||
self.mainloop.quit()
|
||||
elif message.type == gst.MESSAGE_EOS:
|
||||
print "Done"
|
||||
self.mainloop.quit()
|
||||
return gst.BUS_PASS
|
Loading…
Reference in a new issue