debug-viewer: Port to Python3

And fix unit-tests.

https://bugzilla.gnome.org/show_bug.cgi?id=795260
This commit is contained in:
Philippe Normand 2018-04-14 14:22:11 +01:00
parent a11b78a57d
commit e557b5326d
19 changed files with 182 additions and 541 deletions

View file

@ -57,7 +57,7 @@ class GSourceDispatcher (Dispatcher):
GObject.source_remove(self.source_id)
self.source_id = GObject.idle_add(
iterator.next, priority=GObject.PRIORITY_LOW)
iterator.__next__, priority=GObject.PRIORITY_LOW)
def cancel(self):

View file

@ -33,7 +33,7 @@ from gi.types import GObjectMeta
import GstDebugViewer
from GstDebugViewer.Common import utils
from generictreemodel import GenericTreeModel
from .generictreemodel import GenericTreeModel
def widget_add_popup_menu(widget, menu, button=3):
@ -150,7 +150,7 @@ class UIFactory (object):
def make(self, extra_actions=None):
ui_manager = Gtk.UIManager()
for action_group in self.action_groups.values():
for action_group in list(self.action_groups.values()):
ui_manager.insert_action_group(action_group, 0)
if extra_actions:
for action_group in extra_actions.groups:
@ -199,7 +199,7 @@ class MetaModel (GObjectMeta):
column_names = spec[::2]
column_types = spec[1::2]
column_indices = range(len(column_names))
column_indices = list(range(len(column_names)))
for col_index, col_name, in zip(column_indices, column_names):
setattr(cls, col_name, col_index)
@ -240,7 +240,7 @@ class Manager (object):
if len(kw) != 1:
raise ValueError("need exactly one keyword argument")
attr, value = kw.items()[0]
attr, value = list(kw.items())[0]
getter = attrgetter(attr)
for item in i:
@ -261,20 +261,20 @@ class StateString (object):
def __get__(self, section, section_class=None):
import ConfigParser
import configparser
if section is None:
return self
try:
return self.get(section)
except (ConfigParser.NoSectionError,
ConfigParser.NoOptionError,):
except (configparser.NoSectionError,
configparser.NoOptionError,):
return self.get_default(section)
def __set__(self, section, value):
import ConfigParser
import configparser
self.set(section, value)
@ -439,13 +439,13 @@ class StateSection (object):
def set(self, state_string, value):
import ConfigParser
import configparser
parser = self.state._parser
try:
parser.set(self._name, state_string.option, value)
except ConfigParser.NoSectionError:
except configparser.NoSectionError:
parser.add_section(self._name)
parser.set(self._name, state_string.option, value)
@ -454,12 +454,12 @@ class State (object):
def __init__(self, filename, old_filenames=()):
import ConfigParser
import configparser
self.sections = {}
self._filename = filename
self._parser = ConfigParser.RawConfigParser()
self._parser = configparser.RawConfigParser()
success = self._parser.read([filename])
if not success:
for old_filename in old_filenames:

View file

