diff options
author | Chayim <chayim@users.noreply.github.com> | 2021-10-25 17:18:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-25 17:18:27 +0300 |
commit | ddd1496782cc8eb15fca6c9059b2b08a03efe366 (patch) | |
tree | ab907bd2fd2f7cfb951e750de69ea9d4478a5b9c /redis/commands/search | |
parent | 3946da29d7e451a20289fb6e282516fa24e402af (diff) | |
download | redis-py-ddd1496782cc8eb15fca6c9059b2b08a03efe366.tar.gz |
Adding support for redisearch (#1640)
Diffstat (limited to 'redis/commands/search')
-rw-r--r-- | redis/commands/search/__init__.py | 96 | ||||
-rw-r--r-- | redis/commands/search/_util.py | 10 | ||||
-rw-r--r-- | redis/commands/search/aggregation.py | 408 | ||||
-rw-r--r-- | redis/commands/search/commands.py | 704 | ||||
-rw-r--r-- | redis/commands/search/document.py | 16 | ||||
-rw-r--r-- | redis/commands/search/field.py | 94 | ||||
-rw-r--r-- | redis/commands/search/indexDefinition.py | 80 | ||||
-rw-r--r-- | redis/commands/search/query.py | 328 | ||||
-rw-r--r-- | redis/commands/search/querystring.py | 324 | ||||
-rw-r--r-- | redis/commands/search/reducers.py | 178 | ||||
-rw-r--r-- | redis/commands/search/result.py | 75 | ||||
-rw-r--r-- | redis/commands/search/suggestion.py | 54 |
12 files changed, 2367 insertions, 0 deletions
diff --git a/redis/commands/search/__init__.py b/redis/commands/search/__init__.py new file mode 100644 index 0000000..8320ad4 --- /dev/null +++ b/redis/commands/search/__init__.py @@ -0,0 +1,96 @@ +from .commands import SearchCommands + + +class Search(SearchCommands): + """ + Create a client for talking to search. + It abstracts the API of the module and lets you just use the engine. + """ + + class BatchIndexer(object): + """ + A batch indexer allows you to automatically batch + document indexing in pipelines, flushing it every N documents. + """ + + def __init__(self, client, chunk_size=1000): + + self.client = client + self.execute_command = client.execute_command + self.pipeline = client.pipeline(transaction=False, shard_hint=None) + self.total = 0 + self.chunk_size = chunk_size + self.current_chunk = 0 + + def __del__(self): + if self.current_chunk: + self.commit() + + def add_document( + self, + doc_id, + nosave=False, + score=1.0, + payload=None, + replace=False, + partial=False, + no_create=False, + **fields + ): + """ + Add a document to the batch query + """ + self.client._add_document( + doc_id, + conn=self.pipeline, + nosave=nosave, + score=score, + payload=payload, + replace=replace, + partial=partial, + no_create=no_create, + **fields + ) + self.current_chunk += 1 + self.total += 1 + if self.current_chunk >= self.chunk_size: + self.commit() + + def add_document_hash( + self, + doc_id, + score=1.0, + replace=False, + ): + """ + Add a hash to the batch query + """ + self.client._add_document_hash( + doc_id, + conn=self.pipeline, + score=score, + replace=replace, + ) + self.current_chunk += 1 + self.total += 1 + if self.current_chunk >= self.chunk_size: + self.commit() + + def commit(self): + """ + Manually commit and flush the batch indexing query + """ + self.pipeline.execute() + self.current_chunk = 0 + + def __init__(self, client, index_name="idx"): + """ + Create a new Client for the given index_name. + The default name is `idx` + + If conn is not None, we employ an already existing redis connection + """ + self.client = client + self.index_name = index_name + self.execute_command = client.execute_command + self.pipeline = client.pipeline diff --git a/redis/commands/search/_util.py b/redis/commands/search/_util.py new file mode 100644 index 0000000..b4ac19f --- /dev/null +++ b/redis/commands/search/_util.py @@ -0,0 +1,10 @@ +import six + + +def to_string(s): + if isinstance(s, six.string_types): + return s + elif isinstance(s, six.binary_type): + return s.decode("utf-8", "ignore") + else: + return s # Not a string we care about diff --git a/redis/commands/search/aggregation.py b/redis/commands/search/aggregation.py new file mode 100644 index 0000000..df912f8 --- /dev/null +++ b/redis/commands/search/aggregation.py @@ -0,0 +1,408 @@ +from six import string_types + +FIELDNAME = object() + + +class Limit(object): + def __init__(self, offset=0, count=0): + self.offset = offset + self.count = count + + def build_args(self): + if self.count: + return ["LIMIT", str(self.offset), str(self.count)] + else: + return [] + + +class Reducer(object): + """ + Base reducer object for all reducers. + + See the `redisearch.reducers` module for the actual reducers. + """ + + NAME = None + + def __init__(self, *args): + self._args = args + self._field = None + self._alias = None + + def alias(self, alias): + """ + Set the alias for this reducer. + + ### Parameters + + - **alias**: The value of the alias for this reducer. If this is the + special value `aggregation.FIELDNAME` then this reducer will be + aliased using the same name as the field upon which it operates. + Note that using `FIELDNAME` is only possible on reducers which + operate on a single field value. + + This method returns the `Reducer` object making it suitable for + chaining. + """ + if alias is FIELDNAME: + if not self._field: + raise ValueError("Cannot use FIELDNAME alias with no field") + # Chop off initial '@' + alias = self._field[1:] + self._alias = alias + return self + + @property + def args(self): + return self._args + + +class SortDirection(object): + """ + This special class is used to indicate sort direction. + """ + + DIRSTRING = None + + def __init__(self, field): + self.field = field + + +class Asc(SortDirection): + """ + Indicate that the given field should be sorted in ascending order + """ + + DIRSTRING = "ASC" + + +class Desc(SortDirection): + """ + Indicate that the given field should be sorted in descending order + """ + + DIRSTRING = "DESC" + + +class Group(object): + """ + This object automatically created in the `AggregateRequest.group_by()` + """ + + def __init__(self, fields, reducers): + if not reducers: + raise ValueError("Need at least one reducer") + + fields = [fields] if isinstance(fields, string_types) else fields + reducers = [reducers] if isinstance(reducers, Reducer) else reducers + + self.fields = fields + self.reducers = reducers + self.limit = Limit() + + def build_args(self): + ret = ["GROUPBY", str(len(self.fields))] + ret.extend(self.fields) + for reducer in self.reducers: + ret += ["REDUCE", reducer.NAME, str(len(reducer.args))] + ret.extend(reducer.args) + if reducer._alias is not None: + ret += ["AS", reducer._alias] + return ret + + +class Projection(object): + """ + This object automatically created in the `AggregateRequest.apply()` + """ + + def __init__(self, projector, alias=None): + self.alias = alias + self.projector = projector + + def build_args(self): + ret = ["APPLY", self.projector] + if self.alias is not None: + ret += ["AS", self.alias] + + return ret + + +class SortBy(object): + """ + This object automatically created in the `AggregateRequest.sort_by()` + """ + + def __init__(self, fields, max=0): + self.fields = fields + self.max = max + + def build_args(self): + fields_args = [] + for f in self.fields: + if isinstance(f, SortDirection): + fields_args += [f.field, f.DIRSTRING] + else: + fields_args += [f] + + ret = ["SORTBY", str(len(fields_args))] + ret.extend(fields_args) + if self.max > 0: + ret += ["MAX", str(self.max)] + + return ret + + +class AggregateRequest(object): + """ + Aggregation request which can be passed to `Client.aggregate`. + """ + + def __init__(self, query="*"): + """ + Create an aggregation request. This request may then be passed to + `client.aggregate()`. + + In order for the request to be usable, it must contain at least one + group. + + - **query** Query string for filtering records. + + All member methods (except `build_args()`) + return the object itself, making them useful for chaining. + """ + self._query = query + self._aggregateplan = [] + self._loadfields = [] + self._limit = Limit() + self._max = 0 + self._with_schema = False + self._verbatim = False + self._cursor = [] + + def load(self, *fields): + """ + Indicate the fields to be returned in the response. These fields are + returned in addition to any others implicitly specified. + + ### Parameters + + - **fields**: One or more fields in the format of `@field` + """ + self._loadfields.extend(fields) + return self + + def group_by(self, fields, *reducers): + """ + Specify by which fields to group the aggregation. + + ### Parameters + + - **fields**: Fields to group by. This can either be a single string, + or a list of strings. both cases, the field should be specified as + `@field`. + - **reducers**: One or more reducers. Reducers may be found in the + `aggregation` module. + """ + group = Group(fields, reducers) + self._aggregateplan.extend(group.build_args()) + + return self + + def apply(self, **kwexpr): + """ + Specify one or more projection expressions to add to each result + + ### Parameters + + - **kwexpr**: One or more key-value pairs for a projection. The key is + the alias for the projection, and the value is the projection + expression itself, for example `apply(square_root="sqrt(@foo)")` + """ + for alias, expr in kwexpr.items(): + projection = Projection(expr, alias) + self._aggregateplan.extend(projection.build_args()) + + return self + + def limit(self, offset, num): + """ + Sets the limit for the most recent group or query. + + If no group has been defined yet (via `group_by()`) then this sets + the limit for the initial pool of results from the query. Otherwise, + this limits the number of items operated on from the previous group. + + Setting a limit on the initial search results may be useful when + attempting to execute an aggregation on a sample of a large data set. + + ### Parameters + + - **offset**: Result offset from which to begin paging + - **num**: Number of results to return + + + Example of sorting the initial results: + + ``` + AggregateRequest("@sale_amount:[10000, inf]")\ + .limit(0, 10)\ + .group_by("@state", r.count()) + ``` + + Will only group by the states found in the first 10 results of the + query `@sale_amount:[10000, inf]`. On the other hand, + + ``` + AggregateRequest("@sale_amount:[10000, inf]")\ + .limit(0, 1000)\ + .group_by("@state", r.count()\ + .limit(0, 10) + ``` + + Will group all the results matching the query, but only return the + first 10 groups. + + If you only wish to return a *top-N* style query, consider using + `sort_by()` instead. + + """ + limit = Limit(offset, num) + self._limit = limit + return self + + def sort_by(self, *fields, **kwargs): + """ + Indicate how the results should be sorted. This can also be used for + *top-N* style queries + + ### Parameters + + - **fields**: The fields by which to sort. This can be either a single + field or a list of fields. If you wish to specify order, you can + use the `Asc` or `Desc` wrapper classes. + - **max**: Maximum number of results to return. This can be + used instead of `LIMIT` and is also faster. + + + Example of sorting by `foo` ascending and `bar` descending: + + ``` + sort_by(Asc("@foo"), Desc("@bar")) + ``` + + Return the top 10 customers: + + ``` + AggregateRequest()\ + .group_by("@customer", r.sum("@paid").alias(FIELDNAME))\ + .sort_by(Desc("@paid"), max=10) + ``` + """ + if isinstance(fields, (string_types, SortDirection)): + fields = [fields] + + max = kwargs.get("max", 0) + sortby = SortBy(fields, max) + + self._aggregateplan.extend(sortby.build_args()) + return self + + def filter(self, expressions): + """ + Specify filter for post-query results using predicates relating to + values in the result set. + + ### Parameters + + - **fields**: Fields to group by. This can either be a single string, + or a list of strings. + """ + if isinstance(expressions, string_types): + expressions = [expressions] + + for expression in expressions: + self._aggregateplan.extend(["FILTER", expression]) + + return self + + def with_schema(self): + """ + If set, the `schema` property will contain a list of `[field, type]` + entries in the result object. + """ + self._with_schema = True + return self + + def verbatim(self): + self._verbatim = True + return self + + def cursor(self, count=0, max_idle=0.0): + args = ["WITHCURSOR"] + if count: + args += ["COUNT", str(count)] + if max_idle: + args += ["MAXIDLE", str(max_idle * 1000)] + self._cursor = args + return self + + def _limit_2_args(self, limit): + if limit[1]: + return ["LIMIT"] + [str(x) for x in limit] + else: + return [] + + def build_args(self): + # @foo:bar ... + ret = [self._query] + + if self._with_schema: + ret.append("WITHSCHEMA") + + if self._verbatim: + ret.append("VERBATIM") + + if self._cursor: + ret += self._cursor + + if self._loadfields: + ret.append("LOAD") + ret.append(str(len(self._loadfields))) + ret.extend(self._loadfields) + + ret.extend(self._aggregateplan) + + ret += self._limit.build_args() + + return ret + + +class Cursor(object): + def __init__(self, cid): + self.cid = cid + self.max_idle = 0 + self.count = 0 + + def build_args(self): + args = [str(self.cid)] + if self.max_idle: + args += ["MAXIDLE", str(self.max_idle)] + if self.count: + args += ["COUNT", str(self.count)] + return args + + +class AggregateResult(object): + def __init__(self, rows, cursor, schema): + self.rows = rows + self.cursor = cursor + self.schema = schema + + def __repr__(self): + return "<{} at 0x{:x} Rows={}, Cursor={}>".format( + self.__class__.__name__, + id(self), + len(self.rows), + self.cursor.cid if self.cursor else -1, + ) diff --git a/redis/commands/search/commands.py b/redis/commands/search/commands.py new file mode 100644 index 0000000..6074d29 --- /dev/null +++ b/redis/commands/search/commands.py @@ -0,0 +1,704 @@ +import itertools +import time +import six + +from .document import Document +from .result import Result +from .query import Query +from ._util import to_string +from .aggregation import AggregateRequest, AggregateResult, Cursor +from .suggestion import SuggestionParser + +NUMERIC = "NUMERIC" + +CREATE_CMD = "FT.CREATE" +ALTER_CMD = "FT.ALTER" +SEARCH_CMD = "FT.SEARCH" +ADD_CMD = "FT.ADD" +ADDHASH_CMD = "FT.ADDHASH" +DROP_CMD = "FT.DROP" +EXPLAIN_CMD = "FT.EXPLAIN" +DEL_CMD = "FT.DEL" +AGGREGATE_CMD = "FT.AGGREGATE" +CURSOR_CMD = "FT.CURSOR" +SPELLCHECK_CMD = "FT.SPELLCHECK" +DICT_ADD_CMD = "FT.DICTADD" +DICT_DEL_CMD = "FT.DICTDEL" +DICT_DUMP_CMD = "FT.DICTDUMP" +GET_CMD = "FT.GET" +MGET_CMD = "FT.MGET" +CONFIG_CMD = "FT.CONFIG" +TAGVALS_CMD = "FT.TAGVALS" +ALIAS_ADD_CMD = "FT.ALIASADD" +ALIAS_UPDATE_CMD = "FT.ALIASUPDATE" +ALIAS_DEL_CMD = "FT.ALIASDEL" +INFO_CMD = "FT.INFO" +SUGADD_COMMAND = "FT.SUGADD" +SUGDEL_COMMAND = "FT.SUGDEL" +SUGLEN_COMMAND = "FT.SUGLEN" +SUGGET_COMMAND = "FT.SUGGET" +SYNUPDATE_CMD = "FT.SYNUPDATE" +SYNDUMP_CMD = "FT.SYNDUMP" + +NOOFFSETS = "NOOFFSETS" +NOFIELDS = "NOFIELDS" +STOPWORDS = "STOPWORDS" +WITHSCORES = "WITHSCORES" +FUZZY = "FUZZY" +WITHPAYLOADS = "WITHPAYLOADS" + + +class SearchCommands: + """Search commands.""" + + def batch_indexer(self, chunk_size=100): + """ + Create a new batch indexer from the client with a given chunk size + """ + return self.BatchIndexer(self, chunk_size=chunk_size) + + def create_index( + self, + fields, + no_term_offsets=False, + no_field_flags=False, + stopwords=None, + definition=None, + ): + """ + Create the search index. The index must not already exist. + + ### Parameters: + + - **fields**: a list of TextField or NumericField objects + - **no_term_offsets**: If true, we will not save term offsets in + the index + - **no_field_flags**: If true, we will not save field flags that + allow searching in specific fields + - **stopwords**: If not None, we create the index with this custom + stopword list. The list can be empty + """ + + args = [CREATE_CMD, self.index_name] + if definition is not None: + args += definition.args + if no_term_offsets: + args.append(NOOFFSETS) + if no_field_flags: + args.append(NOFIELDS) + if stopwords is not None and isinstance(stopwords, (list, tuple, set)): + args += [STOPWORDS, len(stopwords)] + if len(stopwords) > 0: + args += list(stopwords) + + args.append("SCHEMA") + try: + args += list(itertools.chain(*(f.redis_args() for f in fields))) + except TypeError: + args += fields.redis_args() + + return self.execute_command(*args) + + def alter_schema_add(self, fields): + """ + Alter the existing search index by adding new fields. The index + must already exist. + + ### Parameters: + + - **fields**: a list of Field objects to add for the index + """ + + args = [ALTER_CMD, self.index_name, "SCHEMA", "ADD"] + try: + args += list(itertools.chain(*(f.redis_args() for f in fields))) + except TypeError: + args += fields.redis_args() + + return self.execute_command(*args) + + def drop_index(self, delete_documents=True): + """ + Drop the index if it exists. Deprecated from RediSearch 2.0. + + ### Parameters: + + - **delete_documents**: If `True`, all documents will be deleted. + """ + keep_str = "" if delete_documents else "KEEPDOCS" + return self.execute_command(DROP_CMD, self.index_name, keep_str) + + def dropindex(self, delete_documents=False): + """ + Drop the index if it exists. + Replaced `drop_index` in RediSearch 2.0. + Default behavior was changed to not delete the indexed documents. + + ### Parameters: + + - **delete_documents**: If `True`, all documents will be deleted. + """ + keep_str = "" if delete_documents else "KEEPDOCS" + return self.execute_command(DROP_CMD, self.index_name, keep_str) + + def _add_document( + self, + doc_id, + conn=None, + nosave=False, + score=1.0, + payload=None, + replace=False, + partial=False, + language=None, + no_create=False, + **fields + ): + """ + Internal add_document used for both batch and single doc indexing + """ + if conn is None: + conn = self.client + + if partial or no_create: + replace = True + + args = [ADD_CMD, self.index_name, doc_id, score] + if nosave: + args.append("NOSAVE") + if payload is not None: + args.append("PAYLOAD") + args.append(payload) + if replace: + args.append("REPLACE") + if partial: + args.append("PARTIAL") + if no_create: + args.append("NOCREATE") + if language: + args += ["LANGUAGE", language] + args.append("FIELDS") + args += list(itertools.chain(*fields.items())) + return conn.execute_command(*args) + + def _add_document_hash( + self, + doc_id, + conn=None, + score=1.0, + language=None, + replace=False, + ): + """ + Internal add_document_hash used for both batch and single doc indexing + """ + if conn is None: + conn = self.client + + args = [ADDHASH_CMD, self.index_name, doc_id, score] + + if replace: + args.append("REPLACE") + + if language: + args += ["LANGUAGE", language] + + return conn.execute_command(*args) + + def add_document( + self, + doc_id, + nosave=False, + score=1.0, + payload=None, + replace=False, + partial=False, + language=None, + no_create=False, + **fields + ): + """ + Add a single document to the index. + + ### Parameters + + - **doc_id**: the id of the saved document. + - **nosave**: if set to true, we just index the document, and don't + save a copy of it. This means that searches will just + return ids. + - **score**: the document ranking, between 0.0 and 1.0 + - **payload**: optional inner-index payload we can save for fast + i access in scoring functions + - **replace**: if True, and the document already is in the index, + we perform an update and reindex the document + - **partial**: if True, the fields specified will be added to the + existing document. + This has the added benefit that any fields specified + with `no_index` + will not be reindexed again. Implies `replace` + - **language**: Specify the language used for document tokenization. + - **no_create**: if True, the document is only updated and reindexed + if it already exists. + If the document does not exist, an error will be + returned. Implies `replace` + - **fields** kwargs dictionary of the document fields to be saved + and/or indexed. + NOTE: Geo points shoule be encoded as strings of "lon,lat" + """ + return self._add_document( + doc_id, + conn=None, + nosave=nosave, + score=score, + payload=payload, + replace=replace, + partial=partial, + language=language, + no_create=no_create, + **fields + ) + + def add_document_hash( + self, + doc_id, + score=1.0, + language=None, + replace=False, + ): + """ + Add a hash document to the index. + + ### Parameters + + - **doc_id**: the document's id. This has to be an existing HASH key + in Redis that will hold the fields the index needs. + - **score**: the document ranking, between 0.0 and 1.0 + - **replace**: if True, and the document already is in the index, we + perform an update and reindex the document + - **language**: Specify the language used for document tokenization. + """ + return self._add_document_hash( + doc_id, + conn=None, + score=score, + language=language, + replace=replace, + ) + + def delete_document(self, doc_id, conn=None, delete_actual_document=False): + """ + Delete a document from index + Returns 1 if the document was deleted, 0 if not + + ### Parameters + + - **delete_actual_document**: if set to True, RediSearch also delete + the actual document if it is in the index + """ + args = [DEL_CMD, self.index_name, doc_id] + if conn is None: + conn = self.client + if delete_actual_document: + args.append("DD") + + return conn.execute_command(*args) + + def load_document(self, id): + """ + Load a single document by id + """ + fields = self.client.hgetall(id) + if six.PY3: + f2 = {to_string(k): to_string(v) for k, v in fields.items()} + fields = f2 + + try: + del fields["id"] + except KeyError: + pass + + return Document(id=id, **fields) + + def get(self, *ids): + """ + Returns the full contents of multiple documents. + + ### Parameters + + - **ids**: the ids of the saved documents. + """ + + return self.client.execute_command(MGET_CMD, self.index_name, *ids) + + def info(self): + """ + Get info an stats about the the current index, including the number of + documents, memory consumption, etc + """ + + res = self.client.execute_command(INFO_CMD, self.index_name) + it = six.moves.map(to_string, res) + return dict(six.moves.zip(it, it)) + + def _mk_query_args(self, query): + args = [self.index_name] + + if isinstance(query, six.string_types): + # convert the query from a text to a query object + query = Query(query) + if not isinstance(query, Query): + raise ValueError("Bad query type %s" % type(query)) + + args += query.get_args() + return args, query + + def search(self, query): + """ + Search the index for a given query, and return a result of documents + + ### Parameters + + - **query**: the search query. Either a text for simple queries with + default parameters, or a Query object for complex queries. + See RediSearch's documentation on query format + """ + args, query = self._mk_query_args(query) + st = time.time() + res = self.execute_command(SEARCH_CMD, *args) + + return Result( + res, + not query._no_content, + duration=(time.time() - st) * 1000.0, + has_payload=query._with_payloads, + with_scores=query._with_scores, + ) + + def explain(self, query): + args, query_text = self._mk_query_args(query) + return self.execute_command(EXPLAIN_CMD, *args) + + def aggregate(self, query): + """ + Issue an aggregation query + + ### Parameters + + **query**: This can be either an `AggeregateRequest`, or a `Cursor` + + An `AggregateResult` object is returned. You can access the rows from + its `rows` property, which will always yield the rows of the result. + """ + if isinstance(query, AggregateRequest): + has_cursor = bool(query._cursor) + cmd = [AGGREGATE_CMD, self.index_name] + query.build_args() + elif isinstance(query, Cursor): + has_cursor = True + cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args() + else: + raise ValueError("Bad query", query) + + raw = self.execute_command(*cmd) + if has_cursor: + if isinstance(query, Cursor): + query.cid = raw[1] + cursor = query + else: + cursor = Cursor(raw[1]) + raw = raw[0] + else: + cursor = None + + if isinstance(query, AggregateRequest) and query._with_schema: + schema = raw[0] + rows = raw[2:] + else: + schema = None + rows = raw[1:] + + res = AggregateResult(rows, cursor, schema) + return res + + def spellcheck(self, query, distance=None, include=None, exclude=None): + """ + Issue a spellcheck query + + ### Parameters + + **query**: search query. + **distance***: the maximal Levenshtein distance for spelling + suggestions (default: 1, max: 4). + **include**: specifies an inclusion custom dictionary. + **exclude**: specifies an exclusion custom dictionary. + """ + cmd = [SPELLCHECK_CMD, self.index_name, query] + if distance: + cmd.extend(["DISTANCE", distance]) + + if include: + cmd.extend(["TERMS", "INCLUDE", include]) + + if exclude: + cmd.extend(["TERMS", "EXCLUDE", exclude]) + + raw = self.execute_command(*cmd) + + corrections = {} + if raw == 0: + return corrections + + for _correction in raw: + if isinstance(_correction, six.integer_types) and _correction == 0: + continue + + if len(_correction) != 3: + continue + if not _correction[2]: + continue + if not _correction[2][0]: + continue + + # For spellcheck output + # 1) 1) "TERM" + # 2) "{term1}" + # 3) 1) 1) "{score1}" + # 2) "{suggestion1}" + # 2) 1) "{score2}" + # 2) "{suggestion2}" + # + # Following dictionary will be made + # corrections = { + # '{term1}': [ + # {'score': '{score1}', 'suggestion': '{suggestion1}'}, + # {'score': '{score2}', 'suggestion': '{suggestion2}'} + # ] + # } + corrections[_correction[1]] = [ + {"score": _item[0], "suggestion": _item[1]} + for _item in _correction[2] + ] + + return corrections + + def dict_add(self, name, *terms): + """Adds terms to a dictionary. + + ### Parameters + + - **name**: Dictionary name. + - **terms**: List of items for adding to the dictionary. + """ + cmd = [DICT_ADD_CMD, name] + cmd.extend(terms) + return self.execute_command(*cmd) + + def dict_del(self, name, *terms): + """Deletes terms from a dictionary. + + ### Parameters + + - **name**: Dictionary name. + - **terms**: List of items for removing from the dictionary. + """ + cmd = [DICT_DEL_CMD, name] + cmd.extend(terms) + return self.execute_command(*cmd) + + def dict_dump(self, name): + """Dumps all terms in the given dictionary. + + ### Parameters + + - **name**: Dictionary name. + """ + cmd = [DICT_DUMP_CMD, name] + return self.execute_command(*cmd) + + def config_set(self, option, value): + """Set runtime configuration option. + + ### Parameters + + - **option**: the name of the configuration option. + - **value**: a value for the configuration option. + """ + cmd = [CONFIG_CMD, "SET", option, value] + raw = self.execute_command(*cmd) + return raw == "OK" + + def config_get(self, option): + """Get runtime configuration option value. + + ### Parameters + + - **option**: the name of the configuration option. + """ + cmd = [CONFIG_CMD, "GET", option] + res = {} + raw = self.execute_command(*cmd) + if raw: + for kvs in raw: + res[kvs[0]] = kvs[1] + return res + + def tagvals(self, tagfield): + """ + Return a list of all possible tag values + + ### Parameters + + - **tagfield**: Tag field name + """ + + return self.execute_command(TAGVALS_CMD, self.index_name, tagfield) + + def aliasadd(self, alias): + """ + Alias a search index - will fail if alias already exists + + ### Parameters + + - **alias**: Name of the alias to create + """ + + return self.execute_command(ALIAS_ADD_CMD, alias, self.index_name) + + def aliasupdate(self, alias): + """ + Updates an alias - will fail if alias does not already exist + + ### Parameters + + - **alias**: Name of the alias to create + """ + + return self.execute_command(ALIAS_UPDATE_CMD, alias, self.index_name) + + def aliasdel(self, alias): + """ + Removes an alias to a search index + + ### Parameters + + - **alias**: Name of the alias to delete + """ + return self.execute_command(ALIAS_DEL_CMD, alias) + + def sugadd(self, key, *suggestions, **kwargs): + """ + Add suggestion terms to the AutoCompleter engine. Each suggestion has + a score and string. + If kwargs["increment"] is true and the terms are already in the + server's dictionary, we increment their scores. + More information `here <https://oss.redis.com/redisearch/master/Commands/#ftsugadd>`_. # noqa + """ + # If Transaction is not False it will MULTI/EXEC which will error + pipe = self.pipeline(transaction=False) + for sug in suggestions: + args = [SUGADD_COMMAND, key, sug.string, sug.score] + if kwargs.get("increment"): + args.append("INCR") + if sug.payload: + args.append("PAYLOAD") + args.append(sug.payload) + + pipe.execute_command(*args) + + return pipe.execute()[-1] + + def suglen(self, key): + """ + Return the number of entries in the AutoCompleter index. + More information `here <https://oss.redis.com/redisearch/master/Commands/#ftsuglen>`_. # noqa + """ + return self.execute_command(SUGLEN_COMMAND, key) + + def sugdel(self, key, string): + """ + Delete a string from the AutoCompleter index. + Returns 1 if the string was found and deleted, 0 otherwise. + More information `here <https://oss.redis.com/redisearch/master/Commands/#ftsugdel>`_. # noqa + """ + return self.execute_command(SUGDEL_COMMAND, key, string) + + def sugget( + self, key, prefix, fuzzy=False, num=10, with_scores=False, + with_payloads=False + ): + """ + Get a list of suggestions from the AutoCompleter, for a given prefix. + More information `here <https://oss.redis.com/redisearch/master/Commands/#ftsugget>`_. # noqa + + Parameters: + + prefix : str + The prefix we are searching. **Must be valid ascii or utf-8** + fuzzy : bool + If set to true, the prefix search is done in fuzzy mode. + **NOTE**: Running fuzzy searches on short (<3 letters) prefixes + can be very + slow, and even scan the entire index. + with_scores : bool + If set to true, we also return the (refactored) score of + each suggestion. + This is normally not needed, and is NOT the original score + inserted into the index. + with_payloads : bool + Return suggestion payloads + num : int + The maximum number of results we return. Note that we might + return less. The algorithm trims irrelevant suggestions. + + Returns: + + list: + A list of Suggestion objects. If with_scores was False, the + score of all suggestions is 1. + """ + args = [SUGGET_COMMAND, key, prefix, "MAX", num] + if fuzzy: + args.append(FUZZY) + if with_scores: + args.append(WITHSCORES) + if with_payloads: + args.append(WITHPAYLOADS) + + ret = self.execute_command(*args) + results = [] + if not ret: + return results + + parser = SuggestionParser(with_scores, with_payloads, ret) + return [s for s in parser] + + def synupdate(self, groupid, skipinitial=False, *terms): + """ + Updates a synonym group. + The command is used to create or update a synonym group with + additional terms. + Only documents which were indexed after the update will be affected. + + Parameters: + + groupid : + Synonym group id. + skipinitial : bool + If set to true, we do not scan and index. + terms : + The terms. + """ + cmd = [SYNUPDATE_CMD, self.index_name, groupid] + if skipinitial: + cmd.extend(["SKIPINITIALSCAN"]) + cmd.extend(terms) + return self.execute_command(*cmd) + + def syndump(self): + """ + Dumps the contents of a synonym group. + + The command is used to dump the synonyms data structure. + Returns a list of synonym terms and their synonym group ids. + """ + raw = self.execute_command(SYNDUMP_CMD, self.index_name) + return {raw[i]: raw[i + 1] for i in range(0, len(raw), 2)} diff --git a/redis/commands/search/document.py b/redis/commands/search/document.py new file mode 100644 index 0000000..26ede34 --- /dev/null +++ b/redis/commands/search/document.py @@ -0,0 +1,16 @@ +import six + + +class Document(object): + """ + Represents a single document in a result set + """ + + def __init__(self, id, payload=None, **fields): + self.id = id + self.payload = payload + for k, v in six.iteritems(fields): + setattr(self, k, v) + + def __repr__(self): + return "Document %s" % self.__dict__ diff --git a/redis/commands/search/field.py b/redis/commands/search/field.py new file mode 100644 index 0000000..45114a4 --- /dev/null +++ b/redis/commands/search/field.py @@ -0,0 +1,94 @@ +class Field(object): + + NUMERIC = "NUMERIC" + TEXT = "TEXT" + WEIGHT = "WEIGHT" + GEO = "GEO" + TAG = "TAG" + SORTABLE = "SORTABLE" + NOINDEX = "NOINDEX" + AS = "AS" + + def __init__(self, name, args=[], sortable=False, + no_index=False, as_name=None): + self.name = name + self.args = args + self.args_suffix = list() + self.as_name = as_name + + if sortable: + self.args_suffix.append(Field.SORTABLE) + if no_index: + self.args_suffix.append(Field.NOINDEX) + + if no_index and not sortable: + raise ValueError("Non-Sortable non-Indexable fields are ignored") + + def append_arg(self, value): + self.args.append(value) + + def redis_args(self): + args = [self.name] + if self.as_name: + args += [self.AS, self.as_name] + args += self.args + args += self.args_suffix + return args + + +class TextField(Field): + """ + TextField is used to define a text field in a schema definition + """ + + NOSTEM = "NOSTEM" + PHONETIC = "PHONETIC" + + def __init__( + self, name, weight=1.0, no_stem=False, phonetic_matcher=None, **kwargs + ): + Field.__init__(self, name, + args=[Field.TEXT, Field.WEIGHT, weight], **kwargs) + + if no_stem: + Field.append_arg(self, self.NOSTEM) + if phonetic_matcher and phonetic_matcher in [ + "dm:en", + "dm:fr", + "dm:pt", + "dm:es", + ]: + Field.append_arg(self, self.PHONETIC) + Field.append_arg(self, phonetic_matcher) + + +class NumericField(Field): + """ + NumericField is used to define a numeric field in a schema definition + """ + + def __init__(self, name, **kwargs): + Field.__init__(self, name, args=[Field.NUMERIC], **kwargs) + + +class GeoField(Field): + """ + GeoField is used to define a geo-indexing field in a schema definition + """ + + def __init__(self, name, **kwargs): + Field.__init__(self, name, args=[Field.GEO], **kwargs) + + +class TagField(Field): + """ + TagField is a tag-indexing field with simpler compression and tokenization. + See http://redisearch.io/Tags/ + """ + + SEPARATOR = "SEPARATOR" + + def __init__(self, name, separator=",", **kwargs): + Field.__init__( + self, name, args=[Field.TAG, self.SEPARATOR, separator], **kwargs + ) diff --git a/redis/commands/search/indexDefinition.py b/redis/commands/search/indexDefinition.py new file mode 100644 index 0000000..4fbc609 --- /dev/null +++ b/redis/commands/search/indexDefinition.py @@ -0,0 +1,80 @@ +from enum import Enum + + +class IndexType(Enum): + """Enum of the currently supported index types.""" + + HASH = 1 + JSON = 2 + + +class IndexDefinition(object): + """IndexDefinition is used to define a index definition for automatic + indexing on Hash or Json update.""" + + def __init__( + self, + prefix=[], + filter=None, + language_field=None, + language=None, + score_field=None, + score=1.0, + payload_field=None, + index_type=None, + ): + self.args = [] + self._appendIndexType(index_type) + self._appendPrefix(prefix) + self._appendFilter(filter) + self._appendLanguage(language_field, language) + self._appendScore(score_field, score) + self._appendPayload(payload_field) + + def _appendIndexType(self, index_type): + """Append `ON HASH` or `ON JSON` according to the enum.""" + if index_type is IndexType.HASH: + self.args.extend(["ON", "HASH"]) + elif index_type is IndexType.JSON: + self.args.extend(["ON", "JSON"]) + elif index_type is not None: + raise RuntimeError("index_type must be one of {}". + format(list(IndexType))) + + def _appendPrefix(self, prefix): + """Append PREFIX.""" + if len(prefix) > 0: + self.args.append("PREFIX") + self.args.append(len(prefix)) + for p in prefix: + self.args.append(p) + + def _appendFilter(self, filter): + """Append FILTER.""" + if filter is not None: + self.args.append("FILTER") + self.args.append(filter) + + def _appendLanguage(self, language_field, language): + """Append LANGUAGE_FIELD and LANGUAGE.""" + if language_field is not None: + self.args.append("LANGUAGE_FIELD") + self.args.append(language_field) + if language is not None: + self.args.append("LANGUAGE") + self.args.append(language) + + def _appendScore(self, score_field, score): + """Append SCORE_FIELD and SCORE.""" + if score_field is not None: + self.args.append("SCORE_FIELD") + self.args.append(score_field) + if score is not None: + self.args.append("SCORE") + self.args.append(score) + + def _appendPayload(self, payload_field): + """Append PAYLOAD_FIELD.""" + if payload_field is not None: + self.args.append("PAYLOAD_FIELD") + self.args.append(payload_field) diff --git a/redis/commands/search/query.py b/redis/commands/search/query.py new file mode 100644 index 0000000..e2db7a4 --- /dev/null +++ b/redis/commands/search/query.py @@ -0,0 +1,328 @@ +import six + + +class Query(object): + """ + Query is used to build complex queries that have more parameters than just + the query string. The query string is set in the constructor, and other + options have setter functions. + + The setter functions return the query object, so they can be chained, + i.e. `Query("foo").verbatim().filter(...)` etc. + """ + + def __init__(self, query_string): + """ + Create a new query object. + The query string is set in the constructor, and other options have + setter functions. + """ + + self._query_string = query_string + self._offset = 0 + self._num = 10 + self._no_content = False + self._no_stopwords = False + self._fields = None + self._verbatim = False + self._with_payloads = False + self._with_scores = False + self._scorer = False + self._filters = list() + self._ids = None + self._slop = -1 + self._in_order = False + self._sortby = None + self._return_fields = [] + self._summarize_fields = [] + self._highlight_fields = [] + self._language = None + self._expander = None + + def query_string(self): + """Return the query string of this query only.""" + return self._query_string + + def limit_ids(self, *ids): + """Limit the results to a specific set of pre-known document + ids of any length.""" + self._ids = ids + return self + + def return_fields(self, *fields): + """Add fields to return fields.""" + self._return_fields += fields + return self + + def return_field(self, field, as_field=None): + """Add field to return fields (Optional: add 'AS' name + to the field).""" + self._return_fields.append(field) + if as_field is not None: + self._return_fields += ("AS", as_field) + return self + + def _mk_field_list(self, fields): + if not fields: + return [] + return \ + [fields] if isinstance(fields, six.string_types) else list(fields) + + def summarize(self, fields=None, context_len=None, + num_frags=None, sep=None): + """ + Return an abridged format of the field, containing only the segments of + the field which contain the matching term(s). + + If `fields` is specified, then only the mentioned fields are + summarized; otherwise all results are summarized. + + Server side defaults are used for each option (except `fields`) + if not specified + + - **fields** List of fields to summarize. All fields are summarized + if not specified + - **context_len** Amount of context to include with each fragment + - **num_frags** Number of fragments per document + - **sep** Separator string to separate fragments + """ + args = ["SUMMARIZE"] + fields = self._mk_field_list(fields) + if fields: + args += ["FIELDS", str(len(fields))] + fields + + if context_len is not None: + args += ["LEN", str(context_len)] + if num_frags is not None: + args += ["FRAGS", str(num_frags)] + if sep is not None: + args += ["SEPARATOR", sep] + + self._summarize_fields = args + return self + + def highlight(self, fields=None, tags=None): + """ + Apply specified markup to matched term(s) within the returned field(s). + + - **fields** If specified then only those mentioned fields are + highlighted, otherwise all fields are highlighted + - **tags** A list of two strings to surround the match. + """ + args = ["HIGHLIGHT"] + fields = self._mk_field_list(fields) + if fields: + args += ["FIELDS", str(len(fields))] + fields + if tags: + args += ["TAGS"] + list(tags) + + self._highlight_fields = args + return self + + def language(self, language): + """ + Analyze the query as being in the specified language. + + :param language: The language (e.g. `chinese` or `english`) + """ + self._language = language + return self + + def slop(self, slop): + """Allow a maximum of N intervening non matched terms between + phrase terms (0 means exact phrase). + """ + self._slop = slop + return self + + def in_order(self): + """ + Match only documents where the query terms appear in + the same order in the document. + i.e. for the query "hello world", we do not match "world hello" + """ + self._in_order = True + return self + + def scorer(self, scorer): + """ + Use a different scoring function to evaluate document relevance. + Default is `TFIDF`. + + :param scorer: The scoring function to use + (e.g. `TFIDF.DOCNORM` or `BM25`) + """ + self._scorer = scorer + return self + + def get_args(self): + """Format the redis arguments for this query and return them.""" + args = [self._query_string] + args += self._get_args_tags() + args += self._summarize_fields + self._highlight_fields + args += ["LIMIT", self._offset, self._num] + return args + + def _get_args_tags(self): + args = [] + if self._no_content: + args.append("NOCONTENT") + if self._fields: + args.append("INFIELDS") + args.append(len(self._fields)) + args += self._fields + if self._verbatim: + args.append("VERBATIM") + if self._no_stopwords: + args.append("NOSTOPWORDS") + if self._filters: + for flt in self._filters: + if not isinstance(flt, Filter): + raise AttributeError("Did not receive a Filter object.") + args += flt.args + if self._with_payloads: + args.append("WITHPAYLOADS") + if self._scorer: + args += ["SCORER", self._scorer] + if self._with_scores: + args.append("WITHSCORES") + if self._ids: + args.append("INKEYS") + args.append(len(self._ids)) + args += self._ids + if self._slop >= 0: + args += ["SLOP", self._slop] + if self._in_order: + args.append("INORDER") + if self._return_fields: + args.append("RETURN") + args.append(len(self._return_fields)) + args += self._return_fields + if self._sortby: + if not isinstance(self._sortby, SortbyField): + raise AttributeError("Did not receive a SortByField.") + args.append("SORTBY") + args += self._sortby.args + if self._language: + args += ["LANGUAGE", self._language] + if self._expander: + args += ["EXPANDER", self._expander] + + return args + + def paging(self, offset, num): + """ + Set the paging for the query (defaults to 0..10). + + - **offset**: Paging offset for the results. Defaults to 0 + - **num**: How many results do we want + """ + self._offset = offset + self._num = num + return self + + def verbatim(self): + """Set the query to be verbatim, i.e. use no query expansion + or stemming. + """ + self._verbatim = True + return self + + def no_content(self): + """Set the query to only return ids and not the document content.""" + self._no_content = True + return self + + def no_stopwords(self): + """ + Prevent the query from being filtered for stopwords. + Only useful in very big queries that you are certain contain + no stopwords. + """ + self._no_stopwords = True + return self + + def with_payloads(self): + """Ask the engine to return document payloads.""" + self._with_payloads = True + return self + + def with_scores(self): + """Ask the engine to return document search scores.""" + self._with_scores = True + return self + + def limit_fields(self, *fields): + """ + Limit the search to specific TEXT fields only. + + - **fields**: A list of strings, case sensitive field names + from the defined schema. + """ + self._fields = fields + return self + + def add_filter(self, flt): + """ + Add a numeric or geo filter to the query. + **Currently only one of each filter is supported by the engine** + + - **flt**: A NumericFilter or GeoFilter object, used on a + corresponding field + """ + + self._filters.append(flt) + return self + + def sort_by(self, field, asc=True): + """ + Add a sortby field to the query. + + - **field** - the name of the field to sort by + - **asc** - when `True`, sorting will be done in asceding order + """ + self._sortby = SortbyField(field, asc) + return self + + def expander(self, expander): + """ + Add a expander field to the query. + + - **expander** - the name of the expander + """ + self._expander = expander + return self + + +class Filter(object): + def __init__(self, keyword, field, *args): + self.args = [keyword, field] + list(args) + + +class NumericFilter(Filter): + INF = "+inf" + NEG_INF = "-inf" + + def __init__(self, field, minval, maxval, minExclusive=False, + maxExclusive=False): + args = [ + minval if not minExclusive else "({}".format(minval), + maxval if not maxExclusive else "({}".format(maxval), + ] + + Filter.__init__(self, "FILTER", field, *args) + + +class GeoFilter(Filter): + METERS = "m" + KILOMETERS = "km" + FEET = "ft" + MILES = "mi" + + def __init__(self, field, lon, lat, radius, unit=KILOMETERS): + Filter.__init__(self, "GEOFILTER", field, lon, lat, radius, unit) + + +class SortbyField(object): + def __init__(self, field, asc=True): + self.args = [field, "ASC" if asc else "DESC"] diff --git a/redis/commands/search/querystring.py b/redis/commands/search/querystring.py new file mode 100644 index 0000000..f5f59b7 --- /dev/null +++ b/redis/commands/search/querystring.py @@ -0,0 +1,324 @@ +from six import string_types, integer_types + + +def tags(*t): + """ + Indicate that the values should be matched to a tag field + + ### Parameters + + - **t**: Tags to search for + """ + if not t: + raise ValueError("At least one tag must be specified") + return TagValue(*t) + + +def between(a, b, inclusive_min=True, inclusive_max=True): + """ + Indicate that value is a numeric range + """ + return RangeValue(a, b, inclusive_min=inclusive_min, + inclusive_max=inclusive_max) + + +def equal(n): + """ + Match a numeric value + """ + return between(n, n) + + +def lt(n): + """ + Match any value less than n + """ + return between(None, n, inclusive_max=False) + + +def le(n): + """ + Match any value less or equal to n + """ + return between(None, n, inclusive_max=True) + + +def gt(n): + """ + Match any value greater than n + """ + return between(n, None, inclusive_min=False) + + +def ge(n): + """ + Match any value greater or equal to n + """ + return between(n, None, inclusive_min=True) + + +def geo(lat, lon, radius, unit="km"): + """ + Indicate that value is a geo region + """ + return GeoValue(lat, lon, radius, unit) + + +class Value(object): + @property + def combinable(self): + """ + Whether this type of value may be combined with other values + for the same field. This makes the filter potentially more efficient + """ + return False + + @staticmethod + def make_value(v): + """ + Convert an object to a value, if it is not a value already + """ + if isinstance(v, Value): + return v + return ScalarValue(v) + + def to_string(self): + raise NotImplementedError() + + def __str__(self): + return self.to_string() + + +class RangeValue(Value): + combinable = False + + def __init__(self, a, b, inclusive_min=False, inclusive_max=False): + if a is None: + a = "-inf" + if b is None: + b = "inf" + self.range = [str(a), str(b)] + self.inclusive_min = inclusive_min + self.inclusive_max = inclusive_max + + def to_string(self): + return "[{1}{0[0]} {2}{0[1]}]".format( + self.range, + "(" if not self.inclusive_min else "", + "(" if not self.inclusive_max else "", + ) + + +class ScalarValue(Value): + combinable = True + + def __init__(self, v): + self.v = str(v) + + def to_string(self): + return self.v + + +class TagValue(Value): + combinable = False + + def __init__(self, *tags): + self.tags = tags + + def to_string(self): + return "{" + " | ".join(str(t) for t in self.tags) + "}" + + +class GeoValue(Value): + def __init__(self, lon, lat, radius, unit="km"): + self.lon = lon + self.lat = lat + self.radius = radius + self.unit = unit + + +class Node(object): + def __init__(self, *children, **kwparams): + """ + Create a node + + ### Parameters + + - **children**: One or more sub-conditions. These can be additional + `intersect`, `disjunct`, `union`, `optional`, or any other `Node` + type. + + The semantics of multiple conditions are dependent on the type of + query. For an `intersection` node, this amounts to a logical AND, + for a `union` node, this amounts to a logical `OR`. + + - **kwparams**: key-value parameters. Each key is the name of a field, + and the value should be a field value. This can be one of the + following: + + - Simple string (for text field matches) + - value returned by one of the helper functions + - list of either a string or a value + + + ### Examples + + Field `num` should be between 1 and 10 + ``` + intersect(num=between(1, 10) + ``` + + Name can either be `bob` or `john` + + ``` + union(name=("bob", "john")) + ``` + + Don't select countries in Israel, Japan, or US + + ``` + disjunct_union(country=("il", "jp", "us")) + ``` + """ + + self.params = [] + + kvparams = {} + for k, v in kwparams.items(): + curvals = kvparams.setdefault(k, []) + if isinstance(v, (string_types, integer_types, float)): + curvals.append(Value.make_value(v)) + elif isinstance(v, Value): + curvals.append(v) + else: + curvals.extend(Value.make_value(subv) for subv in v) + + self.params += [Node.to_node(p) for p in children] + + for k, v in kvparams.items(): + self.params.extend(self.join_fields(k, v)) + + def join_fields(self, key, vals): + if len(vals) == 1: + return [BaseNode("@{}:{}".format(key, vals[0].to_string()))] + if not vals[0].combinable: + return [BaseNode("@{}:{}".format(key, + v.to_string())) for v in vals] + s = BaseNode( + "@{}:({})".format(key, + self.JOINSTR.join(v.to_string() for v in vals)) + ) + return [s] + + @classmethod + def to_node(cls, obj): # noqa + if isinstance(obj, Node): + return obj + return BaseNode(obj) + + @property + def JOINSTR(self): + raise NotImplementedError() + + def to_string(self, with_parens=None): + with_parens = self._should_use_paren(with_parens) + pre, post = ("(", ")") if with_parens else ("", "") + return "{}{}{}".format( + pre, self.JOINSTR.join(n.to_string() for n in self.params), post + ) + + def _should_use_paren(self, optval): + if optval is not None: + return optval + return len(self.params) > 1 + + def __str__(self): + return self.to_string() + + +class BaseNode(Node): + def __init__(self, s): + super(BaseNode, self).__init__() + self.s = str(s) + + def to_string(self, with_parens=None): + return self.s + + +class IntersectNode(Node): + """ + Create an intersection node. All children need to be satisfied in order for + this node to evaluate as true + """ + + JOINSTR = " " + + +class UnionNode(Node): + """ + Create a union node. Any of the children need to be satisfied in order for + this node to evaluate as true + """ + + JOINSTR = "|" + + +class DisjunctNode(IntersectNode): + """ + Create a disjunct node. In order for this node to be true, all of its + children must evaluate to false + """ + + def to_string(self, with_parens=None): + with_parens = self._should_use_paren(with_parens) + ret = super(DisjunctNode, self).to_string(with_parens=False) + if with_parens: + return "(-" + ret + ")" + else: + return "-" + ret + + +class DistjunctUnion(DisjunctNode): + """ + This node is true if *all* of its children are false. This is equivalent to + ``` + disjunct(union(...)) + ``` + """ + + JOINSTR = "|" + + +class OptionalNode(IntersectNode): + """ + Create an optional node. If this nodes evaluates to true, then the document + will be rated higher in score/rank. + """ + + def to_string(self, with_parens=None): + with_parens = self._should_use_paren(with_parens) + ret = super(OptionalNode, self).to_string(with_parens=False) + if with_parens: + return "(~" + ret + ")" + else: + return "~" + ret + + +def intersect(*args, **kwargs): + return IntersectNode(*args, **kwargs) + + +def union(*args, **kwargs): + return UnionNode(*args, **kwargs) + + +def disjunct(*args, **kwargs): + return DisjunctNode(*args, **kwargs) + + +def disjunct_union(*args, **kwargs): + return DistjunctUnion(*args, **kwargs) + + +def querystring(*args, **kwargs): + return intersect(*args, **kwargs).to_string() diff --git a/redis/commands/search/reducers.py b/redis/commands/search/reducers.py new file mode 100644 index 0000000..6cbbf2f --- /dev/null +++ b/redis/commands/search/reducers.py @@ -0,0 +1,178 @@ +from .aggregation import Reducer, SortDirection + + +class FieldOnlyReducer(Reducer): + def __init__(self, field): + super(FieldOnlyReducer, self).__init__(field) + self._field = field + + +class count(Reducer): + """ + Counts the number of results in the group + """ + + NAME = "COUNT" + + def __init__(self): + super(count, self).__init__() + + +class sum(FieldOnlyReducer): + """ + Calculates the sum of all the values in the given fields within the group + """ + + NAME = "SUM" + + def __init__(self, field): + super(sum, self).__init__(field) + + +class min(FieldOnlyReducer): + """ + Calculates the smallest value in the given field within the group + """ + + NAME = "MIN" + + def __init__(self, field): + super(min, self).__init__(field) + + +class max(FieldOnlyReducer): + """ + Calculates the largest value in the given field within the group + """ + + NAME = "MAX" + + def __init__(self, field): + super(max, self).__init__(field) + + +class avg(FieldOnlyReducer): + """ + Calculates the mean value in the given field within the group + """ + + NAME = "AVG" + + def __init__(self, field): + super(avg, self).__init__(field) + + +class tolist(FieldOnlyReducer): + """ + Returns all the matched properties in a list + """ + + NAME = "TOLIST" + + def __init__(self, field): + super(tolist, self).__init__(field) + + +class count_distinct(FieldOnlyReducer): + """ + Calculate the number of distinct values contained in all the results in + the group for the given field + """ + + NAME = "COUNT_DISTINCT" + + def __init__(self, field): + super(count_distinct, self).__init__(field) + + +class count_distinctish(FieldOnlyReducer): + """ + Calculate the number of distinct values contained in all the results in the + group for the given field. This uses a faster algorithm than + `count_distinct` but is less accurate + """ + + NAME = "COUNT_DISTINCTISH" + + +class quantile(Reducer): + """ + Return the value for the nth percentile within the range of values for the + field within the group. + """ + + NAME = "QUANTILE" + + def __init__(self, field, pct): + super(quantile, self).__init__(field, str(pct)) + self._field = field + + +class stddev(FieldOnlyReducer): + """ + Return the standard deviation for the values within the group + """ + + NAME = "STDDEV" + + def __init__(self, field): + super(stddev, self).__init__(field) + + +class first_value(Reducer): + """ + Selects the first value within the group according to sorting parameters + """ + + NAME = "FIRST_VALUE" + + def __init__(self, field, *byfields): + """ + Selects the first value of the given field within the group. + + ### Parameter + + - **field**: Source field used for the value + - **byfields**: How to sort the results. This can be either the + *class* of `aggregation.Asc` or `aggregation.Desc` in which + case the field `field` is also used as the sort input. + + `byfields` can also be one or more *instances* of `Asc` or `Desc` + indicating the sort order for these fields + """ + + fieldstrs = [] + if ( + len(byfields) == 1 + and isinstance(byfields[0], type) + and issubclass(byfields[0], SortDirection) + ): + byfields = [byfields[0](field)] + + for f in byfields: + fieldstrs += [f.field, f.DIRSTRING] + + args = [field] + if fieldstrs: + args += ["BY"] + fieldstrs + super(first_value, self).__init__(*args) + self._field = field + + +class random_sample(Reducer): + """ + Returns a random sample of items from the dataset, from the given property + """ + + NAME = "RANDOM_SAMPLE" + + def __init__(self, field, size): + """ + ### Parameter + + **field**: Field to sample from + **size**: Return this many items (can be less) + """ + args = [field, str(size)] + super(random_sample, self).__init__(*args) + self._field = field diff --git a/redis/commands/search/result.py b/redis/commands/search/result.py new file mode 100644 index 0000000..afc83f8 --- /dev/null +++ b/redis/commands/search/result.py @@ -0,0 +1,75 @@ +from six.moves import xrange, zip as izip + +from .document import Document +from ._util import to_string + + +class Result(object): + """ + Represents the result of a search query, and has an array of Document + objects + """ + + def __init__( + self, res, hascontent, duration=0, has_payload=False, with_scores=False + ): + """ + - **snippets**: An optional dictionary of the form + {field: snippet_size} for snippet formatting + """ + + self.total = res[0] + self.duration = duration + self.docs = [] + + step = 1 + if hascontent: + step = step + 1 + if has_payload: + step = step + 1 + if with_scores: + step = step + 1 + + offset = 2 if with_scores else 1 + + for i in xrange(1, len(res), step): + id = to_string(res[i]) + payload = to_string(res[i + offset]) if has_payload else None + # fields_offset = 2 if has_payload else 1 + fields_offset = offset + 1 if has_payload else offset + score = float(res[i + 1]) if with_scores else None + + fields = {} + if hascontent: + fields = ( + dict( + dict( + izip( + map(to_string, res[i + fields_offset][::2]), + map(to_string, res[i + fields_offset][1::2]), + ) + ) + ) + if hascontent + else {} + ) + try: + del fields["id"] + except KeyError: + pass + + try: + fields["json"] = fields["$"] + del fields["$"] + except KeyError: + pass + + doc = ( + Document(id, score=score, payload=payload, **fields) + if with_scores + else Document(id, payload=payload, **fields) + ) + self.docs.append(doc) + + def __repr__(self): + return "Result{%d total, docs: %s}" % (self.total, self.docs) diff --git a/redis/commands/search/suggestion.py b/redis/commands/search/suggestion.py new file mode 100644 index 0000000..550c514 --- /dev/null +++ b/redis/commands/search/suggestion.py @@ -0,0 +1,54 @@ +from six.moves import xrange +from ._util import to_string + + +class Suggestion(object): + """ + Represents a single suggestion being sent or returned from the + autocomplete server + """ + + def __init__(self, string, score=1.0, payload=None): + self.string = to_string(string) + self.payload = to_string(payload) + self.score = score + + def __repr__(self): + return self.string + + +class SuggestionParser(object): + """ + Internal class used to parse results from the `SUGGET` command. + This needs to consume either 1, 2, or 3 values at a time from + the return value depending on what objects were requested + """ + + def __init__(self, with_scores, with_payloads, ret): + self.with_scores = with_scores + self.with_payloads = with_payloads + + if with_scores and with_payloads: + self.sugsize = 3 + self._scoreidx = 1 + self._payloadidx = 2 + elif with_scores: + self.sugsize = 2 + self._scoreidx = 1 + elif with_payloads: + self.sugsize = 2 + self._payloadidx = 1 + else: + self.sugsize = 1 + self._scoreidx = -1 + + self._sugs = ret + + def __iter__(self): + for i in xrange(0, len(self._sugs), self.sugsize): + ss = self._sugs[i] + score = float(self._sugs[i + self._scoreidx]) \ + if self.with_scores else 1.0 + payload = self._sugs[i + self._payloadidx] \ + if self.with_payloads else None + yield Suggestion(ss, score, payload) |