summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-09 12:10:40 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-09 12:10:40 +0000
commit395d873a5c7516910f7768787a6676b2260f0ffe (patch)
treeacc439977842204f7591dcb3a95c265596dd2e00
parenta3f01f3123c3c4b5d5ab6353a5121b5a1d5a999c (diff)
parent84594b1b331375c6b8f37372b2e68cf41fba6c68 (diff)
downloadrabbitmq-server-395d873a5c7516910f7768787a6676b2260f0ffe.tar.gz
merge bug23920 into default
-rw-r--r--src/rabbit_channel.erl32
-rw-r--r--src/rabbit_tests.erl77
2 files changed, 95 insertions, 14 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 526fb428..f584ff32 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -298,12 +298,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
%% process_confirms to prevent each MsgSeqNo being removed from
%% the set one by one which which would be inefficient
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
+ {Nack, SendFun} = case Reason of
+ normal -> {false, fun record_confirms/2};
+ _ -> {true, fun send_nacks/2}
+ end,
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
erase_queue_stats(QPid),
- State3 = (case Reason of
- normal -> fun record_confirms/2;
- _ -> fun send_nacks/2
- end)(MXs, State2),
+ State3 = SendFun(MXs, State2),
noreply(queue_blocked(QPid, State3)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
@@ -513,23 +514,24 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
+process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
{MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
+ fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
- State);
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack, State);
none -> Acc
end
end, {[], UMQ, UQM}, MsgSeqNos),
{MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack,
+ State) ->
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
@@ -544,8 +546,10 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
UQM
end,
Qs1 = gb_sets:del_element(QPid, Qs),
- case gb_sets:is_empty(Qs1) of
- true ->
+ %% If QPid somehow died initiating a nack, clear the message from
+ %% internal data-structures. Also, cleanup empty entries.
+ case (Nack orelse gb_sets:is_empty(Qs1)) of
+ true ->
{[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
false ->
{MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d6af16ac..9547cae5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -57,6 +57,7 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
+ passed = test_confirms(),
passed = maybe_run_cluster_dependent_tests(),
passed = test_configurable_server_properties(),
passed.
@@ -1225,6 +1226,82 @@ test_statistics_receive_event1(Ch, Matcher) ->
after 1000 -> throw(failed_to_receive_event)
end.
+test_confirms_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_confirms_receiver(Pid)
+ end.
+
+test_confirms() ->
+ {_Writer, Ch} = test_spawn(fun test_confirms_receiver/1),
+ DeclareBindDurableQueue =
+ fun() ->
+ rabbit_channel:do(Ch, #'queue.declare'{durable = true}),
+ receive #'queue.declare_ok'{queue = Q0} ->
+ rabbit_channel:do(Ch, #'queue.bind'{
+ queue = Q0,
+ exchange = <<"amq.direct">>,
+ routing_key = "magic" }),
+ receive #'queue.bind_ok'{} ->
+ Q0
+ after 1000 ->
+ throw(failed_to_bind_queue)
+ end
+ after 1000 ->
+ throw(failed_to_declare_queue)
+ end
+ end,
+ %% Declare and bind two queues
+ QName1 = DeclareBindDurableQueue(),
+ QName2 = DeclareBindDurableQueue(),
+ %% Get the first one's pid (we'll crash it later)
+ {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
+ QPid1 = Q1#amqqueue.pid,
+ %% Enable confirms
+ rabbit_channel:do(Ch, #'confirm.select'{}),
+ receive #'confirm.select_ok'{} ->
+ ok
+ after 1000 ->
+ throw(failed_to_enable_confirms)
+ end,
+ %% Publish a message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
+ routing_key = "magic"
+ },
+ rabbit_basic:build_content(
+ #'P_basic'{delivery_mode = 2}, <<"">>)),
+ %% Crash the queue
+ QPid1 ! boom,
+ %% Wait for a nack
+ receive
+ #'basic.nack'{} ->
+ ok;
+ #'basic.ack'{} ->
+ throw(received_ack_instead_of_nack)
+ after 2000 ->
+ throw(did_not_receive_nack)
+ end,
+ receive
+ #'basic.ack'{} ->
+ throw(received_ack_when_none_expected)
+ after 1000 ->
+ ok
+ end,
+ %% Cleanup
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
+ receive #'queue.delete_ok'{} ->
+ ok
+ after 1000 ->
+ throw(failed_to_cleanup_queue)
+ end,
+ unlink(Ch),
+ ok = rabbit_channel:shutdown(Ch),
+
+ passed.
+
test_statistics() ->
application:set_env(rabbit, collect_statistics, fine),