gstreamer/subprojects/gst-python/testsuite/common.py

139 lines
4.1 KiB
Python

# -*- Mode: Python; py-indent-offset: 4 -*-
# vim: tabstop=4 shiftwidth=4 expandtab
#
# Copyright (C) 2015 Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.
#
# SPDX-License-Identifier: LGPL-2.0-or-later
"""
A collection of objects to use for testing
Copyied from pitivi
"""
import os
import gc
import unittest
import gi.overrides
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
detect_leaks = os.environ.get("TEST_DETECT_LEAKS", "1") not in ("0", "")
class TestCase(unittest.TestCase):
_tracked_types = (Gst.MiniObject, Gst.Element, Gst.Pad, Gst.Caps)
def gctrack(self):
self.gccollect()
self._tracked = []
for obj in gc.get_objects():
if not isinstance(obj, self._tracked_types):
continue
self._tracked.append(obj)
def gccollect(self):
ret = 0
while True:
c = gc.collect()
ret += c
if c == 0:
break
return ret
def gcverify(self):
leaked = []
for obj in gc.get_objects():
if not isinstance(obj, self._tracked_types) or \
obj in self._tracked:
continue
leaked.append(obj)
# we collect again here to get rid of temporary objects created in the
# above loop
self.gccollect()
for elt in leaked:
print(elt)
for i in gc.get_referrers(elt):
print(" ", i)
self.assertFalse(leaked, leaked)
del self._tracked
def setUp(self):
self._num_failures = len(getattr(self._result, 'failures', []))
self._num_errors = len(getattr(self._result, 'errors', []))
if detect_leaks:
self.gctrack()
def tearDown(self):
# don't barf gc info all over the console if we have already failed a
# test case
if (self._num_failures < len(getattr(self._result, 'failures', []))
or self._num_errors < len(getattr(self._result, 'failures', []))):
return
if detect_leaks:
self.gccollect()
self.gcverify()
# override run() to save a reference to the test result object
def run(self, result=None):
if not result:
result = self.defaultTestResult()
self._result = result
unittest.TestCase.run(self, result)
class SignalMonitor(object):
def __init__(self, obj, *signals):
self.signals = signals
self.connectToObj(obj)
def connectToObj(self, obj):
self.obj = obj
for signal in self.signals:
obj.connect(signal, self._signalCb, signal)
setattr(self, self._getSignalCounterName(signal), 0)
setattr(self, self._getSignalCollectName(signal), [])
def disconnectFromObj(self, obj):
obj.disconnect_by_func(self._signalCb)
del self.obj
def _getSignalCounterName(self, signal):
field = '%s_count' % signal.replace('-', '_')
return field
def _getSignalCollectName(self, signal):
field = '%s_collect' % signal.replace('-', '_')
return field
def _signalCb(self, obj, *args):
name = args[-1]
field = self._getSignalCounterName(name)
setattr(self, field, getattr(self, field, 0) + 1)
field = self._getSignalCollectName(name)
setattr(self, field, getattr(self, field, []) + [args[:-1]])