mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-10 17:35:59 +00:00
19c0921179
And use a simple GStreamer pipeline as testsrcbin with GstTranscoder doesn't let us easily set the framerate of the source and we end up having videorate dropping frames leading to the rendered file having an unprecise duration. This should fix races with `check.gst-editing-services.pythontests.pyunittest.python.test_assets.TestTimeline.test_reload_asset` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1130>
772 lines
28 KiB
Python
772 lines
28 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (c) 2016 Alexandru Băluț <alexandru.balut@gmail.com>
|
|
# Copyright (c) 2016, Thibault Saunier
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this program; if not, write to the
|
|
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
# Boston, MA 02110-1301, USA.
|
|
|
|
from urllib.parse import urlparse
|
|
import gi
|
|
|
|
gi.require_version("Gst", "1.0")
|
|
gi.require_version("GES", "1.0")
|
|
|
|
from gi.repository import Gst # noqa
|
|
from gi.repository import GES # noqa
|
|
from gi.repository import GLib # noqa
|
|
from gi.repository import GObject # noqa
|
|
import contextlib # noqa
|
|
import os #noqa
|
|
import unittest # noqa
|
|
import tempfile # noqa
|
|
|
|
try:
|
|
gi.require_version("GstTranscoder", "1.0")
|
|
from gi.repository import GstTranscoder
|
|
except ValueError:
|
|
GstTranscoder = None
|
|
|
|
Gst.init(None)
|
|
GES.init()
|
|
|
|
|
|
def create_main_loop():
|
|
"""Creates a MainLoop with a timeout."""
|
|
mainloop = GLib.MainLoop()
|
|
timed_out = False
|
|
|
|
def timeout_cb(unused):
|
|
nonlocal timed_out
|
|
timed_out = True
|
|
mainloop.quit()
|
|
|
|
def run(timeout_seconds=5, until_empty=False):
|
|
source = GLib.timeout_source_new_seconds(timeout_seconds)
|
|
source.set_callback(timeout_cb)
|
|
source.attach()
|
|
if until_empty:
|
|
GLib.idle_add(mainloop.quit)
|
|
GLib.MainLoop.run(mainloop)
|
|
source.destroy()
|
|
if timed_out:
|
|
raise Exception("Timed out after %s seconds" % timeout_seconds)
|
|
|
|
mainloop.run = run
|
|
return mainloop
|
|
|
|
|
|
def create_project(with_group=False, saved=False):
|
|
"""Creates a project with two clips in a group."""
|
|
timeline = GES.Timeline.new_audio_video()
|
|
layer = timeline.append_layer()
|
|
|
|
if with_group:
|
|
clip1 = GES.TitleClip()
|
|
clip1.set_start(0)
|
|
clip1.set_duration(10*Gst.SECOND)
|
|
layer.add_clip(clip1)
|
|
clip2 = GES.TitleClip()
|
|
clip2.set_start(100 * Gst.SECOND)
|
|
clip2.set_duration(10*Gst.SECOND)
|
|
layer.add_clip(clip2)
|
|
group = GES.Container.group([clip1, clip2])
|
|
|
|
if saved:
|
|
if isinstance(saved, str):
|
|
suffix = "-%s.xges" % saved
|
|
else:
|
|
suffix = ".xges"
|
|
uri = "file://%s" % tempfile.NamedTemporaryFile(suffix=suffix).name
|
|
timeline.get_asset().save(timeline, uri, None, overwrite=True)
|
|
|
|
return timeline
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def created_project_file(xges):
|
|
_, xges_path = tempfile.mkstemp(suffix=".xges")
|
|
with open(xges_path, "w") as f:
|
|
f.write(xges)
|
|
|
|
yield Gst.filename_to_uri(os.path.abspath(xges_path))
|
|
|
|
os.remove(xges_path)
|
|
|
|
|
|
def can_generate_assets():
|
|
if GstTranscoder is None:
|
|
return False, "GstTranscoder is not available"
|
|
|
|
if not Gst.ElementFactory.make("testsrcbin"):
|
|
return False, "testbinsrc is not available"
|
|
|
|
return True, None
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def created_video_asset(uri=None, num_bufs=30, framerate="30/1"):
|
|
with tempfile.NamedTemporaryFile(suffix=".ogg") as f:
|
|
if not uri:
|
|
uri = Gst.filename_to_uri(f.name)
|
|
name = f.name
|
|
else:
|
|
name = urlparse(uri).path
|
|
pipe = Gst.parse_launch(f"videotestsrc num-buffers={num_bufs} ! video/x-raw,framerate={framerate} ! theoraenc ! oggmux ! filesink location={name}")
|
|
pipe.set_state(Gst.State.PLAYING)
|
|
assert pipe.get_bus().timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS)
|
|
pipe.set_state(Gst.State.NULL)
|
|
|
|
yield uri
|
|
|
|
|
|
def get_asset_uri(name):
|
|
python_tests_dir = os.path.dirname(os.path.abspath(__file__))
|
|
assets_dir = os.path.join(python_tests_dir, "..", "assets")
|
|
return Gst.filename_to_uri(os.path.join(assets_dir, name))
|
|
|
|
|
|
class GESTest(unittest.TestCase):
|
|
|
|
def _log(self, func, format, *args):
|
|
string = format
|
|
if args:
|
|
string = string % args[0]
|
|
func(string)
|
|
|
|
def log(self, format, *args):
|
|
self._log(Gst.log, format, *args)
|
|
|
|
def debug(self, format, *args):
|
|
self._log(Gst.debug, format, *args)
|
|
|
|
def info(self, format, *args):
|
|
self._log(Gst.info, format, *args)
|
|
|
|
def fixme(self, format, *args):
|
|
self._log(Gst.fixme, format, *args)
|
|
|
|
def warning(self, format, *args):
|
|
self._log(Gst.warning, format, *args)
|
|
|
|
def error(self, format, *args):
|
|
self._log(Gst.error, format, *args)
|
|
|
|
def check_clip_values(self, clip, start, in_point, duration):
|
|
for elem in [clip] + clip.get_children(False):
|
|
self.check_element_values(elem, start, in_point, duration)
|
|
|
|
def check_element_values(self, element, start, in_point, duration):
|
|
self.assertEqual(element.props.start, start, element)
|
|
self.assertEqual(element.props.in_point, in_point, element)
|
|
self.assertEqual(element.props.duration, duration, element)
|
|
|
|
def assert_effects(self, clip, *effects):
|
|
# Make sure there are no other effects.
|
|
self.assertEqual(set(clip.get_top_effects()), set(effects))
|
|
|
|
# Make sure their order is correct.
|
|
indexes = [clip.get_top_effect_index(effect)
|
|
for effect in effects]
|
|
self.assertEqual(indexes, list(range(len(effects))))
|
|
|
|
def assertGESError(self, error, code, message=""):
|
|
if error is None:
|
|
raise AssertionError(
|
|
"{}{}Received no error".format(message, message and ": "))
|
|
if error.domain != "GES_ERROR":
|
|
raise AssertionError(
|
|
"{}{}Received error ({}) in domain {} rather than "
|
|
"GES_ERROR".format(
|
|
message, message and ": ", error.message, error.domain))
|
|
err_code = GES.Error(error.code)
|
|
if err_code != code:
|
|
raise AssertionError(
|
|
"{}{}Received {} error ({}) rather than {}".format(
|
|
message, message and ": ", err_code.value_name,
|
|
error.message, code.value_name))
|
|
|
|
class GESSimpleTimelineTest(GESTest):
|
|
|
|
def __init__(self, *args):
|
|
self.track_types = [GES.TrackType.AUDIO, GES.TrackType.VIDEO]
|
|
super(GESSimpleTimelineTest, self).__init__(*args)
|
|
|
|
def timeline_as_str(self):
|
|
res = "====== %s =======\n" % self.timeline
|
|
for layer in self.timeline.get_layers():
|
|
res += "Layer %04d: " % layer.get_priority()
|
|
for clip in layer.get_clips():
|
|
res += "{ %s }" % clip
|
|
res += '\n------------------------\n'
|
|
|
|
for group in self.timeline.get_groups():
|
|
res += "GROUP %s :" % group
|
|
for clip in group.get_children(False):
|
|
res += " { %s }" % clip.props.name
|
|
res += '\n'
|
|
res += "================================\n"
|
|
return res
|
|
|
|
def print_timeline(self):
|
|
print(self.timeline_as_str())
|
|
|
|
def setUp(self):
|
|
self.timeline = GES.Timeline.new()
|
|
for track_type in self.track_types:
|
|
self.assertIn(
|
|
track_type, [GES.TrackType.AUDIO, GES.TrackType.VIDEO])
|
|
if track_type == GES.TrackType.AUDIO:
|
|
self.assertTrue(self.timeline.add_track(GES.AudioTrack.new()))
|
|
else:
|
|
self.assertTrue(self.timeline.add_track(GES.VideoTrack.new()))
|
|
|
|
self.assertEqual(len(self.timeline.get_tracks()),
|
|
len(self.track_types))
|
|
self.layer = self.timeline.append_layer()
|
|
|
|
def add_clip(self, start, in_point, duration, asset_type=GES.TestClip):
|
|
clip = GES.Asset.request(asset_type, None).extract()
|
|
clip.props.start = start
|
|
clip.props.in_point = in_point
|
|
clip.props.duration = duration
|
|
self.assertTrue(self.layer.add_clip(clip))
|
|
|
|
return clip
|
|
|
|
def append_clip(self, layer=0, asset_type=GES.TestClip, asset_id=None):
|
|
while len(self.timeline.get_layers()) < layer + 1:
|
|
self.timeline.append_layer()
|
|
layer = self.timeline.get_layers()[layer]
|
|
if asset_type == GES.UriClip:
|
|
asset = GES.UriClipAsset.request_sync(asset_id)
|
|
else:
|
|
asset = GES.Asset.request(asset_type, asset_id)
|
|
clip = asset.extract()
|
|
clip.props.start = layer.get_duration()
|
|
clip.props.duration = 10
|
|
self.assertTrue(layer.add_clip(clip))
|
|
|
|
return clip
|
|
|
|
def assertElementAreEqual(self, ref, element):
|
|
self.assertTrue(isinstance(element, type(ref)), "%s and %s do not have the same type!" % (ref, element))
|
|
|
|
props = [p for p in ref.list_properties() if p.name not in ['name']
|
|
and not GObject.type_is_a(p.value_type, GObject.Object)]
|
|
for p in props:
|
|
pname = p.name
|
|
refval = GObject.Value()
|
|
refval.init(p.value_type)
|
|
refval.set_value(ref.get_property(pname))
|
|
|
|
value = GObject.Value()
|
|
value.init(p.value_type)
|
|
value.set_value(element.get_property(pname))
|
|
|
|
self.assertTrue(Gst.value_compare(refval, value) == Gst.VALUE_EQUAL,
|
|
"%s are not equal: %s != %s\n %s != %s" % (pname, value, refval, element, ref))
|
|
|
|
if isinstance(ref, GES.TrackElement):
|
|
self.assertElementAreEqual(ref.get_nleobject(), element.get_nleobject())
|
|
return
|
|
|
|
if not isinstance(ref, GES.Clip):
|
|
return
|
|
|
|
ttypes = [track.type for track in self.timeline.get_tracks()]
|
|
for ttype in ttypes:
|
|
if ttypes.count(ttype) > 1:
|
|
self.warning("Can't deeply check %s and %s "
|
|
"(only one track per type supported %s %s found)" % (ref,
|
|
element, ttypes.count(ttype), ttype))
|
|
return
|
|
|
|
children = element.get_children(False)
|
|
for ref_child in ref.get_children(False):
|
|
ref_track = ref_child.get_track()
|
|
if not ref_track:
|
|
self.warning("Can't check %s as not in a track" % (ref_child))
|
|
continue
|
|
|
|
child = None
|
|
for tmpchild in children:
|
|
if not isinstance(tmpchild, type(ref_child)):
|
|
continue
|
|
|
|
if ref_track.type != tmpchild.get_track().type:
|
|
continue
|
|
|
|
if not isinstance(ref_child, GES.Effect):
|
|
child = tmpchild
|
|
break
|
|
elif ref_child.props.bin_description == tmpchild.props.bin_description:
|
|
child = tmpchild
|
|
break
|
|
|
|
self.assertIsNotNone(child, "Could not find equivalent child %s in %s(%s)" % (ref_child,
|
|
element, children))
|
|
|
|
self.assertElementAreEqual(ref_child, child)
|
|
|
|
def check_reload_timeline(self):
|
|
tmpf = tempfile.NamedTemporaryFile(suffix='.xges')
|
|
uri = Gst.filename_to_uri(tmpf.name)
|
|
self.assertTrue(self.timeline.save_to_uri(uri, None, True))
|
|
project = GES.Project.new(uri)
|
|
mainloop = create_main_loop()
|
|
def loaded_cb(unused_project, unused_timeline):
|
|
mainloop.quit()
|
|
|
|
project.connect("loaded", loaded_cb)
|
|
reloaded_timeline = project.extract()
|
|
|
|
mainloop.run()
|
|
self.assertIsNotNone(reloaded_timeline)
|
|
|
|
layers = self.timeline.get_layers()
|
|
reloaded_layers = reloaded_timeline.get_layers()
|
|
self.assertEqual(len(layers), len(reloaded_layers))
|
|
for layer, reloaded_layer in zip(layers, reloaded_layers):
|
|
clips = layer.get_clips()
|
|
reloaded_clips = reloaded_layer.get_clips()
|
|
self.assertEqual(len(clips), len(reloaded_clips))
|
|
for clip, reloaded_clip in zip(clips, reloaded_clips):
|
|
self.assertElementAreEqual(clip, reloaded_clip)
|
|
|
|
return reloaded_timeline
|
|
|
|
def assertTimelineTopology(self, topology, groups=[]):
|
|
res = []
|
|
for layer in self.timeline.get_layers():
|
|
layer_timings = []
|
|
for clip in layer.get_clips():
|
|
layer_timings.append(
|
|
(type(clip), clip.props.start, clip.props.duration))
|
|
for child in clip.get_children(True):
|
|
self.assertEqual(child.props.start, clip.props.start)
|
|
self.assertEqual(child.props.duration, clip.props.duration)
|
|
|
|
res.append(layer_timings)
|
|
if topology != res:
|
|
Gst.error(self.timeline_as_str())
|
|
self.assertEqual(topology, res)
|
|
|
|
timeline_groups = self.timeline.get_groups()
|
|
if groups and timeline_groups:
|
|
for i, group in enumerate(groups):
|
|
self.assertEqual(set(group), set(timeline_groups[i].get_children(False)))
|
|
self.assertEqual(len(timeline_groups), i + 1)
|
|
|
|
return res
|
|
|
|
|
|
class GESTimelineConfigTest(GESTest):
|
|
"""
|
|
Tests where all the configuration changes, snapping positions and
|
|
auto-transitions are accounted for.
|
|
"""
|
|
|
|
def setUp(self):
|
|
timeline = GES.Timeline.new()
|
|
self.timeline = timeline
|
|
timeline.set_auto_transition(True)
|
|
|
|
self.snap_occured = False
|
|
self.snap = None
|
|
|
|
def snap_started(tl, el1, el2, pos):
|
|
if self.snap_occured:
|
|
raise AssertionError(
|
|
"Previous snap {} not accounted for".format(self.snap))
|
|
self.snap_occured = True
|
|
if self.snap is not None:
|
|
raise AssertionError(
|
|
"Previous snap {} not ended".format(self.snap))
|
|
self.snap = (el1.get_parent(), el2.get_parent(), pos)
|
|
|
|
def snap_ended(tl, el1, el2, pos):
|
|
self.assertEqual(
|
|
self.snap, (el1.get_parent(), el2.get_parent(), pos))
|
|
self.snap = None
|
|
|
|
timeline.connect("snapping-started", snap_started)
|
|
timeline.connect("snapping-ended", snap_ended)
|
|
|
|
self.lost_clips = []
|
|
|
|
def unrecord_lost_clip(layer, clip):
|
|
if clip in self.lost_clips:
|
|
self.lost_clips.remove(clip)
|
|
|
|
def record_lost_clip(layer, clip):
|
|
self.lost_clips.append(clip)
|
|
|
|
def layer_added(tl, layer):
|
|
layer.connect("clip-added", unrecord_lost_clip)
|
|
layer.connect("clip-removed", record_lost_clip)
|
|
|
|
timeline.connect("layer-added", layer_added)
|
|
|
|
self.clips = []
|
|
self.auto_transitions = {}
|
|
self.config = {}
|
|
|
|
@staticmethod
|
|
def new_config(start, duration, inpoint, maxduration, layer):
|
|
return {"start": start, "duration": duration, "in-point": inpoint,
|
|
"max-duration": maxduration, "layer": layer}
|
|
|
|
def add_clip(self, name, layer, tracks, start, duration, inpoint=0,
|
|
maxduration=Gst.CLOCK_TIME_NONE, clip_type=GES.TestClip,
|
|
asset_id=None, effects=None):
|
|
"""
|
|
Create a clip with the given @name and properties and add it to the
|
|
layer of priority @layer to the tracks in @tracks. Also registers
|
|
its expected configuration.
|
|
"""
|
|
if effects is None:
|
|
effects = []
|
|
|
|
lay = self.timeline.get_layer(layer)
|
|
while lay is None:
|
|
self.timeline.append_layer()
|
|
lay = self.timeline.get_layer(layer)
|
|
|
|
asset = GES.Asset.request(clip_type, asset_id)
|
|
clip = asset.extract()
|
|
self.assertTrue(clip.set_name(name))
|
|
# FIXME: would be better to use select-tracks-for-object
|
|
# hack around the fact that we cannot use select-tracks-for-object
|
|
# in python by setting start to large number to ensure no conflict
|
|
# when adding a clip
|
|
self.assertTrue(clip.set_start(10000))
|
|
self.assertTrue(clip.set_duration(duration))
|
|
self.assertTrue(clip.set_inpoint(inpoint))
|
|
|
|
for effect in effects:
|
|
self.assertTrue(clip.add(effect))
|
|
|
|
if lay.add_clip(clip) != True:
|
|
raise AssertionError(
|
|
"Failed to add clip {} to layer {}".format(name, layer))
|
|
|
|
# then remove the children not in the selected tracks, which may
|
|
# now allow some clips to fully/triple overlap because they do
|
|
# not share a track
|
|
for child in clip.get_children(False):
|
|
if child.get_track() not in tracks:
|
|
clip.remove(child)
|
|
|
|
# then move to the desired start
|
|
prev_snap = self.timeline.get_snapping_distance()
|
|
self.timeline.set_snapping_distance(0)
|
|
self.assertTrue(clip.set_start(start))
|
|
self.timeline.set_snapping_distance(prev_snap)
|
|
|
|
self.assertTrue(clip.set_max_duration(maxduration))
|
|
|
|
self.config[clip] = self.new_config(
|
|
start, duration, inpoint, maxduration, layer)
|
|
self.clips.append(clip)
|
|
|
|
return clip
|
|
|
|
def add_group(self, name, to_group):
|
|
"""
|
|
Create a group with the given @name and the elements in @to_group.
|
|
Also registers its expected configuration.
|
|
"""
|
|
group = GES.Group.new()
|
|
self.assertTrue(group.set_name(name))
|
|
start = None
|
|
end = None
|
|
layer = None
|
|
for element in to_group:
|
|
if start is None:
|
|
start = element.start
|
|
end = element.start + element.duration
|
|
layer = element.get_layer_priority()
|
|
else:
|
|
start = min(start, element.start)
|
|
end = max(end, element.start + element.duration)
|
|
layer = min(layer, element.get_layer_priority())
|
|
self.assertTrue(group.add(element))
|
|
|
|
self.config[group] = self.new_config(
|
|
start, end - start, 0, Gst.CLOCK_TIME_NONE, layer)
|
|
return group
|
|
|
|
def register_auto_transition(self, clip1, clip2, track):
|
|
"""
|
|
Register that we expect an auto-transition to exist between
|
|
@clip1 and @clip2 in @track.
|
|
"""
|
|
transition = self._find_transition(clip1, clip2, track)
|
|
if transition is None:
|
|
raise AssertionError(
|
|
"{} and {} have no auto-transition in track {}".format(
|
|
clip1, clip2, track))
|
|
if transition in self.auto_transitions.values():
|
|
raise AssertionError(
|
|
"Auto-transition between {} and {} in track {} already "
|
|
"registered".format(clip1, clip2, track))
|
|
key = (clip1, clip2, track)
|
|
if key in self.auto_transitions:
|
|
raise AssertionError(
|
|
"Auto-transition already registered for {}".format(key))
|
|
|
|
self.auto_transitions[key] = transition
|
|
|
|
def add_video_track(self):
|
|
track = GES.VideoTrack.new()
|
|
self.assertTrue(self.timeline.add_track(track))
|
|
return track
|
|
|
|
def add_audio_track(self):
|
|
track = GES.AudioTrack.new()
|
|
self.assertTrue(self.timeline.add_track(track))
|
|
return track
|
|
|
|
def assertElementConfig(self, element, config):
|
|
for prop in config:
|
|
if prop == "layer":
|
|
val = element.get_layer_priority()
|
|
else:
|
|
val = element.get_property(prop)
|
|
|
|
if val != config[prop]:
|
|
raise AssertionError("{} property {}: {} != {}".format(
|
|
element, prop, val, config[prop]))
|
|
|
|
@staticmethod
|
|
def _source_in_track(clip, track):
|
|
if clip.find_track_element(track, GES.Source):
|
|
return True
|
|
return False
|
|
|
|
def _find_transition(self, clip1, clip2, track):
|
|
"""find transition from earlier clip1 to later clip2"""
|
|
if not self._source_in_track(clip1, track) or \
|
|
not self._source_in_track(clip2, track):
|
|
return None
|
|
|
|
layer_prio = clip1.get_layer_priority()
|
|
if layer_prio != clip2.get_layer_priority():
|
|
return None
|
|
|
|
if clip1.start >= clip2.start:
|
|
return None
|
|
|
|
start = clip2.start
|
|
end = clip1.start + clip1.duration
|
|
if start >= end:
|
|
return None
|
|
duration = end - start
|
|
|
|
layer = self.timeline.get_layer(layer_prio)
|
|
self.assertIsNotNone(layer)
|
|
|
|
for clip in layer.get_clips():
|
|
children = clip.get_children(False)
|
|
if len(children) == 1:
|
|
child = children[0]
|
|
else:
|
|
continue
|
|
if isinstance(clip, GES.TransitionClip) and clip.start == start \
|
|
and clip.duration == duration and child.get_track() == track:
|
|
return clip
|
|
|
|
raise AssertionError(
|
|
"No auto-transition between {} and {} in track {}".format(
|
|
clip1, clip2, track))
|
|
|
|
def _transition_between(self, new, existing, clip1, clip2, track):
|
|
if clip1.start < clip2.start:
|
|
entry = (clip1, clip2, track)
|
|
else:
|
|
entry = (clip2, clip1, track)
|
|
trans = self._find_transition(*entry)
|
|
|
|
if trans is None:
|
|
return
|
|
|
|
if entry in new:
|
|
new.remove(entry)
|
|
self.auto_transitions[entry] = trans
|
|
elif entry in existing:
|
|
existing.remove(entry)
|
|
expect = self.auto_transitions[entry]
|
|
if trans != expect:
|
|
raise AssertionError(
|
|
"Auto-transition between {} and {} in track {} changed "
|
|
"from {} to {}".format(
|
|
clip1, clip2, track, expect, trans))
|
|
else:
|
|
raise AssertionError(
|
|
"Unexpected transition found between {} and {} in track {}"
|
|
"".format(clip1, clip2, track))
|
|
|
|
def assertTimelineConfig(
|
|
self, new_props=None, snap_position=None, snap_froms=None,
|
|
snap_tos=None, new_transitions=None, lost_transitions=None):
|
|
"""
|
|
Check that the timeline configuration has only changed by the
|
|
differences present in @new_props.
|
|
Check that a snap occurred at @snap_position between one of the
|
|
clips in @snap_froms and one of the clips in @snap_tos.
|
|
Check that all new transitions in the timeline are present in
|
|
@new_transitions.
|
|
Checl that all the transitions that were lost are in
|
|
@lost_transitions.
|
|
"""
|
|
if new_props is None:
|
|
new_props = {}
|
|
if snap_froms is None:
|
|
snap_froms = []
|
|
if snap_tos is None:
|
|
snap_tos = []
|
|
if new_transitions is None:
|
|
new_transitions = []
|
|
if lost_transitions is None:
|
|
lost_transitions = []
|
|
|
|
for element, config in new_props.items():
|
|
if element not in self.config:
|
|
self.config[element] = {}
|
|
|
|
for prop in config:
|
|
self.config[element][prop] = new_props[element][prop]
|
|
|
|
for element, config in self.config.items():
|
|
self.assertElementConfig(element, config)
|
|
|
|
# check that snapping occurred
|
|
snaps = []
|
|
for snap_from in snap_froms:
|
|
for snap_to in snap_tos:
|
|
snaps.append((snap_from, snap_to, snap_position))
|
|
|
|
if self.snap is None:
|
|
if snaps:
|
|
raise AssertionError(
|
|
"No snap occurred, but expected a snap in {}".format(snaps))
|
|
elif not snaps:
|
|
if self.snap_occured:
|
|
raise AssertionError(
|
|
"Snap {} occurred, but expected no snap".format(self.snap))
|
|
elif self.snap not in snaps:
|
|
raise AssertionError(
|
|
"Snap {} occurred, but expected a snap in {}".format(
|
|
self.snap, snaps))
|
|
self.snap_occured = False
|
|
|
|
# check that lost transitions are not part of the layer
|
|
for clip1, clip2, track in lost_transitions:
|
|
key = (clip1, clip2, track)
|
|
if key not in self.auto_transitions:
|
|
raise AssertionError(
|
|
"No such auto-transition between {} and {} in track {} "
|
|
"is registered".format(clip1, clip2, track))
|
|
# make sure original transition was removed from the layer
|
|
trans = self.auto_transitions[key]
|
|
if trans not in self.lost_clips:
|
|
raise AssertionError(
|
|
"The auto-transition {} between {} and {} track {} was "
|
|
"not removed from the layers, but expect it to be lost"
|
|
"".format(trans, clip1, clip2, track))
|
|
self.lost_clips.remove(trans)
|
|
# make sure a new one wasn't created
|
|
trans = self._find_transition(clip1, clip2, track)
|
|
if trans is not None:
|
|
raise AssertionError(
|
|
"Found auto-transition between {} and {} in track {} "
|
|
"is present, but expected it to be lost".format(
|
|
clip1, clip2, track))
|
|
# since it was lost, remove it
|
|
del self.auto_transitions[key]
|
|
|
|
# check that all lost clips are accounted for
|
|
if self.lost_clips:
|
|
raise AssertionError(
|
|
"Clips were lost that are not accounted for: {}".format(
|
|
self.lost_clips))
|
|
|
|
# check that all other transitions are either existing ones or
|
|
# new ones
|
|
new = set(new_transitions)
|
|
existing = set(self.auto_transitions.keys())
|
|
for i, clip1 in enumerate(self.clips):
|
|
for clip2 in self.clips[i+1:]:
|
|
for track in self.timeline.get_tracks():
|
|
self._transition_between(
|
|
new, existing, clip1, clip2, track)
|
|
|
|
# make sure we are not missing any expected transitions
|
|
if new:
|
|
raise AssertionError(
|
|
"Did not find new transitions for {}".format(new))
|
|
if existing:
|
|
raise AssertionError(
|
|
"Did not find existing transitions for {}".format(existing))
|
|
|
|
# make sure there aren't any clips we are unaware of
|
|
transitions = self.auto_transitions.values()
|
|
for layer in self.timeline.get_layers():
|
|
for clip in layer.get_clips():
|
|
if clip not in self.clips and clip not in transitions:
|
|
raise AssertionError("Unknown clip {}".format(clip))
|
|
|
|
def assertEdit(self, element, layer, mode, edge, position, snap,
|
|
snap_froms, snap_tos, new_props, new_transitions,
|
|
lost_transitions):
|
|
if not element.edit_full(layer, mode, edge, position):
|
|
raise AssertionError(
|
|
"Edit of {} to layer {}, mode {}, edge {}, at position {} "
|
|
"failed when a success was expected".format(
|
|
element, layer, mode, edge, position))
|
|
self.assertTimelineConfig(
|
|
new_props=new_props, snap_position=snap, snap_froms=snap_froms,
|
|
snap_tos=snap_tos, new_transitions=new_transitions,
|
|
lost_transitions=lost_transitions)
|
|
|
|
def assertFailEdit(self, element, layer, mode, edge, position, err_code):
|
|
res = None
|
|
error = None
|
|
try:
|
|
res = element.edit_full(layer, mode, edge, position)
|
|
except GLib.Error as exception:
|
|
error = exception
|
|
|
|
if err_code is None:
|
|
if res is not False:
|
|
raise AssertionError(
|
|
"Edit of {} to layer {}, mode {}, edge {}, at "
|
|
"position {} succeeded when a failure was expected"
|
|
"".format(
|
|
element, layer, mode, edge, position))
|
|
if error is not None:
|
|
raise AssertionError(
|
|
"Edit of {} to layer {}, mode {}, edge {}, at "
|
|
"position {} did produced an error when none was "
|
|
"expected".format(
|
|
element, layer, mode, edge, position))
|
|
else:
|
|
self.assertGESError(
|
|
error, err_code,
|
|
"Edit of {} to layer {}, mode {}, edge {}, at "
|
|
"position {}".format(element, layer, mode, edge, position))
|
|
# should be no change or snapping if edit fails
|
|
self.assertTimelineConfig()
|