mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-08 08:25:33 +00:00
f9f30c4ced
a.k.a muting layers. Adding unit tests and making sure serialization works properly
321 lines
11 KiB
Python
321 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (c) 2016 Alexandru Băluț <alexandru.balut@gmail.com>
|
|
# Copyright (c) 2016, Thibault Saunier
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this program; if not, write to the
|
|
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
# Boston, MA 02110-1301, USA.
|
|
|
|
import gi
|
|
|
|
gi.require_version("Gst", "1.0")
|
|
gi.require_version("GES", "1.0")
|
|
|
|
from gi.repository import Gst # noqa
|
|
from gi.repository import GES # noqa
|
|
from gi.repository import GLib # noqa
|
|
from gi.repository import GObject # noqa
|
|
import contextlib # noqa
|
|
import os #noqa
|
|
import unittest # noqa
|
|
import tempfile # noqa
|
|
|
|
# Initialise the GStreamer and GES libraries when this module is imported.
Gst.init(None)
|
|
GES.init()
|
|
|
|
|
|
def create_main_loop():
    """Build a GLib.MainLoop whose ``run`` attribute is a time-limited runner.

    The returned loop exposes ``run(timeout_seconds=5, until_empty=False)``.
    That runner attaches a timeout source, runs the loop, destroys the source
    and raises ``Exception`` if the timeout callback has fired.  The timeout
    flag is shared by all runs of the same loop and is never reset.
    """
    loop = GLib.MainLoop()
    expired = False

    def on_timeout(unused):
        # Record the expiry and stop the loop so ``run`` can report it.
        nonlocal expired
        expired = True
        loop.quit()

    def run(timeout_seconds=5, until_empty=False):
        watchdog = GLib.timeout_source_new_seconds(timeout_seconds)
        watchdog.set_callback(on_timeout)
        watchdog.attach()
        if until_empty:
            # Schedule a quit from an idle handler.
            GLib.idle_add(loop.quit)
        # Call the class implementation explicitly: the instance attribute
        # ``run`` is replaced by this very function below.
        GLib.MainLoop.run(loop)
        watchdog.destroy()
        if not expired:
            return
        raise Exception("Timed out after %s seconds" % timeout_seconds)

    loop.run = run
    return loop
|
|
|
|
|
|
def create_project(with_group=False, saved=False):
    """Create an audio/video timeline with a single layer.

    Args:
        with_group: When true, two 10-second title clips (starting at 0 and
            at 100 seconds) are added to the layer and grouped together.
        saved: When truthy, the timeline is saved through its asset to a
            ``file://`` URI built from a temporary file name.  If it is a
            string, it is embedded in the file suffix.

    Returns:
        The created timeline.
    """
    timeline = GES.Timeline.new_audio_video()
    layer = timeline.append_layer()

    if with_group:
        clips = []
        for start in (0, 100 * Gst.SECOND):
            clip = GES.TitleClip()
            clip.set_start(start)
            clip.set_duration(10*Gst.SECOND)
            layer.add_clip(clip)
            clips.append(clip)
        # The return value is not used afterwards; the call is made for the
        # grouping itself.
        grouped = GES.Container.group(clips)

    if saved:
        suffix = ("-%s.xges" % saved) if isinstance(saved, str) else ".xges"
        # Only the generated name is kept; the temporary file object itself
        # is dropped right away.
        tmp_name = tempfile.NamedTemporaryFile(suffix=suffix).name
        uri = "file://%s" % tmp_name
        timeline.get_asset().save(timeline, uri, None, overwrite=True)

    return timeline
