diff options
author | yoav-steinberg <yoav@monfort.co.il> | 2022-01-02 09:39:01 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-02 09:39:01 +0200 |
commit | 1bf6d6f11eb08d98ba3de688d18d805a2d8696d5 (patch) | |
tree | e808f1f65487fbeae9eeb281b1d33a1fd8baaf63 | |
parent | 888e92eb5797c1a6ce4c7b89c5814dbe13a8d8e3 (diff) | |
download | redis-1bf6d6f11eb08d98ba3de688d18d805a2d8696d5.tar.gz |
Generate RDB with Functions only via redis-cli --functions-rdb (#9968)
This is needed to ease the deployment of functions in ephemeral cases, where a user
needs to spin up a server with functions pre-loaded.
#### Details:
* Added `--functions-rdb` option to _redis-cli_.
* Functions-only RDB via `REPLCONF rdb-filter-only functions`. This is a placeholder for a space-separated
inclusion filter for the RDB. In the future it can be `REPLCONF rdb-filter-only
"functions db:3 key-pattern:user*"`, and a complementing `rdb-filter-exclude` `REPLCONF`
can also be added.
* Pass a "slave requirements" specification to the RDB saving code so we can share the same RDB
among slaves that express the same requirements (like functions-only) and not share the
RDB when their requirements differ. This is currently just a flags `int`, but can be extended to
a more complex structure with various filter fields.
* Make sure filters are supported only in diskless replication mode (so as not to overwrite the persistence file);
we do that by forcing diskless replication (even if disabled by config).
Other changes:
* Some refactoring in rdb.c (extracted a portion of a big function into a sub-function)
* `rdb_key_save_delay` is now used in AOFRW (AOF rewrite) too
* sendChildInfo takes the number of updated keys (incremental, rather than absolute)
Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r-- | src/aof.c | 6 | ||||
-rw-r--r-- | src/db.c | 2 | ||||
-rw-r--r-- | src/debug.c | 2 | ||||
-rw-r--r-- | src/functions.c | 2 | ||||
-rw-r--r-- | src/networking.c | 1 | ||||
-rw-r--r-- | src/rdb.c | 220 | ||||
-rw-r--r-- | src/rdb.h | 10 | ||||
-rw-r--r-- | src/redis-cli.c | 27 | ||||
-rw-r--r-- | src/replication.c | 111 | ||||
-rw-r--r-- | src/server.c | 6 | ||||
-rw-r--r-- | src/server.h | 5 | ||||
-rw-r--r-- | tests/integration/redis-cli.tcl | 29 |
12 files changed, 288 insertions, 133 deletions
@@ -1509,6 +1509,10 @@ int rewriteAppendOnlyFileRio(rio *aof) { updated_time = now; } } + + /* Delay before next key if required (for testing) */ + if (server.rdb_key_save_delay) + debugDelay(server.rdb_key_save_delay); } dictReleaseIterator(di); di = NULL; @@ -1552,7 +1556,7 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { + if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } @@ -599,7 +599,7 @@ void flushAllDataAndResetRDB(int flags) { int saved_dirty = server.dirty; rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSave(server.rdb_filename,rsiptr); + rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr); server.dirty = saved_dirty; } diff --git a/src/debug.c b/src/debug.c index d5a180573..f01509e84 100644 --- a/src/debug.c +++ b/src/debug.c @@ -544,7 +544,7 @@ NULL if (save) { rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSave(server.rdb_filename,rsiptr) != C_OK) { + if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) != C_OK) { addReplyErrorObject(c,shared.err); return; } diff --git a/src/functions.c b/src/functions.c index 2da142ecb..a515709ca 100644 --- a/src/functions.c +++ b/src/functions.c @@ -408,7 +408,7 @@ void functionDumpCommand(client *c) { rio payload; rioInitWithBuffer(&payload, sdsempty()); - functionsSaveRio(&payload); + rdbSaveFunctions(&payload); /* RDB version */ buf[0] = RDB_VERSION & 0xff; diff --git a/src/networking.c b/src/networking.c index 238af0b8f..d0c74faba 100644 --- a/src/networking.c +++ b/src/networking.c @@ -171,6 +171,7 @@ client *createClient(connection *conn) { c->slave_listening_port = 0; c->slave_addr = NULL; c->slave_capa = SLAVE_CAPA_NONE; + c->slave_req = SLAVE_REQ_NONE; c->reply = listCreate(); c->reply_bytes = 0; c->obuf_soft_limit_reached_time = 0; @@ -1214,28 +1214,119 @@ ssize_t rdbSaveSingleModuleAux(rio 
*rdb, int when, moduleType *mt) { return io.bytes; } -int functionsSaveRio(rio *rdb) { - int ret = C_ERR; +ssize_t rdbSaveFunctions(rio *rdb) { dict *functions = functionsGet(); dictIterator *iter = dictGetIterator(functions); dictEntry *entry = NULL; + ssize_t written = 0; + ssize_t ret; while ((entry = dictNext(iter))) { - rdbSaveType(rdb, RDB_OPCODE_FUNCTION); + if ((ret = rdbSaveType(rdb, RDB_OPCODE_FUNCTION)) < 0) goto werr; + written += ret; functionInfo *fi = dictGetVal(entry); - if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto done; - if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto done; + if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name))) < 0) goto werr; + written += ret; + if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name))) < 0) goto werr; + written += ret; if (fi->desc) { - if (rdbSaveLen(rdb, 1) == -1) goto done; /* desc exists */ - if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto done; + /* desc exists */ + if ((ret = rdbSaveLen(rdb, 1)) < 0) goto werr; + written += ret; + if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc))) < 0) goto werr; + written += ret; } else { - if (rdbSaveLen(rdb, 0) == -1) goto done; /* desc not exists */ + /* desc not exists */ + if ((ret = rdbSaveLen(rdb, 0)) < 0) goto werr; + written += ret; } - if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto done; + if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code))) < 0) goto werr; + written += ret; } - ret = C_OK; -done: dictReleaseIterator(iter); - return ret; + return written; + +werr: + dictReleaseIterator(iter); + return -1; +} + +ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { + dictIterator *di; + dictEntry *de; + ssize_t written = 0; + ssize_t res; + static long long info_updated_time = 0; + 
size_t processed = 0; + char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; + + redisDb *db = server.db + dbid; + dict *d = db->dict; + if (dictSize(d) == 0) return 0; + di = dictGetSafeIterator(d); + + /* Write the SELECT DB opcode */ + if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr; + written += res; + if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr; + written += res; + + /* Write the RESIZE DB opcode. */ + uint64_t db_size, expires_size; + db_size = dictSize(db->dict); + expires_size = dictSize(db->expires); + if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr; + written += res; + if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr; + written += res; + if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr; + written += res; + + /* Iterate this DB writing every entry */ + while((de = dictNext(di)) != NULL) { + sds keystr = dictGetKey(de); + robj key, *o = dictGetVal(de); + long long expire; + size_t rdb_bytes_before_key = rdb->processed_bytes; + + initStaticStringObject(key,keystr); + expire = getExpire(db,&key); + if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr; + written += res; + + /* In fork child process, we can try to release memory back to the + * OS and possibly avoid or decrease COW. We give the dismiss + * mechanism a hint about an estimated size of the object we stored. */ + size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key; + if (server.in_fork_child) dismissObject(o, dump_size); + + /* When this RDB is produced as part of an AOF rewrite, move + * accumulated diff from parent to child while rewriting in + * order to have a smaller final write. */ + if (rdbflags & RDBFLAGS_AOF_PREAMBLE && + rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) + { + processed = rdb->processed_bytes; + aofReadDiffFromParent(); + } + + /* Update child info every 1 second (approximately). 
+ * in order to avoid calling mstime() on each iteration, we will + * check the diff every 1024 keys */ + if (((*key_counter)++ & 1023) == 0) { + long long now = mstime(); + if (now - info_updated_time >= 1000) { + sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname); + info_updated_time = now; + } + } + } + + dictReleaseIterator(di); + return written; + +werr: + dictReleaseIterator(di); + return -1; } /* Produces a dump of the database in RDB format sending it to the specified @@ -1246,87 +1337,30 @@ done: * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { - dictIterator *di = NULL; - dictEntry *de; +int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { char magic[10]; uint64_t cksum; - size_t processed = 0; + long key_counter = 0; int j; - long key_count = 0; - long long info_updated_time = 0; - char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; - if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; - - if (functionsSaveRio(rdb) != C_OK) goto werr; - - for (j = 0; j < server.dbnum; j++) { - redisDb *db = server.db+j; - dict *d = db->dict; - if (dictSize(d) == 0) continue; - di = dictGetSafeIterator(d); - - /* Write the SELECT DB opcode */ - if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; - if (rdbSaveLen(rdb,j) == -1) goto werr; - - /* Write the RESIZE DB opcode. 
*/ - uint64_t db_size, expires_size; - db_size = dictSize(db->dict); - expires_size = dictSize(db->expires); - if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; - if (rdbSaveLen(rdb,db_size) == -1) goto werr; - if (rdbSaveLen(rdb,expires_size) == -1) goto werr; - - /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { - sds keystr = dictGetKey(de); - robj key, *o = dictGetVal(de); - long long expire; - size_t rdb_bytes_before_key = rdb->processed_bytes; - - initStaticStringObject(key,keystr); - expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire,j) == -1) goto werr; - - /* In fork child process, we can try to release memory back to the - * OS and possibly avoid or decrease COW. We give the dismiss - * mechanism a hint about an estimated size of the object we stored. */ - size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key; - if (server.in_fork_child) dismissObject(o, dump_size); - - /* When this RDB is produced as part of an AOF rewrite, move - * accumulated diff from parent to child while rewriting in - * order to have a smaller final write. */ - if (rdbflags & RDBFLAGS_AOF_PREAMBLE && - rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) - { - processed = rdb->processed_bytes; - aofReadDiffFromParent(); - } - - /* Update child info every 1 second (approximately). 
- * in order to avoid calling mstime() on each iteration, we will - * check the diff every 1024 keys */ - if ((key_count++ & 1023) == 0) { - long long now = mstime(); - if (now - info_updated_time >= 1000) { - sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname); - info_updated_time = now; - } - } + if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; + + /* save functions */ + if (rdbSaveFunctions(rdb) == -1) goto werr; + + /* save all databases, skip this if we're in functions-only mode */ + if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY)) { + for (j = 0; j < server.dbnum; j++) { + if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr; } - dictReleaseIterator(di); - di = NULL; /* So that we don't release it again on error. */ } - if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; + if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; /* EOF opcode */ if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; @@ -1340,7 +1374,6 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { werr: if (error) *error = errno; - if (di) dictReleaseIterator(di); return C_ERR; } @@ -1352,7 +1385,7 @@ werr: * While the suffix is the 40 bytes hex string we announced in the prefix. * This way processes receiving the payload can understand when it ends * without doing any processing of the content. 
*/ -int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { +int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; startSaving(RDBFLAGS_REPLICATION); @@ -1361,7 +1394,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr; + if (rdbSaveRio(req,rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; stopSaving(1); return C_OK; @@ -1374,7 +1407,7 @@ werr: /* Write error. */ } /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ -int rdbSave(char *filename, rdbSaveInfo *rsi) { +int rdbSave(int req, char *filename, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp = NULL; @@ -1401,7 +1434,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); - if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { + if (rdbSaveRio(req,&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } @@ -1444,7 +1477,7 @@ werr: return C_ERR; } -int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { +int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) { pid_t childpid; if (hasActiveChildProcess()) return C_ERR; @@ -1458,7 +1491,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { /* Child */ redisSetProcTitle("redis-rdb-bgsave"); redisSetCpuAffinity(server.bgsave_cpulist); - retval = rdbSave(filename,rsi); + retval = rdbSave(req, filename,rsi); if (retval == C_OK) { sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB"); } @@ -3249,7 +3282,7 @@ void killRDBChild(void) { /* Spawn an RDB child that writes the RDB to the sockets of the slaves * 
that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ -int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { +int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { listNode *ln; listIter li; pid_t childpid; @@ -3288,6 +3321,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { + /* Check slave has the exact requirements */ + if (slave->slave_req != req) + continue; server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn; replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset()); } @@ -3304,7 +3340,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { redisSetProcTitle("redis-rdb-to-slaves"); redisSetCpuAffinity(server.bgsave_cpulist); - retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi); + retval = rdbSaveRioWithEOFMark(req,&rdb,NULL,rsi); if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; @@ -3366,7 +3402,7 @@ void saveCommand(client *c) { } rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSave(server.rdb_filename,rsiptr) == C_OK) { + if (rdbSave(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) == C_OK) { addReply(c,shared.ok); } else { addReplyErrorObject(c,shared.err); @@ -3403,7 +3439,7 @@ void bgsaveCommand(client *c) { "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever " "possible."); } - } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) { + } else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK) { addReplyStatus(c,"Background saving started"); } else { addReplyErrorObject(c,shared.err); @@ -148,10 +148,10 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); -int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); -int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); +int rdbSaveBackground(int req, char *filename, 
rdbSaveInfo *rsi); +int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid, int from_signal); -int rdbSave(char *filename, rdbSaveInfo *rsi); +int rdbSave(int req, char *filename, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid); size_t rdbSavedObjectLen(robj *o, robj *key, int dbid); robj *rdbLoadObject(int type, rio *rdb, sds key, int dbid, int *error); @@ -170,8 +170,8 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val); int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err); -int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); -int functionsSaveRio(rio *rdb); +int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); +ssize_t rdbSaveFunctions(rio *rdb); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); #endif diff --git a/src/redis-cli.c b/src/redis-cli.c index 2fb7d9ef0..397e3bfa8 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -226,6 +226,7 @@ static struct config { int pipe_mode; int pipe_timeout; int getrdb_mode; + int get_functions_rdb_mode; int stat_mode; int scan_mode; int intrinsic_latency_mode; @@ -1643,6 +1644,9 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"--rdb") && !lastarg) { config.getrdb_mode = 1; config.rdb_filename = argv[++i]; + } else if (!strcmp(argv[i],"--functions-rdb") && !lastarg) { + config.get_functions_rdb_mode = 1; + config.rdb_filename = argv[++i]; } else if (!strcmp(argv[i],"--pipe")) { config.pipe_mode = 1; } else if (!strcmp(argv[i],"--pipe-timeout") && !lastarg) { @@ -1848,6 +1852,11 @@ static int parseOptions(int argc, char **argv) { " line interface may not be safe.\n", stderr); } + if (config.get_functions_rdb_mode && config.getrdb_mode) { + fprintf(stderr,"Option --functions-rdb 
and --rdb are mutually exclusive.\n"); + exit(1); + } + if (config.stdin_lastarg && config.stdin_tag_arg) { fprintf(stderr, "Options -x and -X are mutually exclusive.\n"); exit(1); @@ -1949,6 +1958,8 @@ static void usage(int err) { " --replica Simulate a replica showing commands received from the master.\n" " --rdb <filename> Transfer an RDB dump from remote server to local file.\n" " Use filename of \"-\" to write to stdout.\n" +" --functions-rdb <filename> Like --rdb but only get the functions (not the keys)\n" +" when getting the RDB dump file.\n" " --pipe Transfer raw Redis protocol from stdin to server.\n" " --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n" " no reply is received within <n> seconds.\n" @@ -7158,7 +7169,8 @@ static void latencyDistMode(void) { #define RDB_EOF_MARK_SIZE 40 -void sendReplconf(const char* arg1, const char* arg2) { +int sendReplconf(const char* arg1, const char* arg2) { + int res = 1; fprintf(stderr, "sending REPLCONF %s %s\n", arg1, arg2); redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2); @@ -7167,10 +7179,12 @@ void sendReplconf(const char* arg1, const char* arg2) { fprintf(stderr, "\nI/O error\n"); exit(1); } else if(reply->type == REDIS_REPLY_ERROR) { - fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str); /* non fatal, old versions may not support it */ + fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str); + res = 0; } freeReplyObject(reply); + return res; } void sendCapa() { @@ -8411,6 +8425,7 @@ int main(int argc, char **argv) { config.cluster_send_asking = 0; config.slave_mode = 0; config.getrdb_mode = 0; + config.get_functions_rdb_mode = 0; config.stat_mode = 0; config.scan_mode = 0; config.intrinsic_latency_mode = 0; @@ -8522,11 +8537,15 @@ int main(int argc, char **argv) { slaveMode(); } - /* Get RDB mode. */ - if (config.getrdb_mode) { + /* Get RDB/functions mode. 
*/ + if (config.getrdb_mode || config.get_functions_rdb_mode) { if (cliConnect(0) == REDIS_ERR) exit(1); sendCapa(); sendRdbOnly(); + if (config.get_functions_rdb_mode && !sendReplconf("rdb-filter-only", "functions")) { + fprintf(stderr, "Failed requesting functions only RDB from server, aborting\n"); + exit(1); + } getRDB(NULL); } diff --git a/src/replication.c b/src/replication.c index ade3ca1c9..e7a092645 100644 --- a/src/replication.c +++ b/src/replication.c @@ -826,12 +826,19 @@ need_full_resync: * started. * * Returns C_OK on success or C_ERR otherwise. */ -int startBgsaveForReplication(int mincapa) { +int startBgsaveForReplication(int mincapa, int req) { int retval; - int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); + int socket_target = 0; listIter li; listNode *ln; + /* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs. + * Note that in case we're creating a "filtered" RDB (functions-only) we also force socket replication + * to avoid overwriting the snapshot RDB file with filtered data. */ + socket_target = (server.repl_diskless_sync || (req & SLAVE_REQ_RDB_FUNCTIONS_ONLY)) && (mincapa & SLAVE_CAPA_EOF); + /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ + serverAssert(socket_target || !(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY)); + serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ? "replicas sockets" : "disk"); @@ -841,9 +848,9 @@ int startBgsaveForReplication(int mincapa) { * otherwise slave will miss repl-stream-db. */ if (rsiptr) { if (socket_target) - retval = rdbSaveToSlavesSockets(rsiptr); + retval = rdbSaveToSlavesSockets(req,rsiptr); else - retval = rdbSaveBackground(server.rdb_filename,rsiptr); + retval = rdbSaveBackground(req,server.rdb_filename,rsiptr); } else { serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. 
Try later."); retval = C_ERR; @@ -886,8 +893,10 @@ int startBgsaveForReplication(int mincapa) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - replicationSetupSlaveForFullResync(slave, - getPsyncInitialOffset()); + /* Check slave has the exact requirements */ + if (slave->slave_req != req) + continue; + replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); } } } @@ -946,6 +955,14 @@ void syncCommand(client *c) { return; } + /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered + * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing + * use of a socket is handled, if needed, in `startBgsaveForReplication`. */ + if ((c->slave_req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && !(c->slave_capa & SLAVE_CAPA_EOF)) { + addReplyError(c,"Filtered replica requires EOF capability"); + return; + } + serverLog(LL_NOTICE,"Replica %s asks for synchronization", replicationGetSlaveName(c)); @@ -1025,8 +1042,10 @@ void syncCommand(client *c) { break; } /* To attach this slave, we check that it has at least all the - * capabilities of the slave that triggered the current BGSAVE. */ - if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { + * capabilities of the slave that triggered the current BGSAVE + * and its exact requirements. */ + if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) && + c->slave_req == slave->slave_req) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. * We don't copy buffer if clients don't want. */ @@ -1062,7 +1081,7 @@ void syncCommand(client *c) { /* We don't have a BGSAVE in progress, let's start one. Diskless * or disk-based mode is determined by replica's capacity. 
*/ if (!hasActiveChildProcess()) { - startBgsaveForReplication(c->slave_capa); + startBgsaveForReplication(c->slave_capa, c->slave_req); } else { serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. " @@ -1100,8 +1119,12 @@ void syncCommand(client *c) { * Unlike other subcommands, this is used by master to get the replication * offset from a replica. * - * - rdb-only - * Only wants RDB snapshot without replication buffer. */ + * - rdb-only <0|1> + * Only wants RDB snapshot without replication buffer. + * + * - rdb-filter-only <include-filters> + * Define "include" filters for the RDB snapshot. Currently we only support + * a single include filter: "functions". */ void replconfCommand(client *c) { int j; @@ -1177,6 +1200,30 @@ void replconfCommand(client *c) { return; if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY; else c->flags &= ~CLIENT_REPL_RDBONLY; + } else if (!strcasecmp(c->argv[j]->ptr,"rdb-filter-only")) { + /* REPLCONFG RDB-FILTER-ONLY is used to define "include" filters + * for the RDB snapshot. Currently we only support a single + * include filter: "functions". In the future we may want to add + * other filters like key patterns, key types, non-volatile, module + * aux fields, ... + * We might want to add the complementing "RDB-FILTER-EXCLUDE" to + * filter out certain data. 
*/ + int filter_count, i; + sds *filters; + if (!(filters = sdssplitargs(c->argv[j+1]->ptr, &filter_count))) { + addReplyErrorFormat(c, "Missing rdb-filter-only values"); + return; + } + for (i = 0; i < filter_count; i++) { + if (!strcasecmp(filters[i], "functions")) + c->slave_req |= SLAVE_REQ_RDB_FUNCTIONS_ONLY; + else { + addReplyErrorFormat(c, "Unsupported rdb-filter-only option: %s", (char*)filters[i]); + sdsfreesplitres(filters, filter_count); + return; + } + } + sdsfreesplitres(filters, filter_count); } else { addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr); @@ -3633,8 +3680,8 @@ void replicationCron(void) { replication_cron_loops++; /* Incremented with frequency 1 HZ. */ } -void replicationStartPendingFork(void) { - /* Start a BGSAVE good for replication if we have slaves in +int shouldStartChildReplication(int *mincapa_out, int *req_out) { + /* We should start a BGSAVE good for replication if we have slaves in * WAIT_BGSAVE_START state. * * In case of diskless replication, we make sure to wait the specified @@ -3643,7 +3690,9 @@ void replicationStartPendingFork(void) { if (!hasActiveChildProcess()) { time_t idle, max_idle = 0; int slaves_waiting = 0; - int mincapa = -1; + int mincapa; + int req; + int first = 1; listNode *ln; listIter li; @@ -3651,11 +3700,18 @@ void replicationStartPendingFork(void) { while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { + if (first) { + /* Get first slave's requirements */ + req = slave->slave_req; + } else if (req != slave->slave_req) { + /* Skip slaves that don't match */ + continue; + } idle = server.unixtime - slave->lastinteraction; if (idle > max_idle) max_idle = idle; slaves_waiting++; - mincapa = (mincapa == -1) ? slave->slave_capa : - (mincapa & slave->slave_capa); + mincapa = first ? 
slave->slave_capa : (mincapa & slave->slave_capa); + first = 0; } } @@ -3663,12 +3719,27 @@ void replicationStartPendingFork(void) { (!server.repl_diskless_sync || max_idle >= server.repl_diskless_sync_delay)) { - /* Start the BGSAVE. The called function may start a - * BGSAVE with socket target or disk target depending on the - * configuration and slaves capabilities. */ - startBgsaveForReplication(mincapa); + if (mincapa_out) + *mincapa_out = mincapa; + if (req_out) + *req_out = req; + return 1; } } + + return 0; +} + +void replicationStartPendingFork(void) { + int mincapa = -1; + int req = -1; + + if (shouldStartChildReplication(&mincapa, &req)) { + /* Start the BGSAVE. The called function may start a + * BGSAVE with socket target or disk target depending on the + * configuration and slaves capabilities and requirements. */ + startBgsaveForReplication(mincapa, req); + } } /* Find replica at IP:PORT from replica list */ diff --git a/src/server.c b/src/server.c index 6d531449e..a5a95d1d8 100644 --- a/src/server.c +++ b/src/server.c @@ -1209,7 +1209,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { sp->changes, (int)sp->seconds); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSaveBackground(server.rdb_filename,rsiptr); + rdbSaveBackground(SLAVE_REQ_NONE, server.rdb_filename,rsiptr); break; } } @@ -1298,7 +1298,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { { rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) + if (rdbSaveBackground(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) == C_OK) server.rdb_bgsave_scheduled = 0; } @@ -3692,7 +3692,7 @@ int prepareForShutdown(int flags) { /* Snapshotting. 
Perform a SYNC SAVE and exit */ rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSave(server.rdb_filename,rsiptr) != C_OK) { + if (rdbSave(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) != C_OK) { /* Ooops.. error saving! The best we can do is to continue * operating. Note that if there was a background saving process, * in the next cron() Redis will be notified that the background diff --git a/src/server.h b/src/server.h index 4536d3b5b..e3cf50b65 100644 --- a/src/server.h +++ b/src/server.h @@ -380,6 +380,10 @@ typedef enum { #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ +/* Slave requirements */ +#define SLAVE_REQ_NONE 0 +#define SLAVE_REQ_RDB_FUNCTIONS_ONLY (1 << 0) + /* Synchronous read timeout - slave side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 @@ -1058,6 +1062,7 @@ typedef struct client { int slave_listening_port; /* As configured with: REPLCONF listening-port */ char *slave_addr; /* Optionally given by REPLCONF ip-address */ int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ + int slave_req; /* Slave requirements: SLAVE_REQ_* */ multiState mstate; /* MULTI/EXEC state */ int btype; /* Type of blocking op if CLIENT_BLOCKED. 
*/ blockingState bpop; /* blocking state */ diff --git a/tests/integration/redis-cli.tcl b/tests/integration/redis-cli.tcl index 4b9ce38d1..e858002f1 100644 --- a/tests/integration/redis-cli.tcl +++ b/tests/integration/redis-cli.tcl @@ -315,34 +315,53 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS file delete $tmpfile } - proc test_redis_cli_rdb_dump {} { + proc test_redis_cli_rdb_dump {functions_only} { r flushdb + r function flush set dir [lindex [r config get dir] 1] assert_equal "OK" [r debug populate 100000 key 1000] - catch {run_cli --rdb "$dir/cli.rdb"} output + assert_equal "OK" [r function create lua func1 "return 123"] + if {$functions_only} { + set args "--functions-rdb $dir/cli.rdb" + } else { + set args "--rdb $dir/cli.rdb" + } + catch {run_cli {*}$args} output assert_match {*Transfer finished with success*} $output file delete "$dir/dump.rdb" file rename "$dir/cli.rdb" "$dir/dump.rdb" assert_equal "OK" [r set should-not-exist 1] + assert_equal "OK" [r function create lua should_not_exist_func "return 456"] assert_equal "OK" [r debug reload nosave] assert_equal {} [r get should-not-exist] + assert_error "ERR Function does not exists" {r function info should_not_exist_func} + assert_equal "func1" [dict get [r function info func1] name] + if {$functions_only} { + assert_equal 0 [r dbsize] + } else { + assert_equal 100000 [r dbsize] + } } - test "Dumping an RDB" { + foreach {functions_only} {no yes} { + + test "Dumping an RDB - functions only: $functions_only" { # Disk-based master assert_match "OK" [r config set repl-diskless-sync no] - test_redis_cli_rdb_dump + test_redis_cli_rdb_dump $functions_only # Disk-less master assert_match "OK" [r config set repl-diskless-sync yes] assert_match "OK" [r config set repl-diskless-sync-delay 0] - test_redis_cli_rdb_dump + test_redis_cli_rdb_dump $functions_only } {} {needs:repl needs:debug} + } ;# foreach functions_only + test "Scan mode" { r flushdb populate 1000 key: 1 |