diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-03-09 12:10:40 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-03-09 12:10:40 +0000 |
commit | 395d873a5c7516910f7768787a6676b2260f0ffe (patch) | |
tree | acc439977842204f7591dcb3a95c265596dd2e00 | |
parent | a3f01f3123c3c4b5d5ab6353a5121b5a1d5a999c (diff) | |
parent | 84594b1b331375c6b8f37372b2e68cf41fba6c68 (diff) | |
download | rabbitmq-server-395d873a5c7516910f7768787a6676b2260f0ffe.tar.gz |
merge bug23920 into default
-rw-r--r-- | src/rabbit_channel.erl | 32 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 77 |
2 files changed, 95 insertions, 14 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 526fb428..f584ff32 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -298,12 +298,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, %% process_confirms to prevent each MsgSeqNo being removed from %% the set one by one which which would be inefficient State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), + {Nack, SendFun} = case Reason of + normal -> {false, fun record_confirms/2}; + _ -> {true, fun send_nacks/2} + end, + {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), erase_queue_stats(QPid), - State3 = (case Reason of - normal -> fun record_confirms/2; - _ -> fun send_nacks/2 - end)(MXs, State2), + State3 = SendFun(MXs, State2), noreply(queue_blocked(QPid, State3)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> @@ -513,23 +514,24 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State) -> - {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), record_confirms(MXs, State1). -process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}) -> +process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}) -> {MXs, UMQ1, UQM1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> + fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, - State); + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, + Acc, Nack, State); none -> Acc end end, {[], UMQ, UQM}, MsgSeqNos), {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, + State) -> %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), @@ -544,8 +546,10 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> UQM end, Qs1 = gb_sets:del_element(QPid, Qs), - case gb_sets:is_empty(Qs1) of - true -> + %% If QPid somehow died initiating a nack, clear the message from + %% internal data-structures. Also, cleanup empty entries. + case (Nack orelse gb_sets:is_empty(Qs1)) of + true -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; false -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d6af16ac..9547cae5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -57,6 +57,7 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), + passed = test_confirms(), passed = maybe_run_cluster_dependent_tests(), passed = test_configurable_server_properties(), passed. @@ -1225,6 +1226,82 @@ test_statistics_receive_event1(Ch, Matcher) -> after 1000 -> throw(failed_to_receive_event) end. +test_confirms_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_confirms_receiver(Pid) + end. + +test_confirms() -> + {_Writer, Ch} = test_spawn(fun test_confirms_receiver/1), + DeclareBindDurableQueue = + fun() -> + rabbit_channel:do(Ch, #'queue.declare'{durable = true}), + receive #'queue.declare_ok'{queue = Q0} -> + rabbit_channel:do(Ch, #'queue.bind'{ + queue = Q0, + exchange = <<"amq.direct">>, + routing_key = "magic" }), + receive #'queue.bind_ok'{} -> + Q0 + after 1000 -> + throw(failed_to_bind_queue) + end + after 1000 -> + throw(failed_to_declare_queue) + end + end, + %% Declare and bind two queues + QName1 = DeclareBindDurableQueue(), + QName2 = DeclareBindDurableQueue(), + %% Get the first one's pid (we'll crash it later) + {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)), + QPid1 = Q1#amqqueue.pid, + %% Enable confirms + rabbit_channel:do(Ch, #'confirm.select'{}), + receive #'confirm.select_ok'{} -> + ok + after 1000 -> + throw(failed_to_enable_confirms) + end, + %% Publish a message + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>, + routing_key = "magic" + }, + rabbit_basic:build_content( + #'P_basic'{delivery_mode = 2}, <<"">>)), + %% Crash the queue + QPid1 ! boom, + %% Wait for a nack + receive + #'basic.nack'{} -> + ok; + #'basic.ack'{} -> + throw(received_ack_instead_of_nack) + after 2000 -> + throw(did_not_receive_nack) + end, + receive + #'basic.ack'{} -> + throw(received_ack_when_none_expected) + after 1000 -> + ok + end, + %% Cleanup + rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}), + receive #'queue.delete_ok'{} -> + ok + after 1000 -> + throw(failed_to_cleanup_queue) + end, + unlink(Ch), + ok = rabbit_channel:shutdown(Ch), + + passed. + test_statistics() -> application:set_env(rabbit, collect_statistics, fine), |