summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-02 14:36:03 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-02 14:36:03 +0100
commit7e0e36acb972d00cd4f0723de6097b4678f68c2d (patch)
treef94040961dda4cc281d6edae3ff2919f4cbecb19
parentf6639b12baa1816f2722509658b0319690bb1955 (diff)
parent56aeaa2f84c48c93e1c53fc5a9ffd6121e9044cd (diff)
downloadrabbitmq-server-7e0e36acb972d00cd4f0723de6097b4678f68c2d.tar.gz
Merge bug25625
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_mirror_queue_master.erl36
3 files changed, 19 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f08f8292..5409a806 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -549,10 +549,8 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
{{Message, Delivered, undefined},
true, discard(Delivery, State1)}
end, false, State#q{backing_queue_state = BQS1});
- {published, BQS1} ->
- {true, State#q{backing_queue_state = BQS1}};
- {discarded, BQS1} ->
- {true, discard(Delivery, State#q{backing_queue_state = BQS1})}
+ {true, BQS1} ->
+ {true, State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2f247448..bf26cb5a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -90,10 +90,7 @@
-> {ack(), state()}.
%% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
+%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
@@ -216,11 +213,10 @@
-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
%% Called prior to a publish or publish_delivered call. Allows the BQ
-%% to signal that it's already seen this message (and in what capacity
-%% - i.e. was it published previously or discarded previously) and
-%% thus the message should be dropped.
+%% to signal that it's already seen this message, (e.g. it was published
+%% or discarded previously) and thus the message should be dropped.
-callback is_duplicate(rabbit_types:basic_message(), state())
- -> {'false'|'published'|'discarded', state()}.
+ -> {boolean(), state()}.
-else.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index bcd4861a..572cd0ca 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -225,21 +225,10 @@ discard(MsgId, ChPid, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
- BQS1 = BQ:discard(MsgId, ChPid, BQS),
- ensure_monitoring(
- ChPid, State #state {
- backing_queue_state = BQS1,
- seen_status = dict:erase(MsgId, SS) });
- {ok, discarded} ->
- State
- end.
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ ensure_monitoring(ChPid, State #state { backing_queue_state =
+ BQ:discard(MsgId, ChPid, BQS) }).
dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -393,8 +382,9 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {published, State #state { seen_status = dict:erase(MsgId, SS) }};
- {ok, confirmed} ->
+ {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {ok, Disposition}
+ when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
%% confirmed some time after that (maybe even after
%% promotion), but before we received the publish from the
@@ -403,12 +393,12 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% need to confirm now. As above, amqqueue_process will
%% have the entry for the msg_id_to_channel mapping added
%% immediately after calling is_duplicate/2.
- {published, State #state { seen_status = dict:erase(MsgId, SS),
- confirmed = [MsgId | Confirmed] }};
- {ok, discarded} ->
- %% Don't erase from SS here because discard/2 is about to
- %% be called and we need to be able to detect this case
- {discarded, State}
+ orelse Disposition =:= discarded ->
+ %% Message was discarded while we were a slave. Confirm now.
+ %% As above, amqqueue_process will have the entry for the
+ %% msg_id_to_channel mapping.
+ {true, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
%% ---------------------------------------------------------------------------