diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-22 15:12:58 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-22 15:12:58 +0100 |
commit | 1926573db998244854f33acbddd50576104ec355 (patch) | |
tree | 2fc4aab7a40a5efb5e1f2d082abae18af45726b3 | |
parent | 8fe39e57d56d08bb803424e914bf51b2adaaab1b (diff) | |
parent | ac9cb2a32a2c3cf63e03a44416dd3a1d9c2969b7 (diff) | |
download | rabbitmq-server-1926573db998244854f33acbddd50576104ec355.tar.gz |
merge default into bug21673
-rw-r--r-- | ebin/rabbit_app.in | 5 | ||||
-rw-r--r-- | include/rabbit_msg_store.hrl | 41 | ||||
-rw-r--r-- | include/rabbit_msg_store_index.hrl | 59 | ||||
-rw-r--r-- | src/bpqueue.erl | 286 | ||||
-rw-r--r-- | src/gatherer.erl | 145 | ||||
-rw-r--r-- | src/rabbit_msg_file.erl | 136 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 1731 | ||||
-rw-r--r-- | src/rabbit_msg_store_ets_index.erl | 90 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 141 | ||||
-rw-r--r-- | src/rabbit_msg_store_index.erl | 47 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 929 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 720 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 1432 |
13 files changed, 5761 insertions, 1 deletion
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index ce94cafe..2cd28abb 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -18,9 +18,12 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, - {backing_queue_module, rabbit_invariable_queue}, + {msg_store_index_module, rabbit_msg_store_ets_index}, + {backing_queue_module, rabbit_variable_queue}, {persister_max_wrap_entries, 500}, {persister_hibernate_after, 10000}, + {msg_store_file_size_limit, 16777216}, + {queue_index_max_journal_entries, 262144}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl new file mode 100644 index 00000000..d96fa758 --- /dev/null +++ b/include/rabbit_msg_store.hrl @@ -0,0 +1,41 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. 
Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-include("rabbit.hrl"). + +-ifdef(use_specs). + +-type(msg() :: any()). + +-endif. + +-record(msg_location, + {guid, ref_count, file, offset, total_size}). diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl new file mode 100644 index 00000000..fba0b7cd --- /dev/null +++ b/include/rabbit_msg_store_index.hrl @@ -0,0 +1,59 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-include("rabbit_msg_store.hrl"). 
+ +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(dir() :: any()). +-type(index_state() :: any()). +-type(keyvalue() :: any()). +-type(fieldpos() :: non_neg_integer()). +-type(fieldvalue() :: any()). + +-spec(new/1 :: (dir()) -> index_state()). +-spec(recover/1 :: (dir()) -> rabbit_types:ok_or_error2(index_state(), any())). +-spec(lookup/2 :: + (rabbit_guid:guid(), index_state()) -> ('not_found' | keyvalue())). +-spec(insert/2 :: (keyvalue(), index_state()) -> 'ok'). +-spec(update/2 :: (keyvalue(), index_state()) -> 'ok'). +-spec(update_fields/3 :: (rabbit_guid:guid(), ({fieldpos(), fieldvalue()} | + [{fieldpos(), fieldvalue()}]), + index_state()) -> 'ok'). +-spec(delete/2 :: (rabbit_guid:guid(), index_state()) -> 'ok'). +-spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok'). +-spec(terminate/1 :: (index_state()) -> any()). + +-endif. + +%%---------------------------------------------------------------------------- diff --git a/src/bpqueue.erl b/src/bpqueue.erl new file mode 100644 index 00000000..49874aa6 --- /dev/null +++ b/src/bpqueue.erl @@ -0,0 +1,286 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. 
+%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(bpqueue). + +%% Block-prefixed queue. From the perspective of the queue interface +%% the datastructure acts like a regular queue where each value is +%% paired with the prefix. +%% +%% This is implemented as a queue of queues, which is more space and +%% time efficient, whilst supporting the normal queue interface. Each +%% inner queue has a prefix, which does not need to be unique, and it +%% is guaranteed that no two consecutive blocks have the same +%% prefix. len/1 returns the flattened length of the queue and is +%% O(1). + +-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2, + foldl/3, foldr/3, from_list/1, to_list/1, map_fold_filter_l/4, + map_fold_filter_r/4]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([bpqueue/0]). + +-type(bpqueue() :: {non_neg_integer(), queue()}). +-type(prefix() :: any()). +-type(value() :: any()). +-type(result() :: ({'empty', bpqueue()} | + {{'value', prefix(), value()}, bpqueue()})). + +-spec(new/0 :: () -> bpqueue()). +-spec(is_empty/1 :: (bpqueue()) -> boolean()). +-spec(len/1 :: (bpqueue()) -> non_neg_integer()). +-spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). +-spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). 
+-spec(out/1 :: (bpqueue()) -> result()). +-spec(out_r/1 :: (bpqueue()) -> result()). +-spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()). +-spec(foldl/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). +-spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). +-spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()). +-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). +-spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())), + (fun ((value(), B) -> + ({prefix(), value(), B} | 'stop'))), + B, + bpqueue()) -> + {bpqueue(), B}). +-spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())), + (fun ((value(), B) -> + ({prefix(), value(), B} | 'stop'))), + B, + bpqueue()) -> + {bpqueue(), B}). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> {0, queue:new()}. + +is_empty({0, _Q}) -> true; +is_empty(_BPQ) -> false. + +len({N, _Q}) -> N. + +in(Prefix, Value, {0, Q}) -> + {1, queue:in({Prefix, queue:from_list([Value])}, Q)}; +in(Prefix, Value, BPQ) -> + in1({fun queue:in/2, fun queue:out_r/1}, Prefix, Value, BPQ). + +in_r(Prefix, Value, BPQ = {0, _Q}) -> + in(Prefix, Value, BPQ); +in_r(Prefix, Value, BPQ) -> + in1({fun queue:in_r/2, fun queue:out/1}, Prefix, Value, BPQ). + +in1({In, Out}, Prefix, Value, {N, Q}) -> + {N+1, case Out(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + In({Prefix, In(Value, InnerQ)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + In({Prefix, queue:in(Value, queue:new())}, Q) + end}. + +in_q(Prefix, Queue, BPQ = {0, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + N -> {N, queue:in({Prefix, Queue}, Q)} + end; +in_q(Prefix, Queue, BPQ) -> + in_q1({fun queue:in/2, fun queue:out_r/1, + fun queue:join/2}, + Prefix, Queue, BPQ). + +in_q_r(Prefix, Queue, BPQ = {0, _Q}) -> + in_q(Prefix, Queue, BPQ); +in_q_r(Prefix, Queue, BPQ) -> + in_q1({fun queue:in_r/2, fun queue:out/1, + fun (T, H) -> queue:join(H, T) end}, + Prefix, Queue, BPQ). 
+ +in_q1({In, Out, Join}, Prefix, Queue, BPQ = {N, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + M -> {N + M, case Out(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + In({Prefix, Join(InnerQ, Queue)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + In({Prefix, Queue}, Q) + end} + end. + +out({0, _Q} = BPQ) -> {empty, BPQ}; +out(BPQ) -> out1({fun queue:in_r/2, fun queue:out/1}, BPQ). + +out_r({0, _Q} = BPQ) -> {empty, BPQ}; +out_r(BPQ) -> out1({fun queue:in/2, fun queue:out_r/1}, BPQ). + +out1({In, Out}, {N, Q}) -> + {{value, {Prefix, InnerQ}}, Q1} = Out(Q), + {{value, Value}, InnerQ1} = Out(InnerQ), + Q2 = case queue:is_empty(InnerQ1) of + true -> Q1; + false -> In({Prefix, InnerQ1}, Q1) + end, + {{value, Prefix, Value}, {N-1, Q2}}. + +join({0, _Q}, BPQ) -> + BPQ; +join(BPQ, {0, _Q}) -> + BPQ; +join({NHead, QHead}, {NTail, QTail}) -> + {{value, {Prefix, InnerQHead}}, QHead1} = queue:out_r(QHead), + {NHead + NTail, + case queue:out(QTail) of + {{value, {Prefix, InnerQTail}}, QTail1} -> + queue:join( + queue:in({Prefix, queue:join(InnerQHead, InnerQTail)}, QHead1), + QTail1); + {{value, {_Prefix, _InnerQTail}}, _QTail1} -> + queue:join(QHead, QTail) + end}. + +foldl(_Fun, Init, {0, _Q}) -> Init; +foldl( Fun, Init, {_N, Q}) -> fold1(fun queue:out/1, Fun, Init, Q). + +foldr(_Fun, Init, {0, _Q}) -> Init; +foldr( Fun, Init, {_N, Q}) -> fold1(fun queue:out_r/1, Fun, Init, Q). + +fold1(Out, Fun, Init, Q) -> + case Out(Q) of + {empty, _Q} -> + Init; + {{value, {Prefix, InnerQ}}, Q1} -> + fold1(Out, Fun, fold1(Out, Fun, Prefix, Init, InnerQ), Q1) + end. + +fold1(Out, Fun, Prefix, Init, InnerQ) -> + case Out(InnerQ) of + {empty, _Q} -> + Init; + {{value, Value}, InnerQ1} -> + fold1(Out, Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1) + end. 
+ +from_list(List) -> + {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} = + lists:foldl( + fun ({_Prefix, []}, Acc) -> + Acc; + ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> + {Prefix, queue:join(InnerQ, queue:from_list(InnerList)), + ListOfPQs, LenAcc + length(InnerList)}; + ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> + {Prefix1, queue:from_list(InnerList), + [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)} + end, {undefined, queue:new(), [], 0}, List), + ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1], + [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2), + {Len, queue:from_list(case queue:is_empty(InnerQ1) of + true -> Rest; + false -> All + end)}. + +to_list({0, _Q}) -> []; +to_list({_N, Q}) -> [{Prefix, queue:to_list(InnerQ)} || + {Prefix, InnerQ} <- queue:to_list(Q)]. + +%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init} +%% where FilterFun(Prefix) -> boolean() +%% Fun(Value, Init) -> {Prefix, Value, Init} | stop +%% +%% The filter fun allows you to skip very quickly over blocks that +%% you're not interested in. Such blocks appear in the resulting bpq +%% without modification. The Fun is then used both to map the value, +%% which also allows you to change the prefix (and thus block) of the +%% value, and also to modify the Init/Acc (just like a fold). If the +%% Fun returns 'stop' then it is not applied to any further items. +map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> + {BPQ, Init}; +map_fold_filter_l(PFilter, Fun, Init, {N, Q}) -> + map_fold_filter1({fun queue:out/1, fun queue:in/2, + fun in_q/3, fun join/2}, + N, PFilter, Fun, Init, Q, new()). + +map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> + {BPQ, Init}; +map_fold_filter_r(PFilter, Fun, Init, {N, Q}) -> + map_fold_filter1({fun queue:out_r/1, fun queue:in_r/2, + fun in_q_r/3, fun (T, H) -> join(H, T) end}, + N, PFilter, Fun, Init, Q, new()). 
+ +map_fold_filter1(Funs = {Out, _In, InQ, Join}, Len, PFilter, Fun, + Init, Q, QNew) -> + case Out(Q) of + {empty, _Q} -> + {QNew, Init}; + {{value, {Prefix, InnerQ}}, Q1} -> + case PFilter(Prefix) of + true -> + {Init1, QNew1, Cont} = + map_fold_filter2(Funs, Fun, Prefix, Prefix, + Init, InnerQ, QNew, queue:new()), + case Cont of + false -> {Join(QNew1, {Len - len(QNew1), Q1}), Init1}; + true -> map_fold_filter1(Funs, Len, PFilter, Fun, + Init1, Q1, QNew1) + end; + false -> + map_fold_filter1(Funs, Len, PFilter, Fun, + Init, Q1, InQ(Prefix, InnerQ, QNew)) + end + end. + +map_fold_filter2(Funs = {Out, In, InQ, _Join}, Fun, OrigPrefix, Prefix, + Init, InnerQ, QNew, InnerQNew) -> + case Out(InnerQ) of + {empty, _Q} -> + {Init, InQ(OrigPrefix, InnerQ, + InQ(Prefix, InnerQNew, QNew)), true}; + {{value, Value}, InnerQ1} -> + case Fun(Value, Init) of + stop -> + {Init, InQ(OrigPrefix, InnerQ, + InQ(Prefix, InnerQNew, QNew)), false}; + {Prefix1, Value1, Init1} -> + {Prefix2, QNew1, InnerQNew1} = + case Prefix1 =:= Prefix of + true -> {Prefix, QNew, In(Value1, InnerQNew)}; + false -> {Prefix1, InQ(Prefix, InnerQNew, QNew), + In(Value1, queue:new())} + end, + map_fold_filter2(Funs, Fun, OrigPrefix, Prefix2, + Init1, InnerQ1, QNew1, InnerQNew1) + end + end. diff --git a/src/gatherer.erl b/src/gatherer.erl new file mode 100644 index 00000000..31dda16e --- /dev/null +++ b/src/gatherer.erl @@ -0,0 +1,145 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. 
+%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(gatherer). + +-behaviour(gen_server2). + +-export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(stop/1 :: (pid()) -> 'ok'). +-spec(fork/1 :: (pid()) -> 'ok'). +-spec(finish/1 :: (pid()) -> 'ok'). +-spec(in/2 :: (pid(), any()) -> 'ok'). +-spec(out/1 :: (pid()) -> {'value', any()} | 'empty'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +-record(gstate, { forks, values, blocked }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). + +stop(Pid) -> + gen_server2:call(Pid, stop, infinity). + +fork(Pid) -> + gen_server2:call(Pid, fork, infinity). 
+ +finish(Pid) -> + gen_server2:cast(Pid, finish). + +in(Pid, Value) -> + gen_server2:cast(Pid, {in, Value}). + +out(Pid) -> + gen_server2:call(Pid, out, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #gstate { forks = 0, values = queue:new(), blocked = queue:new() }, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; + +handle_call(fork, _From, State = #gstate { forks = Forks }) -> + {reply, ok, State #gstate { forks = Forks + 1 }, hibernate}; + +handle_call(out, From, State = #gstate { forks = Forks, + values = Values, + blocked = Blocked }) -> + case queue:out(Values) of + {empty, _} -> + case Forks of + 0 -> {reply, empty, State, hibernate}; + _ -> {noreply, + State #gstate { blocked = queue:in(From, Blocked) }, + hibernate} + end; + {{value, _Value} = V, NewValues} -> + {reply, V, State #gstate { values = NewValues }, hibernate} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) -> + NewForks = Forks - 1, + NewBlocked = case NewForks of + 0 -> [gen_server2:reply(From, empty) || + From <- queue:to_list(Blocked)], + queue:new(); + _ -> Blocked + end, + {noreply, State #gstate { forks = NewForks, blocked = NewBlocked }, + hibernate}; + +handle_cast({in, Value}, State = #gstate { values = Values, + blocked = Blocked }) -> + {noreply, case queue:out(Blocked) of + {empty, _} -> + State #gstate { values = queue:in(Value, Values) }; + {{value, From}, NewBlocked} -> + gen_server2:reply(From, {value, Value}), + State #gstate { blocked = NewBlocked } + end, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ +terminate(_Reason, State) -> + State. diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl new file mode 100644 index 00000000..4f178439 --- /dev/null +++ b/src/rabbit_msg_file.erl @@ -0,0 +1,136 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_file). + +-export([append/3, read/2, scan/2]). + +%%---------------------------------------------------------------------------- + +-include("rabbit_msg_store.hrl"). + +-define(INTEGER_SIZE_BYTES, 8). +-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). +-define(WRITE_OK_SIZE_BITS, 8). +-define(WRITE_OK_MARKER, 255). +-define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)). +-define(GUID_SIZE_BYTES, 16). 
+-define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). +-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(io_device() :: any()). +-type(position() :: non_neg_integer()). +-type(msg_size() :: non_neg_integer()). +-type(file_size() :: non_neg_integer()). + +-spec(append/3 :: (io_device(), rabbit_guid:guid(), msg()) -> + rabbit_types:ok_or_error2(msg_size(), any())). +-spec(read/2 :: (io_device(), msg_size()) -> + rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()}, + any())). +-spec(scan/2 :: (io_device(), file_size()) -> + {'ok', [{rabbit_guid:guid(), msg_size(), position()}], + position()}). + +-endif. + +%%---------------------------------------------------------------------------- + +append(FileHdl, Guid, MsgBody) + when is_binary(Guid) andalso size(Guid) =:= ?GUID_SIZE_BYTES -> + MsgBodyBin = term_to_binary(MsgBody), + MsgBodyBinSize = size(MsgBodyBin), + Size = MsgBodyBinSize + ?GUID_SIZE_BYTES, + case file_handle_cache:append(FileHdl, + <<Size:?INTEGER_SIZE_BITS, + Guid:?GUID_SIZE_BYTES/binary, + MsgBodyBin:MsgBodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of + ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; + KO -> KO + end. + +read(FileHdl, TotalSize) -> + Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, + BodyBinSize = Size - ?GUID_SIZE_BYTES, + case file_handle_cache:read(FileHdl, TotalSize) of + {ok, <<Size:?INTEGER_SIZE_BITS, + Guid:?GUID_SIZE_BYTES/binary, + MsgBodyBin:BodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> + {ok, {Guid, binary_to_term(MsgBodyBin)}}; + KO -> KO + end. + +scan(FileHdl, FileSize) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, [], 0). 
+ +scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> + {ok, Acc, ScanOffset}; +scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> + Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]), + case file_handle_cache:read(FileHdl, Read) of + {ok, Data1} -> + {Data2, Acc1, ScanOffset1} = + scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + ReadOffset1 = ReadOffset + size(Data1), + scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1); + _KO -> + {ok, Acc, ScanOffset} + end. + +scan(<<>>, Acc, Offset) -> + {<<>>, Acc, Offset}; +scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> + {<<>>, Acc, Offset}; %% Nothing to do other than stop. +scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, + WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + case WriteMarker of + ?WRITE_OK_MARKER -> + %% Here we take option 5 from + %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in + %% which we read the Guid as a number, and then convert it + %% back to a binary in order to work around bugs in + %% Erlang's GC. + <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = + <<GuidAndMsg:Size/binary>>, + <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, + scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); + _ -> + scan(Rest, Acc, Offset + TotalSize) + end; +scan(Data, Acc, Offset) -> + {Data, Acc, Offset}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl new file mode 100644 index 00000000..63100571 --- /dev/null +++ b/src/rabbit_msg_store.erl @@ -0,0 +1,1731 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. 
You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store). + +-behaviour(gen_server2). + +-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, + sync/3, client_init/2, client_terminate/1, + client_delete_and_terminate/3, successfully_recovered_state/1]). + +-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-include("rabbit_msg_store.hrl"). + +-define(SYNC_INTERVAL, 5). %% milliseconds +-define(CLEAN_FILENAME, "clean.dot"). +-define(FILE_SUMMARY_FILENAME, "file_summary.ets"). + +-define(BINARY_MODE, [raw, binary]). +-define(READ_MODE, [read]). +-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). +-define(WRITE_MODE, [write]). 
+ +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). + +-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB + +%%---------------------------------------------------------------------------- + +-record(msstate, + { dir, %% store directory + index_module, %% the module for index ops + index_state, %% where are messages? + current_file, %% current file name as number + current_file_handle, %% current file handle since the last fsync? + file_handle_cache, %% file handle cache + on_sync, %% pending sync requests + sync_timer_ref, %% TRef for our interval timer + sum_valid_data, %% sum of valid data in all files + sum_file_size, %% sum of file sizes + pending_gc_completion, %% things to do once GC completes + gc_active, %% is the GC currently working? + gc_pid, %% pid of our GC + file_handles_ets, %% tid of the shared file handles table + file_summary_ets, %% tid of the file summary table + dedup_cache_ets, %% tid of dedup cache table + cur_file_cache_ets, %% tid of current file cache table + client_refs, %% set of references of all registered clients + successfully_recovered, %% boolean: did we recover state? + file_size_limit %% how big are our files allowed to get? + }). + +-record(client_msstate, + { file_handle_cache, + index_state, + index_module, + dir, + gc_pid, + file_handles_ets, + file_summary_ets, + dedup_cache_ets, + cur_file_cache_ets + }). + +-record(file_summary, + {file, valid_total_size, contiguous_top, left, right, file_size, + locked, readers}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(server() :: pid() | atom()). +-type(file_num() :: non_neg_integer()). 
+-type(client_msstate() :: #client_msstate { + file_handle_cache :: dict:dictionary(), + index_state :: any(), + index_module :: atom(), + dir :: file:filename(), + gc_pid :: pid(), + file_handles_ets :: ets:tid(), + file_summary_ets :: ets:tid(), + dedup_cache_ets :: ets:tid(), + cur_file_cache_ets :: ets:tid() }). +-type(startup_fun_state() :: + {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), + A}). + +-spec(start_link/4 :: + (atom(), file:filename(), [binary()] | 'undefined', + startup_fun_state()) -> + 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> + rabbit_types:ok(client_msstate())). +-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). +-spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). +-spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). +-spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). +-spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). +-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> + 'ok'). +-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). +-spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_delete_and_terminate/3 :: + (client_msstate(), server(), binary()) -> 'ok'). +-spec(successfully_recovered_state/1 :: (server()) -> boolean()). + +-spec(gc/3 :: (non_neg_integer(), non_neg_integer(), + {ets:tid(), file:filename(), atom(), any()}) -> + 'concurrent_readers' | non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION +%% It is not recommended to set this to < 0.5 +-define(GARBAGE_FRACTION, 0.5). 
+ +%% The components: +%% +%% Index: this is a mapping from Guid to #msg_location{}: +%% {Guid, RefCount, File, Offset, TotalSize} +%% By default, it's in ets, but it's also pluggable. +%% FileSummary: this is an ets table which maps File to #file_summary{}: +%% {File, ValidTotalSize, ContiguousTop, Left, Right, +%% FileSize, Locked, Readers} +%% +%% The basic idea is that messages are appended to the current file up +%% until that file becomes too big (> file_size_limit). At that point, +%% the file is closed and a new file is created on the _right_ of the +%% old file which is used for new messages. Files are named +%% numerically ascending, thus the file with the lowest name is the +%% eldest file. +%% +%% We need to keep track of which messages are in which files (this is +%% the Index); how much useful data is in each file and which files +%% are on the left and right of each other. This is the purpose of the +%% FileSummary ets table. +%% +%% As messages are removed from files, holes appear in these +%% files. The field ValidTotalSize contains the total amount of useful +%% data left in the file, whilst ContiguousTop contains the amount of +%% valid data right at the start of each file. These are needed for +%% garbage collection. +%% +%% When we discover that a file is now empty, we delete it. When we +%% discover that it can be combined with the useful data in either its +%% left or right neighbour, and overall, across all the files, we have +%% ((the amount of garbage) / (the sum of all file sizes)) > +%% ?GARBAGE_FRACTION, we start a garbage collection run concurrently, +%% which will compact the two files together. This keeps disk +%% utilisation high and aids performance. We deliberately do this +%% lazily in order to prevent doing GC on files which are soon to be +%% emptied (and hence deleted) soon. +%% +%% Given the compaction between two files, the left file (i.e. 
elder
+%% file) is considered the ultimate destination for the good data in
+%% the right file. If necessary, the good data in the left file which
+%% is fragmented throughout the file is written out to a temporary
+%% file, then read back in to form a contiguous chunk of good data at
+%% the start of the left file. Thus the left file is garbage collected
+%% and compacted. Then the good data from the right file is copied
+%% onto the end of the left file. Index and FileSummary tables are
+%% updated.
+%%
+%% On non-clean startup, we scan the files we discover, dealing with
+%% the possibilities of a crash having occurred during a compaction
+%% (this consists of tidyup - the compaction is deliberately designed
+%% such that data is duplicated on disk rather than risking it being
+%% lost), and rebuild the FileSummary ets table and Index.
+%%
+%% So, with this design, messages move to the left. Eventually, they
+%% should end up in a contiguous block on the left and are then never
+%% rewritten. But this isn't quite the case. If in a file there is one
+%% message that is being ignored, for some reason, and messages in the
+%% file to the right and in the current block are being read all the
+%% time then it will repeatedly be the case that the good data from
+%% both files can be combined and will be written out to a new
+%% file. Whenever this happens, our shunned message will be rewritten.
+%%
+%% So, provided that we combine messages in the right order,
+%% (i.e. left file, bottom to top, right file, bottom to top),
+%% eventually our shunned message will end up at the bottom of the
+%% left file. The compaction/combining algorithm is smart enough to
+%% read in good data from the left file that is scattered throughout
+%% (i.e. C and D in the below diagram), then truncate the file to just
+%% above B (i.e.
truncate to the limit of the good contiguous region +%% at the start of the file), then write C and D on top and then write +%% E, F and G from the right file on top. Thus contiguous blocks of +%% good data at the bottom of files are not rewritten (yes, this is +%% the data the size of which is tracked by the ContiguousTop +%% variable. Judicious use of a mirror is required). +%% +%% +-------+ +-------+ +-------+ +%% | X | | G | | G | +%% +-------+ +-------+ +-------+ +%% | D | | X | | F | +%% +-------+ +-------+ +-------+ +%% | X | | X | | E | +%% +-------+ +-------+ +-------+ +%% | C | | F | ===> | D | +%% +-------+ +-------+ +-------+ +%% | X | | X | | C | +%% +-------+ +-------+ +-------+ +%% | B | | X | | B | +%% +-------+ +-------+ +-------+ +%% | A | | E | | A | +%% +-------+ +-------+ +-------+ +%% left right left +%% +%% From this reasoning, we do have a bound on the number of times the +%% message is rewritten. From when it is inserted, there can be no +%% files inserted between it and the head of the queue, and the worst +%% case is that everytime it is rewritten, it moves one position lower +%% in the file (for it to stay at the same position requires that +%% there are no holes beneath it, which means truncate would be used +%% and so it would not be rewritten at all). Thus this seems to +%% suggest the limit is the number of messages ahead of it in the +%% queue, though it's likely that that's pessimistic, given the +%% requirements for compaction/combination of files. +%% +%% The other property is that we have is the bound on the lowest +%% utilisation, which should be 50% - worst case is that all files are +%% fractionally over half full and can't be combined (equivalent is +%% alternating full files and files with only one tiny message in +%% them). +%% +%% Messages are reference-counted. 
When a message with the same guid +%% is written several times we only store it once, and only remove it +%% from the store when it has been removed the same number of times. +%% +%% The reference counts do not persist. Therefore the initialisation +%% function must be provided with a generator that produces ref count +%% deltas for all recovered messages. This is only used on startup +%% when the shutdown was non-clean. +%% +%% Read messages with a reference count greater than one are entered +%% into a message cache. The purpose of the cache is not especially +%% performance, though it can help there too, but prevention of memory +%% explosion. It ensures that as messages with a high reference count +%% are read from several processes they are read back as the same +%% binary object rather than multiples of identical binary +%% objects. +%% +%% Reads can be performed directly by clients without calling to the +%% server. This is safe because multiple file handles can be used to +%% read files. However, locking is used by the concurrent GC to make +%% sure that reads are not attempted from files which are in the +%% process of being garbage collected. +%% +%% The server automatically defers reads, removes and contains calls +%% that occur which refer to files which are currently being +%% GC'd. Contains calls are only deferred in order to ensure they do +%% not overtake removes. +%% +%% The current file to which messages are being written has a +%% write-back cache. This is written to immediately by clients and can +%% be read from by clients too. This means that there are only ever +%% writes made to the current file, thus eliminating delays due to +%% flushing write buffers in order to be able to safely read from the +%% current file. The one exception to this is that on start up, the +%% cache is not populated with msgs found in the current file, and +%% thus in this case only, reads may have to come from the file +%% itself. 
The effect of this is that even if the msg_store process is +%% heavily overloaded, clients can still write and read messages with +%% very low latency and not block at all. +%% +%% For notes on Clean Shutdown and startup, see documentation in +%% variable_queue. + +%%---------------------------------------------------------------------------- +%% public API +%%---------------------------------------------------------------------------- + +start_link(Server, Dir, ClientRefs, StartupFunState) -> + gen_server2:start_link({local, Server}, ?MODULE, + [Server, Dir, ClientRefs, StartupFunState], + [{timeout, infinity}]). + +write(Server, Guid, Msg, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + ok = update_msg_cache(CurFileCacheEts, Guid, Msg), + {gen_server2:cast(Server, {write, Guid, Msg}), CState}. + +read(Server, Guid, + CState = #client_msstate { dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> + %% 1. Check the dedup cache + case fetch_and_increment_cache(DedupCacheEts, Guid) of + not_found -> + %% 2. Check the cur file cache + case ets:lookup(CurFileCacheEts, Guid) of + [] -> + Defer = fun() -> {gen_server2:pcall( + Server, 2, {read, Guid}, infinity), + CState} end, + case index_lookup(Guid, CState) of + not_found -> Defer(); + MsgLocation -> client_read1(Server, MsgLocation, Defer, + CState) + end; + [{Guid, Msg, _CacheRefCount}] -> + %% Although we've found it, we don't know the + %% refcount, so can't insert into dedup cache + {{ok, Msg}, CState} + end; + Msg -> + {{ok, Msg}, CState} + end. + +contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). +remove(_Server, []) -> ok; +remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). +release(_Server, []) -> ok; +release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). +sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). +sync(Server) -> gen_server2:pcast(Server, 8, sync). 
%% internal + +gc_done(Server, Reclaimed, Source, Destination) -> + gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}). + +set_maximum_since_use(Server, Age) -> + gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). + +client_init(Server, Ref) -> + {IState, IModule, Dir, GCPid, + FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = + gen_server2:call(Server, {new_client_state, Ref}, infinity), + #client_msstate { file_handle_cache = dict:new(), + index_state = IState, + index_module = IModule, + dir = Dir, + gc_pid = GCPid, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }. + +client_terminate(CState) -> + close_all_handles(CState), + ok. + +client_delete_and_terminate(CState, Server, Ref) -> + ok = client_terminate(CState), + ok = gen_server2:call(Server, {delete_client, Ref}, infinity). + +successfully_recovered_state(Server) -> + gen_server2:call(Server, successfully_recovered_state, infinity). + +%%---------------------------------------------------------------------------- +%% Client-side-only helpers +%%---------------------------------------------------------------------------- + +client_read1(Server, + #msg_location { guid = Guid, file = File } = MsgLocation, + Defer, + CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> + case ets:lookup(FileSummaryEts, File) of + [] -> %% File has been GC'd and no longer exists. Go around again. + read(Server, Guid, CState); + [#file_summary { locked = Locked, right = Right }] -> + client_read2(Server, Locked, Right, MsgLocation, Defer, CState) + end. + +client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> + %% Although we've already checked both caches and not found the + %% message there, the message is apparently in the + %% current_file. 
We can only arrive here if we are trying to read + %% a message which we have not written, which is very odd, so just + %% defer. + %% + %% OR, on startup, the cur_file_cache is not populated with the + %% contents of the current file, thus reads from the current file + %% will end up here and will need to be deferred. + Defer(); +client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> + %% Of course, in the mean time, the GC could have run and our msg + %% is actually in a different file, unlocked. However, defering is + %% the safest and simplest thing to do. + Defer(); +client_read2(Server, false, _Right, + MsgLocation = #msg_location { guid = Guid, file = File }, + Defer, + CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> + %% It's entirely possible that everything we're doing from here on + %% is for the wrong file, or a non-existent file, as a GC may have + %% finished. + safe_ets_update_counter( + FileSummaryEts, File, {#file_summary.readers, +1}, + fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end, + fun () -> read(Server, Guid, CState) end). + +client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, + CState = #client_msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + gc_pid = GCPid }) -> + Release = + fun() -> ok = case ets:update_counter(FileSummaryEts, File, + {#file_summary.readers, -1}) of + 0 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + rabbit_msg_store_gc:no_readers( + GCPid, File); + _ -> ok + end; + _ -> ok + end + end, + %% If a GC involving the file hasn't already started, it won't + %% start now. Need to check again to see if we've been locked in + %% the meantime, between lookup and update_counter (thus GC + %% started before our +1. In fact, it could have finished by now + %% too). 
+ case ets:lookup(FileSummaryEts, File) of + [] -> %% GC has deleted our file, just go round again. + read(Server, Guid, CState); + [#file_summary { locked = true }] -> + %% If we get a badarg here, then the GC has finished and + %% deleted our file. Try going around again. Otherwise, + %% just defer. + %% + %% badarg scenario: we lookup, msg_store locks, GC starts, + %% GC ends, we +1 readers, msg_store ets:deletes (and + %% unlocks the dest) + try Release(), + Defer() + catch error:badarg -> read(Server, Guid, CState) + end; + [#file_summary { locked = false }] -> + %% Ok, we're definitely safe to continue - a GC involving + %% the file cannot start up now, and isn't running, so + %% nothing will tell us from now on to close the handle if + %% it's already open. + %% + %% Finally, we need to recheck that the msg is still at + %% the same place - it's possible an entire GC ran between + %% us doing the lookup and the +1 on the readers. (Same as + %% badarg scenario above, but we don't have a missing file + %% - we just have the /wrong/ file). + case index_lookup(Guid, CState) of + #msg_location { file = File } = MsgLocation -> + %% Still the same file. + mark_handle_open(FileHandlesEts, File), + + CState1 = close_all_indicated(CState), + {Msg, CState2} = %% This will never be the current file + read_from_disk(MsgLocation, CState1, DedupCacheEts), + Release(), %% this MUST NOT fail with badarg + {{ok, Msg}, CState2}; + MsgLocation -> %% different file! + Release(), %% this MUST NOT fail with badarg + client_read1(Server, MsgLocation, Defer, CState) + end + end. 
+ +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> + process_flag(trap_exit, true), + + ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, + [self()]), + + Dir = filename:join(BaseDir, atom_to_list(Server)), + + {ok, IndexModule} = application:get_env(msg_store_index_module), + rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), + + {AllCleanShutdown, IndexState, ClientRefs1} = + recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server), + + {FileSummaryRecovered, FileSummaryEts} = + recover_file_summary(AllCleanShutdown, Dir, Server), + + DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), + FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, + [ordered_set, public]), + CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + + {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), + + State = #msstate { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + current_file = 0, + current_file_handle = undefined, + file_handle_cache = dict:new(), + on_sync = [], + sync_timer_ref = undefined, + sum_valid_data = 0, + sum_file_size = 0, + pending_gc_completion = [], + gc_active = false, + gc_pid = undefined, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs1, + successfully_recovered = AllCleanShutdown, + file_size_limit = FileSizeLimit + }, + + ok = case AllCleanShutdown of + true -> ok; + false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State) + end, + + FileNames = + sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), + TmpFileNames = + sort_file_names(filelib:wildcard("*" ++ 
?FILE_EXTENSION_TMP, Dir)), + ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), + + %% There should be no more tmp files now, so go ahead and load the + %% whole lot + Files = [filename_to_num(FileName) || FileName <- FileNames], + {Offset, State1 = #msstate { current_file = CurFile }} = + build_index(FileSummaryRecovered, Files, State), + + %% read is only needed so that we can seek + {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), + [read | ?WRITE_MODE]), + {ok, Offset} = file_handle_cache:position(CurHdl, Offset), + ok = file_handle_cache:truncate(CurHdl), + + {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, + FileSummaryEts), + + {ok, maybe_compact( + State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({read, Guid}, From, State) -> + State1 = read_message(Guid, From, State), + noreply(State1); + +handle_call({contains, Guid}, From, State) -> + State1 = contains_message(Guid, From, State), + noreply(State1); + +handle_call({new_client_state, CRef}, _From, + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + gc_pid = GCPid }) -> + reply({IndexState, IndexModule, Dir, GCPid, + FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, + State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + +handle_call(successfully_recovered_state, _From, State) -> + reply(State #msstate.successfully_recovered, State); + +handle_call({delete_client, CRef}, _From, + State = #msstate { client_refs = ClientRefs }) -> + reply(ok, + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). 
+ +handle_cast({write, Guid, Msg}, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) -> + true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + case index_lookup(Guid, State) of + not_found -> + %% New message, lots to do + {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), + ok = index_insert(#msg_location { + guid = Guid, ref_count = 1, file = CurFile, + offset = CurOffset, total_size = TotalSize }, + State), + [#file_summary { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + right = undefined, + locked = false, + file_size = FileSize }] = + ets:lookup(FileSummaryEts, CurFile), + ValidTotalSize1 = ValidTotalSize + TotalSize, + ContiguousTop1 = case CurOffset =:= ContiguousTop of + true -> ValidTotalSize1; + false -> ContiguousTop + end, + true = ets:update_element( + FileSummaryEts, CurFile, + [{#file_summary.valid_total_size, ValidTotalSize1}, + {#file_summary.contiguous_top, ContiguousTop1}, + {#file_summary.file_size, FileSize + TotalSize}]), + NextOffset = CurOffset + TotalSize, + noreply( + maybe_roll_to_new_file( + NextOffset, State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize })); + #msg_location { ref_count = RefCount } -> + %% We already know about it, just update counter. 
Only + %% update field otherwise bad interaction with concurrent GC + ok = index_update_fields(Guid, + {#msg_location.ref_count, RefCount + 1}, + State), + noreply(State) + end; + +handle_cast({remove, Guids}, State) -> + State1 = lists:foldl( + fun (Guid, State2) -> remove_message(Guid, State2) end, + State, Guids), + noreply(maybe_compact(State1)); + +handle_cast({release, Guids}, State = + #msstate { dedup_cache_ets = DedupCacheEts }) -> + lists:foreach( + fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids), + noreply(State); + +handle_cast({sync, Guids, K}, + State = #msstate { current_file = CurFile, + current_file_handle = CurHdl, + on_sync = Syncs }) -> + {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), + case lists:any(fun (Guid) -> + #msg_location { file = File, offset = Offset } = + index_lookup(Guid, State), + File =:= CurFile andalso Offset >= SyncOffset + end, Guids) of + false -> K(), + noreply(State); + true -> noreply(State #msstate { on_sync = [K | Syncs] }) + end; + +handle_cast(sync, State) -> + noreply(internal_sync(State)); + +handle_cast({gc_done, Reclaimed, Src, Dst}, + State = #msstate { sum_file_size = SumFileSize, + gc_active = {Src, Dst}, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> + %% GC done, so now ensure that any clients that have open fhs to + %% those files close them before using them again. This has to be + %% done here (given it's done in the msg_store, and not the gc), + %% and not when starting up the GC, because if done when starting + %% up the GC, the client could find the close, and close and + %% reopen the fh, whilst the GC is waiting for readers to + %% disappear, before it's actually done the GC. 
+ true = mark_handle_to_close(FileHandlesEts, Src), + true = mark_handle_to_close(FileHandlesEts, Dst), + %% we always move data left, so Src has gone and was on the + %% right, so need to make dest = source.right.left, and also + %% dest.right = source.right + [#file_summary { left = Dst, + right = SrcRight, + locked = true, + readers = 0 }] = ets:lookup(FileSummaryEts, Src), + %% this could fail if SrcRight =:= undefined + ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}), + true = ets:update_element(FileSummaryEts, Dst, + [{#file_summary.locked, false}, + {#file_summary.right, SrcRight}]), + true = ets:delete(FileSummaryEts, Src), + noreply( + maybe_compact(run_pending( + State #msstate { sum_file_size = SumFileSize - Reclaimed, + gc_active = false }))); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State). + +handle_info(timeout, State) -> + noreply(internal_sync(State)); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}. + +terminate(_Reason, State = #msstate { index_state = IndexState, + index_module = IndexModule, + current_file_handle = CurHdl, + gc_pid = GCPid, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + dir = Dir }) -> + %% stop the gc first, otherwise it could be working and we pull + %% out the ets tables from under it. 
+ ok = rabbit_msg_store_gc:stop(GCPid), + State1 = case CurHdl of + undefined -> State; + _ -> State2 = internal_sync(State), + file_handle_cache:close(CurHdl), + State2 + end, + State3 = close_all_handles(State1), + store_file_summary(FileSummaryEts, Dir), + [ets:delete(T) || + T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], + IndexModule:terminate(IndexState), + store_recovery_terms([{client_refs, sets:to_list(ClientRefs)}, + {index_module, IndexModule}], Dir), + State3 #msstate { index_state = undefined, + current_file_handle = undefined }. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% general helper functions +%%---------------------------------------------------------------------------- + +noreply(State) -> + {State1, Timeout} = next_state(State), + {noreply, State1, Timeout}. + +reply(Reply, State) -> + {State1, Timeout} = next_state(State), + {reply, Reply, State1, Timeout}. + +next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> + {State, hibernate}; +next_state(State = #msstate { sync_timer_ref = undefined }) -> + {start_sync_timer(State), 0}; +next_state(State = #msstate { on_sync = [] }) -> + {stop_sync_timer(State), hibernate}; +next_state(State) -> + {State, 0}. + +start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), + State #msstate { sync_timer_ref = TRef }. + +stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #msstate { sync_timer_ref = undefined }. 
+ +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs }) -> + State1 = stop_sync_timer(State), + case Syncs of + [] -> State1; + _ -> ok = file_handle_cache:sync(CurHdl), + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + State1 #msstate { on_sync = [] } + end. + +read_message(Guid, From, + State = #msstate { dedup_cache_ets = DedupCacheEts }) -> + case index_lookup(Guid, State) of + not_found -> + gen_server2:reply(From, not_found), + State; + MsgLocation -> + case fetch_and_increment_cache(DedupCacheEts, Guid) of + not_found -> read_message1(From, MsgLocation, State); + Msg -> gen_server2:reply(From, {ok, Msg}), + State + end + end. + +read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, + file = File, offset = Offset } = MsgLoc, + State = #msstate { current_file = CurFile, + current_file_handle = CurHdl, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> + case File =:= CurFile of + true -> {Msg, State1} = + %% can return [] if msg in file existed on startup + case ets:lookup(CurFileCacheEts, Guid) of + [] -> + {ok, RawOffSet} = + file_handle_cache:current_raw_offset(CurHdl), + ok = case Offset >= RawOffSet of + true -> file_handle_cache:flush(CurHdl); + false -> ok + end, + read_from_disk(MsgLoc, State, DedupCacheEts); + [{Guid, Msg1, _CacheRefCount}] -> + ok = maybe_insert_into_cache( + DedupCacheEts, RefCount, Guid, Msg1), + {Msg1, State} + end, + gen_server2:reply(From, {ok, Msg}), + State1; + false -> [#file_summary { locked = Locked }] = + ets:lookup(FileSummaryEts, File), + case Locked of + true -> add_to_pending_gc_completion({read, Guid, From}, + State); + false -> {Msg, State1} = + read_from_disk(MsgLoc, State, DedupCacheEts), + gen_server2:reply(From, {ok, Msg}), + State1 + end + end. 
+ +read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, + file = File, offset = Offset, + total_size = TotalSize }, + State, DedupCacheEts) -> + {Hdl, State1} = get_read_handle(File, State), + {ok, Offset} = file_handle_cache:position(Hdl, Offset), + {ok, {Guid, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {Guid, _}} = Obj -> + Obj; + Rest -> + {error, {misread, [{old_state, State}, + {file_num, File}, + {offset, Offset}, + {guid, Guid}, + {read, Rest}, + {proc_dict, get()} + ]}} + end, + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), + {Msg, State1}. + +contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> + case index_lookup(Guid, State) of + not_found -> + gen_server2:reply(From, false), + State; + #msg_location { file = File } -> + case GCActive of + {A, B} when File =:= A orelse File =:= B -> + add_to_pending_gc_completion( + {contains, Guid, From}, State); + _ -> + gen_server2:reply(From, true), + State + end + end. + +remove_message(Guid, State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> + #msg_location { ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize } = + index_lookup(Guid, State), + case RefCount of + 1 -> + %% don't remove from CUR_FILE_CACHE_ETS_NAME here because + %% there may be further writes in the mailbox for the same + %% msg. 
+ ok = remove_cache_entry(DedupCacheEts, Guid), + [#file_summary { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + locked = Locked }] = + ets:lookup(FileSummaryEts, File), + case Locked of + true -> + add_to_pending_gc_completion({remove, Guid}, State); + false -> + ok = index_delete(Guid, State), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + ValidTotalSize1 = ValidTotalSize - TotalSize, + true = ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, ValidTotalSize1}, + {#file_summary.contiguous_top, ContiguousTop1}]), + State1 = delete_file_if_empty(File, State), + State1 #msstate { sum_valid_data = SumValid - TotalSize } + end; + _ when 1 < RefCount -> + ok = decrement_cache(DedupCacheEts, Guid), + %% only update field, otherwise bad interaction with concurrent GC + ok = index_update_fields(Guid, + {#msg_location.ref_count, RefCount - 1}, + State), + State + end. + +add_to_pending_gc_completion( + Op, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = [Op | Pending] }. + +run_pending(State = #msstate { pending_gc_completion = [] }) -> + State; +run_pending(State = #msstate { pending_gc_completion = Pending }) -> + State1 = State #msstate { pending_gc_completion = [] }, + lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). + +run_pending({read, Guid, From}, State) -> + read_message(Guid, From, State); +run_pending({contains, Guid, From}, State) -> + contains_message(Guid, From, State); +run_pending({remove, Guid}, State) -> + remove_message(Guid, State). + +safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> + try + SuccessFun(ets:update_counter(Tab, Key, UpdateOp)) + catch error:badarg -> FailThunk() + end. + +safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> + safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). 
+ +%%---------------------------------------------------------------------------- +%% file helper functions +%%---------------------------------------------------------------------------- + +open_file(Dir, FileName, Mode) -> + file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + +close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> + CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; + +close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> + State #msstate { file_handle_cache = close_handle(Key, FHC) }; + +close_handle(Key, FHC) -> + case dict:find(Key, FHC) of + {ok, Hdl} -> ok = file_handle_cache:close(Hdl), + dict:erase(Key, FHC); + error -> FHC + end. + +mark_handle_open(FileHandlesEts, File) -> + %% This is fine to fail (already exists) + ets:insert_new(FileHandlesEts, {{self(), File}, open}), + true. + +mark_handle_to_close(FileHandlesEts, File) -> + [ ets:update_element(FileHandlesEts, Key, {2, close}) + || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ], + true. + +close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = + CState) -> + Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}), + lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> + true = ets:delete(FileHandlesEts, Key), + close_handle(File, CStateM) + end, CState, Objs). + +close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, + file_handle_cache = FHC }) -> + Self = self(), + ok = dict:fold(fun (File, Hdl, ok) -> + true = ets:delete(FileHandlesEts, {Self, File}), + file_handle_cache:close(Hdl) + end, ok, FHC), + CState #client_msstate { file_handle_cache = dict:new() }; + +close_all_handles(State = #msstate { file_handle_cache = FHC }) -> + ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end, + ok, FHC), + State #msstate { file_handle_cache = dict:new() }. 
+ +get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC, + dir = Dir }) -> + {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir), + {Hdl, CState #client_msstate { file_handle_cache = FHC2 }}; + +get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC, + dir = Dir }) -> + {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir), + {Hdl, State #msstate { file_handle_cache = FHC2 }}. + +get_read_handle(FileNum, FHC, Dir) -> + case dict:find(FileNum, FHC) of + {ok, Hdl} -> {Hdl, FHC}; + error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum), + ?READ_MODE), + {Hdl, dict:store(FileNum, Hdl, FHC)} + end. + +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), + ok = file_handle_cache:truncate(Hdl), + {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos), + ok. + +truncate_and_extend_file(Hdl, Lowpoint, Highpoint) -> + {ok, Lowpoint} = file_handle_cache:position(Hdl, Lowpoint), + ok = file_handle_cache:truncate(Hdl), + ok = preallocate(Hdl, Highpoint, Lowpoint). + +form_filename(Dir, Name) -> filename:join(Dir, Name). + +filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. + +filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). + +sort_file_names(FileNames) -> + lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, + FileNames). + +%%---------------------------------------------------------------------------- +%% message cache helper functions +%%---------------------------------------------------------------------------- + +maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg) + when RefCount > 1 -> + update_msg_cache(DedupCacheEts, Guid, Msg); +maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) -> + ok. 
%% Insert {Guid, Msg} into the cache with a reference count of 1, or
%% bump the count if it is already present. If the entry disappears
%% between the failed insert and the counter update, retry from
%% scratch.
update_msg_cache(CacheEts, Guid, Msg) ->
    case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
        true  -> ok;
        false -> safe_ets_update_counter_ok(
                   CacheEts, Guid, {3, +1},
                   fun () -> update_msg_cache(CacheEts, Guid, Msg) end)
    end.

remove_cache_entry(DedupCacheEts, Guid) ->
    true = ets:delete(DedupCacheEts, Guid),
    ok.

%% Look Guid up in the dedup cache. On a hit, bump its reference count
%% (re-inserting if it was concurrently deleted) and return the
%% message body; otherwise return not_found.
fetch_and_increment_cache(DedupCacheEts, Guid) ->
    case ets:lookup(DedupCacheEts, Guid) of
        [{_Guid, Msg, _RefCount}] ->
            safe_ets_update_counter_ok(
              DedupCacheEts, Guid, {3, +1},
              %% someone has deleted us in the meantime, insert us
              fun () -> ok = update_msg_cache(DedupCacheEts, Guid, Msg) end),
            Msg;
        [] ->
            not_found
    end.

%% Drop one reference to Guid, deleting the entry outright once the
%% count reaches zero. A miss is tolerated: Guid may not be in there
%% because although it's been delivered, it's never actually been read
%% (think: persistent message held in RAM).
decrement_cache(DedupCacheEts, Guid) ->
    OnCount = fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, Guid);
                  (_N)            -> true
              end,
    true = safe_ets_update_counter(DedupCacheEts, Guid, {3, -1}, OnCount,
                                   fun () -> true end),
    ok.

