summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJake Barnwell <2320567+jakebarnwell@users.noreply.github.com>2022-02-22 07:07:22 -0500
committerGitHub <noreply@github.com>2022-02-22 14:07:22 +0200
commite5ac39ac7b2728d14bfc27aac989b1085b7f6199 (patch)
treedc4e64f23a73f5b85d00bacc1e6f194d2af04c9b
parent1983905d5adceaba2c3b27ba8f569dcb5387cc35 (diff)
downloadredis-py-e5ac39ac7b2728d14bfc27aac989b1085b7f6199.tar.gz
Add cluster support for scripting (#1937)
* Add cluster support for scripting * Fall back to connection_pool.get_encoder if necessary * Add documentation for cluster-based scripting * Add test for flush response Co-authored-by: dvora-h <dvora.heller@redis.com>
-rw-r--r--README.md34
-rw-r--r--redis/cluster.py87
-rw-r--r--redis/commands/cluster.py9
-rw-r--r--redis/commands/core.py56
-rw-r--r--redis/commands/parser.py17
-rw-r--r--redis/utils.py2
-rw-r--r--tests/test_command_parser.py15
-rw-r--r--tests/test_scripting.py94
8 files changed, 285 insertions, 29 deletions
diff --git a/README.md b/README.md
index c6ee3c1..0735055 100644
--- a/README.md
+++ b/README.md
@@ -862,7 +862,8 @@ Monitor object to block until a command is received.
redis-py supports the EVAL, EVALSHA, and SCRIPT commands. However, there
are a number of edge cases that make these commands tedious to use in
real world scenarios. Therefore, redis-py exposes a Script object that
-makes scripting much easier to use.
+makes scripting much easier to use. (RedisClusters have limited support for
+scripting.)
To create a Script instance, use the register_script
function on a client instance passing the Lua code as the first
@@ -955,7 +956,7 @@ C 3
### Cluster Mode
-redis-py is now supports cluster mode and provides a client for
+redis-py now supports cluster mode and provides a client for
[Redis Cluster](<https://redis.io/topics/cluster-tutorial>).
The cluster client is based on Grokzen's
@@ -963,6 +964,8 @@ The cluster client is based on Grokzen's
fixes, and now supersedes that library. Support for these changes is thanks to
his contributions.
+To learn more about Redis Cluster, see
+[Redis Cluster specifications](https://redis.io/topics/cluster-spec).
**Create RedisCluster:**
@@ -1218,10 +1221,29 @@ according to their respective destination nodes. This means that we can not
turn the pipeline commands into one transaction block, because in most cases
they are split up into several smaller pipelines.
-
-See [Redis Cluster tutorial](https://redis.io/topics/cluster-tutorial) and
-[Redis Cluster specifications](https://redis.io/topics/cluster-spec)
-to learn more about Redis Cluster.
+**Lua Scripting in Cluster Mode**
+
+Cluster mode has limited support for lua scripting.
+
+The following commands are supported, with caveats:
+- `EVAL` and `EVALSHA`: The command is sent to the relevant node, depending on
+the keys (i.e., in `EVAL "<script>" num_keys key_1 ... key_n ...`). The keys
+_must_ all be on the same node. If the script requires 0 keys, _the command is
+sent to a random (primary) node_.
+- `SCRIPT EXISTS`: The command is sent to all primaries. The result is a list
+of booleans corresponding to the input SHA hashes. Each boolean is an AND of
+"does the script exist on each node?". In other words, each boolean is True iff
+the script exists on all nodes.
+- `SCRIPT FLUSH`: The command is sent to all primaries. The result is a bool
+AND over all nodes' responses.
+- `SCRIPT LOAD`: The command is sent to all primaries. The result is the SHA1
+digest.
+
+The following commands are not supported:
+- `EVAL_RO`
+- `EVALSHA_RO`
+
+Using scripting within pipelines in cluster mode is **not supported**.
### Author
diff --git a/redis/cluster.py b/redis/cluster.py
index 7151cfe..b8d6b19 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -289,6 +289,9 @@ class RedisCluster(RedisClusterCommands):
[
"FLUSHALL",
"FLUSHDB",
+ "SCRIPT EXISTS",
+ "SCRIPT FLUSH",
+ "SCRIPT LOAD",
],
PRIMARIES,
),
@@ -379,6 +382,24 @@ class RedisCluster(RedisClusterCommands):
],
parse_scan_result,
),
+ list_keys_to_dict(
+ [
+ "SCRIPT LOAD",
+ ],
+ lambda command, res: list(res.values()).pop(),
+ ),
+ list_keys_to_dict(
+ [
+ "SCRIPT EXISTS",
+ ],
+ lambda command, res: [all(k) for k in zip(*res.values())],
+ ),
+ list_keys_to_dict(
+ [
+ "SCRIPT FLUSH",
+ ],
+ lambda command, res: all(res.values()),
+ ),
)
ERRORS_ALLOW_RETRY = (
@@ -778,40 +799,70 @@ class RedisCluster(RedisClusterCommands):
"""
Get the keys in the command. If the command has no keys in in, None is
returned.
+
+ NOTE: Due to a bug in redis<7.0, this function does not work properly
+ for EVAL or EVALSHA when the `numkeys` arg is 0.
+ - issue: https://github.com/redis/redis/issues/9493
+ - fix: https://github.com/redis/redis/pull/9733
+
+ So, don't use this function with EVAL or EVALSHA.
"""
redis_conn = self.get_default_node().redis_connection
return self.commands_parser.get_keys(redis_conn, *args)
def determine_slot(self, *args):
"""
- Figure out what slot based on command and args
+ Figure out what slot to use based on args.
+
+ Raises a RedisClusterException if there's a missing key and we can't
+ determine what slots to map the command to; or, if the keys don't
+ all map to the same key slot.
"""
- if self.command_flags.get(args[0]) == SLOT_ID:
+ command = args[0]
+ if self.command_flags.get(command) == SLOT_ID:
# The command contains the slot ID
return args[1]
# Get the keys in the command
- keys = self._get_command_keys(*args)
- if keys is None or len(keys) == 0:
- raise RedisClusterException(
- "No way to dispatch this command to Redis Cluster. "
- "Missing key.\nYou can execute the command by specifying "
- f"target nodes.\nCommand: {args}"
- )
- if len(keys) > 1:
- # multi-key command, we need to make sure all keys are mapped to
- # the same slot
- slots = {self.keyslot(key) for key in keys}
- if len(slots) != 1:
+ # EVAL and EVALSHA are common enough that it's wasteful to go to the
+ # redis server to parse the keys. Besides, there is a bug in redis<7.0
+ # where `self._get_command_keys()` fails anyway. So, we special case
+ # EVAL/EVALSHA.
+ if command in ("EVAL", "EVALSHA"):
+ # command syntax: EVAL "script body" num_keys ...
+ if len(args) <= 2:
+ raise RedisClusterException(f"Invalid args in command: {args}")
+ num_actual_keys = args[2]
+ eval_keys = args[3 : 3 + num_actual_keys]
+ # if there are 0 keys, that means the script can be run on any node
+ # so we can just return a random slot
+ if len(eval_keys) == 0:
+ return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
+ keys = eval_keys
+ else:
+ keys = self._get_command_keys(*args)
+ if keys is None or len(keys) == 0:
raise RedisClusterException(
- f"{args[0]} - all keys must map to the same key slot"
+ "No way to dispatch this command to Redis Cluster. "
+ "Missing key.\nYou can execute the command by specifying "
+ f"target nodes.\nCommand: {args}"
)
- return slots.pop()
- else:
- # single key command
+
+ # single key command
+ if len(keys) == 1:
return self.keyslot(keys[0])
+ # multi-key command; we need to make sure all keys are mapped to
+ # the same slot
+ slots = {self.keyslot(key) for key in keys}
+ if len(slots) != 1:
+ raise RedisClusterException(
+ f"{command} - all keys must map to the same key slot"
+ )
+
+ return slots.pop()
+
def reinitialize_caches(self):
self.nodes_manager.initialize()
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
index 5d0e804..8bdcbba 100644
--- a/redis/commands/cluster.py
+++ b/redis/commands/cluster.py
@@ -1,7 +1,13 @@
from redis.crc import key_slot
from redis.exceptions import RedisClusterException, RedisError
-from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands
+from .core import (
+ ACLCommands,
+ DataAccessCommands,
+ ManagementCommands,
+ PubSubCommands,
+ ScriptCommands,
+)
from .helpers import list_or_args
@@ -205,6 +211,7 @@ class RedisClusterCommands(
ACLCommands,
PubSubCommands,
ClusterDataAccessCommands,
+ ScriptCommands,
):
"""
A class for all Redis Cluster commands
diff --git a/redis/commands/core.py b/redis/commands/core.py
index e74550f..7fd668e 100644
--- a/redis/commands/core.py
+++ b/redis/commands/core.py
@@ -5399,6 +5399,62 @@ class ModuleCommands(CommandsProtocol):
return self.execute_command("COMMAND")
+class Script:
+ """
+ An executable Lua script object returned by ``register_script``
+ """
+
+ def __init__(self, registered_client, script):
+ self.registered_client = registered_client
+ self.script = script
+ # Precalculate and store the SHA1 hex digest of the script.
+
+ if isinstance(script, str):
+ # We need the encoding from the client in order to generate an
+ # accurate byte representation of the script
+ encoder = self.get_encoder()
+ script = encoder.encode(script)
+ self.sha = hashlib.sha1(script).hexdigest()
+
+ def __call__(self, keys=[], args=[], client=None):
+ "Execute the script, passing any required ``args``"
+ if client is None:
+ client = self.registered_client
+ args = tuple(keys) + tuple(args)
+ # make sure the Redis server knows about the script
+ from redis.client import Pipeline
+
+ if isinstance(client, Pipeline):
+ # Make sure the pipeline can register the script before executing.
+ client.scripts.add(self)
+ try:
+ return client.evalsha(self.sha, len(keys), *args)
+ except NoScriptError:
+ # Maybe the client is pointed to a different server than the client
+ # that created this instance?
+ # Overwrite the sha just in case there was a discrepancy.
+ self.sha = client.script_load(self.script)
+ return client.evalsha(self.sha, len(keys), *args)
+
+ def get_encoder(self):
+ """Get the encoder to encode string scripts into bytes."""
+ try:
+ return self.registered_client.get_encoder()
+ except AttributeError:
+ # DEPRECATED
+ # In version <=4.1.2, this was the code we used to get the encoder.
+ # However, after 4.1.2 we added support for scripting in clustered
+ # redis. ClusteredRedis doesn't have a `.connection_pool` attribute
+ # so we changed the Script class to use
+ # `self.registered_client.get_encoder` (see above).
+ # However, that is technically a breaking change, as consumers who
+ # use Scripts directly might inject a `registered_client` that
+ # doesn't have a `.get_encoder` field. This try/except prevents us
+ # from breaking backward-compatibility. Ideally, it would be
+ # removed in the next major release.
+ return self.registered_client.connection_pool.get_encoder()
+
+
class AsyncModuleCommands(ModuleCommands):
async def command_info(self) -> None:
return super().command_info()
diff --git a/redis/commands/parser.py b/redis/commands/parser.py
index 4cce800..2bb0576 100644
--- a/redis/commands/parser.py
+++ b/redis/commands/parser.py
@@ -24,7 +24,14 @@ class CommandsParser:
# https://github.com/redis/redis/pull/8324
def get_keys(self, redis_conn, *args):
"""
- Get the keys from the passed command
+ Get the keys from the passed command.
+
+ NOTE: Due to a bug in redis<7.0, this function does not work properly
+ for EVAL or EVALSHA when the `numkeys` arg is 0.
+ - issue: https://github.com/redis/redis/issues/9493
+ - fix: https://github.com/redis/redis/pull/9733
+
+ So, don't use this function with EVAL or EVALSHA.
"""
if len(args) < 2:
# The command has no keys in it
@@ -72,6 +79,14 @@ class CommandsParser:
return keys
def _get_moveable_keys(self, redis_conn, *args):
+ """
+ NOTE: Due to a bug in redis<7.0, this function does not work properly
+ for EVAL or EVALSHA when the `numkeys` arg is 0.
+ - issue: https://github.com/redis/redis/issues/9493
+ - fix: https://github.com/redis/redis/pull/9733
+
+ So, don't use this function with EVAL or EVALSHA.
+ """
pieces = []
cmd_name = args[0]
# The command name should be splitted into separate arguments,
diff --git a/redis/utils.py b/redis/utils.py
index 56fec49..9ab75f2 100644
--- a/redis/utils.py
+++ b/redis/utils.py
@@ -67,7 +67,7 @@ def merge_result(command, res):
Merge all items in `res` into a list.
This command is used when sending a command to multiple nodes
- and they result from each node should be merged into a single list.
+ and the result from each node should be merged into a single list.
res : 'dict'
"""
diff --git a/tests/test_command_parser.py b/tests/test_command_parser.py
index ad29e69..ab050a7 100644
--- a/tests/test_command_parser.py
+++ b/tests/test_command_parser.py
@@ -2,6 +2,8 @@ import pytest
from redis.commands import CommandsParser
+from .conftest import skip_if_server_version_lt
+
class TestCommandsParser:
def test_init_commands(self, r):
@@ -68,6 +70,19 @@ class TestCommandsParser:
assert commands_parser.get_keys(r, *args8) is None
assert commands_parser.get_keys(r, *args9).sort() == ["key1", "key2"].sort()
+ # A bug in redis<7.0 causes this to fail: https://github.com/redis/redis/issues/9493
+ @skip_if_server_version_lt("7.0.0")
+ def test_get_eval_keys_with_0_keys(self, r):
+ commands_parser = CommandsParser(r)
+ args = [
+ "EVAL",
+ "return {ARGV[1],ARGV[2]}",
+ 0,
+ "key1",
+ "key2",
+ ]
+ assert commands_parser.get_keys(r, *args) == []
+
def test_get_pubsub_keys(self, r):
commands_parser = CommandsParser(r)
args1 = ["PUBLISH", "foo", "bar"]
diff --git a/tests/test_scripting.py b/tests/test_scripting.py
index dcf8a78..4635f95 100644
--- a/tests/test_scripting.py
+++ b/tests/test_scripting.py
@@ -2,6 +2,7 @@ import pytest
import redis
from redis import exceptions
+from redis.commands.core import Script
from tests.conftest import skip_if_server_version_lt
multiply_script = """
@@ -21,24 +22,103 @@ return "hello " .. name
"""
-@pytest.mark.onlynoncluster
+class TestScript:
+ """
+ We have a few tests to directly test the Script class.
+
+ However, most of the behavioral tests are covered by `TestScripting`.
+ """
+
+ @pytest.fixture()
+ def script_str(self):
+ return "fake-script"
+
+ @pytest.fixture()
+ def script_bytes(self):
+ return b"\xcf\x84o\xcf\x81\xce\xbdo\xcf\x82"
+
+ def test_script_text(self, r, script_str, script_bytes):
+ assert Script(r, script_str).script == "fake-script"
+ assert Script(r, script_bytes).script == b"\xcf\x84o\xcf\x81\xce\xbdo\xcf\x82"
+
+ def test_string_script_sha(self, r, script_str):
+ script = Script(r, script_str)
+ assert script.sha == "505e4245f0866b60552741b3cce9a0c3d3b66a87"
+
+ def test_bytes_script_sha(self, r, script_bytes):
+ script = Script(r, script_bytes)
+ assert script.sha == "1329344e6bf995a35a8dc57ab1a6af8b2d54a763"
+
+ def test_encoder(self, r, script_bytes):
+ encoder = Script(r, script_bytes).get_encoder()
+ assert encoder is not None
+ assert encoder.encode("fake-script") == b"fake-script"
+
+
class TestScripting:
@pytest.fixture(autouse=True)
def reset_scripts(self, r):
r.script_flush()
- def test_eval(self, r):
+ def test_eval_multiply(self, r):
r.set("a", 2)
# 2 * 3 == 6
assert r.eval(multiply_script, 1, "a", 3) == 6
# @skip_if_server_version_lt("7.0.0") turn on after redis 7 release
+ @pytest.mark.onlynoncluster
def test_eval_ro(self, unstable_r):
unstable_r.set("a", "b")
assert unstable_r.eval_ro("return redis.call('GET', KEYS[1])", 1, "a") == "b"
with pytest.raises(redis.ResponseError):
unstable_r.eval_ro("return redis.call('DEL', KEYS[1])", 1, "a")
+ def test_eval_msgpack(self, r):
+ msgpack_message_dumped = b"\x81\xa4name\xa3Joe"
+ # this is msgpack.dumps({"name": "joe"})
+ assert r.eval(msgpack_hello_script, 0, msgpack_message_dumped) == b"hello Joe"
+
+ def test_eval_same_slot(self, r):
+ """
+ In a clustered redis, the script keys must be in the same slot.
+
+ This test isn't very interesting for standalone redis, but it doesn't
+ hurt anything.
+ """
+ r.set("A{foo}", 2)
+ r.set("B{foo}", 4)
+ # 2 * 4 == 8
+
+ script = """
+ local value = redis.call('GET', KEYS[1])
+ local value2 = redis.call('GET', KEYS[2])
+ return value * value2
+ """
+ result = r.eval(script, 2, "A{foo}", "B{foo}")
+ assert result == 8
+
+ @pytest.mark.onlycluster
+ def test_eval_crossslot(self, r):
+ """
+ In a clustered redis, the script keys must be in the same slot.
+
+ This test should fail, because the two keys we send are in different
+ slots. This test assumes that {foo} and {bar} will not go to the same
+ server when used. In a setup with 3 primaries and 3 secondaries, this
+ assumption holds.
+ """
+ r.set("A{foo}", 2)
+ r.set("B{bar}", 4)
+ # 2 * 4 == 8
+
+ script = """
+ local value = redis.call('GET', KEYS[1])
+ local value2 = redis.call('GET', KEYS[2])
+ return value * value2
+ """
+ with pytest.raises(exceptions.RedisClusterException):
+ r.eval(script, 2, "A{foo}", "B{bar}")
+
@skip_if_server_version_lt("6.2.0")
def test_script_flush_620(self, r):
r.set("a", 2)
@@ -75,6 +155,7 @@ class TestScripting:
assert r.evalsha(sha, 1, "a", 3) == 6
# @skip_if_server_version_lt("7.0.0") turn on after redis 7 release
+ @pytest.mark.onlynoncluster
def test_evalsha_ro(self, unstable_r):
unstable_r.set("a", "b")
get_sha = unstable_r.script_load("return redis.call('GET', KEYS[1])")
@@ -99,6 +180,11 @@ class TestScripting:
r.script_load(multiply_script)
assert r.script_exists(sha) == [True]
+ def test_flush_response(self, r):
+ r.script_load(multiply_script)
+ flush_response = r.script_flush()
+ assert flush_response is True
+
def test_script_object(self, r):
r.set("a", 2)
multiply = r.register_script(multiply_script)
@@ -114,6 +200,8 @@ class TestScripting:
# Test first evalsha block
assert multiply(keys=["a"], args=[3]) == 6
+ # Scripting is not supported in cluster pipelines
+ @pytest.mark.onlynoncluster
def test_script_object_in_pipeline(self, r):
multiply = r.register_script(multiply_script)
precalculated_sha = multiply.sha
@@ -142,6 +230,8 @@ class TestScripting:
assert pipe.execute() == [True, b"2", 6]
assert r.script_exists(multiply.sha) == [True]
+ # Scripting is not supported in cluster pipelines
+ @pytest.mark.onlynoncluster
def test_eval_msgpack_pipeline_error_in_lua(self, r):
msgpack_hello = r.register_script(msgpack_hello_script)
assert msgpack_hello.sha