diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-10-11 15:54:48 +0100 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-10-11 15:54:48 +0100 |
commit | 63c9e1de03ec055598beed856aef4f2cddc50ff0 (patch) | |
tree | b2afd12f87e38473d4a7ca6d7d516a1b2e4b128f | |
parent | 178c6bc0318d1a471f4a7eb6b29e7e7e63098996 (diff) | |
download | rabbitmq-server-63c9e1de03ec055598beed856aef4f2cddc50ff0.tar.gz |
Started recording the acks that are stored as full messages in memory
-rw-r--r-- | src/rabbit_variable_queue.erl | 61 |
1 files changed, 36 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cbc71bcc..208f3d56 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -220,6 +220,7 @@ q4, next_seq_id, pending_ack, + pending_ack_index, index_state, msg_store_clients, on_sync, @@ -305,6 +306,7 @@ q4 :: queue(), next_seq_id :: seq_id(), pending_ack :: dict:dictionary(), + pending_ack_index :: gb_trees:gb_tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -407,6 +409,7 @@ init(QueueName, IsDurable, Recover) -> q4 = queue:new(), next_seq_id = NextSeqId, pending_ack = dict:new(), + pending_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {{PersistentClient, PRef}, {TransientClient, TRef}}, @@ -509,27 +512,24 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, - pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - PA1 = record_pending_ack(m(MsgStatus1), PA), + State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + {SeqId, a(State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + persistent_count = PCount1 })}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, index_state = IndexState, len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + persistent_count = PCount }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_to_q4(State) of @@ -560,24 +560,24 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, end, %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, + State), + {SeqId, StateN}; + false -> {blank_ack, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, + a(State1 #vqstate { q4 = Q4a, ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })} + persistent_count = PCount1})} end. ack(AckTags, State) -> @@ -1090,19 +1090,27 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> - AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; - false -> MsgStatus - end, - dict:store(SeqId, AckEntry, PA). + msg_on_disk = MsgOnDisk } = MsgStatus, + State = #vqstate { pending_ack = PA, + pending_ack_index = PAI }) -> + {AckEntry, PAI1} = + case MsgOnDisk of + true -> + {{IsPersistent, Guid}, PAI}; + false -> + {MsgStatus, gb_trees:insert(SeqId, Guid, PAI)} + end, + PA1 = dict:store(SeqId, AckEntry, PA), + State #vqstate { pending_ack = PA1, pending_ack_index = PAI1 }. +%% TODO: On remove, need to prevent any seqids that remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), - State1 = State #vqstate { pending_ack = dict:new() }, + State1 = State #vqstate { pending_ack = dict:new(), + pending_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of error -> State1; @@ -1124,11 +1132,14 @@ ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> + fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, + pending_ack_index = PAI }}) -> {ok, AckEntry} = dict:find(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA) })} + pending_ack = dict:erase(SeqId, PA), + pending_ack_index = + gb_trees:delete_any(SeqId, PAI)})} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> |