%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------

%% Thin wrappers dispatching to whichever index module is configured
%% in the state.

index_lookup(Key, #client_msstate { index_module = Index,
                                    index_state  = State }) ->
    Index:lookup(Key, State);

index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
    Index:lookup(Key, State).

index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
    Index:insert(Obj, State).

index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
    Index:update(Obj, State).

index_update_fields(Key, Updates, #msstate { index_module = Index,
                                             index_state  = State }) ->
    Index:update_fields(Key, Updates, State).

index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
    Index:delete(Key, State).
index_delete_by_file(File, #msstate { index_module = Index,
                                      index_state  = State }) ->
    Index:delete_by_file(File, State).

%%----------------------------------------------------------------------------
%% shutdown and recovery
%%----------------------------------------------------------------------------

%% With no client refs supplied, wipe the directory and start with a
%% fresh index and empty ref set. Otherwise try to recover the
%% previous index: any read failure, term mismatch or recovery error
%% logs a warning and falls back to starting fresh.
recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) ->
    ok = rabbit_misc:recursive_delete([Dir]),
    ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
    {false, IndexModule:new(Dir), sets:new()};
recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
    ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
    Fresh = fun (ErrorMsg, ErrorArgs) ->
                    rabbit_log:warning("~w: " ++ ErrorMsg ++
                                       "~nrebuilding indices from scratch~n",
                                       [Server | ErrorArgs]),
                    {false, IndexModule:new(Dir), sets:new()}
            end,
    case read_recovery_terms(Dir) of
        {false, Error} ->
            Fresh("failed to read recovery terms: ~p", [Error]);
        {true, Terms} ->
            RecClientRefs  = proplists:get_value(client_refs, Terms, []),
            RecIndexModule = proplists:get_value(index_module, Terms),
            case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
                  andalso IndexModule =:= RecIndexModule) of
                false ->
                    Fresh("recovery terms differ from present", []);
                true ->
                    case IndexModule:recover(Dir) of
                        {ok, IndexState1} ->
                            {true, IndexState1, sets:from_list(ClientRefs)};
                        {error, Error} ->
                            Fresh("failed to recover index: ~p", [Error])
                    end
            end
    end.

