gstreamer/testsuite/test_event.py

87 lines
2.9 KiB
Python
Raw Normal View History

import os
import sys
from common import gst, unittest, testhelper
class EventTest(unittest.TestCase):
def setUp(self):
pipeline = gst.parse_launch('fakesrc ! fakesink name=sink')
self.sink = pipeline.get_by_name('sink')
pipeline.set_state(gst.STATE_PLAYING)
2005-07-12 09:45:58 +00:00
## def testEventEmpty(self):
## event = gst.Event(gst.EVENT_EMPTY)
## self.sink.send_event(event)
def testEventSeek(self):
event = gst.event_new_seek(gst.SEEK_METHOD_CUR, 0)
2005-07-12 09:45:58 +00:00
assert event
self.sink.send_event(event)
class EventFileSrcTest(unittest.TestCase):
filename = '/tmp/gst-python-test-file'
def setUp(self):
if os.path.exists(self.filename):
os.remove(self.filename)
open(self.filename, 'w').write(''.join(map(str, range(10))))
self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
self.source = self.pipeline.get_by_name('source')
self.sink = self.pipeline.get_by_name('sink')
self.sink.connect('handoff', self.handoff_cb)
2005-07-12 09:45:58 +00:00
self.bus = self.pipeline.get_bus()
self.pipeline.set_state(gst.STATE_PLAYING)
def tearDown(self):
assert self.pipeline.set_state(gst.STATE_PLAYING)
if os.path.exists(self.filename):
os.remove(self.filename)
def handoff_cb(self, element, buffer, pad):
self.handoffs.append(str(buffer))
def playAndIter(self):
self.handoffs = []
assert self.pipeline.set_state(gst.STATE_PLAYING)
2005-07-12 09:45:58 +00:00
while 42:
msg = self.bus.pop()
if msg and msg.type == gst.MESSAGE_EOS:
break
assert self.pipeline.set_state(gst.STATE_PAUSED)
handoffs = self.handoffs
self.handoffs = []
return handoffs
def sink_seek(self, offset, method=gst.SEEK_METHOD_SET):
method |= (gst.SEEK_FLAG_FLUSH | gst.FORMAT_BYTES)
self.source.send_event(gst.event_new_seek(method, offset))
self.source.send_event(gst.Event(gst.EVENT_FLUSH))
self.sink.send_event(gst.event_new_seek(method, offset))
self.sink.send_event(gst.Event(gst.EVENT_FLUSH))
def testSimple(self):
handoffs = self.playAndIter()
assert handoffs == map(str, range(10))
def testSeekCur(self):
self.sink_seek(8)
#print self.playAndIter()
class TestEmit(unittest.TestCase):
def testEmit(self):
object = testhelper.get_object()
object.connect('event', self._event_cb)
# First emit from C
testhelper.emit_event(object)
# Then emit from Python
object.emit('event', gst.Event(gst.EVENT_UNKNOWN))
def _event_cb(self, obj, event):
assert isinstance(event, gst.Event)
if __name__ == "__main__":
unittest.main()