summaryrefslogtreecommitdiff
path: root/tests/cluster
diff options
context:
space:
mode:
authorHarkrishn Patro <30795839+hpatro@users.noreply.github.com>2022-01-03 01:54:47 +0100
committerGitHub <noreply@github.com>2022-01-02 16:54:47 -0800
commit9f8885760b53e6d3952b9c9b41f9e6c48dfa6cec (patch)
tree770dfdbff19a1a2a1c71a642ebd844d592ef3d26 /tests/cluster
parentb8ba942ac2aabf51fd96134d9fa21b47d3baff4a (diff)
downloadredis-9f8885760b53e6d3952b9c9b41f9e6c48dfa6cec.tar.gz
Sharded pubsub implementation (#8621)
This commit implements a sharded pubsub implementation based off of shard channels. Co-authored-by: Harkrishn Patro <harkrisp@amazon.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Diffstat (limited to 'tests/cluster')
-rw-r--r--tests/cluster/cluster.tcl30
-rw-r--r--tests/cluster/tests/19-cluster-nodes-slots.tcl21
-rw-r--r--tests/cluster/tests/23-multiple-slot-operations.tcl8
-rw-r--r--tests/cluster/tests/25-pubsubshard-slot-migration.tcl171
-rw-r--r--tests/cluster/tests/26-pubsubshard.tcl94
5 files changed, 297 insertions, 27 deletions
diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl
index 7b7ce5343..31e0c3667 100644
--- a/tests/cluster/cluster.tcl
+++ b/tests/cluster/cluster.tcl
@@ -66,7 +66,7 @@ proc s {n field} {
get_info_field [R $n info] $field
}
-# Assuming nodes are reest, this function performs slots allocation.
+# Assuming nodes are reset, this function performs slots allocation.
# Only the first 'n' nodes are used.
proc cluster_allocate_slots {n} {
set slot 16383
@@ -129,6 +129,32 @@ proc create_cluster {masters slaves} {
set ::cluster_replica_nodes $slaves
}
+proc cluster_allocate_with_continuous_slots {n} {
+ set slot 16383
+ set avg [expr ($slot+1) / $n]
+ while {$slot >= 0} {
+ set node [expr $slot/$avg >= $n ? $n-1 : $slot/$avg]
+ lappend slots_$node $slot
+ incr slot -1
+ }
+ for {set j 0} {$j < $n} {incr j} {
+ R $j cluster addslots {*}[set slots_${j}]
+ }
+}
+
+# Create a cluster composed of the specified number of masters and slaves with continuous slots.
+proc cluster_create_with_continuous_slots {masters slaves} {
+ cluster_allocate_with_continuous_slots $masters
+ if {$slaves} {
+ cluster_allocate_slaves $masters $slaves
+ }
+ assert_cluster_state ok
+
+ set ::cluster_master_nodes $masters
+ set ::cluster_replica_nodes $slaves
+}
+
+
# Set the cluster node-timeout to all the reachalbe nodes.
proc set_cluster_node_timeout {to} {
foreach_redis_id id {
@@ -243,4 +269,4 @@ proc get_link_from_peer {this_instance_id peer_nodename} {
}
}
return {}
-}
+} \ No newline at end of file
diff --git a/tests/cluster/tests/19-cluster-nodes-slots.tcl b/tests/cluster/tests/19-cluster-nodes-slots.tcl
index 80f68d5d0..77faec912 100644
--- a/tests/cluster/tests/19-cluster-nodes-slots.tcl
+++ b/tests/cluster/tests/19-cluster-nodes-slots.tcl
@@ -2,27 +2,6 @@
source "../tests/includes/init-tests.tcl"
-proc cluster_allocate_with_continuous_slots {n} {
- set slot 16383
- set avg [expr ($slot+1) / $n]
- while {$slot >= 0} {
- set node [expr $slot/$avg >= $n ? $n-1 : $slot/$avg]
- lappend slots_$node $slot
- incr slot -1
- }
- for {set j 0} {$j < $n} {incr j} {
- R $j cluster addslots {*}[set slots_${j}]
- }
-}
-
-proc cluster_create_with_continuous_slots {masters slaves} {
- cluster_allocate_with_continuous_slots $masters
- if {$slaves} {
- cluster_allocate_slaves $masters $slaves
- }
- assert_cluster_state ok
-}
-
test "Create a 2 nodes cluster" {
cluster_create_with_continuous_slots 2 2
}
diff --git a/tests/cluster/tests/23-multiple-slot-operations.tcl b/tests/cluster/tests/23-multiple-slot-operations.tcl
index 906033c72..965ecd5af 100644
--- a/tests/cluster/tests/23-multiple-slot-operations.tcl
+++ b/tests/cluster/tests/23-multiple-slot-operations.tcl
@@ -2,7 +2,7 @@
source "../tests/includes/init-tests.tcl"
-proc cluster_allocate_with_continuous_slots {n} {
+proc cluster_allocate_with_continuous_slots_local {n} {
R 0 cluster ADDSLOTSRANGE 0 3276
R 1 cluster ADDSLOTSRANGE 3277 6552
R 2 cluster ADDSLOTSRANGE 6553 9828
@@ -10,8 +10,8 @@ proc cluster_allocate_with_continuous_slots {n} {
R 4 cluster ADDSLOTSRANGE 13105 16383
}
-proc cluster_create_with_continuous_slots {masters slaves} {
- cluster_allocate_with_continuous_slots $masters
+proc cluster_create_with_continuous_slots_local {masters slaves} {
+ cluster_allocate_with_continuous_slots_local $masters
if {$slaves} {
cluster_allocate_slaves $masters $slaves
}
@@ -20,7 +20,7 @@ proc cluster_create_with_continuous_slots {masters slaves} {
test "Create a 5 nodes cluster" {
- cluster_create_with_continuous_slots 5 5
+ cluster_create_with_continuous_slots_local 5 5
}
test "Cluster should start ok" {
diff --git a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl
new file mode 100644
index 000000000..11b77d36a
--- /dev/null
+++ b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl
@@ -0,0 +1,171 @@
+source "../tests/includes/init-tests.tcl"
+
+test "Create a 3 nodes cluster" {
+ cluster_create_with_continuous_slots 3 3
+}
+
+test "Cluster is up" {
+ assert_cluster_state ok
+}
+
+set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
+
+test "Migrate a slot, verify client receives sunsubscribe on primary serving the slot." {
+
+ # Setup the to and from node
+ set channelname mychannel
+ set slot [$cluster cluster keyslot $channelname]
+ array set nodefrom [$cluster masternode_for_slot $slot]
+ array set nodeto [$cluster masternode_notfor_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $nodefrom(host) $nodefrom(port)]
+
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
+
+ # Verify subscribe is still valid, able to receive messages.
+ $nodefrom(link) spublish $channelname hello
+ assert_equal {message mychannel hello} [$subscribeclient read]
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
+
+ $subscribeclient close
+}
+
+test "Client subscribes to multiple channels, migrate a slot, verify client receives sunsubscribe on primary serving the slot." {
+
+ # Setup the to and from node
+ set channelname ch3
+ set anotherchannelname ch7
+ set slot [$cluster cluster keyslot $channelname]
+ array set nodefrom [$cluster masternode_for_slot $slot]
+ array set nodeto [$cluster masternode_notfor_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $nodefrom(host) $nodefrom(port)]
+
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ $subscribeclient ssubscribe $anotherchannelname
+ $subscribeclient read
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
+
+ # Verify subscribe is still valid, able to receive messages.
+ $nodefrom(link) spublish $channelname hello
+ assert_equal {message ch3 hello} [$subscribeclient read]
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
+
+ # Verify the client receives sunsubscribe message for the channel(slot) which got migrated.
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"1" eq [lindex $msg 2]}
+
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
+
+ $nodefrom(link) spublish $anotherchannelname hello
+
+ # Verify the client is still connected and receives message from the other channel.
+ set msg [$subscribeclient read]
+ assert {"message" eq [lindex $msg 0]}
+ assert {$anotherchannelname eq [lindex $msg 1]}
+ assert {"hello" eq [lindex $msg 2]}
+
+ $subscribeclient close
+}
+
+test "Migrate a slot, verify client receives sunsubscribe on replica serving the slot." {
+
+ # Setup the to and from node
+ set channelname mychannel1
+ set slot [$cluster cluster keyslot $channelname]
+ array set nodefrom [$cluster masternode_for_slot $slot]
+ array set nodeto [$cluster masternode_notfor_slot $slot]
+
+ # Get replica node serving slot (mychannel) to connect a client.
+ set replicanodeinfo [$cluster cluster replicas $nodefrom(id)]
+ set args [split $replicanodeinfo " "]
+ set addr [lindex [split [lindex $args 1] @] 0]
+ set replicahost [lindex [split $addr :] 0]
+ set replicaport [lindex [split $addr :] 1]
+ set subscribeclient [redis_deferring_client_by_addr $replicahost $replicaport]
+
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
+
+ # Verify subscribe is still valid, able to receive messages.
+ $nodefrom(link) spublish $channelname hello
+ assert_equal {message mychannel1 hello} [$subscribeclient read]
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ $subscribeclient close
+}
+
+test "Delete a slot, verify sunsubscribe message" {
+ set channelname ch2
+ set slot [$cluster cluster keyslot $channelname]
+
+ array set primary_client [$cluster masternode_for_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $primary_client(host) $primary_client(port)]
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ $primary_client(link) cluster DELSLOTS $slot
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ $subscribeclient close
+}
+
+test "Reset cluster, verify sunsubscribe message" {
+ set channelname ch4
+ set slot [$cluster cluster keyslot $channelname]
+
+ array set primary_client [$cluster masternode_for_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $primary_client(host) $primary_client(port)]
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ $cluster cluster reset HARD
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ $cluster close
+ $subscribeclient close
+} \ No newline at end of file
diff --git a/tests/cluster/tests/26-pubsubshard.tcl b/tests/cluster/tests/26-pubsubshard.tcl
new file mode 100644
index 000000000..2619eda0a
--- /dev/null
+++ b/tests/cluster/tests/26-pubsubshard.tcl
@@ -0,0 +1,94 @@
+# Test PUBSUB shard propagation in a cluster slot.
+
+source "../tests/includes/init-tests.tcl"
+
+test "Create a 3 nodes cluster" {
+ cluster_create_with_continuous_slots 3 3
+}
+
+set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
+test "Pub/Sub shard basics" {
+
+ set slot [$cluster cluster keyslot "channel.0"]
+ array set publishnode [$cluster masternode_for_slot $slot]
+ array set notshardnode [$cluster masternode_notfor_slot $slot]
+
+ set publishclient [redis_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeclient2 [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set anotherclient [redis_deferring_client_by_addr $notshardnode(host) $notshardnode(port)]
+
+ $subscribeclient ssubscribe channel.0
+ $subscribeclient read
+
+ $subscribeclient2 ssubscribe channel.0
+ $subscribeclient2 read
+
+ $anotherclient ssubscribe channel.0
+ catch {$anotherclient read} err
+ assert_match {MOVED *} $err
+
+ set data [randomValue]
+ $publishclient spublish channel.0 $data
+
+ set msg [$subscribeclient read]
+ assert_equal $data [lindex $msg 2]
+
+ set msg [$subscribeclient2 read]
+ assert_equal $data [lindex $msg 2]
+
+ $publishclient close
+ $subscribeclient close
+ $subscribeclient2 close
+ $anotherclient close
+}
+
+test "client can't subscribe to multiple shard channels across different slots in same call" {
+ catch {$cluster ssubscribe channel.0 channel.1} err
+ assert_match {CROSSSLOT Keys*} $err
+}
+
+test "client can subscribe to multiple shard channels across different slots in separate call" {
+ $cluster ssubscribe ch3
+ $cluster ssubscribe ch7
+
+ $cluster sunsubscribe ch3
+ $cluster sunsubscribe ch7
+}
+
+
+test "Verify Pub/Sub and Pub/Sub shard no overlap" {
+ set slot [$cluster cluster keyslot "channel.0"]
+ array set publishnode [$cluster masternode_for_slot $slot]
+ array set notshardnode [$cluster masternode_notfor_slot $slot]
+
+ set publishshardclient [redis_client_by_addr $publishnode(host) $publishnode(port)]
+ set publishclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeshardclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+
+ $subscribeshardclient deferred 1
+ $subscribeshardclient ssubscribe channel.0
+ $subscribeshardclient read
+
+ $subscribeclient deferred 1
+ $subscribeclient subscribe channel.0
+ $subscribeclient read
+
+ set sharddata "testingpubsubdata"
+ $publishshardclient spublish channel.0 $sharddata
+
+ set data "somemoredata"
+ $publishclient publish channel.0 $data
+
+ set msg [$subscribeshardclient read]
+ assert_equal $sharddata [lindex $msg 2]
+
+ set msg [$subscribeclient read]
+ assert_equal $data [lindex $msg 2]
+
+ $cluster close
+ $publishclient close
+ $subscribeclient close
+ $subscribeshardclient close
+} \ No newline at end of file