summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-04-03 18:40:40 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-04-03 18:40:40 +0100
commit363f7a6aad12282b2c87983145c00b0c2f7faad9 (patch)
tree771d24ec71ddca8306288575c4fdcf18ee2f6bc8
parentd8ebe0e0c16c2b4b27ef57800a30cd9aadd3f625 (diff)
downloadrabbitmq-server-363f7a6aad12282b2c87983145c00b0c2f7faad9.tar.gz
fix nack handling in channel
When a queue dies abnormally we nack all messages sent to that queue and still pending confirms. Any confirms coming in for these messages thereafter - from other queues - are ignored.
-rw-r--r--src/dtree.erl27
-rw-r--r--src/rabbit_channel.erl11
2 files changed, 32 insertions, 6 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index 30fa8794..66a862cb 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -31,7 +31,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -51,6 +51,7 @@
-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()).
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -93,6 +94,13 @@ take(SK, {P, S}) ->
{KVs, {P1, gb_trees:delete(SK, S)}}
end.
+take_all(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P),
+ {KVs, {P1, prune(SKS, PKS, S)}}
+ end.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
@@ -114,3 +122,20 @@ take2(PKS, SK, P) ->
false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
end
end, {[], P}, PKS).
+
+take_all2(PKS, P) ->
+ gb_sets:fold(fun (PK, {KVs, SKS0, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ {[{PK, V} | KVs], gb_sets:union(SKS, SKS0),
+ gb_trees:delete(PK, P0)}
+ end, {[], gb_sets:empty(), P}, PKS).
+
+prune(SKS, PKS, S) ->
+ gb_sets:fold(fun (SK0, S0) ->
+ PKS1 = gb_trees:get(SK0, S0),
+ PKS2 = gb_sets:difference(PKS1, PKS),
+ case gb_sets:is_empty(PKS2) of
+ true -> gb_trees:delete(SK0, S0);
+ false -> gb_trees:update(SK0, PKS2, S0)
+ end
+ end, S, SKS).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4a0e93be..0c1c11d8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1116,11 +1116,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
end.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(QPid, UC),
- (case rabbit_misc:is_abnormal_termination(Reason) of
- true -> fun send_nacks/2;
- false -> fun record_confirms/2
- end)(MXs, State#ch{unconfirmed = UC1}).
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {MXs, UC1} = dtree:take_all(QPid, UC),
+ send_nacks(MXs, State#ch{unconfirmed = UC1});
+ false -> {MXs, UC1} = dtree:take(QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
+ end.
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,