diff options
Diffstat (limited to 'src/module.c')
-rw-r--r-- | src/module.c | 133 |
1 files changed, 122 insertions, 11 deletions
diff --git a/src/module.c b/src/module.c index ad34e7b64..5758abbb6 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,6 +1848,17 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } +static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ + kp->ctx = ctx; + kp->db = ctx->client->db; + kp->key = keyname; + incrRefCount(keyname); + kp->value = value; + kp->iter = NULL; + kp->mode = mode; + zsetKeyReset(kp); +} + /* Return an handle representing a Redis key, so that it is possible * to call other APIs with the key handle as argument to perform * operations on the key. @@ -1878,27 +1889,24 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Setup the key handle. */ kp = zmalloc(sizeof(*kp)); - kp->ctx = ctx; - kp->db = ctx->client->db; - kp->key = keyname; - incrRefCount(keyname); - kp->value = value; - kp->iter = NULL; - kp->mode = mode; - zsetKeyReset(kp); + initializeKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } -/* Close a key handle. */ -void RM_CloseKey(RedisModuleKey *key) { - if (key == NULL) return; +static void closeKeyInternal(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->db,key->key); /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ RM_ZsetRangeStop(key); decrRefCount(key->key); +} + +/* Close a key handle. */ +void RM_CloseKey(RedisModuleKey *key) { + if (key == NULL) return; + closeKeyInternal(key); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -5891,6 +5899,105 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } +/** + * Callback for scan implementation. + * + * The keyname is owned by the caller and need to be retained if used after this function. + * + * The kp is the data and provide using the best efforts approach, in some cases it might + * not be available (in such case it will be set to NULL) and it is the user responsibility + * to handle it. + * + * The kp (if given) is owned by the caller and will be free when the callback returns + * + */ +typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); + +typedef struct { + RedisModuleCtx *ctx; + void* user_data; + RedisModuleScanCB fn; +} ScanCBData; + +typedef struct RedisModuleCursor{ + int cursor; +}RedisModuleCursor; + +void ScanCallback(void *privdata, const dictEntry *de) { + ScanCBData *data = privdata; + sds key = dictGetKey(de); + robj* val = dictGetVal(de); + RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key)); + + /* Setup the key handle. */ + RedisModuleKey kp = {0}; + initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); + + data->fn(data->user_data, keyname, &kp); + + closeKeyInternal(&kp); + decrRefCount(keyname); +} + +/** + * Create a new cursor to scan keys. + */ +RedisModuleCursor* RM_CursorCreate() { + RedisModuleCursor* cursor = zmalloc(sizeof(*cursor)); + cursor->cursor = 0; + return cursor; +} + +/** + * Restart an existing cursor. The keys will be rescanned. + */ +void RM_CursorRestart(RedisModuleCursor* cursor) { + cursor->cursor = 0; +} + +/** + * Destroy the cursor struct. + */ +void RM_CursorDestroy(RedisModuleCursor* cursor) { + zfree(cursor); +} + +/** + * Scan api that allows module writer to scan all the keys and value in redis. + * The way it should be used: + * Cursor* c = RedisModule_CursorCreate(); + * while(RedisModule_Scan(ctx, c, callback, privateData)); + * RedisModule_CursorDestroy(c); + * + * It is also possible to use this api from another thread such that the GIL only have to + * be acquired durring the actuall call to RM_Scan: + * Cursor* c = RedisModule_CursorCreate(); + * RedisModule_ThreadSafeCtxLock(ctx); + * while(RedisModule_Scan(ctx, c, callback, privateData)){ + * RedisModule_ThreadSafeCtxUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeCtxLock(ctx); + * } + * RedisModule_CursorDestroy(c); + * + * The function will return 1 if there is more elements to scan and 0 otherwise. + * It is also possible to restart and existing cursor using RM_CursorRestart + */ +int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) { + if(cursor->cursor == -1){ + return 0; + } + int ret = 1; + ScanCBData data = { ctx, privdata, fn }; + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data); + if (cursor->cursor == 0){ + cursor->cursor = -1; + ret = 0; + } + return ret; +} + + /* -------------------------------------------------------------------------- * Module fork API * -------------------------------------------------------------------------- */ @@ -6969,6 +7076,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); REGISTER_API(BlockClientOnKeys); + REGISTER_API(Scan); + REGISTER_API(CursorCreate); + REGISTER_API(CursorDestroy); + REGISTER_API(CursorRestart); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); } |