# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# gst-python - Python bindings for GStreamer
# Copyright (C) 2002 David I. Lehn
# Copyright (C) 2004 Johan Dahlin
# Copyright (C) 2005 Edward Hervey
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA

import time

from common import gst, unittest, TestCase, pygobject_2_13

import gobject

class TestConstruction(TestCase):
    def setUp(self):
        self.gctrack()

    def tearDown(self):
        self.gccollect()
        self.gcverify()

    def testGoodConstructor(self):
        name = 'test-pipeline'
        pipeline = gst.Pipeline(name)
        self.assertEquals(pipeline.__gstrefcount__, 1)
        assert pipeline is not None, 'pipeline is None'
        self.failUnless(isinstance(pipeline, gst.Pipeline),
            'pipeline is not a GstPipline')
        assert pipeline.get_name() == name, 'pipelines name is wrong'
        self.assertEquals(pipeline.__gstrefcount__, 1)

    def testParseLaunch(self):
        pipeline = gst.parse_launch('fakesrc ! fakesink')
        
class Pipeline(TestCase):
    def setUp(self):
        self.gctrack()
        self.pipeline = gst.Pipeline('test-pipeline')
        source = gst.element_factory_make('fakesrc', 'source')
        source.set_property('num-buffers', 5)
        sink = gst.element_factory_make('fakesink', 'sink')
        self.pipeline.add(source, sink)
        gst.element_link_many(source, sink)

    def tearDown(self):
        del self.pipeline
        self.gccollect()
        self.gcverify()

    def testRun(self):
        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
        
        time.sleep(1)

        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
        self.pipeline.set_state(gst.STATE_NULL)
        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)

class PipelineTags(TestCase):
    def setUp(self):
        self.gctrack()
        self.pipeline = gst.parse_launch('audiotestsrc num-buffers=100 ! vorbisenc name=encoder ! oggmux name=muxer ! fakesink')

    def tearDown(self):
        del self.pipeline
        self.gccollect()
        self.gcverify()

    def testRun(self):
        # in 0.10.15.1, this triggers
        # sys:1: gobject.Warning: g_value_get_uint: assertion `G_VALUE_HOLDS_UINT (value)' failed
        # during pipeline playing

        l = gst.TagList()
        l[gst.TAG_ARTIST] = 'artist'
        l[gst.TAG_TRACK_NUMBER] = 1
        encoder = self.pipeline.get_by_name('encoder')
        encoder.merge_tags(l, gst.TAG_MERGE_APPEND)

        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
        
        time.sleep(1)

        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
        self.pipeline.set_state(gst.STATE_NULL)
        self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)


class Bus(TestCase):
    def testGet(self):
        pipeline = gst.Pipeline('test')
        self.assertEquals(pipeline.__gstrefcount__, 1)
        bus = pipeline.get_bus()
        self.assertEquals(pipeline.__gstrefcount__, 1)
        # one for python and one for the pipeline
        self.assertEquals(bus.__gstrefcount__, 2)

        del pipeline
		if not pygobject_2_13:
            self.failUnless(self.gccollect())
        self.assertEquals(bus.__gstrefcount__, 1)
        
