mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-04-26 06:54:49 +00:00
testsuite/test_pad.py: add a test that shows we can link a pad in a buffer probe callback. yay !
Original commit message from CVS: * testsuite/test_pad.py: add a test that shows we can link a pad in a buffer probe callback. yay !
This commit is contained in:
parent
deb316e3fd
commit
ead5c45836
2 changed files with 51 additions and 1 deletions
|
@ -1,3 +1,9 @@
|
||||||
|
2005-10-05 Thomas Vander Stichele <thomas at apestaart dot org>
|
||||||
|
|
||||||
|
* testsuite/test_pad.py:
|
||||||
|
add a test that shows we can link a pad in a buffer probe
|
||||||
|
callback. yay !
|
||||||
|
|
||||||
2005-10-05 Thomas Vander Stichele <thomas at apestaart dot org>
|
2005-10-05 Thomas Vander Stichele <thomas at apestaart dot org>
|
||||||
|
|
||||||
* gst/gstobject.override:
|
* gst/gstobject.override:
|
||||||
|
|
|
@ -140,7 +140,51 @@ class PadPushLinkedTest(TestCase):
|
||||||
|
|
||||||
def _probe_handler(self, pad, buffer, ret):
|
def _probe_handler(self, pad, buffer, ret):
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
# a test to show that we can link a pad from the probe handler
|
||||||
|
|
||||||
|
class PadPushProbeLinkTest(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.gctrack()
|
||||||
|
self.src = gst.Pad("src", gst.PAD_SRC)
|
||||||
|
self.sink = gst.Pad("sink", gst.PAD_SINK)
|
||||||
|
caps = gst.caps_from_string("foo/bar")
|
||||||
|
self.src.set_caps(caps)
|
||||||
|
self.sink.set_caps(caps)
|
||||||
|
self.sink.set_chain_function(self._chain_func)
|
||||||
|
self.buffers = []
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.assertEquals(sys.getrefcount(self.src), 3)
|
||||||
|
self.assertEquals(self.src.__gstrefcount__, 1)
|
||||||
|
del self.src
|
||||||
|
self.assertEquals(sys.getrefcount(self.sink), 3)
|
||||||
|
self.assertEquals(self.sink.__gstrefcount__, 1)
|
||||||
|
del self.sink
|
||||||
|
self.gccollect()
|
||||||
|
self.gcverify()
|
||||||
|
|
||||||
|
def _chain_func(self, pad, buffer):
|
||||||
|
self.buffers.append(buffer)
|
||||||
|
|
||||||
|
return gst.FLOW_OK
|
||||||
|
|
||||||
|
def testProbeLink(self):
|
||||||
|
id = self.src.add_buffer_probe(self._probe_handler)
|
||||||
|
self.buffer = gst.Buffer()
|
||||||
|
self.assertEquals(self.buffer.__grefcount__, 1)
|
||||||
|
gst.debug('pushing buffer on linked pad, no probe')
|
||||||
|
self.assertEquals(self.src.push(self.buffer), gst.FLOW_OK)
|
||||||
|
gst.debug('pushed buffer on linked pad, no probe')
|
||||||
|
# pushing it takes a ref in the python wrapper to keep buffer
|
||||||
|
# alive afterwards; fakesink will get the buffer
|
||||||
|
self.assertEquals(self.buffer.__grefcount__, 1)
|
||||||
|
self.assertEquals(len(self.buffers), 1)
|
||||||
|
|
||||||
|
def _probe_handler(self, pad, buffer):
|
||||||
|
self.src.link(self.sink)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class PadTest(TestCase):
|
class PadTest(TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
|
Loading…
Reference in a new issue