summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2022-12-20 13:21:50 +0530
committerGitHub <noreply@github.com>2022-12-20 09:51:50 +0200
commit9c7c6924a019b902996fc4b608541f475daa649d (patch)
treedbf8d847864a1a363e5e9cbb984fad82b4c491d3
parent669688a342d5c1697c737b23f456e1750b46f0ff (diff)
downloadredis-9c7c6924a019b902996fc4b608541f475daa649d.tar.gz
Cleanup: Get rid of server.core_propagates (#11572)
1. Get rid of server.core_propagates - we can just rely on module/call nesting levels 2. Rename in_nested_call to execution_nesting and update the comment 3. Remove module_ctx_nesting (redundant, we can use execution_nesting) 4. Modify postExecutionUnitOperations according to the comment (The main purpose of this PR) 5. trackingHandlePendingKeyInvalidations: Check the nesting level inside this function
-rw-r--r--src/blocked.c5
-rw-r--r--src/cluster.c6
-rw-r--r--src/db.c2
-rw-r--r--src/evict.c16
-rw-r--r--src/expire.c10
-rw-r--r--src/module.c93
-rw-r--r--src/server.c52
-rw-r--r--src/server.h7
-rw-r--r--src/tracking.c4
-rw-r--r--tests/unit/moduleapi/cluster.tcl2
-rw-r--r--tests/unit/moduleapi/propagate.tcl29
11 files changed, 116 insertions, 110 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 5d6d0c800..b935ab9e0 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -608,7 +608,6 @@ void handleClientsBlockedOnKeys(void) {
/* This function is called only when also_propagate is in its basic state
* (i.e. not from call(), module context, etc.) */
serverAssert(server.also_propagate.numops == 0);
- server.core_propagates = 1;
while(listLength(server.ready_keys) != 0) {
list *l;
@@ -669,10 +668,6 @@ void handleClientsBlockedOnKeys(void) {
}
listRelease(l); /* We have the new list on place at this point. */
}
-
- serverAssert(server.core_propagates); /* This function should not be re-entrant */
-
- server.core_propagates = 0;
}
/* This is how the current blocking lists/sorted sets/streams work, we use
diff --git a/src/cluster.c b/src/cluster.c
index 81ffba48b..7414f830f 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -7359,9 +7359,6 @@ void slotToKeyDestroy(redisDb *db) {
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int j = 0;
- server.core_propagates = 1;
- server.in_nested_call++;
-
dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
while (de != NULL) {
sds sdskey = dictGetKey(de);
@@ -7376,10 +7373,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
j++;
server.dirty++;
}
- serverAssert(server.core_propagates); /* This function should not be re-entrant */
- server.core_propagates = 0;
- server.in_nested_call--;
return j;
}
diff --git a/src/db.c b/src/db.c
index a0d23297f..e82402125 100644
--- a/src/db.c
+++ b/src/db.c
@@ -696,7 +696,7 @@ void flushallCommand(client *c) {
addReply(c,shared.ok);
}
-/* This command implements DEL and LAZYDEL. */
+/* This command implements DEL and UNLINK. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
diff --git a/src/evict.c b/src/evict.c
index 85a8d7ab0..41712b926 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -567,17 +567,8 @@ int performEvictions(void) {
monotime evictionTimer;
elapsedStart(&evictionTimer);
- /* Unlike active-expire and blocked client, we can reach here from 'CONFIG SET maxmemory'
- * so we have to back-up and restore server.core_propagates. */
- int prev_core_propagates = server.core_propagates;
+ /* Try to smoke-out bugs (server.also_propagate should be empty here) */
serverAssert(server.also_propagate.numops == 0);
- server.core_propagates = 1;
-
- /* Increase nested call counter
- * we add this in order to prevent any RM_Call that may exist
- * in the notify CB to be propagated immediately.
- * we want them in multi/exec with the DEL command */
- server.in_nested_call++;
while (mem_freed < (long long)mem_tofree) {
int j, k, i;
@@ -747,11 +738,6 @@ cant_free:
latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
}
- serverAssert(server.core_propagates); /* This function should not be re-entrant */
-
- server.core_propagates = prev_core_propagates;
- server.in_nested_call--;
-
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
diff --git a/src/expire.c b/src/expire.c
index 17df22205..7b6b3bc9c 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -182,11 +182,8 @@ void activeExpireCycle(int type) {
long total_sampled = 0;
long total_expired = 0;
- /* Sanity: There can't be any pending commands to propagate when
- * we're in cron */
+ /* Try to smoke-out bugs (server.also_propagate should be empty here) */
serverAssert(server.also_propagate.numops == 0);
- server.core_propagates = 1;
- server.in_nested_call++;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
@@ -312,11 +309,6 @@ void activeExpireCycle(int type) {
(expired*100/sampled) > config_cycle_acceptable_stale);
}
- serverAssert(server.core_propagates); /* This function should not be re-entrant */
-
- server.core_propagates = 0;
- server.in_nested_call--;
-
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
diff --git a/src/module.c b/src/module.c
index 7fd5cad00..7fbc1d52b 100644
--- a/src/module.c
+++ b/src/module.c
@@ -181,6 +181,8 @@ typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_NEW_CLIENT (1<<7) /* Free client object when the
context is destroyed */
#define REDISMODULE_CTX_CHANNELS_POS_REQUEST (1<<8)
+#define REDISMODULE_CTX_COMMAND (1<<9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */
+
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
@@ -737,23 +739,25 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) {
return REDISMODULE_OK;
}
+void modulePostExecutionUnitOperations() {
+ if (server.execution_nesting)
+ return;
+
+ if (server.busy_module_yield_flags) {
+ blockingOperationEnds();
+ server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
+ if (server.current_client)
+ unprotectClient(server.current_client);
+ unblockPostponedClients();
+ }
+}
+
/* Free the context after the user function was called. */
void moduleFreeContext(RedisModuleCtx *ctx) {
- if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE)) {
- /* Modules take care of their own propagation, when we are
- * outside of call() context (timers, events, etc.). */
- if (--server.module_ctx_nesting == 0) {
- if (!server.core_propagates) {
- postExecutionUnitOperations();
- }
- if (server.busy_module_yield_flags) {
- blockingOperationEnds();
- server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
- if (server.current_client)
- unprotectClient(server.current_client);
- unblockPostponedClients();
- }
- }
+ /* See comment in moduleCreateContext */
+ if (!(ctx->flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) {
+ server.execution_nesting--;
+ postExecutionUnitOperations();
}
autoMemoryCollect(ctx);
poolAllocRelease(ctx);
@@ -805,8 +809,15 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
else
out_ctx->next_yield_time = getMonotonicUs() + server.busy_reply_threshold * 1000;
- if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) {
- server.module_ctx_nesting++;
+ /* Increment the execution_nesting counter (module is about to execute some code),
+ * except in the following cases:
+ * 1. We came here from cmd->proc (either call() or AOF load).
+ * In the former, the counter has been already incremented from within
+ * call() and in the latter we don't care about execution_nesting
+ * 2. If we are running in a thread (execution_nesting will be dealt with
+ * when locking/unlocking the GIL) */
+ if (!(ctx_flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) {
+ server.execution_nesting++;
}
}
@@ -815,7 +826,7 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
void RedisModuleCommandDispatcher(client *c) {
RedisModuleCommand *cp = c->cmd->module_cmd;
RedisModuleCtx ctx;
- moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_NONE);
+ moduleCreateContext(&ctx, cp->module, REDISMODULE_CTX_COMMAND);
ctx.client = c;
cp->func(&ctx,(void**)c->argv,c->argc);
@@ -7883,10 +7894,10 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
void moduleGILAfterLock() {
/* We should never get here if we already inside a module
* code block which already opened a context. */
- serverAssert(server.module_ctx_nesting == 0);
+ serverAssert(server.execution_nesting == 0);
/* Bump up the nesting level to prevent immediate propagation
* of possible RM_Call from th thread */
- server.module_ctx_nesting++;
+ server.execution_nesting++;
updateCachedTime(0);
}
@@ -7921,20 +7932,12 @@ void moduleGILBeforeUnlock() {
/* We should never get here if we already inside a module
* code block which already opened a context, except
* the bump-up from moduleGILAcquired. */
- serverAssert(server.module_ctx_nesting == 1);
- /* Restore ctx_nesting and propagate pending commands
- * (because it's u clear when thread safe contexts are
+ serverAssert(server.execution_nesting == 1);
+ /* Restore nesting level and propagate pending commands
+ * (because it's unclear when thread safe contexts are
* released we have to propagate here). */
- server.module_ctx_nesting--;
+ server.execution_nesting--;
postExecutionUnitOperations();
-
- if (server.busy_module_yield_flags) {
- blockingOperationEnds();
- server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
- if (server.current_client)
- unprotectClient(server.current_client);
- unblockPostponedClients();
- }
}
/* Release the server lock after a thread safe API call was executed. */
@@ -8041,8 +8044,10 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
}
void firePostExecutionUnitJobs() {
- /* Avoid propagation of commands. */
- server.in_nested_call++;
+ /* Avoid propagation of commands.
+ * In that way, postExecutionUnitOperations will prevent
+ * recursive calls to firePostExecutionUnitJobs. */
+ server.execution_nesting++;
while (listLength(modulePostExecUnitJobs) > 0) {
listNode *ln = listFirst(modulePostExecUnitJobs);
RedisModulePostExecUnitJob *job = listNodeValue(ln);
@@ -8058,7 +8063,7 @@ void firePostExecutionUnitJobs() {
moduleFreeContext(&ctx);
zfree(job);
}
- server.in_nested_call--;
+ server.execution_nesting--;
}
/* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write
@@ -8108,6 +8113,22 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
/* Don't do anything if there aren't any subscribers */
if (listLength(moduleKeyspaceSubscribers) == 0) return;
+ /* Ugly hack to handle modules which use write commands from within
+ * notify_callback, which they should NOT do!
+ * Modules should use RedisModules_AddPostNotificationJob instead.
+ *
+ * Anyway, we want any propagated commands from within notify_callback
+ * to be propagated inside a MULTI/EXEC together with the original
+ * command that caused the KSN.
+ * Note that it's only relevant for KSNs which are not generated from within
+ * call(), for example active-expiry and eviction (because anyway
+ * execution_nesting is incremented from within call())
+ *
+ * In order to do that we increment the execution_nesting counter, thus
+ * preventing postExecutionUnitOperations (from within moduleFreeContext)
+ * from propagating commands from CB. */
+ server.execution_nesting++;
+
listIter li;
listNode *ln;
listRewind(moduleKeyspaceSubscribers,&li);
@@ -8136,6 +8157,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
moduleFreeContext(&ctx);
}
}
+
+ server.execution_nesting--;
}
/* Unsubscribe any notification subscribers this module has upon unloading */
diff --git a/src/server.c b/src/server.c
index 7f2f4497e..dee7c402a 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2481,7 +2481,7 @@ void initServer(void) {
server.main_thread_id = pthread_self();
server.current_client = NULL;
server.errors = raxNew();
- server.in_nested_call = 0;
+ server.execution_nesting = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
@@ -2554,8 +2554,6 @@ void initServer(void) {
server.in_exec = 0;
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
server.busy_module_yield_reply = NULL;
- server.core_propagates = 0;
- server.module_ctx_nesting = 0;
server.client_pause_in_transaction = 0;
server.child_pid = -1;
server.child_type = CHILD_TYPE_NONE;
@@ -3323,16 +3321,19 @@ static void propagatePendingCommands() {
* What we want to achieve is that the entire execution unit will be done atomically,
* currently with respect to replication and post jobs, but in the future there might
* be other considerations. So we basically want the `postUnitOperations` to trigger
- * after the entire chain finished.
- *
- * Current, in order to avoid massive code changes that could be risky to cherry-pick,
- * we count on the mechanism we already have such as `server.core_propagation`,
- * `server.module_ctx_nesting`, and `server.in_nested_call`. We understand that we probably
- * do not need all of those variable and we will make an attempt to re-arrange it on unstable
- * branch. */
+ * after the entire chain finished. */
void postExecutionUnitOperations() {
+ if (server.execution_nesting)
+ return;
+
firePostExecutionUnitJobs();
+
+ /* If we are at the top-most call() and not inside a an active module
+ * context (e.g. within a module timer) we can propagate what we accumulated. */
propagatePendingCommands();
+
+ /* Module subsystem post-execution-unit logic */
+ modulePostExecutionUnitOperations();
}
/* Increment the command failure counters (either rejected_calls or failed_calls).
@@ -3414,13 +3415,7 @@ void call(client *c, int flags) {
* The only other option to get to call() without having processCommand
* as an entry point is if a module triggers RM_Call outside of call()
* context (for example, in a timer).
- * In that case, the module is in charge of propagation.
- *
- * Because call() is re-entrant we have to cache and restore
- * server.core_propagates. */
- int prev_core_propagates = server.core_propagates;
- if (!server.core_propagates && !(flags & CMD_CALL_FROM_MODULE))
- server.core_propagates = 1;
+ * In that case, the module is in charge of propagation. */
/* Call the command. */
dirty = server.dirty;
@@ -3429,8 +3424,8 @@ void call(client *c, int flags) {
const long long call_timer = ustime();
/* Update cache time, in case we have nested calls we want to
- * update only on the first call*/
- if (server.in_nested_call++ == 0) {
+ * update only on the first call */
+ if (server.execution_nesting++ == 0) {
updateCachedTimeWithUs(0,call_timer);
}
@@ -3439,7 +3434,7 @@ void call(client *c, int flags) {
monotonic_start = getMonotonicUs();
c->cmd->proc(c);
- server.in_nested_call--;
+ server.execution_nesting--;
/* In order to avoid performance implication due to querying the clock using a system call 3 times,
* we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */
@@ -3599,8 +3594,6 @@ void call(client *c, int flags) {
if (!server.in_exec && server.client_pause_in_transaction) {
server.client_pause_in_transaction = 0;
}
-
- server.core_propagates = prev_core_propagates;
}
/* Used when a command that is ready for execution needs to be rejected, due to
@@ -3645,17 +3638,10 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
/* This is called after a command in call, we can do some maintenance job in it. */
void afterCommand(client *c) {
UNUSED(c);
- if (!server.in_nested_call) {
- /* If we are at the top-most call() we can propagate what we accumulated.
- * Should be done before trackingHandlePendingKeyInvalidations so that we
- * reply to client before invalidating cache (makes more sense) */
- if (server.core_propagates) {
- postExecutionUnitOperations();
- }
- /* Flush pending invalidation messages only when we are not in nested call.
- * So the messages are not interleaved with transaction response. */
- trackingHandlePendingKeyInvalidations();
- }
+ /* Should be done before trackingHandlePendingKeyInvalidations so that we
+ * reply to client before invalidating cache (makes more sense) */
+ postExecutionUnitOperations();
+ trackingHandlePendingKeyInvalidations();
}
/* Check if c->cmd exists, fills `err` with details in case it doesn't.
diff --git a/src/server.h b/src/server.h
index 8a5a000d3..765b67d62 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1515,8 +1515,6 @@ struct redisServer {
int in_exec; /* Are we inside EXEC? */
int busy_module_yield_flags; /* Are we inside a busy module? (triggered by RM_Yield). see BUSY_MODULE_YIELD_ flags. */
const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */
- int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */
- int module_ctx_nesting; /* moduleCreateContext() nesting level */
char *ignore_warnings; /* Config: warnings that should be ignored. */
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
int thp_enabled; /* If true, THP is enabled. */
@@ -1553,7 +1551,9 @@ struct redisServer {
clientMemUsageBucket* client_mem_usage_buckets;
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
- int in_nested_call; /* If > 0, in a nested call of a call */
+ int execution_nesting; /* Execution nesting level.
+ * e.g. call(), async module stuff (timers, events, etc.),
+ * cron stuff (active expire, eviction) */
rax *clients_index; /* Active clients dictionary by client ID. */
uint32_t paused_actions; /* Bitmask of actions that are currently paused */
list *postponed_clients; /* List of postponed clients */
@@ -2434,6 +2434,7 @@ void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void firePostExecutionUnitJobs();
void moduleCallCommandFilters(client *c);
+void modulePostExecutionUnitOperations();
void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
diff --git a/src/tracking.c b/src/tracking.c
index d62eb736b..ba7406e6c 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -415,6 +415,10 @@ void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
void trackingHandlePendingKeyInvalidations(void) {
if (!listLength(server.tracking_pending_keys)) return;
+ /* Flush pending invalidation messages only when we are not in nested call.
+ * So the messages are not interleaved with transaction response. */
+ if (server.execution_nesting) return;
+
listNode *ln;
listIter li;
diff --git a/tests/unit/moduleapi/cluster.tcl b/tests/unit/moduleapi/cluster.tcl
index 8851b0301..43356f77d 100644
--- a/tests/unit/moduleapi/cluster.tcl
+++ b/tests/unit/moduleapi/cluster.tcl
@@ -219,4 +219,4 @@ start_cluster 3 0 [list config_lines $modules] {
assert_equal {PONG} [$node2 PING]
assert_equal {PONG} [$node3 PING]
}
-} \ No newline at end of file
+}
diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl
index 7bdc000ee..ce4aaae44 100644
--- a/tests/unit/moduleapi/propagate.tcl
+++ b/tests/unit/moduleapi/propagate.tcl
@@ -675,8 +675,10 @@ tags "modules" {
}
}
+
tags "modules aof" {
- test {Modules RM_Replicate replicates MULTI/EXEC correctly} {
+ foreach aofload_type {debug_cmd startup} {
+ test "Modules RM_Replicate replicates MULTI/EXEC correctly: AOF-load type $aofload_type" {
start_server [list overrides [list loadmodule "$testmodule"]] {
# Enable the AOF
r config set appendonly yes
@@ -690,11 +692,34 @@ tags "modules aof" {
r propagate-test.mixed
r exec
+ assert_equal [r get counter-1] {}
+ assert_equal [r get counter-2] {}
+ assert_equal [r get using-call] 2
+ assert_equal [r get after-call] 2
+ assert_equal [r get notifications] 4
+
# Load the AOF
- r debug loadaof
+ if {$aofload_type == "debug_cmd"} {
+ r debug loadaof
+ } else {
+ r config rewrite
+ restart_server 0 true false
+ wait_done_loading r
+ }
+
+ # This module behaves bad on purpose, it only calls
+ # RM_Replicate for counter-1 and counter-2 so values
+ # after AOF-load are different
+ assert_equal [r get counter-1] 4
+ assert_equal [r get counter-2] 4
+ assert_equal [r get using-call] 2
+ assert_equal [r get after-call] 2
+ # 4+4+2+2 commands from AOF (just above) + 4 "INCR notifications" from AOF + 4 notifications for these INCRs
+ assert_equal [r get notifications] 20
assert_equal {OK} [r module unload propagate-test]
assert_equal [s 0 unexpected_error_replies] 0
}
}
+ }
}