author     Matthias Radestock <matthias@rabbitmq.com>  2012-09-27 15:00:53 +0100
committer  Matthias Radestock <matthias@rabbitmq.com>  2012-09-27 15:00:53 +0100
commit     65fe5952872f72eb586e1cb76412fdc53f6154ba (patch)
tree       07409fe1b2d730d5f7f563543426d94f0c8ada89
parent     d09d500f7c32b712a533921b3ea0b2007a946702 (diff)
parent     441cd4740e6963cd4c842798ca9df57704ff0b22 (diff)
merge bug25145 into default
-rw-r--r--  ebin/rabbit_app.in                    2
-rw-r--r--  include/rabbit.hrl                    3
-rw-r--r--  src/rabbit.erl                        2
-rw-r--r--  src/rabbit_amqqueue_process.erl      31
-rw-r--r--  src/rabbit_backing_queue.erl          5
-rw-r--r--  src/rabbit_channel.erl                2
-rw-r--r--  src/rabbit_control_main.erl           9
-rw-r--r--  src/rabbit_direct.erl                 2
-rw-r--r--  src/rabbit_mirror_queue_master.erl   21
-rw-r--r--  src/rabbit_mirror_queue_slave.erl    13
-rw-r--r--  src/rabbit_mnesia.erl               503
-rw-r--r--  src/rabbit_networking.erl             2
-rw-r--r--  src/rabbit_node_monitor.erl         194
-rw-r--r--  src/rabbit_tests.erl                 12
-rw-r--r--  src/rabbit_upgrade.erl               37
-rw-r--r--  src/rabbit_variable_queue.erl        27
16 files changed, 391 insertions, 474 deletions
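
Two themes run through this merge. First, the redelivery flag that used to be passed
alongside every publish is folded into #message_properties{} as a new delivered field,
so the backing-queue publish callback drops from five arguments to four. Second,
rabbit_mnesia's assorted node-list accessors are collapsed into cluster_nodes/1 and
node_type/0. A rough sketch of the message-path change, for orientation only (the
record fields come from the patch; the surrounding variables are illustrative):

    %% before: the flag rode alongside the call
    %%   BQS1 = BQ:publish(Msg, Props, ChPid, Redelivered, BQS)
    %% after: the flag lives in the properties record
    Props = #message_properties{expiry           = undefined,
                                needs_confirming = false,
                                delivered        = true},
    BQS1  = BQ:publish(Msg, Props, ChPid, BQS).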
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 78842281..9b1ff8bd 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -33,7 +33,7 @@
{default_user_tags, [administrator]},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {cluster_nodes, {[], true}},
+ {cluster_nodes, {[], disc}},
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
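
The default for the cluster_nodes application parameter changes accordingly: the second
tuple element is now the node type atom (disc or ram) rather than a boolean. A
hypothetical operator configuration under the new format (host names invented for
illustration):

    %% rabbitmq.config fragment: try to auto-cluster with two nodes, joining as a ram node
    [{rabbit, [{cluster_nodes, {['rabbit@node1', 'rabbit@node2'], ram}}]}].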
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index fff92205..f2389587 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -78,7 +78,8 @@
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry, needs_confirming = false}).
+-record(message_properties, {expiry, needs_confirming = false,
+ delivered = false}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e9587841..3fe27cd9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -519,7 +519,7 @@ sort_boot_steps(UnsortedSteps) ->
end.
boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
- AllNodes = rabbit_mnesia:all_clustered_nodes(),
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d0810564..10ac5bea 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -78,7 +78,7 @@
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(init_with_backing_queue_state/8 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
- [{rabbit_types:delivery(), boolean()}], pmon:pmon(), dict()) -> #q{}).
+ [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -169,11 +169,9 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
State, #q.stats_timer))),
- lists:foldl(
- fun ({Delivery, Redelivered}, StateN) ->
- deliver_or_enqueue(Delivery, Redelivered, StateN)
- end,
- State1, Deliveries).
+ lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State1, Deliveries).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -536,19 +534,17 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
- Redelivered,
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
{AckTag, BQS3} = BQ:publish_delivered(
AckRequired, Message, Props,
SenderPid, BQS2),
- {{Message, Redelivered, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
+ {{Message, Props#message_properties.delivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
@@ -563,10 +559,11 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, Redelivered,
+ sender = SenderPid}, Delivered,
State) ->
Confirm = should_confirm_message(Delivery, State),
- case attempt_delivery(Delivery, Confirm, Redelivered, State) of
+ Props = message_properties(Confirm, Delivered, State),
+ case attempt_delivery(Delivery, Props, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
        %% the next one is an optimisation
@@ -576,8 +573,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
- Props = message_properties(Confirm, State2),
- BQS1 = BQ:publish(Message, Props, SenderPid, Redelivered, BQS),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -706,9 +702,10 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(Confirm, #q{ttl = TTL}) ->
+message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm)}.
+ needs_confirming = needs_confirming(Confirm),
+ delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9510ae23..d69a6c3b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,8 +77,7 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), boolean(),
- state()) ->
+ rabbit_types:message_properties(), pid(), state()) ->
state().
%% Called for messages which have already been passed straight
@@ -213,7 +212,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
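
With the arity change, a backing-queue implementation no longer receives the redelivery
flag as a separate argument; it reads it from the properties record instead. A minimal
sketch of a conforming publish/4 clause (store_message/4 is an invented placeholder;
the record definition comes from rabbit.hrl):

    %% fragment of a hypothetical rabbit_backing_queue implementation
    publish(Msg, MsgProps = #message_properties{delivered = Delivered},
            _ChPid, State) ->
        %% the redelivery flag now arrives inside MsgProps
        store_message(Msg, MsgProps, Delivered, State).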
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e8f3aab3..0d13312b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -136,7 +136,7 @@ flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
list_local() ->
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index bd01a1b1..e75e1f6f 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -247,9 +247,12 @@ action(force_reset, Node, [], _Opts, Inform) ->
action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
ClusterNode = list_to_atom(ClusterNodeS),
- DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+ NodeType = case proplists:get_bool(?RAM_OPT, Opts) of
+ true -> ram;
+ false -> disc
+ end,
Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
- rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]);
action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
Inform("Turning ~p into a ram node", [Node]),
@@ -458,7 +461,7 @@ action(list_parameters, Node, [], Opts, Inform) ->
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
- N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []),
+ N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]),
Action <- [status, cluster_status, environment]],
VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
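
On the rabbitmqctl side, join_cluster now hands rabbit_mnesia:join_cluster/2 a node
type atom instead of a boolean, derived from whether the --ram option was given.
Roughly what the action amounts to (sketch only; the node names are made up):

    %% run on the node that is joining; pass 'disc' instead of 'ram' for a disc node
    ok = rpc:call('rabbit@this_node', rabbit_mnesia, join_cluster,
                  ['rabbit@existing_node', ram]).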
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a3431321..689e5d83 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -60,7 +60,7 @@ list_local() ->
pg_local:get_members(rabbit_direct).
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_direct, list_local, []).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d447f1f3..4cfb3dcb 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/5, publish_delivered/5, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
@@ -87,12 +87,11 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- MNodes1 =
- (case MNodes of
- all -> rabbit_mnesia:all_clustered_nodes();
- undefined -> [];
- _ -> MNodes
- end) -- [node()],
+ MNodes1 = (case MNodes of
+ all -> rabbit_mnesia:cluster_nodes(all);
+ undefined -> [];
+ _ -> MNodes
+ end) -- [node()],
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
@@ -153,14 +152,14 @@ purge(State = #state { gm = GM,
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Redelivered,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg, Redelivered}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS),
+ ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
@@ -171,7 +170,7 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ack_msg_id = AM }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(
- GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg, false}),
+ GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
{AckTag, BQS1} =
BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b11cd199..625bcdff 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -457,8 +457,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
- %% not from gm, and if they're due to be enqueued on promotion
- %% then we pass them to the
+ %% not from gm, and pass them to the
%% queue_process:init_with_backing_queue_state to be enqueued.
%%
%% We also have to requeue messages which are pending acks: the
@@ -536,10 +535,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
end, gb_trees:empty(), MSList),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
- Deliveries = [{Delivery, true} ||
- {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
+ Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- rabbit_log:warning("Promotion deliveries: ~p~n", [Deliveries]),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, KS, MTC),
@@ -695,8 +692,7 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
end.
process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId },
- Redelivered},
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
State = #state { sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -746,10 +742,9 @@ process_instruction(
{ok,
case Deliver of
false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, Redelivered, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
State2 #state { backing_queue_state = BQS1 };
{true, AckRequired} ->
- false = Redelivered, %% master:publish_delivered/5 only sends this
{AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
ChPid, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8ce19cc6..ae36febb 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -28,19 +28,16 @@
status/0,
is_db_empty/0,
is_clustered/0,
- all_clustered_nodes/0,
- clustered_disc_nodes/0,
- running_clustered_nodes/0,
- is_disc_node/0,
+ cluster_nodes/1,
+ node_type/0,
dir/0,
table_names/0,
- wait_for_tables/1,
cluster_status_from_mnesia/0,
- init_db/3,
+ init_db_unchecked/2,
empty_ram_only_tables/0,
copy_db/1,
- wait_for_tables/0,
+ wait_for_tables/1,
check_cluster_consistency/0,
ensure_mnesia_dir/0,
@@ -67,12 +64,11 @@
-export_type([node_type/0, cluster_status/0]).
-type(node_type() :: disc | ram).
--type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
- ordsets:ordset(node())}).
+-type(cluster_status() :: {[node()], [node()], [node()]}).
%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: (node(), node_type()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
@@ -84,17 +80,15 @@
{'running_nodes', [node()]}]).
-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
--spec(all_clustered_nodes/0 :: () -> [node()]).
--spec(clustered_disc_nodes/0 :: () -> [node()]).
--spec(running_clustered_nodes/0 :: () -> [node()]).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
+-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
-spec(table_names/0 :: () -> [atom()]).
--spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} |
- {'error', any()}).
+-spec(cluster_status_from_mnesia/0 :: () -> rabbit_types:ok_or_error2(
+ cluster_status(), any())).
%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
--spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok').
+-spec(init_db_unchecked/2 :: ([node()], node_type()) -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
@@ -106,12 +100,6 @@
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-%% Functions used in internal rpc calls
--spec(node_info/0 :: () -> {string(), string(),
- ({'ok', cluster_status()} | 'error')}).
--spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' |
- {'error', term()}).
-
-endif.
%%----------------------------------------------------------------------------
@@ -123,7 +111,9 @@ init() ->
ensure_mnesia_dir(),
case is_virgin_node() of
true -> init_from_config();
- false -> init(is_disc_node(), all_clustered_nodes())
+ false -> NodeType = node_type(),
+ init_db_and_upgrade(cluster_nodes(all), NodeType,
+ NodeType =:= ram)
end,
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case -
@@ -131,24 +121,21 @@ init() ->
ok = global:sync(),
ok.
-init(WantDiscNode, AllNodes) ->
- init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode).
-
init_from_config() ->
- {ok, {TryNodes, WantDiscNode}} =
+ {ok, {TryNodes, NodeType}} =
application:get_env(rabbit, cluster_nodes),
- case find_good_node(TryNodes -- [node()]) of
+ case find_good_node(nodes_excl_me(TryNodes)) of
{ok, Node} ->
rabbit_log:info("Node '~p' selected for clustering from "
"configuration~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster(Node),
- init_db_and_upgrade(DiscNodes, WantDiscNode, false),
+ init_db_and_upgrade(DiscNodes, NodeType, true),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning("Could not find any suitable node amongst the "
"ones provided in the configuration: ~p~n",
[TryNodes]),
- init(true, [node()])
+ init_db_and_upgrade([node()], disc, false)
end.
%% Make the node join a cluster. The node will be reset automatically
@@ -165,21 +152,18 @@ init_from_config() ->
%% Note that we make no attempt to verify that the nodes provided are
%% all in the same cluster, we simply pick the first online node and
%% we cluster to its cluster.
-join_cluster(DiscoveryNode, WantDiscNode) ->
- case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of
- true -> e(clustering_only_disc_node);
- _ -> ok
- end,
-
+join_cluster(DiscoveryNode, NodeType) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
-
+ case is_only_clustered_disc_node() of
+ true -> e(clustering_only_disc_node);
+ false -> ok
+ end,
{ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of
{ok, Res} -> Res;
- E = {error, _} -> throw(E)
+ {error, _} = E -> throw(E)
end,
-
- case lists:member(node(), ClusterNodes) of
+ case me_in_nodes(ClusterNodes) of
true -> e(already_clustered);
false -> ok
end,
@@ -190,11 +174,10 @@ join_cluster(DiscoveryNode, WantDiscNode) ->
    %% of resetting the node from the user.
reset(false),
- rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),
-
%% Join the cluster
- ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false),
-
+ rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
+ [ClusterNodes, NodeType]),
+ ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
rabbit_node_monitor:notify_joined_cluster(),
ok.
@@ -202,87 +185,81 @@ join_cluster(DiscoveryNode, WantDiscNode) ->
%% return node to its virgin state, where it is not member of any
%% cluster, has no cluster configuration, no local database, and no
%% persisted messages
-reset() -> reset(false).
-force_reset() -> reset(true).
+reset() ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~n", []),
+ reset(false).
+
+force_reset() ->
+ rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []),
+ reset(true).
reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
- [if Force -> " forcefully";
- true -> ""
- end]),
ensure_mnesia_not_running(),
- Node = node(),
- case Force of
- true ->
- disconnect_nodes(nodes());
- false ->
- AllNodes = all_clustered_nodes(),
- %% Reconnecting so that we will get an up to date nodes.
- %% We don't need to check for consistency because we are
- %% resetting. Force=true here so that reset still works
- %% when clustered with a node which is down.
- init_db_with_mnesia(AllNodes, is_disc_node(), false, true),
- case is_disc_and_clustered() andalso
- [node()] =:= clustered_disc_nodes()
- of
- true -> e(resetting_only_disc_node);
- false -> ok
+ Nodes = case Force of
+ true ->
+ nodes();
+ false ->
+ AllNodes = cluster_nodes(all),
+ %% Reconnecting so that we will get an up to date
+ %% nodes. We don't need to check for consistency
+ %% because we are resetting. Force=true here so
+ %% that reset still works when clustered with a
+ %% node which is down.
+ init_db_with_mnesia(AllNodes, node_type(), false, false),
+ case is_only_clustered_disc_node() of
+ true -> e(resetting_only_disc_node);
+ false -> ok
+ end,
+ leave_cluster(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema),
+ cluster_nodes(all)
end,
- leave_cluster(),
- rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema),
- disconnect_nodes(all_clustered_nodes()),
- ok
- end,
+ %% We need to make sure that we don't end up in a distributed
+ %% Erlang system with nodes while not being in an Mnesia cluster
+ %% with them. We don't handle that well.
+ [erlang:disconnect_node(N) || N <- Nodes],
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok = rabbit_node_monitor:reset_cluster_status(),
ok.
-%% We need to make sure that we don't end up in a distributed Erlang
-%% system with nodes while not being in an Mnesia cluster with
-%% them. We don't handle that well.
-disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
-
change_cluster_node_type(Type) ->
- ensure_mnesia_dir(),
ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
case is_clustered() of
false -> e(not_clustered);
true -> ok
end,
- {_, _, RunningNodes} =
- case discover_cluster(all_clustered_nodes()) of
- {ok, Status} -> Status;
- {error, _Reason} -> e(cannot_connect_to_cluster)
- end,
+ {_, _, RunningNodes} = case discover_cluster(cluster_nodes(all)) of
+ {ok, Status} -> Status;
+ {error, _Reason} -> e(cannot_connect_to_cluster)
+ end,
Node = case RunningNodes of
[] -> e(no_online_cluster_nodes);
[Node0|_] -> Node0
end,
- ok = reset(false),
- ok = join_cluster(Node, case Type of
- ram -> false;
- disc -> true
- end).
+ ok = reset(),
+ ok = join_cluster(Node, Type).
update_cluster_nodes(DiscoveryNode) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
-
Status = {AllNodes, _, _} =
case discover_cluster(DiscoveryNode) of
{ok, Status0} -> Status0;
{error, _Reason} -> e(cannot_connect_to_node)
end,
- case ordsets:is_element(node(), AllNodes) of
+ case me_in_nodes(AllNodes) of
true ->
%% As in `check_consistency/0', we can safely delete the
%% schema here, since it'll be replicated from the other
%% nodes
mnesia:delete_schema([node()]),
rabbit_node_monitor:write_cluster_status(Status),
- init_db_with_mnesia(AllNodes, is_disc_node(), false);
+ rabbit_misc:local_info_msg("Updating cluster nodes from ~p~n",
+ [DiscoveryNode]),
+ init_db_with_mnesia(AllNodes, node_type(), true, true);
false ->
e(inconsistent_cluster)
end,
@@ -296,29 +273,25 @@ update_cluster_nodes(DiscoveryNode) ->
%% the last or second to last after the node we're removing to go
%% down
forget_cluster_node(Node, RemoveWhenOffline) ->
- case ordsets:is_element(Node, all_clustered_nodes()) of
+ case lists:member(Node, cluster_nodes(all)) of
true -> ok;
false -> e(not_a_cluster_node)
end,
- case {mnesia:system_info(is_running), RemoveWhenOffline} of
- {yes, true} -> e(online_node_offline_flag);
- _ -> ok
- end,
- case remove_node_if_mnesia_running(Node) of
- ok ->
- ok;
- {error, mnesia_not_running} when RemoveWhenOffline ->
- remove_node_offline_node(Node);
- {error, mnesia_not_running} ->
- e(offline_node_no_offline_flag);
- Err = {error, _} ->
- throw(Err)
+ case {RemoveWhenOffline, mnesia:system_info(is_running)} of
+ {true, no} -> remove_node_offline_node(Node);
+ {true, yes} -> e(online_node_offline_flag);
+ {false, no} -> e(offline_node_no_offline_flag);
+ {false, yes} -> rabbit_misc:local_info_msg(
+ "Removing node ~p from cluster~n", [Node]),
+ case remove_node_if_mnesia_running(Node) of
+ ok -> ok;
+ {error, _} = Err -> throw(Err)
+ end
end.
remove_node_offline_node(Node) ->
- case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())),
- is_disc_node()} of
- {[], true} ->
+ case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of
+ {[], disc} ->
%% Note that while we check if the nodes was the last to
%% go down, apart from the node we're removing from, this
%% is still unsafe. Consider the situation in which A and
@@ -327,12 +300,10 @@ remove_node_offline_node(Node) ->
%% and B goes down. In this case, C is the second-to-last,
%% but we don't know that and we'll remove B from A
%% anyway, even if that will lead to bad things.
- case ordsets:subtract(running_clustered_nodes(),
- ordsets:from_list([node(), Node])) of
+ case cluster_nodes(running) -- [node(), Node] of
[] -> start_mnesia(),
try
- [mnesia:force_load_table(T) ||
- T <- rabbit_mnesia:table_names()],
+ [mnesia:force_load_table(T) || T <- table_names()],
forget_cluster_node(Node, false),
ensure_mnesia_running()
after
@@ -350,13 +321,13 @@ remove_node_offline_node(Node) ->
%%----------------------------------------------------------------------------
status() ->
- IfNonEmpty = fun (_, []) -> [];
+ IfNonEmpty = fun (_, []) -> [];
(Type, Nodes) -> [{Type, Nodes}]
end,
- [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++
- IfNonEmpty(ram, clustered_ram_nodes()))}] ++
+ [{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++
+ IfNonEmpty(ram, cluster_nodes(ram)))}] ++
case mnesia:system_info(is_running) of
- yes -> [{running_nodes, running_clustered_nodes()}];
+ yes -> [{running_nodes, cluster_nodes(running)}];
no -> []
end.
@@ -364,27 +335,10 @@ is_db_empty() ->
lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
table_names()).
-is_clustered() ->
- Nodes = all_clustered_nodes(),
- [node()] =/= Nodes andalso [] =/= Nodes.
-
-is_disc_and_clustered() -> is_disc_node() andalso is_clustered().
-
-%% Functions that retrieve the nodes in the cluster will rely on the
-%% status file if offline.
+is_clustered() -> AllNodes = cluster_nodes(all),
+ AllNodes =/= [] andalso AllNodes =/= [node()].
-all_clustered_nodes() -> cluster_status(all).
-
-clustered_disc_nodes() -> cluster_status(disc).
-
-clustered_ram_nodes() -> ordsets:subtract(cluster_status(all),
- cluster_status(disc)).
-
-running_clustered_nodes() -> cluster_status(running).
-
-running_clustered_disc_nodes() ->
- {_, DiscNodes, RunningNodes} = cluster_status(),
- ordsets:intersection(DiscNodes, RunningNodes).
+cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
%% the data from mnesia. Obviously it'll work only when mnesia is
@@ -398,75 +352,64 @@ mnesia_nodes() ->
%% `init_db/3' hasn't been run yet. In other words, either
%% we are a virgin node or a restarted RAM node. In both
%% cases we're not interested in what mnesia has to say.
- IsDiscNode = mnesia:system_info(use_dir),
+ NodeType = case mnesia:system_info(use_dir) of
+ true -> disc;
+ false -> ram
+ end,
Tables = mnesia:system_info(tables),
- {Table, _} = case table_definitions(case IsDiscNode of
- true -> disc;
- false -> ram
- end) of [T|_] -> T end,
+ [{Table, _} | _] = table_definitions(NodeType),
case lists:member(Table, Tables) of
- true ->
- AllNodes =
- ordsets:from_list(mnesia:system_info(db_nodes)),
- DiscCopies = ordsets:from_list(
- mnesia:table_info(schema, disc_copies)),
- DiscNodes =
- case IsDiscNode of
- true -> ordsets:add_element(node(), DiscCopies);
- false -> DiscCopies
- end,
- {ok, {AllNodes, DiscNodes}};
- false ->
- {error, tables_not_present}
+ true -> AllNodes = mnesia:system_info(db_nodes),
+ DiscCopies = mnesia:table_info(schema, disc_copies),
+ DiscNodes = case NodeType of
+ disc -> nodes_incl_me(DiscCopies);
+ ram -> DiscCopies
+ end,
+ {ok, {AllNodes, DiscNodes}};
+ false -> {error, tables_not_present}
end
end.
-cluster_status(WhichNodes, ForceMnesia) ->
+cluster_status(WhichNodes) ->
%% I don't want to call `running_nodes/1' unless if necessary, since it's
%% pretty expensive.
- Nodes = case mnesia_nodes() of
- {ok, {AllNodes, DiscNodes}} ->
- {ok, {AllNodes, DiscNodes,
- fun() -> running_nodes(AllNodes) end}};
- {error, _Reason} when not ForceMnesia ->
- {AllNodes, DiscNodes, RunningNodes} =
- rabbit_node_monitor:read_cluster_status(),
- %% The cluster status file records the status when the node
- %% is online, but we know for sure that the node is offline
- %% now, so we can remove it from the list of running nodes.
- {ok,
- {AllNodes, DiscNodes,
- fun() -> ordsets:del_element(node(), RunningNodes) end}};
- Err = {error, _} ->
- Err
- end,
- case Nodes of
- {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} ->
- {ok, case WhichNodes of
- status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
- all -> AllNodes1;
- disc -> DiscNodes1;
- running -> RunningNodesThunk()
- end};
- Err1 = {error, _} ->
- Err1
+ {AllNodes1, DiscNodes1, RunningNodesThunk} =
+ case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} ->
+ {AllNodes, DiscNodes, fun() -> running_nodes(AllNodes) end};
+ {error, _Reason} ->
+ {AllNodes, DiscNodes, RunningNodes} =
+ rabbit_node_monitor:read_cluster_status(),
+ %% The cluster status file records the status when the node is
+ %% online, but we know for sure that the node is offline now, so
+ %% we can remove it from the list of running nodes.
+ {AllNodes, DiscNodes, fun() -> nodes_excl_me(RunningNodes) end}
+ end,
+ case WhichNodes of
+ status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
+ all -> AllNodes1;
+ disc -> DiscNodes1;
+ ram -> AllNodes1 -- DiscNodes1;
+ running -> RunningNodesThunk()
end.
-cluster_status(WhichNodes) ->
- {ok, Status} = cluster_status(WhichNodes, false),
- Status.
-
-cluster_status() -> cluster_status(status).
-
-cluster_status_from_mnesia() -> cluster_status(status, true).
+cluster_status_from_mnesia() ->
+ case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} -> {ok, {AllNodes, DiscNodes,
+ running_nodes(AllNodes)}};
+ {error, _} = Err -> Err
+ end.
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:version(),
cluster_status_from_mnesia()}.
-is_disc_node() ->
- DiscNodes = clustered_disc_nodes(),
- DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes).
+node_type() ->
+ DiscNodes = cluster_nodes(disc),
+ case DiscNodes =:= [] orelse me_in_nodes(DiscNodes) of
+ true -> disc;
+ false -> ram
+ end.
dir() -> mnesia:system_info(directory).
@@ -480,21 +423,21 @@ table_names() -> [Tab || {Tab, _} <- table_definitions()].
%% schema if there is the need to and catching up if there are other
%% nodes in the cluster already. It also updates the cluster status
%% file.
-init_db(ClusterNodes, WantDiscNode, Force) ->
- Nodes = change_extra_db_nodes(ClusterNodes, Force),
+init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
+ Nodes = change_extra_db_nodes(ClusterNodes, CheckOtherNodes),
%% Note that we use `system_info' here and not the cluster status
%% since when we start rabbit for the first time the cluster
%% status will say we are a disc node but the tables won't be
%% present yet.
WasDiscNode = mnesia:system_info(use_dir),
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
+ case {Nodes, WasDiscNode, NodeType} of
+ {[], _, ram} ->
%% Standalone ram node, we don't want that
throw({error, cannot_create_standalone_ram_node});
- {[], false, true} ->
+ {[], false, disc} ->
%% RAM -> disc, starting from scratch
ok = create_schema();
- {[], true, true} ->
+ {[], true, disc} ->
%% First disc node up
ok;
{[AnotherNode | _], _, _} ->
@@ -507,19 +450,22 @@ init_db(ClusterNodes, WantDiscNode, Force) ->
%% first when moving to RAM mnesia will loudly complain
%% since it doesn't make much sense to do that. But when
%% moving to disc, we need to move the schema first.
- case WantDiscNode of
- true -> create_local_table_copy(schema, disc_copies),
- create_local_table_copies(disc);
- false -> create_local_table_copies(ram),
- create_local_table_copy(schema, ram_copies)
+ case NodeType of
+ disc -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ ram -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
end
end,
ensure_schema_integrity(),
rabbit_node_monitor:update_cluster_status(),
ok.
-init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
- ok = init_db(ClusterNodes, WantDiscNode, Force),
+init_db_unchecked(ClusterNodes, NodeType) ->
+ init_db(ClusterNodes, NodeType, false).
+
+init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
+ ok = init_db(ClusterNodes, NodeType, CheckOtherNodes),
ok = case rabbit_upgrade:maybe_upgrade_local() of
ok -> ok;
starting_from_scratch -> rabbit_version:record_desired();
@@ -527,25 +473,23 @@ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
end,
%% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget
%% about the cluster
- case WantDiscNode of
- false -> start_mnesia(),
- change_extra_db_nodes(ClusterNodes, true),
- wait_for_replicated_tables();
- true -> ok
+ case NodeType of
+ ram -> start_mnesia(),
+ change_extra_db_nodes(ClusterNodes, false),
+ wait_for_replicated_tables();
+ disc -> ok
end,
ok.
-init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) ->
+init_db_with_mnesia(ClusterNodes, NodeType,
+ CheckOtherNodes, CheckConsistency) ->
start_mnesia(CheckConsistency),
try
- init_db_and_upgrade(ClusterNodes, WantDiscNode, Force)
+ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes)
after
stop_mnesia()
end.
-init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) ->
- init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force).
-
ensure_mnesia_dir() ->
MnesiaDir = dir() ++ "/",
case filelib:ensure_dir(MnesiaDir) of
@@ -593,7 +537,7 @@ check_schema_integrity() ->
true -> check_table_attributes(Tab, TabDef)
end
end) of
- ok -> ok = wait_for_tables(),
+ ok -> ok = wait_for_tables(table_names()),
check_tables(fun check_table_content/2);
Other -> Other
end.
@@ -628,9 +572,9 @@ copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
rabbit_file:recursive_copy(dir(), Destination).
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
+wait_for_replicated_tables() ->
+ wait_for_tables([Tab || {Tab, TabDef} <- table_definitions(),
+ not lists:member({local_content, true}, TabDef)]).
wait_for_tables(TableNames) ->
case mnesia:wait_for_tables(TableNames, 30000) of
@@ -649,11 +593,11 @@ check_cluster_consistency() ->
case lists:foldl(
fun (Node, {error, _}) -> check_cluster_consistency(Node);
(_Node, {ok, Status}) -> {ok, Status}
- end, {error, not_found},
- ordsets:del_element(node(), all_clustered_nodes()))
+ end, {error, not_found}, nodes_excl_me(cluster_nodes(all)))
of
{ok, Status = {RemoteAllNodes, _, _}} ->
- case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of
+ case ordsets:is_subset(ordsets:from_list(cluster_nodes(all)),
+ ordsets:from_list(RemoteAllNodes)) of
true ->
ok;
false ->
@@ -673,7 +617,7 @@ check_cluster_consistency() ->
rabbit_node_monitor:write_cluster_status(Status);
{error, not_found} ->
ok;
- E = {error, _} ->
+ {error, _} = E ->
throw(E)
end.
@@ -685,7 +629,7 @@ check_cluster_consistency(Node) ->
{error, not_found};
{OTP, Rabbit, {ok, Status}} ->
case check_consistency(OTP, Rabbit, Node, Status) of
- E = {error, _} -> E;
+ {error, _} = E -> E;
{ok, Res} -> {ok, Res}
end
end.
@@ -695,17 +639,22 @@ check_cluster_consistency(Node) ->
%%--------------------------------------------------------------------
on_node_up(Node) ->
- case running_clustered_disc_nodes() =:= [Node] of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
- false -> ok
+ case running_disc_nodes() of
+ [Node] -> rabbit_log:info("cluster contains disc nodes again~n");
+ _ -> ok
end.
on_node_down(_Node) ->
- case running_clustered_disc_nodes() =:= [] of
- true -> rabbit_log:info("only running disc node went down~n");
- false -> ok
+ case running_disc_nodes() of
+ [] -> rabbit_log:info("only running disc node went down~n");
+ _ -> ok
end.
+running_disc_nodes() ->
+ {_AllNodes, DiscNodes, RunningNodes} = cluster_status(status),
+ ordsets:to_list(ordsets:intersection(ordsets:from_list(DiscNodes),
+ ordsets:from_list(RunningNodes))).
+
%%--------------------------------------------------------------------
%% Internal helpers
%%--------------------------------------------------------------------
@@ -717,18 +666,16 @@ discover_cluster(Nodes) when is_list(Nodes) ->
discover_cluster(Node) ->
OfflineError =
{error, {cannot_discover_cluster,
- "The nodes provided is either offline or not running"}},
- case Node =:= node() of
- true ->
- {error, {cannot_discover_cluster,
- "You provided the current node as node to cluster with"}};
- false ->
- case rpc:call(Node,
- rabbit_mnesia, cluster_status_from_mnesia, []) of
- {badrpc, _Reason} -> OfflineError;
- {error, mnesia_not_running} -> OfflineError;
- {ok, Res} -> {ok, Res}
- end
+ "The nodes provided are either offline or not running"}},
+ case node() of
+ Node -> {error, {cannot_discover_cluster,
+ "Cannot cluster node with itself"}};
+ _ -> case rpc:call(Node,
+ rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
+ end
end.
%% The tables aren't supposed to be on disk on a ram node
@@ -850,11 +797,6 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-replicated_table_names() ->
- [Tab || {Tab, TabDef} <- table_definitions(),
- not lists:member({local_content, true}, TabDef)
- ].
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -877,11 +819,7 @@ check_table_content(Tab, TabDef) ->
end.
check_tables(Fun) ->
- case [Error || {Tab, TabDef} <- table_definitions(
- case is_disc_node() of
- true -> disc;
- false -> ram
- end),
+ case [Error || {Tab, TabDef} <- table_definitions(node_type()),
case Fun(Tab, TabDef) of
ok -> Error = none, false;
{error, Error} -> true
@@ -1004,14 +942,13 @@ remove_node_if_mnesia_running(Node) ->
end.
leave_cluster() ->
- case {is_clustered(),
- running_nodes(ordsets:del_element(node(), all_clustered_nodes()))}
- of
- {false, []} -> ok;
- {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of
- true -> ok;
- false -> e(no_running_cluster_nodes)
- end
+ RunningNodes = running_nodes(nodes_excl_me(cluster_nodes(all))),
+ case not is_clustered() andalso RunningNodes =:= [] of
+ true -> ok;
+ false -> case lists:any(fun leave_cluster/1, RunningNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
end.
leave_cluster(Node) ->
@@ -1042,25 +979,25 @@ stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
-change_extra_db_nodes(ClusterNodes0, Force) ->
- ClusterNodes = lists:usort(ClusterNodes0) -- [node()],
- case mnesia:change_config(extra_db_nodes, ClusterNodes) of
- {ok, []} when not Force andalso ClusterNodes =/= [] ->
+change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
+ ClusterNodes = nodes_excl_me(ClusterNodes0),
+ case {mnesia:change_config(extra_db_nodes, ClusterNodes), ClusterNodes} of
+ {{ok, []}, [_|_]} when CheckOtherNodes ->
throw({error, {failed_to_cluster_with, ClusterNodes,
"Mnesia could not connect to any nodes."}});
- {ok, Nodes} ->
+ {{ok, Nodes}, _} ->
Nodes
end.
-%% We're not using `mnesia:system_info(running_db_nodes)' directly because if
-%% the node is a RAM node it won't know about other nodes when mnesia is stopped
+%% We're not using `mnesia:system_info(running_db_nodes)' directly
+%% because if the node is a RAM node it won't know about other nodes
+%% when mnesia is stopped
running_nodes(Nodes) ->
- {Replies, _BadNodes} =
- rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
+ {Replies, _BadNodes} = rpc:multicall(Nodes,
+ rabbit_mnesia, is_running_remote, []),
[Node || {Running, Node} <- Replies, Running].
-is_running_remote() ->
- {mnesia:system_info(is_running) =:= yes, node()}.
+is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}.
check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
@@ -1073,15 +1010,14 @@ check_consistency(OTP, Rabbit, Node, Status) ->
check_nodes_consistency(Node, Status)]).
check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
- ThisNode = node(),
- case ordsets:is_element(ThisNode, RemoteAllNodes) of
+ case me_in_nodes(RemoteAllNodes) of
true ->
{ok, RemoteStatus};
false ->
{error, {inconsistent_cluster,
rabbit_misc:format("Node ~p thinks it's clustered "
"with node ~p, but ~p disagrees",
- [ThisNode, Node, Node])}}
+ [node(), Node, Node])}}
end.
check_version_consistency(This, Remote, _) when This =:= Remote ->
@@ -1106,13 +1042,16 @@ check_rabbit_consistency(Remote) ->
%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
is_virgin_node() ->
case rabbit_file:list_dir(dir()) of
- {error, enoent} -> true;
- {ok, []} -> true;
+ {error, enoent} ->
+ true;
+ {ok, []} ->
+ true;
{ok, [File1, File2]} ->
lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:=
lists:usort([rabbit_node_monitor:cluster_status_filename(),
rabbit_node_monitor:running_nodes_filename()]);
- {ok, _} -> false
+ {ok, _} ->
+ false
end.
find_good_node([]) ->
@@ -1126,6 +1065,16 @@ find_good_node([Node | Nodes]) ->
end
end.
+is_only_clustered_disc_node() ->
+ node_type() =:= disc andalso is_clustered() andalso
+ cluster_nodes(disc) =:= [node()].
+
+me_in_nodes(Nodes) -> lists:member(node(), Nodes).
+
+nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]).
+
+nodes_excl_me(Nodes) -> Nodes -- [node()].
+
e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
error_description(clustering_only_disc_node) ->
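
After the rewrite, callers query the cluster through one accessor parameterised by the
subset they want, plus node_type/0, and the small helpers at the bottom (me_in_nodes/1,
nodes_incl_me/1, nodes_excl_me/1) replace the scattered ordsets calls. A quick usage
sketch, not part of the patch:

    All      = rabbit_mnesia:cluster_nodes(all),
    Disc     = rabbit_mnesia:cluster_nodes(disc),
    Ram      = rabbit_mnesia:cluster_nodes(ram),      %% All -- Disc
    Running  = rabbit_mnesia:cluster_nodes(running),
    NodeType = rabbit_mnesia:node_type(),             %% 'disc' | 'ram'
    Others   = All -- [node()].                       %% what nodes_excl_me/1 computes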
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2d0ded12..5cf8d1ae 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -295,7 +295,7 @@ start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
connections() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
connections_local() ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 88037953..026aa362 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,29 +18,16 @@
-behaviour(gen_server).
+-export([start_link/0]).
-export([running_nodes_filename/0,
- cluster_status_filename/0,
- prepare_cluster_status_files/0,
- write_cluster_status/1,
- read_cluster_status/0,
- update_cluster_status/0,
- reset_cluster_status/0,
-
- joined_cluster/2,
- notify_joined_cluster/0,
- left_cluster/1,
- notify_left_cluster/1,
- node_up/2,
- notify_node_up/0,
-
- start_link/0,
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3
- ]).
+ cluster_status_filename/0, prepare_cluster_status_files/0,
+ write_cluster_status/1, read_cluster_status/0,
+ update_cluster_status/0, reset_cluster_status/0]).
+-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -49,6 +36,8 @@
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+
-spec(running_nodes_filename/0 :: () -> string()).
-spec(cluster_status_filename/0 :: () -> string()).
-spec(prepare_cluster_status_files/0 :: () -> 'ok').
@@ -57,27 +46,31 @@
-spec(update_cluster_status/0 :: () -> 'ok').
-spec(reset_cluster_status/0 :: () -> 'ok').
--spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
-spec(notify_joined_cluster/0 :: () -> 'ok').
--spec(left_cluster/1 :: (node()) -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(node_up/2 :: (node(), boolean()) -> 'ok').
--spec(notify_node_up/0 :: () -> 'ok').
-endif.
%%----------------------------------------------------------------------------
+%% Start
+%%----------------------------------------------------------------------------
+
+start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%----------------------------------------------------------------------------
%% Cluster file operations
%%----------------------------------------------------------------------------
-%% The cluster file information is kept in two files. The "cluster status file"
-%% contains all the clustered nodes and the disc nodes. The "running nodes
-%% file" contains the currently running nodes or the running nodes at shutdown
-%% when the node is down.
+%% The cluster file information is kept in two files. The "cluster
+%% status file" contains all the clustered nodes and the disc nodes.
+%% The "running nodes file" contains the currently running nodes or
+%% the running nodes at shutdown when the node is down.
%%
-%% We strive to keep the files up to date and we rely on this assumption in
-%% various situations. Obviously when mnesia is offline the information we have
-%% will be outdated, but it can't be otherwise.
+%% We strive to keep the files up to date and we rely on this
+%% assumption in various situations. Obviously when mnesia is offline
+%% the information we have will be outdated, but it cannot be
+%% otherwise.
running_nodes_filename() ->
filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
@@ -93,6 +86,10 @@ prepare_cluster_status_files() ->
{ok, _ } -> CorruptFiles();
{error, enoent} -> []
end,
+ ThisNode = [node()],
+ %% The running nodes file might contain a set or a list, in case
+ %% of the legacy file
+ RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
{AllNodes1, WantDiscNode} =
case try_read_file(cluster_status_filename()) of
{ok, [{AllNodes, DiscNodes0}]} ->
@@ -105,16 +102,11 @@ prepare_cluster_status_files() ->
{error, enoent} ->
{legacy_cluster_nodes([]), true}
end,
-
- ThisNode = [node()],
-
- RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode),
AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
DiscNodes = case WantDiscNode of
true -> ThisNode;
false -> []
end,
-
ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
write_cluster_status({All, Disc, Running}) ->
@@ -132,13 +124,6 @@ write_cluster_status({All, Disc, Running}) ->
{FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
end.
-try_read_file(FileName) ->
- case rabbit_file:read_term_file(FileName) of
- {ok, Term} -> {ok, Term};
- {error, enoent} -> {error, enoent};
- {error, E} -> throw({error, {cannot_read_file, FileName, E}})
- end.
-
read_cluster_status() ->
case {try_read_file(cluster_status_filename()),
try_read_file(running_nodes_filename())} of
@@ -159,88 +144,78 @@ reset_cluster_status() ->
%% Cluster notifications
%%----------------------------------------------------------------------------
-joined_cluster(Node, IsDiscNode) ->
- gen_server:cast(?SERVER, {joined_cluster, Node, IsDiscNode}).
+notify_node_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ gen_server:abcast(Nodes, ?SERVER,
+ {node_up, node(), rabbit_mnesia:node_type()}),
+ %% register other active rabbits with this rabbit
+ DiskNodes = rabbit_mnesia:cluster_nodes(disc),
+ [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
+ true -> disc;
+ false -> ram
+ end}) || N <- Nodes],
+ ok.
notify_joined_cluster() ->
- cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]),
+ Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ gen_server:abcast(Nodes, ?SERVER,
+ {joined_cluster, node(), rabbit_mnesia:node_type()}),
ok.
-left_cluster(Node) ->
- gen_server:cast(?SERVER, {left_cluster, Node}).
-
notify_left_cluster(Node) ->
- left_cluster(Node),
- cluster_multicall(left_cluster, [Node]),
- ok.
-
-node_up(Node, IsDiscNode) ->
- gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}).
-
-notify_node_up() ->
- Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]),
- %% register other active rabbits with this rabbit
- [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) ||
- N <- Nodes ],
+ Nodes = rabbit_mnesia:cluster_nodes(running),
+ gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
ok.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-init([]) ->
- {ok, no_state}.
+init([]) -> {ok, pmon:new()}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-%% Note: when updating the status file, we can't simply write the mnesia
-%% information since the message can (and will) overtake the mnesia propagation.
-handle_cast({node_up, Node, IsDiscNode}, State) ->
- case is_already_monitored({rabbit, Node}) of
- true -> {noreply, State};
+%% Note: when updating the status file, we can't simply write the
+%% mnesia information since the message can (and will) overtake the
+%% mnesia propagation.
+handle_cast({node_up, Node, NodeType}, Monitors) ->
+ case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true -> {noreply, Monitors};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(
- Node, DiscNodes);
- false -> DiscNodes
+ write_cluster_status({add_node(Node, AllNodes),
+ case NodeType of
+ disc -> add_node(Node, DiscNodes);
+ ram -> DiscNodes
end,
- ordsets:add_element(Node, RunningNodes)}),
- erlang:monitor(process, {rabbit, Node}),
+ add_node(Node, RunningNodes)}),
ok = handle_live_rabbit(Node),
- {noreply, State}
+ {noreply, pmon:monitor({rabbit, Node}, Monitors)}
end;
-handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
+handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(Node,
- DiscNodes);
- false -> DiscNodes
+ write_cluster_status({add_node(Node, AllNodes),
+ case NodeType of
+ disc -> add_node(Node, DiscNodes);
+ ram -> DiscNodes
end,
RunningNodes}),
{noreply, State};
handle_cast({left_cluster, Node}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({ordsets:del_element(Node, AllNodes),
- ordsets:del_element(Node, DiscNodes),
- ordsets:del_element(Node, RunningNodes)}),
+ write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
+ del_node(Node, RunningNodes)}),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({AllNodes, DiscNodes,
- ordsets:del_element(Node, RunningNodes)}),
+ write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, pmon:erase({rabbit, Node}, Monitors)};
handle_info(_Info, State) ->
{noreply, State}.
@@ -271,27 +246,22 @@ handle_live_rabbit(Node) ->
%% Internal utils
%%--------------------------------------------------------------------
-cluster_multicall(Fun, Args) ->
- Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- %% notify other rabbits of this cluster
- case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
- ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
- Nodes.
-
-is_already_monitored(Item) ->
- {monitors, Monitors} = process_info(self(), monitors),
- lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
- (_) -> false
- end, Monitors).
+try_read_file(FileName) ->
+ case rabbit_file:read_term_file(FileName) of
+ {ok, Term} -> {ok, Term};
+ {error, enoent} -> {error, enoent};
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
+ end.
legacy_cluster_nodes(Nodes) ->
- %% We get all the info that we can, including the nodes from mnesia, which
- %% will be there if the node is a disc node (empty list otherwise)
+ %% We get all the info that we can, including the nodes from
+ %% mnesia, which will be there if the node is a disc node (empty
+ %% list otherwise)
lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
legacy_should_be_disc_node(DiscNodes) ->
DiscNodes == [] orelse lists:member(node(), DiscNodes).
+
+add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
+
+del_node(Node, Nodes) -> Nodes -- [Node].
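
The monitor's gen_server state changes from a dummy term to a pmon set, and cluster
notifications go out via gen_server:abcast/3 rather than an rpc:multicall of exported
cast wrappers. A rough sketch of the new shapes (the payloads mirror the patch; the
peer node name is illustrative):

    %% broadcast "this rabbit is up" to the other running cluster nodes
    Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
    gen_server:abcast(Nodes, rabbit_node_monitor,
                      {node_up, node(), rabbit_mnesia:node_type()}),

    %% peers are tracked with pmon instead of raw monitor references
    Monitors1 = pmon:monitor({rabbit, 'rabbit@peer'}, pmon:new()),
    true      = pmon:is_monitored({rabbit, 'rabbit@peer'}, Monitors1).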
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 08535e7d..df0ee721 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1511,15 +1511,15 @@ clean_logs(Files, Suffix) ->
ok.
assert_ram_node() ->
- case rabbit_mnesia:is_disc_node() of
- true -> exit('not_ram_node');
- false -> ok
+ case rabbit_mnesia:node_type() of
+ disc -> exit('not_ram_node');
+ ram -> ok
end.
assert_disc_node() ->
- case rabbit_mnesia:is_disc_node() of
- true -> ok;
- false -> exit('not_disc_node')
+ case rabbit_mnesia:node_type() of
+ disc -> ok;
+ ram -> exit('not_disc_node')
end.
delete_file(File) ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 3fbfeed0..d037f954 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -66,11 +66,11 @@
%% into the boot process by prelaunch before the mnesia application is
%% started. By the time Mnesia is started the upgrades have happened
%% (on the primary), or Mnesia has been reset (on the secondary) and
-%% rabbit_mnesia:init_db/3 can then make the node rejoin the cluster
+%% rabbit_mnesia:init_db_unchecked/2 can then make the node rejoin the cluster
%% in the normal way.
%%
%% The non-mnesia upgrades are then triggered by
-%% rabbit_mnesia:init_db/3. Of course, it's possible for a given
+%% rabbit_mnesia:init_db_unchecked/2. Of course, it's possible for a given
%% upgrade process to only require Mnesia upgrades, or only require
%% non-Mnesia upgrades. In the latter case no Mnesia resets and
%% reclusterings occur.
@@ -121,16 +121,16 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- AllNodes = rabbit_mnesia:all_clustered_nodes(),
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
{error, version_not_available} ->
case AllNodes of
- [_] -> ok;
- _ -> die("Cluster upgrade needed but upgrading from "
- "< 2.1.1.~nUnfortunately you will need to "
- "rebuild the cluster.", [])
+ [] -> die("Cluster upgrade needed but upgrading from "
+ "< 2.1.1.~nUnfortunately you will need to "
+ "rebuild the cluster.", []);
+ _ -> ok
end;
{error, _} = Err ->
throw(Err);
@@ -147,11 +147,11 @@ maybe_upgrade_mnesia() ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
- case {is_disc_node_legacy(), AfterUs} of
- {true, []} ->
+ AfterUs = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ case {node_type_legacy(), AfterUs} of
+ {disc, []} ->
primary;
- {true, _} ->
+ {disc, _} ->
Filename = rabbit_node_monitor:running_nodes_filename(),
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
@@ -160,7 +160,7 @@ upgrade_mode(AllNodes) ->
"all~nshow this message. In which case, remove "
"the lock file on one of them and~nstart that node. "
"The lock file on this node is:~n~n ~s ", [Filename]);
- {false, _} ->
+ {ram, _} ->
die("Cluster upgrade needed but this is a ram node.~n"
"Please first start the last disc node to shut down.",
[])
@@ -216,11 +216,11 @@ force_tables() ->
secondary_upgrade(AllNodes) ->
%% must do this before we wipe out schema
- IsDiscNode = is_disc_node_legacy(),
+ NodeType = node_type_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true),
+ ok = rabbit_mnesia:init_db_unchecked(AllNodes, NodeType),
ok = rabbit_version:record_desired_for_scope(mnesia),
ok.
@@ -268,13 +268,16 @@ lock_filename() -> lock_filename(dir()).
lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
backup_dir() -> dir() ++ "-upgrade-backup".
-is_disc_node_legacy() ->
+node_type_legacy() ->
%% This is pretty ugly but we can't start Mnesia and ask it (will
%% hang), we can't look at the config file (may not include us
%% even if we're a disc node). We also can't use
- %% rabbit_mnesia:is_disc_node/0 because that will give false
+ %% rabbit_mnesia:node_type/0 because that will give false
    %% positives on Rabbit up to 2.5.1.
- filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")).
+ case filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")) of
+ true -> disc;
+ false -> ram
+ end.
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 59c0bbeb..68c659df 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/5, publish_delivered/5, drain_confirmed/1,
+ publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
@@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4,
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, Redelivered, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps, Redelivered),
+ MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -566,7 +566,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps, false))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
@@ -874,9 +874,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps, Redelivered) ->
+ MsgProps = #message_properties{delivered = Delivered}) ->
+ %% TODO would it make sense to remove #msg_status.is_delivered?
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = Redelivered,
+ is_persistent = IsPersistent, is_delivered = Delivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.