diff options
-rw-r--r-- | src/rabbit.erl | 12 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 28 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 56 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 11 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 8 | ||||
-rw-r--r-- | src/rabbit_peer_discovery.erl | 62 | ||||
-rw-r--r-- | src/rabbit_peer_discovery_classic_config.erl | 7 | ||||
-rw-r--r-- | src/rabbit_peer_discovery_dns.erl | 9 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 5 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 22 |
11 files changed, 152 insertions, 70 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 680a6a2a98..c52949af4c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -22,7 +22,7 @@ stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0, is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). --export([start/2, stop/1]). +-export([start/2, stop/1, prep_stop/1]). -export([start_apps/1, stop_apps/1]). -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent @@ -327,7 +327,9 @@ broker_start() -> ToBeLoaded = Plugins ++ ?APPS, start_apps(ToBeLoaded), maybe_sd_notify(), - ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())). + ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())), + rabbit_peer_discovery:maybe_register(), + ok. %% Try to send systemd ready notification if it makes sense in the %% current environment. standard_error is used intentionally in all @@ -471,6 +473,8 @@ stop() -> end, rabbit_log:info("RabbitMQ is asked to stop...~n", []), Apps = ?APPS ++ rabbit_plugins:active(), + %% this will also perform unregistration with the peer discovery backend + %% as needed stop_apps(app_utils:app_dependency_order(Apps, true)), rabbit_log:info("Successfully stopped RabbitMQ and its dependencies~n", []). @@ -759,6 +763,10 @@ start(normal, []) -> Error end. +prep_stop(State) -> + rabbit_peer_discovery:maybe_unregister(), + State. + stop(_State) -> ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9f91041e7..c52d329392 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -99,7 +99,7 @@ -spec info_keys() -> rabbit_types:info_keys(). -spec init_with_backing_queue_state (rabbit_types:amqqueue(), atom(), tuple(), any(), - [rabbit_types:delivery()], pmon:pmon(), dict:dict()) -> + [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> #q{}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b006e37eb2..b9952178e0 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -57,13 +57,13 @@ coordinator :: pid(), backing_queue :: atom(), backing_queue_state :: any(), - seen_status :: dict:dict(), + seen_status :: map(), confirmed :: [rabbit_guid:guid()], known_senders :: sets:set() }. -spec promote_backing_queue_state (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], - dict:dict(), [pid()]) -> + map(), [pid()]) -> master_state(). -spec sender_death_fun() -> death_fun(). @@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - seen_status = dict:new(), + seen_status = #{}, confirmed = [], known_senders = sets:new(), wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }; @@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), @@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow, lists:foldl(fun ({Msg = #basic_message { id = MsgId }, MsgProps, _IsDelivered}, {Pubs, false, Sizes}) -> {[{Msg, MsgProps, true} | Pubs], %% [0] - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION Sizes + rabbit_basic:msg_size(Msg)} end, {[], false, 0}, Publishes), Publishes2 = lists:reverse(Publishes1), @@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), @@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, {false, MsgSizes} = lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps}, {false, Sizes}) -> - {false = dict:is_key(MsgId, SS), %% ASSERTION + {false = maps:is_key(MsgId, SS), %% ASSERTION Sizes + rabbit_basic:msg_size(Msg)} end, {false, 0}, Publishes), ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes}, @@ -326,7 +326,7 @@ discard(MsgId, ChPid, Flow, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, seen_status = SS }) -> - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), ensure_monitoring(ChPid, State #state { backing_queue_state = @@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue = BQ, lists:foldl( fun (MsgId, {MsgIdsN, SSN}) -> %% We will never see 'discarded' here - case dict:find(MsgId, SSN) of + case maps:find(MsgId, SSN) of error -> {[MsgId | MsgIdsN], SSN}; {ok, published} -> @@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue = BQ, %% consequently we need to filter out the %% confirm here. We will issue the confirm %% when we see the publish from the channel. - {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {MsgIdsN, maps:put(MsgId, confirmed, SSN)}; {ok, confirmed} -> %% Well, confirms are racy by definition. {[MsgId | MsgIdsN], SSN} @@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> info(backing_queue_status, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:info(backing_queue_status, BQS) ++ - [ {mirror_seen, dict:size(State #state.seen_status)}, + [ {mirror_seen, maps:size(State #state.seen_status)}, {mirror_senders, sets:size(State #state.known_senders)} ]; info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:info(Item, BQS). @@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% it. %% We will never see {published, ChPid, MsgSeqNo} here. - case dict:find(MsgId, SS) of + case maps:find(MsgId, SS) of error -> %% We permit the underlying BQ to have a peek at it, but %% only if we ourselves are not filtering out the msg. @@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% immediately after calling is_duplicate). The msg is %% invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {true, State #state { seen_status = dict:erase(MsgId, SS) }}; + {true, State #state { seen_status = maps:remove(MsgId, SS) }}; {ok, Disposition} when Disposition =:= confirmed %% It got published when we were a slave via gm, and @@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Message was discarded while we were a slave. Confirm now. %% As above, amqqueue_process will have the entry for the %% msg_id_to_channel mapping. - {true, State #state { seen_status = dict:erase(MsgId, SS), + {true, State #state { seen_status = maps:remove(MsgId, SS), confirmed = [MsgId | Confirmed] }} end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 61623c9441..748a5afdf5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) -> rate_timer_ref = undefined, sync_timer_ref = undefined, - sender_queues = dict:new(), - msg_id_ack = dict:new(), + sender_queues = #{}, + msg_id_ack = #{}, - msg_id_status = dict:new(), + msg_id_status = #{}, known_senders = pmon:new(delegate), depth_delta = undefined @@ -310,7 +310,7 @@ handle_cast({sync_start, Ref, Syncer}, State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), S = fun({MA, TRefN, BQSN}) -> State1#state{depth_delta = undefined, - msg_id_ack = dict:from_list(MA), + msg_id_ack = maps:from_list(MA), rate_timer_ref = TRefN, backing_queue_state = BQSN} end, @@ -546,7 +546,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid, id = MsgId, is_persistent = true } }, MS, #state { q = #amqqueue { durable = true } }) -> - dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); + maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS); send_or_record_confirm(_Status, #delivery { sender = ChPid, confirm = true, msg_seq_no = MsgSeqNo }, @@ -559,7 +559,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> lists:foldl( fun (MsgId, {CMsN, MSN} = Acc) -> %% We will never see 'discarded' here - case dict:find(MsgId, MSN) of + case maps:find(MsgId, MSN) of error -> %% If it needed confirming, it'll have %% already been done. @@ -567,12 +567,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {ok, published} -> %% Still not seen it from the channel, just %% record that it's been confirmed. - {CMsN, dict:store(MsgId, confirmed, MSN)}; + {CMsN, maps:put(MsgId, confirmed, MSN)}; {ok, {published, ChPid, MsgSeqNo}} -> %% Seen from both GM and Channel. Can now %% confirm. {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN), - dict:erase(MsgId, MSN)}; + maps:remove(MsgId, MSN)}; {ok, confirmed} -> %% It's already been confirmed. This is %% probably it's been both sync'd to disk @@ -672,21 +672,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Master, or MTC in queue_process. St = [published, confirmed, discarded], - SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), - AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], + SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), + AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), - MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> + MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), Deliveries = [promote_delivery(Delivery) || - {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), + {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ), Delivery <- queue:to_list(PubQ)], - AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], + AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)], KS1 = lists:foldl(fun (ChPid0, KS0) -> pmon:demonitor(ChPid0, KS0) end, KS, AwaitGmDown), @@ -798,20 +798,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true. maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> - case dict:find(ChPid, SQ) of + case maps:find(ChPid, SQ) of error -> State; {ok, {MQ, PendCh, ChStateRecord}} -> case forget_sender(ChState, ChStateRecord) of true -> credit_flow:peer_down(ChPid), - State #state { sender_queues = dict:erase(ChPid, SQ), + State #state { sender_queues = maps:remove(ChPid, SQ), msg_id_status = lists:foldl( - fun dict:erase/2, + fun maps:remove/2, MS, sets:to_list(PendCh)), known_senders = pmon:demonitor(ChPid, KS) }; false -> - SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ), + SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ), State #state { sender_queues = SQ1 } end end. @@ -823,32 +823,32 @@ maybe_enqueue_message( send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. - case dict:find(MsgId, MS) of + case maps:find(MsgId, MS) of error -> {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), MQ1 = queue:in(Delivery, MQ), - SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ), + SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ), State1 #state { sender_queues = SQ1 }; {ok, Status} -> MS1 = send_or_record_confirm( - Status, Delivery, dict:erase(MsgId, MS), State1), + Status, Delivery, maps:remove(MsgId, MS), State1), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = MS1, sender_queues = SQ1 } end. get_sender_queue(ChPid, SQ) -> - case dict:find(ChPid, SQ) of + case maps:find(ChPid, SQ) of error -> {queue:new(), sets:new(), running}; {ok, Val} -> Val end. remove_from_pending_ch(MsgId, ChPid, SQ) -> - case dict:find(ChPid, SQ) of + case maps:find(ChPid, SQ) of error -> SQ; {ok, {MQ, PendingCh, ChState}} -> - dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState}, + maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState}, SQ) end. @@ -865,7 +865,7 @@ publish_or_discard(Status, ChPid, MsgId, case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, Status, MS)}; + maps:put(MsgId, Status, MS)}; {{value, Delivery = #delivery { message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, @@ -880,7 +880,7 @@ publish_or_discard(Status, ChPid, MsgId, %% expecting any confirms from us. {MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ), + SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ), State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. @@ -1002,9 +1002,9 @@ msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = lists:foldl( fun (MsgId, {Acc, MAN}) -> - case dict:find(MsgId, MA) of + case maps:find(MsgId, MA) of error -> {Acc, MAN}; - {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} + {ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. @@ -1012,7 +1012,7 @@ msg_ids_to_acktags(MsgIds, MA) -> maybe_store_ack(false, _MsgId, _AckTag, State) -> State; maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) -> - State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }. + State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }. set_delta(0, State = #state { depth_delta = undefined }) -> ok = record_synchronised(State#state.q), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6d3b47a405..68fe5468af 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -139,14 +139,17 @@ init_from_config() -> {ok, _} -> e(invalid_cluster_nodes_conf) end, - case DiscoveredNodes of + rabbit_log:info("All discovered existing cluster peers: ~p~n", + [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]), + Peers = nodes_excl_me(DiscoveredNodes), + case Peers of [] -> rabbit_log:info("Discovered no peer nodes to cluster with"), init_db_and_upgrade([node()], disc, false, _Retry = true); _ -> - rabbit_log:info("Discovered peer nodes: ~s~n", - [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]), - join_discovered_peers(DiscoveredNodes, NodeType) + rabbit_log:info("Peer nodes we can cluster with: ~s~n", + [rabbit_peer_discovery:format_discovered_nodes(Peers)]), + join_discovered_peers(Peers, NodeType) end. %% Attempts to join discovered, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 6179ef95bb..728c9652d0 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -70,7 +70,7 @@ set_maximum_since_use(Pid, Age) -> init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #state { pending_no_readers = dict:new(), + {ok, #state { pending_no_readers = #{}, on_action = [], msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -89,11 +89,11 @@ handle_cast({delete, File}, State) -> handle_cast({no_readers, File}, State = #state { pending_no_readers = Pending }) -> - {noreply, case dict:find(File, Pending) of + {noreply, case maps:find(File, Pending) of error -> State; {ok, {Action, Files}} -> - Pending1 = dict:erase(File, Pending), + Pending1 = maps:remove(File, Pending), attempt_action( Action, Files, State #state { pending_no_readers = Pending1 }) @@ -123,7 +123,7 @@ attempt_action(Action, Files, fun (Thunk) -> not Thunk() end, [do_action(Action, Files, MsgStoreState) | Thunks]) }; - [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), + [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending), State #state { pending_no_readers = Pending1 } end. diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl index 7cdb35f6f4..98d2f9e7c5 100644 --- a/src/rabbit_peer_discovery.erl +++ b/src/rabbit_peer_discovery.erl @@ -21,7 +21,8 @@ %% -export([discover_cluster_nodes/0, backend/0, node_type/0, - normalize/1, format_discovered_nodes/1, log_configured_backend/0]). + normalize/1, format_discovered_nodes/1, log_configured_backend/0, + register/0, unregister/0, maybe_register/0, maybe_unregister/0]). -export([append_node_prefix/1, node_prefix/0]). -define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config). @@ -72,6 +73,64 @@ discover_cluster_nodes() -> normalize(Backend:list_nodes()). +-spec maybe_register() -> ok. + +maybe_register() -> + Backend = backend(), + case Backend:supports_registration() of + true -> + register(); + false -> + rabbit_log:info("Peer discovery backend ~s does not support registration, skipping registration.", [Backend]), + ok + end. + + +-spec maybe_unregister() -> ok. + +maybe_unregister() -> + Backend = backend(), + case Backend:supports_registration() of + true -> + unregister(); + false -> + rabbit_log:info("Peer discovery backend ~s does not support registration, skipping unregistration.", [Backend]), + ok + end. + + +-spec register() -> ok. + +register() -> + Backend = backend(), + rabbit_log:info("Will register with peer discovery backend ~s", [Backend]), + case Backend:register() of + ok -> ok; + {error, Error} -> + rabbit_log:error("Failed to register with peer discovery backend ~s: ~p", + [Backend, Error]), + ok + end. + + +-spec unregister() -> ok. + +unregister() -> + Backend = backend(), + rabbit_log:info("Will unregister with peer discovery backend ~s", [Backend]), + case Backend:unregister() of + ok -> ok; + {error, Error} -> + rabbit_log:error("Failed to unregister with peer discovery backend ~s: ~p", + [Backend, Error]), + ok + end. + + +%% +%% Implementation +%% + -spec normalize(Nodes :: list() | {Nodes :: list(), NodeType :: rabbit_types:node_type()} | {ok, Nodes :: list()} | @@ -90,7 +149,6 @@ normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType) normalize({error, Reason}) -> {error, Reason}. - -spec format_discovered_nodes(Nodes :: list()) -> string(). format_discovered_nodes(Nodes) -> diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl index 95e5532548..685e4b1639 100644 --- a/src/rabbit_peer_discovery_classic_config.erl +++ b/src/rabbit_peer_discovery_classic_config.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). --export([list_nodes/0, register/0, unregister/0]). +-export([list_nodes/0, supports_registration/0, register/0, unregister/0]). %% %% API @@ -34,6 +34,11 @@ list_nodes() -> undefined -> {[], disc} end. +-spec supports_registration() -> boolean(). + +supports_registration() -> + false. + -spec register() -> ok. register() -> diff --git a/src/rabbit_peer_discovery_dns.erl b/src/rabbit_peer_discovery_dns.erl index b7c623ee55..f048a40c89 100644 --- a/src/rabbit_peer_discovery_dns.erl +++ b/src/rabbit_peer_discovery_dns.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). --export([list_nodes/0, register/0, unregister/0]). +-export([list_nodes/0, supports_registration/0, register/0, unregister/0]). %% for tests -export([discover_nodes/2, discover_hostnames/2]). @@ -48,6 +48,13 @@ list_nodes() -> end end. + +-spec supports_registration() -> boolean(). + +supports_registration() -> + false. + + -spec register() -> ok. register() -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 4e9715fba2..f13a46fcf3 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -64,7 +64,8 @@ -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. -spec inactive(state()) -> boolean(). -spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), - non_neg_integer(), rabbit_framing:amqp_table()}]. + non_neg_integer(), rabbit_framing:amqp_table(), + rabbit_types:username()}]. -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), @@ -280,7 +281,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> orddict:update_counter(CTag, 1, CTagCounts), QTail); {{value, V}, QTail} -> subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail); - {empty, _} -> + {empty, _} -> subtract_acks([], Prefix, CTagCounts, AckQ) end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bf80fe53a5..67f783a8dd 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -80,7 +80,7 @@ %% contains a mapping from segment numbers to state-per-segment (this %% state is held for all segments which have been "seen": thus a %% segment which has been read but has no pending entries in the -%% journal is still held in this mapping. Also note that a dict is +%% journal is still held in this mapping. Also note that a map is %% used for this mapping, not an array because with an array, you will %% always have entries from 0). Actions are stored directly in this %% state. Thus at the point of flushing the journal, firstly no @@ -233,10 +233,10 @@ unacked :: non_neg_integer() }). -type seq_id() :: integer(). --type seg_dict() :: {dict:dict(), [segment()]}. +-type seg_map() :: {map(), [segment()]}. -type on_sync_fun() :: fun ((gb_sets:set()) -> ok). -type qistate() :: #qistate { dir :: file:filename(), - segments :: 'undefined' | seg_dict(), + segments :: 'undefined' | seg_map(), journal_handle :: hdl(), dirty_count :: integer(), max_journal_entries :: non_neg_integer(), @@ -995,7 +995,7 @@ segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) -> segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) -> {ok, Segment}; %% 2, matches tail segment_find(Seg, {Segments, _}) -> %% no match - dict:find(Seg, Segments). + maps:find(Seg, Segments). segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head) {Segments, [#segment { num = Seg } | Tail]}) -> @@ -1004,28 +1004,28 @@ segment_store(Segment = #segment { num = Seg }, %% 2, matches tail {Segments, [SegmentA, #segment { num = Seg }]}) -> {Segments, [Segment, SegmentA]}; segment_store(Segment = #segment { num = Seg }, {Segments, []}) -> - {dict:erase(Seg, Segments), [Segment]}; + {maps:remove(Seg, Segments), [Segment]}; segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) -> - {dict:erase(Seg, Segments), [Segment, SegmentA]}; + {maps:remove(Seg, Segments), [Segment, SegmentA]}; segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA, SegmentB]}) -> - {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)), + {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)), [Segment, SegmentA]}. segment_fold(Fun, Acc, {Segments, CachedSegments}) -> - dict:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end, + maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end, lists:foldl(Fun, Acc, CachedSegments), Segments). segment_map(Fun, {Segments, CachedSegments}) -> - {dict:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments), + {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments), lists:map(Fun, CachedSegments)}. segment_nums({Segments, CachedSegments}) -> lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++ - dict:fetch_keys(Segments). + maps:keys(Segments). segments_new() -> - {dict:new(), []}. + {#{}, []}. entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) -> Initial; |