debug-viewer: PEP8 all the things

This commit is contained in:
Philippe Normand 2018-04-14 16:04:22 +01:00
parent e557b5326d
commit 8a9d72b7e6
17 changed files with 529 additions and 517 deletions

View file

@ -221,6 +221,7 @@ class ExceptHookManagerClass (object):
RuntimeWarning,
stacklevel=2)
ExceptHookManager = ExceptHookManagerClass()
@ -315,15 +316,16 @@ def _init_locale(gettext_domain=None):
gettext.textdomain(gettext_domain)
gettext.bind_textdomain_codeset(gettext_domain, "UTF-8")
def _init_logging(level):
if level == "none":
return
mapping = { "debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL }
mapping = {"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL}
logging.basicConfig(level=mapping[level],
format='%(asctime)s.%(msecs)03d %(levelname)8s %(name)20s: %(message)s',
datefmt='%H:%M:%S')
@ -332,6 +334,7 @@ def _init_logging(level):
logger.debug("logging at level %s", logging.getLevelName(level))
logger.info("using Python %i.%i.%i %s %i", *sys.version_info)
def _init_log_option(parser):
choices = ["none", "debug", "info", "warning", "error", "critical"]
parser.add_option("--log-level", "-l",
@ -343,6 +346,7 @@ def _init_log_option(parser):
help=_("Enable logging, possible values: ") + ", ".join(choices))
return parser
def main(main_function, option_parser, gettext_domain=None, paths=None):
# FIXME:

View file

@ -58,7 +58,7 @@ def gettext_cache():
def gettext_cache_access(s):
if not s in d:
if s not in d:
d[s] = gettext(s)
return d[s]
@ -104,6 +104,7 @@ class _XDGClass (object):
setattr(self, name, dir)
XDG = _XDGClass()

View file

@ -69,7 +69,7 @@ def parse_time(st):
secs, subsecs = s.split(".")
return int((int(h) * 60 ** 2 + int(m) * 60) * SECOND) + \
int(secs) * SECOND + int(subsecs)
int(secs) * SECOND + int(subsecs)
class DebugLevel (int):
@ -113,6 +113,7 @@ class DebugLevel (int):
return DebugLevel(self - 1)
debug_level_none = DebugLevel("NONE")
debug_level_error = DebugLevel("ERROR")
debug_level_warning = DebugLevel("WARN")
@ -310,10 +311,10 @@ class LineCache (Producer):
"I": debug_level_info, "W": debug_level_warning,
"E": debug_level_error, " ": debug_level_none}
ANSI = "(?:\x1b\\[[0-9;]*m)?"
ANSI_PATTERN = (r"\d:\d\d:\d\d\.\d+ " + ANSI +
r" *\d+" + ANSI +
r" +0x[0-9a-f]+ +" + ANSI +
r"([TFLDIEW ])")
ANSI_PATTERN = r"\d:\d\d:\d\d\.\d+ " + ANSI + \
r" *\d+" + ANSI + \
r" +0x[0-9a-f]+ +" + ANSI + \
r"([TFLDIEW ])"
BARE_PATTERN = ANSI_PATTERN.replace(ANSI, "")
rexp_bare = re.compile(BARE_PATTERN)
rexp_ansi = re.compile(ANSI_PATTERN)
@ -345,7 +346,7 @@ class LineCache (Producer):
break
match = rexp_match(line)
if match is None:
if rexp is rexp_ansi or not "\x1b" in line:
if rexp is rexp_ansi or "\x1b" not in line:
continue
match = rexp_ansi.match(line)
@ -432,9 +433,9 @@ class LogLines (object):
def __iter__(self):
l = len(self)
size = len(self)
i = 0
while i < l:
while i < size:
yield self[i]
i += 1

View file

@ -39,5 +39,6 @@ def main(args):
app.run()
if __name__ == "__main__":
main()

View file

@ -19,10 +19,6 @@
"""GStreamer Debug Viewer GUI module."""
def _(s):
return s
import logging
from gi.repository import Gtk, GLib
@ -31,6 +27,10 @@ from GstDebugViewer import Common, Data
from GstDebugViewer.GUI.colors import LevelColorThemeTango
from GstDebugViewer.GUI.models import LazyLogModel, LogModelBase
def _(s):
return s
# Sync with gst-inspector!
@ -526,7 +526,7 @@ class ColumnManager (Common.GUI.Manager):
before = self.column_order[:pos]
shown_names = [col.name for col in self.columns]
for col_class in before:
if not col_class.name in shown_names:
if col_class.name not in shown_names:
pos -= 1
return pos

View file

@ -43,7 +43,8 @@ class DebugLevelFilter (Filter):
col_id = LogModelBase.COL_LEVEL
if mode == self.this_and_above:
comparison_function = lambda x, y: x < y
def comparison_function(x, y):
return x < y
else:
comparison_function = get_comparison_function(
mode == self.all_but_this)

View file

@ -75,7 +75,7 @@ class LogModelBase (Common.GUI.GenericTreeModel, metaclass=Common.GUI.MetaModel)
# adjust special rows
row[COL_LEVEL] = line_levels[i]
msg_offset = row[COL_MESSAGE]
row[COL_MESSAGE] = access_offset(offset + msg_offset)
row[COL_MESSAGE] = access_offset(offset + msg_offset)
yield (row, offset,)
row[COL_MESSAGE] = msg_offset
@ -401,21 +401,21 @@ class FilteredLogModel (FilteredLogModelBase):
class SubRange (object):
__slots__ = ("l", "start", "stop",)
__slots__ = ("size", "start", "stop",)
def __init__(self, l, start, stop):
def __init__(self, size, start, stop):
if start > stop:
raise ValueError(
"need start <= stop (got %r, %r)" % (start, stop,))
if type(l) == type(self):
if type(size) == type(self):
# Another SubRange, don't stack:
start += l.start
stop += l.start
l = l.l
start += size.start
stop += size.start
size = size.size
self.l = l
self.size = size
self.start = start
self.stop = stop
@ -428,9 +428,9 @@ class SubRange (object):
else:
stop += self.stop
return self.l[i.start + self.start:stop]
return self.size[i.start + self.start:stop]
else:
return self.l[i + self.start]
return self.size[i + self.start]
def __len__(self):
@ -438,9 +438,9 @@ class SubRange (object):
def __iter__(self):
l = self.l
size = self.size
for i in range(self.start, self.stop):
yield l[i]
yield size[i]
class LineViewLogModel (FilteredLogModelBase):

View file

@ -19,12 +19,6 @@
"""GStreamer Debug Viewer GUI module."""
ZOOM_FACTOR = 1.15
def _(s):
return s
import os.path
from bisect import bisect_right, bisect_left
import logging
@ -48,6 +42,13 @@ from GstDebugViewer.GUI.models import (FilteredLogModel,
LogModelBase)
ZOOM_FACTOR = 1.15
def _(s):
return s
def action(func):
func.is_action_handler = True
@ -242,10 +243,10 @@ class Window (object):
group = Gtk.ActionGroup("MenuActions")
group.add_actions([("AppMenuAction", None, _("_Application")),
("ViewMenuAction", None, _("_View")),
("ViewColumnsMenuAction", None, _("_Columns")),
("HelpMenuAction", None, _("_Help")),
("LineViewContextMenuAction", None, "")])
("ViewMenuAction", None, _("_View")),
("ViewColumnsMenuAction", None, _("_Columns")),
("HelpMenuAction", None, _("_Help")),
("LineViewContextMenuAction", None, "")])
self.actions.add_group(group)
group = Gtk.ActionGroup("WindowActions")
@ -567,8 +568,8 @@ class Window (object):
dialog = Gtk.FileChooserDialog(None, self.gtk_window,
Gtk.FileChooserAction.OPEN,
(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
Gtk.STOCK_OPEN, Gtk.ResponseType.ACCEPT,))
(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
Gtk.STOCK_OPEN, Gtk.ResponseType.ACCEPT,))
response = dialog.run()
dialog.hide()
if response == Gtk.ResponseType.ACCEPT:
@ -692,7 +693,8 @@ class Window (object):
def handle_edit_copy_message_action_activate(self, action):
col_id = LogModelBase.COL_MESSAGE
self.clipboard.set_text(self.get_active_line()[col_id].decode('utf8'), -1)
self.clipboard.set_text(self.get_active_line()[
col_id].decode('utf8'), -1)
@action
def handle_enlarge_text_action_activate(self, action):

