gstreamer/testsuite/test_event.py

111 lines
3.9 KiB
Python
Raw Normal View History

# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# gst-python - Python bindings for GStreamer
# Copyright (C) 2002 David I. Lehn
# Copyright (C) 2004 Johan Dahlin
# Copyright (C) 2005 Edward Hervey
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
import sys
from common import gst, unittest, testhelper
class EventTest(unittest.TestCase):
def setUp(self):
pipeline = gst.parse_launch('fakesrc ! fakesink name=sink')
self.sink = pipeline.get_by_name('sink')
pipeline.set_state(gst.STATE_PLAYING)
# def testEventEmpty(self):
# event = gst.Event(gst.EVENT_EMPTY)
# self.sink.send_event(event)
# def testEventSeek(self):
# event = gst.event_new_seek(gst.SEEK_METHOD_CUR, 0)
# assert event
# self.sink.send_event(event)
# FIXME: fix these tests
#class EventFileSrcTest(unittest.TestCase):
# # FIXME: properly create temp files
# filename = '/tmp/gst-python-test-file'
# def setUp(self):
# if os.path.exists(self.filename):
# os.remove(self.filename)
# open(self.filename, 'w').write(''.join(map(str, range(10))))
#
# self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
# self.source = self.pipeline.get_by_name('source')
# self.sink = self.pipeline.get_by_name('sink')
# self.sink.connect('handoff', self.handoff_cb)
# self.bus = self.pipeline.get_bus()
# self.pipeline.set_state(gst.STATE_PLAYING)
#
# def tearDown(self):
# assert self.pipeline.set_state(gst.STATE_PLAYING)
# if os.path.exists(self.filename):
# os.remove(self.filename)
#
# def handoff_cb(self, element, buffer, pad):
# self.handoffs.append(str(buffer))
#
# def playAndIter(self):
# self.handoffs = []
# assert self.pipeline.set_state(gst.STATE_PLAYING)
# while 42:
# msg = self.bus.pop()
# if msg and msg.type == gst.MESSAGE_EOS:
# break
# assert self.pipeline.set_state(gst.STATE_PAUSED)
# handoffs = self.handoffs
# self.handoffs = []
# return handoffs
#
# def sink_seek(self, offset, method=gst.SEEK_METHOD_SET):
# method |= (gst.SEEK_FLAG_FLUSH | gst.FORMAT_BYTES)
# self.source.send_event(gst.event_new_seek(method, offset))
# self.source.send_event(gst.Event(gst.EVENT_FLUSH))
# self.sink.send_event(gst.event_new_seek(method, offset))
# self.sink.send_event(gst.Event(gst.EVENT_FLUSH))
#
# def testSimple(self):
# handoffs = self.playAndIter()
# assert handoffs == map(str, range(10))
#
# def testSeekCur(self):
# self.sink_seek(8)
#
# #print self.playAndIter()
class TestEmit(unittest.TestCase):
def testEmit(self):
object = testhelper.get_object()
object.connect('event', self._event_cb)
# First emit from C
testhelper.emit_event(object)
# Then emit from Python
object.emit('event', gst.event_new_eos())
def _event_cb(self, obj, event):
assert isinstance(event, gst.Event)
if __name__ == "__main__":
unittest.main()