diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-12-18 17:20:02 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-12-18 17:20:02 +0000 |
commit | 4b9cc57601b69ad1e3a6f7e9649382dbb1cd8ef3 (patch) | |
tree | d35c204cfa8357ba96d84276603617f39064abaf | |
parent | 7008dd09a161e026db0a574436eebe4506ac6101 (diff) | |
download | rabbitmq-server-4b9cc57601b69ad1e3a6f7e9649382dbb1cd8ef3.tar.gz |
Confirms for HA
-rw-r--r-- | src/dtree.erl | 24 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 5 |
4 files changed, 44 insertions, 18 deletions
diff --git a/src/dtree.erl b/src/dtree.erl index ca2d30cf..c59243bb 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -32,7 +32,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, take_all/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, take_prim/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -53,6 +53,7 @@ -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take_prim/2 :: (pk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -120,6 +121,13 @@ take_all(SK, {P, S}) -> {KVs, {P1, prune(SKS, PKS, S)}} end. +%% Drop the entry with the given primary key +take_prim(PK, {P, S} = DTree) -> + case gb_trees:lookup(PK, P) of + none -> {[], DTree}; + {value, {SKS, V}} -> {[{PK, V}], take_prim2(PK, SKS, DTree)} + end. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). @@ -149,6 +157,20 @@ take_all2(PKS, P) -> gb_trees:delete(PK, P0)} end, {[], gb_sets:empty(), P}, PKS). +take_prim2(PK, SKS, {P, S}) -> + {gb_trees:delete(PK, P), + rabbit_misc:gb_trees_fold( + fun (SK0, PKS, S1) -> + case gb_sets:is_member(SK0, SKS) of + false -> S1; + true -> PKS1 = gb_sets:delete(PK, PKS), + case gb_sets:is_empty(PKS1) of + true -> gb_trees:delete(SK0, S1); + false -> gb_trees:update(SK0, PKS1, S1) + end + end + end, S, S)}. + prune(SKS, PKS, S) -> gb_sets:fold(fun (SK0, S0) -> PKS1 = gb_trees:get(SK0, S0), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f5e2b400..01125819 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -580,11 +580,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message}, discard(Delivery, State2); {false, State2} -> case publish_max(Delivery, Props, Delivered, State2) of - nopub -> - State2; - BQS1 -> - ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + nopub -> State2; + BQS1 -> ensure_ttl_timer(Props#message_properties.expiry, + State2#q{backing_queue_state = BQS1}) end end. @@ -597,21 +595,16 @@ publish_max(#delivery{message = Message, publish_max(#delivery{message = Message, msg_seq_no = MsgSeqNo, sender = SenderPid}, - Props = #message_properties{needs_confirming = Confirm}, - Delivered, - #q{backing_queue = BQ, - backing_queue_state = BQS, - max_depth = MaxDepth}) -> + Props, Delivered, #q{backing_queue = BQ, + backing_queue_state = BQS, + max_depth = MaxDepth}) -> {Depth, Len} = {BQ:depth(BQS), BQ:len(BQS)}, case {Depth >= MaxDepth, Len =:= 0} of {false, _} -> BQ:publish(Message, Props, Delivered, SenderPid, BQS); {true, true} -> (dead_letter_fun(maxdepth))([{Message, undefined}]), - case Confirm of - true -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); - false -> ok - end, + rabbit_misc:confirm_all(SenderPid, MsgSeqNo), nopub; {true, false} -> {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a3c82865..ae29861e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -224,8 +224,9 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - {confirm, _MsgSeqNos, _QPid} -> 5; - _ -> 0 + {confirm, _MsgSeqNos, _QPid} -> 5; + {confirm_all, _MsgSeqNo, _QPid} -> 5; + _ -> 0 end. prioritise_info(Msg, _State) -> @@ -316,6 +317,9 @@ handle_cast(force_event_refresh, State) -> noreply(State); handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), + noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end); +handle_cast({confirm_all, MsgSeqNo, _From}, State) -> + State1 = #ch{confirmed = C} = confirm_all(MsgSeqNo, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). handle_info({bump_credit, Msg}, State) -> @@ -571,6 +575,10 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), record_confirms(MXs, State#ch{unconfirmed = UC1}). +confirm_all(MsgSeqNo, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take_prim(MsgSeqNo, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}). + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 4efde50e..05569599 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -28,7 +28,7 @@ -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). --export([confirm_to_sender/2]). +-export([confirm_to_sender/2, confirm_all/2]). -export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1, filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). @@ -428,6 +428,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> confirm_to_sender(Pid, MsgSeqNos) -> gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). +confirm_all(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm_all, MsgSeqNo, self()}). + %% @doc Halts the emulator returning the given status code to the os. %% On Windows this function will block indefinitely so as to give the io %% subsystem time to flush stdout completely. |