summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-20 13:11:18 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-20 13:11:18 +0100
commit5fc27ddfe468be603804df4c7a49e2831b8c3f00 (patch)
tree1537ab4dcdaffea32a8590d8327c19da029fb654
parent53f116d3b4e82c36277bb365630384292ffc4daf (diff)
downloadrabbitmq-server-5fc27ddfe468be603804df4c7a49e2831b8c3f00.tar.gz
only confirm when all queues have dealt with the message
-rw-r--r--src/rabbit_channel.erl108
1 files changed, 60 insertions, 48 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ee9cc0df..746f4ca6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,7 +50,7 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm_enabled, published_count, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, qpid_to_msgs}).
+ held_confirms, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -163,7 +163,7 @@ flush_multiple_acks(Pid) ->
gen_server2:cast(Pid, flush_multiple_acks).
confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo}).
+ gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
%%---------------------------------------------------------------------------
@@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
confirm_multiple = false,
held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
- qpid_to_msgs = dict:new()},
+ queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -286,19 +286,22 @@ handle_cast(flush_multiple_acks,
{noreply, State#ch{held_confirms = gb_sets:new(),
confirm_tref = undefined}};
-handle_cast({confirm, MsgSeqNo}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, State)}.
+handle_cast({confirm, MsgSeqNo, From}, State) ->
+ {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
- State = #ch{qpid_to_msgs = QTM}) ->
- State2 = case dict:find(QPid, QTM) of
- {ok, Msgs} -> State1 = gb_sets:fold(fun send_or_enqueue_ack/2,
- State, Msgs),
- State1 #ch{qpid_to_msgs = dict:erase(QPid, QTM)};
- error -> State
- end,
+ State = #ch{queues_for_msg = QFM}) ->
+ State1 = dict:fold(
+ fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
+ Qs = sets:del_elem(QPid, QPids),
+ case sets:size(Qs) of
+ 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ _ -> State0#ch{queues_for_msg =
+ dict:store(Msg, Qs, QFM0)}
+ end
+ end, State, QFM),
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State2)}.
+ {noreply, queue_blocked(QPid, State1)}.
handle_pre_hibernate(State = #ch{writer_pid = WriterPid,
held_confirms = As,
@@ -457,44 +460,50 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, State) ->
+send_or_enqueue_ack(undefined, _QPid, State) ->
State;
-send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) ->
+send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) ->
+send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
do_if_unconfirmed(
- MsgSeqNo, State,
+ MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.ack'{delivery_tag = MSN}),
State1
- end);
-send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
+ end, State);
+send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
do_if_unconfirmed(
- MsgSeqNo, State,
+ MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_ack_timer(State1#ch{held_confirms =
- gb_sets:add(MSN, As)})
- end).
+ start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)})
+ end, State).
-msg_sent_to_queue(undefined, _QPid, State) ->
- State;
-msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
- Msgs1 = case dict:find(QPid, QTM) of
- {ok, Msgs} -> Msgs;
- error -> erlang:monitor(process, QPid),
- gb_sets:new()
- end,
- QTM1 = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM),
- State#ch{qpid_to_msgs = QTM1}.
-
-do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) ->
+do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
+ State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM}) ->
+ %% clears references to MsgSeqNo and does ConfirmFun
case gb_sets:is_element(MsgSeqNo, UC) of
- true -> QTM = dict:map(fun (_, Msgs) ->
- gb_sets:delete_any(MsgSeqNo, Msgs)
- end, State#ch.qpid_to_msgs),
- State1 = Fun(MsgSeqNo, State#ch{qpid_to_msgs = QTM}),
- State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)};
+ true ->
+ case QPid of
+ undefined ->
+ ConfirmFun(MsgSeqNo,
+ State#ch{unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC)});
+ _ ->
+ {ok, Qs} = dict:find(MsgSeqNo, QFM),
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> ConfirmFun(MsgSeqNo,
+ State#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC)});
+ _ -> State#ch{queues_for_msg =
+ dict:store(MsgSeqNo, Qs1, QFM)}
+ end
+ end;
false -> State
end.
@@ -1238,18 +1247,21 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, State);
+ send_or_enqueue_ack(MsgSeqNo, undefined, State);
process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, State);
+ send_or_enqueue_ack(MsgSeqNo, undefined, State);
process_routing_result(routed, [], _, MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, State);
+ send_or_enqueue_ack(MsgSeqNo, undefined, State);
+process_routing_result(routed, _, _, undefined, _, State) ->
+ State;
process_routing_result(routed, _, false, MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, State);
-process_routing_result(routed, QPids, true, MsgSeqNo, _, State) ->
- lists:foldl(fun (QPid, State0) ->
- msg_sent_to_queue(MsgSeqNo, QPid, State0)
- end, State, QPids).
+ send_or_enqueue_ack(MsgSeqNo, undefined, State);
+process_routing_result(routed, QPids, true, MsgSeqNo, _,
+ State = #ch{queues_for_msg = QFM}) ->
+ QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+ [maybe_monitor(QPid) || QPid <- QPids],
+ State#ch{queues_for_msg = QFM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};