diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-06-27 17:48:40 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-06-27 17:48:40 +0100 |
commit | dcafe81a206c85dc83846647fa2878a26ccbaf4c (patch) | |
tree | 81ce3495761d5f1c38c73f80a18b2912e561d5f9 | |
parent | 23344f846db43fcedd526f3ce0127fbf4847396e (diff) | |
download | rabbitmq-server-dcafe81a206c85dc83846647fa2878a26ccbaf4c.tar.gz |
Optimisation for handling discarded deliveries
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 22 |
2 files changed, 16 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3e3de10b..0d8d6e29 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -533,7 +533,9 @@ run_message_queue(State) -> is_empty(State), State), State1. -attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, +attempt_delivery(Delivery = #delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo, + message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of @@ -551,7 +553,12 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, {published, BQS1} -> {true, State#q{backing_queue_state = BQS1}}; {discarded, BQS1} -> - {true, discard(Delivery, State#q{backing_queue_state = BQS1})} + State1 = State#q{backing_queue_state = BQS1}, + {true, case MsgSeqNo of + undefined -> State; + _ -> #basic_message{id = MsgId} = Message, + confirm_messages([MsgId], State) + end} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index acf77df6..8c061e52 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -225,18 +225,10 @@ discard(MsgId, ChPid, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, seen_status = SS }) -> - %% It's a massive error if we get told to discard something that's - %% already been published or published-and-confirmed. To do that - %% would require non FIFO access. Hence we should not find - %% 'published' or 'confirmed' in this dict:find. - State1 = case dict:find(MsgId, SS) of - error -> - ok = gm:broadcast(GM, {discard, ChPid, MsgId}), - State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, BQS) }; - {ok, discarded} -> - State #state { seen_status = dict:erase(MsgId, SS) } - end, + false = dict:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {discard, ChPid, MsgId}), + State1 = State #state { backing_queue_state = + BQ:discard(MsgId, ChPid, BQS) }, ensure_monitoring(ChPid, State1). dropwhile(Pred, State = #state{backing_queue = BQ, @@ -404,9 +396,9 @@ is_duplicate(Message = #basic_message { id = MsgId }, {published, State #state { seen_status = dict:erase(MsgId, SS), confirmed = [MsgId | Confirmed] }}; {ok, discarded} -> - %% Don't erase from SS here because discard/2 is about to - %% be called and we need to be able to detect this case - {discarded, State} + %% Message was discarded while we were a slave. Erase + %% and let amqqueue_process confirm if necessary. + {discarded, State #state { seen_status = dict:erase(MsgId, SS) }} end. %% --------------------------------------------------------------------------- |