summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorperryitay <85821686+perryitay@users.noreply.github.com>2022-01-20 09:05:53 +0200
committerGitHub <noreply@github.com>2022-01-20 09:05:53 +0200
commitc4b788230ca034761a0e9f6ca35b4aee4b15d340 (patch)
tree50b89484548307841d799786a75e6cc79a5d39d8
parent22172a4aa648374d7076b179dab18de09f72fd52 (diff)
downloadredis-c4b788230ca034761a0e9f6ca35b4aee4b15d340.tar.gz
Adding module api for processing commands during busy jobs and allow flagging the commands that should be handled at this status (#9963)
Some modules might perform a long-running logic in different stages of Redis lifetime, for example: * command execution * RDB loading * thread safe context During this long-running logic Redis is not responsive. This PR offers 1. An API to process events while a busy command is running (`RM_Yield`) 2. A new flag (`ALLOW_BUSY`) to mark the commands that should be handled during busy jobs which can also be used by modules (`allow-busy`) 3. In slow commands and thread safe contexts, this flag will start rejecting commands with -BUSY only after `busy-reply-threshold` 4. During loading (`rdb_load` callback), it'll process events right away (not wait for `busy-reply-threshold`), but either way, the processing is throttled to the server hz rate. 5. Allow modules to Yield to redis background tasks, but not to client commands * rename `script-time-limit` to `busy-reply-threshold` (an alias to the pre-7.0 `lua-time-limit`) Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--redis.conf32
-rw-r--r--src/blocked.c20
-rw-r--r--src/commands.c26
-rw-r--r--src/commands/auth.json3
-rw-r--r--src/commands/discard.json3
-rw-r--r--src/commands/function-kill.json3
-rw-r--r--src/commands/function-stats.json3
-rw-r--r--src/commands/hello.json3
-rw-r--r--src/commands/multi.json3
-rw-r--r--src/commands/quit.json1
-rw-r--r--src/commands/replicaof.json1
-rw-r--r--src/commands/reset.json3
-rw-r--r--src/commands/script-kill.json3
-rw-r--r--src/commands/shutdown.json5
-rw-r--r--src/commands/unwatch.json3
-rw-r--r--src/commands/watch.json3
-rw-r--r--src/config.c2
-rw-r--r--src/db.c9
-rw-r--r--src/eval.c4
-rw-r--r--src/module.c90
-rw-r--r--src/networking.c22
-rw-r--r--src/redismodule.h7
-rw-r--r--src/script.c2
-rw-r--r--src/script_lua.c2
-rw-r--r--src/server.c46
-rw-r--r--src/server.h20
-rw-r--r--tests/modules/blockedclient.c98
-rw-r--r--tests/modules/datatype.c57
-rw-r--r--tests/unit/functions.tcl6
-rw-r--r--tests/unit/moduleapi/blockedclient.tcl77
-rw-r--r--tests/unit/moduleapi/datatype.tcl30
31 files changed, 492 insertions, 95 deletions
diff --git a/redis.conf b/redis.conf
index ca3d4d6ad..a433e4fc4 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1485,23 +1485,29 @@ aof-timestamp-enabled no
#
# shutdown-timeout 10
-################################ LUA SCRIPTING ###############################
+################ NON-DETERMINISTIC LONG BLOCKING COMMANDS #####################
-# Max execution time of a Lua script in milliseconds.
+# Maximum time in milliseconds for EVAL scripts, functions and in some cases
+# modules' commands before Redis can start processing or rejecting other clients.
#
-# If the maximum execution time is reached Redis will log that a script is
-# still in execution after the maximum allowed time and will start to
-# reply to queries with an error.
+# If the maximum execution time is reached Redis will start to reply to most
+# commands with a BUSY error.
#
-# When a long running script exceeds the maximum execution time only the
-# SCRIPT KILL and SHUTDOWN NOSAVE commands are available. The first can be
-# used to stop a script that did not yet call any write commands. The second
-# is the only way to shut down the server in the case a write command was
-# already issued by the script but the user doesn't want to wait for the natural
-# termination of the script.
+# In this state Redis will only allow a handful of commands to be executed.
+# For instance, SCRIPT KILL, FUNCTION KILL, SHUTDOWN NOSAVE and possibly some
+# module specific 'allow-busy' commands.
#
-# Set it to 0 or a negative value for unlimited execution without warnings.
-lua-time-limit 5000
+# SCRIPT KILL and FUNCTION KILL will only be able to stop a script that did not
+# yet call any write commands, so SHUTDOWN NOSAVE may be the only way to stop
+# the server in the case a write command was already issued by the script when
+# the user doesn't want to wait for the natural termination of the script.
+#
+# The default is 5 seconds. It is possible to set it to 0 or a negative value
+# to disable this mechanism (uninterrupted execution). Note that in the past
+# this config had a different name, which is now an alias, so both of these do
+# the same:
+# lua-time-limit 5000
+# busy-reply-threshold 5000
################################ REDIS CLUSTER ###############################
diff --git a/src/blocked.c b/src/blocked.c
index 3d381691a..0feab4a69 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -90,16 +90,16 @@ void blockClient(client *c, int btype) {
/* Master client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_MASTER &&
btype != BLOCKED_MODULE &&
- btype != BLOCKED_PAUSE));
+ btype != BLOCKED_POSTPONE));
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
server.blocked_clients++;
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
- if (btype == BLOCKED_PAUSE) {
- listAddNodeTail(server.paused_clients, c);
- c->paused_list_node = listLast(server.paused_clients);
+ if (btype == BLOCKED_POSTPONE) {
+ listAddNodeTail(server.postponed_clients, c);
+ c->postponed_list_node = listLast(server.postponed_clients);
/* Mark this client to execute its command */
c->flags |= CLIENT_PENDING_COMMAND;
}
@@ -189,9 +189,9 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
- } else if (c->btype == BLOCKED_PAUSE) {
- listDelNode(server.paused_clients,c->paused_list_node);
- c->paused_list_node = NULL;
+ } else if (c->btype == BLOCKED_POSTPONE) {
+ listDelNode(server.postponed_clients,c->postponed_list_node);
+ c->postponed_list_node = NULL;
} else if (c->btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else {
@@ -202,7 +202,7 @@ void unblockClient(client *c) {
* we do not do it immediately after the command returns (when the
* client got blocked) in order to be still able to access the argument
* vector from module callbacks and updateStatsOnUnblock. */
- if (c->btype != BLOCKED_PAUSE) {
+ if (c->btype != BLOCKED_POSTPONE) {
freeClientOriginalArgv(c);
resetClient(c);
}
@@ -266,11 +266,11 @@ void disconnectAllBlockedClients(void) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED) {
- /* PAUSED clients are an exception, when they'll be unblocked, the
+ /* POSTPONEd clients are an exception, when they'll be unblocked, the
* command processing will start from scratch, and the command will
* be either executed or rejected. (unlike LIST blocked clients for
* which the command is already in progress in a way. */
- if (c->btype == BLOCKED_PAUSE)
+ if (c->btype == BLOCKED_POSTPONE)
continue;
addReplyError(c,
diff --git a/src/commands.c b/src/commands.c
index b107ea02c..172c04b36 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -3273,11 +3273,11 @@ struct redisCommand FUNCTION_Subcommands[] = {
{"dump","Dump all functions into a serialized binary payload","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DUMP_History,FUNCTION_DUMP_Hints,functionDumpCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{"flush","Deleting all functions","O(N) where N is the number of functions deleted","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
{"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
-{"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
+{"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT|CMD_ALLOW_BUSY,ACL_CATEGORY_SCRIPTING},
{"list","List information about all the functions","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LIST_History,FUNCTION_LIST_Hints,functionListCommand,-2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_LIST_Args},
{"load","Create a function with the given arguments (name, code, description)","O(1) (considering compilation time is redundant)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LOAD_History,FUNCTION_LOAD_Hints,functionLoadCommand,-5,CMD_NOSCRIPT|CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_LOAD_Args},
{"restore","Restore all the functions on the given payload","O(N) where N is the number of functions on the payload","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_RESTORE_History,FUNCTION_RESTORE_Hints,functionRestoreCommand,-3,CMD_NOSCRIPT|CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_RESTORE_Args},
-{"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
+{"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT|CMD_ALLOW_BUSY,ACL_CATEGORY_SCRIPTING},
{0}
};
@@ -3385,7 +3385,7 @@ struct redisCommand SCRIPT_Subcommands[] = {
{"exists","Check existence of scripts in the script cache.","O(N) with N being the number of scripts to check (so checking a single script is an O(1) operation).","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_EXISTS_History,SCRIPT_EXISTS_Hints,scriptCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_EXISTS_Args},
{"flush","Remove all the scripts from the script cache.","O(N) with N being the number of scripts in cache","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_FLUSH_History,SCRIPT_FLUSH_Hints,scriptCommand,-2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_FLUSH_Args},
{"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_HELP_History,SCRIPT_HELP_Hints,scriptCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
-{"kill","Kill the script currently in execution.","O(1)","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_KILL_History,SCRIPT_KILL_Hints,scriptCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
+{"kill","Kill the script currently in execution.","O(1)","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_KILL_History,SCRIPT_KILL_Hints,scriptCommand,2,CMD_NOSCRIPT|CMD_ALLOW_BUSY,ACL_CATEGORY_SCRIPTING},
{"load","Load the specified Lua script into the script cache.","O(N) with N being the length in bytes of the script body.","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_LOAD_History,SCRIPT_LOAD_Hints,scriptCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_LOAD_Args},
{0}
};
@@ -6503,13 +6503,13 @@ struct redisCommand redisCommandTable[] = {
{"readonly","Enables read queries for a connection to a cluster replica node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,READONLY_History,READONLY_Hints,readonlyCommand,1,CMD_FAST|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION},
{"readwrite","Disables read queries for a connection to a cluster replica node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,READWRITE_History,READWRITE_Hints,readwriteCommand,1,CMD_FAST|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION},
/* connection */
-{"auth","Authenticate to the server","O(N) where N is the number of passwords defined for the user","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,AUTH_History,AUTH_Hints,authCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,.args=AUTH_Args},
+{"auth","Authenticate to the server","O(N) where N is the number of passwords defined for the user","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,AUTH_History,AUTH_Hints,authCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION,.args=AUTH_Args},
{"client","A container for client connection commands","Depends on subcommand.","2.4.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,CLIENT_History,CLIENT_Hints,NULL,-2,CMD_SENTINEL,0,.subcommands=CLIENT_Subcommands},
{"echo","Echo the given string","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,ECHO_History,ECHO_Hints,echoCommand,2,CMD_FAST,ACL_CATEGORY_CONNECTION,.args=ECHO_Args},
-{"hello","Handshake with Redis","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,HELLO_History,HELLO_Hints,helloCommand,-1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,.args=HELLO_Args},
+{"hello","Handshake with Redis","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,HELLO_History,HELLO_Hints,helloCommand,-1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION,.args=HELLO_Args},
{"ping","Ping the server","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,PING_History,PING_Hints,pingCommand,-1,CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,.args=PING_Args},
-{"quit","Close the connection","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,QUIT_History,QUIT_Hints,quitCommand,-1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH,ACL_CATEGORY_CONNECTION},
-{"reset","Reset the connection","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,RESET_History,RESET_Hints,resetCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH,ACL_CATEGORY_CONNECTION},
+{"quit","Close the connection","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,QUIT_History,QUIT_Hints,quitCommand,-1,CMD_ALLOW_BUSY|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH,ACL_CATEGORY_CONNECTION},
+{"reset","Reset the connection","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,RESET_History,RESET_Hints,resetCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION},
{"select","Change the selected database for the current connection","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,SELECT_History,SELECT_Hints,selectCommand,2,CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_CONNECTION,.args=SELECT_Args},
/* generic */
{"copy","Copy a key","O(N) worst case for collections, where N is the number of nested items. O(1) for string values.","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,COPY_History,COPY_Hints,copyCommand,-3,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_KEYSPACE,{{CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{CMD_KEY_OW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=COPY_Args},
@@ -6638,11 +6638,11 @@ struct redisCommand redisCommandTable[] = {
{"monitor","Listen for all requests received by the server in real time",NULL,"1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,MONITOR_History,MONITOR_Hints,monitorCommand,1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0},
{"psync","Internal command used for replication",NULL,"2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,PSYNC_History,PSYNC_Hints,syncCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NO_MULTI|CMD_NOSCRIPT,0,.args=PSYNC_Args},
{"replconf","An internal command for configuring the replication stream","O(1)","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,REPLCONF_History,REPLCONF_Hints,replconfCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0},
-{"replicaof","Make the server a replica of another instance, or promote it as master.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,REPLICAOF_History,REPLICAOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=REPLICAOF_Args},
+{"replicaof","Make the server a replica of another instance, or promote it as master.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,REPLICAOF_History,REPLICAOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_ALLOW_BUSY|CMD_NOSCRIPT|CMD_STALE,0,.args=REPLICAOF_Args},
{"restore-asking","An internal command for migrating keys in a cluster","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,RESTORE_ASKING_History,RESTORE_ASKING_Hints,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,{{CMD_KEY_OW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}}},
{"role","Return the role of the instance in the context of replication","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,ROLE_History,ROLE_Hints,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS},
{"save","Synchronously save the dataset to disk","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SAVE_History,SAVE_Hints,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_NO_MULTI,0},
-{"shutdown","Synchronously save the dataset to disk and then shut down the server","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_NO_MULTI|CMD_SENTINEL,0,.args=SHUTDOWN_Args},
+{"shutdown","Synchronously save the dataset to disk and then shut down the server","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_NO_MULTI|CMD_SENTINEL|CMD_ALLOW_BUSY,0,.args=SHUTDOWN_Args},
{"slaveof","Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLAVEOF_History,SLAVEOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=SLAVEOF_Args},
{"slowlog","A container for slow log commands","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLOWLOG_History,SLOWLOG_Hints,NULL,-2,0,0,.subcommands=SLOWLOG_Subcommands},
{"swapdb","Swaps two Redis databases","O(N) where N is the count of clients watching or blocking on keys from both databases.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SWAPDB_History,SWAPDB_Hints,swapdbCommand,3,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,.args=SWAPDB_Args},
@@ -6742,10 +6742,10 @@ struct redisCommand redisCommandTable[] = {
{"strlen","Get the length of the value stored in a key","O(1)","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STRING,STRLEN_History,STRLEN_Hints,strlenCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_STRING,{{CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=STRLEN_Args},
{"substr","Get a substring of the string stored at a key","O(N) where N is the length of the returned string. The complexity is ultimately determined by the returned length, but because creating a substring from an existing string is very cheap, it can be considered O(1) for small strings.","1.0.0",CMD_DOC_DEPRECATED,"`GETRANGE`","2.0.0",COMMAND_GROUP_STRING,SUBSTR_History,SUBSTR_Hints,getrangeCommand,4,CMD_READONLY,ACL_CATEGORY_STRING,{{CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=SUBSTR_Args},
/* transactions */
-{"discard","Discard all commands issued after MULTI","O(N), when N is the number of queued commands","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,DISCARD_History,DISCARD_Hints,discardCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION},
+{"discard","Discard all commands issued after MULTI","O(N), when N is the number of queued commands","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,DISCARD_History,DISCARD_Hints,discardCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION},
{"exec","Execute all commands issued after MULTI","Depends on commands in the transaction","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,EXEC_History,EXEC_Hints,execCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SKIP_SLOWLOG,ACL_CATEGORY_TRANSACTION},
-{"multi","Mark the start of a transaction block","O(1)","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,MULTI_History,MULTI_Hints,multiCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION},
-{"unwatch","Forget about all watched keys","O(1)","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,UNWATCH_History,UNWATCH_Hints,unwatchCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION},
-{"watch","Watch the given keys to determine execution of the MULTI/EXEC block","O(1) for every key.","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,WATCH_History,WATCH_Hints,watchCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION,{{0,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=WATCH_Args},
+{"multi","Mark the start of a transaction block","O(1)","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,MULTI_History,MULTI_Hints,multiCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION},
+{"unwatch","Forget about all watched keys","O(1)","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,UNWATCH_History,UNWATCH_Hints,unwatchCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION},
+{"watch","Watch the given keys to determine execution of the MULTI/EXEC block","O(1) for every key.","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,WATCH_History,WATCH_Hints,watchCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION,{{0,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=WATCH_Args},
{0}
};
diff --git a/src/commands/auth.json b/src/commands/auth.json
index 7d6172484..ff5e4b285 100644
--- a/src/commands/auth.json
+++ b/src/commands/auth.json
@@ -18,7 +18,8 @@
"STALE",
"FAST",
"NO_AUTH",
- "SENTINEL"
+ "SENTINEL",
+ "ALLOW_BUSY"
],
"acl_categories": [
"CONNECTION"
diff --git a/src/commands/discard.json b/src/commands/discard.json
index 8e056b09d..56589a84f 100644
--- a/src/commands/discard.json
+++ b/src/commands/discard.json
@@ -10,7 +10,8 @@
"NOSCRIPT",
"LOADING",
"STALE",
- "FAST"
+ "FAST",
+ "ALLOW_BUSY"
],
"acl_categories": [
"TRANSACTION"
diff --git a/src/commands/function-kill.json b/src/commands/function-kill.json
index 43377d8b8..660158295 100644
--- a/src/commands/function-kill.json
+++ b/src/commands/function-kill.json
@@ -8,7 +8,8 @@
"container": "FUNCTION",
"function": "functionKillCommand",
"command_flags": [
- "NOSCRIPT"
+ "NOSCRIPT",
+ "ALLOW_BUSY"
],
"acl_categories": [
"SCRIPTING"
diff --git a/src/commands/function-stats.json b/src/commands/function-stats.json
index 1fed0f24f..dd2133bef 100644
--- a/src/commands/function-stats.json
+++ b/src/commands/function-stats.json
@@ -8,7 +8,8 @@
"container": "FUNCTION",
"function": "functionStatsCommand",
"command_flags": [
- "NOSCRIPT"
+ "NOSCRIPT",
+ "ALLOW_BUSY"
],
"acl_categories": [
"SCRIPTING"
diff --git a/src/commands/hello.json b/src/commands/hello.json
index 98b11c2b6..8e80a81cf 100644
--- a/src/commands/hello.json
+++ b/src/commands/hello.json
@@ -18,7 +18,8 @@
"STALE",
"FAST",
"NO_AUTH",
- "SENTINEL"
+ "SENTINEL",
+ "ALLOW_BUSY"
],
"acl_categories": [
"CONNECTION"
diff --git a/src/commands/multi.json b/src/commands/multi.json
index 0956af781..f1299a6f4 100644
--- a/src/commands/multi.json
+++ b/src/commands/multi.json
@@ -10,7 +10,8 @@
"NOSCRIPT",
"LOADING",
"STALE",
- "FAST"
+ "FAST",
+ "ALLOW_BUSY"
],
"acl_categories": [
"TRANSACTION"
diff --git a/src/commands/quit.json b/src/commands/quit.json
index 4f74056c9..cf13f1ee9 100644
--- a/src/commands/quit.json
+++ b/src/commands/quit.json
@@ -7,6 +7,7 @@
"arity": -1,
"function": "quitCommand",
"command_flags": [
+ "ALLOW_BUSY",
"NOSCRIPT",
"LOADING",
"STALE",
diff --git a/src/commands/replicaof.json b/src/commands/replicaof.json
index 805b81e4c..90ec59fed 100644
--- a/src/commands/replicaof.json
+++ b/src/commands/replicaof.json
@@ -9,6 +9,7 @@
"command_flags": [
"NO_ASYNC_LOADING",
"ADMIN",
+ "ALLOW_BUSY",
"NOSCRIPT",
"STALE"
],
diff --git a/src/commands/reset.json b/src/commands/reset.json
index fe2083cdd..40041cd8c 100644
--- a/src/commands/reset.json
+++ b/src/commands/reset.json
@@ -11,7 +11,8 @@
"LOADING",
"STALE",
"FAST",
- "NO_AUTH"
+ "NO_AUTH",
+ "ALLOW_BUSY"
],
"acl_categories": [
"CONNECTION"
diff --git a/src/commands/script-kill.json b/src/commands/script-kill.json
index 10bf1c7b0..674aac6d3 100644
--- a/src/commands/script-kill.json
+++ b/src/commands/script-kill.json
@@ -8,7 +8,8 @@
"container": "SCRIPT",
"function": "scriptCommand",
"command_flags": [
- "NOSCRIPT"
+ "NOSCRIPT",
+ "ALLOW_BUSY"
],
"acl_categories": [
"SCRIPTING"
diff --git a/src/commands/shutdown.json b/src/commands/shutdown.json
index 816eae781..94b2cf588 100644
--- a/src/commands/shutdown.json
+++ b/src/commands/shutdown.json
@@ -17,8 +17,9 @@
"NOSCRIPT",
"LOADING",
"STALE",
- "NO_MULTI",
- "SENTINEL"
+ "NO_MULTI",
+ "SENTINEL",
+ "ALLOW_BUSY"
],
"arguments": [
{
diff --git a/src/commands/unwatch.json b/src/commands/unwatch.json
index 86b3027d4..820ea5b93 100644
--- a/src/commands/unwatch.json
+++ b/src/commands/unwatch.json
@@ -10,7 +10,8 @@
"NOSCRIPT",
"LOADING",
"STALE",
- "FAST"
+ "FAST",
+ "ALLOW_BUSY"
],
"acl_categories": [
"TRANSACTION"
diff --git a/src/commands/watch.json b/src/commands/watch.json
index e2777e543..e167d17cf 100644
--- a/src/commands/watch.json
+++ b/src/commands/watch.json
@@ -10,7 +10,8 @@
"NOSCRIPT",
"LOADING",
"STALE",
- "FAST"
+ "FAST",
+ "ALLOW_BUSY"
],
"acl_categories": [
"TRANSACTION"
diff --git a/src/config.c b/src/config.c
index 7915385cf..46bd1ebb9 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2867,7 +2867,7 @@ standardConfig configs[] = {
createULongConfig("acllog-max-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.acllog_max_len, 128, INTEGER_CONFIG, NULL, NULL),
/* Long Long configs */
- createLongLongConfig("script-time-limit", "lua-time-limit", MODIFIABLE_CONFIG, 0, LONG_MAX, server.script_time_limit, 5000, INTEGER_CONFIG, NULL, NULL),/* milliseconds */
+ createLongLongConfig("busy-reply-threshold", "lua-time-limit", MODIFIABLE_CONFIG, 0, LONG_MAX, server.busy_reply_threshold, 5000, INTEGER_CONFIG, NULL, NULL),/* milliseconds */
createLongLongConfig("cluster-node-timeout", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.cluster_node_timeout, 15000, INTEGER_CONFIG, NULL, NULL),
createLongLongConfig("slowlog-log-slower-than", NULL, MODIFIABLE_CONFIG, -1, LLONG_MAX, server.slowlog_log_slower_than, 10000, INTEGER_CONFIG, NULL, NULL),
createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL),
diff --git a/src/db.c b/src/db.c
index cd824024d..ff20f4687 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1075,10 +1075,15 @@ void shutdownCommand(client *c) {
if (!(flags & SHUTDOWN_NOSAVE) && scriptIsTimedout()) {
/* Script timed out. Shutdown allowed only with the NOSAVE flag. See
* also processCommand where these errors are returned. */
- if (scriptIsEval())
+ if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
+ addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply);
+ } else if (server.busy_module_yield_flags) {
+ addReplyErrorObject(c, shared.slowmoduleerr);
+ } else if (scriptIsEval()) {
addReplyErrorObject(c, shared.slowevalerr);
- else
+ } else {
addReplyErrorObject(c, shared.slowscripterr);
+ }
return;
}
diff --git a/src/eval.c b/src/eval.c
index 942f7e07b..2d45ceac9 100644
--- a/src/eval.c
+++ b/src/eval.c
@@ -1535,8 +1535,8 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
/* Check if a timeout occurred. */
if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) {
mstime_t elapsed = elapsedMs(rctx->start_time);
- mstime_t timelimit = server.script_time_limit ?
- server.script_time_limit : 5000;
+ mstime_t timelimit = server.busy_reply_threshold ?
+ server.busy_reply_threshold : 5000;
if (elapsed >= timelimit) {
timeout = 1;
ldb.step = 1;
diff --git a/src/module.c b/src/module.c
index bce6cf3a4..44c638b02 100644
--- a/src/module.c
+++ b/src/module.c
@@ -158,6 +158,7 @@ struct RedisModuleCtx {
getKeysResult *keys_result;
struct RedisModulePoolAllocBlock *pa_head;
+ long long next_yield_time;
};
typedef struct RedisModuleCtx RedisModuleCtx;
@@ -650,8 +651,15 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE)) {
/* Modules take care of their own propagation, when we are
* outside of call() context (timers, events, etc.). */
- if (--server.module_ctx_nesting == 0 && !server.core_propagates)
- propagatePendingCommands();
+ if (--server.module_ctx_nesting == 0) {
+ if (!server.core_propagates)
+ propagatePendingCommands();
+ if (server.busy_module_yield_flags) {
+ blockingOperationEnds();
+ server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
+ unblockPostponedClients();
+ }
+ }
}
autoMemoryCollect(ctx);
poolAllocRelease(ctx);
@@ -691,6 +699,18 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
else if (ctx_flags & REDISMODULE_CTX_NEW_CLIENT)
out_ctx->client = createClient(NULL);
+ /* Calculate the initial yield time for long blocked contexts.
+ * in loading we depend on the server hz, but in other cases we also wait
+ * for busy_reply_threshold.
+ * Note that in theory we could have started processing BUSY_MODULE_YIELD_EVENTS
+ * sooner, and only delay the processing for clients till the busy_reply_threshold,
+ * but this carries some overheads of frequently marking clients with BLOCKED_POSTPONE
+ * and releasing them, i.e. if modules only block for short periods. */
+ if (server.loading)
+ out_ctx->next_yield_time = getMonotonicUs() + 1000000 / server.hz;
+ else
+ out_ctx->next_yield_time = getMonotonicUs() + server.busy_reply_threshold * 1000;
+
if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) {
server.module_ctx_nesting++;
}
@@ -821,6 +841,7 @@ int64_t commandFlagsFromString(char *s) {
else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
else if (!strcasecmp(t,"no-mandatory-keys")) flags |= CMD_NO_MANDATORY_KEYS;
+ else if (!strcasecmp(t,"allow-busy")) flags |= CMD_ALLOW_BUSY;
else break;
}
sdsfreesplitres(tokens,count);
@@ -917,6 +938,9 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, const c
* * **"may-replicate"**: This command may generate replication traffic, even
* though it's not a write command.
* * **"no-mandatory-keys"**: All the keys this command may take are optional
+ * * **"allow-busy"**: Permit the command while the server is blocked either by
+ * a script or by a slow module command, see
+ * RM_Yield.
*
* The last three parameters specify which arguments of the new command are
* Redis keys. See https://redis.io/commands/command for more information.
@@ -1361,6 +1385,61 @@ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
return REDISMODULE_OK;
}
+/* This API allows modules to let Redis process background tasks, and some
+ * commands during long blocking execution of a module command.
+ * The module can call this API periodically.
+ * The flags is a bit mask of these:
+ *
+ * - `REDISMODULE_YIELD_FLAG_NONE`: No special flags, can perform some background
+ * operations, but not process client commands.
+ * - `REDISMODULE_YIELD_FLAG_CLIENTS`: Redis can also process client commands.
+ *
+ * The `busy_reply` argument is optional, and can be used to control the verbose
+ * error string after the `-BUSY` error code.
+ *
+ * When the `REDISMODULE_YIELD_FLAG_CLIENTS` is used, Redis will only start
+ * processing client commands after the time defined by the
+ * `busy-reply-threshold` config, in which case Redis will start rejecting most
+ * commands with `-BUSY` error, but allow the ones marked with the `allow-busy`
+ * flag to be executed.
+ * This API can also be used in thread safe context (while locked), and during
+ * loading (in the `rdb_load` callback, in which case it'll reject commands with
+ * the -LOADING error)
+ */
+void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
+ long long now = getMonotonicUs();
+ if (now >= ctx->next_yield_time) {
+ /* In loading mode, there's no need to handle busy_module_yield_reply,
+ * and busy_module_yield_flags, since redis is anyway rejecting all
+ * commands with -LOADING. */
+ if (server.loading) {
+ /* Let redis process events */
+ processEventsWhileBlocked();
+ } else {
+ const char *prev_busy_module_yield_reply = server.busy_module_yield_reply;
+ server.busy_module_yield_reply = busy_reply;
+ /* start the blocking operation if not already started. */
+ if (!server.busy_module_yield_flags) {
+ server.busy_module_yield_flags = flags & REDISMODULE_YIELD_FLAG_CLIENTS ?
+ BUSY_MODULE_YIELD_CLIENTS : BUSY_MODULE_YIELD_EVENTS;
+ blockingOperationStarts();
+ }
+
+ /* Let redis process events */
+ processEventsWhileBlocked();
+
+ server.busy_module_yield_reply = prev_busy_module_yield_reply;
+ /* Possibly restore the previous flags in case of two nested contexts
+ * that use this API with different flags, but keep the first bit
+ * (PROCESS_EVENTS) set, so we know to call blockingOperationEnds on time. */
+ server.busy_module_yield_flags &= ~BUSY_MODULE_YIELD_CLIENTS;
+ }
+
+ /* decide when the next event should fire. */
+ ctx->next_yield_time = now + 1000000 / server.hz;
+ }
+}
+
/* Set flags defining capabilities or behavior bit flags.
*
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
@@ -6743,6 +6822,12 @@ void moduleGILBeforeUnlock() {
* released we have to propagate here). */
server.module_ctx_nesting--;
propagatePendingCommands();
+
+ if (server.busy_module_yield_flags) {
+ blockingOperationEnds();
+ server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
+ unblockPostponedClients();
+ }
}
/* Release the server lock after a thread safe API call was executed. */
@@ -10958,4 +11043,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(EventLoopAdd);
REGISTER_API(EventLoopDel);
REGISTER_API(EventLoopAddOneShot);
+ REGISTER_API(Yield);
}
diff --git a/src/networking.c b/src/networking.c
index 57576032e..8ea5efd1b 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -194,7 +194,7 @@ client *createClient(connection *conn) {
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
- c->paused_list_node = NULL;
+ c->postponed_list_node = NULL;
c->pending_read_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
@@ -3628,13 +3628,19 @@ static void updateClientPauseTypeAndEndTime(void) {
/* If the pause type is less restrictive than before, we unblock all clients
* so they are reprocessed (may get re-paused). */
if (type < old_type) {
- listNode *ln;
- listIter li;
- listRewind(server.paused_clients, &li);
- while ((ln = listNext(&li)) != NULL) {
- client *c = listNodeValue(ln);
- unblockClient(c);
- }
+ unblockPostponedClients();
+ }
+}
+
+/* Unblock all paused clients (ones that where blocked by BLOCKED_POSTPONE (possibly in processCommand).
+ * This means they'll get re-processed in beforeSleep, and may get paused again if needed. */
+void unblockPostponedClients() {
+ listNode *ln;
+ listIter li;
+ listRewind(server.postponed_clients, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ unblockClient(c);
}
}
diff --git a/src/redismodule.h b/src/redismodule.h
index 7136bbf6e..15990a04f 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -218,6 +218,10 @@ This flag should not be used directly by the module.
#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
#define REDISMODULE_AUX_AFTER_RDB (1<<1)
+/* RM_Yield flags */
+#define REDISMODULE_YIELD_FLAG_NONE (1<<0)
+#define REDISMODULE_YIELD_FLAG_CLIENTS (1<<1)
+
/* This type represents a timer handle, and is returned when a timer is
* registered and used in order to invalidate a timer. It's just a 64 bit
* number, because this is how each timer is represented inside the radix tree
@@ -919,7 +923,7 @@ REDISMODULE_API int (*RedisModule_SetCommandKeySpecBeginSearchIndex)(RedisModule
REDISMODULE_API int (*RedisModule_SetCommandKeySpecBeginSearchKeyword)(RedisModuleCommand *command, int spec_id, const char *keyword, int startfrom) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SetCommandKeySpecFindKeysRange)(RedisModuleCommand *command, int spec_id, int lastkey, int keystep, int limit) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SetCommandKeySpecFindKeysKeynum)(RedisModuleCommand *command, int spec_id, int keynumidx, int firstkey, int keystep) REDISMODULE_ATTR;
-
+REDISMODULE_API void (*RedisModule_Yield)(RedisModuleCtx *ctx, int flags, const char *busy_reply) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
@@ -1242,6 +1246,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(SetCommandKeySpecBeginSearchKeyword);
REDISMODULE_GET_API(SetCommandKeySpecFindKeysRange);
REDISMODULE_GET_API(SetCommandKeySpecFindKeysKeynum);
+ REDISMODULE_GET_API(Yield);
REDISMODULE_GET_API(GetThreadSafeContext);
REDISMODULE_GET_API(GetDetachedThreadSafeContext);
REDISMODULE_GET_API(FreeThreadSafeContext);
diff --git a/src/script.c b/src/script.c
index 8b1545b1a..d99c928ce 100644
--- a/src/script.c
+++ b/src/script.c
@@ -85,7 +85,7 @@ int scriptInterrupt(scriptRunCtx *run_ctx) {
}
long long elapsed = elapsedMs(run_ctx->start_time);
- if (elapsed < server.script_time_limit) {
+ if (elapsed < server.busy_reply_threshold) {
return SCRIPT_CONTINUE;
}
diff --git a/src/script_lua.c b/src/script_lua.c
index 76e9eb7ad..d7332cf86 100644
--- a/src/script_lua.c
+++ b/src/script_lua.c
@@ -1363,7 +1363,7 @@ void luaCallFunction(scriptRunCtx* run_ctx, lua_State *lua, robj** keys, size_t
* each time the Lua hook is invoked. */
luaSaveOnRegistry(lua, REGISTRY_RUN_CTX_NAME, run_ctx);
- if (server.script_time_limit > 0 && !debug_enabled) {
+ if (server.busy_reply_threshold > 0 && !debug_enabled) {
lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
delhook = 1;
} else if (debug_enabled) {
diff --git a/src/server.c b/src/server.c
index 92cec7e38..b6515ceeb 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1613,6 +1613,8 @@ void createSharedObjects(void) {
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
"-BUSY Redis is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n"));
+ shared.slowmoduleerr = createObject(OBJ_STRING,sdsnew(
+ "-BUSY Redis is busy running a module command.\r\n"));
shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
"-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
@@ -2324,7 +2326,7 @@ void initServer(void) {
server.client_pause_end_time = 0;
memset(server.client_pause_per_purpose, 0,
sizeof(server.client_pause_per_purpose));
- server.paused_clients = listCreate();
+ server.postponed_clients = listCreate();
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
server.blocked_last_cron = 0;
@@ -2411,6 +2413,8 @@ void initServer(void) {
server.cronloops = 0;
server.in_script = 0;
server.in_exec = 0;
+ server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
+ server.busy_module_yield_reply = NULL;
server.core_propagates = 0;
server.propagate_no_multi = 0;
server.module_ctx_nesting = 0;
@@ -3367,6 +3371,16 @@ int processCommand(client *c) {
return C_ERR;
}
+ /* If we're inside a module blocked context yielding that wants to avoid
+ * processing clients, postpone the command. */
+ if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE &&
+ !(server.busy_module_yield_flags & BUSY_MODULE_YIELD_CLIENTS))
+ {
+ c->bpop.timeout = 0;
+ blockClient(c,BLOCKED_POSTPONE);
+ return C_OK;
+ }
+
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv,c->argc);
@@ -3645,30 +3659,19 @@ int processCommand(client *c) {
return C_OK;
}
- /* Lua script too slow? Only allow a limited number of commands.
+ /* when a busy job is being done (script / module)
+ * Only allow a limited number of commands.
* Note that we need to allow the transactions commands, otherwise clients
* sending a transaction with pipelining without error checking, may have
* the MULTI plus a few initial commands refused, then the timeout
* condition resolves, and the bottom-half of the transaction gets
* executed, see Github PR #7022. */
- if (scriptIsTimedout() &&
- c->cmd->proc != authCommand &&
- c->cmd->proc != helloCommand &&
- c->cmd->proc != replconfCommand &&
- c->cmd->proc != multiCommand &&
- c->cmd->proc != discardCommand &&
- c->cmd->proc != watchCommand &&
- c->cmd->proc != unwatchCommand &&
- c->cmd->proc != quitCommand &&
- c->cmd->proc != resetCommand &&
- c->cmd->proc != shutdownCommand && /* more checks in shutdownCommand */
- !(c->cmd->proc == scriptCommand &&
- c->argc == 2 &&
- tolower(((char*)c->argv[1]->ptr)[0]) == 'k') &&
- !(c->cmd->proc == functionKillCommand) &&
- !(c->cmd->proc == functionStatsCommand))
- {
- if (scriptIsEval()) {
+ if ((scriptIsTimedout() || server.busy_module_yield_flags) && !(c->cmd->flags & CMD_ALLOW_BUSY)) {
+ if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
+ rejectCommandFormat(c, "-BUSY %s", server.busy_module_yield_reply);
+ } else if (server.busy_module_yield_flags) {
+ rejectCommand(c, shared.slowmoduleerr);
+ } else if (scriptIsEval()) {
rejectCommand(c, shared.slowevalerr);
} else {
rejectCommand(c, shared.slowscripterr);
@@ -3691,7 +3694,7 @@ int processCommand(client *c) {
(server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
{
c->bpop.timeout = 0;
- blockClient(c,BLOCKED_PAUSE);
+ blockClient(c,BLOCKED_POSTPONE);
return C_OK;
}
@@ -4111,6 +4114,7 @@ void addReplyFlagsForCommand(client *c, struct redisCommand *cmd) {
{CMD_NO_ASYNC_LOADING, "no_async_loading"},
{CMD_NO_MULTI, "no_multi"},
{CMD_MOVABLE_KEYS, "movablekeys"},
+ {CMD_ALLOW_BUSY, "allow_busy"},
{0,NULL}
};
/* "sentinel" and "only-sentinel" are hidden on purpose. */
diff --git a/src/server.h b/src/server.h
index f839f9995..f4813a384 100644
--- a/src/server.h
+++ b/src/server.h
@@ -209,6 +209,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CMD_NO_ASYNC_LOADING (1ULL<<23)
#define CMD_NO_MULTI (1ULL<<24)
#define CMD_MOVABLE_KEYS (1ULL<<25) /* populated by populateCommandMovableKeys */
+#define CMD_ALLOW_BUSY ((1ULL<<26))
/* Command flags that describe ACLs categories. */
#define ACL_CATEGORY_KEYSPACE (1ULL<<0)
@@ -349,7 +350,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
#define BLOCKED_STREAM 4 /* XREAD. */
#define BLOCKED_ZSET 5 /* BZPOP et al. */
-#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
+#define BLOCKED_POSTPONE 6 /* Blocked by processCommand, re-try processing later. */
#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */
#define BLOCKED_NUM 8 /* Number of blocked states. */
@@ -603,6 +604,11 @@ typedef enum {
* Value quantization within the range will thus be no larger than 1/100th (or 1%) of any value.
* The total size per histogram should sit around 40 KiB Bytes. */
+/* Busy module flags, see busy_module_yield_flags */
+#define BUSY_MODULE_YIELD_NONE (0)
+#define BUSY_MODULE_YIELD_EVENTS (1<<0)
+#define BUSY_MODULE_YIELD_CLIENTS (1<<1)
+
/*-----------------------------------------------------------------------------
* Data types
*----------------------------------------------------------------------------*/
@@ -1136,7 +1142,7 @@ typedef struct client {
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
- listNode *paused_list_node; /* list node within the pause list */
+ listNode *postponed_list_node; /* list node within the postponed list */
listNode *pending_read_list_node; /* list node in clients pending read list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
@@ -1211,7 +1217,8 @@ struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
*queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4],
*emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
- *outofrangeerr, *noscripterr, *loadingerr, *slowevalerr, *slowscripterr, *bgsaveerr,
+ *outofrangeerr, *noscripterr, *loadingerr,
+ *slowevalerr, *slowscripterr, *slowmoduleerr, *bgsaveerr,
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
@@ -1459,6 +1466,8 @@ struct redisServer {
int always_show_logo; /* Show logo even for non-stdout logging. */
int in_script; /* Are we inside EVAL? */
int in_exec; /* Are we inside EXEC? */
+ int busy_module_yield_flags; /* Are we inside a busy module? (triggered by RM_Yield). see BUSY_MODULE_YIELD_ flags. */
+ const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */
int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */
int propagate_no_multi; /* True if propagatePendingCommands should avoid wrapping command in MULTI/EXEC */
int module_ctx_nesting; /* moduleCreateContext() nesting level */
@@ -1502,7 +1511,7 @@ struct redisServer {
int in_nested_call; /* If > 0, in a nested call of a call */
rax *clients_index; /* Active clients dictionary by client ID. */
pause_type client_pause_type; /* True if clients are currently paused */
- list *paused_clients; /* List of pause clients */
+ list *postponed_clients; /* List of postponed clients */
mstime_t client_pause_end_time; /* Time when we undo clients_paused */
pause_event *client_pause_per_purpose[NUM_PAUSE_PURPOSES];
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
@@ -1856,7 +1865,7 @@ struct redisServer {
* dropping packets of a specific type */
/* Scripting */
client *script_caller; /* The client running script right now, or NULL */
- mstime_t script_time_limit; /* Script timeout in milliseconds */
+ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
int script_oom; /* OOM detected when script start */
int script_disable_deny_script; /* Allow running commands marked "no-script" inside a script. */
/* Lazy free */
@@ -2461,6 +2470,7 @@ void pauseClients(pause_purpose purpose, mstime_t end, pause_type type);
void unpauseClients(pause_purpose purpose);
int areClientsPaused(void);
int checkClientPauseTimeoutAndReturnIfPaused(void);
+void unblockPostponedClients();
void processEventsWhileBlocked(void);
void whileBlockedCron();
void blockingOperationStarts();
diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c
index 3e97d6779..a2d7c6d00 100644
--- a/tests/modules/blockedclient.c
+++ b/tests/modules/blockedclient.c
@@ -1,3 +1,8 @@
+/* define macros for having usleep */
+#define _BSD_SOURCE
+#define _DEFAULT_SOURCE
+#include <unistd.h>
+
#include "redismodule.h"
#include <assert.h>
#include <stdio.h>
@@ -5,6 +10,10 @@
#define UNUSED(V) ((void) V)
+/* used to test processing events during slow bg operation */
+static volatile int g_slow_bg_operation = 0;
+static volatile int g_is_in_slow_bg_operation = 0;
+
void *sub_worker(void *arg) {
// Get Redis module context
RedisModuleCtx *ctx = (RedisModuleCtx *)arg;
@@ -99,6 +108,16 @@ void *bg_call_worker(void *arg) {
// Acquire GIL
RedisModule_ThreadSafeContextLock(ctx);
+ // Test slow operation yielding
+ if (g_slow_bg_operation) {
+ g_is_in_slow_bg_operation = 1;
+ while (g_slow_bg_operation) {
+ RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
+ usleep(1000);
+ }
+ g_is_in_slow_bg_operation = 0;
+ }
+
// Call the command
const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
@@ -203,6 +222,73 @@ int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return REDISMODULE_OK;
}
+
+/* this flag is used to work with busy commands, that might take a while
+ * and ability to stop the busy work with a different command*/
+static volatile int abort_flag = 0;
+
+int slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ long long block_time = 0;
+ if (RedisModule_StringToLongLong(argv[1], &block_time) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid integer value");
+ return REDISMODULE_OK;
+ }
+
+ uint64_t start_time = RedisModule_MonotonicMicroseconds();
+ /* when not blocking indefinitely, we don't process client commands in this test. */
+ int yield_flags = block_time? REDISMODULE_YIELD_FLAG_NONE: REDISMODULE_YIELD_FLAG_CLIENTS;
+ while (!abort_flag) {
+ RedisModule_Yield(ctx, yield_flags, "Slow module operation");
+ usleep(1000);
+ if (block_time && RedisModule_MonotonicMicroseconds() - start_time > (uint64_t)block_time)
+ break;
+ }
+
+ abort_flag = 0;
+ RedisModule_ReplyWithLongLong(ctx, 1);
+ return REDISMODULE_OK;
+}
+
+int stop_slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ abort_flag = 1;
+ RedisModule_ReplyWithLongLong(ctx, 1);
+ return REDISMODULE_OK;
+}
+
+/* used to enable or disable slow operation in do_bg_rm_call */
+static int set_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ long long ll;
+ if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid integer value");
+ return REDISMODULE_OK;
+ }
+ g_slow_bg_operation = ll;
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+/* used to test if we reached the slow operation in do_bg_rm_call */
+static int is_in_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ UNUSED(argv);
+ if (argc != 1) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModule_ReplyWithLongLong(ctx, g_is_in_slow_bg_operation);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -223,5 +309,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx, "do_fake_bg_true", do_fake_bg_true, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx, "slow_fg_command", slow_fg_command,"", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "stop_slow_fg_command", stop_slow_fg_command,"allow-busy", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "set_slow_bg_operation", set_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "is_in_slow_bg_operation", is_in_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c
index d763a2412..45a356e52 100644
--- a/tests/modules/datatype.c
+++ b/tests/modules/datatype.c
@@ -2,11 +2,20 @@
* for general ModuleDataType coverage.
*/
+/* define macros for having usleep */
+#define _BSD_SOURCE
+#define _DEFAULT_SOURCE
+#include <unistd.h>
+
#include "redismodule.h"
static RedisModuleType *datatype = NULL;
static int load_encver = 0;
+/* used to test processing events during slow loading */
+static volatile int slow_loading = 0;
+static volatile int is_in_slow_loading = 0;
+
#define DATATYPE_ENC_VER 1
typedef struct {
@@ -25,6 +34,17 @@ static void *datatype_load(RedisModuleIO *io, int encver) {
DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType));
dt->intval = intval;
dt->strval = strval;
+
+ if (slow_loading) {
+ RedisModuleCtx *ctx = RedisModule_GetContextFromIO(io);
+ is_in_slow_loading = 1;
+ while (slow_loading) {
+ RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
+ usleep(1000);
+ }
+ is_in_slow_loading = 0;
+ }
+
return dt;
}
@@ -185,6 +205,35 @@ static int datatype_swap(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
return REDISMODULE_OK;
}
+/* used to enable or disable slow loading */
+static int datatype_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ long long ll;
+ if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid integer value");
+ return REDISMODULE_OK;
+ }
+ slow_loading = ll;
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+/* used to test if we reached the slow loading code */
+static int datatype_is_in_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ if (argc != 1) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModule_ReplyWithLongLong(ctx, is_in_slow_loading);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -224,5 +273,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"write", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx, "datatype.slow_loading", datatype_slow_loading,
+ "allow-loading", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "datatype.is_in_slow_loading", datatype_is_in_slow_loading,
+ "allow-loading", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
diff --git a/tests/unit/functions.tcl b/tests/unit/functions.tcl
index 6ad61fb5f..fe4ce5ad7 100644
--- a/tests/unit/functions.tcl
+++ b/tests/unit/functions.tcl
@@ -239,7 +239,7 @@ start_server {tags {"scripting"}} {
test {FUNCTION - test function kill} {
set rd [redis_deferring_client]
- r config set script-time-limit 10
+ r config set busy-reply-threshold 10
r function load lua test REPLACE [get_function_code test {local a = 1 while true do a = a + 1 end}]
$rd fcall test 0
after 200
@@ -253,7 +253,7 @@ start_server {tags {"scripting"}} {
test {FUNCTION - test script kill not working on function} {
set rd [redis_deferring_client]
- r config set script-time-limit 10
+ r config set busy-reply-threshold 10
r function load lua test REPLACE [get_function_code test {local a = 1 while true do a = a + 1 end}]
$rd fcall test 0
after 200
@@ -268,7 +268,7 @@ start_server {tags {"scripting"}} {
test {FUNCTION - test function kill not working on eval} {
set rd [redis_deferring_client]
- r config set script-time-limit 10
+ r config set busy-reply-threshold 10
$rd eval {local a = 1 while true do a = a + 1 end} 0
after 200
catch {r ping} e
diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl
index 8f657afc5..c8651aabd 100644
--- a/tests/unit/moduleapi/blockedclient.tcl
+++ b/tests/unit/moduleapi/blockedclient.tcl
@@ -92,6 +92,83 @@ start_server {tags {"modules"}} {
}
}
+ test {Busy module command} {
+ set busy_time_limit 50
+ set old_time_limit [lindex [r config get busy-reply-threshold] 1]
+ r config set busy-reply-threshold $busy_time_limit
+ set rd [redis_deferring_client]
+
+ # run command that blocks until released
+ set start [clock clicks -milliseconds]
+ $rd slow_fg_command 0
+ $rd flush
+
+ # make sure we get BUSY error, and that we didn't get it too early
+ assert_error {*BUSY Slow module operation*} {r ping}
+ assert_morethan_equal [expr [clock clicks -milliseconds]-$start] $busy_time_limit
+
+ # abort the blocking operation
+ r stop_slow_fg_command
+ wait_for_condition 50 100 {
+ [catch {r ping} e] == 0
+ } else {
+ fail "Failed waiting for busy command to end"
+ }
+ $rd read
+
+ #run command that blocks for 200ms
+ set start [clock clicks -milliseconds]
+ $rd slow_fg_command 200000
+ $rd flush
+ after 10 ;# try to make sure redis started running the command before we proceed
+
+ # make sure we didn't get BUSY error, it simply blocked till the command was done
+ r ping
+ assert_morethan_equal [expr [clock clicks -milliseconds]-$start] 200
+ $rd read
+
+ $rd close
+ r config set busy-reply-threshold $old_time_limit
+ }
+
+ test {RM_Call from blocked client} {
+ set busy_time_limit 50
+ set old_time_limit [lindex [r config get busy-reply-threshold] 1]
+ r config set busy-reply-threshold $busy_time_limit
+
+ # trigger slow operation
+ r set_slow_bg_operation 1
+ r hset hash foo bar
+ set rd [redis_deferring_client]
+ set start [clock clicks -milliseconds]
+ $rd do_bg_rm_call hgetall hash
+
+ # wait till we know we're blocked inside the module
+ wait_for_condition 50 100 {
+ [r is_in_slow_bg_operation] eq 1
+ } else {
+ fail "Failed waiting for slow operation to start"
+ }
+
+ # make sure we get BUSY error, and that we didn't get here too early
+ assert_error {*BUSY Slow module operation*} {r ping}
+ assert_morethan [expr [clock clicks -milliseconds]-$start] $busy_time_limit
+ # abort the blocking operation
+ r set_slow_bg_operation 0
+
+ wait_for_condition 50 100 {
+ [r is_in_slow_bg_operation] eq 0
+ } else {
+ fail "Failed waiting for slow operation to stop"
+ }
+ assert_equal [r ping] {PONG}
+
+ r config set busy-reply-threshold $old_time_limit
+ set res [$rd read]
+ $rd close
+ set _ $res
+ } {foo bar}
+
test {blocked client reaches client output buffer limit} {
r hset hash big [string repeat x 50000]
r hset hash bada [string repeat x 50000]
diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl
index 0cc687e89..c8fd30ed1 100644
--- a/tests/unit/moduleapi/datatype.tcl
+++ b/tests/unit/moduleapi/datatype.tcl
@@ -55,4 +55,34 @@ start_server {tags {"modules"}} {
r copy sourcekey targetkey
r datatype.get targetkey
} {1234 AAA/sourcekey/targetkey}
+
+ test {DataType: Slow Loading} {
+ r config set busy-reply-threshold 5000 ;# make sure we're using a high default
+ # trigger slow loading
+ r datatype.slow_loading 1
+ set rd [redis_deferring_client]
+ set start [clock clicks -milliseconds]
+ $rd debug reload
+
+ # wait till we know we're blocked inside the module
+ wait_for_condition 50 100 {
+ [r datatype.is_in_slow_loading] eq 1
+ } else {
+ fail "Failed waiting for slow loading to start"
+ }
+
+ # make sure we get LOADING error, and that we didn't get here late (not waiting for busy-reply-threshold)
+ assert_error {*LOADING*} {r ping}
+ assert_lessthan [expr [clock clicks -milliseconds]-$start] 2000
+
+ # abort the blocking operation
+ r datatype.slow_loading 0
+ wait_for_condition 50 100 {
+ [s loading] eq {0}
+ } else {
+ fail "Failed waiting for loading to end"
+ }
+ $rd read
+ $rd close
+ }
}