# -*- Mode: Python -*- # vi:si:et:sw=4:sts=4:ts=4 # # gst-python - Python bindings for GStreamer # Copyright (C) 2002 David I. Lehn # Copyright (C) 2004 Johan Dahlin # Copyright (C) 2005 Edward Hervey # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1307 USA import os import sys import time import tempfile from common import gst, unittest, testhelper, TestCase class EventTest(TestCase): def setUp(self): TestCase.setUp(self) self.pipeline = gst.parse_launch('fakesrc ! fakesink name=sink') self.sink = self.pipeline.get_by_name('sink') self.pipeline.set_state(gst.STATE_PLAYING) def tearDown(self): gst.debug('setting pipeline to NULL') self.pipeline.set_state(gst.STATE_NULL) gst.debug('set pipeline to NULL') # FIXME: wait for state change thread to die while self.pipeline.__gstrefcount__ > 1: gst.debug('waiting for self.pipeline G rc to drop to 1') time.sleep(0.1) self.assertEquals(self.pipeline.__gstrefcount__, 1) del self.sink del self.pipeline TestCase.tearDown(self) def testEventSeek(self): # this event only serves to change the rate of data transfer event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH, gst.SEEK_TYPE_NONE, 0, gst.SEEK_TYPE_NONE, 0) # FIXME: but basesrc goes into an mmap/munmap spree, needs to be fixed event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH, gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0) assert event gst.debug('sending event') self.sink.send_event(event) gst.debug('sent event') self.assertEqual(event.parse_seek(), (1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH, gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0)) def testWrongEvent(self): buffer = gst.Buffer() self.assertRaises(TypeError, self.sink.send_event, buffer) number = 1 self.assertRaises(TypeError, self.sink.send_event, number) class EventFileSrcTest(TestCase): def setUp(self): TestCase.setUp(self) gst.info("start") self.filename = tempfile.mktemp() open(self.filename, 'w').write(''.join(map(str, range(10)))) self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename) self.source = self.pipeline.get_by_name('source') self.sink = self.pipeline.get_by_name('sink') self.sigid = self.sink.connect('handoff', self.handoff_cb) self.bus = self.pipeline.get_bus() def tearDown(self): self.pipeline.set_state(gst.STATE_NULL) self.sink.disconnect(self.sigid) if os.path.exists(self.filename): os.remove(self.filename) del self.bus del self.pipeline del self.source del self.sink del self.handoffs TestCase.tearDown(self) def handoff_cb(self, element, buffer, pad): self.handoffs.append(str(buffer)) def playAndIter(self): self.handoffs = [] self.pipeline.set_state(gst.STATE_PLAYING) assert self.pipeline.set_state(gst.STATE_PLAYING) while 42: msg = self.bus.pop() if msg and msg.type == gst.MESSAGE_EOS: break assert self.pipeline.set_state(gst.STATE_PAUSED) handoffs = self.handoffs self.handoffs = [] return handoffs def sink_seek(self, offset, method=gst.SEEK_TYPE_SET): self.sink.seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH, method, offset, gst.SEEK_TYPE_NONE, 0) def testSimple(self): handoffs = self.playAndIter() assert handoffs == map(str, range(10)) def testSeekCur(self): self.sink_seek(8) self.playAndIter() class TestEmit(TestCase): def testEmit(self): object = testhelper.get_object() object.connect('event', self._event_cb) # First emit from C testhelper.emit_event(object) # Then emit from Python object.emit('event', gst.event_new_eos()) def _event_cb(self, obj, event): assert isinstance(event, gst.Event) class TestDelayedEventProbe(TestCase): # this test: # starts a pipeline with only a source # adds an event probe to catch the (first) new-segment # adds a buffer probe to "autoplug" and send out this event def setUp(self): TestCase.setUp(self) self.pipeline = gst.Pipeline() self.src = gst.element_factory_make('fakesrc') self.src.set_property('num-buffers', 10) self.pipeline.add(self.src) self.srcpad = self.src.get_pad('src') def tearDown(self): gst.debug('setting pipeline to NULL') self.pipeline.set_state(gst.STATE_NULL) gst.debug('set pipeline to NULL') # FIXME: wait for state change thread to die while self.pipeline.__gstrefcount__ > 1: gst.debug('waiting for self.pipeline G rc to drop to 1') time.sleep(0.1) self.assertEquals(self.pipeline.__gstrefcount__, 1) def testProbe(self): self.srcpad.add_event_probe(self._event_probe_cb) self._buffer_probe_id = self.srcpad.add_buffer_probe( self._buffer_probe_cb) self._newsegment = None self._eos = None self._had_buffer = False self.pipeline.set_state(gst.STATE_PLAYING) while not self._eos: time.sleep(0.1) # verify if our newsegment event is still around and valid self.failUnless(self._newsegment) self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT) self.assertEquals(self._newsegment.__grefcount__, 1) # verify if our eos event is still around and valid self.failUnless(self._eos) self.assertEquals(self._eos.type, gst.EVENT_EOS) self.assertEquals(self._eos.__grefcount__, 1) def _event_probe_cb(self, pad, event): if event.type == gst.EVENT_NEWSEGMENT: self._newsegment = event self.assertEquals(event.__grefcount__, 3) # drop the event, we're storing it for later sending return False if event.type == gst.EVENT_EOS: self._eos = event # we also want fakesink to get it return True # sinks now send Latency events upstream if event.type == gst.EVENT_LATENCY: return True self.fail("Got an unknown event %r" % event) def _buffer_probe_cb(self, pad, buffer): self.failUnless(self._newsegment) # fake autoplugging by now putting in a fakesink sink = gst.element_factory_make('fakesink') self.pipeline.add(sink) self.src.link(sink) sink.set_state(gst.STATE_PLAYING) pad = sink.get_pad('sink') pad.send_event(self._newsegment) # we don't want to be called again self.srcpad.remove_buffer_probe(self._buffer_probe_id) self._had_buffer = True # now let the buffer through return True class TestEventCreationParsing(TestCase): def testEventStep(self): if hasattr(gst.Event, "parse_step"): e = gst.event_new_step(gst.FORMAT_TIME, 42, 1.0, True, True) self.assertEquals(e.type, gst.EVENT_STEP) fmt, amount, rate, flush, intermediate = e.parse_step() self.assertEquals(fmt, gst.FORMAT_TIME) self.assertEquals(amount, 42) self.assertEquals(rate, 1.0) self.assertEquals(flush, True) self.assertEquals(intermediate, True) if __name__ == "__main__": unittest.main()