diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-14 16:54:23 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-14 16:54:23 +0000 |
commit | b6f5d1dcd1e9465f84a4374ecdece7a37f51504b (patch) | |
tree | 7a55815d85b837a317a9e6b528461708c315b4ec /src/rabbit_mirror_queue_master.erl | |
parent | 279e858cac439e493fe3990a4ef0ef689a0ff29b (diff) | |
parent | 80854415c2e1579d52127b9722c985c77d0791e4 (diff) | |
download | rabbitmq-server-b6f5d1dcd1e9465f84a4374ecdece7a37f51504b.tar.gz |
merge default into bug23554
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 84 |
1 files changed, 47 insertions, 37 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 25a1e4b8..0ca73f03 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -16,10 +16,10 @@ -module(rabbit_mirror_queue_master). --export([init/2, terminate/1, delete_and_terminate/1, +-export([init/4, terminate/1, delete_and_terminate/1, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1, invoke/3, validate_message/2]). @@ -37,7 +37,8 @@ backing_queue, backing_queue_state, set_delivered, - seen_status + seen_status, + confirmed }). %% --------------------------------------------------------------------------- @@ -53,7 +54,8 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { arguments = Args, name = QName } = Q, Recover) -> +init(#amqqueue { arguments = Args, name = QName } = Q, Recover, + AsyncCallback, SyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>), @@ -64,13 +66,14 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover) -> end, [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1], {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover), + BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, set_delivered = 0, - seen_status = dict:new() }. + seen_status = dict:new(), + confirmed = [] }. promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> #state { gm = GM, @@ -78,7 +81,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> backing_queue = BQ, backing_queue_state = BQS, set_delivered = BQ:len(BQS), - seen_status = SeenStatus }. + seen_status = SeenStatus, + confirmed = [] }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -138,6 +142,35 @@ dropwhile(Fun, State = #state { gm = GM, State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 }. +drain_confirmed(State = #state { backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS, + confirmed = Confirmed }) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + {MsgIds1, SS1} = + lists:foldl( + fun (MsgId, {MsgIdsN, SSN}) -> + case dict:find(MsgId, SSN) of + error -> + {[MsgId | MsgIdsN], SSN}; + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {ok, confirmed} -> + %% Well, confirms are racy by definition. + {[MsgId | MsgIdsN], SSN} + end + end, {[], SS}, MsgIds), + {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1, + seen_status = SS1, + confirmed = [] }}. + fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -236,38 +269,16 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) -> BQ:status(BQS). invoke(?MODULE, Fun, State) -> - Fun(State); + Fun(?MODULE, State); invoke(Mod, Fun, State = #state { backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> - {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), - {MsgIds1, SS1} = - lists:foldl( - fun (MsgId, {MsgIdsN, SSN}) -> - case dict:find(MsgId, SSN) of - error -> - {[MsgId | MsgIdsN], SSN}; - {ok, published} -> - %% It was published when we were a slave, - %% and we were promoted before we saw the - %% publish from the channel. We still - %% haven't seen the channel publish, and - %% consequently we need to filter out the - %% confirm here. We will issue the confirm - %% when we see the publish from the channel. - {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; - {ok, confirmed} -> - %% Well, confirms are racy by definition. - {[MsgId | MsgIdsN], SSN} - end - end, {[], SS}, MsgIds), - {MsgIds1, State #state { backing_queue_state = BQS1, - seen_status = SS1 }}. + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. validate_message(Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + confirmed = Confirmed }) -> %% Here, we need to deal with the possibility that we're about to %% receive a message that we've already seen when we were a slave %% (we received it via gm). Thus if we do receive such message now @@ -299,7 +310,6 @@ validate_message(Message = #basic_message { id = MsgId }, %% need to confirm now. As above, amqqueue_process will %% have the entry for the msg_id_to_channel mapping added %% immediately prior to calling validate_message/2. - ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - self(), ?MODULE, fun (State1) -> {[MsgId], State1} end), - {invalid, State #state { seen_status = dict:erase(MsgId, SS) }} + {invalid, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. |