diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-20 00:26:33 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-20 00:26:33 +0000 |
commit | b73f2e5f8dd433fce76e1a8dee20596d6dbfd144 (patch) | |
tree | a69c76c8393a6eaeb0fb6b9e880d44425d60f96e /src/rabbit_mirror_queue_master.erl | |
parent | dfd985400ac482349797f430978b773292eaea0f (diff) | |
download | rabbitmq-server-b73f2e5f8dd433fce76e1a8dee20596d6dbfd144.tar.gz |
Right, well the fake handling code in master is fine. The fake handling code in slave is utterly wrong. However, I need to sleep
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 63 |
1 files changed, 45 insertions, 18 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0d64ab8e..4628796f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/4]). +-export([promote_backing_queue_state/5]). -behaviour(rabbit_backing_queue). @@ -36,7 +36,8 @@ coordinator, backing_queue, backing_queue_state, - set_delivered + set_delivered, + fakes }). %% --------------------------------------------------------------------------- @@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = 0 }. + set_delivered = 0, + fakes = sets:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, Fakes) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS) }. + set_delivered = BQ:len(BQS), + fakes = Fakes }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -126,30 +129,54 @@ dropwhile(Fun, State = #state { gm = GM, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered }) -> + set_delivered = SetDelivered, + fakes = Fakes }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - State1 = State #state { backing_queue_state = BQS1 }, case Result of empty -> - {Result, State1}; + {Result, State #state { backing_queue_state = BQS1 }}; {#basic_message { guid = Guid } = Message, IsDelivered, AckTag, Remaining} -> - ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}), - IsDelivered1 = IsDelivered orelse SetDelivered > 0, SetDelivered1 = lists:max([0, SetDelivered - 1]), - {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1 }} + case sets:is_element(Guid, Fakes) of + true -> + {BQS2, Fakes1} = + case AckRequired of + true -> {[Guid], BQS3} = BQ:ack([AckTag], BQS1), + {BQS3, Fakes}; + false -> {BQS1, sets:del_element(Guid, Fakes)} + end, + ok = gm:broadcast(GM, {fetch, false, Guid, Remaining}), + fetch(AckRequired, + State #state { backing_queue_state = BQS2, + set_delivered = SetDelivered1, + fakes = Fakes1 }); + false -> + ok = gm:broadcast(GM, + {fetch, AckRequired, Guid, Remaining}), + IsDelivered1 = IsDelivered orelse SetDelivered > 0, + Fakes1 = case SetDelivered + SetDelivered1 of + 1 -> sets:new(); %% transition to 0 + _ -> Fakes + end, + {{Message, IsDelivered1, AckTag, Remaining}, + State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1, + fakes = Fakes1 }} + end end. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + fakes = Fakes }) -> {Guids, BQS1} = BQ:ack(AckTags, BQS), - case Guids of - [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, Guids}) - end, - {Guids, State #state { backing_queue_state = BQS1 }}. + Fakes1 = case Guids of + [] -> Fakes; + _ -> ok = gm:broadcast(GM, {ack, Guids}), + sets:difference(Fakes, sets:from_list(Guids)) + end, + {Guids, State #state { backing_queue_state = BQS1, fakes = Fakes1 }}. tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) -> %% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid}) |