mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-14 13:21:28 +00:00
0a37fd1928
Original commit message from CVS: * testsuite/event.py (EventFileSrcTest.setUp): Start the pipeline, so we don't get warnings when sending events (EventTest.setUp): Ditto. * testsuite/pad.py: New test, only testing simple pad queries so far. * testsuite/Makefile.am (tests): Add missing tests * gst/gst.override (_wrap_gst_pad_query): Raise RuntimeError if the return value is False and only return the queried value.
69 lines
2.3 KiB
Python
69 lines
2.3 KiB
Python
import os
|
|
import sys
|
|
from common import gst, unittest
|
|
|
|
class EventTest(unittest.TestCase):
    """Send simple events to the sink of a running fakesrc ! fakesink pipeline."""

    def setUp(self):
        """Build the pipeline, remember its sink and switch it to PLAYING."""
        pipe = gst.parse_launch('fakesrc ! fakesink name=sink')
        self.sink = pipe.get_by_name('sink')
        # Started before any event is sent; the tests only hold on to the sink.
        pipe.set_state(gst.STATE_PLAYING)

    def testEventEmpty(self):
        # An EVENT_EMPTY event is handed to the sink; the result is not checked.
        self.sink.send_event(gst.Event(gst.EVENT_EMPTY))

    def testEventSeek(self):
        # A seek of 0 with SEEK_METHOD_CUR is handed to the sink; the result
        # is not checked.
        self.sink.send_event(gst.event_new_seek(gst.SEEK_METHOD_CUR, 0))
|
|
|
|
class EventFileSrcTest(unittest.TestCase):
    """Run a filesrc ! fakesink pipeline over a small fixture file.

    Each test gets a freshly written file containing the characters
    '0123456789'.  Buffers reaching fakesink are recorded through its
    'handoff' signal so tests can compare them with the file contents.
    """

    # NOTE(review): fixed path under /tmp -- concurrent test runs would share
    # this file.
    filename = '/tmp/gst-python-test-file'

    def setUp(self):
        """Write the fixture file, build the pipeline and start it."""
        if os.path.exists(self.filename):
            os.remove(self.filename)

        # Close the file explicitly so its contents are flushed before the
        # pipeline is created, instead of relying on garbage collection.
        fixture = open(self.filename, 'w')
        try:
            fixture.write(''.join(map(str, range(10))))
        finally:
            fixture.close()

        # Exists from the start so handoff_cb cannot hit a missing attribute
        # if a handoff is emitted before playAndIter() has been called.
        self.handoffs = []

        self.pipeline = gst.parse_launch(
            ('filesrc name=source location=%s blocksize=1 ! '
             'fakesink signal-handoffs=1 name=sink') % self.filename)
        self.source = self.pipeline.get_by_name('source')
        self.sink = self.pipeline.get_by_name('sink')
        self.sink.connect('handoff', self.handoff_cb)
        self.pipeline.set_state(gst.STATE_PLAYING)

    def tearDown(self):
        """Change the pipeline state one last time and delete the fixture file."""
        # NOTE(review): the original requests STATE_PLAYING here; a shutdown
        # state may have been intended -- confirm before changing.
        try:
            self._set_state(gst.STATE_PLAYING)
        finally:
            # Remove the file even when the state change is reported as failed.
            if os.path.exists(self.filename):
                os.remove(self.filename)

    def _set_state(self, state):
        """Set the pipeline state and fail the test on a false return value.

        The call is deliberately made outside of an ``assert`` statement:
        asserts are dropped under ``python -O``, which would silently skip
        the state change itself.

        Returns whatever ``pipeline.set_state`` returned.
        """
        result = self.pipeline.set_state(state)
        if not result:
            self.fail('pipeline.set_state(%r) returned %r' % (state, result))
        return result

    def handoff_cb(self, element, buffer, pad):
        """'handoff' signal handler: record the buffer as a string."""
        self.handoffs.append(str(buffer))

    def playAndIter(self):
        """Iterate the pipeline until iterate() returns false.

        Returns the list of stringified buffers collected by handoff_cb
        during this run; the internal list is reset before and after.
        """
        self.handoffs = []
        self._set_state(gst.STATE_PLAYING)
        while self.pipeline.iterate():
            pass
        self._set_state(gst.STATE_PAUSED)
        collected = self.handoffs
        self.handoffs = []
        return collected

    def sink_seek(self, offset, method=gst.SEEK_METHOD_SET):
        """Send a flushing byte seek followed by a flush to source and sink.

        offset -- value passed to gst.event_new_seek
        method -- seek method; SEEK_FLAG_FLUSH and FORMAT_BYTES are OR-ed in
        """
        method |= (gst.SEEK_FLAG_FLUSH | gst.FORMAT_BYTES)
        # A fresh event object is created for every send_event call, as in
        # the original sequence: source seek, source flush, sink seek, sink
        # flush.
        self.source.send_event(gst.event_new_seek(method, offset))
        self.source.send_event(gst.Event(gst.EVENT_FLUSH))
        self.sink.send_event(gst.event_new_seek(method, offset))
        self.sink.send_event(gst.Event(gst.EVENT_FLUSH))

    def testSimple(self):
        # Expect one handoff per character of the fixture file, in order.
        # The expected value is built as a real list so the comparison does
        # not depend on map() returning a list.
        handoffs = self.playAndIter()
        self.assertEqual(handoffs, [str(i) for i in range(10)])

    def testSeekCur(self):
        # NOTE(review): despite the name, this uses sink_seek's default
        # method (SEEK_METHOD_SET), and the follow-up check below was left
        # disabled in the original.
        self.sink_seek(8)

        #print self.playAndIter()
|
|
|
|
|
|
|
|
# Run every TestCase in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|