mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-26 09:08:14 +00:00
56f25a4653
Original commit message from CVS: * gst/common.h: cleanup * gst/gst-types.defs: * gst/gst.defs: Updated defs file to current gstreamer core * gst/gst.override: Added useless function (at least from python bindings) and little correction on _wrap_gst_xml_get_topelements() * gst/gstbin.override: * gst/gstbuffer.override: Fix memleak in gst.Buffer.set_caps() * gst/gstevent.override: Added wrapper for remaining gst_event_parse_*() * gst/gstlibs.override: Wrapped more gst.Controller methods * gst/gstmodule.c: (init_gst): new gst_init() Added atexit(gst_deinit) * gst/gstpad.override: Fix memleak in gst.Pad.set_caps() * gst/gstquery.override: add gst.Query.parse_segment() * gst/libs.defs: Updated to current gst-libs * gst/pygstminiobject.c: (pygstminiobject_register_wrapper), (pygstminiobject_new), (pygstminiobject_dealloc): Added debug * testsuite/Makefile.am: * testsuite/common.py: * testsuite/gstpython.supp: * testsuite/python.supp: * testsuite/test_bin.py: * testsuite/test_buffer.py: * testsuite/test_element.py: * testsuite/test_event.py: * testsuite/test_ghostpad.py: * testsuite/test_iterator.py: * testsuite/test_message.py: * testsuite/test_pipeline.py: Proper valgrind testing, Updated tests to new API
182 lines
6.2 KiB
Python
182 lines
6.2 KiB
Python
# -*- Mode: Python -*-
|
|
# vi:si:et:sw=4:sts=4:ts=4
|
|
#
|
|
# gst-python - Python bindings for GStreamer
|
|
# Copyright (C) 2002 David I. Lehn
|
|
# Copyright (C) 2004 Johan Dahlin
|
|
# Copyright (C) 2005 Edward Hervey
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
|
|
from common import gst, unittest, TestCase
|
|
|
|
import sys
|
|
import gc
|
|
import gobject
|
|
|
|
class SrcBin(gst.Bin):
|
|
def prepare(self):
|
|
src = gst.element_factory_make('fakesrc')
|
|
self.add(src)
|
|
pad = src.get_pad("src")
|
|
ghostpad = gst.GhostPad("src", pad)
|
|
self.add_pad(ghostpad)
|
|
gobject.type_register(SrcBin)
|
|
|
|
class SinkBin(gst.Bin):
|
|
def prepare(self):
|
|
sink = gst.element_factory_make('fakesink')
|
|
self.add(sink)
|
|
pad = sink.get_pad("sink")
|
|
ghostpad = gst.GhostPad("sink", pad)
|
|
self.add_pad(ghostpad)
|
|
self.sink = sink
|
|
|
|
def connect_handoff(self, cb, *args, **kwargs):
|
|
self.sink.set_property('signal-handoffs', True)
|
|
self.sink.connect('handoff', cb, *args, **kwargs)
|
|
|
|
gobject.type_register(SinkBin)
|
|
|
|
|
|
class PipeTest(TestCase):
|
|
def setUp(self):
|
|
gst.info("setUp")
|
|
TestCase.setUp(self)
|
|
self.pipeline = gst.Pipeline()
|
|
self.assertEquals(self.pipeline.__gstrefcount__, 1)
|
|
self.assertEquals(sys.getrefcount(self.pipeline), 3)
|
|
|
|
self.src = SrcBin()
|
|
self.src.prepare()
|
|
self.sink = SinkBin()
|
|
self.sink.prepare()
|
|
self.assertEquals(self.src.__gstrefcount__, 1)
|
|
self.assertEquals(sys.getrefcount(self.src), 3)
|
|
self.assertEquals(self.sink.__gstrefcount__, 1)
|
|
self.assertEquals(sys.getrefcount(self.sink), 3)
|
|
gst.info("end of SetUp")
|
|
|
|
def tearDown(self):
|
|
gst.info("tearDown")
|
|
self.assertEquals(self.pipeline.__gstrefcount__, 1)
|
|
self.assertEquals(sys.getrefcount(self.pipeline), 3)
|
|
self.assertEquals(self.src.__gstrefcount__, 2)
|
|
self.assertEquals(sys.getrefcount(self.src), 3)
|
|
self.assertEquals(self.sink.__gstrefcount__, 2)
|
|
self.assertEquals(sys.getrefcount(self.sink), 3)
|
|
gst.debug('deleting pipeline')
|
|
del self.pipeline
|
|
self.gccollect()
|
|
|
|
self.assertEquals(self.src.__gstrefcount__, 1) # parent gone
|
|
self.assertEquals(self.sink.__gstrefcount__, 1) # parent gone
|
|
self.assertEquals(sys.getrefcount(self.src), 3)
|
|
self.assertEquals(sys.getrefcount(self.sink), 3)
|
|
gst.debug('deleting src')
|
|
del self.src
|
|
self.gccollect()
|
|
gst.debug('deleting sink')
|
|
del self.sink
|
|
self.gccollect()
|
|
|
|
TestCase.tearDown(self)
|
|
|
|
def testBinState(self):
|
|
self.pipeline.add(self.src, self.sink)
|
|
self.src.link(self.sink)
|
|
self.sink.connect_handoff(self._sink_handoff_cb)
|
|
self._handoffs = 0
|
|
|
|
self.pipeline.set_state(gst.STATE_PLAYING)
|
|
while True:
|
|
(ret, cur, pen) = self.pipeline.get_state(timeout=None)
|
|
if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
|
|
break
|
|
|
|
while self._handoffs < 10:
|
|
pass
|
|
|
|
self.pipeline.set_state(gst.STATE_NULL)
|
|
while True:
|
|
(ret, cur, pen) = self.pipeline.get_state(timeout=None)
|
|
if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
|
|
break
|
|
|
|
def testProbedLink(self):
|
|
self.pipeline.add(self.src)
|
|
pad = self.src.get_pad("src")
|
|
|
|
self.sink.connect_handoff(self._sink_handoff_cb)
|
|
self._handoffs = 0
|
|
|
|
# FIXME: adding a probe to the ghost pad does not work atm
|
|
# id = pad.add_buffer_probe(self._src_buffer_probe_cb)
|
|
realpad = pad.get_target()
|
|
self._probe_id = realpad.add_buffer_probe(self._src_buffer_probe_cb)
|
|
|
|
self._probed = False
|
|
|
|
self.pipeline.set_state(gst.STATE_PLAYING)
|
|
while True:
|
|
(ret, cur, pen) = self.pipeline.get_state(timeout=None)
|
|
if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
|
|
break
|
|
|
|
while not self._probed:
|
|
pass
|
|
|
|
while self._handoffs < 10:
|
|
pass
|
|
|
|
self.pipeline.set_state(gst.STATE_NULL)
|
|
while True:
|
|
(ret, cur, pen) = self.pipeline.get_state(timeout=None)
|
|
if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
|
|
break
|
|
|
|
def _src_buffer_probe_cb(self, pad, buffer):
|
|
gst.debug("received probe on pad %r" % pad)
|
|
self._probed = True
|
|
gst.debug('adding sink bin')
|
|
self.pipeline.add(self.sink)
|
|
# this seems to get rid of the warnings about pushing on an unactivated
|
|
# pad
|
|
gst.debug('setting sink state')
|
|
|
|
# FIXME: attempt one: sync to current pending state of bin
|
|
(res, cur, pen) = self.pipeline.get_state(timeout=0.0)
|
|
target = pen
|
|
if target == gst.STATE_VOID_PENDING:
|
|
target = cur
|
|
gst.debug("setting sink state to %r" % target)
|
|
# FIXME: the following print can cause a lock-up; why ?
|
|
# print target
|
|
# if we don't set async, it will possibly end up in PAUSED
|
|
self.sink.set_state(target)
|
|
|
|
gst.debug('linking')
|
|
self.src.link(self.sink)
|
|
gst.debug('removing buffer probe id %r' % self._probe_id)
|
|
pad.remove_buffer_probe(self._probe_id)
|
|
self._probe_id = None
|
|
gst.debug('done')
|
|
|
|
def _sink_handoff_cb(self, sink, pad, buffer):
|
|
gst.debug('received handoff on pad %r' % pad)
|
|
self._handoffs += 1
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|