summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-10 16:26:37 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-10 16:26:37 +0000
commit4013400d6787ac306c1c1c354a6db8d5a283bfba (patch)
treeeb11a2a77df4a19553206408c0f7d80729f0e45b /src/rabbit_mirror_queue_master.erl
parent5f11273b7379510d20b3792d30b8a0c9481353ef (diff)
downloadrabbitmq-server-4013400d6787ac306c1c1c354a6db8d5a283bfba.tar.gz
discovered another entire class of interleaving opportunities I've not considered. Fortunately, the fix turned out to be simple. ish.
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl48
1 files changed, 36 insertions, 12 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 704e62c1..388f5ce3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -243,10 +243,30 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
invoke(?MODULE, Fun, State) ->
Fun(State);
-invoke(Mod, Fun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+invoke(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
{MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
- {MsgIds, State #state { backing_queue_state = BQS1 }}.
+ {MsgIds1, SS1} =
+ lists:foldl(
+ fun (MsgId, {MsgIdsN, SSN}) ->
+ case dict:find(MsgId, SSN) of
+ error ->
+ {[MsgId | MsgIdsN], SSN};
+ {ok, published} ->
+ %% It was published when we were a slave,
+ %% and we were promoted before we saw the
+ %% publish from the channel. We still
+ %% haven't seen the channel publish, and
+ %% consequently we need to filter out the
+ %% confirm here. We will issue the confirm
+ %% when we see the publish from the
+ %% channel.
+ {MsgIdsN, dict:store(MsgId, confirmed, SSN)}
+ end
+ end, {[], SS}, MsgIds),
+ {MsgIds1, State #state { backing_queue_state = BQS1,
+ seen_status = SS1 }}.
validate_message(Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
@@ -265,20 +285,24 @@ validate_message(Message = #basic_message { id = MsgId },
%% only if we ourselves are not filtering out the msg.
{Result, BQS1} = BQ:validate_message(Message, BQS),
{Result, State #state { backing_queue_state = BQS1 }};
- {ok, {published, _ChPid}} ->
+ {ok, published} ->
%% It already got published when we were a slave and no
%% confirmation is waiting. amqqueue_process will have, in
%% its msg_id_to_channel mapping, the entry for dealing
- %% with the confirm when that comes back in. The msg is
- %% invalid. We will not see this again, nor will we be
+ %% with the confirm when that comes back in (it's added
+ %% immediately prior to calling validate_message). The msg
+ %% is invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
{invalid, State #state { seen_status = dict:erase(MsgId, SS) }};
- {ok, {confirmed, _ChPid}} ->
- %% It got confirmed before we became master, but we've
- %% only just received the publish from the channel, so
- %% couldn't previously know what the msg_seq_no was. Thus
- %% confirm now. As above, amqqueue_process will have the
- %% entry for the msg_id_to_channel mapping.
+ {ok, confirmed} ->
+ %% It got published when we were a slave via gm, and
+ %% confirmed some time after that (maybe even after
+ %% promotion), but before we received the publish from the
+ %% channel, so couldn't previously know what the
+ %% msg_seq_no was (and thus confirm as a slave). So we
+ %% need to confirm now. As above, amqqueue_process will
+ %% have the entry for the msg_id_to_channel mapping added
+ %% immediately prior to calling validate_message/2.
ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
self(), ?MODULE, fun (State1) -> {[MsgId], State1} end),
{invalid, State #state { seen_status = dict:erase(MsgId, SS) }}