%% Persist Terms for the next startup's read_recovery_terms/1.
store_recovery_terms(Terms, Dir) ->
    rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).

%% Read and then delete the recovery terms file; a failed delete is
%% reported the same way as a failed read.
read_recovery_terms(Dir) ->
    Path = filename:join(Dir, ?CLEAN_FILENAME),
    case rabbit_misc:read_term_file(Path) of
        {ok, Terms}    -> case file:delete(Path) of
                              ok             -> {true, Terms};
                              {error, Error} -> {false, Error}
                          end;
        {error, Error} -> {false, Error}
    end.
%% Write the file summary ETS table to disk for recovery at next
%% startup.
store_file_summary(Tid, Dir) ->
    ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
                      [{extended_info, [object_count]}]).

recover_file_summary(false, _Dir, _Server) ->
    %% TODO: the only reason for this to be an *ordered*_set is so
    %% that a) maybe_compact can start a traversal from the eldest
    %% file, and b) build_index in fast recovery mode can easily
    %% identify the current file. It's awkward to have both that
    %% ordering and the left/right pointers in the entries - replacing
    %% the former with some additional bit of state would be easy, but
    %% ditching the latter would be neater.
    {false, ets:new(rabbit_msg_store_file_summary,
                    [ordered_set, public, {keypos, #file_summary.file}])};
recover_file_summary(true, Dir, Server) ->
    Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
    case ets:file2tab(Path) of
        {ok, Tid} ->
            file:delete(Path),
            {true, Tid};
        {error, Error} ->
            rabbit_log:warning("~w: failed to recover file summary: ~p~n"
                               "rebuilding~n", [Server, Error]),
            recover_file_summary(false, Dir, Server)
    end.

%% Replay the generator Gen of {Guid, Delta, NextSeed} triples,
%% adjusting per-message reference counts in the index: insert on
%% first sight, delete when the count returns to zero, skip
%% zero-delta entries.
count_msg_refs(Gen, Seed, State) ->
    case Gen(Seed) of
        finished ->
            ok;
        {_Guid, 0, Next} ->
            count_msg_refs(Gen, Next, State);
        {Guid, Delta, Next} ->
            ok = case index_lookup(Guid, State) of
                     not_found ->
                         index_insert(#msg_location { guid = Guid,
                                                      ref_count = Delta },
                                      State);
                     #msg_location { ref_count = RefCount } = StoreEntry ->
                         case RefCount + Delta of
                             0 ->
                                 index_delete(Guid, State);
                             NewRefCount ->
                                 index_update(StoreEntry #msg_location {
                                                ref_count = NewRefCount },
                                              State)
                         end
                 end,
            count_msg_refs(Gen, Next, State)
    end.
%% For each tmp file left behind by a compaction that crashed, check
%% the corresponding main file exists and repair the pair.
recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
    lists:foreach(
      fun (TmpFileName) ->
              NonTmpRelatedFileName =
                  filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
              true = lists:member(NonTmpRelatedFileName, FileNames),
              ok = recover_crashed_compaction(
                     Dir, TmpFileName, NonTmpRelatedFileName)
      end, TmpFileNames),
    ok.

%% A crashed compaction leaves us in one of these states:
%% 1) everything in the tmp file is also in the main file, with main =
%%    (prefix ++ tmpfile): the crash came just before deleting the tmp
%%    file. Plan: delete the tmp file.
%% 2) everything in the tmp file is also in the main file but with
%%    holes throughout (or main = (prefix ++ hole ++ tmpfile)): the
%%    tmp file was written out fully and the crash came later. Plan:
%%    delete the tmp file and let a later compaction tidy up.
%% 3) everything in the tmp file is also in the main file but the main
%%    file does not end with the tmp file (valid messages follow;
%%    main = (prefix ++ tmpfile[with extra holes?] ++ suffix)): the
%%    crash came while writing the tmp file. Plan: delete the tmp
%%    file and let a later compaction tidy up.
%% 4) the tmp file holds messages that are NOT in the main file:
%%    writing the tmp file succeeded, but the crash came while copying
%%    them back after truncating the main file. The main file
%%    therefore holds only valid messages. Plan: truncate the main
%%    file back to before any message that is in the tmp file, then
%%    copy the tmp file's contents over again.
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
    {ok, UncorruptedMessagesTmp, GuidsTmp} =
        scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
    {ok, UncorruptedMessages, Guids} =
        scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
    TmpPath = form_filename(Dir, TmpFileName),
    case is_sublist(GuidsTmp, Guids) of
        true ->
            %% Cases 1, 2 and 3 above; this also covers an empty tmp
            %% file. The main file has everything, so just delete tmp.
            ok = file:delete(TmpPath);
        false ->
            %% Case 4 above. We only care about the initial msgs in
            %% the main file that are not in the tmp file. GuidsTmp
            %% must be non-empty here (otherwise is_sublist would have
            %% returned true), so lists:last is safe.
            EldestTmpGuid = lists:last(GuidsTmp),
            {Guids1, UncorruptedMessages1} =
                case lists:splitwith(
                       fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of
                    {_Guids, []} ->
                        %% no msgs from tmp appear in main
                        {Guids, UncorruptedMessages};
                    {Dropped, [EldestTmpGuid | Rest]} ->
                        %% Msgs in Dropped are in tmp, so forget them.
                        %% *cry*. Lists indexed from 1.
                        {Rest, lists:sublist(UncorruptedMessages,
                                             2 + length(Dropped),
                                             length(Rest))}
                end,
            %% The main file prefix must be contiguous (this match
            %% asserts the scanner agrees with Guids1)...
            {Top, Guids1} = find_contiguous_block_prefix(
                              lists:reverse(UncorruptedMessages1)),
            %% ...and none of its messages may be in the tmp file.
            true = is_disjoint(Guids1, GuidsTmp),
            %% must open with read flag, otherwise will stomp over contents
            {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
                                      [read | ?WRITE_MODE]),
            %% Wipe out any rubbish at the end of the main file. The
            %% head of the scanned tmp list is the highest entry in
            %% that file, which gives us its effective size.
            [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
            TmpSize = TmpTopOffset + TmpTopTotalSize,
            %% Extend the main file as big as necessary in a single
            %% move. If we run out of disk space this truncate could
            %% fail, but we still aren't risking losing data.
            ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
            {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
            {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
            ok = file_handle_cache:close(MainHdl),
            ok = file_handle_cache:delete(TmpHdl),

            %% sanity check: everything we kept, plus everything from
            %% the tmp file, must now be present in the main file
            {ok, _MainMessages, GuidsMain} =
                scan_file_for_valid_messages_and_guids(
                  Dir, NonTmpRelatedFileName),
            true = is_sublist(Guids1, GuidsMain),
            true = is_sublist(GuidsTmp, GuidsMain)
    end,
    ok.

%% true iff every element of SmallerL occurs in BiggerL
is_sublist(SmallerL, BiggerL) ->
    lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL).

%% true iff no element of SmallerL occurs in BiggerL
is_disjoint(SmallerL, BiggerL) ->
    lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).

%% Scan FileName for valid messages; a missing file is treated as
%% empty. If something really bad has happened the close could fail,
%% which we deliberately ignore.
scan_file_for_valid_messages(Dir, FileName) ->
    case open_file(Dir, FileName, ?READ_MODE) of
        {ok, Hdl} ->
            Size = filelib:file_size(form_filename(Dir, FileName)),
            Valid = rabbit_msg_file:scan(Hdl, Size),
            file_handle_cache:close(Hdl),
            Valid;
        {error, enoent} ->
            {ok, [], 0};
        {error, Reason} ->
            {error, {unable_to_scan_file, FileName, Reason}}
    end.

scan_file_for_valid_messages_and_guids(Dir, FileName) ->
    {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
    {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.

%% Takes the list in *ascending* order (i.e. eldest message first) --
%% the opposite of what scan_file_for_valid_messages produces -- and
%% returns the contiguous prefix length plus its guids, youngest
%% first.
find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []).
find_contiguous_block_prefix([], ExpectedOffset, Guids) ->
    {ExpectedOffset, Guids};
find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
                             ExpectedOffset, Guids) ->
    %% this message starts exactly where the previous one ended, so it
    %% extends the contiguous prefix
    find_contiguous_block_prefix(Tail, ExpectedOffset + TotalSize,
                                 [Guid | Guids]);
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
    %% a gap: the contiguous prefix stops here
    {ExpectedOffset, Guids}.

%% Fast path (first arg true): the recovered file summary table
%% already describes every file, so just total up the sizes, folding
%% each entry's file into current_file as we go. Slow path: rebuild
%% by scanning every file via a gatherer, then drop any files left
%% empty.
build_index(true, _Files, State = #msstate {
                            file_summary_ets = FileSummaryEts }) ->
    ets:foldl(
      fun (#file_summary { valid_total_size = ValidTotalSize,
                           file_size        = FileSize,
                           file             = File },
           {_Offset, State1 = #msstate { sum_valid_data = SumValid,
                                         sum_file_size  = SumFileSize }}) ->
              {FileSize, State1 #msstate {
                           sum_valid_data = SumValid + ValidTotalSize,
                           sum_file_size  = SumFileSize + FileSize,
                           current_file   = File }}
      end, {0, State}, FileSummaryEts);
build_index(false, Files, State) ->
    {ok, Gatherer} = gatherer:start_link(),
    case Files of
        [] -> build_index(Gatherer, undefined,
                          [State #msstate.current_file], State);
        _  -> {Offset, State1} = build_index(Gatherer, undefined, Files,
                                             State),
              {Offset,
               lists:foldl(fun delete_file_if_empty/2, State1, Files)}
    end.
%% Submit one scan job per file to the worker pool ('Left' tracks the
%% previously submitted file so each worker knows its left-hand
%% neighbour), then drain the gatherer: each produced #file_summary{}
%% is inserted into the table and accumulated into the running size
%% totals. Once empty, index entries never claimed by any file are
%% deleted and the last submitted file becomes the current file.
build_index(Gatherer, Left, [],
            State = #msstate { file_summary_ets = FileSummaryEts,
                               sum_valid_data   = SumValid,
                               sum_file_size    = SumFileSize }) ->
    case gatherer:out(Gatherer) of
        empty ->
            ok = gatherer:stop(Gatherer),
            ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
            %% entries with file = undefined were matched by no scan
            ok = index_delete_by_file(undefined, State),
            Offset = case ets:lookup(FileSummaryEts, Left) of
                         []                                       -> 0;
                         [#file_summary { file_size = FileSize }] -> FileSize
                     end,
            {Offset, State #msstate { current_file = Left }};
        {value, #file_summary { valid_total_size = ValidTotalSize,
                                file_size = FileSize } = FileSummary} ->
            true = ets:insert_new(FileSummaryEts, FileSummary),
            build_index(Gatherer, Left, [],
                        State #msstate {
                          sum_valid_data = SumValid + ValidTotalSize,
                          sum_file_size  = SumFileSize + FileSize })
    end;
build_index(Gatherer, Left, [File|Files], State) ->
    ok = gatherer:fork(Gatherer),
    ok = worker_pool:submit_async(
           fun () -> build_index_worker(Gatherer, State, Left, File, Files)
           end),
    build_index(Gatherer, File, Files, State).

%% Scan a single file, point the index entries for the messages found
%% there at this file, and hand a #file_summary{} to the gatherer.
%% For the final file the recorded size stops after the last valid
%% message, since anything beyond it will be truncated away.
build_index_worker(Gatherer, State = #msstate { dir = Dir },
                   Left, File, Files) ->
    {ok, Messages, FileSize} =
        scan_file_for_valid_messages(Dir, filenum_to_name(File)),
    {ValidMessages, ValidTotalSize} =
        lists:foldl(
          fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
                  case index_lookup(Guid, State) of
                      not_found ->
                          {VMAcc, VTSAcc};
                      StoreEntry ->
                          ok = index_update(StoreEntry #msg_location {
                                              file = File, offset = Offset,
                                              total_size = TotalSize },
                                            State),
                          {[Obj | VMAcc], VTSAcc + TotalSize}
                  end
          end, {[], 0}, Messages),
    %% foldl reverses lists and find_contiguous_block_prefix needs
    %% msgs eldest first, so ValidMessages is the right way round
    {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
    {Right, FileSize1} =
        case Files of
            [] ->
                %% last file: size stops after the last valid message
                {undefined, case ValidMessages of
                                [] -> 0;
                                _  -> {_Guid, TotalSize, Offset} =
                                          lists:last(ValidMessages),
                                      Offset + TotalSize
                            end};
            [F|_] ->
                {F, FileSize}
        end,
    ok = gatherer:in(Gatherer, #file_summary {
                       file             = File,
                       valid_total_size = ValidTotalSize,
                       contiguous_top   = ContiguousTop,
                       left             = Left,
                       right            = Right,
                       file_size        = FileSize1,
                       locked           = false,
                       readers          = 0 }),
    ok = gatherer:finish(Gatherer).

%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------

