diff options
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 48 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 |
2 files changed, 39 insertions, 16 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 704e62c1..388f5ce3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -243,10 +243,30 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) -> invoke(?MODULE, Fun, State) -> Fun(State); -invoke(Mod, Fun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> +invoke(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), - {MsgIds, State #state { backing_queue_state = BQS1 }}. + {MsgIds1, SS1} = + lists:foldl( + fun (MsgId, {MsgIdsN, SSN}) -> + case dict:find(MsgId, SSN) of + error -> + {[MsgId | MsgIdsN], SSN}; + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the + %% channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)} + end + end, {[], SS}, MsgIds), + {MsgIds1, State #state { backing_queue_state = BQS1, + seen_status = SS1 }}. validate_message(Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, @@ -265,20 +285,24 @@ validate_message(Message = #basic_message { id = MsgId }, %% only if we ourselves are not filtering out the msg. {Result, BQS1} = BQ:validate_message(Message, BQS), {Result, State #state { backing_queue_state = BQS1 }}; - {ok, {published, _ChPid}} -> + {ok, published} -> %% It already got published when we were a slave and no %% confirmation is waiting. amqqueue_process will have, in %% its msg_id_to_channel mapping, the entry for dealing - %% with the confirm when that comes back in. The msg is - %% invalid. We will not see this again, nor will we be + %% with the confirm when that comes back in (it's added + %% immediately prior to calling validate_message). The msg + %% is invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}; - {ok, {confirmed, _ChPid}} -> - %% It got confirmed before we became master, but we've - %% only just received the publish from the channel, so - %% couldn't previously know what the msg_seq_no was. Thus - %% confirm now. As above, amqqueue_process will have the - %% entry for the msg_id_to_channel mapping. + {ok, confirmed} -> + %% It got published when we were a slave via gm, and + %% confirmed some time after that (maybe even after + %% promotion), but before we received the publish from the + %% channel, so couldn't previously know what the + %% msg_seq_no was (and thus confirm as a slave). So we + %% need to confirm now. As above, amqqueue_process will + %% have the entry for the msg_id_to_channel mapping added + %% immediately prior to calling validate_message/2. ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( self(), ?MODULE, fun (State1) -> {[MsgId], State1} end), {invalid, State #state { seen_status = dict:erase(MsgId, SS) }} diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 950df509..d7f86456 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -411,10 +411,9 @@ promote_me(From, #state { q = Q, %% this does not affect MS, nor which bits go through to SS in %% Master, or MTC in queue_process. - SS = dict:filter(fun (_MsgId, {published, _ChPid}) -> true; - (_MsgId, {published, _ChPid, _MsgSeqNo}) -> false; - (_MsgId, {confirmed, _ChPid}) -> true - end, MS), + SS = dict:from_list([{MsgId, Status} + || {MsgId, {Status, _ChPid}} <- dict:to_list(MS), + Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, SS), |