diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-12 22:52:40 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-12 22:52:40 +0100 |
commit | 100a85a5a47f229624c9e82694d0aaa77649b391 (patch) | |
tree | 1cdec724a6cb29756deae90fb47017da7bc06663 | |
parent | 88a9592e06dd8721bb3bdf4ea5c47a0f1238669e (diff) | |
download | rabbitmq-server-100a85a5a47f229624c9e82694d0aaa77649b391.tar.gz |
store less in the slave's msg_id_status
There is no need to track the ChPid of 'published' and 'confirmed'.
Also: neater conversion of msg_id_status on promotion.
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 36 |
1 files changed, 16 insertions, 20 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 931a7f90..0530fa7f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -406,16 +406,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> %% If it needed confirming, it'll have %% already been done. Acc; - {ok, {published, ChPid}} -> + {ok, published} -> %% Still not seen it from the channel, just %% record that it's been confirmed. - {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)}; + {CMsN, dict:store(MsgId, confirmed, MSN)}; {ok, {published, ChPid, MsgSeqNo}} -> %% Seen from both GM and Channel. Can now %% confirm. {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN), dict:erase(MsgId, MSN)}; - {ok, {confirmed, _ChPid}} -> + {ok, confirmed} -> %% It's already been confirmed. This is %% probably it's been both sync'd to disk %% and then delivered and ack'd before we've @@ -482,18 +482,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% %% MS contains the following three entry types: %% - %% a) {published, ChPid}: + %% a) published: %% published via gm only; pending arrival of publication from %% channel, maybe pending confirm. %% %% b) {published, ChPid, MsgSeqNo}: %% published via gm and channel; pending confirm. %% - %% c) {confirmed, ChPid}: + %% c) confirmed: %% published via gm only, and confirmed; pending publication %% from channel. %% - %% d) discarded + %% d) discarded: %% seen via gm only as discarded. Pending publication from %% channel %% @@ -511,22 +511,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% this does not affect MS, nor which bits go through to SS in %% Master, or MTC in queue_process. - MSList = dict:to_list(MS), - SS = dict:from_list( - [E || E = {_MsgId, discarded} <- MSList] ++ - [{MsgId, Status} - || {MsgId, {Status, _ChPid}} <- MSList, - Status =:= published orelse Status =:= confirmed]), + St = [published, confirmed, discarded], + SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, AckTags, SS, MPids), - MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> - gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); - (_, MTC0) -> - MTC0 - end, gb_trees:empty(), MSList), + MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_Msgid, _Status, MTC0) -> + MTC0 + end, gb_trees:empty(), MS), Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], rabbit_amqqueue_process:init_with_backing_queue_state( @@ -637,14 +633,14 @@ maybe_enqueue_message( MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; - {ok, {confirmed, ChPid}} -> + {ok, confirmed} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 }; - {ok, {published, ChPid}} -> + {ok, published} -> %% It was published to the BQ and we didn't know the %% msg_seq_no so couldn't confirm it at the time. {MS1, SQ1} = @@ -703,7 +699,7 @@ process_instruction( case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, {published, ChPid}, MS)}; + dict:store(MsgId, published, MS)}; {{value, Delivery = #delivery { msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }}, MQ2} -> |