diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-10-10 15:29:13 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-10-10 15:29:13 +0100 |
commit | 16d135e7ff952ee3efeacab5b1d34134f67de2ce (patch) | |
tree | 2e751f0b9c8e6e10a625f070629d5065b7a6e6a1 | |
parent | c8e2664f60e13f74ebe712f52ddf467f4fd53343 (diff) | |
parent | ba1c1d0c31e7038cd9f64577d24a288e515f004f (diff) | |
download | rabbitmq-server-16d135e7ff952ee3efeacab5b1d34134f67de2ce.tar.gz |
Merge bug26408 (again)
-rw-r--r-- | src/rabbit_node_monitor.erl | 37 | ||||
-rw-r--r-- | test/src/rabbit_tests.erl | 25 |
2 files changed, 36 insertions, 26 deletions
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 933beed7..a948115d 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -38,7 +38,6 @@ -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). -define(RABBIT_DOWN_PING_INTERVAL, 1000). --define(PARTIAL_PARTITION_NOTIFICATION_DELAY, 1000). -record(state, {monitors, partitions, subscribers, down_ping_timer, keepalive_timer, autoheal, guid, node_guids}). @@ -305,26 +304,26 @@ handle_cast({node_up, Node, NodeType, GUID}, handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}}; -handle_cast({check_partial_partition, Node, NodeGUID, Reporter, MyGUID}, - State = #state{guid = MyGUID}) -> - case lists:member(Node, alive_nodes()) of - true -> erlang:send_after( - ?PARTIAL_PARTITION_NOTIFICATION_DELAY, self(), - {send_partial_partition, Node, NodeGUID, Reporter}); +handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, + State = #state{guid = MyGUID, + node_guids = GUIDs}) -> + case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso + orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of + true -> cast(Rep, {partial_partition, Node, node(), RepGUID}); false -> ok end, {noreply, State}; -handle_cast({check_partial_partition, _Node, _NodeGUID, _Reporter, _GUID}, - State) -> +handle_cast({check_partial_partition, _Node, _Reporter, + _NodeGUID, _GUID, _ReporterGUID}, State) -> {noreply, State}; -handle_cast({partial_partition, GUID, Reporter, Proxy}, - State = #state{guid = GUID}) -> +handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID}, + State = #state{guid = MyGUID}) -> FmtBase = "Partial partition detected:~n" - " * This node was reported DOWN by ~s~n" + " * We saw DOWN from ~s~n" " * We can still see ~s which can see ~s~n", - ArgsBase = [Reporter, Proxy, Reporter], + ArgsBase = [NotReallyDown, Proxy, NotReallyDown], case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> rabbit_log:error( @@ -403,15 +402,17 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{subscribers = Subscribers}) -> {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; -handle_info({nodedown, Node, Info}, State = #state{node_guids = GUIDs}) -> +handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID, + node_guids = GUIDs}) -> rabbit_log:info("node ~p down: ~p~n", [Node, proplists:get_value(nodedown_reason, Info)]), Check = fun (N, CheckGUID, DownGUID) -> cast(N, {check_partial_partition, - Node, DownGUID, node(), CheckGUID}) + Node, node(), DownGUID, CheckGUID, MyGUID}) end, case orddict:find(Node, GUIDs) of - {ok, DownGUID} -> Alive = alive_nodes() -- [node(), Node], + {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running) + -- [node(), Node], [case orddict:find(N, GUIDs) of {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); error -> ok @@ -469,10 +470,6 @@ handle_info(ping_up_nodes, State) -> [cast(N, keepalive) || N <- alive_nodes() -- [node()]], {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})}; -handle_info({send_partial_partition, Node, NodeGUID, Reporter}, State) -> - cast(Node, {partial_partition, NodeGUID, Reporter, node()}), - {noreply, State}; - handle_info(_Info, State) -> {noreply, State}. diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index 9cef22c1..ef6b756b 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -1979,6 +1979,11 @@ msg_store_write(MsgIds, MSCState) -> rabbit_msg_store:write(MsgId, MsgId, MSCState) end, ok, MsgIds). +msg_store_write_flow(MsgIds, MSCState) -> + ok = lists:foldl(fun (MsgId, ok) -> + rabbit_msg_store:write_flow(MsgId, MsgId, MSCState) + end, ok, MsgIds). + msg_store_remove(MsgIds, MSCState) -> rabbit_msg_store:remove(MsgIds, MSCState). @@ -2169,18 +2174,26 @@ test_msg_store_confirm_timer() -> end end, undefined), ok = msg_store_write([MsgId], MSCState), - ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState), + ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), ok = msg_store_remove([MsgId], MSCState), ok = rabbit_msg_store:client_delete_and_terminate(MSCState), passed. -msg_store_keep_busy_until_confirm(MsgIds, MSCState) -> +msg_store_keep_busy_until_confirm(MsgIds, MSCState, Blocked) -> + After = case Blocked of + false -> 0; + true -> ?MAX_WAIT + end, + Recurse = fun () -> msg_store_keep_busy_until_confirm( + MsgIds, MSCState, credit_flow:blocked()) end, receive - on_disk -> ok - after 0 -> - ok = msg_store_write(MsgIds, MSCState), + on_disk -> ok; + {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), + Recurse() + after After -> + ok = msg_store_write_flow(MsgIds, MSCState), ok = msg_store_remove(MsgIds, MSCState), - msg_store_keep_busy_until_confirm(MsgIds, MSCState) + Recurse() end. test_msg_store_client_delete_and_terminate() -> |