summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-12 22:52:40 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-12 22:52:40 +0100
commit100a85a5a47f229624c9e82694d0aaa77649b391 (patch)
tree1cdec724a6cb29756deae90fb47017da7bc06663
parent88a9592e06dd8721bb3bdf4ea5c47a0f1238669e (diff)
downloadrabbitmq-server-100a85a5a47f229624c9e82694d0aaa77649b391.tar.gz
store less in the slave's msg_id_status
There is no need to track the ChPid of 'published' and 'confirmed'. Also: neater conversion of msg_id_status on promotion.
-rw-r--r--src/rabbit_mirror_queue_slave.erl36
1 files changed, 16 insertions, 20 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 931a7f90..0530fa7f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -406,16 +406,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% If it needed confirming, it'll have
%% already been done.
Acc;
- {ok, {published, ChPid}} ->
+ {ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
+ {CMsN, dict:store(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
dict:erase(MsgId, MSN)};
- {ok, {confirmed, _ChPid}} ->
+ {ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
%% and then delivered and ack'd before we've
@@ -482,18 +482,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%%
%% MS contains the following three entry types:
%%
- %% a) {published, ChPid}:
+ %% a) published:
%% published via gm only; pending arrival of publication from
%% channel, maybe pending confirm.
%%
%% b) {published, ChPid, MsgSeqNo}:
%% published via gm and channel; pending confirm.
%%
- %% c) {confirmed, ChPid}:
+ %% c) confirmed:
%% published via gm only, and confirmed; pending publication
%% from channel.
%%
- %% d) discarded
+ %% d) discarded:
%% seen via gm only as discarded. Pending publication from
%% channel
%%
@@ -511,22 +511,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- MSList = dict:to_list(MS),
- SS = dict:from_list(
- [E || E = {_MsgId, discarded} <- MSList] ++
- [{MsgId, Status}
- || {MsgId, {Status, _ChPid}} <- MSList,
- Status =:= published orelse Status =:= confirmed]),
+ St = [published, confirmed, discarded],
+ SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, AckTags, SS, MPids),
- MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
- gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
- (_, MTC0) ->
- MTC0
- end, gb_trees:empty(), MSList),
+ MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_Msgid, _Status, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MS),
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
rabbit_amqqueue_process:init_with_backing_queue_state(
@@ -637,14 +633,14 @@ maybe_enqueue_message(
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, {confirmed, ChPid}} ->
+ {ok, confirmed} ->
%% BQ has confirmed it but we didn't know what the
%% msg_seq_no was at the time. We do now!
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 };
- {ok, {published, ChPid}} ->
+ {ok, published} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
{MS1, SQ1} =
@@ -703,7 +699,7 @@ process_instruction(
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, {published, ChPid}, MS)};
+ dict:store(MsgId, published, MS)};
{{value, Delivery = #delivery {
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } }}, MQ2} ->