gstreamer/subprojects/gst-editing-services/tests/check/python/common.py
Thibault Saunier 19c0921179 ges:tests: Create shorter assets to avoid timeouts
And use a simple GStreamer pipeline as testsrcbin with GstTranscoder
doesn't let us easily set the framerate of the source and we end up
having videorate dropping frames leading to the rendered file having
an unprecise duration.

This should fix races with `check.gst-editing-services.pythontests.pyunittest.python.test_assets.TestTimeline.test_reload_asset`

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1130>
2021-10-13 18:01:38 +00:00

773 lines
28 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Alexandru Băluț <alexandru.balut@gmail.com>
# Copyright (c) 2016, Thibault Saunier
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
from urllib.parse import urlparse
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GES", "1.0")
from gi.repository import Gst # noqa
from gi.repository import GES # noqa
from gi.repository import GLib # noqa
from gi.repository import GObject # noqa
import contextlib # noqa
import os #noqa
import unittest # noqa
import tempfile # noqa
try:
gi.require_version("GstTranscoder", "1.0")
from gi.repository import GstTranscoder
except ValueError:
GstTranscoder = None
Gst.init(None)
GES.init()
def create_main_loop():
"""Creates a MainLoop with a timeout."""
mainloop = GLib.MainLoop()
timed_out = False
def timeout_cb(unused):
nonlocal timed_out
timed_out = True
mainloop.quit()
def run(timeout_seconds=5, until_empty=False):
source = GLib.timeout_source_new_seconds(timeout_seconds)
source.set_callback(timeout_cb)
source.attach()
if until_empty:
GLib.idle_add(mainloop.quit)
GLib.MainLoop.run(mainloop)
source.destroy()
if timed_out:
raise Exception("Timed out after %s seconds" % timeout_seconds)
mainloop.run = run
return mainloop
def create_project(with_group=False, saved=False):
"""Creates a project with two clips in a group."""
timeline = GES.Timeline.new_audio_video()
layer = timeline.append_layer()
if with_group:
clip1 = GES.TitleClip()
clip1.set_start(0)
clip1.set_duration(10*Gst.SECOND)
layer.add_clip(clip1)
clip2 = GES.TitleClip()
clip2.set_start(100 * Gst.SECOND)
clip2.set_duration(10*Gst.SECOND)
layer.add_clip(clip2)
group = GES.Container.group([clip1, clip2])
if saved:
if isinstance(saved, str):
suffix = "-%s.xges" % saved
else:
suffix = ".xges"
uri = "file://%s" % tempfile.NamedTemporaryFile(suffix=suffix).name
timeline.get_asset().save(timeline, uri, None, overwrite=True)
return timeline
@contextlib.contextmanager
def created_project_file(xges):
_, xges_path = tempfile.mkstemp(suffix=".xges")
with open(xges_path, "w") as f:
f.write(xges)
yield Gst.filename_to_uri(os.path.abspath(xges_path))
os.remove(xges_path)
def can_generate_assets():
if GstTranscoder is None:
return False, "GstTranscoder is not available"
if not Gst.ElementFactory.make("testsrcbin"):
return False, "testbinsrc is not available"
return True, None
@contextlib.contextmanager
def created_video_asset(uri=None, num_bufs=30, framerate="30/1"):
with tempfile.NamedTemporaryFile(suffix=".ogg") as f:
if not uri:
uri = Gst.filename_to_uri(f.name)
name = f.name
else:
name = urlparse(uri).path
pipe = Gst.parse_launch(f"videotestsrc num-buffers={num_bufs} ! video/x-raw,framerate={framerate} ! theoraenc ! oggmux ! filesink location={name}")
pipe.set_state(Gst.State.PLAYING)
assert pipe.get_bus().timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS)
pipe.set_state(Gst.State.NULL)
yield uri
def get_asset_uri(name):
python_tests_dir = os.path.dirname(os.path.abspath(__file__))
assets_dir = os.path.join(python_tests_dir, "..", "assets")
return Gst.filename_to_uri(os.path.join(assets_dir, name))
class GESTest(unittest.TestCase):
def _log(self, func, format, *args):
string = format
if args:
string = string % args[0]
func(string)
def log(self, format, *args):
self._log(Gst.log, format, *args)
def debug(self, format, *args):
self._log(Gst.debug, format, *args)
def info(self, format, *args):
self._log(Gst.info, format, *args)
def fixme(self, format, *args):
self._log(Gst.fixme, format, *args)
def warning(self, format, *args):
self._log(Gst.warning, format, *args)
def error(self, format, *args):
self._log(Gst.error, format, *args)
def check_clip_values(self, clip, start, in_point, duration):
for elem in [clip] + clip.get_children(False):
self.check_element_values(elem, start, in_point, duration)
def check_element_values(self, element, start, in_point, duration):
self.assertEqual(element.props.start, start, element)
self.assertEqual(element.props.in_point, in_point, element)
self.assertEqual(element.props.duration, duration, element)
def assert_effects(self, clip, *effects):
# Make sure there are no other effects.
self.assertEqual(set(clip.get_top_effects()), set(effects))
# Make sure their order is correct.
indexes = [clip.get_top_effect_index(effect)
for effect in effects]
self.assertEqual(indexes, list(range(len(effects))))
def assertGESError(self, error, code, message=""):
if error is None:
raise AssertionError(
"{}{}Received no error".format(message, message and ": "))
if error.domain != "GES_ERROR":
raise AssertionError(
"{}{}Received error ({}) in domain {} rather than "
"GES_ERROR".format(
message, message and ": ", error.message, error.domain))
err_code = GES.Error(error.code)
if err_code != code:
raise AssertionError(
"{}{}Received {} error ({}) rather than {}".format(
message, message and ": ", err_code.value_name,
error.message, code.value_name))
class GESSimpleTimelineTest(GESTest):
def __init__(self, *args):
self.track_types = [GES.TrackType.AUDIO, GES.TrackType.VIDEO]
super(GESSimpleTimelineTest, self).__init__(*args)
def timeline_as_str(self):
res = "====== %s =======\n" % self.timeline
for layer in self.timeline.get_layers():
res += "Layer %04d: " % layer.get_priority()
for clip in layer.get_clips():
res += "{ %s }" % clip
res += '\n------------------------\n'
for group in self.timeline.get_groups():
res += "GROUP %s :" % group
for clip in group.get_children(False):
res += " { %s }" % clip.props.name
res += '\n'
res += "================================\n"
return res
def print_timeline(self):
print(self.timeline_as_str())
def setUp(self):
self.timeline = GES.Timeline.new()
for track_type in self.track_types:
self.assertIn(
track_type, [GES.TrackType.AUDIO, GES.TrackType.VIDEO])
if track_type == GES.TrackType.AUDIO:
self.assertTrue(self.timeline.add_track(GES.AudioTrack.new()))
else:
self.assertTrue(self.timeline.add_track(GES.VideoTrack.new()))
self.assertEqual(len(self.timeline.get_tracks()),
len(self.track_types))
self.layer = self.timeline.append_layer()
def add_clip(self, start, in_point, duration, asset_type=GES.TestClip):
clip = GES.Asset.request(asset_type, None).extract()
clip.props.start = start
clip.props.in_point = in_point
clip.props.duration = duration
self.assertTrue(self.layer.add_clip(clip))
return clip
def append_clip(self, layer=0, asset_type=GES.TestClip, asset_id=None):
while len(self.timeline.get_layers()) < layer + 1:
self.timeline.append_layer()
layer = self.timeline.get_layers()[layer]
if asset_type == GES.UriClip:
asset = GES.UriClipAsset.request_sync(asset_id)
else:
asset = GES.Asset.request(asset_type, asset_id)
clip = asset.extract()
clip.props.start = layer.get_duration()
clip.props.duration = 10
self.assertTrue(layer.add_clip(clip))
return clip
def assertElementAreEqual(self, ref, element):
self.assertTrue(isinstance(element, type(ref)), "%s and %s do not have the same type!" % (ref, element))
props = [p for p in ref.list_properties() if p.name not in ['name']
and not GObject.type_is_a(p.value_type, GObject.Object)]
for p in props:
pname = p.name
refval = GObject.Value()
refval.init(p.value_type)
refval.set_value(ref.get_property(pname))
value = GObject.Value()
value.init(p.value_type)
value.set_value(element.get_property(pname))
self.assertTrue(Gst.value_compare(refval, value) == Gst.VALUE_EQUAL,
"%s are not equal: %s != %s\n %s != %s" % (pname, value, refval, element, ref))
if isinstance(ref, GES.TrackElement):
self.assertElementAreEqual(ref.get_nleobject(), element.get_nleobject())
return
if not isinstance(ref, GES.Clip):
return
ttypes = [track.type for track in self.timeline.get_tracks()]
for ttype in ttypes:
if ttypes.count(ttype) > 1:
self.warning("Can't deeply check %s and %s "
"(only one track per type supported %s %s found)" % (ref,
element, ttypes.count(ttype), ttype))
return
children = element.get_children(False)
for ref_child in ref.get_children(False):
ref_track = ref_child.get_track()
if not ref_track:
self.warning("Can't check %s as not in a track" % (ref_child))
continue
child = None
for tmpchild in children:
if not isinstance(tmpchild, type(ref_child)):
continue
if ref_track.type != tmpchild.get_track().type:
continue
if not isinstance(ref_child, GES.Effect):
child = tmpchild
break
elif ref_child.props.bin_description == tmpchild.props.bin_description:
child = tmpchild
break
self.assertIsNotNone(child, "Could not find equivalent child %s in %s(%s)" % (ref_child,
element, children))
self.assertElementAreEqual(ref_child, child)
def check_reload_timeline(self):
tmpf = tempfile.NamedTemporaryFile(suffix='.xges')
uri = Gst.filename_to_uri(tmpf.name)
self.assertTrue(self.timeline.save_to_uri(uri, None, True))
project = GES.Project.new(uri)
mainloop = create_main_loop()
def loaded_cb(unused_project, unused_timeline):
mainloop.quit()
project.connect("loaded", loaded_cb)
reloaded_timeline = project.extract()
mainloop.run()
self.assertIsNotNone(reloaded_timeline)
layers = self.timeline.get_layers()
reloaded_layers = reloaded_timeline.get_layers()
self.assertEqual(len(layers), len(reloaded_layers))
for layer, reloaded_layer in zip(layers, reloaded_layers):
clips = layer.get_clips()
reloaded_clips = reloaded_layer.get_clips()
self.assertEqual(len(clips), len(reloaded_clips))
for clip, reloaded_clip in zip(clips, reloaded_clips):
self.assertElementAreEqual(clip, reloaded_clip)
return reloaded_timeline
def assertTimelineTopology(self, topology, groups=[]):
res = []
for layer in self.timeline.get_layers():
layer_timings = []
for clip in layer.get_clips():
layer_timings.append(
(type(clip), clip.props.start, clip.props.duration))
for child in clip.get_children(True):
self.assertEqual(child.props.start, clip.props.start)
self.assertEqual(child.props.duration, clip.props.duration)
res.append(layer_timings)
if topology != res:
Gst.error(self.timeline_as_str())
self.assertEqual(topology, res)
timeline_groups = self.timeline.get_groups()
if groups and timeline_groups:
for i, group in enumerate(groups):
self.assertEqual(set(group), set(timeline_groups[i].get_children(False)))
self.assertEqual(len(timeline_groups), i + 1)
return res
class GESTimelineConfigTest(GESTest):
"""
Tests where all the configuration changes, snapping positions and
auto-transitions are accounted for.
"""
def setUp(self):
timeline = GES.Timeline.new()
self.timeline = timeline
timeline.set_auto_transition(True)
self.snap_occured = False
self.snap = None
def snap_started(tl, el1, el2, pos):
if self.snap_occured:
raise AssertionError(
"Previous snap {} not accounted for".format(self.snap))
self.snap_occured = True
if self.snap is not None:
raise AssertionError(
"Previous snap {} not ended".format(self.snap))
self.snap = (el1.get_parent(), el2.get_parent(), pos)
def snap_ended(tl, el1, el2, pos):
self.assertEqual(
self.snap, (el1.get_parent(), el2.get_parent(), pos))
self.snap = None
timeline.connect("snapping-started", snap_started)
timeline.connect("snapping-ended", snap_ended)
self.lost_clips = []
def unrecord_lost_clip(layer, clip):
if clip in self.lost_clips:
self.lost_clips.remove(clip)
def record_lost_clip(layer, clip):
self.lost_clips.append(clip)
def layer_added(tl, layer):
layer.connect("clip-added", unrecord_lost_clip)
layer.connect("clip-removed", record_lost_clip)
timeline.connect("layer-added", layer_added)
self.clips = []
self.auto_transitions = {}
self.config = {}
@staticmethod
def new_config(start, duration, inpoint, maxduration, layer):
return {"start": start, "duration": duration, "in-point": inpoint,
"max-duration": maxduration, "layer": layer}
def add_clip(self, name, layer, tracks, start, duration, inpoint=0,
maxduration=Gst.CLOCK_TIME_NONE, clip_type=GES.TestClip,
asset_id=None, effects=None):
"""
Create a clip with the given @name and properties and add it to the
layer of priority @layer to the tracks in @tracks. Also registers
its expected configuration.
"""
if effects is None:
effects = []
lay = self.timeline.get_layer(layer)
while lay is None:
self.timeline.append_layer()
lay = self.timeline.get_layer(layer)
asset = GES.Asset.request(clip_type, asset_id)
clip = asset.extract()
self.assertTrue(clip.set_name(name))
# FIXME: would be better to use select-tracks-for-object
# hack around the fact that we cannot use select-tracks-for-object
# in python by setting start to large number to ensure no conflict
# when adding a clip
self.assertTrue(clip.set_start(10000))
self.assertTrue(clip.set_duration(duration))
self.assertTrue(clip.set_inpoint(inpoint))
for effect in effects:
self.assertTrue(clip.add(effect))
if lay.add_clip(clip) != True:
raise AssertionError(
"Failed to add clip {} to layer {}".format(name, layer))
# then remove the children not in the selected tracks, which may
# now allow some clips to fully/triple overlap because they do
# not share a track
for child in clip.get_children(False):
if child.get_track() not in tracks:
clip.remove(child)
# then move to the desired start
prev_snap = self.timeline.get_snapping_distance()
self.timeline.set_snapping_distance(0)
self.assertTrue(clip.set_start(start))
self.timeline.set_snapping_distance(prev_snap)
self.assertTrue(clip.set_max_duration(maxduration))
self.config[clip] = self.new_config(
start, duration, inpoint, maxduration, layer)
self.clips.append(clip)
return clip
def add_group(self, name, to_group):
"""
Create a group with the given @name and the elements in @to_group.
Also registers its expected configuration.
"""
group = GES.Group.new()
self.assertTrue(group.set_name(name))
start = None
end = None
layer = None
for element in to_group:
if start is None:
start = element.start
end = element.start + element.duration
layer = element.get_layer_priority()
else:
start = min(start, element.start)
end = max(end, element.start + element.duration)
layer = min(layer, element.get_layer_priority())
self.assertTrue(group.add(element))
self.config[group] = self.new_config(
start, end - start, 0, Gst.CLOCK_TIME_NONE, layer)
return group
def register_auto_transition(self, clip1, clip2, track):
"""
Register that we expect an auto-transition to exist between
@clip1 and @clip2 in @track.
"""
transition = self._find_transition(clip1, clip2, track)
if transition is None:
raise AssertionError(
"{} and {} have no auto-transition in track {}".format(
clip1, clip2, track))
if transition in self.auto_transitions.values():
raise AssertionError(
"Auto-transition between {} and {} in track {} already "
"registered".format(clip1, clip2, track))
key = (clip1, clip2, track)
if key in self.auto_transitions:
raise AssertionError(
"Auto-transition already registered for {}".format(key))
self.auto_transitions[key] = transition
def add_video_track(self):
track = GES.VideoTrack.new()
self.assertTrue(self.timeline.add_track(track))
return track
def add_audio_track(self):
track = GES.AudioTrack.new()
self.assertTrue(self.timeline.add_track(track))
return track
def assertElementConfig(self, element, config):
for prop in config:
if prop == "layer":
val = element.get_layer_priority()
else:
val = element.get_property(prop)
if val != config[prop]:
raise AssertionError("{} property {}: {} != {}".format(
element, prop, val, config[prop]))
@staticmethod
def _source_in_track(clip, track):
if clip.find_track_element(track, GES.Source):
return True
return False
def _find_transition(self, clip1, clip2, track):
"""find transition from earlier clip1 to later clip2"""
if not self._source_in_track(clip1, track) or \
not self._source_in_track(clip2, track):
return None
layer_prio = clip1.get_layer_priority()
if layer_prio != clip2.get_layer_priority():
return None
if clip1.start >= clip2.start:
return None
start = clip2.start
end = clip1.start + clip1.duration
if start >= end:
return None
duration = end - start
layer = self.timeline.get_layer(layer_prio)
self.assertIsNotNone(layer)
for clip in layer.get_clips():
children = clip.get_children(False)
if len(children) == 1:
child = children[0]
else:
continue
if isinstance(clip, GES.TransitionClip) and clip.start == start \
and clip.duration == duration and child.get_track() == track:
return clip
raise AssertionError(
"No auto-transition between {} and {} in track {}".format(
clip1, clip2, track))
def _transition_between(self, new, existing, clip1, clip2, track):
if clip1.start < clip2.start:
entry = (clip1, clip2, track)
else:
entry = (clip2, clip1, track)
trans = self._find_transition(*entry)
if trans is None:
return
if entry in new:
new.remove(entry)
self.auto_transitions[entry] = trans
elif entry in existing:
existing.remove(entry)
expect = self.auto_transitions[entry]
if trans != expect:
raise AssertionError(
"Auto-transition between {} and {} in track {} changed "
"from {} to {}".format(
clip1, clip2, track, expect, trans))
else:
raise AssertionError(
"Unexpected transition found between {} and {} in track {}"
"".format(clip1, clip2, track))
def assertTimelineConfig(
self, new_props=None, snap_position=None, snap_froms=None,
snap_tos=None, new_transitions=None, lost_transitions=None):
"""
Check that the timeline configuration has only changed by the
differences present in @new_props.
Check that a snap occurred at @snap_position between one of the
clips in @snap_froms and one of the clips in @snap_tos.
Check that all new transitions in the timeline are present in
@new_transitions.
Checl that all the transitions that were lost are in
@lost_transitions.
"""
if new_props is None:
new_props = {}
if snap_froms is None:
snap_froms = []
if snap_tos is None:
snap_tos = []
if new_transitions is None:
new_transitions = []
if lost_transitions is None:
lost_transitions = []
for element, config in new_props.items():
if element not in self.config:
self.config[element] = {}
for prop in config:
self.config[element][prop] = new_props[element][prop]
for element, config in self.config.items():
self.assertElementConfig(element, config)
# check that snapping occurred
snaps = []
for snap_from in snap_froms:
for snap_to in snap_tos:
snaps.append((snap_from, snap_to, snap_position))
if self.snap is None:
if snaps:
raise AssertionError(
"No snap occurred, but expected a snap in {}".format(snaps))
elif not snaps:
if self.snap_occured:
raise AssertionError(
"Snap {} occurred, but expected no snap".format(self.snap))
elif self.snap not in snaps:
raise AssertionError(
"Snap {} occurred, but expected a snap in {}".format(
self.snap, snaps))
self.snap_occured = False
# check that lost transitions are not part of the layer
for clip1, clip2, track in lost_transitions:
key = (clip1, clip2, track)
if key not in self.auto_transitions:
raise AssertionError(
"No such auto-transition between {} and {} in track {} "
"is registered".format(clip1, clip2, track))
# make sure original transition was removed from the layer
trans = self.auto_transitions[key]
if trans not in self.lost_clips:
raise AssertionError(
"The auto-transition {} between {} and {} track {} was "
"not removed from the layers, but expect it to be lost"
"".format(trans, clip1, clip2, track))
self.lost_clips.remove(trans)
# make sure a new one wasn't created
trans = self._find_transition(clip1, clip2, track)
if trans is not None:
raise AssertionError(
"Found auto-transition between {} and {} in track {} "
"is present, but expected it to be lost".format(
clip1, clip2, track))
# since it was lost, remove it
del self.auto_transitions[key]
# check that all lost clips are accounted for
if self.lost_clips:
raise AssertionError(
"Clips were lost that are not accounted for: {}".format(
self.lost_clips))
# check that all other transitions are either existing ones or
# new ones
new = set(new_transitions)
existing = set(self.auto_transitions.keys())
for i, clip1 in enumerate(self.clips):
for clip2 in self.clips[i+1:]:
for track in self.timeline.get_tracks():
self._transition_between(
new, existing, clip1, clip2, track)
# make sure we are not missing any expected transitions
if new:
raise AssertionError(
"Did not find new transitions for {}".format(new))
if existing:
raise AssertionError(
"Did not find existing transitions for {}".format(existing))
# make sure there aren't any clips we are unaware of
transitions = self.auto_transitions.values()
for layer in self.timeline.get_layers():
for clip in layer.get_clips():
if clip not in self.clips and clip not in transitions:
raise AssertionError("Unknown clip {}".format(clip))
def assertEdit(self, element, layer, mode, edge, position, snap,
snap_froms, snap_tos, new_props, new_transitions,
lost_transitions):
if not element.edit_full(layer, mode, edge, position):
raise AssertionError(
"Edit of {} to layer {}, mode {}, edge {}, at position {} "
"failed when a success was expected".format(
element, layer, mode, edge, position))
self.assertTimelineConfig(
new_props=new_props, snap_position=snap, snap_froms=snap_froms,
snap_tos=snap_tos, new_transitions=new_transitions,
lost_transitions=lost_transitions)
def assertFailEdit(self, element, layer, mode, edge, position, err_code):
res = None
error = None
try:
res = element.edit_full(layer, mode, edge, position)
except GLib.Error as exception:
error = exception
if err_code is None:
if res is not False:
raise AssertionError(
"Edit of {} to layer {}, mode {}, edge {}, at "
"position {} succeeded when a failure was expected"
"".format(
element, layer, mode, edge, position))
if error is not None:
raise AssertionError(
"Edit of {} to layer {}, mode {}, edge {}, at "
"position {} did produced an error when none was "
"expected".format(
element, layer, mode, edge, position))
else:
self.assertGESError(
error, err_code,
"Edit of {} to layer {}, mode {}, edge {}, at "
"position {}".format(element, layer, mode, edge, position))
# should be no change or snapping if edit fails
self.assertTimelineConfig()