From b6c3cb0bdd020a459d0ef5c21d1303ed0148cc0c Mon Sep 17 00:00:00 2001
From: Adam Tauber
Date: Sat, 3 Oct 2015 17:26:07 +0200
Subject: [PATCH] [enh][mod] result handling refactor

Several changes have been made:
 - Parallel result merge
 - Scoring algorithm slightly changed (see result_score())
 - Proper thread locking on global data manipulation

---
 searx/results.py            | 239 +++++++++++++++++++++++++++++++
 searx/search.py             | 273 +++---------------------------------
 searx/tests/test_results.py |  41 ++++++
 searx/tests/test_search.py  |  23 +--
 searx/tests/test_webapp.py  |   8 +-
 searx/webapp.py             |  29 ++--
 6 files changed, 321 insertions(+), 292 deletions(-)
 create mode 100644 searx/results.py
 create mode 100644 searx/tests/test_results.py

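Note on the scoring change mentioned above: result_score() multiplies the
weights of every engine that returned a result and sums occurrences/position
over the positions at which the result appeared, so results reported early and
by several engines rise to the top. A rough standalone sketch of that formula
(illustrative only; function name invented, default engine weight 1.0 assumed):

    # Standalone sketch of the scoring used by result_score(); 'positions' are
    # the 1-based ranks at which the engines returned the same result.
    def sketch_result_score(positions, engine_weights=(1.0,)):
        weight = 1.0
        for w in engine_weights:
            weight *= w
        occurrences = len(positions)
        return sum(occurrences * weight / position for position in positions)

    sketch_result_score([1, 3])   # ~2.67 -- found twice, ranked 1st and 3rd
    sketch_result_score([10])     # 0.1  -- found once, ranked 10th
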
diff --git a/searx/results.py b/searx/results.py
new file mode 100644
index 000000000..2837f6998
--- /dev/null
+++ b/searx/results.py
@@ -0,0 +1,239 @@
+import re
+from collections import defaultdict
+from operator import itemgetter
+from threading import RLock
+from urlparse import urlparse, unquote
+from searx.engines import engines
+
+CONTENT_LEN_IGNORED_CHARS_REGEX = re.compile('[,;:!?\./\\\\ ()-_]', re.M | re.U)
+WHITESPACE_REGEX = re.compile('( |\t|\n)+', re.M | re.U)
+
+
+# return the meaningful length of the content for a result
+def result_content_len(content):
+    if isinstance(content, basestring):
+        return len(CONTENT_LEN_IGNORED_CHARS_REGEX.sub('', content))
+    else:
+        return 0
+
+
+def compare_urls(url_a, url_b):
+    if url_a.netloc != url_b.netloc or url_a.query != url_b.query:
+        return False
+
+    # remove / from the end of the url if required
+    path_a = url_a.path[:-1]\
+        if url_a.path.endswith('/')\
+        else url_a.path
+    path_b = url_b.path[:-1]\
+        if url_b.path.endswith('/')\
+        else url_b.path
+
+    return unquote(path_a) == unquote(path_b)
+
+
+def merge_two_infoboxes(infobox1, infobox2):
+    if 'urls' in infobox2:
+        urls1 = infobox1.get('urls', None)
+        if urls1 is None:
+            urls1 = []
+            infobox1.set('urls', urls1)
+
+        urlSet = set()
+        for url in infobox1.get('urls', []):
+            urlSet.add(url.get('url', None))
+
+        for url in infobox2.get('urls', []):
+            if url.get('url', None) not in urlSet:
+                urls1.append(url)
+
+    if 'attributes' in infobox2:
+        attributes1 = infobox1.get('attributes', None)
+        if attributes1 is None:
+            attributes1 = []
+            infobox1.set('attributes', attributes1)
+
+        attributeSet = set()
+        for attribute in infobox1.get('attributes', []):
+            if attribute.get('label', None) not in attributeSet:
+                attributeSet.add(attribute.get('label', None))
+
+        for attribute in infobox2.get('attributes', []):
+            attributes1.append(attribute)
+
+    if 'content' in infobox2:
+        content1 = infobox1.get('content', None)
+        content2 = infobox2.get('content', '')
+        if content1 is not None:
+            if result_content_len(content2) > result_content_len(content1):
+                infobox1['content'] = content2
+        else:
+            infobox1.set('content', content2)
+
+
+def result_score(result):
+    weight = 1.0
+
+    for result_engine in result['engines']:
+        if hasattr(engines[result_engine], 'weight'):
+            weight *= float(engines[result_engine].weight)
+
+    occurences = len(result['positions'])
+
+    return sum((occurences * weight) / position for position in result['positions'])
+
+
+class ResultContainer(object):
+    """Container that merges, deduplicates and scores engine results"""
+    def __init__(self):
+        super(ResultContainer, self).__init__()
+        self.results = defaultdict(list)
+        self._merged_results = []
+        self.infoboxes = []
+        self._infobox_ids = {}
+        self.suggestions = set()
+        self.answers = set()
+
+    def extend(self, engine_name, results):
+        for result in list(results):
+            if 'suggestion' in result:
+                self.suggestions.add(result['suggestion'])
+                results.remove(result)
+            elif 'answer' in result:
+                self.answers.add(result['answer'])
+                results.remove(result)
+            elif 'infobox' in result:
+                self._merge_infobox(result)
+                results.remove(result)
+
+        with RLock():
+            engines[engine_name].stats['search_count'] += 1
+            engines[engine_name].stats['result_count'] += len(results)
+
+        if not results:
+            return
+
+        self.results[engine_name].extend(results)
+
+        for i, result in enumerate(results):
+            position = i + 1
+            self._merge_result(result, position)
+
+    def _merge_infobox(self, infobox):
+        add_infobox = True
+        infobox_id = infobox.get('id', None)
+        if infobox_id is not None:
+            existingIndex = self._infobox_ids.get(infobox_id, None)
+            if existingIndex is not None:
+                merge_two_infoboxes(self.infoboxes[existingIndex], infobox)
+                add_infobox = False
+
+        if add_infobox:
+            self.infoboxes.append(infobox)
+            self._infobox_ids[infobox_id] = len(self.infoboxes) - 1
+
+    def _merge_result(self, result, position):
+        result['parsed_url'] = urlparse(result['url'])
+
+        # if the result has no scheme, use http as default
+        if not result['parsed_url'].scheme:
+            result['parsed_url'] = result['parsed_url']._replace(scheme="http")
+
+        result['host'] = result['parsed_url'].netloc
+
+        if result['host'].startswith('www.'):
+            result['host'] = result['host'].replace('www.', '', 1)
+
+        result['engines'] = [result['engine']]
+
+        # strip multiple spaces and carriage returns from content
+        if result.get('content'):
+            result['content'] = WHITESPACE_REGEX.sub(' ', result['content'])
+
+        # check for duplicates
+        duplicated = False
+        for merged_result in self._merged_results:
+            if compare_urls(result['parsed_url'], merged_result['parsed_url'])\
+               and result.get('template') == merged_result.get('template'):
+                duplicated = merged_result
+                break
+
+        # merge duplicates together
+        if duplicated:
+            # using content with more text
+            if result_content_len(result.get('content', '')) >\
+                    result_content_len(duplicated.get('content', '')):
+                duplicated['content'] = result['content']
+
+            # add the new position
+            duplicated['positions'].append(position)
+
+            # add engine to list of result-engines
+            duplicated['engines'].append(result['engine'])
+
+            # using https if possible
+            if duplicated['parsed_url'].scheme != 'https' and result['parsed_url'].scheme == 'https':
+                duplicated['url'] = result['parsed_url'].geturl()
+                duplicated['parsed_url'] = result['parsed_url']
+
+        # if there is no duplicate found, append result
+        else:
+            result['positions'] = [position]
+            with RLock():
+                self._merged_results.append(result)
+
+    def get_ordered_results(self):
+        for result in self._merged_results:
+            score = result_score(result)
+            result['score'] = score
+            with RLock():
+                for result_engine in result['engines']:
+                    engines[result_engine].stats['score_count'] += score
+
+        results = sorted(self._merged_results, key=itemgetter('score'), reverse=True)
+
+        # pass 2 : group results by category and template
+        gresults = []
+        categoryPositions = {}
+
+        for i, res in enumerate(results):
+            # FIXME : handle more than one category per engine
+            category = engines[res['engine']].categories[0] + ':' + ''\
+                if 'template' not in res\
+                else res['template']
+
+            current = None if category not in categoryPositions\
+                else categoryPositions[category]
+
+            # group with previous results using the same category
+            # if the group can accept more result and is not too far
+            # from the current position
+            if current is not None and (current['count'] > 0)\
+                    and (len(gresults) - current['index'] < 20):
+                # group with the previous results using
+                # the same category with this one
+                index = current['index']
+                gresults.insert(index, res)
+
+                # update every index after the current one
+                # (including the current one)
+                for k in categoryPositions:
+                    v = categoryPositions[k]['index']
+                    if v >= index:
+                        categoryPositions[k]['index'] = v + 1
+
+                # update this category
+                current['count'] -= 1
+
+            else:
+                # same category
+                gresults.append(res)
+
+                # update categoryIndex
+                categoryPositions[category] = {'index': len(gresults), 'count': 8}
+
+        # return gresults
+        return gresults
+
+    def results_length(self):
+        return len(self._merged_results)
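
For orientation, a minimal usage sketch of the container added above. The
engine names and result dicts are invented, and both engine names must refer
to configured engines, because extend() and get_ordered_results() update
per-engine statistics:

    from searx.results import ResultContainer

    container = ResultContainer()

    # each engine callback feeds its parsed results into the shared container
    container.extend('engine_a', [
        {'url': 'https://example.com/', 'title': 'Example',
         'content': 'aa', 'engine': 'engine_a'},
    ])
    container.extend('engine_b', [
        {'url': 'http://example.com/', 'title': 'Example',
         'content': 'aa bb', 'engine': 'engine_b'},
        {'answer': '42'},
    ])

    # the two example.com results are merged into one entry (same host, path
    # and query; the https url and the longer content are kept), while the
    # answer is split off into its own collection
    ordered = container.get_ordered_results()
    assert container.results_length() == 1
    assert container.answers == set(['42'])
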
diff --git a/searx/search.py b/searx/search.py
index f2b5235b8..02676a149 100644
--- a/searx/search.py
+++ b/searx/search.py
@@ -16,13 +16,8 @@ along with searx. If not, see < http://www.gnu.org/licenses/ >.
 '''
 
 import threading
-import re
 import searx.poolrequests as requests_lib
-from itertools import izip_longest, chain
-from operator import itemgetter
-from Queue import Queue
 from time import time
-from urlparse import urlparse, unquote
 from searx import settings
 from searx.engines import (
     categories, engines
@@ -30,6 +25,7 @@ from searx.engines import (
 from searx.languages import language_codes
 from searx.utils import gen_useragent, get_blocked_engines
 from searx.query import Query
+from searx.results import ResultContainer
 from searx import logger
 
 logger = logger.getChild('search')
@@ -42,7 +38,8 @@ def search_request_wrapper(fn, url, engine_name, **kwargs):
         return fn(url, **kwargs)
     except:
         # increase errors stats
-        engines[engine_name].stats['errors'] += 1
+        with threading.RLock():
+            engines[engine_name].stats['errors'] += 1
 
         # print engine name and specific error message
         logger.exception('engine crash: {0}'.format(engine_name))
@@ -84,7 +81,7 @@ def default_request_params():
 
 
 # create a callback wrapper for the search engine results
-def make_callback(engine_name, results_queue, callback, params):
+def make_callback(engine_name, callback, params, result_container):
 
     # creating a callback wrapper for the search engine results
     def process_callback(response, **kwargs):
@@ -96,12 +93,17 @@ def make_callback(engine_name, results_queue, callback, params):
 
         response.search_params = params
 
-        timeout_overhead = 0.2  # seconds
         search_duration = time() - params['started']
+        # update stats with current page-load-time
+        with threading.RLock():
+            engines[engine_name].stats['page_load_time'] += search_duration
+
+        timeout_overhead = 0.2  # seconds
         timeout_limit = engines[engine_name].timeout + timeout_overhead
+
        if search_duration > timeout_limit:
-            engines[engine_name].stats['page_load_time'] += timeout_limit
-            engines[engine_name].stats['errors'] += 1
+            with threading.RLock():
+                engines[engine_name].stats['errors'] += 1
             return
 
         # callback
@@ -111,212 +113,11 @@ def make_callback(engine_name, results_queue, callback, params):
         for result in search_results:
             result['engine'] = engine_name
 
-        results_queue.put_nowait((engine_name, search_results))
-
-        # update stats with current page-load-time
-        engines[engine_name].stats['page_load_time'] += search_duration
+        result_container.extend(engine_name, search_results)
 
     return process_callback
 
 
-# return the meaningful length of the content for a result
-def content_result_len(content):
-    if isinstance(content, basestring):
-        content = re.sub('[,;:!?\./\\\\ ()-_]', '', content)
-        return len(content)
-    else:
-        return 0
-
-
-# score results and remove duplications
-def score_results(results):
-    # calculate scoring parameters
-    flat_res = filter(
-        None, chain.from_iterable(izip_longest(*results.values())))
-    flat_len = len(flat_res)
-    engines_len = len(results)
-
-    results = []
-
-    # pass 1: deduplication + scoring
-    for i, res in enumerate(flat_res):
-
-        res['parsed_url'] = urlparse(res['url'])
-
-        # if the result has no scheme, use http as default
-        if not res['parsed_url'].scheme:
-            res['parsed_url'] = res['parsed_url']._replace(scheme="http")
-
-        res['host'] = res['parsed_url'].netloc
-
-        if res['host'].startswith('www.'):
-            res['host'] = res['host'].replace('www.', '', 1)
-
-        res['engines'] = [res['engine']]
-
-        weight = 1.0
-
-        # strip multiple spaces and cariage returns from content
-        if res.get('content'):
-            res['content'] = re.sub(' +', ' ',
-                                    res['content'].strip().replace('\n', ''))
-
-        # get weight of this engine if possible
-        if hasattr(engines[res['engine']], 'weight'):
-            weight = float(engines[res['engine']].weight)
-
-        # calculate score for that engine
-        score = int((flat_len - i) / engines_len) * weight + 1
-
-        # check for duplicates
-        duplicated = False
-        for new_res in results:
-            # remove / from the end of the url if required
-            p1 = res['parsed_url'].path[:-1]\
-                if res['parsed_url'].path.endswith('/')\
-                else res['parsed_url'].path
-            p2 = new_res['parsed_url'].path[:-1]\
-                if new_res['parsed_url'].path.endswith('/')\
-                else new_res['parsed_url'].path
-
-            # check if that result is a duplicate
-            if res['host'] == new_res['host'] and\
-               unquote(p1) == unquote(p2) and\
-               res['parsed_url'].query == new_res['parsed_url'].query and\
-               res.get('template') == new_res.get('template'):
-                duplicated = new_res
-                break
-
-        # merge duplicates together
-        if duplicated:
-            # using content with more text
-            if content_result_len(res.get('content', '')) >\
-                    content_result_len(duplicated.get('content', '')):
-                duplicated['content'] = res['content']
-
-            # increase result-score
-            duplicated['score'] += score
-
-            # add engine to list of result-engines
-            duplicated['engines'].append(res['engine'])
-
-            # using https if possible
-            if duplicated['parsed_url'].scheme == 'https':
-                continue
-            elif res['parsed_url'].scheme == 'https':
-                duplicated['url'] = res['parsed_url'].geturl()
-                duplicated['parsed_url'] = res['parsed_url']
-
-        # if there is no duplicate found, append result
-        else:
-            res['score'] = score
-
-            results.append(res)
-
-    results = sorted(results, key=itemgetter('score'), reverse=True)
-
-    # pass 2 : group results by category and template
-    gresults = []
-    categoryPositions = {}
-
-    for i, res in enumerate(results):
-        # FIXME : handle more than one category per engine
-        category = engines[res['engine']].categories[0] + ':' + ''\
-            if 'template' not in res\
-            else res['template']
-
-        current = None if category not in categoryPositions\
-            else categoryPositions[category]
-
-        # group with previous results using the same category
-        # if the group can accept more result and is not too far
-        # from the current position
-        if current is not None and (current['count'] > 0)\
-                and (len(gresults) - current['index'] < 20):
-            # group with the previous results using
-            # the same category with this one
-            index = current['index']
-            gresults.insert(index, res)
-
-            # update every index after the current one
-            # (including the current one)
-            for k in categoryPositions:
-                v = categoryPositions[k]['index']
-                if v >= index:
-                    categoryPositions[k]['index'] = v + 1
-
-            # update this category
-            current['count'] -= 1
-
-        else:
-            # same category
-            gresults.append(res)
-
-            # update categoryIndex
-            categoryPositions[category] = {'index': len(gresults), 'count': 8}
-
-    # return gresults
-    return gresults
-
-
-def merge_two_infoboxes(infobox1, infobox2):
-    if 'urls' in infobox2:
-        urls1 = infobox1.get('urls', None)
-        if urls1 is None:
-            urls1 = []
-            infobox1.set('urls', urls1)
-
-        urlSet = set()
-        for url in infobox1.get('urls', []):
-            urlSet.add(url.get('url', None))
-
-        for url in infobox2.get('urls', []):
-            if url.get('url', None) not in urlSet:
-                urls1.append(url)
-
-    if 'attributes' in infobox2:
-        attributes1 = infobox1.get('attributes', None)
-        if attributes1 is None:
-            attributes1 = []
-            infobox1.set('attributes', attributes1)
-
-        attributeSet = set()
-        for attribute in infobox1.get('attributes', []):
-            if attribute.get('label', None) not in attributeSet:
-                attributeSet.add(attribute.get('label', None))
-
-        for attribute in infobox2.get('attributes', []):
-            attributes1.append(attribute)
-
-    if 'content' in infobox2:
-        content1 = infobox1.get('content', None)
-        content2 = infobox2.get('content', '')
-        if content1 is not None:
-            if content_result_len(content2) > content_result_len(content1):
-                infobox1['content'] = content2
-        else:
-            infobox1.set('content', content2)
-
-
-def merge_infoboxes(infoboxes):
-    results = []
-    infoboxes_id = {}
-    for infobox in infoboxes:
-        add_infobox = True
-        infobox_id = infobox.get('id', None)
-        if infobox_id is not None:
-            existingIndex = infoboxes_id.get(infobox_id, None)
-            if existingIndex is not None:
-                merge_two_infoboxes(results[existingIndex], infobox)
-                add_infobox = False
-
-        if add_infobox:
-            results.append(infobox)
-            infoboxes_id[infobox_id] = len(results) - 1
-
-    return results
-
-
 class Search(object):
 
     """Search information container"""
@@ -334,10 +135,7 @@ class Search(object):
         # set blocked engines
         self.blocked_engines = get_blocked_engines(engines, request.cookies)
 
-        self.results = []
-        self.suggestions = set()
-        self.answers = set()
-        self.infoboxes = []
+        self.result_container = ResultContainer()
         self.request_data = {}
 
         # set specific language if set
@@ -449,8 +247,6 @@
 
         # init vars
         requests = []
-        results_queue = Queue()
-        results = {}
 
         # increase number of searches
         number_of_searches += 1
@@ -504,9 +300,9 @@
             # create a callback wrapper for the search engine results
             callback = make_callback(
                 selected_engine['name'],
-                results_queue,
                 engine.response,
-                request_params)
+                request_params,
+                self.result_container)
 
             # create dictionary which contain all
             # informations about the request
@@ -539,42 +335,5 @@ class Search(object):
         # send all search-request
         threaded_requests(requests)
 
-        while not results_queue.empty():
-            engine_name, engine_results = results_queue.get_nowait()
-
-            # TODO type checks
-            [self.suggestions.add(x['suggestion'])
-             for x in list(engine_results)
-             if 'suggestion' in x
-             and engine_results.remove(x) is None]
-
-            [self.answers.add(x['answer'])
-             for x in list(engine_results)
-             if 'answer' in x
-             and engine_results.remove(x) is None]
-
-            self.infoboxes.extend(x for x in list(engine_results)
-                                  if 'infobox' in x
-                                  and engine_results.remove(x) is None)
-
-            results[engine_name] = engine_results
-
-        # update engine-specific stats
-        for engine_name, engine_results in results.items():
-            engines[engine_name].stats['search_count'] += 1
-            engines[engine_name].stats['result_count'] += len(engine_results)
-
-        # score results and remove duplications
-        self.results = score_results(results)
-
-        # merge infoboxes according to their ids
-        self.infoboxes = merge_infoboxes(self.infoboxes)
-
-        # update engine stats, using calculated score
-        for result in self.results:
-            for res_engine in result['engines']:
-                engines[result['engine']]\
-                    .stats['score_count'] += result['score']
-
         # return results, suggestions, answers and infoboxes
         return self
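
The search.py changes above remove the results queue: every engine callback
now pushes its parsed results straight into the shared ResultContainer, which
does its own locking around the global engine statistics and the merged result
list. Schematically, the parallel merge looks like the sketch below; the names
result_container and parsed_results_by_engine are invented, and in searx the
threads are actually driven by threaded_requests() and the response callbacks:

    import threading

    # each engine used to put (name, results) on a queue that was drained
    # serially afterwards; now the per-engine threads call extend() directly
    def engine_worker(container, engine_name, parsed_results):
        container.extend(engine_name, parsed_results)

    threads = [threading.Thread(target=engine_worker,
                                args=(result_container, name, parsed))
               for name, parsed in parsed_results_by_engine.items()]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
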
diff --git a/searx/tests/test_results.py b/searx/tests/test_results.py
new file mode 100644
index 000000000..274b5b37a
--- /dev/null
+++ b/searx/tests/test_results.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+
+from searx.results import ResultContainer
+from searx.testing import SearxTestCase
+
+
+def fake_result(url='https://aa.bb/cc?dd=ee#ff',
+                title='aaa',
+                content='bbb',
+                engine='wikipedia', **kwargs):
+    result = {'url': url,
+              'title': title,
+              'content': content,
+              'engine': engine}
+    result.update(kwargs)
+    return result
+
+
+# TODO
+class ResultContainerTestCase(SearxTestCase):
+
+    def test_empty(self):
+        c = ResultContainer()
+        self.assertEqual(c.get_ordered_results(), [])
+
+    def test_one_result(self):
+        c = ResultContainer()
+        c.extend('wikipedia', [fake_result()])
+        self.assertEqual(c.results_length(), 1)
+
+    def test_one_suggestion(self):
+        c = ResultContainer()
+        c.extend('wikipedia', [fake_result(suggestion=True)])
+        self.assertEqual(len(c.suggestions), 1)
+        self.assertEqual(c.results_length(), 0)
+
+    def test_result_merge(self):
+        c = ResultContainer()
+        c.extend('wikipedia', [fake_result()])
+        c.extend('wikidata', [fake_result(), fake_result(url='https://example.com/')])
+        self.assertEqual(c.results_length(), 2)
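
The new test module covers suggestions and the basic merge; the https/content
preference in _merge_result is a natural candidate for a follow-up case. A
possible extra test method for ResultContainerTestCase, sketched here for
illustration and not part of this patch:

    def test_result_merge_prefers_https_and_longer_content(self):
        c = ResultContainer()
        c.extend('wikipedia', [fake_result(url='http://aa.bb/cc?dd=ee', content='x')])
        c.extend('wikidata', [fake_result(url='https://aa.bb/cc?dd=ee', content='longer text')])
        self.assertEqual(c.results_length(), 1)
        merged = c.get_ordered_results()[0]
        self.assertEqual(merged['url'], 'https://aa.bb/cc?dd=ee')
        self.assertEqual(merged['content'], 'longer text')
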
diff --git a/searx/tests/test_search.py b/searx/tests/test_search.py
index 89d0b620d..af5fffd8b 100644
--- a/searx/tests/test_search.py
+++ b/searx/tests/test_search.py
@@ -1,25 +1,10 @@
 # -*- coding: utf-8 -*-
 
-from searx.search import score_results
 from searx.testing import SearxTestCase
 
 
-def fake_result(url='https://aa.bb/cc?dd=ee#ff',
-                title='aaa',
-                content='bbb',
-                engine='wikipedia'):
-    return {'url': url,
-            'title': title,
-            'content': content,
-            'engine': engine}
+# TODO
+class SearchTestCase(SearxTestCase):
 
-
-class ScoreResultsTestCase(SearxTestCase):
-
-    def test_empty(self):
-        self.assertEqual(score_results(dict()), [])
-
-    def test_urlparse(self):
-        results = score_results(dict(a=[fake_result(url='https://aa.bb/cc?dd=ee#ff')]))
-        parsed_url = results[0]['parsed_url']
-        self.assertEqual(parsed_url.query, 'dd=ee')
+    def test_(self):
+        pass
diff --git a/searx/tests/test_webapp.py b/searx/tests/test_webapp.py
index 471ec2f2d..1d9cc649c 100644
--- a/searx/tests/test_webapp.py
+++ b/searx/tests/test_webapp.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 
 import json
+from mock import Mock
 from urlparse import ParseResult
 from searx import webapp
 from searx.testing import SearxTestCase
@@ -33,7 +34,12 @@ class ViewsTestCase(SearxTestCase):
         ]
 
         def search_mock(search_self, *args):
-            search_self.results = self.test_results
+            search_self.result_container = Mock(get_ordered_results=lambda: self.test_results,
+                                                answers=set(),
+                                                suggestions=set(),
+                                                infoboxes=[],
+                                                results=self.test_results,
+                                                results_length=lambda: len(self.test_results))
 
         webapp.Search.search = search_mock
 
diff --git a/searx/webapp.py b/searx/webapp.py
index 7f1621a6a..07750456d 100644
--- a/searx/webapp.py
+++ b/searx/webapp.py
@@ -383,7 +383,7 @@ def index():
 
     plugins.call('post_search', request, locals())
 
-    for result in search.results:
+    for result in search.result_container.get_ordered_results():
 
         plugins.call('on_result', request, locals())
         if not search.paging and engines[result['engine']].paging:
@@ -411,7 +411,7 @@ def index():
                     minutes = int((timedifference.seconds / 60) % 60)
                     hours = int(timedifference.seconds / 60 / 60)
                     if hours == 0:
-                        result['publishedDate'] = gettext(u'{minutes} minute(s) ago').format(minutes=minutes)  # noqa
+                        result['publishedDate'] = gettext(u'{minutes} minute(s) ago').format(minutes=minutes)
                     else:
                         result['publishedDate'] = gettext(u'{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)  # noqa
             else:
@@ -419,17 +419,16 @@ def index():
 
     if search.request_data.get('format') == 'json':
         return Response(json.dumps({'query': search.query,
-                                    'results': search.results}),
+                                    'results': search.result_container.get_ordered_results()}),
                         mimetype='application/json')
     elif search.request_data.get('format') == 'csv':
         csv = UnicodeWriter(cStringIO.StringIO())
         keys = ('title', 'url', 'content', 'host', 'engine', 'score')
-        if search.results:
-            csv.writerow(keys)
-            for row in search.results:
-                row['host'] = row['parsed_url'].netloc
-                csv.writerow([row.get(key, '') for key in keys])
-            csv.stream.seek(0)
+        csv.writerow(keys)
+        for row in search.result_container.get_ordered_results():
+            row['host'] = row['parsed_url'].netloc
+            csv.writerow([row.get(key, '') for key in keys])
+        csv.stream.seek(0)
         response = Response(csv.stream.read(), mimetype='application/csv')
         cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search.query)
         response.headers.add('Content-Disposition', cont_disp)
@@ -437,24 +436,24 @@ def index():
     elif search.request_data.get('format') == 'rss':
         response_rss = render(
             'opensearch_response_rss.xml',
-            results=search.results,
+            results=search.result_container.get_ordered_results(),
            q=search.request_data['q'],
-            number_of_results=len(search.results),
+            number_of_results=search.result_container.results_length(),
             base_url=get_base_url()
         )
         return Response(response_rss, mimetype='text/xml')
 
     return render(
         'results.html',
-        results=search.results,
+        results=search.result_container.get_ordered_results(),
         q=search.request_data['q'],
         selected_categories=search.categories,
         paging=search.paging,
         pageno=search.pageno,
         base_url=get_base_url(),
-        suggestions=search.suggestions,
-        answers=search.answers,
-        infoboxes=search.infoboxes,
+        suggestions=search.result_container.suggestions,
+        answers=search.result_container.answers,
+        infoboxes=search.result_container.infoboxes,
         theme=get_current_theme_name(),
         favicons=global_favicons[themes.index(get_current_theme_name())]
     )
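
Taken together, the view layer now reads everything from the container rather
than from attributes on Search itself. An end-to-end sketch of the new flow
(illustrative; request stands for the Flask request object that webapp.index()
already has, and the variable names are invented):

    from searx.search import Search

    search = Search(request)
    search.search()                               # runs the engines in parallel

    container = search.result_container
    results = container.get_ordered_results()     # scored, deduplicated, grouped
    answers = container.answers                   # set of answer strings
    suggestions = container.suggestions           # set of suggestion strings
    infoboxes = container.infoboxes               # merged by infobox id
    total = container.results_length()
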