diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-20 13:11:18 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-20 13:11:18 +0100 |
commit | 5fc27ddfe468be603804df4c7a49e2831b8c3f00 (patch) | |
tree | 1537ab4dcdaffea32a8590d8327c19da029fb654 | |
parent | 53f116d3b4e82c36277bb365630384292ffc4daf (diff) | |
download | rabbitmq-server-5fc27ddfe468be603804df4c7a49e2831b8c3f00.tar.gz |
only confirm when all queues have dealt with the message
-rw-r--r-- | src/rabbit_channel.erl | 108 |
1 files changed, 60 insertions, 48 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ee9cc0df..746f4ca6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm_enabled, published_count, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, qpid_to_msgs}). + held_confirms, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -163,7 +163,7 @@ flush_multiple_acks(Pid) -> gen_server2:cast(Pid, flush_multiple_acks). confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo}). + gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). %%--------------------------------------------------------------------------- @@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, confirm_multiple = false, held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), - qpid_to_msgs = dict:new()}, + queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -286,19 +286,22 @@ handle_cast(flush_multiple_acks, {noreply, State#ch{held_confirms = gb_sets:new(), confirm_tref = undefined}}; -handle_cast({confirm, MsgSeqNo}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. +handle_cast({confirm, MsgSeqNo, From}, State) -> + {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, - State = #ch{qpid_to_msgs = QTM}) -> - State2 = case dict:find(QPid, QTM) of - {ok, Msgs} -> State1 = gb_sets:fold(fun send_or_enqueue_ack/2, - State, Msgs), - State1 #ch{qpid_to_msgs = dict:erase(QPid, QTM)}; - error -> State - end, + State = #ch{queues_for_msg = QFM}) -> + State1 = dict:fold( + fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> + Qs = sets:del_elem(QPid, QPids), + case sets:size(Qs) of + 0 -> send_or_enqueue_ack(Msg, QPid, State0); + _ -> State0#ch{queues_for_msg = + dict:store(Msg, Qs, QFM0)} + end + end, State, QFM), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State2)}. + {noreply, queue_blocked(QPid, State1)}. handle_pre_hibernate(State = #ch{writer_pid = WriterPid, held_confirms = As, @@ -457,44 +460,50 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -send_or_enqueue_ack(undefined, State) -> +send_or_enqueue_ack(undefined, _QPid, State) -> State; -send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> +send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> do_if_unconfirmed( - MsgSeqNo, State, + MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.ack'{delivery_tag = MSN}), State1 - end); -send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> + end, State); +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> do_if_unconfirmed( - MsgSeqNo, State, + MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> - start_ack_timer(State1#ch{held_confirms = - gb_sets:add(MSN, As)}) - end). + start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)}) + end, State). -msg_sent_to_queue(undefined, _QPid, State) -> - State; -msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> - Msgs1 = case dict:find(QPid, QTM) of - {ok, Msgs} -> Msgs; - error -> erlang:monitor(process, QPid), - gb_sets:new() - end, - QTM1 = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM), - State#ch{qpid_to_msgs = QTM1}. - -do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) -> +do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, + State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + %% clears references to MsgSeqNo and does ConfirmFun case gb_sets:is_element(MsgSeqNo, UC) of - true -> QTM = dict:map(fun (_, Msgs) -> - gb_sets:delete_any(MsgSeqNo, Msgs) - end, State#ch.qpid_to_msgs), - State1 = Fun(MsgSeqNo, State#ch{qpid_to_msgs = QTM}), - State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)}; + true -> + case QPid of + undefined -> + ConfirmFun(MsgSeqNo, + State#ch{unconfirmed = + gb_sets:delete(MsgSeqNo, UC)}); + _ -> + {ok, Qs} = dict:find(MsgSeqNo, QFM), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> ConfirmFun(MsgSeqNo, + State#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC)}); + _ -> State#ch{queues_for_msg = + dict:store(MsgSeqNo, Qs1, QFM)} + end + end; false -> State end. @@ -1238,18 +1247,21 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - send_or_enqueue_ack(MsgSeqNo, State); + send_or_enqueue_ack(MsgSeqNo, undefined, State); process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, State); + send_or_enqueue_ack(MsgSeqNo, undefined, State); process_routing_result(routed, [], _, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, State); + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(routed, _, _, undefined, _, State) -> + State; process_routing_result(routed, _, false, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, State); -process_routing_result(routed, QPids, true, MsgSeqNo, _, State) -> - lists:foldl(fun (QPid, State0) -> - msg_sent_to_queue(MsgSeqNo, QPid, State0) - end, State, QPids). + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(routed, QPids, true, MsgSeqNo, _, + State = #ch{queues_for_msg = QFM}) -> + QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + [maybe_monitor(QPid) || QPid <- QPids], + State#ch{queues_for_msg = QFM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; |