-rw-r--r--  src/rabbit.erl                                | 12
-rw-r--r--  src/rabbit_amqqueue_process.erl               |  2
-rw-r--r--  src/rabbit_mirror_queue_master.erl            | 28
-rw-r--r--  src/rabbit_mirror_queue_slave.erl             | 56
-rw-r--r--  src/rabbit_mnesia.erl                         | 11
-rw-r--r--  src/rabbit_msg_store_gc.erl                   |  8
-rw-r--r--  src/rabbit_peer_discovery.erl                 | 62
-rw-r--r--  src/rabbit_peer_discovery_classic_config.erl  |  7
-rw-r--r--  src/rabbit_peer_discovery_dns.erl             |  9
-rw-r--r--  src/rabbit_queue_consumers.erl                |  5
-rw-r--r--  src/rabbit_queue_index.erl                    | 22
11 files changed, 152 insertions(+), 70 deletions(-)
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 680a6a2a98..c52949af4c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,7 +22,7 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0,
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
start_fhc/0]).
--export([start/2, stop/1]).
+-export([start/2, stop/1, prep_stop/1]).
-export([start_apps/1, stop_apps/1]).
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
@@ -327,7 +327,9 @@ broker_start() ->
ToBeLoaded = Plugins ++ ?APPS,
start_apps(ToBeLoaded),
maybe_sd_notify(),
- ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())).
+ ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())),
+ rabbit_peer_discovery:maybe_register(),
+ ok.
%% Try to send systemd ready notification if it makes sense in the
%% current environment. standard_error is used intentionally in all
@@ -471,6 +473,8 @@ stop() ->
end,
rabbit_log:info("RabbitMQ is asked to stop...~n", []),
Apps = ?APPS ++ rabbit_plugins:active(),
+ %% this will also perform unregistration with the peer discovery backend
+ %% as needed
stop_apps(app_utils:app_dependency_order(Apps, true)),
rabbit_log:info("Successfully stopped RabbitMQ and its dependencies~n", []).
@@ -759,6 +763,10 @@ start(normal, []) ->
Error
end.
+prep_stop(State) ->
+ rabbit_peer_discovery:maybe_unregister(),
+ State.
+
stop(_State) ->
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
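The new prep_stop/1 export hooks into the standard OTP application callback lifecycle: prep_stop/1 runs before the application's supervision tree is shut down, while stop/1 runs afterwards, which is why peer discovery unregistration happens there, while the node's processes are still available. The sketch below is for illustration only and is not part of this commit; the my_app and my_peer_registry names are invented.

%% Minimal sketch of where prep_stop/1 sits in the application
%% callback lifecycle. Module and registry names are hypothetical.
-module(my_app).
-behaviour(application).

-export([start/2, prep_stop/1, stop/1]).

start(_Type, _Args) ->
    %% my_app_sup is a hypothetical top-level supervisor.
    my_app_sup:start_link().

%% Called while all processes are still alive, so external cleanup
%% (such as unregistering from a service registry) can still run.
prep_stop(State) ->
    my_peer_registry:unregister(),
    State.

%% Called after the supervision tree has been terminated.
stop(_State) ->
    ok.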
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9f91041e7..c52d329392 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -99,7 +99,7 @@
-spec info_keys() -> rabbit_types:info_keys().
-spec init_with_backing_queue_state
(rabbit_types:amqqueue(), atom(), tuple(), any(),
- [rabbit_types:delivery()], pmon:pmon(), dict:dict()) ->
+ [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
#q{}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b006e37eb2..b9952178e0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -57,13 +57,13 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- seen_status :: dict:dict(),
+ seen_status :: map(),
confirmed :: [rabbit_guid:guid()],
known_senders :: sets:set()
}.
-spec promote_backing_queue_state
(rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
- dict:dict(), [pid()]) ->
+ map(), [pid()]) ->
master_state().
-spec sender_death_fun() -> death_fun().
@@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- seen_status = dict:new(),
+ seen_status = #{},
confirmed = [],
known_senders = sets:new(),
wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
@@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
@@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow,
lists:foldl(fun ({Msg = #basic_message { id = MsgId },
MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
{[{Msg, MsgProps, true} | Pubs], %% [0]
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {[], false, 0}, Publishes),
Publishes2 = lists:reverse(Publishes1),
@@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
@@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow,
{false, MsgSizes} =
lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
{false, Sizes}) ->
- {false = dict:is_key(MsgId, SS), %% ASSERTION
+ {false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {false, 0}, Publishes),
ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
@@ -326,7 +326,7 @@ discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
ensure_monitoring(ChPid,
State #state { backing_queue_state =
@@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
lists:foldl(
fun (MsgId, {MsgIdsN, SSN}) ->
%% We will never see 'discarded' here
- case dict:find(MsgId, SSN) of
+ case maps:find(MsgId, SSN) of
error ->
{[MsgId | MsgIdsN], SSN};
{ok, published} ->
@@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
%% consequently we need to filter out the
%% confirm here. We will issue the confirm
%% when we see the publish from the channel.
- {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+ {MsgIdsN, maps:put(MsgId, confirmed, SSN)};
{ok, confirmed} ->
%% Well, confirms are racy by definition.
{[MsgId | MsgIdsN], SSN}
@@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
info(backing_queue_status,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(backing_queue_status, BQS) ++
- [ {mirror_seen, dict:size(State #state.seen_status)},
+ [ {mirror_seen, maps:size(State #state.seen_status)},
{mirror_senders, sets:size(State #state.known_senders)} ];
info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(Item, BQS).
@@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% it.
%% We will never see {published, ChPid, MsgSeqNo} here.
- case dict:find(MsgId, SS) of
+ case maps:find(MsgId, SS) of
error ->
%% We permit the underlying BQ to have a peek at it, but
%% only if we ourselves are not filtering out the msg.
@@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {true, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = dict:erase(MsgId, SS),
+ {true, State #state { seen_status = maps:remove(MsgId, SS),
confirmed = [MsgId | Confirmed] }}
end.
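The dict-to-maps migration in this file (and the files that follow) is a mechanical one-to-one substitution of the maps API for the dict API. The self-contained sketch below is written for illustration rather than taken from the commit; it exercises each correspondence used above. Note in particular that maps:find/2 returns {ok, Value} or error exactly as dict:find/2 did, so the surrounding case clauses are unchanged by the migration.

%% Illustrative only: the dict-to-maps correspondence applied in this
%% change set, with the dict equivalent noted on each line.
-module(dict_to_maps_example).
-export([demo/0]).

demo() ->
    M0 = #{},                                   %% dict:new()
    M1 = maps:put(k, v, M0),                    %% dict:store(k, v, D)
    true    = maps:is_key(k, M1),               %% dict:is_key(k, D)
    {ok, v} = maps:find(k, M1),                 %% dict:find(k, D)
    error   = maps:find(missing, M1),
    1       = maps:size(M1),                    %% dict:size(D)
    [{k, v}] = maps:to_list(M1),                %% dict:to_list(D)
    [k]      = maps:keys(M1),                   %% dict:fetch_keys(D)
    0 = maps:size(maps:filter(fun(_K, _V) -> false end, M1)), %% dict:filter/2
    M2 = maps:remove(k, M1),                    %% dict:erase(k, D)
    0  = maps:fold(fun(_K, _V, N) -> N + 1 end, 0, M2),       %% dict:fold/3
    ok.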
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 61623c9441..748a5afdf5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) ->
rate_timer_ref = undefined,
sync_timer_ref = undefined,
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
+ sender_queues = #{},
+ msg_id_ack = #{},
- msg_id_status = dict:new(),
+ msg_id_status = #{},
known_senders = pmon:new(delegate),
depth_delta = undefined
@@ -310,7 +310,7 @@ handle_cast({sync_start, Ref, Syncer},
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
S = fun({MA, TRefN, BQSN}) ->
State1#state{depth_delta = undefined,
- msg_id_ack = dict:from_list(MA),
+ msg_id_ack = maps:from_list(MA),
rate_timer_ref = TRefN,
backing_queue_state = BQSN}
end,
@@ -546,7 +546,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
id = MsgId,
is_persistent = true } },
MS, #state { q = #amqqueue { durable = true } }) ->
- dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+ maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
confirm = true,
msg_seq_no = MsgSeqNo },
@@ -559,7 +559,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
lists:foldl(
fun (MsgId, {CMsN, MSN} = Acc) ->
%% We will never see 'discarded' here
- case dict:find(MsgId, MSN) of
+ case maps:find(MsgId, MSN) of
error ->
%% If it needed confirming, it'll have
%% already been done.
@@ -567,12 +567,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, confirmed, MSN)};
+ {CMsN, maps:put(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
- dict:erase(MsgId, MSN)};
+ maps:remove(MsgId, MSN)};
{ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
@@ -672,21 +672,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Master, or MTC in queue_process.
St = [published, confirmed, discarded],
- SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
- AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
+ SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
+ AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
- MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
Deliveries = [promote_delivery(Delivery) ||
- {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+ {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+ AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
@@ -798,20 +798,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error ->
State;
{ok, {MQ, PendCh, ChStateRecord}} ->
case forget_sender(ChState, ChStateRecord) of
true ->
credit_flow:peer_down(ChPid),
- State #state { sender_queues = dict:erase(ChPid, SQ),
+ State #state { sender_queues = maps:remove(ChPid, SQ),
msg_id_status = lists:foldl(
- fun dict:erase/2,
+ fun maps:remove/2,
MS, sets:to_list(PendCh)),
known_senders = pmon:demonitor(ChPid, KS) };
false ->
- SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
State #state { sender_queues = SQ1 }
end
end.
@@ -823,32 +823,32 @@ maybe_enqueue_message(
send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
- case dict:find(MsgId, MS) of
+ case maps:find(MsgId, MS) of
error ->
{MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
- SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
- Status, Delivery, dict:erase(MsgId, MS), State1),
+ Status, Delivery, maps:remove(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
sender_queues = SQ1 }
end.
get_sender_queue(ChPid, SQ) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.
remove_from_pending_ch(MsgId, ChPid, SQ) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error ->
SQ;
{ok, {MQ, PendingCh, ChState}} ->
- dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+ maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
SQ)
end.
@@ -865,7 +865,7 @@ publish_or_discard(Status, ChPid, MsgId,
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, Status, MS)};
+ maps:put(MsgId, Status, MS)};
{{value, Delivery = #delivery {
message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
@@ -880,7 +880,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
@@ -1002,9 +1002,9 @@ msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
- case dict:find(MsgId, MA) of
+ case maps:find(MsgId, MA) of
error -> {Acc, MAN};
- {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
+ {ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
@@ -1012,7 +1012,7 @@ msg_ids_to_acktags(MsgIds, MA) ->
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
- State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
+ State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.
set_delta(0, State = #state { depth_delta = undefined }) ->
ok = record_synchronised(State#state.q),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6d3b47a405..68fe5468af 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -139,14 +139,17 @@ init_from_config() ->
{ok, _} ->
e(invalid_cluster_nodes_conf)
end,
- case DiscoveredNodes of
+ rabbit_log:info("All discovered existing cluster peers: ~p~n",
+ [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
+ Peers = nodes_excl_me(DiscoveredNodes),
+ case Peers of
[] ->
rabbit_log:info("Discovered no peer nodes to cluster with"),
init_db_and_upgrade([node()], disc, false, _Retry = true);
_ ->
- rabbit_log:info("Discovered peer nodes: ~s~n",
- [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
- join_discovered_peers(DiscoveredNodes, NodeType)
+ rabbit_log:info("Peer nodes we can cluster with: ~s~n",
+ [rabbit_peer_discovery:format_discovered_nodes(Peers)]),
+ join_discovered_peers(Peers, NodeType)
end.
%% Attempts to join discovered,
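The behavioural change in this hunk is that the node now excludes itself from the discovered set before deciding whether there are peers to join, so a backend that returns only the local node no longer triggers an attempt to cluster with itself. nodes_excl_me/1 is an existing rabbit_mnesia helper referenced by the hunk; a plausible minimal equivalent, shown purely for illustration, is:

%% Hedged sketch, not from this commit: reduce the discovered node
%% list to peers other than the local node.
-module(peer_filter_example).
-export([peers/1]).

peers(DiscoveredNodes) ->
    [N || N <- DiscoveredNodes, N =/= node()].

With only the local node discovered, the result is the empty list, so the node initialises a fresh database instead of trying to join itself.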
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 6179ef95bb..728c9652d0 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -70,7 +70,7 @@ set_maximum_since_use(Pid, Age) ->
init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- {ok, #state { pending_no_readers = dict:new(),
+ {ok, #state { pending_no_readers = #{},
on_action = [],
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -89,11 +89,11 @@ handle_cast({delete, File}, State) ->
handle_cast({no_readers, File},
State = #state { pending_no_readers = Pending }) ->
- {noreply, case dict:find(File, Pending) of
+ {noreply, case maps:find(File, Pending) of
error ->
State;
{ok, {Action, Files}} ->
- Pending1 = dict:erase(File, Pending),
+ Pending1 = maps:remove(File, Pending),
attempt_action(
Action, Files,
State #state { pending_no_readers = Pending1 })
@@ -123,7 +123,7 @@ attempt_action(Action, Files,
fun (Thunk) -> not Thunk() end,
[do_action(Action, Files, MsgStoreState) |
Thunks]) };
- [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending),
+ [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
State #state { pending_no_readers = Pending1 }
end.
diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl
index 7cdb35f6f4..98d2f9e7c5 100644
--- a/src/rabbit_peer_discovery.erl
+++ b/src/rabbit_peer_discovery.erl
@@ -21,7 +21,8 @@
%%
-export([discover_cluster_nodes/0, backend/0, node_type/0,
- normalize/1, format_discovered_nodes/1, log_configured_backend/0]).
+ normalize/1, format_discovered_nodes/1, log_configured_backend/0,
+ register/0, unregister/0, maybe_register/0, maybe_unregister/0]).
-export([append_node_prefix/1, node_prefix/0]).
-define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config).
@@ -72,6 +73,64 @@ discover_cluster_nodes() ->
normalize(Backend:list_nodes()).
+-spec maybe_register() -> ok.
+
+maybe_register() ->
+ Backend = backend(),
+ case Backend:supports_registration() of
+ true ->
+ register();
+ false ->
+ rabbit_log:info("Peer discovery backend ~s does not support registration, skipping registration.", [Backend]),
+ ok
+ end.
+
+
+-spec maybe_unregister() -> ok.
+
+maybe_unregister() ->
+ Backend = backend(),
+ case Backend:supports_registration() of
+ true ->
+ unregister();
+ false ->
+ rabbit_log:info("Peer discovery backend ~s does not support registration, skipping unregistration.", [Backend]),
+ ok
+ end.
+
+
+-spec register() -> ok.
+
+register() ->
+ Backend = backend(),
+ rabbit_log:info("Will register with peer discovery backend ~s", [Backend]),
+ case Backend:register() of
+ ok -> ok;
+ {error, Error} ->
+ rabbit_log:error("Failed to register with peer discovery backend ~s: ~p",
+ [Backend, Error]),
+ ok
+ end.
+
+
+-spec unregister() -> ok.
+
+unregister() ->
+ Backend = backend(),
+ rabbit_log:info("Will unregister with peer discovery backend ~s", [Backend]),
+ case Backend:unregister() of
+ ok -> ok;
+ {error, Error} ->
+ rabbit_log:error("Failed to unregister with peer discovery backend ~s: ~p",
+ [Backend, Error]),
+ ok
+ end.
+
+
+%%
+%% Implementation
+%%
+
-spec normalize(Nodes :: list() |
{Nodes :: list(), NodeType :: rabbit_types:node_type()} |
{ok, Nodes :: list()} |
@@ -90,7 +149,6 @@ normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType)
normalize({error, Reason}) ->
{error, Reason}.
-
-spec format_discovered_nodes(Nodes :: list()) -> string().
format_discovered_nodes(Nodes) ->
diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl
index 95e5532548..685e4b1639 100644
--- a/src/rabbit_peer_discovery_classic_config.erl
+++ b/src/rabbit_peer_discovery_classic_config.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
--export([list_nodes/0, register/0, unregister/0]).
+-export([list_nodes/0, supports_registration/0, register/0, unregister/0]).
%%
%% API
@@ -34,6 +34,11 @@ list_nodes() ->
undefined -> {[], disc}
end.
+-spec supports_registration() -> boolean().
+
+supports_registration() ->
+ false.
+
-spec register() -> ok.
register() ->
diff --git a/src/rabbit_peer_discovery_dns.erl b/src/rabbit_peer_discovery_dns.erl
index b7c623ee55..f048a40c89 100644
--- a/src/rabbit_peer_discovery_dns.erl
+++ b/src/rabbit_peer_discovery_dns.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
--export([list_nodes/0, register/0, unregister/0]).
+-export([list_nodes/0, supports_registration/0, register/0, unregister/0]).
%% for tests
-export([discover_nodes/2, discover_hostnames/2]).
@@ -48,6 +48,13 @@ list_nodes() ->
end
end.
+
+-spec supports_registration() -> boolean().
+
+supports_registration() ->
+ false.
+
+
-spec register() -> ok.
register() ->
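Both backends touched here opt out of the new registration step by returning false from supports_registration/0, so their register/0 and unregister/0 remain no-ops. For contrast, the hypothetical backend sketch below (the module name and the my_registry_client calls are invented and not part of RabbitMQ) shows the callback set that rabbit_peer_discovery:maybe_register/0 and maybe_unregister/0 rely on:

%% Hypothetical opt-in backend, for illustration only.
-module(rabbit_peer_discovery_example).

-export([list_nodes/0, supports_registration/0, register/0, unregister/0]).

list_nodes() ->
    %% Same return shape accepted by rabbit_peer_discovery:normalize/1
    %% as the classic/DNS backends use: a node list plus node type.
    {ok, {my_registry_client:nodes(), disc}}.

supports_registration() ->
    true.

register() ->
    %% maybe_register/0 only calls this when supports_registration/0
    %% returns true; errors are logged by the caller.
    case my_registry_client:register(node()) of
        ok              -> ok;
        {error, Reason} -> {error, Reason}
    end.

unregister() ->
    my_registry_client:unregister(node()).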
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 4e9715fba2..f13a46fcf3 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -64,7 +64,8 @@
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
-spec inactive(state()) -> boolean().
-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table()}].
+ non_neg_integer(), rabbit_framing:amqp_table(),
+ rabbit_types:username()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
@@ -280,7 +281,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
orddict:update_counter(CTag, 1, CTagCounts), QTail);
{{value, V}, QTail} ->
subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
- {empty, _} ->
+ {empty, _} ->
subtract_acks([], Prefix, CTagCounts, AckQ)
end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bf80fe53a5..67f783a8dd 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -80,7 +80,7 @@
%% contains a mapping from segment numbers to state-per-segment (this
%% state is held for all segments which have been "seen": thus a
%% segment which has been read but has no pending entries in the
-%% journal is still held in this mapping. Also note that a dict is
+%% journal is still held in this mapping. Also note that a map is
%% used for this mapping, not an array because with an array, you will
%% always have entries from 0). Actions are stored directly in this
%% state. Thus at the point of flushing the journal, firstly no
@@ -233,10 +233,10 @@
unacked :: non_neg_integer()
}).
-type seq_id() :: integer().
--type seg_dict() :: {dict:dict(), [segment()]}.
+-type seg_map() :: {map(), [segment()]}.
-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
-type qistate() :: #qistate { dir :: file:filename(),
- segments :: 'undefined' | seg_dict(),
+ segments :: 'undefined' | seg_map(),
journal_handle :: hdl(),
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
@@ -995,7 +995,7 @@ segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) ->
{ok, Segment}; %% 2, matches tail
segment_find(Seg, {Segments, _}) -> %% no match
- dict:find(Seg, Segments).
+ maps:find(Seg, Segments).
segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
{Segments, [#segment { num = Seg } | Tail]}) ->
@@ -1004,28 +1004,28 @@ segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
{Segments, [SegmentA, #segment { num = Seg }]}) ->
{Segments, [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg }, {Segments, []}) ->
- {dict:erase(Seg, Segments), [Segment]};
+ {maps:remove(Seg, Segments), [Segment]};
segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) ->
- {dict:erase(Seg, Segments), [Segment, SegmentA]};
+ {maps:remove(Seg, Segments), [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg },
{Segments, [SegmentA, SegmentB]}) ->
- {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)),
+ {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)),
[Segment, SegmentA]}.
segment_fold(Fun, Acc, {Segments, CachedSegments}) ->
- dict:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
+ maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
lists:foldl(Fun, Acc, CachedSegments), Segments).
segment_map(Fun, {Segments, CachedSegments}) ->
- {dict:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
+ {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
lists:map(Fun, CachedSegments)}.
segment_nums({Segments, CachedSegments}) ->
lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++
- dict:fetch_keys(Segments).
+ maps:keys(Segments).
segments_new() ->
- {dict:new(), []}.
+ {#{}, []}.
entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
Initial;