%% Once a write takes the current file past the size limit: sync and
%% close it, open the next-numbered file as the new current file, link
%% the two in the file summary, drop zero-reference entries from the
%% current-file cache, and consider compacting.
maybe_roll_to_new_file(
  Offset,
  State = #msstate { dir                 = Dir,
                     current_file_handle = CurHdl,
                     current_file        = CurFile,
                     file_summary_ets    = FileSummaryEts,
                     cur_file_cache_ets  = CurFileCacheEts,
                     file_size_limit     = FileSizeLimit })
  when Offset >= FileSizeLimit ->
    State1 = internal_sync(State),
    ok = file_handle_cache:close(CurHdl),
    NextFile = CurFile + 1,
    {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
    true = ets:insert_new(FileSummaryEts, #file_summary {
                            file             = NextFile,
                            valid_total_size = 0,
                            contiguous_top   = 0,
                            left             = CurFile,
                            right            = undefined,
                            file_size        = 0,
                            locked           = false,
                            readers          = 0 }),
    true = ets:update_element(FileSummaryEts, CurFile,
                              {#file_summary.right, NextFile}),
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    maybe_compact(State1 #msstate { current_file_handle = NextHdl,
                                    current_file        = NextFile });
maybe_roll_to_new_file(_, State) ->
    State.
%% Kick off a GC cycle when no GC is in flight, the store spans more
%% than two files' worth of data, and the dead fraction exceeds
%% ?GARBAGE_FRACTION. The chosen pair of files is locked and handed to
%% the GC process.
maybe_compact(State = #msstate { sum_valid_data   = SumValid,
                                 sum_file_size    = SumFileSize,
                                 gc_active        = false,
                                 gc_pid           = GCPid,
                                 file_summary_ets = FileSummaryEts,
                                 file_size_limit  = FileSizeLimit })
  when (SumFileSize > 2 * FileSizeLimit andalso
        (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
    %% TODO: the algorithm here is sub-optimal - it may result in a
    %% complete traversal of FileSummaryEts.
    case ets:first(FileSummaryEts) of
        '$end_of_table' ->
            State;
        First ->
            case find_files_to_gc(FileSummaryEts, FileSizeLimit,
                                  ets:lookup(FileSummaryEts, First)) of
                not_found ->
                    State;
                {Src, Dst} ->
                    State1 = close_handle(Src, close_handle(Dst, State)),
                    true = ets:update_element(FileSummaryEts, Src,
                                              {#file_summary.locked, true}),
                    true = ets:update_element(FileSummaryEts, Dst,
                                              {#file_summary.locked, true}),
                    ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst),
                    State1 #msstate { gc_active = {Src, Dst} }
            end
    end;
maybe_compact(State) ->
    State.

%% Walk rightwards from the eldest file looking for an adjacent pair
%% {Src, Dst} (Dst the elder, Src its right neighbour) whose combined
%% valid data fits within one file. A file whose right pointer is
%% undefined is never selected as a source.
find_files_to_gc(FileSummaryEts, FileSizeLimit,
                 [#file_summary { file             = Dst,
                                  valid_total_size = DstValid,
                                  right            = Src }]) ->
    case Src of
        undefined ->
            not_found;
        _ ->
            [#file_summary { file             = Src,
                             valid_total_size = SrcValid,
                             left             = Dst,
                             right            = SrcRight }] = Next =
                ets:lookup(FileSummaryEts, Src),
            case SrcRight of
                undefined ->
                    not_found;
                _ ->
                    case DstValid + SrcValid =< FileSizeLimit of
                        true  -> {Src, Dst};
                        false -> find_files_to_gc(FileSummaryEts,
                                                  FileSizeLimit, Next)
                    end
            end
    end.
%% If File (never the current file, per the first clause) holds no
%% valid data, splice its left/right neighbours together in the file
%% summary, ask all readers to close their handles on it, and delete
%% it from disk, adjusting the running file size total.
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
    State;
delete_file_if_empty(File, State = #msstate {
                             dir              = Dir,
                             sum_file_size    = SumFileSize,
                             file_handles_ets = FileHandlesEts,
                             file_summary_ets = FileSummaryEts }) ->
    [#file_summary { valid_total_size = ValidData,
                     left             = Left,
                     right            = Right,
                     file_size        = FileSize,
                     locked           = false }] =
        ets:lookup(FileSummaryEts, File),
    case ValidData of
        0 -> %% we should NEVER find the current file in here hence
             %% right should always be a file, not undefined
             case {Left, Right} of
                 {undefined, _} when Right =/= undefined ->
                     %% the eldest file is empty
                     true = ets:update_element(
                              FileSummaryEts, Right,
                              {#file_summary.left, undefined});
                 {_, _} when Right =/= undefined ->
                     true = ets:update_element(FileSummaryEts, Right,
                                               {#file_summary.left, Left}),
                     true = ets:update_element(FileSummaryEts, Left,
                                               {#file_summary.right, Right})
             end,
             true = mark_handle_to_close(FileHandlesEts, File),
             true = ets:delete(FileSummaryEts, File),
             State1 = close_handle(File, State),
             ok = file:delete(form_filename(Dir, filenum_to_name(File))),
             State1 #msstate { sum_file_size = SumFileSize - FileSize };
        _ -> State
    end.
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------

%% Combine SrcFile into DstFile (its left neighbour). Both entries
%% must already be locked; the matches assert their adjacency. If
%% either file still has readers we return 'concurrent_readers' and
%% do nothing; otherwise the destination's summary is updated with the
%% combined size and the number of reclaimed bytes is returned.
gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
    [SrcObj = #file_summary {
       readers   = SrcReaders,
       left      = DstFile,
       file_size = SrcFileSize,
       locked    = true }] = ets:lookup(FileSummaryEts, SrcFile),
    [DstObj = #file_summary {
       readers   = DstReaders,
       right     = SrcFile,
       file_size = DstFileSize,
       locked    = true }] = ets:lookup(FileSummaryEts, DstFile),

    case SrcReaders =:= 0 andalso DstReaders =:= 0 of
        false ->
            concurrent_readers;
        true ->
            TotalValidData = combine_files(SrcObj, DstObj, State),
            %% don't update dest.right, because it could be changing
            %% at the same time
            true = ets:update_element(
                     FileSummaryEts, DstFile,
                     [{#file_summary.valid_total_size, TotalValidData},
                      {#file_summary.contiguous_top, TotalValidData},
                      {#file_summary.file_size, TotalValidData}]),
            SrcFileSize + DstFileSize - TotalValidData
    end.
+ +combine_files(#file_summary { file = Source, + valid_total_size = SourceValid, + left = Destination }, + #file_summary { file = Destination, + valid_total_size = DestinationValid, + contiguous_top = DestinationContiguousTop, + right = Source }, + State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> + SourceName = filenum_to_name(Source), + DestinationName = filenum_to_name(Destination), + {ok, SourceHdl} = open_file(Dir, SourceName, + ?READ_AHEAD_MODE), + {ok, DestinationHdl} = open_file(Dir, DestinationName, + ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ExpectedSize = SourceValid + DestinationValid, + %% if DestinationValid =:= DestinationContiguousTop then we don't + %% need a tmp file + %% if they're not equal, then we need to write out everything past + %% the DestinationContiguousTop to a tmp file then truncate, + %% copy back in, and then copy over from Source + %% otherwise we just truncate straight away and copy over from Source + case DestinationContiguousTop =:= DestinationValid of + true -> + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize); + false -> + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), + Worklist = + lists:dropwhile( + fun (#msg_location { offset = Offset }) + when Offset =/= DestinationContiguousTop -> + %% it cannot be that Offset =:= + %% DestinationContiguousTop because if it + %% was then DestinationContiguousTop would + %% have been extended by TotalSize + Offset < DestinationContiguousTop + end, DestinationWorkList), + Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ok = copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination, State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage from + %% Destination, and index_state has been updated to + 
%% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:delete(TmpHdl) + end, + {SourceWorkList, SourceValid} = + find_unremoved_messages_in_file(Source, State), + ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination, State), + %% tidy up + ok = file_handle_cache:close(DestinationHdl), + ok = file_handle_cache:delete(SourceHdl), + ExpectedSize. + +find_unremoved_messages_in_file(File, + {_FileSummaryEts, Dir, Index, IndexState}) -> + %% Messages here will be end-of-file at start-of-list + {ok, Messages, _FileSize} = + scan_file_for_valid_messages(Dir, filenum_to_name(File)), + %% foldl will reverse so will end up with msgs in ascending offset order + lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> + case Index:lookup(Guid, IndexState) of + #msg_location { file = File } = Entry -> + {[ Entry | List ], TotalSize + Size}; + _ -> + Acc + end + end, {[], 0}, Messages). + +copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, + Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Copy = fun ({BlockStart, BlockEnd}) -> + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file_handle_cache:position(SourceHdl, BlockStart), + {ok, BSize} = + file_handle_cache:copy(SourceHdl, DestinationHdl, BSize) + end, + case + lists:foldl( + fun (#msg_location { guid = Guid, offset = Offset, + total_size = TotalSize }, + {CurOffset, Block = {BlockStart, BlockEnd}}) -> + %% CurOffset is in the DestinationFile. 
+ %% Offset, BlockStart and BlockEnd are in the SourceFile + %% update MsgLocation to reflect change of file and offset + ok = Index:update_fields(Guid, + [{#msg_location.file, Destination}, + {#msg_location.offset, CurOffset}], + IndexState), + {CurOffset + TotalSize, + case BlockEnd of + undefined -> + %% base case, called only for the first list elem + {Offset, Offset + TotalSize}; + Offset -> + %% extend the current block because the + %% next msg follows straight on + {BlockStart, BlockEnd + TotalSize}; + _ -> + %% found a gap, so actually do the work for + %% the previous block + Copy(Block), + {Offset, Offset + TotalSize} + end} + end, {InitOffset, {undefined, undefined}}, WorkList) of + {FinalOffset, Block} -> + case WorkList of + [] -> ok; + _ -> Copy(Block), %% do the last remaining block + ok = file_handle_cache:sync(DestinationHdl) + end; + {FinalOffsetZ, _Block} -> + {gc_error, [{expected, FinalOffset}, + {got, FinalOffsetZ}, + {destination, Destination}]} + end. diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl new file mode 100644 index 00000000..1eb3c11f --- /dev/null +++ b/src/rabbit_msg_store_ets_index.erl @@ -0,0 +1,90 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. 
+%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store_ets_index). + +-behaviour(rabbit_msg_store_index). + +-export([new/1, recover/1, + lookup/2, insert/2, update/2, update_fields/3, delete/2, + delete_by_file/2, terminate/1]). + +-define(MSG_LOC_NAME, rabbit_msg_store_ets_index). +-define(FILENAME, "msg_store_index.ets"). + +-include("rabbit_msg_store_index.hrl"). + +-record(state, { table, dir }). + +new(Dir) -> + file:delete(filename:join(Dir, ?FILENAME)), + Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.guid}]), + #state { table = Tid, dir = Dir }. + +recover(Dir) -> + Path = filename:join(Dir, ?FILENAME), + case ets:file2tab(Path) of + {ok, Tid} -> file:delete(Path), + {ok, #state { table = Tid, dir = Dir }}; + Error -> Error + end. + +lookup(Key, State) -> + case ets:lookup(State #state.table, Key) of + [] -> not_found; + [Entry] -> Entry + end. + +insert(Obj, State) -> + true = ets:insert_new(State #state.table, Obj), + ok. + +update(Obj, State) -> + true = ets:insert(State #state.table, Obj), + ok. + +update_fields(Key, Updates, State) -> + true = ets:update_element(State #state.table, Key, Updates), + ok. + +delete(Key, State) -> + true = ets:delete(State #state.table, Key), + ok. 
+ +delete_by_file(File, State) -> + MatchHead = #msg_location { file = File, _ = '_' }, + ets:select_delete(State #state.table, [{MatchHead, [], [true]}]), + ok. + +terminate(#state { table = MsgLocations, dir = Dir }) -> + ok = ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME), + [{extended_info, [object_count]}]), + ets:delete(MsgLocations). diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl new file mode 100644 index 00000000..eaa41173 --- /dev/null +++ b/src/rabbit_msg_store_gc.erl @@ -0,0 +1,141 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store_gc). + +-behaviour(gen_server2). + +-export([start_link/4, gc/3, no_readers/2, stop/1]). 
+ +-export([set_maximum_since_use/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(gcstate, + {dir, + index_state, + index_module, + parent, + file_summary_ets, + scheduled + }). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) -> + 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). +-spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(stop/1 :: (pid()) -> 'ok'). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> + gen_server2:start_link( + ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], + [{timeout, infinity}]). + +gc(Server, Source, Destination) -> + gen_server2:cast(Server, {gc, Source, Destination}). + +no_readers(Server, File) -> + gen_server2:cast(Server, {no_readers, File}). + +stop(Server) -> + gen_server2:call(Server, stop, infinity). + +set_maximum_since_use(Pid, Age) -> + gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + +%%---------------------------------------------------------------------------- + +init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> + ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, + [self()]), + {ok, #gcstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + parent = Parent, + file_summary_ets = FileSummaryEts, + scheduled = undefined }, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. 
+ +handle_cast({gc, Source, Destination}, + State = #gcstate { scheduled = undefined }) -> + {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), + hibernate}; + +handle_cast({no_readers, File}, + State = #gcstate { scheduled = {Source, Destination} }) + when File =:= Source orelse File =:= Destination -> + {noreply, attempt_gc(State), hibernate}; + +handle_cast({no_readers, _File}, State) -> + {noreply, State, hibernate}; + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + {noreply, State, hibernate}. + +handle_info(Info, State) -> + {stop, {unhandled_info, Info}, State}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +attempt_gc(State = #gcstate { dir = Dir, + index_state = IndexState, + index_module = Index, + parent = Parent, + file_summary_ets = FileSummaryEts, + scheduled = {Source, Destination} }) -> + case rabbit_msg_store:gc(Source, Destination, + {FileSummaryEts, Dir, Index, IndexState}) of + concurrent_readers -> State; + Reclaimed -> ok = rabbit_msg_store:gc_done( + Parent, Reclaimed, Source, Destination), + State #gcstate { scheduled = undefined } + end. diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl new file mode 100644 index 00000000..0ed64a9d --- /dev/null +++ b/src/rabbit_msg_store_index.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. 
+%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store_index). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{new, 1}, + {recover, 1}, + {lookup, 2}, + {insert, 2}, + {update, 2}, + {update_fields, 3}, + {delete, 2}, + {delete_by_file, 2}, + {terminate, 1}]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl new file mode 100644 index 00000000..91b19976 --- /dev/null +++ b/src/rabbit_queue_index.erl @@ -0,0 +1,929 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. 
+%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_queue_index). + +-export([init/3, terminate/2, delete_and_terminate/1, publish/4, + deliver/2, ack/2, sync/2, flush/1, read/3, + next_segment_boundary/1, bounds/1, recover/1]). + +-define(CLEAN_FILENAME, "clean.dot"). + +%%---------------------------------------------------------------------------- + +%% The queue index is responsible for recording the order of messages +%% within a queue on disk. +%% +%% Because of the fact that the queue can decide at any point to send +%% a queue entry to disk, you can not rely on publishes appearing in +%% order. The only thing you can rely on is a message being published, +%% then delivered, then ack'd. +%% +%% In order to be able to clean up ack'd messages, we write to segment +%% files. These files have a fixed maximum size: ?SEGMENT_ENTRY_COUNT +%% publishes, delivers and acknowledgements. They are numbered, and so +%% it is known that the 0th segment contains messages 0 -> +%% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages +%% ?SEGMENT_ENTRY_COUNT -> 2*?SEGMENT_ENTRY_COUNT - 1 and so on. As +%% such, in the segment files, we only refer to message sequence ids +%% by the LSBs as SeqId rem ?SEGMENT_ENTRY_COUNT. This gives them a +%% fixed size. 
+%%
+%% However, transient messages which are not sent to disk at any point
+%% will cause gaps to appear in segment files. Therefore, we delete a
+%% segment file whenever the number of publishes == number of acks
+%% (note that although it is not fully enforced, it is assumed that a
+%% message will never be ack'd before it is delivered, thus this test
+%% also implies == number of delivers). In practice, this does not
+%% cause disk churn in the pathological case because of the journal
+%% and caching (see below).
+%%
+%% Because of the fact that publishes, delivers and acks can occur all
+%% over, we wish to avoid lots of seeking. Therefore we have a fixed
+%% sized journal to which all actions are appended. When the number of
+%% entries in this journal reaches max_journal_entries, the journal
+%% entries are scattered out to their relevant files, and the journal
+%% is truncated to zero size. Note that entries in the journal must
+%% carry the full sequence id, thus the format of entries in the
+%% journal is different to that in the segments.
+%%
+%% The journal is also kept fully in memory, pre-segmented: the state
+%% contains a mapping from segment numbers to state-per-segment (this
+%% state is held for all segments which have been "seen": thus a
+%% segment which has been read but has no pending entries in the
+%% journal is still held in this mapping. Also note that a dict is
+%% used for this mapping, not an array because with an array, you will
+%% always have entries from 0). Actions are stored directly in this
+%% state. Thus at the point of flushing the journal, firstly no
+%% reading from disk is necessary, but secondly if the known number of
+%% acks and publishes in a segment are equal, given the known state of
+%% the segment file combined with the journal, no writing needs to be
+%% done to the segment file either (in fact it is deleted if it exists
+%% at all). This is safe given that the set of acks is a subset of the
+%% set of publishes. 
When it's necessary to sync messages because of
+%% transactions, it's only necessary to fsync on the journal: when
+%% entries are distributed from the journal to segment files, those
+%% segments appended to are fsync'd prior to the journal being
+%% truncated.
+%%
+%% This module is also responsible for scanning the queue index files
+%% and seeding the message store on start up.
+%%
+%% Note that in general, the representation of a message's state as
+%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'),
+%% ('ack'|'no_ack')} is richer than strictly necessary for most
+%% operations. However, for startup, and to ensure the safe and
+%% correct combination of journal entries with entries read from the
+%% segment on disk, this richer representation vastly simplifies and
+%% clarifies the code.
+%%
+%% For notes on Clean Shutdown and startup, see documentation in
+%% variable_queue.
+%%
+%%----------------------------------------------------------------------------
+
+%% ---- Journal details ----
+
+-define(JOURNAL_FILENAME, "journal.jif").
+
+-define(PUB_PERSIST_JPREFIX, 2#00).
+-define(PUB_TRANS_JPREFIX,   2#01).
+-define(DEL_JPREFIX,         2#10).
+-define(ACK_JPREFIX,         2#11).
+-define(JPREFIX_BITS, 2).
+-define(SEQ_BYTES, 8).
+-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)).
+
+%% ---- Segment details ----
+
+-define(SEGMENT_EXTENSION, ".idx").
+
+%% TODO: The segment size could be made configurable, but deriving all
+%% the other values is quite hairy and quite possibly noticeably less
+%% efficient, depending on how clever the compiler is when it comes to
+%% binary generation/matching with constant vs variable lengths.
+
+-define(REL_SEQ_BITS, 14).
+-define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
+
+%% seq only is binary 00 followed by 14 bits of rel seq id
+%% (range: 0 - 16383)
+-define(REL_SEQ_ONLY_PREFIX, 00).
+-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
+-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). 
+ +%% publish record is binary 1 followed by a bit for is_persistent, +%% then 14 bits of rel seq id, and 128 bits of md5sum msg id +-define(PUBLISH_PREFIX, 1). +-define(PUBLISH_PREFIX_BITS, 1). + +-define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes +-define(GUID_BITS, (?GUID_BYTES * 8)). +%% 16 bytes for md5sum + 2 for seq, bits and prefix +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). + +%% 1 publish, 1 deliver, 1 ack per msg +-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * + (?PUBLISH_RECORD_LENGTH_BYTES + + (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))). + +%% ---- misc ---- + +-define(PUB, {_, _}). %% {Guid, IsPersistent} + +-define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). + +%%---------------------------------------------------------------------------- + +-record(qistate, { dir, segments, journal_handle, dirty_count, + max_journal_entries }). + +-record(segment, { num, path, journal_entries, unacked }). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(hdl() :: ('undefined' | any())). +-type(segment() :: ('undefined' | + #segment { num :: non_neg_integer(), + path :: file:filename(), + journal_entries :: array(), + unacked :: non_neg_integer() + })). +-type(seq_id() :: integer()). +-type(seg_dict() :: {dict:dictionary(), [segment()]}). +-type(qistate() :: #qistate { dir :: file:filename(), + segments :: 'undefined' | seg_dict(), + journal_handle :: hdl(), + dirty_count :: integer(), + max_journal_entries :: non_neg_integer() + }). +-type(startup_fun_state() :: + {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), + A}). + +-spec(init/3 :: (rabbit_amqqueue:name(), boolean(), + fun ((rabbit_guid:guid()) -> boolean())) -> + {'undefined' | non_neg_integer(), [any()], qistate()}). +-spec(terminate/2 :: ([any()], qistate()) -> qistate()). +-spec(delete_and_terminate/1 :: (qistate()) -> qistate()). 
+-spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> + qistate()). +-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(flush/1 :: (qistate()) -> qistate()). +-spec(read/3 :: (seq_id(), seq_id(), qistate()) -> + {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}], + qistate()}). +-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). +-spec(bounds/1 :: (qistate()) -> + {non_neg_integer(), non_neg_integer(), qistate()}). +-spec(recover/1 :: + ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}). + +-endif. + + +%%---------------------------------------------------------------------------- +%% public API +%%---------------------------------------------------------------------------- + +init(Name, MsgStoreRecovered, ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name), + Terms = case read_shutdown_terms(Dir) of + {error, _} -> []; + {ok, Terms1} -> Terms1 + end, + CleanShutdown = detect_clean_shutdown(Dir), + {Count, State1} = + case CleanShutdown andalso MsgStoreRecovered of + true -> RecoveredCounts = proplists:get_value(segments, Terms, []), + init_clean(RecoveredCounts, State); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + end, + {Count, Terms, State1}. + +terminate(Terms, State) -> + {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), + store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir), + State1. + +delete_and_terminate(State) -> + {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), + ok = rabbit_misc:recursive_delete([Dir]), + State1. 
+ +publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> + ?GUID_BYTES = size(Guid), + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append( + JournalHdl, [<<(case IsPersistent of + true -> ?PUB_PERSIST_JPREFIX; + false -> ?PUB_TRANS_JPREFIX + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), + maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + +deliver(SeqIds, State) -> + deliver_or_ack(del, SeqIds, State). + +ack(SeqIds, State) -> + deliver_or_ack(ack, SeqIds, State). + +sync([], State) -> + State; +sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> + State; +sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> + %% The SeqIds here contains the SeqId of every publish and ack in + %% the transaction. Ideally we should go through these seqids and + %% only sync the journal if the pubs or acks appear in the + %% journal. However, this would be complex to do, and given that + %% the variable queue publishes and acks to the qi, and then + %% syncs, all in one operation, there is no possibility of the + %% seqids not being in the journal, provided the transaction isn't + %% emptied (handled above anyway). + ok = file_handle_cache:sync(JournalHdl), + State. + +flush(State = #qistate { dirty_count = 0 }) -> State; +flush(State) -> flush_journal(State). + +read(StartEnd, StartEnd, State) -> + {[], State}; +read(Start, End, State = #qistate { segments = Segments, + dir = Dir }) when Start =< End -> + %% Start is inclusive, End is exclusive. + LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), + UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), + {Messages, Segments1} = + lists:foldr(fun (Seg, Acc) -> + read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir) + end, {[], Segments}, lists:seq(StartSeg, EndSeg)), + {Messages, State #qistate { segments = Segments1 }}. 
+ +next_segment_boundary(SeqId) -> + {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + reconstruct_seq_id(Seg + 1, 0). + +bounds(State = #qistate { segments = Segments }) -> + %% This is not particularly efficient, but only gets invoked on + %% queue initialisation. + SegNums = lists:sort(segment_nums(Segments)), + %% Don't bother trying to figure out the lowest seq_id, merely the + %% seq_id of the start of the lowest segment. That seq_id may not + %% actually exist, but that's fine. The important thing is that + %% the segment exists and the seq_id reported is on a segment + %% boundary. + %% + %% We also don't really care about the max seq_id. Just start the + %% next segment: it makes life much easier. + %% + %% SegNums is sorted, ascending. + {LowSeqId, NextSeqId} = + case SegNums of + [] -> {0, 0}; + [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0), + reconstruct_seq_id(1 + lists:last(SegNums), 0)} + end, + {LowSeqId, NextSeqId, State}. + +recover(DurableQueues) -> + DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || + Queue <- DurableQueues ]), + QueuesDir = queues_dir(), + Directories = case file:list_dir(QueuesDir) of + {ok, Entries} -> [ Entry || Entry <- Entries, + filelib:is_dir( + filename:join( + QueuesDir, Entry)) ]; + {error, enoent} -> [] + end, + DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), + {DurableQueueNames, DurableTerms} = + lists:foldl( + fun (QueueDir, {DurableAcc, TermsAcc}) -> + case sets:is_element(QueueDir, DurableDirectories) of + true -> + TermsAcc1 = + case read_shutdown_terms( + filename:join(QueuesDir, QueueDir)) of + {error, _} -> TermsAcc; + {ok, Terms} -> [Terms | TermsAcc] + end, + {[dict:fetch(QueueDir, DurableDict) | DurableAcc], + TermsAcc1}; + false -> + Dir = filename:join(queues_dir(), QueueDir), + ok = rabbit_misc:recursive_delete([Dir]), + {DurableAcc, TermsAcc} + end + end, {[], []}, Directories), + {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. 
+ +%%---------------------------------------------------------------------------- +%% startup and shutdown +%%---------------------------------------------------------------------------- + +blank_state(QueueName) -> + StrName = queue_name_to_dir_name(QueueName), + Dir = filename:join(queues_dir(), StrName), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + {ok, MaxJournal} = + application:get_env(rabbit, queue_index_max_journal_entries), + #qistate { dir = Dir, + segments = segments_new(), + journal_handle = undefined, + dirty_count = 0, + max_journal_entries = MaxJournal }. + +detect_clean_shutdown(Dir) -> + case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of + ok -> true; + {error, enoent} -> false + end. + +read_shutdown_terms(Dir) -> + rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)). + +store_clean_shutdown(Terms, Dir) -> + rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). + +init_clean(RecoveredCounts, State) -> + %% Load the journal. Since this is a clean recovery this (almost) + %% gets us back to where we were on shutdown. + State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), + %% The journal loading only creates records for segments touched + %% by the journal, and the counts are based on the journal entries + %% only. We need *complete* counts for *all* segments. By an + %% amazing coincidence we stored that information on shutdown. + Segments1 = + lists:foldl( + fun ({Seg, UnackedCount}, SegmentsN) -> + Segment = segment_find_or_new(Seg, Dir, SegmentsN), + segment_store(Segment #segment { unacked = UnackedCount }, + SegmentsN) + end, Segments, RecoveredCounts), + %% the counts above include transient messages, which would be the + %% wrong thing to return + {undefined, State1 # qistate { segments = Segments1 }}. + +init_dirty(CleanShutdown, ContainsCheckFun, State) -> + %% Recover the journal completely. 
This will also load segments + %% which have entries in the journal and remove duplicates. The + %% counts will correctly reflect the combination of the segment + %% and the journal. + State1 = #qistate { dir = Dir, segments = Segments } = + recover_journal(State), + {Segments1, Count} = + %% Load each segment in turn and filter out messages that are + %% not in the msg_store, by adding acks to the journal. These + %% acks only go to the RAM journal as it doesn't matter if we + %% lose them. Also mark delivered if not clean shutdown. Also + %% find the number of unacked messages. + lists:foldl( + fun (Seg, {Segments2, CountAcc}) -> + Segment = #segment { unacked = UnackedCount } = + recover_segment(ContainsCheckFun, CleanShutdown, + segment_find_or_new(Seg, Dir, Segments2)), + {segment_store(Segment, Segments2), CountAcc + UnackedCount} + end, {Segments, 0}, all_segment_nums(State1)), + %% Unconditionally flush since the dirty_count doesn't get updated + %% by the above foldl. + State2 = flush_journal(State1 #qistate { segments = Segments1 }), + {Count, State2}. + +terminate(State = #qistate { journal_handle = JournalHdl, + segments = Segments }) -> + ok = case JournalHdl of + undefined -> ok; + _ -> file_handle_cache:close(JournalHdl) + end, + SegmentCounts = + segment_fold( + fun (#segment { num = Seg, unacked = UnackedCount }, Acc) -> + [{Seg, UnackedCount} | Acc] + end, [], Segments), + {SegmentCounts, State #qistate { journal_handle = undefined, + segments = undefined }}. 
+ +recover_segment(ContainsCheckFun, CleanShutdown, + Segment = #segment { journal_entries = JEntries }) -> + {SegEntries, UnackedCount} = load_segment(false, Segment), + {SegEntries1, UnackedCountDelta} = + segment_plus_journal(SegEntries, JEntries), + array:sparse_foldl( + fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> + recover_message(ContainsCheckFun(Guid), CleanShutdown, + Del, RelSeq, Segment1) + end, + Segment #segment { unacked = UnackedCount + UnackedCountDelta }, + SegEntries1). + +recover_message( true, true, _Del, _RelSeq, Segment) -> + Segment; +recover_message( true, false, del, _RelSeq, Segment) -> + Segment; +recover_message( true, false, no_del, RelSeq, Segment) -> + add_to_journal(RelSeq, del, Segment); +recover_message(false, _, del, RelSeq, Segment) -> + add_to_journal(RelSeq, ack, Segment); +recover_message(false, _, no_del, RelSeq, Segment) -> + add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). + +queue_name_to_dir_name(Name = #resource { kind = queue }) -> + Bin = term_to_binary(Name), + Size = 8*size(Bin), + <<Num:Size>> = Bin, + lists:flatten(io_lib:format("~.36B", [Num])). + +queues_dir() -> + filename:join(rabbit_mnesia:dir(), "queues"). 
+ +%%---------------------------------------------------------------------------- +%% msg store startup delta function +%%---------------------------------------------------------------------------- + +queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> + {ok, Gatherer} = gatherer:start_link(), + [begin + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> queue_index_walker_reader(QueueName, Gatherer) + end) + end || QueueName <- DurableQueues], + queue_index_walker({next, Gatherer}); + +queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer), + ok = rabbit_misc:unlink_and_capture_exit(Gatherer), + finished; + {value, {Guid, Count}} -> + {Guid, Count, {next, Gatherer}} + end. + +queue_index_walker_reader(QueueName, Gatherer) -> + State = #qistate { segments = Segments, dir = Dir } = + recover_journal(blank_state(QueueName)), + [ok = segment_entries_foldr( + fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> + gatherer:in(Gatherer, {Guid, 1}); + (_RelSeq, _Value, Acc) -> + Acc + end, ok, segment_find_or_new(Seg, Dir, Segments)) || + Seg <- all_segment_nums(State)], + {_SegmentCounts, _State} = terminate(State), + ok = gatherer:finish(Gatherer). 
+ +%%---------------------------------------------------------------------------- +%% journal manipulation +%%---------------------------------------------------------------------------- + +add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, + segments = Segments, + dir = Dir }) -> + {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + Segment = segment_find_or_new(Seg, Dir, Segments), + Segment1 = add_to_journal(RelSeq, Action, Segment), + State #qistate { dirty_count = DCount + 1, + segments = segment_store(Segment1, Segments) }; + +add_to_journal(RelSeq, Action, + Segment = #segment { journal_entries = JEntries, + unacked = UnackedCount }) -> + Segment1 = Segment #segment { + journal_entries = add_to_journal(RelSeq, Action, JEntries) }, + case Action of + del -> Segment1; + ack -> Segment1 #segment { unacked = UnackedCount - 1 }; + ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 } + end; + +add_to_journal(RelSeq, Action, JEntries) -> + Val = case array:get(RelSeq, JEntries) of + undefined -> + case Action of + ?PUB -> {Action, no_del, no_ack}; + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack} + end; + ({Pub, no_del, no_ack}) when Action == del -> + {Pub, del, no_ack}; + ({Pub, Del, no_ack}) when Action == ack -> + {Pub, Del, ack} + end, + array:set(RelSeq, Val, JEntries). + +maybe_flush_journal(State = #qistate { dirty_count = DCount, + max_journal_entries = MaxJournal }) + when DCount > MaxJournal -> + flush_journal(State); +maybe_flush_journal(State) -> + State. 
+ +flush_journal(State = #qistate { segments = Segments }) -> + Segments1 = + segment_fold( + fun (#segment { unacked = 0, path = Path }, SegmentsN) -> + case filelib:is_file(Path) of + true -> ok = file:delete(Path); + false -> ok + end, + SegmentsN; + (#segment {} = Segment, SegmentsN) -> + segment_store(append_journal_to_segment(Segment), SegmentsN) + end, segments_new(), Segments), + {JournalHdl, State1} = + get_journal_handle(State #qistate { segments = Segments1 }), + ok = file_handle_cache:clear(JournalHdl), + State1 #qistate { dirty_count = 0 }. + +append_journal_to_segment(#segment { journal_entries = JEntries, + path = Path } = Segment) -> + case array:sparse_size(JEntries) of + 0 -> Segment; + _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], + [{write_buffer, infinity}]), + array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), + ok = file_handle_cache:close(Hdl), + Segment #segment { journal_entries = array_new() } + end. + +get_journal_handle(State = #qistate { journal_handle = undefined, + dir = Dir }) -> + Path = filename:join(Dir, ?JOURNAL_FILENAME), + {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], + [{write_buffer, infinity}]), + {Hdl, State #qistate { journal_handle = Hdl }}; +get_journal_handle(State = #qistate { journal_handle = Hdl }) -> + {Hdl, State}. + +%% Loading Journal. This isn't idempotent and will mess up the counts +%% if you call it more than once on the same state. Assumes the counts +%% are 0 to start with. +load_journal(State) -> + {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + load_journal_entries(State1). 
+ +%% ditto +recover_journal(State) -> + State1 = #qistate { segments = Segments } = load_journal(State), + Segments1 = + segment_map( + fun (Segment = #segment { journal_entries = JEntries, + unacked = UnackedCountInJournal }) -> + %% We want to keep ack'd entries in so that we can + %% remove them if duplicates are in the journal. The + %% counts here are purely from the segment itself. + {SegEntries, UnackedCountInSeg} = load_segment(true, Segment), + {JEntries1, UnackedCountDuplicates} = + journal_minus_segment(JEntries, SegEntries), + Segment #segment { journal_entries = JEntries1, + unacked = (UnackedCountInJournal + + UnackedCountInSeg - + UnackedCountDuplicates) } + end, Segments), + State1 #qistate { segments = Segments1 }. + +load_journal_entries(State = #qistate { journal_handle = Hdl }) -> + case file_handle_cache:read(Hdl, ?SEQ_BYTES) of + {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> + case Prefix of + ?DEL_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, del, State)); + ?ACK_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, ack, State)); + _ -> + case file_handle_cache:read(Hdl, ?GUID_BYTES) of + {ok, <<GuidNum:?GUID_BITS>>} -> + %% work around for binary data + %% fragmentation. See + %% rabbit_msg_file:read_next/2 + <<Guid:?GUID_BYTES/binary>> = + <<GuidNum:?GUID_BITS>>, + Publish = {Guid, case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, + load_journal_entries( + add_to_journal(SeqId, Publish, State)); + _ErrOrEoF -> %% err, we've lost at least a publish + State + end + end; + _ErrOrEoF -> State + end. 
+ +deliver_or_ack(_Kind, [], State) -> + State; +deliver_or_ack(Kind, SeqIds, State) -> + JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end, + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append( + JournalHdl, + [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]), + maybe_flush_journal(lists:foldl(fun (SeqId, StateN) -> + add_to_journal(SeqId, Kind, StateN) + end, State1, SeqIds)). + +%%---------------------------------------------------------------------------- +%% segment manipulation +%%---------------------------------------------------------------------------- + +seq_id_to_seg_and_rel_seq_id(SeqId) -> + { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. + +reconstruct_seq_id(Seg, RelSeq) -> + (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq. + +all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> + lists:sort( + sets:to_list( + lists:foldl( + fun (SegName, Set) -> + sets:add_element( + list_to_integer( + lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end, + SegName)), Set) + end, sets:from_list(segment_nums(Segments)), + filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). + +segment_find_or_new(Seg, Dir, Segments) -> + case segment_find(Seg, Segments) of + {ok, Segment} -> Segment; + error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, + Path = filename:join(Dir, SegName), + #segment { num = Seg, + path = Path, + journal_entries = array_new(), + unacked = 0 } + end. + +segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) -> + {ok, Segment}; %% 1 or (2, matches head) +segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) -> + {ok, Segment}; %% 2, matches tail +segment_find(Seg, {Segments, _}) -> %% no match + dict:find(Seg, Segments). 
+ +segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head) + {Segments, [#segment { num = Seg } | Tail]}) -> + {Segments, [Segment | Tail]}; +segment_store(Segment = #segment { num = Seg }, %% 2, matches tail + {Segments, [SegmentA, #segment { num = Seg }]}) -> + {Segments, [Segment, SegmentA]}; +segment_store(Segment = #segment { num = Seg }, {Segments, []}) -> + {dict:erase(Seg, Segments), [Segment]}; +segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) -> + {dict:erase(Seg, Segments), [Segment, SegmentA]}; +segment_store(Segment = #segment { num = Seg }, + {Segments, [SegmentA, SegmentB]}) -> + {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)), + [Segment, SegmentA]}. + +segment_fold(Fun, Acc, {Segments, CachedSegments}) -> + dict:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end, + lists:foldl(Fun, Acc, CachedSegments), Segments). + +segment_map(Fun, {Segments, CachedSegments}) -> + {dict:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments), + lists:map(Fun, CachedSegments)}. + +segment_nums({Segments, CachedSegments}) -> + lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++ + dict:fetch_keys(Segments). + +segments_new() -> + {dict:new(), []}. + +write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) -> + Hdl; +write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> + ok = case Pub of + no_pub -> + ok; + {Guid, IsPersistent} -> + file_handle_cache:append( + Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, Guid]) + end, + ok = case {Del, Ack} of + {no_del, no_ack} -> + ok; + _ -> + Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + file_handle_cache:append( + Hdl, case {Del, Ack} of + {del, ack} -> [Binary, Binary]; + _ -> Binary + end) + end, + Hdl. 
+
+%% Read the publish entries of segment Seg that fall between the
+%% inclusive bounds {StartSeg, StartRelSeq} and {EndSeg, EndRelSeq},
+%% prepending each match to Messages. Returns the extended message
+%% list together with the segment cache updated with this segment.
+read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
+                     {Messages, Segments}, Dir) ->
+    Segment = segment_find_or_new(Seg, Dir, Segments),
+    {segment_entries_foldr(
+       fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
+           when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
+                (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
+               %% Reconstruct the seq id from the segment the entry
+               %% actually lives in (Seg), not StartSeg: when a read
+               %% spans multiple segments, basing the seq id on
+               %% StartSeg would yield wrong seq ids for every
+               %% segment after the first.
+               [ {Guid, reconstruct_seq_id(Seg, RelSeq),
+                  IsPersistent, IsDelivered == del} | Acc ];
+           (_RelSeq, _Value, Acc) ->
+               Acc
+       end, Messages, Segment),
+     segment_store(Segment, Segments)}.
+
+%% Fold Fun over the segment's entries as they stand after applying
+%% any pending in-memory journal entries for that segment on top of
+%% what is on disk.
+segment_entries_foldr(Fun, Init,
+                      Segment = #segment { journal_entries = JEntries }) ->
+    {SegEntries, _UnackedCount} = load_segment(false, Segment),
+    {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
+    array:sparse_foldr(Fun, Init, SegEntries1).
+
+%% Loading segments
+%%
+%% Does not do any combining with the journal at all.
+load_segment(KeepAcked, #segment { path = Path }) ->
+    case filelib:is_file(Path) of
+        false -> {array_new(), 0};
+        true  -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
+                 {ok, 0} = file_handle_cache:position(Hdl, bof),
+                 Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
+                 ok = file_handle_cache:close(Hdl),
+                 Res
+    end.
+
+load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
+    case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
+        {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+               IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+            %% because we specify /binary, and binaries are complete
+            %% bytes, the size spec is in bytes, not bits. 
+ {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), + Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + load_segment_entries(KeepAcked, Hdl, SegEntries1, + UnackedCount + 1); + {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>} -> + {UnackedCountDelta, SegEntries1} = + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; + {Pub, del, no_ack} when KeepAcked -> + {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; + {_Pub, del, no_ack} -> + {-1, array:reset(RelSeq, SegEntries)} + end, + load_segment_entries(KeepAcked, Hdl, SegEntries1, + UnackedCount + UnackedCountDelta); + _ErrOrEoF -> + {SegEntries, UnackedCount} + end. + +array_new() -> + array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). + +bool_to_int(true ) -> 1; +bool_to_int(false) -> 0. + +%%---------------------------------------------------------------------------- +%% journal & segment combination +%%---------------------------------------------------------------------------- + +%% Combine what we have just read from a segment file with what we're +%% holding for that segment in memory. There must be no duplicates. +segment_plus_journal(SegEntries, JEntries) -> + array:sparse_foldl( + fun (RelSeq, JObj, {SegEntriesOut, AdditionalUnacked}) -> + SegEntry = array:get(RelSeq, SegEntriesOut), + {Obj, AdditionalUnackedDelta} = + segment_plus_journal1(SegEntry, JObj), + {case Obj of + undefined -> array:reset(RelSeq, SegEntriesOut); + _ -> array:set(RelSeq, Obj, SegEntriesOut) + end, + AdditionalUnacked + AdditionalUnackedDelta} + end, {SegEntries, 0}, JEntries). + +%% Here, the result is a tuple with the first element containing the +%% item which we may be adding to (for items only in the journal), +%% modifying in (bits in both), or, when returning 'undefined', +%% erasing from (ack in journal, not segment) the segment array. 
The +%% other element of the tuple is the delta for AdditionalUnacked. +segment_plus_journal1(undefined, {?PUB, no_del, no_ack} = Obj) -> + {Obj, 1}; +segment_plus_journal1(undefined, {?PUB, del, no_ack} = Obj) -> + {Obj, 1}; +segment_plus_journal1(undefined, {?PUB, del, ack}) -> + {undefined, 0}; + +segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) -> + {{Pub, del, no_ack}, 0}; +segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) -> + {undefined, -1}; +segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) -> + {undefined, -1}. + +%% Remove from the journal entries for a segment, items that are +%% duplicates of entries found in the segment itself. Used on start up +%% to clean up the journal. +journal_minus_segment(JEntries, SegEntries) -> + array:sparse_foldl( + fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) -> + SegEntry = array:get(RelSeq, SegEntries), + {Obj, UnackedRemovedDelta} = + journal_minus_segment1(JObj, SegEntry), + {case Obj of + keep -> JEntriesOut; + undefined -> array:reset(RelSeq, JEntriesOut); + _ -> array:set(RelSeq, Obj, JEntriesOut) + end, + UnackedRemoved + UnackedRemovedDelta} + end, {JEntries, 0}, JEntries). + +%% Here, the result is a tuple with the first element containing the +%% item we are adding to or modifying in the (initially fresh) journal +%% array. If the item is 'undefined' we leave the journal array +%% alone. The other element of the tuple is the deltas for +%% UnackedRemoved. + +%% Both the same. 
Must be at least the publish +journal_minus_segment1({?PUB, _Del, no_ack} = Obj, Obj) -> + {undefined, 1}; +journal_minus_segment1({?PUB, _Del, ack} = Obj, Obj) -> + {undefined, 0}; + +%% Just publish in journal +journal_minus_segment1({?PUB, no_del, no_ack}, undefined) -> + {keep, 0}; + +%% Publish and deliver in journal +journal_minus_segment1({?PUB, del, no_ack}, undefined) -> + {keep, 0}; +journal_minus_segment1({?PUB = Pub, del, no_ack}, {Pub, no_del, no_ack}) -> + {{no_pub, del, no_ack}, 1}; + +%% Publish, deliver and ack in journal +journal_minus_segment1({?PUB, del, ack}, undefined) -> + {keep, 0}; +journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, no_del, no_ack}) -> + {{no_pub, del, ack}, 1}; +journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, del, no_ack}) -> + {{no_pub, no_del, ack}, 1}; + +%% Just deliver in journal +journal_minus_segment1({no_pub, del, no_ack}, {?PUB, no_del, no_ack}) -> + {keep, 0}; +journal_minus_segment1({no_pub, del, no_ack}, {?PUB, del, no_ack}) -> + {undefined, 0}; + +%% Just ack in journal +journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) -> + {keep, 0}; +journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, ack}) -> + {undefined, -1}; + +%% Deliver and ack in journal +journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) -> + {keep, 0}; +journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> + {{no_pub, no_del, ack}, 0}; +journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> + {undefined, -1}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index de06c048..cff55c91 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -44,6 +44,9 @@ -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). 
+ test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], Values = [element(2, E) || E <- Datum], @@ -51,7 +54,10 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> + application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + passed = test_backing_queue(), passed = test_priority_queue(), + passed = test_bpqueue(), passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), @@ -207,6 +213,143 @@ test_priority_queue(Q) -> priority_queue:to_list(Q), priority_queue_out_all(Q)}. +test_bpqueue() -> + Q = bpqueue:new(), + true = bpqueue:is_empty(Q), + 0 = bpqueue:len(Q), + [] = bpqueue:to_list(Q), + + Q1 = bpqueue_test(fun bpqueue:in/3, fun bpqueue:out/1, + fun bpqueue:to_list/1, + fun bpqueue:foldl/3, fun bpqueue:map_fold_filter_l/4), + Q2 = bpqueue_test(fun bpqueue:in_r/3, fun bpqueue:out_r/1, + fun (QR) -> lists:reverse( + [{P, lists:reverse(L)} || + {P, L} <- bpqueue:to_list(QR)]) + end, + fun bpqueue:foldr/3, fun bpqueue:map_fold_filter_r/4), + + [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)), + [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)), + [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] = + bpqueue:to_list(bpqueue:join(Q1, Q2)), + + [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] = + bpqueue:to_list(bpqueue:join(Q1, Q1)), + + [{foo, [1, 2]}, {bar, [3]}] = + bpqueue:to_list( + bpqueue:from_list( + [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])), + + [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])), + + {4, [a,b,c,d]} = + bpqueue:foldl( + fun (Prefix, Value, {Prefix, Acc}) -> + {Prefix + 1, [Value | Acc]} + end, + {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])), + + [{bar,3}, {foo,2}, {foo,1}] = + bpqueue:foldr(fun (P, V, I) -> [{P,V} | I] end, [], Q2), + + BPQL = [{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 
+ BPQ = bpqueue:from_list(BPQL), + + %% no effect + {BPQL, 0} = bpqueue_mffl([none], {none, []}, BPQ), + {BPQL, 0} = bpqueue_mffl([foo,bar], {none, [1]}, BPQ), + {BPQL, 0} = bpqueue_mffl([bar], {none, [3]}, BPQ), + {BPQL, 0} = bpqueue_mffr([bar], {foo, [5]}, BPQ), + + %% process 1 item + {[{foo,[-1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 1} = + bpqueue_mffl([foo,bar], {foo, [2]}, BPQ), + {[{foo,[1,2,2]}, {bar,[-3,4,5]}, {foo,[5,6,7]}], 1} = + bpqueue_mffl([bar], {bar, [4]}, BPQ), + {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,-7]}], 1} = + bpqueue_mffr([foo,bar], {foo, [6]}, BPQ), + {[{foo,[1,2,2]}, {bar,[3,4]}, {baz,[-5]}, {foo,[5,6,7]}], 1} = + bpqueue_mffr([bar], {baz, [4]}, BPQ), + + %% change prefix + {[{bar,[-1,-2,-2,-3,-4,-5,-5,-6,-7]}], 9} = + bpqueue_mffl([foo,bar], {bar, []}, BPQ), + {[{bar,[-1,-2,-2,3,4,5]}, {foo,[5,6,7]}], 3} = + bpqueue_mffl([foo], {bar, [5]}, BPQ), + {[{bar,[-1,-2,-2,3,4,5,-5,-6]}, {foo,[7]}], 5} = + bpqueue_mffl([foo], {bar, [7]}, BPQ), + {[{foo,[1,2,2,-3,-4]}, {bar,[5]}, {foo,[5,6,7]}], 2} = + bpqueue_mffl([bar], {foo, [5]}, BPQ), + {[{bar,[-1,-2,-2,3,4,5,-5,-6,-7]}], 6} = + bpqueue_mffl([foo], {bar, []}, BPQ), + {[{foo,[1,2,2,-3,-4,-5,5,6,7]}], 3} = + bpqueue_mffl([bar], {foo, []}, BPQ), + + %% edge cases + {[{foo,[-1,-2,-2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 3} = + bpqueue_mffl([foo], {foo, [5]}, BPQ), + {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[-5,-6,-7]}], 3} = + bpqueue_mffr([foo], {foo, [2]}, BPQ), + + passed. 
+ +bpqueue_test(In, Out, List, Fold, MapFoldFilter) -> + Q = bpqueue:new(), + {empty, _Q} = Out(Q), + + ok = Fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, ok, Q), + {Q1M, 0} = MapFoldFilter(fun(_P) -> throw(explosion) end, + fun(_V, _N) -> throw(explosion) end, 0, Q), + [] = bpqueue:to_list(Q1M), + + Q1 = In(bar, 3, In(foo, 2, In(foo, 1, Q))), + false = bpqueue:is_empty(Q1), + 3 = bpqueue:len(Q1), + [{foo, [1, 2]}, {bar, [3]}] = List(Q1), + + {{value, foo, 1}, Q3} = Out(Q1), + {{value, foo, 2}, Q4} = Out(Q3), + {{value, bar, 3}, _Q5} = Out(Q4), + + F = fun (QN) -> + MapFoldFilter(fun (foo) -> true; + (_) -> false + end, + fun (2, _Num) -> stop; + (V, Num) -> {bar, -V, V - Num} end, + 0, QN) + end, + {Q6, 0} = F(Q), + [] = bpqueue:to_list(Q6), + {Q7, 1} = F(Q1), + [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = List(Q7), + + Q1. + +bpqueue_mffl(FF1A, FF2A, BPQ) -> + bpqueue_mff(fun bpqueue:map_fold_filter_l/4, FF1A, FF2A, BPQ). + +bpqueue_mffr(FF1A, FF2A, BPQ) -> + bpqueue_mff(fun bpqueue:map_fold_filter_r/4, FF1A, FF2A, BPQ). + +bpqueue_mff(Fold, FF1A, FF2A, BPQ) -> + FF1 = fun (Prefixes) -> + fun (P) -> lists:member(P, Prefixes) end + end, + FF2 = fun ({Prefix, Stoppers}) -> + fun (Val, Num) -> + case lists:member(Val, Stoppers) of + true -> stop; + false -> {Prefix, -Val, 1 + Num} + end + end + end, + Queue_to_list = fun ({LHS, RHS}) -> {bpqueue:to_list(LHS), RHS} end, + + Queue_to_list(Fold(FF1(FF1A), FF2(FF2A), 0, BPQ)). + test_simple_n_element_queue(N) -> Items = lists:seq(1, N), Q = priority_queue_in_all(priority_queue:new(), Items), @@ -1186,3 +1329,580 @@ bad_handle_hook(_, _, _) -> exit(bad_handle_hook_called). extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). 
+ +test_backing_queue() -> + case application:get_env(rabbit, backing_queue_module) of + {ok, rabbit_variable_queue} -> + {ok, FileSizeLimit} = + application:get_env(rabbit, msg_store_file_size_limit), + application:set_env(rabbit, msg_store_file_size_limit, 512, + infinity), + {ok, MaxJournal} = + application:get_env(rabbit, queue_index_max_journal_entries), + application:set_env(rabbit, queue_index_max_journal_entries, 128, + infinity), + passed = test_msg_store(), + application:set_env(rabbit, msg_store_file_size_limit, + FileSizeLimit, infinity), + passed = test_queue_index(), + passed = test_variable_queue(), + passed = test_queue_recover(), + application:set_env(rabbit, queue_index_max_journal_entries, + MaxJournal, infinity), + passed; + _ -> + passed + end. + +restart_msg_store_empty() -> + ok = rabbit_variable_queue:stop_msg_store(), + ok = rabbit_variable_queue:start_msg_store( + undefined, {fun (ok) -> finished end, ok}). + +guid_bin(X) -> + erlang:md5(term_to_binary(X)). + +msg_store_contains(Atom, Guids) -> + Atom = lists:foldl( + fun (Guid, Atom1) when Atom1 =:= Atom -> + rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, + Atom, Guids). + +msg_store_sync(Guids) -> + Ref = make_ref(), + Self = self(), + ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids, + fun () -> Self ! {sync, Ref} end), + receive + {sync, Ref} -> ok + after + 10000 -> + io:format("Sync from msg_store missing for guids ~p~n", [Guids]), + throw(timeout) + end. + +msg_store_read(Guids, MSCState) -> + lists:foldl(fun (Guid, MSCStateM) -> + {{ok, Guid}, MSCStateN} = rabbit_msg_store:read( + ?PERSISTENT_MSG_STORE, + Guid, MSCStateM), + MSCStateN + end, MSCState, Guids). + +msg_store_write(Guids, MSCState) -> + lists:foldl(fun (Guid, {ok, MSCStateN}) -> + rabbit_msg_store:write(?PERSISTENT_MSG_STORE, + Guid, Guid, MSCStateN) + end, {ok, MSCState}, Guids). + +msg_store_remove(Guids) -> + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids). 
+ +foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> + rabbit_msg_store:client_terminate( + lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, + rabbit_msg_store:client_init(MsgStore, Ref), L)). + +test_msg_store() -> + restart_msg_store_empty(), + Self = self(), + Guids = [guid_bin(M) || M <- lists:seq(1,100)], + {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, Guids), + Ref = rabbit_guid:guid(), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + %% publish the first half + {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), + %% sync on the first half + ok = msg_store_sync(Guids1stHalf), + %% publish the second half + {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1), + %% sync on the first half again - the msg_store will be dirty, but + %% we won't need the fsync + ok = msg_store_sync(Guids1stHalf), + %% check they're all in there + true = msg_store_contains(true, Guids), + %% publish the latter half twice so we hit the caching and ref count code + {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2), + %% check they're still all in there + true = msg_store_contains(true, Guids), + %% sync on the 2nd half, but do lots of individual syncs to try + %% and cause coalescing to happen + ok = lists:foldl( + fun (Guid, ok) -> rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, + [Guid], fun () -> Self ! 
{sync, Guid} end) + end, ok, Guids2ndHalf), + lists:foldl( + fun(Guid, ok) -> + receive + {sync, Guid} -> ok + after + 10000 -> + io:format("Sync from msg_store missing (guid: ~p)~n", + [Guid]), + throw(timeout) + end + end, ok, Guids2ndHalf), + %% it's very likely we're not dirty here, so the 1st half sync + %% should hit a different code path + ok = msg_store_sync(Guids1stHalf), + %% read them all + MSCState4 = msg_store_read(Guids, MSCState3), + %% read them all again - this will hit the cache, not disk + MSCState5 = msg_store_read(Guids, MSCState4), + %% remove them all + ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids), + %% check first half doesn't exist + false = msg_store_contains(false, Guids1stHalf), + %% check second half does exist + true = msg_store_contains(true, Guids2ndHalf), + %% read the second half again + MSCState6 = msg_store_read(Guids2ndHalf, MSCState5), + %% release the second half, just for fun (aka code coverage) + ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), + %% read the second half again, just for fun (aka code coverage) + MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), + ok = rabbit_msg_store:client_terminate(MSCState7), + %% stop and restart, preserving every other msg in 2nd half + ok = rabbit_variable_queue:stop_msg_store(), + ok = rabbit_variable_queue:start_msg_store( + [], {fun ([]) -> finished; + ([Guid|GuidsTail]) + when length(GuidsTail) rem 2 == 0 -> + {Guid, 1, GuidsTail}; + ([Guid|GuidsTail]) -> + {Guid, 0, GuidsTail} + end, Guids2ndHalf}), + %% check we have the right msgs left + lists:foldl( + fun (Guid, Bool) -> + not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)) + end, false, Guids2ndHalf), + %% restart empty + restart_msg_store_empty(), + %% check we don't contain any of the msgs + false = msg_store_contains(false, Guids), + %% publish the first half again + MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + {ok, MSCState9} = 
msg_store_write(Guids1stHalf, MSCState8), + %% this should force some sort of sync internally otherwise misread + ok = rabbit_msg_store:client_terminate( + msg_store_read(Guids1stHalf, MSCState9)), + ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), + %% restart empty + restart_msg_store_empty(), %% now safe to reuse guids + %% push a lot of msgs in... at least 100 files worth + {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit), + PayloadSizeBits = 65536, + BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)), + GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], + Payload = << 0:PayloadSizeBits >>, + ok = foreach_with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (Guid, MsgStore, MSCStateM) -> + {ok, MSCStateN} = rabbit_msg_store:write( + MsgStore, Guid, Payload, MSCStateM), + MSCStateN + end, GuidsBig), + %% now read them to ensure we hit the fast client-side reading + ok = foreach_with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (Guid, MsgStore, MSCStateM) -> + {{ok, Payload}, MSCStateN} = rabbit_msg_store:read( + MsgStore, Guid, MSCStateM), + MSCStateN + end, GuidsBig), + %% .., then 3s by 1... + ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), + %% .., then remove 3s by 2, from the young end first. This hits + %% GC (under 50% good data left, but no empty files. Must GC). + ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), + %% .., then remove 3s by 3, from the young end first. This hits + %% GC... + ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), + %% ensure empty + false = msg_store_contains(false, GuidsBig), + %% restart empty + restart_msg_store_empty(), + passed. + +queue_name(Name) -> + rabbit_misc:r(<<"/">>, queue, Name). + +test_queue() -> + queue_name(<<"test">>). 
+ +init_test_queue() -> + rabbit_queue_index:init( + test_queue(), false, + fun (Guid) -> + rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end). + +restart_test_queue(Qi) -> + _ = rabbit_queue_index:terminate([], Qi), + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([test_queue()]), + init_test_queue(). + +empty_test_queue() -> + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + {0, _Terms, Qi} = init_test_queue(), + _ = rabbit_queue_index:delete_and_terminate(Qi), + ok. + +with_empty_test_queue(Fun) -> + ok = empty_test_queue(), + {0, _Terms, Qi} = init_test_queue(), + rabbit_queue_index:delete_and_terminate(Fun(Qi)). + +queue_index_publish(SeqIds, Persistent, Qi) -> + Ref = rabbit_guid:guid(), + MsgStore = case Persistent of + true -> ?PERSISTENT_MSG_STORE; + false -> ?TRANSIENT_MSG_STORE + end, + {A, B, MSCStateEnd} = + lists:foldl( + fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> + Guid = rabbit_guid:guid(), + QiM = rabbit_queue_index:publish( + Guid, SeqId, Persistent, QiN), + {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, + Guid, MSCStateN), + {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} + end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), + ok = rabbit_msg_store:client_delete_and_terminate( + MSCStateEnd, MsgStore, Ref), + {A, B}. + +verify_read_with_published(_Delivered, _Persistent, [], _) -> + ok; +verify_read_with_published(Delivered, Persistent, + [{Guid, SeqId, Persistent, Delivered}|Read], + [{SeqId, Guid}|Published]) -> + verify_read_with_published(Delivered, Persistent, Read, Published); +verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> + ko. 
+ +test_queue_index() -> + SegmentSize = rabbit_queue_index:next_segment_boundary(0), + TwoSegs = SegmentSize + SegmentSize, + MostOfASegment = trunc(SegmentSize*0.75), + SeqIdsA = lists:seq(0, MostOfASegment-1), + SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment), + SeqIdsC = lists:seq(0, trunc(SegmentSize/2)), + SeqIdsD = lists:seq(0, SegmentSize*4), + + with_empty_test_queue( + fun (Qi0) -> + {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), + {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), + {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), + {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), + ok = verify_read_with_published(false, false, ReadA, + lists:reverse(SeqIdsGuidsA)), + %% should get length back as 0, as all the msgs were transient + {0, _Terms1, Qi6} = restart_test_queue(Qi4), + {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), + {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), + {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), + {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), + ok = verify_read_with_published(false, true, ReadB, + lists:reverse(SeqIdsGuidsB)), + %% should get length back as MostOfASegment + LenB = length(SeqIdsB), + {LenB, _Terms2, Qi12} = restart_test_queue(Qi10), + {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), + Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), + {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), + ok = verify_read_with_published(true, true, ReadC, + lists:reverse(SeqIdsGuidsB)), + Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), + Qi17 = rabbit_queue_index:flush(Qi16), + %% Everything will have gone now because #pubs == #acks + {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), + %% should get length back as 0 because all persistent + %% msgs have been acked + {0, _Terms3, Qi19} = restart_test_queue(Qi18), + Qi19 + end), + + %% These next bits are just to hit the auto deletion of segment files. 
+ %% First, partials: + %% a) partial pub+del+ack, then move to new segment + with_empty_test_queue( + fun (Qi0) -> + {Qi1, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, + false, Qi0), + Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), + Qi3 = rabbit_queue_index:ack(SeqIdsC, Qi2), + Qi4 = rabbit_queue_index:flush(Qi3), + {Qi5, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], + false, Qi4), + Qi5 + end), + + %% b) partial pub+del, then move to new segment, then ack all in old segment + with_empty_test_queue( + fun (Qi0) -> + {Qi1, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, + false, Qi0), + Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), + {Qi3, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], + false, Qi2), + Qi4 = rabbit_queue_index:ack(SeqIdsC, Qi3), + rabbit_queue_index:flush(Qi4) + end), + + %% c) just fill up several segments of all pubs, then +dels, then +acks + with_empty_test_queue( + fun (Qi0) -> + {Qi1, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, + false, Qi0), + Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1), + Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2), + rabbit_queue_index:flush(Qi3) + end), + + %% d) get messages in all states to a segment, then flush, then do + %% the same again, don't flush and read. This will hit all + %% possibilities in combining the segment with the journal. 
+ with_empty_test_queue( + fun (Qi0) -> + {Qi1, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], + false, Qi0), + Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), + Qi3 = rabbit_queue_index:ack([0], Qi2), + Qi4 = rabbit_queue_index:flush(Qi3), + {Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4), + Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), + Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), + {[], Qi8} = rabbit_queue_index:read(0, 4, Qi7), + {ReadD, Qi9} = rabbit_queue_index:read(4, 7, Qi8), + ok = verify_read_with_published(true, false, ReadD, + [Four, Five, Six]), + {ReadE, Qi10} = rabbit_queue_index:read(7, 9, Qi9), + ok = verify_read_with_published(false, false, ReadE, + [Seven, Eight]), + Qi10 + end), + + %% e) as for (d), but use terminate instead of read, which will + %% exercise journal_minus_segment, not segment_plus_journal. + with_empty_test_queue( + fun (Qi0) -> + {Qi1, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7], + true, Qi0), + Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), + Qi3 = rabbit_queue_index:ack([0], Qi2), + {5, _Terms9, Qi4} = restart_test_queue(Qi3), + {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4), + Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), + Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), + {5, _Terms10, Qi8} = restart_test_queue(Qi7), + Qi8 + end), + + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + + passed. + +variable_queue_publish(IsPersistent, Count, VQ) -> + lists:foldl( + fun (_N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, <<>>), VQN) + end, VQ, lists:seq(1, Count)). 
+ +variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> + lists:foldl(fun (N, {VQN, AckTagsAcc}) -> + Rem = Len - N, + {{#basic_message { is_persistent = IsPersistent }, + IsDelivered, AckTagN, Rem}, VQM} = + rabbit_variable_queue:fetch(true, VQN), + {VQM, [AckTagN | AckTagsAcc]} + end, {VQ, []}, lists:seq(1, Count)). + +assert_prop(List, Prop, Value) -> + Value = proplists:get_value(Prop, List). + +assert_props(List, PropVals) -> + [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. + +with_fresh_variable_queue(Fun) -> + ok = empty_test_queue(), + VQ = rabbit_variable_queue:init(test_queue(), true, false), + S0 = rabbit_variable_queue:status(VQ), + assert_props(S0, [{q1, 0}, {q2, 0}, + {delta, {delta, undefined, 0, undefined}}, + {q3, 0}, {q4, 0}, + {len, 0}]), + _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), + passed. + +test_variable_queue() -> + [passed = with_fresh_variable_queue(F) || + F <- [fun test_variable_queue_dynamic_duration_change/1, + fun test_variable_queue_partial_segments_delta_thing/1, + fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, + fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]], + passed. 
+ +test_variable_queue_dynamic_duration_change(VQ0) -> + SegmentSize = rabbit_queue_index:next_segment_boundary(0), + + %% start by sending in a couple of segments worth + Len = 2*SegmentSize, + VQ1 = variable_queue_publish(false, Len, VQ0), + + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + VQ7 = lists:foldl( + fun (Duration1, VQ4) -> + {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), + io:format("~p:~n~p~n", + [Duration1, rabbit_variable_queue:status(VQ5)]), + VQ6 = rabbit_variable_queue:set_ram_duration_target( + Duration1, VQ5), + publish_fetch_and_ack(Churn, Len, VQ6) + end, VQ3, [Duration / 4, 0, Duration / 4, infinity]), + + %% drain + {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), + VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), + + VQ10. + +publish_fetch_and_ack(0, _Len, VQ0) -> + VQ0; +publish_fetch_and_ack(N, Len, VQ0) -> + VQ1 = variable_queue_publish(false, 1, VQ0), + {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). 
+ +test_variable_queue_partial_segments_delta_thing(VQ0) -> + SegmentSize = rabbit_queue_index:next_segment_boundary(0), + HalfSegment = SegmentSize div 2, + OneAndAHalfSegment = SegmentSize + HalfSegment, + VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0), + {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), + VQ3 = check_variable_queue_status( + rabbit_variable_queue:set_ram_duration_target(0, VQ2), + %% one segment in q3 as betas, and half a segment in delta + [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, + {q3, SegmentSize}, + {len, SegmentSize + HalfSegment}]), + VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ5 = check_variable_queue_status( + variable_queue_publish(true, 1, VQ4), + %% one alpha, but it's in the same segment as the deltas + [{q1, 1}, + {delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, + {q3, SegmentSize}, + {len, SegmentSize + HalfSegment + 1}]), + {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false, + SegmentSize + HalfSegment + 1, VQ5), + VQ7 = check_variable_queue_status( + VQ6, + %% the half segment should now be in q3 as betas + [{q1, 1}, + {delta, {delta, undefined, 0, undefined}}, + {q3, HalfSegment}, + {len, HalfSegment + 1}]), + {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, + HalfSegment + 1, VQ7), + VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + %% should be empty now + {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), + VQ10. + +check_variable_queue_status(VQ0, Props) -> + VQ1 = variable_queue_wait_for_shuffling_end(VQ0), + S = rabbit_variable_queue:status(VQ1), + io:format("~p~n", [S]), + assert_props(S, Props), + VQ1. + +variable_queue_wait_for_shuffling_end(VQ) -> + case rabbit_variable_queue:needs_idle_timeout(VQ) of + true -> variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:idle_timeout(VQ)); + false -> VQ + end. 
+ +test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> + Count = 2 * rabbit_queue_index:next_segment_boundary(0), + VQ1 = variable_queue_publish(true, Count, VQ0), + VQ2 = variable_queue_publish(false, Count, VQ1), + VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), + {VQ4, _AckTags} = variable_queue_fetch(Count, true, false, + Count + Count, VQ3), + {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, + Count, VQ4), + _VQ6 = rabbit_variable_queue:terminate(VQ5), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + {{_Msg1, true, _AckTag1, Count1}, VQ8} = + rabbit_variable_queue:fetch(true, VQ7), + VQ9 = variable_queue_publish(false, 1, VQ8), + VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), + {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), + {VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11), + VQ12. + +test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> + VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish(false, 4, VQ1), + {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), + VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), + VQ5 = rabbit_variable_queue:idle_timeout(VQ4), + _VQ6 = rabbit_variable_queue:terminate(VQ5), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), + VQ8. 
+ +test_queue_recover() -> + Count = 2 * rabbit_queue_index:next_segment_boundary(0), + TxID = rabbit_guid:guid(), + {new, #amqqueue { pid = QPid, name = QName }} = + rabbit_amqqueue:declare(test_queue(), true, false, [], none), + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, <<>>), + Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, + sender = self(), message = Msg}, + [true = rabbit_amqqueue:deliver(QPid, Delivery) || + _ <- lists:seq(1, Count)], + rabbit_amqqueue:commit_all([QPid], TxID, self()), + exit(QPid, kill), + MRef = erlang:monitor(process, QPid), + receive {'DOWN', MRef, process, QPid, _Info} -> ok + after 10000 -> exit(timeout_waiting_for_queue_death) + end, + rabbit_amqqueue:stop(), + ok = rabbit_amqqueue:start(), + rabbit_amqqueue:with_or_die( + QName, + fun (Q1 = #amqqueue { pid = QPid1 }) -> + CountMinusOne = Count - 1, + {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = + rabbit_amqqueue:basic_get(Q1, self(), false), + exit(QPid1, shutdown), + VQ1 = rabbit_variable_queue:init(QName, true, true), + {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + rabbit_variable_queue:fetch(true, VQ1), + _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), + rabbit_amqqueue:internal_delete(QName) + end), + passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl new file mode 100644 index 00000000..8bff66af --- /dev/null +++ b/src/rabbit_variable_queue.erl @@ -0,0 +1,1432 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_variable_queue). + +-export([init/3, terminate/1, delete_and_terminate/1, + purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, + tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, + requeue/2, len/1, is_empty/1, + set_ram_duration_target/2, ram_duration/1, + needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, + status/1]). + +-export([start/1, stop/0]). + +%% exported for testing only +-export([start_msg_store/2, stop_msg_store/0]). + +%%---------------------------------------------------------------------------- +%% Definitions: + +%% alpha: this is a message where both the message itself, and its +%% position within the queue are held in RAM +%% +%% beta: this is a message where the message itself is only held on +%% disk, but its position within the queue is held in RAM. +%% +%% gamma: this is a message where the message itself is only held on +%% disk, but its position is both in RAM and on disk. 
+%% +%% delta: this is a collection of messages, represented by a single +%% term, where the messages and their position are only held on +%% disk. +%% +%% Note that for persistent messages, the message and its position +%% within the queue are always held on disk, *in addition* to being in +%% one of the above classifications. +%% +%% Also note that within this code, the term gamma never +%% appears. Instead, gammas are defined by betas who have had their +%% queue position recorded on disk. +%% +%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though +%% many of these steps are frequently skipped. q1 and q4 only hold +%% alphas, q2 and q3 hold both betas and gammas (as queues of queues, +%% using the bpqueue module where the block prefix determines whether +%% they're betas or gammas). When a message arrives, its +%% classification is determined. It is then added to the rightmost +%% appropriate queue. +%% +%% If a new message is determined to be a beta or gamma, q1 is +%% empty. If a new message is determined to be a delta, q1 and q2 are +%% empty (and actually q4 too). +%% +%% When removing messages from a queue, if q4 is empty then q3 is read +%% directly. If q3 becomes empty then the next segment's worth of +%% messages from delta are read into q3, reducing the size of +%% delta. If the queue is non empty, either q4 or q3 contain +%% entries. It is never permitted for delta to hold all the messages +%% in the queue. +%% +%% The duration indicated to us by the memory_monitor is used to +%% calculate, given our current ingress and egress rates, how many +%% messages we should hold in RAM. When we need to push alphas to +%% betas or betas to gammas, we favour writing out messages that are +%% further from the head of the queue. This minimises writes to disk, +%% as the messages closer to the tail of the queue stay in the queue +%% for longer, thus do not need to be replaced as quickly by sending +%% other messages to disk. 
+%% +%% Whilst messages are pushed to disk and forgotten from RAM as soon +%% as requested by a new setting of the queue RAM duration, the +%% inverse is not true: we only load messages back into RAM as +%% demanded as the queue is read from. Thus only publishes to the +%% queue will take up available spare capacity. +%% +%% When we report our duration to the memory monitor, we calculate +%% average ingress and egress rates over the last two samples, and +%% then calculate our duration based on the sum of the ingress and +%% egress rates. More than two samples could be used, but it's a +%% balance between responding quickly enough to changes in +%% producers/consumers versus ignoring temporary blips. The problem +%% with temporary blips is that with just a few queues, they can have +%% substantial impact on the calculation of the average duration and +%% hence cause unnecessary I/O. Another alternative is to increase the +%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5 +%% seconds. However, that then runs the risk of being too slow to +%% inform the memory monitor of changes. Thus a 5 second interval, +%% plus a rolling average over the last two samples seems to work +%% well in practice. +%% +%% The sum of the ingress and egress rates is used because the egress +%% rate alone is not sufficient. Adding in the ingress rate means that +%% queues which are being flooded by messages are given more memory, +%% resulting in them being able to process the messages faster (by +%% doing less I/O, or at least deferring it) and thus helping keep +%% their mailboxes empty and thus the queue as a whole is more +%% responsive. If such a queue also has fast but previously idle +%% consumers, the consumer can then start to be driven as fast as it +%% can go, whereas if only egress rate was being used, the incoming +%% messages may have to be written to disk and then read back in, +%% resulting in the hard disk being a bottleneck in driving the +%% consumers. 
Generally, we want to give Rabbit every chance of +%% getting rid of messages as fast as possible and remaining +%% responsive, and using only the egress rate impacts that goal. +%% +%% If a queue is full of transient messages, then the transition from +%% betas to deltas will be potentially very expensive as millions of +%% entries must be written to disk by the queue_index module. This can +%% badly stall the queue. In order to avoid this, the proportion of +%% gammas / (betas+gammas) must not be lower than (betas+gammas) / +%% (alphas+betas+gammas). As the queue grows or available memory +%% shrinks, the latter ratio increases, requiring the conversion of +%% more gammas to betas in order to maintain the invariant. At the +%% point at which betas and gammas must be converted to deltas, there +%% should be very few betas remaining, thus the transition is fast (no +%% work needs to be done for the gamma -> delta transition). +%% +%% The conversion of betas to gammas is done in batches of exactly +%% ?IO_BATCH_SIZE. This value should not be too small, otherwise the +%% frequent operations on the queues of q2 and q3 will not be +%% effectively amortised (switching the direction of queue access +%% defeats amortisation), nor should it be too big, otherwise +%% converting a batch stalls the queue for too long. Therefore, it +%% must be just right. ram_index_count is used here and is the number +%% of betas. +%% +%% The conversion from alphas to betas is also chunked, but only to +%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at +%% any one time. This further smooths the effects of changes to the +%% target_ram_msg_count and ensures the queue remains responsive +%% even when there is a large amount of IO work to do. The +%% idle_timeout callback is utilised to ensure that conversions are +%% done as promptly as possible whilst ensuring the queue remains +%% responsive. 
+
+%% In the queue we keep track of both messages that are pending
+%% delivery and messages that are pending acks. This ensures that
+%% purging (deleting the former) and deletion (deleting the former and
+%% the latter) are both cheap and do not require any scanning through
+%% qi segments.
+%%
+%% Notes on Clean Shutdown
+%% (This documents behaviour in variable_queue, queue_index and
+%% msg_store.)
+%%
+%% In order to try to achieve as fast a start-up as possible, if a
+%% clean shutdown occurs, we try to save out state to disk to reduce
+%% work on startup. In the msg_store this takes the form of the
+%% index_module's state, plus the file_summary ets table, and client
+%% refs. In the VQ, this takes the form of the count of persistent
+%% messages in the queue and references into the msg_stores. The
+%% queue_index adds to these terms the details of its segments and
+%% stores the terms in the queue directory.
+%%
+%% Two message stores are used. One is created for persistent messages
+%% to durable queues that must survive restarts, and the other is used
+%% for all other messages that just happen to need to be written to
+%% disk. On start up we can therefore nuke the transient message
+%% store, and be sure that the messages in the persistent store are
+%% all that we need.
+%%
+%% The references to the msg_stores are there so that the msg_store
+%% knows to only trust its saved state if all of the queues it was
+%% previously talking to come up cleanly. Likewise, the queues
+%% themselves (esp queue_index) skip work in init if all the queues
+%% and msg_store were shutdown cleanly. This gives both good speed
+%% improvements and also robustness so that if anything possibly went
+%% wrong in shutdown (or there was subsequent manual tampering), all
+%% messages and queues that can be recovered are recovered, safely. 
+%% +%% To delete transient messages lazily, the variable_queue, on +%% startup, stores the next_seq_id reported by the queue_index as the +%% transient_threshold. From that point on, whenever it's reading a +%% message off disk via the queue_index, if the seq_id is below this +%% threshold and the message is transient then it drops the message +%% (the message itself won't exist on disk because it would have been +%% stored in the transient msg_store which would have had its saved +%% state nuked on startup). This avoids the expensive operation of +%% scanning the entire queue on startup in order to delete transient +%% messages that were only pushed to disk to save memory. +%% +%%---------------------------------------------------------------------------- + +-behaviour(rabbit_backing_queue). + +-record(vqstate, + { q1, + q2, + delta, + q3, + q4, + next_seq_id, + pending_ack, + index_state, + msg_store_clients, + on_sync, + durable, + transient_threshold, + + len, + persistent_count, + + duration_target, + target_ram_msg_count, + ram_msg_count, + ram_msg_count_prev, + ram_index_count, + out_counter, + in_counter, + rates + }). + +-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). + +-record(msg_status, + { seq_id, + guid, + msg, + is_persistent, + is_delivered, + msg_on_disk, + index_on_disk + }). + +-record(delta, + { start_seq_id, %% start_seq_id is inclusive + count, + end_seq_id %% end_seq_id is exclusive + }). + +-record(tx, { pending_messages, pending_acks }). + +-record(sync, { acks_persistent, acks_all, pubs, funs }). + +%% When we discover, on publish, that we should write some indices to +%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of +%% betas that we must be due to write indices for before we do any +%% work at all. 
This is both a minimum and a maximum - we don't write +%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't +%% write more - we can always come back on the next publish to do +%% more. +-define(IO_BATCH_SIZE, 64). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). +-type(seq_id() :: non_neg_integer()). +-type(ack() :: seq_id() | 'blank_ack'). + +-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, + ingress :: {timestamp(), non_neg_integer()}, + avg_egress :: float(), + avg_ingress :: float(), + timestamp :: timestamp() }). + +-type(delta() :: #delta { start_seq_id :: non_neg_integer(), + count :: non_neg_integer (), + end_seq_id :: non_neg_integer() }). + +-type(sync() :: #sync { acks_persistent :: [[seq_id()]], + acks_all :: [[seq_id()]], + pubs :: [[rabbit_guid:guid()]], + funs :: [fun (() -> any())] }). + +-type(state() :: #vqstate { + q1 :: queue(), + q2 :: bpqueue:bpqueue(), + delta :: delta(), + q3 :: bpqueue:bpqueue(), + q4 :: queue(), + next_seq_id :: seq_id(), + pending_ack :: dict:dictionary(), + index_state :: any(), + msg_store_clients :: 'undefined' | {{any(), binary()}, + {any(), binary()}}, + on_sync :: sync(), + durable :: boolean(), + + len :: non_neg_integer(), + persistent_count :: non_neg_integer(), + + transient_threshold :: non_neg_integer(), + duration_target :: number() | 'infinity', + target_ram_msg_count :: non_neg_integer() | 'infinity', + ram_msg_count :: non_neg_integer(), + ram_msg_count_prev :: non_neg_integer(), + ram_index_count :: non_neg_integer(), + out_counter :: non_neg_integer(), + in_counter :: non_neg_integer(), + rates :: rates() }). + +-include("rabbit_backing_queue_spec.hrl"). + +-endif. 
+ +-define(BLANK_DELTA, #delta { start_seq_id = undefined, + count = 0, + end_seq_id = undefined }). +-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, + count = 0, + end_seq_id = Z }). + +-define(BLANK_SYNC, #sync { acks_persistent = [], + acks_all = [], + pubs = [], + funs = [] }). + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +start(DurableQueues) -> + {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues), + start_msg_store( + [Ref || Terms <- AllTerms, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], + StartFunState). + +stop() -> stop_msg_store(). + +start_msg_store(Refs, StartFunState) -> + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), + undefined, {fun (ok) -> finished end, ok}]), + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), + Refs, StartFunState]). + +stop_msg_store() -> + ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), + ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). 
+ +init(QueueName, IsDurable, _Recover) -> + {DeltaCount, Terms, IndexState} = + rabbit_queue_index:init( + QueueName, + rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + fun (Guid) -> + rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end), + {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), + + {PRef, TRef, Terms1} = + case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of + [] -> {proplists:get_value(persistent_ref, Terms), + proplists:get_value(transient_ref, Terms), + Terms}; + _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} + end, + DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount), + Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of + true -> ?BLANK_DELTA; + false -> #delta { start_seq_id = LowSeqId, + count = DeltaCount1, + end_seq_id = NextSeqId } + end, + Now = now(), + PersistentClient = + case IsDurable of + true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); + false -> undefined + end, + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), + State = #vqstate { + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + index_state = IndexState1, + msg_store_clients = {{PersistentClient, PRef}, + {TransientClient, TRef}}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + duration_target = infinity, + target_ram_msg_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now } }, + a(maybe_deltas_to_betas(State)). 
+ +terminate(State) -> + State1 = #vqstate { persistent_count = PCount, + index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, + {MSCStateT, TRef}} } = + remove_pending_ack(true, tx_commit_index(State)), + case MSCStateP of + undefined -> ok; + _ -> rabbit_msg_store:client_terminate(MSCStateP) + end, + rabbit_msg_store:client_terminate(MSCStateT), + Terms = [{persistent_ref, PRef}, + {transient_ref, TRef}, + {persistent_count, PCount}], + a(State1 #vqstate { index_state = rabbit_queue_index:terminate( + Terms, IndexState), + msg_store_clients = undefined }). + +%% the only difference between purge and delete is that delete also +%% needs to delete everything that's been delivered and not ack'd. +delete_and_terminate(State) -> + %% TODO: there is no need to interact with qi at all - which we do + %% as part of 'purge' and 'remove_pending_ack', other than + %% deleting it. + {_PurgeCount, State1} = purge(State), + State2 = #vqstate { index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, + {MSCStateT, TRef}} } = + remove_pending_ack(false, State1), + IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), + case MSCStateP of + undefined -> ok; + _ -> rabbit_msg_store:client_delete_and_terminate( + MSCStateP, ?PERSISTENT_MSG_STORE, PRef), + rabbit_msg_store:client_terminate(MSCStateP) + end, + rabbit_msg_store:client_delete_and_terminate( + MSCStateT, ?TRANSIENT_MSG_STORE, TRef), + a(State2 #vqstate { index_state = IndexState1, + msg_store_clients = undefined }). + +purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> + %% TODO: when there are no pending acks, which is a common case, + %% we could simply wipe the qi instead of issuing delivers and + %% acks for all the messages. 
+ IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, + IndexState), + State1 = #vqstate { q1 = Q1, index_state = IndexState2 } = + purge_betas_and_deltas(State #vqstate { q4 = queue:new(), + index_state = IndexState1 }), + IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1, + IndexState2), + {Len, a(State1 #vqstate { q1 = queue:new(), + index_state = IndexState3, + len = 0, + ram_msg_count = 0, + ram_index_count = 0, + persistent_count = 0 })}. + +publish(Msg, State) -> + {_SeqId, State1} = publish(Msg, false, false, State), + a(reduce_memory_use(State1)). + +publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> + {blank_ack, a(State)}; +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, + State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + pending_ack = PA, + durable = IsDurable }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + #msg_status { is_delivered = true }, + {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), + PA1 = record_pending_ack(m(MsgStatus1), PA), + PCount1 = PCount + one_if(IsPersistent1), + {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 })}. 
+ +fetch(AckRequired, State = #vqstate { q4 = Q4, + ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> + case queue:out(Q4) of + {empty, _Q4} -> + case fetch_from_q3_to_q4(State) of + {empty, State1} = Result -> a(State1), Result; + {loaded, State1} -> fetch(AckRequired, State1) + end; + {{value, MsgStatus = #msg_status { + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + Q4a} -> + + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + MsgStore = find_msg_store(IsPersistent), + Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, PA1} = case AckRequired of + true -> PA2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, PA), + {SeqId, PA2}; + false -> {blank_ack, PA} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + {{Msg, IsDelivered, AckTag, Len1}, + a(State #vqstate { q4 = Q4a, + ram_msg_count = RamMsgCount - 1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1, + pending_ack = PA1 })} + end. + +ack(AckTags, State) -> + a(ack(fun rabbit_msg_store:remove/2, + fun (_AckEntry, State1) -> State1 end, + AckTags, State)). 
+ +tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, + State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> + Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + a(case IsPersistent andalso IsDurable of + true -> MsgStatus = msg_status(true, undefined, Msg), + {#msg_status { msg_on_disk = true }, MSCState1} = + maybe_write_msg_to_disk(false, MsgStatus, MSCState), + State #vqstate { msg_store_clients = MSCState1 }; + false -> State + end). + +tx_ack(Txn, AckTags, State) -> + Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), + State. + +tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> + #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), + erase_tx(Txn), + ok = case IsDurable of + true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, + persistent_guids(Pubs)); + false -> ok + end, + {lists:append(AckTags), a(State)}. + +tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> + #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), + erase_tx(Txn), + PubsOrdered = lists:reverse(Pubs), + AckTags1 = lists:append(AckTags), + PersistentGuids = persistent_guids(PubsOrdered), + HasPersistentPubs = PersistentGuids =/= [], + {AckTags1, + a(case IsDurable andalso HasPersistentPubs of + true -> ok = rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, PersistentGuids, + msg_store_callback(PersistentGuids, + PubsOrdered, AckTags1, Fun)), + State; + false -> tx_commit_post_msg_store( + HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) + end)}. 
+ +requeue(AckTags, State) -> + a(reduce_memory_use( + ack(fun rabbit_msg_store:release/2, + fun (#msg_status { msg = Msg }, State1) -> + {_SeqId, State2} = publish(Msg, true, false, State1), + State2; + ({IsPersistent, Guid}, State1) -> + #vqstate { msg_store_clients = MSCState } = State1, + {{ok, Msg = #basic_message{}}, MSCState1} = + read_from_msg_store(MSCState, IsPersistent, Guid), + State2 = State1 #vqstate { msg_store_clients = MSCState1 }, + {_SeqId, State3} = publish(Msg, true, true, State2), + State3 + end, + AckTags, State))). + +len(#vqstate { len = Len }) -> Len. + +is_empty(State) -> 0 == len(State). + +set_ram_duration_target(DurationTarget, + State = #vqstate { + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + target_ram_msg_count = TargetRamMsgCount }) -> + Rate = AvgEgressRate + AvgIngressRate, + TargetRamMsgCount1 = + case DurationTarget of + infinity -> infinity; + _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec + end, + State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, + duration_target = DurationTarget }, + a(case TargetRamMsgCount1 == infinity orelse + (TargetRamMsgCount =/= infinity andalso + TargetRamMsgCount1 >= TargetRamMsgCount) of + true -> State1; + false -> reduce_memory_use(State1) + end). 
+ +ram_duration(State = #vqstate { + rates = #rates { egress = Egress, + ingress = Ingress, + timestamp = Timestamp } = Rates, + in_counter = InCount, + out_counter = OutCount, + ram_msg_count = RamMsgCount, + duration_target = DurationTarget, + ram_msg_count_prev = RamMsgCountPrev }) -> + Now = now(), + {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), + {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), + + Duration = %% msgs / (msgs/sec) == sec + case AvgEgressRate == 0 andalso AvgIngressRate == 0 of + true -> infinity; + false -> (RamMsgCountPrev + RamMsgCount) / + (2 * (AvgEgressRate + AvgIngressRate)) + end, + + {Duration, set_ram_duration_target( + DurationTarget, + State #vqstate { + rates = Rates #rates { + egress = Egress1, + ingress = Ingress1, + avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate, + timestamp = Now }, + in_counter = 0, + out_counter = 0, + ram_msg_count_prev = RamMsgCount })}. + +needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + State), + Res; +needs_idle_timeout(_State) -> + true. + +idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). + +handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> + State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. 
+ +status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + len = Len, + pending_ack = PA, + on_sync = #sync { funs = From }, + target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + next_seq_id = NextSeqId, + persistent_count = PersistentCount, + rates = #rates { + avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate } }) -> + [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {pending_acks , dict:size(PA)}, + {outstanding_txns , length(From)}, + {target_ram_msg_count , TargetRamMsgCount}, + {ram_msg_count , RamMsgCount}, + {ram_index_count , RamIndexCount}, + {next_seq_id , NextSeqId}, + {persistent_count , PersistentCount}, + {avg_egress_rate , AvgEgressRate}, + {avg_ingress_rate , AvgIngressRate} ]. + +%%---------------------------------------------------------------------------- +%% Minor helpers +%%---------------------------------------------------------------------------- + +a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + len = Len, + persistent_count = PersistentCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount }) -> + E1 = queue:is_empty(Q1), + E2 = bpqueue:is_empty(Q2), + ED = Delta#delta.count == 0, + E3 = bpqueue:is_empty(Q3), + E4 = queue:is_empty(Q4), + LZ = Len == 0, + + true = E1 or not E3, + true = E2 or not ED, + true = ED or not E3, + true = LZ == (E3 and E4), + + true = Len >= 0, + true = PersistentCount >= 0, + true = RamMsgCount >= 0, + true = RamIndexCount >= 0, + + State. + +m(MsgStatus = #msg_status { msg = Msg, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }) -> + true = (not IsPersistent) or IndexOnDisk, + true = (not IndexOnDisk) or MsgOnDisk, + true = (Msg =/= undefined) or MsgOnDisk, + + MsgStatus. + +one_if(true ) -> 1; +one_if(false) -> 0. 
+ +cons_if(true, E, L) -> [E | L]; +cons_if(false, _E, L) -> L. + +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> + #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, + is_persistent = IsPersistent, is_delivered = false, + msg_on_disk = false, index_on_disk = false }. + +find_msg_store(true) -> ?PERSISTENT_MSG_STORE; +find_msg_store(false) -> ?TRANSIENT_MSG_STORE. + +with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) -> + {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP), + {Result, {{MSCStateP1, PRef}, MSCStateT}}; +with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) -> + {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), + {Result, {MSCStateP, {MSCStateT1, TRef}}}. + +read_from_msg_store(MSCState, IsPersistent, Guid) -> + with_msg_store_state( + MSCState, IsPersistent, + fun (MsgStore, MSCState1) -> + rabbit_msg_store:read(MsgStore, Guid, MSCState1) + end). + +maybe_write_delivered(false, _SeqId, IndexState) -> + IndexState; +maybe_write_delivered(true, SeqId, IndexState) -> + rabbit_queue_index:deliver([SeqId], IndexState). + +lookup_tx(Txn) -> case get({txn, Txn}) of + undefined -> #tx { pending_messages = [], + pending_acks = [] }; + V -> V + end. + +store_tx(Txn, Tx) -> put({txn, Txn}, Tx). + +erase_tx(Txn) -> erase({txn, Txn}). + +persistent_guids(Pubs) -> + [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs]. 
+ +betas_from_index_entries(List, TransientThreshold, IndexState) -> + {Filtered, Delivers, Acks} = + lists:foldr( + fun ({Guid, SeqId, IsPersistent, IsDelivered}, + {Filtered1, Delivers1, Acks1}) -> + case SeqId < TransientThreshold andalso not IsPersistent of + true -> {Filtered1, + cons_if(not IsDelivered, SeqId, Delivers1), + [SeqId | Acks1]}; + false -> {[m(#msg_status { msg = undefined, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true + }) | Filtered1], + Delivers1, + Acks1} + end + end, {[], [], []}, List), + {bpqueue:from_list([{true, Filtered}]), + rabbit_queue_index:ack(Acks, + rabbit_queue_index:deliver(Delivers, IndexState))}. + +%% the first arg is the older delta +combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) -> + ?BLANK_DELTA; +combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start, + count = Count, + end_seq_id = End } = B) -> + true = Start + Count =< End, %% ASSERTION + B; +combine_deltas(#delta { start_seq_id = Start, + count = Count, + end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) -> + true = Start + Count =< End, %% ASSERTION + A; +combine_deltas(#delta { start_seq_id = StartLow, + count = CountLow, + end_seq_id = EndLow }, + #delta { start_seq_id = StartHigh, + count = CountHigh, + end_seq_id = EndHigh }) -> + Count = CountLow + CountHigh, + true = (StartLow =< StartHigh) %% ASSERTIONS + andalso ((StartLow + CountLow) =< EndLow) + andalso ((StartHigh + CountHigh) =< EndHigh) + andalso ((StartLow + Count) =< EndHigh), + #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. + +beta_fold(Fun, Init, Q) -> + bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q). + +update_rate(Now, Then, Count, {OThen, OCount}) -> + %% avg over the current period and the previous + {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}. 
+ +%%---------------------------------------------------------------------------- +%% Internal major helpers for Public API +%%---------------------------------------------------------------------------- + +msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> + Self = self(), + F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, fun (StateN) -> tx_commit_post_msg_store( + true, Pubs, AckTags, Fun, StateN) + end) + end, + fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( + fun () -> rabbit_msg_store:remove( + ?PERSISTENT_MSG_STORE, + PersistentGuids) + end, F) + end) + end. + +tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, + State = #vqstate { + on_sync = OnSync = #sync { + acks_persistent = SPAcks, + acks_all = SAcks, + pubs = SPubs, + funs = SFuns }, + pending_ack = PA, + durable = IsDurable }) -> + PersistentAcks = + case IsDurable of + true -> [AckTag || AckTag <- AckTags, + case dict:fetch(AckTag, PA) of + #msg_status {} -> false; + {IsPersistent, _Guid} -> IsPersistent + end]; + false -> [] + end, + case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of + true -> State #vqstate { on_sync = #sync { + acks_persistent = [PersistentAcks | SPAcks], + acks_all = [AckTags | SAcks], + pubs = [Pubs | SPubs], + funs = [Fun | SFuns] }}; + false -> State1 = tx_commit_index( + State #vqstate { on_sync = #sync { + acks_persistent = [], + acks_all = [AckTags], + pubs = [Pubs], + funs = [Fun] } }), + State1 #vqstate { on_sync = OnSync } + end. 
+ +tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) -> + State; +tx_commit_index(State = #vqstate { on_sync = #sync { + acks_persistent = SPAcks, + acks_all = SAcks, + pubs = SPubs, + funs = SFuns }, + durable = IsDurable }) -> + PAcks = lists:append(SPAcks), + Acks = lists:append(SAcks), + Pubs = lists:append(lists:reverse(SPubs)), + {SeqIds, State1 = #vqstate { index_state = IndexState }} = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, + {SeqIdsAcc, State2}) -> + IsPersistent1 = IsDurable andalso IsPersistent, + {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} + end, {PAcks, ack(Acks, State)}, Pubs), + IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), + [ Fun() || Fun <- lists:reverse(SFuns) ], + reduce_memory_use( + State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). + +purge_betas_and_deltas(State = #vqstate { q3 = Q3, + index_state = IndexState }) -> + case bpqueue:is_empty(Q3) of + true -> State; + false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3, + IndexState), + purge_betas_and_deltas( + maybe_deltas_to_betas( + State #vqstate { q3 = bpqueue:new(), + index_state = IndexState1 })) + end. + +remove_queue_entries(Fold, Q, IndexState) -> + {GuidsByStore, Delivers, Acks} = + Fold(fun remove_queue_entries1/2, {dict:new(), [], []}, Q), + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), + rabbit_queue_index:ack(Acks, + rabbit_queue_index:deliver(Delivers, IndexState)). 

%% Per-entry accumulator for remove_queue_entries/3: collects (a)
%% guids to remove, grouped by msg_store, for messages whose body is
%% on disk, (b) seq ids needing a deliver record (index entry on disk
%% but not yet marked delivered) and (c) seq ids needing an index ack.
remove_queue_entries1(
  #msg_status { guid = Guid, seq_id = SeqId,
                is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
                index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
  {GuidsByStore, Delivers, Acks}) ->
    {case MsgOnDisk of
         true  -> rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid,
                                        GuidsByStore);
         false -> GuidsByStore
     end,
     cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
     cons_if(IndexOnDisk, SeqId, Acks)}.

%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------

%% Append a message to the queue as an alpha, assigning it the next
%% seq id.  The message may be written to disk (un-forced, so only
%% when persistent), and lands on q1 when q3 is non-empty (there are
%% betas/deltas ahead of it) or directly on q4 otherwise.  Returns
%% {SeqId, State}.
publish(Msg = #basic_message { is_persistent = IsPersistent },
        IsDelivered, MsgOnDisk,
        State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                           next_seq_id      = SeqId,
                           len              = Len,
                           in_counter       = InCount,
                           persistent_count = PCount,
                           durable          = IsDurable,
                           ram_msg_count    = RamMsgCount }) ->
    %% A message only counts as persistent when the queue itself is
    %% durable.
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
        #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
    {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
    State2 = case bpqueue:is_empty(Q3) of
                 false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
                 true  -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
             end,
    PCount1 = PCount + one_if(IsPersistent1),
    {SeqId, State2 #vqstate { next_seq_id      = SeqId + 1,
                              len              = Len + 1,
                              in_counter       = InCount + 1,
                              persistent_count = PCount1,
                              ram_msg_count    = RamMsgCount + 1}}.

%% Write the message body to the appropriate msg_store unless it is
%% already there.  The write happens when forced or when the message
%% is persistent; otherwise this is a no-op.  Recoverable decoded
%% content properties are stripped before writing.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
                                  msg_on_disk = true }, MSCState) ->
    {MsgStatus, MSCState};
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
                                 msg = Msg, guid = Guid,
                                 is_persistent = IsPersistent }, MSCState)
  when Force orelse IsPersistent ->
    {ok, MSCState1} =
        with_msg_store_state(
          MSCState, IsPersistent,
          fun (MsgStore, MSCState2) ->
                  Msg1 = Msg #basic_message {
                           %% don't persist any recoverable decoded properties
                           content = rabbit_binary_parser:clear_decoded_content(
                                       Msg #basic_message.content)},
                  rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2)
          end),
    {MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
    {MsgStatus, MSCState}.

%% Publish the message's entry to the queue index unless it is already
%% there, again only when forced or persistent.  The message body must
%% already be on disk (asserted), since an index entry whose body is
%% not stored would be unrecoverable.  Also records a deliver for the
%% entry when the message has already been delivered.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
                                    index_on_disk = true }, IndexState) ->
    true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
    {MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
                                   guid          = Guid,
                                   seq_id        = SeqId,
                                   is_persistent = IsPersistent,
                                   is_delivered  = IsDelivered }, IndexState)
  when Force orelse IsPersistent ->
    true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
    IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
                                             IndexState),
    {MsgStatus #msg_status { index_on_disk = true },
     maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
    {MsgStatus, IndexState}.

%% Convenience wrapper: write the message body and then its index
%% entry, threading the msg_store client state and index state
%% through the vqstate.
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
                    State = #vqstate { index_state       = IndexState,
                                       msg_store_clients = MSCState }) ->
    {MsgStatus1, MSCState1}   = maybe_write_msg_to_disk(
                                  ForceMsg, MsgStatus, MSCState),
    {MsgStatus2, IndexState1} = maybe_write_index_to_disk(
                                  ForceIndex, MsgStatus1, IndexState),
    {MsgStatus2, State #vqstate { index_state       = IndexState1,
                                  msg_store_clients = MSCState1 }}.

