summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl48
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
2 files changed, 39 insertions, 16 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 704e62c1..388f5ce3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -243,10 +243,30 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
invoke(?MODULE, Fun, State) ->
Fun(State);
-invoke(Mod, Fun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+invoke(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
{MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
- {MsgIds, State #state { backing_queue_state = BQS1 }}.
+ {MsgIds1, SS1} =
+ lists:foldl(
+ fun (MsgId, {MsgIdsN, SSN}) ->
+ case dict:find(MsgId, SSN) of
+ error ->
+ {[MsgId | MsgIdsN], SSN};
+ {ok, published} ->
+ %% It was published when we were a slave,
+ %% and we were promoted before we saw the
+ %% publish from the channel. We still
+ %% haven't seen the channel publish, and
+ %% consequently we need to filter out the
+ %% confirm here. We will issue the confirm
+ %% when we see the publish from the
+ %% channel.
+ {MsgIdsN, dict:store(MsgId, confirmed, SSN)}
+ end
+ end, {[], SS}, MsgIds),
+ {MsgIds1, State #state { backing_queue_state = BQS1,
+ seen_status = SS1 }}.
validate_message(Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
@@ -265,20 +285,24 @@ validate_message(Message = #basic_message { id = MsgId },
%% only if we ourselves are not filtering out the msg.
{Result, BQS1} = BQ:validate_message(Message, BQS),
{Result, State #state { backing_queue_state = BQS1 }};
- {ok, {published, _ChPid}} ->
+ {ok, published} ->
%% It already got published when we were a slave and no
%% confirmation is waiting. amqqueue_process will have, in
%% its msg_id_to_channel mapping, the entry for dealing
- %% with the confirm when that comes back in. The msg is
- %% invalid. We will not see this again, nor will we be
+ %% with the confirm when that comes back in (it's added
+ %% immediately prior to calling validate_message). The msg
+ %% is invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
{invalid, State #state { seen_status = dict:erase(MsgId, SS) }};
- {ok, {confirmed, _ChPid}} ->
- %% It got confirmed before we became master, but we've
- %% only just received the publish from the channel, so
- %% couldn't previously know what the msg_seq_no was. Thus
- %% confirm now. As above, amqqueue_process will have the
- %% entry for the msg_id_to_channel mapping.
+ {ok, confirmed} ->
+ %% It got published when we were a slave via gm, and
+ %% confirmed some time after that (maybe even after
+ %% promotion), but before we received the publish from the
+ %% channel, so couldn't previously know what the
+ %% msg_seq_no was (and thus confirm as a slave). So we
+ %% need to confirm now. As above, amqqueue_process will
+ %% have the entry for the msg_id_to_channel mapping added
+ %% immediately prior to calling validate_message/2.
ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
self(), ?MODULE, fun (State1) -> {[MsgId], State1} end),
{invalid, State #state { seen_status = dict:erase(MsgId, SS) }}
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 950df509..d7f86456 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -411,10 +411,9 @@ promote_me(From, #state { q = Q,
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- SS = dict:filter(fun (_MsgId, {published, _ChPid}) -> true;
- (_MsgId, {published, _ChPid, _MsgSeqNo}) -> false;
- (_MsgId, {confirmed, _ChPid}) -> true
- end, MS),
+ SS = dict:from_list([{MsgId, Status}
+ || {MsgId, {Status, _ChPid}} <- dict:to_list(MS),
+ Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, SS),