diff options
-rw-r--r-- | README.md | 11 | ||||
-rw-r--r-- | src/Makefile | 2 | ||||
-rw-r--r-- | src/aof.c | 4 | ||||
-rw-r--r-- | src/db.c | 5 | ||||
-rw-r--r-- | src/eval.c | 8 | ||||
-rw-r--r-- | src/function_lua.c | 183 | ||||
-rw-r--r-- | src/functions.c | 538 | ||||
-rw-r--r-- | src/functions.h | 126 | ||||
-rw-r--r-- | src/object.c | 8 | ||||
-rw-r--r-- | src/rdb.c | 104 | ||||
-rw-r--r-- | src/rdb.h | 4 | ||||
-rw-r--r-- | src/replication.c | 14 | ||||
-rw-r--r-- | src/script.c | 38 | ||||
-rw-r--r-- | src/script.h | 7 | ||||
-rw-r--r-- | src/script_lua.c | 16 | ||||
-rw-r--r-- | src/script_lua.h | 3 | ||||
-rw-r--r-- | src/server.c | 94 | ||||
-rw-r--r-- | src/server.h | 18 | ||||
-rw-r--r-- | tests/integration/replication.tcl | 19 | ||||
-rw-r--r-- | tests/unit/functions.tcl | 280 | ||||
-rw-r--r-- | tests/unit/scripting.tcl | 308 |
21 files changed, 1647 insertions, 143 deletions
@@ -443,6 +443,16 @@ This file also implements both the `SYNC` and `PSYNC` commands that are used in order to perform the first synchronization between masters and replicas, or to continue the replication after a disconnection. +Script +--- +The script unit is compose of 3 units +* `script.c` - integration of scripts with Redis (commands execution, set replication/resp, ..) +* `script_lua.c` - responsible to execute Lua code, uses script.c to interact with Redis from within the Lua code. +* `function_lua.c` - contains the Lua engine implementation, uses script_lua.c to execute the Lua code. +* `functions.c` - Contains Redis Functions implementation (FUNCTION command), uses functions_lua.c if the function it wants to invoke needs the Lua engine. +* `eval.c` - Contains the `eval` implementation using `script_lua.c` to invoke the Lua code. + + Other C files --- @@ -451,7 +461,6 @@ Other C files * `sds.c` is the Redis string library, check https://github.com/antirez/sds for more information. * `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel. * `dict.c` is an implementation of a non-blocking hash table which rehashes incrementally. -* `scripting.c` implements Lua scripting. It is completely self-contained and isolated from the rest of the Redis implementation and is simple enough to understand if you are familiar with the Lua API. * `cluster.c` implements the Redis Cluster. Probably a good read only after being very familiar with the rest of the Redis code base. If you want to read `cluster.c` make sure to read the [Redis Cluster specification][4]. [4]: https://redis.io/topics/cluster-spec diff --git a/src/Makefile b/src/Makefile index 076305b58..1bc6d86ed 100644 --- a/src/Makefile +++ b/src/Makefile @@ -309,7 +309,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) @@ -30,6 +30,7 @@ #include "server.h" #include "bio.h" #include "rio.h" +#include "functions.h" #include <signal.h> #include <fcntl.h> @@ -754,7 +755,8 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL,server.db) != C_OK) { + + if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { @@ -1744,6 +1744,11 @@ int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult * return genericGetKeys(0, 2, 3, 1, argv, argc, result); } +int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 2, 3, 1, argv, argc, result); +} + int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { UNUSED(cmd); return genericGetKeys(0, 1, 2, 1, argv, argc, result); diff --git a/src/eval.c b/src/eval.c index f8fe8bf37..977cad721 100644 --- a/src/eval.c +++ b/src/eval.c @@ -257,7 +257,7 @@ void scriptingInit(int setup) { /* Lua beginners often don't use "local", this is likely to introduce * subtle bugs in their code. To prevent problems we protect accesses * to global variables. */ - luaEnableGlobalsProtection(lua); + luaEnableGlobalsProtection(lua, 1); lctx.lua = lua; } @@ -443,6 +443,8 @@ void evalGenericCommand(client *c, int evalsha) { scriptRunCtx rctx; scriptPrepareForRun(&rctx, lctx.lua_client, c, lctx.lua_cur_script); + rctx.flags |= SCRIPT_EVAL_MODE; /* mark the current run as legacy so we + will get legacy error messages and logs */ if (!lctx.lua_replicate_commands) rctx.flags |= SCRIPT_EVAL_REPLICATION; /* This check is for EVAL_RO, EVALSHA_RO. We want to allow only read only commands */ if ((server.script_caller->cmd->proc == evalRoCommand || @@ -584,7 +586,7 @@ NULL addReplyBulkCBuffer(c,sha,40); forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF); } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) { - scriptKill(c); + scriptKill(c, 1); } else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"debug")) { if (clientHasPendingReplies(c)) { addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline"); @@ -610,7 +612,7 @@ NULL } unsigned long evalMemory() { - return lua_gc(lctx.lua, LUA_GCCOUNT, 0) * 1024LL; + return luaMemory(lctx.lua); } dict* evalScriptsDict() { diff --git a/src/function_lua.c b/src/function_lua.c new file mode 100644 index 000000000..864ced809 --- /dev/null +++ b/src/function_lua.c @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * function_lua.c unit provides the Lua engine functionality. + * Including registering the engine and implementing the engine + * callbacks: + * * Create a function from blob (usually text) + * * Invoke a function + * * Free function memory + * * Get memory usage + * + * Uses script_lua.c to run the Lua code. + */ + +#include "functions.h" +#include "script_lua.h" +#include <lua.h> +#include <lauxlib.h> +#include <lualib.h> + +#define LUA_ENGINE_NAME "LUA" +#define REGISTRY_ENGINE_CTX_NAME "__ENGINE_CTX__" +#define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__" + +/* Lua engine ctx */ +typedef struct luaEngineCtx { + lua_State *lua; +} luaEngineCtx; + +/* Lua function ctx */ +typedef struct luaFunctionCtx { + /* Special ID that allows getting the Lua function object from the Lua registry */ + int lua_function_ref; +} luaFunctionCtx; + +/* + * Compile a given blob and save it on the registry. + * Return a function ctx with Lua ref that allows to later retrieve the + * function from the registry. + * + * Return NULL on compilation error and set the error to the err variable + */ +static void* luaEngineCreate(void *engine_ctx, sds blob, sds *err) { + luaEngineCtx *lua_engine_ctx = engine_ctx; + lua_State *lua = lua_engine_ctx->lua; + if (luaL_loadbuffer(lua, blob, sdslen(blob), "@user_function")) { + *err = sdsempty(); + *err = sdscatprintf(*err, "Error compiling function: %s", + lua_tostring(lua, -1)); + lua_pop(lua, 1); + return NULL; + } + + serverAssert(lua_isfunction(lua, -1)); + + int lua_function_ref = luaL_ref(lua, LUA_REGISTRYINDEX); + + luaFunctionCtx *f_ctx = zmalloc(sizeof(*f_ctx)); + *f_ctx = (luaFunctionCtx ) { .lua_function_ref = lua_function_ref, }; + + return f_ctx; +} + +/* + * Invole the give function with the given keys and args + */ +static void luaEngineCall(scriptRunCtx *run_ctx, + void *engine_ctx, + void *compiled_function, + robj **keys, + size_t nkeys, + robj **args, + size_t nargs) +{ + luaEngineCtx *lua_engine_ctx = engine_ctx; + lua_State *lua = lua_engine_ctx->lua; + luaFunctionCtx *f_ctx = compiled_function; + + /* Push error handler */ + lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME); + lua_gettable(lua, LUA_REGISTRYINDEX); + + lua_rawgeti(lua, LUA_REGISTRYINDEX, f_ctx->lua_function_ref); + + serverAssert(lua_isfunction(lua, -1)); + + luaCallFunction(run_ctx, lua, keys, nkeys, args, nargs, 0); + lua_pop(lua, 1); /* Pop error handler */ +} + +static size_t luaEngineGetUsedMemoy(void *engine_ctx) { + luaEngineCtx *lua_engine_ctx = engine_ctx; + return luaMemory(lua_engine_ctx->lua); +} + +static size_t luaEngineFunctionMemoryOverhead(void *compiled_function) { + return zmalloc_size(compiled_function); +} + +static size_t luaEngineMemoryOverhead(void *engine_ctx) { + luaEngineCtx *lua_engine_ctx = engine_ctx; + return zmalloc_size(lua_engine_ctx); +} + +static void luaEngineFreeFunction(void *engine_ctx, void *compiled_function) { + luaEngineCtx *lua_engine_ctx = engine_ctx; + lua_State *lua = lua_engine_ctx->lua; + luaFunctionCtx *f_ctx = compiled_function; + lua_unref(lua, f_ctx->lua_function_ref); + zfree(f_ctx); +} + +/* Initialize Lua engine, should be called once on start. */ +int luaEngineInitEngine() { + luaEngineCtx *lua_engine_ctx = zmalloc(sizeof(*lua_engine_ctx)); + lua_engine_ctx->lua = lua_open(); + + luaRegisterRedisAPI(lua_engine_ctx->lua); + + /* Save error handler to registry */ + lua_pushstring(lua_engine_ctx->lua, REGISTRY_ERROR_HANDLER_NAME); + char *errh_func = "local dbg = debug\n" + "local error_handler = function (err)\n" + " local i = dbg.getinfo(2,'nSl')\n" + " if i and i.what == 'C' then\n" + " i = dbg.getinfo(3,'nSl')\n" + " end\n" + " if i then\n" + " return i.source .. ':' .. i.currentline .. ': ' .. err\n" + " else\n" + " return err\n" + " end\n" + "end\n" + "return error_handler"; + luaL_loadbuffer(lua_engine_ctx->lua, errh_func, strlen(errh_func), "@err_handler_def"); + lua_pcall(lua_engine_ctx->lua,0,1,0); + lua_settable(lua_engine_ctx->lua, LUA_REGISTRYINDEX); + + /* save the engine_ctx on the registry so we can get it from the Lua interpreter */ + luaSaveOnRegistry(lua_engine_ctx->lua, REGISTRY_ENGINE_CTX_NAME, lua_engine_ctx); + + luaEnableGlobalsProtection(lua_engine_ctx->lua, 0); + + + engine *lua_engine = zmalloc(sizeof(*lua_engine)); + *lua_engine = (engine) { + .engine_ctx = lua_engine_ctx, + .create = luaEngineCreate, + .call = luaEngineCall, + .get_used_memory = luaEngineGetUsedMemoy, + .get_function_memory_overhead = luaEngineFunctionMemoryOverhead, + .get_engine_memory_overhead = luaEngineMemoryOverhead, + .free_function = luaEngineFreeFunction, + }; + return functionsRegisterEngine(LUA_ENGINE_NAME, lua_engine); +} diff --git a/src/functions.c b/src/functions.c new file mode 100644 index 000000000..b0d7a2305 --- /dev/null +++ b/src/functions.c @@ -0,0 +1,538 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "functions.h" +#include "sds.h" +#include "dict.h" +#include "adlist.h" +#include "atomicvar.h" + +static size_t engine_cache_memory = 0; + +/* Forward declaration */ +static void engineFunctionDispose(dict *d, void *obj); + +struct functionsCtx { + dict *functions; /* Function name -> Function object that can be used to run the function */ + size_t cache_memory /* Overhead memory (structs, dictionaries, ..) used by all the functions */; +}; + +dictType engineDictType = { + dictSdsCaseHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +dictType functionDictType = { + dictSdsHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + engineFunctionDispose,/* val destructor */ + NULL /* allow to expand */ +}; + +/* Dictionary of engines */ +static dict *engines = NULL; + +/* Functions Ctx. + * Contains the dictionary that map a function name to + * function object and the cache memory used by all the functions */ +static functionsCtx *functions_ctx = NULL; + +static size_t functionMallocSize(functionInfo *fi) { + return zmalloc_size(fi) + sdsZmallocSize(fi->name) + + (fi->desc ? sdsZmallocSize(fi->desc) : 0) + + sdsZmallocSize(fi->code) + + fi->ei->engine->get_function_memory_overhead(fi->function); +} + +/* Dispose function memory */ +static void engineFunctionDispose(dict *d, void *obj) { + UNUSED(d); + functionInfo *fi = obj; + sdsfree(fi->code); + sdsfree(fi->name); + if (fi->desc) { + sdsfree(fi->desc); + } + engine *engine = fi->ei->engine; + engine->free_function(engine->engine_ctx, fi->function); + zfree(fi); +} + +/* Free function memory and detele it from the functions dictionary */ +static void engineFunctionFree(functionInfo *fi, functionsCtx *functions) { + functions->cache_memory -= functionMallocSize(fi); + + dictDelete(functions->functions, fi->name); +} + +/* Clear all the functions from the given functions ctx */ +void functionsCtxClear(functionsCtx *functions_ctx) { + dictEmpty(functions_ctx->functions, NULL); + functions_ctx->cache_memory = 0; +} + +/* Free the given functions ctx */ +void functionsCtxFree(functionsCtx *functions_ctx) { + functionsCtxClear(functions_ctx); + dictRelease(functions_ctx->functions); + zfree(functions_ctx); +} + +/* Swap the current functions ctx with the given one. + * Free the old functions ctx. */ +void functionsCtxSwapWithCurrent(functionsCtx *new_functions_ctx) { + functionsCtxFree(functions_ctx); + functions_ctx = new_functions_ctx; +} + +/* return the current functions ctx */ +functionsCtx* functionsCtxGetCurrent() { + return functions_ctx; +} + +/* Create a new functions ctx */ +functionsCtx* functionsCtxCreate() { + functionsCtx *ret = zmalloc(sizeof(functionsCtx)); + ret->functions = dictCreate(&functionDictType); + ret->cache_memory = 0; + return ret; +} + +/* + * Register a function info to functions dictionary + * 1. Set the function client + * 2. Add function to functions dictionary + * 3. update cache memory + */ +static void engineFunctionRegister(functionInfo *fi, functionsCtx *functions) { + int res = dictAdd(functions->functions, fi->name, fi); + serverAssert(res == DICT_OK); + + functions->cache_memory += functionMallocSize(fi); +} + +/* + * Creating a function info object and register it. + * Return the created object + */ +static functionInfo* engineFunctionCreate(sds name, void *function, engineInfo *ei, + sds desc, sds code, functionsCtx *functions) +{ + + functionInfo *fi = zmalloc(sizeof(*fi)); + *fi = (functionInfo ) { + .name = sdsdup(name), + .function = function, + .ei = ei, + .code = sdsdup(code), + .desc = desc ? sdsdup(desc) : NULL, + }; + + engineFunctionRegister(fi, functions); + + return fi; +} + +/* Register an engine, should be called once by the engine on startup and give the following: + * + * - engine_name - name of the engine to register + * - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */ +int functionsRegisterEngine(const char *engine_name, engine *engine) { + sds engine_name_sds = sdsnew(engine_name); + if (dictFetchValue(engines, engine_name_sds)) { + serverLog(LL_WARNING, "Same engine was registered twice"); + sdsfree(engine_name_sds); + return C_ERR; + } + + client *c = createClient(NULL); + c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT); + engineInfo *ei = zmalloc(sizeof(*ei)); + *ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,}; + + dictAdd(engines, engine_name_sds, ei); + + engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) + + zmalloc_size(engine) + + engine->get_engine_memory_overhead(engine->engine_ctx); + + return C_OK; +} + +/* + * FUNCTION STATS + */ +void functionsStatsCommand(client *c) { + if (scriptIsRunning() && scriptIsEval()) { + addReplyErrorObject(c, shared.slowevalerr); + return; + } + + addReplyMapLen(c, 2); + + addReplyBulkCString(c, "running_script"); + if (!scriptIsRunning()) { + addReplyNull(c); + } else { + addReplyMapLen(c, 3); + addReplyBulkCString(c, "name"); + addReplyBulkCString(c, scriptCurrFunction()); + addReplyBulkCString(c, "command"); + client *script_client = scriptGetCaller(); + addReplyArrayLen(c, script_client->argc); + for (int i = 0 ; i < script_client->argc ; ++i) { + addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr)); + } + addReplyBulkCString(c, "duration_ms"); + addReplyLongLong(c, scriptRunDuration()); + } + + addReplyBulkCString(c, "engines"); + addReplyArrayLen(c, dictSize(engines)); + dictIterator *iter = dictGetIterator(engines); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + engineInfo *ei = dictGetVal(entry); + addReplyBulkCString(c, ei->name); + } + dictReleaseIterator(iter); +} + +/* + * FUNCTION LIST + */ +void functionsListCommand(client *c) { + /* general information on all the functions */ + addReplyArrayLen(c, dictSize(functions_ctx->functions)); + dictIterator *iter = dictGetIterator(functions_ctx->functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + addReplyMapLen(c, 3); + addReplyBulkCString(c, "name"); + addReplyBulkCBuffer(c, fi->name, sdslen(fi->name)); + addReplyBulkCString(c, "engine"); + addReplyBulkCBuffer(c, fi->ei->name, sdslen(fi->ei->name)); + addReplyBulkCString(c, "description"); + if (fi->desc) { + addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc)); + } else { + addReplyNull(c); + } + } + dictReleaseIterator(iter); +} + +/* + * FUNCTION INFO <FUNCTION NAME> [WITHCODE] + */ +void functionsInfoCommand(client *c) { + if (c->argc > 4) { + addReplyErrorFormat(c,"wrong number of arguments for '%s' command or subcommand", c->cmd->name); + return; + } + /* dedicated information on specific function */ + robj *function_name = c->argv[2]; + int with_code = 0; + if (c->argc == 4) { + robj *with_code_arg = c->argv[3]; + if (!strcasecmp(with_code_arg->ptr, "withcode")) { + with_code = 1; + } + } + + functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr); + if (!fi) { + addReplyError(c, "Function does not exists"); + return; + } + addReplyMapLen(c, with_code? 4 : 3); + addReplyBulkCString(c, "name"); + addReplyBulkCBuffer(c, fi->name, sdslen(fi->name)); + addReplyBulkCString(c, "engine"); + addReplyBulkCBuffer(c, fi->ei->name, sdslen(fi->ei->name)); + addReplyBulkCString(c, "description"); + if (fi->desc) { + addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc)); + } else { + addReplyNull(c); + } + if (with_code) { + addReplyBulkCString(c, "code"); + addReplyBulkCBuffer(c, fi->code, sdslen(fi->code)); + } +} + +/* + * FUNCTION DELETE <FUNCTION NAME> + */ +void functionsDeleteCommand(client *c) { + if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) { + addReplyError(c, "Can not delete a function on a read only replica"); + return; + } + + robj *function_name = c->argv[2]; + functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr); + if (!fi) { + addReplyError(c, "Function not found"); + return; + } + + engineFunctionFree(fi, functions_ctx); + forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); + addReply(c, shared.ok); +} + +void functionsKillCommand(client *c) { + scriptKill(c, 0); +} + +static void fcallCommandGeneric(client *c, int ro) { + robj *function_name = c->argv[1]; + functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr); + if (!fi) { + addReplyError(c, "Function not found"); + return; + } + engine *engine = fi->ei->engine; + + long long numkeys; + /* Get the number of arguments that are keys */ + if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) { + addReplyError(c, "Bad number of keys provided"); + return; + } + if (numkeys > (c->argc - 3)) { + addReplyError(c, "Number of keys can't be greater than number of args"); + return; + } else if (numkeys < 0) { + addReplyError(c, "Number of keys can't be negative"); + return; + } + + scriptRunCtx run_ctx; + + scriptPrepareForRun(&run_ctx, fi->ei->c, c, fi->name); + if (ro) { + run_ctx.flags |= SCRIPT_READ_ONLY; + } + engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys, + c->argv + 3 + numkeys, c->argc - 3 - numkeys); + scriptResetRun(&run_ctx); +} + +/* + * FCALL <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> + */ +void fcallCommand(client *c) { + fcallCommandGeneric(c, 0); +} + +/* + * FCALL_RO <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> + */ +void fcallCommandReadOnly(client *c) { + fcallCommandGeneric(c, 1); +} + +void functionsHelpCommand(client *c) { + const char *help[] = { +"CREATE <ENGINE NAME> <FUNCTION NAME> [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE>", +" Create a new function with the given function name and code.", +"DELETE <FUNCTION NAME>", +" Delete the given function.", +"INFO <FUNCTION NAME> [WITHCODE]", +" For each function, print the following information about the function:", +" * Function name", +" * The engine used to run the function", +" * Function description", +" * Function code (only if WITHCODE is given)", +"LIST", +" Return general information on all the functions:", +" * Function name", +" * The engine used to run the function", +" * Function description", +"STATS", +" Return information about the current function running:", +" * Function name", +" * Command used to run the function", +" * Duration in MS that the function is running", +" If not function is running, return nil", +" In addition, returns a list of available engines.", +"KILL", +" Kill the current running function.", +NULL }; + addReplyHelp(c, help); +} + +/* Compile and save the given function, return C_OK on success and C_ERR on failure. + * In case on failure the err out param is set with relevant error message */ +int functionsCreateWithFunctionCtx(sds function_name,sds engine_name, sds desc, sds code, + int replace, sds* err, functionsCtx *functions) { + engineInfo *ei = dictFetchValue(engines, engine_name); + if (!ei) { + *err = sdsnew("Engine not found"); + return C_ERR; + } + engine *engine = ei->engine; + + functionInfo *fi = dictFetchValue(functions->functions, function_name); + if (fi && !replace) { + *err = sdsnew("Function already exists"); + return C_ERR; + } + + void *function = engine->create(engine->engine_ctx, code, err); + if (*err) { + return C_ERR; + } + + if (fi) { + /* free the already existing function as we are going to replace it */ + engineFunctionFree(fi, functions); + } + + engineFunctionCreate(function_name, function, ei, desc, code, functions); + + return C_OK; +} + +/* + * FUNCTION CREATE <ENGINE NAME> <FUNCTION NAME> + * [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE> + * + * ENGINE NAME - name of the engine to use the run the function + * FUNCTION NAME - name to use to invoke the function + * REPLACE - optional, replace existing function + * DESCRIPTION - optional, function description + * FUNCTION CODE - function code to pass to the engine + */ +void functionsCreateCommand(client *c) { + + if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) { + addReplyError(c, "Can not create a function on a read only replica"); + return; + } + + robj *engine_name = c->argv[2]; + robj *function_name = c->argv[3]; + + int replace = 0; + int argc_pos = 4; + sds desc = NULL; + while (argc_pos < c->argc - 1) { + robj *next_arg = c->argv[argc_pos++]; + if (!strcasecmp(next_arg->ptr, "replace")) { + replace = 1; + continue; + } + if (!strcasecmp(next_arg->ptr, "description")) { + if (argc_pos >= c->argc) { + addReplyError(c, "Bad function description"); + return; + } + desc = c->argv[argc_pos++]->ptr; + continue; + } + } + + if (argc_pos >= c->argc) { + addReplyError(c, "Function code is missing"); + return; + } + + robj *code = c->argv[argc_pos]; + sds err = NULL; + if (functionsCreateWithFunctionCtx(function_name->ptr, engine_name->ptr, + desc, code->ptr, replace, &err, functions_ctx) != C_OK) + { + addReplyErrorSds(c, err); + return; + } + forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); + addReply(c, shared.ok); +} + +/* Return memory usage of all the engines combine */ +unsigned long functionsMemory() { + dictIterator *iter = dictGetIterator(engines); + dictEntry *entry = NULL; + size_t engines_nemory = 0; + while ((entry = dictNext(iter))) { + engineInfo *ei = dictGetVal(entry); + engine *engine = ei->engine; + engines_nemory += engine->get_used_memory(engine->engine_ctx); + } + dictReleaseIterator(iter); + + return engines_nemory; +} + +/* Return memory overhead of all the engines combine */ +unsigned long functionsMemoryOverhead() { + size_t memory_overhead = dictSize(engines) * sizeof(dictEntry) + + dictSlots(engines) * sizeof(dictEntry*); + memory_overhead += dictSize(functions_ctx->functions) * sizeof(dictEntry) + + dictSlots(functions_ctx->functions) * sizeof(dictEntry*) + sizeof(functionsCtx); + memory_overhead += functions_ctx->cache_memory; + memory_overhead += engine_cache_memory; + + return memory_overhead; +} + +/* Returns the number of functions */ +unsigned long functionsNum() { + return dictSize(functions_ctx->functions); +} + +dict* functionsGet() { + return functions_ctx->functions; +} + +/* Initialize engine data structures. + * Should be called once on server initialization */ +int functionsInit() { + engines = dictCreate(&engineDictType); + functions_ctx = functionsCtxCreate(); + + if (luaEngineInitEngine() != C_OK) { + return C_ERR; + } + + return C_OK; +} diff --git a/src/functions.h b/src/functions.h new file mode 100644 index 000000000..8675883df --- /dev/null +++ b/src/functions.h @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __FUNCTIONS_H_ +#define __FUNCTIONS_H_ + +/* + * functions.c unit provides the Redis Functions API: + * * FUNCTION CREATE + * * FUNCTION CALL + * * FUNCTION DELETE + * * FUNCTION KILL + * * FUNCTION INFO + * + * Also contains implementation for: + * * Save/Load function from rdb + * * Register engines + */ + +#include "server.h" +#include "script.h" +#include "redismodule.h" + +typedef struct engine { + /* engine specific context */ + void *engine_ctx; + + /* Create function callback, get the engine_ctx, and function code. + * returns NULL on error and set sds to be the error message */ + void* (*create)(void *engine_ctx, sds code, sds *err); + + /* Invoking a function, r_ctx is an opaque object (from engine POV). + * The r_ctx should be used by the engine to interaction with Redis, + * such interaction could be running commands, set resp, or set + * replication mode + */ + void (*call)(scriptRunCtx *r_ctx, void *engine_ctx, void *compiled_function, + robj **keys, size_t nkeys, robj **args, size_t nargs); + + /* get current used memory by the engine */ + size_t (*get_used_memory)(void *engine_ctx); + + /* Return memory overhead for a given function, + * such memory is not counted as engine memory but as general + * structs memory that hold different information */ + size_t (*get_function_memory_overhead)(void *compiled_function); + + /* Return memory overhead for engine (struct size holding the engine)*/ + size_t (*get_engine_memory_overhead)(void *engine_ctx); + + /* free the given function */ + void (*free_function)(void *engine_ctx, void *compiled_function); +} engine; + +/* Hold information about an engine. + * Used on rdb.c so it must be declared here. */ +typedef struct engineInfo { + sds name; /* Name of the engine */ + engine *engine; /* engine callbacks that allows to interact with the engine */ + client *c; /* Client that is used to run commands */ +} engineInfo; + +/* Hold information about the specific function. + * Used on rdb.c so it must be declared here. */ +typedef struct functionInfo { + sds name; /* Function name */ + void *function; /* Opaque object that set by the function's engine and allow it + to run the function, usually it's the function compiled code. */ + engineInfo *ei; /* Pointer to the function engine */ + sds code; /* Function code */ + sds desc; /* Function description */ +} functionInfo; + +int functionsRegisterEngine(const char *engine_name, engine *engine_ctx); +int functionsCreateWithFunctionCtx(sds function_name, sds engine_name, sds desc, sds code, + int replace, sds* err, functionsCtx *functions); +void functionsCreateCommand(client *c); +void fcallCommand(client *c); +void fcallCommandReadOnly(client *c); +void functionsDeleteCommand(client *c); +void functionsKillCommand(client *c); +void functionsStatsCommand(client *c); +void functionsInfoCommand(client *c); +void functionsListCommand(client *c); +void functionsHelpCommand(client *c); +unsigned long functionsMemory(); +unsigned long functionsMemoryOverhead(); +int functionsLoad(rio *rdb, int ver); +unsigned long functionsNum(); +dict* functionsGet(); +functionsCtx* functionsCtxGetCurrent(); +functionsCtx* functionsCtxCreate(); +void functionsCtxFree(functionsCtx *functions_ctx); +void functionsCtxClear(functionsCtx *functions_ctx); +void functionsCtxSwapWithCurrent(functionsCtx *functions_ctx); + +int luaEngineInitEngine(); +int functionsInit(); + +#endif /* __FUNCTIONS_H_ */ diff --git a/src/object.c b/src/object.c index dab0648a8..7a5563ccb 100644 --- a/src/object.c +++ b/src/object.c @@ -29,6 +29,7 @@ */ #include "server.h" +#include "functions.h" #include <math.h> #include <ctype.h> @@ -1212,6 +1213,8 @@ struct redisMemOverhead *getMemoryOverheadData(void) { } mh->lua_caches = mem; mem_total+=mem; + mh->functions_caches = functionsMemoryOverhead(); + mem_total+=mh->functions_caches; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -1527,7 +1530,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) { struct redisMemOverhead *mh = getMemoryOverheadData(); - addReplyMapLen(c,25+mh->num_dbs); + addReplyMapLen(c,26+mh->num_dbs); addReplyBulkCString(c,"peak.allocated"); addReplyLongLong(c,mh->peak_allocated); @@ -1553,6 +1556,9 @@ NULL addReplyBulkCString(c,"lua.caches"); addReplyLongLong(c,mh->lua_caches); + addReplyBulkCString(c,"functions.caches"); + addReplyLongLong(c,mh->functions_caches); + for (size_t j = 0; j < mh->num_dbs; j++) { char dbname[32]; snprintf(dbname,sizeof(dbname),"db.%zd",mh->db[j].dbid); @@ -32,6 +32,7 @@ #include "zipmap.h" #include "endianconv.h" #include "stream.h" +#include "functions.h" #include <math.h> #include <fcntl.h> @@ -1239,6 +1240,25 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; + /* save functions */ + dict *functions = functionsGet(); + dictIterator *iter = dictGetIterator(functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + rdbSaveType(rdb, RDB_OPCODE_FUNCTION); + functionInfo* fi = dictGetVal(entry); + if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto werr; + if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto werr; + if (fi->desc) { + if (rdbSaveLen(rdb, 1) == -1) goto werr; /* desc exists */ + if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto werr; + } else { + if (rdbSaveLen(rdb, 0) == -1) goto werr; /* desc not exists */ + } + if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto werr; + } + dictReleaseIterator(iter); + for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; dict *d = db->dict; @@ -2687,12 +2707,80 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { } } +static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx) { + UNUSED(ver); + sds name = NULL; + sds engine_name = NULL; + sds desc = NULL; + sds blob = NULL; + sds err = NULL; + uint64_t has_desc; + int res = C_ERR; + if (!(name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { + serverLog(LL_WARNING, "Failed loading function name"); + goto error; + } + + if (!(engine_name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { + serverLog(LL_WARNING, "Failed loading engine name"); + goto error; + } + + if ((has_desc = rdbLoadLen(rdb, NULL)) == RDB_LENERR) { + serverLog(LL_WARNING, "Failed loading function desc indicator"); + goto error; + } + + if (has_desc && !(desc = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { + serverLog(LL_WARNING, "Failed loading function desc"); + goto error; + } + + if (!(blob = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { + serverLog(LL_WARNING, "Failed loading function blob"); + goto error; + } + + if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, 0, &err, functions_ctx) != C_OK) { + serverLog(LL_WARNING, "Failed compiling and saving the function %s", err); + goto error; + } + + res = C_OK; + +error: + if (name) sdsfree(name); + if (engine_name) sdsfree(engine_name); + if (desc) sdsfree(desc); + if (blob) sdsfree(blob); + if (err) sdsfree(err); + return res; +} + /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) { +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { + functionsCtx* functions_ctx = functionsCtxGetCurrent(); + functionsCtxClear(functions_ctx); + rdbLoadingCtx loading_ctx = { .dbarray = server.db, .functions_ctx = functions_ctx }; + int retval = rdbLoadRioWithLoadingCtx(rdb,rdbflags,rsi,&loading_ctx); + if (retval != C_OK) { + /* Loading failed, clear the function ctx */ + functionsCtxClear(functions_ctx); + } + return retval; +} + + +/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, + * otherwise C_ERR is returned and 'errno' is set accordingly. + * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, + * currently it only allow to set db object and functionsCtx to which the data + * will be loaded (in the future it might contains more such objects). */ +int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) { uint64_t dbid = 0; int type, rdbver; - redisDb *db = dbarray+0; + redisDb *db = rdb_loading_ctx->dbarray+0; char buf[1024]; int error; long long empty_keys_skipped = 0; @@ -2764,7 +2852,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) { "databases. Exiting\n", server.dbnum); exit(1); } - db = dbarray+dbid; + db = rdb_loading_ctx->dbarray+dbid; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_RESIZEDB) { /* RESIZEDB: Hint about the size of the keys in the currently @@ -2895,6 +2983,12 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) { decrRefCount(aux); continue; /* Read next opcode. */ } + } else if (type == RDB_OPCODE_FUNCTION) { + if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx) != C_OK) { + serverLog(LL_WARNING,"Failed loading function"); + goto eoferr; + } + continue; } /* Read key */ @@ -3044,7 +3138,9 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { if ((fp = fopen(filename,"r")) == NULL) return C_ERR; startLoadingFile(fp, filename,rdbflags); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb,rdbflags,rsi,server.db); + + retval = rdbLoadRio(&rdb,rdbflags,rsi); + fclose(fp); stopLoading(retval==C_OK); return retval; @@ -100,6 +100,7 @@ #define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ +#define RDB_OPCODE_FUNCTION 246 /* engine data */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ #define RDB_OPCODE_IDLE 248 /* LRU idle time. */ #define RDB_OPCODE_FREQ 249 /* LFU frequency. */ @@ -166,7 +167,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *db); +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); +int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); diff --git a/src/replication.c b/src/replication.c index cfd7f1f37..66483d934 100644 --- a/src/replication.c +++ b/src/replication.c @@ -32,6 +32,7 @@ #include "server.h" #include "cluster.h" #include "bio.h" +#include "functions.h" #include <memory.h> #include <sys/time.h> @@ -1738,6 +1739,7 @@ void readSyncBulkPayload(connection *conn) { ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); redisDb *diskless_load_tempDb = NULL; + functionsCtx* temp_functions_ctx = NULL; int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; off_t left; @@ -1913,6 +1915,7 @@ void readSyncBulkPayload(connection *conn) { if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { /* Initialize empty tempDb dictionaries. */ diskless_load_tempDb = disklessLoadInitTempDb(); + temp_functions_ctx = functionsCtxCreate(); moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, @@ -1935,6 +1938,7 @@ void readSyncBulkPayload(connection *conn) { if (use_diskless_load) { rio rdb; redisDb *dbarray; + functionsCtx* functions_ctx; int asyncLoading = 0; if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -1947,8 +1951,11 @@ void readSyncBulkPayload(connection *conn) { asyncLoading = 1; } dbarray = diskless_load_tempDb; + functions_ctx = temp_functions_ctx; } else { dbarray = server.db; + functions_ctx = functionsCtxGetCurrent(); + functionsCtxClear(functions_ctx); } rioInitWithConn(&rdb,conn,server.repl_transfer_size); @@ -1960,7 +1967,8 @@ void readSyncBulkPayload(connection *conn) { startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); int loadingFailed = 0; - if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi,dbarray) != C_OK) { + rdbLoadingCtx loadingCtx = { .dbarray = dbarray, .functions_ctx = functions_ctx }; + if (rdbLoadRioWithLoadingCtx(&rdb,RDBFLAGS_REPLICATION,&rsi,&loadingCtx) != C_OK) { /* RDB loading failed. */ serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " @@ -1988,6 +1996,7 @@ void readSyncBulkPayload(connection *conn) { NULL); disklessLoadDiscardTempDb(diskless_load_tempDb); + functionsCtxFree(temp_functions_ctx); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background"); } else { /* Remove the half-loaded data in case we started with an empty replica. */ @@ -2010,6 +2019,9 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB"); swapMainDbWithTempDb(diskless_load_tempDb); + /* swap existing functions ctx with the temporary one */ + functionsCtxSwapWithCurrent(temp_functions_ctx); + moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, NULL); diff --git a/src/script.c b/src/script.c index 0d6014508..34e4953d3 100644 --- a/src/script.c +++ b/src/script.c @@ -60,6 +60,11 @@ client* scriptGetClient() { return curr_run_ctx->c; } +client* scriptGetCaller() { + serverAssert(scriptIsRunning()); + return curr_run_ctx->original_client; +} + /* interrupt function for scripts, should be call * from time to time to reply some special command (like ping) * and also check if the run should be terminated. */ @@ -78,8 +83,8 @@ int scriptInterrupt(scriptRunCtx *run_ctx) { serverLog(LL_WARNING, "Slow script detected: still in execution after %lld milliseconds. " - "You can try killing the script using the SCRIPT KILL command.", - elapsed); + "You can try killing the script using the %s command.", + elapsed, (run_ctx->flags & SCRIPT_EVAL_MODE) ? "SCRIPT KILL" : "FUNCTION KILL"); enterScriptTimedoutMode(run_ctx); /* Once the script timeouts we reenter the event loop to permit others @@ -159,8 +164,18 @@ int scriptIsRunning() { return curr_run_ctx != NULL; } +const char* scriptCurrFunction() { + serverAssert(scriptIsRunning()); + return curr_run_ctx->funcname; +} + +int scriptIsEval() { + serverAssert(scriptIsRunning()); + return curr_run_ctx->flags & SCRIPT_EVAL_MODE; +} + /* Kill the current running script */ -void scriptKill(client *c) { +void scriptKill(client *c, int is_eval) { if (!curr_run_ctx) { addReplyError(c, "-NOTBUSY No scripts in execution right now."); return; @@ -177,6 +192,16 @@ void scriptKill(client *c) { "using the SHUTDOWN NOSAVE command."); return; } + if (is_eval && !(curr_run_ctx->flags & SCRIPT_EVAL_MODE)) { + /* Kill a function with 'SCRIPT KILL' is not allow */ + addReplyErrorObject(c, shared.slowscripterr); + return; + } + if (!is_eval && (curr_run_ctx->flags & SCRIPT_EVAL_MODE)) { + /* Kill an eval with 'FUNCTION KILL' is not allow */ + addReplyErrorObject(c, shared.slowevalerr); + return; + } curr_run_ctx->flags |= SCRIPT_KILLED; addReply(c, shared.ok); } @@ -430,3 +455,10 @@ mstime_t scriptTimeSnapshot() { serverAssert(!curr_run_ctx); return curr_run_ctx->snapshot_time; } + +long long scriptRunDuration() { + serverAssert(scriptIsRunning()); + return elapsedMs(curr_run_ctx->start_time); +} + + diff --git a/src/script.h b/src/script.h index 4d5e92966..aeed72456 100644 --- a/src/script.h +++ b/src/script.h @@ -69,6 +69,7 @@ #define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */ #define SCRIPT_EVAL_REPLICATION (1ULL<<6) /* mode for eval, indicate that we replicate the script invocation and not the effects */ +#define SCRIPT_EVAL_MODE (1ULL<<7) /* Indicate that the current script called from legacy Lua */ typedef struct scriptRunCtx scriptRunCtx; struct scriptRunCtx { @@ -87,10 +88,14 @@ int scriptSetResp(scriptRunCtx *r_ctx, int resp); int scriptSetRepl(scriptRunCtx *r_ctx, int repl); void scriptCall(scriptRunCtx *r_ctx, robj **argv, int argc, sds *err); int scriptInterrupt(scriptRunCtx *r_ctx); -void scriptKill(client *c); +void scriptKill(client *c, int is_eval); int scriptIsRunning(); +const char* scriptCurrFunction(); +int scriptIsEval(); int scriptIsTimedout(); client* scriptGetClient(); +client* scriptGetCaller(); mstime_t scriptTimeSnapshot(); +long long scriptRunDuration(); #endif /* __SCRIPT_H_ */ diff --git a/src/script_lua.c b/src/script_lua.c index fb3fa397b..aa73c5fb3 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -867,6 +867,8 @@ cleanup: } c->user = NULL; + c->argv = NULL; + c->argc = 0; if (raise_error) { /* If we are here we should have an error in the stack, in the @@ -1067,8 +1069,12 @@ static void luaRemoveUnsupportedFunctions(lua_State *lua) { * the creation of globals accidentally. * * It should be the last to be called in the scripting engine initialization - * sequence, because it may interact with creation of globals. */ -void luaEnableGlobalsProtection(lua_State *lua) { + * sequence, because it may interact with creation of globals. + * + * On Legacy Lua (eval) we need to check 'w ~= \"main\"' otherwise we will not be able + * to create the global 'function <sha> ()' variable. On Lua engine we do not use this trick + * so its not needed. */ +void luaEnableGlobalsProtection(lua_State *lua, int is_eval) { char *s[32]; sds code = sdsempty(); int j = 0; @@ -1081,7 +1087,7 @@ void luaEnableGlobalsProtection(lua_State *lua) { s[j++]="mt.__newindex = function (t, n, v)\n"; s[j++]=" if dbg.getinfo(2) then\n"; s[j++]=" local w = dbg.getinfo(2, \"S\").what\n"; - s[j++]=" if w ~= \"main\" and w ~= \"C\" then\n"; + s[j++]= is_eval ? " if w ~= \"main\" and w ~= \"C\" then\n" : " if w ~= \"C\" then\n"; s[j++]=" error(\"Script attempted to create global variable '\"..tostring(n)..\"'\", 2)\n"; s[j++]=" end\n"; s[j++]=" end\n"; @@ -1336,3 +1342,7 @@ void luaCallFunction(scriptRunCtx* run_ctx, lua_State *lua, robj** keys, size_t /* remove run_ctx from registry, its only applicable for the current script. */ luaSaveOnRegistry(lua, REGISTRY_RUN_CTX_NAME, NULL); } + +unsigned long luaMemory(lua_State *lua) { + return lua_gc(lua, LUA_GCCOUNT, 0) * 1024LL; +} diff --git a/src/script_lua.h b/src/script_lua.h index c18ad40bd..40cdd00b5 100644 --- a/src/script_lua.h +++ b/src/script_lua.h @@ -57,10 +57,11 @@ #define REGISTRY_RUN_CTX_NAME "__RUN_CTX__" void luaRegisterRedisAPI(lua_State* lua); -void luaEnableGlobalsProtection(lua_State *lua); +void luaEnableGlobalsProtection(lua_State *lua, int is_eval); void luaSaveOnRegistry(lua_State* lua, const char* name, void* ptr); void* luaGetFromRegistry(lua_State* lua, const char* name); void luaCallFunction(scriptRunCtx* r_ctx, lua_State *lua, robj** keys, size_t nkeys, robj** args, size_t nargs, int debug_enabled); +unsigned long luaMemory(lua_State *lua); #endif /* __SCRIPT_LUA_H_ */ diff --git a/src/server.c b/src/server.c index 1fbf3fa3a..571eb1b06 100644 --- a/src/server.c +++ b/src/server.c @@ -35,7 +35,7 @@ #include "latency.h" #include "atomicvar.h" #include "mt19937-64.h" -#include "script.h" +#include "functions.h" #include <time.h> #include <signal.h> @@ -472,6 +472,31 @@ struct redisCommand scriptSubcommands[] = { {NULL}, }; +struct redisCommand functionSubcommands[] = { + {"create",functionsCreateCommand,-5, + "may-replicate no-script @scripting"}, + + {"delete",functionsDeleteCommand,3, + "may-replicate no-script @scripting"}, + + {"kill",functionsKillCommand,2, + "no-script @scripting"}, + + {"info",functionsInfoCommand,-3, + "no-script @scripting"}, + + {"list",functionsListCommand,2, + "no-script @scripting"}, + + {"stats",functionsStatsCommand,2, + "no-script @scripting"}, + + {"help",functionsHelpCommand,2, + "ok-loading ok-stale @scripting"}, + + {NULL}, +}; + struct redisCommand clientSubcommands[] = { {"caching",clientCommand,3, "no-script ok-loading ok-stale @connection"}, @@ -2033,7 +2058,25 @@ struct redisCommand redisCommandTable[] = { "no-auth no-script ok-stale ok-loading fast @connection"}, {"failover",failoverCommand,-1, - "admin no-script ok-stale"} + "admin no-script ok-stale"}, + + {"function",NULL,-2, + "", + .subcommands=functionSubcommands}, + + {"fcall",fcallCommand,-3, + "no-script no-monitor may-replicate no-mandatory-keys @scripting", + {{"read write", /* We pass both read and write because these flag are worst-case-scenario */ + KSPEC_BS_INDEX,.bs.index={2}, + KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}}, + functionGetKeys}, + + {"fcall_ro",fcallCommandReadOnly,-3, + "no-script no-monitor no-mandatory-keys @scripting", + {{"read", + KSPEC_BS_INDEX,.bs.index={2}, + KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}}, + functionGetKeys}, }; /*============================ Utility functions ============================ */ @@ -2209,6 +2252,11 @@ void dictSdsDestructor(dict *d, void *val) sdsfree(val); } +void *dictSdsDup(dict *d, const void *key) { + UNUSED(d); + return sdsdup((const sds) key); +} + int dictObjKeyCompare(dict *d, const void *key1, const void *key2) { @@ -3517,8 +3565,10 @@ void createSharedObjects(void) { "-NOSCRIPT No matching script. Please use EVAL.\r\n")); shared.loadingerr = createObject(OBJ_STRING,sdsnew( "-LOADING Redis is loading the dataset in memory\r\n")); - shared.slowscripterr = createObject(OBJ_STRING,sdsnew( + shared.slowevalerr = createObject(OBJ_STRING,sdsnew( "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n")); + shared.slowscripterr = createObject(OBJ_STRING,sdsnew( + "-BUSY Redis is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n")); shared.masterdownerr = createObject(OBJ_STRING,sdsnew( "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n")); shared.bgsaveerr = createObject(OBJ_STRING,sdsnew( @@ -4345,6 +4395,7 @@ void initServer(void) { if (server.cluster_enabled) clusterInit(); replicationScriptCacheInit(); scriptingInit(1); + functionsInit(); slowlogInit(); latencyMonitorInit(); @@ -5448,9 +5499,15 @@ int processCommand(client *c) { tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && !(c->cmd->proc == scriptCommand && c->argc == 2 && - tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) + tolower(((char*)c->argv[1]->ptr)[0]) == 'k') && + !(c->cmd->proc == functionsKillCommand) && + !(c->cmd->proc == functionsStatsCommand)) { - rejectCommand(c, shared.slowscripterr); + if (scriptIsEval()) { + rejectCommand(c, shared.slowevalerr); + } else { + rejectCommand(c, shared.slowscripterr); + } return C_OK; } @@ -6281,6 +6338,7 @@ sds genRedisInfoString(const char *section) { char peak_hmem[64]; char total_system_hmem[64]; char used_memory_lua_hmem[64]; + char used_memory_vm_total_hmem[64]; char used_memory_scripts_hmem[64]; char used_memory_rss_hmem[64]; char maxmemory_hmem[64]; @@ -6288,6 +6346,7 @@ sds genRedisInfoString(const char *section) { size_t total_system_mem = server.system_memory_size; const char *evict_policy = evictPolicyToString(); long long memory_lua = evalMemory(); + long long memory_functions = functionsMemory(); struct redisMemOverhead *mh = getMemoryOverheadData(); /* Peak memory is updated from time to time by serverCron() so it @@ -6301,7 +6360,8 @@ sds genRedisInfoString(const char *section) { bytesToHuman(peak_hmem,server.stat_peak_memory); bytesToHuman(total_system_hmem,total_system_mem); bytesToHuman(used_memory_lua_hmem,memory_lua); - bytesToHuman(used_memory_scripts_hmem,mh->lua_caches); + bytesToHuman(used_memory_vm_total_hmem,memory_functions + memory_lua); + bytesToHuman(used_memory_scripts_hmem,mh->lua_caches + mh->functions_caches); bytesToHuman(used_memory_rss_hmem,server.cron_malloc_stats.process_rss); bytesToHuman(maxmemory_hmem,server.maxmemory); @@ -6324,11 +6384,18 @@ sds genRedisInfoString(const char *section) { "allocator_resident:%zu\r\n" "total_system_memory:%lu\r\n" "total_system_memory_human:%s\r\n" - "used_memory_lua:%lld\r\n" - "used_memory_lua_human:%s\r\n" + "used_memory_lua:%lld\r\n" /* deprecated, renamed to used_memory_vm_eval */ + "used_memory_vm_eval:%lld\r\n" + "used_memory_lua_human:%s\r\n" /* deprecated */ + "used_memory_scripts_eval:%lld\r\n" + "number_of_cached_scripts:%lu\r\n" + "number_of_functions:%lu\r\n" + "used_memory_vm_functions:%lld\r\n" + "used_memory_vm_total:%lld\r\n" + "used_memory_vm_total_human:%s\r\n" + "used_memory_functions:%lld\r\n" "used_memory_scripts:%lld\r\n" "used_memory_scripts_human:%s\r\n" - "number_of_cached_scripts:%lu\r\n" "maxmemory:%lld\r\n" "maxmemory_human:%s\r\n" "maxmemory_policy:%s\r\n" @@ -6367,10 +6434,17 @@ sds genRedisInfoString(const char *section) { (unsigned long)total_system_mem, total_system_hmem, memory_lua, + memory_lua, used_memory_lua_hmem, (long long) mh->lua_caches, - used_memory_scripts_hmem, dictSize(evalScriptsDict()), + functionsNum(), + memory_functions, + memory_functions + memory_lua, + used_memory_vm_total_hmem, + (long long) mh->functions_caches, + (long long) mh->lua_caches + (long long) mh->functions_caches, + used_memory_scripts_hmem, server.maxmemory, maxmemory_hmem, evict_policy, diff --git a/src/server.h b/src/server.h index 3511ff04b..535d21dd9 100644 --- a/src/server.h +++ b/src/server.h @@ -822,6 +822,19 @@ typedef struct redisDb { clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */ } redisDb; +/* forward declaration for functions ctx */ +typedef struct functionsCtx functionsCtx; + +/* Holding object that need to be populated during + * rdb loading. On loading end it is possible to decide + * whether not to set those objects on their rightful place. + * For example: dbarray need to be set as main database on + * successful loading and dropped on failure. */ +typedef struct rdbLoadingCtx { + redisDb* dbarray; + functionsCtx* functions_ctx; +}rdbLoadingCtx; + /* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; @@ -1122,7 +1135,7 @@ struct sharedObjectsStruct { robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, *queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4], *emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, - *outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr, + *outofrangeerr, *noscripterr, *loadingerr, *slowevalerr, *slowscripterr, *bgsaveerr, *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, @@ -1203,6 +1216,7 @@ struct redisMemOverhead { size_t clients_normal; size_t aof_buffer; size_t lua_caches; + size_t functions_caches; size_t overhead_total; size_t dataset; size_t total_keys; @@ -2670,6 +2684,7 @@ int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysRes int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); @@ -2761,6 +2776,7 @@ uint64_t dictSdsCaseHash(const void *key); int dictSdsKeyCompare(dict *d, const void *key1, const void *key2); int dictSdsKeyCaseCompare(dict *d, const void *key1, const void *key2); void dictSdsDestructor(dict *d, void *val); +void *dictSdsDup(dict *d, const void *key); /* Git SHA1 */ char *redisGitSHA1(void); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index fe4adbe93..1332c8380 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -521,6 +521,12 @@ foreach testType {Successful Aborted} { # Set a key value on replica to check status during loading, on failure and after swapping db $replica set mykey myvalue + # Set a function value on replica to check status during loading, on failure and after swapping db + $replica function create LUA test {return 'hello1'} + + # Set a function value on master to check it reaches the replica when replication ends + $master function create LUA test {return 'hello2'} + # Force the replica to try another full sync (this time it will have matching master replid) $master multi $master client kill type replica @@ -552,6 +558,9 @@ foreach testType {Successful Aborted} { # Ensure we still see old values while async_loading is in progress and also not LOADING status assert_equal [$replica get mykey] "myvalue" + # Ensure we still can call old function while async_loading is in progress + assert_equal [$replica fcall test 0] "hello1" + # Make sure we're still async_loading to validate previous assertion assert_equal [s -1 async_loading] 1 @@ -576,6 +585,9 @@ foreach testType {Successful Aborted} { # Ensure we see old values from replica assert_equal [$replica get mykey] "myvalue" + # Ensure we still can call old function + assert_equal [$replica fcall test 0] "hello1" + # Make sure amount of replica keys didn't change assert_equal [$replica dbsize] 2001 } @@ -595,6 +607,9 @@ foreach testType {Successful Aborted} { # Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status assert_equal [$replica GET mykey] "" + # Ensure we got the new function + assert_equal [$replica fcall test 0] "hello2" + # Make sure amount of keys matches master assert_equal [$replica dbsize] 1010 } @@ -624,6 +639,10 @@ test {diskless loading short read} { $replica config set dynamic-hz no # Try to fill the master with all types of data types / encodings set start [clock clicks -milliseconds] + + # Set a function value to check short read handling on functions + r function create LUA test {return 'hello1'} + for {set k 0} {$k < 3} {incr k} { for {set i 0} {$i < 10} {incr i} { r set "$k int_$i" [expr {int(rand()*10000)}] diff --git a/tests/unit/functions.tcl b/tests/unit/functions.tcl new file mode 100644 index 000000000..0736a44da --- /dev/null +++ b/tests/unit/functions.tcl @@ -0,0 +1,280 @@ +start_server {tags {"scripting"}} { + test {FUNCTION - Basic usage} { + r function create LUA test {return 'hello'} + r fcall test 0 + } {hello} + + test {FUNCTION - Create an already exiting function raise error} { + catch { + r function create LUA test {return 'hello1'} + } e + set _ $e + } {*Function already exists*} + + test {FUNCTION - Create function with unexisting engine} { + catch { + r function create bad_engine test {return 'hello1'} + } e + set _ $e + } {*Engine not found*} + + test {FUNCTION - Test uncompiled script} { + catch { + r function create LUA test1 {bad script} + } e + set _ $e + } {*Error compiling function*} + + test {FUNCTION - test replace argument} { + r function create LUA test REPLACE {return 'hello1'} + r fcall test 0 + } {hello1} + + test {FUNCTION - test replace argument with function creation failure keeps old function} { + catch {r function create LUA test REPLACE {error}} + r fcall test 0 + } {hello1} + + test {FUNCTION - test function delete} { + r function delete test + catch { + r fcall test 0 + } e + set _ $e + } {*Function not found*} + + test {FUNCTION - test description argument} { + r function create LUA test DESCRIPTION {some description} {return 'hello'} + r function list + } {{name test engine LUA description {some description}}} + + test {FUNCTION - test info specific function} { + r function info test WITHCODE + } {name test engine LUA description {some description} code {return 'hello'}} + + test {FUNCTION - test info without code} { + r function info test + } {name test engine LUA description {some description}} + + test {FUNCTION - test info on function that does not exists} { + catch { + r function info bad_function_name + } e + set _ $e + } {*Function does not exists*} + + test {FUNCTION - test info with bad number of arguments} { + catch { + r function info test WITHCODE bad_arg + } e + set _ $e + } {*wrong number of arguments*} + + test {FUNCTION - test fcall bad arguments} { + catch { + r fcall test bad_arg + } e + set _ $e + } {*Bad number of keys provided*} + + test {FUNCTION - test fcall bad number of keys arguments} { + catch { + r fcall test 10 key1 + } e + set _ $e + } {*Number of keys can't be greater than number of args*} + + test {FUNCTION - test fcall negative number of keys} { + catch { + r fcall test -1 key1 + } e + set _ $e + } {*Number of keys can't be negative*} + + test {FUNCTION - test function delete on not exiting function} { + catch { + r function delete test1 + } e + set _ $e + } {*Function not found*} + + test {FUNCTION - test function kill when function is not running} { + catch { + r function kill + } e + set _ $e + } {*No scripts in execution*} + + test {FUNCTION - test wrong subcommand} { + catch { + r function bad_subcommand + } e + set _ $e + } {*Unknown subcommand*} + + test {FUNCTION - test loading from rdb} { + r debug reload + r fcall test 0 + } {hello} + + test {FUNCTION - test fcall_ro with write command} { + r function create lua test REPLACE {return redis.call('set', 'x', '1')} + catch { r fcall_ro test 0 } e + set _ $e + } {*Write commands are not allowed from read-only scripts*} + + test {FUNCTION - test fcall_ro with read only commands} { + r function create lua test REPLACE {return redis.call('get', 'x')} + r set x 1 + r fcall_ro test 0 + } {1} + + test {FUNCTION - test keys and argv} { + r function create lua test REPLACE {return redis.call('set', KEYS[1], ARGV[1])} + r fcall test 1 x foo + r get x + } {foo} + + test {FUNCTION - test command get keys on fcall} { + r COMMAND GETKEYS fcall test 1 x foo + } {x} + + test {FUNCTION - test command get keys on fcall_ro} { + r COMMAND GETKEYS fcall_ro test 1 x foo + } {x} + + test {FUNCTION - test function kill} { + set rd [redis_deferring_client] + r config set script-time-limit 10 + r function create lua test REPLACE {local a = 1 while true do a = a + 1 end} + $rd fcall test 0 + after 200 + catch {r ping} e + assert_match {BUSY*} $e + assert_match {running_script {name test command {fcall test 0} duration_ms *} engines LUA} [r FUNCTION STATS] + r function kill + after 200 ; # Give some time to Lua to call the hook again... + assert_equal [r ping] "PONG" + } + + test {FUNCTION - test script kill not working on function} { + set rd [redis_deferring_client] + r config set script-time-limit 10 + r function create lua test REPLACE {local a = 1 while true do a = a + 1 end} + $rd fcall test 0 + after 200 + catch {r ping} e + assert_match {BUSY*} $e + catch {r script kill} e + assert_match {BUSY*} $e + r function kill + after 200 ; # Give some time to Lua to call the hook again... + assert_equal [r ping] "PONG" + } + + test {FUNCTION - test function kill not working on eval} { + set rd [redis_deferring_client] + r config set script-time-limit 10 + $rd eval {local a = 1 while true do a = a + 1 end} 0 + after 200 + catch {r ping} e + assert_match {BUSY*} $e + catch {r function kill} e + assert_match {BUSY*} $e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + assert_equal [r ping] "PONG" + } +} + +start_server {tags {"scripting repl"}} { + start_server {} { + test "Connect a replica to the master instance" { + r -1 slaveof [srv 0 host] [srv 0 port] + wait_for_condition 50 100 { + [s -1 role] eq {slave} && + [string match {*master_link_status:up*} [r -1 info replication]] + } else { + fail "Can't turn the instance into a replica" + } + } + + test {FUNCTION - creation is replicated to replica} { + r function create LUA test DESCRIPTION {some description} {return 'hello'} + wait_for_condition 50 100 { + [r -1 function list] eq {{name test engine LUA description {some description}}} + } else { + fail "Failed waiting for function to replicate to replica" + } + } + + test {FUNCTION - call on replica} { + r -1 fcall test 0 + } {hello} + + test {FUNCTION - delete is replicated to replica} { + r function delete test + wait_for_condition 50 100 { + [r -1 function list] eq {} + } else { + fail "Failed waiting for function to replicate to replica" + } + } + + test "Disconnecting the replica from master instance" { + r -1 slaveof no one + # creating a function after disconnect to make sure function + # is replicated on rdb phase + r function create LUA test DESCRIPTION {some description} {return 'hello'} + + # reconnect the replica + r -1 slaveof [srv 0 host] [srv 0 port] + wait_for_condition 50 100 { + [s -1 role] eq {slave} && + [string match {*master_link_status:up*} [r -1 info replication]] + } else { + fail "Can't turn the instance into a replica" + } + } + + test "FUNCTION - test replication to replica on rdb phase" { + r -1 fcall test 0 + } {hello} + + test "FUNCTION - test replication to replica on rdb phase info command" { + r -1 function info test WITHCODE + } {name test engine LUA description {some description} code {return 'hello'}} + + test "FUNCTION - create on read only replica" { + catch { + r -1 function create LUA test DESCRIPTION {some description} {return 'hello'} + } e + set _ $e + } {*Can not create a function on a read only replica*} + + test "FUNCTION - delete on read only replica" { + catch { + r -1 function delete test + } e + set _ $e + } {*Can not delete a function on a read only replica*} + + test "FUNCTION - function effect is replicated to replica" { + r function create LUA test REPLACE {return redis.call('set', 'x', '1')} + r fcall test 0 + assert {[r get x] eq {1}} + wait_for_condition 50 100 { + [r -1 get x] eq {1} + } else { + fail "Failed waiting function effect to be replicated to replica" + } + } + + test "FUNCTION - modify key space of read only replica" { + catch { + r -1 fcall test 0 + } e + set _ $e + } {*can't write against a read only replica*} + } +}
\ No newline at end of file diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index f62f68970..09c021e56 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -1,48 +1,87 @@ +foreach is_eval {0 1} { + +if {$is_eval == 1} { + proc run_script {args} { + r eval {*}$args + } + proc run_script_ro {args} { + r eval_ro {*}$args + } + proc run_script_on_connection {args} { + [lindex $args 0] eval {*}[lrange $args 1 end] + } + proc kill_script {args} { + r script kill + } +} else { + proc run_script {args} { + r function create LUA test replace [lindex $args 0] + r fcall test {*}[lrange $args 1 end] + } + proc run_script_ro {args} { + r function create LUA test replace [lindex $args 0] + r fcall_ro test {*}[lrange $args 1 end] + } + proc run_script_on_connection {args} { + set rd [lindex $args 0] + $rd function create LUA test replace [lindex $args 1] + # read the ok reply of function create + $rd read + $rd fcall test {*}[lrange $args 2 end] + } + proc kill_script {args} { + r function kill + } +} + start_server {tags {"scripting"}} { + test {EVAL - Does Lua interpreter replies to our requests?} { - r eval {return 'hello'} 0 + run_script {return 'hello'} 0 } {hello} test {EVAL - Lua integer -> Redis protocol type conversion} { - r eval {return 100.5} 0 + run_script {return 100.5} 0 } {100} test {EVAL - Lua string -> Redis protocol type conversion} { - r eval {return 'hello world'} 0 + run_script {return 'hello world'} 0 } {hello world} test {EVAL - Lua true boolean -> Redis protocol type conversion} { - r eval {return true} 0 + run_script {return true} 0 } {1} test {EVAL - Lua false boolean -> Redis protocol type conversion} { - r eval {return false} 0 + run_script {return false} 0 } {} test {EVAL - Lua status code reply -> Redis protocol type conversion} { - r eval {return {ok='fine'}} 0 + run_script {return {ok='fine'}} 0 } {fine} test {EVAL - Lua error reply -> Redis protocol type conversion} { catch { - r eval {return {err='this is an error'}} 0 + run_script {return {err='this is an error'}} 0 } e set _ $e } {this is an error} test {EVAL - Lua table -> Redis protocol type conversion} { - r eval {return {1,2,3,'ciao',{1,2}}} 0 + run_script {return {1,2,3,'ciao',{1,2}}} 0 } {1 2 3 ciao {1 2}} test {EVAL - Are the KEYS and ARGV arrays populated correctly?} { - r eval {return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}} 2 a{t} b{t} c{t} d{t} + run_script {return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}} 2 a{t} b{t} c{t} d{t} } {a{t} b{t} c{t} d{t}} test {EVAL - is Lua able to call Redis API?} { r set mykey myval - r eval {return redis.call('get',KEYS[1])} 1 mykey + run_script {return redis.call('get',KEYS[1])} 1 mykey } {myval} + if {$is_eval eq 1} { + # eval sha is only relevant for is_eval Lua test {EVALSHA - Can we call a SHA1 if already defined?} { r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey } {myval} @@ -60,10 +99,11 @@ start_server {tags {"scripting"}} { catch {r evalsha ffd632c7d33e571e9f24556ebed26c3479a87130 0} e set _ $e } {NOSCRIPT*} + } ;# is_eval test {EVAL - Redis integer -> Lua type conversion} { r set x 0 - r eval { + run_script { local foo = redis.pcall('incr',KEYS[1]) return {type(foo),foo} } 1 x @@ -71,7 +111,7 @@ start_server {tags {"scripting"}} { test {EVAL - Redis bulk -> Lua type conversion} { r set mykey myval - r eval { + run_script { local foo = redis.pcall('get',KEYS[1]) return {type(foo),foo} } 1 mykey @@ -82,14 +122,14 @@ start_server {tags {"scripting"}} { r rpush mylist a r rpush mylist b r rpush mylist c - r eval { + run_script { local foo = redis.pcall('lrange',KEYS[1],0,-1) return {type(foo),foo[1],foo[2],foo[3],# foo} } 1 mylist } {table a b c 3} test {EVAL - Redis status reply -> Lua type conversion} { - r eval { + run_script { local foo = redis.pcall('set',KEYS[1],'myval') return {type(foo),foo['ok']} } 1 mykey @@ -97,7 +137,7 @@ start_server {tags {"scripting"}} { test {EVAL - Redis error reply -> Lua type conversion} { r set mykey myval - r eval { + run_script { local foo = redis.pcall('incr',KEYS[1]) return {type(foo),foo['err']} } 1 mykey @@ -105,7 +145,7 @@ start_server {tags {"scripting"}} { test {EVAL - Redis nil bulk reply -> Lua type conversion} { r del mykey - r eval { + run_script { local foo = redis.pcall('get',KEYS[1]) return {type(foo),foo == false} } 1 mykey @@ -115,13 +155,13 @@ start_server {tags {"scripting"}} { r set mykey "this is DB 9" r select 10 r set mykey "this is DB 10" - r eval {return redis.pcall('get',KEYS[1])} 1 mykey + run_script {return redis.pcall('get',KEYS[1])} 1 mykey } {this is DB 10} {singledb:skip} test {EVAL - SELECT inside Lua should not affect the caller} { # here we DB 10 is selected r set mykey "original value" - r eval {return redis.pcall('select','9')} 0 + run_script {return redis.pcall('select','9')} 0 set res [r get mykey] r select 9 set res @@ -131,7 +171,7 @@ start_server {tags {"scripting"}} { test {EVAL - Script can't run more than configured time limit} { r config set lua-time-limit 1 catch { - r eval { + run_script { local i = 0 while true do i=i+1 end } 0 @@ -142,71 +182,74 @@ start_server {tags {"scripting"}} { test {EVAL - Scripts can't run blpop command} { set e {} - catch {r eval {return redis.pcall('blpop','x',0)} 0} e + catch {run_script {return redis.pcall('blpop','x',0)} 0} e set e } {*not allowed*} test {EVAL - Scripts can't run brpop command} { set e {} - catch {r eval {return redis.pcall('brpop','empty_list',0)} 0} e + catch {run_script {return redis.pcall('brpop','empty_list',0)} 0} e set e } {*not allowed*} test {EVAL - Scripts can't run brpoplpush command} { set e {} - catch {r eval {return redis.pcall('brpoplpush','empty_list1', 'empty_list2',0)} 0} e + catch {run_script {return redis.pcall('brpoplpush','empty_list1', 'empty_list2',0)} 0} e set e } {*not allowed*} test {EVAL - Scripts can't run blmove command} { set e {} - catch {r eval {return redis.pcall('blmove','empty_list1', 'empty_list2', 'LEFT', 'LEFT', 0)} 0} e + catch {run_script {return redis.pcall('blmove','empty_list1', 'empty_list2', 'LEFT', 'LEFT', 0)} 0} e set e } {*not allowed*} test {EVAL - Scripts can't run bzpopmin command} { set e {} - catch {r eval {return redis.pcall('bzpopmin','empty_zset', 0)} 0} e + catch {run_script {return redis.pcall('bzpopmin','empty_zset', 0)} 0} e set e } {*not allowed*} test {EVAL - Scripts can't run bzpopmax command} { set e {} - catch {r eval {return redis.pcall('bzpopmax','empty_zset', 0)} 0} e + catch {run_script {return redis.pcall('bzpopmax','empty_zset', 0)} 0} e set e } {*not allowed*} test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} { r del s r xgroup create s g $ MKSTREAM - set res [r eval {return redis.pcall('xread','STREAMS','s','$')} 1 s] + set res [run_script {return redis.pcall('xread','STREAMS','s','$')} 1 s] assert {$res eq {}} - assert_error "*xread command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s} - set res [r eval {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s] + assert_error "*xread command is not allowed with BLOCK option from scripts" {run_script {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s} + set res [run_script {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s] assert {$res eq {}} - assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s} + assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {run_script {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s} } + if {$is_eval eq 1} { + # only is_eval Lua can not execute randomkey test {EVAL - Scripts can't run certain commands} { set e {} r debug lua-always-replicate-commands 0 catch { - r eval "redis.pcall('randomkey'); return redis.pcall('set','x','ciao')" 0 + run_script "redis.pcall('randomkey'); return redis.pcall('set','x','ciao')" 0 } e r debug lua-always-replicate-commands 1 set e } {*not allowed after*} {needs:debug} + } ;# is_eval test {EVAL - No arguments to redis.call/pcall is considered an error} { set e {} - catch {r eval {return redis.call()} 0} e + catch {run_script {return redis.call()} 0} e set e } {*one argument*} test {EVAL - redis.call variant raises a Lua error on Redis cmd error (1)} { set e {} catch { - r eval "redis.call('nosuchcommand')" 0 + run_script "redis.call('nosuchcommand')" 0 } e set e } {*Unknown Redis*} @@ -214,7 +257,7 @@ start_server {tags {"scripting"}} { test {EVAL - redis.call variant raises a Lua error on Redis cmd error (1)} { set e {} catch { - r eval "redis.call('get','a','b','c')" 0 + run_script "redis.call('get','a','b','c')" 0 } e set e } {*number of args*} @@ -223,7 +266,7 @@ start_server {tags {"scripting"}} { set e {} r set foo bar catch { - r eval {redis.call('lpush',KEYS[1],'val')} 1 foo + run_script {redis.call('lpush',KEYS[1],'val')} 1 foo } e set e } {*against a key*} @@ -232,7 +275,7 @@ start_server {tags {"scripting"}} { # We must return the table as a string because otherwise # Redis converts floats to ints and we get 0 and 1023 instead # of 0.0003 and 1023.2 as the parsed output. - r eval {return + run_script {return table.concat( cjson.decode( "[0.0, -5e3, -1, 0.3e-3, 1023.2, 0e10]"), " ") @@ -240,13 +283,13 @@ start_server {tags {"scripting"}} { } {0 -5000 -1 0.0003 1023.2 0} test {EVAL - JSON string decoding} { - r eval {local decoded = cjson.decode('{"keya": "a", "keyb": "b"}') + run_script {local decoded = cjson.decode('{"keya": "a", "keyb": "b"}') return {decoded.keya, decoded.keyb} } 0 } {a b} test {EVAL - cmsgpack can pack double?} { - r eval {local encoded = cmsgpack.pack(0.1) + run_script {local encoded = cmsgpack.pack(0.1) local h = "" for i = 1, #encoded do h = h .. string.format("%02x",string.byte(encoded,i)) @@ -256,7 +299,7 @@ start_server {tags {"scripting"}} { } {cb3fb999999999999a} test {EVAL - cmsgpack can pack negative int64?} { - r eval {local encoded = cmsgpack.pack(-1099511627776) + run_script {local encoded = cmsgpack.pack(-1099511627776) local h = "" for i = 1, #encoded do h = h .. string.format("%02x",string.byte(encoded,i)) @@ -266,7 +309,7 @@ start_server {tags {"scripting"}} { } {d3ffffff0000000000} test {EVAL - cmsgpack can pack and unpack circular references?} { - r eval {local a = {x=nil,y=5} + run_script {local a = {x=nil,y=5} local b = {x=a} a['x'] = b local encoded = cmsgpack.pack(a) @@ -298,7 +341,7 @@ start_server {tags {"scripting"}} { } {82a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a178c0 1 1} test {EVAL - Numerical sanity check from bitop} { - r eval {assert(0x7fffffff == 2147483647, "broken hex literals"); + run_script {assert(0x7fffffff == 2147483647, "broken hex literals"); assert(0xffffffff == -1 or 0xffffffff == 2^32-1, "broken hex literals"); assert(tostring(-1) == "-1", "broken tostring()"); @@ -309,7 +352,7 @@ start_server {tags {"scripting"}} { } {} test {EVAL - Verify minimal bitop functionality} { - r eval {assert(bit.tobit(1) == 1); + run_script {assert(bit.tobit(1) == 1); assert(bit.band(1) == 1); assert(bit.bxor(1,2) == 3); assert(bit.bor(1,2,4,8,16,32,64,128) == 255) @@ -317,20 +360,22 @@ start_server {tags {"scripting"}} { } {} test {EVAL - Able to parse trailing comments} { - r eval {return 'hello' --trailing comment} 0 + run_script {return 'hello' --trailing comment} 0 } {hello} test {EVAL_RO - Successful case} { r set foo bar - assert_equal bar [r eval_ro {return redis.call('get', KEYS[1]);} 1 foo] + assert_equal bar [run_script_ro {return redis.call('get', KEYS[1]);} 1 foo] } test {EVAL_RO - Cannot run write commands} { r set foo bar - catch {r eval_ro {redis.call('del', KEYS[1]);} 1 foo} e + catch {run_script_ro {redis.call('del', KEYS[1]);} 1 foo} e set e } {*Write commands are not allowed from read-only scripts*} + if {$is_eval eq 1} { + # script command is only relevant for is_eval Lua test {SCRIPTING FLUSH - is able to clear the scripts cache?} { r set mykey myval set v [r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey] @@ -361,6 +406,7 @@ start_server {tags {"scripting"}} { [r evalsha b534286061d4b9e4026607613b95c06c06015ae8 0] } {b534286061d4b9e4026607613b95c06c06015ae8 loaded} + # reply oredering is only relevant for is_eval Lua test "In the context of Lua the output of random commands gets ordered" { r debug lua-always-replicate-commands 0 r del myset @@ -387,19 +433,20 @@ start_server {tags {"scripting"}} { r sadd myset a b c r eval {return redis.call('sort',KEYS[1],'by','_','get','#','get','_:*')} 1 myset } {a {} b {} c {}} {cluster:skip} + } ;# is_eval test "redis.sha1hex() implementation" { - list [r eval {return redis.sha1hex('')} 0] \ - [r eval {return redis.sha1hex('Pizza & Mandolino')} 0] + list [run_script {return redis.sha1hex('')} 0] \ + [run_script {return redis.sha1hex('Pizza & Mandolino')} 0] } {da39a3ee5e6b4b0d3255bfef95601890afd80709 74822d82031af7493c20eefa13bd07ec4fada82f} test {Globals protection reading an undeclared global variable} { - catch {r eval {return a} 0} e + catch {run_script {return a} 0} e set e } {*ERR*attempted to access * global*} test {Globals protection setting an undeclared global*} { - catch {r eval {a=10} 0} e + catch {run_script {a=10} 0} e set e } {*ERR*attempted to create global*} @@ -417,14 +464,16 @@ start_server {tags {"scripting"}} { } r set foo 5 set res {} - lappend res [r eval $decr_if_gt 1 foo 2] - lappend res [r eval $decr_if_gt 1 foo 2] - lappend res [r eval $decr_if_gt 1 foo 2] - lappend res [r eval $decr_if_gt 1 foo 2] - lappend res [r eval $decr_if_gt 1 foo 2] + lappend res [run_script $decr_if_gt 1 foo 2] + lappend res [run_script $decr_if_gt 1 foo 2] + lappend res [run_script $decr_if_gt 1 foo 2] + lappend res [run_script $decr_if_gt 1 foo 2] + lappend res [run_script $decr_if_gt 1 foo 2] set res } {4 3 2 2 2} + if {$is_eval eq 1} { + # random handling is only relevant for is_eval Lua test {Scripting engine resets PRNG at every script execution} { set rand1 [r eval {return tostring(math.random())} 0] set rand2 [r eval {return tostring(math.random())} 0] @@ -444,13 +493,14 @@ start_server {tags {"scripting"}} { assert_equal $rand1 $rand2 assert {$rand2 ne $rand3} } + } ;# is_eval test {EVAL does not leak in the Lua stack} { r set x 0 # Use a non blocking client to speedup the loop. set rd [redis_deferring_client] for {set j 0} {$j < 10000} {incr j} { - $rd eval {return redis.call("incr",KEYS[1])} 1 x + run_script_on_connection $rd {return redis.call("incr",KEYS[1])} 1 x } for {set j 0} {$j < 10000} {incr j} { $rd read @@ -464,9 +514,9 @@ start_server {tags {"scripting"}} { r flushall r config set appendonly yes r config set aof-use-rdb-preamble no - r eval {redis.call("set",KEYS[1],"100")} 1 foo - r eval {redis.call("incr",KEYS[1])} 1 foo - r eval {redis.call("incr",KEYS[1])} 1 foo + run_script {redis.call("set",KEYS[1],"100")} 1 foo + run_script {redis.call("incr",KEYS[1])} 1 foo + run_script {redis.call("incr",KEYS[1])} 1 foo wait_for_condition 50 100 { [s aof_rewrite_in_progress] == 0 } else { @@ -481,6 +531,8 @@ start_server {tags {"scripting"}} { set res } {102} {external:skip} + if {$is_eval eq 1} { + # script propagation is irrelevant on functions test {EVAL timeout from AOF} { # generate a long running script that is propagated to the AOF as script # make sure that the script times out during loading @@ -528,9 +580,11 @@ start_server {tags {"scripting"}} { assert {[r mget a{t} b{t} c{t} d{t}] eq {1 2 3 4}} assert {[r spop myset] eq {}} } + } ;# is_eval + test {Call Redis command with many args from Lua (issue #1764)} { - r eval { + run_script { local i local x={} redis.call('del','mylist') @@ -543,7 +597,7 @@ start_server {tags {"scripting"}} { } {1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100} test {Number conversion precision test (issue #1118)} { - r eval { + run_script { local value = 9007199254740991 redis.call("set","foo",value) return redis.call("get","foo") @@ -551,19 +605,19 @@ start_server {tags {"scripting"}} { } {9007199254740991} test {String containing number precision test (regression of issue #1118)} { - r eval { + run_script { redis.call("set", "key", "12039611435714932082") return redis.call("get", "key") } 1 key } {12039611435714932082} test {Verify negative arg count is error instead of crash (issue #1842)} { - catch { r eval { return "hello" } -12 } e + catch { run_script { return "hello" } -12 } e set e } {ERR Number of keys can't be negative} test {Correct handling of reused argv (issue #1939)} { - r eval { + run_script { for i = 0, 10 do redis.call('SET', 'a{t}', '1') redis.call('MGET', 'a{t}', 'b{t}', 'c{t}') @@ -576,7 +630,7 @@ start_server {tags {"scripting"}} { test {Functions in the Redis namespace are able to report errors} { catch { - r eval { + run_script { redis.sha1hex() } 0 } e @@ -594,22 +648,22 @@ start_server {tags {"scripting"}} { assert_equal $res $expected_dict # Test RESP3 client with script in both RESP2 and RESP3 modes - set res [r eval {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash] + set res [run_script {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash] assert_equal $res $expected_dict - set res [r eval {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash] + set res [run_script {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash] assert_equal $res $expected_list # Test RESP2 client with script in both RESP2 and RESP3 modes r HELLO 2 - set res [r eval {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash] + set res [run_script {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash] assert_equal $res $expected_list - set res [r eval {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash] + set res [run_script {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash] assert_equal $res $expected_list } test {Script return recursive object} { r readraw 1 - set res [r eval {local a = {}; local b = {a}; a[1] = b; return a} 0] + set res [run_script {local a = {}; local b = {a}; a[1] = b; return a} 0] # drain the response while {true} { if {$res == "-ERR reached lua stack limit"} { @@ -640,11 +694,11 @@ start_server {tags {"scripting"}} { test {Timedout read-only scripts can be killed by SCRIPT KILL} { set rd [redis_deferring_client] r config set lua-time-limit 10 - $rd eval {while true do end} 0 + run_script_on_connection $rd {while true do end} 0 after 200 catch {r ping} e assert_match {BUSY*} $e - r script kill + kill_script after 200 ; # Give some time to Lua to call the hook again... assert_equal [r ping] "PONG" $rd close @@ -653,7 +707,7 @@ start_server {tags {"scripting"}} { test {Timedout read-only scripts can be killed by SCRIPT KILL even when use pcall} { set rd [redis_deferring_client] r config set lua-time-limit 10 - $rd eval {local f = function() while 1 do redis.call('ping') end end while 1 do pcall(f) end} 0 + run_script_on_connection $rd {local f = function() while 1 do redis.call('ping') end end while 1 do pcall(f) end} 0 wait_for_condition 50 100 { [catch {r ping} e] == 1 @@ -663,7 +717,7 @@ start_server {tags {"scripting"}} { catch {r ping} e assert_match {BUSY*} $e - r script kill + kill_script wait_for_condition 50 100 { [catch {r ping} e] == 0 @@ -685,8 +739,14 @@ start_server {tags {"scripting"}} { # senging (in a pipeline): # 1. eval "while 1 do redis.call('ping') end" 0 # 2. ping - set buf "*3\r\n\$4\r\neval\r\n\$33\r\nwhile 1 do redis.call('ping') end\r\n\$1\r\n0\r\n" - append buf "*1\r\n\$4\r\nping\r\n" + if {$is_eval == 1} { + set buf "*3\r\n\$4\r\neval\r\n\$33\r\nwhile 1 do redis.call('ping') end\r\n\$1\r\n0\r\n" + append buf "*1\r\n\$4\r\nping\r\n" + } else { + set buf "*6\r\n\$8\r\nfunction\r\n\$6\r\ncreate\r\n\$3\r\nlua\r\n\$4\r\ntest\r\n\$7\r\nreplace\r\n\$33\r\nwhile 1 do redis.call('ping') end\r\n" + append buf "*3\r\n\$5\r\nfcall\r\n\$4\r\ntest\r\n\$1\r\n0\r\n" + append buf "*1\r\n\$4\r\nping\r\n" + } $rd write $buf $rd flush @@ -698,7 +758,7 @@ start_server {tags {"scripting"}} { catch {r ping} e assert_match {BUSY*} $e - r script kill + kill_script wait_for_condition 50 100 { [catch {r ping} e] == 0 } else { @@ -706,6 +766,11 @@ start_server {tags {"scripting"}} { } assert_equal [r ping] "PONG" + if {$is_eval == 0} { + # read the ok reply of function create + assert_match {OK} [$rd read] + } + catch {$rd read} res assert_match {*killed by user*} $res @@ -717,18 +782,18 @@ start_server {tags {"scripting"}} { test {Timedout script link is still usable after Lua returns} { r config set lua-time-limit 10 - r eval {for i=1,100000 do redis.call('ping') end return 'ok'} 0 + run_script {for i=1,100000 do redis.call('ping') end return 'ok'} 0 r ping } {PONG} test {Timedout scripts that modified data can't be killed by SCRIPT KILL} { set rd [redis_deferring_client] r config set lua-time-limit 10 - $rd eval {redis.call('set',KEYS[1],'y'); while true do end} 1 x + run_script_on_connection $rd {redis.call('set',KEYS[1],'y'); while true do end} 1 x after 200 catch {r ping} e assert_match {BUSY*} $e - catch {r script kill} e + catch {kill_script} e assert_match {UNKILLABLE*} $e catch {r ping} e assert_match {BUSY*} $e @@ -761,11 +826,11 @@ foreach cmdrepl {0 1} { # One with an error, but still executing a command. # SHA is: 67164fc43fa971f76fd1aaeeaf60c1c178d25876 catch { - r eval {redis.call('incr',KEYS[1]); redis.call('nonexisting')} 1 x + run_script {redis.call('incr',KEYS[1]); redis.call('nonexisting')} 1 x } # One command is correct: # SHA is: 6f5ade10a69975e903c6d07b10ea44c6382381a5 - r eval {return redis.call('incr',KEYS[1])} 1 x + run_script {return redis.call('incr',KEYS[1])} 1 x } {2} test "Connect a replica to the master instance $rt" { @@ -778,6 +843,7 @@ foreach cmdrepl {0 1} { } } + if {$is_eval eq 1} { test "Now use EVALSHA against the master, with both SHAs $rt" { # The server should replicate successful and unsuccessful # commands as EVAL instead of EVALSHA. @@ -794,11 +860,12 @@ foreach cmdrepl {0 1} { fail "Expected 4 in x, but value is '[r -1 get x]'" } } + } ;# is_eval test "Replication of script multiple pushes to list with BLPOP $rt" { set rd [redis_deferring_client] $rd brpop a 0 - r eval { + run_script { redis.call("lpush",KEYS[1],"1"); redis.call("lpush",KEYS[1],"2"); } 1 a @@ -812,6 +879,7 @@ foreach cmdrepl {0 1} { set res } {a 1} + if {$is_eval eq 1} { test "EVALSHA replication when first call is readonly $rt" { r del x r eval {if tonumber(ARGV[1]) > 0 then redis.call('incr', KEYS[1]) end} 1 x 0 @@ -823,16 +891,17 @@ foreach cmdrepl {0 1} { fail "Expected 1 in x, but value is '[r -1 get x]'" } } + } ;# is_eval test "Lua scripts using SELECT are replicated correctly $rt" { - r eval { + run_script { redis.call("set","foo1","bar1") redis.call("select","10") redis.call("incr","x") redis.call("select","11") redis.call("incr","z") } 0 - r eval { + run_script { redis.call("set","foo1","bar1") redis.call("select","10") redis.call("incr","x") @@ -861,6 +930,8 @@ start_server {tags {"scripting repl external:skip"}} { } } + if {$is_eval eq 1} { + # replicate_commands is the default on Redis Function test "Redis.replicate_commands() must be issued before any write" { r eval { redis.call('set','foo','bar'); @@ -884,11 +955,11 @@ start_server {tags {"scripting repl external:skip"}} { r debug lua-always-replicate-commands 1 set e } {*only after turning on*} + } ;# is_eval test "Redis.set_repl() don't accept invalid values" { catch { - r eval { - redis.replicate_commands(); + run_script { redis.set_repl(12345); } 0 } e @@ -897,8 +968,7 @@ start_server {tags {"scripting repl external:skip"}} { test "Test selective replication of certain Redis commands from Lua" { r del a b c d - r eval { - redis.replicate_commands(); + run_script { redis.call('set','a','1'); redis.set_repl(redis.REPL_NONE); redis.call('set','b','2'); @@ -924,24 +994,37 @@ start_server {tags {"scripting repl external:skip"}} { } test "PRNG is seeded randomly for command replication" { - set a [ - r eval { - redis.replicate_commands(); - return math.random()*100000; - } 0 - ] - set b [ - r eval { - redis.replicate_commands(); - return math.random()*100000; - } 0 - ] + if {$is_eval eq 1} { + # on is_eval Lua we need to call redis.replicate_commands() to get real randomization + set a [ + run_script { + redis.replicate_commands() + return math.random()*100000; + } 0 + ] + set b [ + run_script { + redis.replicate_commands() + return math.random()*100000; + } 0 + ] + } else { + set a [ + run_script { + return math.random()*100000; + } 0 + ] + set b [ + run_script { + return math.random()*100000; + } 0 + ] + } assert {$a ne $b} } test "Using side effects is not a problem with command replication" { - r eval { - redis.replicate_commands(); + run_script { redis.call('set','time',redis.call('time')[1]) } 0 @@ -956,6 +1039,7 @@ start_server {tags {"scripting repl external:skip"}} { } } +if {$is_eval eq 1} { start_server {tags {"scripting external:skip"}} { r script debug sync r eval {return 'hello'} 0 @@ -984,12 +1068,13 @@ start_server {tags {"scripting needs:debug external:skip"}} { r write $cmd r flush set ret [r read] - assert_match {*Unknown Redis command called from*} $ret + assert_match {*Unknown Redis command called from script*} $ret # make sure the server is still ok reconnect assert_equal [r ping] {PONG} } } +} ;# is_eval start_server {tags {"scripting resp3 needs:debug"}} { r debug set-disable-deny-scripts 1 @@ -999,7 +1084,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { r readraw 1 test {test resp3 big number protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'bignum')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'bignum')" 0] if {$client_proto == 2 || $i == 2} { # if either Lua or the clien is RESP2 the reply will be RESP2 assert_equal $ret {$37} @@ -1021,7 +1106,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { } test {test resp3 map protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'map')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'map')" 0] if {$client_proto == 2 || $i == 2} { # if either Lua or the clien is RESP2 the reply will be RESP2 assert_equal $ret {*6} @@ -1034,7 +1119,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { } test {test resp3 set protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'set')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'set')" 0] if {$client_proto == 2 || $i == 2} { # if either Lua or the clien is RESP2 the reply will be RESP2 assert_equal $ret {*3} @@ -1047,7 +1132,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { } test {test resp3 double protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'double')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'double')" 0] if {$client_proto == 2 || $i == 2} { # if either Lua or the clien is RESP2 the reply will be RESP2 assert_equal $ret {$5} @@ -1058,7 +1143,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { } test {test resp3 null protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'null')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'null')" 0] if {$client_proto == 2} { # null is a special case in which a Lua client format does not effect the reply to the client assert_equal $ret {$-1} @@ -1068,7 +1153,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { } {} test {test resp3 verbatim protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'verbatim')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'verbatim')" 0] if {$client_proto == 2 || $i == 2} { # if either Lua or the clien is RESP2 the reply will be RESP2 assert_equal $ret {$25} @@ -1082,7 +1167,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { } test {test resp3 true protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'true')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'true')" 0] if {$client_proto == 2 || $i == 2} { # if either Lua or the clien is RESP2 the reply will be RESP2 assert_equal $ret {:1} @@ -1092,7 +1177,7 @@ start_server {tags {"scripting resp3 needs:debug"}} { } test {test resp3 false protocol parsing} { - set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'false')" 0] + set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'false')" 0] if {$client_proto == 2 || $i == 2} { # if either Lua or the clien is RESP2 the reply will be RESP2 assert_equal $ret {:0} @@ -1109,8 +1194,9 @@ start_server {tags {"scripting resp3 needs:debug"}} { test {test resp3 attribute protocol parsing} { # attributes are not (yet) expose to the script # So here we just check the parser handles them and they are ignored. - r eval "redis.setresp(3);return redis.call('debug', 'protocol', 'attrib')" 0 + run_script "redis.setresp(3);return redis.call('debug', 'protocol', 'attrib')" 0 } {Some real reply following the attribute} r debug set-disable-deny-scripts 0 } +} ;# foreach is_eval |