%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------

%% Record a fetched-but-unacked message in the pending_ack dict, keyed
%% by seq id.  If the body is on disk, only {IsPersistent, Guid} is
%% kept (enough to ack/remove it later); otherwise the full
%% #msg_status{} is retained so the message is not lost.
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
                                 is_persistent = IsPersistent,
                                 msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
    AckEntry = case MsgOnDisk of
                   true  -> {IsPersistent, Guid};
                   false -> MsgStatus
               end,
    dict:store(SeqId, AckEntry, PA).

%% Drop all pending acks.  With KeepPersistent = true only the
%% transient guids are removed from the transient msg_store and the
%% index is untouched (NOTE(review): presumably the shutdown/recovery
%% path where persistent messages must survive -- confirm against
%% callers); with false everything is acked in the index and removed
%% from both stores.
remove_pending_ack(KeepPersistent,
                   State = #vqstate { pending_ack = PA,
                                      index_state = IndexState }) ->
    {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
                                       {[], dict:new()}, PA),
    State1 = State #vqstate { pending_ack = dict:new() },
    case KeepPersistent of
        true  -> case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
                     error       -> State1;
                     {ok, Guids} -> ok = rabbit_msg_store:remove(
                                           ?TRANSIENT_MSG_STORE, Guids),
                                    State1
                 end;
        false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
                 ok = dict:fold(fun (MsgStore, Guids, ok) ->
                                        rabbit_msg_store:remove(MsgStore, Guids)
                                end, ok, GuidsByStore),
                 State1 #vqstate { index_state = IndexState1 }
    end.

