Data: fix up out-of-order log lines

This is important because we rely on monotonically increasing timestamps for
binary searches in various places.

Overhead for an already sorted file with 1 million lines is less than 5%.
This commit is contained in:
René Stadler 2011-11-16 20:37:21 +01:00 committed by Stefan Sauer
parent 50dd570f3a
commit 66e87f752a

View file

@ -183,6 +183,75 @@ class Producer (object):
for consumer in self.consumers:
consumer.handle_load_finished ()
class SortHelper (object):

    """Finds insertion positions for out-of-order log lines.

    The heavy lifting lives in a generator so that state (the previous
    insertion point) survives between lookups without attribute access
    overhead in the caller's inner loop.
    """

    def __init__ (self, fileobj, offsets):
        self._gen = self.__gen (fileobj, offsets)
        # Prime the generator up to its first yield so .send () works.
        self._gen.next ()
        # Bind the bound send method directly on the instance; this
        # shadows the class-level stub and saves a method dispatch per
        # call in the hot loop.
        self.find_insert_position = self._gen.send

    @staticmethod
    def find_insert_position (insert_time_string):
        # Documentary stub only; __init__ replaces this per instance
        # with the generator's send method.
        pass

    @staticmethod
    def __gen (fileobj, offsets):
        from math import floor
        tell = fileobj.tell
        seek = fileobj.seek
        read = fileobj.read
        time_len = len (time_args (0))
        # Remember where the previous line was inserted.  Mostly sorted
        # input clusters its insertions, so this narrows the search range
        # considerably.  TODO: In practice, log lines only get out of
        # order across threads.  Need to check if it pays to parse the
        # thread here and maintain multiple insertion points for heavily
        # interleaved parts of the log.
        last_pos = 0
        last_time_string = ""
        result = None
        while True:
            new_time_string = (yield result)
            # The caller may be mid-read on the same file object; restore
            # its position before returning.
            saved_offset = tell ()
            if new_time_string >= last_time_string:
                low, high = last_pos, len (offsets)
            else:
                low, high = 0, last_pos
            # Biased bisection: instead of halving, cut each range at the
            # 90th percentile.  Logs are "mostly sorted", so the insertion
            # point is far more likely to lie near the end.
            while low < high:
                probe = int (floor (low * 0.1 + high * 0.9))
                seek (offsets[probe])
                if new_time_string < read (time_len):
                    high = probe
                else:
                    low = probe + 1
            # The caller replaces the row at the returned position with
            # the new line, so remembering the new line's time stamp for
            # that position is correct.
            last_pos = result = low
            last_time_string = new_time_string
            seek (saved_offset)
class LineCache (Producer):
_lines_per_iteration = 50000
@ -233,17 +302,33 @@ class LineCache (Producer):
self.__fileobj.seek (0)
limit = self._lines_per_iteration
last_line = ""
i = 0
sort_helper = SortHelper (self.__fileobj, offsets)
find_insert_position = sort_helper.find_insert_position
while True:
offset = tell ()
line = readline ()
if not line:
break
# if line[18] == "\x1b":
# line = strip_escape (line)
match = rexp_match (line)
if match is None:
continue
# Timestamp is in the very beginning of the row, and can be sorted
# by lexical comparison. That's why we don't bother parsing the
# time to integer. We also don't have to take a substring here,
# which would be a useless memcpy.
if line >= last_line:
levels_append (dict_levels_get (match.group (1), debug_level_none))
offsets_append (offset)
last_line = line
else:
pos = find_insert_position (line)
levels.insert (pos, dict_levels_get (match.group (1), debug_level_none))
offsets.insert (pos, offset)
i += 1
if i >= limit:
i = 0