diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-10-15 14:26:36 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-10-15 14:26:36 +0100 |
commit | d5f3ca76a2185facd4a55c29698750558e3bd132 (patch) | |
tree | 9dbc28e5d08d3b946626891c1419aaddd96ba681 | |
parent | e7bc7b820702b26f6ab0ac3de5859dc21d4b36a4 (diff) | |
parent | 7562d6dbacb54e30e4307eff63632e0ca3ec25fa (diff) | |
download | rabbitmq-server-d5f3ca76a2185facd4a55c29698750558e3bd132.tar.gz |
merge default
-rw-r--r-- | src/gatherer.erl | 51 | ||||
-rw-r--r-- | src/rabbit.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 77 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 35 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 31 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 91 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 85 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 45 | ||||
-rw-r--r-- | src/rabbit_vm.erl | 129 |
11 files changed, 344 insertions, 211 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl index 98b36038..29d2d713 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]). +-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,6 +32,7 @@ -spec(fork/1 :: (pid()) -> 'ok'). -spec(finish/1 :: (pid()) -> 'ok'). -spec(in/2 :: (pid(), any()) -> 'ok'). +-spec(sync_in/2 :: (pid(), any()) -> 'ok'). -spec(out/1 :: (pid()) -> {'value', any()} | 'empty'). -endif. @@ -62,6 +63,9 @@ finish(Pid) -> in(Pid, Value) -> gen_server2:cast(Pid, {in, Value}). +sync_in(Pid, Value) -> + gen_server2:call(Pid, {in, Value}, infinity). + out(Pid) -> gen_server2:call(Pid, out, infinity). @@ -78,19 +82,22 @@ handle_call(stop, _From, State) -> handle_call(fork, _From, State = #gstate { forks = Forks }) -> {reply, ok, State #gstate { forks = Forks + 1 }, hibernate}; +handle_call({in, Value}, From, State) -> + {noreply, in(Value, From, State), hibernate}; + handle_call(out, From, State = #gstate { forks = Forks, values = Values, blocked = Blocked }) -> case queue:out(Values) of + {empty, _} when Forks == 0 -> + {reply, empty, State, hibernate}; {empty, _} -> - case Forks of - 0 -> {reply, empty, State, hibernate}; - _ -> {noreply, - State #gstate { blocked = queue:in(From, Blocked) }, - hibernate} - end; - {{value, _Value} = V, NewValues} -> - {reply, V, State #gstate { values = NewValues }, hibernate} + {noreply, State #gstate { blocked = queue:in(From, Blocked) }, + hibernate}; + {{value, {PendingIn, Value}}, NewValues} -> + reply(PendingIn, ok), + {reply, {value, Value}, State #gstate { values = NewValues }, + hibernate} end; handle_call(Msg, _From, State) -> @@ -107,15 +114,8 @@ handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) -> {noreply, State #gstate { forks = NewForks, blocked = NewBlocked }, hibernate}; -handle_cast({in, Value}, State = #gstate { values = Values, - blocked = Blocked }) -> - {noreply, case queue:out(Blocked) of - {empty, _} -> - State #gstate { values = queue:in(Value, Values) }; - {{value, From}, NewBlocked} -> - gen_server2:reply(From, {value, Value}), - State #gstate { blocked = NewBlocked } - end, hibernate}; +handle_cast({in, Value}, State) -> + {noreply, in(Value, undefined, State), hibernate}; handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. @@ -128,3 +128,18 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. + +%%---------------------------------------------------------------------------- + +in(Value, From, State = #gstate { values = Values, blocked = Blocked }) -> + case queue:out(Blocked) of + {empty, _} -> + State #gstate { values = queue:in({From, Value}, Values) }; + {{value, PendingOut}, NewBlocked} -> + reply(From, ok), + gen_server2:reply(PendingOut, {value, Value}), + State #gstate { blocked = NewBlocked } + end. + +reply(undefined, _Reply) -> ok; +reply(From, Reply) -> gen_server2:reply(From, Reply). diff --git a/src/rabbit.erl b/src/rabbit.erl index 7b417b00..93808f84 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -364,7 +364,7 @@ status() -> {running_applications, application:which_applications(infinity)}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, - {memory, erlang:memory()}], + {memory, rabbit_vm:memory()}], S2 = rabbit_misc:filter_exit_map( fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, [{vm_memory_high_watermark, {vm_memory_monitor, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f137ae91..b8aad11a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -497,32 +497,21 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. -should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> - never; -should_confirm_message(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) -> + {never, State}; +send_or_record_confirm(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, - #q{q = #amqqueue{durable = true}}) -> - {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(#delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo}, - _State) -> - {immediately, SenderPid, MsgSeqNo}. - -needs_confirming({eventually, _, _, _}) -> true; -needs_confirming(_) -> false. - -maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, - State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = - gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; -maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + State = #q{q = #amqqueue{durable = true}, + msg_id_to_channel = MTC}) -> + MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), + {eventually, State#q{msg_id_to_channel = MTC1}}; +send_or_record_confirm(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, State) -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), - State; -maybe_record_confirm_message(_Confirm, State) -> - State. + {immediately, State}. run_message_queue(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = @@ -544,33 +533,27 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, {{Message, Props#message_properties.delivered, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, false, State#q{backing_queue_state = BQS1}); - {Duplicate, BQS1} -> - %% if the message has previously been seen by the BQ then - %% it must have been seen under the same circumstances as - %% now: i.e. if it is now a deliver_immediately then it - %% must have been before. - {case Duplicate of - published -> true; - discarded -> false - end, - State#q{backing_queue_state = BQS1}} + {published, BQS1} -> + {true, State#q{backing_queue_state = BQS1}}; + {discarded, BQS1} -> + {false, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, Delivered, - State) -> - Confirm = should_confirm_message(Delivery, State), +deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, + Delivered, State) -> + {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Confirm, Delivered, State), - case attempt_delivery(Delivery, Props, State) of - {true, State1} -> - maybe_record_confirm_message(Confirm, State1); + case attempt_delivery(Delivery, Props, State1) of + {true, State2} -> + State2; %% the next one is an optimisations %% TODO: optimise the Confirm =/= never case too - {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> - discard_delivery(Delivery, State1); - {false, State1} -> - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), + {false, State2 = #q{ttl = 0, dlx = undefined, + backing_queue = BQ, backing_queue_state = BQS}} + when Confirm == never -> + BQS1 = BQ:discard(Message, SenderPid, BQS), + State2#q{backing_queue_state = BQS1}; + {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) @@ -690,15 +673,9 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -discard_delivery(#delivery{sender = SenderPid, - message = Message}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. - message_properties(Confirm, Delivered, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL), - needs_confirming = needs_confirming(Confirm), + needs_confirming = Confirm == eventually, delivered = Delivered}. calculate_msg_expiry(undefined) -> undefined; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index d69a6c3b..c6d17785 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -24,6 +24,7 @@ -type(ack() :: any()). -type(state() :: any()). +-type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len @@ -117,7 +118,7 @@ %% first time the message id appears in the result of %% drain_confirmed. All subsequent appearances of that message id will %% be ignored. --callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. +-callback drain_confirmed(state()) -> {msg_ids(), state()}. %% Drop messages from the head of the queue while the supplied predicate returns %% true. Also accepts a boolean parameter that determines whether the messages @@ -136,7 +137,7 @@ %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. --callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. +-callback ack([ack()], state()) -> {msg_ids(), state()}. %% Acktags supplied are for messages which should be processed. The %% provided callback function is called with each message. @@ -144,7 +145,7 @@ %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. --callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}. +-callback requeue([ack()], state()) -> {msg_ids(), state()}. %% How long is my queue? -callback len(state()) -> non_neg_integer(). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 40359da3..6cd71fc3 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -101,19 +101,25 @@ %% channel during a publish, only some of the mirrors may receive that %% publish. As a result of this problem, the messages broadcast over %% the gm contain published content, and thus slaves can operate -%% successfully on messages that they only receive via the gm. The key -%% purpose of also sending messages directly from the channels to the -%% slaves is that without this, in the event of the death of the -%% master, messages could be lost until a suitable slave is promoted. +%% successfully on messages that they only receive via the gm. %% -%% However, that is not the only reason. For example, if confirms are -%% in use, then there is no guarantee that every slave will see the -%% delivery with the same msg_seq_no. As a result, the slaves have to -%% wait until they've seen both the publish via gm, and the publish -%% via the channel before they have enough information to be able to -%% perform the publish to their own bq, and subsequently issue the -%% confirm, if necessary. Either form of publish can arrive first, and -%% a slave can be upgraded to the master at any point during this +%% The key purpose of also sending messages directly from the channels +%% to the slaves is that without this, in the event of the death of +%% the master, messages could be lost until a suitable slave is +%% promoted. However, that is not the only reason. A slave cannot send +%% confirms for a message until it has seen it from the +%% channel. Otherwise, it might send a confirm to a channel for a +%% message that it might *never* receive from that channel. This can +%% happen because new slaves join the gm ring (and thus receive +%% messages from the master) before inserting themselves in the +%% queue's mnesia record (which is what channels look at for routing). +%% As it turns out, channels will simply ignore such bogus confirms, +%% but relying on that would introduce a dangerously tight coupling. +%% +%% Hence the slaves have to wait until they've seen both the publish +%% via gm, and the publish via the channel before they issue the +%% confirm. Either form of publish can arrive first, and a slave can +%% be upgraded to the master at any point during this %% process. Confirms continue to be issued correctly, however. %% %% Because the slave is a full process, it impersonates parts of the @@ -344,10 +350,9 @@ handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, MPid, DeadPids, ExtraNodes} -> + {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); {error, not_found} -> {stop, normal, State} @@ -398,8 +403,6 @@ members_changed([_CPid], _Births, []) -> members_changed([CPid], _Births, Deaths) -> ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). -handle_msg([_CPid], _From, master_changed) -> - ok; handle_msg([CPid], _From, request_depth = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 15ab9424..6dac2808 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -88,18 +88,19 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(Q, Recover, AsyncCallback) -> +init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) -> {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - init_with_existing_bq(Q, BQ, BQS). + State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), + State. -init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> +init_with_existing_bq(Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), depth_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), - ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -148,7 +149,17 @@ stop_all_slaves(Reason, #state{gm = GM}) -> MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], - ok = gm:forget_group(proplists:get_value(group_name, Info)). + %% Normally when we remove a slave another slave or master will + %% notice and update Mnesia. But we just removed them all, and + %% have stopped listening ourselves. So manually clean up. + QName = proplists:get_value(group_name, Info), + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q] = mnesia:read({rabbit_queue, QName}), + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { slave_pids = [] }) + end), + ok = gm:forget_group(QName). purge(State = #state { gm = GM, backing_queue = BQ, @@ -368,8 +379,10 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, case dict:find(MsgId, SS) of error -> ok = gm:broadcast(GM, {discard, ChPid, Msg}), - State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS), - seen_status = dict:erase(MsgId, SS) }; + ensure_monitoring( + ChPid, State #state { + backing_queue_state = BQ:discard(Msg, ChPid, BQS), + seen_status = dict:erase(MsgId, SS) }); {ok, discarded} -> State end. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 453f2f2c..b6c229aa 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -31,11 +31,12 @@ -spec(remove_from_queue/2 :: (rabbit_amqqueue:name(), [pid()]) - -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). -spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). -spec(add_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). + (rabbit_amqqueue:name(), node()) -> + {'ok', atom()} | rabbit_types:error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> @@ -59,7 +60,6 @@ remove_from_queue(QueueName, DeadGMPids) -> DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], - ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes, rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -73,49 +73,52 @@ remove_from_queue(QueueName, DeadGMPids) -> {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, [], []}; + {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = store_updated_slaves( - Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), - %% Sometimes a slave dying means we need - %% to start more on other nodes - - %% "exactly" mode can cause this to - %% happen. - {_, OldNodes} = actual_queue_nodes(Q1), - {_, NewNodes} = suggested_queue_nodes( - Q1, ClusterNodes), - {ok, QPid1, [QPid | SPids] -- Alive, - NewNodes -- OldNodes}; + store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), + {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, [], []} + {ok, QPid1, []} end end end). on_node_up() -> - ClusterNodes = rabbit_mnesia:cluster_nodes(running), QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (Q = #amqqueue{name = QName}, QNames0) -> + fun (Q = #amqqueue{name = QName, + pid = Pid, + slave_pids = SPids}, QNames0) -> + %% We don't want to pass in the whole + %% cluster - we don't want a situation + %% where starting one node causes us to + %% decide to start a mirror on another + PossibleNodes0 = [node(P) || P <- [Pid | SPids]], + PossibleNodes = + case lists:member(node(), PossibleNodes0) of + true -> PossibleNodes0; + false -> [node() | PossibleNodes0] + end, {_MNode, SNodes} = suggested_queue_nodes( - Q, ClusterNodes), + Q, PossibleNodes), case lists:member(node(), SNodes) of true -> [QName | QNames0]; false -> QNames0 end end, [], rabbit_queue) end), - [ok = add_mirror(QName, node()) || QName <- QNames], + [{ok, _} = add_mirror(QName, node()) || QName <- QNames], ok. drop_mirrors(QName, Nodes) -> @@ -141,7 +144,7 @@ drop_mirror(QName, MirrorNode) -> end). add_mirrors(QName, Nodes) -> - [ok = add_mirror(QName, Node) || Node <- Nodes], + [{ok, _} = add_mirror(QName, Node) || Node <- Nodes], ok. add_mirror(QName, MirrorNode) -> @@ -154,8 +157,7 @@ add_mirror(QName, MirrorNode) -> [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> - {error,{queue_already_mirrored_on_node, - MirrorNode}}; + {ok, already_mirrored}; false -> start_child(Name, MirrorNode, Q) end @@ -171,20 +173,20 @@ start_child(Name, MirrorNode, Q) -> {ok, undefined} -> %% this means the mirror process was %% already running on the given node. - ok; + {ok, already_mirrored}; {ok, down} -> %% Node went down between us deciding to start a mirror %% and actually starting it. Which is fine. - ok; + {ok, node_down}; {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), - ok; + {ok, started}; {error, {{stale_master_pid, StalePid}, _}} -> rabbit_log:warning("Detected stale HA master while adding " "mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, StalePid]), - ok; + {ok, stale_master}; {error, {{duplicate_live_master, _}=Err, _}} -> Err; Other -> @@ -235,14 +237,14 @@ suggested_queue_nodes(Q) -> %% This variant exists so we can pull a call to %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. -suggested_queue_nodes(Q, ClusterNodes) -> +suggested_queue_nodes(Q, PossibleNodes) -> {MNode0, SNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, ClusterNodes). + {MNode, SNodes}, PossibleNodes). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -250,11 +252,11 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> - {MNode, All -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> + {MNode, Possible -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - Unavailable = Nodes -- All, + Unavailable = Nodes -- Possible, Available = Nodes -- Unavailable, case Available of [] -> %% We have never heard of anything? Not much we can do but @@ -265,16 +267,26 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> false -> promote_slave(Available) end end; -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) -> +%% When we need to add nodes, we randomise our candidate list as a +%% crude form of load-balancing. TODO it would also be nice to +%% randomise the list of ones to remove when we have too many - but +%% that would fail to take account of synchronisation... +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of - true -> Cand = (All -- [MNode]) -- SNodes, + true -> Cand = shuffle((Possible -- [MNode]) -- SNodes), SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); false -> lists:sublist(SNodes, SCount) end}; suggested_queue_nodes(_, _, {MNode, _}, _) -> {MNode, []}. +shuffle(L) -> + {A1,A2,A3} = now(), + random:seed(A1, A2, A3), + {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), + L1. + actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> {case MPid of none -> none; @@ -310,13 +322,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, All = fun ({A,B}) -> [A|B] end, OldNodes = All(actual_queue_nodes(OldQ)), NewNodes = All(suggested_queue_nodes(NewQ)), - %% When a mirror dies, remove_from_queue/2 might have to add new - %% slaves (in "exactly" mode). It will check mnesia to see which - %% slaves there currently are. If drop_mirror/2 is invoked first - %% then when we end up in remove_from_queue/2 it will not see the - %% slaves that add_mirror/2 will add, and also want to add them - %% (even though we are not responding to the death of a - %% mirror). Breakage ensues. add_mirrors(QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 307f2b4f..f4679184 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -173,7 +173,6 @@ handle_call({deliver, Delivery, true}, From, State) -> handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, - gm = GM, master_pid = MPid }) -> %% The GM has told us about deaths, which means we're not going to %% receive any more messages from GM @@ -181,32 +180,21 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids, ExtraNodes} -> + {ok, Pid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed gen_server2:reply(From, ok), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); node(Pid) =:= node() -> %% we've become master QueueState = promote_me(From, State), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), {become, rabbit_amqqueue_process, QueueState, hibernate}; true -> %% master has changed to not us. gen_server2:reply(From, ok), - %% assertion, we don't need to add_mirrors/2 in this - %% branch, see last clause in remove_from_queue/2 - [] = ExtraNodes, erlang:monitor(process, Pid), - %% GM is lazy. So we know of the death of the - %% slave since it is a neighbour of ours, but - %% until a message is sent, not all members will - %% know. That might include the new master. So - %% broadcast a no-op message to wake everyone up. - ok = gm:broadcast(GM, master_changed), noreply(State #state { master_pid = Pid }) end end; @@ -257,8 +245,8 @@ handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, - State = #state { gm = GM, master_pid = MPid }) -> - ok = gm:broadcast(GM, {process_death, MPid}), + State = #state { gm = GM, master_pid = MPid }) -> + ok = gm:broadcast(GM, process_death), noreply(State); handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> @@ -345,16 +333,21 @@ joined([SPid], _Members) -> SPid ! {joined, self()}, ok. members_changed([_SPid], _Births, []) -> ok; members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). -handle_msg([_SPid], _From, master_changed) -> - ok; handle_msg([_SPid], _From, request_depth) -> %% This is only of value to the master ok; handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; -handle_msg([SPid], _From, {process_death, Pid}) -> - inform_deaths(SPid, [Pid]); +handle_msg([_SPid], _From, process_death) -> + %% Since GM is by nature lazy we need to make sure there is some + %% traffic when a master dies, to make sure we get informed of the + %% death. That's all process_death does, create some traffic. We + %% must not take any notice of the master death here since it + %% comes without ordering guarantees - there could still be + %% messages from the master we have yet to receive. When we get + %% members_changed, then there will be no more messages. + ok; handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; @@ -413,16 +406,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> %% If it needed confirming, it'll have %% already been done. Acc; - {ok, {published, ChPid}} -> + {ok, published} -> %% Still not seen it from the channel, just %% record that it's been confirmed. - {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)}; + {CMsN, dict:store(MsgId, confirmed, MSN)}; {ok, {published, ChPid, MsgSeqNo}} -> %% Seen from both GM and Channel. Can now %% confirm. {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN), dict:erase(MsgId, MSN)}; - {ok, {confirmed, _ChPid}} -> + {ok, confirmed} -> %% It's already been confirmed. This is %% probably it's been both sync'd to disk %% and then delivered and ack'd before we've @@ -454,9 +447,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, rabbit_mirror_queue_master:depth_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), - %% TODO this has been in here since the beginning, but it's not - %% obvious if it is needed. Investigate... - ok = gm:confirmed_broadcast(GM, master_changed), %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. @@ -492,18 +482,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% %% MS contains the following three entry types: %% - %% a) {published, ChPid}: + %% a) published: %% published via gm only; pending arrival of publication from %% channel, maybe pending confirm. %% %% b) {published, ChPid, MsgSeqNo}: %% published via gm and channel; pending confirm. %% - %% c) {confirmed, ChPid}: + %% c) confirmed: %% published via gm only, and confirmed; pending publication %% from channel. %% - %% d) discarded + %% d) discarded: %% seen via gm only as discarded. Pending publication from %% channel %% @@ -521,22 +511,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% this does not affect MS, nor which bits go through to SS in %% Master, or MTC in queue_process. - MSList = dict:to_list(MS), - SS = dict:from_list( - [E || E = {_MsgId, discarded} <- MSList] ++ - [{MsgId, Status} - || {MsgId, {Status, _ChPid}} <- MSList, - Status =:= published orelse Status =:= confirmed]), + St = [published, confirmed, discarded], + SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, AckTags, SS, MPids), - MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> - gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); - (_, MTC0) -> - MTC0 - end, gb_trees:empty(), MSList), + MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_Msgid, _Status, MTC0) -> + MTC0 + end, gb_trees:empty(), MS), Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], rabbit_amqqueue_process:init_with_backing_queue_state( @@ -647,16 +633,12 @@ maybe_enqueue_message( MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; - {ok, {confirmed, ChPid}} -> - %% BQ has confirmed it but we didn't know what the - %% msg_seq_no was at the time. We do now! + {ok, confirmed} -> ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 }; - {ok, {published, ChPid}} -> - %% It was published to the BQ and we didn't know the - %% msg_seq_no so couldn't confirm it at the time. + {ok, published} -> {MS1, SQ1} = case needs_confirming(Delivery, State1) of never -> {dict:erase(MsgId, MS), @@ -700,20 +682,17 @@ process_instruction( msg_id_status = MS }) -> %% We really are going to do the publish right now, even though we - %% may not have seen it directly from the channel. As a result, we - %% may know that it needs confirming without knowing its - %% msg_seq_no, which means that we can see the confirmation come - %% back from the backing queue without knowing the msg_seq_no, - %% which means that we're going to have to hang on to the fact - %% that we've seen the msg_id confirmed until we can associate it - %% with a msg_seq_no. + %% may not have seen it directly from the channel. But we cannot + %% issues confirms until the latter has happened. So we need to + %% keep track of the MsgId and its confirmation status in the + %% meantime. State1 = ensure_monitoring(ChPid, State), {MQ, PendingCh} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, {published, ChPid}, MS)}; + dict:store(MsgId, published, MS)}; {{value, Delivery = #delivery { msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }}, MQ2} -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6d6c648a..21f58154 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -537,7 +537,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = blank_state(QueueName), ok = scan_segments( fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> - gatherer:in(Gatherer, {MsgId, 1}); + gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> Acc diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 11f280bb..2e26837d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -886,38 +886,49 @@ test_arguments_parser() -> test_dynamic_mirroring() -> %% Just unit tests of the node selection logic, see multi node %% tests for the rest... - Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) -> + Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) -> {NewM, NewSs0} = rabbit_mirror_queue_misc:suggested_queue_nodes( Policy, Params, {OldM, OldSs}, All), - NewSs = lists:sort(NewSs0) + NewSs1 = lists:sort(NewSs0), + case dm_list_match(NewSs, NewSs1, ExtraSs) of + ok -> ok; + error -> exit({no_match, NewSs, NewSs1, ExtraSs}) + end end, - Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]), - Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]), - Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]), %% Add a node - Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), - Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), + Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), %% Add two nodes and drop one - Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), %% Promote slave to master by policy - Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), %% Don't try to include nodes that are not running - Test({a,[b]}, <<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), %% If we can't find any of the nodes listed then just keep the master - Test({a,[]}, <<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), + Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), - Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]), - Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]), - Test({a,[c]}, <<"exactly">>,2,{a,[c]}, [a,b,c,d]), - Test({a,[b,c]},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), - Test({a,[c]}, <<"exactly">>,2,{a,[c,d]},[a,b,c,d]), - Test({a,[c,d]},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]), + Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]), + Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]), + Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), passed. +%% Does the first list match the second where the second is required +%% to have exactly Extra superfluous items? +dm_list_match([], [], 0) -> ok; +dm_list_match(_, [], _Extra) -> error; +dm_list_match([H|T1], [H |T2], Extra) -> dm_list_match(T1, T2, Extra); +dm_list_match(L1, [_H|T2], Extra) -> dm_list_match(L1, T2, Extra - 1). + test_user_management() -> %% lots if stuff that should fail diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl new file mode 100644 index 00000000..53f3df18 --- /dev/null +++ b/src/rabbit_vm.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_vm). + +-export([memory/0]). + +-define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs", + "rfc4627_jsonrpc"]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(memory/0 :: () -> rabbit_types:infos()). + +-endif. + +%%---------------------------------------------------------------------------- + +%% Like erlang:memory(), but with awareness of rabbit-y things +memory() -> + Conns = (sup_memory(rabbit_tcp_client_sup) + + sup_memory(ssl_connection_sup) + + sup_memory(amqp_sup)), + Qs = (sup_memory(rabbit_amqqueue_sup) + + sup_memory(rabbit_mirror_queue_slave_sup)), + Mnesia = mnesia_memory(), + MsgIndexETS = ets_memory(rabbit_msg_store_ets_index), + MsgIndexProc = (pid_memory(msg_store_transient) + + pid_memory(msg_store_persistent)), + MgmtDbETS = ets_memory(rabbit_mgmt_db), + MgmtDbProc = sup_memory(rabbit_mgmt_sup), + Plugins = plugin_memory() - MgmtDbProc, + + [{total, Total}, + {processes, Processes}, + {ets, ETS}, + {atom, Atom}, + {binary, Bin}, + {code, Code}, + {system, System}] = + erlang:memory([total, processes, ets, atom, binary, code, system]), + + OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins, + + [{total, Total}, + {connection_procs, Conns}, + {queue_procs, Qs}, + {plugins, Plugins}, + {other_proc, lists:max([0, OtherProc])}, %% [1] + {mnesia, Mnesia}, + {mgmt_db, MgmtDbETS + MgmtDbProc}, + {msg_index, MsgIndexETS + MsgIndexProc}, + {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS}, + {binary, Bin}, + {code, Code}, + {atom, Atom}, + {other_system, System - ETS - Atom - Bin - Code}]. + +%% [1] - erlang:memory(processes) can be less than the sum of its +%% parts. Rather than display something nonsensical, just silence any +%% claims about negative memory. See +%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html + +%%---------------------------------------------------------------------------- + +sup_memory(Sup) -> + lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) + + pid_memory(Sup). + +sup_children(Sup) -> + rabbit_misc:with_exit_handler( + rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end). + +pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of + {memory, M} -> M; + _ -> 0 + end; +pid_memory(Name) when is_atom(Name) -> case whereis(Name) of + P when is_pid(P) -> pid_memory(P); + _ -> 0 + end. + +child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid); +child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid); +child_memory(_, _) -> 0. + +mnesia_memory() -> + case mnesia:system_info(is_running) of + yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) || + Tab <- mnesia:system_info(tables)]); + no -> 0 + end. + +ets_memory(Name) -> + lists:sum([bytes(ets:info(T, memory)) || T <- ets:all(), + N <- [ets:info(T, name)], + N =:= Name]). + +bytes(Words) -> Words * erlang:system_info(wordsize). + +plugin_memory() -> + lists:sum([plugin_memory(App) || + {App, _, _} <- application:which_applications(), + is_plugin(atom_to_list(App))]). + +plugin_memory(App) -> + case catch application_master:get_child( + application_controller:get_master(App)) of + {Pid, _} -> sup_memory(Pid); + _ -> 0 + end. + +is_plugin("rabbitmq_" ++ _) -> true; +is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS). |