diff options
author | meir@redislabs.com <meir@redislabs.com> | 2021-10-07 14:41:26 +0300 |
---|---|---|
committer | meir <meir@redis.com> | 2021-12-02 19:35:52 +0200 |
commit | cbd463175f8b52d594fd4e6b953fa58a5db053c3 (patch) | |
tree | 2b2e4080b6b4d399eaa3053928f0193f5ddbb360 /src/functions.c | |
parent | f21dc38a6ed3851a5e6501199e803ff0b93795cf (diff) | |
download | redis-cbd463175f8b52d594fd4e6b953fa58a5db053c3.tar.gz |
Redis Functions - Added redis function unit and Lua engine
Redis function unit is located inside functions.c
and contains Redis Function implementation:
1. FUNCTION commands:
* FUNCTION CREATE
* FCALL
* FCALL_RO
* FUNCTION DELETE
* FUNCTION KILL
* FUNCTION INFO
2. Register engine
In addition, this commit introduce the first engine
that uses the Redis Function capabilities, the
Lua engine.
Diffstat (limited to 'src/functions.c')
-rw-r--r-- | src/functions.c | 538 |
1 files changed, 538 insertions, 0 deletions
diff --git a/src/functions.c b/src/functions.c new file mode 100644 index 000000000..b0d7a2305 --- /dev/null +++ b/src/functions.c @@ -0,0 +1,538 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "functions.h" +#include "sds.h" +#include "dict.h" +#include "adlist.h" +#include "atomicvar.h" + +static size_t engine_cache_memory = 0; + +/* Forward declaration */ +static void engineFunctionDispose(dict *d, void *obj); + +struct functionsCtx { + dict *functions; /* Function name -> Function object that can be used to run the function */ + size_t cache_memory /* Overhead memory (structs, dictionaries, ..) used by all the functions */; +}; + +dictType engineDictType = { + dictSdsCaseHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +dictType functionDictType = { + dictSdsHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + engineFunctionDispose,/* val destructor */ + NULL /* allow to expand */ +}; + +/* Dictionary of engines */ +static dict *engines = NULL; + +/* Functions Ctx. + * Contains the dictionary that map a function name to + * function object and the cache memory used by all the functions */ +static functionsCtx *functions_ctx = NULL; + +static size_t functionMallocSize(functionInfo *fi) { + return zmalloc_size(fi) + sdsZmallocSize(fi->name) + + (fi->desc ? sdsZmallocSize(fi->desc) : 0) + + sdsZmallocSize(fi->code) + + fi->ei->engine->get_function_memory_overhead(fi->function); +} + +/* Dispose function memory */ +static void engineFunctionDispose(dict *d, void *obj) { + UNUSED(d); + functionInfo *fi = obj; + sdsfree(fi->code); + sdsfree(fi->name); + if (fi->desc) { + sdsfree(fi->desc); + } + engine *engine = fi->ei->engine; + engine->free_function(engine->engine_ctx, fi->function); + zfree(fi); +} + +/* Free function memory and detele it from the functions dictionary */ +static void engineFunctionFree(functionInfo *fi, functionsCtx *functions) { + functions->cache_memory -= functionMallocSize(fi); + + dictDelete(functions->functions, fi->name); +} + +/* Clear all the functions from the given functions ctx */ +void functionsCtxClear(functionsCtx *functions_ctx) { + dictEmpty(functions_ctx->functions, NULL); + functions_ctx->cache_memory = 0; +} + +/* Free the given functions ctx */ +void functionsCtxFree(functionsCtx *functions_ctx) { + functionsCtxClear(functions_ctx); + dictRelease(functions_ctx->functions); + zfree(functions_ctx); +} + +/* Swap the current functions ctx with the given one. + * Free the old functions ctx. */ +void functionsCtxSwapWithCurrent(functionsCtx *new_functions_ctx) { + functionsCtxFree(functions_ctx); + functions_ctx = new_functions_ctx; +} + +/* return the current functions ctx */ +functionsCtx* functionsCtxGetCurrent() { + return functions_ctx; +} + +/* Create a new functions ctx */ +functionsCtx* functionsCtxCreate() { + functionsCtx *ret = zmalloc(sizeof(functionsCtx)); + ret->functions = dictCreate(&functionDictType); + ret->cache_memory = 0; + return ret; +} + +/* + * Register a function info to functions dictionary + * 1. Set the function client + * 2. Add function to functions dictionary + * 3. update cache memory + */ +static void engineFunctionRegister(functionInfo *fi, functionsCtx *functions) { + int res = dictAdd(functions->functions, fi->name, fi); + serverAssert(res == DICT_OK); + + functions->cache_memory += functionMallocSize(fi); +} + +/* + * Creating a function info object and register it. + * Return the created object + */ +static functionInfo* engineFunctionCreate(sds name, void *function, engineInfo *ei, + sds desc, sds code, functionsCtx *functions) +{ + + functionInfo *fi = zmalloc(sizeof(*fi)); + *fi = (functionInfo ) { + .name = sdsdup(name), + .function = function, + .ei = ei, + .code = sdsdup(code), + .desc = desc ? sdsdup(desc) : NULL, + }; + + engineFunctionRegister(fi, functions); + + return fi; +} + +/* Register an engine, should be called once by the engine on startup and give the following: + * + * - engine_name - name of the engine to register + * - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */ +int functionsRegisterEngine(const char *engine_name, engine *engine) { + sds engine_name_sds = sdsnew(engine_name); + if (dictFetchValue(engines, engine_name_sds)) { + serverLog(LL_WARNING, "Same engine was registered twice"); + sdsfree(engine_name_sds); + return C_ERR; + } + + client *c = createClient(NULL); + c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT); + engineInfo *ei = zmalloc(sizeof(*ei)); + *ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,}; + + dictAdd(engines, engine_name_sds, ei); + + engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) + + zmalloc_size(engine) + + engine->get_engine_memory_overhead(engine->engine_ctx); + + return C_OK; +} + +/* + * FUNCTION STATS + */ +void functionsStatsCommand(client *c) { + if (scriptIsRunning() && scriptIsEval()) { + addReplyErrorObject(c, shared.slowevalerr); + return; + } + + addReplyMapLen(c, 2); + + addReplyBulkCString(c, "running_script"); + if (!scriptIsRunning()) { + addReplyNull(c); + } else { + addReplyMapLen(c, 3); + addReplyBulkCString(c, "name"); + addReplyBulkCString(c, scriptCurrFunction()); + addReplyBulkCString(c, "command"); + client *script_client = scriptGetCaller(); + addReplyArrayLen(c, script_client->argc); + for (int i = 0 ; i < script_client->argc ; ++i) { + addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr)); + } + addReplyBulkCString(c, "duration_ms"); + addReplyLongLong(c, scriptRunDuration()); + } + + addReplyBulkCString(c, "engines"); + addReplyArrayLen(c, dictSize(engines)); + dictIterator *iter = dictGetIterator(engines); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + engineInfo *ei = dictGetVal(entry); + addReplyBulkCString(c, ei->name); + } + dictReleaseIterator(iter); +} + +/* + * FUNCTION LIST + */ +void functionsListCommand(client *c) { + /* general information on all the functions */ + addReplyArrayLen(c, dictSize(functions_ctx->functions)); + dictIterator *iter = dictGetIterator(functions_ctx->functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + addReplyMapLen(c, 3); + addReplyBulkCString(c, "name"); + addReplyBulkCBuffer(c, fi->name, sdslen(fi->name)); + addReplyBulkCString(c, "engine"); + addReplyBulkCBuffer(c, fi->ei->name, sdslen(fi->ei->name)); + addReplyBulkCString(c, "description"); + if (fi->desc) { + addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc)); + } else { + addReplyNull(c); + } + } + dictReleaseIterator(iter); +} + +/* + * FUNCTION INFO <FUNCTION NAME> [WITHCODE] + */ +void functionsInfoCommand(client *c) { + if (c->argc > 4) { + addReplyErrorFormat(c,"wrong number of arguments for '%s' command or subcommand", c->cmd->name); + return; + } + /* dedicated information on specific function */ + robj *function_name = c->argv[2]; + int with_code = 0; + if (c->argc == 4) { + robj *with_code_arg = c->argv[3]; + if (!strcasecmp(with_code_arg->ptr, "withcode")) { + with_code = 1; + } + } + + functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr); + if (!fi) { + addReplyError(c, "Function does not exists"); + return; + } + addReplyMapLen(c, with_code? 4 : 3); + addReplyBulkCString(c, "name"); + addReplyBulkCBuffer(c, fi->name, sdslen(fi->name)); + addReplyBulkCString(c, "engine"); + addReplyBulkCBuffer(c, fi->ei->name, sdslen(fi->ei->name)); + addReplyBulkCString(c, "description"); + if (fi->desc) { + addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc)); + } else { + addReplyNull(c); + } + if (with_code) { + addReplyBulkCString(c, "code"); + addReplyBulkCBuffer(c, fi->code, sdslen(fi->code)); + } +} + +/* + * FUNCTION DELETE <FUNCTION NAME> + */ +void functionsDeleteCommand(client *c) { + if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) { + addReplyError(c, "Can not delete a function on a read only replica"); + return; + } + + robj *function_name = c->argv[2]; + functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr); + if (!fi) { + addReplyError(c, "Function not found"); + return; + } + + engineFunctionFree(fi, functions_ctx); + forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); + addReply(c, shared.ok); +} + +void functionsKillCommand(client *c) { + scriptKill(c, 0); +} + +static void fcallCommandGeneric(client *c, int ro) { + robj *function_name = c->argv[1]; + functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr); + if (!fi) { + addReplyError(c, "Function not found"); + return; + } + engine *engine = fi->ei->engine; + + long long numkeys; + /* Get the number of arguments that are keys */ + if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) { + addReplyError(c, "Bad number of keys provided"); + return; + } + if (numkeys > (c->argc - 3)) { + addReplyError(c, "Number of keys can't be greater than number of args"); + return; + } else if (numkeys < 0) { + addReplyError(c, "Number of keys can't be negative"); + return; + } + + scriptRunCtx run_ctx; + + scriptPrepareForRun(&run_ctx, fi->ei->c, c, fi->name); + if (ro) { + run_ctx.flags |= SCRIPT_READ_ONLY; + } + engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys, + c->argv + 3 + numkeys, c->argc - 3 - numkeys); + scriptResetRun(&run_ctx); +} + +/* + * FCALL <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> + */ +void fcallCommand(client *c) { + fcallCommandGeneric(c, 0); +} + +/* + * FCALL_RO <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> + */ +void fcallCommandReadOnly(client *c) { + fcallCommandGeneric(c, 1); +} + +void functionsHelpCommand(client *c) { + const char *help[] = { +"CREATE <ENGINE NAME> <FUNCTION NAME> [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE>", +" Create a new function with the given function name and code.", +"DELETE <FUNCTION NAME>", +" Delete the given function.", +"INFO <FUNCTION NAME> [WITHCODE]", +" For each function, print the following information about the function:", +" * Function name", +" * The engine used to run the function", +" * Function description", +" * Function code (only if WITHCODE is given)", +"LIST", +" Return general information on all the functions:", +" * Function name", +" * The engine used to run the function", +" * Function description", +"STATS", +" Return information about the current function running:", +" * Function name", +" * Command used to run the function", +" * Duration in MS that the function is running", +" If not function is running, return nil", +" In addition, returns a list of available engines.", +"KILL", +" Kill the current running function.", +NULL }; + addReplyHelp(c, help); +} + +/* Compile and save the given function, return C_OK on success and C_ERR on failure. + * In case on failure the err out param is set with relevant error message */ +int functionsCreateWithFunctionCtx(sds function_name,sds engine_name, sds desc, sds code, + int replace, sds* err, functionsCtx *functions) { + engineInfo *ei = dictFetchValue(engines, engine_name); + if (!ei) { + *err = sdsnew("Engine not found"); + return C_ERR; + } + engine *engine = ei->engine; + + functionInfo *fi = dictFetchValue(functions->functions, function_name); + if (fi && !replace) { + *err = sdsnew("Function already exists"); + return C_ERR; + } + + void *function = engine->create(engine->engine_ctx, code, err); + if (*err) { + return C_ERR; + } + + if (fi) { + /* free the already existing function as we are going to replace it */ + engineFunctionFree(fi, functions); + } + + engineFunctionCreate(function_name, function, ei, desc, code, functions); + + return C_OK; +} + +/* + * FUNCTION CREATE <ENGINE NAME> <FUNCTION NAME> + * [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE> + * + * ENGINE NAME - name of the engine to use the run the function + * FUNCTION NAME - name to use to invoke the function + * REPLACE - optional, replace existing function + * DESCRIPTION - optional, function description + * FUNCTION CODE - function code to pass to the engine + */ +void functionsCreateCommand(client *c) { + + if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) { + addReplyError(c, "Can not create a function on a read only replica"); + return; + } + + robj *engine_name = c->argv[2]; + robj *function_name = c->argv[3]; + + int replace = 0; + int argc_pos = 4; + sds desc = NULL; + while (argc_pos < c->argc - 1) { + robj *next_arg = c->argv[argc_pos++]; + if (!strcasecmp(next_arg->ptr, "replace")) { + replace = 1; + continue; + } + if (!strcasecmp(next_arg->ptr, "description")) { + if (argc_pos >= c->argc) { + addReplyError(c, "Bad function description"); + return; + } + desc = c->argv[argc_pos++]->ptr; + continue; + } + } + + if (argc_pos >= c->argc) { + addReplyError(c, "Function code is missing"); + return; + } + + robj *code = c->argv[argc_pos]; + sds err = NULL; + if (functionsCreateWithFunctionCtx(function_name->ptr, engine_name->ptr, + desc, code->ptr, replace, &err, functions_ctx) != C_OK) + { + addReplyErrorSds(c, err); + return; + } + forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); + addReply(c, shared.ok); +} + +/* Return memory usage of all the engines combine */ +unsigned long functionsMemory() { + dictIterator *iter = dictGetIterator(engines); + dictEntry *entry = NULL; + size_t engines_nemory = 0; + while ((entry = dictNext(iter))) { + engineInfo *ei = dictGetVal(entry); + engine *engine = ei->engine; + engines_nemory += engine->get_used_memory(engine->engine_ctx); + } + dictReleaseIterator(iter); + + return engines_nemory; +} + +/* Return memory overhead of all the engines combine */ +unsigned long functionsMemoryOverhead() { + size_t memory_overhead = dictSize(engines) * sizeof(dictEntry) + + dictSlots(engines) * sizeof(dictEntry*); + memory_overhead += dictSize(functions_ctx->functions) * sizeof(dictEntry) + + dictSlots(functions_ctx->functions) * sizeof(dictEntry*) + sizeof(functionsCtx); + memory_overhead += functions_ctx->cache_memory; + memory_overhead += engine_cache_memory; + + return memory_overhead; +} + +/* Returns the number of functions */ +unsigned long functionsNum() { + return dictSize(functions_ctx->functions); +} + +dict* functionsGet() { + return functions_ctx->functions; +} + +/* Initialize engine data structures. + * Should be called once on server initialization */ +int functionsInit() { + engines = dictCreate(&engineDictType); + functions_ctx = functionsCtxCreate(); + + if (luaEngineInitEngine() != C_OK) { + return C_ERR; + } + + return C_OK; +} |