diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2021-10-19 23:04:42 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2021-10-19 23:04:42 +0300 |
commit | 9cf18e83f279408e20430b55428a2b19156c90d7 (patch) | |
tree | 79eebeef8d66ad93f80b28963f84511431caa799 | |
parent | 1e7df8c436174735b1d167673afd3f1642da5cdc (diff) | |
parent | 87325b09f7d066a5b7bde1682e3be877a604871e (diff) | |
download | rabbitmq-server-git-9cf18e83f279408e20430b55428a2b19156c90d7.tar.gz |
Merge branch 'RentTheRunway-falconertc/fix_duplicate_binding_exchange'
-rw-r--r-- | deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl | 7 | ||||
-rw-r--r-- | deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl | 97 |
2 files changed, 99 insertions, 5 deletions
diff --git a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl index e9da8a63d5..44d7dca3b2 100644 --- a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl +++ b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl @@ -227,7 +227,6 @@ remove_bindings(none, X, Bindings) -> ok. remove_binding(#binding{source = S, destination = D, key = RK}) -> - Weight = rabbit_data_coercion:to_integer(RK), rabbit_log:debug("Consistent hashing exchange: removing binding " "from exchange '~p' to destination '~p' with routing key '~s'", [rabbit_misc:rs(S), rabbit_misc:rs(D), RK]), @@ -237,7 +236,7 @@ remove_binding(#binding{source = S, destination = D, key = RK}) -> next_bucket_number = NexN0}] -> %% Buckets with lower numbers stay as is; buckets that %% belong to this binding are removed; buckets with - %% greater numbers are updated (their numbers are adjusted downwards by weight) + %% greater numbers are updated (their numbers are adjusted downwards) BucketsOfThisBinding = maps:filter(fun (_K, V) -> V =:= D end, BM0), case maps:size(BucketsOfThisBinding) of 0 -> ok; @@ -251,10 +250,10 @@ remove_binding(#binding{source = S, destination = D, key = RK}) -> %% final state with "down the ring" buckets updated NewBucketsDownTheRing = maps:fold( fun(K0, V, Acc) -> - maps:put(K0 - Weight, V, Acc) + maps:put(K0 - N, V, Acc) end, #{}, BucketsDownTheRing), BM1 = maps:merge(UnchangedBuckets, NewBucketsDownTheRing), - NextN = NexN0 - Weight, + NextN = NexN0 - N, State = State0#chx_hash_ring{bucket_map = BM1, next_bucket_number = NextN}, diff --git a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl index cd8397a597..1439c45b6d 100644 --- a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl +++ b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl @@ -41,7 +41,9 @@ groups() -> test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case5, test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case6, test_hash_ring_updates_when_exchange_is_deleted, - test_hash_ring_updates_when_queue_is_unbound + test_hash_ring_updates_when_queue_is_unbound, + test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted, + test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted ]} ]. @@ -557,6 +559,99 @@ test_hash_ring_updates_when_queue_is_unbound(Config) -> rabbit_ct_client_helpers:close_channel(Chan), ok. +test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) -> + Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + + X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>, + amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), + + Declare = #'exchange.declare'{exchange = X, + type = <<"x-consistent-hash">>}, + #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare), + + Q1 = <<"f-q1">>, + #'queue.declare_ok'{} = + amqp_channel:call(Chan, #'queue.declare'{ + queue = Q1, durable = true, exclusive = false}), + #'queue.bind_ok'{} = + amqp_channel:call(Chan, #'queue.bind'{queue = Q1, + exchange = X, + routing_key = <<"2">>}), + + #'queue.bind_ok'{} = + amqp_channel:call(Chan, #'queue.bind'{queue = Q1, + exchange = X, + routing_key = <<"3">>}), + + ?assertEqual(5, count_buckets_of_exchange(Config, X)), + assert_ring_consistency(Config, X), + + Q2 = <<"f-q2">>, + #'queue.declare_ok'{} = + amqp_channel:call(Chan, #'queue.declare'{ + queue = Q2, durable = true, exclusive = false}), + #'queue.bind_ok'{} = + amqp_channel:call(Chan, #'queue.bind'{queue = Q2, + exchange = X, + routing_key = <<"4">>}), + + ?assertEqual(9, count_buckets_of_exchange(Config, X)), + assert_ring_consistency(Config, X), + + amqp_channel:call(Chan, #'queue.delete' {queue = Q1}), + ?assertEqual(4, count_buckets_of_exchange(Config, X)), + assert_ring_consistency(Config, X), + + clean_up_test_topology(Config, X, [Q1, Q2]), + rabbit_ct_client_helpers:close_channel(Chan), + ok. + +test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) -> + Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + + X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>, + amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), + + Declare = #'exchange.declare'{exchange = X, + type = <<"x-consistent-hash">>}, + #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare), + + Q1 = <<"f-q1">>, + #'queue.declare_ok'{} = + amqp_channel:call(Chan, #'queue.declare'{ + queue = Q1, durable = true, exclusive = false}), + #'queue.bind_ok'{} = + amqp_channel:call(Chan, #'queue.bind'{queue = Q1, + exchange = X, + routing_key = <<"2">>}), + + #'queue.bind_ok'{} = + amqp_channel:call(Chan, #'queue.bind'{queue = Q1, + exchange = X, + routing_key = <<"3">>}), + + Q2 = <<"f-q2">>, + #'queue.declare_ok'{} = + amqp_channel:call(Chan, #'queue.declare'{ + queue = Q2, durable = true, exclusive = false}), + #'queue.bind_ok'{} = + amqp_channel:call(Chan, #'queue.bind'{queue = Q2, + exchange = X, + routing_key = <<"4">>}), + + ?assertEqual(9, count_buckets_of_exchange(Config, X)), + assert_ring_consistency(Config, X), + + %% Both bindings to Q1 will be deleted + amqp_channel:call(Chan, #'queue.unbind'{queue = Q1, + exchange = X, + routing_key = <<"3">>}), + ?assertEqual(4, count_buckets_of_exchange(Config, X)), + assert_ring_consistency(Config, X), + + clean_up_test_topology(Config, X, [Q1, Q2]), + rabbit_ct_client_helpers:close_channel(Chan), + ok. %% %% Helpers |