diff options
author | dvora-h <67596500+dvora-h@users.noreply.github.com> | 2022-03-08 09:25:40 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-08 09:25:40 +0200 |
commit | 5bf9034266e89b9573a4efe2d4bbe56a961409eb (patch) | |
tree | f5a1d826f87e6c72ee1ade04563d57f0e0f512c8 /redis/commands | |
parent | 1f2259fa3078d38048060a429837fb13f397686e (diff) | |
download | redis-py-5bf9034266e89b9573a4efe2d4bbe56a961409eb.tar.gz |
Add pipeline support for search (#2038)
Diffstat (limited to 'redis/commands')
-rw-r--r-- | redis/commands/search/__init__.py | 30 | ||||
-rw-r--r-- | redis/commands/search/commands.py | 31 |
2 files changed, 45 insertions, 16 deletions
diff --git a/redis/commands/search/__init__.py b/redis/commands/search/__init__.py index 94bc037..e9763b6 100644 --- a/redis/commands/search/__init__.py +++ b/redis/commands/search/__init__.py @@ -1,3 +1,5 @@ +import redis + from .commands import SearchCommands @@ -17,7 +19,7 @@ class Search(SearchCommands): self.client = client self.execute_command = client.execute_command - self.pipeline = client.pipeline(transaction=False, shard_hint=None) + self._pipeline = client.pipeline(transaction=False, shard_hint=None) self.total = 0 self.chunk_size = chunk_size self.current_chunk = 0 @@ -42,7 +44,7 @@ class Search(SearchCommands): """ self.client._add_document( doc_id, - conn=self.pipeline, + conn=self._pipeline, nosave=nosave, score=score, payload=payload, @@ -67,7 +69,7 @@ class Search(SearchCommands): """ self.client._add_document_hash( doc_id, - conn=self.pipeline, + conn=self._pipeline, score=score, replace=replace, ) @@ -80,7 +82,7 @@ class Search(SearchCommands): """ Manually commit and flush the batch indexing query """ - self.pipeline.execute() + self._pipeline.execute() self.current_chunk = 0 def __init__(self, client, index_name="idx"): @@ -90,7 +92,25 @@ class Search(SearchCommands): If conn is not None, we employ an already existing redis connection """ + self.MODULE_CALLBACKS = {} self.client = client self.index_name = index_name self.execute_command = client.execute_command - self.pipeline = client.pipeline + self._pipeline = client.pipeline + + def pipeline(self, transaction=True, shard_hint=None): + """Creates a pipeline for the SEARCH module, that can be used for executing + SEARCH commands, as well as classic core commands. + """ + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) + p.index_name = self.index_name + return p + + +class Pipeline(SearchCommands, redis.client.Pipeline): + """Pipeline for the module.""" diff --git a/redis/commands/search/commands.py b/redis/commands/search/commands.py index 39c599f..158beec 100644 --- a/redis/commands/search/commands.py +++ b/redis/commands/search/commands.py @@ -2,6 +2,8 @@ import itertools import time from typing import Dict, Union +from redis.client import Pipeline + from ..helpers import parse_to_dict from ._util import to_string from .aggregation import AggregateRequest, AggregateResult, Cursor @@ -186,8 +188,6 @@ class SearchCommands: """ Internal add_document used for both batch and single doc indexing """ - if conn is None: - conn = self.client if partial or no_create: replace = True @@ -208,7 +208,11 @@ class SearchCommands: args += ["LANGUAGE", language] args.append("FIELDS") args += list(itertools.chain(*fields.items())) - return conn.execute_command(*args) + + if conn is not None: + return conn.execute_command(*args) + + return self.execute_command(*args) def _add_document_hash( self, @@ -221,8 +225,6 @@ class SearchCommands: """ Internal add_document_hash used for both batch and single doc indexing """ - if conn is None: - conn = self.client args = [ADDHASH_CMD, self.index_name, doc_id, score] @@ -232,7 +234,10 @@ class SearchCommands: if language: args += ["LANGUAGE", language] - return conn.execute_command(*args) + if conn is not None: + return conn.execute_command(*args) + + return self.execute_command(*args) def add_document( self, @@ -331,12 +336,13 @@ class SearchCommands: For more information: https://oss.redis.com/redisearch/Commands/#ftdel """ # noqa args = [DEL_CMD, self.index_name, doc_id] - if conn is None: - conn = self.client if delete_actual_document: args.append("DD") - return conn.execute_command(*args) + if conn is not None: + return conn.execute_command(*args) + + return self.execute_command(*args) def load_document(self, id): """ @@ -364,7 +370,7 @@ class SearchCommands: For more information https://oss.redis.com/redisearch/Commands/#ftget """ - return self.client.execute_command(MGET_CMD, self.index_name, *ids) + return self.execute_command(MGET_CMD, self.index_name, *ids) def info(self): """ @@ -374,7 +380,7 @@ class SearchCommands: For more information https://oss.redis.com/redisearch/Commands/#ftinfo """ - res = self.client.execute_command(INFO_CMD, self.index_name) + res = self.execute_command(INFO_CMD, self.index_name) it = map(to_string, res) return dict(zip(it, it)) @@ -423,6 +429,9 @@ class SearchCommands: st = time.time() res = self.execute_command(SEARCH_CMD, *args) + if isinstance(res, Pipeline): + return res + return Result( res, not query._no_content, |