summaryrefslogtreecommitdiff
path: root/redis/commands
diff options
context:
space:
mode:
authordvora-h <67596500+dvora-h@users.noreply.github.com>2022-03-08 09:25:40 +0200
committerGitHub <noreply@github.com>2022-03-08 09:25:40 +0200
commit5bf9034266e89b9573a4efe2d4bbe56a961409eb (patch)
treef5a1d826f87e6c72ee1ade04563d57f0e0f512c8 /redis/commands
parent1f2259fa3078d38048060a429837fb13f397686e (diff)
downloadredis-py-5bf9034266e89b9573a4efe2d4bbe56a961409eb.tar.gz
Add pipeline support for search (#2038)
Diffstat (limited to 'redis/commands')
-rw-r--r--redis/commands/search/__init__.py30
-rw-r--r--redis/commands/search/commands.py31
2 files changed, 45 insertions, 16 deletions
diff --git a/redis/commands/search/__init__.py b/redis/commands/search/__init__.py
index 94bc037..e9763b6 100644
--- a/redis/commands/search/__init__.py
+++ b/redis/commands/search/__init__.py
@@ -1,3 +1,5 @@
+import redis
+
from .commands import SearchCommands
@@ -17,7 +19,7 @@ class Search(SearchCommands):
self.client = client
self.execute_command = client.execute_command
- self.pipeline = client.pipeline(transaction=False, shard_hint=None)
+ self._pipeline = client.pipeline(transaction=False, shard_hint=None)
self.total = 0
self.chunk_size = chunk_size
self.current_chunk = 0
@@ -42,7 +44,7 @@ class Search(SearchCommands):
"""
self.client._add_document(
doc_id,
- conn=self.pipeline,
+ conn=self._pipeline,
nosave=nosave,
score=score,
payload=payload,
@@ -67,7 +69,7 @@ class Search(SearchCommands):
"""
self.client._add_document_hash(
doc_id,
- conn=self.pipeline,
+ conn=self._pipeline,
score=score,
replace=replace,
)
@@ -80,7 +82,7 @@ class Search(SearchCommands):
"""
Manually commit and flush the batch indexing query
"""
- self.pipeline.execute()
+ self._pipeline.execute()
self.current_chunk = 0
def __init__(self, client, index_name="idx"):
@@ -90,7 +92,25 @@ class Search(SearchCommands):
If conn is not None, we employ an already existing redis connection
"""
+ self.MODULE_CALLBACKS = {}
self.client = client
self.index_name = index_name
self.execute_command = client.execute_command
- self.pipeline = client.pipeline
+ self._pipeline = client.pipeline
+
+ def pipeline(self, transaction=True, shard_hint=None):
+ """Creates a pipeline for the SEARCH module, that can be used for executing
+ SEARCH commands, as well as classic core commands.
+ """
+ p = Pipeline(
+ connection_pool=self.client.connection_pool,
+ response_callbacks=self.MODULE_CALLBACKS,
+ transaction=transaction,
+ shard_hint=shard_hint,
+ )
+ p.index_name = self.index_name
+ return p
+
+
+class Pipeline(SearchCommands, redis.client.Pipeline):
+ """Pipeline for the module."""
diff --git a/redis/commands/search/commands.py b/redis/commands/search/commands.py
index 39c599f..158beec 100644
--- a/redis/commands/search/commands.py
+++ b/redis/commands/search/commands.py
@@ -2,6 +2,8 @@ import itertools
import time
from typing import Dict, Union
+from redis.client import Pipeline
+
from ..helpers import parse_to_dict
from ._util import to_string
from .aggregation import AggregateRequest, AggregateResult, Cursor
@@ -186,8 +188,6 @@ class SearchCommands:
"""
Internal add_document used for both batch and single doc indexing
"""
- if conn is None:
- conn = self.client
if partial or no_create:
replace = True
@@ -208,7 +208,11 @@ class SearchCommands:
args += ["LANGUAGE", language]
args.append("FIELDS")
args += list(itertools.chain(*fields.items()))
- return conn.execute_command(*args)
+
+ if conn is not None:
+ return conn.execute_command(*args)
+
+ return self.execute_command(*args)
def _add_document_hash(
self,
@@ -221,8 +225,6 @@ class SearchCommands:
"""
Internal add_document_hash used for both batch and single doc indexing
"""
- if conn is None:
- conn = self.client
args = [ADDHASH_CMD, self.index_name, doc_id, score]
@@ -232,7 +234,10 @@ class SearchCommands:
if language:
args += ["LANGUAGE", language]
- return conn.execute_command(*args)
+ if conn is not None:
+ return conn.execute_command(*args)
+
+ return self.execute_command(*args)
def add_document(
self,
@@ -331,12 +336,13 @@ class SearchCommands:
For more information: https://oss.redis.com/redisearch/Commands/#ftdel
""" # noqa
args = [DEL_CMD, self.index_name, doc_id]
- if conn is None:
- conn = self.client
if delete_actual_document:
args.append("DD")
- return conn.execute_command(*args)
+ if conn is not None:
+ return conn.execute_command(*args)
+
+ return self.execute_command(*args)
def load_document(self, id):
"""
@@ -364,7 +370,7 @@ class SearchCommands:
For more information https://oss.redis.com/redisearch/Commands/#ftget
"""
- return self.client.execute_command(MGET_CMD, self.index_name, *ids)
+ return self.execute_command(MGET_CMD, self.index_name, *ids)
def info(self):
"""
@@ -374,7 +380,7 @@ class SearchCommands:
For more information https://oss.redis.com/redisearch/Commands/#ftinfo
"""
- res = self.client.execute_command(INFO_CMD, self.index_name)
+ res = self.execute_command(INFO_CMD, self.index_name)
it = map(to_string, res)
return dict(zip(it, it))
@@ -423,6 +429,9 @@ class SearchCommands:
st = time.time()
res = self.execute_command(SEARCH_CMD, *args)
+ if isinstance(res, Pipeline):
+ return res
+
return Result(
res,
not query._no_content,