diff options
-rw-r--r-- | src/module.c | 10 | ||||
-rw-r--r-- | src/server.c | 2 | ||||
-rw-r--r-- | src/server.h | 4 | ||||
-rw-r--r-- | src/t_string.c | 149 |
4 files changed, 98 insertions, 67 deletions
diff --git a/src/module.c b/src/module.c index 1efade996..cf0f598de 100644 --- a/src/module.c +++ b/src/module.c @@ -372,6 +372,7 @@ typedef struct RedisModuleUser { /* The Redis core itself registeres a module in order to use module services * for things like blocking clients for threaded execution of slow commands. */ RedisModule *coremodule; +_Atomic unsigned long CoreModuleBlockedClients = 0; /* -------------------------------------------------------------------------- * Prototypes @@ -4610,6 +4611,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec * When this happens the RedisModuleBlockedClient structure in the queue * will have the 'client' field set to NULL. */ void moduleHandleBlockedClients(void) { + if (moduleCount() == 0 && CoreModuleBlockedClients == 0) return; listNode *ln; RedisModuleBlockedClient *bc; @@ -7227,7 +7229,9 @@ void threadedCoreCommandFreePrivdata(RedisModuleCtx *ctx, void *privdata) { tcprivdata *tcpd = privdata; for (int j = 0; j < tcpd->freecount; j++) decrRefCount(tcpd->objv[j]); + zfree(tcpd->objv); zfree(tcpd); + CoreModuleBlockedClients--; } /* The entry point of the threaded execution. */ @@ -7272,10 +7276,12 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob tcprivdata *tcpd = zmalloc(sizeof(*tcpd)); /* Block the client. */ + CoreModuleBlockedClients++; RedisModuleBlockedClient *bc = RM_BlockClient(&ctx,NULL,NULL,threadedCoreCommandFreePrivdata,0); tcpd->bc = bc; tcpd->callback = callback; - tcpd->objv = objv; + tcpd->objv = zmalloc(sizeof(robj*)*objc); + memcpy(tcpd->objv,objv,sizeof(robj*)*objc); tcpd->objc = objc; tcpd->freecount = freecount; bc->privdata = tcpd; @@ -7285,7 +7291,7 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob client *copy = bc->reply_client; copy->argc = c->argc; copy->argv = zmalloc(sizeof(robj*) * c->argc); - for (int j = 0; j < c->argc; c++) + for (int j = 0; j < c->argc; j++) copy->argv[j] = createStringObject(c->argv[j]->ptr, sdslen(c->argv[j]->ptr)); diff --git a/src/server.c b/src/server.c index e8e711240..d5ddefae7 100644 --- a/src/server.c +++ b/src/server.c @@ -2151,7 +2151,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Check if there are clients unblocked by modules that implement * blocking commands. */ - if (moduleCount()) moduleHandleBlockedClients(); + moduleHandleBlockedClients(); /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.unblocked_clients)) diff --git a/src/server.h b/src/server.h index a08585292..62ad92cae 100644 --- a/src/server.h +++ b/src/server.h @@ -1581,6 +1581,10 @@ void moduleUnblockClient(client *c); int moduleClientIsBlockedOnKeys(client *c); void moduleNotifyUserChanged(client *c); +/* Modules functionalities exported to core commands. */ +typedef void (*coreThreadedCommandCallback)(client *c, robj **objv, int objc); +void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, robj **objv, int objc, int freecount); + /* Utils */ long long ustime(void); long long mstime(void); diff --git a/src/t_string.c b/src/t_string.c index 6d27ef6aa..43f81f067 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -494,73 +494,23 @@ void stralgoCommand(client *c) { } } -/* STRALGO LCS [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN] - * STRINGS <string> <string> | KEYS <keya> <keyb> */ -void stralgoLCS(client *c) { +/* This implements the threaded part of STRALGO LCS. It's the callback + * we provide to the background execution engine. */ +void stralgoLCSThreadedPart(client *c, robj **objv, int objc) { uint32_t i, j; - long long minmatchlen = 0; - sds a = NULL, b = NULL; - int getlen = 0, getidx = 0, withmatchlen = 0; - robj *obja = NULL, *objb = NULL; - - for (j = 2; j < (uint32_t)c->argc; j++) { - char *opt = c->argv[j]->ptr; - int moreargs = (c->argc-1) - j; - - if (!strcasecmp(opt,"IDX")) { - getidx = 1; - } else if (!strcasecmp(opt,"LEN")) { - getlen = 1; - } else if (!strcasecmp(opt,"WITHMATCHLEN")) { - withmatchlen = 1; - } else if (!strcasecmp(opt,"MINMATCHLEN") && moreargs) { - if (getLongLongFromObjectOrReply(c,c->argv[j+1],&minmatchlen,NULL) - != C_OK) return; - if (minmatchlen < 0) minmatchlen = 0; - j++; - } else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) { - if (a != NULL) { - addReplyError(c,"Either use STRINGS or KEYS"); - return; - } - a = c->argv[j+1]->ptr; - b = c->argv[j+2]->ptr; - j += 2; - } else if (!strcasecmp(opt,"KEYS") && moreargs > 1) { - if (a != NULL) { - addReplyError(c,"Either use STRINGS or KEYS"); - return; - } - obja = lookupKeyRead(c->db,c->argv[j+1]); - objb = lookupKeyRead(c->db,c->argv[j+2]); - obja = obja ? getDecodedObject(obja) : createStringObject("",0); - objb = objb ? getDecodedObject(objb) : createStringObject("",0); - a = obja->ptr; - b = objb->ptr; - j += 2; - } else { - addReply(c,shared.syntaxerr); - return; - } - } - - /* Complain if the user passed ambiguous parameters. */ - if (a == NULL) { - addReplyError(c,"Please specify two strings: " - "STRINGS or KEYS options are mandatory"); - return; - } else if (getlen && getidx) { - addReplyError(c, - "If you want both the length and indexes, please " - "just use IDX."); - return; - } + UNUSED(objc); /* Compute the LCS using the vanilla dynamic programming technique of * building a table of LCS(x,y) substrings. */ + sds a = objv[0]->ptr, b = objv[1]->ptr; uint32_t alen = sdslen(a); uint32_t blen = sdslen(b); + /* FIXME: really populate these values with data in + * objv. */ + long long minmatchlen = 0; + int getlen = 0, getidx = 0, withmatchlen = 0; + /* Setup an uint32_t array to store at LCS[i,j] the length of the * LCS A0..i-1, B0..j-1. Note that we have a linear array here, so * we index it as LCS[j+(blen+1)*j] */ @@ -673,8 +623,6 @@ void stralgoLCS(client *c) { } } - /* Signal modified key, increment dirty, ... */ - /* Reply depending on the given options. */ if (arraylenptr) { addReplyBulkCString(c,"len"); @@ -688,10 +636,83 @@ void stralgoLCS(client *c) { } /* Cleanup. */ - if (obja) decrRefCount(obja); - if (objb) decrRefCount(objb); sdsfree(result); zfree(lcs); return; } +/* STRALGO LCS [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN] + * STRINGS <string> <string> | KEYS <keya> <keyb> */ +void stralgoLCS(client *c) { + long long minmatchlen = 0; + int getlen = 0, getidx = 0, withmatchlen = 0; + int data_from_key = 0; + robj *obja = NULL, *objb = NULL; + + for (int j = 2; j < c->argc; j++) { + char *opt = c->argv[j]->ptr; + int moreargs = (c->argc-1) - j; + + if (!strcasecmp(opt,"IDX")) { + getidx = 1; + } else if (!strcasecmp(opt,"LEN")) { + getlen = 1; + } else if (!strcasecmp(opt,"WITHMATCHLEN")) { + withmatchlen = 1; + } else if (!strcasecmp(opt,"MINMATCHLEN") && moreargs) { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&minmatchlen,NULL) + != C_OK) return; + if (minmatchlen < 0) minmatchlen = 0; + j++; + } else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) { + if (obja != NULL) { + addReplyError(c,"Either use STRINGS or KEYS"); + return; + } + obja = c->argv[j+1]; + objb = c->argv[j+2]; + /* We increment the objects refcount only to make the cleanup + * code path the same as the KEYS option. */ + incrRefCount(obja); + incrRefCount(objb); + j += 2; + } else if (!strcasecmp(opt,"KEYS") && moreargs > 1) { + if (obja != NULL) { + addReplyError(c,"Either use STRINGS or KEYS"); + return; + } + obja = lookupKeyRead(c->db,c->argv[j+1]); + objb = lookupKeyRead(c->db,c->argv[j+2]); + obja = obja ? getDecodedObject(obja) : createStringObject("",0); + objb = objb ? getDecodedObject(objb) : createStringObject("",0); + j += 2; + data_from_key = 1; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + + /* Complain if the user passed ambiguous parameters. */ + if (obja == NULL) { + addReplyError(c,"Please specify two strings: " + "STRINGS or KEYS options are mandatory"); + return; + } else if (getlen && getidx) { + addReplyError(c, + "If you want both the length and indexes, please " + "just use IDX."); + return; + } + + /* Switch to the threaded execution of the command. We always work on + * a copy of the data. */ + robj *a = dupStringObject(obja); + robj *b = dupStringObject(objb); + decrRefCount(obja); + decrRefCount(objb); + robj *objv[2] = {a,b}; + executeThreadedCommand(c, stralgoLCSThreadedPart,objv,2, + data_from_key?2:0); +} + |