View file

@ -39,6 +39,7 @@ def main_version(opt, value, parser, *args, **kwargs):
print("GStreamer Debug Viewer %s" % (version,))
sys.exit(0)
class Paths (Common.Main.PathsProgramBase):
program_name = "gst-debug-viewer"

View file

@ -224,9 +224,9 @@ class FindBarFeature (FeatureBase):
self.action_group.add_actions([("goto-next-search-result",
None, _("Goto Next Match"),
"<Ctrl>G"),
("goto-previous-search-result",
None, _("Goto Previous Match"),
"<Ctrl><Shift>G")])
("goto-previous-search-result",
None, _("Goto Previous Match"),
"<Ctrl><Shift>G")])
self.bar = None
self.operation = None
@ -434,7 +434,7 @@ class FindBarFeature (FeatureBase):
def handle_match_found(self, model, tree_iter):
if not self.search_state in ("search-forward", "search-backward",):
if self.search_state not in ("search-forward", "search-backward",):
self.logger.warning(
"inconsistent search state %r", self.search_state)
return

View file

@ -1,21 +1,24 @@
#!/usr/bin/env python
def line_string (ts, pid, thread, level, category, filename, line, function,
object_, message):
def line_string(ts, pid, thread, level, category, filename, line, function,
object_, message):
# Replicates gstreamer/gst/gstinfo.c:gst_debug_log_default.
# FIXME: Regarding object_, this doesn't fully replicate the formatting!
return "%s %5d 0x%x %s %20s %s:%d:%s:<%s> %s" % (Data.time_args (ts), pid, thread,
level.name.ljust (5), category,
return "%s %5d 0x%x %s %20s %s:%d:%s:<%s> %s" % (Data.time_args(ts), pid, thread,
level.name.ljust(
5), category,
filename, line, function,
object_, message,)
def main ():
def main():
import sys
import os.path
sys.path.append (os.path.dirname (os.path.dirname (sys.argv[0])))
sys.path.append(os.path.dirname(os.path.dirname(sys.argv[0])))
global Data
from GstDebugViewer import Data
@ -24,7 +27,7 @@ def main ():
ts = 0
pid = 12345
thread = int ("89abcdef", 16)
thread = int("89abcdef", 16)
level = Data.debug_level_log
category = "GST_DUMMY"
filename = "gstdummyfilename.c"
@ -38,13 +41,14 @@ def main ():
Data.debug_level_info,)
shift = 0
for i in range (count):
for i in range(count):
ts = i * 10000
shift += i % (count // 100)
level = levels[(i + shift) % 3]
print(line_string (ts, pid, thread, level, category, filename, file_line,
function, object_, message))
print(line_string(ts, pid, thread, level, category, filename, file_line,
function, object_, message))
if __name__ == "__main__":
main ()
main()

View file

@ -30,49 +30,50 @@ import gi
from gi.repository import GObject
sys.path.insert (0, os.path.join (sys.path[0], os.pardir))
from .. import Common, Data, GUI
from GstDebugViewer import Common, Data, GUI
class TestParsingPerformance (object):
def __init__ (self, filename):
def __init__(self, filename):
self.main_loop = GObject.MainLoop ()
self.log_file = Data.LogFile (filename, Common.Data.DefaultDispatcher ())
self.log_file.consumers.append (self)
self.main_loop = GObject.MainLoop()
self.log_file = Data.LogFile(filename, Common.Data.DefaultDispatcher())
self.log_file.consumers.append(self)
def start (self):
def start(self):
self.log_file.start_loading ()
self.log_file.start_loading()
def handle_load_started (self):
def handle_load_started(self):
self.start_time = time.time ()
self.start_time = time.time()
def handle_load_finished (self):
def handle_load_finished(self):
diff = time.time () - self.start_time
diff = time.time() - self.start_time
print("line cache built in %0.1f ms" % (diff * 1000.,))
start_time = time.time ()
model = GUI.LazyLogModel (self.log_file)
start_time = time.time()
model = GUI.LazyLogModel(self.log_file)
for row in model:
pass
diff = time.time () - start_time
diff = time.time() - start_time
print("model iterated in %0.1f ms" % (diff * 1000.,))
print("overall time spent: %0.1f s" % (time.time () - self.start_time,))
print("overall time spent: %0.1f s" % (time.time() - self.start_time,))
import resource
rusage = resource.getrusage (resource.RUSAGE_SELF)
rusage = resource.getrusage(resource.RUSAGE_SELF)
print("time spent in user mode: %.2f s" % (rusage.ru_utime,))
print("time spent in system mode: %.2f s" % (rusage.ru_stime,))
def main ():
if len (sys.argv) > 1:
test = TestParsingPerformance (sys.argv[1])
test.start ()
def main():
if len(sys.argv) > 1:
test = TestParsingPerformance(sys.argv[1])
test.start()
if __name__ == "__main__":
main ()
main()

View file

@ -0,0 +1,281 @@
#!/usr/bin/env python
# -*- coding: utf-8; mode: python; -*-
#
# GStreamer Debug Viewer - View and analyze GStreamer debug log files
#
# Copyright (C) 2007 René Stadler <mail@renestadler.de>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""GStreamer Debug Viewer test suite for the custom tree models."""
import sys
import os
import os.path
from glob import glob
from unittest import TestCase, main as test_main
from .. import Common, Data
from .. GUI.filters import CategoryFilter, Filter
from .. GUI.models import (FilteredLogModel,
LogModelBase,
SubRange,)
class TestSubRange (TestCase):
def test_len(self):
values = list(range(20))
sr = SubRange(values, 0, 20)
self.assertEqual(len(sr), 20)
sr = SubRange(values, 10, 20)
self.assertEqual(len(sr), 10)
sr = SubRange(values, 0, 10)
self.assertEqual(len(sr), 10)
sr = SubRange(values, 5, 15)
self.assertEqual(len(sr), 10)
def test_iter(self):
values = list(range(20))
sr = SubRange(values, 0, 20)
self.assertEqual(list(sr), values)
sr = SubRange(values, 10, 20)
self.assertEqual(list(sr), list(range(10, 20)))
sr = SubRange(values, 0, 10)
self.assertEqual(list(sr), list(range(0, 10)))
sr = SubRange(values, 5, 15)
self.assertEqual(list(sr), list(range(5, 15)))
class Model (LogModelBase):
def __init__(self):
LogModelBase.__init__(self)
for i in range(20):
self.line_offsets.append(i * 100)
self.line_levels.append(Data.debug_level_debug)
def ensure_cached(self, line_offset):
pid = line_offset // 100
if pid % 2 == 0:
category = b"EVEN"
else:
category = b"ODD"
line_fmt = (b"0:00:00.000000000 %5i 0x0000000 DEBUG "
b"%20s dummy.c:1:dummy: dummy")
line_str = line_fmt % (pid, category,)
log_line = Data.LogLine.parse_full(line_str)
self.line_cache[line_offset] = log_line
def access_offset(self, line_offset):
return ""
class IdentityFilter (Filter):
def __init__(self):
def filter_func(row):
return True
self.filter_func = filter_func
class RandomFilter (Filter):
def __init__(self, seed):
import random
rand = random.Random()
rand.seed(seed)
def filter_func(row):
return rand.choice((True, False,))
self.filter_func = filter_func
class TestDynamicFilter (TestCase):
def test_unset_filter_rerange(self):
full_model = Model()
filtered_model = FilteredLogModel(full_model)
row_list = self.__row_list
self.assertEqual(row_list(full_model), list(range(20)))
self.assertEqual(row_list(filtered_model), list(range(20)))
filtered_model.set_range(5, 16)
self.assertEqual(row_list(filtered_model), list(range(5, 16)))
def test_identity_filter_rerange(self):
full_model = Model()
filtered_model = FilteredLogModel(full_model)
row_list = self.__row_list
self.assertEqual(row_list(full_model), list(range(20)))
self.assertEqual(row_list(filtered_model), list(range(20)))
filtered_model.add_filter(IdentityFilter(),
Common.Data.DefaultDispatcher())
filtered_model.set_range(5, 16)
self.assertEqual(row_list(filtered_model), list(range(5, 16)))
def test_filtered_range_refilter_skip(self):
full_model = Model()
filtered_model = FilteredLogModel(full_model)
row_list = self.__row_list
filtered_model.add_filter(CategoryFilter("EVEN"),
Common.Data.DefaultDispatcher())
self.__dump_model(filtered_model, "filtered")
self.assertEqual(row_list(filtered_model), list(range(1, 20, 2)))
self.assertEqual([filtered_model.line_index_from_super(i)
for i in range(1, 20, 2)],
list(range(10)))
self.assertEqual([filtered_model.line_index_to_super(i)
for i in range(10)],
list(range(1, 20, 2)))
filtered_model.set_range(1, 20)
self.__dump_model(filtered_model, "ranged (1, 20)")
self.__dump_model(filtered_model, "filtered range")
self.assertEqual([filtered_model.line_index_from_super(i)
for i in range(0, 19, 2)],
list(range(10)))
self.assertEqual([filtered_model.line_index_to_super(i)
for i in range(10)],
list(range(1, 20, 2)))
filtered_model.set_range(2, 20)
self.__dump_model(filtered_model, "ranged (2, 20)")
self.assertEqual(row_list(filtered_model), list(range(3, 20, 2)))
def test_filtered_range_refilter(self):
full_model = Model()
filtered_model = FilteredLogModel(full_model)
row_list = self.__row_list
rows = row_list(full_model)
rows_filtered = row_list(filtered_model)
self.__dump_model(full_model, "full model")
self.assertEqual(rows, rows_filtered)
self.assertEqual([filtered_model.line_index_from_super(i)
for i in range(20)],
list(range(20)))
self.assertEqual([filtered_model.line_index_to_super(i)
for i in range(20)],
list(range(20)))
filtered_model.set_range(5, 16)
self.__dump_model(filtered_model, "ranged model (5, 16)")
rows_ranged = row_list(filtered_model)
self.assertEqual(rows_ranged, list(range(5, 16)))
self.__dump_model(filtered_model, "filtered model (nofilter, 5, 15)")
filtered_model.add_filter(CategoryFilter("EVEN"),
Common.Data.DefaultDispatcher())
rows_filtered = row_list(filtered_model)
self.assertEqual(rows_filtered, list(range(5, 16, 2)))
self.__dump_model(filtered_model, "filtered model")
def test_random_filtered_range_refilter(self):
full_model = Model()
filtered_model = FilteredLogModel(full_model)
row_list = self.__row_list
self.assertEqual(row_list(full_model), list(range(20)))
self.assertEqual(row_list(filtered_model), list(range(20)))
filtered_model.add_filter(RandomFilter(538295943),
Common.Data.DefaultDispatcher())
random_rows = row_list(filtered_model)
self.__dump_model(filtered_model)
filtered_model = FilteredLogModel(full_model)
filtered_model.add_filter(RandomFilter(538295943),
Common.Data.DefaultDispatcher())
self.__dump_model(filtered_model, "filtered model")
self.assertEqual(row_list(filtered_model), random_rows)
filtered_model.set_range(1, 10)
self.__dump_model(filtered_model)
self.assertEqual(row_list(filtered_model), [
x for x in range(0, 10) if x in random_rows])
def __row_list(self, model):
return [row[Model.COL_PID] for row in model]
def __dump_model(self, model, comment=None):
# TODO: Provide a command line option to turn this on and off.
return
if not hasattr(model, "super_model"):
# Top model.
print("\t(%s)" % ("|".join([str(i).rjust(2)
for i in self.__row_list(model)]),), end=' ')
else:
top_model = model.super_model
if hasattr(top_model, "super_model"):
top_model = top_model.super_model
top_indices = self.__row_list(top_model)
positions = self.__row_list(model)
output = [" "] * len(top_indices)
for i, position in enumerate(positions):
output[position] = str(i).rjust(2)
print("\t(%s)" % ("|".join(output),), end=' ')
if comment is None:
print()
else:
print(comment)
if __name__ == "__main__":
test_main()

View file

@ -25,102 +25,83 @@ import os
import os.path
import distutils.cmd
from distutils.core import setup
from setuptools import setup
from distutils.command.clean import clean
from distutils.command.build import build
from distutils.command.sdist import sdist
from distutils.command.install_scripts import install_scripts
from distutils.errors import *
def perform_substitution (filename, values):
fp = file (filename, "rt")
data = fp.read ()
fp.close ()
def perform_substitution(filename, values):
for name, value in list(values.items ()):
data = data.replace ("$%s$" % (name,), value)
fp = file(filename, "rt")
data = fp.read()
fp.close()
fp = file (filename, "wt")
fp.write (data)
fp.close ()
for name, value in list(values.items()):
data = data.replace("$%s$" % (name,), value)
class tests (distutils.cmd.Command):
fp = file(filename, "wt")
fp.write(data)
fp.close()
description = "run unit tests"
user_options = [("files=", "f", "test scripts",)]
def initialize_options (self):
self.files = []
def finalize_options (self):
from glob import glob
if self.files:
self.files = glob (os.path.join (*self.files.split ("/")))
else:
self.files = []
def run (self):
for filename in self.files:
self.spawn ([sys.executable, filename])
class clean_custom (clean):
def remove_file (self, path):
def remove_file(self, path):
if os.path.exists (path):
if os.path.exists(path):
print("removing '%s'" % (path,))
if not self.dry_run:
os.unlink (path)
os.unlink(path)
def remove_directory (self, path):
def remove_directory(self, path):
from distutils import dir_util
if os.path.exists (path):
dir_util.remove_tree (path, dry_run = self.dry_run)
if os.path.exists(path):
dir_util.remove_tree(path, dry_run=self.dry_run)
def run (self):
def run(self):
clean.run (self)
clean.run(self)
if os.path.exists ("MANIFEST.in"):
if os.path.exists("MANIFEST.in"):
# MANIFEST is generated, get rid of it.
self.remove_file ("MANIFEST")
self.remove_file("MANIFEST")
pot_file = os.path.join ("po", "gst-debug-viewer.pot")
self.remove_file (pot_file)
pot_file = os.path.join("po", "gst-debug-viewer.pot")
self.remove_file(pot_file)
self.remove_directory ("build")
self.remove_directory ("dist")
self.remove_directory("build")
self.remove_directory("dist")
for path, dirs, files in os.walk ("."):
for path, dirs, files in os.walk("."):
for filename in files:
if filename.endswith (".pyc") or filename.endswith (".pyo"):
file_path = os.path.join (path, filename)
self.remove_file (file_path)
if filename.endswith(".pyc") or filename.endswith(".pyo"):
file_path = os.path.join(path, filename)
self.remove_file(file_path)
class build_custom (build):
def build_l10n (self):
def build_l10n(self):
return self.l10n
sub_commands = build.sub_commands + [("build_l10n", build_l10n,)]
user_options = build.user_options + [("l10n", None, "enable translations",)]
user_options = build.user_options + \
[("l10n", None, "enable translations",)]
boolean_options = build.boolean_options + ["l10n"]
def initialize_options (self):
def initialize_options(self):
build.initialize_options (self)
build.initialize_options(self)
self.l10n = False
class build_l10n (distutils.cmd.Command):
# Based on code from python-distutils-extra by Sebastian Heinlein.
@ -135,7 +116,7 @@ class build_l10n (distutils.cmd.Command):
("domain=", "d", "gettext domain"),
("bug-contact=", "c", "contact address for msgid bugs")]
def initialize_options (self):
def initialize_options(self):
self.merge_desktop_files = []
self.merge_xml_files = []
@ -145,54 +126,55 @@ class build_l10n (distutils.cmd.Command):
self.domain = None
self.bug_contact = None
def finalize_options (self):
def finalize_options(self):
for attr in ("desktop", "xml", "key", "schemas", "rfc822deb",):
value = getattr (self, "merge_%s_files" % (attr,))
value = getattr(self, "merge_%s_files" % (attr,))
if not value:
value = []
else:
value = eval (value)
setattr (self, "merge_%s_files" % (attr,), value)
value = eval(value)
setattr(self, "merge_%s_files" % (attr,), value)
if self.domain is None:
self.domain = self.distribution.metadata.name
def run (self):
def run(self):
from glob import glob
data_files = self.distribution.data_files
po_makefile = os.path.join ("po", "Makefile")
if os.path.exists (po_makefile):
raise DistutilsFileError ("file %s exists (intltool will pick up "
"values from there)" % (po_makefile,))
po_makefile = os.path.join("po", "Makefile")
if os.path.exists(po_makefile):
raise DistutilsFileError("file %s exists (intltool will pick up "
"values from there)" % (po_makefile,))
cwd = os.getcwd ()
cwd = os.getcwd()
if self.bug_contact is not None:
os.environ["XGETTEXT_ARGS"] = "--msgid-bugs-address=%s" % (self.bug_contact,)
os.chdir (os.path.join (cwd, "po"))
os.environ["XGETTEXT_ARGS"] = "--msgid-bugs-address=%s" % (
self.bug_contact,)
os.chdir(os.path.join(cwd, "po"))
# Update .pot file.
self.spawn (["intltool-update", "-p", "-g", self.domain])
self.spawn(["intltool-update", "-p", "-g", self.domain])
# Merge new strings into .po files.
self.spawn (["intltool-update", "-r", "-g", self.domain])
self.spawn(["intltool-update", "-r", "-g", self.domain])
os.chdir (cwd)
os.chdir(cwd)
for po_file in glob (os.path.join ("po", "*.po")):
lang = os.path.basename (po_file[:-3])
if lang.startswith ("."):
for po_file in glob(os.path.join("po", "*.po")):
lang = os.path.basename(po_file[:-3])
if lang.startswith("."):
# Hidden file, like auto-save data from an editor.
continue
mo_dir = os.path.join ("build", "mo", lang, "LC_MESSAGES")
mo_file = os.path.join (mo_dir, "%s.mo" % (self.domain,))
self.mkpath (mo_dir)
self.spawn (["msgfmt", po_file, "-o", mo_file])
mo_dir = os.path.join("build", "mo", lang, "LC_MESSAGES")
mo_file = os.path.join(mo_dir, "%s.mo" % (self.domain,))
self.mkpath(mo_dir)
self.spawn(["msgfmt", po_file, "-o", mo_file])
targetpath = os.path.join ("share", "locale", lang, "LC_MESSAGES")
data_files.append ((targetpath, (mo_file,)))
targetpath = os.path.join("share", "locale", lang, "LC_MESSAGES")
data_files.append((targetpath, (mo_file,)))
for parameter, option in ((self.merge_xml_files, "-x",),
(self.merge_desktop_files, "-d",),
@ -202,17 +184,18 @@ class build_l10n (distutils.cmd.Command):
if not parameter:
continue
for target, files in parameter:
build_target = os.path.join ("build", target)
build_target = os.path.join("build", target)
for file in files:
if file.endswith (".in"):
file_merged = os.path.basename (file[:-3])
if file.endswith(".in"):
file_merged = os.path.basename(file[:-3])
else:
file_merged = os.path.basename (file)
file_merged = os.path.basename(file)
self.mkpath(build_target)
file_merged = os.path.join(build_target, file_merged)
self.spawn(["intltool-merge", option, "po", file, file_merged])
data_files.append((target, [file_merged],))
self.mkpath (build_target)
file_merged = os.path.join (build_target, file_merged)
self.spawn (["intltool-merge", option, "po", file, file_merged])
data_files.append ((target, [file_merged],))
class distcheck (sdist):
@ -220,126 +203,132 @@ class distcheck (sdist):
description = "verify self-containedness of source distribution"
def run (self):
def run(self):
from distutils import dir_util
from distutils.spawn import spawn
# This creates e.g. dist/gst-debug-viewer-0.1.tar.gz.
sdist.run (self)
sdist.run(self)
base_dir = self.distribution.get_fullname ()
distcheck_dir = os.path.join (self.dist_dir, "distcheck")
self.mkpath (distcheck_dir)
self.mkpath (os.path.join (distcheck_dir, "again"))
base_dir = self.distribution.get_fullname()
distcheck_dir = os.path.join(self.dist_dir, "distcheck")
self.mkpath(distcheck_dir)
self.mkpath(os.path.join(distcheck_dir, "again"))
cwd = os.getcwd ()
os.chdir (distcheck_dir)
cwd = os.getcwd()
os.chdir(distcheck_dir)
if os.path.isdir (base_dir):
dir_util.remove_tree (base_dir)
if os.path.isdir(base_dir):
dir_util.remove_tree(base_dir)
# Unpack tarball into dist/distcheck, creating
# e.g. dist/distcheck/gst-debug-viewer-0.1.
for archive in self.archive_files:
if archive.endswith (".tar.gz"):
archive_rel = os.path.join (os.pardir, os.pardir, archive)
spawn (["tar", "-xzf", archive_rel, base_dir])
if archive.endswith(".tar.gz"):
archive_rel = os.path.join(os.pardir, os.pardir, archive)
spawn(["tar", "-xzf", archive_rel, base_dir])
break
else:
raise ValueError ("no supported archives were created")
raise ValueError("no supported archives were created")
os.chdir (cwd)
os.chdir (os.path.join (distcheck_dir, base_dir))
spawn ([sys.executable, "setup.py", "sdist", "--formats", "gztar"])
os.chdir(cwd)
os.chdir(os.path.join(distcheck_dir, base_dir))
spawn([sys.executable, "setup.py", "sdist", "--formats", "gztar"])
# Unpack tarball into dist/distcheck/again.
os.chdir (cwd)
os.chdir (os.path.join (distcheck_dir, "again"))
archive_rel = os.path.join (os.pardir, base_dir, "dist", "%s.tar.gz" % (base_dir,))
spawn (["tar", "-xzf", archive_rel, base_dir])
os.chdir(cwd)
os.chdir(os.path.join(distcheck_dir, "again"))
archive_rel = os.path.join(
os.pardir, base_dir, "dist", "%s.tar.gz" % (base_dir,))
spawn(["tar", "-xzf", archive_rel, base_dir])
os.chdir (cwd)
os.chdir (os.path.join (distcheck_dir, base_dir))
spawn ([sys.executable, "setup.py", "clean"])
os.chdir(cwd)
os.chdir(os.path.join(distcheck_dir, base_dir))
spawn([sys.executable, "setup.py", "clean"])
os.chdir (cwd)
spawn (["diff", "-ru",
os.path.join (distcheck_dir, base_dir),
os.path.join (distcheck_dir, "again", base_dir)])
os.chdir(cwd)
spawn(["diff", "-ru",
os.path.join(distcheck_dir, base_dir),
os.path.join(distcheck_dir, "again", base_dir)])
if not self.keep_temp:
dir_util.remove_tree (distcheck_dir)
dir_util.remove_tree(distcheck_dir)
class install_scripts_custom (install_scripts):
user_options = install_scripts.user_options \
+ [("substitute-files=", None,
"files to perform substitution on")]
+ [("substitute-files=", None,
"files to perform substitution on")]
def initialize_options (self):
def initialize_options(self):
install_scripts.initialize_options (self)
install_scripts.initialize_options(self)
self.substitute_files = "[]"
def run (self):
def run(self):
from os.path import normpath
install = self.distribution.get_command_obj ("install")
install.ensure_finalized ()
install = self.distribution.get_command_obj("install")
install.ensure_finalized()
values = {"DATADIR" : install.install_data or "",
"PREFIX" : install.home or install.prefix or "",
"SCRIPTSDIR" : self.install_dir or ""}
values = {"DATADIR": install.install_data or "",
"PREFIX": install.home or install.prefix or "",
"SCRIPTSDIR": self.install_dir or ""}
if install.home:
values["LIBDIR"] = os.path.normpath (install.install_lib)
values["LIBDIR"] = os.path.normpath(install.install_lib)
if install.root:
root = normpath (install.root)
len_root = len (root)
for name, value in list(values.items ()):
if normpath (value).startswith (root):
values[name] = normpath (value)[len_root:]
root = normpath(install.root)
len_root = len(root)
for name, value in list(values.items()):
if normpath(value).startswith(root):
values[name] = normpath(value)[len_root:]
# Perform installation as normal...
install_scripts.run (self)
install_scripts.run(self)
if self.dry_run:
return
# ...then substitute in-place:
for filename in eval (self.substitute_files):
perform_substitution (os.path.join (self.install_dir, filename), values)
for filename in eval(self.substitute_files):
perform_substitution(os.path.join(
self.install_dir, filename), values)
cmdclass = {"build" : build_custom,
"clean" : clean_custom,
"install_scripts" : install_scripts_custom,
"build_l10n" : build_l10n,
"distcheck" : distcheck,
"tests" : tests}
cmdclass = {"build": build_custom,
"clean": clean_custom,
"install_scripts": install_scripts_custom,
setup (cmdclass = cmdclass,
"build_l10n": build_l10n,
"distcheck": distcheck}
packages = ["GstDebugViewer",
"GstDebugViewer.Common",
"GstDebugViewer.GUI",
"GstDebugViewer.Plugins"],
scripts = ["gst-debug-viewer"],
data_files = [("share/gst-debug-viewer", ["data/about-dialog.ui",
"data/main-window.ui",
"data/menus.ui"],),
("share/icons/hicolor/48x48/apps", ["data/gst-debug-viewer.png"],),
("share/icons/hicolor/scalable/apps", ["data/gst-debug-viewer.svg"],)],
setup(cmdclass=cmdclass,
name = "gst-debug-viewer",
version = "0.1",
description = "GStreamer Debug Viewer",
long_description = """""",
license = "GNU GPL",
author = "Rene Stadler",
author_email = "mail@renestadler.de",
url = "http://renestadler.de/projects/gst-debug-viewer")
packages=["GstDebugViewer",
"GstDebugViewer.Common",
"GstDebugViewer.GUI",
"GstDebugViewer.Plugins",
"GstDebugViewer.tests"],
scripts=["gst-debug-viewer"],
data_files=[("share/gst-debug-viewer", ["data/about-dialog.ui",
"data/main-window.ui",
"data/menus.ui"],),
("share/icons/hicolor/48x48/apps",
["data/gst-debug-viewer.png"],),
("share/icons/hicolor/scalable/apps", ["data/gst-debug-viewer.svg"],)],
name="gst-debug-viewer",
version="0.1",
description="GStreamer Debug Viewer",
long_description="""""",
test_suite="GstDebugViewer.tests",
license="GNU GPL",
author="Rene Stadler",
author_email="mail@renestadler.de",
url="http://renestadler.de/projects/gst-debug-viewer")

View file

@ -1,274 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8; mode: python; -*-
#
# GStreamer Debug Viewer - View and analyze GStreamer debug log files
#
# Copyright (C) 2007 René Stadler <mail@renestadler.de>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""GStreamer Debug Viewer test suite for the custom tree models."""
import sys
import os
import os.path
from glob import glob
sys.path.insert (0, os.path.join (sys.path[0], os.pardir))
from unittest import TestCase, main as test_main
from GstDebugViewer import Common, Data
from GstDebugViewer.GUI.filters import CategoryFilter, Filter
from GstDebugViewer.GUI.models import (FilteredLogModel,
LogModelBase,
SubRange,)
class TestSubRange (TestCase):
def test_len (self):
l = list(range(20))
sr = SubRange (l, 0, 20)
self.assertEqual (len (sr), 20)
sr = SubRange (l, 10, 20)
self.assertEqual (len (sr), 10)
sr = SubRange (l, 0, 10)
self.assertEqual (len (sr), 10)
sr = SubRange (l, 5, 15)
self.assertEqual (len (sr), 10)
def test_iter (self):
l = list(range(20))
sr = SubRange (l, 0, 20)
self.assertEqual (list (sr), l)
sr = SubRange (l, 10, 20)
self.assertEqual (list (sr), list(range(10, 20)))
sr = SubRange (l, 0, 10)
self.assertEqual (list (sr), list(range(0, 10)))
sr = SubRange (l, 5, 15)
self.assertEqual (list (sr), list(range(5, 15)))
class Model (LogModelBase):
def __init__ (self):
LogModelBase.__init__ (self)
for i in range (20):
self.line_offsets.append (i * 100)
self.line_levels.append (Data.debug_level_debug)
def ensure_cached (self, line_offset):
pid = line_offset // 100
if pid % 2 == 0:
category = b"EVEN"
else:
category = b"ODD"
line_fmt = (b"0:00:00.000000000 %5i 0x0000000 DEBUG "
b"%20s dummy.c:1:dummy: dummy")
line_str = line_fmt % (pid, category,)
log_line = Data.LogLine.parse_full (line_str)
self.line_cache[line_offset] = log_line
def access_offset (self, line_offset):
return ""
class IdentityFilter (Filter):
def __init__ (self):
def filter_func (row):
return True
self.filter_func = filter_func
class RandomFilter (Filter):
def __init__ (self, seed):
import random
rand = random.Random ()
rand.seed (seed)
def filter_func (row):
return rand.choice ((True, False,))
self.filter_func = filter_func
class TestDynamicFilter (TestCase):
def test_unset_filter_rerange (self):
full_model = Model ()
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
self.assertEqual (row_list (full_model), list(range(20)))
self.assertEqual (row_list (filtered_model), list(range(20)))
filtered_model.set_range (5, 16)
self.assertEqual (row_list (filtered_model), list(range(5, 16)))
def test_identity_filter_rerange (self):
full_model = Model ()
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
self.assertEqual (row_list (full_model), list(range(20)))
self.assertEqual (row_list (filtered_model), list(range(20)))
filtered_model.add_filter (IdentityFilter (),
Common.Data.DefaultDispatcher ())
filtered_model.set_range (5, 16)
self.assertEqual (row_list (filtered_model), list(range(5, 16)))
def test_filtered_range_refilter_skip (self):
full_model = Model ()
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
filtered_model.add_filter (CategoryFilter ("EVEN"),
Common.Data.DefaultDispatcher ())
self.__dump_model (filtered_model, "filtered")
self.assertEqual (row_list (filtered_model), list(range(1, 20, 2)))
self.assertEqual ([filtered_model.line_index_from_super (i)
for i in range (1, 20, 2)],
list(range(10)))
self.assertEqual ([filtered_model.line_index_to_super (i)
for i in range (10)],
list(range(1, 20, 2)))
filtered_model.set_range (1, 20)
self.__dump_model (filtered_model, "ranged (1, 20)")
self.__dump_model (filtered_model, "filtered range")
self.assertEqual ([filtered_model.line_index_from_super (i)
for i in range (0, 19, 2)],
list(range(10)))
self.assertEqual ([filtered_model.line_index_to_super (i)
for i in range (10)],
list(range(1, 20, 2)))
filtered_model.set_range (2, 20)
self.__dump_model (filtered_model, "ranged (2, 20)")
self.assertEqual (row_list (filtered_model), list(range(3, 20, 2)))
def test_filtered_range_refilter (self):
full_model = Model ()
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
rows = row_list (full_model)
rows_filtered = row_list (filtered_model)
self.__dump_model (full_model, "full model")
self.assertEqual (rows, rows_filtered)
self.assertEqual ([filtered_model.line_index_from_super (i)
for i in range (20)],
list(range(20)))
self.assertEqual ([filtered_model.line_index_to_super (i)
for i in range (20)],
list(range(20)))
filtered_model.set_range (5, 16)
self.__dump_model (filtered_model, "ranged model (5, 16)")
rows_ranged = row_list (filtered_model)
self.assertEqual (rows_ranged, list(range(5, 16)))
self.__dump_model (filtered_model, "filtered model (nofilter, 5, 15)")
filtered_model.add_filter (CategoryFilter ("EVEN"),
Common.Data.DefaultDispatcher ())
rows_filtered = row_list (filtered_model)
self.assertEqual (rows_filtered, list(range(5, 16, 2)))
self.__dump_model (filtered_model, "filtered model")
def test_random_filtered_range_refilter (self):
full_model = Model ()
filtered_model = FilteredLogModel (full_model)
row_list = self.__row_list
self.assertEqual (row_list (full_model), list(range(20)))
self.assertEqual (row_list (filtered_model), list(range(20)))
filtered_model.add_filter (RandomFilter (538295943),
Common.Data.DefaultDispatcher ())
random_rows = row_list (filtered_model)
self.__dump_model (filtered_model)
filtered_model = FilteredLogModel (full_model)
filtered_model.add_filter (RandomFilter (538295943),
Common.Data.DefaultDispatcher ())
self.__dump_model (filtered_model, "filtered model")
self.assertEqual (row_list (filtered_model), random_rows)
filtered_model.set_range (1, 10)
self.__dump_model (filtered_model)
self.assertEqual (row_list (filtered_model), [x for x in range (0, 10) if x in random_rows])
def __row_list (self, model):
return [row[Model.COL_PID] for row in model]
def __dump_model (self, model, comment = None):
# TODO: Provide a command line option to turn this on and off.
return
if not hasattr (model, "super_model"):
# Top model.
print("\t(%s)" % ("|".join ([str (i).rjust (2) for i in self.__row_list (model)]),), end=' ')
else:
top_model = model.super_model
if hasattr (top_model, "super_model"):
top_model = top_model.super_model
top_indices = self.__row_list (top_model)
positions = self.__row_list (model)
output = [" "] * len (top_indices)
for i, position in enumerate (positions):
output[position] = str (i).rjust (2)
print("\t(%s)" % ("|".join (output),), end=' ')
if comment is None:
print()
else:
print(comment)
if __name__ == "__main__":
test_main ()

View file

@ -56,7 +56,7 @@ def main():
try:
if not modified_file.endswith(".py"):
continue
pycodestyle_errors = system('pycodestyle', '--repeat', '--ignore', 'E501,E128', modified_file)
pycodestyle_errors = system('pycodestyle', '--repeat', '--ignore', 'E501,E128,W605', modified_file)
if pycodestyle_errors:
if output_message is None:
output_message = NOT_PYCODESTYLE_COMPLIANT_MESSAGE_PRE