summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-20 00:26:33 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-20 00:26:33 +0000
commitb73f2e5f8dd433fce76e1a8dee20596d6dbfd144 (patch)
treea69c76c8393a6eaeb0fb6b9e880d44425d60f96e /src/rabbit_mirror_queue_master.erl
parentdfd985400ac482349797f430978b773292eaea0f (diff)
downloadrabbitmq-server-b73f2e5f8dd433fce76e1a8dee20596d6dbfd144.tar.gz
Right, well the fake handling code in master is fine. The fake handling code in slave is utterly wrong. However, I need to sleep
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl63
1 files changed, 45 insertions, 18 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0d64ab8e..4628796f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/4]).
+-export([promote_backing_queue_state/5]).
-behaviour(rabbit_backing_queue).
@@ -36,7 +36,8 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered
+ set_delivered,
+ fakes
}).
%% ---------------------------------------------------------------------------
@@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0 }.
+ set_delivered = 0,
+ fakes = sets:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, Fakes) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS) }.
+ set_delivered = BQ:len(BQS),
+ fakes = Fakes }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -126,30 +129,54 @@ dropwhile(Fun, State = #state { gm = GM,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+ set_delivered = SetDelivered,
+ fakes = Fakes }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
- {Result, State1};
+ {Result, State #state { backing_queue_state = BQS1 }};
{#basic_message { guid = Guid } = Message, IsDelivered, AckTag,
Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
SetDelivered1 = lists:max([0, SetDelivered - 1]),
- {{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1 }}
+ case sets:is_element(Guid, Fakes) of
+ true ->
+ {BQS2, Fakes1} =
+ case AckRequired of
+ true -> {[Guid], BQS3} = BQ:ack([AckTag], BQS1),
+ {BQS3, Fakes};
+ false -> {BQS1, sets:del_element(Guid, Fakes)}
+ end,
+ ok = gm:broadcast(GM, {fetch, false, Guid, Remaining}),
+ fetch(AckRequired,
+ State #state { backing_queue_state = BQS2,
+ set_delivered = SetDelivered1,
+ fakes = Fakes1 });
+ false ->
+ ok = gm:broadcast(GM,
+ {fetch, AckRequired, Guid, Remaining}),
+ IsDelivered1 = IsDelivered orelse SetDelivered > 0,
+ Fakes1 = case SetDelivered + SetDelivered1 of
+ 1 -> sets:new(); %% transition to 0
+ _ -> Fakes
+ end,
+ {{Message, IsDelivered1, AckTag, Remaining},
+ State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1,
+ fakes = Fakes1 }}
+ end
end.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ fakes = Fakes }) ->
{Guids, BQS1} = BQ:ack(AckTags, BQS),
- case Guids of
- [] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, Guids})
- end,
- {Guids, State #state { backing_queue_state = BQS1 }}.
+ Fakes1 = case Guids of
+ [] -> Fakes;
+ _ -> ok = gm:broadcast(GM, {ack, Guids}),
+ sets:difference(Fakes, sets:from_list(Guids))
+ end,
+ {Guids, State #state { backing_queue_state = BQS1, fakes = Fakes1 }}.
tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) ->
%% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid})