summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2019-11-03 17:35:35 +0200
committerOran Agra <oran@redislabs.com>2019-11-04 07:40:52 +0200
commit87332ce524e30d4949d2144a4da15b5ae17e5051 (patch)
tree1bc42130e013d13200bc5c1c1cb51e20ac310ddb
parentfdaea2a7a7eed1499f46bb98552f8d8bb8dc7e9d (diff)
downloadredis-87332ce524e30d4949d2144a4da15b5ae17e5051.tar.gz
Module API for PUBLISH, FLUSHALL, RANDOMKEY, DBSIZE
-rw-r--r--src/db.c44
-rw-r--r--src/module.c35
-rw-r--r--src/redismodule.h8
-rw-r--r--src/server.h2
4 files changed, 69 insertions, 20 deletions
diff --git a/src/db.c b/src/db.c
index 2c0a0cdd3..ad19b42dd 100644
--- a/src/db.c
+++ b/src/db.c
@@ -461,6 +461,29 @@ int getFlushCommandFlags(client *c, int *flags) {
return C_OK;
}
+/* Flushes the whole server data set. */
+void flushAllDataAndResetRDB(int flags) {
+ server.dirty += emptyDb(-1,flags,NULL);
+ if (server.rdb_child_pid != -1) killRDBChild();
+ if (server.saveparamslen > 0) {
+ /* Normally rdbSave() will reset dirty, but we don't want this here
+ * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
+ int saved_dirty = server.dirty;
+ rdbSaveInfo rsi, *rsiptr;
+ rsiptr = rdbPopulateSaveInfo(&rsi);
+ rdbSave(server.rdb_filename,rsiptr);
+ server.dirty = saved_dirty;
+ }
+ server.dirty++;
+#if defined(USE_JEMALLOC)
+ /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
+ * for large databases, flushdb blocks for long anyway, so a bit more won't
+ * harm and this way the flush and purge will be synchroneus. */
+ if (!(flags & EMPTYDB_ASYNC))
+ jemalloc_purge();
+#endif
+}
+
/* FLUSHDB [ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
@@ -484,28 +507,9 @@ void flushdbCommand(client *c) {
* Flushes the whole server data set. */
void flushallCommand(client *c) {
int flags;
-
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
- server.dirty += emptyDb(-1,flags,NULL);
+ flushAllDataAndResetRDB(flags);
addReply(c,shared.ok);
- if (server.rdb_child_pid != -1) killRDBChild();
- if (server.saveparamslen > 0) {
- /* Normally rdbSave() will reset dirty, but we don't want this here
- * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
- int saved_dirty = server.dirty;
- rdbSaveInfo rsi, *rsiptr;
- rsiptr = rdbPopulateSaveInfo(&rsi);
- rdbSave(server.rdb_filename,rsiptr);
- server.dirty = saved_dirty;
- }
- server.dirty++;
-#if defined(USE_JEMALLOC)
- /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
- * for large databases, flushdb blocks for long anyway, so a bit more won't
- * harm and this way the flush and purge will be synchroneus. */
- if (!(flags & EMPTYDB_ASYNC))
- jemalloc_purge();
-#endif
}
/* This command implements DEL and LAZYDEL. */
diff --git a/src/module.c b/src/module.c
index f9f654b42..5453aed47 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1677,6 +1677,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
return modulePopulateClientInfoStructure(ci,client,structver);
}
+/* Publish a message to subscribers (see PUBLISH command). */
+int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
+ UNUSED(ctx);
+ int receivers = pubsubPublishMessage(channel, message);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message);
+ return receivers;
+}
+
/* Return the currently selected DB. */
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
return ctx->client->db->id;
@@ -1964,6 +1973,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_OK;
}
+/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
+ * If restart_aof is true, you must make sure the command that triggered this call is not
+ * propagated to the AOF file.
+ * When async is set to true, db contents will be freed by a background thread. */
+void RM_ResetDataset(int restart_aof, int async) {
+ if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
+ flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
+ if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
+}
+
+/* Returns the number of keys in the current db. */
+unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
+ return dictSize(ctx->client->db->dict);
+}
+
+/* Returns a name of a random key, or NULL if current db is empty. */
+RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
+ robj *key = dbRandomKey(ctx->client->db);
+ autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
+ return key;
+}
+
/* --------------------------------------------------------------------------
* Key API for String type
* -------------------------------------------------------------------------- */
@@ -6630,6 +6661,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringTruncate);
REGISTER_API(SetExpire);
REGISTER_API(GetExpire);
+ REGISTER_API(ResetDataset);
+ REGISTER_API(DbSize);
+ REGISTER_API(RandomKey);
REGISTER_API(ZsetAdd);
REGISTER_API(ZsetIncrby);
REGISTER_API(ZsetScore);
@@ -6757,6 +6791,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(InfoAddFieldLongLong);
REGISTER_API(InfoAddFieldULongLong);
REGISTER_API(GetClientInfoById);
+ REGISTER_API(PublishMessage);
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady);
diff --git a/src/redismodule.h b/src/redismodule.h
index ea0d6a139..77019f89e 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -416,6 +416,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
+void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async);
+unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx);
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
@@ -435,6 +438,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx)
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id);
+int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message);
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
@@ -619,6 +623,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(StringTruncate);
REDISMODULE_GET_API(GetExpire);
REDISMODULE_GET_API(SetExpire);
+ REDISMODULE_GET_API(ResetDataset);
+ REDISMODULE_GET_API(DbSize);
+ REDISMODULE_GET_API(RandomKey);
REDISMODULE_GET_API(ZsetAdd);
REDISMODULE_GET_API(ZsetIncrby);
REDISMODULE_GET_API(ZsetScore);
@@ -705,6 +712,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(InfoAddFieldLongLong);
REDISMODULE_GET_API(InfoAddFieldULongLong);
REDISMODULE_GET_API(GetClientInfoById);
+ REDISMODULE_GET_API(PublishMessage);
REDISMODULE_GET_API(SubscribeToServerEvent);
REDISMODULE_GET_API(BlockClientOnKeys);
REDISMODULE_GET_API(SignalKeyAsReady);
diff --git a/src/server.h b/src/server.h
index f724f7d64..4287d1adf 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1862,6 +1862,7 @@ void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
void killAppendOnlyChild(void);
+void restartAOFAfterSYNC();
/* Child info */
void openChildInfoPipe(void);
@@ -2101,6 +2102,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
+void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);