diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/aof.c | 2 | ||||
-rw-r--r-- | src/blocked.c | 27 | ||||
-rw-r--r-- | src/config.c | 3 | ||||
-rw-r--r-- | src/db.c | 18 | ||||
-rw-r--r-- | src/evict.c | 21 | ||||
-rw-r--r-- | src/expire.c | 14 | ||||
-rw-r--r-- | src/module.c | 230 | ||||
-rw-r--r-- | src/multi.c | 42 | ||||
-rw-r--r-- | src/networking.c | 2 | ||||
-rw-r--r-- | src/replication.c | 7 | ||||
-rw-r--r-- | src/script.c | 25 | ||||
-rw-r--r-- | src/script.h | 3 | ||||
-rw-r--r-- | src/server.c | 205 | ||||
-rw-r--r-- | src/server.h | 20 | ||||
-rw-r--r-- | src/t_list.c | 6 | ||||
-rw-r--r-- | src/t_stream.c | 21 |
16 files changed, 305 insertions, 341 deletions
@@ -628,6 +628,8 @@ sds genAofTimestampAnnotationIfNeeded(int force) { void feedAppendOnlyFile(int dictid, robj **argv, int argc) { sds buf = sdsempty(); + serverAssert(dictid >= 0 && dictid < server.dbnum); + /* Feed timestamp if needed */ if (server.aof_timestamp_enabled) { sds ts = genAofTimestampAnnotationIfNeeded(0); diff --git a/src/blocked.c b/src/blocked.c index a554e863b..ccab0e0e1 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -352,10 +352,6 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { monotime replyTimer; elapsedStart(&replyTimer); genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); - unblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; /* Replicate the command. */ int argc = 2; @@ -369,10 +365,15 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { argv[2] = count_obj; argc++; } - propagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[1]); if (count != -1) decrRefCount(argv[2]); + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + unblockClient(receiver); + afterCommand(receiver); + server.current_client = old_client; + /* The zset is empty and has been deleted. */ if (deleted) break; } @@ -474,7 +475,6 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { receiver->bpop.xread_count, 0, group, consumer, noack, &pi); updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); - /* Note that after we unblock the client, 'gt' * and other receiver->bpop stuff are no longer * valid, so we must do the setup above before @@ -532,7 +532,6 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { elapsedStart(&replyTimer); if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); - moduleUnblockClient(receiver); afterCommand(receiver); server.current_client = old_client; @@ -562,6 +561,11 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * be used only for a single type, like virtually any Redis application will * do, the function is already fair. */ void handleClientsBlockedOnKeys(void) { + /* This function is called only when also_propagate is in its basic state + * (i.e. not from call(), module context, etc.) */ + serverAssert(server.also_propagate.numops == 0); + server.core_propagates = 1; + while(listLength(server.ready_keys) != 0) { list *l; @@ -603,6 +607,11 @@ void handleClientsBlockedOnKeys(void) { * regardless of the object type: we don't know what the * module is trying to accomplish right now. */ serveClientsBlockedOnKeyByModule(rl); + } else { + /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to + * take care of the propagation here, because afterCommand wasn't called */ + if (server.also_propagate.numops > 0) + propagatePendingCommands(); } server.fixed_time_expire--; @@ -613,6 +622,10 @@ void handleClientsBlockedOnKeys(void) { } listRelease(l); /* We have the new list on place at this point. */ } + + serverAssert(server.core_propagates); /* This function should not be re-entrant */ + + server.core_propagates = 0; } /* This is how the current blocking lists/sorted sets/streams work, we use diff --git a/src/config.c b/src/config.c index 407cf7249..129770571 100644 --- a/src/config.c +++ b/src/config.c @@ -2207,6 +2207,9 @@ static int updateMaxmemory(const char **err) { } performEvictions(); } + /* The function is called via 'CONFIG SET maxmemory', we don't want to propagate it + * because server.dirty might have been incremented by performEvictions() */ + preventCommandPropagation(server.current_client); return 1; } @@ -1466,7 +1466,7 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { latencyAddSampleIfNeeded("expire-del",expire_latency); notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); signalModifiedKey(NULL, db, keyobj); - propagateExpire(db,keyobj,server.lazyfree_lazy_expire); + propagateDeletion(db,keyobj,server.lazyfree_lazy_expire); server.stat_expiredkeys++; } @@ -1477,8 +1477,18 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { * This way the key expiry is centralized in one place, and since both * AOF and the master->slave link guarantee operation ordering, everything * will be consistent even if we allow write operations against expiring - * keys. */ -void propagateExpire(redisDb *db, robj *key, int lazy) { + * keys. + * + * This function may be called from: + * 1. Within call(): Example: Lazy-expire on key access. + * In this case the caller doesn't have to do anything + * because call() handles server.also_propagate(); or + * 2. Outside of call(): Example: Active-expire, eviction. + * In this the caller must remember to call + * propagatePendingCommands, preferably at the end of + * the deletion batch, so that DELs will be wrapped + * in MULTI/EXEC */ +void propagateDeletion(redisDb *db, robj *key, int lazy) { robj *argv[2]; argv[0] = lazy ? shared.unlink : shared.del; @@ -1490,7 +1500,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { * Even if module executed a command without asking for propagation. */ int prev_replication_allowed = server.replication_allowed; server.replication_allowed = 1; - propagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); server.replication_allowed = prev_replication_allowed; decrRefCount(argv[0]); diff --git a/src/evict.c b/src/evict.c index a10c2d20e..6e2f8af72 100644 --- a/src/evict.c +++ b/src/evict.c @@ -484,6 +484,10 @@ static int isSafeToPerformEvictions(void) { * expires and evictions of keys not being performed. */ if (checkClientPauseTimeoutAndReturnIfPaused()) return 0; + /* We cannot evict if we already have stuff to propagate (for example, + * CONFIG SET maxmemory inside a MULTI/EXEC) */ + if (server.also_propagate.numops != 0) return 0; + return 1; } @@ -561,6 +565,13 @@ int performEvictions(void) { monotime evictionTimer; elapsedStart(&evictionTimer); + /* Unlike active-expire and blocked client, we can reach here from 'CONFIG SET maxmemory' + * so we have to back-up and restore server.core_propagates. */ + int prev_core_propagates = server.core_propagates; + serverAssert(server.also_propagate.numops == 0); + server.core_propagates = 1; + server.propagate_no_multi = 1; + while (mem_freed < (long long)mem_tofree) { int j, k, i; static unsigned int next_db = 0; @@ -648,7 +659,6 @@ int performEvictions(void) { if (bestkey) { db = server.db+bestdbid; robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); - propagateExpire(db,keyobj,server.lazyfree_lazy_eviction); /* We compute the amount of memory freed by db*Delete() alone. * It is possible that actually the memory needed to propagate * the DEL in AOF and replication link is greater than the one @@ -673,6 +683,7 @@ int performEvictions(void) { signalModifiedKey(NULL,db,keyobj); notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db->id); + propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction); decrRefCount(keyobj); keys_freed++; @@ -729,6 +740,14 @@ cant_free: } } + serverAssert(server.core_propagates); /* This function should not be re-entrant */ + + /* Propagate all DELs */ + propagatePendingCommands(); + + server.core_propagates = prev_core_propagates; + server.propagate_no_multi = 0; + latencyEndMonitor(latency); latencyAddSampleIfNeeded("eviction-cycle",latency); diff --git a/src/expire.c b/src/expire.c index 6be5d8ff3..059d0c344 100644 --- a/src/expire.c +++ b/src/expire.c @@ -182,6 +182,12 @@ void activeExpireCycle(int type) { long total_sampled = 0; long total_expired = 0; + /* Sanity: There can't be any pending commands to propagate when + * we're in cron */ + serverAssert(server.also_propagate.numops == 0); + server.core_propagates = 1; + server.propagate_no_multi = 1; + for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { /* Expired and checked in a single loop. */ unsigned long expired, sampled; @@ -302,6 +308,14 @@ void activeExpireCycle(int type) { (expired*100/sampled) > config_cycle_acceptable_stale); } + serverAssert(server.core_propagates); /* This function should not be re-entrant */ + + /* Propagate all DELs */ + propagatePendingCommands(); + + server.core_propagates = 0; + server.propagate_no_multi = 0; + elapsed = ustime()-start; server.stat_expire_cycle_time_used += elapsed; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); diff --git a/src/module.c b/src/module.c index 371dee335..81068556d 100644 --- a/src/module.c +++ b/src/module.c @@ -158,22 +158,16 @@ struct RedisModuleCtx { getKeysResult *keys_result; struct RedisModulePoolAllocBlock *pa_head; - redisOpArray saved_oparray; /* When propagating commands in a callback - we reallocate the "also propagate" op - array. Here we save the old one to - restore it later. */ }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}} +#define REDISMODULE_CTX_NONE (0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<0) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<1) #define REDISMODULE_CTX_BLOCKED_REPLY (1<<2) #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3) #define REDISMODULE_CTX_THREAD_SAFE (1<<4) #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5) -#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6) -#define REDISMODULE_CTX_MULTI_EMITTED (1<<7) /* This represents a Redis key opened with RM_OpenKey(). */ struct RedisModuleKey { @@ -395,7 +389,6 @@ void RM_FreeCallReply(RedisModuleCallReply *reply); void RM_CloseKey(RedisModuleKey *key); void autoMemoryCollect(RedisModuleCtx *ctx); robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *argvlenp, int *flags, va_list ap); -void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx); void RM_ZsetRangeStop(RedisModuleKey *kp); static void zsetKeyReset(RedisModuleKey *key); static void moduleInitKeyTypeSpecific(RedisModuleKey *key); @@ -614,52 +607,14 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) { return REDISMODULE_OK; } -/* Helper function for when a command callback is called, in order to handle - * details needed to correctly replicate commands. */ -void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { - client *c = ctx->client; - - /* We don't need to do anything here if the context was never used - * in order to propagate commands. */ - if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return; - - /* We don't need to do anything here if the server isn't inside - * a transaction. */ - if (!server.propagate_in_transaction) return; - - /* If this command is executed from with Lua or MULTI/EXEC we do not - * need to propagate EXEC */ - if (server.in_script || server.in_exec) return; - - /* Handle the replication of the final EXEC, since whatever a command - * emits is always wrapped around MULTI/EXEC. */ - alsoPropagate(c->db->id,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL); - afterPropagateExec(); - - /* If this is not a module command context (but is instead a simple - * callback context), we have to handle directly the "also propagate" - * array and emit it. In a module command call this will be handled - * directly by call(). */ - if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) && - server.also_propagate.numops) - { - for (int j = 0; j < server.also_propagate.numops; j++) { - redisOp *rop = &server.also_propagate.ops[j]; - int target = rop->target; - if (target) - propagate(rop->dbid,rop->argv,rop->argc,target); - } - redisOpArrayFree(&server.also_propagate); - /* Restore the previous oparray in case of nexted use of the API. */ - server.also_propagate = ctx->saved_oparray; - /* We're done with saved_oparray, let's invalidate it. */ - redisOpArrayInit(&ctx->saved_oparray); - } -} - /* Free the context after the user function was called. */ void moduleFreeContext(RedisModuleCtx *ctx) { - moduleHandlePropagationAfterCommandCallback(ctx); + if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE)) { + /* Modules take care of their own propagation, when we are + * outside of call() context (timers, events, etc.). */ + if (--server.module_ctx_nesting == 0 && !server.core_propagates) + propagatePendingCommands(); + } autoMemoryCollect(ctx); poolAllocRelease(ctx); if (ctx->postponed_arrays) { @@ -675,14 +630,29 @@ void moduleFreeContext(RedisModuleCtx *ctx) { if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client); } +/* Create a module ctx and keep track of the nesting level. + * + * Note: When creating ctx for threads (RM_GetThreadSafeContext and + * RM_GetDetachedThreadSafeContext) we do not bump up the nesting level + * because we only need to track of nesting level in the main thread + * (only the main thread uses propagatePendingCommands) */ +void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags) { + memset(out_ctx, 0 ,sizeof(RedisModuleCtx)); + out_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi; + out_ctx->module = module; + out_ctx->flags = ctx_flags; + if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) { + server.module_ctx_nesting++; + } +} + /* This Redis command binds the normal Redis command invocation with commands * exported by modules. */ void RedisModuleCommandDispatcher(client *c) { RedisModuleCommand *cp = (void*)(unsigned long)c->cmd->getkeys_proc; - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_NONE); - ctx.flags |= REDISMODULE_CTX_MODULE_COMMAND_CALL; - ctx.module = cp->module; ctx.client = c; cp->func(&ctx,(void**)c->argv,c->argc); moduleFreeContext(&ctx); @@ -715,11 +685,8 @@ void RedisModuleCommandDispatcher(client *c) { * "get keys" call by calling RedisModule_IsKeysPositionRequest(ctx). */ int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { RedisModuleCommand *cp = (void*)(unsigned long)cmd->getkeys_proc; - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - - ctx.module = cp->module; - ctx.client = NULL; - ctx.flags |= REDISMODULE_CTX_KEYS_POS_REQUEST; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_KEYS_POS_REQUEST); /* Initialize getKeysResult */ getKeysPrepareResult(result, MAX_KEYS_BUFFER); @@ -2325,31 +2292,6 @@ int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { * ## Commands replication API * -------------------------------------------------------------------------- */ -/* Helper function to replicate MULTI the first time we replicate something - * in the context of a command execution. EXEC will be handled by the - * RedisModuleCommandDispatcher() function. */ -void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { - /* Skip this if client explicitly wrap the command with MULTI, or if - * the module command was called by a script. */ - if (server.in_script || server.in_exec) return; - /* If we already emitted MULTI return ASAP. */ - if (server.propagate_in_transaction) return; - /* If this is a thread safe context, we do not want to wrap commands - * executed into MULTI/EXEC, they are executed as single commands - * from an external client in essence. */ - if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return; - /* If this is a callback context, and not a module command execution - * context, we have to setup the op array for the "also propagate" API - * so that RM_Replicate() will work. */ - if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) { - serverAssert(ctx->saved_oparray.ops == NULL); - ctx->saved_oparray = server.also_propagate; - redisOpArrayInit(&server.also_propagate); - } - execCommandPropagateMulti(ctx->client->db->id); - ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED; -} - /* Replicate the specified command and arguments to slaves and AOF, as effect * of execution of the calling command implementation. * @@ -2409,16 +2351,7 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) if (!(flags & REDISMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF; if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL; - /* Replicate! When we are in a threaded context, we want to just insert - * the replicated command ASAP, since it is not clear when the context - * will stop being used, so accumulating stuff does not make much sense, - * nor we could easily use the alsoPropagate() API from threads. */ - if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) { - propagate(ctx->client->db->id,argv,argc,target); - } else { - moduleReplicateMultiIfNeeded(ctx); - alsoPropagate(ctx->client->db->id,argv,argc,target); - } + alsoPropagate(ctx->client->db->id,argv,argc,target); /* Release the argv. */ for (j = 0; j < argc; j++) decrRefCount(argv[j]); @@ -5028,19 +4961,18 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch server.replication_allowed = replicate && server.replication_allowed; /* Run the command */ - int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP; + int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_FROM_MODULE; if (replicate) { - /* If we are using single commands replication, we need to wrap what - * we propagate into a MULTI/EXEC block, so that it will be atomic like - * a Lua script in the context of AOF and slaves. */ - moduleReplicateMultiIfNeeded(ctx); - if (!(flags & REDISMODULE_ARGV_NO_AOF)) call_flags |= CMD_CALL_PROPAGATE_AOF; if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) call_flags |= CMD_CALL_PROPAGATE_REPL; } + /* Set server.current_client */ + client *old_client = server.current_client; + server.current_client = c; call(c,call_flags); + server.current_client = old_client; server.replication_allowed = prev_replication_allowed; serverAssert((c->flags & CLIENT_BLOCKED) == 0); @@ -6007,11 +5939,8 @@ void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) { RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) { if (io->ctx) return io->ctx; /* Can't have more than one... */ - RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT; io->ctx = zmalloc(sizeof(RedisModuleCtx)); - *(io->ctx) = ctxtemplate; - io->ctx->module = io->type->module; - io->ctx->client = NULL; + moduleCreateContext(io->ctx, io->type->module, REDISMODULE_CTX_NONE); return io->ctx; } @@ -6162,9 +6091,9 @@ void unblockClientFromModule(client *c) { * by the module itself or because of a timeout, so the callback will NOT * get called if this is not an actual disconnection event. */ if (bc->disconnect_callback) { - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_NONE); ctx.blocked_privdata = bc->privdata; - ctx.module = bc->module; ctx.client = bc->client; bc->disconnect_callback(&ctx,bc); moduleFreeContext(&ctx); @@ -6274,11 +6203,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { * explicit call). See #6798. */ if (bc->unblocked) return 0; - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_REPLY); ctx.blocked_ready_key = key; ctx.blocked_privdata = bc->privdata; - ctx.module = bc->module; ctx.client = bc->client; ctx.blocked_client = bc; if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK) @@ -6521,11 +6449,10 @@ void moduleHandleBlockedClients(void) { * the key was signaled as ready. */ uint64_t reply_us = 0; if (c && !bc->blocked_on_keys && bc->reply_callback) { - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_REPLY); ctx.blocked_privdata = bc->privdata; ctx.blocked_ready_key = NULL; - ctx.module = bc->module; ctx.client = bc->client; ctx.blocked_client = bc; monotime replyTimer; @@ -6544,11 +6471,10 @@ void moduleHandleBlockedClients(void) { /* Free privdata if any. */ if (bc->privdata && bc->free_privdata) { - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - if (c == NULL) - ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED; + RedisModuleCtx ctx; + int ctx_flags = c == NULL ? REDISMODULE_CTX_BLOCKED_DISCONNECTED : REDISMODULE_CTX_NONE; + moduleCreateContext(&ctx, bc->module, ctx_flags); ctx.blocked_privdata = bc->privdata; - ctx.module = bc->module; ctx.client = bc->client; bc->free_privdata(&ctx,bc->privdata); moduleFreeContext(&ctx); @@ -6613,9 +6539,8 @@ void moduleBlockedClientTimedOut(client *c) { * explicit call). See #6798. */ if (bc->unblocked) return; - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT; - ctx.module = bc->module; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT); ctx.client = bc->client; ctx.blocked_client = bc; ctx.blocked_privdata = bc->privdata; @@ -6694,19 +6619,16 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) { * the module ID and thus be more useful for logging. */ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { RedisModuleCtx *ctx = zmalloc(sizeof(*ctx)); - RedisModuleCtx empty = REDISMODULE_CTX_INIT; - memcpy(ctx,&empty,sizeof(empty)); - if (bc) { - ctx->blocked_client = bc; - ctx->module = bc->module; - } - ctx->flags |= REDISMODULE_CTX_THREAD_SAFE; + RedisModule *module = bc ? bc->module : NULL; + moduleCreateContext(ctx, module, REDISMODULE_CTX_THREAD_SAFE); + /* Even when the context is associated with a blocked client, we can't * access it safely from another thread, so we create a fake client here * in order to keep things like the currently selected database and similar * things. */ ctx->client = createClient(NULL); if (bc) { + ctx->blocked_client = bc; selectDb(ctx->client,bc->dbid); if (bc->client) { ctx->client->id = bc->client->id; @@ -6723,10 +6645,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { * a long term, for purposes such as logging. */ RedisModuleCtx *RM_GetDetachedThreadSafeContext(RedisModuleCtx *ctx) { RedisModuleCtx *new_ctx = zmalloc(sizeof(*new_ctx)); - RedisModuleCtx empty = REDISMODULE_CTX_INIT; - memcpy(new_ctx,&empty,sizeof(empty)); - new_ctx->module = ctx->module; - new_ctx->flags |= REDISMODULE_CTX_THREAD_SAFE; + moduleCreateContext(new_ctx, ctx->module, REDISMODULE_CTX_THREAD_SAFE); new_ctx->client = createClient(NULL); return new_ctx; } @@ -6737,12 +6656,22 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } +void moduleGILAfterLock() { + /* We should never get here if we already inside a module + * code block which already opened a context. */ + serverAssert(server.module_ctx_nesting == 0); + /* Bump up the nesting level to prevent immediate propagation + * of possible RM_Call from th thread */ + server.module_ctx_nesting++; +} + /* Acquire the server lock before executing a thread safe API call. * This is not needed for `RedisModule_Reply*` calls when there is * a blocked client connected to the thread safe context. */ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { UNUSED(ctx); moduleAcquireGIL(); + moduleGILAfterLock(); } /* Similar to RM_ThreadSafeContextLock but this function @@ -6759,12 +6688,26 @@ int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) { errno = res; return REDISMODULE_ERR; } + moduleGILAfterLock(); return REDISMODULE_OK; } +void moduleGILBeforeUnlock() { + /* We should never get here if we already inside a module + * code block which already opened a context, except + * the bump-up from moduleGILAcquired. */ + serverAssert(server.module_ctx_nesting == 1); + /* Restore ctx_nesting and propagate pending commands + * (because it's u clear when thread safe contexts are + * released we have to propagate here). */ + server.module_ctx_nesting--; + propagatePendingCommands(); +} + /* Release the server lock after a thread safe API call was executed. */ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { UNUSED(ctx); + moduleGILBeforeUnlock(); moduleReleaseGIL(); } @@ -6885,8 +6828,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) /* Only notify subscribers on events matching they registration, * and avoid subscribers triggering themselves */ if ((sub->event_mask & type) && sub->active == 0) { - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.module = sub->module; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_NONE); ctx.client = moduleFreeContextReusedClient; selectDb(ctx.client, dbid); @@ -6948,8 +6891,8 @@ void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8 moduleClusterReceiver *r = clusterReceivers[type]; while(r) { if (r->module_id == module_id) { - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.module = r->module; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, r->module, REDISMODULE_CTX_NONE); ctx.client = moduleFreeContextReusedClient; selectDb(ctx.client, 0); r->callback(&ctx,sender_id,type,payload,len); @@ -7220,9 +7163,8 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client expiretime = ntohu64(expiretime); if (now >= expiretime) { RedisModuleTimer *timer = ri.data; - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - - ctx.module = timer->module; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, timer->module, REDISMODULE_CTX_NONE); ctx.client = moduleFreeContextReusedClient; selectDb(ctx.client, timer->dbid); timer->callback(&ctx,timer->data); @@ -9326,8 +9268,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { while((ln = listNext(&li))) { RedisModuleEventListener *el = ln->value; if (el->event.id == eid) { - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.module = el->module; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, el->module, REDISMODULE_CTX_NONE); if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) { /* In the case of client changes, we're pushing the real client @@ -9658,7 +9600,8 @@ void moduleUnregisterCommands(struct RedisModule *module) { int moduleLoad(const char *path, void **module_argv, int module_argc) { int (*onload)(void *, void **, int); void *handle; - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_NONE); /* We pass NULL since we don't have a module yet. */ ctx.client = moduleFreeContextReusedClient; selectDb(ctx.client, 0); @@ -9715,7 +9658,6 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, REDISMODULE_SUBEVENT_MODULE_LOADED, ctx.module); - moduleFreeContext(&ctx); return C_OK; } @@ -9751,8 +9693,8 @@ int moduleUnload(sds name) { int (*onunload)(void *); onunload = (int (*)(void *))(unsigned long) dlsym(module->handle, "RedisModule_OnUnload"); if (onunload) { - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.module = module; + RedisModuleCtx ctx; + moduleCreateContext(&ctx, module, REDISMODULE_CTX_NONE); ctx.client = moduleFreeContextReusedClient; int unload_status = onunload((void*)&ctx); moduleFreeContext(&ctx); diff --git a/src/multi.c b/src/multi.c index 1c1a17ee7..a98195672 100644 --- a/src/multi.c +++ b/src/multi.c @@ -120,30 +120,6 @@ void discardCommand(client *c) { addReply(c,shared.ok); } -void beforePropagateMulti() { - /* Propagating MULTI */ - serverAssert(!server.propagate_in_transaction); - server.propagate_in_transaction = 1; -} - -void afterPropagateExec() { - /* Propagating EXEC */ - serverAssert(server.propagate_in_transaction == 1); - server.propagate_in_transaction = 0; -} - -/* Send a MULTI command to all the slaves and AOF file. Check the execCommand - * implementation for more information. */ -void execCommandPropagateMulti(int dbid) { - beforePropagateMulti(); - propagate(dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL); -} - -void execCommandPropagateExec(int dbid) { - propagate(dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL); - afterPropagateExec(); -} - /* Aborts a transaction, with a specific error message. * The transaction is always aborted with -EXECABORT so that the client knows * the server exited the multi state, but the actual reason for the abort is @@ -166,7 +142,6 @@ void execCommand(client *c) { robj **orig_argv; int orig_argc, orig_argv_len; struct redisCommand *orig_cmd; - int was_master = server.masterhost == NULL; if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI"); @@ -268,23 +243,6 @@ void execCommand(client *c) { c->cmd = orig_cmd; discardTransaction(c); - /* Make sure the EXEC command will be propagated as well if MULTI - * was already propagated. */ - if (server.propagate_in_transaction) { - int is_master = server.masterhost == NULL; - server.dirty++; - /* If inside the MULTI/EXEC block this instance was suddenly - * switched from master to slave (using the SLAVEOF command), the - * initial MULTI was propagated into the replication backlog, but the - * rest was not. We need to make sure to at least terminate the - * backlog with the final EXEC. */ - if (server.repl_backlog && was_master && !is_master) { - char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; - feedReplicationBuffer(execcmd,strlen(execcmd)); - } - afterPropagateExec(); - } - server.in_exec = 0; } diff --git a/src/networking.c b/src/networking.c index 7e5779b02..238af0b8f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3563,7 +3563,7 @@ void pauseClients(mstime_t end, pause_type type) { /* We allow write commands that were queued * up before and after to execute. We need * to track this state so that we don't assert - * in propagate(). */ + * in propagateNow(). */ if (server.in_exec) { server.client_pause_in_transaction = 1; } diff --git a/src/replication.c b/src/replication.c index 421f0be40..ade3ca1c9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -421,6 +421,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { int j, len; char llstr[LONG_STR_SIZE]; + /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we + * pass dbid=server.slaveseldb which may be -1. */ + serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum)); + /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to * propagate *identical* replication stream. In this way this slave can @@ -456,8 +460,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); + + server.slaveseldb = dictid; } - server.slaveseldb = dictid; /* Write the command to the replication buffer if any. */ char aux[LONG_STR_SIZE+3]; diff --git a/src/script.c b/src/script.c index fdafadae8..0c46beea6 100644 --- a/src/script.c +++ b/src/script.c @@ -148,11 +148,7 @@ void scriptResetRun(scriptRunCtx *run_ctx) { unprotectClient(run_ctx->original_client); } - /* emit EXEC if MULTI has been propagated. */ preventCommandPropagation(run_ctx->original_client); - if (run_ctx->flags & SCRIPT_MULTI_EMMITED) { - execCommandPropagateExec(run_ctx->original_client->db->id); - } /* unset curr_run_ctx so we will know there is no running script */ curr_run_ctx = NULL; @@ -332,25 +328,6 @@ static int scriptVerifyClusterState(client *c, client *original_c, sds *err) { return C_OK; } -static void scriptEmitMultiIfNeeded(scriptRunCtx *run_ctx) { - /* If we are using single commands replication, we need to wrap what - * we propagate into a MULTI/EXEC block, so that it will be atomic like - * a Lua script in the context of AOF and slaves. */ - client *c = run_ctx->c; - if (!(run_ctx->flags & SCRIPT_MULTI_EMMITED) - && !(run_ctx->original_client->flags & CLIENT_MULTI) - && (run_ctx->flags & SCRIPT_WRITE_DIRTY) - && ((run_ctx->repl_flags & PROPAGATE_AOF) - || (run_ctx->repl_flags & PROPAGATE_REPL))) - { - execCommandPropagateMulti(run_ctx->original_client->db->id); - run_ctx->flags |= SCRIPT_MULTI_EMMITED; - /* Now we are in the MULTI context, the lua_client should be - * flag as CLIENT_MULTI. */ - c->flags |= CLIENT_MULTI; - } -} - /* set RESP for a given run_ctx */ int scriptSetResp(scriptRunCtx *run_ctx, int resp) { if (resp != 2 && resp != 3) { @@ -423,8 +400,6 @@ void scriptCall(scriptRunCtx *run_ctx, robj* *argv, int argc, sds *err) { return; } - scriptEmitMultiIfNeeded(run_ctx); - int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; if (run_ctx->repl_flags & PROPAGATE_AOF) { call_flags |= CMD_CALL_PROPAGATE_AOF; diff --git a/src/script.h b/src/script.h index cfe6abaff..b2e253ee0 100644 --- a/src/script.h +++ b/src/script.h @@ -59,12 +59,9 @@ /* runCtx flags */ #define SCRIPT_WRITE_DIRTY (1ULL<<0) /* indicate that the current script already performed a write command */ - -#define SCRIPT_MULTI_EMMITED (1ULL<<2) /* indicate that we already wrote a multi command to replication/aof */ #define SCRIPT_TIMEDOUT (1ULL<<3) /* indicate that the current script timedout */ #define SCRIPT_KILLED (1ULL<<4) /* indicate that the current script was marked to be killed */ #define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */ - #define SCRIPT_EVAL_MODE (1ULL<<7) /* Indicate that the current script called from legacy Lua */ typedef struct scriptRunCtx scriptRunCtx; diff --git a/src/server.c b/src/server.c index bfc58e0ed..e1f7919b7 100644 --- a/src/server.c +++ b/src/server.c @@ -2314,7 +2314,9 @@ void initServer(void) { server.cronloops = 0; server.in_script = 0; server.in_exec = 0; - server.propagate_in_transaction = 0; + server.core_propagates = 0; + server.propagate_no_multi = 0; + server.module_ctx_nesting = 0; server.client_pause_in_transaction = 0; server.child_pid = -1; server.child_type = CHILD_TYPE_NONE; @@ -2633,12 +2635,21 @@ void resetErrorTableStats(void) { void redisOpArrayInit(redisOpArray *oa) { oa->ops = NULL; oa->numops = 0; + oa->capacity = 0; } int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) { redisOp *op; + int prev_capacity = oa->capacity; - oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1)); + if (oa->numops == 0) { + oa->capacity = 16; + } else if (oa->numops >= oa->capacity) { + oa->capacity *= 2; + } + + if (prev_capacity != oa->capacity) + oa->ops = zrealloc(oa->ops,sizeof(redisOp)*oa->capacity); op = oa->ops+oa->numops; op->dbid = dbid; op->argv = argv; @@ -2660,7 +2671,7 @@ void redisOpArrayFree(redisOpArray *oa) { zfree(op->argv); } zfree(oa->ops); - oa->ops = NULL; + redisOpArrayInit(oa); } /* ====================== Commands lookup and execution ===================== */ @@ -2735,6 +2746,22 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) { return cmd; } +static int shouldPropagate(int target) { + if (!server.replication_allowed || target == PROPAGATE_NONE || server.loading) + return 0; + + if (target & PROPAGATE_AOF) { + if (server.aof_state != AOF_OFF) + return 1; + } + if (target & PROPAGATE_REPL) { + if (server.masterhost == NULL && (server.repl_backlog || listLength(server.slaves) != 0)) + return 1; + } + + return 0; +} + /* Propagate the specified command (in the context of the specified database id) * to AOF and Slaves. * @@ -2743,33 +2770,21 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) { * + PROPAGATE_AOF (propagate into the AOF file if is enabled) * + PROPAGATE_REPL (propagate into the replication link) * - * This should not be used inside commands implementation since it will not - * wrap the resulting commands in MULTI/EXEC. Use instead alsoPropagate(), - * preventCommandPropagation(), forceCommandPropagation(). + * This is an internal low-level function and should not be called! * - * However for functions that need to (also) propagate out of the context of a - * command execution, for example when serving a blocked client, you - * want to use propagate(). + * The API for propagating commands is alsoPropagate(). */ -void propagate(int dbid, robj **argv, int argc, int flags) { - if (!server.replication_allowed) +static void propagateNow(int dbid, robj **argv, int argc, int target) { + if (!shouldPropagate(target)) return; - /* Propagate a MULTI request once we encounter the first command which - * is a write command. - * This way we'll deliver the MULTI/..../EXEC block as a whole and - * both the AOF and the replication link will have the same consistency - * and atomicity guarantees. */ - if (server.in_exec && !server.propagate_in_transaction) - execCommandPropagateMulti(dbid); - /* This needs to be unreachable since the dataset should be fixed during * client pause, otherwise data may be lost during a failover. */ serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction)); - if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) + if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid,argv,argc); - if (flags & PROPAGATE_REPL) + if (target & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); } @@ -2788,7 +2803,8 @@ void alsoPropagate(int dbid, robj **argv, int argc, int target) { robj **argvcopy; int j; - if (server.loading) return; /* No propagation during loading. */ + if (!shouldPropagate(target)) + return; argvcopy = zmalloc(sizeof(robj*)*argc); for (j = 0; j < argc; j++) { @@ -2837,6 +2853,46 @@ void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t dur slowlogPushEntryIfNeeded(c,argv,argc,duration); } +/* Handle the alsoPropagate() API to handle commands that want to propagate + * multiple separated commands. Note that alsoPropagate() is not affected + * by CLIENT_PREVENT_PROP flag. */ +void propagatePendingCommands() { + if (server.also_propagate.numops == 0) + return; + + int j; + redisOp *rop; + int multi_emitted = 0; + + /* Wrap the commands in server.also_propagate array, + * but don't wrap it if we are already in MULTI context, + * in case the nested MULTI/EXEC. + * + * And if the array contains only one command, no need to + * wrap it, since the single command is atomic. */ + if (server.also_propagate.numops > 1 && !server.propagate_no_multi) { + /* We use the first command-to-propagate to set the dbid for MULTI, + * so that the SELECT will be propagated beforehand */ + int multi_dbid = server.also_propagate.ops[0].dbid; + propagateNow(multi_dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL); + multi_emitted = 1; + } + + for (j = 0; j < server.also_propagate.numops; j++) { + rop = &server.also_propagate.ops[j]; + serverAssert(rop->target); + propagateNow(rop->dbid,rop->argv,rop->argc,rop->target); + } + + if (multi_emitted) { + /* We take the dbid from last command so that propagateNow() won't inject another SELECT */ + int exec_dbid = server.also_propagate.ops[server.also_propagate.numops-1].dbid; + propagateNow(exec_dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL); + } + + redisOpArrayFree(&server.also_propagate); +} + /* Call() is the core of Redis execution of a command. * * The following flags can be passed: @@ -2884,8 +2940,19 @@ void call(client *c, int flags) { /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); - redisOpArray prev_also_propagate = server.also_propagate; - redisOpArrayInit(&server.also_propagate); + + /* Redis core is in charge of propagation when the first entry point + * of call() is processCommand(). + * The only other option to get to call() without having processCommand + * as an entry point is if a module triggers RM_Call outside of call() + * context (for example, in a timer). + * In that case, the module is in charge of propagation. + * + * Because call() is re-entrant we have to cache and restore + * server.core_propagates. */ + int prev_core_propagates = server.core_propagates; + if (!server.core_propagates && !(flags & CMD_CALL_FROM_MODULE)) + server.core_propagates = 1; /* Call the command. */ dirty = server.dirty; @@ -2974,9 +3041,14 @@ void call(client *c, int flags) { real_cmd->calls++; } - /* Propagate the command into the AOF and replication link */ + /* Propagate the command into the AOF and replication link. + * We never propagate EXEC explicitly, it will be implicitly + * propagated if needed (see propagatePendingCommands). + * Also, module commands take care of themselves */ if (flags & CMD_CALL_PROPAGATE && - (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) + (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP && + c->cmd->proc != execCommand && + !(c->cmd->flags & CMD_MODULE)) { int propagate_flags = PROPAGATE_NONE; @@ -2999,11 +3071,10 @@ void call(client *c, int flags) { !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; - /* Call propagate() only if at least one of AOF / replication - * propagation is needed. Note that modules commands handle replication - * in an explicit way, so we never replicate them automatically. */ - if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) - propagate(c->db->id,c->argv,c->argc,propagate_flags); + /* Call alsoPropagate() only if at least one of AOF / replication + * propagation is needed. */ + if (propagate_flags != PROPAGATE_NONE) + alsoPropagate(c->db->id,c->argv,c->argc,propagate_flags); } /* Restore the old replication flags, since call() can be executed @@ -3012,54 +3083,6 @@ void call(client *c, int flags) { c->flags |= client_old_flags & (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); - /* Handle the alsoPropagate() API to handle commands that want to propagate - * multiple separated commands. Note that alsoPropagate() is not affected - * by CLIENT_PREVENT_PROP flag. */ - if (server.also_propagate.numops) { - int j; - redisOp *rop; - - if (flags & CMD_CALL_PROPAGATE) { - int multi_emitted = 0; - /* Wrap the commands in server.also_propagate array, - * but don't wrap it if we are already in MULTI context, - * in case the nested MULTI/EXEC. - * - * And if the array contains only one command, no need to - * wrap it, since the single command is atomic. */ - if (server.also_propagate.numops > 1 && - !(c->cmd->flags & CMD_MODULE) && - !(c->flags & CLIENT_MULTI) && - !(flags & CMD_CALL_NOWRAP)) - { - execCommandPropagateMulti(c->db->id); - multi_emitted = 1; - } - - for (j = 0; j < server.also_propagate.numops; j++) { - rop = &server.also_propagate.ops[j]; - int target = rop->target; - /* Whatever the command wish is, we honor the call() flags. */ - if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; - if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; - if (target) - propagate(rop->dbid,rop->argv,rop->argc,target); - } - - if (multi_emitted) { - execCommandPropagateExec(c->db->id); - } - } - redisOpArrayFree(&server.also_propagate); - } - server.also_propagate = prev_also_propagate; - - /* Client pause takes effect after a transaction has finished. This needs - * to be located after everything is propagated. */ - if (!server.in_exec && server.client_pause_in_transaction) { - server.client_pause_in_transaction = 0; - } - /* If the client has keys tracking enabled for client side caching, * make sure to remember the keys it fetched via this command. */ if (c->cmd->flags & CMD_READONLY) { @@ -3084,6 +3107,14 @@ void call(client *c, int flags) { /* Do some maintenance job and cleanup */ afterCommand(c); + + /* Client pause takes effect after a transaction has finished. This needs + * to be located after everything is propagated. */ + if (!server.in_exec && server.client_pause_in_transaction) { + server.client_pause_in_transaction = 0; + } + + server.core_propagates = prev_core_propagates; } /* Used when a command that is ready for execution needs to be rejected, due to @@ -3124,9 +3155,16 @@ void rejectCommandFormat(client *c, const char *fmt, ...) { /* This is called after a command in call, we can do some maintenance job in it. */ void afterCommand(client *c) { UNUSED(c); - /* Flush pending invalidation messages only when we are not in nested call. - * So the messages are not interleaved with transaction response. */ - if (!server.in_nested_call) trackingHandlePendingKeyInvalidations(); + if (!server.in_nested_call) { + /* If we are at the top-most call() we can propagate what we accumulated. + * Should be done before trackingHandlePendingKeyInvalidations so that we + * reply to client before invalidating cache (makes more sense) */ + if (server.core_propagates) + propagatePendingCommands(); + /* Flush pending invalidation messages only when we are not in nested call. + * So the messages are not interleaved with transaction response. */ + trackingHandlePendingKeyInvalidations(); + } } /* Returns 1 for commands that may have key names in their arguments, but the legacy range @@ -3167,10 +3205,9 @@ void populateCommandMovableKeys(struct redisCommand *cmd) { int processCommand(client *c) { if (!scriptIsTimedout()) { /* Both EXEC and EVAL call call() directly so there should be - * no way in_exec or in_eval or propagate_in_transaction is 1. + * no way in_exec or in_eval is 1. * That is unless lua_timedout, in which case client may run * some commands. */ - serverAssert(!server.propagate_in_transaction); serverAssert(!server.in_exec); serverAssert(!server.in_script); } diff --git a/src/server.h b/src/server.h index 31506a773..8e4d8d902 100644 --- a/src/server.h +++ b/src/server.h @@ -489,10 +489,9 @@ typedef enum { #define CMD_CALL_PROPAGATE_REPL (1<<3) #define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL) #define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE) -#define CMD_CALL_NOWRAP (1<<4) /* Don't wrap also propagate array into - MULTI/EXEC: the caller will handle it. */ +#define CMD_CALL_FROM_MODULE (1<<4) /* From RM_Call */ -/* Command propagation flags, see propagate() function */ +/* Command propagation flags, see propagateNow() function */ #define PROPAGATE_NONE 0 #define PROPAGATE_AOF 1 #define PROPAGATE_REPL 2 @@ -1212,6 +1211,7 @@ typedef struct redisOp { typedef struct redisOpArray { redisOp *ops; int numops; + int capacity; } redisOpArray; /* This structure is returned by the getMemoryOverheadData() function in @@ -1358,9 +1358,11 @@ struct redisServer { int sentinel_mode; /* True if this instance is a Sentinel. */ size_t initial_memory_usage; /* Bytes used after initialization. */ int always_show_logo; /* Show logo even for non-stdout logging. */ - int in_script; /* Are we inside EVAL? */ + int in_script; /* Are we inside EVAL? */ int in_exec; /* Are we inside EXEC? */ - int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */ + int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */ + int propagate_no_multi; /* True if propagatePendingCommands should avoid wrapping command in MULTI/EXEC */ + int module_ctx_nesting; /* moduleCreateContext() nesting level */ char *ignore_warnings; /* Config: warnings that should be ignored. */ int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ int thp_enabled; /* If true, THP is enabled. */ @@ -2420,10 +2422,6 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with); void discardTransaction(client *c); void flagTransaction(client *c); void execCommandAbort(client *c, sds error); -void execCommandPropagateMulti(int dbid); -void execCommandPropagateExec(int dbid); -void beforePropagateMulti(); -void afterPropagateExec(); /* Redis object implementation */ void decrRefCount(robj *o); @@ -2695,8 +2693,8 @@ struct redisCommand *lookupCommandByCStringLogic(dict *commands, const char *s); struct redisCommand *lookupCommandByCString(const char *s); struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc); void call(client *c, int flags); -void propagate(int dbid, robj **argv, int argc, int flags); void alsoPropagate(int dbid, robj **argv, int argc, int target); +void propagatePendingCommands(); void redisOpArrayInit(redisOpArray *oa); void redisOpArrayFree(redisOpArray *oa); void forceCommandPropagation(client *c, int flags); @@ -2812,7 +2810,7 @@ int allowProtectedAction(int config, client *c); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj); -void propagateExpire(redisDb *db, robj *key, int lazy); +void propagateDeletion(redisDb *db, robj *key, int lazy); int keyIsExpired(redisDb *db, robj *key); long long getExpire(redisDb *db, robj *key); void setExpire(client *c, redisDb *db, robj *key, long long when); diff --git a/src/t_list.c b/src/t_list.c index 84855ebe2..460437853 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -960,7 +960,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey serverAssert(llen > 0); argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count); - propagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[2]); /* Pop a range of elements in a nested arrays way. */ @@ -968,7 +968,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey return; } - propagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL); /* BRPOP/BLPOP */ value = listTypePop(o, wherefrom); @@ -999,7 +999,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey argv[2] = dstkey; argv[3] = getStringObjectFromListPosition(wherefrom); argv[4] = getStringObjectFromListPosition(whereto); - propagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL); /* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */ notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop", diff --git a/src/t_stream.c b/src/t_stream.c index 14dd2bd02..c49889abe 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1399,11 +1399,8 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[12] = shared.lastid; argv[13] = createObjectFromStreamID(&group->last_id); - /* We use propagate() because this code path is not always called from - * the command execution context. Moreover this will just alter the - * consumer group state, and we don't need MULTI/EXEC wrapping because - * there is no message state cross-message atomicity required. */ - propagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[3]); decrRefCount(argv[7]); decrRefCount(argv[9]); @@ -1424,11 +1421,8 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); - /* We use propagate() because this code path is not always called from - * the command execution context. Moreover this will just alter the - * consumer group state, and we don't need MULTI/EXEC wrapping because - * there is no message state cross-message atomicity required. */ - propagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[4]); } @@ -1446,11 +1440,8 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds argv[3] = groupname; argv[4] = createObject(OBJ_STRING,sdsdup(consumername)); - /* We use propagate() because this code path is not always called from - * the command execution context. Moreover this will just alter the - * consumer group state, and we don't need MULTI/EXEC wrapping because - * there is no message state cross-message atomicity required. */ - propagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[4]); } |