diff options
author | Harkrishn Patro <harkrisp@amazon.com> | 2022-05-30 22:03:59 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-31 08:03:59 +0300 |
commit | 4065b4f27efc539b86beb63829bc148a02adecb1 (patch) | |
tree | 13f2c1d3e1ca0e7710ede98640814e81d837b705 /src | |
parent | d7ae858745f722b845d3540a437f89a7fb8d0bb2 (diff) | |
download | redis-4065b4f27efc539b86beb63829bc148a02adecb1.tar.gz |
Sharded pubsub publish messagebulk as smessage (#10792)
To easily distinguish between sharded channel message and a global
channel message, introducing `smessage` (instead of `message`) as
message bulk for sharded channel publish message.
This is gonna be a breaking change in 7.0.1!
Background:
Sharded pubsub introduced in redis 7.0, but after the release we quickly
realized that the fact that it's problematic that the client can't distinguish
between normal (global) pubsub messages and sharded ones.
This is important because the same connection can subscribe to both,
but messages sent to one pubsub system are not propagated to the
other (they're completely separate), so if one connection is used to
subscribe to both, we need to assist the client library to know which
message it got so it can forward it to the correct callback.
Diffstat (limited to 'src')
-rw-r--r-- | src/pubsub.c | 11 | ||||
-rw-r--r-- | src/server.c | 1 | ||||
-rw-r--r-- | src/server.h | 4 | ||||
-rw-r--r-- | src/tracking.c | 2 |
4 files changed, 11 insertions, 7 deletions
diff --git a/src/pubsub.c b/src/pubsub.c index 07459c1c8..da1bd1fc2 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -39,6 +39,7 @@ typedef struct pubsubtype { dict **serverPubSubChannels; robj **subscribeMsg; robj **unsubscribeMsg; + robj **messageBulk; }pubsubtype; /* @@ -78,6 +79,7 @@ pubsubtype pubSubType = { .serverPubSubChannels = &server.pubsub_channels, .subscribeMsg = &shared.subscribebulk, .unsubscribeMsg = &shared.unsubscribebulk, + .messageBulk = &shared.messagebulk, }; /* @@ -89,7 +91,8 @@ pubsubtype pubSubShardType = { .subscriptionCount = clientShardSubscriptionsCount, .serverPubSubChannels = &server.pubsubshard_channels, .subscribeMsg = &shared.ssubscribebulk, - .unsubscribeMsg = &shared.sunsubscribebulk + .unsubscribeMsg = &shared.sunsubscribebulk, + .messageBulk = &shared.smessagebulk, }; /*----------------------------------------------------------------------------- @@ -101,12 +104,12 @@ pubsubtype pubSubShardType = { * message. However if the caller sets 'msg' as NULL, it will be able * to send a special message (for instance an Array type) by using the * addReply*() API family. */ -void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { +void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) { if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); - addReply(c,shared.messagebulk); + addReply(c,message_bulk); addReplyBulk(c,channel); if (msg) addReplyBulk(c,msg); } @@ -461,7 +464,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; - addReplyPubsubMessage(c,channel,message); + addReplyPubsubMessage(c,channel,message,*type.messageBulk); updateClientMemUsage(c); receivers++; } diff --git a/src/server.c b/src/server.c index 8f4654ad8..75dbab0d2 100644 --- a/src/server.c +++ b/src/server.c @@ -1755,6 +1755,7 @@ void createSharedObjects(void) { shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17); shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19); + shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); diff --git a/src/server.h b/src/server.h index 75f7aba60..b34a84cd7 100644 --- a/src/server.h +++ b/src/server.h @@ -1228,7 +1228,7 @@ struct sharedObjectsStruct { *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, - *ssubscribebulk,*sunsubscribebulk, + *ssubscribebulk,*sunsubscribebulk, *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */ @@ -2991,7 +2991,7 @@ void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count); int pubsubUnsubscribeAllPatterns(client *c, int notify); int pubsubPublishMessage(robj *channel, robj *message, int sharded); int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded); -void addReplyPubsubMessage(client *c, robj *channel, robj *msg); +void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk); int serverPubsubSubscriptionCount(); int serverPubsubShardSubscriptionCount(); diff --git a/src/tracking.c b/src/tracking.c index b86d984a7..a659e98dd 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -295,7 +295,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ - addReplyPubsubMessage(c,TrackingChannelName,NULL); + addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk); } else { /* If are here, the client is not using RESP3, nor is * redirecting to another client. We can't send anything to |