summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2019-11-04 10:37:16 +0100
committerGitHub <noreply@github.com>2019-11-04 10:37:16 +0100
commitdcc4347c53cc31a8f0f63f68bfcc5e46490024fc (patch)
treef6ab96938c5f17c8bd92416e3a1df98aa91768e9
parentfdaea2a7a7eed1499f46bb98552f8d8bb8dc7e9d (diff)
parentda44c548977dc2ce5bb0bd8c9d77a1c84b609e6b (diff)
downloadredis-dcc4347c53cc31a8f0f63f68bfcc5e46490024fc.tar.gz
Merge pull request #6514 from oranagra/module_hooks
Modules hooks: complete missing hooks for the initial set of hooks
-rw-r--r--src/aof.c14
-rw-r--r--src/debug.c2
-rw-r--r--src/module.c161
-rw-r--r--src/networking.c5
-rw-r--r--src/rdb.c80
-rw-r--r--src/rdb.h11
-rw-r--r--src/redis-check-rdb.c6
-rw-r--r--src/redismodule.h86
-rw-r--r--src/replication.c55
-rw-r--r--src/server.c11
-rw-r--r--src/server.h10
-rw-r--r--tests/modules/hooks.c256
-rw-r--r--tests/unit/moduleapi/hooks.tcl134
13 files changed, 737 insertions, 94 deletions
diff --git a/src/aof.c b/src/aof.c
index 0e3648ff0..dda9579e3 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createAOFClient();
- startLoadingFile(fp, filename);
+ startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
@@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
- if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
+ if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) {
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
processEventsWhileBlocked();
+ processModuleLoadingProgressEvent(1);
}
if (fgets(buf,sizeof(buf),fp) == NULL) {
@@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp);
freeFakeClient(fakeClient);
server.aof_state = old_aof_state;
- stopLoading();
+ stopLoading(1);
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
@@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
+ startSaving(RDBFLAGS_AOF_PREAMBLE);
+
if (server.aof_use_rdb_preamble) {
int error;
- if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
+ if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
@@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) {
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
+ stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
diff --git a/src/debug.c b/src/debug.c
index 179f6d2c9..a2d37337d 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -417,7 +417,7 @@ NULL
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c);
- int ret = rdbLoad(server.rdb_filename,NULL);
+ int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE);
unprotectClient(c);
if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
diff --git a/src/module.c b/src/module.c
index f9f654b42..d985eba16 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1625,6 +1625,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
return REDISMODULE_OK;
}
+/* This is an helper for moduleFireServerEvent() and other functions:
+ * It populates the replication info structure with the appropriate
+ * fields depending on the version provided. If the version is not valid
+ * then REDISMODULE_ERR is returned. Otherwise the function returns
+ * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
+int modulePopulateReplicationInfoStructure(void *ri, int structver) {
+ if (structver != 1) return REDISMODULE_ERR;
+
+ RedisModuleReplicationInfoV1 *ri1 = ri;
+ memset(ri1,0,sizeof(*ri1));
+ ri1->version = structver;
+ ri1->master = server.masterhost==NULL;
+ ri1->masterhost = server.masterhost? server.masterhost: "";
+ ri1->masterport = server.masterport;
+ ri1->replid1 = server.replid;
+ ri1->replid2 = server.replid2;
+ ri1->repl1_offset = server.master_repl_offset;
+ ri1->repl2_offset = server.second_replid_offset;
+ return REDISMODULE_OK;
+}
+
/* Return information about the client with the specified ID (that was
* previously obtained via the RedisModule_GetClientId() API). If the
* client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
@@ -5949,8 +5970,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
*
* The following sub events are available:
*
- * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER
- * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA
*
* The 'data' field can be casted by the callback to a
* RedisModuleReplicationInfo structure with the following fields:
@@ -5960,24 +5981,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int masterport; // master instance port for NOW_REPLICA
* char *replid1; // Main replication ID
* char *replid2; // Secondary replication ID
+ * uint64_t repl1_offset; // Main replication offset
* uint64_t repl2_offset; // Offset of replid2 validity
- * uint64_t main_repl_offset; // Replication offset
*
* RedisModuleEvent_Persistence
*
* This event is called when RDB saving or AOF rewriting starts
* and ends. The following sub events are available:
*
- * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start
- * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end
- * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start
- * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end
- * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start
- * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end
+ * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED
+ * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED
*
* The above events are triggered not just when the user calls the
* relevant commands like BGSAVE, but also when a saving operation
* or AOF rewriting occurs because of internal server triggers.
+ * The SYNC_RDB_START sub events are happening in the forground due to
+ * SAVE command, FLUSHALL, or server shutdown, and the other RDB and
+ * AOF sub events are executed in a background fork child, so any
+ * action the module takes can only affect the generated AOF or RDB,
+ * but will not be reflected in the parent process and affect connected
+ * clients and commands. Also note that the AOF_START sub event may end
+ * up saving RDB content in case of an AOF with rdb-preamble.
*
* RedisModuleEvent_FlushDB
*
@@ -5985,8 +6012,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* because of replication, after the replica synchronization)
* happened. The following sub events are available:
*
- * REDISMODULE_EVENT_FLUSHDB_START
- * REDISMODULE_EVENT_FLUSHDB_END
+ * REDISMODULE_SUBEVENT_FLUSHDB_START
+ * REDISMODULE_SUBEVENT_FLUSHDB_END
*
* The data pointer can be casted to a RedisModuleFlushInfo
* structure with the following fields:
@@ -6010,12 +6037,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica is loading the RDB file from the master.
* The following sub events are available:
*
- * REDISMODULE_EVENT_LOADING_RDB_START
- * REDISMODULE_EVENT_LOADING_RDB_END
- * REDISMODULE_EVENT_LOADING_MASTER_RDB_START
- * REDISMODULE_EVENT_LOADING_MASTER_RDB_END
- * REDISMODULE_EVENT_LOADING_AOF_START
- * REDISMODULE_EVENT_LOADING_AOF_END
+ * REDISMODULE_SUBEVENT_LOADING_RDB_START
+ * REDISMODULE_SUBEVENT_LOADING_AOF_START
+ * REDISMODULE_SUBEVENT_LOADING_REPL_START
+ * REDISMODULE_SUBEVENT_LOADING_ENDED
+ * REDISMODULE_SUBEVENT_LOADING_FAILED
+ *
+ * Note that AOF loading may start with an RDB data in case of
+ * rdb-preamble, in which case you'll only recieve an AOF_START event.
+ *
*
* RedisModuleEvent_ClientChange
*
@@ -6024,8 +6054,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* structure, documented in RedisModule_GetClientInfoById().
* The following sub events are available:
*
- * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED
- * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED
*
* RedisModuleEvent_Shutdown
*
@@ -6038,8 +6068,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica since it gets disconnected.
* The following sub events are availble:
*
- * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE
- * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE
*
* No additional information is available so far: future versions
* of Redis will have an API in order to enumerate the replicas
@@ -6054,6 +6084,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* this changes depending on the "hz" configuration.
* No sub events are available.
*
+ * The data pointer can be casted to a RedisModuleCronLoop
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ *
* RedisModuleEvent_MasterLinkChange
*
* This is called for replicas in order to notify when the
@@ -6063,8 +6098,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replication is happening correctly.
* The following sub events are available:
*
- * REDISMODULE_EVENT_MASTER_LINK_UP
- * REDISMODULE_EVENT_MASTER_LINK_DOWN
+ * REDISMODULE_SUBEVENT_MASTER_LINK_UP
+ * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN
+ *
+ * RedisModuleEvent_ModuleChange
+ *
+ * This event is called when a new module is loaded or one is unloaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_MODULE_LOADED
+ * REDISMODULE_SUBEVENT_MODULE_UNLOADED
+ *
+ * The data pointer can be casted to a RedisModuleModuleChange
+ * structure with the following fields:
+ *
+ * const char* module_name; // Name of module loaded or unloaded.
+ * int32_t module_version; // Module version.
+ *
+ * RedisModuleEvent_LoadingProgress
+ *
+ * This event is called repeatedly called while an RDB or AOF file
+ * is being loaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF
+ *
+ * The data pointer can be casted to a RedisModuleLoadingProgress
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ * int32_t progress; // Approximate progress between 0 and 1024,
+ * or -1 if unknown.
*
* The function returns REDISMODULE_OK if the module was successfully subscrived
* for the specified event. If the API is called from a wrong context then
@@ -6123,7 +6188,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
listRewind(RedisModule_EventListeners,&li);
while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value;
- if (el->event.id == eid && !el->module->in_hook) {
+ if (el->event.id == eid) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = el->module;
@@ -6137,6 +6202,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
void *moduledata = NULL;
RedisModuleClientInfoV1 civ1;
+ RedisModuleReplicationInfoV1 riv1;
+ RedisModuleModuleChangeV1 mcv1;
/* Start at DB zero by default when calling the handler. It's
* up to the specific event setup to change it when it makes
* sense. For instance for FLUSHDB events we select the correct
@@ -6148,11 +6215,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
modulePopulateClientInfoStructure(&civ1,data,
el->event.dataver);
moduledata = &civ1;
+ } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
+ modulePopulateReplicationInfoStructure(&riv1,el->event.dataver);
+ moduledata = &riv1;
} else if (eid == REDISMODULE_EVENT_FLUSHDB) {
moduledata = data;
RedisModuleFlushInfoV1 *fi = data;
if (fi->dbnum != -1)
selectDb(ctx.client, fi->dbnum);
+ } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
+ RedisModule *m = data;
+ if (m == el->module)
+ continue;
+ mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
+ mcv1.module_name = m->name;
+ mcv1.module_version = m->ver;
+ moduledata = &mcv1;
+ } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
+ moduledata = data;
+ } else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
+ moduledata = data;
}
ModulesInHooks++;
@@ -6184,6 +6266,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) {
}
}
+void processModuleLoadingProgressEvent(int is_aof) {
+ long long now = ustime();
+ static long long next_event = 0;
+ if (now >= next_event) {
+ /* Fire the loading progress modules end event. */
+ int progress = -1;
+ if (server.loading_total_bytes)
+ progress = (server.loading_total_bytes<<10) / server.loading_total_bytes;
+ RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION,
+ server.hz,
+ progress};
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS,
+ is_aof?
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF:
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB,
+ &fi);
+ /* decide when the next event should fire. */
+ next_event = now + 1000000 / server.hz;
+ }
+}
+
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@@ -6352,6 +6455,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
ctx.module->blocked_clients = 0;
ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
+ /* Fire the loaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_LOADED,
+ ctx.module);
+
moduleFreeContext(&ctx);
return C_OK;
}
@@ -6414,6 +6522,11 @@ int moduleUnload(sds name) {
module->name, error);
}
+ /* Fire the unloaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_UNLOADED,
+ module);
+
/* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
dictDelete(modules,module->name);
diff --git a/src/networking.c b/src/networking.c
index e7cc561fa..9336c177c 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1118,6 +1118,11 @@ void freeClient(client *c) {
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
+ /* Fire the replica change modules event. */
+ if (c->replstate == SLAVE_STATE_ONLINE)
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
+ REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
+ NULL);
}
/* Master/slave cleanup Case 2:
diff --git a/src/rdb.c b/src/rdb.c
index f530219a4..b569edfea 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
-int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
+int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
- int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
+ int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
@@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
+int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
- if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
+ if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
@@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
- if (flags & RDB_SAVE_AOF_PREAMBLE &&
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
@@ -1254,18 +1254,21 @@ werr:
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
+ startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
+ if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
+ stopSaving(1);
return C_OK;
werr: /* Write error. */
/* Set 'error' only if not already set by rdbSaveRio() call. */
if (error && *error == 0) *error = errno;
+ stopSaving(0);
return C_ERR;
}
@@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
}
rioInitWithFile(&rdb,fp);
+ startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
- if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
+ if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
@@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
+ stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
@@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
-void startLoading(size_t size) {
+void startLoading(size_t size, int rdbflags) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
+
+ /* Fire the loading modules start event. */
+ int subevent;
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
+ subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
+ else if(rdbflags & RDBFLAGS_REPLICATION)
+ subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
+ else
+ subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
}
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
-void startLoadingFile(FILE *fp, char* filename) {
+void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
struct stat sb;
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
- startLoading(sb.st_size);
+ startLoading(sb.st_size, rdbflags);
}
/* Refresh the loading progress info */
@@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) {
}
/* Loading finished */
-void stopLoading(void) {
+void stopLoading(int success) {
server.loading = 0;
rdbFileBeingLoaded = NULL;
+
+ /* Fire the loading modules end event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
+ success?
+ REDISMODULE_SUBEVENT_LOADING_ENDED:
+ REDISMODULE_SUBEVENT_LOADING_FAILED,
+ NULL);
+}
+
+void startSaving(int rdbflags) {
+ /* Fire the persistence modules end event. */
+ int subevent;
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
+ else if (getpid()!=server.pid)
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
+ else
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
+ moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
+}
+
+void stopSaving(int success) {
+ /* Fire the persistence modules end event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
+ success?
+ REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
+ REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
+ NULL);
}
/* Track loading progress in order to serve client's from time to time
@@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
processEventsWhileBlocked();
+ processModuleLoadingProgressEvent(0);
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
-int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
- if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
+ if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
@@ -2243,17 +2289,17 @@ eoferr:
*
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
* loading code will fiil the information fields in the structure. */
-int rdbLoad(char *filename, rdbSaveInfo *rsi) {
+int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
FILE *fp;
rio rdb;
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
- startLoadingFile(fp, filename);
+ startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
- retval = rdbLoadRio(&rdb,rsi,0);
+ retval = rdbLoadRio(&rdb,rdbflags,rsi);
fclose(fp);
- stopLoading();
+ stopLoading(retval==C_OK);
return retval;
}
diff --git a/src/rdb.h b/src/rdb.h
index 40a50f7ba..4229beea8 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -121,8 +121,10 @@
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
-#define RDB_SAVE_NONE 0
-#define RDB_SAVE_AOF_PREAMBLE (1<<0)
+/* flags on the purpose of rdb save or load */
+#define RDBFLAGS_NONE 0
+#define RDBFLAGS_AOF_PREAMBLE (1<<0)
+#define RDBFLAGS_REPLICATION (1<<1)
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
@@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
-int rdbLoad(char *filename, rdbSaveInfo *rsi);
+int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
@@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
-int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof);
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
+int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index 5e7415046..1210d49b4 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
expiretime = -1;
- startLoadingFile(fp, rdbfilename);
+ startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE);
while(1) {
robj *key, *val;
@@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
- stopLoading();
+ stopLoading(1);
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
- stopLoading();
+ stopLoading(0);
return 1;
}
diff --git a/src/redismodule.h b/src/redismodule.h
index ea0d6a139..0049f394e 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -181,6 +181,8 @@ typedef uint64_t RedisModuleTimerID;
#define REDISMODULE_EVENT_REPLICA_CHANGE 6
#define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7
#define REDISMODULE_EVENT_CRON_LOOP 8
+#define REDISMODULE_EVENT_MODULE_CHANGE 9
+#define REDISMODULE_EVENT_LOADING_PROGRESS 10
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@@ -226,18 +228,28 @@ static const RedisModuleEvent
RedisModuleEvent_MasterLinkChange = {
REDISMODULE_EVENT_MASTER_LINK_CHANGE,
1
+ },
+ RedisModuleEvent_ModuleChange = {
+ REDISMODULE_EVENT_MODULE_CHANGE,
+ 1
+ },
+ RedisModuleEvent_LoadingProgress = {
+ REDISMODULE_EVENT_LOADING_PROGRESS,
+ 1
};
/* Those are values that are used for the 'subevent' callback argument. */
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0
-#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1
-#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2
-#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3
+#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1
+#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2
+#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3
+#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4
#define REDISMODULE_SUBEVENT_LOADING_RDB_START 0
-#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1
-#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2
-#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3
+#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1
+#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2
+#define REDISMODULE_SUBEVENT_LOADING_ENDED 3
+#define REDISMODULE_SUBEVENT_LOADING_FAILED 4
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1
@@ -245,12 +257,21 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0
#define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1
-#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0
-#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1
+#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0
+#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1
+
+#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0
+#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1
#define REDISMODULE_SUBEVENT_FLUSHDB_START 0
#define REDISMODULE_SUBEVENT_FLUSHDB_END 1
+#define REDISMODULE_SUBEVENT_MODULE_LOADED 0
+#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1
+
+#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0
+#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1
+
/* RedisModuleClientInfo flags. */
#define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0)
#define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1)
@@ -286,6 +307,22 @@ typedef struct RedisModuleClientInfo {
#define RedisModuleClientInfo RedisModuleClientInfoV1
+#define REDISMODULE_REPLICATIONINFO_VERSION 1
+typedef struct RedisModuleReplicationInfo {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ int master; /* true if master, false if replica */
+ char *masterhost; /* master instance hostname for NOW_REPLICA */
+ int masterport; /* master instance port for NOW_REPLICA */
+ char *replid1; /* Main replication ID */
+ char *replid2; /* Secondary replication ID */
+ uint64_t repl1_offset; /* Main replication offset */
+ uint64_t repl2_offset; /* Offset of replid2 validity */
+} RedisModuleReplicationInfoV1;
+
+#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1
+
#define REDISMODULE_FLUSHINFO_VERSION 1
typedef struct RedisModuleFlushInfo {
uint64_t version; /* Not used since this structure is never passed
@@ -297,6 +334,39 @@ typedef struct RedisModuleFlushInfo {
#define RedisModuleFlushInfo RedisModuleFlushInfoV1
+#define REDISMODULE_MODULE_CHANGE_VERSION 1
+typedef struct RedisModuleModuleChange {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ const char* module_name;/* Name of module loaded or unloaded. */
+ int32_t module_version; /* Module version. */
+} RedisModuleModuleChangeV1;
+
+#define RedisModuleModuleChange RedisModuleModuleChangeV1
+
+#define REDISMODULE_CRON_LOOP_VERSION 1
+typedef struct RedisModuleCronLoopInfo {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ int32_t hz; /* Approximate number of events per second. */
+} RedisModuleCronLoopV1;
+
+#define RedisModuleCronLoop RedisModuleCronLoopV1
+
+#define REDISMODULE_LOADING_PROGRESS_VERSION 1
+typedef struct RedisModuleLoadingProgressInfo {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ int32_t hz; /* Approximate number of events per second. */
+ int32_t progress; /* Approximate progress between 0 and 1024, or -1
+ * if unknown. */
+} RedisModuleLoadingProgressV1;
+
+#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1
+
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
diff --git a/src/replication.c b/src/replication.c
index 4550e6a83..c9a2e0fe1 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) {
* has this state from the previous connection with the master. */
refreshGoodSlavesCount();
+
+ /* Fire the replica change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
+ REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
+ NULL);
+
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
@@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) {
return;
}
refreshGoodSlavesCount();
+ /* Fire the replica change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
+ REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
+ NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}
@@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
- startLoading(server.repl_transfer_size);
+ startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
- if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
+ if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */
- stopLoading();
+ stopLoading(0);
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket");
@@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) {
* gets promoted. */
return;
}
- stopLoading();
+ stopLoading(1);
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake();
return;
}
- if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
+ if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
@@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) {
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
+ /* Fire the master link modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_UP,
+ NULL);
+
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
@@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
+
+ /* Fire the role change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
+ REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
+ NULL);
+
+ /* Fire the master link modules event. */
+ if (server.repl_state == REPL_STATE_CONNECTED)
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
+ NULL);
+
server.repl_state = REPL_STATE_CONNECT;
}
/* Cancel replication, setting the instance as a master itself. */
void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
+
+ /* Fire the master link modules event. */
+ if (server.repl_state == REPL_STATE_CONNECTED)
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
+ NULL);
+
sdsfree(server.masterhost);
server.masterhost = NULL;
/* When a slave is turned into a master, the current replication ID
@@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) {
* starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */
server.repl_no_slaves_since = server.unixtime;
+
+ /* Fire the role change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
+ REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
+ NULL);
}
/* This function is called when the slave lose the connection with the
* master into an unexpected way. */
void replicationHandleMasterDisconnection(void) {
+ /* Fire the master link modules event. */
+ if (server.repl_state == REPL_STATE_CONNECTED)
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
+ NULL);
+
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;
diff --git a/src/server.c b/src/server.c
index 8f165113d..f42924764 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2056,6 +2056,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
server.rdb_bgsave_scheduled = 0;
}
+ /* Fire the cron loop modules event. */
+ RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
+ moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
+ 0,
+ &ei);
+
server.cronloops++;
return 1000/server.hz;
}
@@ -3682,6 +3688,9 @@ int prepareForShutdown(int flags) {
}
}
+ /* Fire the shutdown modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
+
/* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE,"Removing the pid file.");
@@ -4767,7 +4776,7 @@ void loadDataFromDisk(void) {
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
- if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
+ if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
diff --git a/src/server.h b/src/server.h
index f724f7d64..fba24b195 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1602,6 +1602,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
+void processModuleLoadingProgressEvent(int is_aof);
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c);
@@ -1834,10 +1835,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* Generic persistence functions */
-void startLoadingFile(FILE* fp, char* filename);
-void startLoading(size_t size);
+void startLoadingFile(FILE* fp, char* filename, int rdbflags);
+void startLoading(size_t size, int rdbflags);
void loadingProgress(off_t pos);
-void stopLoading(void);
+void stopLoading(int success);
+void startSaving(int rdbflags);
+void stopSaving(int success);
#define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */
#define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */
@@ -1846,7 +1849,6 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */
#include "rdb.h"
-int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void);
/* AOF persistence */
diff --git a/tests/modules/hooks.c b/tests/modules/hooks.c
index 33b690b2f..665a20481 100644
--- a/tests/modules/hooks.c
+++ b/tests/modules/hooks.c
@@ -30,36 +30,227 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
-#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
+#include <stdio.h>
+#include <string.h>
+
+/* We need to store events to be able to test and see what we got, and we can't
+ * store them in the key-space since that would mess up rdb loading (duplicates)
+ * and be lost of flushdb. */
+RedisModuleDict *event_log = NULL;
+
+typedef struct EventElement {
+ long count;
+ RedisModuleString *last_val_string;
+ long last_val_int;
+} EventElement;
+
+void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) {
+ EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
+ if (!event) {
+ event = RedisModule_Alloc(sizeof(EventElement));
+ memset(event, 0, sizeof(EventElement));
+ RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
+ }
+ if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
+ event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data));
+ event->count++;
+}
+
+void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) {
+ REDISMODULE_NOT_USED(ctx);
+ EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
+ if (!event) {
+ event = RedisModule_Alloc(sizeof(EventElement));
+ memset(event, 0, sizeof(EventElement));
+ RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
+ }
+ event->last_val_int = data;
+ event->count++;
+}
+
+void FreeEvent(RedisModuleCtx *ctx, EventElement *event) {
+ if (event->last_val_string)
+ RedisModule_FreeString(ctx, event->last_val_string);
+ RedisModule_Free(event);
+}
+
+int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
+ RedisModule_ReplyWithLongLong(ctx, event? event->count: 0);
+ return REDISMODULE_OK;
+}
+
+int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
+ if (event && event->last_val_string)
+ RedisModule_ReplyWithString(ctx, event->last_val_string);
+ else if (event)
+ RedisModule_ReplyWithLongLong(ctx, event->last_val_int);
+ else
+ RedisModule_ReplyWithNull(ctx);
+ return REDISMODULE_OK;
+}
+
+void clearEvents(RedisModuleCtx *ctx)
+{
+ RedisModuleString *key;
+ EventElement *event;
+ RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL);
+ while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) {
+ event->count = 0;
+ event->last_val_int = 0;
+ if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
+ event->last_val_string = NULL;
+ RedisModule_DictDel(event_log, key, NULL);
+ RedisModule_Free(event);
+ }
+ RedisModule_DictIteratorStop(iter);
+}
+
+int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argc);
+ REDISMODULE_NOT_USED(argv);
+ clearEvents(ctx);
+ return REDISMODULE_OK;
+}
/* Client state change callback. */
void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
- REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
RedisModuleClientInfo *ci = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ?
- "connected" : "disconnected";
- RedisModuleCallReply *reply;
- RedisModule_SelectDb(ctx,9);
- reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id);
- RedisModule_FreeCallReply(reply);
+ "client-connected" : "client-disconnected";
+ LogNumericEvent(ctx, keyname, ci->id);
}
void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
- REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
RedisModuleFlushInfo *fi = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ?
"flush-start" : "flush-end";
- RedisModuleCallReply *reply;
- RedisModule_SelectDb(ctx,9);
- reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum);
- RedisModule_FreeCallReply(reply);
+ LogNumericEvent(ctx, keyname, fi->dbnum);
+}
+
+void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ RedisModuleReplicationInfo *ri = data;
+ char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ?
+ "role-master" : "role-replica";
+ LogStringEvent(ctx, keyname, ri->masterhost);
+}
+
+void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ?
+ "replica-online" : "replica-offline";
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ?
+ "masterlink-up" : "masterlink-down";
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = NULL;
+ switch (sub) {
+ case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break;
+ }
+ /* modifying the keyspace from the fork child is not an option, using log instead */
+ RedisModule_Log(ctx, "warning", "module-event-%s", keyname);
+ if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START)
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = NULL;
+ switch (sub) {
+ case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break;
+ case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break;
+ case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break;
+ case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break;
+ case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break;
+ }
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+
+ RedisModuleLoadingProgress *ei = data;
+ char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ?
+ "loading-progress-rdb" : "loading-progress-aof";
+ LogNumericEvent(ctx, keyname, ei->progress);
+}
+
+void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+ REDISMODULE_NOT_USED(sub);
+
+ RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown");
+}
+
+void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(sub);
+
+ RedisModuleCronLoop *ei = data;
+ LogNumericEvent(ctx, "cron-loop", ei->hz);
+}
+
+void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+
+ RedisModuleModuleChange *ei = data;
+ char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ?
+ "module-loaded" : "module-unloaded";
+ LogStringEvent(ctx, keyname, ei->module_name);
}
/* This function must be present on each Redis module. It is used in order to
@@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
+ /* replication related hooks */
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_ReplicaChange, replicationChangeCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback);
+
+ /* persistence related hooks */
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_Persistence, persistenceCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_Loading, loadingCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_LoadingProgress, loadingProgressCallback);
+
+ /* other hooks */
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ClientChange, clientChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_FlushDB, flushdbCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_Shutdown, shutdownCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_CronLoop, cronLoopCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_ModuleChange, moduleChangeCallback);
+
+ event_log = RedisModule_CreateDict(ctx);
+
+ if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
+
+int RedisModule_OnUnload(RedisModuleCtx *ctx) {
+ clearEvents(ctx);
+ RedisModule_FreeDict(ctx, event_log);
+ event_log = NULL;
+ return REDISMODULE_OK;
+}
+
diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl
index 7a727902d..cbca9c3eb 100644
--- a/tests/unit/moduleapi/hooks.tcl
+++ b/tests/unit/moduleapi/hooks.tcl
@@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so]
tags "modules" {
start_server {} {
r module load $testmodule
+ r config set appendonly yes
+
test {Test clients connection / disconnection hooks} {
for {set j 0} {$j < 2} {incr j} {
set rd1 [redis_deferring_client]
$rd1 close
}
- assert {[r llen connected] > 1}
- assert {[r llen disconnected] > 1}
+ assert {[r hooks.event_count client-connected] > 1}
+ assert {[r hooks.event_count client-disconnected] > 1}
+ }
+
+ test {Test module cron hook} {
+ after 100
+ assert {[r hooks.event_count cron-loop] > 0}
+ set hz [r hooks.event_last cron-loop]
+ assert_equal $hz 10
+ }
+
+ test {Test module loaded / unloaded hooks} {
+ set othermodule [file normalize tests/modules/infotest.so]
+ r module load $othermodule
+ r module unload infotest
+ assert_equal [r hooks.event_last module-loaded] "infotest"
+ assert_equal [r hooks.event_last module-unloaded] "infotest"
+ }
+
+ test {Test module aofrw hook} {
+ r debug populate 1000 foo 10000 ;# 10mb worth of data
+ r config set rdbcompression no ;# rdb progress is only checked once in 2mb
+ r BGREWRITEAOF
+ waitForBgrewriteaof r
+ assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1
+ assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1
+ }
+
+ test {Test module aof load and rdb/aof progress hooks} {
+ # create some aof tail (progress is checked only once in 1000 commands)
+ for {set j 0} {$j < 4000} {incr j} {
+ r set "bar$j" x
+ }
+ # set some configs that will cause many loading progress events during aof loading
+ r config set key-load-delay 1
+ r config set dynamic-hz no
+ r config set hz 500
+ r DEBUG LOADAOF
+ assert_equal [r hooks.event_last loading-aof-start] 0
+ assert_equal [r hooks.event_last loading-end] 0
+ assert {[r hooks.event_count loading-rdb-start] == 0}
+ assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section
+ assert {[r hooks.event_count loading-progress-aof] >= 2}
+ }
+ # undo configs before next test
+ r config set dynamic-hz yes
+ r config set key-load-delay 0
+
+ test {Test module rdb save hook} {
+ # debug reload does: save, flush, load:
+ assert {[r hooks.event_count persistence-syncrdb-start] == 0}
+ assert {[r hooks.event_count loading-rdb-start] == 0}
+ r debug reload
+ assert {[r hooks.event_count persistence-syncrdb-start] == 1}
+ assert {[r hooks.event_count loading-rdb-start] == 1}
}
test {Test flushdb hooks} {
- r flushall ;# Note: only the "end" RPUSH will survive
- r select 1
- r flushdb
- r select 2
r flushdb
- r select 9
- assert {[r llen flush-start] == 2}
- assert {[r llen flush-end] == 3}
- assert {[r lrange flush-start 0 -1] eq {1 2}}
- assert {[r lrange flush-end 0 -1] eq {-1 1 2}}
+ assert_equal [r hooks.event_last flush-start] 9
+ assert_equal [r hooks.event_last flush-end] 9
+ r flushall
+ assert_equal [r hooks.event_last flush-start] -1
+ assert_equal [r hooks.event_last flush-end] -1
+ }
+
+ # replication related tests
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ start_server {} {
+ r module load $testmodule
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ $replica replicaof $master_host $master_port
+
+ wait_for_condition 50 100 {
+ [string match {*master_link_status:up*} [r info replication]]
+ } else {
+ fail "Can't turn the instance into a replica"
+ }
+
+ test {Test master link up hook} {
+ assert_equal [r hooks.event_count masterlink-up] 1
+ assert_equal [r hooks.event_count masterlink-down] 0
+ }
+
+ test {Test role-replica hook} {
+ assert_equal [r hooks.event_count role-replica] 1
+ assert_equal [r hooks.event_count role-master] 0
+ assert_equal [r hooks.event_last role-replica] [s 0 master_host]
+ }
+
+ test {Test replica-online hook} {
+ assert_equal [r -1 hooks.event_count replica-online] 1
+ assert_equal [r -1 hooks.event_count replica-offline] 0
+ }
+
+ test {Test master link down hook} {
+ r client kill type master
+ assert_equal [r hooks.event_count masterlink-down] 1
+ }
+
+ $replica replicaof no one
+
+ test {Test role-master hook} {
+ assert_equal [r hooks.event_count role-replica] 1
+ assert_equal [r hooks.event_count role-master] 1
+ assert_equal [r hooks.event_last role-master] {}
+ }
+
+ test {Test replica-offline hook} {
+ assert_equal [r -1 hooks.event_count replica-online] 1
+ assert_equal [r -1 hooks.event_count replica-offline] 1
+ }
+ # get the replica stdout, to be used by the next test
+ set replica_stdout [srv 0 stdout]
}
+
+
+ # look into the log file of the server that just exited
+ test {Test shutdown hook} {
+ assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1
+ }
+
}
}