diff options
45 files changed, 1720 insertions, 553 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 847abcf02..cc4991606 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,11 +3,8 @@ name: CI on: [push, pull_request] jobs: - build-ubuntu: - strategy: - matrix: - platform: [ubuntu-latest, ubuntu-16.04] - runs-on: ${{ matrix.platform }} + test-ubuntu-latest: + runs-on: ubuntu-latest steps: - uses: actions/checkout@v1 - name: make @@ -15,13 +12,17 @@ jobs: - name: test run: | sudo apt-get install tcl8.5 - make test + ./runtest --clients 2 --verbose + + build-ubuntu-old: + runs-on: ubuntu-16.04 + steps: + - uses: actions/checkout@v1 + - name: make + run: make build-macos-latest: - strategy: - matrix: - platform: [macos-latest, macOS-10.14] - runs-on: ${{ matrix.platform }} + runs-on: macos-latest steps: - uses: actions/checkout@v1 - name: make @@ -35,6 +35,11 @@ It is as simple as: % make +To build with TLS support, you'll need OpenSSL development libraries (e.g. +libssl-dev on Debian/Ubuntu) and run: + + % make BUILD_TLS=yes + You can run a 32 bit Redis binary using: % make 32bit @@ -43,6 +48,13 @@ After building Redis, it is a good idea to test it using: % make test +If TLS is built, running the tests with TLS enabled (you will need `tcl-tls` +installed): + + % ./utils/gen-test-certs.sh + % ./runtest --tls + + Fixing build problems with dependencies or cached build options --------- @@ -125,6 +137,12 @@ as options using the command line. Examples: All the options in redis.conf are also supported as options using the command line, with exactly the same name. +Running Redis with TLS: +------------------ + +Please consult the [TLS.md](TLS.md) file for more information on +how to use Redis with TLS. + Playing with Redis ------------------ @@ -1,8 +1,5 @@ -TLS Support -- Work In Progress -=============================== - -This is a brief note to capture current thoughts/ideas and track pending action -items. 
+TLS Support +=========== Getting Started --------------- @@ -69,37 +66,23 @@ probably not be so hard. For cluster keys migration it might be more difficult, but there are probably other good reasons to improve that part anyway. To-Do List -========== - -Additional TLS Features ------------------------ - -1. Add metrics to INFO? -2. Add session caching support. Check if/how it's handled by clients to assess - how useful/important it is. - -redis-benchmark ---------------- - -The current implementation is a mix of using hiredis for parsing and basic -networking (establishing connections), but directly manipulating sockets for -most actions. - -This will need to be cleaned up for proper TLS support. The best approach is -probably to migrate to hiredis async mode. - -redis-cli ---------- +---------- -1. Add support for TLS in --slave and --rdb modes. +- [ ] Add session caching support. Check if/how it's handled by clients to + assess how useful/important it is. +- [ ] redis-benchmark support. The current implementation is a mix of using + hiredis for parsing and basic networking (establishing connections), but + directly manipulating sockets for most actions. This will need to be cleaned + up for proper TLS support. The best approach is probably to migrate to hiredis + async mode. +- [ ] redis-cli `--slave` and `--rdb` support. -Others ------- +Multi-port +---------- Consider the implications of allowing TLS to be configured on a separate port, -making Redis listening on multiple ports. +making Redis listening on multiple ports: -This impacts many things, like 1. Startup banner port notification 2. Proctitle 3. 
How slaves announce themselves diff --git a/deps/lua/src/lua_struct.c b/deps/lua/src/lua_struct.c index 4d5f027b8..c58c8e72b 100644 --- a/deps/lua/src/lua_struct.c +++ b/deps/lua/src/lua_struct.c @@ -89,12 +89,14 @@ typedef struct Header { } Header; -static int getnum (const char **fmt, int df) { +static int getnum (lua_State *L, const char **fmt, int df) { if (!isdigit(**fmt)) /* no number? */ return df; /* return default value */ else { int a = 0; do { + if (a > (INT_MAX / 10) || a * 10 > (INT_MAX - (**fmt - '0'))) + luaL_error(L, "integral size overflow"); a = a*10 + *((*fmt)++) - '0'; } while (isdigit(**fmt)); return a; @@ -115,9 +117,9 @@ static size_t optsize (lua_State *L, char opt, const char **fmt) { case 'f': return sizeof(float); case 'd': return sizeof(double); case 'x': return 1; - case 'c': return getnum(fmt, 1); + case 'c': return getnum(L, fmt, 1); case 'i': case 'I': { - int sz = getnum(fmt, sizeof(int)); + int sz = getnum(L, fmt, sizeof(int)); if (sz > MAXINTSIZE) luaL_error(L, "integral size %d is larger than limit of %d", sz, MAXINTSIZE); @@ -150,7 +152,7 @@ static void controloptions (lua_State *L, int opt, const char **fmt, case '>': h->endian = BIG; return; case '<': h->endian = LITTLE; return; case '!': { - int a = getnum(fmt, MAXALIGN); + int a = getnum(L, fmt, MAXALIGN); if (!isp2(a)) luaL_error(L, "alignment %d is not a power of 2", a); h->align = a; diff --git a/redis.conf b/redis.conf index 07005cffe..c04880f32 100644 --- a/redis.conf +++ b/redis.conf @@ -155,23 +155,22 @@ tcp-keepalive 300 # tls-ca-cert-file ca.crt # tls-ca-cert-dir /etc/ssl/certs -# If TLS/SSL clients are required to authenticate using a client side -# certificate, use this directive. +# By default, clients (including replica servers) on a TLS port are required +# to authenticate using valid client side certificates. # -# Note: this applies to all incoming clients, including replicas. +# It is possible to disable authentication using this directive. 
# -# tls-auth-clients yes +# tls-auth-clients no -# If TLS/SSL should be used when connecting as a replica to a master, enable -# this configuration directive: +# By default, a Redis replica does not attempt to establish a TLS connection +# with its master. +# +# Use the following directive to enable TLS on replication links. # # tls-replication yes -# If TLS/SSL should be used for the Redis Cluster bus, enable this configuration -# directive. -# -# NOTE: If TLS/SSL is enabled for Cluster Bus, mutual authentication is always -# enforced. +# By default, the Redis Cluster bus uses a plain TCP connection. To enable +# TLS for the bus protocol, use the following directive: # # tls-cluster yes @@ -1362,7 +1361,11 @@ latency-monitor-threshold 0 # z Sorted set commands # x Expired events (events generated every time a key expires) # e Evicted events (events generated when a key is evicted for maxmemory) -# A Alias for g$lshzxe, so that the "AKE" string means all the events. +# t Stream commands +# m Key-miss events (Note: It is not included in the 'A' class) +# A Alias for g$lshzxet, so that the "AKE" string means all the events +# (Except key-miss events which are excluded from 'A' due to their +# unique nature). # # The "notify-keyspace-events" takes as argument a string that is composed # of zero or multiple characters. The empty string means that notifications @@ -49,6 +49,8 @@ list *UsersToLoad; /* This is a list of users found in the configuration file array of SDS pointers: the first is the user name, all the remaining pointers are ACL rules in the same format as ACLSetUser(). 
*/ +list *ACLLog; /* Our security log, the user is able to inspect that + using the ACL LOG command .*/ struct ACLCategoryItem { const char *name; @@ -93,6 +95,7 @@ struct ACLUserFlag { void ACLResetSubcommandsForCommand(user *u, unsigned long id); void ACLResetSubcommands(user *u); void ACLAddAllowedSubcommand(user *u, unsigned long id, const char *sub); +void ACLFreeLogEntry(void *le); /* The length of the string representation of a hashed password. */ #define HASH_PASSWORD_LEN SHA256_BLOCK_SIZE*2 @@ -920,6 +923,7 @@ void ACLInitDefaultUser(void) { void ACLInit(void) { Users = raxNew(); UsersToLoad = listCreate(); + ACLLog = listCreate(); ACLInitDefaultUser(); } @@ -978,6 +982,7 @@ int ACLAuthenticateUser(client *c, robj *username, robj *password) { moduleNotifyUserChanged(c); return C_OK; } else { + addACLLogEntry(c,ACL_DENIED_AUTH,0,username->ptr); return C_ERR; } } @@ -1034,7 +1039,7 @@ user *ACLGetUserByName(const char *name, size_t namelen) { * command cannot be executed because the user is not allowed to run such * command, the second if the command is denied because the user is trying * to access keys that are not among the specified patterns. */ -int ACLCheckCommandPerm(client *c) { +int ACLCheckCommandPerm(client *c, int *keyidxptr) { user *u = c->user; uint64_t id = c->cmd->id; @@ -1094,6 +1099,7 @@ int ACLCheckCommandPerm(client *c) { } } if (!match) { + if (keyidxptr) *keyidxptr = keyidx[j]; getKeysFreeResult(keyidx); return ACL_DENIED_KEY; } @@ -1455,12 +1461,138 @@ void ACLLoadUsersAtStartup(void) { } /* ============================================================================= + * ACL log + * ==========================================================================*/ + +#define ACL_LOG_CTX_TOPLEVEL 0 +#define ACL_LOG_CTX_LUA 1 +#define ACL_LOG_CTX_MULTI 2 +#define ACL_LOG_GROUPING_MAX_TIME_DELTA 60000 + +/* This structure defines an entry inside the ACL log. 
*/ +typedef struct ACLLogEntry { + uint64_t count; /* Number of times this happened recently. */ + int reason; /* Reason for denying the command. ACL_DENIED_*. */ + int context; /* Toplevel, Lua or MULTI/EXEC? ACL_LOG_CTX_*. */ + sds object; /* The key name or command name. */ + sds username; /* User the client is authenticated with. */ + mstime_t ctime; /* Milliseconds time of last update to this entry. */ + sds cinfo; /* Client info (last client if updated). */ +} ACLLogEntry; + +/* This function will check if ACL entries 'a' and 'b' are similar enough + * that we should actually update the existing entry in our ACL log instead + * of creating a new one. */ +int ACLLogMatchEntry(ACLLogEntry *a, ACLLogEntry *b) { + if (a->reason != b->reason) return 0; + if (a->context != b->context) return 0; + mstime_t delta = a->ctime - b->ctime; + if (delta < 0) delta = -delta; + if (delta > ACL_LOG_GROUPING_MAX_TIME_DELTA) return 0; + if (sdscmp(a->object,b->object) != 0) return 0; + if (sdscmp(a->username,b->username) != 0) return 0; + return 1; +} + +/* Release an ACL log entry. */ +void ACLFreeLogEntry(void *leptr) { + ACLLogEntry *le = leptr; + sdsfree(le->object); + sdsfree(le->username); + sdsfree(le->cinfo); + zfree(le); +} + +/* Adds a new entry in the ACL log, making sure to delete the old entry + * if we reach the maximum length allowed for the log. This function attempts + * to find similar entries in the current log in order to bump the counter of + * the log entry instead of creating many entries for very similar ACL + * rules issues. + * + * The keypos argument is only used when the reason is ACL_DENIED_KEY, since + * it allows the function to log the key name that caused the problem. + * Similarly the username is only passed when we failed to authenticate the + * user with AUTH or HELLO, for the ACL_DENIED_AUTH reason. Otherwise + * it will just be NULL. + */ +void addACLLogEntry(client *c, int reason, int keypos, sds username) { + /* Create a new entry. 
*/ + struct ACLLogEntry *le = zmalloc(sizeof(*le)); + le->count = 1; + le->reason = reason; + le->username = sdsdup(reason == ACL_DENIED_AUTH ? username : c->user->name); + le->ctime = mstime(); + + switch(reason) { + case ACL_DENIED_CMD: le->object = sdsnew(c->cmd->name); break; + case ACL_DENIED_KEY: le->object = sdsnew(c->argv[keypos]->ptr); break; + case ACL_DENIED_AUTH: le->object = sdsnew(c->argv[0]->ptr); break; + default: le->object = sdsempty(); + } + + client *realclient = c; + if (realclient->flags & CLIENT_LUA) realclient = server.lua_caller; + + le->cinfo = catClientInfoString(sdsempty(),realclient); + if (c->flags & CLIENT_MULTI) { + le->context = ACL_LOG_CTX_MULTI; + } else if (c->flags & CLIENT_LUA) { + le->context = ACL_LOG_CTX_LUA; + } else { + le->context = ACL_LOG_CTX_TOPLEVEL; + } + + /* Try to match this entry with past ones, to see if we can just + * update an existing entry instead of creating a new one. */ + long toscan = 10; /* Do a limited work trying to find duplicated. */ + listIter li; + listNode *ln; + listRewind(ACLLog,&li); + ACLLogEntry *match = NULL; + while (toscan-- && (ln = listNext(&li)) != NULL) { + ACLLogEntry *current = listNodeValue(ln); + if (ACLLogMatchEntry(current,le)) { + match = current; + listDelNode(ACLLog,ln); + listAddNodeHead(ACLLog,current); + break; + } + } + + /* If there is a match update the entry, otherwise add it as a + * new one. */ + if (match) { + /* We update a few fields of the existing entry and bump the + * counter of events for this entry. */ + sdsfree(match->cinfo); + match->cinfo = le->cinfo; + match->ctime = le->ctime; + match->count++; + + /* Release the old entry. */ + le->cinfo = NULL; + ACLFreeLogEntry(le); + } else { + /* Add it to our list of entires. We'll have to trim the list + * to its maximum size. 
*/ + listAddNodeHead(ACLLog, le); + while(listLength(ACLLog) > server.acllog_max_len) { + listNode *ln = listLast(ACLLog); + ACLLogEntry *le = listNodeValue(ln); + ACLFreeLogEntry(le); + listDelNode(ACLLog,ln); + } + } +} + +/* ============================================================================= * ACL related commands * ==========================================================================*/ /* ACL -- show and modify the configuration of ACL users. * ACL HELP * ACL LOAD + * ACL SAVE * ACL LIST * ACL USERS * ACL CAT [<category>] @@ -1469,6 +1601,7 @@ void ACLLoadUsersAtStartup(void) { * ACL GETUSER <username> * ACL GENPASS * ACL WHOAMI + * ACL LOG [<count> | RESET] */ void aclCommand(client *c) { char *sub = c->argv[1]->ptr; @@ -1655,9 +1788,75 @@ void aclCommand(client *c) { char pass[32]; /* 128 bits of actual pseudo random data. */ getRandomHexChars(pass,sizeof(pass)); addReplyBulkCBuffer(c,pass,sizeof(pass)); + } else if (!strcasecmp(sub,"log") && (c->argc == 2 || c->argc ==3)) { + long count = 10; /* Number of entries to emit by default. */ + + /* Parse the only argument that LOG may have: it could be either + * the number of entires the user wants to display, or alternatively + * the "RESET" command in order to flush the old entires. */ + if (c->argc == 3) { + if (!strcasecmp(c->argv[2]->ptr,"reset")) { + listSetFreeMethod(ACLLog,ACLFreeLogEntry); + listEmpty(ACLLog); + listSetFreeMethod(ACLLog,NULL); + addReply(c,shared.ok); + return; + } else if (getLongFromObjectOrReply(c,c->argv[2],&count,NULL) + != C_OK) + { + return; + } + if (count < 0) count = 0; + } + + /* Fix the count according to the number of entries we got. 
*/ + if ((size_t)count > listLength(ACLLog)) + count = listLength(ACLLog); + + addReplyArrayLen(c,count); + listIter li; + listNode *ln; + listRewind(ACLLog,&li); + mstime_t now = mstime(); + while (count-- && (ln = listNext(&li)) != NULL) { + ACLLogEntry *le = listNodeValue(ln); + addReplyMapLen(c,7); + addReplyBulkCString(c,"count"); + addReplyLongLong(c,le->count); + + addReplyBulkCString(c,"reason"); + char *reasonstr; + switch(le->reason) { + case ACL_DENIED_CMD: reasonstr="command"; break; + case ACL_DENIED_KEY: reasonstr="key"; break; + case ACL_DENIED_AUTH: reasonstr="auth"; break; + } + addReplyBulkCString(c,reasonstr); + + addReplyBulkCString(c,"context"); + char *ctxstr; + switch(le->context) { + case ACL_LOG_CTX_TOPLEVEL: ctxstr="toplevel"; break; + case ACL_LOG_CTX_MULTI: ctxstr="multi"; break; + case ACL_LOG_CTX_LUA: ctxstr="lua"; break; + default: ctxstr="unknown"; + } + addReplyBulkCString(c,ctxstr); + + addReplyBulkCString(c,"object"); + addReplyBulkCBuffer(c,le->object,sdslen(le->object)); + addReplyBulkCString(c,"username"); + addReplyBulkCBuffer(c,le->username,sdslen(le->username)); + addReplyBulkCString(c,"age-seconds"); + double age = (double)(now - le->ctime)/1000; + addReplyDouble(c,age); + addReplyBulkCString(c,"client-info"); + addReplyBulkCBuffer(c,le->cinfo,sdslen(le->cinfo)); + } } else if (!strcasecmp(sub,"help")) { const char *help[] = { "LOAD -- Reload users from the ACL file.", +"SAVE -- Save the current config to the ACL file." "LIST -- Show user details in config file format.", "USERS -- List all the registered usernames.", "SETUSER <username> [attribs ...] 
-- Create or modify a user.", @@ -1667,6 +1866,7 @@ void aclCommand(client *c) { "CAT <category> -- List commands inside category.", "GENPASS -- Generate a secure user password.", "WHOAMI -- Return the current connection username.", +"LOG [<count> | RESET] -- Show the ACL log entries.", NULL }; addReplyHelp(c,help); @@ -135,6 +135,14 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); zfree(eventLoop->events); zfree(eventLoop->fired); + + /* Free the time events list. */ + aeTimeEvent *next_te, *te = eventLoop->timeEventHead; + while (te) { + next_te = te->next; + zfree(te); + te = next_te; + } zfree(eventLoop); } @@ -242,6 +242,7 @@ void stopAppendOnly(void) { server.aof_fd = -1; server.aof_selected_db = -1; server.aof_state = AOF_OFF; + server.aof_rewrite_scheduled = 0; killAppendOnlyChild(); } @@ -1797,14 +1798,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { serverLog(LL_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); } else if (!bysignal && exitcode != 0) { + server.aof_lastbgrewrite_status = C_ERR; + + serverLog(LL_WARNING, + "Background AOF rewrite terminated with error"); + } else { /* SIGUSR1 is whitelisted, so we have a way to kill a child without * tirggering an error condition. 
*/ if (bysignal != SIGUSR1) server.aof_lastbgrewrite_status = C_ERR; - serverLog(LL_WARNING, - "Background AOF rewrite terminated with error"); - } else { - server.aof_lastbgrewrite_status = C_ERR; serverLog(LL_WARNING, "Background AOF rewrite terminated by signal %d", bysignal); diff --git a/src/config.c b/src/config.c index e4b1bf869..5841ae7a0 100644 --- a/src/config.c +++ b/src/config.c @@ -190,8 +190,9 @@ typedef struct typeInterface { void (*init)(typeData data); /* Called on server start, should return 1 on success, 0 on error and should set err */ int (*load)(typeData data, sds *argc, int argv, char **err); - /* Called on CONFIG SET, returns 1 on success, 0 on error */ - int (*set)(typeData data, sds value, char **err); + /* Called on server startup and CONFIG SET, returns 1 on success, 0 on error + * and can set a verbose err string, update is true when called from CONFIG SET */ + int (*set)(typeData data, sds value, int update, char **err); /* Called on CONFIG GET, required to add output to the client */ void (*get)(client *c, typeData data); /* Called on CONFIG REWRITE, required to rewrite the config state */ @@ -323,7 +324,11 @@ void loadServerConfigFromString(char *config) { if ((!strcasecmp(argv[0],config->name) || (config->alias && !strcasecmp(argv[0],config->alias)))) { - if (!config->interface.load(config->data, argv, argc, &err)) { + if (argc != 2) { + err = "wrong number of arguments"; + goto loaderr; + } + if (!config->interface.set(config->data, argv[1], 0, &err)) { goto loaderr; } @@ -344,6 +349,10 @@ void loadServerConfigFromString(char *config) { if (addresses > CONFIG_BINDADDR_MAX) { err = "Too many bind addresses specified"; goto loaderr; } + /* Free old bind addresses */ + for (j = 0; j < server.bindaddr_count; j++) { + zfree(server.bindaddr[j]); + } for (j = 0; j < addresses; j++) server.bindaddr[j] = zstrdup(argv[j+1]); server.bindaddr_count = addresses; @@ -599,7 +608,7 @@ void configSetCommand(client *c) { if(config->modifiable && 
(!strcasecmp(c->argv[2]->ptr,config->name) || (config->alias && !strcasecmp(c->argv[2]->ptr,config->alias)))) { - if (!config->interface.set(config->data,o->ptr, &errstr)) { + if (!config->interface.set(config->data,o->ptr,1,&errstr)) { goto badfmt; } addReply(c,shared.ok); @@ -1536,9 +1545,8 @@ static char loadbuf[LOADBUF_SIZE]; .alias = (config_alias), \ .modifiable = (is_modifiable), -#define embedConfigInterface(initfn, loadfn, setfn, getfn, rewritefn) .interface = { \ +#define embedConfigInterface(initfn, setfn, getfn, rewritefn) .interface = { \ .init = (initfn), \ - .load = (loadfn), \ .set = (setfn), \ .get = (getfn), \ .rewrite = (rewritefn) \ @@ -1561,30 +1569,17 @@ static void boolConfigInit(typeData data) { *data.yesno.config = data.yesno.default_value; } -static int boolConfigLoad(typeData data, sds *argv, int argc, char **err) { - int yn; - if (argc != 2) { - *err = "wrong number of arguments"; - return 0; - } - if ((yn = yesnotoi(argv[1])) == -1) { +static int boolConfigSet(typeData data, sds value, int update, char **err) { + int yn = yesnotoi(value); + if (yn == -1) { *err = "argument must be 'yes' or 'no'"; return 0; } if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err)) return 0; - *data.yesno.config = yn; - return 1; -} - -static int boolConfigSet(typeData data, sds value, char **err) { - int yn = yesnotoi(value); - if (yn == -1) return 0; - if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err)) - return 0; int prev = *(data.yesno.config); *(data.yesno.config) = yn; - if (data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) { + if (update && data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) { *(data.yesno.config) = prev; return 0; } @@ -1601,7 +1596,7 @@ static void boolConfigRewrite(typeData data, const char *name, struct rewriteCon #define createBoolConfig(name, alias, modifiable, config_addr, default, is_valid, update) { \ embedCommonConfig(name, alias, modifiable) \ - 
embedConfigInterface(boolConfigInit, boolConfigLoad, boolConfigSet, boolConfigGet, boolConfigRewrite) \ + embedConfigInterface(boolConfigInit, boolConfigSet, boolConfigGet, boolConfigRewrite) \ .data.yesno = { \ .config = &(config_addr), \ .default_value = (default), \ @@ -1619,23 +1614,7 @@ static void stringConfigInit(typeData data) { } } -static int stringConfigLoad(typeData data, sds *argv, int argc, char **err) { - if (argc != 2) { - *err = "wrong number of arguments"; - return 0; - } - if (data.string.is_valid_fn && !data.string.is_valid_fn(argv[1], err)) - return 0; - zfree(*data.string.config); - if (data.string.convert_empty_to_null) { - *data.string.config = argv[1][0] ? zstrdup(argv[1]) : NULL; - } else { - *data.string.config = zstrdup(argv[1]); - } - return 1; -} - -static int stringConfigSet(typeData data, sds value, char **err) { +static int stringConfigSet(typeData data, sds value, int update, char **err) { if (data.string.is_valid_fn && !data.string.is_valid_fn(value, err)) return 0; char *prev = *data.string.config; @@ -1644,7 +1623,7 @@ static int stringConfigSet(typeData data, sds value, char **err) { } else { *data.string.config = zstrdup(value); } - if (data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) { + if (update && data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) { zfree(*data.string.config); *data.string.config = prev; return 0; @@ -1666,7 +1645,7 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC #define createStringConfig(name, alias, modifiable, empty_to_null, config_addr, default, is_valid, update) { \ embedCommonConfig(name, alias, modifiable) \ - embedConfigInterface(stringConfigInit, stringConfigLoad, stringConfigSet, stringConfigGet, stringConfigRewrite) \ + embedConfigInterface(stringConfigInit, stringConfigSet, stringConfigGet, stringConfigRewrite) \ .data.string = { \ .config = &(config_addr), \ .default_value = (default), \ @@ 
-1677,17 +1656,12 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC } /* Enum configs */ -static void configEnumInit(typeData data) { +static void enumConfigInit(typeData data) { *data.enumd.config = data.enumd.default_value; } -static int configEnumLoad(typeData data, sds *argv, int argc, char **err) { - if (argc != 2) { - *err = "wrong number of arguments"; - return 0; - } - - int enumval = configEnumGetValue(data.enumd.enum_value, argv[1]); +static int enumConfigSet(typeData data, sds value, int update, char **err) { + int enumval = configEnumGetValue(data.enumd.enum_value, value); if (enumval == INT_MIN) { sds enumerr = sdsnew("argument must be one of the following: "); configEnum *enumNode = data.enumd.enum_value; @@ -1709,35 +1683,26 @@ static int configEnumLoad(typeData data, sds *argv, int argc, char **err) { } if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err)) return 0; - *(data.enumd.config) = enumval; - return 1; -} - -static int configEnumSet(typeData data, sds value, char **err) { - int enumval = configEnumGetValue(data.enumd.enum_value, value); - if (enumval == INT_MIN) return 0; - if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err)) - return 0; int prev = *(data.enumd.config); *(data.enumd.config) = enumval; - if (data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) { + if (update && data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) { *(data.enumd.config) = prev; return 0; } return 1; } -static void configEnumGet(client *c, typeData data) { +static void enumConfigGet(client *c, typeData data) { addReplyBulkCString(c, configEnumGetNameOrUnknown(data.enumd.enum_value,*data.enumd.config)); } -static void configEnumRewrite(typeData data, const char *name, struct rewriteConfigState *state) { +static void enumConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) { rewriteConfigEnumOption(state, name,*(data.enumd.config), 
data.enumd.enum_value, data.enumd.default_value); } #define createEnumConfig(name, alias, modifiable, enum, config_addr, default, is_valid, update) { \ embedCommonConfig(name, alias, modifiable) \ - embedConfigInterface(configEnumInit, configEnumLoad, configEnumSet, configEnumGet, configEnumRewrite) \ + embedConfigInterface(enumConfigInit, enumConfigSet, enumConfigGet, enumConfigRewrite) \ .data.enumd = { \ .config = &(config_addr), \ .default_value = (default), \ @@ -1832,23 +1797,17 @@ static int numericBoundaryCheck(typeData data, long long ll, char **err) { return 1; } -static int numericConfigLoad(typeData data, sds *argv, int argc, char **err) { - long long ll; - - if (argc != 2) { - *err = "wrong number of arguments"; - return 0; - } - +static int numericConfigSet(typeData data, sds value, int update, char **err) { + long long ll, prev = 0; if (data.numeric.is_memory) { int memerr; - ll = memtoll(argv[1], &memerr); + ll = memtoll(value, &memerr); if (memerr || ll < 0) { *err = "argument must be a memory value"; return 0; } } else { - if (!string2ll(argv[1], sdslen(argv[1]),&ll)) { + if (!string2ll(value, sdslen(value),&ll)) { *err = "argument couldn't be parsed into an integer" ; return 0; } @@ -1860,31 +1819,10 @@ static int numericConfigLoad(typeData data, sds *argv, int argc, char **err) { if (data.numeric.is_valid_fn && !data.numeric.is_valid_fn(ll, err)) return 0; - SET_NUMERIC_TYPE(ll) - - return 1; -} - -static int numericConfigSet(typeData data, sds value, char **err) { - long long ll, prev = 0; - if (data.numeric.is_memory) { - int memerr; - ll = memtoll(value, &memerr); - if (memerr || ll < 0) return 0; - } else { - if (!string2ll(value, sdslen(value),&ll)) return 0; - } - - if (!numericBoundaryCheck(data, ll, err)) - return 0; - - if (data.numeric.is_valid_fn && !data.numeric.is_valid_fn(ll, err)) - return 0; - GET_NUMERIC_TYPE(prev) SET_NUMERIC_TYPE(ll) - if (data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) { + if (update && 
data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) { SET_NUMERIC_TYPE(prev) return 0; } @@ -1918,7 +1856,7 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite #define embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) { \ embedCommonConfig(name, alias, modifiable) \ - embedConfigInterface(numericConfigInit, numericConfigLoad, numericConfigSet, numericConfigGet, numericConfigRewrite) \ + embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \ .data.numeric = { \ .lower_bound = (lower), \ .upper_bound = (upper), \ @@ -2061,8 +1999,9 @@ static int updateMaxmemory(long long val, long long prev, char **err) { UNUSED(prev); UNUSED(err); if (val) { - if ((unsigned long long)val < zmalloc_used_memory()) { - serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy."); + size_t used = zmalloc_used_memory()-freeMemoryGetNotCountedMemory(); + if ((unsigned long long)val < used) { + serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). 
This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", server.maxmemory, used); } freeMemoryIfNeededAndSafe(); } @@ -2221,7 +2160,6 @@ standardConfig configs[] = { createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_fill, 10, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max fill. */ createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */ createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), @@ -2233,6 +2171,7 @@ standardConfig configs[] = { /* Unsigned Long configs */ createULongConfig("active-defrag-max-scan-fields", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.active_defrag_max_scan_fields, 1000, INTEGER_CONFIG, NULL, NULL), /* Default: keys with more than 1000 fields will be processed separately */ createULongConfig("slowlog-max-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.slowlog_max_len, 128, INTEGER_CONFIG, NULL, NULL), + createULongConfig("acllog-max-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.acllog_max_len, 128, INTEGER_CONFIG, NULL, NULL), /* Long Long configs */ createLongLongConfig("lua-time-limit", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.lua_time_limit, 5000, INTEGER_CONFIG, 
NULL, NULL),/* milliseconds */ @@ -2255,6 +2194,7 @@ standardConfig configs[] = { createSizeTConfig("stream-node-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.stream_node_max_bytes, 4096, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("zset-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.zset_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL), + createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */ /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ @@ -347,7 +347,10 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { * DB number if we want to flush only a single Redis database number. * * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or - * EMPTYDB_ASYNC if we want the memory to be freed in a different thread + * 1. EMPTYDB_ASYNC if we want the memory to be freed in a different thread. + * 2. EMPTYDB_BACKUP if we want to empty the backup dictionaries created by + * disklessLoadMakeBackups. In that case we only free memory and avoid + * firing module events. * and the function to return ASAP. * * On success the fuction returns the number of keys removed from the @@ -355,6 +358,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { * DB number is out of range, and errno is set to EINVAL. 
*/ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) { int async = (flags & EMPTYDB_ASYNC); + int backup = (flags & EMPTYDB_BACKUP); /* Just free the memory, nothing else */ + RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; long long removed = 0; if (dbnum < -1 || dbnum >= server.dbnum) { @@ -362,16 +367,18 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( return -1; } - /* Fire the flushdb modules event. */ - RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_START, - &fi); - - /* Make sure the WATCHed keys are affected by the FLUSH* commands. - * Note that we need to call the function while the keys are still - * there. */ - signalFlushedDb(dbnum); + /* Pre-flush actions */ + if (!backup) { + /* Fire the flushdb modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_START, + &fi); + + /* Make sure the WATCHed keys are affected by the FLUSH* commands. + * Note that we need to call the function while the keys are still + * there. */ + signalFlushedDb(dbnum); + } int startdb, enddb; if (dbnum == -1) { @@ -390,20 +397,24 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( dictEmpty(dbarray[j].expires,callback); } } - if (server.cluster_enabled) { - if (async) { - slotToKeyFlushAsync(); - } else { - slotToKeyFlush(); + + /* Post-flush actions */ + if (!backup) { + if (server.cluster_enabled) { + if (async) { + slotToKeyFlushAsync(); + } else { + slotToKeyFlush(); + } } - } - if (dbnum == -1) flushSlaveKeysWithExpireList(); + if (dbnum == -1) flushSlaveKeysWithExpireList(); - /* Also fire the end event. Note that this event will fire almost - * immediately after the start event if the flush is asynchronous. 
*/ - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_END, - &fi); + /* Also fire the end event. Note that this event will fire almost + * immediately after the start event if the flush is asynchronous. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_END, + &fi); + } return removed; } @@ -1285,8 +1296,10 @@ int expireIfNeeded(redisDb *db, robj *key) { propagateExpire(db,key,server.lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); - return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : - dbSyncDelete(db,key); + int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : + dbSyncDelete(db,key); + if (retval) signalModifiedKey(db,key); + return retval; } /* ----------------------------------------------------------------------------- @@ -1522,6 +1535,22 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk return keys; } +/* Helper function to extract keys from memory command. + * MEMORY USAGE <key> */ +int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { + int *keys; + UNUSED(cmd); + + if (argc >= 3 && !strcasecmp(argv[1]->ptr,"usage")) { + keys = zmalloc(sizeof(int) * 1); + keys[0] = 2; + *numkeys = 1; + return keys; + } + *numkeys = 0; + return NULL; +} + /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { diff --git a/src/debug.c b/src/debug.c index a2d37337d..36af35aec 100644 --- a/src/debug.c +++ b/src/debug.c @@ -355,6 +355,7 @@ void debugCommand(client *c) { "CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.", "DIGEST -- Output a hex signature representing the current DB content.", "DIGEST-VALUE <key-1> ... 
<key-N>-- Output a hex signature of the values of all the specified keys.", +"DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false]", "ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.", "LOG <message> -- write message to the server log.", "HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.", @@ -362,6 +363,7 @@ void debugCommand(client *c) { "LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.", "LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", "OBJECT <key> -- Show low level info about key and associated value.", +"OOM -- Crash the server simulating an out-of-memory error.", "PANIC -- Crash the server simulating a panic.", "POPULATE <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.", "RELOAD -- Save the RDB on disk and reload it back in memory.", @@ -586,7 +588,7 @@ NULL } } else if (!strcasecmp(c->argv[1]->ptr,"protocol") && c->argc == 3) { /* DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map| - * attrib|push|verbatim|true|false|state|err|bloberr] */ + * attrib|push|verbatim|true|false] */ char *name = c->argv[2]->ptr; if (!strcasecmp(name,"string")) { addReplyBulkCString(c,"Hello World"); @@ -634,7 +636,7 @@ NULL } else if (!strcasecmp(name,"verbatim")) { addReplyVerbatim(c,"This is a verbatim\nstring",25,"txt"); } else { - addReplyError(c,"Wrong protocol type name. Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false|state|err|bloberr"); + addReplyError(c,"Wrong protocol type name. 
Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false"); } } else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) { double dtime = strtod(c->argv[2]->ptr,NULL); @@ -683,9 +685,12 @@ NULL sds stats = sdsempty(); char buf[4096]; - if (getLongFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) + if (getLongFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) { + sdsfree(stats); return; + } if (dbid < 0 || dbid >= server.dbnum) { + sdsfree(stats); addReplyError(c,"Out of range database"); return; } diff --git a/src/defrag.c b/src/defrag.c index 04e57955b..e729297a5 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -5,8 +5,8 @@ * We do that by scanning the keyspace and for each pointer we have, we can try to * ask the allocator if moving it to a new address will help reduce fragmentation. * - * Copyright (c) 2017, Oran Agra - * Copyright (c) 2017, Redis Labs, Inc + * Copyright (c) 2020, Oran Agra + * Copyright (c) 2020, Redis Labs, Inc * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without @@ -408,25 +408,32 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -long activeDefragQuickListNodes(quicklist *ql) { - quicklistNode *node = ql->head, *newnode; +long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { + quicklistNode *newnode, *node = *node_ref; long defragged = 0; unsigned char *newzl; + if ((newnode = activeDefragAlloc(node))) { + if (newnode->prev) + newnode->prev->next = newnode; + else + ql->head = newnode; + if (newnode->next) + newnode->next->prev = newnode; + else + ql->tail = newnode; + *node_ref = node = newnode; + defragged++; + } + if ((newzl = activeDefragAlloc(node->zl))) + defragged++, node->zl = newzl; + return defragged; +} + +long activeDefragQuickListNodes(quicklist *ql) { + quicklistNode *node = ql->head; + long defragged = 0; while (node) { - if ((newnode = activeDefragAlloc(node))) { - if (newnode->prev) - newnode->prev->next = newnode; - else - ql->head = newnode; - if (newnode->next) - newnode->next->prev = newnode; - else - ql->tail = newnode; - node = newnode; - defragged++; - } - if ((newzl = activeDefragAlloc(node->zl))) - defragged++, node->zl = newzl; + defragged += activeDefragQuickListNode(ql, &node); node = node->next; } return defragged; @@ -440,12 +447,48 @@ void defragLater(redisDb *db, dictEntry *kde) { listAddNodeTail(db->defrag_later, key); } -long scanLaterList(robj *ob) { +/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ +long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { quicklist *ql = ob->ptr; + quicklistNode *node; + long iterations = 0; + int bookmark_failed = 0; if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST) return 0; - server.stat_active_defrag_scanned+=ql->len; - return activeDefragQuickListNodes(ql); + + if (*cursor == 0) { + /* if cursor is 0, we start new iteration */ + node = ql->head; + } else { + node = quicklistBookmarkFind(ql, "_AD"); + if (!node) { + /* if the bookmark was deleted, it means we reached the end. */ + *cursor = 0; + return 0; + } + node = node->next; + } + + (*cursor)++; + while (node) { + (*defragged) += activeDefragQuickListNode(ql, &node); + server.stat_active_defrag_scanned++; + if (++iterations > 128 && !bookmark_failed) { + if (ustime() > endtime) { + if (!quicklistBookmarkCreate(&ql, "_AD", node)) { + bookmark_failed = 1; + } else { + ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */ + return 1; + } + } + iterations = 0; + } + node = node->next; + } + quicklistBookmarkDelete(ql, "_AD"); + *cursor = 0; + return bookmark_failed? 
1: 0; } typedef struct { @@ -638,7 +681,8 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, void *newdata = activeDefragAlloc(ri.data); if (newdata) raxSetData(ri.node, ri.data=newdata), (*defragged)++; - if (++iterations > 16) { + server.stat_active_defrag_scanned++; + if (++iterations > 128) { if (ustime() > endtime) { serverAssert(ri.key_len==sizeof(last)); memcpy(last,ri.key,ri.key_len); @@ -900,8 +944,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { - server.stat_active_defrag_hits += scanLaterList(ob); - *cursor = 0; /* list has no scan, we must finish it in one go */ + return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits); } else if (ob->type == OBJ_SET) { server.stat_active_defrag_hits += scanLaterSet(ob, cursor); } else if (ob->type == OBJ_ZSET) { @@ -961,11 +1004,6 @@ int defragLaterStep(redisDb *db, long long endtime) { if (defragLaterItem(de, &defrag_later_cursor, endtime)) quit = 1; /* time is up, we didn't finish all the work */ - /* Don't start a new BIG key in this loop, this is because the - * next key can be a list, and scanLaterList must be done in once cycle */ - if (!defrag_later_cursor) - quit = 1; - /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields * (if we have a lot of pointers in one hash bucket, or rehashing), * check if we reached the time limit. */ diff --git a/src/module.c b/src/module.c index 965bb4460..cce3d1c11 100644 --- a/src/module.c +++ b/src/module.c @@ -714,9 +714,9 @@ void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) { * flags into the command flags used by the Redis core. * * It returns the set of flags, or -1 if unknown flags are found. 
*/ -int commandFlagsFromString(char *s) { +int64_t commandFlagsFromString(char *s) { int count, j; - int flags = 0; + int64_t flags = 0; sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count); for (j = 0; j < count; j++) { char *t = tokens[j]; @@ -730,6 +730,7 @@ int commandFlagsFromString(char *s) { else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM; else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE; else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR; + else if (!strcasecmp(t,"no-slowlog")) flags |= CMD_SKIP_SLOWLOG; else if (!strcasecmp(t,"fast")) flags |= CMD_FAST; else if (!strcasecmp(t,"no-auth")) flags |= CMD_NO_AUTH; else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS; @@ -781,6 +782,8 @@ int commandFlagsFromString(char *s) { * this means. * * **"no-monitor"**: Don't propagate the command on monitor. Use this if * the command has sensible data among the arguments. + * * **"no-slowlog"**: Don't log this command in the slowlog. Use this if + * the command has sensible data among the arguments. * * **"fast"**: The command time complexity is not greater * than O(log(N)) where N is the size of the collection or * anything else representing the normal scalability @@ -798,7 +801,7 @@ int commandFlagsFromString(char *s) { * to authenticate a client. */ int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) { - int flags = strflags ? commandFlagsFromString((char*)strflags) : 0; + int64_t flags = strflags ? 
commandFlagsFromString((char*)strflags) : 0; if (flags == -1) return REDISMODULE_ERR; if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled) return REDISMODULE_ERR; @@ -859,6 +862,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->in_call = 0; module->in_hook = 0; module->options = 0; + module->info_cb = 0; ctx->module = module; } @@ -889,7 +893,8 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { ctx->module->options = options; } -/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH). */ +/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH + * and client side caching). */ int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) { signalModifiedKey(ctx->client->db,keyname); return REDISMODULE_OK; @@ -3648,14 +3653,15 @@ void moduleRDBLoadError(RedisModuleIO *io) { io->error = 1; return; } - serverLog(LL_WARNING, + serverPanic( "Error loading data from RDB (short read or EOF). " "Read performed by module '%s' about type '%s' " - "after reading '%llu' bytes of a value.", + "after reading '%llu' bytes of a value " + "for key named: '%s'.", io->type->module->name, io->type->name, - (unsigned long long)io->bytes); - exit(1); + (unsigned long long)io->bytes, + io->key? (char*)io->key->ptr: "(null)"); } /* Returns 0 if there's at least one registered data type that did not declare @@ -3901,7 +3907,7 @@ void RM_SaveLongDouble(RedisModuleIO *io, long double value) { /* Long double has different number of bits in different platforms, so we * save it as a string type. 
*/ size_t len = ld2string(buf,sizeof(buf),value,LD_STR_HEX); - RM_SaveStringBuffer(io,buf,len+1); /* len+1 for '\0' */ + RM_SaveStringBuffer(io,buf,len); } /* In the context of the rdb_save method of a module data type, loads back the @@ -4277,6 +4283,24 @@ void unblockClientFromModule(client *c) { moduleFreeContext(&ctx); } + /* If we made it here and client is still blocked it means that the command + * timed-out, client was killed or disconnected and disconnect_callback was + * not implemented (or it was, but RM_UnblockClient was not called from + * within it, as it should). + * We must call moduleUnblockClient in order to free privdata and + * RedisModuleBlockedClient. + * + * Note that we only do that for clients that are blocked on keys, for which + * the contract is that the module should not call RM_UnblockClient under + * normal circumstances. + * Clients implementing threads and working with private data should be + * aware that calling RM_UnblockClient for every blocked client is their + * responsibility, and if they fail to do so memory may leak. Ideally they + * should implement the disconnect and timeout callbacks and call + * RM_UnblockClient, but any other way is also acceptable. */ + if (bc->blocked_on_keys && !bc->unblocked) + moduleUnblockClient(c); + bc->client = NULL; /* Reset the client for a new query since, for blocking commands implemented * into modules, we do not it immediately after the command returns (and @@ -4388,6 +4412,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { * * free_privdata: called in order to free the private data that is passed * by RedisModule_UnblockClient() call. + * + * Note: RedisModule_UnblockClient should be called for every blocked client, + * even if client was killed, timed-out or disconnected. Failing to do so + * will result in memory leaks. 
*/ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); @@ -4442,7 +4470,15 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * freed using the free_privdata callback provided by the user. * * However the reply callback will be able to access the argument vector of - * the command, so the private data is often not needed. */ + * the command, so the private data is often not needed. + * + * Note: Under normal circumstances RedisModule_UnblockClient should not be + * called for clients that are blocked on keys (Either the key will + * become ready or a timeout will occur). If for some reason you do want + * to call RedisModule_UnblockClient it is possible: Client will be + * handled as if it were timed-out (You must implement the timeout + * callback in that case). + */ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); } @@ -4704,9 +4740,9 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) { * * To call non-reply APIs, the thread safe context must be prepared with: * - * RedisModule_ThreadSafeCallStart(ctx); + * RedisModule_ThreadSafeContextLock(ctx); * ... make your call here ... 
- * RedisModule_ThreadSafeCallStop(ctx); + * RedisModule_ThreadSafeContextUnlock(ctx); * * This is not needed when using `RedisModule_Reply*` functions, assuming * that a blocked client was used when the context was created, otherwise @@ -4789,7 +4825,8 @@ void moduleReleaseGIL(void) { * - REDISMODULE_NOTIFY_EXPIRED: Expiration events * - REDISMODULE_NOTIFY_EVICTED: Eviction events * - REDISMODULE_NOTIFY_STREAM: Stream events - * - REDISMODULE_NOTIFY_ALL: All events + * - REDISMODULE_NOTIFY_KEYMISS: Key-miss events + * - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS) * * We do not distinguish between key events and keyspace events, and it is up * to the module to filter the actions taken based on the key. @@ -6687,7 +6724,7 @@ int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) { server.module_child_pid = childpid; moduleForkInfo.done_handler = cb; moduleForkInfo.done_handler_user_data = user_data; - serverLog(LL_NOTICE, "Module fork started pid: %d ", childpid); + serverLog(LL_VERBOSE, "Module fork started pid: %d ", childpid); } return childpid; } @@ -6710,7 +6747,7 @@ int TerminateModuleForkChild(int child_pid, int wait) { server.module_child_pid != child_pid) return C_ERR; int statloc; - serverLog(LL_NOTICE,"Killing running module fork child: %ld", + serverLog(LL_VERBOSE,"Killing running module fork child: %ld", (long) server.module_child_pid); if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) { while(wait4(server.module_child_pid,&statloc,0,NULL) != diff --git a/src/multi.c b/src/multi.c index df11225bd..cbbd2c513 100644 --- a/src/multi.c +++ b/src/multi.c @@ -177,8 +177,10 @@ void execCommand(client *c) { must_propagate = 1; } - int acl_retval = ACLCheckCommandPerm(c); + int acl_keypos; + int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); if (acl_retval != ACL_OK) { + addACLLogEntry(c,acl_retval,acl_keypos,NULL); addReplyErrorFormat(c, "-NOPERM ACLs rules changed between the moment the " "transaction was accumulated 
and the EXEC call. " diff --git a/src/networking.c b/src/networking.c index a2e454d4b..4c394af70 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,6 +154,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; + c->client_tracking_prefixes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -369,9 +370,10 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { * Where the master must propagate the first change even if the second * will produce an error. However it is useful to log such events since * they are rare and may hint at errors in a script or a bug in Redis. */ - if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { - char* to = c->flags & CLIENT_MASTER? "master": "replica"; - char* from = c->flags & CLIENT_MASTER? "replica": "master"; + int ctype = getClientType(c); + if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE) { + char* to = ctype == CLIENT_TYPE_MASTER? "master": "replica"; + char* from = ctype == CLIENT_TYPE_MASTER? "replica": "master"; char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>"; serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " "to its %s: '%s' after processing the command " @@ -1074,7 +1076,7 @@ void freeClient(client *c) { } /* Log link disconnection with slave */ - if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { + if (getClientType(c) == CLIENT_TYPE_SLAVE) { serverLog(LL_WARNING,"Connection with replica %s lost.", replicationGetSlaveName(c)); } @@ -1121,7 +1123,7 @@ void freeClient(client *c) { /* We need to remember the time when we started to have zero * attached slaves, as after some time we'll free the replication * backlog. 
*/ - if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) + if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0) server.repl_no_slaves_since = server.unixtime; refreshGoodSlavesCount(); /* Fire the replica change modules event. */ @@ -1162,9 +1164,14 @@ void freeClientAsync(client *c) { * may access the list while Redis uses I/O threads. All the other accesses * are in the context of the main thread while the other threads are * idle. */ - static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER; if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; c->flags |= CLIENT_CLOSE_ASAP; + if (server.io_threads_num == 1) { + /* no need to bother with locking if there's just one thread (the main thread) */ + listAddNodeTail(server.clients_to_close,c); + return; + } + static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&async_free_queue_mutex); listAddNodeTail(server.clients_to_close,c); pthread_mutex_unlock(&async_free_queue_mutex); @@ -1252,8 +1259,8 @@ int writeToClient(client *c, int handler_installed) { * just deliver as much data as it is possible to deliver. * * Moreover, we also send as much as possible if the client is - * a slave (otherwise, on high-speed traffic, the replication - * buffer will grow indefinitely) */ + * a slave or a monitor (otherwise, on high-speed traffic, the + * replication/output buffer will grow indefinitely) */ if (totwritten > NET_MAX_WRITES_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory) && @@ -1358,6 +1365,12 @@ void resetClient(client *c) { if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) c->flags &= ~CLIENT_ASKING; + /* We do the same for the CACHING command as well. It also affects + * the next command or transaction executed, in a way very similar + * to ASKING. 
*/ + if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand) + c->flags &= ~CLIENT_TRACKING_CACHING; + /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply * to the next command will be sent, but set the flag if the command * we just processed was "CLIENT REPLY SKIP". */ @@ -1439,7 +1452,7 @@ int processInlineBuffer(client *c) { /* Newline from slaves can be used to refresh the last ACK time. * This is useful for a slave to ping back while loading a big * RDB file. */ - if (querylen == 0 && c->flags & CLIENT_SLAVE) + if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE) c->repl_ack_time = server.unixtime; /* Move querybuffer position to the next query in the buffer. */ @@ -2021,7 +2034,6 @@ int clientSetNameOrReply(client *c, robj *name) { void clientCommand(client *c) { listNode *ln; listIter li; - client *client; if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { @@ -2038,7 +2050,7 @@ void clientCommand(client *c) { "REPLY (on|off|skip) -- Control the replies sent to the current connection.", "SETNAME <name> -- Assign the name <name> to the current connection.", "UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.", -"TRACKING (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.", +"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] [OPTIN] [OPTOUT]... -- Enable client keys tracking for client side caching.", "GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.", NULL }; @@ -2135,7 +2147,7 @@ NULL /* Iterate clients killing all the matching clients. 
*/ listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { - client = listNodeValue(ln); + client *client = listNodeValue(ln); if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; if (type != -1 && getClientType(client) != type) continue; if (id != 0 && client->id != id) continue; @@ -2213,38 +2225,131 @@ NULL UNIT_MILLISECONDS) != C_OK) return; pauseClients(duration); addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && - (c->argc == 3 || c->argc == 5)) - { - /* CLIENT TRACKING (on|off) [REDIRECT <id>] */ + } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { + /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] + * [PREFIX second] [OPTIN] [OPTOUT] ... */ long long redir = 0; + uint64_t options = 0; + robj **prefix = NULL; + size_t numprefix = 0; + + /* Parse the options. */ + for (int j = 3; j < c->argc; j++) { + int moreargs = (c->argc-1) - j; + + if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { + j++; + if (redir != 0) { + addReplyError(c,"A client can only redirect to a single " + "other client"); + zfree(prefix); + return; + } - /* Parse the redirection option: we'll require the client with - * the specified ID to exist right now, even if it is possible - * it will get disconnected later. */ - if (c->argc == 5) { - if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) { - addReply(c,shared.syntaxerr); - return; - } else { - if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) != - C_OK) return; + if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != + C_OK) + { + zfree(prefix); + return; + } + /* We will require the client with the specified ID to exist + * right now, even if it is possible that it gets disconnected + * later. Still a valid sanity check. 
*/ if (lookupClientByID(redir) == NULL) { addReplyError(c,"The client ID you want redirect to " "does not exist"); + zfree(prefix); return; } + } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { + options |= CLIENT_TRACKING_BCAST; + } else if (!strcasecmp(c->argv[j]->ptr,"optin")) { + options |= CLIENT_TRACKING_OPTIN; + } else if (!strcasecmp(c->argv[j]->ptr,"optout")) { + options |= CLIENT_TRACKING_OPTOUT; + } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) { + j++; + prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); + prefix[numprefix++] = c->argv[j]; + } else { + zfree(prefix); + addReply(c,shared.syntaxerr); + return; } } + /* Options are ok: enable or disable the tracking for this client. */ if (!strcasecmp(c->argv[2]->ptr,"on")) { - enableTracking(c,redir); + /* Before enabling tracking, make sure options are compatible + * among each other and with the current state of the client. */ + if (!(options & CLIENT_TRACKING_BCAST) && numprefix) { + addReplyError(c, + "PREFIX option requires BCAST mode to be enabled"); + zfree(prefix); + return; + } + + if (c->flags & CLIENT_TRACKING) { + int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); + int newbcast = !!(options & CLIENT_TRACKING_BCAST); + if (oldbcast != newbcast) { + addReplyError(c, + "You can't switch BCAST mode on/off before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + } + + if (options & CLIENT_TRACKING_BCAST && + options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT)) + { + addReplyError(c, + "OPTIN and OPTOUT are not compatible with BCAST"); + zfree(prefix); + return; + } + + enableTracking(c,redir,options,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); } else { + zfree(prefix); addReply(c,shared.syntaxerr); return; } + zfree(prefix); + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) { + if (!(c->flags & 
CLIENT_TRACKING)) { + addReplyError(c,"CLIENT CACHING can be called only when the " + "client is in tracking mode with OPTIN or " + "OPTOUT mode enabled"); + return; + } + + char *opt = c->argv[2]->ptr; + if (!strcasecmp(opt,"yes")) { + if (c->flags & CLIENT_TRACKING_OPTIN) { + c->flags |= CLIENT_TRACKING_CACHING; + } else { + addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode."); + return; + } + } else if (!strcasecmp(opt,"no")) { + if (c->flags & CLIENT_TRACKING_OPTOUT) { + c->flags |= CLIENT_TRACKING_CACHING; + } else { + addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode."); + return; + } + } else { + addReply(c,shared.syntaxerr); + return; + } + + /* Common reply for when we succeeded. */ addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) { /* CLIENT GETREDIR */ @@ -2433,12 +2538,14 @@ unsigned long getClientOutputBufferMemoryUsage(client *c) { * * The function will return one of the following: * CLIENT_TYPE_NORMAL -> Normal client - * CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command + * CLIENT_TYPE_SLAVE -> Slave * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels * CLIENT_TYPE_MASTER -> The client representing our replication master. */ int getClientType(client *c) { if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER; + /* Even though MONITOR clients are marked as replicas, we + * want the expose them as normal clients. 
*/ if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) return CLIENT_TYPE_SLAVE; if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB; diff --git a/src/notify.c b/src/notify.c index d6c3ad403..bb1055724 100644 --- a/src/notify.c +++ b/src/notify.c @@ -82,10 +82,10 @@ sds keyspaceEventsFlagsToString(int flags) { if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1); if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); - if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1); } if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); + if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1); return res; } diff --git a/src/object.c b/src/object.c index 2201a317a..11e335afc 100644 --- a/src/object.c +++ b/src/object.c @@ -640,21 +640,13 @@ int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *m int getLongDoubleFromObject(robj *o, long double *target) { long double value; - char *eptr; if (o == NULL) { value = 0; } else { serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); if (sdsEncodedObject(o)) { - errno = 0; - value = strtold(o->ptr, &eptr); - if (sdslen(o->ptr) == 0 || - isspace(((const char*)o->ptr)[0]) || - (size_t)(eptr-(char*)o->ptr) != sdslen(o->ptr) || - (errno == ERANGE && - (value == HUGE_VAL || value == -HUGE_VAL || value == 0)) || - isnan(value)) + if (!string2ld(o->ptr, sdslen(o->ptr), &value)) return C_ERR; } else if (o->encoding == OBJ_ENCODING_INT) { value = (long)o->ptr; @@ -983,37 +975,28 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mem_total += mem; mem = 0; - if (listLength(server.slaves)) { - listIter li; - listNode *ln; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *c = listNodeValue(ln); - mem += getClientOutputBufferMemoryUsage(c); - mem += sdsAllocSize(c->querybuf); - mem += sizeof(client); - } - } - mh->clients_slaves = mem; - mem_total+=mem; - 
- mem = 0; if (listLength(server.clients)) { listIter li; listNode *ln; + size_t mem_normal = 0, mem_slaves = 0; listRewind(server.clients,&li); while((ln = listNext(&li))) { + size_t mem_curr = 0; client *c = listNodeValue(ln); - if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) - continue; - mem += getClientOutputBufferMemoryUsage(c); - mem += sdsAllocSize(c->querybuf); - mem += sizeof(client); + int type = getClientType(c); + mem_curr += getClientOutputBufferMemoryUsage(c); + mem_curr += sdsAllocSize(c->querybuf); + mem_curr += sizeof(client); + if (type == CLIENT_TYPE_SLAVE) + mem_slaves += mem_curr; + else + mem_normal += mem_curr; } + mh->clients_slaves = mem_slaves; + mh->clients_normal = mem_normal; + mem = mem_slaves + mem_normal; } - mh->clients_normal = mem; mem_total+=mem; mem = 0; @@ -1119,13 +1102,13 @@ sds getMemoryDoctorReport(void) { num_reports++; } - /* Allocator fss is higher than 1.1 and 10MB ? */ + /* Allocator rss is higher than 1.1 and 10MB ? */ if (mh->allocator_rss > 1.1 && mh->allocator_rss_bytes > 10<<20) { high_alloc_rss = 1; num_reports++; } - /* Non-Allocator fss is higher than 1.1 and 10MB ? */ + /* Non-Allocator rss is higher than 1.1 and 10MB ? */ if (mh->rss_extra > 1.1 && mh->rss_extra_bytes > 10<<20) { high_proc_rss = 1; num_reports++; diff --git a/src/pubsub.c b/src/pubsub.c index 994dd9734..5cb4298e0 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -35,7 +35,11 @@ int clientSubscriptionsCount(client *c); * Pubsub client replies API *----------------------------------------------------------------------------*/ -/* Send a pubsub message of type "message" to the client. */ +/* Send a pubsub message of type "message" to the client. + * Normally 'msg' is a Redis object containing the string to send as + * message. However if the caller sets 'msg' as NULL, it will be able + * to send a special message (for instance an Array type) by using the + * addReply*() API family. 
*/ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); @@ -43,7 +47,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { addReplyPushLen(c,3); addReply(c,shared.messagebulk); addReplyBulk(c,channel); - addReplyBulk(c,msg); + if (msg) addReplyBulk(c,msg); } /* Send a pubsub message of type "pmessage" to the client. The difference diff --git a/src/quicklist.c b/src/quicklist.c index 7b5484116..ae183ffd8 100644 --- a/src/quicklist.c +++ b/src/quicklist.c @@ -70,6 +70,12 @@ static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536}; } while (0); #endif +/* Bookmarks forward declarations */ +#define QL_MAX_BM ((1 << QL_BM_BITS)-1) +quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name); +quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node); +void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm); + /* Simple way to give quicklistEntry structs default values with one call. 
*/ #define initEntry(e) \ do { \ @@ -100,10 +106,11 @@ quicklist *quicklistCreate(void) { quicklist->count = 0; quicklist->compress = 0; quicklist->fill = -2; + quicklist->bookmark_count = 0; return quicklist; } -#define COMPRESS_MAX (1 << 16) +#define COMPRESS_MAX (1 << QL_COMP_BITS) void quicklistSetCompressDepth(quicklist *quicklist, int compress) { if (compress > COMPRESS_MAX) { compress = COMPRESS_MAX; @@ -113,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) { quicklist->compress = compress; } -#define FILL_MAX (1 << 15) +#define FILL_MAX (1 << (QL_FILL_BITS-1)) void quicklistSetFill(quicklist *quicklist, int fill) { if (fill > FILL_MAX) { fill = FILL_MAX; @@ -169,6 +176,7 @@ void quicklistRelease(quicklist *quicklist) { quicklist->len--; current = next; } + quicklistBookmarksClear(quicklist); zfree(quicklist); } @@ -578,6 +586,15 @@ quicklist *quicklistCreateFromZiplist(int fill, int compress, REDIS_STATIC void __quicklistDelNode(quicklist *quicklist, quicklistNode *node) { + /* Update the bookmark if any */ + quicklistBookmark *bm = _quicklistBookmarkFindByNode(quicklist, node); + if (bm) { + bm->node = node->next; + /* if the bookmark was to the last node, delete it. */ + if (!bm->node) + _quicklistBookmarkDelete(quicklist, bm); + } + if (node->next) node->next->prev = node->prev; if (node->prev) @@ -1410,6 +1427,87 @@ void quicklistPush(quicklist *quicklist, void *value, const size_t sz, } } +/* Create or update a bookmark in the list which will be updated to the next node + * automatically when the one referenced gets deleted. + * Returns 1 on success (creation of new bookmark or override of an existing one). + * Returns 0 on failure (reached the maximum supported number of bookmarks). + * NOTE: use short simple names, so that string compare on find is quick. + * NOTE: bookmakrk creation may re-allocate the quicklist, so the input pointer + may change and it's the caller responsibilty to update the reference. 
+ */ +int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node) { + quicklist *ql = *ql_ref; + if (ql->bookmark_count >= QL_MAX_BM) + return 0; + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (bm) { + bm->node = node; + return 1; + } + ql = zrealloc(ql, sizeof(quicklist) + (ql->bookmark_count+1) * sizeof(quicklistBookmark)); + *ql_ref = ql; + ql->bookmarks[ql->bookmark_count].node = node; + ql->bookmarks[ql->bookmark_count].name = zstrdup(name); + ql->bookmark_count++; + return 1; +} + +/* Find the quicklist node referenced by a named bookmark. + * When the bookmarked node is deleted the bookmark is updated to the next node, + * and if that's the last node, the bookmark is deleted (so find returns NULL). */ +quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name) { + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (!bm) return NULL; + return bm->node; +} + +/* Delete a named bookmark. + * returns 0 if bookmark was not found, and 1 if deleted. + * Note that the bookmark memory is not freed yet, and is kept for future use. 
*/ +int quicklistBookmarkDelete(quicklist *ql, const char *name) { + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (!bm) + return 0; + _quicklistBookmarkDelete(ql, bm); + return 1; +} + +quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) { + unsigned i; + for (i=0; i<ql->bookmark_count; i++) { + if (!strcmp(ql->bookmarks[i].name, name)) { + return &ql->bookmarks[i]; + } + } + return NULL; +} + +quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node) { + unsigned i; + for (i=0; i<ql->bookmark_count; i++) { + if (ql->bookmarks[i].node == node) { + return &ql->bookmarks[i]; + } + } + return NULL; +} + +void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm) { + int index = bm - ql->bookmarks; + zfree(bm->name); + ql->bookmark_count--; + memmove(bm, bm+1, (ql->bookmark_count - index)* sizeof(*bm)); + /* NOTE: We do not shrink (realloc) the quicklist yet (to avoid resonance, + * it may be re-used later (a call to realloc may NOP). */ +} + +void quicklistBookmarksClear(quicklist *ql) { + while (ql->bookmark_count) + zfree(ql->bookmarks[--ql->bookmark_count].name); + /* NOTE: We do not shrink (realloc) the quick list. main use case for this + * function is just before releasing the allocation. */ +} + /* The rest of this file is test cases and test helpers. */ #ifdef REDIS_TEST #include <stdint.h> @@ -2641,6 +2739,54 @@ int quicklistTest(int argc, char *argv[]) { printf("Compressions: %0.2f seconds.\n", (float)(stop - start) / 1000); printf("\n"); + TEST("bookmark get updated to next item") { + quicklist *ql = quicklistNew(1, 0); + quicklistPushTail(ql, "1", 1); + quicklistPushTail(ql, "2", 1); + quicklistPushTail(ql, "3", 1); + quicklistPushTail(ql, "4", 1); + quicklistPushTail(ql, "5", 1); + assert(ql->len==5); + /* add two bookmarks, one pointing to the node before the last. 
*/ + assert(quicklistBookmarkCreate(&ql, "_dummy", ql->head->next)); + assert(quicklistBookmarkCreate(&ql, "_test", ql->tail->prev)); + /* test that the bookmark returns the right node, delete it and see that the bookmark points to the last node */ + assert(quicklistBookmarkFind(ql, "_test") == ql->tail->prev); + assert(quicklistDelRange(ql, -2, 1)); + assert(quicklistBookmarkFind(ql, "_test") == ql->tail); + /* delete the last node, and see that the bookmark was deleted. */ + assert(quicklistDelRange(ql, -1, 1)); + assert(quicklistBookmarkFind(ql, "_test") == NULL); + /* test that other bookmarks aren't affected */ + assert(quicklistBookmarkFind(ql, "_dummy") == ql->head->next); + assert(quicklistBookmarkFind(ql, "_missing") == NULL); + assert(ql->len==3); + quicklistBookmarksClear(ql); /* for coverage */ + assert(quicklistBookmarkFind(ql, "_dummy") == NULL); + quicklistRelease(ql); + } + + TEST("bookmark limit") { + int i; + quicklist *ql = quicklistNew(1, 0); + quicklistPushHead(ql, "1", 1); + for (i=0; i<QL_MAX_BM; i++) + assert(quicklistBookmarkCreate(&ql, genstr("",i), ql->head)); + /* when all bookmarks are used, creation fails */ + assert(!quicklistBookmarkCreate(&ql, "_test", ql->head)); + /* delete one and see that we can now create another */ + assert(quicklistBookmarkDelete(ql, "0")); + assert(quicklistBookmarkCreate(&ql, "_test", ql->head)); + /* delete one and see that the rest survive */ + assert(quicklistBookmarkDelete(ql, "_test")); + for (i=1; i<QL_MAX_BM; i++) + assert(quicklistBookmarkFind(ql, genstr("",i)) == ql->head); + /* make sure the deleted ones are indeed gone */ + assert(!quicklistBookmarkFind(ql, "0")); + assert(!quicklistBookmarkFind(ql, "_test")); + quicklistRelease(ql); + } + if (!err) printf("ALL TESTS PASSED!\n"); else diff --git a/src/quicklist.h b/src/quicklist.h index a7e27a2dd..8b553c119 100644 --- a/src/quicklist.h +++ b/src/quicklist.h @@ -28,6 +28,8 @@ * POSSIBILITY OF SUCH DAMAGE. 
*/ +#include <stdint.h> // for UINTPTR_MAX + #ifndef __QUICKLIST_H__ #define __QUICKLIST_H__ @@ -64,19 +66,51 @@ typedef struct quicklistLZF { char compressed[]; } quicklistLZF; +/* Bookmarks are padded with realloc at the end of of the quicklist struct. + * They should only be used for very big lists if thousands of nodes were the + * excess memory usage is negligible, and there's a real need to iterate on them + * in portions. + * When not used, they don't add any memory overhead, but when used and then + * deleted, some overhead remains (to avoid resonance). + * The number of bookmarks used should be kept to minimum since it also adds + * overhead on node deletion (searching for a bookmark to update). */ +typedef struct quicklistBookmark { + quicklistNode *node; + char *name; +} quicklistBookmark; + +#if UINTPTR_MAX == 0xffffffff +/* 32-bit */ +# define QL_FILL_BITS 14 +# define QL_COMP_BITS 14 +# define QL_BM_BITS 4 +#elif UINTPTR_MAX == 0xffffffffffffffff +/* 64-bit */ +# define QL_FILL_BITS 16 +# define QL_COMP_BITS 16 +# define QL_BM_BITS 4 /* we can encode more, but we rather limit the user + since they cause performance degradation. */ +#else +# error unknown arch bits count +#endif + /* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist. * 'count' is the number of total entries. * 'len' is the number of quicklist nodes. * 'compress' is: -1 if compression disabled, otherwise it's the number * of quicklistNodes to leave uncompressed at ends of quicklist. - * 'fill' is the user-requested (or default) fill factor. */ + * 'fill' is the user-requested (or default) fill factor. + * 'bookmakrs are an optional feature that is used by realloc this struct, + * so that they don't consume memory when not used. 
*/ typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; /* total count of all entries in all ziplists */ unsigned long len; /* number of quicklistNodes */ - int fill : 16; /* fill factor for individual nodes */ - unsigned int compress : 16; /* depth of end nodes not to compress;0=off */ + int fill : QL_FILL_BITS; /* fill factor for individual nodes */ + unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ + unsigned int bookmark_count: QL_BM_BITS; + quicklistBookmark bookmarks[]; } quicklist; typedef struct quicklistIter { @@ -158,6 +192,12 @@ unsigned long quicklistCount(const quicklist *ql); int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len); size_t quicklistGetLzf(const quicklistNode *node, void **data); +/* bookmarks */ +int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node); +int quicklistBookmarkDelete(quicklist *ql, const char *name); +quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name); +void quicklistBookmarksClear(quicklist *ql); + #ifdef REDIS_TEST int quicklistTest(int argc, char *argv[]); #endif @@ -1766,6 +1766,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) { if (n->iskey) steps--; } it->node = n; + it->data = raxGetData(it->node); return 1; } @@ -1871,7 +1871,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { uint64_t moduleid = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) return NULL; + if (rioGetReadError(rdb)) { + rdbReportReadError("Short read module id"); + return NULL; + } moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; diff --git a/src/redis-cli.c b/src/redis-cli.c index 065c389c6..1d79e0db0 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -49,6 +49,7 @@ #include <hiredis.h> #ifdef USE_OPENSSL #include <openssl/ssl.h> +#include <openssl/err.h> #include <hiredis_ssl.h> #endif #include 
<sds.h> /* use sds.h from hiredis, so that only one set of sds functions will be present in the binary */ @@ -1594,15 +1595,15 @@ static int parseOptions(int argc, char **argv) { #ifdef USE_OPENSSL } else if (!strcmp(argv[i],"--tls")) { config.tls = 1; - } else if (!strcmp(argv[i],"--sni")) { + } else if (!strcmp(argv[i],"--sni") && !lastarg) { config.sni = argv[++i]; - } else if (!strcmp(argv[i],"--cacertdir")) { + } else if (!strcmp(argv[i],"--cacertdir") && !lastarg) { config.cacertdir = argv[++i]; - } else if (!strcmp(argv[i],"--cacert")) { + } else if (!strcmp(argv[i],"--cacert") && !lastarg) { config.cacert = argv[++i]; - } else if (!strcmp(argv[i],"--cert")) { + } else if (!strcmp(argv[i],"--cert") && !lastarg) { config.cert = argv[++i]; - } else if (!strcmp(argv[i],"--key")) { + } else if (!strcmp(argv[i],"--key") && !lastarg) { config.key = argv[++i]; #endif } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { @@ -1687,8 +1688,8 @@ static void usage(void) { " You can also use the " REDIS_CLI_AUTH_ENV " environment\n" " variable to pass this password more safely\n" " (if both are used, this argument takes predecence).\n" -" -user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n" -" -pass <password> Alias of -a for consistency with the new --user option.\n" +" --user <username> Used to send ACL style 'AUTH username pass'. 
Needs -a.\n" +" --pass <password> Alias of -a for consistency with the new --user option.\n" " -u <uri> Server URI.\n" " -r <repeat> Execute specified command N times.\n" " -i <interval> When -r is used, waits <interval> seconds per command.\n" @@ -1700,12 +1701,13 @@ static void usage(void) { " -c Enable cluster mode (follow -ASK and -MOVED redirections).\n" #ifdef USE_OPENSSL " --tls Establish a secure TLS connection.\n" -" --cacert CA Certificate file to verify with.\n" -" --cacertdir Directory where trusted CA certificates are stored.\n" +" --sni <host> Server name indication for TLS.\n" +" --cacert <file> CA Certificate file to verify with.\n" +" --cacertdir <dir> Directory where trusted CA certificates are stored.\n" " If neither cacert nor cacertdir are specified, the default\n" " system-wide trusted root certs configuration will apply.\n" -" --cert Client certificate to authenticate with.\n" -" --key Private key file to authenticate with.\n" +" --cert <file> Client certificate to authenticate with.\n" +" --key <file> Private key file to authenticate with.\n" #endif " --raw Use raw formatting for replies (default when STDOUT is\n" " not a tty).\n" @@ -7933,6 +7935,14 @@ int main(int argc, char **argv) { parseEnv(); +#ifdef USE_OPENSSL + if (config.tls) { + ERR_load_crypto_strings(); + SSL_load_error_strings(); + SSL_library_init(); + } +#endif + /* Cluster Manager mode */ if (CLUSTER_MANAGER_MODE()) { clusterManagerCommandProc *proc = validateClusterManagerCommand(); diff --git a/src/redismodule.h b/src/redismodule.h index 637078f2b..e74611f13 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -127,8 +127,8 @@ #define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */ #define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ -#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */ -#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | 
REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_KEY_MISS) /* A */ +#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from REDISMODULE_NOTIFY_ALL on purpose) */ +#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ /* A special pointer that we can use between the core and the module to signal diff --git a/src/replication.c b/src/replication.c index b7e77184a..c497051c8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1339,8 +1339,8 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags server.db[i] = backup[i]; } } else { - /* Delete. */ - emptyDbGeneric(backup,-1,empty_db_flags,replicationEmptyDbCallback); + /* Delete (Pass EMPTYDB_BACKUP in order to avoid firing module events) . */ + emptyDbGeneric(backup,-1,empty_db_flags|EMPTYDB_BACKUP,replicationEmptyDbCallback); for (int i=0; i<server.dbnum; i++) { dictRelease(backup[i].dict); dictRelease(backup[i].expires); @@ -1352,9 +1352,9 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(connection *conn) { - char buf[4096]; + char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; - int use_diskless_load; + int use_diskless_load = useDisklessLoad(); redisDb *diskless_load_backup = NULL; int empty_db_flags = server.repl_slave_lazy_flush ? 
EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; @@ -1411,19 +1411,18 @@ void readSyncBulkPayload(connection *conn) { server.repl_transfer_size = 0; serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s", - useDisklessLoad()? "to parser":"to disk"); + use_diskless_load? "to parser":"to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving %lld bytes from master %s", (long long) server.repl_transfer_size, - useDisklessLoad()? "to parser":"to disk"); + use_diskless_load? "to parser":"to disk"); } return; } - use_diskless_load = useDisklessLoad(); if (!use_diskless_load) { /* Read the data from the socket, store it to a file and search * for the EOF. */ @@ -1525,7 +1524,6 @@ void readSyncBulkPayload(connection *conn) { /* We need to stop any AOF rewriting child before flusing and parsing * the RDB, otherwise we'll create a copy-on-write disaster. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); - signalFlushedDb(-1); /* When diskless RDB loading is used by replicas, it may be configured * in order to save the current DB instead of throwing it away, @@ -1533,10 +1531,15 @@ void readSyncBulkPayload(connection *conn) { if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Create a backup of server.db[] and initialize to empty + * dictionaries */ diskless_load_backup = disklessLoadMakeBackups(); - } else { - emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); } + /* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB + * (Where disklessLoadMakeBackups left server.db empty) because we + * want to execute all the auxiliary logic of emptyDb (Namely, + * fire module events) */ + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since @@ -2399,6 +2402,10 @@ void replicationUnsetMaster(void) { 
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER, NULL); + + /* Restart the AOF subsystem in case we shut it down during a sync when + * we were still a slave. */ + if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC(); } /* This function is called when the slave lose the connection with the @@ -2436,9 +2443,6 @@ void replicaofCommand(client *c) { serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); - /* Restart the AOF subsystem in case we shut it down during a sync when - * we were still a slave. */ - if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC(); } } else { long port; diff --git a/src/scripting.c b/src/scripting.c index 9282b7fd9..7f64e06db 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -606,8 +606,10 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { } /* Check the ACLs. */ - int acl_retval = ACLCheckCommandPerm(c); + int acl_keypos; + int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); if (acl_retval != ACL_OK) { + addACLLogEntry(c,acl_retval,acl_keypos,NULL); if (acl_retval == ACL_DENIED_CMD) luaPushError(lua, "The user executing the script can't run this " "command or subcommand"); @@ -2473,6 +2475,7 @@ void ldbEval(lua_State *lua, sds *argv, int argc) { ldbLog(sdscatfmt(sdsempty(),"<error> %s",lua_tostring(lua,-1))); lua_pop(lua,1); sdsfree(code); + sdsfree(expr); return; } } @@ -34,7 +34,7 @@ #define __SDS_H #define SDS_MAX_PREALLOC (1024*1024) -const char *SDS_NOINIT; +extern const char *SDS_NOINIT; #include <sys/types.h> #include <stdarg.h> diff --git a/src/server.c b/src/server.c index 5845a5485..22c81070c 100644 --- a/src/server.c +++ b/src/server.c @@ -579,7 +579,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"select",selectCommand,2, - "ok-loading fast @keyspace", + "ok-loading fast ok-stale @keyspace", 0,NULL,0,0,0,0,0,0}, {"swapdb",swapdbCommand,3, 
@@ -660,7 +660,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"lastsave",lastsaveCommand,1, - "read-only random fast @admin @dangerous", + "read-only random fast ok-loading ok-stale @admin @dangerous", 0,NULL,0,0,0,0,0,0}, {"type",typeCommand,2, @@ -708,7 +708,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"monitor",monitorCommand,1, - "admin no-script", + "admin no-script ok-loading ok-stale", 0,NULL,0,0,0,0,0,0}, {"ttl",ttlCommand,2, @@ -740,7 +740,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"debug",debugCommand,-2, - "admin no-script", + "admin no-script ok-loading ok-stale", 0,NULL,0,0,0,0,0,0}, {"config",configCommand,-2, @@ -817,14 +817,14 @@ struct redisCommand redisCommandTable[] = { {"memory",memoryCommand,-2, "random read-only", - 0,NULL,0,0,0,0,0,0}, + 0,memoryGetKeys,0,0,0,0,0,0}, {"client",clientCommand,-2, - "admin no-script random @connection", + "admin no-script random ok-loading ok-stale @connection", 0,NULL,0,0,0,0,0,0}, {"hello",helloCommand,-2, - "no-auth no-script fast no-monitor no-slowlog @connection", + "no-auth no-script fast no-monitor ok-loading ok-stale no-slowlog @connection", 0,NULL,0,0,0,0,0,0}, /* EVAL can modify the dataset, however it is not flagged as a write @@ -838,7 +838,7 @@ struct redisCommand redisCommandTable[] = { 0,evalGetKeys,0,0,0,0,0,0}, {"slowlog",slowlogCommand,-2, - "admin random", + "admin random ok-loading ok-stale", 0,NULL,0,0,0,0,0,0}, {"script",scriptCommand,-2, @@ -846,7 +846,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"time",timeCommand,1, - "read-only random fast", + "read-only random fast ok-loading ok-stale", 0,NULL,0,0,0,0,0,0}, {"bitop",bitopCommand,-4, @@ -1498,7 +1498,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { time_t now = now_ms/1000; if (server.maxidletime && - !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */ + !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and 
monitors */ !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ @@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (listLength(server.unblocked_clients)) processUnblockedClients(); + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); @@ -3310,8 +3314,11 @@ void call(client *c, int flags) { if (c->cmd->flags & CMD_READONLY) { client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ? server.lua_caller : c; - if (caller->flags & CLIENT_TRACKING) + if (caller->flags & CLIENT_TRACKING && + !(caller->flags & CLIENT_TRACKING_BCAST)) + { trackingRememberKeys(caller); + } } server.fixed_time_expire--; @@ -3377,8 +3384,10 @@ int processCommand(client *c) { /* Check if the user can run this command according to the current * ACLs. 
*/ - int acl_retval = ACLCheckCommandPerm(c); + int acl_keypos; + int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); if (acl_retval != ACL_OK) { + addACLLogEntry(c,acl_retval,acl_keypos,NULL); flagTransaction(c); if (acl_retval == ACL_DENIED_CMD) addReplyErrorFormat(c, @@ -3498,7 +3507,10 @@ int processCommand(client *c) { c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { - addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); + addReplyErrorFormat(c, + "Can't execute '%s': only (P)SUBSCRIBE / " + "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context", + c->cmd->name); return C_OK; } @@ -4216,7 +4228,8 @@ sds genRedisInfoString(const char *section) { "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" "active_defrag_key_misses:%lld\r\n" - "tracking_used_slots:%lld\r\n", + "tracking_total_keys:%lld\r\n" + "tracking_total_items:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4244,7 +4257,8 @@ sds genRedisInfoString(const char *section) { server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, server.stat_active_defrag_key_misses, - trackingGetUsedSlots()); + (unsigned long long) trackingGetTotalKeys(), + (unsigned long long) trackingGetTotalItems()); } /* Replication */ diff --git a/src/server.h b/src/server.h index 8e354c03d..87c293c26 100644 --- a/src/server.h +++ b/src/server.h @@ -247,6 +247,11 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ +#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */ +#define CLIENT_TRACKING_OPTIN (1ULL<<34) /* Tracking in opt-in mode. */ +#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. 
*/ +#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, + depending on optin/optout mode. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -335,7 +340,7 @@ typedef long long ustime_t; /* microsecond time type. */ /* Anti-warning macro... */ #define UNUSED(V) ((void) V) -#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */ +#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */ #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */ /* Append only defines */ @@ -413,8 +418,8 @@ typedef long long ustime_t; /* microsecond time type. */ #define NOTIFY_EXPIRED (1<<8) /* x */ #define NOTIFY_EVICTED (1<<9) /* e */ #define NOTIFY_STREAM (1<<10) /* t */ -#define NOTIFY_KEY_MISS (1<<11) /* m */ -#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */ +#define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */ +#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ /* Get the first bind addr or NULL */ #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) @@ -822,7 +827,9 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; - + rax *client_tracking_prefixes; /* A dictionary of prefixes we are already + subscribed to in BCAST mode, in the + context of client side caching. */ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; @@ -1306,7 +1313,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. 
*/ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ - int tracking_table_max_fill; /* Max fill percentage. */ + size_t tracking_table_max_keys; /* Max number of keys in tracking table. */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -1385,6 +1392,7 @@ struct redisServer { dict *latency_events; /* ACLs */ char *acl_filename; /* ACL Users file. NULL if not configured. */ + unsigned long acllog_max_len; /* Maximum length of the ACL LOG list. */ /* Assert & bug reporting */ const char *assert_failed; const char *assert_file; @@ -1647,13 +1655,15 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); #endif /* Client side caching (tracking mode) */ -void enableTracking(client *c, uint64_t redirect_to); +void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); -unsigned long long trackingGetUsedSlots(void); +uint64_t trackingGetTotalItems(void); +uint64_t trackingGetTotalKeys(void); +void trackingBroadcastInvalidationMessages(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); @@ -1820,11 +1830,12 @@ void ACLInit(void); #define ACL_OK 0 #define ACL_DENIED_CMD 1 #define ACL_DENIED_KEY 2 +#define ACL_DENIED_AUTH 3 /* Only used for ACL LOG entries. 
*/ int ACLCheckUserCredentials(robj *username, robj *password); int ACLAuthenticateUser(client *c, robj *username, robj *password); unsigned long ACLGetCommandID(const char *cmdname); user *ACLGetUserByName(const char *name, size_t namelen); -int ACLCheckCommandPerm(client *c); +int ACLCheckCommandPerm(client *c, int *keyidxptr); int ACLSetUser(user *u, const char *op, ssize_t oplen); sds ACLDefaultUserFirstPassword(void); uint64_t ACLGetCommandCategoryFlagByName(const char *name); @@ -1836,6 +1847,7 @@ void ACLLoadUsersAtStartup(void); void addReplyCommandCategories(client *c, struct redisCommand *cmd); user *ACLCreateUnlinkedUser(); void ACLFreeUserAndKillClients(user *u); +void addACLLogEntry(client *c, int reason, int keypos, sds username); /* Sorted sets data type */ @@ -2042,6 +2054,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ +#define EMPTYDB_BACKUP (1<<2) /* DB array is a backup for REPL_DISKLESS_LOAD_SWAPDB. 
*/ long long emptyDb(int dbnum, int flags, void(callback)(void*)); long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); void flushAllDataAndResetRDB(int flags); @@ -2074,6 +2087,7 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); +int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); /* Cluster */ void clusterInit(void); diff --git a/src/t_stream.c b/src/t_stream.c index 0f0f97a1d..e1efc6bca 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -848,7 +848,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[11] = createStringObject("JUSTID",6); argv[12] = createStringObject("LASTID",6); argv[13] = createObjectFromStreamID(&group->last_id); - propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[3]); decrRefCount(argv[4]); @@ -875,7 +875,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); - propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[1]); decrRefCount(argv[4]); @@ -1850,6 +1850,8 @@ NULL server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", c->argv[2],c->db->id); + /* We want to unblock any XREADGROUP consumers with -NOGROUP. 
*/ + signalKeyAsReady(c->db,c->argv[2]); } else { addReply(c,shared.czero); } diff --git a/src/tracking.c b/src/tracking.c index acb97800a..45f83103a 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -30,39 +30,34 @@ #include "server.h" -/* The tracking table is constituted by 2^24 radix trees (each tree, and the - * table itself, are allocated in a lazy way only when needed) tracking - * clients that may have certain keys in their local, client side, cache. - * - * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash - * slots, however here the function we use is crc64, taking the least - * significant 24 bits of the output. +/* The tracking table is constituted by a radix tree of keys, each pointing + * to a radix tree of client IDs, used to track the clients that may have + * certain keys in their local, client side, cache. * * When a client enables tracking with "CLIENT TRACKING on", each key served to - * the client is hashed to one of such slots, and Redis will remember what - * client may have keys about such slot. Later, when a key in a given slot is - * modified, all the clients that may have local copies of keys in that slot - * will receive an invalidation message. There is no distinction of database - * number: a single table is used. + * the client is remembered in the table mapping the keys to the client IDs. + * Later, when a key is modified, all the clients that may have local copy + * of such key will receive an invalidation message. * * Clients will normally take frequently requested objects in memory, removing - * them when invalidation messages are received. A strategy clients may use is - * to just cache objects in a dictionary, associating to each cached object - * some incremental epoch, or just a timestamp. 
When invalidation messages are - * received clients may store, in a different table, the timestamp (or epoch) - * of the invalidation of such given slot: later when accessing objects, the - * eviction of stale objects may be performed in a lazy way by checking if the - * cached object timestamp is older than the invalidation timestamp for such - * objects. - * - * The output of the 24 bit hash function is very large (more than 16 million - * possible slots), so clients that may want to use less resources may only - * use the most significant bits instead of the full 24 bits. */ -#define TRACKING_TABLE_SIZE (1<<24) -rax **TrackingTable = NULL; -unsigned long TrackingTableUsedSlots = 0; + * them when invalidation messages are received. */ +rax *TrackingTable = NULL; +rax *PrefixTable = NULL; +uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across + the whole tracking table. This gives + an hint about the total memory we + are using server side for CSC. */ robj *TrackingChannelName; +/* This is the structure that we have as value of the PrefixTable, and + * represents the list of keys modified, and the list of clients that need + * to be notified, for a given prefix. */ +typedef struct bcastState { + rax *keys; /* Keys modified in the current event loop cycle. */ + rax *clients; /* Clients subscribed to the notification events for this + prefix. */ +} bcastState; + /* Remove the tracking state from the client 'c'. Note that there is not much * to do for us here, if not to decrement the counter of the clients in * tracking mode, because we just store the ID of the client in the tracking @@ -70,9 +65,56 @@ robj *TrackingChannelName; * client with many entries in the table is removed, it would cost a lot of * time to do the cleanup. */ void disableTracking(client *c) { + /* If this client is in broadcasting mode, we need to unsubscribe it + * from all the prefixes it is registered to. 
*/ + if (c->flags & CLIENT_TRACKING_BCAST) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len); + serverAssert(bs != raxNotFound); + raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + /* Was it the last client? Remove the prefix from the + * table. */ + if (raxSize(bs->clients) == 0) { + raxFree(bs->clients); + raxFree(bs->keys); + zfree(bs); + raxRemove(PrefixTable,ri.key,ri.key_len,NULL); + } + } + raxStop(&ri); + raxFree(c->client_tracking_prefixes); + c->client_tracking_prefixes = NULL; + } + + /* Clear flags and adjust the count. */ if (c->flags & CLIENT_TRACKING) { server.tracking_clients--; - c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| + CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| + CLIENT_TRACKING_OPTOUT); + } +} + +/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is + * already registered for the specified prefix, no operation is performed. */ +void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { + bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); + /* If this is the first client subscribing to such prefix, create + * the prefix in the table. */ + if (bs == raxNotFound) { + bs = zmalloc(sizeof(*bs)); + bs->keys = raxNew(); + bs->clients = raxNew(); + raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + } + if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + if (c->client_tracking_prefixes == NULL) + c->client_tracking_prefixes = raxNew(); + raxInsert(c->client_tracking_prefixes, + (unsigned char*)prefix,plen,NULL,NULL); } } @@ -83,24 +125,43 @@ void disableTracking(client *c) { * eventually get freed, we'll send a message to the original client to * inform it of the condition. 
Multiple clients can redirect the invalidation * messages to the same client ID. */ -void enableTracking(client *c, uint64_t redirect_to) { - if (c->flags & CLIENT_TRACKING) return; +void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) { + if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; c->flags |= CLIENT_TRACKING; - c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST| + CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT); c->client_tracking_redirection = redirect_to; - server.tracking_clients++; if (TrackingTable == NULL) { - TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE); + TrackingTable = raxNew(); + PrefixTable = raxNew(); TrackingChannelName = createStringObject("__redis__:invalidate",20); } + + if (options & CLIENT_TRACKING_BCAST) { + c->flags |= CLIENT_TRACKING_BCAST; + if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); + for (size_t j = 0; j < numprefix; j++) { + sds sdsprefix = prefix[j]->ptr; + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); + } + } + c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT); } -/* This function is called after the excution of a readonly command in the - * case the client 'c' has keys tracking enabled. It will populate the - * tracking ivalidation table according to the keys the user fetched, so that - * Redis will know what are the clients that should receive an invalidation - * message with certain groups of keys are modified. */ +/* This function is called after the execution of a readonly command in the + * case the client 'c' has keys tracking enabled and the tracking is not + * in BCAST mode. It will populate the tracking invalidation table according + * to the keys the user fetched, so that Redis will know what are the clients + * that should receive an invalidation message with certain groups of keys + * are modified. 
*/ void trackingRememberKeys(client *c) { + /* Return if we are in optin/out mode and the right CACHING command + * was/wasn't given in order to modify the default behavior. */ + uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN; + uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT; + uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING; + if ((optin && !caching_given) || (optout && caching_given)) return; + int numkeys; int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys); if (keys == NULL) return; @@ -108,19 +169,30 @@ void trackingRememberKeys(client *c) { for(int j = 0; j < numkeys; j++) { int idx = keys[j]; sds sdskey = c->argv[idx]->ptr; - uint64_t hash = crc64(0, - (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) { - TrackingTable[hash] = raxNew(); - TrackingTableUsedSlots++; + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + if (ids == raxNotFound) { + ids = raxNew(); + int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey, + sdslen(sdskey),ids, NULL); + serverAssert(inserted == 1); } - raxTryInsert(TrackingTable[hash], - (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); + if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL)) + TrackingTableTotalItems++; } getKeysFreeResult(keys); } -void sendTrackingMessage(client *c, long long hash) { +/* Given a key name, this function sends an invalidation message in the + * proper channel (depending on RESP version: PubSub or Push message) and + * to the proper client (in case of redirection), in the context of the + * client 'c' with tracking enabled. + * + * In case the 'proto' argument is non zero, the function will assume that + * 'keyname' points to a buffer of 'keylen' bytes already expressed in the + * form of Redis RESP protocol, representing an array of keys to send + * to the client as value of the invalidation. 
This is used in BCAST mode + * in order to optimize the implementation to use less CPU time. */ +void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -146,36 +218,45 @@ if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); - addReplyLongLong(c,hash); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { - robj *msg = createStringObjectFromLongLong(hash); - addReplyPubsubMessage(c,TrackingChannelName,msg); - decrRefCount(msg); + /* We use a static object to speed up things, however we assume + * that addReplyPubsubMessage() will not take a reference. */ + addReplyPubsubMessage(c,TrackingChannelName,NULL); + } else { + /* If we are here, the client is not using RESP3, nor is + * redirecting to another client. We can't send anything to + * it since RESP2 does not support push messages in the same + * connection. */ + return; } -} -/* Invalidates a caching slot: this is actually the low level implementation - * of the API that Redis calls externally, that is trackingInvalidateKey(). */ -void trackingInvalidateSlot(uint64_t slot) { - if (TrackingTable == NULL || TrackingTable[slot] == NULL) return; + /* Send the "value" part, which is the array of keys. */ + if (proto) { + addReplyProto(c,keyname,keylen); + } else { + addReplyArrayLen(c,1); + addReplyBulkCBuffer(c,keyname,keylen); + } +} +/* This function is called when a key is modified in Redis and in the case + * we have at least one client with the BCAST mode enabled. + * Its goal is to set the key in the right broadcast state if the key + * matches one or more prefixes in the prefix table. Later when we + * return to the event loop, we'll send invalidation messages to the + * clients subscribed to each prefix. 
*/ +void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { raxIterator ri; - raxStart(&ri,TrackingTable[slot]); + raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { - uint64_t id; - memcpy(&id,ri.key,sizeof(id)); - client *c = lookupClientByID(id); - if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,slot); + if (ri.key_len > keylen) continue; + if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) + continue; + bcastState *bs = ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); } raxStop(&ri); - - /* Free the tracking table: we'll create the radix tree and populate it - * again if more keys will be modified in this caching slot. */ - raxFree(TrackingTable[slot]); - TrackingTable[slot] = NULL; - TrackingTableUsedSlots--; } /* This function is called from signalModifiedKey() or other places in Redis @@ -183,28 +264,55 @@ void trackingInvalidateSlot(uint64_t slot) { * to send a notification to every client that may have keys about such caching * slot. */ void trackingInvalidateKey(robj *keyobj) { - if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return; - + if (TrackingTable == NULL) return; sds sdskey = keyobj->ptr; - uint64_t hash = crc64(0, - (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - trackingInvalidateSlot(hash); + + if (raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); + + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + if (ids == raxNotFound) return;; + + raxIterator ri; + raxStart(&ri,ids); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + uint64_t id; + memcpy(&id,ri.key,sizeof(id)); + client *c = lookupClientByID(id); + /* Note that if the client is in BCAST mode, we don't want to + * send invalidation messages that were pending in the case + * previously the client was not in BCAST mode. 
This can happen if + * TRACKING is enabled normally, and then the client switches to + * BCAST mode. */ + if (c == NULL || + !(c->flags & CLIENT_TRACKING)|| + c->flags & CLIENT_TRACKING_BCAST) + { + continue; + } + sendTrackingMessage(c,sdskey,sdslen(sdskey),0); + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this caching slot. */ + TrackingTableTotalItems -= raxSize(ids); + raxFree(ids); + raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL); } /* This function is called when one or all the Redis databases are flushed - * (dbid == -1 in case of FLUSHALL). Caching slots are not specific for - * each DB but are global: currently what we do is sending a special + * (dbid == -1 in case of FLUSHALL). Caching keys are not specific for + * each DB but are global: currently what we do is send a special * notification to clients with tracking enabled, invalidating the caching - * slot "-1", which means, "all the keys", in order to avoid flooding clients + * key "", which means, "all the keys", in order to avoid flooding clients * with many invalidation messages for all the keys they may hold. - * - * However trying to flush the tracking table here is very costly: - * we need scanning 16 million caching slots in the table to check - * if they are used, this introduces a big delay. So what we do is to really - * flush the table in the case of FLUSHALL. When a FLUSHDB is called instead - * we just send the invalidation message to all the clients, but don't - * flush the table: it will slowly get garbage collected as more keys - * are modified in the used caching slots. 
*/ + */ +void freeTrackingRadixTree(void *rt) { + raxFree(rt); +} + void trackingInvalidateKeysOnFlush(int dbid) { if (server.tracking_clients) { listNode *ln; @@ -213,84 +321,131 @@ void trackingInvalidateKeysOnFlush(int dbid) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,-1); + sendTrackingMessage(c,"",1,0); } } } /* In case of FLUSHALL, reclaim all the memory used by tracking. */ if (dbid == -1 && TrackingTable) { - for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) { - if (TrackingTable[j] != NULL) { - raxFree(TrackingTable[j]); - TrackingTable[j] = NULL; - TrackingTableUsedSlots--; - } - } - - /* If there are no clients with tracking enabled, we can even - * reclaim the memory used by the table itself. The code assumes - * the table is allocated only if there is at least one client alive - * with tracking enabled. */ - if (server.tracking_clients == 0) { - zfree(TrackingTable); - TrackingTable = NULL; - } + raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); + TrackingTable = raxNew(); + TrackingTableTotalItems = 0; } } /* Tracking forces Redis to remember information about which client may have - * keys about certian caching slots. In workloads where there are a lot of - * reads, but keys are hardly modified, the amount of information we have - * to remember server side could be a lot: for each 16 millions of caching - * slots we may end with a radix tree containing many entries. + * certain keys. In workloads where there are a lot of reads, but keys are + * hardly modified, the amount of information we have to remember server side + * could be a lot, with the number of keys being totally not bound. * - * So Redis allows the user to configure a maximum fill rate for the + * So Redis allows the user to configure a maximum number of keys for the * invalidation table. 
This function makes sure that we don't go over the * specified fill rate: if we are over, we can just evict informations about - * random caching slots, and send invalidation messages to clients like if - * the key was modified. */ + * a random key, and send invalidation messages to clients like if the key was + * modified. */ void trackingLimitUsedSlots(void) { static unsigned int timeout_counter = 0; - - if (server.tracking_table_max_fill == 0) return; /* No limits set. */ - unsigned int max_slots = - (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill; - if (TrackingTableUsedSlots <= max_slots) { + if (TrackingTable == NULL) return; + if (server.tracking_table_max_keys == 0) return; /* No limits set. */ + size_t max_keys = server.tracking_table_max_keys; + if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; return; /* Limit not reached. */ } - /* We have to invalidate a few slots to reach the limit again. The effort + /* We have to invalidate a few keys to reach the limit again. The effort * we do here is proportional to the number of times we entered this * function and found that we are still over the limit. */ int effort = 100 * (timeout_counter+1); - /* Let's start at a random position, and perform linear probing, in order - * to improve cache locality. However once we are able to find an used - * slot, jump again randomly, in order to avoid creating big holes in the - * table (that will make this funciton use more resourced later). */ + /* We just remove one key after another by using a random walk. */ + raxIterator ri; + raxStart(&ri,TrackingTable); while(effort > 0) { - unsigned int idx = rand() % TRACKING_TABLE_SIZE; - do { - effort--; - idx = (idx+1) % TRACKING_TABLE_SIZE; - if (TrackingTable[idx] != NULL) { - trackingInvalidateSlot(idx); - if (TrackingTableUsedSlots <= max_slots) { - timeout_counter = 0; - return; /* Return ASAP: we are again under the limit. */ - } else { - break; /* Jump to next random position. 
*/ - } - } - } while(effort > 0); + effort--; + raxSeek(&ri,"^",NULL,0); + raxRandomWalk(&ri,0); + rax *ids = ri.data; + TrackingTableTotalItems -= raxSize(ids); + raxFree(ids); + raxRemove(TrackingTable,ri.key,ri.key_len,NULL); + if (raxSize(TrackingTable) <= max_keys) { + timeout_counter = 0; + raxStop(&ri); + return; /* Return ASAP: we are again under the limit. */ + } } + + /* If we reach this point, we were not able to go under the configured + * limit using the maximum effort we had for this run. */ + raxStop(&ri); timeout_counter++; } +/* This function will run the prefixes of clients in BCAST mode and + * keys that were modified about each prefix, and will send the + * notifications to each client in each prefix. */ +void trackingBroadcastInvalidationMessages(void) { + raxIterator ri, ri2; + + /* Return ASAP if there is nothing to do here. */ + if (TrackingTable == NULL || !server.tracking_clients) return; + + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = ri.data; + if (raxSize(bs->keys)) { + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); + + /* Send this array of keys to every client in the list. 
*/ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); + + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + } + raxFree(bs->keys); + bs->keys = raxNew(); + } + raxStop(&ri); +} + /* This is just used in order to access the amount of used slots in the * tracking table. */ -unsigned long long trackingGetUsedSlots(void) { - return TrackingTableUsedSlots; +uint64_t trackingGetTotalItems(void) { + return TrackingTableTotalItems; +} + +uint64_t trackingGetTotalKeys(void) { + if (TrackingTable == NULL) return 0; + return raxSize(TrackingTable); } diff --git a/src/util.c b/src/util.c index 20471b539..2be42a0df 100644 --- a/src/util.c +++ b/src/util.c @@ -471,13 +471,14 @@ int string2ld(const char *s, size_t slen, long double *dp) { long double value; char *eptr; - if (slen >= sizeof(buf)) return 0; + if (slen == 0 || slen >= sizeof(buf)) return 0; memcpy(buf,s,slen); buf[slen] = '\0'; errno = 0; value = strtold(buf, &eptr); if (isspace(buf[0]) || eptr[0] != '\0' || + (size_t)(eptr-buf) != slen || (errno == ERANGE && (value == HUGE_VAL || value == -HUGE_VAL || value == 0)) || errno == EINVAL || diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 959918b1c..10dc65b1a 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -172,13 +172,13 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); - long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx); + long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx); fsl_t *fsl; if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) return 
REDISMODULE_ERR; - if (!fsl || fsl->list[fsl->length-1] <= gt) + if (!fsl || fsl->list[fsl->length-1] <= *pgt) return REDISMODULE_ERR; RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); @@ -192,10 +192,8 @@ int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int a } void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) { - /* Nothing to do because privdata is actually a 'long long', - * not a pointer to the heap */ REDISMODULE_NOT_USED(ctx); - REDISMODULE_NOT_USED(privdata); + RedisModule_Free(privdata); } /* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>. @@ -217,9 +215,12 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; if (!fsl || fsl->list[fsl->length-1] <= gt) { + /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */ + long long *pgt = RedisModule_Alloc(sizeof(long long)); + *pgt = gt; /* Key is empty or has <2 elements, we must block */ RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, - bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt); + bpopgt_free_privdata, timeout, &argv[1], 1, pgt); } else { RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } diff --git a/tests/modules/fork.c b/tests/modules/fork.c index 1a139ef1b..0443d9ef0 100644 --- a/tests/modules/fork.c +++ b/tests/modules/fork.c @@ -42,7 +42,7 @@ int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) /* child */ RedisModule_Log(ctx, "notice", "fork child started"); - usleep(200000); + usleep(500000); RedisModule_Log(ctx, "notice", "fork child exiting"); RedisModule_ExitFromChild(code_to_exit_with); /* unreachable */ diff --git a/tests/modules/misc.c b/tests/modules/misc.c index b5a032f60..1048d5065 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -74,6 +74,19 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { 
RedisModule_ReplyWithError(ctx, err); goto final; } + + /* Make sure we can't convert a string that has \0 in it */ + char buf[4] = "123"; + buf[1] = '\0'; + RedisModuleString *s3 = RedisModule_CreateString(ctx, buf, 3); + long double ld3; + if (RedisModule_StringToLongDouble(s3, &ld3) == REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid string successfully converted to long double"); + RedisModule_FreeString(ctx, s3); + goto final; + } + RedisModule_FreeString(ctx, s3); + RedisModule_ReplyWithLongDouble(ctx, ld2); final: RedisModule_FreeString(ctx, s1); diff --git a/tests/support/server.tcl b/tests/support/server.tcl index b20f1ad36..d086366dc 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -53,6 +53,7 @@ proc kill_server config { } # kill server and wait for the process to be totally exited + send_data_packet $::test_server_fd server-killing $pid catch {exec kill $pid} if {$::valgrind} { set max_wait 60000 @@ -140,6 +141,18 @@ proc tags {tags code} { uplevel 1 $code set ::tags [lrange $::tags 0 end-[llength $tags]] } + +# Write the configuration in the dictionary 'config' in the specified +# file name. 
+proc create_server_config_file {filename config} { + set fp [open $filename w+] + foreach directive [dict keys $config] { + puts -nonewline $fp "$directive " + puts $fp [dict get $config $directive] + } + close $fp +} + proc start_server {options {code undefined}} { # If we are running against an external server, we just push the # host/port pair in the stack the first time @@ -221,56 +234,91 @@ proc start_server {options {code undefined}} { # write new configuration to temporary file set config_file [tmpfile redis.conf] - set fp [open $config_file w+] - foreach directive [dict keys $config] { - puts -nonewline $fp "$directive " - puts $fp [dict get $config $directive] - } - close $fp + create_server_config_file $config_file $config set stdout [format "%s/%s" [dict get $config "dir"] "stdout"] set stderr [format "%s/%s" [dict get $config "dir"] "stderr"] - if {$::valgrind} { - set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &] - } elseif ($::stack_logging) { - set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &] - } else { - set pid [exec src/redis-server $config_file > $stdout 2> $stderr &] - } + # We need a loop here to retry with different ports. + set server_started 0 + while {$server_started == 0} { + if {$::verbose} { + puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " + } - # Tell the test server about this new instance. - send_data_packet $::test_server_fd server-spawned $pid + send_data_packet $::test_server_fd "server-spawning" "port $::port" - # check that the server actually started - # ugly but tries to be as fast as possible... 
- if {$::valgrind} {set retrynum 1000} else {set retrynum 100} + if {$::valgrind} { + set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &] + } elseif ($::stack_logging) { + set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &] + } else { + set pid [exec src/redis-server $config_file > $stdout 2> $stderr &] + } - if {$::verbose} { - puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " - } + # Tell the test server about this new instance. + send_data_packet $::test_server_fd server-spawned $pid + + # check that the server actually started + # ugly but tries to be as fast as possible... + if {$::valgrind} {set retrynum 1000} else {set retrynum 100} + + # Wait for actual startup + set checkperiod 100; # Milliseconds + set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes. + set port_busy 0 + while {![info exists _pid]} { + regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid + after $checkperiod + incr maxiter -1 + if {$maxiter == 0} { + start_server_error $config_file "No PID detected in log $stdout" + puts "--- LOG CONTENT ---" + puts [exec cat $stdout] + puts "-------------------" + break + } - if {$code ne "undefined"} { - set serverisup [server_is_up $::host $::port $retrynum] - } else { - set serverisup 1 - } + # Check if the port is actually busy and the server failed + # for this reason. + if {[regexp {Could not create server TCP} [exec cat $stdout]]} { + set port_busy 1 + break + } + } - if {$::verbose} { - puts "" - } + # Sometimes we have to try a different port, even if we checked + # for availability. Other test clients may grab the port before we + # are able to do it for example. + if {$port_busy} { + puts "Port $::port was already busy, trying another port..." 
+ set ::port [find_available_port [expr {$::port+1}]] + if {$::tls} { + dict set config "tls-port" $::port + } else { + dict set config port $::port + } + create_server_config_file $config_file $config + continue; # Try again + } - if {!$serverisup} { - set err {} - append err [exec cat $stdout] "\n" [exec cat $stderr] - start_server_error $config_file $err - return - } + if {$code ne "undefined"} { + set serverisup [server_is_up $::host $::port $retrynum] + } else { + set serverisup 1 + } - # Wait for actual startup - while {![info exists _pid]} { - regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid - after 100 + if {$::verbose} { + puts "" + } + + if {!$serverisup} { + set err {} + append err [exec cat $stdout] "\n" [exec cat $stderr] + start_server_error $config_file $err + return + } + set server_started 1 } # setup properties to be able to initialize a client object diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index cb7e4e328..fa5579669 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -87,7 +87,7 @@ set ::file ""; # If set, runs only the tests in this comma separated list set ::curfile ""; # Hold the filename of the current suite set ::accurate 0; # If true runs fuzz tests with more iterations set ::force_failure 0 -set ::timeout 600; # 10 minutes without progresses will quit the test. +set ::timeout 1200; # 20 minutes without progresses will quit the test. set ::last_progress [clock seconds] set ::active_servers {} ; # Pids of active Redis instances. 
set ::dont_clean 0 @@ -289,7 +289,7 @@ proc read_from_test_client fd { puts "\[$completed_tests_count/$all_tests_count [colorstr yellow $status]\]: $data ($elapsed seconds)" lappend ::clients_time_history $elapsed $data signal_idle_client $fd - set ::active_clients_task($fd) DONE + set ::active_clients_task($fd) "(DONE) $data" } elseif {$status eq {ok}} { if {!$::quiet} { puts "\[[colorstr green $status]\]: $data" @@ -320,10 +320,16 @@ proc read_from_test_client fd { exit 1 } elseif {$status eq {testing}} { set ::active_clients_task($fd) "(IN PROGRESS) $data" + } elseif {$status eq {server-spawning}} { + set ::active_clients_task($fd) "(SPAWNING SERVER) $data" } elseif {$status eq {server-spawned}} { lappend ::active_servers $data + set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data" + } elseif {$status eq {server-killing}} { + set ::active_clients_task($fd) "(KILLING SERVER) pid:$data" } elseif {$status eq {server-killed}} { set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data] + set ::active_clients_task($fd) "(KILLED SERVER) pid:$data" } else { if {!$::quiet} { puts "\[$status\]: $data" @@ -333,7 +339,7 @@ proc read_from_test_client fd { proc show_clients_state {} { # The following loop is only useful for debugging tests that may - # enter an infinite loop. Commented out normally. + # enter an infinite loop. foreach x $::active_clients { if {[info exist ::active_clients_task($x)]} { puts "$x => $::active_clients_task($x)" @@ -363,8 +369,6 @@ proc signal_idle_client fd { set ::active_clients \ [lsearch -all -inline -not -exact $::active_clients $fd] - if 0 {show_clients_state} - # New unit to process? 
if {$::next_test != [llength $::all_tests]} { if {!$::quiet} { @@ -380,6 +384,7 @@ proc signal_idle_client fd { } } else { lappend ::idle_clients $fd + set ::active_clients_task($fd) "SLEEPING, no more units to assign" if {[llength $::active_clients] == 0} { the_end } diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index 2205d2d86..fc1664a75 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -141,4 +141,111 @@ start_server {tags {"acl"}} { r ACL setuser newuser -debug # The test framework will detect a leak if any. } + + test {ACL LOG shows failed command executions at toplevel} { + r ACL LOG RESET + r ACL setuser antirez >foo on +set ~object:1234 + r ACL setuser antirez +eval +multi +exec + r AUTH antirez foo + catch {r GET foo} + r AUTH default "" + set entry [lindex [r ACL LOG] 0] + assert {[dict get $entry username] eq {antirez}} + assert {[dict get $entry context] eq {toplevel}} + assert {[dict get $entry reason] eq {command}} + assert {[dict get $entry object] eq {get}} + } + + test {ACL LOG is able to test similar events} { + r AUTH antirez foo + catch {r GET foo} + catch {r GET foo} + catch {r GET foo} + r AUTH default "" + set entry [lindex [r ACL LOG] 0] + assert {[dict get $entry count] == 4} + } + + test {ACL LOG is able to log keys access violations and key name} { + r AUTH antirez foo + catch {r SET somekeynotallowed 1234} + r AUTH default "" + set entry [lindex [r ACL LOG] 0] + assert {[dict get $entry reason] eq {key}} + assert {[dict get $entry object] eq {somekeynotallowed}} + } + + test {ACL LOG RESET is able to flush the entries in the log} { + r ACL LOG RESET + assert {[llength [r ACL LOG]] == 0} + } + + test {ACL LOG can distinguish the transaction context (1)} { + r AUTH antirez foo + r MULTI + catch {r INCR foo} + catch {r EXEC} + r AUTH default "" + set entry [lindex [r ACL LOG] 0] + assert {[dict get $entry context] eq {multi}} + assert {[dict get $entry object] eq {incr}} + } + + test {ACL LOG can distinguish the transaction 
context (2)} { + set rd1 [redis_deferring_client] + r ACL SETUSER antirez +incr + + r AUTH antirez foo + r MULTI + r INCR object:1234 + $rd1 ACL SETUSER antirez -incr + $rd1 read + catch {r EXEC} + $rd1 close + r AUTH default "" + set entry [lindex [r ACL LOG] 0] + assert {[dict get $entry context] eq {multi}} + assert {[dict get $entry object] eq {incr}} + r ACL SETUSER antirez -incr + } + + test {ACL can log errors in the context of Lua scripting} { + r AUTH antirez foo + catch {r EVAL {redis.call('incr','foo')} 0} + r AUTH default "" + set entry [lindex [r ACL LOG] 0] + assert {[dict get $entry context] eq {lua}} + assert {[dict get $entry object] eq {incr}} + } + + test {ACL LOG can accept a numerical argument to show less entries} { + r AUTH antirez foo + catch {r INCR foo} + catch {r INCR foo} + catch {r INCR foo} + catch {r INCR foo} + r AUTH default "" + assert {[llength [r ACL LOG]] > 1} + assert {[llength [r ACL LOG 2]] == 2} + } + + test {ACL LOG can log failed auth attempts} { + catch {r AUTH antirez wrong-password} + set entry [lindex [r ACL LOG] 0] + assert {[dict get $entry context] eq {toplevel}} + assert {[dict get $entry reason] eq {auth}} + assert {[dict get $entry object] eq {AUTH}} + assert {[dict get $entry username] eq {antirez}} + } + + test {ACL LOG entries are limited to a maximum amount} { + r ACL LOG RESET + r CONFIG SET acllog-max-len 5 + r AUTH antirez foo + for {set j 0} {$j < 10} {incr j} { + catch {r SET obj:$j 123} + } + r AUTH default "" + assert {[llength [r ACL LOG]] == 5} + } } diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index d152e212c..06b0e07d7 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -39,6 +39,8 @@ start_server {tags {"memefficiency"}} { start_server {tags {"defrag"}} { if {[string match {*jemalloc*} [s mem_allocator]]} { test "Active defrag" { + r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 r config set 
activedefrag no r config set active-defrag-threshold-lower 5 r config set active-defrag-cycle-min 65 @@ -46,8 +48,8 @@ start_server {tags {"defrag"}} { r config set active-defrag-ignore-bytes 2mb r config set maxmemory 100mb r config set maxmemory-policy allkeys-lru - r debug populate 700000 asdf 150 - r debug populate 170000 asdf 300 + r debug populate 700000 asdf1 150 + r debug populate 170000 asdf2 300 r ping ;# trigger eviction following the previous population after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] @@ -55,6 +57,11 @@ start_server {tags {"defrag"}} { puts "frag $frag" } assert {$frag >= 1.4} + + r config set latency-monitor-threshold 5 + r latency reset + r config set maxmemory 110mb ;# prevent further eviction (not to fail the digest test) + set digest [r debug digest] catch {r config set activedefrag yes} e if {![string match {DISABLED*} $e]} { # Wait for the active defrag to start working (decision once a @@ -78,19 +85,37 @@ start_server {tags {"defrag"}} { # Test the the fragmentation is lower. 
after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] + set max_latency 0 + foreach event [r latency latest] { + lassign $event eventname time latency max + if {$eventname == "active-defrag-cycle"} { + set max_latency $max + } + } if {$::verbose} { puts "frag $frag" + puts "max latency $max_latency" + puts [r latency latest] + puts [r latency history active-defrag-cycle] } assert {$frag < 1.1} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 30} } else { set _ "" } - } {} + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + } {OK} test "Active defrag big keys" { r flushdb r config resetstat r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 r config set activedefrag no r config set active-defrag-max-scan-fields 1000 r config set active-defrag-threshold-lower 5 @@ -142,7 +167,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j} { $rd read ; # Discard replies } - assert {[r dbsize] == 500010} + assert_equal [r dbsize] 500010 # create some fragmentation for {set j 0} {$j < 500000} {incr j 2} { @@ -151,7 +176,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j 2} { $rd read ; # Discard replies } - assert {[r dbsize] == 250010} + assert_equal [r dbsize] 250010 # start defrag after 120 ;# serverCron only updates the info once in 100ms @@ -200,14 +225,106 @@ start_server {tags {"defrag"}} { puts [r latency history active-defrag-cycle] } assert {$frag < 1.1} - # due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75, - # we expect max latency to be not much higher than 75ms - assert {$max_latency <= 120} + # due to high fragmentation, 100hz, and 
active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 30} } # verify the data isn't corrupted or changed set newdigest [r debug digest] assert {$digest eq $newdigest} r save ;# saving an rdb iterates over all the data / pointers } {OK} + + test "Active defrag big list" { + r flushdb + r config resetstat + r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 + r config set activedefrag no + r config set active-defrag-max-scan-fields 1000 + r config set active-defrag-threshold-lower 5 + r config set active-defrag-cycle-min 65 + r config set active-defrag-cycle-max 75 + r config set active-defrag-ignore-bytes 2mb + r config set maxmemory 0 + r config set list-max-ziplist-size 5 ;# list of 500k items will have 100k quicklist nodes + + # create big keys with 10k items + set rd [redis_deferring_client] + + set expected_frag 1.7 + # add a mass of list nodes to two lists (allocations are interlaced) + set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation + for {set j 0} {$j < 500000} {incr j} { + $rd lpush biglist1 $val + $rd lpush biglist2 $val + } + for {set j 0} {$j < 500000} {incr j} { + $rd read ; # Discard replies + $rd read ; # Discard replies + } + + # create some fragmentation + r del biglist2 + + # start defrag + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + if {$::verbose} { + puts "frag $frag" + } + + assert {$frag >= $expected_frag} + r config set latency-monitor-threshold 5 + r latency reset + + set digest [r debug digest] + catch {r config set activedefrag yes} e + if {![string match {DISABLED*} $e]} { + # wait for the active defrag to start working (decision once a second) + wait_for_condition 50 100 { + [s active_defrag_running] ne 0 + } else { + fail "defrag not 
started." + } + + # wait for the active defrag to stop working + wait_for_condition 500 100 { + [s active_defrag_running] eq 0 + } else { + after 120 ;# serverCron only updates the info once in 100ms + puts [r info memory] + puts [r info stats] + puts [r memory malloc-stats] + fail "defrag didn't stop." + } + + # test the the fragmentation is lower + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + set max_latency 0 + foreach event [r latency latest] { + lassign $event eventname time latency max + if {$eventname == "active-defrag-cycle"} { + set max_latency $max + } + } + if {$::verbose} { + puts "frag $frag" + puts "max latency $max_latency" + puts [r latency latest] + puts [r latency history active-defrag-cycle] + } + assert {$frag < 1.1} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 30} + } + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + r del biglist1 ;# coverage for quicklistBookmarksClear + } {1} } } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index cb99ab1c9..b380227e0 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -45,18 +45,24 @@ start_server {tags {"modules"}} { test {Module client blocked on keys (with metadata): Timeout} { r del k set rd [redis_deferring_client] + $rd client id + set cid [$rd read] r fsl.push k 33 $rd fsl.bpopgt k 35 1 assert_equal {Request timedout} [$rd read] + r client kill id $cid ;# try to smoke-out client-related memory leak } test {Module client blocked on keys (with metadata): Blocked, case 1} { r del k set rd [redis_deferring_client] + $rd client id + set cid [$rd read] r fsl.push k 33 $rd 
fsl.bpopgt k 33 0 r fsl.push k 34 assert_equal {34} [$rd read] + r client kill id $cid ;# try to smoke-out client-related memory leak } test {Module client blocked on keys (with metadata): Blocked, case 2} { @@ -70,6 +76,35 @@ start_server {tags {"modules"}} { assert_equal {36} [$rd read] } + test {Module client blocked on keys (with metadata): Blocked, CLIENT KILL} { + r del k + set rd [redis_deferring_client] + $rd client id + set cid [$rd read] + $rd fsl.bpopgt k 35 0 + r client kill id $cid ;# try to smoke-out client-related memory leak + } + + test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK TIMEOUT} { + r del k + set rd [redis_deferring_client] + $rd client id + set cid [$rd read] + $rd fsl.bpopgt k 35 0 + r client unblock $cid timeout ;# try to smoke-out client-related memory leak + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK ERROR} { + r del k + set rd [redis_deferring_client] + $rd client id + set cid [$rd read] + $rd fsl.bpopgt k 35 0 + r client unblock $cid error ;# try to smoke-out client-related memory leak + assert_error "*unblocked*" {$rd read} + } + test {Module client blocked on keys does not wake up on wrong type} { r del k set rd [redis_deferring_client] diff --git a/tests/unit/moduleapi/fork.tcl b/tests/unit/moduleapi/fork.tcl index f7d7e47d5..8535a3382 100644 --- a/tests/unit/moduleapi/fork.tcl +++ b/tests/unit/moduleapi/fork.tcl @@ -20,9 +20,8 @@ start_server {tags {"modules"}} { test {Module fork kill} { r fork.create 3 - after 20 + after 250 r fork.kill - after 100 assert {[count_log_message "fork child started"] eq "2"} assert {[count_log_message "Received SIGUSR1 in child"] eq "1"} diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index 2543a0377..fb36d0b80 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -741,3 +741,8 @@ start_server {tags {"scripting repl"}} { } } +start_server {tags 
{"scripting"}} { + r script debug sync + r eval {return 'hello'} 0 + r eval {return 'hello'} 0 +} diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl new file mode 100644 index 000000000..2058319f7 --- /dev/null +++ b/tests/unit/tracking.tcl @@ -0,0 +1,66 @@ +start_server {tags {"tracking"}} { + # Create a deferred client we'll use to redirect invalidation + # messages to. + set rd1 [redis_deferring_client] + $rd1 client id + set redir [$rd1 read] + $rd1 subscribe __redis__:invalidate + $rd1 read ; # Consume the SUBSCRIBE reply. + + test {Clients are able to enable tracking and redirect it} { + r CLIENT TRACKING on REDIRECT $redir + } {*OK} + + test {The other connection is able to get invalidations} { + r SET a 1 + r GET a + r INCR a + r INCR b ; # This key should not be notified, since it wasn't fetched. + set keys [lindex [$rd1 read] 2] + assert {[llength $keys] == 1} + assert {[lindex $keys 0] eq {a}} + } + + test {The client is now able to disable tracking} { + # Make sure to add a few more keys in the tracking list + # so that we can check for leaks, as a side effect. + r MGET a b c d e f g + r CLIENT TRACKING off + } + + test {Clients can enable the BCAST mode with the empty prefix} { + r CLIENT TRACKING on BCAST REDIRECT $redir + } {*OK*} + + test {The connection gets invalidation messages about all the keys} { + r MSET a 1 b 2 c 3 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {a b c}} + } + + test {Clients can enable the BCAST mode with prefixes} { + r CLIENT TRACKING off + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX a: PREFIX b: + r MULTI + r INCR a:1 + r INCR a:2 + r INCR b:1 + r INCR b:2 + r EXEC + # Because of the internals, we know we are going to receive + # two separated notifications for the two different prefixes. 
+ set keys1 [lsort [lindex [$rd1 read] 2]] + set keys2 [lsort [lindex [$rd1 read] 2]] + set keys [lsort [list {*}$keys1 {*}$keys2]] + assert {$keys eq {a:1 a:2 b:1 b:2}} + } + + test {Adding prefixes to BCAST mode works} { + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX c: + r INCR c:1234 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {c:1234}} + } + + $rd1 close +} diff --git a/tests/unit/type/hash.tcl b/tests/unit/type/hash.tcl index d2c679d32..9f8a21b1c 100644 --- a/tests/unit/type/hash.tcl +++ b/tests/unit/type/hash.tcl @@ -390,6 +390,13 @@ start_server {tags {"hash"}} { lappend rv [string match "ERR*not*float*" $bigerr] } {1 1} + test {HINCRBYFLOAT fails against hash value that contains a null-terminator in the middle} { + r hset h f "1\x002" + catch {r hincrbyfloat h f 1} err + set rv {} + lappend rv [string match "ERR*not*float*" $err] + } {1} + test {HSTRLEN against the small hash} { set err {} foreach k [array names smallhash *] { diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a59e168ef..072ed14d6 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -161,6 +161,15 @@ start_server { assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">" + r XGROUP DESTROY mystream mygroup + assert_error "*NOGROUP*" {$rd read} + } + test {XCLAIM can claim PEL items from another consumer} { # Add 3 items into the stream, and create a consumer group r del mystream |