#!/usr/bin/python # # Copyright (c) 2013,Thibault Saunier # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Boston, MA 02110-1301, USA. import os import time import urlparse import subprocess import ConfigParser import xml.etree.ElementTree as ET from loggable import Loggable from baseclasses import GstValidateTest, TestsManager, Test, ScenarioManager, NamedDic from utils import MediaFormatCombination, get_profile,\ path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \ compare_rendered_with_original, Protocols class MediaDescriptor(object): def __init__(self, xml_path): self.media_xml = ET.parse(xml_path).getroot() # Sanity checks self.media_xml.attrib["duration"] self.media_xml.attrib["seekable"] def get_caps(self): return self.media_xml.findall("streams")[0].attrib["caps"] def get_uri(self): return self.media_xml.attrib["uri"] def get_duration(self): return self.media_xml.attrib["duration"] def set_protocol(self, protocol): self.media_xml.attrib["protocol"] = protocol def get_protocol(self): return self.media_xml.attrib["protocol"] def is_seekable(self): return self.media_xml.attrib["seekable"] def is_image(self): for stream in self.media_xml.findall("streams")[0].findall("stream"): if stream.attrib["type"] == "image": return True return False def num_audio_tracks(media_xml): naudio = 0 for stream in media_xml.findall("streams")[0].findall("stream"): if stream.attrib["type"] == "audio": naudio += 1 return naudio class PipelineDescriptor(object): def __init__(self, name, pipeline): self.name = name self._pipeline = pipeline def needs_uri(self): return False def get_pipeline(self, protocol=Protocols.FILE, uri=""): return self._pipeline class PlaybinDescriptor(PipelineDescriptor): def __init__(self): PipelineDescriptor.__init__(self, "playbin", "playbin") def needs_uri(self): return True def get_pipeline(self, options, protocol, scenario, uri): pipe = self._pipeline if options.mute: fakesink = "'fakesink sync=true'" pipe += " audio-sink=%s video-sink=%s" %(fakesink, fakesink) pipe += " uri=%s" % uri if hasattr(scenario, "reverse-playback") and protocol == Protocols.HTTP: # 10MB so we can reverse playback pipe += " ring-buffer-max-size=10485760" return pipe # definitions of commands to use GST_VALIDATE_COMMAND = "gst-validate-1.0" GST_VALIDATE_TRANSCODING_COMMAND = "gst-validate-transcoding-1.0" G_V_DISCOVERER_COMMAND = "gst-validate-media-check-1.0" if "win32" in sys.platform: GST_VALIDATE_COMMAND += ".exe" GST_VALIDATE_TRANSCODING_COMMAND += ".exe" G_V_DISCOVERER_COMMAND += ".exe" # Some extension file for discovering results G_V_MEDIA_INFO_EXT = "media_info" G_V_STREAM_INFO_EXT = "stream_info" # Some info about protocols and how to handle them G_V_CAPS_TO_PROTOCOL = [("application/x-hls", Protocols.HLS)] G_V_PROTOCOL_TIMEOUTS = {Protocols.HTTP: 120, Protocols.HLS: 240} # Tests descriptions G_V_PLAYBACK_TESTS = [PlaybinDescriptor()] # Description of wanted output formats for transcoding test G_V_ENCODING_TARGET_COMBINATIONS = [ MediaFormatCombination("ogg", "vorbis", "theora"), MediaFormatCombination("webm", "vorbis", "vp8"), MediaFormatCombination("mp4", "mp3", "h264"), MediaFormatCombination("mkv", "vorbis", "h264")] # List of scenarios to run depending on the protocol in use G_V_SCENARIOS = {Protocols.FILE: ["play_15s", "reverse_playback", "fast_forward", "seek_forward", "seek_backward", "seek_with_stop", "scrub_forward_seeking"], Protocols.HTTP: ["play_15s", "fast_forward", "seek_forward", "seek_backward", "seek_with_stop", "reverse_playback"], Protocols.HLS: ["play_15s", "fast_forward", "seek_forward", "seek_with_stop", "seek_backward"], } G_V_PROTOCOL_VIDEO_RESTRICTION_CAPS = { # Handle the unknown framerate in HLS samples Protocols.HLS: "video/x-raw,framerate=25/1" } G_V_BLACKLISTED_TESTS = \ [("validate.hls.playback.fast_forward.*", "https://bugzilla.gnome.org/show_bug.cgi?id=698155"), ("validate.hls.playback.seek_with_stop.*", "https://bugzilla.gnome.org/show_bug.cgi?id=723268"), ("validate.*.reverse_playback.*webm$", "https://bugzilla.gnome.org/show_bug.cgi?id=679250"), ("validate.*.playback.reverse_playback.*\.ts|validate.*.playback.reverse_playback.*\.MTS", "https://bugzilla.gnome.org/show_bug.cgi?id=702595"), ("validate.http.playback.seek_with_stop.*webm", "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"), ("validate.http.playback.seek_with_stop.*mkv", "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"), ] class GstValidateLaunchTest(GstValidateTest): def __init__(self, classname, options, reporter, pipeline_desc, timeout=DEFAULT_TIMEOUT, scenario=None, media_descriptor=None): try: timeout = G_V_PROTOCOL_TIMEOUTS[media_descriptor.get_protocol()] except KeyError: pass super(GstValidateLaunchTest, self).__init__(GST_VALIDATE_COMMAND, classname, options, reporter, scenario=scenario, timeout=timeout) self.pipeline_desc = pipeline_desc self.media_descriptor = media_descriptor def build_arguments(self): GstValidateTest.build_arguments(self) self.add_arguments(self.pipeline_desc) def get_current_value(self): if self.scenario: sent_eos = self.sent_eos_position() if sent_eos is not None: t = time.time() if ((t - sent_eos)) > 30: if self.media_descriptor.get_protocol() == Protocols.HLS: self.set_result(Result.PASSED, """Got no EOS 30 seconds after sending EOS, in HLS known and tolerated issue: https://bugzilla.gnome.org/show_bug.cgi?id=723868""") return Result.KNOWN_ERROR return Result.FAILED return self.get_current_position() class GstValidateMediaCheckTest(Test): def __init__(self, classname, options, reporter, media_descriptor, uri, minfo_path, timeout=DEFAULT_TIMEOUT): super(GstValidateMediaCheckTest, self).__init__(G_V_DISCOVERER_COMMAND, classname, options, reporter, timeout=timeout) self._uri = uri self.media_descriptor = media_descriptor self._media_info_path = minfo_path def build_arguments(self): self.add_arguments(self._uri, "--expected-results", self._media_info_path) class GstValidateTranscodingTest(GstValidateTest): _scenarios = ScenarioManager() def __init__(self, classname, options, reporter, combination, uri, media_descriptor, timeout=DEFAULT_TIMEOUT, scenario_name="play_15s"): Loggable.__init__(self) file_dur = long(media_descriptor.get_duration()) / GST_SECOND if file_dur < 30: self.debug("%s is short (%ds< 30 secs) playing it all" % (uri, file_dur)) scenario = None else: self.debug("%s is long (%ds > 30 secs) playing it all" % (uri, file_dur)) scenario = self._scenarios.get_scenario(scenario_name) try: timeout = G_V_PROTOCOL_TIMEOUTS[media_descriptor.get_protocol()] except KeyError: pass try: hard_timeout = 4 * int(scenario.duration) + timeout except AttributeError: hard_timeout = None super(GstValidateTranscodingTest, self).__init__( GST_VALIDATE_TRANSCODING_COMMAND, classname, options, reporter, scenario=scenario, timeout=timeout, hard_timeout=hard_timeout) self.media_descriptor = media_descriptor self.uri = uri self.combination = combination self.dest_file = "" def set_rendering_info(self): self.dest_file = path = os.path.join(self.options.dest, self.classname.replace(".transcode.", os.sep). replace(".", os.sep)) utils.mkdir(os.path.dirname(urlparse.urlsplit(self.dest_file).path)) if urlparse.urlparse(self.dest_file).scheme == "": self.dest_file = path2url(self.dest_file) try: video_restriction = G_V_PROTOCOL_VIDEO_RESTRICTION_CAPS[self.media_descriptor.get_protocol()] except KeyError: video_restriction = None profile = get_profile(self.combination, video_restriction=video_restriction) self.add_arguments("-o", profile) def build_arguments(self): GstValidateTest.build_arguments(self) self.set_rendering_info() self.add_arguments(self.uri, self.dest_file) def get_current_value(self): if self.scenario: sent_eos = self.sent_eos_position() if sent_eos is not None: t = time.time() if ((t - sent_eos)) > 30: if self.media_descriptor.get_protocol() == Protocols.HLS: self.set_result(Result.PASSED, """Got no EOS 30 seconds after sending EOS, in HLS known and tolerated issue: https://bugzilla.gnome.org/show_bug.cgi?id=723868""") return Result.KNOWN_ERROR return Result.FAILED return self.get_current_size() def check_results(self): if self.result is Result.PASSED and not self.scenario: orig_duration = long(self.media_descriptor.get_duration()) res, msg = compare_rendered_with_original(orig_duration, self.dest_file) self.set_result(res, msg) elif self.message == "": GstValidateTest.check_results(self) class GstValidateManager(TestsManager, Loggable): name = "validate" _scenarios = ScenarioManager() def __init__(self): TestsManager.__init__(self) Loggable.__init__(self) self._uris = [] self._run_defaults = True def init(self): if which(GST_VALIDATE_COMMAND) and which(GST_VALIDATE_TRANSCODING_COMMAND): return True return False def list_tests(self): if self.tests: return self.tests for test_pipeline in G_V_PLAYBACK_TESTS: self._add_playback_test(test_pipeline) for uri, mediainfo in self._list_uris(): protocol = mediainfo.media_descriptor.get_protocol() try: timeout = G_V_PROTOCOL_TIMEOUTS[protocol] except KeyError: timeout = DEFAULT_TIMEOUT classname = "validate.%s.media_check.%s" % (protocol, os.path.basename(uri).replace(".", "_")) self.add_test(GstValidateMediaCheckTest(classname, self.options, self.reporter, mediainfo.media_descriptor, uri, mediainfo.path, timeout=timeout)) for uri, mediainfo in self._list_uris(): if mediainfo.media_descriptor.is_image(): continue for comb in G_V_ENCODING_TARGET_COMBINATIONS: classname = "validate.%s.transcode.to_%s.%s" % (mediainfo.media_descriptor.get_protocol(), str(comb).replace(' ', '_'), os.path.basename(uri).replace(".", "_")) self.add_test(GstValidateTranscodingTest(classname, self.options, self.reporter, comb, uri, mediainfo.media_descriptor)) return self.tests def _check_discovering_info(self, media_info, uri=None): self.debug("Checking %s", media_info) media_descriptor = MediaDescriptor(media_info) try: # Just testing that the vairous mandatory infos are present caps = media_descriptor.get_caps() if uri is None: uri = media_descriptor.get_uri() media_descriptor.set_protocol(urlparse.urlparse(uri).scheme) for caps2, prot in G_V_CAPS_TO_PROTOCOL: if caps2 == caps: media_descriptor.set_protocol(prot) break self._uris.append((uri, NamedDic({"path": media_info, "media_descriptor": media_descriptor}))) except ConfigParser.NoOptionError as e: self.debug("Exception: %s for %s", e, media_info) def _discover_file(self, uri, fpath): try: media_info = "%s.%s" % (fpath, G_V_MEDIA_INFO_EXT) args = G_V_DISCOVERER_COMMAND.split(" ") args.append(uri) if os.path.isfile(media_info): self._check_discovering_info(media_info, uri) return True elif fpath.endswith(G_V_STREAM_INFO_EXT): self._check_discovering_info(fpath) return True elif self.options.generate_info: args.extend(["--output-file", media_info]) else: return True subprocess.check_output(args) self._check_discovering_info(media_info, uri) return True except subprocess.CalledProcessError as e: self.debug("Exception: %s", e) return False def _list_uris(self): if self._uris: return self._uris if not self.args: if isinstance(self.options.paths, str): self.options.paths = [os.path.join(self.options.paths)] for path in self.options.paths: for root, dirs, files in os.walk(path): for f in files: fpath = os.path.join(path, root, f) if os.path.isdir(fpath) or fpath.endswith(G_V_MEDIA_INFO_EXT): continue else: self._discover_file(path2url(fpath), fpath) self.debug("Uris found: %s", self._uris) return self._uris def _get_fname(self, scenario, protocol=None): if scenario is not None and scenario.name.lower() != "none": return "%s.%s.%s.%s" % ("validate", protocol, "playback", scenario.name) return "%s.%s.%s" % ("validate", protocol, "playback") def _add_playback_test(self, pipe_descriptor): if pipe_descriptor.needs_uri(): for uri, minfo in self._list_uris(): protocol = minfo.media_descriptor.get_protocol() if self._run_defaults: scenarios = [self._scenarios.get_scenario(scenario_name) for scenario_name in G_V_SCENARIOS[protocol]] else: scenarios = self._scenarios.get_scenario(None) for scenario in scenarios: npipe = pipe_descriptor.get_pipeline(self.options, protocol, scenario, uri) if not minfo.media_descriptor.is_seekable() or minfo.media_descriptor.is_image(): self.debug("Do not run %s as %s does not support seeking", scenario, uri) continue fname = "%s.%s" % (self._get_fname(scenario, protocol), os.path.basename(uri).replace(".", "_")) self.debug("Adding: %s", fname) self.add_test(GstValidateLaunchTest(fname, self.options, self.reporter, npipe, scenario=scenario, media_descriptor=minfo.media_descriptor) ) else: self.add_test(GstValidateLaunchTest(self._get_fname(scenario, "testing"), self.options, self.reporter, pipe_descriptor.get_pipeline(self.options), scenario=scenario)) def needs_http_server(self): for test in self.list_tests(): if self._is_test_wanted(test): protocol = test.media_descriptor.get_protocol() uri = test.media_descriptor.get_uri() if protocol == Protocols.HTTP and \ "127.0.0.1:%s" % (self.options.http_server_port) in uri: return True return False def get_blacklisted(self): return G_V_BLACKLISTED_TESTS def set_settings(self, options, args, reporter): TestsManager.set_settings(self, options, args, reporter) if options.wanted_tests: self._run_defaults = False