[enh][mod] result handling refactor

Several changes have been made:
 - Parallel result merging
 - Slightly changed scoring algorithm (see result_score() and the sketch below)
 - Proper thread locking on global data manipulation
Adam Tauber 2015-10-03 17:26:07 +02:00
parent 0ad272c5cb
commit b6c3cb0bdd
6 changed files with 321 additions and 292 deletions
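
The gist of the new scoring (see result_score() in searx/results.py below): each merged
result remembers the positions at which the different engines returned it, and its score
is the sum of occurrences * weight / position over those positions, where weight is the
product of the reporting engines' weights. A minimal standalone sketch of that formula
with made-up inputs, not the searx code itself:

    # Sketch of the scoring idea behind result_score(); all inputs are invented.
    def score(positions, engine_weights):
        weight = 1.0
        for w in engine_weights:          # multiply the weights of every engine
            weight *= float(w)            # that returned this result
        occurrences = len(positions)      # in how many result lists it occurred
        # earlier positions (1, 2, ...) contribute more than later ones
        return sum(occurrences * weight / position for position in positions)

    # a result found at position 1 by one engine and at position 3 by another,
    # both with the default weight of 1.0:
    score([1, 3], [1.0, 1.0])  # 2/1 + 2/3 = 2.67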

searx/results.py (new file)
@@ -0,0 +1,239 @@
import re
from collections import defaultdict
from operator import itemgetter
from threading import RLock
from urlparse import urlparse, unquote
from searx.engines import engines
CONTENT_LEN_IGNORED_CHARS_REGEX = re.compile('[,;:!?\./\\\\ ()-_]', re.M | re.U)
WHITESPACE_REGEX = re.compile('( |\t|\n)+', re.M | re.U)
# return the meaningful length of the content for a result
def result_content_len(content):
    if isinstance(content, basestring):
        return len(CONTENT_LEN_IGNORED_CHARS_REGEX.sub('', content))
    else:
        return 0

def compare_urls(url_a, url_b):
    if url_a.netloc != url_b.netloc or url_a.query != url_b.query:
        return False

    # remove / from the end of the url if required
    path_a = url_a.path[:-1]\
        if url_a.path.endswith('/')\
        else url_a.path
    path_b = url_b.path[:-1]\
        if url_b.path.endswith('/')\
        else url_b.path

    return unquote(path_a) == unquote(path_b)

def merge_two_infoboxes(infobox1, infobox2):
    if 'urls' in infobox2:
        urls1 = infobox1.get('urls', None)
        if urls1 is None:
            urls1 = []
            infobox1.set('urls', urls1)

        urlSet = set()
        for url in infobox1.get('urls', []):
            urlSet.add(url.get('url', None))

        for url in infobox2.get('urls', []):
            if url.get('url', None) not in urlSet:
                urls1.append(url)

    if 'attributes' in infobox2:
        attributes1 = infobox1.get('attributes', None)
        if attributes1 is None:
            attributes1 = []
            infobox1.set('attributes', attributes1)

        attributeSet = set()
        for attribute in infobox1.get('attributes', []):
            if attribute.get('label', None) not in attributeSet:
                attributeSet.add(attribute.get('label', None))

        for attribute in infobox2.get('attributes', []):
            attributes1.append(attribute)

    if 'content' in infobox2:
        content1 = infobox1.get('content', None)
        content2 = infobox2.get('content', '')
        if content1 is not None:
            if result_content_len(content2) > result_content_len(content1):
                infobox1['content'] = content2
        else:
            infobox1.set('content', content2)

def result_score(result):
    weight = 1.0

    for result_engine in result['engines']:
        if hasattr(engines[result_engine], 'weight'):
            weight *= float(engines[result_engine].weight)

    occurences = len(result['positions'])

    return sum((occurences * weight) / position for position in result['positions'])

class ResultContainer(object):
    """docstring for ResultContainer"""

    def __init__(self):
        super(ResultContainer, self).__init__()
        self.results = defaultdict(list)
        self._merged_results = []
        self.infoboxes = []
        self._infobox_ids = {}
        self.suggestions = set()
        self.answers = set()

    def extend(self, engine_name, results):
        for result in list(results):
            if 'suggestion' in result:
                self.suggestions.add(result['suggestion'])
                results.remove(result)
            elif 'answer' in result:
                self.answers.add(result['answer'])
                results.remove(result)
            elif 'infobox' in result:
                self._merge_infobox(result)
                results.remove(result)

        with RLock():
            engines[engine_name].stats['search_count'] += 1
            engines[engine_name].stats['result_count'] += len(results)

        if not results:
            return

        self.results[engine_name].extend(results)

        for i, result in enumerate(results):
            position = i + 1
            self._merge_result(result, position)

    def _merge_infobox(self, infobox):
        add_infobox = True
        infobox_id = infobox.get('id', None)
        if infobox_id is not None:
            existingIndex = self._infobox_ids.get(infobox_id, None)
            if existingIndex is not None:
                merge_two_infoboxes(self.infoboxes[existingIndex], infobox)
                add_infobox = False

        if add_infobox:
            self.infoboxes.append(infobox)
            self._infobox_ids[infobox_id] = len(self.infoboxes) - 1

    def _merge_result(self, result, position):
        result['parsed_url'] = urlparse(result['url'])

        # if the result has no scheme, use http as default
        if not result['parsed_url'].scheme:
            result['parsed_url'] = result['parsed_url']._replace(scheme="http")

        result['host'] = result['parsed_url'].netloc

        if result['host'].startswith('www.'):
            result['host'] = result['host'].replace('www.', '', 1)

        result['engines'] = [result['engine']]

        # strip multiple spaces and carriage returns from content
        if result.get('content'):
            result['content'] = WHITESPACE_REGEX.sub(' ', result['content'])

        # check for duplicates
        duplicated = False
        for merged_result in self._merged_results:
            if compare_urls(result['parsed_url'], merged_result['parsed_url'])\
               and result.get('template') == merged_result.get('template'):
                duplicated = merged_result
                break

        # merge duplicates together
        if duplicated:
            # using content with more text
            if result_content_len(result.get('content', '')) >\
                    result_content_len(duplicated.get('content', '')):
                duplicated['content'] = result['content']

            # add the new position
            duplicated['positions'].append(position)

            # add engine to list of result-engines
            duplicated['engines'].append(result['engine'])

            # using https if possible
            if duplicated['parsed_url'].scheme != 'https' and result['parsed_url'].scheme == 'https':
                duplicated['url'] = result['parsed_url'].geturl()
                duplicated['parsed_url'] = result['parsed_url']

        # if there is no duplicate found, append result
        else:
            result['positions'] = [position]
            with RLock():
                self._merged_results.append(result)

    def get_ordered_results(self):
        for result in self._merged_results:
            score = result_score(result)
            result['score'] = score
            with RLock():
                for result_engine in result['engines']:
                    engines[result_engine].stats['score_count'] += score

        results = sorted(self._merged_results, key=itemgetter('score'), reverse=True)

        # pass 2 : group results by category and template
        gresults = []
        categoryPositions = {}

        for i, res in enumerate(results):
            # FIXME : handle more than one category per engine
            category = engines[res['engine']].categories[0] + ':' + ''\
                if 'template' not in res\
                else res['template']

            current = None if category not in categoryPositions\
                else categoryPositions[category]

            # group with previous results using the same category
            # if the group can accept more result and is not too far
            # from the current position
            if current is not None and (current['count'] > 0)\
                    and (len(gresults) - current['index'] < 20):
                # group with the previous results using
                # the same category with this one
                index = current['index']
                gresults.insert(index, res)

                # update every index after the current one
                # (including the current one)
                for k in categoryPositions:
                    v = categoryPositions[k]['index']
                    if v >= index:
                        categoryPositions[k]['index'] = v + 1

                # update this category
                current['count'] -= 1

            else:
                # same category
                gresults.append(res)

                # update categoryIndex
                categoryPositions[category] = {'index': len(gresults), 'count': 8}

        # return gresults
        return gresults

    def results_length(self):
        return len(self._merged_results)
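
A rough usage sketch of the new container. The engine names and result dicts below are
hypothetical, and it assumes a configured searx instance in which those engines exist;
in searx itself the container is filled by the engine callbacks in search.py (see the
diff below).

    from searx.results import ResultContainer

    container = ResultContainer()
    container.extend('wikipedia', [{'url': 'https://example.org/a', 'title': 'A',
                                    'content': 'short text', 'engine': 'wikipedia'}])
    container.extend('wikidata', [{'url': 'https://example.org/a', 'title': 'A',
                                   'content': 'a longer description', 'engine': 'wikidata'}])

    # the duplicated URL is merged into one result; ordering uses result_score()
    # and the category/template grouping from get_ordered_results()
    for result in container.get_ordered_results():
        print result['url'], result['score'], result['engines']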

searx/search.py
@@ -16,13 +16,8 @@ along with searx. If not, see < http://www.gnu.org/licenses/ >.
 '''
 import threading
-import re
 import searx.poolrequests as requests_lib
-from itertools import izip_longest, chain
-from operator import itemgetter
-from Queue import Queue
 from time import time
-from urlparse import urlparse, unquote
 from searx import settings
 from searx.engines import (
     categories, engines
@@ -30,6 +25,7 @@ from searx.engines import (
 from searx.languages import language_codes
 from searx.utils import gen_useragent, get_blocked_engines
 from searx.query import Query
+from searx.results import ResultContainer
 from searx import logger
 logger = logger.getChild('search')
@@ -42,7 +38,8 @@ def search_request_wrapper(fn, url, engine_name, **kwargs):
         return fn(url, **kwargs)
     except:
         # increase errors stats
-        engines[engine_name].stats['errors'] += 1
+        with threading.RLock():
+            engines[engine_name].stats['errors'] += 1
         # print engine name and specific error message
         logger.exception('engine crash: {0}'.format(engine_name))
@@ -84,7 +81,7 @@ def default_request_params():
 # create a callback wrapper for the search engine results
-def make_callback(engine_name, results_queue, callback, params):
+def make_callback(engine_name, callback, params, result_container):
     # creating a callback wrapper for the search engine results
     def process_callback(response, **kwargs):
@@ -96,12 +93,17 @@ def make_callback(engine_name, results_queue, callback, params):
         response.search_params = params
-        timeout_overhead = 0.2  # seconds
         search_duration = time() - params['started']
+        # update stats with current page-load-time
+        with threading.RLock():
+            engines[engine_name].stats['page_load_time'] += search_duration
+        timeout_overhead = 0.2  # seconds
         timeout_limit = engines[engine_name].timeout + timeout_overhead
         if search_duration > timeout_limit:
-            engines[engine_name].stats['page_load_time'] += timeout_limit
-            engines[engine_name].stats['errors'] += 1
+            with threading.RLock():
+                engines[engine_name].stats['errors'] += 1
             return
         # callback
@@ -111,212 +113,11 @@ def make_callback(engine_name, results_queue, callback, params):
         for result in search_results:
             result['engine'] = engine_name
-        results_queue.put_nowait((engine_name, search_results))
-        # update stats with current page-load-time
-        engines[engine_name].stats['page_load_time'] += search_duration
+        result_container.extend(engine_name, search_results)
     return process_callback
-# return the meaningful length of the content for a result
-def content_result_len(content):
-    if isinstance(content, basestring):
-        content = re.sub('[,;:!?\./\\\\ ()-_]', '', content)
-        return len(content)
-    else:
-        return 0
-
-
-# score results and remove duplications
-def score_results(results):
-    # calculate scoring parameters
-    flat_res = filter(
-        None, chain.from_iterable(izip_longest(*results.values())))
-    flat_len = len(flat_res)
-    engines_len = len(results)
-
-    results = []
-
-    # pass 1: deduplication + scoring
-    for i, res in enumerate(flat_res):
-        res['parsed_url'] = urlparse(res['url'])
-
-        # if the result has no scheme, use http as default
-        if not res['parsed_url'].scheme:
-            res['parsed_url'] = res['parsed_url']._replace(scheme="http")
-
-        res['host'] = res['parsed_url'].netloc
-
-        if res['host'].startswith('www.'):
-            res['host'] = res['host'].replace('www.', '', 1)
-
-        res['engines'] = [res['engine']]
-
-        weight = 1.0
-
-        # strip multiple spaces and cariage returns from content
-        if res.get('content'):
-            res['content'] = re.sub(' +', ' ',
-                                    res['content'].strip().replace('\n', ''))
-
-        # get weight of this engine if possible
-        if hasattr(engines[res['engine']], 'weight'):
-            weight = float(engines[res['engine']].weight)
-
-        # calculate score for that engine
-        score = int((flat_len - i) / engines_len) * weight + 1
-
-        # check for duplicates
-        duplicated = False
-        for new_res in results:
-            # remove / from the end of the url if required
-            p1 = res['parsed_url'].path[:-1]\
-                if res['parsed_url'].path.endswith('/')\
-                else res['parsed_url'].path
-            p2 = new_res['parsed_url'].path[:-1]\
-                if new_res['parsed_url'].path.endswith('/')\
-                else new_res['parsed_url'].path
-
-            # check if that result is a duplicate
-            if res['host'] == new_res['host'] and\
-               unquote(p1) == unquote(p2) and\
-               res['parsed_url'].query == new_res['parsed_url'].query and\
-               res.get('template') == new_res.get('template'):
-                duplicated = new_res
-                break
-
-        # merge duplicates together
-        if duplicated:
-            # using content with more text
-            if content_result_len(res.get('content', '')) >\
-                    content_result_len(duplicated.get('content', '')):
-                duplicated['content'] = res['content']
-
-            # increase result-score
-            duplicated['score'] += score
-
-            # add engine to list of result-engines
-            duplicated['engines'].append(res['engine'])
-
-            # using https if possible
-            if duplicated['parsed_url'].scheme == 'https':
-                continue
-            elif res['parsed_url'].scheme == 'https':
-                duplicated['url'] = res['parsed_url'].geturl()
-                duplicated['parsed_url'] = res['parsed_url']
-
-        # if there is no duplicate found, append result
-        else:
-            res['score'] = score
-            results.append(res)
-
-    results = sorted(results, key=itemgetter('score'), reverse=True)
-
-    # pass 2 : group results by category and template
-    gresults = []
-    categoryPositions = {}
-
-    for i, res in enumerate(results):
-        # FIXME : handle more than one category per engine
-        category = engines[res['engine']].categories[0] + ':' + ''\
-            if 'template' not in res\
-            else res['template']
-
-        current = None if category not in categoryPositions\
-            else categoryPositions[category]
-
-        # group with previous results using the same category
-        # if the group can accept more result and is not too far
-        # from the current position
-        if current is not None and (current['count'] > 0)\
-                and (len(gresults) - current['index'] < 20):
-            # group with the previous results using
-            # the same category with this one
-            index = current['index']
-            gresults.insert(index, res)
-
-            # update every index after the current one
-            # (including the current one)
-            for k in categoryPositions:
-                v = categoryPositions[k]['index']
-                if v >= index:
-                    categoryPositions[k]['index'] = v + 1
-
-            # update this category
-            current['count'] -= 1
-
-        else:
-            # same category
-            gresults.append(res)
-
-            # update categoryIndex
-            categoryPositions[category] = {'index': len(gresults), 'count': 8}
-
-    # return gresults
-    return gresults
-
-
-def merge_two_infoboxes(infobox1, infobox2):
-    if 'urls' in infobox2:
-        urls1 = infobox1.get('urls', None)
-        if urls1 is None:
-            urls1 = []
-            infobox1.set('urls', urls1)
-
-        urlSet = set()
-        for url in infobox1.get('urls', []):
-            urlSet.add(url.get('url', None))
-
-        for url in infobox2.get('urls', []):
-            if url.get('url', None) not in urlSet:
-                urls1.append(url)
-
-    if 'attributes' in infobox2:
-        attributes1 = infobox1.get('attributes', None)
-        if attributes1 is None:
-            attributes1 = []
-            infobox1.set('attributes', attributes1)
-
-        attributeSet = set()
-        for attribute in infobox1.get('attributes', []):
-            if attribute.get('label', None) not in attributeSet:
-                attributeSet.add(attribute.get('label', None))
-
-        for attribute in infobox2.get('attributes', []):
-            attributes1.append(attribute)
-
-    if 'content' in infobox2:
-        content1 = infobox1.get('content', None)
-        content2 = infobox2.get('content', '')
-        if content1 is not None:
-            if content_result_len(content2) > content_result_len(content1):
-                infobox1['content'] = content2
-        else:
-            infobox1.set('content', content2)
-
-
-def merge_infoboxes(infoboxes):
-    results = []
-    infoboxes_id = {}
-    for infobox in infoboxes:
-        add_infobox = True
-        infobox_id = infobox.get('id', None)
-        if infobox_id is not None:
-            existingIndex = infoboxes_id.get(infobox_id, None)
-            if existingIndex is not None:
-                merge_two_infoboxes(results[existingIndex], infobox)
-                add_infobox = False
-
-        if add_infobox:
-            results.append(infobox)
-            infoboxes_id[infobox_id] = len(results) - 1
-
-    return results
 class Search(object):
     """Search information container"""
@@ -334,10 +135,7 @@ class Search(object):
         # set blocked engines
         self.blocked_engines = get_blocked_engines(engines, request.cookies)
-        self.results = []
-        self.suggestions = set()
-        self.answers = set()
-        self.infoboxes = []
+        self.result_container = ResultContainer()
         self.request_data = {}
         # set specific language if set
@@ -449,8 +247,6 @@ class Search(object):
         # init vars
         requests = []
-        results_queue = Queue()
-        results = {}
         # increase number of searches
         number_of_searches += 1
@@ -504,9 +300,9 @@ class Search(object):
             # create a callback wrapper for the search engine results
             callback = make_callback(
                 selected_engine['name'],
-                results_queue,
                 engine.response,
-                request_params)
+                request_params,
+                self.result_container)
             # create dictionary which contain all
             # informations about the request
@@ -539,42 +335,5 @@ class Search(object):
         # send all search-request
         threaded_requests(requests)
-        while not results_queue.empty():
-            engine_name, engine_results = results_queue.get_nowait()
-
-            # TODO type checks
-            [self.suggestions.add(x['suggestion'])
-             for x in list(engine_results)
-             if 'suggestion' in x
-             and engine_results.remove(x) is None]
-
-            [self.answers.add(x['answer'])
-             for x in list(engine_results)
-             if 'answer' in x
-             and engine_results.remove(x) is None]
-
-            self.infoboxes.extend(x for x in list(engine_results)
-                                  if 'infobox' in x
-                                  and engine_results.remove(x) is None)
-
-            results[engine_name] = engine_results
-
-        # update engine-specific stats
-        for engine_name, engine_results in results.items():
-            engines[engine_name].stats['search_count'] += 1
-            engines[engine_name].stats['result_count'] += len(engine_results)
-
-        # score results and remove duplications
-        self.results = score_results(results)
-
-        # merge infoboxes according to their ids
-        self.infoboxes = merge_infoboxes(self.infoboxes)
-
-        # update engine stats, using calculated score
-        for result in self.results:
-            for res_engine in result['engines']:
-                engines[result['engine']]\
-                    .stats['score_count'] += result['score']
         # return results, suggestions, answers and infoboxes
         return self

test_results.py (new file)
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-

from searx.results import ResultContainer
from searx.testing import SearxTestCase


def fake_result(url='https://aa.bb/cc?dd=ee#ff',
                title='aaa',
                content='bbb',
                engine='wikipedia', **kwargs):
    result = {'url': url,
              'title': title,
              'content': content,
              'engine': engine}
    result.update(kwargs)
    return result


# TODO
class ResultContainerTestCase(SearxTestCase):

    def test_empty(self):
        c = ResultContainer()
        self.assertEqual(c.get_ordered_results(), [])

    def test_one_result(self):
        c = ResultContainer()
        c.extend('wikipedia', [fake_result()])
        self.assertEqual(c.results_length(), 1)

    def test_one_suggestion(self):
        c = ResultContainer()
        c.extend('wikipedia', [fake_result(suggestion=True)])
        self.assertEqual(len(c.suggestions), 1)
        self.assertEqual(c.results_length(), 0)

    def test_result_merge(self):
        c = ResultContainer()
        c.extend('wikipedia', [fake_result()])
        c.extend('wikidata', [fake_result(), fake_result(url='https://example.com/')])
        self.assertEqual(c.results_length(), 2)

test_search.py
@@ -1,25 +1,10 @@
 # -*- coding: utf-8 -*-
-from searx.search import score_results
 from searx.testing import SearxTestCase
-def fake_result(url='https://aa.bb/cc?dd=ee#ff',
-                title='aaa',
-                content='bbb',
-                engine='wikipedia'):
-    return {'url': url,
-            'title': title,
-            'content': content,
-            'engine': engine}
-class ScoreResultsTestCase(SearxTestCase):
-    def test_empty(self):
-        self.assertEqual(score_results(dict()), [])
-    def test_urlparse(self):
-        results = score_results(dict(a=[fake_result(url='https://aa.bb/cc?dd=ee#ff')]))
-        parsed_url = results[0]['parsed_url']
-        self.assertEqual(parsed_url.query, 'dd=ee')
+# TODO
+class SearchTestCase(SearxTestCase):
+    def test_(self):
+        pass

test_webapp.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import json
+from mock import Mock
 from urlparse import ParseResult
 from searx import webapp
 from searx.testing import SearxTestCase
@@ -33,7 +34,12 @@ class ViewsTestCase(SearxTestCase):
         ]
         def search_mock(search_self, *args):
-            search_self.results = self.test_results
+            search_self.result_container = Mock(get_ordered_results=lambda: self.test_results,
+                                                answers=set(),
+                                                suggestions=set(),
+                                                infoboxes=[],
+                                                results=self.test_results,
+                                                results_length=lambda: len(self.test_results))
         webapp.Search.search = search_mock

searx/webapp.py
@@ -383,7 +383,7 @@ def index():
     plugins.call('post_search', request, locals())
-    for result in search.results:
+    for result in search.result_container.get_ordered_results():
         plugins.call('on_result', request, locals())
         if not search.paging and engines[result['engine']].paging:
@@ -411,7 +411,7 @@ def index():
                 minutes = int((timedifference.seconds / 60) % 60)
                 hours = int(timedifference.seconds / 60 / 60)
                 if hours == 0:
-                    result['publishedDate'] = gettext(u'{minutes} minute(s) ago').format(minutes=minutes)  # noqa
+                    result['publishedDate'] = gettext(u'{minutes} minute(s) ago').format(minutes=minutes)
                 else:
                     result['publishedDate'] = gettext(u'{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)  # noqa
             else:
@@ -419,17 +419,16 @@ def index():
     if search.request_data.get('format') == 'json':
         return Response(json.dumps({'query': search.query,
-                                    'results': search.results}),
+                                    'results': search.result_container.get_ordered_results()}),
                         mimetype='application/json')
     elif search.request_data.get('format') == 'csv':
         csv = UnicodeWriter(cStringIO.StringIO())
         keys = ('title', 'url', 'content', 'host', 'engine', 'score')
-        if search.results:
-            csv.writerow(keys)
-            for row in search.results:
-                row['host'] = row['parsed_url'].netloc
-                csv.writerow([row.get(key, '') for key in keys])
-            csv.stream.seek(0)
+        csv.writerow(keys)
+        for row in search.result_container.get_ordered_results():
+            row['host'] = row['parsed_url'].netloc
+            csv.writerow([row.get(key, '') for key in keys])
+        csv.stream.seek(0)
         response = Response(csv.stream.read(), mimetype='application/csv')
         cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search.query)
         response.headers.add('Content-Disposition', cont_disp)
@@ -437,24 +436,24 @@ def index():
     elif search.request_data.get('format') == 'rss':
         response_rss = render(
             'opensearch_response_rss.xml',
-            results=search.results,
+            results=search.result_container.get_ordered_results(),
             q=search.request_data['q'],
-            number_of_results=len(search.results),
+            number_of_results=search.result_container.results_length(),
            base_url=get_base_url()
         )
         return Response(response_rss, mimetype='text/xml')
     return render(
         'results.html',
-        results=search.results,
+        results=search.result_container.get_ordered_results(),
         q=search.request_data['q'],
         selected_categories=search.categories,
         paging=search.paging,
         pageno=search.pageno,
         base_url=get_base_url(),
-        suggestions=search.suggestions,
-        answers=search.answers,
-        infoboxes=search.infoboxes,
+        suggestions=search.result_container.suggestions,
+        answers=search.result_container.answers,
+        infoboxes=search.result_container.infoboxes,
         theme=get_current_theme_name(),
         favicons=global_favicons[themes.index(get_current_theme_name())]
     )