summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/aof.c2
-rw-r--r--src/blocked.c27
-rw-r--r--src/config.c3
-rw-r--r--src/db.c18
-rw-r--r--src/evict.c21
-rw-r--r--src/expire.c14
-rw-r--r--src/module.c230
-rw-r--r--src/multi.c42
-rw-r--r--src/networking.c2
-rw-r--r--src/replication.c7
-rw-r--r--src/script.c25
-rw-r--r--src/script.h3
-rw-r--r--src/server.c205
-rw-r--r--src/server.h20
-rw-r--r--src/t_list.c6
-rw-r--r--src/t_stream.c21
16 files changed, 305 insertions, 341 deletions
diff --git a/src/aof.c b/src/aof.c
index beaecefca..1647dd009 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -628,6 +628,8 @@ sds genAofTimestampAnnotationIfNeeded(int force) {
void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
sds buf = sdsempty();
+ serverAssert(dictid >= 0 && dictid < server.dbnum);
+
/* Feed timestamp if needed */
if (server.aof_timestamp_enabled) {
sds ts = genAofTimestampAnnotationIfNeeded(0);
diff --git a/src/blocked.c b/src/blocked.c
index a554e863b..ccab0e0e1 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -352,10 +352,6 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
monotime replyTimer;
elapsedStart(&replyTimer);
genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted);
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
- unblockClient(receiver);
- afterCommand(receiver);
- server.current_client = old_client;
/* Replicate the command. */
int argc = 2;
@@ -369,10 +365,15 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
argv[2] = count_obj;
argc++;
}
- propagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[1]);
if (count != -1) decrRefCount(argv[2]);
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
+ unblockClient(receiver);
+ afterCommand(receiver);
+ server.current_client = old_client;
+
/* The zset is empty and has been deleted. */
if (deleted) break;
}
@@ -474,7 +475,6 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
-
/* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer
* valid, so we must do the setup above before
@@ -532,7 +532,6 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
elapsedStart(&replyTimer);
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
-
moduleUnblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
@@ -562,6 +561,11 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
* be used only for a single type, like virtually any Redis application will
* do, the function is already fair. */
void handleClientsBlockedOnKeys(void) {
+ /* This function is called only when also_propagate is in its basic state
+ * (i.e. not from call(), module context, etc.) */
+ serverAssert(server.also_propagate.numops == 0);
+ server.core_propagates = 1;
+
while(listLength(server.ready_keys) != 0) {
list *l;
@@ -603,6 +607,11 @@ void handleClientsBlockedOnKeys(void) {
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
+ } else {
+ /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
+ * take care of the propagation here, because afterCommand wasn't called */
+ if (server.also_propagate.numops > 0)
+ propagatePendingCommands();
}
server.fixed_time_expire--;
@@ -613,6 +622,10 @@ void handleClientsBlockedOnKeys(void) {
}
listRelease(l); /* We have the new list on place at this point. */
}
+
+ serverAssert(server.core_propagates); /* This function should not be re-entrant */
+
+ server.core_propagates = 0;
}
/* This is how the current blocking lists/sorted sets/streams work, we use
diff --git a/src/config.c b/src/config.c
index 407cf7249..129770571 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2207,6 +2207,9 @@ static int updateMaxmemory(const char **err) {
}
performEvictions();
}
+ /* The function is called via 'CONFIG SET maxmemory', we don't want to propagate it
+ * because server.dirty might have been incremented by performEvictions() */
+ preventCommandPropagation(server.current_client);
return 1;
}
diff --git a/src/db.c b/src/db.c
index 8a28b78a8..ff7545e6a 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1466,7 +1466,7 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
- propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
+ propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
server.stat_expiredkeys++;
}
@@ -1477,8 +1477,18 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
* This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring
- * keys. */
-void propagateExpire(redisDb *db, robj *key, int lazy) {
+ * keys.
+ *
+ * This function may be called from:
+ * 1. Within call(): Example: Lazy-expire on key access.
+ * In this case the caller doesn't have to do anything
+ * because call() handles server.also_propagate(); or
+ * 2. Outside of call(): Example: Active-expire, eviction.
+ * In this the caller must remember to call
+ * propagatePendingCommands, preferably at the end of
+ * the deletion batch, so that DELs will be wrapped
+ * in MULTI/EXEC */
+void propagateDeletion(redisDb *db, robj *key, int lazy) {
robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del;
@@ -1490,7 +1500,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
* Even if module executed a command without asking for propagation. */
int prev_replication_allowed = server.replication_allowed;
server.replication_allowed = 1;
- propagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
server.replication_allowed = prev_replication_allowed;
decrRefCount(argv[0]);
diff --git a/src/evict.c b/src/evict.c
index a10c2d20e..6e2f8af72 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -484,6 +484,10 @@ static int isSafeToPerformEvictions(void) {
* expires and evictions of keys not being performed. */
if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;
+ /* We cannot evict if we already have stuff to propagate (for example,
+ * CONFIG SET maxmemory inside a MULTI/EXEC) */
+ if (server.also_propagate.numops != 0) return 0;
+
return 1;
}
@@ -561,6 +565,13 @@ int performEvictions(void) {
monotime evictionTimer;
elapsedStart(&evictionTimer);
+ /* Unlike active-expire and blocked client, we can reach here from 'CONFIG SET maxmemory'
+ * so we have to back-up and restore server.core_propagates. */
+ int prev_core_propagates = server.core_propagates;
+ serverAssert(server.also_propagate.numops == 0);
+ server.core_propagates = 1;
+ server.propagate_no_multi = 1;
+
while (mem_freed < (long long)mem_tofree) {
int j, k, i;
static unsigned int next_db = 0;
@@ -648,7 +659,6 @@ int performEvictions(void) {
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
- propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
@@ -673,6 +683,7 @@ int performEvictions(void) {
signalModifiedKey(NULL,db,keyobj);
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
+ propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
decrRefCount(keyobj);
keys_freed++;
@@ -729,6 +740,14 @@ cant_free:
}
}
+ serverAssert(server.core_propagates); /* This function should not be re-entrant */
+
+ /* Propagate all DELs */
+ propagatePendingCommands();
+
+ server.core_propagates = prev_core_propagates;
+ server.propagate_no_multi = 0;
+
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
diff --git a/src/expire.c b/src/expire.c
index 6be5d8ff3..059d0c344 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -182,6 +182,12 @@ void activeExpireCycle(int type) {
long total_sampled = 0;
long total_expired = 0;
+ /* Sanity: There can't be any pending commands to propagate when
+ * we're in cron */
+ serverAssert(server.also_propagate.numops == 0);
+ server.core_propagates = 1;
+ server.propagate_no_multi = 1;
+
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
unsigned long expired, sampled;
@@ -302,6 +308,14 @@ void activeExpireCycle(int type) {
(expired*100/sampled) > config_cycle_acceptable_stale);
}
+ serverAssert(server.core_propagates); /* This function should not be re-entrant */
+
+ /* Propagate all DELs */
+ propagatePendingCommands();
+
+ server.core_propagates = 0;
+ server.propagate_no_multi = 0;
+
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
diff --git a/src/module.c b/src/module.c
index 371dee335..81068556d 100644
--- a/src/module.c
+++ b/src/module.c
@@ -158,22 +158,16 @@ struct RedisModuleCtx {
getKeysResult *keys_result;
struct RedisModulePoolAllocBlock *pa_head;
- redisOpArray saved_oparray; /* When propagating commands in a callback
- we reallocate the "also propagate" op
- array. Here we save the old one to
- restore it later. */
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}}
+#define REDISMODULE_CTX_NONE (0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<0)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<1)
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<2)
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3)
#define REDISMODULE_CTX_THREAD_SAFE (1<<4)
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5)
-#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6)
-#define REDISMODULE_CTX_MULTI_EMITTED (1<<7)
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
@@ -395,7 +389,6 @@ void RM_FreeCallReply(RedisModuleCallReply *reply);
void RM_CloseKey(RedisModuleKey *key);
void autoMemoryCollect(RedisModuleCtx *ctx);
robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *argvlenp, int *flags, va_list ap);
-void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
void RM_ZsetRangeStop(RedisModuleKey *kp);
static void zsetKeyReset(RedisModuleKey *key);
static void moduleInitKeyTypeSpecific(RedisModuleKey *key);
@@ -614,52 +607,14 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) {
return REDISMODULE_OK;
}
-/* Helper function for when a command callback is called, in order to handle
- * details needed to correctly replicate commands. */
-void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
- client *c = ctx->client;
-
- /* We don't need to do anything here if the context was never used
- * in order to propagate commands. */
- if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;
-
- /* We don't need to do anything here if the server isn't inside
- * a transaction. */
- if (!server.propagate_in_transaction) return;
-
- /* If this command is executed from with Lua or MULTI/EXEC we do not
- * need to propagate EXEC */
- if (server.in_script || server.in_exec) return;
-
- /* Handle the replication of the final EXEC, since whatever a command
- * emits is always wrapped around MULTI/EXEC. */
- alsoPropagate(c->db->id,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
- afterPropagateExec();
-
- /* If this is not a module command context (but is instead a simple
- * callback context), we have to handle directly the "also propagate"
- * array and emit it. In a module command call this will be handled
- * directly by call(). */
- if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) &&
- server.also_propagate.numops)
- {
- for (int j = 0; j < server.also_propagate.numops; j++) {
- redisOp *rop = &server.also_propagate.ops[j];
- int target = rop->target;
- if (target)
- propagate(rop->dbid,rop->argv,rop->argc,target);
- }
- redisOpArrayFree(&server.also_propagate);
- /* Restore the previous oparray in case of nexted use of the API. */
- server.also_propagate = ctx->saved_oparray;
- /* We're done with saved_oparray, let's invalidate it. */
- redisOpArrayInit(&ctx->saved_oparray);
- }
-}
-
/* Free the context after the user function was called. */
void moduleFreeContext(RedisModuleCtx *ctx) {
- moduleHandlePropagationAfterCommandCallback(ctx);
+ if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE)) {
+ /* Modules take care of their own propagation, when we are
+ * outside of call() context (timers, events, etc.). */
+ if (--server.module_ctx_nesting == 0 && !server.core_propagates)
+ propagatePendingCommands();
+ }
autoMemoryCollect(ctx);
poolAllocRelease(ctx);
if (ctx->postponed_arrays) {
@@ -675,14 +630,29 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
}
+/* Create a module ctx and keep track of the nesting level.
+ *
+ * Note: When creating ctx for threads (RM_GetThreadSafeContext and
+ * RM_GetDetachedThreadSafeContext) we do not bump up the nesting level
+ * because we only need to track of nesting level in the main thread
+ * (only the main thread uses propagatePendingCommands) */
+void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags) {
+ memset(out_ctx, 0 ,sizeof(RedisModuleCtx));
+ out_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi;
+ out_ctx->module = module;
+ out_ctx->flags = ctx_flags;
+ if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) {
+ server.module_ctx_nesting++;
+ }
+}
+
/* This Redis command binds the normal Redis command invocation with commands
* exported by modules. */
void RedisModuleCommandDispatcher(client *c) {
RedisModuleCommand *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_NONE);
- ctx.flags |= REDISMODULE_CTX_MODULE_COMMAND_CALL;
- ctx.module = cp->module;
ctx.client = c;
cp->func(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
@@ -715,11 +685,8 @@ void RedisModuleCommandDispatcher(client *c) {
* "get keys" call by calling RedisModule_IsKeysPositionRequest(ctx). */
int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
RedisModuleCommand *cp = (void*)(unsigned long)cmd->getkeys_proc;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
-
- ctx.module = cp->module;
- ctx.client = NULL;
- ctx.flags |= REDISMODULE_CTX_KEYS_POS_REQUEST;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_KEYS_POS_REQUEST);
/* Initialize getKeysResult */
getKeysPrepareResult(result, MAX_KEYS_BUFFER);
@@ -2325,31 +2292,6 @@ int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
* ## Commands replication API
* -------------------------------------------------------------------------- */
-/* Helper function to replicate MULTI the first time we replicate something
- * in the context of a command execution. EXEC will be handled by the
- * RedisModuleCommandDispatcher() function. */
-void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
- /* Skip this if client explicitly wrap the command with MULTI, or if
- * the module command was called by a script. */
- if (server.in_script || server.in_exec) return;
- /* If we already emitted MULTI return ASAP. */
- if (server.propagate_in_transaction) return;
- /* If this is a thread safe context, we do not want to wrap commands
- * executed into MULTI/EXEC, they are executed as single commands
- * from an external client in essence. */
- if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return;
- /* If this is a callback context, and not a module command execution
- * context, we have to setup the op array for the "also propagate" API
- * so that RM_Replicate() will work. */
- if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
- serverAssert(ctx->saved_oparray.ops == NULL);
- ctx->saved_oparray = server.also_propagate;
- redisOpArrayInit(&server.also_propagate);
- }
- execCommandPropagateMulti(ctx->client->db->id);
- ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
-}
-
/* Replicate the specified command and arguments to slaves and AOF, as effect
* of execution of the calling command implementation.
*
@@ -2409,16 +2351,7 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
if (!(flags & REDISMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;
- /* Replicate! When we are in a threaded context, we want to just insert
- * the replicated command ASAP, since it is not clear when the context
- * will stop being used, so accumulating stuff does not make much sense,
- * nor we could easily use the alsoPropagate() API from threads. */
- if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
- propagate(ctx->client->db->id,argv,argc,target);
- } else {
- moduleReplicateMultiIfNeeded(ctx);
- alsoPropagate(ctx->client->db->id,argv,argc,target);
- }
+ alsoPropagate(ctx->client->db->id,argv,argc,target);
/* Release the argv. */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
@@ -5028,19 +4961,18 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
server.replication_allowed = replicate && server.replication_allowed;
/* Run the command */
- int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP;
+ int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_FROM_MODULE;
if (replicate) {
- /* If we are using single commands replication, we need to wrap what
- * we propagate into a MULTI/EXEC block, so that it will be atomic like
- * a Lua script in the context of AOF and slaves. */
- moduleReplicateMultiIfNeeded(ctx);
-
if (!(flags & REDISMODULE_ARGV_NO_AOF))
call_flags |= CMD_CALL_PROPAGATE_AOF;
if (!(flags & REDISMODULE_ARGV_NO_REPLICAS))
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
+ /* Set server.current_client */
+ client *old_client = server.current_client;
+ server.current_client = c;
call(c,call_flags);
+ server.current_client = old_client;
server.replication_allowed = prev_replication_allowed;
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
@@ -6007,11 +5939,8 @@ void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
if (io->ctx) return io->ctx; /* Can't have more than one... */
- RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT;
io->ctx = zmalloc(sizeof(RedisModuleCtx));
- *(io->ctx) = ctxtemplate;
- io->ctx->module = io->type->module;
- io->ctx->client = NULL;
+ moduleCreateContext(io->ctx, io->type->module, REDISMODULE_CTX_NONE);
return io->ctx;
}
@@ -6162,9 +6091,9 @@ void unblockClientFromModule(client *c) {
* by the module itself or because of a timeout, so the callback will NOT
* get called if this is not an actual disconnection event. */
if (bc->disconnect_callback) {
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_NONE);
ctx.blocked_privdata = bc->privdata;
- ctx.module = bc->module;
ctx.client = bc->client;
bc->disconnect_callback(&ctx,bc);
moduleFreeContext(&ctx);
@@ -6274,11 +6203,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
* explicit call). See #6798. */
if (bc->unblocked) return 0;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_REPLY);
ctx.blocked_ready_key = key;
ctx.blocked_privdata = bc->privdata;
- ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
@@ -6521,11 +6449,10 @@ void moduleHandleBlockedClients(void) {
* the key was signaled as ready. */
uint64_t reply_us = 0;
if (c && !bc->blocked_on_keys && bc->reply_callback) {
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_REPLY);
ctx.blocked_privdata = bc->privdata;
ctx.blocked_ready_key = NULL;
- ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
monotime replyTimer;
@@ -6544,11 +6471,10 @@ void moduleHandleBlockedClients(void) {
/* Free privdata if any. */
if (bc->privdata && bc->free_privdata) {
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- if (c == NULL)
- ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED;
+ RedisModuleCtx ctx;
+ int ctx_flags = c == NULL ? REDISMODULE_CTX_BLOCKED_DISCONNECTED : REDISMODULE_CTX_NONE;
+ moduleCreateContext(&ctx, bc->module, ctx_flags);
ctx.blocked_privdata = bc->privdata;
- ctx.module = bc->module;
ctx.client = bc->client;
bc->free_privdata(&ctx,bc->privdata);
moduleFreeContext(&ctx);
@@ -6613,9 +6539,8 @@ void moduleBlockedClientTimedOut(client *c) {
* explicit call). See #6798. */
if (bc->unblocked) return;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
- ctx.module = bc->module;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT);
ctx.client = bc->client;
ctx.blocked_client = bc;
ctx.blocked_privdata = bc->privdata;
@@ -6694,19 +6619,16 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
* the module ID and thus be more useful for logging. */
RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
- RedisModuleCtx empty = REDISMODULE_CTX_INIT;
- memcpy(ctx,&empty,sizeof(empty));
- if (bc) {
- ctx->blocked_client = bc;
- ctx->module = bc->module;
- }
- ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
+ RedisModule *module = bc ? bc->module : NULL;
+ moduleCreateContext(ctx, module, REDISMODULE_CTX_THREAD_SAFE);
+
/* Even when the context is associated with a blocked client, we can't
* access it safely from another thread, so we create a fake client here
* in order to keep things like the currently selected database and similar
* things. */
ctx->client = createClient(NULL);
if (bc) {
+ ctx->blocked_client = bc;
selectDb(ctx->client,bc->dbid);
if (bc->client) {
ctx->client->id = bc->client->id;
@@ -6723,10 +6645,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
* a long term, for purposes such as logging. */
RedisModuleCtx *RM_GetDetachedThreadSafeContext(RedisModuleCtx *ctx) {
RedisModuleCtx *new_ctx = zmalloc(sizeof(*new_ctx));
- RedisModuleCtx empty = REDISMODULE_CTX_INIT;
- memcpy(new_ctx,&empty,sizeof(empty));
- new_ctx->module = ctx->module;
- new_ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
+ moduleCreateContext(new_ctx, ctx->module, REDISMODULE_CTX_THREAD_SAFE);
new_ctx->client = createClient(NULL);
return new_ctx;
}
@@ -6737,12 +6656,22 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
zfree(ctx);
}
+void moduleGILAfterLock() {
+ /* We should never get here if we already inside a module
+ * code block which already opened a context. */
+ serverAssert(server.module_ctx_nesting == 0);
+ /* Bump up the nesting level to prevent immediate propagation
+ * of possible RM_Call from th thread */
+ server.module_ctx_nesting++;
+}
+
/* Acquire the server lock before executing a thread safe API call.
* This is not needed for `RedisModule_Reply*` calls when there is
* a blocked client connected to the thread safe context. */
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
UNUSED(ctx);
moduleAcquireGIL();
+ moduleGILAfterLock();
}
/* Similar to RM_ThreadSafeContextLock but this function
@@ -6759,12 +6688,26 @@ int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
errno = res;
return REDISMODULE_ERR;
}
+ moduleGILAfterLock();
return REDISMODULE_OK;
}
+void moduleGILBeforeUnlock() {
+ /* We should never get here if we already inside a module
+ * code block which already opened a context, except
+ * the bump-up from moduleGILAcquired. */
+ serverAssert(server.module_ctx_nesting == 1);
+ /* Restore ctx_nesting and propagate pending commands
+ * (because it's u clear when thread safe contexts are
+ * released we have to propagate here). */
+ server.module_ctx_nesting--;
+ propagatePendingCommands();
+}
+
/* Release the server lock after a thread safe API call was executed. */
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
UNUSED(ctx);
+ moduleGILBeforeUnlock();
moduleReleaseGIL();
}
@@ -6885,8 +6828,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
/* Only notify subscribers on events matching they registration,
* and avoid subscribers triggering themselves */
if ((sub->event_mask & type) && sub->active == 0) {
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- ctx.module = sub->module;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_NONE);
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, dbid);
@@ -6948,8 +6891,8 @@ void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8
moduleClusterReceiver *r = clusterReceivers[type];
while(r) {
if (r->module_id == module_id) {
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- ctx.module = r->module;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, r->module, REDISMODULE_CTX_NONE);
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, 0);
r->callback(&ctx,sender_id,type,payload,len);
@@ -7220,9 +7163,8 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
expiretime = ntohu64(expiretime);
if (now >= expiretime) {
RedisModuleTimer *timer = ri.data;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
-
- ctx.module = timer->module;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, timer->module, REDISMODULE_CTX_NONE);
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, timer->dbid);
timer->callback(&ctx,timer->data);
@@ -9326,8 +9268,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value;
if (el->event.id == eid) {
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- ctx.module = el->module;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, el->module, REDISMODULE_CTX_NONE);
if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {
/* In the case of client changes, we're pushing the real client
@@ -9658,7 +9600,8 @@ void moduleUnregisterCommands(struct RedisModule *module) {
int moduleLoad(const char *path, void **module_argv, int module_argc) {
int (*onload)(void *, void **, int);
void *handle;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_NONE); /* We pass NULL since we don't have a module yet. */
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, 0);
@@ -9715,7 +9658,6 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_LOADED,
ctx.module);
-
moduleFreeContext(&ctx);
return C_OK;
}
@@ -9751,8 +9693,8 @@ int moduleUnload(sds name) {
int (*onunload)(void *);
onunload = (int (*)(void *))(unsigned long) dlsym(module->handle, "RedisModule_OnUnload");
if (onunload) {
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
- ctx.module = module;
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, module, REDISMODULE_CTX_NONE);
ctx.client = moduleFreeContextReusedClient;
int unload_status = onunload((void*)&ctx);
moduleFreeContext(&ctx);
diff --git a/src/multi.c b/src/multi.c
index 1c1a17ee7..a98195672 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -120,30 +120,6 @@ void discardCommand(client *c) {
addReply(c,shared.ok);
}
-void beforePropagateMulti() {
- /* Propagating MULTI */
- serverAssert(!server.propagate_in_transaction);
- server.propagate_in_transaction = 1;
-}
-
-void afterPropagateExec() {
- /* Propagating EXEC */
- serverAssert(server.propagate_in_transaction == 1);
- server.propagate_in_transaction = 0;
-}
-
-/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
- * implementation for more information. */
-void execCommandPropagateMulti(int dbid) {
- beforePropagateMulti();
- propagate(dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
-}
-
-void execCommandPropagateExec(int dbid) {
- propagate(dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
- afterPropagateExec();
-}
-
/* Aborts a transaction, with a specific error message.
* The transaction is always aborted with -EXECABORT so that the client knows
* the server exited the multi state, but the actual reason for the abort is
@@ -166,7 +142,6 @@ void execCommand(client *c) {
robj **orig_argv;
int orig_argc, orig_argv_len;
struct redisCommand *orig_cmd;
- int was_master = server.masterhost == NULL;
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
@@ -268,23 +243,6 @@ void execCommand(client *c) {
c->cmd = orig_cmd;
discardTransaction(c);
- /* Make sure the EXEC command will be propagated as well if MULTI
- * was already propagated. */
- if (server.propagate_in_transaction) {
- int is_master = server.masterhost == NULL;
- server.dirty++;
- /* If inside the MULTI/EXEC block this instance was suddenly
- * switched from master to slave (using the SLAVEOF command), the
- * initial MULTI was propagated into the replication backlog, but the
- * rest was not. We need to make sure to at least terminate the
- * backlog with the final EXEC. */
- if (server.repl_backlog && was_master && !is_master) {
- char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
- feedReplicationBuffer(execcmd,strlen(execcmd));
- }
- afterPropagateExec();
- }
-
server.in_exec = 0;
}
diff --git a/src/networking.c b/src/networking.c
index 7e5779b02..238af0b8f 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -3563,7 +3563,7 @@ void pauseClients(mstime_t end, pause_type type) {
/* We allow write commands that were queued
* up before and after to execute. We need
* to track this state so that we don't assert
- * in propagate(). */
+ * in propagateNow(). */
if (server.in_exec) {
server.client_pause_in_transaction = 1;
}
diff --git a/src/replication.c b/src/replication.c
index 421f0be40..ade3ca1c9 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -421,6 +421,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];
+ /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we
+ * pass dbid=server.slaveseldb which may be -1. */
+ serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));
+
/* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to
* propagate *identical* replication stream. In this way this slave can
@@ -456,8 +460,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
+
+ server.slaveseldb = dictid;
}
- server.slaveseldb = dictid;
/* Write the command to the replication buffer if any. */
char aux[LONG_STR_SIZE+3];
diff --git a/src/script.c b/src/script.c
index fdafadae8..0c46beea6 100644
--- a/src/script.c
+++ b/src/script.c
@@ -148,11 +148,7 @@ void scriptResetRun(scriptRunCtx *run_ctx) {
unprotectClient(run_ctx->original_client);
}
- /* emit EXEC if MULTI has been propagated. */
preventCommandPropagation(run_ctx->original_client);
- if (run_ctx->flags & SCRIPT_MULTI_EMMITED) {
- execCommandPropagateExec(run_ctx->original_client->db->id);
- }
/* unset curr_run_ctx so we will know there is no running script */
curr_run_ctx = NULL;
@@ -332,25 +328,6 @@ static int scriptVerifyClusterState(client *c, client *original_c, sds *err) {
return C_OK;
}
-static void scriptEmitMultiIfNeeded(scriptRunCtx *run_ctx) {
- /* If we are using single commands replication, we need to wrap what
- * we propagate into a MULTI/EXEC block, so that it will be atomic like
- * a Lua script in the context of AOF and slaves. */
- client *c = run_ctx->c;
- if (!(run_ctx->flags & SCRIPT_MULTI_EMMITED)
- && !(run_ctx->original_client->flags & CLIENT_MULTI)
- && (run_ctx->flags & SCRIPT_WRITE_DIRTY)
- && ((run_ctx->repl_flags & PROPAGATE_AOF)
- || (run_ctx->repl_flags & PROPAGATE_REPL)))
- {
- execCommandPropagateMulti(run_ctx->original_client->db->id);
- run_ctx->flags |= SCRIPT_MULTI_EMMITED;
- /* Now we are in the MULTI context, the lua_client should be
- * flag as CLIENT_MULTI. */
- c->flags |= CLIENT_MULTI;
- }
-}
-
/* set RESP for a given run_ctx */
int scriptSetResp(scriptRunCtx *run_ctx, int resp) {
if (resp != 2 && resp != 3) {
@@ -423,8 +400,6 @@ void scriptCall(scriptRunCtx *run_ctx, robj* *argv, int argc, sds *err) {
return;
}
- scriptEmitMultiIfNeeded(run_ctx);
-
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (run_ctx->repl_flags & PROPAGATE_AOF) {
call_flags |= CMD_CALL_PROPAGATE_AOF;
diff --git a/src/script.h b/src/script.h
index cfe6abaff..b2e253ee0 100644
--- a/src/script.h
+++ b/src/script.h
@@ -59,12 +59,9 @@
/* runCtx flags */
#define SCRIPT_WRITE_DIRTY (1ULL<<0) /* indicate that the current script already performed a write command */
-
-#define SCRIPT_MULTI_EMMITED (1ULL<<2) /* indicate that we already wrote a multi command to replication/aof */
#define SCRIPT_TIMEDOUT (1ULL<<3) /* indicate that the current script timedout */
#define SCRIPT_KILLED (1ULL<<4) /* indicate that the current script was marked to be killed */
#define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */
-
#define SCRIPT_EVAL_MODE (1ULL<<7) /* Indicate that the current script called from legacy Lua */
typedef struct scriptRunCtx scriptRunCtx;
diff --git a/src/server.c b/src/server.c
index bfc58e0ed..e1f7919b7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2314,7 +2314,9 @@ void initServer(void) {
server.cronloops = 0;
server.in_script = 0;
server.in_exec = 0;
- server.propagate_in_transaction = 0;
+ server.core_propagates = 0;
+ server.propagate_no_multi = 0;
+ server.module_ctx_nesting = 0;
server.client_pause_in_transaction = 0;
server.child_pid = -1;
server.child_type = CHILD_TYPE_NONE;
@@ -2633,12 +2635,21 @@ void resetErrorTableStats(void) {
void redisOpArrayInit(redisOpArray *oa) {
oa->ops = NULL;
oa->numops = 0;
+ oa->capacity = 0;
}
int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) {
redisOp *op;
+ int prev_capacity = oa->capacity;
- oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
+ if (oa->numops == 0) {
+ oa->capacity = 16;
+ } else if (oa->numops >= oa->capacity) {
+ oa->capacity *= 2;
+ }
+
+ if (prev_capacity != oa->capacity)
+ oa->ops = zrealloc(oa->ops,sizeof(redisOp)*oa->capacity);
op = oa->ops+oa->numops;
op->dbid = dbid;
op->argv = argv;
@@ -2660,7 +2671,7 @@ void redisOpArrayFree(redisOpArray *oa) {
zfree(op->argv);
}
zfree(oa->ops);
- oa->ops = NULL;
+ redisOpArrayInit(oa);
}
/* ====================== Commands lookup and execution ===================== */
@@ -2735,6 +2746,22 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) {
return cmd;
}
+static int shouldPropagate(int target) {
+ if (!server.replication_allowed || target == PROPAGATE_NONE || server.loading)
+ return 0;
+
+ if (target & PROPAGATE_AOF) {
+ if (server.aof_state != AOF_OFF)
+ return 1;
+ }
+ if (target & PROPAGATE_REPL) {
+ if (server.masterhost == NULL && (server.repl_backlog || listLength(server.slaves) != 0))
+ return 1;
+ }
+
+ return 0;
+}
+
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
@@ -2743,33 +2770,21 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) {
* + PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + PROPAGATE_REPL (propagate into the replication link)
*
- * This should not be used inside commands implementation since it will not
- * wrap the resulting commands in MULTI/EXEC. Use instead alsoPropagate(),
- * preventCommandPropagation(), forceCommandPropagation().
+ * This is an internal low-level function and should not be called!
*
- * However for functions that need to (also) propagate out of the context of a
- * command execution, for example when serving a blocked client, you
- * want to use propagate().
+ * The API for propagating commands is alsoPropagate().
*/
-void propagate(int dbid, robj **argv, int argc, int flags) {
- if (!server.replication_allowed)
+static void propagateNow(int dbid, robj **argv, int argc, int target) {
+ if (!shouldPropagate(target))
return;
- /* Propagate a MULTI request once we encounter the first command which
- * is a write command.
- * This way we'll deliver the MULTI/..../EXEC block as a whole and
- * both the AOF and the replication link will have the same consistency
- * and atomicity guarantees. */
- if (server.in_exec && !server.propagate_in_transaction)
- execCommandPropagateMulti(dbid);
-
/* This needs to be unreachable since the dataset should be fixed during
* client pause, otherwise data may be lost during a failover. */
serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction));
- if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
+ if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF)
feedAppendOnlyFile(dbid,argv,argc);
- if (flags & PROPAGATE_REPL)
+ if (target & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
@@ -2788,7 +2803,8 @@ void alsoPropagate(int dbid, robj **argv, int argc, int target) {
robj **argvcopy;
int j;
- if (server.loading) return; /* No propagation during loading. */
+ if (!shouldPropagate(target))
+ return;
argvcopy = zmalloc(sizeof(robj*)*argc);
for (j = 0; j < argc; j++) {
@@ -2837,6 +2853,46 @@ void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t dur
slowlogPushEntryIfNeeded(c,argv,argc,duration);
}
+/* Handle the alsoPropagate() API to handle commands that want to propagate
+ * multiple separated commands. Note that alsoPropagate() is not affected
+ * by CLIENT_PREVENT_PROP flag. */
+void propagatePendingCommands() {
+ if (server.also_propagate.numops == 0)
+ return;
+
+ int j;
+ redisOp *rop;
+ int multi_emitted = 0;
+
+ /* Wrap the commands in server.also_propagate array,
+ * but don't wrap it if we are already in MULTI context,
+ * in case the nested MULTI/EXEC.
+ *
+ * And if the array contains only one command, no need to
+ * wrap it, since the single command is atomic. */
+ if (server.also_propagate.numops > 1 && !server.propagate_no_multi) {
+ /* We use the first command-to-propagate to set the dbid for MULTI,
+ * so that the SELECT will be propagated beforehand */
+ int multi_dbid = server.also_propagate.ops[0].dbid;
+ propagateNow(multi_dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
+ multi_emitted = 1;
+ }
+
+ for (j = 0; j < server.also_propagate.numops; j++) {
+ rop = &server.also_propagate.ops[j];
+ serverAssert(rop->target);
+ propagateNow(rop->dbid,rop->argv,rop->argc,rop->target);
+ }
+
+ if (multi_emitted) {
+ /* We take the dbid from last command so that propagateNow() won't inject another SELECT */
+ int exec_dbid = server.also_propagate.ops[server.also_propagate.numops-1].dbid;
+ propagateNow(exec_dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
+ }
+
+ redisOpArrayFree(&server.also_propagate);
+}
+
/* Call() is the core of Redis execution of a command.
*
* The following flags can be passed:
@@ -2884,8 +2940,19 @@ void call(client *c, int flags) {
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
- redisOpArray prev_also_propagate = server.also_propagate;
- redisOpArrayInit(&server.also_propagate);
+
+ /* Redis core is in charge of propagation when the first entry point
+ * of call() is processCommand().
+ * The only other option to get to call() without having processCommand
+ * as an entry point is if a module triggers RM_Call outside of call()
+ * context (for example, in a timer).
+ * In that case, the module is in charge of propagation.
+ *
+ * Because call() is re-entrant we have to cache and restore
+ * server.core_propagates. */
+ int prev_core_propagates = server.core_propagates;
+ if (!server.core_propagates && !(flags & CMD_CALL_FROM_MODULE))
+ server.core_propagates = 1;
/* Call the command. */
dirty = server.dirty;
@@ -2974,9 +3041,14 @@ void call(client *c, int flags) {
real_cmd->calls++;
}
- /* Propagate the command into the AOF and replication link */
+ /* Propagate the command into the AOF and replication link.
+ * We never propagate EXEC explicitly, it will be implicitly
+ * propagated if needed (see propagatePendingCommands).
+ * Also, module commands take care of themselves */
if (flags & CMD_CALL_PROPAGATE &&
- (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
+ (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP &&
+ c->cmd->proc != execCommand &&
+ !(c->cmd->flags & CMD_MODULE))
{
int propagate_flags = PROPAGATE_NONE;
@@ -2999,11 +3071,10 @@ void call(client *c, int flags) {
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
- /* Call propagate() only if at least one of AOF / replication
- * propagation is needed. Note that modules commands handle replication
- * in an explicit way, so we never replicate them automatically. */
- if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
- propagate(c->db->id,c->argv,c->argc,propagate_flags);
+ /* Call alsoPropagate() only if at least one of AOF / replication
+ * propagation is needed. */
+ if (propagate_flags != PROPAGATE_NONE)
+ alsoPropagate(c->db->id,c->argv,c->argc,propagate_flags);
}
/* Restore the old replication flags, since call() can be executed
@@ -3012,54 +3083,6 @@ void call(client *c, int flags) {
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
- /* Handle the alsoPropagate() API to handle commands that want to propagate
- * multiple separated commands. Note that alsoPropagate() is not affected
- * by CLIENT_PREVENT_PROP flag. */
- if (server.also_propagate.numops) {
- int j;
- redisOp *rop;
-
- if (flags & CMD_CALL_PROPAGATE) {
- int multi_emitted = 0;
- /* Wrap the commands in server.also_propagate array,
- * but don't wrap it if we are already in MULTI context,
- * in case the nested MULTI/EXEC.
- *
- * And if the array contains only one command, no need to
- * wrap it, since the single command is atomic. */
- if (server.also_propagate.numops > 1 &&
- !(c->cmd->flags & CMD_MODULE) &&
- !(c->flags & CLIENT_MULTI) &&
- !(flags & CMD_CALL_NOWRAP))
- {
- execCommandPropagateMulti(c->db->id);
- multi_emitted = 1;
- }
-
- for (j = 0; j < server.also_propagate.numops; j++) {
- rop = &server.also_propagate.ops[j];
- int target = rop->target;
- /* Whatever the command wish is, we honor the call() flags. */
- if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
- if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
- if (target)
- propagate(rop->dbid,rop->argv,rop->argc,target);
- }
-
- if (multi_emitted) {
- execCommandPropagateExec(c->db->id);
- }
- }
- redisOpArrayFree(&server.also_propagate);
- }
- server.also_propagate = prev_also_propagate;
-
- /* Client pause takes effect after a transaction has finished. This needs
- * to be located after everything is propagated. */
- if (!server.in_exec && server.client_pause_in_transaction) {
- server.client_pause_in_transaction = 0;
- }
-
/* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. */
if (c->cmd->flags & CMD_READONLY) {
@@ -3084,6 +3107,14 @@ void call(client *c, int flags) {
/* Do some maintenance job and cleanup */
afterCommand(c);
+
+ /* Client pause takes effect after a transaction has finished. This needs
+ * to be located after everything is propagated. */
+ if (!server.in_exec && server.client_pause_in_transaction) {
+ server.client_pause_in_transaction = 0;
+ }
+
+ server.core_propagates = prev_core_propagates;
}
/* Used when a command that is ready for execution needs to be rejected, due to
@@ -3124,9 +3155,16 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
/* This is called after a command in call, we can do some maintenance job in it. */
void afterCommand(client *c) {
UNUSED(c);
- /* Flush pending invalidation messages only when we are not in nested call.
- * So the messages are not interleaved with transaction response. */
- if (!server.in_nested_call) trackingHandlePendingKeyInvalidations();
+ if (!server.in_nested_call) {
+ /* If we are at the top-most call() we can propagate what we accumulated.
+ * Should be done before trackingHandlePendingKeyInvalidations so that we
+ * reply to client before invalidating cache (makes more sense) */
+ if (server.core_propagates)
+ propagatePendingCommands();
+ /* Flush pending invalidation messages only when we are not in nested call.
+ * So the messages are not interleaved with transaction response. */
+ trackingHandlePendingKeyInvalidations();
+ }
}
/* Returns 1 for commands that may have key names in their arguments, but the legacy range
@@ -3167,10 +3205,9 @@ void populateCommandMovableKeys(struct redisCommand *cmd) {
int processCommand(client *c) {
if (!scriptIsTimedout()) {
/* Both EXEC and EVAL call call() directly so there should be
- * no way in_exec or in_eval or propagate_in_transaction is 1.
+ * no way in_exec or in_eval is 1.
* That is unless lua_timedout, in which case client may run
* some commands. */
- serverAssert(!server.propagate_in_transaction);
serverAssert(!server.in_exec);
serverAssert(!server.in_script);
}
diff --git a/src/server.h b/src/server.h
index 31506a773..8e4d8d902 100644
--- a/src/server.h
+++ b/src/server.h
@@ -489,10 +489,9 @@ typedef enum {
#define CMD_CALL_PROPAGATE_REPL (1<<3)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
-#define CMD_CALL_NOWRAP (1<<4) /* Don't wrap also propagate array into
- MULTI/EXEC: the caller will handle it. */
+#define CMD_CALL_FROM_MODULE (1<<4) /* From RM_Call */
-/* Command propagation flags, see propagate() function */
+/* Command propagation flags, see propagateNow() function */
#define PROPAGATE_NONE 0
#define PROPAGATE_AOF 1
#define PROPAGATE_REPL 2
@@ -1212,6 +1211,7 @@ typedef struct redisOp {
typedef struct redisOpArray {
redisOp *ops;
int numops;
+ int capacity;
} redisOpArray;
/* This structure is returned by the getMemoryOverheadData() function in
@@ -1358,9 +1358,11 @@ struct redisServer {
int sentinel_mode; /* True if this instance is a Sentinel. */
size_t initial_memory_usage; /* Bytes used after initialization. */
int always_show_logo; /* Show logo even for non-stdout logging. */
- int in_script; /* Are we inside EVAL? */
+ int in_script; /* Are we inside EVAL? */
int in_exec; /* Are we inside EXEC? */
- int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */
+ int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */
+ int propagate_no_multi; /* True if propagatePendingCommands should avoid wrapping command in MULTI/EXEC */
+ int module_ctx_nesting; /* moduleCreateContext() nesting level */
char *ignore_warnings; /* Config: warnings that should be ignored. */
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
int thp_enabled; /* If true, THP is enabled. */
@@ -2420,10 +2422,6 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with);
void discardTransaction(client *c);
void flagTransaction(client *c);
void execCommandAbort(client *c, sds error);
-void execCommandPropagateMulti(int dbid);
-void execCommandPropagateExec(int dbid);
-void beforePropagateMulti();
-void afterPropagateExec();
/* Redis object implementation */
void decrRefCount(robj *o);
@@ -2695,8 +2693,8 @@ struct redisCommand *lookupCommandByCStringLogic(dict *commands, const char *s);
struct redisCommand *lookupCommandByCString(const char *s);
struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc);
void call(client *c, int flags);
-void propagate(int dbid, robj **argv, int argc, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
+void propagatePendingCommands();
void redisOpArrayInit(redisOpArray *oa);
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
@@ -2812,7 +2810,7 @@ int allowProtectedAction(int config, client *c);
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj);
-void propagateExpire(redisDb *db, robj *key, int lazy);
+void propagateDeletion(redisDb *db, robj *key, int lazy);
int keyIsExpired(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj *key);
void setExpire(client *c, redisDb *db, robj *key, long long when);
diff --git a/src/t_list.c b/src/t_list.c
index 84855ebe2..460437853 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -960,7 +960,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
serverAssert(llen > 0);
argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count);
- propagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[2]);
/* Pop a range of elements in a nested arrays way. */
@@ -968,7 +968,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
return;
}
- propagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL);
/* BRPOP/BLPOP */
value = listTypePop(o, wherefrom);
@@ -999,7 +999,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
argv[2] = dstkey;
argv[3] = getStringObjectFromListPosition(wherefrom);
argv[4] = getStringObjectFromListPosition(whereto);
- propagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL);
/* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",
diff --git a/src/t_stream.c b/src/t_stream.c
index 14dd2bd02..c49889abe 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1399,11 +1399,8 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
argv[12] = shared.lastid;
argv[13] = createObjectFromStreamID(&group->last_id);
- /* We use propagate() because this code path is not always called from
- * the command execution context. Moreover this will just alter the
- * consumer group state, and we don't need MULTI/EXEC wrapping because
- * there is no message state cross-message atomicity required. */
- propagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
+
decrRefCount(argv[3]);
decrRefCount(argv[7]);
decrRefCount(argv[9]);
@@ -1424,11 +1421,8 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
- /* We use propagate() because this code path is not always called from
- * the command execution context. Moreover this will just alter the
- * consumer group state, and we don't need MULTI/EXEC wrapping because
- * there is no message state cross-message atomicity required. */
- propagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+
decrRefCount(argv[4]);
}
@@ -1446,11 +1440,8 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
argv[3] = groupname;
argv[4] = createObject(OBJ_STRING,sdsdup(consumername));
- /* We use propagate() because this code path is not always called from
- * the command execution context. Moreover this will just alter the
- * consumer group state, and we don't need MULTI/EXEC wrapping because
- * there is no message state cross-message atomicity required. */
- propagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+
decrRefCount(argv[4]);
}