summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBinbin <binloveplay1314@qq.com>2023-03-12 23:50:44 +0800
committerGitHub <noreply@github.com>2023-03-12 17:50:44 +0200
commit416842e6c004dbd951e398a8651df6c56a030a23 (patch)
tree47ef38bcf6e592441d6e50b4cd1e44271c923511
parent4e7eb16ae70d2664d169e412b965f6e9143de7a0 (diff)
downloadredis-416842e6c004dbd951e398a8651df6c56a030a23.tar.gz
Fix the bug that CLIENT REPLY OFF|SKIP cannot receive push notifications (#11875)
This bug seems to be there forever, CLIENT REPLY OFF|SKIP will mark the client with CLIENT_REPLY_OFF or CLIENT_REPLY_SKIP flags. With these flags, prepareClientToWrite called by addReply* will return C_ERR directly. So the client can't receive the Pub/Sub messages and any other push notifications, e.g client side tracking. In this PR, we adding a CLIENT_PUSHING flag, disables the reply silencing flags. When adding push replies, set the flag, after the reply, clear the flag. Then add the flag check in prepareClientToWrite. Fixes #11874 Note, the SUBSCRIBE command response is a bit awkward, see https://github.com/redis/redis-doc/pull/2327 Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--src/debug.c3
-rw-r--r--src/networking.c7
-rw-r--r--src/pubsub.c18
-rw-r--r--src/server.h1
-rw-r--r--src/tracking.c9
-rw-r--r--tests/unit/introspection.tcl42
-rw-r--r--tests/unit/pubsub.tcl28
-rw-r--r--tests/unit/pubsubshard.tcl33
-rw-r--r--tests/unit/tracking.tcl91
9 files changed, 227 insertions, 5 deletions
diff --git a/src/debug.c b/src/debug.c
index 7a02c332b..41722a901 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -807,9 +807,12 @@ NULL
addReplyError(c,"RESP2 is not supported by this command");
return;
}
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
addReplyPushLen(c,2);
addReplyBulkCString(c,"server-cpu-usage");
addReplyLongLong(c,42);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
/* Push replies are not synchronous replies, so we emit also a
* normal reply in order for blocking clients just discarding the
* push reply, to actually consume the reply and continue. */
diff --git a/src/networking.c b/src/networking.c
index 3b4caa4af..634c1fa89 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -292,8 +292,10 @@ int prepareClientToWrite(client *c) {
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
- /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
- if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
+ /* CLIENT REPLY OFF / SKIP handling: don't send replies.
+ * CLIENT_PUSHING handling: disables the reply silencing flags. */
+ if ((c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) &&
+ !(c->flags & CLIENT_PUSHING)) return C_ERR;
/* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
* is set. */
@@ -976,6 +978,7 @@ void addReplyAttributeLen(client *c, long length) {
void addReplyPushLen(client *c, long length) {
serverAssert(c->resp >= 3);
+ serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
addReplyAggregateLen(c,length,'>');
}
diff --git a/src/pubsub.c b/src/pubsub.c
index a257a8af3..2bbe40380 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -105,6 +105,8 @@ pubsubtype pubSubShardType = {
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@@ -112,12 +114,15 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bu
addReply(c,message_bulk);
addReplyBulk(c,channel);
if (msg) addReplyBulk(c,msg);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send a pubsub message of type "pmessage" to the client. The difference
* with the "message" type delivered by addReplyPubsubMessage() is that
* this message format also includes the pattern that matched the message. */
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[4]);
else
@@ -126,10 +131,13 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
addReplyBulk(c,pat);
addReplyBulk(c,channel);
addReplyBulk(c,msg);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub subscription notification to the client. */
void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@@ -137,6 +145,7 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
addReply(c,*type.subscribeMsg);
addReplyBulk(c,channel);
addReplyLongLong(c,type.subscriptionCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub unsubscription notification to the client.
@@ -144,6 +153,8 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
* unsubscribe command but there are no channels to unsubscribe from: we
* still send a notification. */
void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@@ -154,10 +165,13 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
else
addReplyNull(c);
addReplyLongLong(c,type.subscriptionCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub pattern subscription notification to the client. */
void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@@ -165,6 +179,7 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub pattern unsubscription notification to the client.
@@ -172,6 +187,8 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
* punsubscribe command but there are no pattern to unsubscribe from: we
* still send a notification. */
void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@@ -182,6 +199,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
else
addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/*-----------------------------------------------------------------------------
diff --git a/src/server.h b/src/server.h
index 056123d0f..04371e959 100644
--- a/src/server.h
+++ b/src/server.h
@@ -391,6 +391,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_ALLOW_OOM (1ULL<<44) /* Client used by RM_Call is allowed to fully execute
scripts even when in OOM */
#define CLIENT_NO_TOUCH (1ULL<<45) /* This client will not touch LFU/LRU stats. */
+#define CLIENT_PUSHING (1ULL<<46) /* This client is pushing notifications. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
diff --git a/src/tracking.c b/src/tracking.c
index 775eea684..5a9b114aa 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -266,6 +266,9 @@ void trackingRememberKeys(client *tracking, client *executing) {
* - Following a flush command, to send a single RESP NULL to indicate
* that all keys are now invalid. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
@@ -279,10 +282,14 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
c = redir;
using_redirection = 1;
+ old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
}
/* Only send such info for clients in RESP version 3 or more. However
@@ -301,6 +308,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* redirecting to another client. We can't send anything to
* it since RESP2 does not support push messages in the same
* connection. */
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
@@ -312,6 +320,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c,keyname,keylen);
}
updateClientMemUsageAndBucket(c);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* This function is called when a key is modified in Redis and in the case
diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl
index 097074047..10d3a15e9 100644
--- a/tests/unit/introspection.tcl
+++ b/tests/unit/introspection.tcl
@@ -94,6 +94,48 @@ start_server {tags {"introspection"}} {
}
} {} {needs:save}
+ test "CLIENT REPLY OFF/ON: disable all commands reply" {
+ set rd [redis_deferring_client]
+
+ # These replies were silenced.
+ $rd client reply off
+ $rd ping pong
+ $rd ping pong2
+
+ $rd client reply on
+ assert_equal {OK} [$rd read]
+ $rd ping pong3
+ assert_equal {pong3} [$rd read]
+
+ $rd close
+ }
+
+ test "CLIENT REPLY SKIP: skip the next command reply" {
+ set rd [redis_deferring_client]
+
+ # The first pong reply was silenced.
+ $rd client reply skip
+ $rd ping pong
+
+ $rd ping pong2
+ assert_equal {pong2} [$rd read]
+
+ $rd close
+ }
+
+ test "CLIENT REPLY ON: unset SKIP flag" {
+ set rd [redis_deferring_client]
+
+ $rd client reply skip
+ $rd client reply on
+ assert_equal {OK} [$rd read] ;# OK from CLIENT REPLY ON command
+
+ $rd ping
+ assert_equal {PONG} [$rd read]
+
+ $rd close
+ }
+
test {MONITOR can log executed commands} {
set rd [redis_deferring_client]
$rd monitor
diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl
index fe486edf3..0dd32c397 100644
--- a/tests/unit/pubsub.tcl
+++ b/tests/unit/pubsub.tcl
@@ -166,6 +166,30 @@ start_server {tags {"pubsub network"}} {
$rd1 close
}
+ test "PubSub messages with CLIENT REPLY OFF" {
+ set rd [redis_deferring_client]
+ $rd hello 3
+ $rd read ;# Discard the hello reply
+
+ # Test that the subscribe/psubscribe notification is ok
+ $rd client reply off
+ assert_equal {1} [subscribe $rd channel]
+ assert_equal {2} [psubscribe $rd ch*]
+
+ # Test that the publish notification is ok
+ $rd client reply off
+ assert_equal 2 [r publish channel hello]
+ assert_equal {message channel hello} [$rd read]
+ assert_equal {pmessage ch* channel hello} [$rd read]
+
+ # Test that the unsubscribe/punsubscribe notification is ok
+ $rd client reply off
+ assert_equal {1} [unsubscribe $rd channel]
+ assert_equal {0} [punsubscribe $rd ch*]
+
+ $rd close
+ }
+
test "PUNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0 0 0} [punsubscribe $rd1 {foo.* bar.* quux.*}]
@@ -226,6 +250,7 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we receive keyspace notifications" {
r config set notify-keyspace-events KA
set rd1 [redis_deferring_client]
+ $rd1 CLIENT REPLY OFF ;# Make sure it works even if replies are silenced
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read]
@@ -235,6 +260,7 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we receive keyevent notifications" {
r config set notify-keyspace-events EA
set rd1 [redis_deferring_client]
+ $rd1 CLIENT REPLY SKIP ;# Make sure it works even if replies are silenced
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyevent@${db}__:set foo" [$rd1 read]
@@ -244,6 +270,8 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we can receive both kind of events" {
r config set notify-keyspace-events KEA
set rd1 [redis_deferring_client]
+ $rd1 CLIENT REPLY ON ;# Just coverage
+ assert_equal {OK} [$rd1 read]
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read]
diff --git a/tests/unit/pubsubshard.tcl b/tests/unit/pubsubshard.tcl
index 8cccdcff6..6e3fb61c1 100644
--- a/tests/unit/pubsubshard.tcl
+++ b/tests/unit/pubsubshard.tcl
@@ -40,7 +40,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd2 close
}
- test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
+ test "SPUBLISH/SSUBSCRIBE after UNSUBSCRIBE without arguments" {
set rd1 [redis_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {2} [ssubscribe $rd1 {chan2}]
@@ -54,7 +54,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd1 close
}
- test "SUBSCRIBE to one channel more than once" {
+ test "SSUBSCRIBE to one channel more than once" {
set rd1 [redis_deferring_client]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
@@ -64,7 +64,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd1 close
}
- test "UNSUBSCRIBE from non-subscribed channels" {
+ test "SUNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0} [sunsubscribe $rd1 {foo}]
assert_equal {0} [sunsubscribe $rd1 {bar}]
@@ -105,6 +105,33 @@ start_server {tags {"pubsubshard external:skip"}} {
assert_equal "chan1 1" [r pubsub numsub chan1]
assert_equal "chan1" [r pubsub shardchannels]
assert_equal "chan1" [r pubsub channels]
+
+ $rd1 close
+ $rd2 close
+ }
+
+ test "PubSubShard with CLIENT REPLY OFF" {
+ set rd [redis_deferring_client]
+ $rd hello 3
+ $rd read ;# Discard the hello reply
+
+ # Test that the ssubscribe notification is ok
+ $rd client reply off
+ $rd ping
+ assert_equal {1} [ssubscribe $rd channel]
+
+ # Test that the spublish notification is ok
+ $rd client reply off
+ $rd ping
+ assert_equal 1 [r spublish channel hello]
+ assert_equal {smessage channel hello} [$rd read]
+
+ # Test that sunsubscribe notification is ok
+ $rd client reply off
+ $rd ping
+ assert_equal {0} [sunsubscribe $rd channel]
+
+ $rd close
}
}
diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl
index 21036352f..bea8508b1 100644
--- a/tests/unit/tracking.tcl
+++ b/tests/unit/tracking.tcl
@@ -781,7 +781,98 @@ start_server {tags {"tracking network logreqres:skip"}} {
r debug pause-cron 0
} {OK} {needs:debug}
+ foreach resp {3 2} {
+ test "RESP$resp based basic invalidation with client reply off" {
+ # This entire test is mostly irrelevant for RESP2, but we run it anyway just for some extra coverage.
+ clean_all
+
+ $rd hello $resp
+ $rd read
+ $rd client tracking on
+ $rd read
+
+ $rd_sg set foo bar
+ $rd get foo
+ $rd read
+
+ $rd client reply off
+
+ $rd_sg set foo bar2
+
+ if {$resp == 3} {
+ assert_equal {invalidate foo} [$rd read]
+ } elseif {$resp == 2} { } ;# Just coverage
+
+ # Verify things didn't get messed up and no unexpected reply was pushed to the client.
+ $rd client reply on
+ assert_equal {OK} [$rd read]
+ $rd ping
+ assert_equal {PONG} [$rd read]
+ }
+ }
+
+ test {RESP3 based basic redirect invalidation with client reply off} {
+ clean_all
+
+ set rd_redir [redis_deferring_client]
+ $rd_redir hello 3
+ $rd_redir read
+
+ $rd_redir client id
+ set rd_redir_id [$rd_redir read]
+
+ $rd client tracking on redirect $rd_redir_id
+ $rd read
+
+ $rd_sg set foo bar
+ $rd get foo
+ $rd read
+
+ $rd_redir client reply off
+
+ $rd_sg set foo bar2
+ assert_equal {invalidate foo} [$rd_redir read]
+
+ # Verify things didn't get messed up and no unexpected reply was pushed to the client.
+ $rd_redir client reply on
+ assert_equal {OK} [$rd_redir read]
+ $rd_redir ping
+ assert_equal {PONG} [$rd_redir read]
+
+ $rd_redir close
+ }
+
+ test {RESP3 based basic tracking-redir-broken with client reply off} {
+ clean_all
+
+ $rd hello 3
+ $rd read
+ $rd client tracking on redirect $redir_id
+ $rd read
+
+ $rd_sg set foo bar
+ $rd get foo
+ $rd read
+
+ $rd client reply off
+
+ $rd_redirection quit
+ $rd_redirection read
+
+ $rd_sg set foo bar2
+
+ set res [lsearch -exact [$rd read] "tracking-redir-broken"]
+ assert_morethan_equal $res 0
+
+ # Verify things didn't get messed up and no unexpected reply was pushed to the client.
+ $rd client reply on
+ assert_equal {OK} [$rd read]
+ $rd ping
+ assert_equal {PONG} [$rd read]
+ }
+
$rd_redirection close
+ $rd_sg close
$rd close
}