summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-10-10 15:29:13 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-10-10 15:29:13 +0100
commit16d135e7ff952ee3efeacab5b1d34134f67de2ce (patch)
tree2e751f0b9c8e6e10a625f070629d5065b7a6e6a1
parentc8e2664f60e13f74ebe712f52ddf467f4fd53343 (diff)
parentba1c1d0c31e7038cd9f64577d24a288e515f004f (diff)
downloadrabbitmq-server-16d135e7ff952ee3efeacab5b1d34134f67de2ce.tar.gz
Merge bug26408 (again)
-rw-r--r--src/rabbit_node_monitor.erl37
-rw-r--r--test/src/rabbit_tests.erl25
2 files changed, 36 insertions, 26 deletions
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 933beed7..a948115d 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -38,7 +38,6 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
--define(PARTIAL_PARTITION_NOTIFICATION_DELAY, 1000).
-record(state, {monitors, partitions, subscribers, down_ping_timer,
keepalive_timer, autoheal, guid, node_guids}).
@@ -305,26 +304,26 @@ handle_cast({node_up, Node, NodeType, GUID},
handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
{noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}};
-handle_cast({check_partial_partition, Node, NodeGUID, Reporter, MyGUID},
- State = #state{guid = MyGUID}) ->
- case lists:member(Node, alive_nodes()) of
- true -> erlang:send_after(
- ?PARTIAL_PARTITION_NOTIFICATION_DELAY, self(),
- {send_partial_partition, Node, NodeGUID, Reporter});
+handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
+ State = #state{guid = MyGUID,
+ node_guids = GUIDs}) ->
+ case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
+ orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of
+ true -> cast(Rep, {partial_partition, Node, node(), RepGUID});
false -> ok
end,
{noreply, State};
-handle_cast({check_partial_partition, _Node, _NodeGUID, _Reporter, _GUID},
- State) ->
+handle_cast({check_partial_partition, _Node, _Reporter,
+ _NodeGUID, _GUID, _ReporterGUID}, State) ->
{noreply, State};
-handle_cast({partial_partition, GUID, Reporter, Proxy},
- State = #state{guid = GUID}) ->
+handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID},
+ State = #state{guid = MyGUID}) ->
FmtBase = "Partial partition detected:~n"
- " * This node was reported DOWN by ~s~n"
+ " * We saw DOWN from ~s~n"
" * We can still see ~s which can see ~s~n",
- ArgsBase = [Reporter, Proxy, Reporter],
+ ArgsBase = [NotReallyDown, Proxy, NotReallyDown],
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
rabbit_log:error(
@@ -403,15 +402,17 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
-handle_info({nodedown, Node, Info}, State = #state{node_guids = GUIDs}) ->
+handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
+ node_guids = GUIDs}) ->
rabbit_log:info("node ~p down: ~p~n",
[Node, proplists:get_value(nodedown_reason, Info)]),
Check = fun (N, CheckGUID, DownGUID) ->
cast(N, {check_partial_partition,
- Node, DownGUID, node(), CheckGUID})
+ Node, node(), DownGUID, CheckGUID, MyGUID})
end,
case orddict:find(Node, GUIDs) of
- {ok, DownGUID} -> Alive = alive_nodes() -- [node(), Node],
+ {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
+ -- [node(), Node],
[case orddict:find(N, GUIDs) of
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
error -> ok
@@ -469,10 +470,6 @@ handle_info(ping_up_nodes, State) ->
[cast(N, keepalive) || N <- alive_nodes() -- [node()]],
{noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};
-handle_info({send_partial_partition, Node, NodeGUID, Reporter}, State) ->
- cast(Node, {partial_partition, NodeGUID, Reporter, node()}),
- {noreply, State};
-
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index 9cef22c1..ef6b756b 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -1979,6 +1979,11 @@ msg_store_write(MsgIds, MSCState) ->
rabbit_msg_store:write(MsgId, MsgId, MSCState)
end, ok, MsgIds).
+msg_store_write_flow(MsgIds, MSCState) ->
+ ok = lists:foldl(fun (MsgId, ok) ->
+ rabbit_msg_store:write_flow(MsgId, MsgId, MSCState)
+ end, ok, MsgIds).
+
msg_store_remove(MsgIds, MSCState) ->
rabbit_msg_store:remove(MsgIds, MSCState).
@@ -2169,18 +2174,26 @@ test_msg_store_confirm_timer() ->
end
end, undefined),
ok = msg_store_write([MsgId], MSCState),
- ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState),
+ ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
ok = msg_store_remove([MsgId], MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
passed.
-msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
+msg_store_keep_busy_until_confirm(MsgIds, MSCState, Blocked) ->
+ After = case Blocked of
+ false -> 0;
+ true -> ?MAX_WAIT
+ end,
+ Recurse = fun () -> msg_store_keep_busy_until_confirm(
+ MsgIds, MSCState, credit_flow:blocked()) end,
receive
- on_disk -> ok
- after 0 ->
- ok = msg_store_write(MsgIds, MSCState),
+ on_disk -> ok;
+ {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg),
+ Recurse()
+ after After ->
+ ok = msg_store_write_flow(MsgIds, MSCState),
ok = msg_store_remove(MsgIds, MSCState),
- msg_store_keep_busy_until_confirm(MsgIds, MSCState)
+ Recurse()
end.
test_msg_store_client_delete_and_terminate() ->