%% Generic ack driver: for each ack tag, look up and erase its
%% pending_ack entry and apply Fun to {entry, state}; then ack the
%% collected seq ids in the index and hand the collected guids,
%% grouped per msg_store, to MsgStoreFun (e.g. a remove or release
%% operation).  persistent_count is decremented by the number of
%% persistent-store guids acked.
ack(_MsgStoreFun, _Fun, [], State) ->
    State;
ack(MsgStoreFun, Fun, AckTags, State) ->
    {{SeqIds, GuidsByStore},
     State1 = #vqstate { index_state      = IndexState,
                         persistent_count = PCount }} =
        lists:foldl(
          fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
                  {ok, AckEntry} = dict:find(SeqId, PA),
                  {accumulate_ack(SeqId, AckEntry, Acc),
                   Fun(AckEntry, State2 #vqstate {
                                   pending_ack = dict:erase(SeqId, PA) })}
          end, {{[], dict:new()}, State}, AckTags),
    IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
    ok = dict:fold(fun (MsgStore, Guids, ok) ->
                           MsgStoreFun(MsgStore, Guids)
                   end, ok, GuidsByStore),
    PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
                           error       -> 0;
                           {ok, Guids} -> length(Guids)
                       end,
    State1 #vqstate { index_state      = IndexState1,
                      persistent_count = PCount1 }.

