summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-05-29 12:06:12 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commit80efd2e8b5aaea8457fa69a51265529448728514 (patch)
tree4bd2c2bcf1aacbd716e731dce22350d54790a687
parent66ade8636edd189a1ce50fa50797fb35850b1231 (diff)
downloadredis-80efd2e8b5aaea8457fa69a51265529448728514.tar.gz
Threaded core commands: WIP #1.
-rw-r--r--src/module.c10
-rw-r--r--src/server.c2
-rw-r--r--src/server.h4
-rw-r--r--src/t_string.c149
4 files changed, 98 insertions, 67 deletions
diff --git a/src/module.c b/src/module.c
index 1efade996..cf0f598de 100644
--- a/src/module.c
+++ b/src/module.c
@@ -372,6 +372,7 @@ typedef struct RedisModuleUser {
/* The Redis core itself registeres a module in order to use module services
* for things like blocking clients for threaded execution of slow commands. */
RedisModule *coremodule;
+_Atomic unsigned long CoreModuleBlockedClients = 0;
/* --------------------------------------------------------------------------
* Prototypes
@@ -4610,6 +4611,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
* When this happens the RedisModuleBlockedClient structure in the queue
* will have the 'client' field set to NULL. */
void moduleHandleBlockedClients(void) {
+ if (moduleCount() == 0 && CoreModuleBlockedClients == 0) return;
listNode *ln;
RedisModuleBlockedClient *bc;
@@ -7227,7 +7229,9 @@ void threadedCoreCommandFreePrivdata(RedisModuleCtx *ctx, void *privdata) {
tcprivdata *tcpd = privdata;
for (int j = 0; j < tcpd->freecount; j++)
decrRefCount(tcpd->objv[j]);
+ zfree(tcpd->objv);
zfree(tcpd);
+ CoreModuleBlockedClients--;
}
/* The entry point of the threaded execution. */
@@ -7272,10 +7276,12 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob
tcprivdata *tcpd = zmalloc(sizeof(*tcpd));
/* Block the client. */
+ CoreModuleBlockedClients++;
RedisModuleBlockedClient *bc = RM_BlockClient(&ctx,NULL,NULL,threadedCoreCommandFreePrivdata,0);
tcpd->bc = bc;
tcpd->callback = callback;
- tcpd->objv = objv;
+ tcpd->objv = zmalloc(sizeof(robj*)*objc);
+ memcpy(tcpd->objv,objv,sizeof(robj*)*objc);
tcpd->objc = objc;
tcpd->freecount = freecount;
bc->privdata = tcpd;
@@ -7285,7 +7291,7 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob
client *copy = bc->reply_client;
copy->argc = c->argc;
copy->argv = zmalloc(sizeof(robj*) * c->argc);
- for (int j = 0; j < c->argc; c++)
+ for (int j = 0; j < c->argc; j++)
copy->argv[j] = createStringObject(c->argv[j]->ptr,
sdslen(c->argv[j]->ptr));
diff --git a/src/server.c b/src/server.c
index e8e711240..d5ddefae7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2151,7 +2151,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
- if (moduleCount()) moduleHandleBlockedClients();
+ moduleHandleBlockedClients();
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
diff --git a/src/server.h b/src/server.h
index a08585292..62ad92cae 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1581,6 +1581,10 @@ void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c);
void moduleNotifyUserChanged(client *c);
+/* Modules functionalities exported to core commands. */
+typedef void (*coreThreadedCommandCallback)(client *c, robj **objv, int objc);
+void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, robj **objv, int objc, int freecount);
+
/* Utils */
long long ustime(void);
long long mstime(void);
diff --git a/src/t_string.c b/src/t_string.c
index 6d27ef6aa..43f81f067 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -494,73 +494,23 @@ void stralgoCommand(client *c) {
}
}
-/* STRALGO LCS [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN]
- * STRINGS <string> <string> | KEYS <keya> <keyb> */
-void stralgoLCS(client *c) {
+/* This implements the threaded part of STRALGO LCS. It's the callback
+ * we provide to the background execution engine. */
+void stralgoLCSThreadedPart(client *c, robj **objv, int objc) {
uint32_t i, j;
- long long minmatchlen = 0;
- sds a = NULL, b = NULL;
- int getlen = 0, getidx = 0, withmatchlen = 0;
- robj *obja = NULL, *objb = NULL;
-
- for (j = 2; j < (uint32_t)c->argc; j++) {
- char *opt = c->argv[j]->ptr;
- int moreargs = (c->argc-1) - j;
-
- if (!strcasecmp(opt,"IDX")) {
- getidx = 1;
- } else if (!strcasecmp(opt,"LEN")) {
- getlen = 1;
- } else if (!strcasecmp(opt,"WITHMATCHLEN")) {
- withmatchlen = 1;
- } else if (!strcasecmp(opt,"MINMATCHLEN") && moreargs) {
- if (getLongLongFromObjectOrReply(c,c->argv[j+1],&minmatchlen,NULL)
- != C_OK) return;
- if (minmatchlen < 0) minmatchlen = 0;
- j++;
- } else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) {
- if (a != NULL) {
- addReplyError(c,"Either use STRINGS or KEYS");
- return;
- }
- a = c->argv[j+1]->ptr;
- b = c->argv[j+2]->ptr;
- j += 2;
- } else if (!strcasecmp(opt,"KEYS") && moreargs > 1) {
- if (a != NULL) {
- addReplyError(c,"Either use STRINGS or KEYS");
- return;
- }
- obja = lookupKeyRead(c->db,c->argv[j+1]);
- objb = lookupKeyRead(c->db,c->argv[j+2]);
- obja = obja ? getDecodedObject(obja) : createStringObject("",0);
- objb = objb ? getDecodedObject(objb) : createStringObject("",0);
- a = obja->ptr;
- b = objb->ptr;
- j += 2;
- } else {
- addReply(c,shared.syntaxerr);
- return;
- }
- }
-
- /* Complain if the user passed ambiguous parameters. */
- if (a == NULL) {
- addReplyError(c,"Please specify two strings: "
- "STRINGS or KEYS options are mandatory");
- return;
- } else if (getlen && getidx) {
- addReplyError(c,
- "If you want both the length and indexes, please "
- "just use IDX.");
- return;
- }
+ UNUSED(objc);
/* Compute the LCS using the vanilla dynamic programming technique of
* building a table of LCS(x,y) substrings. */
+ sds a = objv[0]->ptr, b = objv[1]->ptr;
uint32_t alen = sdslen(a);
uint32_t blen = sdslen(b);
+ /* FIXME: really populate these values with data in
+ * objv. */
+ long long minmatchlen = 0;
+ int getlen = 0, getidx = 0, withmatchlen = 0;
+
/* Setup an uint32_t array to store at LCS[i,j] the length of the
* LCS A0..i-1, B0..j-1. Note that we have a linear array here, so
* we index it as LCS[j+(blen+1)*j] */
@@ -673,8 +623,6 @@ void stralgoLCS(client *c) {
}
}
- /* Signal modified key, increment dirty, ... */
-
/* Reply depending on the given options. */
if (arraylenptr) {
addReplyBulkCString(c,"len");
@@ -688,10 +636,83 @@ void stralgoLCS(client *c) {
}
/* Cleanup. */
- if (obja) decrRefCount(obja);
- if (objb) decrRefCount(objb);
sdsfree(result);
zfree(lcs);
return;
}
+/* STRALGO LCS [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN]
+ * STRINGS <string> <string> | KEYS <keya> <keyb> */
+void stralgoLCS(client *c) {
+ long long minmatchlen = 0;
+ int getlen = 0, getidx = 0, withmatchlen = 0;
+ int data_from_key = 0;
+ robj *obja = NULL, *objb = NULL;
+
+ for (int j = 2; j < c->argc; j++) {
+ char *opt = c->argv[j]->ptr;
+ int moreargs = (c->argc-1) - j;
+
+ if (!strcasecmp(opt,"IDX")) {
+ getidx = 1;
+ } else if (!strcasecmp(opt,"LEN")) {
+ getlen = 1;
+ } else if (!strcasecmp(opt,"WITHMATCHLEN")) {
+ withmatchlen = 1;
+ } else if (!strcasecmp(opt,"MINMATCHLEN") && moreargs) {
+ if (getLongLongFromObjectOrReply(c,c->argv[j+1],&minmatchlen,NULL)
+ != C_OK) return;
+ if (minmatchlen < 0) minmatchlen = 0;
+ j++;
+ } else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) {
+ if (obja != NULL) {
+ addReplyError(c,"Either use STRINGS or KEYS");
+ return;
+ }
+ obja = c->argv[j+1];
+ objb = c->argv[j+2];
+ /* We increment the objects refcount only to make the cleanup
+ * code path the same as the KEYS option. */
+ incrRefCount(obja);
+ incrRefCount(objb);
+ j += 2;
+ } else if (!strcasecmp(opt,"KEYS") && moreargs > 1) {
+ if (obja != NULL) {
+ addReplyError(c,"Either use STRINGS or KEYS");
+ return;
+ }
+ obja = lookupKeyRead(c->db,c->argv[j+1]);
+ objb = lookupKeyRead(c->db,c->argv[j+2]);
+ obja = obja ? getDecodedObject(obja) : createStringObject("",0);
+ objb = objb ? getDecodedObject(objb) : createStringObject("",0);
+ j += 2;
+ data_from_key = 1;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+
+ /* Complain if the user passed ambiguous parameters. */
+ if (obja == NULL) {
+ addReplyError(c,"Please specify two strings: "
+ "STRINGS or KEYS options are mandatory");
+ return;
+ } else if (getlen && getidx) {
+ addReplyError(c,
+ "If you want both the length and indexes, please "
+ "just use IDX.");
+ return;
+ }
+
+ /* Switch to the threaded execution of the command. We always work on
+ * a copy of the data. */
+ robj *a = dupStringObject(obja);
+ robj *b = dupStringObject(objb);
+ decrRefCount(obja);
+ decrRefCount(objb);
+ robj *objv[2] = {a,b};
+ executeThreadedCommand(c, stralgoLCSThreadedPart,objv,2,
+ data_from_key?2:0);
+}
+