summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Parra Corbacho <dparracorbac@vmware.com>2023-01-16 13:35:17 +0100
committerDiana Parra Corbacho <dparracorbac@vmware.com>2023-02-02 15:01:42 +0100
commit9cf10ed8a7adb5bf41f619426253aa0661e2158b (patch)
tree1380856b9c28c96e259cf9715449c4b28af6f898
parent7a12cf840d7e0917aec19ec1c6519fe08d9849d1 (diff)
downloadrabbitmq-server-git-9cf10ed8a7adb5bf41f619426253aa0661e2158b.tar.gz
Unit test rabbit_db_* modules, spec and API updates
-rw-r--r--deps/rabbit/BUILD.bazel45
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl183
-rw-r--r--deps/rabbit/src/rabbit_amqqueue_process.erl2
-rw-r--r--deps/rabbit/src/rabbit_basic.erl2
-rw-r--r--deps/rabbit/src/rabbit_binding.erl24
-rw-r--r--deps/rabbit/src/rabbit_channel.erl4
-rw-r--r--deps/rabbit/src/rabbit_classic_queue.erl2
-rw-r--r--deps/rabbit/src/rabbit_core_ff.erl3
-rw-r--r--deps/rabbit/src/rabbit_db_binding.erl672
-rw-r--r--deps/rabbit/src/rabbit_db_exchange.erl281
-rw-r--r--deps/rabbit/src/rabbit_db_maintenance.erl9
-rw-r--r--deps/rabbit/src/rabbit_db_msup.erl173
-rw-r--r--deps/rabbit/src/rabbit_db_policy.erl12
-rw-r--r--deps/rabbit/src/rabbit_db_queue.erl824
-rw-r--r--deps/rabbit/src/rabbit_db_topic_exchange.erl79
-rw-r--r--deps/rabbit/src/rabbit_db_user.erl21
-rw-r--r--deps/rabbit/src/rabbit_db_vhost.erl19
-rw-r--r--deps/rabbit/src/rabbit_dead_letter.erl2
-rw-r--r--deps/rabbit/src/rabbit_exchange.erl22
-rw-r--r--deps/rabbit/src/rabbit_exchange_type_topic.erl2
-rw-r--r--deps/rabbit/src/rabbit_fifo_dlx_worker.erl6
-rw-r--r--deps/rabbit/src/rabbit_maintenance.erl14
-rw-r--r--deps/rabbit/src/rabbit_policy.erl8
-rw-r--r--deps/rabbit/src/rabbit_priority_queue.erl2
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl10
-rw-r--r--deps/rabbit/src/rabbit_router.erl37
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl6
-rw-r--r--deps/rabbit/test/rabbit_db_binding_SUITE.erl331
-rw-r--r--deps/rabbit/test/rabbit_db_exchange_SUITE.erl330
-rw-r--r--deps/rabbit/test/rabbit_db_maintenance_SUITE.erl93
-rw-r--r--deps/rabbit/test/rabbit_db_msup_SUITE.erl136
-rw-r--r--deps/rabbit/test/rabbit_db_policy_SUITE.erl96
-rw-r--r--deps/rabbit/test/rabbit_db_queue_SUITE.erl596
-rw-r--r--deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl158
-rw-r--r--deps/rabbit/test/topic_permission_SUITE.erl16
-rw-r--r--deps/rabbit/test/unit_access_control_SUITE.erl6
-rw-r--r--deps/rabbitmq_consistent_hash_exchange/BUILD.bazel3
-rw-r--r--deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl4
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl8
-rw-r--r--deps/rabbitmq_jms_topic_exchange/BUILD.bazel5
-rw-r--r--deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl6
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl2
-rw-r--r--deps/rabbitmq_random_exchange/BUILD.bazel7
-rw-r--r--deps/rabbitmq_recent_history_exchange/BUILD.bazel5
-rw-r--r--deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl7
-rw-r--r--deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl6
46 files changed, 3360 insertions, 919 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index da8aff3917..d2dc86f326 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -513,15 +513,12 @@ rabbitmq_integration_suite(
],
)
-rabbitmq_suite(
+rabbitmq_integration_suite(
name = "mirrored_supervisor_SUITE",
size = "small",
additional_srcs = [
"test/mirrored_supervisor_SUITE_gs.erl",
- ],
- deps = [
- "//deps/rabbit_common:erlang_app",
- ],
+ ]
)
rabbitmq_suite(
@@ -1096,6 +1093,9 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "exchanges_SUITE",
size = "small",
+ additional_beam = [
+ ":quorum_queue_utils",
+ ],
)
rabbitmq_integration_suite(
@@ -1103,6 +1103,41 @@ rabbitmq_integration_suite(
size = "small",
)
+rabbitmq_integration_suite(
+ name = "rabbit_db_queue_SUITE",
+ size = "small",
+)
+
+rabbitmq_integration_suite(
+ name = "rabbit_db_maintenance_SUITE",
+ size = "small",
+)
+
+rabbitmq_integration_suite(
+ name = "rabbit_db_topic_exchange_SUITE",
+ size = "small",
+)
+
+rabbitmq_integration_suite(
+ name = "rabbit_db_exchange_SUITE",
+ size = "small",
+)
+
+rabbitmq_integration_suite(
+ name = "rabbit_db_binding_SUITE",
+ size = "small",
+)
+
+rabbitmq_integration_suite(
+ name = "rabbit_db_msup_SUITE",
+ size = "small",
+)
+
+rabbitmq_integration_suite(
+ name = "rabbit_db_policy_SUITE",
+ size = "small",
+)
+
assert_suites()
filegroup(
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 98a27915fb..d3a3e59000 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -12,7 +12,7 @@
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
-export([pseudo_queue/2, pseudo_queue/3, immutable/1]).
--export([exists/1, lookup/1, lookup/2, lookup_many/1,
+-export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1,
not_found_or_absent_dirty/1,
with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -162,24 +162,28 @@ start(Qs) ->
amqqueue:is_classic(Q)],
ok.
-mark_local_durable_queues_stopped(VHost) ->
- Qs0 = find_local_durable_queues(VHost),
- Qs = [amqqueue:set_state(Q, stopped)
- || Q <- Qs0, amqqueue:get_type(Q) =:= rabbit_classic_queue,
- amqqueue:get_state(Q) =/= stopped ],
- rabbit_db_queue:insert(Qs).
-
-find_local_durable_queues(VHost) ->
- Qs = rabbit_db_queue:get_all_durable(VHost),
- lists:filter(fun(Q) ->
- rabbit_queue_type:is_recoverable(Q)
- end, Qs).
+mark_local_durable_queues_stopped(VHostName) ->
+ rabbit_db_queue:update_durable(
+ fun(Q) ->
+ amqqueue:set_state(Q, stopped)
+ end,
+ fun(Q) ->
+ amqqueue:get_vhost(Q) =:= VHostName andalso
+ rabbit_queue_type:is_recoverable(Q) andalso
+ amqqueue:get_type(Q) =:= rabbit_classic_queue andalso
+ amqqueue:get_state(Q) =/= stopped
+ end).
+
+find_local_durable_queues(VHostName) ->
+ rabbit_db_queue:filter_all_durable(fun(Q) ->
+ amqqueue:get_vhost(Q) =:= VHostName andalso
+ rabbit_queue_type:is_recoverable(Q)
+ end).
find_recoverable_queues() ->
- Qs = rabbit_db_queue:get_all_durable(),
- lists:filter(fun(Q) ->
- rabbit_queue_type:is_recoverable(Q)
- end, Qs).
+ rabbit_db_queue:filter_all_durable(fun(Q) ->
+ rabbit_queue_type:is_recoverable(Q)
+ end).
-spec declare(name(),
boolean(),
@@ -248,13 +252,12 @@ internal_declare(Q, Recover) ->
do_internal_declare(Q0, true) ->
Q = amqqueue:set_state(Q0, live),
- store_queue(Q),
+ ok = store_queue(Q),
{created, Q0};
do_internal_declare(Q0, false) ->
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
Queue = rabbit_queue_decorator:set(Q),
- DurableQueue = amqqueue:reset_mirroring_and_decorators(Q),
- rabbit_db_queue:create_or_get(DurableQueue, Queue).
+ rabbit_db_queue:create_or_get(Queue).
-spec update
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
@@ -272,8 +275,7 @@ ensure_rabbit_queue_record_is_initialized(Q) ->
store_queue(Q0) ->
Q = rabbit_queue_decorator:set(Q0),
- DurableQ = amqqueue:reset_mirroring_and_decorators(Q0),
- rabbit_db_queue:insert(DurableQ, Q).
+ rabbit_db_queue:set(Q).
-spec update_decorators(name()) -> 'ok'.
@@ -316,14 +318,17 @@ is_server_named_allowed(Args) ->
([name()]) ->
[amqqueue:amqqueue()].
-lookup([]) -> []; %% optimisation
-lookup(Names) ->
- rabbit_db_queue:get(Names).
+lookup(Name) when is_record(Name, resource) ->
+ rabbit_db_queue:get(Name).
+
+lookup_durable_queue(QName) ->
+ rabbit_db_queue:get_durable(QName).
-spec lookup_many ([name()]) -> [amqqueue:amqqueue()].
+lookup_many([]) -> []; %% optimisation
lookup_many(Names) when is_list(Names) ->
- lookup(Names).
+ rabbit_db_queue:get_many(Names).
-spec lookup(binary(), binary()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
@@ -341,7 +346,15 @@ exists(Name) ->
-spec not_found_or_absent_dirty(name()) -> not_found_or_absent().
not_found_or_absent_dirty(Name) ->
- rabbit_db_queue:not_found_or_absent_queue_dirty(Name).
+ %% We should read from both tables inside a tx, to get a
+ %% consistent view. But the chances of an inconsistency are small,
+ %% and only affect the error kind.
+ case rabbit_db_queue:get_durable(Name) of
+ {error, not_found} ->
+ not_found;
+ {ok, Q} ->
+ {absent, Q, nodedown}
+ end.
-spec get_rebalance_lock(pid()) ->
{true, {rebalance_queues, pid()}} | false.
@@ -542,7 +555,7 @@ with(#resource{} = Name, F, E, RetriesLeft) ->
fun () -> retry_wait(Q, F, E, RetriesLeft) end,
fun () -> F(Q) end);
{error, not_found} ->
- E(rabbit_db_queue:not_found_or_absent_queue_dirty(Name))
+ E(not_found_or_absent_dirty(Name))
end.
-spec retry_wait(amqqueue:amqqueue(),
@@ -1239,16 +1252,18 @@ list_down(VHostPath) ->
false -> [];
true ->
Alive = sets:from_list([amqqueue:get_name(Q) || Q <- list(VHostPath)]),
- Durable = rabbit_db_queue:get_all_durable(VHostPath),
NodesRunning = rabbit_nodes:all_running(),
- lists:filter(fun (Q) ->
- N = amqqueue:get_name(Q),
- Pid = amqqueue:get_pid(Q),
- St = amqqueue:get_state(Q),
- (St =:= stopped andalso not lists:member(node(Pid), NodesRunning))
- orelse
- (not sets:is_element(N, Alive))
- end, Durable)
+ rabbit_db_queue:filter_all_durable(
+ fun (Q) ->
+ N = amqqueue:get_name(Q),
+ Pid = amqqueue:get_pid(Q),
+ St = amqqueue:get_state(Q),
+ amqqueue:get_vhost(Q) =:= VHostPath
+ andalso
+ ((St =:= stopped andalso not lists:member(node(Pid), NodesRunning))
+ orelse
+ (not sets:is_element(N, Alive)))
+ end)
end.
count(VHost) ->
@@ -1671,7 +1686,7 @@ internal_delete(QueueName, ActingUser, Reason) ->
ok ->
ok;
Deletions ->
- rabbit_binding:process_deletions(Deletions),
+ _ = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
rabbit_core_metrics:queue_deleted(QueueName),
ok = rabbit_event:notify(queue_deleted,
@@ -1683,12 +1698,12 @@ internal_delete(QueueName, ActingUser, Reason) ->
forget_all_durable(Node) ->
UpdateFun = fun(Q) ->
- forget_node_for_queue(Node, Q)
- end,
+ forget_node_for_queue(Node, Q)
+ end,
FilterFun = fun(Q) ->
is_local_to_node(amqqueue:get_pid(Q), Node)
end,
- rabbit_db_queue:match_and_update(amqqueue:pattern_match_all(), UpdateFun, FilterFun).
+ rabbit_db_queue:foreach_durable(UpdateFun, FilterFun).
%% Try to promote a mirror while down - it should recover as a
%% leader. We try to take the oldest mirror here for best chance of
@@ -1717,7 +1732,11 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
{false, _} -> forget_node_for_queue(DeadNode, T, Q);
{true, rabbit_classic_queue} ->
Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
- ok = rabbit_db_queue:insert([Q1]);
+ %% rabbit_db_queue:set_many/1 just stores a durable queue record,
+ %% that is the only one required here.
+ %% rabbit_db_queue:set/1 writes both durable and transient, thus
+ %% can't be used for this operation.
+ ok = rabbit_db_queue:set_many([Q1]);
{true, rabbit_quorum_queue} ->
ok
end.
@@ -1809,43 +1828,45 @@ has_synchronised_mirrors_online(Q) ->
-spec on_node_up(node()) -> 'ok'.
on_node_up(Node) ->
- rabbit_db_queue:on_node_up(Node, fun maybe_clear_recoverable_node/2).
-
-maybe_clear_recoverable_node(Node, Q) ->
- SPids = amqqueue:get_sync_slave_pids(Q),
- RSs = amqqueue:get_recoverable_slaves(Q),
- case lists:member(Node, RSs) of
- true ->
- %% There is a race with
- %% rabbit_mirror_queue_slave:record_synchronised/1 called
- %% by the incoming mirror node and this function, called
- %% by the leader node. If this function is executed after
- %% record_synchronised/1, the node is erroneously removed
- %% from the recoverable mirror list.
- %%
- %% We check if the mirror node's queue PID is alive. If it is
- %% the case, then this function is executed after. In this
- %% situation, we don't touch the queue record, it is already
- %% correct.
- DoClearNode =
- case [SP || SP <- SPids, node(SP) =:= Node] of
- [SPid] -> not rabbit_misc:is_process_alive(SPid);
- _ -> true
- end,
- if
- DoClearNode -> RSs1 = RSs -- [Node],
- store_queue(
- amqqueue:set_recoverable_slaves(Q, RSs1));
- true -> ok
- end;
- false ->
- ok
+ rabbit_db_queue:foreach_transient(maybe_clear_recoverable_node(Node)).
+
+maybe_clear_recoverable_node(Node) ->
+ fun(Q) ->
+ SPids = amqqueue:get_sync_slave_pids(Q),
+ RSs = amqqueue:get_recoverable_slaves(Q),
+ case lists:member(Node, RSs) of
+ true ->
+ %% There is a race with
+ %% rabbit_mirror_queue_slave:record_synchronised/1 called
+ %% by the incoming mirror node and this function, called
+ %% by the leader node. If this function is executed after
+ %% record_synchronised/1, the node is erroneously removed
+ %% from the recoverable mirror list.
+ %%
+ %% We check if the mirror node's queue PID is alive. If it is
+ %% the case, then this function is executed after. In this
+ %% situation, we don't touch the queue record, it is already
+ %% correct.
+ DoClearNode =
+ case [SP || SP <- SPids, node(SP) =:= Node] of
+ [SPid] -> not rabbit_misc:is_process_alive(SPid);
+ _ -> true
+ end,
+ if
+ DoClearNode -> RSs1 = RSs -- [Node],
+ store_queue(
+ amqqueue:set_recoverable_slaves(Q, RSs1));
+ true -> ok
+ end;
+ false ->
+ ok
+ end
end.
-spec on_node_down(node()) -> 'ok'.
on_node_down(Node) ->
- {Time, Ret} = timer:tc(fun() -> rabbit_db_queue:on_node_down(Node, fun filter_transient_queues_to_delete/2) end),
+ {Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
case Ret of
ok -> ok;
{QueueNames, Deletions} ->
@@ -1859,12 +1880,14 @@ on_node_down(Node) ->
ok
end.
-filter_transient_queues_to_delete(Node, Q) ->
- amqqueue:qnode(Q) == Node andalso
- not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))
- andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
- andalso (not rabbit_amqqueue:is_replicated(Q)
- orelse rabbit_amqqueue:is_dead_exclusive(Q)).
+filter_transient_queues_to_delete(Node) ->
+ fun(Q) ->
+ amqqueue:qnode(Q) == Node andalso
+ not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))
+ andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
+ andalso (not rabbit_amqqueue:is_replicated(Q)
+ orelse rabbit_amqqueue:is_dead_exclusive(Q))
+ end.
notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(
diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl
index 091a300125..d9f7922995 100644
--- a/deps/rabbit/src/rabbit_amqqueue_process.erl
+++ b/deps/rabbit/src/rabbit_amqqueue_process.erl
@@ -283,7 +283,7 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(
fun (BQS) ->
- update_state(stopped, Q0),
+ _ = update_state(stopped, Q0),
BQ:terminate(R, BQS)
end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
diff --git a/deps/rabbit/src/rabbit_basic.erl b/deps/rabbit/src/rabbit_basic.erl
index 4c67af1249..305049cde6 100644
--- a/deps/rabbit/src/rabbit_basic.erl
+++ b/deps/rabbit/src/rabbit_basic.erl
@@ -68,7 +68,7 @@ publish(Delivery = #delivery{
end.
publish(X, Delivery) ->
- Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
+ Qs = rabbit_amqqueue:lookup_many(rabbit_exchange:route(X, Delivery)),
_ = rabbit_queue_type:deliver(Qs, Delivery, stateless),
ok.
diff --git a/deps/rabbit/src/rabbit_binding.erl b/deps/rabbit/src/rabbit_binding.erl
index 52bdce4ebe..b418f0a51b 100644
--- a/deps/rabbit/src/rabbit_binding.erl
+++ b/deps/rabbit/src/rabbit_binding.erl
@@ -9,7 +9,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
--export([recover/0, recover/2, exists/1, add/2, add/3, remove/3]).
+-export([recover/0, recover/2, exists/1, add/2, add/3, remove/2, remove/3]).
-export([list/1, list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2, list_for_source_and_destination/3,
list_explicit/0]).
@@ -150,24 +150,27 @@ binding_type0(false, true) ->
binding_type0(_, _) ->
transient.
+-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
+remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
+
-spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
remove(Binding0, InnerFun, ActingUser) ->
Binding = sort_args(Binding0),
case
rabbit_db_binding:delete(Binding, InnerFun)
of
- ok ->
- ok;
{error, _} = Err ->
Err;
- Deletions ->
+ ok ->
+ ok;
+ {ok, Deletions} ->
notify_deletions(Deletions, ActingUser)
end.
-spec list_explicit() -> bindings().
list_explicit() ->
- rabbit_db_binding:get_all_explicit().
+ rabbit_db_binding:get_all().
-spec list(rabbit_types:vhost()) -> bindings().
@@ -295,6 +298,15 @@ group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs],
OnlyDurable) ->
group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable).
+-spec group_bindings_fold(Fun, Name, Deletions, [Binding], [Binding], OnlyDurable)
+ -> Ret when
+ Fun :: fun((Name, [Binding], Deletions, OnlyDurable) ->
+ Deletions),
+ Name :: rabbit_exchange:name(),
+ Deletions :: rabbit_binding:deletions(),
+ Binding :: rabbit_types:binding(),
+ OnlyDurable :: boolean(),
+ Ret :: Deletions.
group_bindings_fold(
Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings,
OnlyDurable) ->
@@ -399,7 +411,7 @@ notify_bindings_deletion(Bs, ActingUser) ->
|| B <- Bs],
ok.
--spec process_deletions(deletions()) -> rabbit_misc:thunk('ok').
+-spec process_deletions(deletions()) -> deletions().
process_deletions(Deletions) ->
dict:map(fun (_XName, {X, deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 2d5ccdcad4..70bce8cab0 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2154,7 +2154,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
- Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
+ Qs0 = rabbit_amqqueue:lookup_many(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
@@ -2191,7 +2191,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
- Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
+ Qs0 = rabbit_amqqueue:lookup_many(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
index 153889ce46..940b122a4f 100644
--- a/deps/rabbit/src/rabbit_classic_queue.erl
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -127,7 +127,7 @@ is_recoverable(Q) when ?is_amqqueue(Q) ->
%% record if it is a mirrored queue (such info is now obtained from
%% the policy). Thus, we must check if the local pid is alive
%% - if the record is present - in order to restart.
- (rabbit_amqqueue:exists(Q)
+ (not rabbit_db_queue:consistent_exists(amqqueue:get_name(Q))
orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))).
recover(VHost, Queues) ->
diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl
index 3ccf55336a..1a816d03a5 100644
--- a/deps/rabbit/src/rabbit_core_ff.erl
+++ b/deps/rabbit/src/rabbit_core_ff.erl
@@ -134,6 +134,7 @@
Ret :: rabbit_feature_flags:enable_callback_ret().
direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
TableName = rabbit_index_route,
+ ok = rabbit_table:wait([rabbit_route, rabbit_exchange], _Retry = true),
try
case rabbit_db_binding:create_index_route_table() of
ok ->
@@ -142,7 +143,7 @@ direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to add copy of table ~ts to "
"node ~tp: ~tp",
- [FeatureName, NewTable, node(), Err],
+ [FeatureName, TableName, node(), Err],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
Error
end
diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl
index 90526dbc91..b653cc834a 100644
--- a/deps/rabbit/src/rabbit_db_binding.erl
+++ b/deps/rabbit/src/rabbit_db_binding.erl
@@ -9,8 +9,14 @@
-include_lib("rabbit_common/include/rabbit.hrl").
--export([exists/1, create/2, delete/2, get_all/1, get_all_for_source/1,
- get_all_for_destination/1, get_all/3, get_all_explicit/0,
+-export([exists/1,
+ create/2,
+ delete/2,
+ get_all/0,
+ get_all/1,
+ get_all/3,
+ get_all_for_source/1,
+ get_all_for_destination/1,
fold/2]).
%% Routing. These functions are in the hot code path
@@ -28,6 +34,15 @@
-export([create_index_route_table/0]).
+%% For testing
+-export([clear/0]).
+
+-define(MNESIA_TABLE, rabbit_route).
+-define(MNESIA_DURABLE_TABLE, rabbit_durable_route).
+-define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route).
+-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
+-define(MNESIA_INDEX_TABLE, rabbit_index_route).
+
%% -------------------------------------------------------------------
%% exists().
%% -------------------------------------------------------------------
@@ -49,22 +64,43 @@ exists(Binding) ->
exists_in_mnesia(Binding) ->
binding_action_in_mnesia(
Binding, fun (_Src, _Dst) ->
- rabbit_misc:const(mnesia:read({rabbit_route, Binding}) /= [])
+ rabbit_misc:const(mnesia:read({?MNESIA_TABLE, Binding}) /= [])
end, fun not_found_or_absent_errs_in_mnesia/1).
+binding_action_in_mnesia(#binding{source = SrcName,
+ destination = DstName}, Fun, ErrFun) ->
+ SrcTable = table_for_resource(SrcName),
+ DstTable = table_for_resource(DstName),
+ rabbit_mnesia:execute_mnesia_tx_with_tail(
+ fun () ->
+ case {mnesia:read({SrcTable, SrcName}),
+ mnesia:read({DstTable, DstName})} of
+ {[Src], [Dst]} -> Fun(Src, Dst);
+ {[], [_] } -> ErrFun([SrcName]);
+ {[_], [] } -> ErrFun([DstName]);
+ {[], [] } -> ErrFun([SrcName, DstName])
+ end
+ end).
+
+table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
+table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+
+not_found_or_absent_errs_in_mnesia(Names) ->
+ Errs = [not_found_or_absent_in_mnesia(Name) || Name <- Names],
+ rabbit_misc:const({error, {resources_missing, Errs}}).
+
%% -------------------------------------------------------------------
%% create().
%% -------------------------------------------------------------------
-spec create(Binding, ChecksFun) -> Ret when
Binding :: rabbit_types:binding(),
- Src :: rabbit_types:r('exchange'),
- Dst :: rabbit_types:r('exchange') | rabbit_types:r('queue'),
- BindingType :: durable | semi_durable | transient,
- ChecksFun :: fun((Src, Dst) -> {ok, BindingType} | {error, Reason :: any()}),
+ Src :: rabbit_types:binding_source(),
+ Dst :: rabbit_types:binding_destination(),
+ ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
Ret :: ok | {error, Reason :: any()}.
%% @doc Writes a binding if it doesn't exist already and passes the validation in
-%% `ChecksFun` i.e. exclusive access
+%% `ChecksFun' i.e. exclusive access
%%
%% @returns ok, or an error if the validation has failed.
%%
@@ -75,19 +111,43 @@ create(Binding, ChecksFun) ->
#{mnesia => fun() -> create_in_mnesia(Binding, ChecksFun) end
}).
+create_in_mnesia(Binding, ChecksFun) ->
+ binding_action_in_mnesia(
+ Binding,
+ fun (Src, Dst) ->
+ lock_resource(Src, read),
+ lock_resource(Dst, read),
+ case ChecksFun(Src, Dst) of
+ ok ->
+ BindingType = rabbit_binding:binding_type(Src, Dst),
+ case mnesia:read({?MNESIA_TABLE, Binding}) of
+ [] ->
+ ok = sync_route(#route{binding = Binding}, BindingType,
+ should_index_table(Src), fun mnesia:write/3),
+ MaybeSerial = rabbit_exchange:serialise_events(Src),
+ Serial = serial_in_mnesia(MaybeSerial, Src),
+ fun () ->
+ rabbit_exchange:callback(Src, add_binding, Serial, [Src, Binding])
+ end;
+ [_] -> fun () -> ok end
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end
+ end, fun not_found_or_absent_errs_in_mnesia/1).
+
%% -------------------------------------------------------------------
%% delete().
%% -------------------------------------------------------------------
-spec delete(Binding, ChecksFun) -> Ret when
Binding :: rabbit_types:binding(),
- Src :: rabbit_types:r('exchange'),
- Dst :: rabbit_types:r('exchange') | rabbit_types:r('queue'),
- BindingType :: durable | semi_durable | transient,
- ChecksFun :: fun((Src, Dst) -> {ok, BindingType} | {error, Reason :: any()}),
- Ret :: ok | {error, Reason :: any()}.
+ Src :: rabbit_types:binding_source(),
+ Dst :: rabbit_types:binding_destination(),
+ ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
+ Ret :: ok | {ok, rabbit_binding:deletions()} | {error, Reason :: any()}.
%% @doc Deletes a binding record from the database if it passes the validation in
-%% `ChecksFun`. It also triggers the deletion of auto-delete exchanges if needed.
+%% `ChecksFun'. It also triggers the deletion of auto-delete exchanges if needed.
%%
%% @private
@@ -96,10 +156,82 @@ delete(Binding, ChecksFun) ->
#{mnesia => fun() -> delete_in_mnesia(Binding, ChecksFun) end
}).
+delete_in_mnesia(Binding, ChecksFun) ->
+ binding_action_in_mnesia(
+ Binding,
+ fun (Src, Dst) ->
+ lock_resource(Src, read),
+ lock_resource(Dst, read),
+ case mnesia:read(?MNESIA_TABLE, Binding, write) of
+ [] -> case mnesia:read(?MNESIA_DURABLE_TABLE, Binding, write) of
+ [] -> rabbit_misc:const(ok);
+ %% We still delete the binding and run
+ %% all post-delete functions if there is only
+ %% a durable route in the database
+ _ -> delete_in_mnesia(Src, Dst, Binding)
+ end;
+ _ -> case ChecksFun(Src, Dst) of
+ ok -> delete_in_mnesia(Src, Dst, Binding);
+ {error, _} = Err -> rabbit_misc:const(Err)
+ end
+ end
+ end, fun absent_errs_only_in_mnesia/1).
+
+-spec delete_in_mnesia(Src, Dst, Binding) -> Ret when
+ Src :: rabbit_types:exchange() | amqqueue:amqqueue(),
+ Dst :: rabbit_types:exchange() | amqqueue:amqqueue(),
+ Binding :: rabbit_types:binding(),
+ Ret :: fun(() -> rabbit_binding:deletions()).
+delete_in_mnesia(Src, Dst, B) ->
+ ok = sync_route(#route{binding = B}, rabbit_binding:binding_type(Src, Dst),
+ should_index_table(Src), fun delete/3),
+ Deletions0 = maybe_auto_delete_exchange_in_mnesia(
+ B#binding.source, [B], rabbit_binding:new_deletions(), false),
+ fun() -> {ok, rabbit_binding:process_deletions(Deletions0)} end.
+
+absent_errs_only_in_mnesia(Names) ->
+ Errs = [E || Name <- Names,
+ {absent, _Q, _Reason} = E <- [not_found_or_absent_in_mnesia(Name)]],
+ rabbit_misc:const(case Errs of
+ [] -> ok;
+ _ -> {error, {resources_missing, Errs}}
+ end).
+
+not_found_or_absent_in_mnesia(#resource{kind = exchange} = Name) ->
+ {not_found, Name};
+not_found_or_absent_in_mnesia(#resource{kind = queue} = Name) ->
+ %% NB: we assume that the caller has already performed a lookup on
+ %% rabbit_queue and not found anything
+ case rabbit_db_queue:get_durable_in_mnesia_tx(Name) of
+ {error, not_found} -> {not_found, Name};
+ {ok, Q} -> {absent, Q, nodedown}
+ end.
+
%% -------------------------------------------------------------------
%% get_all().
%% -------------------------------------------------------------------
+-spec get_all() -> [Binding] when
+ Binding :: rabbit_types:binding().
+%% @doc Returns all explicit binding records, the bindings explicitly added and not
+%% automatically generated to the default exchange.
+%%
+%% @returns the list of binding records.
+%%
+%% @private
+
+get_all() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> get_all_in_mnesia() end
+ }).
+
+get_all_in_mnesia() ->
+ mnesia:async_dirty(
+ fun () ->
+ AllRoutes = mnesia:dirty_match_object(?MNESIA_TABLE, #route{_ = '_'}),
+ [B || #route{binding = B} <- AllRoutes]
+ end).
+
-spec get_all(VHostName) -> [Binding] when
VHostName :: vhost:name(),
Binding :: rabbit_types:binding().
@@ -120,10 +252,38 @@ get_all_in_mnesia(VHost) ->
destination = VHostResource,
_ = '_'},
_ = '_'},
- [B || #route{binding = B} <- rabbit_db:list_in_mnesia(rabbit_route, Match)].
+ [B || #route{binding = B} <- rabbit_db:list_in_mnesia(?MNESIA_TABLE, Match)].
+
+-spec get_all(Src, Dst, Reverse) -> [Binding] when
+ Src :: rabbit_types:binding_source(),
+ Dst :: rabbit_types:binding_destination(),
+ Reverse :: boolean(),
+ Binding :: rabbit_types:binding().
+%% @doc Returns all binding records for a given source and destination
+%% in the given virtual host.
+%%
+%% @returns the list of binding records.
+%%
+%% @private
+
+get_all(SrcName, DstName, Reverse) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> get_all_in_mnesia(SrcName, DstName, Reverse) end
+ }).
+
+get_all_in_mnesia(SrcName, DstName, Reverse) ->
+ Route = #route{binding = #binding{source = SrcName,
+ destination = DstName,
+ _ = '_'}},
+ Fun = list_for_route(Route, Reverse),
+ mnesia:async_dirty(Fun).
+
+%% -------------------------------------------------------------------
+%% get_all_for_source().
+%% -------------------------------------------------------------------
-spec get_all_for_source(Src) -> [Binding] when
- Src :: rabbit_types:r('exchange'),
+ Src :: rabbit_types:binding_source(),
Binding :: rabbit_types:binding().
%% @doc Returns all binding records for a given exchange in the given virtual host.
%%
@@ -143,18 +303,22 @@ get_all_for_source_in_mnesia(Resource) ->
list_for_route(Route, false) ->
fun() ->
- [B || #route{binding = B} <- mnesia:match_object(rabbit_route, Route, read)]
+ [B || #route{binding = B} <- mnesia:match_object(?MNESIA_TABLE, Route, read)]
end;
list_for_route(Route, true) ->
fun() ->
[rabbit_binding:reverse_binding(B) ||
#reverse_route{reverse_binding = B} <-
- mnesia:match_object(rabbit_reverse_route,
+ mnesia:match_object(?MNESIA_REVERSE_TABLE,
rabbit_binding:reverse_route(Route), read)]
end.
+%% -------------------------------------------------------------------
+%% get_all_for_destination().
+%% -------------------------------------------------------------------
+
-spec get_all_for_destination(Dst) -> [Binding] when
- Dst :: rabbit_types:r('exchange') | rabbit_types:r('queue'),
+ Dst :: rabbit_types:binding_destination(),
Binding :: rabbit_types:binding().
%% @doc Returns all binding records for a given exchange or queue destination
%% in the given virtual host.
@@ -174,50 +338,9 @@ get_all_for_destination_in_mnesia(Dst) ->
Fun = list_for_route(Route, true),
mnesia:async_dirty(Fun).
--spec get_all(Src, Dst, Reverse) -> [Binding] when
- Src :: rabbit_types:r('exchange'),
- Dst :: rabbit_types:r('exchange') | rabbit_types:r('queue'),
- Reverse :: boolean(),
- Binding :: rabbit_types:binding().
-%% @doc Returns all binding records for a given source and destination
-%% in the given virtual host.
-%%
-%% @returns the list of binding records.
-%%
-%% @private
-
-get_all(SrcName, DstName, Reverse) ->
- rabbit_db:run(
- #{mnesia => fun() -> get_all_in_mnesia(SrcName, DstName, Reverse) end
- }).
-
-get_all_in_mnesia(SrcName, DstName, Reverse) ->
- Route = #route{binding = #binding{source = SrcName,
- destination = DstName,
- _ = '_'}},
- Fun = list_for_route(Route, Reverse),
- mnesia:async_dirty(Fun).
-
--spec get_all_explicit() -> [Binding] when
- Binding :: rabbit_types:binding().
-%% @doc Returns all explicit binding records, the bindings explicitly added and not
-%% automatically generated to the default exchange.
-%%
-%% @returns the list of binding records.
-%%
-%% @private
-
-get_all_explicit() ->
- rabbit_db:run(
- #{mnesia => fun() -> get_all_explicit_in_mnesia() end
- }).
-
-get_all_explicit_in_mnesia() ->
- mnesia:async_dirty(
- fun () ->
- AllRoutes = mnesia:dirty_match_object(rabbit_route, #route{_ = '_'}),
- [B || #route{binding = B} <- AllRoutes]
- end).
+%% -------------------------------------------------------------------
+%% fold().
+%% -------------------------------------------------------------------
-spec fold(Fun, Acc) -> Acc when
Fun :: fun((Binding :: rabbit_types:binding(), Acc) -> Acc),
@@ -238,44 +361,24 @@ fold(Fun, Acc) ->
fold_in_mnesia(Fun, Acc) ->
ets:foldl(fun(#route{binding = Binding}, Acc0) ->
Fun(Binding, Acc0)
- end, Acc, rabbit_route).
-
-recover() ->
- rabbit_db:run(
- #{mnesia => fun() -> recover_in_mnesia() end
- }).
-
-recover(RecoverFun) ->
- rabbit_db:run(
- #{mnesia => fun() -> recover_in_mnesia(RecoverFun) end
- }).
-
-recover_in_mnesia(RecoverFun) ->
- [RecoverFun(Route, Src, Dst, fun recover_semi_durable_route/2) ||
- #route{binding = #binding{destination = Dst,
- source = Src}} = Route <-
- rabbit_mnesia:dirty_read_all(rabbit_semi_durable_route)].
-
-create_index_route_table() ->
- rabbit_db:run(
- #{mnesia => fun() -> create_index_route_table_in_mnesia() end
- }).
-
-create_index_route_table_in_mnesia() ->
- TableName = rabbit_index_route,
- DependantTables = [rabbit_route, rabbit_exchange],
- ok = rabbit_table:wait(DependantTables, _Retry = true),
- [ok = rabbit_table:create_local_copy(Tab, ram_copies) || Tab <- DependantTables],
- ok = rabbit_table:create(
- TableName, rabbit_table:rabbit_index_route_definition()),
- case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
- ok ->
- ok = populate_index_route_table_in_mnesia();
- Error ->
- Error
- end.
+ end, Acc, ?MNESIA_TABLE).
%% Routing - HOT CODE PATH
+%% -------------------------------------------------------------------
+%% match().
+%% -------------------------------------------------------------------
+
+-spec match(Src, MatchFun) -> [Dst] when
+ Src :: rabbit_types:binding_source(),
+ Dst :: rabbit_types:binding_destination(),
+ Binding :: rabbit_types:binding(),
+ MatchFun :: fun((Binding) -> boolean()).
+%% @doc Matches all binding records that have `Src' as source of the binding
+%% and for which `MatchFun' returns `true'.
+%%
+%% @returns the list of destinations
+%%
+%% @private
match(SrcName, Match) ->
rabbit_db:run(
@@ -285,10 +388,28 @@ match(SrcName, Match) ->
match_in_mnesia(SrcName, Match) ->
MatchHead = #route{binding = #binding{source = SrcName,
_ = '_'}},
- Routes = ets:select(rabbit_route, [{MatchHead, [], [['$_']]}]),
+ Routes = ets:select(?MNESIA_TABLE, [{MatchHead, [], [['$_']]}]),
[Dest || [#route{binding = Binding = #binding{destination = Dest}}] <-
Routes, Match(Binding)].
+
+%% Routing - HOT CODE PATH
+%% -------------------------------------------------------------------
+%% match_routing_key().
+%% -------------------------------------------------------------------
+
+-spec match_routing_key(Src, RoutingKeys, UseIndex) -> [Dst] when
+ Src :: rabbit_types:binding_source(),
+ Dst :: rabbit_types:binding_destination(),
+ RoutingKeys :: [binary() | '_'],
+ UseIndex :: boolean().
+%% @doc Matches all binding records that have `Src' as source of the binding
+%% and that match any routing key in `RoutingKeys'.
+%%
+%% @returns the list of destinations
+%%
+%% @private
+
match_routing_key(SrcName, RoutingKeys, UseIndex) ->
rabbit_db:run(
#{mnesia => fun() -> match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) end
@@ -297,91 +418,96 @@ match_routing_key(SrcName, RoutingKeys, UseIndex) ->
match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
case UseIndex of
true ->
- route_v2(rabbit_index_route, SrcName, RoutingKeys);
+ route_v2(?MNESIA_INDEX_TABLE, SrcName, RoutingKeys);
_ ->
route_in_mnesia_v1(SrcName, RoutingKeys)
end.
-delete_all_for_exchange_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
- Bindings = case RemoveBindingsForSource of
- true -> delete_for_source_in_mnesia(XName);
- false -> []
- end,
- {deleted, X, Bindings, delete_for_destination_in_mnesia(XName, OnlyDurable, fun delete_routes/1)}.
+%% -------------------------------------------------------------------
+%% recover().
+%% -------------------------------------------------------------------
-delete_for_destination_in_mnesia(DstName, OnlyDurable) ->
- delete_for_destination_in_mnesia(DstName, OnlyDurable, fun delete_routes/1).
+-spec recover() -> ok.
+%% @doc Recovers all durable routes
+%%
+%% @private
--spec delete_transient_for_destination_in_mnesia(rabbit_types:binding_destination()) -> rabbit_binding:deletions().
-delete_transient_for_destination_in_mnesia(DstName) ->
- delete_for_destination_in_mnesia(DstName, false, fun delete_transient_routes/1).
+recover() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> recover_in_mnesia() end
+ }).
--spec has_for_source_in_mnesia(rabbit_types:binding_source()) -> boolean().
+recover_in_mnesia() ->
+ rabbit_mnesia:execute_mnesia_transaction(
+ fun () ->
+ _ = mnesia:lock({table, ?MNESIA_DURABLE_TABLE}, read),
+ _ = mnesia:lock({table, ?MNESIA_SEMI_DURABLE_TABLE}, write),
+ Routes = rabbit_mnesia:dirty_read_all(?MNESIA_DURABLE_TABLE),
+ Fun = fun(Route) ->
+ mnesia:dirty_write(?MNESIA_SEMI_DURABLE_TABLE, Route)
+ end,
+ lists:foreach(Fun, Routes),
+ ok
+ end).
-has_for_source_in_mnesia(SrcName) ->
- Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- %% we need to check for semi-durable routes (which subsumes
- %% durable routes) here too in case a bunch of routes to durable
- %% queues have been removed temporarily as a result of a node
- %% failure
- contains(rabbit_route, Match) orelse
- contains(rabbit_semi_durable_route, Match).
+-spec recover(RecoverFun) -> ok when
+ Route :: #route{},
+ Src :: rabbit_types:binding_source(),
+ Dst :: rabbit_types:binding_destination(),
+ Binding :: rabbit_types:binding(),
+ Exchange :: rabbit_types:exchange(),
+ RecoverFun :: fun((Route, Src, Dst, fun((Binding, Exchange) -> ok)) -> ok).
+%% @doc Recovers all semi-durable routes
+%%
+%% @private
-%% Internal
-%% --------------------------------------------------------------
-binding_action_in_mnesia(#binding{source = SrcName,
- destination = DstName}, Fun, ErrFun) ->
- SrcTable = table_for_resource(SrcName),
- DstTable = table_for_resource(DstName),
- rabbit_mnesia:execute_mnesia_tx_with_tail(
- fun () ->
- case {mnesia:read({SrcTable, SrcName}),
- mnesia:read({DstTable, DstName})} of
- {[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> ErrFun([SrcName]);
- {[_], [] } -> ErrFun([DstName]);
- {[], [] } -> ErrFun([SrcName, DstName])
- end
- end).
+recover(RecoverFun) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> recover_in_mnesia(RecoverFun) end
+ }).
-table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
-table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+recover_in_mnesia(RecoverFun) ->
+ [RecoverFun(Route, Src, Dst, fun recover_semi_durable_route/2) ||
+ #route{binding = #binding{destination = Dst,
+ source = Src}} = Route <-
+ rabbit_mnesia:dirty_read_all(?MNESIA_SEMI_DURABLE_TABLE)].
-create_in_mnesia(Binding, ChecksFun) ->
- binding_action_in_mnesia(
- Binding,
- fun (Src, Dst) ->
- case ChecksFun(Src, Dst) of
- ok ->
- BindingType = rabbit_binding:binding_type(Src, Dst),
- case mnesia:read({rabbit_route, Binding}) of
- [] ->
- ok = sync_route(#route{binding = Binding}, BindingType,
- should_index_table(Src), fun mnesia:write/3),
- MaybeSerial = rabbit_exchange:serialise_events(Src),
- Serial = serial_in_mnesia(MaybeSerial, Src),
- fun () ->
- rabbit_exchange:callback(Src, add_binding, Serial, [Src, Binding])
- end;
- [_] -> fun () -> ok end
- end;
- {error, _} = Err ->
- rabbit_misc:const(Err)
- end
- end, fun not_found_or_absent_errs_in_mnesia/1).
+%% -------------------------------------------------------------------
+%% create_index_route_table().
+%% -------------------------------------------------------------------
+
+-spec create_index_route_table() -> ok | {error, any()}.
+create_index_route_table() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> create_index_route_table_in_mnesia() end
+ }).
+
+create_index_route_table_in_mnesia() ->
+ DependantTables = [?MNESIA_TABLE, rabbit_exchange],
+ ok = rabbit_table:wait(DependantTables, _Retry = true),
+ [ok = rabbit_table:create_local_copy(Tab, ram_copies) || Tab <- DependantTables],
+ ok = rabbit_table:create(
+ ?MNESIA_INDEX_TABLE, rabbit_table:rabbit_index_route_definition()),
+ case rabbit_table:ensure_table_copy(?MNESIA_INDEX_TABLE, node(), ram_copies) of
+ ok ->
+ ok = populate_index_route_table_in_mnesia();
+ Error ->
+ Error
+ end.
populate_index_route_table_in_mnesia() ->
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
- _ = mnesia:lock({table, rabbit_route}, read),
- _ = mnesia:lock({table, rabbit_index_route}, write),
- Routes = rabbit_mnesia:dirty_read_all(rabbit_route),
+ _ = mnesia:lock({table, ?MNESIA_TABLE}, read),
+ _ = mnesia:lock({table, rabbit_exchange}, read),
+ _ = mnesia:lock({table, ?MNESIA_INDEX_TABLE}, write),
+ Routes = rabbit_mnesia:dirty_read_all(?MNESIA_TABLE),
lists:foreach(fun(#route{binding = #binding{source = Exchange}} = Route) ->
case rabbit_db_exchange:get(Exchange) of
{ok, X} ->
case should_index_table(X) of
true ->
- mnesia:dirty_write(rabbit_index_route,
+ mnesia:dirty_write(?MNESIA_INDEX_TABLE,
rabbit_binding:index_route(Route));
false ->
ok
@@ -392,33 +518,52 @@ populate_index_route_table_in_mnesia() ->
end, Routes)
end).
-delete_in_mnesia(Binding, ChecksFun) ->
- binding_action_in_mnesia(
- Binding,
- fun (Src, Dst) ->
- lock_resource(Src, read),
- lock_resource(Dst, read),
- case mnesia:read(rabbit_route, Binding, write) of
- [] -> case mnesia:read(rabbit_durable_route, Binding, write) of
- [] -> rabbit_misc:const(ok);
- %% We still delete the binding and run
- %% all post-delete functions if there is only
- %% a durable route in the database
- _ -> delete_in_mnesia(Src, Dst, Binding)
- end;
- _ -> case ChecksFun(Src, Dst) of
- ok -> delete_in_mnesia(Src, Dst, Binding);
- {error, _} = Err -> rabbit_misc:const(Err)
- end
- end
- end, fun absent_errs_only_in_mnesia/1).
+%% -------------------------------------------------------------------
+%% delete_all_for_exchange_in_mnesia().
+%% -------------------------------------------------------------------
-delete_in_mnesia(Src, Dst, B) ->
- ok = sync_route(#route{binding = B}, rabbit_binding:binding_type(Src, Dst),
- should_index_table(Src), fun delete/3),
- Deletions0 = maybe_auto_delete_exchange_in_mnesia(
- B#binding.source, [B], rabbit_binding:new_deletions(), false),
- fun() -> rabbit_binding:process_deletions(Deletions0) end.
+-spec delete_all_for_exchange_in_mnesia(Exchange, OnlyDurable, RemoveBindingsForSource)
+ -> Ret when
+ Exchange :: rabbit_types:exchange(),
+ OnlyDurable :: boolean(),
+ RemoveBindingsForSource :: boolean(),
+ Binding :: rabbit_types:binding(),
+ Ret :: {deleted, Exchange, [Binding], rabbit_binding:deletions()}.
+
+delete_all_for_exchange_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
+ Bindings = case RemoveBindingsForSource of
+ true -> delete_for_source_in_mnesia(XName);
+ false -> []
+ end,
+ {deleted, X, Bindings, delete_for_destination_in_mnesia(XName, OnlyDurable, fun delete_routes/1)}.
+
+delete_for_source_in_mnesia(#exchange{name = SrcName} = SrcX) ->
+ delete_for_source_in_mnesia(SrcName, should_index_table(SrcX));
+delete_for_source_in_mnesia(SrcName) ->
+ delete_for_source_in_mnesia(SrcName, undefined).
+
+-spec delete_for_source_in_mnesia(rabbit_types:binding_source(),
+ boolean() | undefined) -> [rabbit_types:binding()].
+delete_for_source_in_mnesia(SrcName, ShouldIndexTable) ->
+ lock_resource(SrcName),
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
+ delete_routes(
+ lists:usort(
+ mnesia:dirty_match_object(?MNESIA_TABLE, Match) ++
+ mnesia:dirty_match_object(?MNESIA_SEMI_DURABLE_TABLE, Match)),
+ ShouldIndexTable).
+
+%% -------------------------------------------------------------------
+%% delete_for_destination_in_mnesia().
+%% -------------------------------------------------------------------
+
+-spec delete_for_destination_in_mnesia(Dst, OnlyDurable) -> Deletions when
+ Dst :: rabbit_types:binding_destination(),
+ OnlyDurable :: boolean(),
+ Deletions :: rabbit_binding:deletions().
+
+delete_for_destination_in_mnesia(DstName, OnlyDurable) ->
+ delete_for_destination_in_mnesia(DstName, OnlyDurable, fun delete_routes/1).
delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
lock_resource(DstName),
@@ -428,33 +573,71 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
false ->
[rabbit_binding:reverse_route(R) ||
R <- mnesia:dirty_match_object(
- rabbit_reverse_route, MatchRev)];
+ ?MNESIA_REVERSE_TABLE, MatchRev)];
true -> lists:usort(
mnesia:dirty_match_object(
- rabbit_durable_route, MatchFwd) ++
+ ?MNESIA_DURABLE_TABLE, MatchFwd) ++
mnesia:dirty_match_object(
- rabbit_semi_durable_route, MatchFwd))
+ ?MNESIA_SEMI_DURABLE_TABLE, MatchFwd))
end,
Bindings = Fun(Routes),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_mnesia/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).
-delete_for_source_in_mnesia(#exchange{name = SrcName} = SrcX) ->
- delete_for_source_in_mnesia(SrcName, should_index_table(SrcX));
-delete_for_source_in_mnesia(SrcName) ->
- delete_for_source_in_mnesia(SrcName, undefined).
+%% -------------------------------------------------------------------
+%% delete_transient_for_destination_in_mnesia().
+%% -------------------------------------------------------------------
--spec delete_for_source_in_mnesia(rabbit_types:binding_source(),
- boolean() | undefined) -> [rabbit_types:binding()].
-delete_for_source_in_mnesia(SrcName, ShouldIndexTable) ->
- lock_resource(SrcName),
+-spec delete_transient_for_destination_in_mnesia(rabbit_types:binding_destination()) -> rabbit_binding:deletions().
+delete_transient_for_destination_in_mnesia(DstName) ->
+ delete_for_destination_in_mnesia(DstName, false, fun delete_transient_routes/1).
+
+delete_transient_routes(Routes) ->
+ lists:map(fun(#route{binding = #binding{source = Src} = Binding} = Route) ->
+ {ok, X} = rabbit_db_exchange:get(Src),
+ ok = sync_transient_route(Route, should_index_table(X), fun delete/3),
+ Binding
+ end, Routes).
+
+%% -------------------------------------------------------------------
+%% has_for_source_in_mnesia().
+%% -------------------------------------------------------------------
+
+-spec has_for_source_in_mnesia(rabbit_types:binding_source()) -> boolean().
+
+has_for_source_in_mnesia(SrcName) ->
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- delete_routes(
- lists:usort(
- mnesia:dirty_match_object(rabbit_route, Match) ++
- mnesia:dirty_match_object(rabbit_semi_durable_route, Match)),
- ShouldIndexTable).
+ %% we need to check for semi-durable routes (which subsumes
+ %% durable routes) here too in case a bunch of routes to durable
+ %% queues have been removed temporarily as a result of a node
+ %% failure
+ contains(?MNESIA_TABLE, Match) orelse
+ contains(?MNESIA_SEMI_DURABLE_TABLE, Match).
+
+%% -------------------------------------------------------------------
+%% clear().
+%% -------------------------------------------------------------------
+
+-spec clear() -> ok.
+%% @doc Deletes all bindings.
+%%
+%% @private
+clear() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> clear_in_mnesia() end}).
+
+clear_in_mnesia() ->
+ {atomic, ok} = mnesia:clear_table(?MNESIA_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_DURABLE_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_SEMI_DURABLE_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_REVERSE_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_INDEX_TABLE),
+ ok.
+
+%% --------------------------------------------------------------
+%% Internal
+%% --------------------------------------------------------------
delete_routes(Routes) ->
delete_routes(Routes, undefined).
@@ -463,11 +646,11 @@ delete_routes(Routes, ShouldIndexTable) ->
%% operations on disk tables, which require an fsync.
{RamRoutes, DiskRoutes} =
lists:partition(fun (R) -> mnesia:read(
- rabbit_durable_route, R#route.binding, read) == [] end,
+ ?MNESIA_DURABLE_TABLE, R#route.binding, read) == [] end,
Routes),
{RamOnlyRoutes, SemiDurableRoutes} =
lists:partition(fun (R) -> mnesia:read(
- rabbit_semi_durable_route, R#route.binding, read) == [] end,
+ ?MNESIA_SEMI_DURABLE_TABLE, R#route.binding, read) == [] end,
RamRoutes),
%% Of course the destination might not really be durable but it's
%% just as easy to try to delete it from the semi-durable table
@@ -493,13 +676,6 @@ delete_routes(Routes, ShouldIndexTable) ->
end,
[R#route.binding || R <- Routes].
-delete_transient_routes(Routes) ->
- lists:map(fun(#route{binding = #binding{source = Src} = Binding} = Route) ->
- {ok, X} = rabbit_db_exchange:get(Src),
- ok = sync_transient_route(Route, should_index_table(X), fun delete/3),
- Binding
- end, Routes).
-
delete(Tab, #route{binding = B}, LockKind) ->
mnesia:delete(Tab, B, LockKind);
delete(Tab, #reverse_route{reverse_binding = B}, LockKind) ->
@@ -521,44 +697,11 @@ should_index_table(#exchange{name = #resource{name = Name},
should_index_table(_) ->
false.
-not_found_or_absent_errs_in_mnesia(Names) ->
- Errs = [not_found_or_absent_in_mnesia(Name) || Name <- Names],
- rabbit_misc:const({error, {resources_missing, Errs}}).
-
-absent_errs_only_in_mnesia(Names) ->
- Errs = [E || Name <- Names,
- {absent, _Q, _Reason} = E <- [not_found_or_absent_in_mnesia(Name)]],
- rabbit_misc:const(case Errs of
- [] -> ok;
- _ -> {error, {resources_missing, Errs}}
- end).
-
-not_found_or_absent_in_mnesia(#resource{kind = exchange} = Name) ->
- {not_found, Name};
-not_found_or_absent_in_mnesia(#resource{kind = queue} = Name) ->
- case rabbit_db_queue:not_found_or_absent_queue_in_mnesia(Name) of
- not_found -> {not_found, Name};
- {absent, _Q, _Reason} = R -> R
- end.
-
-recover_in_mnesia() ->
- rabbit_mnesia:execute_mnesia_transaction(
- fun () ->
- _ = mnesia:lock({table, rabbit_durable_route}, read),
- _ = mnesia:lock({table, rabbit_semi_durable_route}, write),
- Routes = rabbit_mnesia:dirty_read_all(rabbit_durable_route),
- Fun = fun(Route) ->
- mnesia:dirty_write(rabbit_semi_durable_route, Route)
- end,
- lists:foreach(Fun, Routes),
- ok
- end).
-
recover_semi_durable_route(#route{binding = B} = Route, X) ->
MaybeSerial = rabbit_exchange:serialise_events(X),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
- case mnesia:read(rabbit_semi_durable_route, B, read) of
+ case mnesia:read(?MNESIA_SEMI_DURABLE_TABLE, B, read) of
[] -> no_recover;
_ -> ok = sync_transient_route(Route, should_index_table(X), fun mnesia:write/3),
serial_in_mnesia(MaybeSerial, X)
@@ -572,22 +715,22 @@ recover_semi_durable_route(#route{binding = B} = Route, X) ->
serial_in_mnesia(false, _) ->
none;
serial_in_mnesia(true, X) ->
- rabbit_db_exchange:next_serial_in_mnesia_tx(X).
+ rabbit_db_exchange:next_serial_in_mnesia_tx(X#exchange.name).
sync_route(Route, durable, ShouldIndexTable, Fun) ->
- ok = Fun(rabbit_durable_route, Route, write),
+ ok = Fun(?MNESIA_DURABLE_TABLE, Route, write),
sync_route(Route, semi_durable, ShouldIndexTable, Fun);
sync_route(Route, semi_durable, ShouldIndexTable, Fun) ->
- ok = Fun(rabbit_semi_durable_route, Route, write),
+ ok = Fun(?MNESIA_SEMI_DURABLE_TABLE, Route, write),
sync_route(Route, transient, ShouldIndexTable, Fun);
sync_route(Route, transient, ShouldIndexTable, Fun) ->
sync_transient_route(Route, ShouldIndexTable, Fun).
sync_transient_route(Route, ShouldIndexTable, Fun) ->
- ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, rabbit_binding:reverse_route(Route), write),
+ ok = Fun(?MNESIA_TABLE, Route, write),
+ ok = Fun(?MNESIA_REVERSE_TABLE, rabbit_binding:reverse_route(Route), write),
sync_index_route(Route, ShouldIndexTable, Fun).
sync_index_route(Route, true, Fun) ->
@@ -596,13 +739,13 @@ sync_index_route(Route, true, Fun) ->
%% (i.e. feature flag migration) runs in parallel.
case rabbit_feature_flags:is_enabled(direct_exchange_routing_v2, non_blocking) of
true ->
- ok = Fun(rabbit_index_route, rabbit_binding:index_route(Route), write);
+ ok = Fun(?MNESIA_INDEX_TABLE, rabbit_binding:index_route(Route), write);
false ->
ok;
state_changing ->
- case rabbit_table:exists(rabbit_index_route) of
+ case rabbit_table:exists(?MNESIA_INDEX_TABLE) of
true ->
- ok = Fun(rabbit_index_route, rabbit_binding:index_route(Route), write);
+ ok = Fun(?MNESIA_INDEX_TABLE, rabbit_binding:index_route(Route), write);
false ->
ok
end
@@ -610,6 +753,13 @@ sync_index_route(Route, true, Fun) ->
sync_index_route(_, _, _) ->
ok.
+-spec maybe_auto_delete_exchange_in_mnesia(ExchangeName, [Binding], Deletions, OnlyDurable)
+ -> Ret when
+ ExchangeName :: rabbit_exchange:name(),
+ Binding :: rabbit_types:binding(),
+ Deletions :: rabbit_binding:deletions(),
+ OnlyDurable :: boolean(),
+ Ret :: rabbit_binding:deletions().
maybe_auto_delete_exchange_in_mnesia(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
@@ -626,7 +776,7 @@ maybe_auto_delete_exchange_in_mnesia(XName, Bindings, Deletions, OnlyDurable) ->
lock_resource(Name) -> lock_resource(Name, write).
lock_resource(Name, LockKind) ->
- _ = mnesia:lock({global, Name, mnesia:table_info(rabbit_route, where_to_write)},
+ _ = mnesia:lock({global, Name, mnesia:table_info(?MNESIA_TABLE, where_to_write)},
LockKind),
ok.
@@ -644,7 +794,7 @@ route_in_mnesia_v1(SrcName, [RoutingKey]) ->
destination = '$1',
key = RoutingKey,
_ = '_'}},
- ets:select(rabbit_route, [{MatchHead, [], ['$1']}]);
+ ets:select(?MNESIA_TABLE, [{MatchHead, [], ['$1']}]);
route_in_mnesia_v1(SrcName, [_|_] = RoutingKeys) ->
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
%% expensive for the same reasons as above, and, additionally, due to
@@ -663,7 +813,7 @@ route_in_mnesia_v1(SrcName, [_|_] = RoutingKeys) ->
_ = '_'}},
Conditions = [list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
RKey <- RoutingKeys]])],
- ets:select(rabbit_route, [{MatchHead, Conditions, ['$1']}]).
+ ets:select(?MNESIA_TABLE, [{MatchHead, Conditions, ['$1']}]).
%% rabbit_router:match_routing_key/2 uses ets:select/2 to get destinations.
%% ets:select/2 is expensive because it needs to compile the match spec every
diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl
index e4cbd2ad4d..093b0c6e87 100644
--- a/deps/rabbit/src/rabbit_db_exchange.erl
+++ b/deps/rabbit/src/rabbit_db_exchange.erl
@@ -19,7 +19,7 @@
count/0,
update/2,
create_or_get/1,
- insert/1,
+ set/1,
peek_serial/1,
next_serial/1,
delete/2,
@@ -37,7 +37,12 @@
update_in_mnesia_tx/2
]).
--type name() :: rabbit_types:r('exchange').
+%% For testing
+-export([clear/0]).
+
+-define(MNESIA_TABLE, rabbit_exchange).
+-define(MNESIA_DURABLE_TABLE, rabbit_durable_exchange).
+-define(MNESIA_SERIAL_TABLE, rabbit_exchange_serial).
%% -------------------------------------------------------------------
%% get_all().
@@ -56,7 +61,7 @@ get_all() ->
#{mnesia => fun() -> get_all_in_mnesia() end}).
get_all_in_mnesia() ->
- rabbit_db:list_in_mnesia(rabbit_exchange, #exchange{_ = '_'}).
+ rabbit_db:list_in_mnesia(?MNESIA_TABLE, #exchange{_ = '_'}).
-spec get_all(VHostName) -> [Exchange] when
VHostName :: vhost:name(),
@@ -74,7 +79,11 @@ get_all(VHost) ->
get_all_in_mnesia(VHost) ->
Match = #exchange{name = rabbit_misc:r(VHost, exchange), _ = '_'},
- rabbit_db:list_in_mnesia(rabbit_exchange, Match).
+ rabbit_db:list_in_mnesia(?MNESIA_TABLE, Match).
+
+%% -------------------------------------------------------------------
+%% get_all_durable().
+%% -------------------------------------------------------------------
-spec get_all_durable() -> [Exchange] when
Exchange :: rabbit_types:exchange().
@@ -96,8 +105,8 @@ get_all_durable_in_mnesia() ->
%% list().
%% -------------------------------------------------------------------
--spec list() -> [Exchange] when
- Exchange :: rabbit_types:exchange().
+-spec list() -> [ExchangeName] when
+ ExchangeName :: rabbit_exchange:name().
%% @doc Lists the names of all exchanges.
%%
%% @returns a list of exchange names.
@@ -110,14 +119,14 @@ list() ->
}).
list_in_mnesia() ->
- mnesia:dirty_all_keys(rabbit_exchange).
+ mnesia:dirty_all_keys(?MNESIA_TABLE).
%% -------------------------------------------------------------------
%% get().
%% -------------------------------------------------------------------
-spec get(ExchangeName) -> Ret when
- ExchangeName :: name(),
+ ExchangeName :: rabbit_exchange:name(),
Ret :: {ok, Exchange :: rabbit_types:exchange()} | {error, not_found}.
%% @doc Returns the record of the exchange named `Name'.
%%
@@ -132,14 +141,14 @@ get(Name) ->
}).
get_in_mnesia(Name) ->
- rabbit_mnesia:dirty_read({rabbit_exchange, Name}).
+ rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}).
%% -------------------------------------------------------------------
%% get_many().
%% -------------------------------------------------------------------
-spec get_many([ExchangeName]) -> [Exchange] when
- ExchangeName :: name(),
+ ExchangeName :: rabbit_exchange:name(),
Exchange :: rabbit_types:exchange().
%% @doc Returns the records of the exchanges named `Name'.
%%
@@ -149,9 +158,15 @@ get_in_mnesia(Name) ->
get_many(Names) when is_list(Names) ->
rabbit_db:run(
- #{mnesia => fun() -> get_many_in_mnesia(rabbit_exchange, Names) end
+ #{mnesia => fun() -> get_many_in_mnesia(?MNESIA_TABLE, Names) end
}).
+get_many_in_mnesia(Table, [Name]) -> ets:lookup(Table, Name);
+get_many_in_mnesia(Table, Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_mnesia:dirty_read/1.
+ lists:append([ets:lookup(Table, Name) || Name <- Names]).
+
%% -------------------------------------------------------------------
%% count().
%% -------------------------------------------------------------------
@@ -168,16 +183,15 @@ count() ->
#{mnesia => fun() -> count_in_mnesia() end}).
count_in_mnesia() ->
- mnesia:table_info(rabbit_exchange, size).
+ mnesia:table_info(?MNESIA_TABLE, size).
%% -------------------------------------------------------------------
%% update().
%% -------------------------------------------------------------------
--spec update(ExchangeName, UpdateFun) -> Ret when
- ExchangeName :: name(),
- UpdateFun :: fun((Exchange) -> Exchange),
- Ret :: Exchange :: rabbit_types:exchange() | not_found.
+-spec update(ExchangeName, UpdateFun) -> ok when
+ ExchangeName :: rabbit_exchange:name(),
+ UpdateFun :: fun((Exchange) -> Exchange).
%% @doc Updates an existing exchange record using the result of
%% `UpdateFun'.
%%
@@ -194,9 +208,36 @@ update(XName, Fun) ->
update_in_mnesia(XName, Fun) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
- update_in_mnesia_tx(XName, Fun)
+ _ = update_in_mnesia_tx(XName, Fun),
+ ok
end).
+-spec update_in_mnesia_tx(ExchangeName, UpdateFun) -> Ret when
+ ExchangeName :: rabbit_exchange:name(),
+ Exchange :: rabbit_types:exchange(),
+ UpdateFun :: fun((Exchange) -> Exchange),
+ Ret :: not_found | Exchange.
+
+update_in_mnesia_tx(Name, Fun) ->
+ Table = {?MNESIA_TABLE, Name},
+ case mnesia:wread(Table) of
+ [X] -> X1 = Fun(X),
+ set_in_mnesia_tx(X1);
+ [] -> not_found
+ end.
+
+set_in_mnesia_tx(X = #exchange{durable = true}) ->
+ mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
+ write),
+ set_ram_in_mnesia_tx(X);
+set_in_mnesia_tx(X = #exchange{durable = false}) ->
+ set_ram_in_mnesia_tx(X).
+
+set_ram_in_mnesia_tx(X) ->
+ X1 = rabbit_exchange_decorator:set(X),
+ ok = mnesia:write(?MNESIA_TABLE, X1, write),
+ X1.
+
%% -------------------------------------------------------------------
%% create_or_get().
%% -------------------------------------------------------------------
@@ -220,19 +261,19 @@ create_or_get(X) ->
create_or_get_in_mnesia(#exchange{name = XName} = X) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
- case mnesia:wread({rabbit_exchange, XName}) of
+ case mnesia:wread({?MNESIA_TABLE, XName}) of
[] ->
- {new, insert_in_mnesia_tx(X)};
+ {new, set_in_mnesia_tx(X)};
[ExistingX] ->
{existing, ExistingX}
end
end).
%% -------------------------------------------------------------------
-%% insert().
+%% set().
%% -------------------------------------------------------------------
--spec insert([Exchange]) -> ok when
+-spec set([Exchange]) -> ok when
Exchange :: rabbit_types:exchange().
%% @doc Writes the exchange records.
%%
@@ -240,12 +281,12 @@ create_or_get_in_mnesia(#exchange{name = XName} = X) ->
%%
%% @private
-insert(Xs) ->
+set(Xs) ->
rabbit_db:run(
- #{mnesia => fun() -> insert_in_mnesia(Xs) end
+ #{mnesia => fun() -> set_in_mnesia(Xs) end
}).
-insert_in_mnesia(Xs) when is_list(Xs) ->
+set_in_mnesia(Xs) when is_list(Xs) ->
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
[mnesia:write(rabbit_durable_exchange, X, write) || X <- Xs]
@@ -257,7 +298,7 @@ insert_in_mnesia(Xs) when is_list(Xs) ->
%% -------------------------------------------------------------------
-spec peek_serial(ExchangeName) -> Serial when
- ExchangeName :: name(),
+ ExchangeName :: rabbit_exchange:name(),
Serial :: integer().
%% @doc Returns the next serial number without increasing it.
%%
@@ -276,12 +317,18 @@ peek_serial_in_mnesia(XName) ->
peek_serial_in_mnesia_tx(XName, read)
end).
+peek_serial_in_mnesia_tx(XName, LockType) ->
+ case mnesia:read(?MNESIA_SERIAL_TABLE, XName, LockType) of
+ [#exchange_serial{next = Serial}] -> Serial;
+ _ -> 1
+ end.
+
%% -------------------------------------------------------------------
%% next_serial().
%% -------------------------------------------------------------------
-spec next_serial(ExchangeName) -> Serial when
- ExchangeName :: name(),
+ ExchangeName :: rabbit_exchange:name(),
Serial :: integer().
%% @doc Returns the next serial number and increases it.
%%
@@ -289,29 +336,39 @@ peek_serial_in_mnesia(XName) ->
%%
%% @private
-next_serial(X) ->
+next_serial(XName) ->
rabbit_db:run(
- #{mnesia => fun() -> next_serial_in_mnesia(X) end
+ #{mnesia => fun() -> next_serial_in_mnesia(XName) end
}).
-next_serial_in_mnesia(X) ->
+next_serial_in_mnesia(XName) ->
rabbit_mnesia:execute_mnesia_transaction(fun() ->
- next_serial_in_mnesia_tx(X)
+ next_serial_in_mnesia_tx(XName)
end).
+-spec next_serial_in_mnesia_tx(ExchangeName) -> Serial when
+ ExchangeName :: rabbit_exchange:name(),
+ Serial :: integer().
+
+next_serial_in_mnesia_tx(XName) ->
+ Serial = peek_serial_in_mnesia_tx(XName, write),
+ ok = mnesia:write(?MNESIA_SERIAL_TABLE,
+ #exchange_serial{name = XName, next = Serial + 1}, write),
+ Serial.
+
%% -------------------------------------------------------------------
%% delete().
%% -------------------------------------------------------------------
-spec delete(ExchangeName, IfUnused) -> Ret when
- ExchangeName :: name(),
+ ExchangeName :: rabbit_exchange:name(),
IfUnused :: boolean(),
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
Deletions :: dict:dict(),
- Ret :: {error, not_found} | {deleted, Exchange, [Binding], Deletions}.
-%% @doc Deletes an exchange record from the database. If `IfUnused` is set
-%% to `true`, it is only deleted when there are no bindings present on the
+ Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
+%% @doc Deletes an exchange record from the database. If `IfUnused' is set
+%% to `true', it is only deleted when there are no bindings present on the
%% exchange.
%%
%% @returns an error if the exchange does not exist or a tuple with the exchange,
@@ -334,18 +391,41 @@ delete_in_mnesia(XName, IfUnused) ->
end,
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
- case mnesia:wread({rabbit_exchange, XName}) of
+ case mnesia:wread({?MNESIA_TABLE, XName}) of
[X] -> DeletionFun(X, false);
[] -> {error, not_found}
end
end).
+conditional_delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable) ->
+ case rabbit_db_binding:has_for_source_in_mnesia(XName) of
+ false -> delete_in_mnesia(X, OnlyDurable, false);
+ true -> {error, in_use}
+ end.
+
+unconditional_delete_in_mnesia(X, OnlyDurable) ->
+ delete_in_mnesia(X, OnlyDurable, true).
+
+-spec delete_in_mnesia(Exchange, OnlyDurable, RemoveBindingsForSource) -> Ret when
+ Exchange :: rabbit_types:exchange(),
+ OnlyDurable :: boolean(),
+ RemoveBindingsForSource :: boolean(),
+ Exchange :: rabbit_types:exchange(),
+ Binding :: rabbit_types:binding(),
+ Deletions :: dict:dict(),
+ Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
+delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
+ ok = mnesia:delete({?MNESIA_TABLE, XName}),
+ mnesia:delete({?MNESIA_DURABLE_TABLE, XName}),
+ rabbit_db_binding:delete_all_for_exchange_in_mnesia(
+ X, OnlyDurable, RemoveBindingsForSource).
+
%% -------------------------------------------------------------------
%% delete_serial().
%% -------------------------------------------------------------------
-spec delete_serial(ExchangeName) -> ok when
- ExchangeName :: name().
+ ExchangeName :: rabbit_exchange:name().
%% @doc Deletes an exchange serial record from the database.
%%
%% @returns ok
@@ -360,14 +440,15 @@ delete_serial(XName) ->
delete_serial_in_mnesia(XName) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
- mnesia:delete({rabbit_exchange_serial, XName})
+ mnesia:delete({?MNESIA_SERIAL_TABLE, XName})
end).
%% -------------------------------------------------------------------
%% recover().
%% -------------------------------------------------------------------
--spec recover(VHostName) -> ok when
+-spec recover(VHostName) -> [Exchange] when
+ Exchange :: rabbit_types:exchange(),
VHostName :: vhost:name().
%% @doc Recovers all exchanges for a given vhost
%%
@@ -380,13 +461,30 @@ recover(VHost) ->
#{mnesia => fun() -> recover_in_mnesia(VHost) end
}).
+recover_in_mnesia(VHost) ->
+ rabbit_mnesia:table_filter(
+ fun (#exchange{name = XName}) ->
+ XName#resource.virtual_host =:= VHost andalso
+ mnesia:read({?MNESIA_TABLE, XName}) =:= []
+ end,
+ fun (X, true) ->
+ X;
+ (X, false) ->
+ X1 = rabbit_mnesia:execute_mnesia_transaction(
+ fun() -> set_ram_in_mnesia_tx(X) end),
+ Serial = rabbit_exchange:serial(X1),
+ rabbit_exchange:callback(X1, create, Serial, [X1])
+ end,
+ ?MNESIA_DURABLE_TABLE).
+
%% -------------------------------------------------------------------
%% match().
%% -------------------------------------------------------------------
--spec match(Pattern) -> [Exchange] when
+-spec match(Pattern) -> Ret when
Pattern :: #exchange{},
- Exchange :: rabbit_types:exchange().
+ Exchange :: rabbit_types:exchange(),
+ Ret :: [Exchange] | {error, Reason :: any()}.
%% @doc Returns all exchanges that match a given pattern
%%
%% @returns a list of exchange records
@@ -401,7 +499,7 @@ match(Pattern) ->
match_in_mnesia(Pattern) ->
case mnesia:transaction(
fun() ->
- mnesia:match_object(rabbit_exchange, Pattern, read)
+ mnesia:match_object(?MNESIA_TABLE, Pattern, read)
end) of
{atomic, Xs} -> Xs;
{aborted, Err} -> {error, Err}
@@ -412,7 +510,7 @@ match_in_mnesia(Pattern) ->
%% -------------------------------------------------------------------
-spec exists(ExchangeName) -> Exists when
- ExchangeName :: name(),
+ ExchangeName :: rabbit_exchange:name(),
Exists :: boolean().
%% @doc Indicates if the exchange named `Name' exists.
%%
@@ -425,58 +523,41 @@ exists(Name) ->
#{mnesia => fun() -> exists_in_mnesia(Name) end}).
exists_in_mnesia(Name) ->
- ets:member(rabbit_exchange, Name).
-
-%% Internal
-%% --------------------------------------------------------------
-
-peek_serial_in_mnesia_tx(XName, LockType) ->
- case mnesia:read(rabbit_exchange_serial, XName, LockType) of
- [#exchange_serial{next = Serial}] -> Serial;
- _ -> 1
- end.
-
-next_serial_in_mnesia_tx(#exchange{name = XName}) ->
- Serial = peek_serial_in_mnesia_tx(XName, write),
- ok = mnesia:write(rabbit_exchange_serial,
- #exchange_serial{name = XName, next = Serial + 1}, write),
- Serial.
+ ets:member(?MNESIA_TABLE, Name).
-update_in_mnesia_tx(Name, Fun) ->
- Table = {rabbit_exchange, Name},
- case mnesia:wread(Table) of
- [X] -> X1 = Fun(X),
- insert_in_mnesia_tx(X1);
- [] -> not_found
- end.
+%% -------------------------------------------------------------------
+%% clear().
+%% -------------------------------------------------------------------
-delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
- ok = mnesia:delete({rabbit_exchange, XName}),
- mnesia:delete({rabbit_durable_exchange, XName}),
- rabbit_db_binding:delete_all_for_exchange_in_mnesia(X, OnlyDurable, RemoveBindingsForSource).
+-spec clear() -> ok.
+%% @doc Deletes all exchanges.
+%%
+%% @private
-get_many_in_mnesia(Table, [Name]) -> ets:lookup(Table, Name);
-get_many_in_mnesia(Table, Names) when is_list(Names) ->
- %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
- %% expensive for reasons explained in rabbit_mnesia:dirty_read/1.
- lists:append([ets:lookup(Table, Name) || Name <- Names]).
+clear() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> clear_in_mnesia() end}).
-conditional_delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable) ->
- case rabbit_db_binding:has_for_source_in_mnesia(XName) of
- false -> delete_in_mnesia(X, OnlyDurable, false);
- true -> {error, in_use}
- end.
+clear_in_mnesia() ->
+ {atomic, ok} = mnesia:clear_table(?MNESIA_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_DURABLE_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_SERIAL_TABLE),
+ ok.
-unconditional_delete_in_mnesia(X, OnlyDurable) ->
- delete_in_mnesia(X, OnlyDurable, true).
+%% -------------------------------------------------------------------
+%% maybe_auto_delete_in_mnesia().
+%% -------------------------------------------------------------------
--spec maybe_auto_delete_in_mnesia
- (rabbit_types:exchange(), boolean())
- -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}.
+-spec maybe_auto_delete_in_mnesia(ExchangeName, boolean()) -> Ret when
+ ExchangeName :: rabbit_exchange:name(),
+ Exchange :: rabbit_types:exchange(),
+ Deletions :: rabbit_binding:deletions(),
+ Ret :: {'not_deleted', 'undefined' | Exchange} |
+ {'deleted', Exchange, Deletions}.
maybe_auto_delete_in_mnesia(XName, OnlyDurable) ->
case mnesia:read({case OnlyDurable of
- true -> rabbit_durable_exchange;
- false -> rabbit_exchange
+ true -> ?MNESIA_DURABLE_TABLE;
+ false -> ?MNESIA_TABLE
end, XName}) of
[] -> {not_deleted, undefined};
[#exchange{auto_delete = false} = X] -> {not_deleted, X};
@@ -486,31 +567,3 @@ maybe_auto_delete_in_mnesia(XName, OnlyDurable) ->
{deleted, X, [], Deletions} -> {deleted, X, Deletions}
end
end.
-
-insert_in_mnesia_tx(X = #exchange{durable = true}) ->
- mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
- write),
- insert_ram_in_mnesia_tx(X);
-insert_in_mnesia_tx(X = #exchange{durable = false}) ->
- insert_ram_in_mnesia_tx(X).
-
-insert_ram_in_mnesia_tx(X) ->
- X1 = rabbit_exchange_decorator:set(X),
- ok = mnesia:write(rabbit_exchange, X1, write),
- X1.
-
-recover_in_mnesia(VHost) ->
- rabbit_mnesia:table_filter(
- fun (#exchange{name = XName}) ->
- XName#resource.virtual_host =:= VHost andalso
- mnesia:read({rabbit_exchange, XName}) =:= []
- end,
- fun (X, true) ->
- X;
- (X, false) ->
- X1 = rabbit_mnesia:execute_mnesia_transaction(
- fun() -> insert_in_mnesia_tx(X) end),
- Serial = rabbit_exchange:serial(X1),
- rabbit_exchange:callback(X1, create, Serial, [X1])
- end,
- rabbit_durable_exchange).
diff --git a/deps/rabbit/src/rabbit_db_maintenance.erl b/deps/rabbit/src/rabbit_db_maintenance.erl
index ba6e59be64..6036dc4149 100644
--- a/deps/rabbit/src/rabbit_db_maintenance.erl
+++ b/deps/rabbit/src/rabbit_db_maintenance.erl
@@ -64,8 +64,9 @@ status_table_definition() ->
%% set().
%% -------------------------------------------------------------------
--spec set(Status) -> ok when
- Status :: rabbit_maintenance:maintenance_status().
+-spec set(Status) -> Ret when
+ Status :: rabbit_maintenance:maintenance_status(),
+ Ret :: boolean().
%% @doc Sets the maintenance status for the local node
%%
%% @private
@@ -104,7 +105,7 @@ set_in_mnesia(Status) ->
-spec get(Node) -> Status when
Node :: node(),
- Status :: rabbit_maintenance:maintenance_status().
+ Status :: undefined | rabbit_maintenance:maintenance_status().
%% @doc Returns the status for the given node using a local query.
%%
%% @returns the status if any, or `undefined'.
@@ -130,7 +131,7 @@ get_in_mnesia(Node) ->
-spec get_consistent(Node) -> Status when
Node :: node(),
- Status :: rabbit_maintenance:maintenance_status().
+ Status :: undefined | rabbit_maintenance:maintenance_status().
%% @doc Returns the status for the given node using a consistent query.
%%
%% @returns the status if any, or `undefined'.
diff --git a/deps/rabbit/src/rabbit_db_msup.erl b/deps/rabbit/src/rabbit_db_msup.erl
index 0f56b609d7..35b8b36d04 100644
--- a/deps/rabbit/src/rabbit_db_msup.erl
+++ b/deps/rabbit/src/rabbit_db_msup.erl
@@ -7,6 +7,18 @@
-module(rabbit_db_msup).
+-export([
+ create_tables/0,
+ table_definitions/0,
+ create_or_update/5,
+ find_mirror/2,
+ update_all/2,
+ delete/2,
+ delete_all/1
+ ]).
+
+-export([clear/0]).
+
-define(TABLE, mirrored_sup_childspec).
-define(TABLE_DEF,
{?TABLE,
@@ -15,12 +27,12 @@
{attributes, record_info(fields, mirrored_sup_childspec)}]}).
-define(TABLE_MATCH, {match, #mirrored_sup_childspec{ _ = '_' }}).
--export([create_tables/0, table_definitions/0,
- create_or_update/5, delete/2,
- find_mirror/2, update_all/2, delete_all/1]).
-
-record(mirrored_sup_childspec, {key, mirroring_pid, childspec}).
+%% -------------------------------------------------------------------
+%% create_tables().
+%% -------------------------------------------------------------------
+
-spec create_tables() -> Ret when
Ret :: 'ok' | {error, Reason :: term()}.
@@ -29,6 +41,19 @@ create_tables() ->
#{mnesia => fun() -> create_tables_in_mnesia([?TABLE_DEF]) end
}).
+create_tables_in_mnesia([]) ->
+ ok;
+create_tables_in_mnesia([{Table, Attributes} | Ts]) ->
+ case mnesia:create_table(Table, Attributes) of
+ {atomic, ok} -> create_tables_in_mnesia(Ts);
+ {aborted, {already_exists, ?TABLE}} -> create_tables_in_mnesia(Ts);
+ Err -> Err
+ end.
+
+%% -------------------------------------------------------------------
+%% table_definitions().
+%% -------------------------------------------------------------------
+
-spec table_definitions() -> [Def] when
Def :: {Name :: atom(), term()}.
@@ -36,6 +61,10 @@ table_definitions() ->
{Name, Attributes} = ?TABLE_DEF,
[{Name, [?TABLE_MATCH | Attributes]}].
+%% -------------------------------------------------------------------
+%% create_or_update().
+%% -------------------------------------------------------------------
+
-spec create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> Ret when
Group :: any(),
Overall :: pid(),
@@ -51,6 +80,47 @@ create_or_update(Group, Overall, Delegate, ChildSpec, Id) ->
create_or_update_in_mnesia(Group, Overall, Delegate, ChildSpec, Id)
end}).
+create_or_update_in_mnesia(Group, Overall, Delegate, ChildSpec, Id) ->
+ rabbit_mnesia:execute_mnesia_transaction(
+ fun() ->
+ ReadResult = mnesia:wread({?TABLE, {Group, Id}}),
+ rabbit_log:debug("Mirrored supervisor: check_start table ~ts read for key ~tp returned ~tp",
+ [?TABLE, {Group, Id}, ReadResult]),
+ case ReadResult of
+ [] -> _ = write_in_mnesia(Group, Overall, ChildSpec, Id),
+ start;
+ [S] -> #mirrored_sup_childspec{key = {Group, Id},
+ mirroring_pid = Pid} = S,
+ case Overall of
+ Pid ->
+ rabbit_log:debug("Mirrored supervisor: overall matched mirrored pid ~tp", [Pid]),
+ Delegate;
+ _ ->
+ rabbit_log:debug("Mirrored supervisor: overall ~tp did not match mirrored pid ~tp", [Overall, Pid]),
+ Sup = mirrored_supervisor:supervisor(Pid),
+ rabbit_log:debug("Mirrored supervisor: supervisor(~tp) returned ~tp", [Pid, Sup]),
+ case Sup of
+ dead ->
+ _ = write_in_mnesia(Group, Overall, ChildSpec, Id),
+ start;
+ Delegate0 ->
+ Delegate0
+ end
+ end
+ end
+ end).
+
+write_in_mnesia(Group, Overall, ChildSpec, Id) ->
+ S = #mirrored_sup_childspec{key = {Group, Id},
+ mirroring_pid = Overall,
+ childspec = ChildSpec},
+ ok = mnesia:write(?TABLE, S, write),
+ ChildSpec.
+
+%% -------------------------------------------------------------------
+%% delete().
+%% -------------------------------------------------------------------
+
-spec delete(Group, Id) -> ok when
Group :: any(),
Id :: any().
@@ -62,7 +132,13 @@ delete(Group, Id) ->
delete_in_mnesia(Group, Id) ->
rabbit_mnesia:execute_mnesia_transaction(
- fun() -> delete_in_mnesia_tx(Group, Id) end).
+ fun() ->
+ ok = mnesia:delete({?TABLE, {Group, Id}})
+ end).
+
+%% -------------------------------------------------------------------
+%% find_mirror().
+%% -------------------------------------------------------------------
-spec find_mirror(Group, Id) -> Ret when
Group :: any(),
@@ -86,6 +162,10 @@ find_mirror_in_mnesia(Group, Id) ->
_ -> {error, not_found}
end.
+%% -------------------------------------------------------------------
+%% update_all().
+%% -------------------------------------------------------------------
+
-spec update_all(Overall, Overall) -> [ChildSpec] when
Overall :: pid(),
ChildSpec :: supervisor2:child_spec().
@@ -95,55 +175,6 @@ update_all(Overall, OldOverall) ->
#{mnesia => fun() -> update_all_in_mnesia(Overall, OldOverall) end
}).
--spec delete_all(Group) -> ok when
- Group :: any().
-
-delete_all(Group) ->
- rabbit_db:run(
- #{mnesia => fun() -> delete_all_in_mnesia(Group) end
- }).
-
-%%----------------------------------------------------------------------------
-
-create_tables_in_mnesia([]) ->
- ok;
-create_tables_in_mnesia([{Table, Attributes} | Ts]) ->
- case mnesia:create_table(Table, Attributes) of
- {atomic, ok} -> create_tables_in_mnesia(Ts);
- {aborted, {already_exists, ?TABLE}} -> create_tables_in_mnesia(Ts);
- Err -> Err
- end.
-
-create_or_update_in_mnesia(Group, Overall, Delegate, ChildSpec, Id) ->
- rabbit_mnesia:execute_mnesia_transaction(
- fun() ->
- ReadResult = mnesia:wread({?TABLE, {Group, Id}}),
- rabbit_log:debug("Mirrored supervisor: check_start table ~ts read for key ~tp returned ~tp",
- [?TABLE, {Group, Id}, ReadResult]),
- case ReadResult of
- [] -> _ = write_in_mnesia(Group, Overall, ChildSpec, Id),
- start;
- [S] -> #mirrored_sup_childspec{key = {Group, Id},
- mirroring_pid = Pid} = S,
- case Overall of
- Pid ->
- rabbit_log:debug("Mirrored supervisor: overall matched mirrored pid ~tp", [Pid]),
- Delegate;
- _ ->
- rabbit_log:debug("Mirrored supervisor: overall ~tp did not match mirrored pid ~tp", [Overall, Pid]),
- Sup = mirrored_supervisor:supervisor(Pid),
- rabbit_log:debug("Mirrored supervisor: supervisor(~tp) returned ~tp", [Pid, Sup]),
- case Sup of
- dead ->
- _ = write_in_mnesia(Group, Overall, ChildSpec, Id),
- start;
- Delegate0 ->
- Delegate0
- end
- end
- end
- end).
-
update_all_in_mnesia(Overall, OldOverall) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
@@ -155,22 +186,38 @@ update_all_in_mnesia(Overall, OldOverall) ->
[{Group, Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])]
end).
+%% -------------------------------------------------------------------
+%% delete_all().
+%% -------------------------------------------------------------------
+
+-spec delete_all(Group) -> ok when
+ Group :: any().
+
+delete_all(Group) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> delete_all_in_mnesia(Group) end
+ }).
+
delete_all_in_mnesia(Group) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
MatchHead = #mirrored_sup_childspec{key = {Group, '$1'},
_ = '_'},
- [delete_in_mnesia_tx(Group, Id) ||
+ [ok = mnesia:delete({?TABLE, {Group, Id}}) ||
Id <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]
end),
ok.
-write_in_mnesia(Group, Overall, ChildSpec, Id) ->
- S = #mirrored_sup_childspec{key = {Group, Id},
- mirroring_pid = Overall,
- childspec = ChildSpec},
- ok = mnesia:write(?TABLE, S, write),
- ChildSpec.
+%% -------------------------------------------------------------------
+%% clear().
+%% -------------------------------------------------------------------
+
+-spec clear() -> ok.
-delete_in_mnesia_tx(Group, Id) ->
- ok = mnesia:delete({?TABLE, {Group, Id}}).
+clear() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> clear_in_mnesia() end}).
+
+clear_in_mnesia() ->
+ {atomic, ok} = mnesia:clear_table(?TABLE),
+ ok.
diff --git a/deps/rabbit/src/rabbit_db_policy.erl b/deps/rabbit/src/rabbit_db_policy.erl
index 25ae02ac2a..ee713aa94c 100644
--- a/deps/rabbit/src/rabbit_db_policy.erl
+++ b/deps/rabbit/src/rabbit_db_policy.erl
@@ -12,12 +12,18 @@
-export([update/3]).
+%% -------------------------------------------------------------------
+%% update().
+%% -------------------------------------------------------------------
+
-spec update(VHostName, UpdateXFun, UpdateQFun) -> Ret when
VHostName :: vhost:name(),
Exchange :: rabbit_types:exchange(),
Queue :: amqqueue:amqqueue(),
- UpdateXFun :: fun((Exchange) -> Exchange),
- UpdateQFun :: fun((Queue) -> Queue),
+ UpdateXFun :: fun((Exchange) -> #{exchange => Exchange,
+ update_function => fun((Exchange) -> Exchange)}),
+ UpdateQFun :: fun((Queue) -> #{queue => Queue,
+ update_function => fun((Queue) -> Queue)}),
Ret :: {[{Exchange, Exchange}], [{Queue, Queue}]}.
update(VHost, GetUpdatedExchangeFun, GetUpdatedQueueFun) ->
@@ -33,7 +39,7 @@ update_in_mnesia(VHost, GetUpdatedExchangeFun, GetUpdatedQueueFun) ->
rabbit_exchange, rabbit_durable_exchange],
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
- [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
+ _ = [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
Exchanges0 = rabbit_db_exchange:get_all(VHost),
Queues0 = rabbit_db_queue:get_all(VHost),
Exchanges = [GetUpdatedExchangeFun(X) || X <- Exchanges0],
diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl
index e713759a5d..407d58e6af 100644
--- a/deps/rabbit/src/rabbit_db_queue.erl
+++ b/deps/rabbit/src/rabbit_db_queue.erl
@@ -13,6 +13,7 @@
-export([
get/1,
+ get_many/1,
get_all/0,
get_all/1,
get_all_by_type/1,
@@ -20,9 +21,9 @@
list/0,
count/0,
count/1,
- create_or_get/2,
- insert/2,
- insert/1,
+ create_or_get/1,
+ set/1,
+ set_many/1,
delete/2,
update/2,
update_decorators/1,
@@ -31,28 +32,36 @@
-export([
get_all_durable/0,
- get_all_durable/1,
get_all_durable_by_type/1,
- get_durable/1
+ filter_all_durable/1,
+ update_durable/2,
+ get_durable/1,
+ get_many_durable/1,
+ consistent_exists/1
]).
--export([delete_transient/1]).
--export([on_node_up/2,
- on_node_down/2]).
+%% Used by on_node_up and on_node_down
+-export([foreach_transient/1,
+ delete_transient/1]).
--export([match_and_update/3]).
--export([insert_dirty/1]).
+%% Used only by forget all durable
+-export([foreach_durable/2,
+ internal_delete/3]).
--export([not_found_or_absent_queue_dirty/1]).
-
--export([internal_delete/3]).
+-export([set_dirty/1]).
%% Used by other rabbit_db_* modules
-export([
update_in_mnesia_tx/2,
- not_found_or_absent_queue_in_mnesia/1
+ get_durable_in_mnesia_tx/1
]).
+%% For testing
+-export([clear/0]).
+
+-define(MNESIA_TABLE, rabbit_queue).
+-define(MNESIA_DURABLE_TABLE, rabbit_durable_queue).
+
%% -------------------------------------------------------------------
%% get_all().
%% -------------------------------------------------------------------
@@ -74,7 +83,7 @@ get_all() ->
get_all_in_mnesia() ->
list_with_possible_retry_in_mnesia(
fun() ->
- rabbit_db:list_in_mnesia(rabbit_queue, amqqueue:pattern_match_all())
+ rabbit_db:list_in_mnesia(?MNESIA_TABLE, amqqueue:pattern_match_all())
end).
-spec get_all(VHostName) -> [Queue] when
@@ -87,11 +96,18 @@ get_all_in_mnesia() ->
%%
%% @private
-get_all(VHost) ->
+get_all(VHostName) ->
rabbit_db:run(
- #{mnesia => fun() -> get_all_in_mnesia(VHost) end
+ #{mnesia => fun() -> get_all_in_mnesia(VHostName) end
}).
+get_all_in_mnesia(VHostName) ->
+ list_with_possible_retry_in_mnesia(
+ fun() ->
+ Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHostName, queue)),
+ rabbit_db:list_in_mnesia(?MNESIA_TABLE, Pattern)
+ end).
+
%% -------------------------------------------------------------------
%% get_all_durable().
%% -------------------------------------------------------------------
@@ -113,31 +129,19 @@ get_all_durable() ->
get_all_durable_in_mnesia() ->
list_with_possible_retry_in_mnesia(
fun() ->
- rabbit_db:list_in_mnesia(rabbit_durable_queue, amqqueue:pattern_match_all())
+ rabbit_db:list_in_mnesia(?MNESIA_DURABLE_TABLE, amqqueue:pattern_match_all())
end).
--spec get_all_durable(VHostName) -> [Queue] when
- VHostName :: vhost:name(),
+-spec get_all_durable_by_type(Type) -> [Queue] when
+ Type :: atom(),
Queue :: amqqueue:amqqueue().
-%% @doc Gets all durable queues belonging to the given virtual host
+%% @doc Gets all durable queues of the given type
%%
%% @returns a list of queue records.
%%
%% @private
-get_all_durable(VHost) ->
- rabbit_db:run(
- #{mnesia => fun() -> get_all_durable_in_mnesia(VHost) end
- }).
-
-get_all_durable_in_mnesia(VHost) ->
- list_with_possible_retry_in_mnesia(
- fun() ->
- Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHost, queue)),
- rabbit_db:list_in_mnesia(rabbit_durable_queue, Pattern)
- end).
-
get_all_durable_by_type(Type) ->
rabbit_db:run(
#{mnesia => fun() -> get_all_durable_by_type_in_mnesia(Type) end
@@ -145,7 +149,47 @@ get_all_durable_by_type(Type) ->
get_all_durable_by_type_in_mnesia(Type) ->
Pattern = amqqueue:pattern_match_on_type(Type),
- rabbit_db:list_in_mnesia(rabbit_durable_queue, Pattern).
+ rabbit_db:list_in_mnesia(?MNESIA_DURABLE_TABLE, Pattern).
+
+%% -------------------------------------------------------------------
+%% filter_all_durable().
+%% -------------------------------------------------------------------
+
+-spec filter_all_durable(FilterFun) -> [Queue] when
+ Queue :: amqqueue:amqqueue(),
+ FilterFun :: fun((Queue) -> boolean()).
+
+%% @doc Filters all durable queues
+%%
+%% @returns a list of queue records.
+%%
+%% @private
+
+filter_all_durable(FilterFun) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> filter_all_durable_in_mnesia(FilterFun) end
+ }).
+
+filter_all_durable_in_mnesia(FilterFun) ->
+ rabbit_mnesia:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q <- mnesia:table(?MNESIA_DURABLE_TABLE),
+ FilterFun(Q)
+ ]))
+ end).
+
+%% -------------------------------------------------------------------
+%% list().
+%% -------------------------------------------------------------------
+
+-spec list() -> [QName] when
+ QName :: rabbit_amqqueue:name().
+
+%% @doc Returns all queue names.
+%%
+%% @returns the list of all queue names.
+%%
+%% @private
list() ->
rabbit_db:run(
@@ -153,7 +197,20 @@ list() ->
}).
list_in_mnesia() ->
- mnesia:dirty_all_keys(rabbit_queue).
+ mnesia:dirty_all_keys(?MNESIA_TABLE).
+
+%% -------------------------------------------------------------------
+%% count().
+%% -------------------------------------------------------------------
+
+-spec count() -> Count when
+ Count :: integer().
+
+%% @doc Counts the number of queues
+%%
+%% @returns the number of queues.
+%%
+%% @private
count() ->
rabbit_db:run(
@@ -161,141 +218,307 @@ count() ->
}).
count_in_mnesia() ->
- mnesia:table_info(rabbit_queue, size).
+ mnesia:table_info(?MNESIA_TABLE, size).
+
+-spec count(VHostName) -> Count when
+ VHostName :: vhost:name(),
+ Count :: integer().
-count(VHost) ->
+%% @doc Counts the number of queues for the given vhost
+%%
+%% @returns the number of queues for the given vhost
+%%
+%% @private
+
+count(VHostName) ->
try
- list_for_count(VHost)
+ list_for_count(VHostName)
catch _:Err ->
rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p",
- [VHost, Err]),
+ [VHostName, Err]),
0
end.
+list_for_count(VHostName) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> list_for_count_in_mnesia(VHostName) end
+ }).
+
+list_for_count_in_mnesia(VHostName) ->
+ %% this is certainly suboptimal but there is no way to count
+ %% things using a secondary index in Mnesia. Our counter-table-per-node
+ %% won't work here because with master migration of mirrored queues
+ %% the "ownership" of queues by nodes becomes a non-trivial problem
+ %% that requires a proper consensus algorithm.
+ list_with_possible_retry_in_mnesia(
+ fun() ->
+ length(mnesia:dirty_index_read(?MNESIA_TABLE,
+ VHostName,
+ amqqueue:field_vhost()))
+ end).
+
+%% -------------------------------------------------------------------
+%% delete().
+%% -------------------------------------------------------------------
+
+-spec delete(QName, Reason) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Reason :: atom(),
+ Ret :: ok | Deletions :: rabbit_binding:deletions().
+
delete(QueueName, Reason) ->
rabbit_db:run(
#{mnesia => fun() -> delete_in_mnesia(QueueName, Reason) end
}).
+delete_in_mnesia(QueueName, Reason) ->
+ rabbit_mnesia:execute_mnesia_transaction(
+ fun () ->
+ case {mnesia:wread({?MNESIA_TABLE, QueueName}),
+ mnesia:wread({?MNESIA_DURABLE_TABLE, QueueName})} of
+ {[], []} ->
+ ok;
+ _ ->
+ internal_delete_in_mnesia(QueueName, false, Reason)
+ end
+ end).
+
+%% -------------------------------------------------------------------
+%% internal_delete().
+%% -------------------------------------------------------------------
+
+-spec internal_delete(QName, OnlyDurable, Reason) -> Deletions when
+ QName :: rabbit_amqqueue:name(),
+ OnlyDurable :: boolean(),
+ Reason :: atom(),
+ Deletions :: rabbit_binding:deletions().
+
internal_delete(QueueName, OnlyDurable, Reason) ->
%% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called
- %% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or
+ %% by `rabbit_mnesia:remove_node_if_mnesia_running'. Thus, once mnesia and/or
%% HA queues are removed it can be removed.
rabbit_db:run(
#{mnesia => fun() -> internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) end
}).
-get(Names) when is_list(Names) ->
+internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
+ ok = mnesia:delete({?MNESIA_TABLE, QueueName}),
+ case Reason of
+ auto_delete ->
+ %% efficiency improvement when a channel with many auto-delete queues
+ %% is being closed
+ case mnesia:wread({?MNESIA_DURABLE_TABLE, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({?MNESIA_DURABLE_TABLE, QueueName})
+ end;
+ _ ->
+ mnesia:delete({?MNESIA_DURABLE_TABLE, QueueName})
+ end,
+ %% we want to execute some things, as decided by rabbit_exchange,
+ %% after the transaction.
+ rabbit_db_binding:delete_for_destination_in_mnesia(QueueName, OnlyDurable).
+
+%% -------------------------------------------------------------------
+%% get_many().
+%% -------------------------------------------------------------------
+
+-spec get_many([QName]) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Ret :: [Queue :: amqqueue:amqqueue()].
+get_many(Names) when is_list(Names) ->
rabbit_db:run(
- #{mnesia => fun() -> get_many_in_mnesia(rabbit_queue, Names) end
- });
+ #{mnesia => fun() -> get_many_in_mnesia(?MNESIA_TABLE, Names) end
+ }).
+
+get_many_in_mnesia(Table, [Name]) ->
+ ets:lookup(Table, Name);
+get_many_in_mnesia(Table, Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_mnesia:dirty_read/1.
+ lists:append([ets:lookup(Table, Name) || Name <- Names]).
+
+%% -------------------------------------------------------------------
+%% get().
+%% -------------------------------------------------------------------
+
+-spec get(QName) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Ret :: {ok, Queue :: amqqueue:amqqueue()} | {error, not_found}.
get(Name) ->
rabbit_db:run(
#{mnesia => fun() -> get_in_mnesia(Name) end
}).
get_in_mnesia(Name) ->
- rabbit_mnesia:dirty_read({rabbit_queue, Name}).
+ rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}).
+
+%% -------------------------------------------------------------------
+%% get_durable().
+%% -------------------------------------------------------------------
+
+-spec get_durable(QName) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Ret :: {ok, Queue :: amqqueue:amqqueue()} | {error, not_found}.
-get_durable(Names) when is_list(Names) ->
- rabbit_db:run(
- #{mnesia => fun() -> get_many_in_mnesia(rabbit_durable_queue, Names) end
- });
get_durable(Name) ->
rabbit_db:run(
#{mnesia => fun() -> get_durable_in_mnesia(Name) end
}).
get_durable_in_mnesia(Name) ->
- rabbit_mnesia:dirty_read({rabbit_durable_queue, Name}).
+ rabbit_mnesia:dirty_read({?MNESIA_DURABLE_TABLE, Name}).
+
+%% -------------------------------------------------------------------
+%% get_many_durable().
+%% -------------------------------------------------------------------
+
+-spec get_many_durable([QName]) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Ret :: [Queue :: amqqueue:amqqueue()].
-delete_transient(Queues) ->
+get_many_durable(Names) when is_list(Names) ->
rabbit_db:run(
- #{mnesia => fun() -> delete_transient_in_mnesia(Queues) end
+ #{mnesia => fun() -> get_many_in_mnesia(?MNESIA_DURABLE_TABLE, Names) end
}).
-delete_transient_in_mnesia(Queues) ->
- rabbit_mnesia:execute_mnesia_transaction(
- fun () ->
- [{QName, delete_transient_in_mnesia_tx(QName)}
- || QName <- Queues]
- end).
+%% -------------------------------------------------------------------
+%% update().
+%% -------------------------------------------------------------------
+
+-spec update(QName, UpdateFun) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Queue :: amqqueue:amqqueue(),
+ UpdateFun :: fun((Queue) -> Queue),
+ Ret :: Queue | not_found.
+%% @doc Updates an existing queue record using `UpdateFun'.
+%%
+%% @private
-on_node_up(Node, Fun) ->
+update(QName, Fun) ->
rabbit_db:run(
- #{mnesia => fun() -> on_node_up_in_mnesia(Node, Fun) end
+ #{mnesia => fun() -> update_in_mnesia(QName, Fun) end
}).
-on_node_up_in_mnesia(Node, Fun) ->
+update_in_mnesia(QName, Fun) ->
rabbit_mnesia:execute_mnesia_transaction(
- fun () ->
- Qs = mnesia:match_object(rabbit_queue,
- amqqueue:pattern_match_all(), write),
- [Fun(Node, Q) || Q <- Qs],
- ok
+ fun() ->
+ update_in_mnesia_tx(QName, Fun)
end).
-on_node_down(Node, Fun) ->
- rabbit_db:run(
- #{mnesia => fun() -> on_node_down_in_mnesia(Node, Fun) end
- }).
-
-on_node_down_in_mnesia(Node, Fun) ->
- Qs = rabbit_mnesia:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([amqqueue:get_name(Q) || Q <- mnesia:table(rabbit_queue),
- Fun(Node, Q)
- ]))
- end),
- lists:unzip(lists:flatten(
- [case delete_transient(Queues) of
- {error, noproc} -> [];
- {error, {timeout, _}} -> [];
- Value -> Value
- end || Queues <- partition_queues(Qs)]
- )).
+%% -------------------------------------------------------------------
+%% update_decorators().
+%% -------------------------------------------------------------------
-% If there are many queues and we delete them all in a single Mnesia transaction,
-% this can block all other Mnesia operations for a really long time.
-% In situations where a node wants to (re-)join a cluster,
-% Mnesia won't be able to sync on the new node until this operation finishes.
-% As a result, we want to have multiple Mnesia transactions so that other
-% operations can make progress in between these queue delete transactions.
-%
-% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node.
-partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
- [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
-partition_queues(T) ->
- [T].
+-spec update_decorators(QName) -> ok when
+ QName :: rabbit_amqqueue:name().
+%% @doc Updates an existing queue record adding the active queue decorators.
+%%
+%% @private
-update(QName, Fun) ->
+update_decorators(QName) ->
rabbit_db:run(
- #{mnesia => fun() -> update_in_mnesia(QName, Fun) end
+ #{mnesia => fun() -> update_decorators_in_mnesia(QName) end
}).
-update_in_mnesia(QName, Fun) ->
+update_decorators_in_mnesia(Name) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
- update_in_mnesia_tx(QName, Fun)
+ case mnesia:wread({?MNESIA_TABLE, Name}) of
+ [Q] -> ok = mnesia:write(?MNESIA_TABLE, rabbit_queue_decorator:set(Q),
+ write);
+ [] -> ok
+ end
end).
-update_decorators(Name) ->
+%% -------------------------------------------------------------------
+%% update_durable().
+%% -------------------------------------------------------------------
+
+-spec update_durable(UpdateFun, FilterFun) -> ok when
+ UpdateFun :: fun((Queue) -> any()),
+ FilterFun :: fun((Queue) -> boolean()).
+%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'
+%% and stores them
+%%
+%% @private
+
+update_durable(UpdateFun, FilterFun) ->
rabbit_db:run(
- #{mnesia => fun() -> update_decorators_in_mnesia(Name) end
+ #{mnesia =>
+ fun() -> update_durable_in_mnesia(UpdateFun, FilterFun) end
}).
-not_found_or_absent_queue_dirty(Name) ->
+update_durable_in_mnesia(UpdateFun, FilterFun) ->
+ Pattern = amqqueue:pattern_match_all(),
+ {atomic, ok} =
+ mnesia:sync_transaction(
+ fun () ->
+ Qs = mnesia:match_object(?MNESIA_DURABLE_TABLE, Pattern, write),
+ _ = [mnesia:write(?MNESIA_DURABLE_TABLE, UpdateFun(Q), write)
+ || Q <- Qs, FilterFun(Q)],
+ ok
+ end),
+ ok.
+
+%% -------------------------------------------------------------------
+%% exists().
+%% -------------------------------------------------------------------
+
+-spec exists(QName) -> Exists when
+ QName :: rabbit_amqqueue:name(),
+ Exists :: boolean().
+%% @doc Indicates if queue named `QName' exists.
+%%
+%% @returns true if the queue exists, false otherwise.
+%%
+%% @private
+
+exists(QName) ->
rabbit_db:run(
- #{mnesia => fun() -> not_found_or_absent_queue_dirty_in_mnesia(Name) end
+ #{mnesia => fun() -> exists_in_mnesia(QName) end
}).
-exists(Name) ->
+exists_in_mnesia(QName) ->
+ ets:member(?MNESIA_TABLE, QName).
+
+%% -------------------------------------------------------------------
+%% exists().
+%% -------------------------------------------------------------------
+
+-spec consistent_exists(QName) -> Exists when
+ QName :: rabbit_amqqueue:name(),
+ Exists :: boolean().
+%% @doc Indicates if queue named `QName' exists using a consistent read.
+%% Just used by `rabbit_classic_queue:is_recoverable` for transient queues.
+%%
+%% @returns true if the queue exists, false otherwise.
+%%
+%% @private
+
+consistent_exists(QName) ->
rabbit_db:run(
- #{mnesia => fun() -> exists_in_mnesia(Name) end
+ #{mnesia => fun() -> consistent_exists_in_mnesia(QName) end
}).
-exists_in_mnesia(Name) ->
- ets:member(rabbit_queue, Name).
+consistent_exists_in_mnesia(QName) ->
+ case mnesia:read({?MNESIA_TABLE, QName}) of
+ [] -> false;
+ [_] -> true
+ end.
+
+%% -------------------------------------------------------------------
+%% get_all_by_type().
+%% -------------------------------------------------------------------
+
+-spec get_all_by_type(Type) -> [Queue] when
+ Type :: atom(),
+ Queue :: amqqueue:amqqueue().
+
+%% @doc Gets all queues belonging to the given type
+%%
+%% @returns a list of queue records.
+%%
+%% @private
get_all_by_type(Type) ->
Pattern = amqqueue:pattern_match_on_type(Type),
@@ -304,101 +527,284 @@ get_all_by_type(Type) ->
}).
get_all_by_pattern_in_mnesia(Pattern) ->
- rabbit_db:list_in_mnesia(rabbit_queue, Pattern).
+ rabbit_db:list_in_mnesia(?MNESIA_TABLE, Pattern).
-get_all_by_type_and_node(VHost, Type, Node) ->
+%% -------------------------------------------------------------------
+%% get_all_by_type_and_node().
+%% -------------------------------------------------------------------
+
+-spec get_all_by_type_and_node(VHostName, Type, Node) -> [Queue] when
+ VHostName :: vhost:name(),
+ Type :: atom(),
+ Node :: 'none' | atom(),
+ Queue :: amqqueue:amqqueue().
+
+%% @doc Gets all queues belonging to the given type
+%%
+%% @returns a list of queue records.
+%%
+%% @private
+
+get_all_by_type_and_node(VHostName, Type, Node) ->
rabbit_db:run(
- #{mnesia => fun() -> get_all_by_type_and_node_in_mnesia(VHost, Type, Node) end
+ #{mnesia => fun() -> get_all_by_type_and_node_in_mnesia(VHostName, Type, Node) end
}).
-get_all_by_type_and_node_in_mnesia(VHost, Type, Node) ->
+get_all_by_type_and_node_in_mnesia(VHostName, Type, Node) ->
mnesia:async_dirty(
fun () ->
- qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
+ qlc:e(qlc:q([Q || Q <- mnesia:table(?MNESIA_DURABLE_TABLE),
amqqueue:get_type(Q) =:= Type,
- amqqueue:get_vhost(Q) =:= VHost,
+ amqqueue:get_vhost(Q) =:= VHostName,
amqqueue:qnode(Q) == Node]))
end).
-create_or_get(DurableQ, Q) ->
+%% -------------------------------------------------------------------
+%% create_or_get().
+%% -------------------------------------------------------------------
+
+-spec create_or_get(Queue) -> Ret when
+ Queue :: amqqueue:amqqueue(),
+ Ret :: {created, Queue} | {existing, Queue} | {absent, Queue, nodedown}.
+%% @doc Writes a queue record if it doesn't exist already or returns the existing one
+%%
+%% @returns the existing record if there is one in the database already, or the newly
+%% created record.
+%%
+%% @private
+
+create_or_get(Q) ->
rabbit_db:run(
- #{mnesia => fun() -> create_or_get_in_mnesia(DurableQ, Q) end
+ #{mnesia => fun() -> create_or_get_in_mnesia(Q) end
}).
-create_or_get_in_mnesia(DurableQ, Q) ->
+create_or_get_in_mnesia(Q) ->
+ DurableQ = amqqueue:reset_mirroring_and_decorators(Q),
QueueName = amqqueue:get_name(Q),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
+ case mnesia:wread({?MNESIA_TABLE, QueueName}) of
[] ->
- case not_found_or_absent_queue_in_mnesia(QueueName) of
- not_found ->
- insert_in_mnesia_tx(DurableQ, Q),
+ case get_durable_in_mnesia_tx(QueueName) of
+ {error, not_found} ->
+ set_in_mnesia_tx(DurableQ, Q),
{created, Q};
- {absent, _Q, _} = R ->
- R
+ {ok, Q} ->
+ {absent, Q, nodedown}
end;
[ExistingQ] ->
{existing, ExistingQ}
end
end).
-insert(DurableQ, Q) ->
+%% -------------------------------------------------------------------
+%% set().
+%% -------------------------------------------------------------------
+
+-spec set(Queue) -> ok when
+ Queue :: amqqueue:amqqueue().
+%% @doc Writes a queue record. If the queue is durable, it writes both instances:
+%% durable and transient. For the durable one, it resets mirrors and decorators.
+%% The transient one is left as it is.
+%%
+%% @private
+
+set(Q) ->
rabbit_db:run(
- #{mnesia => fun() -> insert_in_mnesia(DurableQ, Q) end
+ #{mnesia => fun() -> set_in_mnesia(Q) end
}).
-insert_in_mnesia(DurableQ, Q) ->
+set_in_mnesia(Q) ->
+ DurableQ = amqqueue:reset_mirroring_and_decorators(Q),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
- insert_in_mnesia_tx(DurableQ, Q)
+ set_in_mnesia_tx(DurableQ, Q)
end).
-insert(Qs) ->
+set_in_mnesia_tx(DurableQ, Q) ->
+ case ?amqqueue_is_durable(Q) of
+ true ->
+ ok = mnesia:write(?MNESIA_DURABLE_TABLE, DurableQ, write);
+ false ->
+ ok
+ end,
+ ok = mnesia:write(?MNESIA_TABLE, Q, write).
+
+%% -------------------------------------------------------------------
+%% set_many().
+%% -------------------------------------------------------------------
+
+-spec set_many([Queue]) -> ok when
+ Queue :: amqqueue:amqqueue().
+%% @doc Writes a list of durable queue records.
+%% It is responsibility of the calling function to ensure all records are durable.
+%% Once transient entities are deprecated, this is a non-issue.
+%%
+%% @private
+
+set_many(Qs) ->
rabbit_db:run(
- #{mnesia => fun() -> insert_many_in_mnesia(Qs) end
+ #{mnesia => fun() -> set_many_in_mnesia(Qs) end
}).
-insert_many_in_mnesia(Qs) ->
+set_many_in_mnesia(Qs) ->
+ {atomic, ok} =
+ %% Just to be nested in forget_node_for_queue
+ mnesia:transaction(
+ fun() ->
+ [ok = mnesia:write(?MNESIA_DURABLE_TABLE, Q, write) || Q <- Qs],
+ ok
+ end),
+ ok.
+
+%% -------------------------------------------------------------------
+%% delete_transient().
+%% -------------------------------------------------------------------
+
+-spec delete_transient(FilterFun) -> Ret when
+ Queue :: amqqueue:amqqueue(),
+ FilterFun :: fun((Queue) -> boolean()),
+ QName :: rabbit_amqqueue:name(),
+ Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
+%% @doc Deletes all transient queues that match `FilterFun'.
+%%
+%% @private
+
+delete_transient(FilterFun) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> delete_transient_in_mnesia(FilterFun) end
+ }).
+
+delete_transient_in_mnesia(FilterFun) ->
+ Qs = rabbit_mnesia:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([amqqueue:get_name(Q) || Q <- mnesia:table(?MNESIA_TABLE),
+ FilterFun(Q)
+ ]))
+ end),
+ lists:unzip(lists:flatten(
+ [delete_many_transient_in_mnesia(Queues) || Queues <- partition_queues(Qs)]
+ )).
+
+-spec delete_many_transient_in_mnesia([QName]) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Ret :: [{QName, Deletions :: rabbit_binding:deletions()}].
+
+delete_many_transient_in_mnesia(Queues) ->
rabbit_mnesia:execute_mnesia_transaction(
- fun() ->
- [ok = mnesia:write(rabbit_durable_queue, Q, write) || Q <- Qs]
+ fun () ->
+ [{QName, delete_transient_in_mnesia_tx(QName)}
+ || QName <- Queues]
+ end).
+
+delete_transient_in_mnesia_tx(QName) ->
+ ok = mnesia:delete({?MNESIA_TABLE, QName}),
+ rabbit_db_binding:delete_transient_for_destination_in_mnesia(QName).
+
+% If there are many queues and we delete them all in a single Mnesia transaction,
+% this can block all other Mnesia operations for a really long time.
+% In situations where a node wants to (re-)join a cluster,
+% Mnesia won't be able to sync on the new node until this operation finishes.
+% As a result, we want to have multiple Mnesia transactions so that other
+% operations can make progress in between these queue delete transactions.
+%
+% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node.
+partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
+ [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
+partition_queues(T) ->
+ [T].
+
+%% -------------------------------------------------------------------
+%% foreach_transient().
+%% -------------------------------------------------------------------
+
+-spec foreach_transient(UpdateFun) -> ok when
+ Queue :: amqqueue:amqqueue(),
+ UpdateFun :: fun((Queue) -> any()).
+%% @doc Applies `UpdateFun' to all transient queue records.
+%%
+%% @private
+
+foreach_transient(UpdateFun) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> foreach_transient_in_mnesia(UpdateFun) end
+ }).
+
+foreach_transient_in_mnesia(UpdateFun) ->
+ Pattern = amqqueue:pattern_match_all(),
+ rabbit_mnesia:execute_mnesia_transaction(
+ fun () ->
+ Qs = mnesia:match_object(?MNESIA_TABLE, Pattern, write),
+ _ = [UpdateFun(Q) || Q <- Qs],
+ ok
end).
-match_and_update(Pattern, UpdateFun, FilterFun) ->
+%% -------------------------------------------------------------------
+%% foreach_durable().
+%% -------------------------------------------------------------------
+
+-spec foreach_durable(UpdateFun, FilterFun) -> ok when
+ UpdateFun :: fun((Queue) -> any()),
+ FilterFun :: fun((Queue) -> boolean()).
+%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'.
+%%
+%% @private
+
+foreach_durable(UpdateFun, FilterFun) ->
rabbit_db:run(
#{mnesia =>
- fun() -> match_and_update_in_mnesia(Pattern, UpdateFun, FilterFun) end
+ fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end
}).
-match_and_update_in_mnesia(Pattern, UpdateFun, FilterFun) ->
+foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
%% Note rabbit is not running so we avoid e.g. the worker pool. Also why
%% we don't invoke the return from rabbit_binding:process_deletions/1.
+ Pattern = amqqueue:pattern_match_all(),
{atomic, ok} =
mnesia:sync_transaction(
fun () ->
- Qs = mnesia:match_object(rabbit_durable_queue, Pattern, write),
+ Qs = mnesia:match_object(?MNESIA_DURABLE_TABLE, Pattern, write),
_ = [UpdateFun(Q) || Q <- Qs, FilterFun(Q)],
ok
end),
ok.
-insert_dirty(Q) ->
+%% -------------------------------------------------------------------
+%% set_dirty().
+%% -------------------------------------------------------------------
+
+-spec set_dirty(Queue) -> ok when
+ Queue :: amqqueue:amqqueue().
+%% @doc Writes a transient queue record
+%%
+%% @private
+
+set_dirty(Q) ->
rabbit_db:run(
- #{mnesia => fun() -> insert_dirty_in_mnesia(Q) end
+ #{mnesia => fun() -> set_dirty_in_mnesia(Q) end
}).
-insert_dirty_in_mnesia(Q) ->
- ok = mnesia:dirty_write(rabbit_queue, rabbit_queue_decorator:set(Q)).
+set_dirty_in_mnesia(Q) ->
+ ok = mnesia:dirty_write(?MNESIA_TABLE, rabbit_queue_decorator:set(Q)).
+
+%% -------------------------------------------------------------------
+%% update_in_mnesia_tx().
+%% -------------------------------------------------------------------
+
+-spec update_in_mnesia_tx(QName, UpdateFun) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Queue :: amqqueue:amqqueue(),
+ UpdateFun :: fun((Queue) -> Queue),
+ Ret :: Queue | not_found.
update_in_mnesia_tx(Name, Fun) ->
- case mnesia:wread({rabbit_queue, Name}) of
+ case mnesia:wread({?MNESIA_TABLE, Name}) of
[Q] ->
Durable = amqqueue:is_durable(Q),
Q1 = Fun(Q),
- ok = mnesia:write(rabbit_queue, Q1, write),
+ ok = mnesia:write(?MNESIA_TABLE, Q1, write),
case Durable of
- true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ true -> ok = mnesia:write(?MNESIA_DURABLE_TABLE, Q1, write);
_ -> ok
end,
Q1;
@@ -406,54 +812,52 @@ update_in_mnesia_tx(Name, Fun) ->
not_found
end.
-not_found_or_absent_queue_in_mnesia(Name) ->
- %% NB: we assume that the caller has already performed a lookup on
- %% rabbit_queue and not found anything
- case mnesia:read({rabbit_durable_queue, Name}) of
- [] -> not_found;
- [Q] -> {absent, Q, nodedown} %% Q exists on stopped node
+%% -------------------------------------------------------------------
+%% get_durable_in_mnesia_tx().
+%% -------------------------------------------------------------------
+
+-spec get_durable_in_mnesia_tx(QName) -> Ret when
+ QName :: rabbit_amqqueue:name(),
+ Ret :: {ok, Queue :: amqqueue:amqqueue()} | {error, not_found}.
+
+get_durable_in_mnesia_tx(Name) ->
+ case mnesia:read({?MNESIA_DURABLE_TABLE, Name}) of
+ [] -> {error, not_found};
+ [Q] -> {ok, Q}
end.
-%% Internal
-%% --------------------------------------------------------------
-get_many_in_mnesia(Table, [Name]) ->
- ets:lookup(Table, Name);
-get_many_in_mnesia(Table, Names) when is_list(Names) ->
- %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
- %% expensive for reasons explained in rabbit_mnesia:dirty_read/1.
- lists:append([ets:lookup(Table, Name) || Name <- Names]).
+%% -------------------------------------------------------------------
+%% clear().
+%% -------------------------------------------------------------------
-delete_transient_in_mnesia_tx(QName) ->
- ok = mnesia:delete({rabbit_queue, QName}),
- rabbit_db_binding:delete_transient_for_destination_in_mnesia(QName).
+-spec clear() -> ok.
+%% @doc Deletes all queues.
+%%
+%% @private
-get_all_in_mnesia(VHost) ->
- list_with_possible_retry_in_mnesia(
- fun() ->
- Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHost, queue)),
- rabbit_db:list_in_mnesia(rabbit_queue, Pattern)
- end).
+clear() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> clear_in_mnesia() end}).
-not_found_or_absent_queue_dirty_in_mnesia(Name) ->
- %% We should read from both tables inside a tx, to get a
- %% consistent view. But the chances of an inconsistency are small,
- %% and only affect the error kind.
- case rabbit_mnesia:dirty_read({rabbit_durable_queue, Name}) of
- {error, not_found} -> not_found;
- {ok, Q} -> {absent, Q, nodedown}
- end.
+clear_in_mnesia() ->
+ {atomic, ok} = mnesia:clear_table(?MNESIA_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_DURABLE_TABLE),
+ ok.
+
+%% Internal
+%% --------------------------------------------------------------
list_with_possible_retry_in_mnesia(Fun) ->
%% amqqueue migration:
- %% The `rabbit_queue` or `rabbit_durable_queue` tables
+ %% The `rabbit_queue' or `rabbit_durable_queue' tables
%% might be migrated between the time we query the pattern
- %% (with the `amqqueue` module) and the time we call
- %% `mnesia:dirty_match_object()`. This would lead to an empty list
+ %% (with the `amqqueue' module) and the time we call
+ %% `mnesia:dirty_match_object()'. This would lead to an empty list
%% (no object matching the now incorrect pattern), not a Mnesia
%% error.
%%
%% So if the result is an empty list and the version of the
- %% `amqqueue` record changed in between, we retry the operation.
+ %% `amqqueue' record changed in between, we retry the operation.
%%
%% However, we don't do this if inside a Mnesia transaction: we
%% could end up with a live lock between this started transaction
@@ -474,67 +878,3 @@ list_with_possible_retry_in_mnesia(Fun) ->
Ret ->
Ret
end.
-
-delete_in_mnesia(QueueName, Reason) ->
- rabbit_mnesia:execute_mnesia_transaction(
- fun () ->
- case {mnesia:wread({rabbit_queue, QueueName}),
- mnesia:wread({rabbit_durable_queue, QueueName})} of
- {[], []} ->
- ok;
- _ ->
- internal_delete_in_mnesia(QueueName, false, Reason)
- end
- end).
-
-internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
- ok = mnesia:delete({rabbit_queue, QueueName}),
- case Reason of
- auto_delete ->
- case mnesia:wread({rabbit_durable_queue, QueueName}) of
- [] -> ok;
- [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
- end;
- _ ->
- mnesia:delete({rabbit_durable_queue, QueueName})
- end,
- %% we want to execute some things, as decided by rabbit_exchange,
- %% after the transaction.
- rabbit_db_binding:delete_for_destination_in_mnesia(QueueName, OnlyDurable).
-
-list_for_count(VHost) ->
- rabbit_db:run(
- #{mnesia => fun() -> list_for_count_in_mnesia(VHost) end
- }).
-
-list_for_count_in_mnesia(VHost) ->
- %% this is certainly suboptimal but there is no way to count
- %% things using a secondary index in Mnesia. Our counter-table-per-node
- %% won't work here because with master migration of mirrored queues
- %% the "ownership" of queues by nodes becomes a non-trivial problem
- %% that requires a proper consensus algorithm.
- list_with_possible_retry_in_mnesia(
- fun() ->
- length(mnesia:dirty_index_read(rabbit_queue,
- VHost,
- amqqueue:field_vhost()))
- end).
-
-update_decorators_in_mnesia(Name) ->
- rabbit_mnesia:execute_mnesia_transaction(
- fun() ->
- case mnesia:wread({rabbit_queue, Name}) of
- [Q] -> ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q),
- write);
- [] -> ok
- end
- end).
-
-insert_in_mnesia_tx(DurableQ, Q) ->
- case ?amqqueue_is_durable(Q) of
- true ->
- ok = mnesia:write(rabbit_durable_queue, DurableQ, write);
- false ->
- ok
- end,
- ok = mnesia:write(rabbit_queue, Q, write).
diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl
index 89acc9d266..5c0f0283ef 100644
--- a/deps/rabbit/src/rabbit_db_topic_exchange.erl
+++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl
@@ -9,21 +9,28 @@
-include_lib("rabbit_common/include/rabbit.hrl").
--export([insert/1, delete_all_for_exchange/1, delete/1, match/2]).
+-export([set/1, delete_all_for_exchange/1, delete/1, match/2]).
+
+%% For testing
+-export([clear/0]).
+
+-define(MNESIA_NODE_TABLE, rabbit_topic_trie_node).
+-define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge).
+-define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding).
%% -------------------------------------------------------------------
-%% insert().
+%% set().
%% -------------------------------------------------------------------
--spec insert(Binding) -> ok when
+-spec set(Binding) -> ok when
Binding :: rabbit_types:binding().
-%% @doc Inserts a topic binding.
+%% @doc Sets a topic binding.
%%
%% @private
-insert(#binding{source = XName, key = RoutingKey, destination = Destination, args = Args}) ->
+set(#binding{source = XName, key = RoutingKey, destination = Destination, args = Args}) ->
rabbit_db:run(
- #{mnesia => fun() -> insert_in_mnesia(XName, RoutingKey, Destination, Args) end
+ #{mnesia => fun() -> set_in_mnesia(XName, RoutingKey, Destination, Args) end
}).
%% -------------------------------------------------------------------
@@ -31,7 +38,7 @@ insert(#binding{source = XName, key = RoutingKey, destination = Destination, arg
%% -------------------------------------------------------------------
-spec delete_all_for_exchange(ExchangeName) -> ok when
- ExchangeName :: rabbit_types:r('exchange').
+ ExchangeName :: rabbit_exchange:name().
%% @doc Deletes all topic bindings for the exchange named `ExchangeName'
%%
%% @private
@@ -61,7 +68,7 @@ delete(Bs) when is_list(Bs) ->
%% -------------------------------------------------------------------
-spec match(ExchangeName, RoutingKey) -> ok when
- ExchangeName :: rabbit_types:r('exchange'),
+ ExchangeName :: rabbit_exchange:name(),
RoutingKey :: binary().
%% @doc Finds the topic binding matching the given exchange and routing key and returns
%% the destination of the binding
@@ -78,13 +85,33 @@ match(XName, RoutingKey) ->
end
}).
+%% -------------------------------------------------------------------
+%% clear().
+%% -------------------------------------------------------------------
+
+-spec clear() -> ok.
+%% @doc Deletes all topic bindings
+%%
+%% @private
+
+clear() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> clear_in_mnesia() end
+ }).
+
+clear_in_mnesia() ->
+ {atomic, ok} = mnesia:clear_table(?MNESIA_NODE_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_EDGE_TABLE),
+ {atomic, ok} = mnesia:clear_table(?MNESIA_BINDING_TABLE),
+ ok.
+
%% Internal
%% --------------------------------------------------------------
split_topic_key(Key) ->
split_topic_key(Key, [], []).
-insert_in_mnesia(XName, RoutingKey, Destination, Args) ->
+set_in_mnesia(XName, RoutingKey, Destination, Args) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
FinalNode = follow_down_create(XName, split_topic_key(RoutingKey)),
@@ -106,19 +133,19 @@ match_in_mnesia(XName, RoutingKey) ->
mnesia:async_dirty(fun trie_match/2, [XName, Words]).
trie_remove_all_nodes(X) ->
- remove_all(rabbit_topic_trie_node,
+ remove_all(?MNESIA_NODE_TABLE,
#topic_trie_node{trie_node = #trie_node{exchange_name = X,
_ = '_'},
_ = '_'}).
trie_remove_all_edges(X) ->
- remove_all(rabbit_topic_trie_edge,
+ remove_all(?MNESIA_EDGE_TABLE,
#topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
_ = '_'},
_ = '_'}).
trie_remove_all_bindings(X) ->
- remove_all(rabbit_topic_trie_binding,
+ remove_all(?MNESIA_BINDING_TABLE,
#topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X, _ = '_'},
_ = '_'}).
@@ -131,12 +158,12 @@ delete_in_mnesia_tx(Bs) ->
%% See rabbit_binding:lock_route_tables for the rationale for
%% taking table locks.
_ = case Bs of
- [_] -> ok;
- _ -> [mnesia:lock({table, T}, write) ||
- T <- [rabbit_topic_trie_node,
- rabbit_topic_trie_edge,
- rabbit_topic_trie_binding]]
- end,
+ [_] -> ok;
+ _ -> [mnesia:lock({table, T}, write) ||
+ T <- [?MNESIA_NODE_TABLE,
+ ?MNESIA_EDGE_TABLE,
+ ?MNESIA_BINDING_TABLE]]
+ end,
[case follow_down_get_path(X, split_topic_key(K)) of
{ok, Path = [{FinalNode, _} | _]} ->
trie_remove_binding(X, FinalNode, D, Args),
@@ -222,7 +249,7 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
remove_path_if_empty(_, [{root, none}]) ->
ok;
remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
- case mnesia:read(rabbit_topic_trie_node,
+ case mnesia:read(?MNESIA_NODE_TABLE,
#trie_node{exchange_name = X, node_id = Node}, write) of
[] -> trie_remove_edge(X, Parent, Node, W),
remove_path_if_empty(X, RestPath);
@@ -230,7 +257,7 @@ remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
end.
trie_child(X, Node, Word) ->
- case mnesia:read({rabbit_topic_trie_edge,
+ case mnesia:read({?MNESIA_EDGE_TABLE,
#trie_edge{exchange_name = X,
node_id = Node,
word = Word}}) of
@@ -244,10 +271,10 @@ trie_bindings(X, Node) ->
node_id = Node,
destination = '$1',
arguments = '_'}},
- mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+ mnesia:select(?MNESIA_BINDING_TABLE, [{MatchHead, [], ['$1']}]).
trie_update_node_counts(X, Node, Field, Delta) ->
- E = case mnesia:read(rabbit_topic_trie_node,
+ E = case mnesia:read(?MNESIA_NODE_TABLE,
#trie_node{exchange_name = X,
node_id = Node}, write) of
[] -> #topic_trie_node{trie_node = #trie_node{
@@ -259,9 +286,9 @@ trie_update_node_counts(X, Node, Field, Delta) ->
end,
case setelement(Field, E, element(Field, E) + Delta) of
#topic_trie_node{edge_count = 0, binding_count = 0} ->
- ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
+ ok = mnesia:delete_object(?MNESIA_NODE_TABLE, E, write);
EN ->
- ok = mnesia:write(rabbit_topic_trie_node, EN, write)
+ ok = mnesia:write(?MNESIA_NODE_TABLE, EN, write)
end.
trie_add_edge(X, FromNode, ToNode, W) ->
@@ -273,7 +300,7 @@ trie_remove_edge(X, FromNode, ToNode, W) ->
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
trie_edge_op(X, FromNode, ToNode, W, Op) ->
- ok = Op(rabbit_topic_trie_edge,
+ ok = Op(?MNESIA_EDGE_TABLE,
#topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
node_id = FromNode,
word = W},
@@ -289,7 +316,7 @@ trie_remove_binding(X, Node, D, Args) ->
trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3).
trie_binding_op(X, Node, D, Args, Op) ->
- ok = Op(rabbit_topic_trie_binding,
+ ok = Op(?MNESIA_BINDING_TABLE,
#topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl
index ba2adaf813..8b59512af8 100644
--- a/deps/rabbit/src/rabbit_db_user.erl
+++ b/deps/rabbit/src/rabbit_db_user.erl
@@ -28,6 +28,8 @@
clear_matching_topic_permissions/3,
delete/1]).
+-export([clear/0]).
+
-define(MNESIA_TABLE, rabbit_user).
-define(PERM_MNESIA_TABLE, rabbit_user_permission).
-define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission).
@@ -644,3 +646,22 @@ topic_permission_pattern(Username, VHostName, ExchangeName) ->
virtual_host = VHostName},
exchange = ExchangeName},
permission = '_'}.
+
+%% -------------------------------------------------------------------
+%% clear().
+%% -------------------------------------------------------------------
+
+-spec clear() -> ok.
+%% @doc Deletes all users and permissions.
+%%
+%% @private
+
+clear() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> clear_in_mnesia() end}).
+
+clear_in_mnesia() ->
+ {atomic, ok} = mnesia:clear_table(?MNESIA_TABLE),
+ {atomic, ok} = mnesia:clear_table(?PERM_MNESIA_TABLE),
+ {atomic, ok} = mnesia:clear_table(?TOPIC_PERM_MNESIA_TABLE),
+ ok.
diff --git a/deps/rabbit/src/rabbit_db_vhost.erl b/deps/rabbit/src/rabbit_db_vhost.erl
index 1a01860a39..1d35e22c30 100644
--- a/deps/rabbit/src/rabbit_db_vhost.erl
+++ b/deps/rabbit/src/rabbit_db_vhost.erl
@@ -24,6 +24,8 @@
with_fun_in_mnesia_tx/2,
delete/1]).
+-export([clear/0]).
+
-define(MNESIA_TABLE, rabbit_vhost).
%% -------------------------------------------------------------------
@@ -319,3 +321,20 @@ delete_in_mnesia_tx(VHostName) ->
Existed = mnesia:wread({?MNESIA_TABLE, VHostName}) =/= [],
mnesia:delete({?MNESIA_TABLE, VHostName}),
Existed.
+
+%% -------------------------------------------------------------------
+%% clear().
+%% -------------------------------------------------------------------
+
+-spec clear() -> ok.
+%% @doc Deletes all vhosts.
+%%
+%% @private
+
+clear() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> clear_in_mnesia() end}).
+
+clear_in_mnesia() ->
+ {atomic, ok} = mnesia:clear_table(?MNESIA_TABLE),
+ ok.
diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl
index 22c633bb59..7212ed32df 100644
--- a/deps/rabbit/src/rabbit_dead_letter.erl
+++ b/deps/rabbit/src/rabbit_dead_letter.erl
@@ -29,7 +29,7 @@ publish(Msg, Reason, X, RK, SourceQName) ->
{QNames, Cycles} = detect_cycles(Reason, DLMsg,
rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- Qs0 = rabbit_amqqueue:lookup(QNames),
+ Qs0 = rabbit_amqqueue:lookup_many(QNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
_ = rabbit_queue_type:deliver(Qs, Delivery, stateless),
ok.
diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl
index bcc04c99f4..db7058cd68 100644
--- a/deps/rabbit/src/rabbit_exchange.erl
+++ b/deps/rabbit/src/rabbit_exchange.erl
@@ -17,13 +17,11 @@
route/2, delete/3, validate_binding/2, count/0]).
-export([list_names/0]).
-export([serialise_events/1]).
-%% these must be run inside a mnesia tx
-export([serial/1, peek_serial/1]).
%%----------------------------------------------------------------------------
-export_type([name/0, type/0]).
-
-type name() :: rabbit_types:r('exchange').
-type type() :: atom().
-type fun_name() :: atom().
@@ -79,13 +77,12 @@ serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
rabbit_exchange_decorator:select(all, Decorators))
orelse (type_to_module(Type)):serialise_events().
--spec serial(rabbit_types:exchange()) ->
- fun((boolean()) -> 'none' | pos_integer()).
+-spec serial(rabbit_types:exchange()) -> 'none' | pos_integer().
serial(X) ->
case serialise_events(X) of
false -> 'none';
- true -> rabbit_db_exchange:next_serial(X)
+ true -> rabbit_db_exchange:next_serial(X#exchange.name)
end.
-spec declare
@@ -215,7 +212,7 @@ list() ->
-spec count() -> non_neg_integer().
count() ->
- rabbit_db_exchange:get_all().
+ rabbit_db_exchange:count().
-spec list_names() -> [rabbit_exchange:name()].
@@ -254,8 +251,7 @@ update_scratch(Name, App, Fun) ->
{ok, X} -> rabbit_exchange_decorator:active(X);
{error, not_found} -> []
end,
- rabbit_db_exchange:update(Name, update_scratch_fun(App, Fun, Decorators)),
- ok.
+ ok = rabbit_db_exchange:update(Name, update_scratch_fun(App, Fun, Decorators)).
update_scratch_fun(App, Fun, Decorators) ->
fun(X = #exchange{scratches = Scratches0}) ->
@@ -272,11 +268,11 @@ update_scratch_fun(App, Fun, Decorators) ->
decorators = Decorators}
end.
--spec update_decorators(name(), [atom()] | none | undefined) -> 'ok'.
-
+-spec update_decorators(name(), {[Decorator], [Decorator]}) -> 'ok' when
+ Decorator :: atom().
update_decorators(Name, Decorators) ->
Fun = fun(X) -> X#exchange{decorators = Decorators} end,
- rabbit_db_exchange:update(Name, Fun).
+ ok = rabbit_db_exchange:update(Name, Fun).
-spec immutable(rabbit_types:exchange()) -> rabbit_types:exchange().
@@ -444,9 +440,7 @@ process_deletions({error, _} = E) ->
process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
rabbit_binding:process_deletions(
rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions));
-process_deletions(Deletions) ->
- rabbit_binding:process_deletions(Deletions).
+ XName, {X, deleted, Bs}, Deletions)).
-spec validate_binding
(rabbit_types:exchange(), rabbit_types:binding())
diff --git a/deps/rabbit/src/rabbit_exchange_type_topic.erl b/deps/rabbit/src/rabbit_exchange_type_topic.erl
index 8ed3f7e1b5..ee94022b5f 100644
--- a/deps/rabbit/src/rabbit_exchange_type_topic.erl
+++ b/deps/rabbit/src/rabbit_exchange_type_topic.erl
@@ -49,7 +49,7 @@ delete(_Serial, #exchange{name = X}) ->
policy_changed(_X1, _X2) -> ok.
add_binding(_Serial, _Exchange, Binding) ->
- rabbit_db_topic_exchange:insert(Binding).
+ rabbit_db_topic_exchange:set(Binding).
remove_bindings(_Serial, _X, Bs) ->
rabbit_db_topic_exchange:delete(Bs).
diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl
index 3dbbfa79c5..90c0b0996e 100644
--- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl
+++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl
@@ -317,7 +317,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
RouteToQs0 = rabbit_exchange:route(DLX, Delivery),
{RouteToQs1, Cycles} = rabbit_dead_letter:detect_cycles(Reason, Msg, RouteToQs0),
State1 = log_cycles(Cycles, RKeys, State0),
- RouteToQs2 = rabbit_amqqueue:lookup(RouteToQs1),
+ RouteToQs2 = rabbit_amqqueue:lookup_many(RouteToQs1),
RouteToQs = rabbit_amqqueue:prepend_extra_bcc(RouteToQs2),
State2 = case RouteToQs of
[] ->
@@ -469,7 +469,7 @@ redeliver0(#pending{delivery = #delivery{message = BasicMsg} = Delivery0,
%% queues that do not exist. Therefore, filter out non-existent target queues.
RouteToQs0 = queue_names(
rabbit_amqqueue:prepend_extra_bcc(
- rabbit_amqqueue:lookup(
+ rabbit_amqqueue:lookup_many(
rabbit_exchange:route(DLX, Delivery)))),
case {RouteToQs0, Settled} of
{[], [_|_]} ->
@@ -501,7 +501,7 @@ redeliver0(#pending{delivery = #delivery{message = BasicMsg} = Delivery0,
%% to be routed to is moved back to 'unsettled'.
rejected = []},
State = State0#state{pendings = maps:update(OutSeq, Pend, Pendings)},
- deliver_to_queues(Delivery, rabbit_amqqueue:lookup(RouteToQs), State)
+ deliver_to_queues(Delivery, rabbit_amqqueue:lookup_many(RouteToQs), State)
end
end.
diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl
index 567ba3bb5a..aceb3ec181 100644
--- a/deps/rabbit/src/rabbit_maintenance.erl
+++ b/deps/rabbit/src/rabbit_maintenance.erl
@@ -130,11 +130,21 @@ is_being_drained_consistent_read(Node) ->
-spec status_local_read(node()) -> maintenance_status().
status_local_read(Node) ->
- rabbit_db_maintenance:get(Node).
+ case rabbit_db_maintenance:get(Node) of
+ undefined ->
+ ?DEFAULT_STATUS;
+ Status ->
+ Status
+ end.
-spec status_consistent_read(node()) -> maintenance_status().
status_consistent_read(Node) ->
- rabbit_db_maintenance:get_consistent(Node).
+ case rabbit_db_maintenance:get_consistent(Node) of
+ undefined ->
+ ?DEFAULT_STATUS;
+ Status ->
+ Status
+ end.
-spec filter_out_drained_nodes_local_read([node()]) -> [node()].
filter_out_drained_nodes_local_read(Nodes) ->
diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl
index c09fd5b5a6..bdb96468d9 100644
--- a/deps/rabbit/src/rabbit_policy.erl
+++ b/deps/rabbit/src/rabbit_policy.erl
@@ -269,7 +269,7 @@ recover0() ->
operator_policy = match(Name, OpPolicies)})
|| X = #exchange{name = Name} <- Xs0],
Qs = rabbit_amqqueue:list_durable(),
- _ = rabbit_db_exchange:insert(Xs),
+ _ = rabbit_db_exchange:set(Xs),
Qs0 = [begin
QName = amqqueue:get_name(Q0),
Policy1 = match(QName, Policies),
@@ -278,7 +278,9 @@ recover0() ->
Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1),
rabbit_queue_decorator:set(Q2)
end || Q0 <- Qs],
- _ = rabbit_db_queue:insert(Qs0),
+ %% This function is just used to recover policies, thus no transient entities
+ %% are considered for this process as there is none to recover on boot.
+ _ = rabbit_db_queue:set_many(Qs0),
ok.
invalid_file() ->
@@ -493,8 +495,6 @@ update_queue(Policy, OpPolicy, Decorators) ->
amqqueue:set_decorators(Queue3, Decorators)
end.
-maybe_notify_of_policy_change(no_change, _PolicyDef, _ActingUser)->
- ok;
maybe_notify_of_policy_change({X1 = #exchange{}, X2 = #exchange{}}, _PolicyDef, _ActingUser) ->
rabbit_exchange:policy_changed(X1, X2);
%% policy has been cleared
diff --git a/deps/rabbit/src/rabbit_priority_queue.erl b/deps/rabbit/src/rabbit_priority_queue.erl
index 3747adc719..382b513bec 100644
--- a/deps/rabbit/src/rabbit_priority_queue.erl
+++ b/deps/rabbit/src/rabbit_priority_queue.erl
@@ -106,7 +106,7 @@ mutate_name_bin(P, NameBin) ->
<<NameBin/binary, 0, P:8>>.
expand_queues(QNames) ->
- Qs = rabbit_db_queue:get_durable(QNames),
+ Qs = rabbit_db_queue:get_many_durable(QNames),
lists:unzip(lists:append([expand_queue(Q) || Q <- Qs])).
expand_queue(Q) ->
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 962c1decbb..75d7299a5b 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -326,7 +326,7 @@ become_leader(QName, Name) ->
%% we need to ensure there is no chance of blocking as else the ra node
%% may not be able to establish its leadership
spawn(fun() ->
- rabbit_amqqueue:update(QName, Fun),
+ _ = rabbit_amqqueue:update(QName, Fun),
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = get_nodes(Q0),
@@ -538,7 +538,7 @@ repair_amqqueue_nodes(Q0) ->
TS = TS0#{nodes => RaNodes},
amqqueue:set_type_state(Q, TS)
end,
- rabbit_amqqueue:update(QName, Fun),
+ _ = rabbit_amqqueue:update(QName, Fun),
repaired
end.
@@ -600,7 +600,7 @@ recover(_Vhost, Queues) ->
%% present in the rabbit_queue table and not just in
%% rabbit_durable_queue
%% So many code paths are dependent on this.
- ok = rabbit_db_queue:insert_dirty(Q0),
+ ok = rabbit_db_queue:set_dirty(Q0),
Q = Q0,
case Res of
ok ->
@@ -1099,7 +1099,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
end),
amqqueue:set_pid(Q2, Leader)
end,
- rabbit_amqqueue:update(QName, Fun),
+ _ = rabbit_amqqueue:update(QName, Fun),
ok;
{timeout, _} ->
_ = ra:force_delete_server(?RA_SYSTEM, ServerId),
@@ -1153,7 +1153,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
Ts#{nodes => lists:delete(Node, Nodes)}
end)
end,
- rabbit_amqqueue:update(QName, Fun),
+ _ = rabbit_amqqueue:update(QName, Fun),
case ra:force_delete_server(?RA_SYSTEM, ServerId) of
ok ->
ok;
diff --git a/deps/rabbit/src/rabbit_router.erl b/deps/rabbit/src/rabbit_router.erl
index be71476b6d..b894cfa3a1 100644
--- a/deps/rabbit/src/rabbit_router.erl
+++ b/deps/rabbit/src/rabbit_router.erl
@@ -25,42 +25,11 @@
match_result().
match_bindings(SrcName, Match) ->
- MatchHead = #route{binding = #binding{source = SrcName,
- _ = '_'}},
- Routes = ets:select(rabbit_route, [{MatchHead, [], [['$_']]}]),
- [Dest || [#route{binding = Binding = #binding{destination = Dest}}] <-
- Routes, Match(Binding)].
+ rabbit_db_binding:match(SrcName, Match).
-spec match_routing_key(rabbit_types:binding_source(),
[routing_key(), ...] | ['_']) ->
match_result().
-match_routing_key(SrcName, [RoutingKey]) ->
- find_routes(#route{binding = #binding{source = SrcName,
- destination = '$1',
- key = RoutingKey,
- _ = '_'}},
- []);
-match_routing_key(SrcName, [_|_] = RoutingKeys) ->
- find_routes(#route{binding = #binding{source = SrcName,
- destination = '$1',
- key = '$2',
- _ = '_'}},
- [list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
- RKey <- RoutingKeys]])]).
-
-%%--------------------------------------------------------------------
-
-%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
-%% expensive for the same reasons as above, and, additionally, due to
-%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly
-%% unnecessary. According to the ets docs (and the code in erl_db.c),
-%% 'select' is safe anyway ("Functions that internally traverse over a
-%% table, like select and match, will give the same guarantee as
-%% safe_fixtable.") and, furthermore, even the lower level iterators
-%% ('first' and 'next') are safe on ordered_set tables ("Note that for
-%% tables of the ordered_set type, safe_fixtable/2 is not necessary as
-%% calls to first/1 and next/2 will always succeed."), which
-%% rabbit_route is.
-find_routes(MatchHead, Conditions) ->
- ets:select(rabbit_route, [{MatchHead, Conditions, ['$1']}]).
+match_routing_key(SrcName, RoutingKeys) ->
+ rabbit_db_binding:match_routing_key(SrcName, RoutingKeys, false).
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 05e99547bc..d555fb9dd8 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -153,7 +153,7 @@ restart_stream(QRes) ->
{timeout, term()}.
restart_stream(QRes, Options)
when element(1, QRes) == resource ->
- restart_stream(hd(rabbit_amqqueue:lookup([QRes])), Options);
+ restart_stream(hd(rabbit_amqqueue:lookup_many([QRes])), Options);
restart_stream(Q, Options)
when ?is_amqqueue(Q) andalso
?amqqueue_is_stream(Q) ->
@@ -1089,10 +1089,10 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
%% we need to re-initialise the queue record
%% if the stream id is a match
case rabbit_amqqueue:lookup_durable_queue(QName) of
- [] ->
+ {error, not_found} ->
%% queue not found at all, it must have been deleted
ok;
- [Q] ->
+ {ok, Q} ->
case amqqueue:get_type_state(Q) of
#{name := S} when S == StreamId ->
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
diff --git a/deps/rabbit/test/rabbit_db_binding_SUITE.erl b/deps/rabbit/test/rabbit_db_binding_SUITE.erl
new file mode 100644
index 0000000000..ed5376e3ea
--- /dev/null
+++ b/deps/rabbit/test/rabbit_db_binding_SUITE.erl
@@ -0,0 +1,331 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_db_binding_SUITE).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+-define(VHOST, <<"/">>).
+
+all() ->
+ [
+ {group, all_tests}
+ ].
+
+groups() ->
+ [
+ {all_tests, [], all_tests()}
+ ].
+
+all_tests() ->
+ [
+ create,
+ exists,
+ delete,
+ auto_delete,
+ get_all,
+ get_all_by_vhost,
+ get_all_for_source,
+ get_all_for_destination,
+ get_all_for_source_and_destination,
+ get_all_for_source_and_destination_reverse,
+ fold,
+ match,
+ match_routing_key
+ ].
+
+%% -------------------------------------------------------------------
+%% Test suite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_exchange, clear, []),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_binding, clear, []),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% ---------------------------------------------------------------------------
+%% Test Cases
+%% ---------------------------------------------------------------------------
+
+create(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, create1, [Config]).
+
+create1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertMatch({error, {resources_missing, [_, _]}},
+ rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({error, {resources_missing, [_]}},
+ rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch({error, too_bad},
+ rabbit_db_binding:create(Binding, fun(_, _) -> {error, too_bad} end)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ passed.
+
+exists(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, exists1, [Config]).
+
+exists1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual(false, rabbit_db_exchange:exists(Binding)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual(true, rabbit_db_binding:exists(Binding)),
+ passed.
+
+delete(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete1, [Config]).
+
+delete1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true, auto_delete = false},
+ Exchange2 = #exchange{name = XName2, durable = true, auto_delete = false},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual(ok, rabbit_db_binding:delete(Binding, fun(_, _) -> ok end)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ Ret = rabbit_db_binding:delete(Binding, fun(_, _) -> ok end),
+ ?assertMatch({ok, _}, Ret),
+ {ok, Deletions} = Ret,
+ ?assertMatch({#exchange{}, not_deleted, [#binding{}], none},
+ dict:fetch(XName1, Deletions)),
+ ?assertEqual(false, rabbit_db_binding:exists(Binding)),
+ passed.
+
+auto_delete(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete1, [Config]).
+
+auto_delete1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true, auto_delete = true},
+ Exchange2 = #exchange{name = XName2, durable = true, auto_delete = false},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual(ok, rabbit_db_binding:delete(Binding, fun(_, _) -> ok end)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ Ret = rabbit_db_binding:delete(Binding, fun(_, _) -> ok end),
+ ?assertMatch({ok, _}, Ret),
+ {ok, Deletions} = Ret,
+ ?assertMatch({#exchange{}, deleted, [#binding{}], none},
+ dict:fetch(XName1, Deletions)),
+ ?assertEqual(false, rabbit_db_binding:exists(Binding)),
+ passed.
+
+get_all(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all1, [Config]).
+
+get_all1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual([], rabbit_db_binding:get_all()),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([Binding], rabbit_db_binding:get_all()),
+ passed.
+
+get_all_by_vhost(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_vhost1, [Config]).
+
+get_all_by_vhost1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual([], rabbit_db_binding:get_all(?VHOST)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([Binding], rabbit_db_binding:get_all(?VHOST)),
+ ?assertEqual([], rabbit_db_binding:get_all(<<"other-vhost">>)),
+ passed.
+
+get_all_for_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, get_all_for_source1, [Config]).
+
+get_all_for_source1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual([], rabbit_db_binding:get_all_for_source(XName1)),
+ ?assertEqual([], rabbit_db_binding:get_all_for_source(XName2)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([Binding], rabbit_db_binding:get_all_for_source(XName1)),
+ ?assertEqual([], rabbit_db_binding:get_all_for_source(XName2)),
+ passed.
+
+get_all_for_destination(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, get_all_for_destination1, [Config]).
+
+get_all_for_destination1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual([], rabbit_db_binding:get_all_for_destination(XName1)),
+ ?assertEqual([], rabbit_db_binding:get_all_for_destination(XName2)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([], rabbit_db_binding:get_all_for_destination(XName1)),
+ ?assertEqual([Binding], rabbit_db_binding:get_all_for_destination(XName2)),
+ passed.
+
+get_all_for_source_and_destination(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, get_all_for_source_and_destination1, [Config]).
+
+get_all_for_source_and_destination1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual([], rabbit_db_binding:get_all(XName1, XName2, false)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName2, XName1, false)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([Binding], rabbit_db_binding:get_all(XName1, XName2, false)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName1, XName1, false)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName2, XName1, false)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName2, XName2, false)),
+ passed.
+
+get_all_for_source_and_destination_reverse(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, get_all_for_source_and_destination_reverse1, [Config]).
+
+get_all_for_source_and_destination_reverse1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual([], rabbit_db_binding:get_all(XName1, XName2, true)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName2, XName1, true)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([Binding], rabbit_db_binding:get_all(XName1, XName2, true)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName1, XName1, true)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName2, XName1, true)),
+ ?assertEqual([], rabbit_db_binding:get_all(XName2, XName2, true)),
+ passed.
+
+fold(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, fold1, [Config]).
+
+fold1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertEqual([], rabbit_db_binding:fold(fun(B, Acc) -> [B | Acc] end, [])),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([Binding], rabbit_db_binding:fold(fun(B, Acc) -> [B | Acc] end, [])),
+ passed.
+
+match(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, match1, [Config]).
+
+match1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2,
+ args = #{foo => bar}},
+ ?assertEqual([], rabbit_db_binding:match(XName1, fun(#binding{args = Args}) ->
+ maps:get(foo, Args) =:= bar
+ end)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([XName2],
+ rabbit_db_binding:match(XName1, fun(#binding{args = Args}) ->
+ maps:get(foo, Args) =:= bar
+ end)),
+ ?assertEqual([],
+ rabbit_db_binding:match(XName1, fun(#binding{args = Args}) ->
+ maps:is_key(headers, Args)
+ end)),
+ passed.
+
+match_routing_key(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, match1, [Config]).
+
+match_routing_key1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"*.*">>, destination = XName2,
+ args = #{foo => bar}},
+ ?assertEqual([], rabbit_db_binding:match_routing_key(XName1, [<<"a.b.c">>], false)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
+ ?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertEqual([], rabbit_db_binding:match_routing_key(XName1, [<<"a.b.c">>], false)),
+ ?assertEqual([XName2], rabbit_db_binding:match_routing_key(XName1, [<<"a.b">>], false)),
+ passed.
diff --git a/deps/rabbit/test/rabbit_db_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_exchange_SUITE.erl
new file mode 100644
index 0000000000..33982e8b34
--- /dev/null
+++ b/deps/rabbit/test/rabbit_db_exchange_SUITE.erl
@@ -0,0 +1,330 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_db_exchange_SUITE).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+-define(VHOST, <<"/">>).
+
+all() ->
+ [
+ {group, all_tests}
+ ].
+
+groups() ->
+ [
+ {all_tests, [], all_tests()}
+ ].
+
+all_tests() ->
+ [
+ create_or_get,
+ get,
+ get_many,
+ get_all,
+ get_all_by_vhost,
+ get_all_durable,
+ list,
+ count,
+ update,
+ set,
+ peek_serial,
+ next_serial,
+ delete_serial,
+ delete,
+ delete_if_unused,
+ exists,
+ match,
+ recover
+ ].
+
+%% -------------------------------------------------------------------
+%% Test suite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_exchange, clear, []),
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_exchange, clear, []),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_binding, clear, []),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% ---------------------------------------------------------------------------
+%% Test Cases
+%% ---------------------------------------------------------------------------
+
+create_or_get(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, create_or_get1, [Config]).
+
+create_or_get1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange0 = #exchange{name = XName, durable = true},
+ Exchange = rabbit_exchange_decorator:set(Exchange0),
+ ?assertMatch({new, Exchange}, rabbit_db_exchange:create_or_get(Exchange0)),
+ ?assertEqual({existing, Exchange}, rabbit_db_exchange:create_or_get(Exchange0)),
+ passed.
+
+get(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get1, [Config]).
+
+get1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange0 = #exchange{name = XName, durable = true},
+ Exchange = rabbit_exchange_decorator:set(Exchange0),
+ ?assertEqual({error, not_found}, rabbit_db_exchange:get(XName)),
+ ?assertEqual({new, Exchange}, rabbit_db_exchange:create_or_get(Exchange0)),
+ ?assertEqual({ok, Exchange}, rabbit_db_exchange:get(XName)),
+ passed.
+
+get_many(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_many1, [Config]).
+
+get_many1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange0 = #exchange{name = XName, durable = true},
+ Exchange = rabbit_exchange_decorator:set(Exchange0),
+ ?assertEqual([], rabbit_db_exchange:get_many([XName])),
+ ?assertEqual({new, Exchange}, rabbit_db_exchange:create_or_get(Exchange0)),
+ ?assertEqual([Exchange], rabbit_db_exchange:get_many([XName])),
+ passed.
+
+get_all(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all1, [Config]).
+
+get_all1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1_0 = #exchange{name = XName1, durable = true},
+ Exchange2_0 = #exchange{name = XName2, durable = true},
+ Exchange1 = rabbit_exchange_decorator:set(Exchange1_0),
+ Exchange2 = rabbit_exchange_decorator:set(Exchange2_0),
+ All = lists:sort([Exchange1, Exchange2]),
+ ?assertEqual([], rabbit_db_exchange:get_all()),
+ create([Exchange1_0, Exchange2_0]),
+ ?assertEqual(All, lists:sort(rabbit_db_exchange:get_all())),
+ passed.
+
+get_all_by_vhost(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_vhost1, [Config]).
+
+get_all_by_vhost1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1_0 = #exchange{name = XName1, durable = true},
+ Exchange2_0 = #exchange{name = XName2, durable = true},
+ Exchange1 = rabbit_exchange_decorator:set(Exchange1_0),
+ Exchange2 = rabbit_exchange_decorator:set(Exchange2_0),
+ All = lists:sort([Exchange1, Exchange2]),
+ ?assertEqual([], rabbit_db_exchange:get_all(?VHOST)),
+ create([Exchange1_0, Exchange2_0]),
+ ?assertEqual(All, lists:sort(rabbit_db_exchange:get_all(?VHOST))),
+ ?assertEqual([], lists:sort(rabbit_db_exchange:get_all(<<"other-vhost">>))),
+ passed.
+
+get_all_durable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_durable1, [Config]).
+
+get_all_durable1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1_0 = #exchange{name = XName1, durable = true},
+ Exchange2_0 = #exchange{name = XName2, durable = true},
+ All = lists:sort([Exchange1_0, Exchange2_0]),
+ ?assertEqual([], rabbit_db_exchange:get_all_durable()),
+ create([Exchange1_0, Exchange2_0]),
+ ?assertEqual(All, lists:sort(rabbit_db_exchange:get_all_durable())),
+ passed.
+
+list(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, list1, [Config]).
+
+list1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1_0 = #exchange{name = XName1, durable = true},
+ Exchange2_0 = #exchange{name = XName2, durable = true},
+ All = lists:sort([XName1, XName2]),
+ ?assertEqual([], rabbit_db_exchange:list()),
+ create([Exchange1_0, Exchange2_0]),
+ ?assertEqual(All, lists:sort(rabbit_db_exchange:list())),
+ passed.
+
+count(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, count1, [Config]).
+
+count1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1_0 = #exchange{name = XName1, durable = true},
+ Exchange2_0 = #exchange{name = XName2, durable = true},
+ ?assertEqual(0, rabbit_db_exchange:count()),
+ create([Exchange1_0, Exchange2_0]),
+ ?assertEqual(2, rabbit_db_exchange:count()),
+ passed.
+
+update(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update1, [Config]).
+
+update1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange = #exchange{name = XName, durable = true},
+ ?assertEqual(ok,
+ rabbit_db_exchange:update(XName, fun(X) -> X#exchange{type = topic} end)),
+ create([Exchange]),
+ ?assertEqual(ok,
+ rabbit_db_exchange:update(XName, fun(X) -> X#exchange{type = topic} end)),
+ {ok, Exchange0} = rabbit_db_exchange:get(XName),
+ ?assertEqual(topic, Exchange0#exchange.type),
+ passed.
+
+set(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, set1, [Config]).
+
+set1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange = #exchange{name = XName, durable = true},
+ ?assertEqual(ok, rabbit_db_exchange:set([Exchange])),
+ ?assertEqual({error, not_found}, rabbit_db_exchange:get(XName)),
+ ?assertEqual([Exchange], rabbit_db_exchange:get_all_durable()),
+ passed.
+
+peek_serial(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, peek_serial1, [Config]).
+
+peek_serial1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ ?assertEqual(1, rabbit_db_exchange:peek_serial(XName)),
+ ?assertEqual(1, rabbit_db_exchange:peek_serial(XName)),
+ ?assertEqual(1, rabbit_db_exchange:next_serial(XName)),
+ ?assertEqual(2, rabbit_db_exchange:peek_serial(XName)),
+ ?assertEqual(2, rabbit_db_exchange:peek_serial(XName)),
+ passed.
+
+next_serial(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, next_serial1, [Config]).
+
+next_serial1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ ?assertEqual(1, rabbit_db_exchange:next_serial(XName)),
+ ?assertEqual(2, rabbit_db_exchange:next_serial(XName)),
+ ?assertEqual(3, rabbit_db_exchange:next_serial(XName)),
+ passed.
+
+delete_serial(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_serial1, [Config]).
+
+delete_serial1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ ?assertEqual(1, rabbit_db_exchange:next_serial(XName)),
+ ?assertEqual(2, rabbit_db_exchange:next_serial(XName)),
+ ?assertEqual(ok, rabbit_db_exchange:delete_serial(XName)),
+ ?assertEqual(1, rabbit_db_exchange:peek_serial(XName)),
+ passed.
+
+delete(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete1, [Config]).
+
+delete1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange0 = #exchange{name = XName, durable = true},
+ ?assertMatch({error, not_found}, rabbit_db_exchange:delete(XName, false)),
+ create([Exchange0]),
+ ?assertMatch({ok, #exchange{name = XName}}, rabbit_db_exchange:get(XName)),
+ ?assertMatch([#exchange{name = XName}], rabbit_db_exchange:get_all_durable()),
+ ?assertMatch({deleted, #exchange{name = XName}, [], _},
+ rabbit_db_exchange:delete(XName, false)),
+ ?assertEqual({error, not_found}, rabbit_db_exchange:get(XName)),
+ ?assertEqual([], rabbit_db_exchange:get_all_durable()),
+ passed.
+
+delete_if_unused(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_if_unused1, [Config]).
+
+delete_if_unused1(_Config) ->
+ XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Exchange1 = #exchange{name = XName1, durable = true},
+ Exchange2 = #exchange{name = XName2, durable = true},
+ Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
+ ?assertMatch({error, not_found}, rabbit_db_exchange:delete(XName1, true)),
+ create([Exchange1, Exchange2]),
+ ?assertEqual(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
+ ?assertMatch({ok, #exchange{name = XName1}}, rabbit_db_exchange:get(XName1)),
+ ?assertMatch([#exchange{}, #exchange{}], rabbit_db_exchange:get_all_durable()),
+ ?assertMatch({error, in_use}, rabbit_db_exchange:delete(XName1, true)),
+ ?assertMatch({ok, #exchange{name = XName1}}, rabbit_db_exchange:get(XName1)),
+ ?assertMatch([#exchange{}, #exchange{}], rabbit_db_exchange:get_all_durable()),
+ passed.
+
+exists(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, exists1, [Config]).
+
+exists1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange = #exchange{name = XName, durable = true},
+ ?assertEqual(false, rabbit_db_exchange:exists(XName)),
+ create([Exchange]),
+ ?assertEqual(true, rabbit_db_exchange:exists(XName)),
+ passed.
+
+match(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, match1, [Config]).
+
+match1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange = #exchange{name = XName, durable = true, type = topic},
+ Pattern = #exchange{durable = true, type = topic, _ = '_'},
+ ?assertEqual([], rabbit_db_exchange:match(Pattern)),
+ create([Exchange]),
+ ?assertMatch([#exchange{name = XName}], rabbit_db_exchange:match(Pattern)),
+ passed.
+
+recover(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, recover1, [Config]).
+
+recover1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange = #exchange{name = XName, durable = true},
+ ?assertEqual(ok, rabbit_db_exchange:set([Exchange])),
+ ?assertEqual({error, not_found}, rabbit_db_exchange:get(XName)),
+ ?assertEqual([Exchange], rabbit_db_exchange:get_all_durable()),
+ ?assertMatch([Exchange], rabbit_db_exchange:recover(?VHOST)),
+ ?assertMatch({ok, #exchange{name = XName}}, rabbit_db_exchange:get(XName)),
+ ?assertEqual([Exchange], rabbit_db_exchange:get_all_durable()),
+ passed.
+
+create(Exchanges) ->
+ [?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange))
+ || Exchange <- Exchanges].
diff --git a/deps/rabbit/test/rabbit_db_maintenance_SUITE.erl b/deps/rabbit/test/rabbit_db_maintenance_SUITE.erl
new file mode 100644
index 0000000000..491cdfb9a3
--- /dev/null
+++ b/deps/rabbit/test/rabbit_db_maintenance_SUITE.erl
@@ -0,0 +1,93 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_db_maintenance_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, all_tests}
+ ].
+
+groups() ->
+ [
+ {all_tests, [], all_tests()}
+ ].
+
+all_tests() ->
+ [
+ setup_schema,
+ set_and_get,
+ set_and_get_consistent
+ ].
+
+%% -------------------------------------------------------------------
+%% Test suite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% ---------------------------------------------------------------------------
+%% Test Cases
+%% ---------------------------------------------------------------------------
+
+setup_schema(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, setup_schema1, [Config]).
+
+setup_schema1(_Config) ->
+ ?assertEqual(ok, rabbit_db_maintenance:setup_schema()),
+ passed.
+
+set_and_get(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, set_and_get1, [Config]).
+
+set_and_get1(_Config) ->
+ ?assertEqual(true, rabbit_db_maintenance:set(ready)),
+ ?assertEqual(ready, rabbit_db_maintenance:get(node())),
+ ?assertEqual(undefined, rabbit_db_maintenance:get('another-node')),
+ passed.
+
+set_and_get_consistent(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, set_and_get_consistent1, [Config]).
+
+set_and_get_consistent1(_Config) ->
+ ?assertEqual(true, rabbit_db_maintenance:set(ready)),
+ ?assertEqual(ready, rabbit_db_maintenance:get_consistent(node())),
+ ?assertEqual(undefined, rabbit_db_maintenance:get_consistent('another-node')),
+ passed.
diff --git a/deps/rabbit/test/rabbit_db_msup_SUITE.erl b/deps/rabbit/test/rabbit_db_msup_SUITE.erl
new file mode 100644
index 0000000000..20ca9f1fbf
--- /dev/null
+++ b/deps/rabbit/test/rabbit_db_msup_SUITE.erl
@@ -0,0 +1,136 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_db_msup_SUITE).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, all_tests}
+ ].
+
+groups() ->
+ [
+ {all_tests, [], all_tests()}
+ ].
+
+all_tests() ->
+ [
+ create_tables,
+ create_or_update,
+ find_mirror,
+ delete,
+ delete_all,
+ update_all
+ ].
+
+%% -------------------------------------------------------------------
+%% Test suite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_msup, clear, []),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% ---------------------------------------------------------------------------
+%% Test Cases
+%% ---------------------------------------------------------------------------
+
+create_tables(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, create_tables1, [Config]).
+
+create_tables1(_Config) ->
+ ?assertEqual(ok, rabbit_db_msup:create_tables()),
+ passed.
+
+create_or_update(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, create_or_update1, [Config]).
+
+create_or_update1(_Config) ->
+ Overall = spawn(fun() -> ok end),
+ Spec = #{id => id, start => {m, f, args}},
+ ?assertEqual(start,
+ rabbit_db_msup:create_or_update(group, Overall, undefined, Spec, id)),
+ passed.
+
+find_mirror(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, find_mirror1, [Config]).
+
+find_mirror1(_Config) ->
+ Overall = spawn(fun() -> ok end),
+ Spec = #{id => id, start => {m, f, args}},
+ ?assertEqual(start, rabbit_db_msup:create_or_update(group, Overall, undefined,
+ Spec, id)),
+ ?assertEqual({ok, Overall}, rabbit_db_msup:find_mirror(group, id)),
+ passed.
+
+delete(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete1, [Config]).
+
+delete1(_Config) ->
+ Overall = spawn(fun() -> ok end),
+ Spec = #{id => id, start => {m, f, args}},
+ ?assertEqual(start, rabbit_db_msup:create_or_update(group, Overall, undefined,
+ Spec, id)),
+ ?assertEqual(ok, rabbit_db_msup:delete(group, id)),
+ ?assertEqual({error, not_found}, rabbit_db_msup:find_mirror(group, id)),
+ passed.
+
+delete_all(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all1, [Config]).
+
+delete_all1(_Config) ->
+ Overall = spawn(fun() -> ok end),
+ Spec = #{id => id, start => {m, f, args}},
+ ?assertEqual(start, rabbit_db_msup:create_or_update(group, Overall, undefined,
+ Spec, id)),
+ ?assertEqual(ok, rabbit_db_msup:delete_all(group)),
+ ?assertEqual({error, not_found}, rabbit_db_msup:find_mirror(group, id)),
+ passed.
+
+update_all(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update_all1, [Config]).
+
+update_all1(_Config) ->
+ OldOverall = spawn(fun() -> ok end),
+ Overall = spawn(fun() -> ok end),
+ Spec = #{id => id, start => {m, f, args}},
+ ?assertEqual(start, rabbit_db_msup:create_or_update(group, OldOverall, undefined,
+ Spec, id)),
+ ?assertEqual({ok, OldOverall}, rabbit_db_msup:find_mirror(group, id)),
+ ?assertEqual([Spec], rabbit_db_msup:update_all(Overall, OldOverall)),
+ ?assertEqual({ok, Overall}, rabbit_db_msup:find_mirror(group, id)),
+ passed.
diff --git a/deps/rabbit/test/rabbit_db_policy_SUITE.erl b/deps/rabbit/test/rabbit_db_policy_SUITE.erl
new file mode 100644
index 0000000000..2afa3acd5d
--- /dev/null
+++ b/deps/rabbit/test/rabbit_db_policy_SUITE.erl
@@ -0,0 +1,96 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_db_policy_SUITE).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+-define(VHOST, <<"/">>).
+
+all() ->
+ [
+ {group, all_tests}
+ ].
+
+groups() ->
+ [
+ {all_tests, [], all_tests()}
+ ].
+
+all_tests() ->
+ [
+ update
+ ].
+
+%% -------------------------------------------------------------------
+%% Test suite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_exchange, clear, []),
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_exchange, clear, []),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% ---------------------------------------------------------------------------
+%% Test Cases
+%% ---------------------------------------------------------------------------
+
+update(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update1, [Config]).
+
+update1(_Config) ->
+ XName = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Exchange = #exchange{name = XName, durable = true},
+ ?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange)),
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Queue = amqqueue:new(QName, none, true, false, none, [], ?VHOST, #{},
+ rabbit_classic_queue),
+ ?assertEqual({created, Queue}, rabbit_db_queue:create_or_get(Queue)),
+ ?assertMatch(
+ {[{_, _}], [{_, _}]},
+ rabbit_db_policy:update(?VHOST,
+ fun(X) -> #{exchange => X,
+ update_function =>
+ fun(X0) ->
+ X0#exchange{policy = new_policy}
+ end}
+ end,
+ fun(Q) -> #{queue => Q,
+ update_function =>
+ fun(Q0) ->
+ amqqueue:set_policy(Q0, random_policy)
+ end}
+ end)),
+ passed.
diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl
new file mode 100644
index 0000000000..3cafb91443
--- /dev/null
+++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl
@@ -0,0 +1,596 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_db_queue_SUITE).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include("amqqueue.hrl").
+
+-compile(export_all).
+
+-define(VHOST, <<"/">>).
+
+all() ->
+ [
+ {group, all_tests},
+ {group, mnesia_store}
+ ].
+
+groups() ->
+ [
+ {all_tests, [], all_tests()},
+ {mnesia_store, [], mnesia_tests()}
+ ].
+
+all_tests() ->
+ [
+ create_or_get,
+ get,
+ get_many,
+ get_all,
+ get_all_by_vhost,
+ get_all_by_type,
+ get_all_by_type_and_node,
+ list,
+ count,
+ count_by_vhost,
+ set,
+ set_many,
+ delete,
+ update,
+ exists,
+ get_all_durable,
+ get_all_durable_by_type,
+ filter_all_durable,
+ get_durable,
+ get_many_durable,
+ set_dirty,
+ internal_delete,
+ update_durable
+ ].
+
+mnesia_tests() ->
+ [
+ foreach_durable,
+ foreach_transient,
+ delete_transient,
+ update_in_mnesia_tx,
+ get_durable_in_mnesia_tx
+ ].
+
+%% -------------------------------------------------------------------
+%% Test suite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_queue, clear, []),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% ---------------------------------------------------------------------------
+%% Test Cases
+%% ---------------------------------------------------------------------------
+
+create_or_get(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(
+ Config, 0, ?MODULE, create_or_get1, [Config]).
+
+create_or_get1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ?assertEqual({created, Q}, rabbit_db_queue:create_or_get(Q)),
+ ?assertEqual({existing, Q}, rabbit_db_queue:create_or_get(Q)),
+ %% TODO absent
+ passed.
+
+get(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get1, [Config]).
+
+get1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ok = rabbit_db_queue:set(Q),
+ ?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
+ ?assertEqual({error, not_found},
+ rabbit_db_queue:get(rabbit_misc:r(?VHOST, queue, <<"test-queue2">>))),
+ passed.
+
+get_many(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_many1, [Config]).
+
+get_many1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ ok = rabbit_db_queue:set(Q),
+ ?assertEqual([Q], rabbit_db_queue:get_many([QName])),
+ ?assertEqual([Q], rabbit_db_queue:get_many([QName, QName2])),
+ ?assertEqual([], rabbit_db_queue:get_many([QName2])),
+ ok = rabbit_db_queue:set(Q2),
+ ?assertEqual([Q, Q2], rabbit_db_queue:get_many([QName, QName2])),
+ passed.
+
+get_all(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all1, [Config]).
+
+get_all1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ All = lists:sort([Q, Q2]),
+ ?assertEqual([], rabbit_db_queue:get_all()),
+ set_list([Q, Q2]),
+ ?assertEqual(All, lists:sort(rabbit_db_queue:get_all())),
+ passed.
+
+get_all_by_vhost(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_vhost1, [Config]).
+
+get_all_by_vhost1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ All = lists:sort([Q, Q2]),
+ ?assertEqual([], rabbit_db_queue:get_all(?VHOST)),
+ ?assertEqual([], rabbit_db_queue:get_all(<<"some-vhost">>)),
+ set_list([Q, Q2]),
+ ?assertEqual(All, lists:sort(rabbit_db_queue:get_all(?VHOST))),
+ ?assertEqual([], rabbit_db_queue:get_all(<<"some-vhost">>)),
+ passed.
+
+get_all_by_type(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_type1, [Config]).
+
+get_all_by_type1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ QName4 = rabbit_misc:r(?VHOST, queue, <<"test-queue4">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_quorum_queue),
+ Q3 = new_queue(QName3, rabbit_quorum_queue),
+ Q4 = new_queue(QName4, rabbit_stream_queue),
+ Quorum = lists:sort([Q2, Q3]),
+ ?assertEqual([], rabbit_db_queue:get_all_by_type(rabbit_classic_queue)),
+ ?assertEqual([], lists:sort(rabbit_db_queue:get_all_by_type(rabbit_quorum_queue))),
+ ?assertEqual([], rabbit_db_queue:get_all_by_type(rabbit_stream_queue)),
+ set_list([Q, Q2, Q3, Q4]),
+ ?assertEqual([Q], rabbit_db_queue:get_all_by_type(rabbit_classic_queue)),
+ ?assertEqual(Quorum, lists:sort(rabbit_db_queue:get_all_by_type(rabbit_quorum_queue))),
+ ?assertEqual([Q4], rabbit_db_queue:get_all_by_type(rabbit_stream_queue)),
+ passed.
+
+get_all_by_type_and_node(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_type_and_node1, [Config]).
+
+get_all_by_type_and_node1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ QName4 = rabbit_misc:r(?VHOST, queue, <<"test-queue4">>),
+ Pid = spawn(fun() -> ok end),
+ Q = new_queue(QName, rabbit_classic_queue, Pid),
+ Q2 = new_queue(QName2, rabbit_quorum_queue),
+ Q3 = new_queue(QName3, rabbit_quorum_queue, Pid),
+ Q4 = new_queue(QName4, rabbit_stream_queue, Pid),
+ Node = node(),
+ ?assertEqual([], rabbit_db_queue:get_all_by_type_and_node(?VHOST, rabbit_classic_queue, Node)),
+ ?assertEqual([], lists:sort(rabbit_db_queue:get_all_by_type_and_node(?VHOST, rabbit_quorum_queue, Node))),
+ ?assertEqual([], rabbit_db_queue:get_all_by_type_and_node(?VHOST, rabbit_stream_queue, Node)),
+ set_list([Q, Q2, Q3, Q4]),
+ ?assertEqual([Q], rabbit_db_queue:get_all_by_type_and_node(?VHOST, rabbit_classic_queue, Node)),
+ ?assertEqual([], rabbit_db_queue:get_all_by_type_and_node(<<"other-vhost">>, rabbit_classic_queue, Node)),
+ ?assertEqual([Q3], lists:sort(rabbit_db_queue:get_all_by_type_and_node(?VHOST, rabbit_quorum_queue, Node))),
+ ?assertEqual([Q4], rabbit_db_queue:get_all_by_type_and_node(?VHOST, rabbit_stream_queue, Node)),
+ passed.
+
+list(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, list1, [Config]).
+
+list1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ All = lists:sort([QName, QName2]),
+ ?assertEqual([], rabbit_db_queue:list()),
+ set_list([Q, Q2]),
+ ?assertEqual(All, lists:sort(rabbit_db_queue:list())),
+ passed.
+
+count(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, count1, [Config]).
+
+count1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ QName4 = rabbit_misc:r(?VHOST, queue, <<"test-queue4">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_quorum_queue),
+ Q3 = new_queue(QName3, rabbit_quorum_queue),
+ Q4 = new_queue(QName4, rabbit_stream_queue),
+ ?assertEqual(0, rabbit_db_queue:count()),
+ set_list([Q, Q2, Q3, Q4]),
+ ?assertEqual(4, rabbit_db_queue:count()),
+ passed.
+
+count_by_vhost(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, count_by_vhost1, [Config]).
+
+count_by_vhost1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ QName4 = rabbit_misc:r(?VHOST, queue, <<"test-queue4">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_quorum_queue),
+ Q3 = new_queue(QName3, rabbit_quorum_queue),
+ Q4 = new_queue(QName4, rabbit_stream_queue),
+ ?assertEqual(0, rabbit_db_queue:count(?VHOST)),
+ set_list([Q, Q2, Q3, Q4]),
+ ?assertEqual(4, rabbit_db_queue:count(?VHOST)),
+ ?assertEqual(0, rabbit_db_queue:count(<<"other-vhost">>)),
+ passed.
+
+set(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, set1, [Config]).
+
+set1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
+ passed.
+
+set_many(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, set_many1, [Config]).
+
+set_many1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ Q3 = new_queue(QName3, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set_many([])),
+ ?assertEqual(ok, rabbit_db_queue:set_many([Q1, Q2, Q3])),
+ ?assertEqual({ok, Q1}, rabbit_db_queue:get_durable(QName1)),
+ ?assertEqual({ok, Q2}, rabbit_db_queue:get_durable(QName2)),
+ ?assertEqual({ok, Q3}, rabbit_db_queue:get_durable(QName3)),
+ passed.
+
+delete(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete1, [Config]).
+
+delete1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
+ %% TODO Can we handle the deletions outside of rabbit_db_queue? Probably not because
+ %% they should be done in a single transaction, but what a horrid API to have!
+ Dict = rabbit_db_queue:delete(QName, normal),
+ ?assertEqual(0, dict:size(Dict)),
+ ?assertEqual(ok, rabbit_db_queue:delete(QName, normal)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get(QName)),
+ passed.
+
+update(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update1, [Config]).
+
+update1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Pid = spawn(fun() -> ok end),
+ Q2 = amqqueue:set_pid(Q, Pid),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
+ ?assertEqual(Q2, rabbit_db_queue:update(QName, fun(_) -> Q2 end)),
+ ?assertEqual({ok, Q2}, rabbit_db_queue:get(QName)),
+ ?assertEqual(not_found, rabbit_db_queue:update(QName2, fun(_) -> Q2 end)),
+ passed.
+
+update_decorators(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update_decorators1, [Config]).
+
+update_decorators1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
+ ?assertEqual(undefined, amqqueue:get_decorators(Q)),
+ %% Not really testing we set a decorator, but at least the field is being updated
+ ?assertEqual(ok, rabbit_db_queue:update_decorators(QName)),
+ {ok, Q1} = rabbit_db_queue:get(QName),
+ ?assertEqual([], amqqueue:get_decorators(Q1)),
+ passed.
+
+exists(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, exists1, [Config]).
+
+exists1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ?assertEqual(false, rabbit_db_queue:exists(QName)),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual(true, rabbit_db_queue:exists(QName)),
+ passed.
+
+get_all_durable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_durable1, [Config]).
+
+get_all_durable1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ Q3 = new_queue(QName3, rabbit_classic_queue),
+ All = lists:sort([Q1, Q2, Q3]),
+ ?assertEqual([], rabbit_db_queue:get_all_durable()),
+ set_list(All),
+ ?assertEqual(All, lists:sort(rabbit_db_queue:get_all_durable())),
+ passed.
+
+get_all_durable_by_type(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_durable_by_type1, [Config]).
+
+get_all_durable_by_type1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ QName4 = rabbit_misc:r(?VHOST, queue, <<"test-queue4">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_quorum_queue),
+ Q3 = new_queue(QName3, rabbit_stream_queue),
+ Q4 = new_queue(QName4, rabbit_classic_queue),
+ All = lists:sort([Q1, Q2, Q3]),
+ ok = rabbit_db_queue:set_dirty(Q4),
+ ?assertEqual([], rabbit_db_queue:get_all_durable_by_type(rabbit_classic_queue)),
+ set_list(All),
+ ?assertEqual([Q1], rabbit_db_queue:get_all_durable_by_type(rabbit_classic_queue)),
+ ?assertEqual([Q2], rabbit_db_queue:get_all_durable_by_type(rabbit_quorum_queue)),
+ ?assertEqual([Q3], rabbit_db_queue:get_all_durable_by_type(rabbit_stream_queue)),
+ passed.
+
+filter_all_durable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, filter_all_durable1, [Config]).
+
+filter_all_durable1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ QName3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ Q1 = new_queue(QName1, rabbit_quorum_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ Q3 = new_queue(QName3, rabbit_classic_queue),
+ All = lists:sort([Q2, Q3]),
+ ?assertEqual([], rabbit_db_queue:filter_all_durable(
+ fun(Q) ->
+ amqqueue:get_type(Q) =:= rabbit_classic_queue
+ end)),
+ set_list([Q1, Q2, Q3]),
+ ?assertEqual(All, lists:sort(rabbit_db_queue:filter_all_durable(
+ fun(Q) ->
+ amqqueue:get_type(Q) =:= rabbit_classic_queue
+ end))),
+ passed.
+
+get_durable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_durable1, [Config]).
+
+get_durable1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ ok = rabbit_db_queue:set(Q1),
+ ok = rabbit_db_queue:set_dirty(Q2),
+ ?assertEqual({ok, Q1}, rabbit_db_queue:get_durable(QName1)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get_durable(QName2)),
+ passed.
+
+get_many_durable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_many_durable1, [Config]).
+
+get_many_durable1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ ok = rabbit_db_queue:set(Q1),
+ ok = rabbit_db_queue:set_dirty(Q2),
+ ?assertEqual([Q1], rabbit_db_queue:get_many_durable([QName1])),
+ ?assertEqual([], rabbit_db_queue:get_many_durable([QName2])),
+ passed.
+
+update_durable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update_durable1, [Config]).
+
+update_durable1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set(Q1)),
+ ?assertEqual(ok, rabbit_db_queue:set_dirty(Q2)),
+ ?assertEqual(ok, rabbit_db_queue:update_durable(
+ fun(Q0) ->
+ amqqueue:set_policy(Q0, my_policy)
+ end,
+ fun(Q0) when ?is_amqqueue(Q0) -> true end)),
+ {ok, Q0} = rabbit_db_queue:get_durable(QName1),
+ ?assertMatch(my_policy, amqqueue:get_policy(Q0)),
+ {ok, Q00} = rabbit_db_queue:get(QName1),
+ ?assertMatch(undefined, amqqueue:get_policy(Q00)),
+ passed.
+
+foreach_durable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, foreach_durable1, [Config]).
+
+foreach_durable1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set(Q1)),
+ ?assertEqual(ok, rabbit_db_queue:set_dirty(Q2)),
+ ?assertEqual(ok, rabbit_db_queue:foreach_durable(
+ fun(Q0) ->
+ rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), true, normal)
+ end,
+ fun(Q0) when ?is_amqqueue(Q0) -> true end)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get(QName1)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get_durable(QName1)),
+ ?assertMatch({ok, _}, rabbit_db_queue:get(QName2)),
+ passed.
+
+foreach_transient(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, foreach_transient1, [Config]).
+
+foreach_transient1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set(Q1)),
+ ?assertEqual(ok, rabbit_db_queue:set_dirty(Q2)),
+ ?assertEqual(ok, rabbit_db_queue:foreach_transient(
+ fun(Q0) ->
+ rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), true, normal)
+ end)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get(QName1)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get_durable(QName1)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get(QName2)),
+ passed.
+
+delete_transient(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_transient1, [Config]).
+
+delete_transient1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = new_queue(QName2, rabbit_quorum_queue),
+ ?assertEqual(ok, rabbit_db_queue:set_dirty(Q1)),
+ ?assertEqual(ok, rabbit_db_queue:set_dirty(Q2)),
+ ?assertMatch({[QName1], _},
+ rabbit_db_queue:delete_transient(
+ fun(Q0) when ?is_amqqueue(Q0) ->
+ amqqueue:get_type(Q0) == rabbit_classic_queue
+ end)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get(QName1)),
+ ?assertMatch({ok, _}, rabbit_db_queue:get(QName2)),
+ passed.
+
+set_dirty(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, set_dirty1, [Config]).
+
+set_dirty1(_Config) ->
+ QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q1 = new_queue(QName1, rabbit_classic_queue),
+ Q2 = amqqueue:set_decorators(Q1, []),
+ ok = rabbit_db_queue:set_dirty(Q1),
+ ?assertEqual({ok, Q2}, rabbit_db_queue:get(QName1)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get_durable(QName1)),
+ passed.
+
+internal_delete(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, internal_delete1, [Config]).
+
+internal_delete1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual(ok, rabbit_db_queue:foreach_durable(
+ fun(Q0) -> rabbit_db_queue:internal_delete(amqqueue:get_name(Q0),
+ false, normal) end,
+ fun(Q0) when ?is_amqqueue(Q0) -> true end)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get(QName)),
+ ?assertEqual({error, not_found}, rabbit_db_queue:get_durable(QName)),
+ passed.
+
+update_in_mnesia_tx(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update_in_mnesia_tx1, [Config]).
+
+update_in_mnesia_tx1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ Pid = spawn(fun() -> ok end),
+ ?assertEqual({atomic, not_found},
+ mnesia:transaction(fun() ->
+ rabbit_db_queue:update_in_mnesia_tx(
+ QName,
+ fun(Q0) -> amqqueue:set_pid(Q0, Pid) end)
+ end)),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ {atomic, Q1} =
+ mnesia:transaction(fun() ->
+ rabbit_db_queue:update_in_mnesia_tx(
+ QName,
+ fun(Q0) -> amqqueue:set_pid(Q0, Pid) end)
+ end),
+ ?assertEqual(Pid, amqqueue:get_pid(Q1)),
+ ?assertEqual({ok, Q1}, rabbit_db_queue:get(QName)),
+ ?assertEqual({ok, Q1}, rabbit_db_queue:get_durable(QName)),
+ passed.
+
+get_durable_in_mnesia_tx(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_durable_in_mnesia_tx1, [Config]).
+
+get_durable_in_mnesia_tx1(_Config) ->
+ QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ Q = new_queue(QName, rabbit_classic_queue),
+ ?assertEqual({atomic, {error, not_found}},
+ mnesia:transaction(fun() ->
+ rabbit_db_queue:get_durable_in_mnesia_tx(QName)
+ end)),
+ ?assertEqual(ok, rabbit_db_queue:set(Q)),
+ ?assertEqual({atomic, {ok, Q}},
+ mnesia:transaction(fun() ->
+ rabbit_db_queue:get_durable_in_mnesia_tx(QName)
+ end)),
+ passed.
+
+set_list(Qs) ->
+ [?assertEqual(ok, rabbit_db_queue:set(Q)) || Q <- Qs].
+
+new_queue(QName, Type) ->
+ new_queue(QName, Type, none).
+
+new_queue(#resource{virtual_host = VHost} = QName, Type, Pid) ->
+ amqqueue:new(QName, Pid, true, false, none, [], VHost, #{}, Type).
diff --git a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl
new file mode 100644
index 0000000000..8c2d424e20
--- /dev/null
+++ b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl
@@ -0,0 +1,158 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_db_topic_exchange_SUITE).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+-define(VHOST, <<"/">>).
+
+all() ->
+ [
+ {group, all_tests}
+ ].
+
+groups() ->
+ [
+ {all_tests, [], all_tests()}
+ ].
+
+all_tests() ->
+ [
+ set,
+ delete,
+ delete_all_for_exchange,
+ match
+ ].
+
+%% -------------------------------------------------------------------
+%% Test suite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_topic_exchange, clear, []),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% ---------------------------------------------------------------------------
+%% Test Cases
+%% ---------------------------------------------------------------------------
+
+set(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, set1, [Config]).
+
+set1(_Config) ->
+ Src = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Dst = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
+ RoutingKey = <<"a.b.c">>,
+ Binding = #binding{source = Src, key = RoutingKey, destination = Dst, args = #{}},
+ ?assertEqual([], rabbit_db_topic_exchange:match(Src, RoutingKey)),
+ ?assertEqual(ok, rabbit_db_topic_exchange:set(Binding)),
+ ?assertEqual(ok, rabbit_db_topic_exchange:set(Binding)),
+ ?assertEqual([Dst], rabbit_db_topic_exchange:match(Src, RoutingKey)),
+ passed.
+
+delete(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete1, [Config]).
+
+delete1(_Config) ->
+ Src = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Dst1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ Dst2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Dst3= rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ Dsts = lists:sort([Dst1, Dst2, Dst3]),
+ RoutingKey = <<"a.b.c">>,
+ Binding1 = #binding{source = Src, key = RoutingKey, destination = Dst1, args = #{}},
+ Binding2 = #binding{source = Src, key = RoutingKey, destination = Dst2, args = #{}},
+ Binding3 = #binding{source = Src, key = RoutingKey, destination = Dst3, args = #{}},
+ ?assertEqual(ok, rabbit_db_topic_exchange:delete([Binding1])),
+ ?assertEqual(ok, rabbit_db_topic_exchange:set(Binding1)),
+ ?assertEqual(ok, rabbit_db_topic_exchange:set(Binding2)),
+ ?assertEqual(ok, rabbit_db_topic_exchange:set(Binding3)),
+ ?assertEqual(Dsts, lists:sort(rabbit_db_topic_exchange:match(Src, RoutingKey))),
+ ?assertEqual(ok, rabbit_db_topic_exchange:delete([Binding1, Binding2])),
+ ?assertEqual([Dst3], rabbit_db_topic_exchange:match(Src, RoutingKey)),
+ passed.
+
+delete_all_for_exchange(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_for_exchange1, [Config]).
+
+delete_all_for_exchange1(_Config) ->
+ Src1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
+ Src2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
+ Dst1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ Dst2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Dsts = lists:sort([Dst1, Dst2]),
+ RoutingKey = <<"a.b.c">>,
+ ?assertEqual(ok, rabbit_db_topic_exchange:delete_all_for_exchange(Src1)),
+ set(Src1, RoutingKey, Dst1),
+ set(Src1, RoutingKey, Dst2),
+ set(Src2, RoutingKey, Dst1),
+ ?assertEqual(Dsts, lists:sort(rabbit_db_topic_exchange:match(Src1, RoutingKey))),
+ ?assertEqual([Dst1], rabbit_db_topic_exchange:match(Src2, RoutingKey)),
+ ?assertEqual(ok, rabbit_db_topic_exchange:delete_all_for_exchange(Src1)),
+ ?assertEqual([], rabbit_db_topic_exchange:match(Src1, RoutingKey)),
+ ?assertEqual([Dst1], rabbit_db_topic_exchange:match(Src2, RoutingKey)),
+ passed.
+
+match(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, match1, [Config]).
+
+match1(_Config) ->
+ Src = rabbit_misc:r(?VHOST, exchange, <<"test-exchange">>),
+ Dst1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
+ Dst2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
+ Dst3 = rabbit_misc:r(?VHOST, queue, <<"test-queue3">>),
+ Dst4 = rabbit_misc:r(?VHOST, queue, <<"test-queue4">>),
+ Dst5 = rabbit_misc:r(?VHOST, queue, <<"test-queue5">>),
+ Dst6 = rabbit_misc:r(?VHOST, queue, <<"test-queue6">>),
+ set(Src, <<"a.b.c">>, Dst1),
+ set(Src, <<"a.*.c">>, Dst2),
+ set(Src, <<"*.#">>, Dst3),
+ set(Src, <<"#">>, Dst4),
+ set(Src, <<"#.#">>, Dst5),
+ set(Src, <<"a.*">>, Dst6),
+ Dsts1 = lists:sort([Dst1, Dst2, Dst3, Dst4, Dst5]),
+ ?assertEqual(Dsts1, lists:usort(rabbit_db_topic_exchange:match(Src, <<"a.b.c">>))),
+ Dsts2 = lists:sort([Dst3, Dst4, Dst5, Dst6]),
+ ?assertEqual(Dsts2, lists:usort(rabbit_db_topic_exchange:match(Src, <<"a.b">>))),
+ Dsts3 = lists:sort([Dst4, Dst5]),
+ ?assertEqual(Dsts3, lists:usort(rabbit_db_topic_exchange:match(Src, <<"">>))),
+ Dsts4 = lists:sort([Dst3, Dst4, Dst5]),
+ ?assertEqual(Dsts4, lists:usort(rabbit_db_topic_exchange:match(Src, <<"zen.rabbit">>))),
+ passed.
+
+set(Src, RoutingKey, Dst) ->
+ Binding = #binding{source = Src, key = RoutingKey, destination = Dst, args = #{}},
+ ok = rabbit_db_topic_exchange:set(Binding).
diff --git a/deps/rabbit/test/topic_permission_SUITE.erl b/deps/rabbit/test/topic_permission_SUITE.erl
index 14474411a8..b39b3e8b77 100644
--- a/deps/rabbit/test/topic_permission_SUITE.erl
+++ b/deps/rabbit/test/topic_permission_SUITE.erl
@@ -47,10 +47,8 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
clear_tables() ->
- {atomic, ok} = mnesia:clear_table(rabbit_topic_permission),
- {atomic, ok} = mnesia:clear_table(rabbit_vhost),
- {atomic, ok} = mnesia:clear_table(rabbit_user),
- ok.
+ ok = rabbit_db_vhost:clear(),
+ ok = rabbit_db_user:clear().
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -145,14 +143,8 @@ topic_permission_checks(Config) ->
topic_permission_checks1(_Config) ->
0 = length(ets:tab2list(rabbit_topic_permission)),
- rabbit_mnesia:execute_mnesia_transaction(fun() ->
- ok = mnesia:write(rabbit_vhost,
- vhost:new(<<"/">>, []),
- write),
- ok = mnesia:write(rabbit_vhost,
- vhost:new(<<"other-vhost">>, []),
- write)
- end),
+ rabbit_db_vhost:create_or_get(<<"/">>, [], #{}),
+ rabbit_db_vhost:create_or_get(<<"other-vhost">>, [], #{}),
rabbit_auth_backend_internal:add_user(<<"guest">>, <<"guest">>, <<"acting-user">>),
rabbit_auth_backend_internal:add_user(<<"dummy">>, <<"dummy">>, <<"acting-user">>),
diff --git a/deps/rabbit/test/unit_access_control_SUITE.erl b/deps/rabbit/test/unit_access_control_SUITE.erl
index f3ee24cd67..d921ace881 100644
--- a/deps/rabbit/test/unit_access_control_SUITE.erl
+++ b/deps/rabbit/test/unit_access_control_SUITE.erl
@@ -391,7 +391,7 @@ topic_matching1(_Config) ->
lists:nth(11, Bindings), lists:nth(19, Bindings),
lists:nth(21, Bindings), lists:nth(28, Bindings)],
exchange_op_callback(X, remove_bindings, [RemovedBindings]),
- RemainingBindings = ordsets:to_list(
+ _RemainingBindings = ordsets:to_list(
ordsets:subtract(ordsets:from_list(Bindings),
ordsets:from_list(RemovedBindings))),
@@ -416,14 +416,12 @@ topic_matching1(_Config) ->
{"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]),
%% remove the entire exchange
- exchange_op_callback(X, delete, [RemainingBindings]),
+ exchange_op_callback(X, delete, []),
%% none should match now
test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
passed.
exchange_op_callback(X, Fun, Args) ->
- rabbit_mnesia:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end),
rabbit_exchange:callback(X, Fun, none, [X] ++ Args).
test_topic_expect_match(X, List) ->
diff --git a/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel b/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel
index 4401984fb8..bba8fbc605 100644
--- a/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel
+++ b/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel
@@ -15,6 +15,7 @@ APP_NAME = "rabbitmq_consistent_hash_exchange"
APP_DESCRIPTION = "Consistent Hash Exchange Type"
BUILD_DEPS = [
+ "//deps/rabbit:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
]
@@ -42,7 +43,7 @@ xref(
plt(
name = "base_plt",
libs = ["//deps/rabbitmq_cli:elixir"],
- deps = ["//deps/rabbitmq_cli:elixir"] + BUILD_DEPS + DEPS + RUNTIME_DEPS,
+ deps = ["//deps/rabbitmq_cli:elixir"] + BUILD_DEPS + DEPS,
)
dialyze(
diff --git a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl
index 88f3af8a09..83c60d22bb 100644
--- a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl
+++ b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl
@@ -32,7 +32,7 @@
[exchange, <<"x-consistent-hash">>]}}]}).
-rabbit_boot_step(
- {rabbit_exchange_type_consistent_hash_mnesia,
+ {rabbit_exchange_type_consistent_hash_metadata_store,
[{description, "exchange type x-consistent-hash: shared state"},
{mfa, {?MODULE, init, []}},
{requires, database},
@@ -48,7 +48,7 @@
init() ->
rabbit_db_ch_exchange:setup_schema(),
- recover(),
+ _ = recover(),
ok.
info(_X) -> [].
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
index 2cb9952072..e71c6a9509 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
@@ -438,12 +438,8 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
{longstr, N} -> N;
_ -> unknown
end,
- {Serial, Bindings} =
- rabbit_mnesia:execute_mnesia_transaction(
- fun () ->
- {rabbit_exchange:peek_serial(DownXName),
- rabbit_binding:list_for_source(DownXName)}
- end),
+ {Serial, Bindings} = {rabbit_exchange:peek_serial(DownXName),
+ rabbit_binding:list_for_source(DownXName)},
true = is_integer(Serial),
%% If we are very short lived, Serial can be undefined at
%% this point (since the deletion of the X could have
diff --git a/deps/rabbitmq_jms_topic_exchange/BUILD.bazel b/deps/rabbitmq_jms_topic_exchange/BUILD.bazel
index 2a7fb05433..20e1d48405 100644
--- a/deps/rabbitmq_jms_topic_exchange/BUILD.bazel
+++ b/deps/rabbitmq_jms_topic_exchange/BUILD.bazel
@@ -16,6 +16,10 @@ APP_DESCRIPTION = "RabbitMQ JMS topic selector exchange plugin"
APP_MODULE = "rabbit_federation_app"
+BUILD_DEPS = [
+ "//deps/rabbit:erlang_app",
+]
+
DEPS = [
"//deps/rabbit_common:erlang_app",
]
@@ -33,6 +37,7 @@ rabbitmq_app(
app_module = APP_MODULE,
app_name = APP_NAME,
runtime_deps = RUNTIME_DEPS,
+ build_deps = BUILD_DEPS,
deps = DEPS,
extra_apps = EXTRA_APPS,
)
diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl
index 66288d3f9a..d9087215e9 100644
--- a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl
+++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl
@@ -124,10 +124,8 @@ add_binding( none
case BindGen of
{ok, BindFun} ->
add_binding_fun(XName, {{BindingKey, Dest}, BindFun});
- {none, error} ->
- parsing_error(XName, Selector, Dest);
- _ ->
- ok
+ error ->
+ parsing_error(XName, Selector, Dest)
end,
ok.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 952b26e961..fea18533b1 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -1253,7 +1253,7 @@ deliver_to_queues(Delivery,
RoutedToQNames,
State0 = #state{queue_states = QStates0,
cfg = #cfg{proto_ver = ProtoVer}}) ->
- Qs0 = rabbit_amqqueue:lookup(RoutedToQNames),
+ Qs0 = rabbit_amqqueue:lookup_many(RoutedToQNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
case rabbit_queue_type:deliver(Qs, Delivery, QStates0) of
{ok, QStates, Actions} ->
diff --git a/deps/rabbitmq_random_exchange/BUILD.bazel b/deps/rabbitmq_random_exchange/BUILD.bazel
index 3bfc232699..803791b8db 100644
--- a/deps/rabbitmq_random_exchange/BUILD.bazel
+++ b/deps/rabbitmq_random_exchange/BUILD.bazel
@@ -11,6 +11,10 @@ APP_NAME = "rabbitmq_random_exchange"
APP_DESCRIPTION = "RabbitMQ Random Exchange"
+BUILD_DEPS = [
+ "//deps/rabbit:erlang_app",
+]
+
DEPS = [
"//deps/rabbit_common:erlang_app",
]
@@ -22,6 +26,7 @@ RUNTIME_DEPS = [
rabbitmq_app(
app_description = APP_DESCRIPTION,
app_name = APP_NAME,
+ build_deps = BUILD_DEPS,
runtime_deps = RUNTIME_DEPS,
deps = DEPS,
)
@@ -30,7 +35,7 @@ xref()
plt(
name = "base_plt",
- deps = DEPS,
+ deps = BUILD_DEPS + DEPS,
)
dialyze(
diff --git a/deps/rabbitmq_recent_history_exchange/BUILD.bazel b/deps/rabbitmq_recent_history_exchange/BUILD.bazel
index 39a84db6ec..458d84861e 100644
--- a/deps/rabbitmq_recent_history_exchange/BUILD.bazel
+++ b/deps/rabbitmq_recent_history_exchange/BUILD.bazel
@@ -14,6 +14,10 @@ APP_NAME = "rabbitmq_recent_history_exchange"
APP_DESCRIPTION = "RabbitMQ Recent History Exchange"
+BUILD_DEPS = [
+ "//deps/rabbit:erlang_app",
+]
+
DEPS = [
"//deps/rabbit_common:erlang_app",
]
@@ -26,6 +30,7 @@ rabbitmq_app(
app_description = APP_DESCRIPTION,
app_extra_keys = BROKER_VERSION_REQUIREMENTS_ANY,
app_name = APP_NAME,
+ build_deps = BUILD_DEPS,
runtime_deps = RUNTIME_DEPS,
deps = DEPS,
)
diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl
index 1dde6fda15..6feee9ad9b 100644
--- a/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl
+++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl
@@ -84,13 +84,6 @@ insert0_in_mnesia(Key, Cached, Message, Length) ->
content = [Message|lists:sublist(Cached, Length-1)]},
write).
-add_to_cache(Cached, Message, undefined) ->
- add_to_cache(Cached, Message, ?KEEP_NB);
-add_to_cache(Cached, Message, {_Type, Length}) ->
- add_to_cache(Cached, Message, Length);
-add_to_cache(Cached, Message, Length) ->
- [Message|lists:sublist(Cached, Length-1)].
-
%% -------------------------------------------------------------------
%% delete().
%% -------------------------------------------------------------------
diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl
index 685c8463db..7b20758196 100644
--- a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl
+++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl
@@ -28,8 +28,8 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
--rabbit_boot_step({rabbit_exchange_type_recent_history_mnesia,
- [{description, "recent history exchange type: mnesia"},
+-rabbit_boot_step({rabbit_exchange_type_recent_history_metadata_store,
+ [{description, "recent history exchange type: metadata store"},
{mfa, {?MODULE, setup_schema, []}},
{requires, database},
{enables, external_infrastructure}]}).
@@ -96,7 +96,7 @@ add_binding(none, #exchange{ name = XName },
[begin
Delivery = rabbit_basic:delivery(false, false, Msg, undefined),
Qs = rabbit_exchange:route(X, Delivery),
- case rabbit_amqqueue:lookup(Qs) of
+ case rabbit_amqqueue:lookup_many(Qs) of
[] ->
destination_not_found_error(Qs);
QPids ->