summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2021-10-19 23:04:42 +0300
committerMichael Klishin <michael@clojurewerkz.org>2021-10-19 23:04:42 +0300
commit9cf18e83f279408e20430b55428a2b19156c90d7 (patch)
tree79eebeef8d66ad93f80b28963f84511431caa799
parent1e7df8c436174735b1d167673afd3f1642da5cdc (diff)
parent87325b09f7d066a5b7bde1682e3be877a604871e (diff)
downloadrabbitmq-server-git-9cf18e83f279408e20430b55428a2b19156c90d7.tar.gz
Merge branch 'RentTheRunway-falconertc/fix_duplicate_binding_exchange'
-rw-r--r--deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl7
-rw-r--r--deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl97
2 files changed, 99 insertions, 5 deletions
diff --git a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl
index e9da8a63d5..44d7dca3b2 100644
--- a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl
+++ b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl
@@ -227,7 +227,6 @@ remove_bindings(none, X, Bindings) ->
ok.
remove_binding(#binding{source = S, destination = D, key = RK}) ->
- Weight = rabbit_data_coercion:to_integer(RK),
rabbit_log:debug("Consistent hashing exchange: removing binding "
"from exchange '~p' to destination '~p' with routing key '~s'",
[rabbit_misc:rs(S), rabbit_misc:rs(D), RK]),
@@ -237,7 +236,7 @@ remove_binding(#binding{source = S, destination = D, key = RK}) ->
next_bucket_number = NexN0}] ->
%% Buckets with lower numbers stay as is; buckets that
%% belong to this binding are removed; buckets with
- %% greater numbers are updated (their numbers are adjusted downwards by weight)
+ %% greater numbers are updated (their numbers are adjusted downwards)
BucketsOfThisBinding = maps:filter(fun (_K, V) -> V =:= D end, BM0),
case maps:size(BucketsOfThisBinding) of
0 -> ok;
@@ -251,10 +250,10 @@ remove_binding(#binding{source = S, destination = D, key = RK}) ->
%% final state with "down the ring" buckets updated
NewBucketsDownTheRing = maps:fold(
fun(K0, V, Acc) ->
- maps:put(K0 - Weight, V, Acc)
+ maps:put(K0 - N, V, Acc)
end, #{}, BucketsDownTheRing),
BM1 = maps:merge(UnchangedBuckets, NewBucketsDownTheRing),
- NextN = NexN0 - Weight,
+ NextN = NexN0 - N,
State = State0#chx_hash_ring{bucket_map = BM1,
next_bucket_number = NextN},
diff --git a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl
index cd8397a597..1439c45b6d 100644
--- a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl
+++ b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl
@@ -41,7 +41,9 @@ groups() ->
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case5,
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case6,
test_hash_ring_updates_when_exchange_is_deleted,
- test_hash_ring_updates_when_queue_is_unbound
+ test_hash_ring_updates_when_queue_is_unbound,
+ test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted,
+ test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted
]}
].
@@ -557,6 +559,99 @@ test_hash_ring_updates_when_queue_is_unbound(Config) ->
rabbit_ct_client_helpers:close_channel(Chan),
ok.
+test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) ->
+ Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
+
+ X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>,
+ amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
+
+ Declare = #'exchange.declare'{exchange = X,
+ type = <<"x-consistent-hash">>},
+ #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare),
+
+ Q1 = <<"f-q1">>,
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Chan, #'queue.declare'{
+ queue = Q1, durable = true, exclusive = false}),
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
+ exchange = X,
+ routing_key = <<"2">>}),
+
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
+ exchange = X,
+ routing_key = <<"3">>}),
+
+ ?assertEqual(5, count_buckets_of_exchange(Config, X)),
+ assert_ring_consistency(Config, X),
+
+ Q2 = <<"f-q2">>,
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Chan, #'queue.declare'{
+ queue = Q2, durable = true, exclusive = false}),
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Chan, #'queue.bind'{queue = Q2,
+ exchange = X,
+ routing_key = <<"4">>}),
+
+ ?assertEqual(9, count_buckets_of_exchange(Config, X)),
+ assert_ring_consistency(Config, X),
+
+ amqp_channel:call(Chan, #'queue.delete' {queue = Q1}),
+ ?assertEqual(4, count_buckets_of_exchange(Config, X)),
+ assert_ring_consistency(Config, X),
+
+ clean_up_test_topology(Config, X, [Q1, Q2]),
+ rabbit_ct_client_helpers:close_channel(Chan),
+ ok.
+
+test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) ->
+ Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
+
+ X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>,
+ amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
+
+ Declare = #'exchange.declare'{exchange = X,
+ type = <<"x-consistent-hash">>},
+ #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare),
+
+ Q1 = <<"f-q1">>,
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Chan, #'queue.declare'{
+ queue = Q1, durable = true, exclusive = false}),
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
+ exchange = X,
+ routing_key = <<"2">>}),
+
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
+ exchange = X,
+ routing_key = <<"3">>}),
+
+ Q2 = <<"f-q2">>,
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Chan, #'queue.declare'{
+ queue = Q2, durable = true, exclusive = false}),
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Chan, #'queue.bind'{queue = Q2,
+ exchange = X,
+ routing_key = <<"4">>}),
+
+ ?assertEqual(9, count_buckets_of_exchange(Config, X)),
+ assert_ring_consistency(Config, X),
+
+ %% Both bindings to Q1 will be deleted
+ amqp_channel:call(Chan, #'queue.unbind'{queue = Q1,
+ exchange = X,
+ routing_key = <<"3">>}),
+ ?assertEqual(4, count_buckets_of_exchange(Config, X)),
+ assert_ring_consistency(Config, X),
+
+ clean_up_test_topology(Config, X, [Q1, Q2]),
+ rabbit_ct_client_helpers:close_channel(Chan),
+ ok.
%%
%% Helpers