diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-19 12:09:46 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-19 12:09:46 +0100 |
commit | d840ecb005ded849632dffb906905d38bc0a19ea (patch) | |
tree | 78f20be06f3a8c3391ab49de146cf14446ae2f99 | |
parent | 838f8d48c1838f288517b63cf2351fc48ea8e371 (diff) | |
parent | 321b1dc7775cabf4895e185fa6f07264f2f77fdf (diff) | |
download | rabbitmq-server-d840ecb005ded849632dffb906905d38bc0a19ea.tar.gz |
Merging bug 22987 into bug 21673
-rw-r--r-- | src/rabbit_variable_queue.erl | 84 |
1 files changed, 55 insertions, 29 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f1510da3..7e960fde 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -220,6 +220,8 @@ -record(tx, { pending_messages, pending_acks }). +-record(sync, { acks_persistent, acks_all, pubs, funs }). + %% When we discover, on publish, that we should write some indices to %% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of %% betas that we must be due to write indices for before we do any @@ -245,6 +247,11 @@ count :: non_neg_integer (), end_seq_id :: non_neg_integer() }). +-type(sync() :: #sync { acks_persistent :: [[seq_id()]], + acks_all :: [[seq_id()]], + pubs :: [[rabbit_guid:guid()]], + funs :: [fun (() -> any())] }). + -type(state() :: #vqstate { q1 :: queue(), q2 :: bpqueue:bpqueue(), @@ -256,8 +263,7 @@ index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: {[[ack()]], [[rabbit_guid:guid()]], - [fun (() -> any())]}, + on_sync :: sync(), durable :: boolean(), len :: non_neg_integer(), @@ -289,6 +295,11 @@ count = 0, end_seq_id = Z }). +-define(BLANK_SYNC, #sync { acks_persistent = [], + acks_all = [], + pubs = [], + funs = [] }). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -349,7 +360,7 @@ init(QueueName, IsDurable, _Recover) -> index_state = IndexState1, msg_store_clients = {{PersistentClient, PRef}, {TransientClient, TRef}}, - on_sync = {[], [], []}, + on_sync = ?BLANK_SYNC, durable = IsDurable, transient_threshold = NextSeqId, @@ -634,14 +645,14 @@ ram_duration(State = #vqstate { egress_rate = Egress, out_counter = 0, ram_msg_count_prev = RamMsgCount })}. -needs_idle_timeout(#vqstate { on_sync = {_, _, [_|_]}}) -> - true; -needs_idle_timeout(State) -> +needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, fun (_Quota, State1) -> State1 end, fun (State1) -> State1 end, State), - Res. + Res; +needs_idle_timeout(_State) -> + true. idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). @@ -651,7 +662,7 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, pending_ack = PA, - on_sync = {_, _, From}, + on_sync = #sync { funs = From }, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, @@ -838,33 +849,48 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, State = #vqstate { - on_sync = OnSync = {SAcks, SPubs, SFuns}, + on_sync = OnSync = #sync { + acks_persistent = SPAcks, + acks_all = SAcks, + pubs = SPubs, + funs = SFuns }, pending_ack = PA, durable = IsDurable }) -> - case IsDurable andalso - (HasPersistentPubs orelse - lists:any(fun (AckTag) -> - case dict:find(AckTag, PA) of - {ok, #msg_status {}} -> false; - {ok, {IsPersistent, _Guid}} -> IsPersistent - end - end, AckTags)) of - true -> State #vqstate { on_sync = { [AckTags | SAcks], - [Pubs | SPubs], - [Fun | SFuns] }}; + PersistentAcks = + case IsDurable of + true -> [AckTag || AckTag <- AckTags, + case dict:find(AckTag, PA) of + {ok, #msg_status {}} -> false; + {ok, {IsPersistent, _Guid}} -> IsPersistent + end]; + false -> [] + end, + case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of + true -> State #vqstate { on_sync = #sync { + acks_persistent = [PersistentAcks | SPAcks], + acks_all = [AckTags | SAcks], + pubs = [Pubs | SPubs], + funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( - State #vqstate { on_sync = { [AckTags], - [Pubs], - [Fun]} }), + State #vqstate { on_sync = #sync { + acks_persistent = [], + acks_all = [AckTags], + pubs = [Pubs], + funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. -tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) -> +tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) -> State; -tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, +tx_commit_index(State = #vqstate { on_sync = #sync { + acks_persistent = SPAcks, + acks_all = SAcks, + pubs = SPubs, + funs = SFuns }, durable = IsDurable }) -> - Acks = lists:flatten(SAcks), - Pubs = lists:flatten(lists:reverse(SPubs)), + PAcks = lists:flatten(SPAcks), + Acks = lists:flatten(SAcks), + Pubs = lists:flatten(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, @@ -872,11 +898,11 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, IsPersistent1 = IsDurable andalso IsPersistent, {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {Acks, ack(Acks, State)}, Pubs), + end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( - State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }). + State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(State = #vqstate { q3 = Q3, index_state = IndexState }) -> |