%% Accumulate one pending-ack entry into {SeqIds, GuidsByStore}.  An
%% in-RAM #msg_status{} entry is necessarily transient and wholly
%% off-disk (asserted) and contributes nothing; an on-disk
%% {IsPersistent, Guid} entry contributes its guid (grouped by store)
%% and, when persistent, its seq id for the index ack.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
                                     msg_on_disk   = false,
                                     index_on_disk = false }, Acc) ->
    Acc;
accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
    {cons_if(IsPersistent, SeqId, SeqIdsAcc),
     rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}.

%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------

%% Determine whether a reduction in memory use is necessary, and call
%% functions to perform the required phase changes. The function can
%% also be used to just do the former, by passing in dummy phase
%% change functions.
%%
%% The function does not report on any needed beta->delta conversions,
%% though the conversion function for that is called as necessary. The
%% reason is twofold. Firstly, this is safe because the conversion is
%% only ever necessary just after a transition to a
%% target_ram_msg_count of zero or after an incremental alpha->beta
%% conversion.
%% In the former case the conversion is performed straight
%% away (i.e. any betas present at the time are converted to deltas),
%% and in the latter case the need for a conversion is flagged up
%% anyway. Secondly, this is necessary because we do not have a
%% precise and cheap predicate for determining whether a beta->delta
%% conversion is necessary - due to the complexities of retaining up
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
    %% First push alphas out to betas if ram_msg_count exceeds its
    %% target; Reduce records whether any work was done.
    {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
                                       State #vqstate.target_ram_msg_count) of
                           0  -> {false, State};
                           S1 -> {true, AlphaBetaFun(S1, State)}
                       end,
    case State1 #vqstate.target_ram_msg_count of
        infinity -> {Reduce, State1};
        %% target of zero: all betas must become deltas.
        0        -> {Reduce, BetaDeltaFun(State1)};
        %% otherwise only flush beta index entries, and only once a
        %% full IO batch has accumulated.
        _        -> case chunk_size(State1 #vqstate.ram_index_count,
                                    permitted_ram_index_count(State1)) of
                        ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
                        _                   -> {Reduce, State1}
                    end
    end.

%% Standard entry point wiring in the real phase-change functions;
%% discards the "did we reduce" flag.
reduce_memory_use(State) ->
    {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
                                    fun limit_ram_index/2,
                                    fun push_betas_to_deltas/1,
                                    State),
    State1.

%% Write out up to Quota index entries for betas in q2 and then q3
%% (walked right-to-left) whose index entry is not yet on disk,
%% decrementing ram_index_count by the number actually written.
limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
                                          index_state = IndexState,
                                          ram_index_count = RamIndexCount }) ->
    {Q2a, {Quota1, IndexState1}} = limit_ram_index(
                                     fun bpqueue:map_fold_filter_r/4,
                                     Q2, {Quota, IndexState}),
    %% TODO: we shouldn't be writing index entries for messages that
    %% can never end up in delta due them residing in the only segment
    %% held by q3.
    {Q3a, {Quota2, IndexState2}} = limit_ram_index(
                                     fun bpqueue:map_fold_filter_r/4,
                                     Q3, {Quota1, IndexState1}),
    State #vqstate { q2 = Q2a, q3 = Q3a,
                     index_state = IndexState2,
                     ram_index_count = RamIndexCount - (Quota - Quota2) }.

