summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-13 11:36:22 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-13 11:36:22 +0000
commit3036de45fe3ba8bc873967c319373bccd4c60157 (patch)
tree636850f8cceae7ff677540ec21ea870c6f723594
parent998dc60408788458ffe10d22c5714b9406561a90 (diff)
parent461056061992ba614d71108f74c78cb010cc7d77 (diff)
downloadrabbitmq-server-3036de45fe3ba8bc873967c319373bccd4c60157.tar.gz
merge bug23678 into default (conditional on vq:needs_idle_timeout wrt confirms is too lax)
-rw-r--r--src/rabbit_channel.erl52
1 files changed, 23 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a2a22f8a..7b5f096b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -473,15 +473,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
confirm([], _QPid, State) ->
State;
-confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) ->
- State;
-confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
- MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)],
- MS = gb_sets:from_list(MsgSeqNos),
- QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM),
- send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS),
- queues_for_msg = QFM1});
confirm(MsgSeqNos, QPid, State) ->
{DoneMessages, State1} =
lists:foldl(
@@ -539,15 +530,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
IsPersistent = is_message_persistent(DecodedContent),
- {MsgSeqNo, State1}
- = case ConfirmEnabled of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo,
- State#ch{publish_seqno = SeqNo + 1,
- unconfirmed =
- gb_sets:add(SeqNo, State#ch.unconfirmed)}}
- end,
+ {MsgSeqNo, State1} =
+ case ConfirmEnabled of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -1218,19 +1206,19 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- confirm([MsgSeqNo], undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- confirm([MsgSeqNo], undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- confirm([MsgSeqNo], undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(routed, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _,
- State = #ch{queues_for_msg = QFM}) ->
- QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+ #ch{queues_for_msg = QFM, unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = QFM1}.
+ State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+ unconfirmed = gb_sets:add(MsgSeqNo, UC)}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1239,6 +1227,9 @@ lock_message(false, _MsgStruct, State) ->
send_confirms([], State) ->
State;
+send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
+ send_confirm(MsgSeqNo, WriterPid),
+ State;
send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
CutOff = case gb_sets:is_empty(UC) of
@@ -1252,12 +1243,15 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
multiple = true})
end,
- ok = lists:foldl(fun(T, ok) ->
- rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = T})
- end, ok, Ss),
+ [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
State.
+send_confirm(undefined, _WriterPid) ->
+ ok;
+send_confirm(SeqNo, WriterPid) ->
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = SeqNo}).
+
terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).