|
|
|
|
|
|
@contextlib.contextmanager
def created_project_file(xges):
    """Context manager yielding the URI of a temporary ``.xges`` file.

    The file is created with *xges* as its content and removed when the
    ``with`` block exits, whether it exits normally or through an exception.

    Args:
        xges: Text to write into the temporary project file.

    Yields:
        The URI of the temporary file, as built by ``Gst.filename_to_uri``.
    """
    fd, xges_path = tempfile.mkstemp(suffix=".xges")
    try:
        # mkstemp() hands back an open descriptor; wrap it so that it gets
        # closed instead of leaking while the path is reopened.
        with os.fdopen(fd, "w") as f:
            f.write(xges)

        yield Gst.filename_to_uri(os.path.abspath(xges_path))
    finally:
        # Clean up even when the caller's block raised.
        os.remove(xges_path)
|
|
|
|
|
|
def get_asset_uri(name):
    """Return the URI of *name* inside the ``assets`` directory.

    The directory is resolved as ``../assets`` relative to the directory
    containing this module.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    asset_path = os.path.join(module_dir, "..", "assets", name)
    return Gst.filename_to_uri(asset_path)
|
|
|
|
|
|
class GESTest(unittest.TestCase):
    """Base test case with Gst logging shortcuts and GES-oriented assertions."""

    def _log(self, func, format, *args):
        """Format a message and pass it to the logging callable *func*.

        Without *args* the message is forwarded unchanged.  A single extra
        argument is applied with ``%`` as-is, so it may itself be a tuple or
        a mapping.  Several extra arguments are applied together as a tuple.
        """
        message = format
        if len(args) == 1:
            message = format % args[0]
        elif args:
            message = format % args
        func(message)

    def log(self, format, *args):
        """Emit a message through ``Gst.log``."""
        self._log(Gst.log, format, *args)

    def debug(self, format, *args):
        """Emit a message through ``Gst.debug``."""
        self._log(Gst.debug, format, *args)

    def info(self, format, *args):
        """Emit a message through ``Gst.info``."""
        self._log(Gst.info, format, *args)

    def fixme(self, format, *args):
        """Emit a message through ``Gst.fixme``."""
        self._log(Gst.fixme, format, *args)

    def warning(self, format, *args):
        """Emit a message through ``Gst.warning``."""
        self._log(Gst.warning, format, *args)

    def error(self, format, *args):
        """Emit a message through ``Gst.error``."""
        self._log(Gst.error, format, *args)

    def check_clip_values(self, clip, start, in_point, duration):
        """Check the timing properties of *clip* and of each of its children."""
        for elem in [clip] + clip.get_children(False):
            self.check_element_values(elem, start, in_point, duration)

    def check_element_values(self, element, start, in_point, duration):
        """Assert the ``start``, ``in_point`` and ``duration`` of *element*."""
        self.assertEqual(element.props.start, start, element)
        self.assertEqual(element.props.in_point, in_point, element)
        self.assertEqual(element.props.duration, duration, element)

    def assert_effects(self, clip, *effects):
        """Assert that the top effects of *clip* are exactly *effects*, in order."""
        # Make sure there are no other effects.
        self.assertEqual(set(clip.get_top_effects()), set(effects))

        # Make sure their order is correct.
        indexes = [clip.get_top_effect_index(effect)
                   for effect in effects]
        self.assertEqual(indexes, list(range(len(effects))))
|
|
|
|
|
|
class GESSimpleTimelineTest(GESTest):
    """Test case that sets up a timeline with one track per configured type.

    ``self.track_types`` lists the track types created in ``setUp``; only
    audio and video are accepted.  ``setUp`` also appends a first layer,
    stored as ``self.layer``.
    """

    def __init__(self, *args):
        # Set before the base initialiser runs, as in the original ordering.
        self.track_types = [GES.TrackType.AUDIO, GES.TrackType.VIDEO]
        super().__init__(*args)

    def timeline_as_str(self):
        """Return a multi-line description of the layers, clips and groups."""
        parts = ["====== %s =======\n" % self.timeline]
        for layer in self.timeline.get_layers():
            parts.append("Layer %04d: " % layer.get_priority())
            for clip in layer.get_clips():
                parts.append("{ %s }" % clip)
            parts.append('\n------------------------\n')

        for group in self.timeline.get_groups():
            parts.append("GROUP %s :" % group)
            for clip in group.get_children(False):
                parts.append(" { %s }" % clip.props.name)
            parts.append('\n')
        parts.append("================================\n")
        return "".join(parts)

    def print_timeline(self):
        """Print the description built by ``timeline_as_str``."""
        print(self.timeline_as_str())

    def setUp(self):
        """Create the timeline, one track per entry of ``track_types``, and a layer."""
        self.timeline = GES.Timeline.new()
        for track_type in self.track_types:
            self.assertIn(
                track_type, [GES.TrackType.AUDIO, GES.TrackType.VIDEO])
            if track_type == GES.TrackType.AUDIO:
                self.assertTrue(self.timeline.add_track(GES.AudioTrack.new()))
            else:
                self.assertTrue(self.timeline.add_track(GES.VideoTrack.new()))

        self.assertEqual(len(self.timeline.get_tracks()),
                         len(self.track_types))
        self.layer = self.timeline.append_layer()

    def add_clip(self, start, in_point, duration, asset_type=GES.TestClip):
        """Extract a clip of *asset_type*, set its timing and add it to ``self.layer``.

        Returns:
            The added clip.
        """
        clip = GES.Asset.request(asset_type, None).extract()
        clip.props.start = start
        clip.props.in_point = in_point
        clip.props.duration = duration
        self.assertTrue(self.layer.add_clip(clip))

        return clip

    def append_clip(self, layer=0, asset_type=GES.TestClip):
        """Add a clip of duration 10 at the current end of a layer.

        Args:
            layer: Index of the target layer; missing layers are appended to
                the timeline until that index exists.
            asset_type: Type used to request the asset the clip is extracted from.

        Returns:
            The added clip.
        """
        while len(self.timeline.get_layers()) < layer + 1:
            self.timeline.append_layer()
        # Use a separate name rather than rebinding the index parameter.
        target_layer = self.timeline.get_layers()[layer]
        clip = GES.Asset.request(asset_type, None).extract()
        clip.props.start = target_layer.get_duration()
        clip.props.duration = 10
        self.assertTrue(target_layer.add_clip(clip))

        return clip

    def _find_equivalent_child(self, ref_child, ref_track, children):
        """Return the entry of *children* matching *ref_child*, or None.

        A candidate matches when it is an instance of the type of *ref_child*
        and sits in a track of the same type as *ref_track*.  For effects the
        ``bin_description`` must match as well.
        """
        for candidate in children:
            if not isinstance(candidate, type(ref_child)):
                continue

            candidate_track = candidate.get_track()
            # A candidate outside of any track cannot match; skip it instead
            # of failing on the attribute access.
            if not candidate_track or ref_track.type != candidate_track.type:
                continue

            if not isinstance(ref_child, GES.Effect):
                return candidate
            # Compare against the candidate being examined.
            if ref_child.props.bin_description == candidate.props.bin_description:
                return candidate
        return None

    def assertElementAreEqual(self, ref, element):
        """Assert that *element* is equivalent to the reference *ref*.

        All properties of *ref* except ``name`` and object-typed ones are
        compared with ``Gst.value_compare``.  Track elements additionally have
        their NLE objects compared.  Clips have their children compared
        pairwise, unless the timeline has several tracks of the same type, in
        which case the deep check is skipped with a warning.
        """
        self.assertIsInstance(element, type(ref),
                              "%s and %s do not have the same type!" % (ref, element))

        props = [p for p in ref.list_properties() if p.name not in ['name']
                 and not GObject.type_is_a(p.value_type, GObject.Object)]
        for p in props:
            pname = p.name
            v0 = GObject.Value()
            v0.init(p.value_type)
            v0.set_value(ref.get_property(pname))

            v1 = GObject.Value()
            v1.init(p.value_type)
            v1.set_value(element.get_property(pname))

            self.assertEqual(Gst.value_compare(v0, v1), Gst.VALUE_EQUAL,
                             "%s are not equal: %s != %s" % (pname, v0, v1))

        if isinstance(ref, GES.TrackElement):
            self.assertElementAreEqual(ref.get_nleobject(), element.get_nleobject())
            return

        if not isinstance(ref, GES.Clip):
            return

        ttypes = [track.type for track in self.timeline.get_tracks()]
        for ttype in ttypes:
            if ttypes.count(ttype) > 1:
                self.warning("Can't deeply check %s and %s "
                             "(only one track per type supported %s %s found)" % (ref,
                             element, ttypes.count(ttype), ttype))
                return

        children = element.get_children(False)
        for ref_child in ref.get_children(False):
            ref_track = ref_child.get_track()
            if not ref_track:
                self.warning("Can't check %s as not in a track" % (ref_child))
                continue

            child = self._find_equivalent_child(ref_child, ref_track, children)
            self.assertIsNotNone(child, "Could not find equivalent child %s in %s(%s)" % (ref_child,
                                 element, children))

            self.assertElementAreEqual(ref_child, child)

    def check_reload_timeline(self):
        """Save the timeline, load it back and compare both versions.

        The timeline is saved to a temporary ``.xges`` file and re-extracted
        from a project created on that URI.  Once the project has emitted
        ``loaded``, layers and clips are compared pairwise.

        Returns:
            The reloaded timeline.
        """
        # Keep a reference to the temporary file object for the whole method.
        tmpf = tempfile.NamedTemporaryFile(suffix='.xges')
        uri = Gst.filename_to_uri(tmpf.name)
        self.assertTrue(self.timeline.save_to_uri(uri, None, True))
        project = GES.Project.new(uri)
        mainloop = create_main_loop()

        def loaded_cb(unused_project, unused_timeline):
            mainloop.quit()

        project.connect("loaded", loaded_cb)
        reloaded_timeline = project.extract()

        mainloop.run()
        self.assertIsNotNone(reloaded_timeline)

        layers = self.timeline.get_layers()
        reloaded_layers = reloaded_timeline.get_layers()
        self.assertEqual(len(layers), len(reloaded_layers))
        for layer, reloaded_layer in zip(layers, reloaded_layers):
            clips = layer.get_clips()
            reloaded_clips = reloaded_layer.get_clips()
            self.assertEqual(len(clips), len(reloaded_clips))
            for clip, reloaded_clip in zip(clips, reloaded_clips):
                self.assertElementAreEqual(clip, reloaded_clip)

        return reloaded_timeline

    def assertTimelineTopology(self, topology, groups=None):
        """Assert the layer/clip layout of the timeline, and optionally its groups.

        Args:
            topology: One list per layer, each holding a
                ``(clip type, start, duration)`` tuple per clip.
            groups: Optional sequence of expected groups, each an iterable of
                the group's children.  The group check only runs when both
                this argument and the timeline's group list are non-empty.

        Returns:
            The topology computed from the timeline.
        """
        res = []
        for layer in self.timeline.get_layers():
            layer_timings = []
            for clip in layer.get_clips():
                layer_timings.append(
                    (type(clip), clip.props.start, clip.props.duration))
                # Every descendant must share the timing of its clip.
                for child in clip.get_children(True):
                    self.assertEqual(child.props.start, clip.props.start)
                    self.assertEqual(child.props.duration, clip.props.duration)

            res.append(layer_timings)
        if topology != res:
            # Log the full timeline before failing to ease debugging.
            Gst.error(self.timeline_as_str())
            self.assertEqual(topology, res)

        timeline_groups = self.timeline.get_groups()
        if groups and timeline_groups:
            # Compare the counts first so that a mismatch is reported as an
            # assertion failure rather than an IndexError.
            self.assertEqual(len(timeline_groups), len(groups))
            for expected, actual in zip(groups, timeline_groups):
                self.assertEqual(set(expected), set(actual.get_children(False)))

        return res
|