diff options
author | Harkrishn Patro <30795839+hpatro@users.noreply.github.com> | 2022-01-03 01:54:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-02 16:54:47 -0800 |
commit | 9f8885760b53e6d3952b9c9b41f9e6c48dfa6cec (patch) | |
tree | 770dfdbff19a1a2a1c71a642ebd844d592ef3d26 /src/server.c | |
parent | b8ba942ac2aabf51fd96134d9fa21b47d3baff4a (diff) | |
download | redis-9f8885760b53e6d3952b9c9b41f9e6c48dfa6cec.tar.gz |
Sharded pubsub implementation (#8621)
This commit implements a sharded pubsub implementation based off of shard channels.
Co-authored-by: Harkrishn Patro <harkrisp@amazon.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/src/server.c b/src/server.c index e7cf5740b..4770a5df0 100644 --- a/src/server.c +++ b/src/server.c @@ -1648,6 +1648,8 @@ void createSharedObjects(void) { shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); + shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17); + shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); @@ -2367,6 +2369,7 @@ void initServer(void) { evictionPoolAlloc(); /* Initialize the LRU keys pool. */ server.pubsub_channels = dictCreate(&keylistDictType); server.pubsub_patterns = dictCreate(&keylistDictType); + server.pubsubshard_channels = dictCreate(&keylistDictType); server.cronloops = 0; server.in_script = 0; server.in_exec = 0; @@ -3499,14 +3502,16 @@ int processCommand(client *c) { if ((c->flags & CLIENT_PUBSUB && c->resp == 2) && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && + c->cmd->proc != ssubscribeCommand && c->cmd->proc != unsubscribeCommand && + c->cmd->proc != sunsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand && c->cmd->proc != quitCommand && c->cmd->proc != resetCommand) { rejectCommandFormat(c, - "Can't execute '%s': only (P)SUBSCRIBE / " - "(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context", + "Can't execute '%s': only (P|S)SUBSCRIBE / " + "(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context", c->cmd->name); return C_OK; } @@ -4001,6 +4006,7 @@ void addReplyFlagsForKeyArgs(client *c, uint64_t flags) { void *flaglen = addReplyDeferredLen(c); flagcount += addReplyCommandFlag(c,flags,CMD_KEY_WRITE, "write"); flagcount += addReplyCommandFlag(c,flags,CMD_KEY_READ, "read"); + flagcount += addReplyCommandFlag(c,flags,CMD_KEY_SHARD_CHANNEL, "shard_channel"); flagcount += addReplyCommandFlag(c,flags,CMD_KEY_INCOMPLETE, "incomplete"); setDeferredSetLen(c, flaglen, flagcount); } |