summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xruntest-moduleapi1
-rw-r--r--src/module.c133
-rw-r--r--src/redismodule.h10
-rw-r--r--tests/modules/Makefile3
-rw-r--r--tests/modules/scan.c62
-rw-r--r--tests/unit/moduleapi/scan.tcl18
6 files changed, 215 insertions, 12 deletions
diff --git a/runtest-moduleapi b/runtest-moduleapi
index e48535126..3eb6b21b2 100755
--- a/runtest-moduleapi
+++ b/runtest-moduleapi
@@ -22,4 +22,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/hooks \
--single unit/moduleapi/misc \
--single unit/moduleapi/blockonkeys \
+--single unit/moduleapi/scan \
"${@}"
diff --git a/src/module.c b/src/module.c
index ad34e7b64..5758abbb6 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1848,6 +1848,17 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
}
+static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){
+ kp->ctx = ctx;
+ kp->db = ctx->client->db;
+ kp->key = keyname;
+ incrRefCount(keyname);
+ kp->value = value;
+ kp->iter = NULL;
+ kp->mode = mode;
+ zsetKeyReset(kp);
+}
+
/* Return an handle representing a Redis key, so that it is possible
* to call other APIs with the key handle as argument to perform
* operations on the key.
@@ -1878,27 +1889,24 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
/* Setup the key handle. */
kp = zmalloc(sizeof(*kp));
- kp->ctx = ctx;
- kp->db = ctx->client->db;
- kp->key = keyname;
- incrRefCount(keyname);
- kp->value = value;
- kp->iter = NULL;
- kp->mode = mode;
- zsetKeyReset(kp);
+ initializeKey(kp, ctx, keyname, value, mode);
autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
return (void*)kp;
}
-/* Close a key handle. */
-void RM_CloseKey(RedisModuleKey *key) {
- if (key == NULL) return;
+static void closeKeyInternal(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
if ((key->mode & REDISMODULE_WRITE) && signal)
signalModifiedKey(key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
RM_ZsetRangeStop(key);
decrRefCount(key->key);
+}
+
+/* Close a key handle. */
+void RM_CloseKey(RedisModuleKey *key) {
+ if (key == NULL) return;
+ closeKeyInternal(key);
autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
zfree(key);
}
@@ -5891,6 +5899,105 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
return REDISMODULE_OK;
}
+/**
+ * Callback for scan implementation.
+ *
+ * The keyname is owned by the caller and need to be retained if used after this function.
+ *
+ * The kp is the data and provide using the best efforts approach, in some cases it might
+ * not be available (in such case it will be set to NULL) and it is the user responsibility
+ * to handle it.
+ *
+ * The kp (if given) is owned by the caller and will be free when the callback returns
+ *
+ */
+typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key);
+
+typedef struct {
+ RedisModuleCtx *ctx;
+ void* user_data;
+ RedisModuleScanCB fn;
+} ScanCBData;
+
+typedef struct RedisModuleCursor{
+ int cursor;
+}RedisModuleCursor;
+
+void ScanCallback(void *privdata, const dictEntry *de) {
+ ScanCBData *data = privdata;
+ sds key = dictGetKey(de);
+ robj* val = dictGetVal(de);
+ RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key));
+
+ /* Setup the key handle. */
+ RedisModuleKey kp = {0};
+ initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ);
+
+ data->fn(data->user_data, keyname, &kp);
+
+ closeKeyInternal(&kp);
+ decrRefCount(keyname);
+}
+
+/**
+ * Create a new cursor to scan keys.
+ */
+RedisModuleCursor* RM_CursorCreate() {
+ RedisModuleCursor* cursor = zmalloc(sizeof(*cursor));
+ cursor->cursor = 0;
+ return cursor;
+}
+
+/**
+ * Restart an existing cursor. The keys will be rescanned.
+ */
+void RM_CursorRestart(RedisModuleCursor* cursor) {
+ cursor->cursor = 0;
+}
+
+/**
+ * Destroy the cursor struct.
+ */
+void RM_CursorDestroy(RedisModuleCursor* cursor) {
+ zfree(cursor);
+}
+
+/**
+ * Scan api that allows module writer to scan all the keys and value in redis.
+ * The way it should be used:
+ * Cursor* c = RedisModule_CursorCreate();
+ * while(RedisModule_Scan(ctx, c, callback, privateData));
+ * RedisModule_CursorDestroy(c);
+ *
+ * It is also possible to use this api from another thread such that the GIL only have to
+ * be acquired durring the actuall call to RM_Scan:
+ * Cursor* c = RedisModule_CursorCreate();
+ * RedisModule_ThreadSafeCtxLock(ctx);
+ * while(RedisModule_Scan(ctx, c, callback, privateData)){
+ * RedisModule_ThreadSafeCtxUnlock(ctx);
+ * // do some background job
+ * RedisModule_ThreadSafeCtxLock(ctx);
+ * }
+ * RedisModule_CursorDestroy(c);
+ *
+ * The function will return 1 if there is more elements to scan and 0 otherwise.
+ * It is also possible to restart and existing cursor using RM_CursorRestart
+ */
+int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) {
+ if(cursor->cursor == -1){
+ return 0;
+ }
+ int ret = 1;
+ ScanCBData data = { ctx, privdata, fn };
+ cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data);
+ if (cursor->cursor == 0){
+ cursor->cursor = -1;
+ ret = 0;
+ }
+ return ret;
+}
+
+
/* --------------------------------------------------------------------------
* Module fork API
* -------------------------------------------------------------------------- */
@@ -6969,6 +7076,10 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(SetLRUOrLFU);
REGISTER_API(GetLRUOrLFU);
REGISTER_API(BlockClientOnKeys);
+ REGISTER_API(Scan);
+ REGISTER_API(CursorCreate);
+ REGISTER_API(CursorDestroy);
+ REGISTER_API(CursorRestart);
REGISTER_API(SignalKeyAsReady);
REGISTER_API(GetBlockedClientReadyKey);
}
diff --git a/src/redismodule.h b/src/redismodule.h
index 728c7f584..c74772d0f 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -392,6 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter;
typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx;
typedef struct RedisModuleCommandFilter RedisModuleCommandFilter;
typedef struct RedisModuleInfoCtx RedisModuleInfoCtx;
+typedef struct RedisModuleCursor RedisModuleCursor;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
@@ -409,6 +410,7 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
+typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key);
#define REDISMODULE_TYPE_METHOD_VERSION 2
typedef struct RedisModuleTypeMethods {
@@ -633,6 +635,10 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF
int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data);
int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode);
int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid);
+RedisModuleCursor* REDISMODULE_API_FUNC(RedisModule_CursorCreate)();
+void REDISMODULE_API_FUNC(RedisModule_CursorRestart)(RedisModuleCursor* cursor);
+void REDISMODULE_API_FUNC(RedisModule_CursorDestroy)(RedisModuleCursor* cursor);
+int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata);
#endif
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
@@ -842,6 +848,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(Fork);
REDISMODULE_GET_API(ExitFromChild);
REDISMODULE_GET_API(KillForkChild);
+ REDISMODULE_GET_API(Scan);
+ REDISMODULE_GET_API(CursorCreate);
+ REDISMODULE_GET_API(CursorRestart);
+ REDISMODULE_GET_API(CursorDestroy);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
diff --git a/tests/modules/Makefile b/tests/modules/Makefile
index 9e27758a2..07c3cb829 100644
--- a/tests/modules/Makefile
+++ b/tests/modules/Makefile
@@ -19,7 +19,8 @@ TEST_MODULES = \
propagate.so \
misc.so \
hooks.so \
- blockonkeys.so
+ blockonkeys.so \
+ scan.so
.PHONY: all
diff --git a/tests/modules/scan.c b/tests/modules/scan.c
new file mode 100644
index 000000000..21071720a
--- /dev/null
+++ b/tests/modules/scan.c
@@ -0,0 +1,62 @@
+#define REDISMODULE_EXPERIMENTAL_API
+#include "redismodule.h"
+
+#include <string.h>
+#include <assert.h>
+#include <unistd.h>
+
+#define UNUSED(V) ((void) V)
+
+typedef struct scan_pd{
+ size_t nkeys;
+ RedisModuleCtx *ctx;
+} scan_pd;
+
+void scan_callback(void *privdata, RedisModuleString* keyname, RedisModuleKey* key){
+ scan_pd* pd = privdata;
+ RedisModule_ReplyWithArray(pd->ctx, 2);
+
+ RedisModule_ReplyWithString(pd->ctx, keyname);
+ if(key && RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING){
+ size_t len;
+ char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ);
+ RedisModule_ReplyWithStringBuffer(pd->ctx, data, len);
+ }else{
+ RedisModule_ReplyWithNull(pd->ctx);
+ }
+ pd->nkeys++;
+}
+
+int scan_keys_values(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ scan_pd pd = {
+ .nkeys = 0,
+ .ctx = ctx,
+ };
+
+ RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
+
+ RedisModuleCursor* cursor = RedisModule_CursorCreate();
+ while(RedisModule_Scan(ctx, cursor, scan_callback, &pd));
+ RedisModule_CursorDestroy(cursor);
+
+ RedisModule_ReplySetArrayLength(ctx, pd.nkeys);
+ return 0;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ UNUSED(argv);
+ UNUSED(argc);
+ if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "scan.scankeysvalues", scan_keys_values, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
+
+
+
+
+
diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl
new file mode 100644
index 000000000..5a77e8195
--- /dev/null
+++ b/tests/unit/moduleapi/scan.tcl
@@ -0,0 +1,18 @@
+set testmodule [file normalize tests/modules/scan.so]
+
+proc count_log_message {pattern} {
+ set result [exec grep -c $pattern < [srv 0 stdout]]
+}
+
+start_server {tags {"modules"}} {
+ r module load $testmodule
+
+ test {Module scan} {
+ # the module create a scan command which also return values
+ r set x 1
+ r set y 2
+ r set z 3
+ lsort [r scan.scankeysvalues]
+ } {{x 1} {y 2} {z 3}}
+
+} \ No newline at end of file