@ -30,6 +30,7 @@ from gettext import gettext as _, ngettext
import gi
from gi.repository import GLib
from gi.repository import GObject
from gi.repository import Gtk
@ -49,14 +50,7 @@ class ExceptionHandler (object):
class DefaultExceptionHandler (ExceptionHandler):
# TODO Py2.5: In Python 2.5, this succeeds. Remove the try...except block
# once we depend on 2.5.
try:
exc_types = (BaseException,)
except NameError:
# Python < 2.5.
exc_types = (Exception,)
exc_types = (BaseException,)
priority = 0
inherit_fork = True
@ -81,7 +75,7 @@ class ExitOnInterruptExceptionHandler (ExceptionHandler):
def __call__(self, *args):
print >> sys.stderr, "Interrupt caught, exiting."
print("Interrupt caught, exiting.", file=sys.stderr)
sys.exit(self.exit_status)
@ -115,7 +109,7 @@ class MainLoopWrapper (ExceptionHandler):
if self.exc_info != (None,) * 3:
# Re-raise unhandled exception that occured while running the loop.
exc_type, exc_value, exc_tb = self.exc_info
raise exc_type, exc_value, exc_tb
raise exc_type(exc_value).with_traceback(exc_tb)
class ExceptHookManagerClass (object):
@ -294,144 +288,6 @@ class PathsProgramBase (PathsBase):
# needed directory structure in the source dist.
class OptionError (Exception):
pass
class OptionParser (object):
def __init__(self, options):
self.__entries = []
self.__parsers = {}
self.options = options
self.__remaining_args = []
# Remaining args parsing with pygobject does not work with glib before
# 2.13.2 (e.g. Ubuntu Feisty).
# if GObject.glib_version >= (2, 13, 2,):
# self.__entries.append ((GObject.OPTION_REMAINING, "\0", 0, "", "",))
def add_option(self, long_name, short_name=None, description=None,
arg_name=None, arg_parser=None, hidden=False):
flags = 0
if not short_name:
# A deficiency of pygobject:
short_name = "\0"
if not description:
description = ""
if arg_name is None:
flags |= GObject.OPTION_FLAG_NO_ARG
elif arg_parser is not None:
self.__parsers[long_name] = arg_parser
if hidden:
flags |= GObject.OPTION_FLAG_HIDDEN
self.__entries.append((long_name, short_name, flags, description,
arg_name,))
def __handle_option(self, option, arg, group):
# See __init__ for glib requirement.
# if option == GObject.OPTION_REMAINING:
# self.__remaining_args.append (arg)
# return
for entry in self.__entries:
long_name, short_name = entry[:2]
arg_name = entry[-1]
if (option != "--%s" % (long_name,) and
option != "-%s" % (short_name,)):
continue
attr = long_name.replace("-", "_")
if arg_name is None:
value = True
elif long_name in self.__parsers:
value = self.__parsers[long_name](arg)
else:
value = arg
self.options[attr] = value
break
def parse(self, argv):
context = GObject.OptionContext(self.get_parameter_string())
group = GObject.OptionGroup(None, None, None, self.__handle_option)
context.set_main_group(group)
group.add_entries(self.__entries)
try:
result_argv = context.parse(argv)
except GObject.GError as exc:
raise OptionError(exc.message)
self.__remaining_args = result_argv[1:]
self.handle_parse_complete(self.__remaining_args)
def get_parameter_string(self):
raise NotImplementedError("derived classes must override this method")
def handle_parse_complete(self, remaining_args):
pass
class LogOptionParser (OptionParser):
"""Like OptionParser, but adds a --log-level option."""
def __init__(self, *a, **kw):
OptionParser.__init__(self, *a, **kw)
# TODO: Re-evaluate usage of log levels to use less of them. Like
# unifying warning, error and critical.
self.add_option("log-level", "l",
"%s (debug, info, warning, error, critical)"
% (_("Enable logging"),),
"LEVEL", self.parse_log_level)
@staticmethod
def parse_log_level(arg):
try:
level = int(arg)
except ValueError:
level = {"off": None,
"none": None,
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL}.get(arg.strip().lower())
if level is None:
return None
else:
return level
else:
if level < 0:
level = 0
elif level > 5:
level = 5
return {0: None,
1: logging.DEBUG,
2: logging.INFO,
3: logging.WARNING,
4: logging.ERROR,
5: logging.CRITICAL}[level]
def _init_excepthooks():
ExceptHookManager.setup()
@ -459,24 +315,16 @@ def _init_locale(gettext_domain=None):
gettext.textdomain(gettext_domain)
gettext.bind_textdomain_codeset(gettext_domain, "UTF-8")
def _init_logging(level):
if level == "none":
return
def _init_options(option_parser=None):
if option_parser is None:
return {}
try:
option_parser.parse(sys.argv)
except OptionError as exc:
print >> sys.stderr, exc.args[0]
sys.exit(1)
return option_parser.options
def _init_logging(level=None):
logging.basicConfig(level=level,
mapping = { "debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL }
logging.basicConfig(level=mapping[level],
format='%(asctime)s.%(msecs)03d %(levelname)8s %(name)20s: %(message)s',
datefmt='%H:%M:%S')
@ -484,8 +332,18 @@ def _init_logging(level=None):
logger.debug("logging at level %s", logging.getLevelName(level))
logger.info("using Python %i.%i.%i %s %i", *sys.version_info)
def _init_log_option(parser):
choices = ["none", "debug", "info", "warning", "error", "critical"]
parser.add_option("--log-level", "-l",
type="choice",
choices=choices,
action="store",
dest="log_level",
default="none",
help=_("Enable logging, possible values: ") + ", ".join(choices))
return parser
def main(option_parser=None, gettext_domain=None, paths=None):
def main(main_function, option_parser, gettext_domain=None, paths=None):
# FIXME:
global Paths
@ -494,15 +352,11 @@ def main(option_parser=None, gettext_domain=None, paths=None):
_init_excepthooks()
_init_paths(paths)
_init_locale(gettext_domain)
options = _init_options(option_parser)
try:
log_level = options["log_level"]
except KeyError:
_init_logging()
else:
_init_logging(log_level)
parser = _init_log_option(option_parser)
options, args = option_parser.parse_args()
_init_logging(options.log_level)
try:
options["main"](options)
main_function(args)
finally:
logging.shutdown()

View file

@ -19,7 +19,7 @@
"""GStreamer Development Utilities Common package."""
import Data
import GUI
import Main
import utils
from . import Data
from . import GUI
from . import Main
from . import utils

View file

@ -36,7 +36,7 @@ class SingletonMeta (type):
def __call__(cls, *a, **kw):
kw_key = tuple(sorted(kw.iteritems()))
kw_key = tuple(sorted(kw.items()))
try:
obj = cls._singleton_instances[a + kw_key]

View file

@ -22,6 +22,7 @@
import os
import logging
import re
import sys
# Nanosecond resolution (like Gst.SECOND)
SECOND = 1000000000
@ -67,8 +68,8 @@ def parse_time(st):
h, m, s = st.split(":")
secs, subsecs = s.split(".")
return (long((int(h) * 60 ** 2 + int(m) * 60) * SECOND) +
long(secs) * SECOND + long(subsecs))
return int((int(h) * 60 ** 2 + int(m) * 60) * SECOND) + \
int(secs) * SECOND + int(subsecs)
class DebugLevel (int):
@ -130,15 +131,15 @@ debug_levels = [debug_level_none,
debug_level_error]
# For stripping color codes:
_escape = re.compile("\x1b\\[[0-9;]*m")
_escape = re.compile(b"\x1b\\[[0-9;]*m")
def strip_escape(s):
# FIXME: This can be optimized further!
while "\x1b" in s:
s = _escape.sub("", s)
while b"\x1b" in s:
s = _escape.sub(b"", s)
return s
@ -155,7 +156,7 @@ def default_log_line_regex_():
PID = r"(\d+)\s*"
FILENAME = r"([^:]*):"
LINE = r"(\d+):"
FUNCTION = "(~?[A-Za-z0-9_]*|operator\(\)):"
FUNCTION = "(~?[A-Za-z0-9_\s\*,\(\)]*):"
# FIXME: When non-g(st)object stuff is logged with *_OBJECT (like
# buffers!), the address is printed *without* <> brackets!
OBJECT = "(?:<([^>]+)>)?"
@ -201,7 +202,7 @@ class SortHelper (object):
def __init__(self, fileobj, offsets):
self._gen = self.__gen(fileobj, offsets)
self._gen.next()
next(self._gen)
# Override in the instance, for performance (this gets called in an
# inner loop):
@ -252,7 +253,7 @@ class SortHelper (object):
mid = int(floor(lo * 0.1 + hi * 0.9))
seek(offsets[mid])
mid_time_string = read(time_len)
if insert_time_string < mid_time_string:
if insert_time_string.encode('utf8') < mid_time_string:
hi = mid
else:
lo = mid + 1
@ -339,7 +340,7 @@ class LineCache (Producer):
yield True
offset = tell()
line = readline()
line = readline().decode('utf-8')
if not line:
break
match = rexp_match(line)
@ -379,8 +380,7 @@ class LogLine (list):
@classmethod
def parse_full(cls, line_string):
match = cls._line_regex.match(line_string)
match = cls._line_regex.match(line_string.decode('utf8'))
if match is None:
# raise ValueError ("not a valid log line (%r)" % (line_string,))
groups = [0, 0, 0, 0, "", "", 0, "", "", 0]
@ -392,7 +392,7 @@ class LogLine (list):
# PID.
line[1] = int(line[1])
# Thread.
line[2] = long(line[2], 16)
line[2] = int(line[2], 16)
# Level (this is handled in LineCache).
line[3] = 0
# Line.
@ -404,7 +404,7 @@ class LogLine (list):
5, # COL_FILENAME
7, # COL_FUNCTION,
8,): # COL_OBJECT
line[col_id] = intern(line[col_id] or "")
line[col_id] = sys.intern(line[col_id] or "")
return line
@ -450,22 +450,12 @@ class LogFile (Producer):
self.logger = logging.getLogger("logfile")
self.path = os.path.normpath(os.path.abspath(filename))
self.__real_fileobj = file(filename, "rb")
self.__real_fileobj = open(filename, "rb")
self.fileobj = mmap.mmap(
self.__real_fileobj.fileno(), 0, access=mmap.ACCESS_READ)
self.line_cache = LineCache(self.fileobj, dispatcher)
self.line_cache.consumers.append(self)
def get_full_line(self, line_index):
offset = self.line_cache.offsets[line_index]
self.fileobj.seek(offset)
line_string = self.fileobj.readline()
line = LogLine.parse_full(line_string)
msg = line_string[line[-1]:]
line[-1] = msg
return line
def start_loading(self):
self.logger.debug("starting load")

View file

@ -27,9 +27,7 @@ import gi
from GstDebugViewer.GUI.app import App
def main(options):
args = options["args"]
def main(args):
app = App()

View file

@ -21,6 +21,10 @@
import os.path
import gi
gi.require_version('Gdk', '3.0')
gi.require_version('Gtk', '3.0')
from gi.repository import GObject
from gi.repository import Gdk
from gi.repository import Gtk
@ -92,7 +96,7 @@ class App (object):
# Apply custom widget stying
# TODO: check for dark theme
css = """
css = b"""
@define-color normal_bg_color #FFFFFF;
@define-color shade_bg_color shade(@normal_bg_color, 0.95);
#log_view row:nth-child(even) {

View file

@ -346,7 +346,7 @@ class MessageColumn (TextColumn):
def message_data_func(column, cell, model, tree_iter, user_data):
msg = model.get_value(tree_iter, id_)
msg = model.get_value(tree_iter, id_).decode("utf8")
if not highlighters:
cell.props.text = msg
@ -355,7 +355,7 @@ class MessageColumn (TextColumn):
if len(highlighters) > 1:
raise NotImplementedError("FIXME: Support more than one...")
highlighter = highlighters.values()[0]
highlighter = list(highlighters.values())[0]
row = model[tree_iter]
ranges = highlighter(row)
if not ranges:

View file

@ -29,9 +29,7 @@ from gi.repository import Gtk
from GstDebugViewer import Common, Data
class LogModelBase (Common.GUI.GenericTreeModel):
__metaclass__ = Common.GUI.MetaModel
class LogModelBase (Common.GUI.GenericTreeModel, metaclass=Common.GUI.MetaModel):
columns = ("COL_TIME", GObject.TYPE_UINT64,
"COL_PID", int,
@ -134,6 +132,8 @@ class LogModelBase (Common.GUI.GenericTreeModel):
if col_id == self.COL_MESSAGE:
# strip whitespace + newline
value = self.access_offset(line_offset + value).strip()
elif col_id in (self.COL_TIME, self.COL_THREAD):
value = GObject.Value(GObject.TYPE_UINT64, value)
return value
@ -270,7 +270,7 @@ class FilteredLogModel (FilteredLogModelBase):
self.line_offsets = self.super_model.line_offsets
self.line_levels = self.super_model.line_levels
self.super_index = xrange(len(self.line_offsets))
self.super_index = range(len(self.line_offsets))
del self.filters[:]
@ -374,7 +374,7 @@ class FilteredLogModel (FilteredLogModelBase):
if len(self.filters) == 0:
# Identity.
self.super_index = xrange(super_start, super_stop)
self.super_index = range(super_start, super_stop)
self.line_offsets = SubRange(self.super_model.line_offsets,
super_start, super_stop)
self.line_levels = SubRange(self.super_model.line_levels,
@ -384,12 +384,12 @@ class FilteredLogModel (FilteredLogModelBase):
if super_start < old_super_start:
# TODO:
raise NotImplementedError("Only handling further restriction of the range"
" (start offset = %i)" % (start_offset,))
" (start offset = %i)" % (super_start,))
if super_stop > old_super_stop:
# TODO:
raise NotImplementedError("Only handling further restriction of the range"
" (end offset = %i)" % (stop_offset,))
" (end offset = %i)" % (super_stop,))
start = self.line_index_from_super(super_start)
stop = self.line_index_from_super(super_stop)
@ -439,7 +439,7 @@ class SubRange (object):
def __iter__(self):
l = self.l
for i in xrange(self.start, self.stop):
for i in range(self.start, self.stop):
yield l[i]

View file

@ -58,7 +58,7 @@ def action(func):
def iter_actions(manager):
cls = type(manager)
it = cls.__dict__.iteritems()
it = cls.__dict__.items()
for name, member in it:
try:
member.is_action_handler
@ -508,9 +508,9 @@ class Window (object):
if start_index is not None and not scroll_to_selection:
def traverse():
for i in xrange(start_index, len(model)):
for i in range(start_index, len(model)):
yield i
for i in xrange(start_index - 1, 0, -1):
for i in range(start_index - 1, 0, -1):
yield i
for current_index in traverse():
try:
@ -686,13 +686,13 @@ class Window (object):
line_text = model.access_offset(line_offset).strip()
line_text = Data.strip_escape(line_text)
self.clipboard.set_text(line_text, -1)
self.clipboard.set_text(line_text.decode('utf8'), -1)
@action
def handle_edit_copy_message_action_activate(self, action):
col_id = LogModelBase.COL_MESSAGE
self.clipboard.set_text(self.get_active_line()[col_id])
self.clipboard.set_text(self.get_active_line()[col_id].decode('utf8'), -1)
@action
def handle_enlarge_text_action_activate(self, action):

View file

@ -20,63 +20,41 @@
"""GStreamer Debug Viewer Main module."""
import sys
import optparse
from gettext import gettext as _, ngettext
from gi.repository import GLib
from GstDebugViewer import GUI
import GstDebugViewer.Common.Main
Common = GstDebugViewer.Common
GETTEXT_DOMAIN = "gst-debug-viewer"
def main_version():
def main_version(opt, value, parser, *args, **kwargs):
from GstDebugViewer import version
print "GStreamer Debug Viewer %s" % (version,)
print("GStreamer Debug Viewer %s" % (version,))
sys.exit(0)
class Paths (Common.Main.PathsProgramBase):
program_name = "gst-debug-viewer"
class OptionParser (Common.Main.LogOptionParser):
def __init__(self, options):
Common.Main.LogOptionParser.__init__(self, options)
options["main"] = None
options["args"] = []
self.add_option("version", None, _("Display version and exit"))
def get_parameter_string(self):
return _("[FILENAME] - Display and analyze GStreamer debug log files")
def handle_parse_complete(self, remaining_args):
try:
version = self.options["version"]
except KeyError:
pass
else:
main_version()
sys.exit(0)
if self.options["main"] is None:
from GstDebugViewer import GUI
self.options["main"] = GUI.main
self.options["args"][:] = remaining_args
def main():
parser = optparse.OptionParser(
_("%prog [OPTION...] [FILENAME]"),
description=_("Display and analyze GStreamer debug log files"))
parser.add_option("--version", "-v",
action="callback",
dest="version",
callback=main_version,
help=_("Display version and exit"))
options = {}
parser = OptionParser(options)
Common.Main.main(option_parser=parser,
Common.Main.main(main_function=GUI.main,
option_parser=parser,
gettext_domain=GETTEXT_DOMAIN,
paths=Paths)

View file

@ -34,21 +34,24 @@ class SearchOperation (object):
def __init__(self, model, search_text, search_forward=True, start_position=None):
self.model = model
self.search_text = search_text
if isinstance(search_text, str):
self.search_text = search_text.encode('utf8')
else:
self.search_text = search_text
self.search_forward = search_forward
self.start_position = start_position
col_id = GUI.models.LogModelBase.COL_MESSAGE
len_search_text = len(search_text)
len_search_text = len(self.search_text)
def match_func(model_row):
message = model_row[col_id]
if search_text in message:
if self.search_text in message:
ranges = []
start = 0
while True:
pos = message.find(search_text, start)
pos = message.find(self.search_text, start)
if pos == -1:
break
ranges.append((pos, pos + len_search_text,))
@ -99,7 +102,7 @@ class SearchSentinel (object):
nth_child = model.iter_nth_child
def iter_next_():
for i in xrange(start_pos, -1, -1):
for i in range(start_pos, -1, -1):
yield nth_child(None, i)
yield None
it_ = iter_next_()

View file

@ -35,7 +35,7 @@ import cairo
def iter_model_reversed(model):
count = model.iter_n_children(None)
for i in xrange(count - 1, 0, -1):
for i in range(count - 1, 0, -1):
yield model[i]
@ -192,7 +192,7 @@ class LevelDistributionSentinel (object):
level_index = stop_index
level_iter = iter(levels)
try:
level = level_iter.next()
level = level_iter.__next__()
except StopIteration:
level_iter = None
continue
@ -640,7 +640,7 @@ class TimelineWidget (Gtk.DrawingArea):
pixel_step = self.find_indicative_time_step()
ctx.set_source_rgb(.9, .9, .9)
start = dirty_start - dirty_start % pixel_step
for x in xrange(start + pixel_step, dirty_stop, pixel_step):
for x in range(start + pixel_step, dirty_stop, pixel_step):
ctx.move_to(x - .5, 0)
ctx.line_to(x - .5, height)
ctx.stroke()

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; -*-
#
# GStreamer Debug Viewer - View and analyze GStreamer debug log files

View file

@ -38,7 +38,7 @@ def perform_substitution (filename, values):
data = fp.read ()
fp.close ()
for name, value in values.items ():
for name, value in list(values.items ()):
data = data.replace ("$%s$" % (name,), value)
fp = file (filename, "wt")
@ -74,7 +74,7 @@ class clean_custom (clean):
def remove_file (self, path):
if os.path.exists (path):
print "removing '%s'" % (path,)
print("removing '%s'" % (path,))
if not self.dry_run:
os.unlink (path)
@ -300,7 +300,7 @@ class install_scripts_custom (install_scripts):
if install.root:
root = normpath (install.root)
len_root = len (root)
for name, value in values.items ():
for name, value in list(values.items ()):
if normpath (value).startswith (root):
values[name] = normpath (value)[len_root:]

View file

@ -38,13 +38,13 @@ def main ():
Data.debug_level_info,)
shift = 0
for i in xrange (count):
for i in range (count):
ts = i * 10000
shift += i % (count // 100)
level = levels[(i + shift) % 3]
print line_string (ts, pid, thread, level, category, filename, file_line,
function, object_, message)
print(line_string (ts, pid, thread, level, category, filename, file_line,
function, object_, message))
if __name__ == "__main__":
main ()

View file

@ -53,20 +53,20 @@ class TestParsingPerformance (object):
def handle_load_finished (self):
diff = time.time () - self.start_time
print "line cache built in %0.1f ms" % (diff * 1000.,)
print("line cache built in %0.1f ms" % (diff * 1000.,))
start_time = time.time ()
model = GUI.LazyLogModel (self.log_file)
for row in model:
pass
diff = time.time () - start_time
print "model iterated in %0.1f ms" % (diff * 1000.,)
print "overall time spent: %0.1f s" % (time.time () - self.start_time,)
print("model iterated in %0.1f ms" % (diff * 1000.,))
print("overall time spent: %0.1f s" % (time.time () - self.start_time,))
import resource
rusage = resource.getrusage (resource.RUSAGE_SELF)
print "time spent in user mode: %.2f s" % (rusage.ru_utime,)
print "time spent in system mode: %.2f s" % (rusage.ru_stime,)
print("time spent in user mode: %.2f s" % (rusage.ru_utime,))
print("time spent in system mode: %.2f s" % (rusage.ru_stime,))
def main ():

View file

@ -33,42 +33,41 @@ from GstDebugViewer import Common, Data
from GstDebugViewer.GUI.filters import CategoryFilter, Filter
from GstDebugViewer.GUI.models import (FilteredLogModel,
LogModelBase,
RangeFilteredLogModel,
SubRange,)
class TestSubRange (TestCase):
def test_len (self):
l = range (20)
l = list(range(20))
sr = SubRange (l, 0, 20)
self.assertEquals (len (sr), 20)
self.assertEqual (len (sr), 20)
sr = SubRange (l, 10, 20)
self.assertEquals (len (sr), 10)
self.assertEqual (len (sr), 10)
sr = SubRange (l, 0, 10)
self.assertEquals (len (sr), 10)
self.assertEqual (len (sr), 10)
sr = SubRange (l, 5, 15)
self.assertEquals (len (sr), 10)
self.assertEqual (len (sr), 10)
def test_iter (self):
l = range (20)
l = list(range(20))
sr = SubRange (l, 0, 20)
self.assertEquals (list (sr), l)
self.assertEqual (list (sr), l)
sr = SubRange (l, 10, 20)
self.assertEquals (list (sr), range (10, 20))
self.assertEqual (list (sr), list(range(10, 20)))
sr = SubRange (l, 0, 10)
self.assertEquals (list (sr), range (0, 10))
self.assertEqual (list (sr), list(range(0, 10)))
sr = SubRange (l, 5, 15)
self.assertEquals (list (sr), range (5, 15))
self.assertEqual (list (sr), list(range(5, 15)))
class Model (LogModelBase):
@ -84,12 +83,12 @@ class Model (LogModelBase):
pid = line_offset // 100
if pid % 2 == 0:
category = "EVEN"
category = b"EVEN"
else:
category = "ODD"
category = b"ODD"
line_fmt = ("0:00:00.000000000 %5i 0x0000000 DEBUG "
"%20s dummy.c:1:dummy: dummy")
line_fmt = (b"0:00:00.000000000 %5i 0x0000000 DEBUG "
b"%20s dummy.c:1:dummy: dummy")
line_str = line_fmt % (pid, category,)
log_line = Data.LogLine.parse_full (line_str)
self.line_cache[line_offset] = log_line
@ -122,76 +121,35 @@ class TestDynamicFilter (TestCase):
def test_unset_filter_rerange (self):
full_model = Model ()
ranged_model = RangeFilteredLogModel (full_model)
# FIXME: Call to .reset should not be needed.
ranged_model.reset ()
filtered_model = FilteredLogModel (ranged_model)
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
self.assertEquals (row_list (full_model), range (20))
self.assertEquals (row_list (ranged_model), range (20))
self.assertEquals (row_list (filtered_model), range (20))
self.assertEqual (row_list (full_model), list(range(20)))
self.assertEqual (row_list (filtered_model), list(range(20)))
ranged_model.set_range (5, 16)
filtered_model.super_model_changed_range ()
filtered_model.set_range (5, 16)
self.assertEquals (row_list (ranged_model), range (5, 16))
self.assertEquals (row_list (filtered_model), range (5, 16))
self.assertEquals ([filtered_model.line_index_from_super (i)
for i in range (11)],
range (11))
self.assertEquals ([filtered_model.line_index_to_super (i)
for i in range (11)],
range (11))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (5, 16)],
range (11))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (11)],
range (5, 16))
self.assertEqual (row_list (filtered_model), list(range(5, 16)))
def test_identity_filter_rerange (self):
full_model = Model ()
ranged_model = RangeFilteredLogModel (full_model)
# FIXME: Call to .reset should not be needed.
ranged_model.reset ()
filtered_model = FilteredLogModel (ranged_model)
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
self.assertEquals (row_list (full_model), range (20))
self.assertEquals (row_list (ranged_model), range (20))
self.assertEquals (row_list (filtered_model), range (20))
self.assertEqual (row_list (full_model), list(range(20)))
self.assertEqual (row_list (filtered_model), list(range(20)))
filtered_model.add_filter (IdentityFilter (),
Common.Data.DefaultDispatcher ())
ranged_model.set_range (5, 16)
filtered_model.super_model_changed_range ()
filtered_model.set_range (5, 16)
self.assertEquals (row_list (ranged_model), range (5, 16))
self.assertEquals (row_list (filtered_model), range (5, 16))
self.assertEquals ([filtered_model.line_index_from_super (i)
for i in range (11)],
range (11))
self.assertEquals ([filtered_model.line_index_to_super (i)
for i in range (11)],
range (11))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (5, 16)],
range (11))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (11)],
range (5, 16))
self.assertEqual (row_list (filtered_model), list(range(5, 16)))
def test_filtered_range_refilter_skip (self):
full_model = Model ()
ranged_model = RangeFilteredLogModel (full_model)
# FIXME: Call to .reset should not be needed.
ranged_model.reset ()
filtered_model = FilteredLogModel (ranged_model)
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
@ -199,233 +157,89 @@ class TestDynamicFilter (TestCase):
Common.Data.DefaultDispatcher ())
self.__dump_model (filtered_model, "filtered")
self.assertEquals (row_list (filtered_model), range (1, 20, 2))
self.assertEquals ([filtered_model.line_index_from_super (i)
self.assertEqual (row_list (filtered_model), list(range(1, 20, 2)))
self.assertEqual ([filtered_model.line_index_from_super (i)
for i in range (1, 20, 2)],
range (10))
self.assertEquals ([filtered_model.line_index_to_super (i)
list(range(10)))
self.assertEqual ([filtered_model.line_index_to_super (i)
for i in range (10)],
range (1, 20, 2))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (1, 20, 2)],
range (10))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (10)],
range (1, 20, 2))
list(range(1, 20, 2)))
ranged_model.set_range (1, 20)
self.__dump_model (ranged_model, "ranged (1, 20)")
filtered_model.super_model_changed_range ()
filtered_model.set_range (1, 20)
self.__dump_model (filtered_model, "ranged (1, 20)")
self.__dump_model (filtered_model, "filtered range")
self.assertEquals ([filtered_model.line_index_from_super (i)
self.assertEqual ([filtered_model.line_index_from_super (i)
for i in range (0, 19, 2)],
range (10))
self.assertEquals ([filtered_model.line_index_to_super (i)
list(range(10)))
self.assertEqual ([filtered_model.line_index_to_super (i)
for i in range (10)],
range (0, 19, 2))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (1, 20, 2)],
range (10))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (10)],
range (1, 20, 2))
list(range(1, 20, 2)))
ranged_model.set_range (2, 20)
self.__dump_model (ranged_model, "ranged (2, 20)")
filtered_model.super_model_changed_range ()
self.__dump_model (filtered_model, "filtered range")
filtered_model.set_range (2, 20)
self.__dump_model (filtered_model, "ranged (2, 20)")
self.assertEquals (row_list (filtered_model), range (3, 20, 2))
self.assertEquals ([filtered_model.line_index_from_super (i)
for i in range (1, 18, 2)],
range (9))
self.assertEquals ([filtered_model.line_index_to_super (i)
for i in range (9)],
range (1, 18, 2))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (3, 20, 2)],
range (9))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (9)],
range (3, 20, 2))
self.assertEqual (row_list (filtered_model), list(range(3, 20, 2)))
def test_filtered_range_refilter (self):
full_model = Model ()
ranged_model = RangeFilteredLogModel (full_model)
# FIXME: Call to .reset should not be needed.
ranged_model.reset ()
filtered_model = FilteredLogModel (ranged_model)
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
rows = row_list (full_model)
rows_ranged = row_list (ranged_model)
rows_filtered = row_list (filtered_model)
self.__dump_model (full_model, "full model")
## self.__dump_model (ranged_model, "ranged model")
## self.__dump_model (filtered_model, "filtered model")
self.assertEquals (rows, rows_ranged)
self.assertEquals (rows, rows_filtered)
self.assertEqual (rows, rows_filtered)
self.assertEquals ([ranged_model.line_index_from_super (i)
self.assertEqual ([filtered_model.line_index_from_super (i)
for i in range (20)],
range (20))
self.assertEquals ([ranged_model.line_index_to_super (i)
list(range(20)))
self.assertEqual ([filtered_model.line_index_to_super (i)
for i in range (20)],
range (20))
self.assertEquals ([ranged_model.line_index_from_top (i)
for i in range (20)],
range (20))
self.assertEquals ([ranged_model.line_index_to_top (i)
for i in range (20)],
range (20))
list(range(20)))
self.assertEquals ([filtered_model.line_index_from_super (i)
for i in range (20)],
range (20))
self.assertEquals ([filtered_model.line_index_to_super (i)
for i in range (20)],
range (20))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (20)],
range (20))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (20)],
range (20))
filtered_model.set_range (5, 16)
self.__dump_model (filtered_model, "ranged model (5, 16)")
ranged_model.set_range (5, 16)
self.__dump_model (ranged_model, "ranged model (5, 16)")
filtered_model.super_model_changed_range ()
rows_ranged = row_list (ranged_model)
self.assertEquals (rows_ranged, range (5, 16))
rows_ranged = row_list (filtered_model)
self.assertEqual (rows_ranged, list(range(5, 16)))
self.__dump_model (filtered_model, "filtered model (nofilter, 5, 15)")
rows_filtered = row_list (filtered_model)
self.assertEquals (rows_ranged, rows_filtered)
self.assertEquals ([ranged_model.line_index_from_super (i)
for i in range (5, 16)],
range (11))
self.assertEquals ([ranged_model.line_index_to_super (i)
for i in range (11)],
range (5, 16))
self.assertEquals ([ranged_model.line_index_from_top (i)
for i in range (5, 16)],
range (11))
self.assertEquals ([ranged_model.line_index_to_top (i)
for i in range (11)],
range (5, 16))
self.assertEquals ([filtered_model.line_index_from_super (i)
for i in range (11)],
range (11))
self.assertEquals ([filtered_model.line_index_to_super (i)
for i in range (11)],
range (11))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (5, 16)],
range (11))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (11)],
range (5, 16))
filtered_model.add_filter (CategoryFilter ("EVEN"),
Common.Data.DefaultDispatcher ())
rows_filtered = row_list (filtered_model)
self.assertEquals (rows_filtered, range (5, 16, 2))
self.assertEqual (rows_filtered, list(range(5, 16, 2)))
self.__dump_model (filtered_model, "filtered model")
self.assertEquals ([filtered_model.line_index_from_super (i)
for i in range (0, 11, 2)],
range (6))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (5, 16, 2)],
range (6))
ranged_model.set_range (7, 13)
self.__dump_model (ranged_model, "ranged model (7, 13)")
filtered_model.super_model_changed_range ()
self.assertEquals (row_list (ranged_model), range (7, 13))
self.assertEquals ([ranged_model.line_index_from_super (i)
for i in range (7, 13)],
range (6))
self.assertEquals ([ranged_model.line_index_to_super (i)
for i in range (6)],
range (7, 13))
self.assertEquals ([ranged_model.line_index_from_top (i)
for i in range (7, 13)],
range (6))
self.assertEquals ([ranged_model.line_index_to_top (i)
for i in range (6)],
range (7, 13))
self.__dump_model (filtered_model, "filtered model (ranged 7, 12)")
self.assertEquals ([filtered_model.line_index_from_super (i)
for i in range (0, 6, 2)],
range (3))
self.assertEquals ([filtered_model.line_index_to_super (i)
for i in range (3)],
range (0, 6, 2))
self.assertEquals ([filtered_model.line_index_from_top (i)
for i in range (7, 12, 2)],
range (3))
self.assertEquals ([filtered_model.line_index_to_top (i)
for i in range (3)],
range (7, 12, 2))
rows_filtered = row_list (filtered_model)
self.assertEquals (rows_filtered, range (7, 13, 2))
def test_random_filtered_range_refilter (self):
full_model = Model ()
ranged_model = RangeFilteredLogModel (full_model)
# FIXME: Call to .reset should not be needed.
ranged_model.reset ()
filtered_model = FilteredLogModel (ranged_model)
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
self.assertEquals (row_list (full_model), range (20))
self.assertEquals (row_list (ranged_model), range (20))
self.assertEquals (row_list (filtered_model), range (20))
self.assertEqual (row_list (full_model), list(range(20)))
self.assertEqual (row_list (filtered_model), list(range(20)))
filtered_model.add_filter (RandomFilter (538295943),
Common.Data.DefaultDispatcher ())
random_rows = row_list (filtered_model)
self.__dump_model (filtered_model)
ranged_model.set_range (10, 20)
self.__dump_model (ranged_model, "ranged_model (10, 20)")
self.assertEquals (row_list (ranged_model), range (10, 20))
filtered_model.super_model_changed_range ()
self.__dump_model (filtered_model)
self.assertEquals (row_list (filtered_model), [x for x in range (10, 20) if x in random_rows])
ranged_model.set_range (0, 20)
self.assertEquals (row_list (ranged_model), range (0, 20))
ranged_model = RangeFilteredLogModel (full_model)
# FIXME: Call to .reset should not be needed.
ranged_model.reset ()
filtered_model = FilteredLogModel (ranged_model)
filtered_model = FilteredLogModel (full_model)
filtered_model.add_filter (RandomFilter (538295943),
Common.Data.DefaultDispatcher ())
self.__dump_model (filtered_model, "filtered model")
self.assertEquals (row_list (filtered_model), random_rows)
self.assertEqual (row_list (filtered_model), random_rows)
ranged_model.set_range (0, 10)
self.__dump_model (ranged_model, "ranged model (0, 10)")
filtered_model.super_model_changed_range ()
self.assertEquals (row_list (ranged_model), range (0, 10))
filtered_model.set_range (1, 10)
self.__dump_model (filtered_model)
self.assertEquals (row_list (filtered_model), [x for x in range (0, 10) if x in random_rows])
self.assertEqual (row_list (filtered_model), [x for x in range (0, 10) if x in random_rows])
def __row_list (self, model):
@ -439,7 +253,7 @@ class TestDynamicFilter (TestCase):
if not hasattr (model, "super_model"):
# Top model.
print "\t(%s)" % ("|".join ([str (i).rjust (2) for i in self.__row_list (model)]),),
print("\t(%s)" % ("|".join ([str (i).rjust (2) for i in self.__row_list (model)]),), end=' ')
else:
top_model = model.super_model
if hasattr (top_model, "super_model"):
@ -449,12 +263,12 @@ class TestDynamicFilter (TestCase):
output = [" "] * len (top_indices)
for i, position in enumerate (positions):
output[position] = str (i).rjust (2)
print "\t(%s)" % ("|".join (output),),
print("\t(%s)" % ("|".join (output),), end=' ')
if comment is None:
print
print()
else:
print comment
print(comment)
if __name__ == "__main__":
test_main ()