summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-06-27 17:48:40 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-06-27 17:48:40 +0100
commitdcafe81a206c85dc83846647fa2878a26ccbaf4c (patch)
tree81ce3495761d5f1c38c73f80a18b2912e561d5f9
parent23344f846db43fcedd526f3ce0127fbf4847396e (diff)
downloadrabbitmq-server-dcafe81a206c85dc83846647fa2878a26ccbaf4c.tar.gz
Optimisation for handling discarded deliveries
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl22
2 files changed, 16 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3e3de10b..0d8d6e29 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -533,7 +533,9 @@ run_message_queue(State) ->
is_empty(State), State),
State1.
-attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+attempt_delivery(Delivery = #delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo,
+ message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
@@ -551,7 +553,12 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
{published, BQS1} ->
{true, State#q{backing_queue_state = BQS1}};
{discarded, BQS1} ->
- {true, discard(Delivery, State#q{backing_queue_state = BQS1})}
+ State1 = State#q{backing_queue_state = BQS1},
+ {true, case MsgSeqNo of
+ undefined -> State;
+ _ -> #basic_message{id = MsgId} = Message,
+ confirm_messages([MsgId], State)
+ end}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index acf77df6..8c061e52 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -225,18 +225,10 @@ discard(MsgId, ChPid, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- State1 = case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
- State #state { backing_queue_state =
- BQ:discard(MsgId, ChPid, BQS) };
- {ok, discarded} ->
- State #state { seen_status = dict:erase(MsgId, SS) }
- end,
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ State1 = State #state { backing_queue_state =
+ BQ:discard(MsgId, ChPid, BQS) },
ensure_monitoring(ChPid, State1).
dropwhile(Pred, State = #state{backing_queue = BQ,
@@ -404,9 +396,9 @@ is_duplicate(Message = #basic_message { id = MsgId },
{published, State #state { seen_status = dict:erase(MsgId, SS),
confirmed = [MsgId | Confirmed] }};
{ok, discarded} ->
- %% Don't erase from SS here because discard/2 is about to
- %% be called and we need to be able to detect this case
- {discarded, State}
+ %% Message was discarded while we were a slave. Erase
+ %% and let amqqueue_process confirm if necessary.
+ {discarded, State #state { seen_status = dict:erase(MsgId, SS) }}
end.
%% ---------------------------------------------------------------------------