summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryoav-steinberg <yoav@monfort.co.il>2022-01-02 09:39:01 +0200
committerGitHub <noreply@github.com>2022-01-02 09:39:01 +0200
commit1bf6d6f11eb08d98ba3de688d18d805a2d8696d5 (patch)
treee808f1f65487fbeae9eeb281b1d33a1fd8baaf63
parent888e92eb5797c1a6ce4c7b89c5814dbe13a8d8e3 (diff)
downloadredis-1bf6d6f11eb08d98ba3de688d18d805a2d8696d5.tar.gz
Generate RDB with Functions only via redis-cli --functions-rdb (#9968)
This is needed in order to ease the deployment of functions for ephemeral cases, where user needs to spin up a server with functions pre-loaded. #### Details: * Added `--functions-rdb` option to _redis-cli_. * Functions only rdb via `REPLCONF rdb-filter-only functions`. This is a placeholder for a space separated inclusion filter for the RDB. In the future can be `REPLCONF rdb-filter-only "functions db:3 key-patten:user*"` and a complementing `rdb-filter-exclude` `REPLCONF` can also be added. * Handle "slave requirements" specification to RDB saving code so we can use the same RDB when different slaves express the same requirements (like functions-only) and not share the RDB when their requirements differ. This is currently just a flags `int`, but can be extended to a more complex structure with various filter fields. * make sure to support filters only in diskless replication mode (not to override the persistence file), we do that by forcing diskless (even if disabled by config) other changes: * some refactoring in rdb.c (extract portion of a big function to a sub-function) * rdb_key_save_delay used in AOFRW too * sendChildInfo takes the number of updated keys (incremental, rather than absolute) Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--src/aof.c6
-rw-r--r--src/db.c2
-rw-r--r--src/debug.c2
-rw-r--r--src/functions.c2
-rw-r--r--src/networking.c1
-rw-r--r--src/rdb.c220
-rw-r--r--src/rdb.h10
-rw-r--r--src/redis-cli.c27
-rw-r--r--src/replication.c111
-rw-r--r--src/server.c6
-rw-r--r--src/server.h5
-rw-r--r--tests/integration/redis-cli.tcl29
12 files changed, 288 insertions, 133 deletions
diff --git a/src/aof.c b/src/aof.c
index 1647dd009..764b5e0cf 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1509,6 +1509,10 @@ int rewriteAppendOnlyFileRio(rio *aof) {
updated_time = now;
}
}
+
+ /* Delay before next key if required (for testing) */
+ if (server.rdb_key_save_delay)
+ debugDelay(server.rdb_key_save_delay);
}
dictReleaseIterator(di);
di = NULL;
@@ -1552,7 +1556,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_use_rdb_preamble) {
int error;
- if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
+ if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
diff --git a/src/db.c b/src/db.c
index ff7545e6a..d19c4d92a 100644
--- a/src/db.c
+++ b/src/db.c
@@ -599,7 +599,7 @@ void flushAllDataAndResetRDB(int flags) {
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
- rdbSave(server.rdb_filename,rsiptr);
+ rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}
diff --git a/src/debug.c b/src/debug.c
index d5a180573..f01509e84 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -544,7 +544,7 @@ NULL
if (save) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
- if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
+ if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) != C_OK) {
addReplyErrorObject(c,shared.err);
return;
}
diff --git a/src/functions.c b/src/functions.c
index 2da142ecb..a515709ca 100644
--- a/src/functions.c
+++ b/src/functions.c
@@ -408,7 +408,7 @@ void functionDumpCommand(client *c) {
rio payload;
rioInitWithBuffer(&payload, sdsempty());
- functionsSaveRio(&payload);
+ rdbSaveFunctions(&payload);
/* RDB version */
buf[0] = RDB_VERSION & 0xff;
diff --git a/src/networking.c b/src/networking.c
index 238af0b8f..d0c74faba 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -171,6 +171,7 @@ client *createClient(connection *conn) {
c->slave_listening_port = 0;
c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE;
+ c->slave_req = SLAVE_REQ_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
diff --git a/src/rdb.c b/src/rdb.c
index 9fa6638e7..2b7a8fa81 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1214,28 +1214,119 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
return io.bytes;
}
-int functionsSaveRio(rio *rdb) {
- int ret = C_ERR;
+ssize_t rdbSaveFunctions(rio *rdb) {
dict *functions = functionsGet();
dictIterator *iter = dictGetIterator(functions);
dictEntry *entry = NULL;
+ ssize_t written = 0;
+ ssize_t ret;
while ((entry = dictNext(iter))) {
- rdbSaveType(rdb, RDB_OPCODE_FUNCTION);
+ if ((ret = rdbSaveType(rdb, RDB_OPCODE_FUNCTION)) < 0) goto werr;
+ written += ret;
functionInfo *fi = dictGetVal(entry);
- if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto done;
- if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto done;
+ if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name))) < 0) goto werr;
+ written += ret;
+ if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name))) < 0) goto werr;
+ written += ret;
if (fi->desc) {
- if (rdbSaveLen(rdb, 1) == -1) goto done; /* desc exists */
- if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto done;
+ /* desc exists */
+ if ((ret = rdbSaveLen(rdb, 1)) < 0) goto werr;
+ written += ret;
+ if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc))) < 0) goto werr;
+ written += ret;
} else {
- if (rdbSaveLen(rdb, 0) == -1) goto done; /* desc not exists */
+ /* desc not exists */
+ if ((ret = rdbSaveLen(rdb, 0)) < 0) goto werr;
+ written += ret;
}
- if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto done;
+ if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code))) < 0) goto werr;
+ written += ret;
}
- ret = C_OK;
-done:
dictReleaseIterator(iter);
- return ret;
+ return written;
+
+werr:
+ dictReleaseIterator(iter);
+ return -1;
+}
+
+ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
+ dictIterator *di;
+ dictEntry *de;
+ ssize_t written = 0;
+ ssize_t res;
+ static long long info_updated_time = 0;
+ size_t processed = 0;
+ char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
+
+ redisDb *db = server.db + dbid;
+ dict *d = db->dict;
+ if (dictSize(d) == 0) return 0;
+ di = dictGetSafeIterator(d);
+
+ /* Write the SELECT DB opcode */
+ if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr;
+ written += res;
+ if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr;
+ written += res;
+
+ /* Write the RESIZE DB opcode. */
+ uint64_t db_size, expires_size;
+ db_size = dictSize(db->dict);
+ expires_size = dictSize(db->expires);
+ if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
+ written += res;
+ if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
+ written += res;
+ if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
+ written += res;
+
+ /* Iterate this DB writing every entry */
+ while((de = dictNext(di)) != NULL) {
+ sds keystr = dictGetKey(de);
+ robj key, *o = dictGetVal(de);
+ long long expire;
+ size_t rdb_bytes_before_key = rdb->processed_bytes;
+
+ initStaticStringObject(key,keystr);
+ expire = getExpire(db,&key);
+ if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
+ written += res;
+
+ /* In fork child process, we can try to release memory back to the
+ * OS and possibly avoid or decrease COW. We give the dismiss
+ * mechanism a hint about an estimated size of the object we stored. */
+ size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
+ if (server.in_fork_child) dismissObject(o, dump_size);
+
+ /* When this RDB is produced as part of an AOF rewrite, move
+ * accumulated diff from parent to child while rewriting in
+ * order to have a smaller final write. */
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
+ rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
+ {
+ processed = rdb->processed_bytes;
+ aofReadDiffFromParent();
+ }
+
+ /* Update child info every 1 second (approximately).
+ * in order to avoid calling mstime() on each iteration, we will
+ * check the diff every 1024 keys */
+ if (((*key_counter)++ & 1023) == 0) {
+ long long now = mstime();
+ if (now - info_updated_time >= 1000) {
+ sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname);
+ info_updated_time = now;
+ }
+ }
+ }
+
+ dictReleaseIterator(di);
+ return written;
+
+werr:
+ dictReleaseIterator(di);
+ return -1;
}
/* Produces a dump of the database in RDB format sending it to the specified
@@ -1246,87 +1337,30 @@ done:
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
- dictIterator *di = NULL;
- dictEntry *de;
+int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
char magic[10];
uint64_t cksum;
- size_t processed = 0;
+ long key_counter = 0;
int j;
- long key_count = 0;
- long long info_updated_time = 0;
- char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
- if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
-
- if (functionsSaveRio(rdb) != C_OK) goto werr;
-
- for (j = 0; j < server.dbnum; j++) {
- redisDb *db = server.db+j;
- dict *d = db->dict;
- if (dictSize(d) == 0) continue;
- di = dictGetSafeIterator(d);
-
- /* Write the SELECT DB opcode */
- if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
- if (rdbSaveLen(rdb,j) == -1) goto werr;
-
- /* Write the RESIZE DB opcode. */
- uint64_t db_size, expires_size;
- db_size = dictSize(db->dict);
- expires_size = dictSize(db->expires);
- if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
- if (rdbSaveLen(rdb,db_size) == -1) goto werr;
- if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
-
- /* Iterate this DB writing every entry */
- while((de = dictNext(di)) != NULL) {
- sds keystr = dictGetKey(de);
- robj key, *o = dictGetVal(de);
- long long expire;
- size_t rdb_bytes_before_key = rdb->processed_bytes;
-
- initStaticStringObject(key,keystr);
- expire = getExpire(db,&key);
- if (rdbSaveKeyValuePair(rdb,&key,o,expire,j) == -1) goto werr;
-
- /* In fork child process, we can try to release memory back to the
- * OS and possibly avoid or decrease COW. We give the dismiss
- * mechanism a hint about an estimated size of the object we stored. */
- size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
- if (server.in_fork_child) dismissObject(o, dump_size);
-
- /* When this RDB is produced as part of an AOF rewrite, move
- * accumulated diff from parent to child while rewriting in
- * order to have a smaller final write. */
- if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
- rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
- {
- processed = rdb->processed_bytes;
- aofReadDiffFromParent();
- }
-
- /* Update child info every 1 second (approximately).
- * in order to avoid calling mstime() on each iteration, we will
- * check the diff every 1024 keys */
- if ((key_count++ & 1023) == 0) {
- long long now = mstime();
- if (now - info_updated_time >= 1000) {
- sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname);
- info_updated_time = now;
- }
- }
+ if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
+
+ /* save functions */
+ if (rdbSaveFunctions(rdb) == -1) goto werr;
+
+ /* save all databases, skip this if we're in functions-only mode */
+ if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY)) {
+ for (j = 0; j < server.dbnum; j++) {
+ if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
}
- dictReleaseIterator(di);
- di = NULL; /* So that we don't release it again on error. */
}
- if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
+ if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@@ -1340,7 +1374,6 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
werr:
if (error) *error = errno;
- if (di) dictReleaseIterator(di);
return C_ERR;
}
@@ -1352,7 +1385,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
-int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
+int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
startSaving(RDBFLAGS_REPLICATION);
@@ -1361,7 +1394,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
+ if (rdbSaveRio(req,rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
@@ -1374,7 +1407,7 @@ werr: /* Write error. */
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
-int rdbSave(char *filename, rdbSaveInfo *rsi) {
+int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp = NULL;
@@ -1401,7 +1434,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
- if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
+ if (rdbSaveRio(req,&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@@ -1444,7 +1477,7 @@ werr:
return C_ERR;
}
-int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
+int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
if (hasActiveChildProcess()) return C_ERR;
@@ -1458,7 +1491,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
/* Child */
redisSetProcTitle("redis-rdb-bgsave");
redisSetCpuAffinity(server.bgsave_cpulist);
- retval = rdbSave(filename,rsi);
+ retval = rdbSave(req, filename,rsi);
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
@@ -3249,7 +3282,7 @@ void killRDBChild(void) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
-int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
+int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
@@ -3288,6 +3321,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
+ /* Check slave has the exact requirements */
+ if (slave->slave_req != req)
+ continue;
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
}
@@ -3304,7 +3340,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
redisSetProcTitle("redis-rdb-to-slaves");
redisSetCpuAffinity(server.bgsave_cpulist);
- retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
+ retval = rdbSaveRioWithEOFMark(req,&rdb,NULL,rsi);
if (retval == C_OK && rioFlush(&rdb) == 0)
retval = C_ERR;
@@ -3366,7 +3402,7 @@ void saveCommand(client *c) {
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
- if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
+ if (rdbSave(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) == C_OK) {
addReply(c,shared.ok);
} else {
addReplyErrorObject(c,shared.err);
@@ -3403,7 +3439,7 @@ void bgsaveCommand(client *c) {
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
- } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
+ } else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReplyErrorObject(c,shared.err);
diff --git a/src/rdb.h b/src/rdb.h
index 5942c4333..f2a5a28fe 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -148,10 +148,10 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
-int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
-int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
+int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi);
+int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid, int from_signal);
-int rdbSave(char *filename, rdbSaveInfo *rsi);
+int rdbSave(int req, char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
robj *rdbLoadObject(int type, rio *rdb, sds key, int dbid, int *error);
@@ -170,8 +170,8 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err);
-int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
-int functionsSaveRio(rio *rdb);
+int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
+ssize_t rdbSaveFunctions(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 2fb7d9ef0..397e3bfa8 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -226,6 +226,7 @@ static struct config {
int pipe_mode;
int pipe_timeout;
int getrdb_mode;
+ int get_functions_rdb_mode;
int stat_mode;
int scan_mode;
int intrinsic_latency_mode;
@@ -1643,6 +1644,9 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"--rdb") && !lastarg) {
config.getrdb_mode = 1;
config.rdb_filename = argv[++i];
+ } else if (!strcmp(argv[i],"--functions-rdb") && !lastarg) {
+ config.get_functions_rdb_mode = 1;
+ config.rdb_filename = argv[++i];
} else if (!strcmp(argv[i],"--pipe")) {
config.pipe_mode = 1;
} else if (!strcmp(argv[i],"--pipe-timeout") && !lastarg) {
@@ -1848,6 +1852,11 @@ static int parseOptions(int argc, char **argv) {
" line interface may not be safe.\n", stderr);
}
+ if (config.get_functions_rdb_mode && config.getrdb_mode) {
+ fprintf(stderr,"Option --functions-rdb and --rdb are mutually exclusive.\n");
+ exit(1);
+ }
+
if (config.stdin_lastarg && config.stdin_tag_arg) {
fprintf(stderr, "Options -x and -X are mutually exclusive.\n");
exit(1);
@@ -1949,6 +1958,8 @@ static void usage(int err) {
" --replica Simulate a replica showing commands received from the master.\n"
" --rdb <filename> Transfer an RDB dump from remote server to local file.\n"
" Use filename of \"-\" to write to stdout.\n"
+" --functions-rdb <filename> Like --rdb but only get the functions (not the keys)\n"
+" when getting the RDB dump file.\n"
" --pipe Transfer raw Redis protocol from stdin to server.\n"
" --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n"
" no reply is received within <n> seconds.\n"
@@ -7158,7 +7169,8 @@ static void latencyDistMode(void) {
#define RDB_EOF_MARK_SIZE 40
-void sendReplconf(const char* arg1, const char* arg2) {
+int sendReplconf(const char* arg1, const char* arg2) {
+ int res = 1;
fprintf(stderr, "sending REPLCONF %s %s\n", arg1, arg2);
redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);
@@ -7167,10 +7179,12 @@ void sendReplconf(const char* arg1, const char* arg2) {
fprintf(stderr, "\nI/O error\n");
exit(1);
} else if(reply->type == REDIS_REPLY_ERROR) {
- fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
/* non fatal, old versions may not support it */
+ fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
+ res = 0;
}
freeReplyObject(reply);
+ return res;
}
void sendCapa() {
@@ -8411,6 +8425,7 @@ int main(int argc, char **argv) {
config.cluster_send_asking = 0;
config.slave_mode = 0;
config.getrdb_mode = 0;
+ config.get_functions_rdb_mode = 0;
config.stat_mode = 0;
config.scan_mode = 0;
config.intrinsic_latency_mode = 0;
@@ -8522,11 +8537,15 @@ int main(int argc, char **argv) {
slaveMode();
}
- /* Get RDB mode. */
- if (config.getrdb_mode) {
+ /* Get RDB/functions mode. */
+ if (config.getrdb_mode || config.get_functions_rdb_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
sendRdbOnly();
+ if (config.get_functions_rdb_mode && !sendReplconf("rdb-filter-only", "functions")) {
+ fprintf(stderr, "Failed requesting functions only RDB from server, aborting\n");
+ exit(1);
+ }
getRDB(NULL);
}
diff --git a/src/replication.c b/src/replication.c
index ade3ca1c9..e7a092645 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -826,12 +826,19 @@ need_full_resync:
* started.
*
* Returns C_OK on success or C_ERR otherwise. */
-int startBgsaveForReplication(int mincapa) {
+int startBgsaveForReplication(int mincapa, int req) {
int retval;
- int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
+ int socket_target = 0;
listIter li;
listNode *ln;
+ /* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs.
+ * Note that in case we're creating a "filtered" RDB (functions-only) we also force socket replication
+ * to avoid overwriting the snapshot RDB file with filtered data. */
+ socket_target = (server.repl_diskless_sync || (req & SLAVE_REQ_RDB_FUNCTIONS_ONLY)) && (mincapa & SLAVE_CAPA_EOF);
+ /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */
+ serverAssert(socket_target || !(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY));
+
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");
@@ -841,9 +848,9 @@ int startBgsaveForReplication(int mincapa) {
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
- retval = rdbSaveToSlavesSockets(rsiptr);
+ retval = rdbSaveToSlavesSockets(req,rsiptr);
else
- retval = rdbSaveBackground(server.rdb_filename,rsiptr);
+ retval = rdbSaveBackground(req,server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
@@ -886,8 +893,10 @@ int startBgsaveForReplication(int mincapa) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
- replicationSetupSlaveForFullResync(slave,
- getPsyncInitialOffset());
+ /* Check slave has the exact requirements */
+ if (slave->slave_req != req)
+ continue;
+ replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
}
}
}
@@ -946,6 +955,14 @@ void syncCommand(client *c) {
return;
}
+ /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered
+ * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing
+ * use of a socket is handled, if needed, in `startBgsaveForReplication`. */
+ if ((c->slave_req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && !(c->slave_capa & SLAVE_CAPA_EOF)) {
+ addReplyError(c,"Filtered replica requires EOF capability");
+ return;
+ }
+
serverLog(LL_NOTICE,"Replica %s asks for synchronization",
replicationGetSlaveName(c));
@@ -1025,8 +1042,10 @@ void syncCommand(client *c) {
break;
}
/* To attach this slave, we check that it has at least all the
- * capabilities of the slave that triggered the current BGSAVE. */
- if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
+ * capabilities of the slave that triggered the current BGSAVE
+ * and its exact requirements. */
+ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) &&
+ c->slave_req == slave->slave_req) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer.
* We don't copy buffer if clients don't want. */
@@ -1062,7 +1081,7 @@ void syncCommand(client *c) {
/* We don't have a BGSAVE in progress, let's start one. Diskless
* or disk-based mode is determined by replica's capacity. */
if (!hasActiveChildProcess()) {
- startBgsaveForReplication(c->slave_capa);
+ startBgsaveForReplication(c->slave_capa, c->slave_req);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
@@ -1100,8 +1119,12 @@ void syncCommand(client *c) {
* Unlike other subcommands, this is used by master to get the replication
* offset from a replica.
*
- * - rdb-only
- * Only wants RDB snapshot without replication buffer. */
+ * - rdb-only <0|1>
+ * Only wants RDB snapshot without replication buffer.
+ *
+ * - rdb-filter-only <include-filters>
+ * Define "include" filters for the RDB snapshot. Currently we only support
+ * a single include filter: "functions". */
void replconfCommand(client *c) {
int j;
@@ -1177,6 +1200,30 @@ void replconfCommand(client *c) {
return;
if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY;
else c->flags &= ~CLIENT_REPL_RDBONLY;
+ } else if (!strcasecmp(c->argv[j]->ptr,"rdb-filter-only")) {
+ /* REPLCONFG RDB-FILTER-ONLY is used to define "include" filters
+ * for the RDB snapshot. Currently we only support a single
+ * include filter: "functions". In the future we may want to add
+ * other filters like key patterns, key types, non-volatile, module
+ * aux fields, ...
+ * We might want to add the complementing "RDB-FILTER-EXCLUDE" to
+ * filter out certain data. */
+ int filter_count, i;
+ sds *filters;
+ if (!(filters = sdssplitargs(c->argv[j+1]->ptr, &filter_count))) {
+ addReplyErrorFormat(c, "Missing rdb-filter-only values");
+ return;
+ }
+ for (i = 0; i < filter_count; i++) {
+ if (!strcasecmp(filters[i], "functions"))
+ c->slave_req |= SLAVE_REQ_RDB_FUNCTIONS_ONLY;
+ else {
+ addReplyErrorFormat(c, "Unsupported rdb-filter-only option: %s", (char*)filters[i]);
+ sdsfreesplitres(filters, filter_count);
+ return;
+ }
+ }
+ sdsfreesplitres(filters, filter_count);
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
@@ -3633,8 +3680,8 @@ void replicationCron(void) {
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
-void replicationStartPendingFork(void) {
- /* Start a BGSAVE good for replication if we have slaves in
+int shouldStartChildReplication(int *mincapa_out, int *req_out) {
+ /* We should start a BGSAVE good for replication if we have slaves in
* WAIT_BGSAVE_START state.
*
* In case of diskless replication, we make sure to wait the specified
@@ -3643,7 +3690,9 @@ void replicationStartPendingFork(void) {
if (!hasActiveChildProcess()) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
- int mincapa = -1;
+ int mincapa;
+ int req;
+ int first = 1;
listNode *ln;
listIter li;
@@ -3651,11 +3700,18 @@ void replicationStartPendingFork(void) {
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
+ if (first) {
+ /* Get first slave's requirements */
+ req = slave->slave_req;
+ } else if (req != slave->slave_req) {
+ /* Skip slaves that don't match */
+ continue;
+ }
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
- mincapa = (mincapa == -1) ? slave->slave_capa :
- (mincapa & slave->slave_capa);
+ mincapa = first ? slave->slave_capa : (mincapa & slave->slave_capa);
+ first = 0;
}
}
@@ -3663,12 +3719,27 @@ void replicationStartPendingFork(void) {
(!server.repl_diskless_sync ||
max_idle >= server.repl_diskless_sync_delay))
{
- /* Start the BGSAVE. The called function may start a
- * BGSAVE with socket target or disk target depending on the
- * configuration and slaves capabilities. */
- startBgsaveForReplication(mincapa);
+ if (mincapa_out)
+ *mincapa_out = mincapa;
+ if (req_out)
+ *req_out = req;
+ return 1;
}
}
+
+ return 0;
+}
+
+void replicationStartPendingFork(void) {
+ int mincapa = -1;
+ int req = -1;
+
+ if (shouldStartChildReplication(&mincapa, &req)) {
+ /* Start the BGSAVE. The called function may start a
+ * BGSAVE with socket target or disk target depending on the
+ * configuration and slaves capabilities and requirements. */
+ startBgsaveForReplication(mincapa, req);
+ }
}
/* Find replica at IP:PORT from replica list */
diff --git a/src/server.c b/src/server.c
index 6d531449e..a5a95d1d8 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1209,7 +1209,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
- rdbSaveBackground(server.rdb_filename,rsiptr);
+ rdbSaveBackground(SLAVE_REQ_NONE, server.rdb_filename,rsiptr);
break;
}
}
@@ -1298,7 +1298,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
- if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
+ if (rdbSaveBackground(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
@@ -3692,7 +3692,7 @@ int prepareForShutdown(int flags) {
/* Snapshotting. Perform a SYNC SAVE and exit */
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
- if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
+ if (rdbSave(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
diff --git a/src/server.h b/src/server.h
index 4536d3b5b..e3cf50b65 100644
--- a/src/server.h
+++ b/src/server.h
@@ -380,6 +380,10 @@ typedef enum {
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
+/* Slave requirements */
+#define SLAVE_REQ_NONE 0
+#define SLAVE_REQ_RDB_FUNCTIONS_ONLY (1 << 0)
+
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@@ -1058,6 +1062,7 @@ typedef struct client {
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
+ int slave_req; /* Slave requirements: SLAVE_REQ_* */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */
diff --git a/tests/integration/redis-cli.tcl b/tests/integration/redis-cli.tcl
index 4b9ce38d1..e858002f1 100644
--- a/tests/integration/redis-cli.tcl
+++ b/tests/integration/redis-cli.tcl
@@ -315,34 +315,53 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS
file delete $tmpfile
}
- proc test_redis_cli_rdb_dump {} {
+ proc test_redis_cli_rdb_dump {functions_only} {
r flushdb
+ r function flush
set dir [lindex [r config get dir] 1]
assert_equal "OK" [r debug populate 100000 key 1000]
- catch {run_cli --rdb "$dir/cli.rdb"} output
+ assert_equal "OK" [r function create lua func1 "return 123"]
+ if {$functions_only} {
+ set args "--functions-rdb $dir/cli.rdb"
+ } else {
+ set args "--rdb $dir/cli.rdb"
+ }
+ catch {run_cli {*}$args} output
assert_match {*Transfer finished with success*} $output
file delete "$dir/dump.rdb"
file rename "$dir/cli.rdb" "$dir/dump.rdb"
assert_equal "OK" [r set should-not-exist 1]
+ assert_equal "OK" [r function create lua should_not_exist_func "return 456"]
assert_equal "OK" [r debug reload nosave]
assert_equal {} [r get should-not-exist]
+ assert_error "ERR Function does not exists" {r function info should_not_exist_func}
+ assert_equal "func1" [dict get [r function info func1] name]
+ if {$functions_only} {
+ assert_equal 0 [r dbsize]
+ } else {
+ assert_equal 100000 [r dbsize]
+ }
}
- test "Dumping an RDB" {
+ foreach {functions_only} {no yes} {
+
+ test "Dumping an RDB - functions only: $functions_only" {
# Disk-based master
assert_match "OK" [r config set repl-diskless-sync no]
- test_redis_cli_rdb_dump
+ test_redis_cli_rdb_dump $functions_only
# Disk-less master
assert_match "OK" [r config set repl-diskless-sync yes]
assert_match "OK" [r config set repl-diskless-sync-delay 0]
- test_redis_cli_rdb_dump
+ test_redis_cli_rdb_dump $functions_only
} {} {needs:repl needs:debug}
+ } ;# foreach functions_only
+
test "Scan mode" {
r flushdb
populate 1000 key: 1