diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-03 18:40:40 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-03 18:40:40 +0100 |
commit | 363f7a6aad12282b2c87983145c00b0c2f7faad9 (patch) | |
tree | 771d24ec71ddca8306288575c4fdcf18ee2f6bc8 | |
parent | d8ebe0e0c16c2b4b27ef57800a30cd9aadd3f625 (diff) | |
download | rabbitmq-server-363f7a6aad12282b2c87983145c00b0c2f7faad9.tar.gz |
fix nack handling in channel
When a queue dies abnormally we nack all messages sent to that queue
and still pending confirms. Any confirms coming in for these messages
thereafter - from other queues - are ignored.
-rw-r--r-- | src/dtree.erl | 27 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 11 |
2 files changed, 32 insertions, 6 deletions
diff --git a/src/dtree.erl b/src/dtree.erl index 30fa8794..66a862cb 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -31,7 +31,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -51,6 +51,7 @@ -spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()). -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -93,6 +94,13 @@ take(SK, {P, S}) -> {KVs, {P1, gb_trees:delete(SK, S)}} end. +take_all(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P), + {KVs, {P1, prune(SKS, PKS, S)}} + end. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). @@ -114,3 +122,20 @@ take2(PKS, SK, P) -> false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} end end, {[], P}, PKS). + +take_all2(PKS, P) -> + gb_sets:fold(fun (PK, {KVs, SKS0, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + {[{PK, V} | KVs], gb_sets:union(SKS, SKS0), + gb_trees:delete(PK, P0)} + end, {[], gb_sets:empty(), P}, PKS). + +prune(SKS, PKS, S) -> + gb_sets:fold(fun (SK0, S0) -> + PKS1 = gb_trees:get(SK0, S0), + PKS2 = gb_sets:difference(PKS1, PKS), + case gb_sets:is_empty(PKS2) of + true -> gb_trees:delete(SK0, S0); + false -> gb_trees:update(SK0, PKS2, S0) + end + end, S, SKS). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4a0e93be..0c1c11d8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1116,11 +1116,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> end. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(QPid, UC), - (case rabbit_misc:is_abnormal_termination(Reason) of - true -> fun send_nacks/2; - false -> fun record_confirms/2 - end)(MXs, State#ch{unconfirmed = UC1}). + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {MXs, UC1} = dtree:take_all(QPid, UC), + send_nacks(MXs, State#ch{unconfirmed = UC1}); + false -> {MXs, UC1} = dtree:take(QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, |