summaryrefslogtreecommitdiff
path: root/src/server.c
diff options
context:
space:
mode:
authorHarkrishn Patro <30795839+hpatro@users.noreply.github.com>2022-01-03 01:54:47 +0100
committerGitHub <noreply@github.com>2022-01-02 16:54:47 -0800
commit9f8885760b53e6d3952b9c9b41f9e6c48dfa6cec (patch)
tree770dfdbff19a1a2a1c71a642ebd844d592ef3d26 /src/server.c
parentb8ba942ac2aabf51fd96134d9fa21b47d3baff4a (diff)
downloadredis-9f8885760b53e6d3952b9c9b41f9e6c48dfa6cec.tar.gz
Sharded pubsub implementation (#8621)
This commit implements a sharded pubsub implementation based off of shard channels. Co-authored-by: Harkrishn Patro <harkrisp@amazon.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Diffstat (limited to 'src/server.c')
-rw-r--r--src/server.c10
1 files changed, 8 insertions, 2 deletions
diff --git a/src/server.c b/src/server.c
index e7cf5740b..4770a5df0 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1648,6 +1648,8 @@ void createSharedObjects(void) {
shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
+ shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
+ shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
@@ -2367,6 +2369,7 @@ void initServer(void) {
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType);
server.pubsub_patterns = dictCreate(&keylistDictType);
+ server.pubsubshard_channels = dictCreate(&keylistDictType);
server.cronloops = 0;
server.in_script = 0;
server.in_exec = 0;
@@ -3499,14 +3502,16 @@ int processCommand(client *c) {
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
+ c->cmd->proc != ssubscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
+ c->cmd->proc != sunsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand) {
rejectCommandFormat(c,
- "Can't execute '%s': only (P)SUBSCRIBE / "
- "(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
+ "Can't execute '%s': only (P|S)SUBSCRIBE / "
+ "(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
c->cmd->name);
return C_OK;
}
@@ -4001,6 +4006,7 @@ void addReplyFlagsForKeyArgs(client *c, uint64_t flags) {
void *flaglen = addReplyDeferredLen(c);
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_WRITE, "write");
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_READ, "read");
+ flagcount += addReplyCommandFlag(c,flags,CMD_KEY_SHARD_CHANNEL, "shard_channel");
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_INCOMPLETE, "incomplete");
setDeferredSetLen(c, flaglen, flagcount);
}