diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-10 16:26:37 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-10 16:26:37 +0000 |
commit | 4013400d6787ac306c1c1c354a6db8d5a283bfba (patch) | |
tree | eb11a2a77df4a19553206408c0f7d80729f0e45b /src/rabbit_mirror_queue_master.erl | |
parent | 5f11273b7379510d20b3792d30b8a0c9481353ef (diff) | |
download | rabbitmq-server-4013400d6787ac306c1c1c354a6db8d5a283bfba.tar.gz |
discovered another entire class of interleaving opportunities I've not considered. Fortunately, the fix turned out to be simple. ish.
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 48 |
1 files changed, 36 insertions, 12 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 704e62c1..388f5ce3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -243,10 +243,30 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) -> invoke(?MODULE, Fun, State) -> Fun(State); -invoke(Mod, Fun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> +invoke(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), - {MsgIds, State #state { backing_queue_state = BQS1 }}. + {MsgIds1, SS1} = + lists:foldl( + fun (MsgId, {MsgIdsN, SSN}) -> + case dict:find(MsgId, SSN) of + error -> + {[MsgId | MsgIdsN], SSN}; + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the + %% channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)} + end + end, {[], SS}, MsgIds), + {MsgIds1, State #state { backing_queue_state = BQS1, + seen_status = SS1 }}. validate_message(Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, @@ -265,20 +285,24 @@ validate_message(Message = #basic_message { id = MsgId }, %% only if we ourselves are not filtering out the msg. {Result, BQS1} = BQ:validate_message(Message, BQS), {Result, State #state { backing_queue_state = BQS1 }}; - {ok, {published, _ChPid}} -> + {ok, published} -> %% It already got published when we were a slave and no %% confirmation is waiting. amqqueue_process will have, in %% its msg_id_to_channel mapping, the entry for dealing - %% with the confirm when that comes back in. The msg is - %% invalid. We will not see this again, nor will we be + %% with the confirm when that comes back in (it's added + %% immediately prior to calling validate_message). The msg + %% is invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}; - {ok, {confirmed, _ChPid}} -> - %% It got confirmed before we became master, but we've - %% only just received the publish from the channel, so - %% couldn't previously know what the msg_seq_no was. Thus - %% confirm now. As above, amqqueue_process will have the - %% entry for the msg_id_to_channel mapping. + {ok, confirmed} -> + %% It got published when we were a slave via gm, and + %% confirmed some time after that (maybe even after + %% promotion), but before we received the publish from the + %% channel, so couldn't previously know what the + %% msg_seq_no was (and thus confirm as a slave). So we + %% need to confirm now. As above, amqqueue_process will + %% have the entry for the msg_id_to_channel mapping added + %% immediately prior to calling validate_message/2. ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( self(), ?MODULE, fun (State1) -> {[MsgId], State1} end), {invalid, State #state { seen_status = dict:erase(MsgId, SS) }} |