testsuite/test_event.py: add a test case for autoplugging behaviour: create a source, connect probes, store new-segme...

Original commit message from CVS:

* testsuite/test_event.py:
add a test case for autoplugging behaviour:
create a source, connect probes, store new-segment event,
add element in buffer probe callback, and forward event
Currently fails due to refcounting on the stored new-segment
event
This commit is contained in:
Thomas Vander Stichele 2006-02-01 11:52:04 +00:00
parent 2f3f34b7af
commit d763780e7f
2 changed files with 89 additions and 0 deletions

View file

@ -1,3 +1,12 @@
2006-02-01 Thomas Vander Stichele <thomas at apestaart dot org>
* testsuite/test_event.py:
add a test case for autoplugging behaviour:
create a source, connect probes, store new-segment event,
add element in buffer probe callback, and forward event
Currently fails due to refcounting on the stored new-segment
event
2006-02-01 Thomas Vander Stichele <thomas at apestaart dot org>
* testsuite/test_element.py:

View file

@ -142,5 +142,85 @@ class TestEmit(TestCase):
assert isinstance(event, gst.Event)
class TestDelayedEventProbe(TestCase):
# this test:
# starts a pipeline with only a source
# adds an event probe to catch the (first) new-segment
# adds a buffer probe to "autoplug" and send out this event
def setUp(self):
TestCase.setUp(self)
self.pipeline = gst.Pipeline()
self.src = gst.element_factory_make('fakesrc')
self.src.set_property('num-buffers', 10)
self.pipeline.add(self.src)
self.srcpad = self.src.get_pad('src')
def tearDown(self):
gst.debug('setting pipeline to NULL')
self.pipeline.set_state(gst.STATE_NULL)
gst.debug('set pipeline to NULL')
# FIXME: wait for state change thread to die
while self.pipeline.__gstrefcount__ > 1:
gst.debug('waiting for self.pipeline G rc to drop to 1')
time.sleep(0.1)
self.assertEquals(self.pipeline.__gstrefcount__, 1)
def FIXMEtestProbe(self):
self.srcpad.add_event_probe(self._event_probe_cb)
self._buffer_probe_id = self.srcpad.add_buffer_probe(
self._buffer_probe_cb)
self._newsegment = None
self._eos = None
self._had_buffer = False
self.pipeline.set_state(gst.STATE_PLAYING)
while not self._eos:
time.sleep(0.1)
# verify if our newsegment event is still around and valid
self.failUnless(self._newsegment)
self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT)
self.assertEquals(self._newsegment.__grefcount__, 1)
# verify if our eos event is still around and valid
self.failUnless(self._eos)
self.assertEquals(self._eos.type, gst.EVENT_EOS)
self.assertEquals(self._eos.__grefcount__, 1)
def _event_probe_cb(self, pad, event):
if event.type == gst.EVENT_NEWSEGMENT:
self._newsegment = event
self.assertEquals(event.__grefcount__, 3)
# drop the event, we're storing it for later sending
return False
if event.type == gst.EVENT_EOS:
self._eos = event
# we also want fakesink to get it
return True
self.fail("Got an unknown event %r" % event)
def _buffer_probe_cb(self, pad, buffer):
self.failUnless(self._newsegment)
# fake autoplugging by now putting in a fakesink
sink = gst.element_factory_make('fakesink')
self.pipeline.add(sink)
self.src.link(sink)
sink.set_state(gst.STATE_PLAYING)
pad = sink.get_pad('sink')
pad.send_event(self._newsegment)
# we don't want to be called again
self.srcpad.remove_buffer_probe(self._buffer_probe_id)
self._had_buffer = True
# now let the buffer through
return True
if __name__ == "__main__":
unittest.main()