diff options
author | guybe7 <guy.benoish@redislabs.com> | 2022-12-20 13:21:50 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-20 09:51:50 +0200 |
commit | 9c7c6924a019b902996fc4b608541f475daa649d (patch) | |
tree | dbf8d847864a1a363e5e9cbb984fad82b4c491d3 | |
parent | 669688a342d5c1697c737b23f456e1750b46f0ff (diff) | |
download | redis-9c7c6924a019b902996fc4b608541f475daa649d.tar.gz |
Cleanup: Get rid of server.core_propagates (#11572)
1. Get rid of server.core_propagates - we can just rely on module/call nesting levels
2. Rename in_nested_call to execution_nesting and update the comment
3. Remove module_ctx_nesting (redundant, we can use execution_nesting)
4. Modify postExecutionUnitOperations according to the comment (The main purpose of this PR)
5. trackingHandlePendingKeyInvalidations: Check the nesting level inside this function
-rw-r--r-- | src/blocked.c | 5 | ||||
-rw-r--r-- | src/cluster.c | 6 | ||||
-rw-r--r-- | src/db.c | 2 | ||||
-rw-r--r-- | src/evict.c | 16 | ||||
-rw-r--r-- | src/expire.c | 10 | ||||
-rw-r--r-- | src/module.c | 93 | ||||
-rw-r--r-- | src/server.c | 52 | ||||
-rw-r--r-- | src/server.h | 7 | ||||
-rw-r--r-- | src/tracking.c | 4 | ||||
-rw-r--r-- | tests/unit/moduleapi/cluster.tcl | 2 | ||||
-rw-r--r-- | tests/unit/moduleapi/propagate.tcl | 29 |
11 files changed, 116 insertions, 110 deletions
diff --git a/src/blocked.c b/src/blocked.c index 5d6d0c800..b935ab9e0 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -608,7 +608,6 @@ void handleClientsBlockedOnKeys(void) { /* This function is called only when also_propagate is in its basic state * (i.e. not from call(), module context, etc.) */ serverAssert(server.also_propagate.numops == 0); - server.core_propagates = 1; while(listLength(server.ready_keys) != 0) { list *l; @@ -669,10 +668,6 @@ void handleClientsBlockedOnKeys(void) { } listRelease(l); /* We have the new list on place at this point. */ } - - serverAssert(server.core_propagates); /* This function should not be re-entrant */ - - server.core_propagates = 0; } /* This is how the current blocking lists/sorted sets/streams work, we use diff --git a/src/cluster.c b/src/cluster.c index 81ffba48b..7414f830f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -7359,9 +7359,6 @@ void slotToKeyDestroy(redisDb *db) { unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int j = 0; - server.core_propagates = 1; - server.in_nested_call++; - dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head; while (de != NULL) { sds sdskey = dictGetKey(de); @@ -7376,10 +7373,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) { j++; server.dirty++; } - serverAssert(server.core_propagates); /* This function should not be re-entrant */ - server.core_propagates = 0; - server.in_nested_call--; return j; } @@ -696,7 +696,7 @@ void flushallCommand(client *c) { addReply(c,shared.ok); } -/* This command implements DEL and LAZYDEL. */ +/* This command implements DEL and UNLINK. */ void delGenericCommand(client *c, int lazy) { int numdel = 0, j; diff --git a/src/evict.c b/src/evict.c index 85a8d7ab0..41712b926 100644 --- a/src/evict.c +++ b/src/evict.c @@ -567,17 +567,8 @@ int performEvictions(void) { monotime evictionTimer; elapsedStart(&evictionTimer); - /* Unlike active-expire and blocked client, we can reach here from 'CONFIG SET maxmemory' - * so we have to back-up and restore server.core_propagates. */ - int prev_core_propagates = server.core_propagates; + /* Try to smoke-out bugs (server.also_propagate should be empty here) */ serverAssert(server.also_propagate.numops == 0); - server.core_propagates = 1; - - /* Increase nested call counter - * we add this in order to prevent any RM_Call that may exist - * in the notify CB to be propagated immediately. - * we want them in multi/exec with the DEL command */ - server.in_nested_call++; while (mem_freed < (long long)mem_tofree) { int j, k, i; @@ -747,11 +738,6 @@ cant_free: latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency); } - serverAssert(server.core_propagates); /* This function should not be re-entrant */ - - server.core_propagates = prev_core_propagates; - server.in_nested_call--; - latencyEndMonitor(latency); latencyAddSampleIfNeeded("eviction-cycle",latency); diff --git a/src/expire.c b/src/expire.c index 17df22205..7b6b3bc9c 100644 --- a/src/expire.c +++ b/src/expire.c @@ -182,11 +182,8 @@ void activeExpireCycle(int type) { long total_sampled = 0; long total_expired = 0; - /* Sanity: There can't be any pending commands to propagate when - * we're in cron */ + /* Try to smoke-out bugs (server.also_propagate should be empty here) */ serverAssert(server.also_propagate.numops == 0); - server.core_propagates = 1; - server.in_nested_call++; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { /* Expired and checked in a single loop. */ @@ -312,11 +309,6 @@ void activeExpireCycle(int type) { (expired*100/sampled) > config_cycle_acceptable_stale); } - serverAssert(server.core_propagates); /* This function should not be re-entrant */ - - server.core_propagates = 0; - server.in_nested_call--; - elapsed = ustime()-start; server.stat_expire_cycle_time_used += elapsed; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); diff --git a/src/module.c b/src/module.c index 7fd5cad00..7fbc1d52b 100644 --- a/src/module.c +++ b/src/module.c @@ -181,6 +181,8 @@ typedef struct RedisModuleCtx RedisModuleCtx; #define REDISMODULE_CTX_NEW_CLIENT (1<<7) /* Free client object when the context is destroyed */ #define REDISMODULE_CTX_CHANNELS_POS_REQUEST (1<<8) +#define REDISMODULE_CTX_COMMAND (1<<9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */ + /* This represents a Redis key opened with RM_OpenKey(). */ struct RedisModuleKey { @@ -737,23 +739,25 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) { return REDISMODULE_OK; } +void modulePostExecutionUnitOperations() { + if (server.execution_nesting) + return; + + if (server.busy_module_yield_flags) { + blockingOperationEnds(); + server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; + if (server.current_client) + unprotectClient(server.current_client); + unblockPostponedClients(); + } +} + /* Free the context after the user function was called. */ void moduleFreeContext(RedisModuleCtx *ctx) { - if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE)) { - /* Modules take care of their own propagation, when we are - * outside of call() context (timers, events, etc.). */ - if (--server.module_ctx_nesting == 0) { - if (!server.core_propagates) { - postExecutionUnitOperations(); - } - if (server.busy_module_yield_flags) { - blockingOperationEnds(); - server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; - if (server.current_client) - unprotectClient(server.current_client); - unblockPostponedClients(); - } - } + /* See comment in moduleCreateContext */ + if (!(ctx->flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) { + server.execution_nesting--; + postExecutionUnitOperations(); } autoMemoryCollect(ctx); poolAllocRelease(ctx); @@ -805,8 +809,15 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f else out_ctx->next_yield_time = getMonotonicUs() + server.busy_reply_threshold * 1000; - if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) { - server.module_ctx_nesting++; + /* Increment the execution_nesting counter (module is about to execute some code), + * except in the following cases: + * 1. We came here from cmd->proc (either call() or AOF load). + * In the former, the counter has been already incremented from within + * call() and in the latter we don't care about execution_nesting + * 2. If we are running in a thread (execution_nesting will be dealt with + * when locking/unlocking the GIL) */ + if (!(ctx_flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) { + server.execution_nesting++; } } @@ -815,7 +826,7 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f void RedisModuleCommandDispatcher(client *c) { RedisModuleCommand *cp = c->cmd->module_cmd; RedisModuleCtx ctx; - moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_NONE); + moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_COMMAND); ctx.client = c; cp->func(&ctx,(void**)c->argv,c->argc); @@ -7883,10 +7894,10 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { void moduleGILAfterLock() { /* We should never get here if we already inside a module * code block which already opened a context. */ - serverAssert(server.module_ctx_nesting == 0); + serverAssert(server.execution_nesting == 0); /* Bump up the nesting level to prevent immediate propagation * of possible RM_Call from th thread */ - server.module_ctx_nesting++; + server.execution_nesting++; updateCachedTime(0); } @@ -7921,20 +7932,12 @@ void moduleGILBeforeUnlock() { /* We should never get here if we already inside a module * code block which already opened a context, except * the bump-up from moduleGILAcquired. */ - serverAssert(server.module_ctx_nesting == 1); - /* Restore ctx_nesting and propagate pending commands - * (because it's u clear when thread safe contexts are + serverAssert(server.execution_nesting == 1); + /* Restore nesting level and propagate pending commands + * (because it's unclear when thread safe contexts are * released we have to propagate here). */ - server.module_ctx_nesting--; + server.execution_nesting--; postExecutionUnitOperations(); - - if (server.busy_module_yield_flags) { - blockingOperationEnds(); - server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; - if (server.current_client) - unprotectClient(server.current_client); - unblockPostponedClients(); - } } /* Release the server lock after a thread safe API call was executed. */ @@ -8041,8 +8044,10 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti } void firePostExecutionUnitJobs() { - /* Avoid propagation of commands. */ - server.in_nested_call++; + /* Avoid propagation of commands. + * In that way, postExecutionUnitOperations will prevent + * recursive calls to firePostExecutionUnitJobs. */ + server.execution_nesting++; while (listLength(modulePostExecUnitJobs) > 0) { listNode *ln = listFirst(modulePostExecUnitJobs); RedisModulePostExecUnitJob *job = listNodeValue(ln); @@ -8058,7 +8063,7 @@ void firePostExecutionUnitJobs() { moduleFreeContext(&ctx); zfree(job); } - server.in_nested_call--; + server.execution_nesting--; } /* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write @@ -8108,6 +8113,22 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) /* Don't do anything if there aren't any subscribers */ if (listLength(moduleKeyspaceSubscribers) == 0) return; + /* Ugly hack to handle modules which use write commands from within + * notify_callback, which they should NOT do! + * Modules should use RedisModules_AddPostNotificationJob instead. + * + * Anyway, we want any propagated commands from within notify_callback + * to be propagated inside a MULTI/EXEC together with the original + * command that caused the KSN. + * Note that it's only relevant for KSNs which are not generated from within + * call(), for example active-expiry and eviction (because anyway + * execution_nesting is incremented from within call()) + * + * In order to do that we increment the execution_nesting counter, thus + * preventing postExecutionUnitOperations (from within moduleFreeContext) + * from propagating commands from CB. */ + server.execution_nesting++; + listIter li; listNode *ln; listRewind(moduleKeyspaceSubscribers,&li); @@ -8136,6 +8157,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) moduleFreeContext(&ctx); } } + + server.execution_nesting--; } /* Unsubscribe any notification subscribers this module has upon unloading */ diff --git a/src/server.c b/src/server.c index 7f2f4497e..dee7c402a 100644 --- a/src/server.c +++ b/src/server.c @@ -2481,7 +2481,7 @@ void initServer(void) { server.main_thread_id = pthread_self(); server.current_client = NULL; server.errors = raxNew(); - server.in_nested_call = 0; + server.execution_nesting = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -2554,8 +2554,6 @@ void initServer(void) { server.in_exec = 0; server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; server.busy_module_yield_reply = NULL; - server.core_propagates = 0; - server.module_ctx_nesting = 0; server.client_pause_in_transaction = 0; server.child_pid = -1; server.child_type = CHILD_TYPE_NONE; @@ -3323,16 +3321,19 @@ static void propagatePendingCommands() { * What we want to achieve is that the entire execution unit will be done atomically, * currently with respect to replication and post jobs, but in the future there might * be other considerations. So we basically want the `postUnitOperations` to trigger - * after the entire chain finished. - * - * Current, in order to avoid massive code changes that could be risky to cherry-pick, - * we count on the mechanism we already have such as `server.core_propagation`, - * `server.module_ctx_nesting`, and `server.in_nested_call`. We understand that we probably - * do not need all of those variable and we will make an attempt to re-arrange it on unstable - * branch. */ + * after the entire chain finished. */ void postExecutionUnitOperations() { + if (server.execution_nesting) + return; + firePostExecutionUnitJobs(); + + /* If we are at the top-most call() and not inside a an active module + * context (e.g. within a module timer) we can propagate what we accumulated. */ propagatePendingCommands(); + + /* Module subsystem post-execution-unit logic */ + modulePostExecutionUnitOperations(); } /* Increment the command failure counters (either rejected_calls or failed_calls). @@ -3414,13 +3415,7 @@ void call(client *c, int flags) { * The only other option to get to call() without having processCommand * as an entry point is if a module triggers RM_Call outside of call() * context (for example, in a timer). - * In that case, the module is in charge of propagation. - * - * Because call() is re-entrant we have to cache and restore - * server.core_propagates. */ - int prev_core_propagates = server.core_propagates; - if (!server.core_propagates && !(flags & CMD_CALL_FROM_MODULE)) - server.core_propagates = 1; + * In that case, the module is in charge of propagation. */ /* Call the command. */ dirty = server.dirty; @@ -3429,8 +3424,8 @@ void call(client *c, int flags) { const long long call_timer = ustime(); /* Update cache time, in case we have nested calls we want to - * update only on the first call*/ - if (server.in_nested_call++ == 0) { + * update only on the first call */ + if (server.execution_nesting++ == 0) { updateCachedTimeWithUs(0,call_timer); } @@ -3439,7 +3434,7 @@ void call(client *c, int flags) { monotonic_start = getMonotonicUs(); c->cmd->proc(c); - server.in_nested_call--; + server.execution_nesting--; /* In order to avoid performance implication due to querying the clock using a system call 3 times, * we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */ @@ -3599,8 +3594,6 @@ void call(client *c, int flags) { if (!server.in_exec && server.client_pause_in_transaction) { server.client_pause_in_transaction = 0; } - - server.core_propagates = prev_core_propagates; } /* Used when a command that is ready for execution needs to be rejected, due to @@ -3645,17 +3638,10 @@ void rejectCommandFormat(client *c, const char *fmt, ...) { /* This is called after a command in call, we can do some maintenance job in it. */ void afterCommand(client *c) { UNUSED(c); - if (!server.in_nested_call) { - /* If we are at the top-most call() we can propagate what we accumulated. - * Should be done before trackingHandlePendingKeyInvalidations so that we - * reply to client before invalidating cache (makes more sense) */ - if (server.core_propagates) { - postExecutionUnitOperations(); - } - /* Flush pending invalidation messages only when we are not in nested call. - * So the messages are not interleaved with transaction response. */ - trackingHandlePendingKeyInvalidations(); - } + /* Should be done before trackingHandlePendingKeyInvalidations so that we + * reply to client before invalidating cache (makes more sense) */ + postExecutionUnitOperations(); + trackingHandlePendingKeyInvalidations(); } /* Check if c->cmd exists, fills `err` with details in case it doesn't. diff --git a/src/server.h b/src/server.h index 8a5a000d3..765b67d62 100644 --- a/src/server.h +++ b/src/server.h @@ -1515,8 +1515,6 @@ struct redisServer { int in_exec; /* Are we inside EXEC? */ int busy_module_yield_flags; /* Are we inside a busy module? (triggered by RM_Yield). see BUSY_MODULE_YIELD_ flags. */ const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */ - int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */ - int module_ctx_nesting; /* moduleCreateContext() nesting level */ char *ignore_warnings; /* Config: warnings that should be ignored. */ int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ int thp_enabled; /* If true, THP is enabled. */ @@ -1553,7 +1551,9 @@ struct redisServer { clientMemUsageBucket* client_mem_usage_buckets; rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ - int in_nested_call; /* If > 0, in a nested call of a call */ + int execution_nesting; /* Execution nesting level. + * e.g. call(), async module stuff (timers, events, etc.), + * cron stuff (active expire, eviction) */ rax *clients_index; /* Active clients dictionary by client ID. */ uint32_t paused_actions; /* Bitmask of actions that are currently paused */ list *postponed_clients; /* List of postponed clients */ @@ -2434,6 +2434,7 @@ void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void firePostExecutionUnitJobs(); void moduleCallCommandFilters(client *c); +void modulePostExecutionUnitOperations(); void ModuleForkDoneHandler(int exitcode, int bysignal); int TerminateModuleForkChild(int child_pid, int wait); ssize_t rdbSaveModulesAux(rio *rdb, int when); diff --git a/src/tracking.c b/src/tracking.c index d62eb736b..ba7406e6c 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -415,6 +415,10 @@ void trackingInvalidateKey(client *c, robj *keyobj, int bcast) { void trackingHandlePendingKeyInvalidations(void) { if (!listLength(server.tracking_pending_keys)) return; + /* Flush pending invalidation messages only when we are not in nested call. + * So the messages are not interleaved with transaction response. */ + if (server.execution_nesting) return; + listNode *ln; listIter li; diff --git a/tests/unit/moduleapi/cluster.tcl b/tests/unit/moduleapi/cluster.tcl index 8851b0301..43356f77d 100644 --- a/tests/unit/moduleapi/cluster.tcl +++ b/tests/unit/moduleapi/cluster.tcl @@ -219,4 +219,4 @@ start_cluster 3 0 [list config_lines $modules] { assert_equal {PONG} [$node2 PING] assert_equal {PONG} [$node3 PING] } -}
\ No newline at end of file +} diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl index 7bdc000ee..ce4aaae44 100644 --- a/tests/unit/moduleapi/propagate.tcl +++ b/tests/unit/moduleapi/propagate.tcl @@ -675,8 +675,10 @@ tags "modules" { } } + tags "modules aof" { - test {Modules RM_Replicate replicates MULTI/EXEC correctly} { + foreach aofload_type {debug_cmd startup} { + test "Modules RM_Replicate replicates MULTI/EXEC correctly: AOF-load type $aofload_type" { start_server [list overrides [list loadmodule "$testmodule"]] { # Enable the AOF r config set appendonly yes @@ -690,11 +692,34 @@ tags "modules aof" { r propagate-test.mixed r exec + assert_equal [r get counter-1] {} + assert_equal [r get counter-2] {} + assert_equal [r get using-call] 2 + assert_equal [r get after-call] 2 + assert_equal [r get notifications] 4 + # Load the AOF - r debug loadaof + if {$aofload_type == "debug_cmd"} { + r debug loadaof + } else { + r config rewrite + restart_server 0 true false + wait_done_loading r + } + + # This module behaves bad on purpose, it only calls + # RM_Replicate for counter-1 and counter-2 so values + # after AOF-load are different + assert_equal [r get counter-1] 4 + assert_equal [r get counter-2] 4 + assert_equal [r get using-call] 2 + assert_equal [r get after-call] 2 + # 4+4+2+2 commands from AOF (just above) + 4 "INCR notifications" from AOF + 4 notifications for these INCRs + assert_equal [r get notifications] 20 assert_equal {OK} [r module unload propagate-test] assert_equal [s 0 unexpected_error_replies] 0 } } + } } |