%% Helper for limit_ram_index/2: using the supplied map_fold_filter
%% fun, walk the blocks of the beta queue whose prefix is 'false'
%% (i.e. index_on_disk = false -- asserted on each entry) and force
%% each entry's index record to disk, stopping when the quota is
%% exhausted.
limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) ->
    {Q, {0, IndexState}};
limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) ->
    MapFoldFilterFun(
      fun erlang:'not'/1,
      fun (MsgStatus, {0, _IndexStateN}) ->
              false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
              stop;
          (MsgStatus, {N, IndexStateN}) when N > 0 ->
              false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
              {MsgStatus1, IndexStateN1} =
                  maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
              {true, m(MsgStatus1), {N-1, IndexStateN1}}
      end, {Quota, IndexState}, Q).

%% Heuristic upper bound on how many beta index entries may stay in
%% RAM: the beta population minus BetaLen^2 / (Len - DeltaCount).
%% NOTE(review): relies on Len - DeltaCount being non-zero here; that
%% appears to depend on the invariant that a non-empty delta implies a
%% non-empty q3 (len = 0 is handled by the first clause) -- confirm,
%% as Len =:= DeltaCount would raise badarith.
permitted_ram_index_count(#vqstate { len = 0 }) ->
    infinity;
permitted_ram_index_count(#vqstate { len   = Len,
                                     q2    = Q2,
                                     q3    = Q3,
                                     delta = #delta { count = DeltaCount } }) ->
    BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
    BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).

%% Number of items to move in one go: 0 when at or under the
%% permitted level (or it is infinity), otherwise the excess capped at
%% ?IO_BATCH_SIZE.
chunk_size(Current, Permitted)
  when Permitted =:= infinity orelse Permitted >= Current ->
    0;
chunk_size(Current, Permitted) ->
    lists:min([Current - Permitted, ?IO_BATCH_SIZE]).

%% Move one message from the head of q3 into q4, reading its body back
%% from the msg_store (betas hold msg = undefined).  Returns {empty,
%% State} when q3 is empty, otherwise {loaded, State}.  If this
%% empties q3, the inter-queue invariants are restored: with delta
%% also empty, q2 must be empty (asserted) and q1 can be joined onto
%% q4; with delta non-empty, more betas are pulled in from delta.
fetch_from_q3_to_q4(State = #vqstate {
                      q1                = Q1,
                      q2                = Q2,
                      delta             = #delta { count = DeltaCount },
                      q3                = Q3,
                      q4                = Q4,
                      ram_msg_count     = RamMsgCount,
                      ram_index_count   = RamIndexCount,
                      msg_store_clients = MSCState }) ->
    case bpqueue:out(Q3) of
        {empty, _Q3} ->
            {empty, State};
        {{value, IndexOnDisk, MsgStatus = #msg_status {
                                msg = undefined, guid = Guid,
                                is_persistent = IsPersistent }}, Q3a} ->
            {{ok, Msg = #basic_message {}}, MSCState1} =
                read_from_msg_store(MSCState, IsPersistent, Guid),
            Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4),
            RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
            true = RamIndexCount1 >= 0, %% ASSERTION
            State1 = State #vqstate { q3                = Q3a,
                                      q4                = Q4a,
                                      ram_msg_count     = RamMsgCount + 1,
                                      ram_index_count   = RamIndexCount1,
                                      msg_store_clients = MSCState1 },
            State2 =
                case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
                    {true, true} ->
                        %% q3 is now empty, it wasn't before; delta is
                        %% still empty. So q2 must be empty, and q1
                        %% can now be joined onto q4
                        true = bpqueue:is_empty(Q2), %% ASSERTION
                        State1 #vqstate { q1 = queue:new(),
                                          q4 = queue:join(Q4a, Q1) };
                    {true, false} ->
                        maybe_deltas_to_betas(State1);
                    {false, _} ->
                        %% q3 still isn't empty, we've not touched
                        %% delta, so the invariants between q1, q2,
                        %% delta and q3 are maintained
                        State1
                end,
            {loaded, State2}
    end.

%% If q3 is empty, or messages are permitted in RAM
%% (target_ram_msg_count /= 0), load the next segment's worth of index
%% entries out of delta and join them onto q3 as betas.  Transient
%% entries below the threshold are dropped by
%% betas_from_index_entries, which records the corresponding
%% deliver/ack operations in the returned index state.  Handles the
%% delta-becomes-empty transition by joining q2 onto q3.
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
    State;
maybe_deltas_to_betas(State = #vqstate {
                        q2                   = Q2,
                        delta                = Delta,
                        q3                   = Q3,
                        index_state          = IndexState,
                        target_ram_msg_count = TargetRamMsgCount,
                        transient_threshold  = TransientThreshold }) ->
    case bpqueue:is_empty(Q3) orelse (TargetRamMsgCount /= 0) of
        false ->
            State;
        true ->
            #delta { start_seq_id = DeltaSeqId,
                     count        = DeltaCount,
                     end_seq_id   = DeltaSeqIdEnd } = Delta,
            %% read at most one segment's worth of entries
            DeltaSeqId1 =
                lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
                           DeltaSeqIdEnd]),
            {List, IndexState1} =
                rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
            {Q3a, IndexState2} = betas_from_index_entries(
                                   List, TransientThreshold, IndexState1),
            State1 = State #vqstate { index_state = IndexState2 },
            case bpqueue:len(Q3a) of
                0 ->
                    %% we ignored every message in the segment due to
                    %% it being transient and below the threshold
                    %%
                    %% BUG FIX: recurse with State1, not State, so
                    %% that IndexState2 (which carries the deliver/ack
                    %% records betas_from_index_entries wrote for the
                    %% skipped transient entries) is not discarded.
                    maybe_deltas_to_betas(
                      State1 #vqstate {
                        delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
                Q3aLen ->
                    Q3b = bpqueue:join(Q3, Q3a),
                    case DeltaCount - Q3aLen of
                        0 ->
                            %% delta is now empty, but it wasn't
                            %% before, so can now join q2 onto q3
                            State1 #vqstate { q2    = bpqueue:new(),
                                              delta = ?BLANK_DELTA,
                                              q3    = bpqueue:join(Q3b, Q2) };
                        N when N > 0 ->
                            Delta1 = #delta { start_seq_id = DeltaSeqId1,
                                              count        = N,
                                              end_seq_id   = DeltaSeqIdEnd },
                            State1 #vqstate { delta = Delta1,
                                              q3    = Q3b }
                    end
            end
    end.

%% Move up to Quota alphas out to betas, first from the head of q1,
%% then (with whatever quota remains) from the tail of q4.
push_alphas_to_betas(Quota, State) ->
    { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
    {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
    State2.

%% Push alphas from the head of q1 to betas: when delta is empty they
%% go onto the tail of q3, otherwise onto the tail of q2.
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
    maybe_push_alphas_to_betas(
      fun queue:out/1,
      fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
           Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
              State1 #vqstate { q1 = Q1a,
                                q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) };
          (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
           Q1a, State1 = #vqstate { q2 = Q2 }) ->
              State1 #vqstate { q1 = Q1a,
                                q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) }
      end, Quota, Q1, State).

%% Push alphas from the tail of q4 onto the head of q3.
maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
    maybe_push_alphas_to_betas(
      fun queue:out_r/1,
      fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
           Q4a, State1 = #vqstate { q3 = Q3 }) ->
              State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
                                q4 = Q4a }
      end, Quota, Q4, State).

%% Generic alpha->beta loop: while quota remains and ram_msg_count
%% exceeds its target, take the next message with Generator, force its
%% body (but not its index entry) to disk, drop the in-RAM body, and
%% hand the resulting beta to Consumer for requeueing.  Returns the
%% remaining quota and the new state.
maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
                           State = #vqstate {
                             ram_msg_count        = RamMsgCount,
                             target_ram_msg_count = TargetRamMsgCount })
  when Quota =:= 0 orelse
       TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
    {Quota, State};
maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
    case Generator(Q) of
        {empty, _Q} ->
            {Quota, State};
        {{value, MsgStatus}, Qa} ->
            %% the write is forced for the body only; msg_on_disk =
            %% true is asserted by the match on the result.
            {MsgStatus1 = #msg_status { msg_on_disk = true,
                                        index_on_disk = IndexOnDisk },
             State1 = #vqstate { ram_msg_count   = RamMsgCount,
                                 ram_index_count = RamIndexCount }} =
                maybe_write_to_disk(true, false, MsgStatus, State),
            MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }),
            RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
            State2 = State1 #vqstate { ram_msg_count   = RamMsgCount - 1,
                                       ram_index_count = RamIndexCount1 },
            maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
                                       Consumer(MsgStatus2, Qa, State2))
    end.

%% Convert betas in q2 and (from the tail) q3 into deltas.  q2 can be
%% converted in its entirety (its limit is its own min seq id); for q3
%% only messages at or beyond the first segment boundary are
%% converted, so up to one segment's worth remains as betas at the
%% head of the queue.  The resulting deltas are combined with the
%% existing one.
push_betas_to_deltas(State = #vqstate { q2              = Q2,
                                        delta           = Delta,
                                        q3              = Q3,
                                        index_state     = IndexState,
                                        ram_index_count = RamIndexCount }) ->
    {Delta2, Q2a, RamIndexCount2, IndexState2} =
        push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
                             fun bpqueue:out/1, Q2,
                             RamIndexCount, IndexState),
    {Delta3, Q3a, RamIndexCount3, IndexState3} =
        push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
                             fun bpqueue:out_r/1, Q3,
                             RamIndexCount2, IndexState2),
    Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
    State #vqstate { q2              = Q2a,
                     delta           = Delta4,
                     q3              = Q3a,
                     index_state     = IndexState3,
                     ram_index_count = RamIndexCount3 }.

%% Convert one beta queue: every entry with seq id >= LimitFun(min seq
%% id of the queue) becomes part of a delta, consumed in Generator
%% order.  If even the max seq id is below the limit there is nothing
%% to do and a blank delta is returned.
push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
    case bpqueue:out(Q) of
        {empty, _Q} ->
            {?BLANK_DELTA, Q, RamIndexCount, IndexState};
        {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} ->
            {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} =
                bpqueue:out_r(Q),
            Limit = LimitFun(MinSeqId),
            case MaxSeqId < Limit of
                true  -> {?BLANK_DELTA, Q, RamIndexCount, IndexState};
                false -> {Len, Qc, RamIndexCount1, IndexState1} =
                             push_betas_to_deltas(Generator, Limit, Q, 0,
                                                  RamIndexCount, IndexState),
                         {#delta { start_seq_id = Limit,
                                   count        = Len,
                                   end_seq_id   = MaxSeqId + 1 },
                          Qc, RamIndexCount1, IndexState1}
            end
    end.

%% Walk Q with Generator, detaching and counting entries whose seq id
%% is at or beyond Limit; any detached entry whose index record is not
%% yet on disk gets it written (forced) on the way out, decrementing
%% the RAM index count.  Stops at the first entry below Limit or when
%% Q is exhausted, returning {DetachedCount, RemainingQ, RamIndexCount,
%% IndexState}.
push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
    case Generator(Q) of
        {empty, _} ->
            {Count, Q, RamIndexCount, IndexState};
        {{value, _OnDisk, #msg_status { seq_id = SeqId }}, _}
          when SeqId < Limit ->
            %% reached the retained region: leave Q untouched
            {Count, Q, RamIndexCount, IndexState};
        {{value, true, _MsgStatus}, Rest} ->
            %% index entry already on disk: nothing to write
            push_betas_to_deltas(Generator, Limit, Rest, Count + 1,
                                 RamIndexCount, IndexState);
        {{value, false, MsgStatus}, Rest} ->
            %% force the index entry to disk before the beta leaves
            %% RAM for delta; the match asserts the write succeeded
            {#msg_status { index_on_disk = true }, IndexState1} =
                maybe_write_index_to_disk(true, MsgStatus, IndexState),
            push_betas_to_deltas(Generator, Limit, Rest, Count + 1,
                                 RamIndexCount - 1, IndexState1)
    end.