summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-04-12 13:21:48 +0200
committerantirez <antirez@gmail.com>2018-04-12 13:21:48 +0200
commit404160a27198b9bfbac2086be20c4ca130b47eac (patch)
tree3b86af16e3dee15df89674e4b26c6e1e9eab0c25
parent005c932f22a5e3027db2c31865e394d388fcf0a0 (diff)
downloadredis-404160a27198b9bfbac2086be20c4ca130b47eac.tar.gz
Modules API: blocked client disconnection callback.
-rw-r--r--src/module.c44
-rw-r--r--src/modules/helloblock.c22
-rw-r--r--src/redismodule.h9
3 files changed, 72 insertions, 3 deletions
diff --git a/src/module.c b/src/module.c
index a6b4ae4ea..9effe21c9 100644
--- a/src/module.c
+++ b/src/module.c
@@ -158,7 +158,9 @@ typedef struct RedisModuleKey RedisModuleKey;
/* Function pointer type of a function representing a command inside
* a Redis module. */
+struct RedisModuleBlockedClient;
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
+typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc);
/* This struct holds the information about a command registered by a module.*/
struct RedisModuleCommandProxy {
@@ -201,6 +203,7 @@ typedef struct RedisModuleBlockedClient {
RedisModule *module; /* Module blocking the client. */
RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
+ RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/
void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
void *privdata; /* Module private data that may be used by the reply
or timeout callback. It is set via the
@@ -3438,6 +3441,17 @@ void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, in
* running the list of clients blocked by a module that need to be unblocked. */
void unblockClientFromModule(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+
+ /* Call the disconnection callback if any. */
+ if (bc->disconnect_callback) {
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.blocked_privdata = bc->privdata;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ bc->disconnect_callback(&ctx,bc);
+ moduleFreeContext(&ctx);
+ }
+
bc->client = NULL;
/* Reset the client for a new query since, for blocking commands implemented
* into modules, we do not it immediately after the command returns (and
@@ -3478,6 +3492,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
+ bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
bc->privdata = NULL;
bc->reply_client = createClient(-1);
@@ -3525,6 +3540,26 @@ int RM_AbortBlock(RedisModuleBlockedClient *bc) {
return RM_UnblockClient(bc,NULL);
}
+/* Set a callback that will be called if a blocked client disconnects
+ * before the module has a chance to call RedisModule_UnblockClient()
+ *
+ * Usually what you want to do there, is to cleanup your module state
+ * so that you can call RedisModule_UnblockClient() safely, otherwise
+ * the client will remain blocked forever if the timeout is large.
+ *
+ * Notes:
+ *
+ * 1. It is not safe to call Reply* family functions here, it is also
+ * useless since the client is gone.
+ *
+ * 2. This callback is not called if the client disconnects because of
+ * a timeout. In such a case, the client is unblocked automatically
+ * and the timeout callback is called.
+ */
+void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) {
+ bc->disconnect_callback = callback;
+}
+
/* This function will check the moduleUnblockedClients queue in order to
* call the reply callback and really unblock the client.
*
@@ -3592,6 +3627,10 @@ void moduleHandleBlockedClients(void) {
freeClient(bc->reply_client);
if (c != NULL) {
+ /* Before unblocking the client, set the disconnect callback
+ * to NULL, because if we reached this point, the client was
+ * properly unblocked by the module. */
+ bc->disconnect_callback = NULL;
unblockClient(c);
/* Put the client in the list of clients that need to write
* if there are pending replies here. This is needed since
@@ -3627,6 +3666,10 @@ void moduleBlockedClientTimedOut(client *c) {
ctx.client = bc->client;
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
+ /* For timeout events, we do not want to call the disconnect callback,
+ * because the blocekd client will be automatically disconnected in
+ * this case, and the user can still hook using the timeout callback. */
+ bc->disconnect_callback = NULL;
}
/* Return non-zero if a module command was called in order to fill the
@@ -4625,4 +4668,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetRandomBytes);
REGISTER_API(GetRandomHexChars);
REGISTER_API(BlockedClientDisconnected);
+ REGISTER_API(SetDisconnectCallback);
}
diff --git a/src/modules/helloblock.c b/src/modules/helloblock.c
index cabaeff6c..6bba17d33 100644
--- a/src/modules/helloblock.c
+++ b/src/modules/helloblock.c
@@ -74,6 +74,23 @@ void *HelloBlock_ThreadMain(void *arg) {
return NULL;
}
+/* An example blocked client disconnection callback.
+ *
+ * Note that in the case of the HELLO.BLOCK command, the blocked client is now
+ * owned by the thread calling sleep(). In this speciifc case, there is not
+ * much we can do, however normally we could instead implement a way to
+ * signal the thread that the client disconnected, and sleep the specified
+ * amount of seconds with a while loop calling sleep(1), so that once we
+ * detect the client disconnection, we can terminate the thread ASAP. */
+void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
+ RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
+ (void*)bc);
+
+ /* Here you should cleanup your state / threads, and if possible
+ * call RedisModule_UnblockClient(), or notify the thread that will
+ * call the function ASAP. */
+}
+
/* HELLO.BLOCK <delay> <timeout> -- Block for <count> seconds, then reply with
* a random number. Timeout is the command timeout, so that you can test
* what happens when the delay is greater than the timeout. */
@@ -93,6 +110,11 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
+ /* Here we set a disconnection handler, however since this module will
+ * block in sleep() in a thread, there is not much we can do in the
+ * callback, so this is just to show you the API. */
+ RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
+
/* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread:
* the delay and a reference to the blocked client handle. */
diff --git a/src/redismodule.h b/src/redismodule.h
index d500d28bf..9c52012dc 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -142,8 +142,9 @@ typedef struct RedisModuleDigest RedisModuleDigest;
typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
typedef struct RedisModuleClusterInfo RedisModuleClusterInfo;
-typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
-typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
+typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
+typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
+typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
@@ -275,7 +276,7 @@ void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
-#define REDISMODULE_EXPERIMENTAL_API_VERSION 2
+#define REDISMODULE_EXPERIMENTAL_API_VERSION 3
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms);
int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
@@ -300,6 +301,7 @@ const char *REDISMODULE_API_FUNC(RedisModule_GetMyClusterID)(void);
size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void);
void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len);
void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len);
+void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback);
#endif
/* This is included inline inside each Redis module. */
@@ -421,6 +423,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(IsBlockedTimeoutRequest);
REDISMODULE_GET_API(GetBlockedClientPrivateData);
REDISMODULE_GET_API(AbortBlock);
+ REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(BlockedClientDisconnected);
REDISMODULE_GET_API(RegisterClusterMessageReceiver);