mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-12 02:15:31 +00:00
250 lines
8.9 KiB
Python
250 lines
8.9 KiB
Python
# -*- Mode: Python -*-
|
|
# vi:si:et:sw=4:sts=4:ts=4
|
|
#
|
|
# gst-python - Python bindings for GStreamer
|
|
# Copyright (C) 2002 David I. Lehn
|
|
# Copyright (C) 2004 Johan Dahlin
|
|
# Copyright (C) 2005 Edward Hervey
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
import time
|
|
|
|
from common import gst, unittest, TestCase, pygobject_2_13
|
|
|
|
import gobject
|
|
|
|
class TestConstruction(TestCase):
|
|
def setUp(self):
|
|
self.gctrack()
|
|
|
|
def tearDown(self):
|
|
self.gccollect()
|
|
self.gcverify()
|
|
|
|
def testGoodConstructor(self):
|
|
name = 'test-pipeline'
|
|
pipeline = gst.Pipeline(name)
|
|
self.assertEquals(pipeline.__gstrefcount__, 1)
|
|
assert pipeline is not None, 'pipeline is None'
|
|
self.failUnless(isinstance(pipeline, gst.Pipeline),
|
|
'pipeline is not a GstPipline')
|
|
assert pipeline.get_name() == name, 'pipelines name is wrong'
|
|
self.assertEquals(pipeline.__gstrefcount__, 1)
|
|
|
|
def testParseLaunch(self):
|
|
pipeline = gst.parse_launch('fakesrc ! fakesink')
|
|
|
|
class Pipeline(TestCase):
|
|
def setUp(self):
|
|
self.gctrack()
|
|
self.pipeline = gst.Pipeline('test-pipeline')
|
|
source = gst.element_factory_make('fakesrc', 'source')
|
|
source.set_property('num-buffers', 5)
|
|
sink = gst.element_factory_make('fakesink', 'sink')
|
|
self.pipeline.add(source, sink)
|
|
gst.element_link_many(source, sink)
|
|
|
|
def tearDown(self):
|
|
del self.pipeline
|
|
self.gccollect()
|
|
self.gcverify()
|
|
|
|
def testRun(self):
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
|
|
self.pipeline.set_state(gst.STATE_PLAYING)
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
|
|
|
|
time.sleep(1)
|
|
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
|
|
self.pipeline.set_state(gst.STATE_NULL)
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
|
|
|
|
class PipelineTags(TestCase):
|
|
def setUp(self):
|
|
self.gctrack()
|
|
self.pipeline = gst.parse_launch('audiotestsrc num-buffers=100 ! vorbisenc name=encoder ! oggmux name=muxer ! fakesink')
|
|
|
|
def tearDown(self):
|
|
del self.pipeline
|
|
self.gccollect()
|
|
self.gcverify()
|
|
|
|
def testRun(self):
|
|
# in 0.10.15.1, this triggers
|
|
# sys:1: gobject.Warning: g_value_get_uint: assertion `G_VALUE_HOLDS_UINT (value)' failed
|
|
# during pipeline playing
|
|
|
|
l = gst.TagList()
|
|
l[gst.TAG_ARTIST] = 'artist'
|
|
l[gst.TAG_TRACK_NUMBER] = 1
|
|
encoder = self.pipeline.get_by_name('encoder')
|
|
encoder.merge_tags(l, gst.TAG_MERGE_APPEND)
|
|
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
|
|
self.pipeline.set_state(gst.STATE_PLAYING)
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
|
|
|
|
time.sleep(1)
|
|
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
|
|
self.pipeline.set_state(gst.STATE_NULL)
|
|
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
|
|
|
|
|
|
class Bus(TestCase):
|
|
def testGet(self):
|
|
pipeline = gst.Pipeline('test')
|
|
self.assertEquals(pipeline.__gstrefcount__, 1)
|
|
bus = pipeline.get_bus()
|
|
self.assertEquals(pipeline.__gstrefcount__, 1)
|
|
# one for python and one for the pipeline
|
|
self.assertEquals(bus.__gstrefcount__, 2)
|
|
|
|
del pipeline
|
|
if not pygobject_2_13:
|
|
self.failUnless(self.gccollect())
|
|
self.assertEquals(bus.__gstrefcount__, 1)
|
|
|
|
class PipelineAndBus(TestCase):
|
|
def setUp(self):
|
|
TestCase.setUp(self)
|
|
self.pipeline = gst.Pipeline('test-pipeline')
|
|
source = gst.element_factory_make('fakesrc', 'source')
|
|
sink = gst.element_factory_make('fakesink', 'sink')
|
|
self.pipeline.add(source, sink)
|
|
gst.element_link_many(source, sink)
|
|
|
|
self.bus = self.pipeline.get_bus()
|
|
self.assertEquals(self.bus.__gstrefcount__, 2)
|
|
self.handler = self.bus.add_watch(self._message_received)
|
|
self.assertEquals(self.bus.__gstrefcount__, 3)
|
|
self.assertEquals(self.pipeline.__gstrefcount__, 1)
|
|
|
|
self.loop = gobject.MainLoop()
|
|
|
|
def tearDown(self):
|
|
# FIXME: fix the refcount issues with the bus/pipeline
|
|
# flush the bus to be able to assert on the pipeline refcount
|
|
#while self.pipeline.__gstrefcount__ > 1:
|
|
self.gccollect()
|
|
|
|
# one for the pipeline, two for the snake
|
|
# three for the watch now shake shake shake but don't you
|
|
self.assertEquals(self.bus.__gstrefcount__, 3)
|
|
self.failUnless(gobject.source_remove(self.handler))
|
|
self.assertEquals(self.bus.__gstrefcount__, 2)
|
|
self.gccollect()
|
|
|
|
gst.debug('THOMAS: pipeline rc %d' % self.pipeline.__gstrefcount__)
|
|
#self.assertEquals(self.pipeline.__gstrefcount__, 1)
|
|
del self.pipeline
|
|
self.gccollect()
|
|
#self.assertEquals(self.bus.__gstrefcount__, 2)
|
|
del self.bus
|
|
self.gccollect()
|
|
|
|
# the async thread can be holding a ref, Wim is going to work on this
|
|
#TestCase.tearDown(self)
|
|
|
|
def _message_received(self, bus, message):
|
|
gst.debug('received message: %s, %s' % (
|
|
message.src.get_path_string(), message.type.value_nicks[1]))
|
|
t = message.type
|
|
if t == gst.MESSAGE_STATE_CHANGED:
|
|
old, new, pen = message.parse_state_changed()
|
|
gst.debug('%r state change from %r to %r' % (
|
|
message.src.get_path_string(), old, new))
|
|
if message.src == self.pipeline and new == self.final:
|
|
self.loop.quit()
|
|
|
|
return True
|
|
|
|
def testPlaying(self):
|
|
self.final = gst.STATE_PLAYING
|
|
ret = self.pipeline.set_state(gst.STATE_PLAYING)
|
|
self.assertEquals(ret, gst.STATE_CHANGE_ASYNC)
|
|
|
|
# go into a main loop to wait for messages
|
|
self.loop.run()
|
|
|
|
# we go to READY so we get messages; going to NULL would set
|
|
# the bus flushing
|
|
self.final = gst.STATE_READY
|
|
ret = self.pipeline.set_state(gst.STATE_READY)
|
|
self.assertEquals(ret, gst.STATE_CHANGE_SUCCESS)
|
|
self.loop.run()
|
|
|
|
# FIXME: not setting to NULL causes a deadlock; we might want to
|
|
# fix this in the bindings
|
|
self.assertEquals(self.pipeline.set_state(gst.STATE_NULL),
|
|
gst.STATE_CHANGE_SUCCESS)
|
|
self.assertEquals(self.pipeline.get_state(),
|
|
(gst.STATE_CHANGE_SUCCESS, gst.STATE_NULL, gst.STATE_VOID_PENDING))
|
|
self.gccollect()
|
|
|
|
class TestPipeSub(gst.Pipeline):
|
|
def do_handle_message(self, message):
|
|
self.debug('do_handle_message')
|
|
gst.Pipeline.do_handle_message(self, message)
|
|
self.type = message.type
|
|
gobject.type_register(TestPipeSub)
|
|
|
|
class TestPipeSubSub(TestPipeSub):
|
|
def do_handle_message(self, message):
|
|
self.debug('do_handle_message')
|
|
TestPipeSub.do_handle_message(self, message)
|
|
gobject.type_register(TestPipeSubSub)
|
|
|
|
|
|
# see http://bugzilla.gnome.org/show_bug.cgi?id=577735
|
|
class TestSubClass(TestCase):
|
|
def setUp(self):
|
|
self.gctrack()
|
|
|
|
def tearDown(self):
|
|
self.gccollect()
|
|
self.gcverify()
|
|
|
|
def testSubClass(self):
|
|
p = TestPipeSub()
|
|
u = gst.element_factory_make('uridecodebin')
|
|
self.assertEquals(u.__grefcount__, 1)
|
|
self.failIf(getattr(p, 'type', None))
|
|
# adding uridecodebin triggers a clock-provide message;
|
|
# this message should be dropped, and thus not affect
|
|
# the refcount of u beyond the parenting.
|
|
p.add(u)
|
|
self.assertEquals(getattr(p, 'type', None), gst.MESSAGE_CLOCK_PROVIDE)
|
|
self.assertEquals(u.__grefcount__, 2)
|
|
del p
|
|
self.assertEquals(u.__grefcount__, 1)
|
|
|
|
def testSubSubClass(self):
|
|
# Edward is worried that a subclass of a subclass will screw up
|
|
# the refcounting wrt. GST_BUS_DROP
|
|
p = TestPipeSubSub()
|
|
u = gst.element_factory_make('uridecodebin')
|
|
self.assertEquals(u.__grefcount__, 1)
|
|
self.failIf(getattr(p, 'type', None))
|
|
p.add(u)
|
|
self.assertEquals(getattr(p, 'type', None), gst.MESSAGE_CLOCK_PROVIDE)
|
|
self.assertEquals(u.__grefcount__, 2)
|
|
del p
|
|
self.assertEquals(u.__grefcount__, 1)
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|