summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-12-18 17:20:02 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-12-18 17:20:02 +0000
commit4b9cc57601b69ad1e3a6f7e9649382dbb1cd8ef3 (patch)
treed35c204cfa8357ba96d84276603617f39064abaf
parent7008dd09a161e026db0a574436eebe4506ac6101 (diff)
downloadrabbitmq-server-4b9cc57601b69ad1e3a6f7e9649382dbb1cd8ef3.tar.gz
Confirms for HA
-rw-r--r--src/dtree.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_misc.erl5
4 files changed, 44 insertions, 18 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index ca2d30cf..c59243bb 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2, take_prim/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +53,7 @@
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take_prim/2 :: (pk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -120,6 +121,13 @@ take_all(SK, {P, S}) ->
{KVs, {P1, prune(SKS, PKS, S)}}
end.
+%% Drop the entry with the given primary key
+take_prim(PK, {P, S} = DTree) ->
+ case gb_trees:lookup(PK, P) of
+ none -> {[], DTree};
+ {value, {SKS, V}} -> {[{PK, V}], take_prim2(PK, SKS, DTree)}
+ end.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
@@ -149,6 +157,20 @@ take_all2(PKS, P) ->
gb_trees:delete(PK, P0)}
end, {[], gb_sets:empty(), P}, PKS).
+take_prim2(PK, SKS, {P, S}) ->
+ {gb_trees:delete(PK, P),
+ rabbit_misc:gb_trees_fold(
+ fun (SK0, PKS, S1) ->
+ case gb_sets:is_member(SK0, SKS) of
+ false -> S1;
+ true -> PKS1 = gb_sets:delete(PK, PKS),
+ case gb_sets:is_empty(PKS1) of
+ true -> gb_trees:delete(SK0, S1);
+ false -> gb_trees:update(SK0, PKS1, S1)
+ end
+ end
+ end, S, S)}.
+
prune(SKS, PKS, S) ->
gb_sets:fold(fun (SK0, S0) ->
PKS1 = gb_trees:get(SK0, S0),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f5e2b400..01125819 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -580,11 +580,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message},
discard(Delivery, State2);
{false, State2} ->
case publish_max(Delivery, Props, Delivered, State2) of
- nopub ->
- State2;
- BQS1 ->
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ nopub -> State2;
+ BQS1 -> ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end
end.
@@ -597,21 +595,16 @@ publish_max(#delivery{message = Message,
publish_max(#delivery{message = Message,
msg_seq_no = MsgSeqNo,
sender = SenderPid},
- Props = #message_properties{needs_confirming = Confirm},
- Delivered,
- #q{backing_queue = BQ,
- backing_queue_state = BQS,
- max_depth = MaxDepth}) ->
+ Props, Delivered, #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ max_depth = MaxDepth}) ->
{Depth, Len} = {BQ:depth(BQS), BQ:len(BQS)},
case {Depth >= MaxDepth, Len =:= 0} of
{false, _} ->
BQ:publish(Message, Props, Delivered, SenderPid, BQS);
{true, true} ->
(dead_letter_fun(maxdepth))([{Message, undefined}]),
- case Confirm of
- true -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
- false -> ok
- end,
+ rabbit_misc:confirm_all(SenderPid, MsgSeqNo),
nopub;
{true, false} ->
{{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a3c82865..ae29861e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -224,8 +224,9 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- {confirm, _MsgSeqNos, _QPid} -> 5;
- _ -> 0
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ {confirm_all, _MsgSeqNo, _QPid} -> 5;
+ _ -> 0
end.
prioritise_info(Msg, _State) ->
@@ -316,6 +317,9 @@ handle_cast(force_event_refresh, State) ->
noreply(State);
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
+ noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end);
+handle_cast({confirm_all, MsgSeqNo, _From}, State) ->
+ State1 = #ch{confirmed = C} = confirm_all(MsgSeqNo, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
handle_info({bump_credit, Msg}, State) ->
@@ -571,6 +575,10 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
record_confirms(MXs, State#ch{unconfirmed = UC1}).
+confirm_all(MsgSeqNo, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take_prim(MsgSeqNo, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 4efde50e..05569599 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -28,7 +28,7 @@
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
--export([confirm_to_sender/2]).
+-export([confirm_to_sender/2, confirm_all/2]).
-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
@@ -428,6 +428,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
confirm_to_sender(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+confirm_all(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm_all, MsgSeqNo, self()}).
+
%% @doc Halts the emulator returning the given status code to the os.
%% On Windows this function will block indefinitely so as to give the io
%% subsystem time to flush stdout completely.