class PipelineAndBus(TestCase):
    def setUp(self):
        TestCase.setUp(self)
        self.pipeline = gst.Pipeline('test-pipeline')
        source = gst.element_factory_make('fakesrc', 'source')
        sink = gst.element_factory_make('fakesink', 'sink')
        self.pipeline.add(source, sink)
        gst.element_link_many(source, sink)

        self.bus = self.pipeline.get_bus()
        self.assertEquals(self.bus.__gstrefcount__, 2)
        self.handler = self.bus.add_watch(self._message_received)
        self.assertEquals(self.bus.__gstrefcount__, 3)
        self.assertEquals(self.pipeline.__gstrefcount__, 1)

        self.loop = gobject.MainLoop()

    def tearDown(self):
        # FIXME: fix the refcount issues with the bus/pipeline
        # flush the bus to be able to assert on the pipeline refcount
        #while self.pipeline.__gstrefcount__ > 1:
        self.gccollect()

        # one for the pipeline, two for the snake
        # three for the watch now shake shake shake but don't you
        self.assertEquals(self.bus.__gstrefcount__, 3)
        self.failUnless(gobject.source_remove(self.handler))
        self.assertEquals(self.bus.__gstrefcount__, 2)
        self.gccollect()

        gst.debug('THOMAS: pipeline rc %d' % self.pipeline.__gstrefcount__)
        #self.assertEquals(self.pipeline.__gstrefcount__, 1)
        del self.pipeline
        self.gccollect()
        #self.assertEquals(self.bus.__gstrefcount__, 2)
        del self.bus
        self.gccollect()

        # the async thread can be holding a ref, Wim is going to work on this
        #TestCase.tearDown(self)

    def _message_received(self, bus, message):
        gst.debug('received message: %s, %s' % (
            message.src.get_path_string(), message.type.value_nicks[1]))
        t = message.type
        if t == gst.MESSAGE_STATE_CHANGED:
            old, new, pen = message.parse_state_changed()
            gst.debug('%r state change from %r to %r' % (
                message.src.get_path_string(), old, new))
            if message.src == self.pipeline and new == self.final:
                self.loop.quit()

        return True

    def testPlaying(self):
        self.final = gst.STATE_PLAYING
        ret = self.pipeline.set_state(gst.STATE_PLAYING)
        self.assertEquals(ret, gst.STATE_CHANGE_ASYNC)

        # go into a main loop to wait for messages
        self.loop.run()

        # we go to READY so we get messages; going to NULL would set
        # the bus flushing
        self.final = gst.STATE_READY
        ret = self.pipeline.set_state(gst.STATE_READY)
        self.assertEquals(ret, gst.STATE_CHANGE_SUCCESS)
        self.loop.run()

        # FIXME: not setting to NULL causes a deadlock; we might want to
        # fix this in the bindings
        self.assertEquals(self.pipeline.set_state(gst.STATE_NULL),
            gst.STATE_CHANGE_SUCCESS)
        self.assertEquals(self.pipeline.get_state(),
            (gst.STATE_CHANGE_SUCCESS, gst.STATE_NULL, gst.STATE_VOID_PENDING))
        self.gccollect()

class TestPipeSub(gst.Pipeline):
    def do_handle_message(self, message):
        self.debug('do_handle_message')
        gst.Pipeline.do_handle_message(self, message)
        self.type = message.type
gobject.type_register(TestPipeSub)

class TestPipeSubSub(TestPipeSub):
    def do_handle_message(self, message):
        self.debug('do_handle_message')
        TestPipeSub.do_handle_message(self, message)
gobject.type_register(TestPipeSubSub)


# see http://bugzilla.gnome.org/show_bug.cgi?id=577735
class TestSubClass(TestCase):
    def setUp(self):
        self.gctrack()

    def tearDown(self):
        self.gccollect()
        self.gcverify()

    def testSubClass(self):
        p = TestPipeSub()
        u = gst.element_factory_make('uridecodebin')
        self.assertEquals(u.__grefcount__, 1)
        self.failIf(getattr(p, 'type', None))
        # adding uridecodebin triggers a clock-provide message;
        # this message should be dropped, and thus not affect
        # the refcount of u beyond the parenting.
        p.add(u)
        self.assertEquals(getattr(p, 'type', None), gst.MESSAGE_CLOCK_PROVIDE)
        self.assertEquals(u.__grefcount__, 2)
        del p
        self.assertEquals(u.__grefcount__, 1)

    def testSubSubClass(self):
        # Edward is worried that a subclass of a subclass will screw up
        # the refcounting wrt. GST_BUS_DROP
        p = TestPipeSubSub()
        u = gst.element_factory_make('uridecodebin')
        self.assertEquals(u.__grefcount__, 1)
        self.failIf(getattr(p, 'type', None))
        p.add(u)
        self.assertEquals(getattr(p, 'type', None), gst.MESSAGE_CLOCK_PROVIDE)
        self.assertEquals(u.__grefcount__, 2)
        del p
        self.assertEquals(u.__grefcount__, 1)
     
if __name__ == "__main__":
    unittest.main()