Source code for ievv_opensource.ievv_elasticsearch.search

# -*- coding: utf-8 -*-
from __future__ import print_function
from __future__ import unicode_literals

import json
import sys
from pprint import pformat

import elasticsearch_dsl
from django.conf import settings
from django.http import QueryDict
from future.utils import python_2_unicode_compatible
from pyelasticsearch import ElasticSearch, ElasticHttpError, ElasticHttpNotFoundError

from ievv_opensource.utils import ievv_colorize
from ievv_opensource.utils.singleton import Singleton


def _instant_print(text='', color=None, bold=False):
    print(ievv_colorize.colorize(text, color=color, bold=bold))
    sys.stdout.flush()


def _instant_prettyjson(data, is_newline_list=False, color=None, bold=False):
    if is_newline_list:
        for line in data.split('\n'):
            _instant_print(line)
    else:
        try:
            pretty = json.dumps(data, indent=4)
        except TypeError:
            pretty = pformat(data)
        print(ievv_colorize.colorize(pretty, color=color, bold=bold))
        sys.stdout.flush()


[docs]class IevvElasticSearch(ElasticSearch): def __prettyprint_request(self, method, path_components, body, query_params): _instant_print() _instant_print('>>>>>> prettyprint ElasticSearch request input >>>>>>', color=ievv_colorize.COLOR_BLUE) querystring = '' if query_params: querydict = QueryDict(mutable=True) querydict.update(query_params) querystring = '?' + querydict.urlencode() prettyformatted_requestheader = '{method} {path}{querystring}'.format( method=method, path=self._join_path(path_components), querystring=querystring) _instant_print(prettyformatted_requestheader) if body: _instant_prettyjson(body, is_newline_list=len(path_components) > 0 and path_components[-1] == '_bulk') _instant_print('<<<<<< prettyprint ElasticSearch request input <<<<<<', color=ievv_colorize.COLOR_BLUE) return prettyformatted_requestheader def __prettyprint_successful_response(self, response, prettyformatted_requestheader): _instant_print() _instant_print('>>>>>> {} response >>>>>>'.format(prettyformatted_requestheader), color=ievv_colorize.COLOR_GREEN) _instant_prettyjson(response) _instant_print('<<<<<< {} response <<<<<<'.format(prettyformatted_requestheader), color=ievv_colorize.COLOR_GREEN) _instant_print() def __prettyprint_error_response(self, error, prettyformatted_requestheader): _instant_print() _instant_print('>>>>>> {} response >>>>>>'.format(prettyformatted_requestheader), color=ievv_colorize.COLOR_RED, bold=True) _instant_print('HTTP error code: {}'.format(error.status_code)) try: errormessage = json.loads(error.error) except ValueError: _instant_print(error.error) else: _instant_prettyjson(errormessage) _instant_print('<<<<<< {} response <<<<<<'.format(prettyformatted_requestheader), ievv_colorize.COLOR_RED, bold=True) _instant_print()
[docs] def send_request(self, method, path_components, body='', query_params=None): """ Does exactly the same as the method from the superclass, but also prettyprints the request and response if the :setting:`IEVV_ELASTICSEARCH_PRETTYPRINT_ALL_REQUESTS` setting is ``True``. """ print_all_requests = getattr(settings, 'IEVV_ELASTICSEARCH_PRETTYPRINT_ALL_REQUESTS', False) prettyformatted_requestheader = None if print_all_requests: prettyformatted_requestheader = self.__prettyprint_request( method=method, path_components=path_components, body=body, query_params=query_params) try: response = super(IevvElasticSearch, self).send_request( method=method, path_components=path_components, body=body, query_params=query_params) except ElasticHttpError as error: if print_all_requests: self.__prettyprint_error_response( error=error, prettyformatted_requestheader=prettyformatted_requestheader) raise if print_all_requests: self.__prettyprint_successful_response( response=response, prettyformatted_requestheader=prettyformatted_requestheader) return response
[docs]class Connection(Singleton): """ Singleton wrapper around :class:`pyelasticsearch.ElasticSearch`. We do not try to wrap everything, instead we use the pyelasticsearch API as it is, and add extra features that makes it easier to use with IEVV and Django. We provide shortcuts for the most commonly used methods of :class:`pyelasticsearch.ElasticSearch`, some custom methods and we add some code to make unit testing easier. Usage:: from ievv_opensource.ievv_elasticsearch import search searchapi = search.Connection.get_instance() searchapi.bulk_index( index='contacts', doc_type='person', docs=[{'name': 'Joe Tester'}, {'name': 'Peter The Super Tester'}]) searchresult1 = searchapi.wrapped_search(query='name:joe OR name:freddy', index='contacts') searchresult2 = searchapi.wrapped_search(query={ 'query': { 'match': { 'name': { 'query': 'Joe' } } } }) print(searchresult1.total) for item in searchresult1: print(item.doc['name']) .. attribute:: elasticsearch The :class:`pyelasticsearch.ElasticSearch` object. """ def __init__(self): super(Connection, self).__init__() self._testmode = getattr(settings, 'IEVV_ELASTICSEARCH_TESTMODE', False) self._autorefresh_after_index_changes = self._testmode if self._testmode: self._connectionurl = settings.IEVV_ELASTICSEARCH_TESTURL else: if getattr(settings, 'IEVV_ELASTICSEARCH_URL', None): self._connectionurl = settings.IEVV_ELASTICSEARCH_URL else: raise ValueError('ievv_opensource.ievv_elasticsearch.search.Connection ' 'requires the IEVV_ELASTICSEARCH_URL setting') self.elasticsearch = IevvElasticSearch(self._connectionurl)
[docs] def clear_all_data(self): """ Clear all data from ElasticSearch. Perfect for unit tests. Only allowed when the ``IEVV_ELASTICSEARCH_TESTMODE``-setting is ``True``. Usage:: class MyTest(TestCase): def setUp(self): self.searchapi = search.Connection.get_instance() self.searchapi.clear_all_data() """ if not self._testmode: raise Exception('clear_all_data() is only allowed when settings.IEVV_ELASTICSEARCH_TESTMODE=True.') self.elasticsearch.delete_all_indexes() self.elasticsearch.refresh()
[docs] def index(self, *args, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.index`. Works exactly like the wrapped function, except that we provide some extra features that makes testing easier. When the ``IEVV_ELASTICSEARCH_TESTMODE``-setting is ``True``, we automatically run :meth:`pyelasticsearch.ElasticSearch.refresh` before returning. """ result = self.elasticsearch.index(*args, **kwargs) if self._autorefresh_after_index_changes: self.refresh() return result
[docs] def bulk_index(self, *args, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.bulk_index`. Works exactly like the wrapped function, except that we provide some extra features that makes testing easier. When the ``IEVV_ELASTICSEARCH_TESTMODE``-setting is ``True``, we automatically run :meth:`pyelasticsearch.ElasticSearch.refresh` before returning. """ result = self.elasticsearch.bulk_index(*args, **kwargs) if self._autorefresh_after_index_changes: self.refresh() return result
[docs] def bulk(self, *args, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.bulk`. Works exactly like the wrapped function, except that we provide some extra features that makes testing easier. When the ``IEVV_ELASTICSEARCH_TESTMODE``-setting is ``True``, we automatically run :meth:`pyelasticsearch.ElasticSearch.refresh` before returning. """ result = self.elasticsearch.bulk(*args, **kwargs) if self._autorefresh_after_index_changes: self.refresh() return result
[docs] def search(self, query, prettyprint_query=False, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.search`. Works just like the wrapped function, except that ``query`` can also be an ``elasticsearch_dsl.Search`` object, and you can only specify arguments as kwargs (no positional arguments). If ``query`` is an ``elasticsearch_dsl.Search`` object, we convert it to a dict with ``query.to_dict`` before forwaring it to the underling pyelasticsearch API. Args: query: A string, dict or ``elasticsearch_dsl.Search`` object. prettyprint_query: If this is ``True``, we prettyprint the query before executing it. Good for debugging. """ if isinstance(query, elasticsearch_dsl.Search): query = query.to_dict() if prettyprint_query or getattr( settings, 'IEVV_ELASTICSEARCH_PRETTYPRINT_ALL_SEARCH_QUERIES', False): _instant_print() _instant_print('>>> prettyprinted query >>>', color=ievv_colorize.COLOR_BLUE, bold=True) _instant_prettyjson(query) _instant_print('<<< prettyprinted query <<<', color=ievv_colorize.COLOR_BLUE, bold=True) return self.elasticsearch.search(query, **kwargs)
[docs] def refresh(self, *args, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.refresh`. Works exactly like the wrapped function. """ return self.elasticsearch.refresh(*args, **kwargs)
[docs] def delete_index(self, *args, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.delete_index`. Works exactly like the wrapped function. """ return self.elasticsearch.delete_index(*args, **kwargs)
[docs] def delete(self, *args, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.delete`. Works exactly like the wrapped function. """ return self.elasticsearch.delete(*args, **kwargs)
[docs] def get(self, *args, **kwargs): """ Wrapper around :meth:`pyelasticsearch.ElasticSearch.get`. Works exactly like the wrapped function. """ return self.elasticsearch.get(*args, **kwargs)
[docs] def wrapped_get(self, *args, **kwargs): """ Just like :meth:`.get`, but we return a :class:`.SearchResultItem` instead of the raw search response. """ return SearchResultItem(self.get(*args, **kwargs))
[docs] def get_or_none(self, *args, **kwargs): """ Works like :meth:`.get`, but instead of raising an exception, we return ``None`` if the requested object does not exist. """ try: return self.get(*args, **kwargs) except ElasticHttpNotFoundError: return None
[docs] def wrapped_get_or_none(self, *args, **kwargs): """ Works like :meth:`.wrapped_get`, but instead of raising an exception, we return ``None`` if the requested object does not exist. """ try: return self.wrapped_get(*args, **kwargs) except ElasticHttpNotFoundError: return None
[docs] def search_all(self, **kwargs): """ Get all documents in the index. Nice for testing and debugging of small datasets. Useless in production. ``**kwargs`` are forwarded to :meth:`.search`, but the query argument is added automatically. """ return self.search( query={ 'query': { 'match_all': {} } }, **kwargs )
[docs] def wrapped_search_all(self, **kwargs): """ Just like :meth:`.search_all`, but wraps the results in a :class:`.SearchResultWrapper`. """ return SearchResultWrapper(self.search_all(**kwargs))
[docs]@python_2_unicode_compatible class SearchResultItem(object): """ Wrapper around a single dictionary in the ``hits.hits`` list of the result returned by :meth:`pyelasticsearch.ElasticSearch.search`. """ def __init__(self, search_hit): self.search_hit = search_hit @property def id(self): """ Returns the value of the ``_id`` key of the search hit. """ return self.search_hit['_id'] @property def index(self): """ Returns the value of the ``_index`` key of the search hit. """ return self.search_hit['_index'] @property def score(self): """ Returns the value of the ``_score`` key of the search hit. """ return self.search_hit['_score'] @property def source(self): """ Returns the value of the ``_source`` key of the search hit. """ return self.search_hit['_source'] @property def doc_type(self): """ Returns the value of the ``_type`` key of the search hit. """ return self.search_hit['_type'] def __str__(self): return pformat(self.search_hit) def __repr__(self): return 'SearchResultItem({!r})'.format(self.search_hit)
[docs]@python_2_unicode_compatible class SearchResultWrapper(object): """ An efficient wrapper around the data returned by :meth:`pyelasticsearch.ElasticSearch.search`. """ def __init__(self, searchresult): """ :param searchresult: Data returned by :meth:`pyelasticsearch.ElasticSearch.search`. """ self.searchresult = searchresult @property def total(self): """ Returns the total number of hits. """ return self.searchresult['hits']['total'] @property def retrieved_hits_count(self): """ Returns the number of retrieved hits. """ return len(self.searchresult['hits']['hits']) def __iter__(self): """ Iterate over the search result hits, and yield :class:`.SearchResultItem` objects. """ for search_hit in self.searchresult['hits']['hits']: yield SearchResultItem(search_hit) def __getitem__(self, index): """ Get the search result at the given index as a :class:`.SearchResultItem` """ return SearchResultItem(self.searchresult['hits']['hits'][index])
[docs] def first(self): """ Shortcut for getting the first search result as a :class:`.SearchResultItem`. """ return self[0]
def __str__(self): return pformat(self.searchresult) def __repr__(self): return repr(self.searchresult)
[docs]class Paginator: """ Paginator for :class:`ievv_opensource.ievv_elasticsearch.search.SearchResultWrapper`. The paginator counts the first page as 0, the second as 1, and so on. """ def __init__(self, searchresultwrapper, page_number, page_size=100, resultitemwrapper=None): """ Parameters: searchresultwrapper: A :class:`ievv_opensource.ievv_elasticsearch.search.SearchResultWrapper`. page_number: The current page number. page_size: Number of items per page. resultitemwrapper: A class/callable that takes a single item in the ``searchresultwrapper`` and does something with it before returning it when iterating the search result. Defaults to just returning the item as returned from ``searchresultwrapper.__iter__``. """ self.searchresultwrapper = searchresultwrapper self.page_number = page_number self.page_size = page_size if resultitemwrapper: self.resultitemwrapper = resultitemwrapper else: self.resultitemwrapper = lambda item: item @property def total_items(self): """ Returns the number of items in total in all pages. """ return self.searchresultwrapper.total @property def number_of_items_in_current_page(self): """ Returns the number of items in the current page. """ return self.searchresultwrapper.retrieved_hits_count @property def previous_page_number(self): return self.page_number - 1 @property def next_page_number(self): return self.page_number + 1
[docs] def get_page_startindex(self, pagenumber): """ Get the start index of the given ``pagenumber``. """ return self.page_size * pagenumber
[docs] def get_current_page_startindex(self): """ Get the start index of the current page. """ return self.get_page_startindex(self.page_number)
[docs] def page_has_content(self, pagenumber): """ Check if the given ``pagenumber`` is within the total number of items in the given ``searchresultwrapper``. Returns: A boolean. """ if pagenumber < 0: return False else: return self.get_page_startindex(pagenumber) < self.searchresultwrapper.total
[docs] def current_page_has_content(self): """ Check if current page is within the total number of items in the given ``searchresultwrapper``. Returns: A boolean. """ return self.page_has_content(self.get_current_page_startindex())
[docs] def has_next(self): """ Check if we have a next page. Checks if the start index of the next page is lower than ``searchresultwrapper.total``. Returns: A boolean. """ return self.page_has_content(self.page_number + 1)
[docs] def has_previous(self): """ Check if we have a previous page. Checks if the start index of the previous page is larger than 0. Returns: A boolean. """ return self.page_has_content(self.page_number - 1)
def __iter__(self): """ Iterate the search results, yielding objects wrapped by the ``resultitemwrapper`` given to the constructor (or the raw search result items if no ``resultitemwrapper`` was provided). """ for item in self.searchresultwrapper: yield self.resultitemwrapper(item)