From 416842e6c004dbd951e398a8651df6c56a030a23 Mon Sep 17 00:00:00 2001 From: Binbin Date: Sun, 12 Mar 2023 23:50:44 +0800 Subject: Fix the bug that CLIENT REPLY OFF|SKIP cannot receive push notifications (#11875) This bug seems to be there forever, CLIENT REPLY OFF|SKIP will mark the client with CLIENT_REPLY_OFF or CLIENT_REPLY_SKIP flags. With these flags, prepareClientToWrite called by addReply* will return C_ERR directly. So the client can't receive the Pub/Sub messages and any other push notifications, e.g client side tracking. In this PR, we adding a CLIENT_PUSHING flag, disables the reply silencing flags. When adding push replies, set the flag, after the reply, clear the flag. Then add the flag check in prepareClientToWrite. Fixes #11874 Note, the SUBSCRIBE command response is a bit awkward, see https://github.com/redis/redis-doc/pull/2327 Co-authored-by: Oran Agra --- src/debug.c | 3 ++ src/networking.c | 7 +++- src/pubsub.c | 18 +++++++++ src/server.h | 1 + src/tracking.c | 9 +++++ tests/unit/introspection.tcl | 42 ++++++++++++++++++++ tests/unit/pubsub.tcl | 28 ++++++++++++++ tests/unit/pubsubshard.tcl | 33 ++++++++++++++-- tests/unit/tracking.tcl | 91 ++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 227 insertions(+), 5 deletions(-) diff --git a/src/debug.c b/src/debug.c index 7a02c332b..41722a901 100644 --- a/src/debug.c +++ b/src/debug.c @@ -807,9 +807,12 @@ NULL addReplyError(c,"RESP2 is not supported by this command"); return; } + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; addReplyPushLen(c,2); addReplyBulkCString(c,"server-cpu-usage"); addReplyLongLong(c,42); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; /* Push replies are not synchronous replies, so we emit also a * normal reply in order for blocking clients just discarding the * push reply, to actually consume the reply and continue. */ diff --git a/src/networking.c b/src/networking.c index 3b4caa4af..634c1fa89 100644 --- a/src/networking.c +++ b/src/networking.c @@ -292,8 +292,10 @@ int prepareClientToWrite(client *c) { /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */ if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR; - /* CLIENT REPLY OFF / SKIP handling: don't send replies. */ - if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR; + /* CLIENT REPLY OFF / SKIP handling: don't send replies. + * CLIENT_PUSHING handling: disables the reply silencing flags. */ + if ((c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) && + !(c->flags & CLIENT_PUSHING)) return C_ERR; /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag * is set. */ @@ -976,6 +978,7 @@ void addReplyAttributeLen(client *c, long length) { void addReplyPushLen(client *c, long length) { serverAssert(c->resp >= 3); + serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING); addReplyAggregateLen(c,length,'>'); } diff --git a/src/pubsub.c b/src/pubsub.c index a257a8af3..2bbe40380 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -105,6 +105,8 @@ pubsubtype pubSubShardType = { * to send a special message (for instance an Array type) by using the * addReply*() API family. */ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else @@ -112,12 +114,15 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bu addReply(c,message_bulk); addReplyBulk(c,channel); if (msg) addReplyBulk(c,msg); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send a pubsub message of type "pmessage" to the client. The difference * with the "message" type delivered by addReplyPubsubMessage() is that * this message format also includes the pattern that matched the message. */ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[4]); else @@ -126,10 +131,13 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { addReplyBulk(c,pat); addReplyBulk(c,channel); addReplyBulk(c,msg); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub subscription notification to the client. */ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else @@ -137,6 +145,7 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) { addReply(c,*type.subscribeMsg); addReplyBulk(c,channel); addReplyLongLong(c,type.subscriptionCount(c)); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub unsubscription notification to the client. @@ -144,6 +153,8 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) { * unsubscribe command but there are no channels to unsubscribe from: we * still send a notification. */ void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else @@ -154,10 +165,13 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) { else addReplyNull(c); addReplyLongLong(c,type.subscriptionCount(c)); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub pattern subscription notification to the client. */ void addReplyPubsubPatSubscribed(client *c, robj *pattern) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else @@ -165,6 +179,7 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) { addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,clientSubscriptionsCount(c)); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub pattern unsubscription notification to the client. @@ -172,6 +187,8 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) { * punsubscribe command but there are no pattern to unsubscribe from: we * still send a notification. */ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else @@ -182,6 +199,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { else addReplyNull(c); addReplyLongLong(c,clientSubscriptionsCount(c)); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /*----------------------------------------------------------------------------- diff --git a/src/server.h b/src/server.h index 056123d0f..04371e959 100644 --- a/src/server.h +++ b/src/server.h @@ -391,6 +391,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_ALLOW_OOM (1ULL<<44) /* Client used by RM_Call is allowed to fully execute scripts even when in OOM */ #define CLIENT_NO_TOUCH (1ULL<<45) /* This client will not touch LFU/LRU stats. */ +#define CLIENT_PUSHING (1ULL<<46) /* This client is pushing notifications. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ diff --git a/src/tracking.c b/src/tracking.c index 775eea684..5a9b114aa 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -266,6 +266,9 @@ void trackingRememberKeys(client *tracking, client *executing) { * - Following a flush command, to send a single RESP NULL to indicate * that all keys are now invalid. */ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; + int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -279,10 +282,14 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { addReplyBulkCBuffer(c,"tracking-redir-broken",21); addReplyLongLong(c,c->client_tracking_redirection); } + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; return; } + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; c = redir; using_redirection = 1; + old_flags = c->flags; + c->flags |= CLIENT_PUSHING; } /* Only send such info for clients in RESP version 3 or more. However @@ -301,6 +308,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { * redirecting to another client. We can't send anything to * it since RESP2 does not support push messages in the same * connection. */ + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; return; } @@ -312,6 +320,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { addReplyBulkCBuffer(c,keyname,keylen); } updateClientMemUsageAndBucket(c); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* This function is called when a key is modified in Redis and in the case diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 097074047..10d3a15e9 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -94,6 +94,48 @@ start_server {tags {"introspection"}} { } } {} {needs:save} + test "CLIENT REPLY OFF/ON: disable all commands reply" { + set rd [redis_deferring_client] + + # These replies were silenced. + $rd client reply off + $rd ping pong + $rd ping pong2 + + $rd client reply on + assert_equal {OK} [$rd read] + $rd ping pong3 + assert_equal {pong3} [$rd read] + + $rd close + } + + test "CLIENT REPLY SKIP: skip the next command reply" { + set rd [redis_deferring_client] + + # The first pong reply was silenced. + $rd client reply skip + $rd ping pong + + $rd ping pong2 + assert_equal {pong2} [$rd read] + + $rd close + } + + test "CLIENT REPLY ON: unset SKIP flag" { + set rd [redis_deferring_client] + + $rd client reply skip + $rd client reply on + assert_equal {OK} [$rd read] ;# OK from CLIENT REPLY ON command + + $rd ping + assert_equal {PONG} [$rd read] + + $rd close + } + test {MONITOR can log executed commands} { set rd [redis_deferring_client] $rd monitor diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index fe486edf3..0dd32c397 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -166,6 +166,30 @@ start_server {tags {"pubsub network"}} { $rd1 close } + test "PubSub messages with CLIENT REPLY OFF" { + set rd [redis_deferring_client] + $rd hello 3 + $rd read ;# Discard the hello reply + + # Test that the subscribe/psubscribe notification is ok + $rd client reply off + assert_equal {1} [subscribe $rd channel] + assert_equal {2} [psubscribe $rd ch*] + + # Test that the publish notification is ok + $rd client reply off + assert_equal 2 [r publish channel hello] + assert_equal {message channel hello} [$rd read] + assert_equal {pmessage ch* channel hello} [$rd read] + + # Test that the unsubscribe/punsubscribe notification is ok + $rd client reply off + assert_equal {1} [unsubscribe $rd channel] + assert_equal {0} [punsubscribe $rd ch*] + + $rd close + } + test "PUNSUBSCRIBE from non-subscribed channels" { set rd1 [redis_deferring_client] assert_equal {0 0 0} [punsubscribe $rd1 {foo.* bar.* quux.*}] @@ -226,6 +250,7 @@ start_server {tags {"pubsub network"}} { test "Keyspace notifications: we receive keyspace notifications" { r config set notify-keyspace-events KA set rd1 [redis_deferring_client] + $rd1 CLIENT REPLY OFF ;# Make sure it works even if replies are silenced assert_equal {1} [psubscribe $rd1 *] r set foo bar assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read] @@ -235,6 +260,7 @@ start_server {tags {"pubsub network"}} { test "Keyspace notifications: we receive keyevent notifications" { r config set notify-keyspace-events EA set rd1 [redis_deferring_client] + $rd1 CLIENT REPLY SKIP ;# Make sure it works even if replies are silenced assert_equal {1} [psubscribe $rd1 *] r set foo bar assert_equal "pmessage * __keyevent@${db}__:set foo" [$rd1 read] @@ -244,6 +270,8 @@ start_server {tags {"pubsub network"}} { test "Keyspace notifications: we can receive both kind of events" { r config set notify-keyspace-events KEA set rd1 [redis_deferring_client] + $rd1 CLIENT REPLY ON ;# Just coverage + assert_equal {OK} [$rd1 read] assert_equal {1} [psubscribe $rd1 *] r set foo bar assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read] diff --git a/tests/unit/pubsubshard.tcl b/tests/unit/pubsubshard.tcl index 8cccdcff6..6e3fb61c1 100644 --- a/tests/unit/pubsubshard.tcl +++ b/tests/unit/pubsubshard.tcl @@ -40,7 +40,7 @@ start_server {tags {"pubsubshard external:skip"}} { $rd2 close } - test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" { + test "SPUBLISH/SSUBSCRIBE after UNSUBSCRIBE without arguments" { set rd1 [redis_deferring_client] assert_equal {1} [ssubscribe $rd1 {chan1}] assert_equal {2} [ssubscribe $rd1 {chan2}] @@ -54,7 +54,7 @@ start_server {tags {"pubsubshard external:skip"}} { $rd1 close } - test "SUBSCRIBE to one channel more than once" { + test "SSUBSCRIBE to one channel more than once" { set rd1 [redis_deferring_client] assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}] assert_equal 1 [r SPUBLISH chan1 hello] @@ -64,7 +64,7 @@ start_server {tags {"pubsubshard external:skip"}} { $rd1 close } - test "UNSUBSCRIBE from non-subscribed channels" { + test "SUNSUBSCRIBE from non-subscribed channels" { set rd1 [redis_deferring_client] assert_equal {0} [sunsubscribe $rd1 {foo}] assert_equal {0} [sunsubscribe $rd1 {bar}] @@ -105,6 +105,33 @@ start_server {tags {"pubsubshard external:skip"}} { assert_equal "chan1 1" [r pubsub numsub chan1] assert_equal "chan1" [r pubsub shardchannels] assert_equal "chan1" [r pubsub channels] + + $rd1 close + $rd2 close + } + + test "PubSubShard with CLIENT REPLY OFF" { + set rd [redis_deferring_client] + $rd hello 3 + $rd read ;# Discard the hello reply + + # Test that the ssubscribe notification is ok + $rd client reply off + $rd ping + assert_equal {1} [ssubscribe $rd channel] + + # Test that the spublish notification is ok + $rd client reply off + $rd ping + assert_equal 1 [r spublish channel hello] + assert_equal {smessage channel hello} [$rd read] + + # Test that sunsubscribe notification is ok + $rd client reply off + $rd ping + assert_equal {0} [sunsubscribe $rd channel] + + $rd close } } diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 21036352f..bea8508b1 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -781,7 +781,98 @@ start_server {tags {"tracking network logreqres:skip"}} { r debug pause-cron 0 } {OK} {needs:debug} + foreach resp {3 2} { + test "RESP$resp based basic invalidation with client reply off" { + # This entire test is mostly irrelevant for RESP2, but we run it anyway just for some extra coverage. + clean_all + + $rd hello $resp + $rd read + $rd client tracking on + $rd read + + $rd_sg set foo bar + $rd get foo + $rd read + + $rd client reply off + + $rd_sg set foo bar2 + + if {$resp == 3} { + assert_equal {invalidate foo} [$rd read] + } elseif {$resp == 2} { } ;# Just coverage + + # Verify things didn't get messed up and no unexpected reply was pushed to the client. + $rd client reply on + assert_equal {OK} [$rd read] + $rd ping + assert_equal {PONG} [$rd read] + } + } + + test {RESP3 based basic redirect invalidation with client reply off} { + clean_all + + set rd_redir [redis_deferring_client] + $rd_redir hello 3 + $rd_redir read + + $rd_redir client id + set rd_redir_id [$rd_redir read] + + $rd client tracking on redirect $rd_redir_id + $rd read + + $rd_sg set foo bar + $rd get foo + $rd read + + $rd_redir client reply off + + $rd_sg set foo bar2 + assert_equal {invalidate foo} [$rd_redir read] + + # Verify things didn't get messed up and no unexpected reply was pushed to the client. + $rd_redir client reply on + assert_equal {OK} [$rd_redir read] + $rd_redir ping + assert_equal {PONG} [$rd_redir read] + + $rd_redir close + } + + test {RESP3 based basic tracking-redir-broken with client reply off} { + clean_all + + $rd hello 3 + $rd read + $rd client tracking on redirect $redir_id + $rd read + + $rd_sg set foo bar + $rd get foo + $rd read + + $rd client reply off + + $rd_redirection quit + $rd_redirection read + + $rd_sg set foo bar2 + + set res [lsearch -exact [$rd read] "tracking-redir-broken"] + assert_morethan_equal $res 0 + + # Verify things didn't get messed up and no unexpected reply was pushed to the client. + $rd client reply on + assert_equal {OK} [$rd read] + $rd ping + assert_equal {PONG} [$rd read] + } + $rd_redirection close + $rd_sg close $rd close } -- cgit v1.2.1