summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-19 12:09:46 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-19 12:09:46 +0100
commitd840ecb005ded849632dffb906905d38bc0a19ea (patch)
tree78f20be06f3a8c3391ab49de146cf14446ae2f99
parent838f8d48c1838f288517b63cf2351fc48ea8e371 (diff)
parent321b1dc7775cabf4895e185fa6f07264f2f77fdf (diff)
downloadrabbitmq-server-d840ecb005ded849632dffb906905d38bc0a19ea.tar.gz
Merging bug 22987 into bug 21673
-rw-r--r--src/rabbit_variable_queue.erl84
1 files changed, 55 insertions, 29 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f1510da3..7e960fde 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -220,6 +220,8 @@
-record(tx, { pending_messages, pending_acks }).
+-record(sync, { acks_persistent, acks_all, pubs, funs }).
+
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
%% betas that we must be due to write indices for before we do any
@@ -245,6 +247,11 @@
count :: non_neg_integer (),
end_seq_id :: non_neg_integer() }).
+-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
+ acks_all :: [[seq_id()]],
+ pubs :: [[rabbit_guid:guid()]],
+ funs :: [fun (() -> any())] }).
+
-type(state() :: #vqstate {
q1 :: queue(),
q2 :: bpqueue:bpqueue(),
@@ -256,8 +263,7 @@
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
- on_sync :: {[[ack()]], [[rabbit_guid:guid()]],
- [fun (() -> any())]},
+ on_sync :: sync(),
durable :: boolean(),
len :: non_neg_integer(),
@@ -289,6 +295,11 @@
count = 0,
end_seq_id = Z }).
+-define(BLANK_SYNC, #sync { acks_persistent = [],
+ acks_all = [],
+ pubs = [],
+ funs = [] }).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -349,7 +360,7 @@ init(QueueName, IsDurable, _Recover) ->
index_state = IndexState1,
msg_store_clients = {{PersistentClient, PRef},
{TransientClient, TRef}},
- on_sync = {[], [], []},
+ on_sync = ?BLANK_SYNC,
durable = IsDurable,
transient_threshold = NextSeqId,
@@ -634,14 +645,14 @@ ram_duration(State = #vqstate { egress_rate = Egress,
out_counter = 0,
ram_msg_count_prev = RamMsgCount })}.
-needs_idle_timeout(#vqstate { on_sync = {_, _, [_|_]}}) ->
- true;
-needs_idle_timeout(State) ->
+needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
{Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
fun (_Quota, State1) -> State1 end,
fun (State1) -> State1 end,
State),
- Res.
+ Res;
+needs_idle_timeout(_State) ->
+ true.
idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
@@ -651,7 +662,7 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
pending_ack = PA,
- on_sync = {_, _, From},
+ on_sync = #sync { funs = From },
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
@@ -838,33 +849,48 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) ->
tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
State = #vqstate {
- on_sync = OnSync = {SAcks, SPubs, SFuns},
+ on_sync = OnSync = #sync {
+ acks_persistent = SPAcks,
+ acks_all = SAcks,
+ pubs = SPubs,
+ funs = SFuns },
pending_ack = PA,
durable = IsDurable }) ->
- case IsDurable andalso
- (HasPersistentPubs orelse
- lists:any(fun (AckTag) ->
- case dict:find(AckTag, PA) of
- {ok, #msg_status {}} -> false;
- {ok, {IsPersistent, _Guid}} -> IsPersistent
- end
- end, AckTags)) of
- true -> State #vqstate { on_sync = { [AckTags | SAcks],
- [Pubs | SPubs],
- [Fun | SFuns] }};
+ PersistentAcks =
+ case IsDurable of
+ true -> [AckTag || AckTag <- AckTags,
+ case dict:find(AckTag, PA) of
+ {ok, #msg_status {}} -> false;
+ {ok, {IsPersistent, _Guid}} -> IsPersistent
+ end];
+ false -> []
+ end,
+ case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
+ true -> State #vqstate { on_sync = #sync {
+ acks_persistent = [PersistentAcks | SPAcks],
+ acks_all = [AckTags | SAcks],
+ pubs = [Pubs | SPubs],
+ funs = [Fun | SFuns] }};
false -> State1 = tx_commit_index(
- State #vqstate { on_sync = { [AckTags],
- [Pubs],
- [Fun]} }),
+ State #vqstate { on_sync = #sync {
+ acks_persistent = [],
+ acks_all = [AckTags],
+ pubs = [Pubs],
+ funs = [Fun] } }),
State1 #vqstate { on_sync = OnSync }
end.
-tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) ->
+tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
State;
-tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
+tx_commit_index(State = #vqstate { on_sync = #sync {
+ acks_persistent = SPAcks,
+ acks_all = SAcks,
+ pubs = SPubs,
+ funs = SFuns },
durable = IsDurable }) ->
- Acks = lists:flatten(SAcks),
- Pubs = lists:flatten(lists:reverse(SPubs)),
+ PAcks = lists:flatten(SPAcks),
+ Acks = lists:flatten(SAcks),
+ Pubs = lists:flatten(lists:reverse(SPubs)),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
@@ -872,11 +898,11 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
IsPersistent1 = IsDurable andalso IsPersistent,
{SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {Acks, ack(Acks, State)}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
- State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }).
+ State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(State = #vqstate { q3 = Q3,
index_state = IndexState }) ->