mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-12 19:36:38 +00:00
6b7c658b6a
We were implementing the logic for moving/trimming elements specific to SourceClip but this was not correct ass the new timeline tree allows us to handle that for all element types in a generic and nice way. This make us need to have groups trimming properly implemented in the timeline tree, leading to some fixes in the group tests. This adds tests for the various cases known to not be handled properly by the previous code. Fixes https://gitlab.freedesktop.org/gstreamer/gst-editing-services/issues/92
233 lines
7.7 KiB
Python
233 lines
7.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (c) 2016 Alexandru Băluț <alexandru.balut@gmail.com>
|
|
# Copyright (c) 2016, Thibault Saunier
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this program; if not, write to the
|
|
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
# Boston, MA 02110-1301, USA.
|
|
|
|
import gi
|
|
|
|
gi.require_version("Gst", "1.0")
|
|
gi.require_version("GES", "1.0")
|
|
|
|
from gi.repository import Gst # noqa
|
|
from gi.repository import GES # noqa
|
|
from gi.repository import GLib # noqa
|
|
import contextlib # noqa
|
|
import os #noqa
|
|
import unittest # noqa
|
|
import tempfile # noqa
|
|
|
|
Gst.init(None)
|
|
GES.init()
|
|
|
|
|
|
def create_main_loop():
|
|
"""Creates a MainLoop with a timeout."""
|
|
mainloop = GLib.MainLoop()
|
|
timed_out = False
|
|
|
|
def timeout_cb(unused):
|
|
nonlocal timed_out
|
|
timed_out = True
|
|
mainloop.quit()
|
|
|
|
def run(timeout_seconds=5, until_empty=False):
|
|
source = GLib.timeout_source_new_seconds(timeout_seconds)
|
|
source.set_callback(timeout_cb)
|
|
source.attach()
|
|
if until_empty:
|
|
GLib.idle_add(mainloop.quit)
|
|
GLib.MainLoop.run(mainloop)
|
|
source.destroy()
|
|
if timed_out:
|
|
raise Exception("Timed out after %s seconds" % timeout_seconds)
|
|
|
|
mainloop.run = run
|
|
return mainloop
|
|
|
|
|
|
def create_project(with_group=False, saved=False):
|
|
"""Creates a project with two clips in a group."""
|
|
timeline = GES.Timeline.new_audio_video()
|
|
layer = timeline.append_layer()
|
|
|
|
if with_group:
|
|
clip1 = GES.TitleClip()
|
|
clip1.set_start(0)
|
|
clip1.set_duration(10*Gst.SECOND)
|
|
layer.add_clip(clip1)
|
|
clip2 = GES.TitleClip()
|
|
clip2.set_start(100 * Gst.SECOND)
|
|
clip2.set_duration(10*Gst.SECOND)
|
|
layer.add_clip(clip2)
|
|
group = GES.Container.group([clip1, clip2])
|
|
|
|
if saved:
|
|
if isinstance(saved, str):
|
|
suffix = "-%s.xges" % saved
|
|
else:
|
|
suffix = ".xges"
|
|
uri = "file://%s" % tempfile.NamedTemporaryFile(suffix=suffix).name
|
|
timeline.get_asset().save(timeline, uri, None, overwrite=True)
|
|
|
|
return timeline
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def created_project_file(xges):
|
|
_, xges_path = tempfile.mkstemp(suffix=".xges")
|
|
with open(xges_path, "w") as f:
|
|
f.write(xges)
|
|
|
|
yield Gst.filename_to_uri(os.path.abspath(xges_path))
|
|
|
|
os.remove(xges_path)
|
|
|
|
|
|
def get_asset_uri(name):
|
|
python_tests_dir = os.path.dirname(os.path.abspath(__file__))
|
|
assets_dir = os.path.join(python_tests_dir, "..", "assets")
|
|
return Gst.filename_to_uri(os.path.join(assets_dir, name))
|
|
|
|
|
|
class GESTest(unittest.TestCase):
|
|
|
|
def _log(self, func, format, *args):
|
|
string = format
|
|
if args:
|
|
string = string % args[0]
|
|
func(string)
|
|
|
|
def log(self, format, *args):
|
|
self._log(Gst.log, format, *args)
|
|
|
|
def debug(self, format, *args):
|
|
self._log(Gst.debug, format, *args)
|
|
|
|
def info(self, format, *args):
|
|
self._log(Gst.info, format, *args)
|
|
|
|
def fixme(self, format, *args):
|
|
self._log(Gst.fixme, format, *args)
|
|
|
|
def warning(self, format, *args):
|
|
self._log(Gst.warning, format, *args)
|
|
|
|
def error(self, format, *args):
|
|
self._log(Gst.error, format, *args)
|
|
|
|
def check_clip_values(self, clip, start, in_point, duration):
|
|
for elem in [clip] + clip.get_children(False):
|
|
self.check_element_values(elem, start, in_point, duration)
|
|
|
|
def check_element_values(self, element, start, in_point, duration):
|
|
self.assertEqual(element.props.start, start, element)
|
|
self.assertEqual(element.props.in_point, in_point, element)
|
|
self.assertEqual(element.props.duration, duration, element)
|
|
|
|
def assert_effects(self, clip, *effects):
|
|
# Make sure there are no other effects.
|
|
self.assertEqual(set(clip.get_top_effects()), set(effects))
|
|
|
|
# Make sure their order is correct.
|
|
indexes = [clip.get_top_effect_index(effect)
|
|
for effect in effects]
|
|
self.assertEqual(indexes, list(range(len(effects))))
|
|
|
|
|
|
class GESSimpleTimelineTest(GESTest):
|
|
|
|
def __init__(self, *args):
|
|
self.track_types = [GES.TrackType.AUDIO, GES.TrackType.VIDEO]
|
|
super(GESSimpleTimelineTest, self).__init__(*args)
|
|
|
|
def timeline_as_str(self):
|
|
res = "====== %s =======\n" % self.timeline
|
|
for layer in self.timeline.get_layers():
|
|
res += "Layer %04d: " % layer.get_priority()
|
|
for clip in layer.get_clips():
|
|
res += "{ %s }" % clip
|
|
res += '\n------------------------\n'
|
|
|
|
for group in self.timeline.get_groups():
|
|
res += "GROUP %s :" % group
|
|
for clip in group.get_children(False):
|
|
res += " { %s }" % clip.props.name
|
|
res += '\n'
|
|
res += "================================\n"
|
|
return res
|
|
|
|
def print_timeline(self):
|
|
print(self.timeline_as_str())
|
|
|
|
def setUp(self):
|
|
self.timeline = GES.Timeline.new()
|
|
for track_type in self.track_types:
|
|
self.assertIn(
|
|
track_type, [GES.TrackType.AUDIO, GES.TrackType.VIDEO])
|
|
if track_type == GES.TrackType.AUDIO:
|
|
self.assertTrue(self.timeline.add_track(GES.AudioTrack.new()))
|
|
else:
|
|
self.assertTrue(self.timeline.add_track(GES.VideoTrack.new()))
|
|
|
|
self.assertEqual(len(self.timeline.get_tracks()),
|
|
len(self.track_types))
|
|
self.layer = self.timeline.append_layer()
|
|
|
|
def add_clip(self, start, in_point, duration, asset_type=GES.TestClip):
|
|
clip = GES.Asset.request(asset_type, None).extract()
|
|
clip.props.start = start
|
|
clip.props.in_point = in_point
|
|
clip.props.duration = duration
|
|
self.assertTrue(self.layer.add_clip(clip))
|
|
|
|
return clip
|
|
|
|
def append_clip(self, layer=0, asset_type=GES.TestClip):
|
|
while len(self.timeline.get_layers()) < layer + 1:
|
|
self.timeline.append_layer()
|
|
layer = self.timeline.get_layers()[layer]
|
|
clip = GES.Asset.request(asset_type, None).extract()
|
|
clip.props.start = layer.get_duration()
|
|
clip.props.duration = 10
|
|
self.assertTrue(layer.add_clip(clip))
|
|
|
|
return clip
|
|
|
|
def assertTimelineTopology(self, topology, groups=[]):
|
|
res = []
|
|
for layer in self.timeline.get_layers():
|
|
layer_timings = []
|
|
for clip in layer.get_clips():
|
|
layer_timings.append(
|
|
(type(clip), clip.props.start, clip.props.duration))
|
|
for child in clip.get_children(True):
|
|
self.assertEqual(child.props.start, clip.props.start)
|
|
self.assertEqual(child.props.duration, clip.props.duration)
|
|
|
|
res.append(layer_timings)
|
|
if topology != res:
|
|
Gst.error(self.timeline_as_str())
|
|
self.assertEqual(topology, res)
|
|
|
|
timeline_groups = self.timeline.get_groups()
|
|
if groups and timeline_groups:
|
|
for i, group in enumerate(groups):
|
|
self.assertEqual(set(group), set(timeline_groups[i].get_children(False)))
|
|
self.assertEqual(len(timeline_groups), i + 1)
|
|
|
|
return res
|