summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-10-11 15:54:48 +0100
committerRob Harrop <rob@rabbitmq.com>2010-10-11 15:54:48 +0100
commit63c9e1de03ec055598beed856aef4f2cddc50ff0 (patch)
treeb2afd12f87e38473d4a7ca6d7d516a1b2e4b128f
parent178c6bc0318d1a471f4a7eb6b29e7e7e63098996 (diff)
downloadrabbitmq-server-63c9e1de03ec055598beed856aef4f2cddc50ff0.tar.gz
Started recording the acks that are stored as full messages in memory
-rw-r--r--src/rabbit_variable_queue.erl61
1 files changed, 36 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cbc71bcc..208f3d56 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -220,6 +220,7 @@
q4,
next_seq_id,
pending_ack,
+ pending_ack_index,
index_state,
msg_store_clients,
on_sync,
@@ -305,6 +306,7 @@
q4 :: queue(),
next_seq_id :: seq_id(),
pending_ack :: dict:dictionary(),
+ pending_ack_index :: gb_trees:gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -407,6 +409,7 @@ init(QueueName, IsDurable, Recover) ->
q4 = queue:new(),
next_seq_id = NextSeqId,
pending_ack = dict:new(),
+ pending_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {{PersistentClient, PRef},
{TransientClient, TRef}},
@@ -509,27 +512,24 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
- pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- PA1 = record_pending_ack(m(MsgStatus1), PA),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
+ {SeqId, a(State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ persistent_count = PCount1 })}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
out_counter = OutCount,
index_state = IndexState,
len = Len,
- persistent_count = PCount,
- pending_ack = PA }) ->
+ persistent_count = PCount }) ->
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3_to_q4(State) of
@@ -560,24 +560,24 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
end,
%% 3. If an ack is required, add something sensible to PA
- {AckTag, PA1} = case AckRequired of
- true -> PA2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, PA),
- {SeqId, PA2};
- false -> {blank_ack, PA}
+ {AckTag, State1} = case AckRequired of
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true },
+ State),
+ {SeqId, StateN};
+ false -> {blank_ack, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
- a(State #vqstate { q4 = Q4a,
+ a(State1 #vqstate { q4 = Q4a,
ram_msg_count = RamMsgCount - 1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 })}
+ persistent_count = PCount1})}
end.
ack(AckTags, State) ->
@@ -1090,19 +1090,27 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
- AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid};
- false -> MsgStatus
- end,
- dict:store(SeqId, AckEntry, PA).
+ msg_on_disk = MsgOnDisk } = MsgStatus,
+ State = #vqstate { pending_ack = PA,
+ pending_ack_index = PAI }) ->
+ {AckEntry, PAI1} =
+ case MsgOnDisk of
+ true ->
+ {{IsPersistent, Guid}, PAI};
+ false ->
+ {MsgStatus, gb_trees:insert(SeqId, Guid, PAI)}
+ end,
+ PA1 = dict:store(SeqId, AckEntry, PA),
+ State #vqstate { pending_ack = PA1, pending_ack_index = PAI1 }.
+%% TODO: On remove, need to prevent any seqids that
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
{[], orddict:new()}, PA),
- State1 = State #vqstate { pending_ack = dict:new() },
+ State1 = State #vqstate { pending_ack = dict:new(),
+ pending_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
error -> State1;
@@ -1124,11 +1132,14 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
+ pending_ack_index = PAI }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
{accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA) })}
+ pending_ack = dict:erase(SeqId, PA),
+ pending_ack_index =
+ gb_trees:delete_any(SeqId, PAI)})}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->