diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-13 11:36:22 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-13 11:36:22 +0000 |
commit | 3036de45fe3ba8bc873967c319373bccd4c60157 (patch) | |
tree | 636850f8cceae7ff677540ec21ea870c6f723594 | |
parent | 998dc60408788458ffe10d22c5714b9406561a90 (diff) | |
parent | 461056061992ba614d71108f74c78cb010cc7d77 (diff) | |
download | rabbitmq-server-3036de45fe3ba8bc873967c319373bccd4c60157.tar.gz |
merge bug23678 into default (conditional on vq:needs_idle_timeout wrt confirms is too lax)
-rw-r--r-- | src/rabbit_channel.erl | 52 |
1 files changed, 23 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a2a22f8a..7b5f096b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -473,15 +473,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> confirm([], _QPid, State) -> State; -confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) -> - State; -confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> - MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)], - MS = gb_sets:from_list(MsgSeqNos), - QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM), - send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS), - queues_for_msg = QFM1}); confirm(MsgSeqNos, QPid, State) -> {DoneMessages, State1} = lists:foldl( @@ -539,15 +530,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), IsPersistent = is_message_persistent(DecodedContent), - {MsgSeqNo, State1} - = case ConfirmEnabled of - false -> {undefined, State}; - true -> SeqNo = State#ch.publish_seqno, - {SeqNo, - State#ch{publish_seqno = SeqNo + 1, - unconfirmed = - gb_sets:add(SeqNo, State#ch.unconfirmed)}} - end, + {MsgSeqNo, State1} = + case ConfirmEnabled of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -1218,19 +1206,19 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - confirm([MsgSeqNo], undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - confirm([MsgSeqNo], undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - confirm([MsgSeqNo], undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(routed, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, - State = #ch{queues_for_msg = QFM}) -> - QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), +process_routing_result(routed, QPids, MsgSeqNo, _, State) -> + #ch{queues_for_msg = QFM, unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = QFM1}. + State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + unconfirmed = gb_sets:add(MsgSeqNo, UC)}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1239,6 +1227,9 @@ lock_message(false, _MsgStruct, State) -> send_confirms([], State) -> State; +send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> + send_confirm(MsgSeqNo, WriterPid), + State; send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), CutOff = case gb_sets:is_empty(UC) of @@ -1252,12 +1243,15 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), multiple = true}) end, - ok = lists:foldl(fun(T, ok) -> - rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = T}) - end, ok, Ss), + [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], State. +send_confirm(undefined, _WriterPid) -> + ok; +send_confirm(SeqNo, WriterPid) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = SeqNo}). + terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). |