diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-11-07 12:47:18 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-11-07 12:47:18 +0000 |
commit | a4460624b982c294bd3bf35d13474b6191c01998 (patch) | |
tree | 24ac469a2a701d91b01da2c7e985267a754bf7fc | |
parent | 8310e0bd8b4f2ddeae5549ec23883b675f20a73e (diff) | |
parent | 1d31bd97926db8fcce132906c17cf4105467df9a (diff) | |
download | rabbitmq-server-a4460624b982c294bd3bf35d13474b6191c01998.tar.gz |
Merge bug24430.
32 files changed, 1011 insertions, 1197 deletions
@@ -42,7 +42,8 @@ erlangTypeMap = { def convertTable(d): if len(d) == 0: return "[]" - else: raise 'Non-empty table defaults not supported', d + else: + raise Exception('Non-empty table defaults not supported ' + d) erlangDefaultValueTypeConvMap = { bool : lambda x: str(x).lower(), @@ -229,9 +230,32 @@ def genErl(spec): print " %s;" % (recordConstructorExpr,) def genDecodeProperties(c): - print "decode_properties(%d, PropBin) ->" % (c.index) - print " %s = rabbit_binary_parser:parse_properties(%s, PropBin)," % \ - (fieldTempList(c.fields), fieldTypeList(c.fields)) + def presentBin(fields): + ps = ', '.join(['P' + str(f.index) + ':1' for f in fields]) + return '<<' + ps + ', _:%d, R0/binary>>' % (16 - len(fields),) + def mkMacroName(field): + return '?' + field.domain.upper() + '_PROP' + def writePropFieldLine(field, bin_next = None): + i = str(field.index) + if not bin_next: + bin_next = 'R' + str(field.index + 1) + if field.domain in ['octet', 'timestamp']: + print (" {%s, %s} = %s(%s, %s, %s, %s)," % + ('F' + i, bin_next, mkMacroName(field), 'P' + i, + 'R' + i, 'I' + i, 'X' + i)) + else: + print (" {%s, %s} = %s(%s, %s, %s, %s, %s)," % + ('F' + i, bin_next, mkMacroName(field), 'P' + i, + 'R' + i, 'L' + i, 'S' + i, 'X' + i)) + + if len(c.fields) == 0: + print "decode_properties(%d, _) ->" % (c.index,) + else: + print ("decode_properties(%d, %s) ->" % + (c.index, presentBin(c.fields))) + for field in c.fields[:-1]: + writePropFieldLine(field) + writePropFieldLine(c.fields[-1], "<<>>") print " #'P_%s'{%s};" % (erlangize(c.name), fieldMapList(c.fields)) def genFieldPreprocessing(packed): @@ -272,7 +296,7 @@ def genErl(spec): if mCls == 'SOFT_ERROR': genLookupException1(c,'false') elif mCls == 'HARD_ERROR': genLookupException1(c, 'true') elif mCls == '': pass - else: raise 'Unknown constant class', cls + else: raise Exception('Unknown constant class' + cls) def genLookupException1(c,hardErrorBoolStr): n = erlangConstantName(c) @@ -404,6 +428,27 @@ shortstr_size(S) -> Len when Len =< 255 -> Len; _ -> exit(method_field_shortstr_overflow) end. + +-define(SHORTSTR_PROP(P, R, L, S, X), + if P =:= 0 -> {undefined, R}; + true -> <<L:8/unsigned, S:L/binary, X/binary>> = R, + {S, X} + end). +-define(TABLE_PROP(P, R, L, T, X), + if P =:= 0 -> {undefined, R}; + true -> <<L:32/unsigned, T:L/binary, X/binary>> = R, + {rabbit_binary_parser:parse_table(T), X} + end). +-define(OCTET_PROP(P, R, I, X), + if P =:= 0 -> {undefined, R}; + true -> <<I:8/unsigned, X/binary>> = R, + {I, X} + end). +-define(TIMESTAMP_PROP(P, R, I, X), + if P =:= 0 -> {undefined, R}; + true -> <<I:64/unsigned, X/binary>> = R, + {I, X} + end). """ version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision) if version == '{8, 0, 0}': version = '{0, 8, 0}' diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 662a36c7..f21888bd 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -103,7 +103,7 @@ <variablelist> <varlistentry> - <term><cmdsynopsis><command>stop</command></cmdsynopsis></term> + <term><cmdsynopsis><command>stop</command> <arg choice="opt"><replaceable>pid_file</replaceable></arg></cmdsynopsis></term> <listitem> <para> Stops the Erlang node on which RabbitMQ is running. To @@ -111,6 +111,12 @@ the Server</citetitle> in the <ulink url="http://www.rabbitmq.com/install.html">installation guide</ulink>. </para> + <para> + If a <option>pid_file</option> is specified, also waits + for the process specified there to terminate. See the + description of the <option>wait</option> command below + for details on this file. + </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl stop</screen> <para role="example"> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 65a3269a..5ead1051 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -14,7 +14,8 @@ %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it {mod, {rabbit, []}}, - {env, [{tcp_listeners, [5672]}, + {env, [{hipe_compile, false}, + {tcp_listeners, [5672]}, {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 20fe4234..4a657951 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -22,9 +22,6 @@ -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(confirm_required() :: boolean()). --type(message_properties_transformer() :: - fun ((rabbit_types:message_properties()) - -> rabbit_types:message_properties())). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). @@ -51,7 +48,7 @@ -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). --spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) +-spec(requeue/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index 15fd5d5b..4084d8c7 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -56,8 +56,10 @@ start_rabbitmq () { RETVAL=0 ensure_pid_dir set +e - setsid sh -c "RABBITMQ_PID_FILE=$PID_FILE $DAEMON > \ - ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err" & + RABBITMQ_PID_FILE=$PID_FILE setsid $DAEMON \ + > "${INIT_LOG_DIR}/startup_log" \ + 2> "${INIT_LOG_DIR}/startup_err" \ + 0<&- & $CONTROL wait $PID_FILE >/dev/null 2>&1 RETVAL=$? set -e @@ -81,7 +83,7 @@ stop_rabbitmq () { status_rabbitmq quiet if [ $RETVAL = 0 ] ; then set +e - $CONTROL stop > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err + $CONTROL stop ${PID_FILE} > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err RETVAL=$? set -e if [ $RETVAL = 0 ] ; then @@ -6,15 +6,16 @@ %% A helper to test quickcheck properties on a running broker %% NodeStr is a local broker node name %% ModStr is the module containing quickcheck properties -%% The number of trials is optional -main([NodeStr, ModStr | TrialsStr]) -> +%% TrialsStr is the number of trials +main([NodeStr, ModStr, TrialsStr]) -> {ok, Hostname} = inet:gethostname(), Node = list_to_atom(NodeStr ++ "@" ++ Hostname), Mod = list_to_atom(ModStr), - Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr), + Trials = erlang:list_to_integer(TrialsStr), case rpc:call(Node, code, ensure_loaded, [proper]) of {module, proper} -> - case rpc:call(Node, proper, module, [Mod] ++ Trials) of + case rpc:call(Node, proper, module, + [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of [] -> ok; _ -> quit(1) end; diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 0a78794f..c27b418a 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -57,13 +57,11 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-
if "!RABBITMQ_MNESIA_BASE!"=="" (
- set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
)
if "!RABBITMQ_LOG_BASE!"=="" (
- set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE!/log
)
@@ -96,7 +94,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -noinput -hidden ^
-s rabbit_prelaunch ^
-sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
"!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
@@ -144,7 +142,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index e671ba7a..4d3fce49 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -80,7 +80,7 @@ rem *** End of configuration *** if not exist "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" (
echo.
echo **********************************************
- echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
+ echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
echo **********************************************
echo.
echo "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" not found
@@ -89,14 +89,11 @@ if not exist "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" ( exit /B 1
)
-rem erlang prefers forwardslash as separator in paths
-set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-
if "!RABBITMQ_MNESIA_BASE!"=="" (
- set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
)
if "!RABBITMQ_LOG_BASE!"=="" (
- set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE!/log
)
@@ -118,7 +115,7 @@ if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" ( )
if "!P1!" == "install" goto INSTALL_SERVICE
-for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
+for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
echo.
echo *********************
@@ -129,7 +126,7 @@ echo !TN0! help - Display this help echo !TN0! install - Install the !RABBITMQ_SERVICENAME! service
echo !TN0! remove - Remove the !RABBITMQ_SERVICENAME! service
echo.
-echo The following actions can also be accomplished by using
+echo The following actions can also be accomplished by using
echo Windows Services Management Console (services.msc):
echo.
echo !TN0! start - Start the !RABBITMQ_SERVICENAME! service
@@ -143,7 +140,7 @@ exit /B :INSTALL_SERVICE
if not exist "!RABBITMQ_BASE!" (
- echo Creating base directory !RABBITMQ_BASE! & md "!RABBITMQ_BASE!"
+ echo Creating base directory !RABBITMQ_BASE! & md "!RABBITMQ_BASE!"
)
"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" list !RABBITMQ_SERVICENAME! 2>NUL 1>NUL
@@ -164,7 +161,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
"!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
""
@@ -179,7 +176,7 @@ set RABBITMQ_EBIN_PATH= if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-
+
if exist "!RABBITMQ_CONFIG_FILE!.config" (
set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
@@ -210,7 +207,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
@@ -219,7 +216,7 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"! "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" set !RABBITMQ_SERVICENAME! ^
-machine "!ERLANG_SERVICE_MANAGER_PATH!\erl.exe" ^
--env ERL_CRASH_DUMP="!RABBITMQ_BASE_UNIX!/erl_crash.dump" ^
+-env ERL_CRASH_DUMP="!RABBITMQ_BASE:\=/!/erl_crash.dump" ^
-workdir "!RABBITMQ_BASE!" ^
-stopaction "rabbit:stop_and_halt()." ^
-sname !RABBITMQ_NODENAME! ^
diff --git a/src/bpqueue.erl b/src/bpqueue.erl deleted file mode 100644 index 71a34262..00000000 --- a/src/bpqueue.erl +++ /dev/null @@ -1,271 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. -%% - --module(bpqueue). - -%% Block-prefixed queue. From the perspective of the queue interface -%% the datastructure acts like a regular queue where each value is -%% paired with the prefix. -%% -%% This is implemented as a queue of queues, which is more space and -%% time efficient, whilst supporting the normal queue interface. Each -%% inner queue has a prefix, which does not need to be unique, and it -%% is guaranteed that no two consecutive blocks have the same -%% prefix. len/1 returns the flattened length of the queue and is -%% O(1). - --export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2, - foldl/3, foldr/3, from_list/1, to_list/1, map_fold_filter_l/4, - map_fold_filter_r/4]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --export_type([bpqueue/0]). - --type(bpqueue() :: {non_neg_integer(), queue()}). --type(prefix() :: any()). --type(value() :: any()). --type(result() :: ({'empty', bpqueue()} | - {{'value', prefix(), value()}, bpqueue()})). - --spec(new/0 :: () -> bpqueue()). --spec(is_empty/1 :: (bpqueue()) -> boolean()). --spec(len/1 :: (bpqueue()) -> non_neg_integer()). --spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). --spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). --spec(out/1 :: (bpqueue()) -> result()). --spec(out_r/1 :: (bpqueue()) -> result()). --spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()). --spec(foldl/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). --spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). --spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()). --spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). --spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())), - (fun ((value(), B) -> - ({prefix(), value(), B} | 'stop'))), - B, - bpqueue()) -> - {bpqueue(), B}). --spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())), - (fun ((value(), B) -> - ({prefix(), value(), B} | 'stop'))), - B, - bpqueue()) -> - {bpqueue(), B}). - --endif. - -%%---------------------------------------------------------------------------- - -new() -> {0, queue:new()}. - -is_empty({0, _Q}) -> true; -is_empty(_BPQ) -> false. - -len({N, _Q}) -> N. - -in(Prefix, Value, {0, Q}) -> - {1, queue:in({Prefix, queue:from_list([Value])}, Q)}; -in(Prefix, Value, BPQ) -> - in1({fun queue:in/2, fun queue:out_r/1}, Prefix, Value, BPQ). - -in_r(Prefix, Value, BPQ = {0, _Q}) -> - in(Prefix, Value, BPQ); -in_r(Prefix, Value, BPQ) -> - in1({fun queue:in_r/2, fun queue:out/1}, Prefix, Value, BPQ). - -in1({In, Out}, Prefix, Value, {N, Q}) -> - {N+1, case Out(Q) of - {{value, {Prefix, InnerQ}}, Q1} -> - In({Prefix, In(Value, InnerQ)}, Q1); - {{value, {_Prefix, _InnerQ}}, _Q1} -> - In({Prefix, queue:in(Value, queue:new())}, Q) - end}. - -in_q(Prefix, Queue, BPQ = {0, Q}) -> - case queue:len(Queue) of - 0 -> BPQ; - N -> {N, queue:in({Prefix, Queue}, Q)} - end; -in_q(Prefix, Queue, BPQ) -> - in_q1({fun queue:in/2, fun queue:out_r/1, - fun queue:join/2}, - Prefix, Queue, BPQ). - -in_q_r(Prefix, Queue, BPQ = {0, _Q}) -> - in_q(Prefix, Queue, BPQ); -in_q_r(Prefix, Queue, BPQ) -> - in_q1({fun queue:in_r/2, fun queue:out/1, - fun (T, H) -> queue:join(H, T) end}, - Prefix, Queue, BPQ). - -in_q1({In, Out, Join}, Prefix, Queue, BPQ = {N, Q}) -> - case queue:len(Queue) of - 0 -> BPQ; - M -> {N + M, case Out(Q) of - {{value, {Prefix, InnerQ}}, Q1} -> - In({Prefix, Join(InnerQ, Queue)}, Q1); - {{value, {_Prefix, _InnerQ}}, _Q1} -> - In({Prefix, Queue}, Q) - end} - end. - -out({0, _Q} = BPQ) -> {empty, BPQ}; -out(BPQ) -> out1({fun queue:in_r/2, fun queue:out/1}, BPQ). - -out_r({0, _Q} = BPQ) -> {empty, BPQ}; -out_r(BPQ) -> out1({fun queue:in/2, fun queue:out_r/1}, BPQ). - -out1({In, Out}, {N, Q}) -> - {{value, {Prefix, InnerQ}}, Q1} = Out(Q), - {{value, Value}, InnerQ1} = Out(InnerQ), - Q2 = case queue:is_empty(InnerQ1) of - true -> Q1; - false -> In({Prefix, InnerQ1}, Q1) - end, - {{value, Prefix, Value}, {N-1, Q2}}. - -join({0, _Q}, BPQ) -> - BPQ; -join(BPQ, {0, _Q}) -> - BPQ; -join({NHead, QHead}, {NTail, QTail}) -> - {{value, {Prefix, InnerQHead}}, QHead1} = queue:out_r(QHead), - {NHead + NTail, - case queue:out(QTail) of - {{value, {Prefix, InnerQTail}}, QTail1} -> - queue:join( - queue:in({Prefix, queue:join(InnerQHead, InnerQTail)}, QHead1), - QTail1); - {{value, {_Prefix, _InnerQTail}}, _QTail1} -> - queue:join(QHead, QTail) - end}. - -foldl(_Fun, Init, {0, _Q}) -> Init; -foldl( Fun, Init, {_N, Q}) -> fold1(fun queue:out/1, Fun, Init, Q). - -foldr(_Fun, Init, {0, _Q}) -> Init; -foldr( Fun, Init, {_N, Q}) -> fold1(fun queue:out_r/1, Fun, Init, Q). - -fold1(Out, Fun, Init, Q) -> - case Out(Q) of - {empty, _Q} -> - Init; - {{value, {Prefix, InnerQ}}, Q1} -> - fold1(Out, Fun, fold1(Out, Fun, Prefix, Init, InnerQ), Q1) - end. - -fold1(Out, Fun, Prefix, Init, InnerQ) -> - case Out(InnerQ) of - {empty, _Q} -> - Init; - {{value, Value}, InnerQ1} -> - fold1(Out, Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1) - end. - -from_list(List) -> - {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} = - lists:foldl( - fun ({_Prefix, []}, Acc) -> - Acc; - ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> - {Prefix, queue:join(InnerQ, queue:from_list(InnerList)), - ListOfPQs, LenAcc + length(InnerList)}; - ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> - {Prefix1, queue:from_list(InnerList), - [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)} - end, {undefined, queue:new(), [], 0}, List), - ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1], - [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2), - {Len, queue:from_list(case queue:is_empty(InnerQ1) of - true -> Rest; - false -> All - end)}. - -to_list({0, _Q}) -> []; -to_list({_N, Q}) -> [{Prefix, queue:to_list(InnerQ)} || - {Prefix, InnerQ} <- queue:to_list(Q)]. - -%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init} -%% where FilterFun(Prefix) -> boolean() -%% Fun(Value, Init) -> {Prefix, Value, Init} | stop -%% -%% The filter fun allows you to skip very quickly over blocks that -%% you're not interested in. Such blocks appear in the resulting bpq -%% without modification. The Fun is then used both to map the value, -%% which also allows you to change the prefix (and thus block) of the -%% value, and also to modify the Init/Acc (just like a fold). If the -%% Fun returns 'stop' then it is not applied to any further items. -map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> - {BPQ, Init}; -map_fold_filter_l(PFilter, Fun, Init, {N, Q}) -> - map_fold_filter1({fun queue:out/1, fun queue:in/2, - fun in_q/3, fun join/2}, - N, PFilter, Fun, Init, Q, new()). - -map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> - {BPQ, Init}; -map_fold_filter_r(PFilter, Fun, Init, {N, Q}) -> - map_fold_filter1({fun queue:out_r/1, fun queue:in_r/2, - fun in_q_r/3, fun (T, H) -> join(H, T) end}, - N, PFilter, Fun, Init, Q, new()). - -map_fold_filter1(Funs = {Out, _In, InQ, Join}, Len, PFilter, Fun, - Init, Q, QNew) -> - case Out(Q) of - {empty, _Q} -> - {QNew, Init}; - {{value, {Prefix, InnerQ}}, Q1} -> - case PFilter(Prefix) of - true -> - {Init1, QNew1, Cont} = - map_fold_filter2(Funs, Fun, Prefix, Prefix, - Init, InnerQ, QNew, queue:new()), - case Cont of - false -> {Join(QNew1, {Len - len(QNew1), Q1}), Init1}; - true -> map_fold_filter1(Funs, Len, PFilter, Fun, - Init1, Q1, QNew1) - end; - false -> - map_fold_filter1(Funs, Len, PFilter, Fun, - Init, Q1, InQ(Prefix, InnerQ, QNew)) - end - end. - -map_fold_filter2(Funs = {Out, In, InQ, _Join}, Fun, OrigPrefix, Prefix, - Init, InnerQ, QNew, InnerQNew) -> - case Out(InnerQ) of - {empty, _Q} -> - {Init, InQ(OrigPrefix, InnerQ, - InQ(Prefix, InnerQNew, QNew)), true}; - {{value, Value}, InnerQ1} -> - case Fun(Value, Init) of - stop -> - {Init, InQ(OrigPrefix, InnerQ, - InQ(Prefix, InnerQNew, QNew)), false}; - {Prefix1, Value1, Init1} -> - {Prefix2, QNew1, InnerQNew1} = - case Prefix1 =:= Prefix of - true -> {Prefix, QNew, In(Value1, InnerQNew)}; - false -> {Prefix1, InQ(Prefix, InnerQNew, QNew), - In(Value1, queue:new())} - end, - map_fold_filter2(Funs, Fun, OrigPrefix, Prefix2, - Init1, InnerQ1, QNew1, InnerQNew1) - end - end. diff --git a/src/lqueue.erl b/src/lqueue.erl new file mode 100644 index 00000000..04b40706 --- /dev/null +++ b/src/lqueue.erl @@ -0,0 +1,90 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% + +-module(lqueue). + +-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, + foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]). + +-define(QUEUE, queue). + +-ifdef(use_specs). + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: {non_neg_integer(), ?QUEUE()}). +-type(value() :: any()). +-type(result() :: 'empty' | {'value', value()}). + +-spec(new/0 :: () -> ?MODULE()). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). +-spec(len/1 :: (?MODULE()) -> non_neg_integer()). +-spec(in/2 :: (value(), ?MODULE()) -> ?MODULE()). +-spec(in_r/2 :: (value(), ?MODULE()) -> ?MODULE()). +-spec(out/1 :: (?MODULE()) -> {result(), ?MODULE()}). +-spec(out_r/1 :: (?MODULE()) -> {result(), ?MODULE()}). +-spec(join/2 :: (?MODULE(), ?MODULE()) -> ?MODULE()). +-spec(foldl/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B). +-spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B). +-spec(from_list/1 :: ([value()]) -> ?MODULE()). +-spec(to_list/1 :: (?MODULE()) -> [value()]). +-spec(peek/1 :: (?MODULE()) -> result()). +-spec(peek_r/1 :: (?MODULE()) -> result()). + +-endif. + +new() -> {0, ?QUEUE:new()}. + +is_empty({0, _Q}) -> true; +is_empty(_) -> false. + +in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}. + +in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}. + +out({0, _Q} = Q) -> {empty, Q}; +out({L, Q}) -> {Result, Q1} = ?QUEUE:out(Q), + {Result, {L-1, Q1}}. + +out_r({0, _Q} = Q) -> {empty, Q}; +out_r({L, Q}) -> {Result, Q1} = ?QUEUE:out_r(Q), + {Result, {L-1, Q1}}. + +join({L1, Q1}, {L2, Q2}) -> {L1 + L2, ?QUEUE:join(Q1, Q2)}. + +to_list({_L, Q}) -> ?QUEUE:to_list(Q). + +from_list(L) -> {length(L), ?QUEUE:from_list(L)}. + +foldl(Fun, Init, Q) -> + case out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1) + end. + +foldr(Fun, Init, Q) -> + case out_r(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1) + end. + +len({L, _Q}) -> L. + +peek({ 0, _Q}) -> empty; +peek({_L, Q}) -> ?QUEUE:peek(Q). + +peek_r({ 0, _Q}) -> empty; +peek_r({_L, Q}) -> ?QUEUE:peek_r(Q). diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index ee9c7593..0900f56f 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -202,6 +202,7 @@ with_sups(Fun, Sups) -> Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups], Fun(Pids), [kill(Pid) || Pid <- Pids, is_process_alive(Pid)], + timer:sleep(500), passed. start_sup(Spec) -> diff --git a/src/rabbit.erl b/src/rabbit.erl index e98ca9be..0a2681a2 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -18,8 +18,8 @@ -behaviour(application). --export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, - is_running/0 , is_running/1, environment/0, +-export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0, + status/0, is_running/0, is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -177,6 +177,27 @@ -define(APPS, [os_mon, mnesia, rabbit]). +%% see bug 24513 for how this list was created +-define(HIPE_WORTHY, + [rabbit_reader, rabbit_channel, gen_server2, + rabbit_exchange, rabbit_command_assembler, rabbit_framing_amqp_0_9_1, + rabbit_basic, rabbit_event, lists, queue, priority_queue, + rabbit_router, rabbit_trace, rabbit_misc, rabbit_binary_parser, + rabbit_exchange_type_direct, rabbit_guid, rabbit_net, + rabbit_amqqueue_process, rabbit_variable_queue, + rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue, + sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees, + rabbit_queue_index, gen, dict, ordsets, file_handle_cache, + rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, + rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]). + +%% HiPE compilation uses multiple cores anyway, but some bits are +%% IO-bound so we can go faster if we parallelise a bit more. In +%% practice 2 processes seems just as fast as any other number > 1, +%% and keeps the progress bar realistic-ish. +-define(HIPE_PROCESSES, 2). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -185,6 +206,7 @@ %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). +-spec(maybe_hipe_compile/0 :: () -> 'ok'). -spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -218,12 +240,53 @@ %%---------------------------------------------------------------------------- +maybe_hipe_compile() -> + {ok, Want} = application:get_env(rabbit, hipe_compile), + Can = code:which(hipe) =/= non_existing, + case {Want, Can} of + {true, true} -> hipe_compile(); + {true, false} -> io:format("Not HiPE compiling: HiPE not found in " + "this Erlang installation.~n"); + {false, _} -> ok + end. + +hipe_compile() -> + Count = length(?HIPE_WORTHY), + io:format("HiPE compiling: |~s|~n |", + [string:copies("-", Count)]), + T1 = erlang:now(), + PidMRefs = [spawn_monitor(fun () -> [begin + {ok, M} = hipe:c(M, [o3]), + io:format("#") + end || M <- Ms] + end) || + Ms <- split(?HIPE_WORTHY, ?HIPE_PROCESSES)], + [receive + {'DOWN', MRef, process, _, normal} -> ok; + {'DOWN', MRef, process, _, Reason} -> exit(Reason) + end || {_Pid, MRef} <- PidMRefs], + T2 = erlang:now(), + io:format("|~n~nCompiled ~B modules in ~Bs~n", + [Count, timer:now_diff(T2, T1) div 1000000]). + +split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]). + +split0([], Ls) -> Ls; +split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]). + prepare() -> ok = ensure_working_log_handlers(), ok = rabbit_upgrade:maybe_upgrade_mnesia(). start() -> try + %% prepare/1 ends up looking at the rabbit app's env, so it + %% needs to be loaded, but during the tests, it may end up + %% getting loaded twice, so guard against that + case application:load(rabbit) of + ok -> ok; + {error, {already_loaded, rabbit}} -> ok + end, ok = prepare(), ok = rabbit_misc:start_applications(application_load_order()) after diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d38ecb91..fd03ca85 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -45,11 +45,7 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), - ok = case MemoryWatermark == 0 of - true -> ok; - false -> rabbit_sup:start_restartable_child(vm_memory_monitor, - [MemoryWatermark]) - end, + rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), ok. stop() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 46f6674b..ba20b355 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -129,7 +129,7 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, - msg_id_to_channel = dict:new()}, + msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -454,11 +454,11 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> - case dict:find(MsgId, MTC0) of - {ok, {ChPid, MsgSeqNo}} -> + case gb_trees:lookup(MsgId, MTC0) of + {value, {ChPid, MsgSeqNo}} -> {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(MsgId, MTC0)}; - _ -> + gb_trees:delete(MsgId, MTC0)}; + none -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), @@ -482,7 +482,7 @@ needs_confirming(_) -> false. maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}; + State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)}; maybe_record_confirm_message(_Confirm, State) -> State. @@ -551,13 +551,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> - run_backing_queue( - BQ, fun (M, BQS) -> - {_MsgIds, BQS1} = - M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), - BQS1 - end, State). +requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + run_backing_queue(BQ, fun (M, BQS) -> + {_MsgIds, BQS1} = M:requeue(AckTags, BQS), + BQS1 + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -670,11 +668,6 @@ discard_delivery(#delivery{sender = ChPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. -reset_msg_expiry_fun(TTL) -> - fun(MsgProps) -> - MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} - end. - message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 77278416..c3b322ee 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -107,7 +107,7 @@ behaviour_info(callbacks) -> %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 3}, + {requeue, 2}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 095202dd..c61184a6 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -34,14 +34,15 @@ -export([initial_state/0, command/1, precondition/2, postcondition/3, next_state/3]). --export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]). +-export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]). -record(state, {bqstate, len, %% int next_seq_id, %% int messages, %% gb_trees of seqid => {msg_props, basic_msg} acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}] - confirms}). %% set of msgid + confirms, %% set of msgid + publishing}).%% int %% Initialise model @@ -51,7 +52,8 @@ initial_state() -> next_seq_id = 0, messages = gb_trees:empty(), acks = [], - confirms = gb_sets:new()}. + confirms = gb_sets:new(), + publishing = 0}. %% Property @@ -112,10 +114,8 @@ qc_publish(#state{bqstate = BQ}) -> expiry = oneof([undefined | lists:seq(1, 10)])}, self(), BQ]}. -qc_publish_multiple(#state{bqstate = BQ}) -> - {call, ?MODULE, publish_multiple, - [qc_message(), #message_properties{}, BQ, - resize(?QUEUE_MAXLEN, pos_integer())]}. +qc_publish_multiple(#state{}) -> + {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}. qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, @@ -128,8 +128,7 @@ qc_ack(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_requeue(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, requeue, - [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + {call, ?BQMOD, requeue, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_set_ram_duration_target(#state{bqstate = BQ}) -> {call, ?BQMOD, set_ram_duration_target, @@ -155,6 +154,10 @@ qc_purge(#state{bqstate = BQ}) -> %% Preconditions +%% Create long queues by only allowing publishing +precondition(#state{publishing = Count}, {call, _Mod, Fun, _Arg}) + when Count > 0, Fun /= publish -> + false; precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) when Fun =:= ack; Fun =:= requeue -> length(Acks) > 0; @@ -174,6 +177,7 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> #state{len = Len, messages = Messages, confirms = Confirms, + publishing = PublishCount, next_seq_id = NextSeq} = S, MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, NeedsConfirm = @@ -183,21 +187,15 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> len = Len + 1, next_seq_id = NextSeq + 1, messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages), + publishing = {call, erlang, max, [0, {call, erlang, '-', + [PublishCount, 1]}]}, confirms = case eval(NeedsConfirm) of true -> gb_sets:add(MsgId, Confirms); _ -> Confirms end}; -next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) -> - #state{len = Len, messages = Messages} = S, - {S1, Msgs1} = repeat({S, Messages}, - fun ({#state{next_seq_id = NextSeq} = State, Msgs}) -> - {State #state { next_seq_id = NextSeq + 1}, - gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)} - end, Count), - S1#state{bqstate = BQ, - len = Len + Count, - messages = Msgs1}; +next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) -> + S#state{publishing = PublishCount}; next_state(S, Res, {call, ?BQMOD, publish_delivered, @@ -245,7 +243,7 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> S#state{bqstate = BQ1, acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; -next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> +next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _V]}) -> #state{messages = Messages, acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, Messages1 = lists:foldl(fun (AckTag, Msgs) -> @@ -322,12 +320,8 @@ postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> %% Helpers -repeat(Result, _Fun, 0) -> Result; -repeat(Result, Fun, Times) -> repeat(Fun(Result), Fun, Times - 1). - -publish_multiple(Msg, MsgProps, BQ, Count) -> - repeat(BQ, fun(BQ1) -> ?BQMOD:publish(Msg, MsgProps, self(), BQ1) end, - Count). +publish_multiple(_C) -> + ok. timeout(BQ, 0) -> BQ; diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 88026bab..f3ca4e98 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([parse_table/1, parse_properties/2]). +-export([parse_table/1]). -export([ensure_content_decoded/1, clear_decoded_content/1]). %%---------------------------------------------------------------------------- @@ -26,8 +26,6 @@ -ifdef(use_specs). -spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()). --spec(parse_properties/2 :: - ([rabbit_framing:amqp_property_type()], binary()) -> [any()]). -spec(ensure_content_decoded/1 :: (rabbit_types:content()) -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: @@ -94,57 +92,6 @@ parse_field_value(<<"x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary> parse_field_value(<<"V", Rest/binary>>) -> {void, undefined, Rest}. - -parse_properties([], _PropBin) -> - []; -parse_properties(TypeList, PropBin) -> - FlagCount = length(TypeList), - %% round up to the nearest multiple of 15 bits, since the 16th bit - %% in each short is a "continuation" bit. - FlagsLengthBytes = trunc((FlagCount + 14) / 15) * 2, - <<Flags:FlagsLengthBytes/binary, Properties/binary>> = PropBin, - <<FirstShort:16, Remainder/binary>> = Flags, - parse_properties(0, TypeList, [], FirstShort, Remainder, Properties). - -parse_properties(_Bit, [], Acc, _FirstShort, - _Remainder, <<>>) -> - lists:reverse(Acc); -parse_properties(_Bit, [], _Acc, _FirstShort, - _Remainder, _LeftoverBin) -> - exit(content_properties_binary_overflow); -parse_properties(15, TypeList, Acc, _OldFirstShort, - <<NewFirstShort:16,Remainder/binary>>, Properties) -> - parse_properties(0, TypeList, Acc, NewFirstShort, Remainder, Properties); -parse_properties(Bit, [Type | TypeListRest], Acc, FirstShort, - Remainder, Properties) -> - {Value, Rest} = - if (FirstShort band (1 bsl (15 - Bit))) /= 0 -> - parse_property(Type, Properties); - Type == bit -> {false , Properties}; - true -> {undefined, Properties} - end, - parse_properties(Bit + 1, TypeListRest, [Value | Acc], FirstShort, - Remainder, Rest). - -parse_property(shortstr, <<Len:8/unsigned, String:Len/binary, Rest/binary>>) -> - {String, Rest}; -parse_property(longstr, <<Len:32/unsigned, String:Len/binary, Rest/binary>>) -> - {String, Rest}; -parse_property(octet, <<Int:8/unsigned, Rest/binary>>) -> - {Int, Rest}; -parse_property(shortint, <<Int:16/unsigned, Rest/binary>>) -> - {Int, Rest}; -parse_property(longint, <<Int:32/unsigned, Rest/binary>>) -> - {Int, Rest}; -parse_property(longlongint, <<Int:64/unsigned, Rest/binary>>) -> - {Int, Rest}; -parse_property(timestamp, <<Int:64/unsigned, Rest/binary>>) -> - {Int, Rest}; -parse_property(bit, Rest) -> - {true, Rest}; -parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) -> - {parse_table(Table), Rest}. - ensure_content_decoded(Content = #content{properties = Props}) when Props =/= none -> Content; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 883e570a..9b2fe28c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1080,9 +1080,8 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, uncommitted_acks = TAL}) -> - TAQ = queue:from_list(lists:reverse(TAL)), - {reply, #'tx.rollback_ok'{}, - new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})}; + UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))), + {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> rabbit_misc:protocol_error( diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 905e4fd0..fa8dd262 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,6 +20,7 @@ -export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). +-define(EXTERNAL_CHECK_INTERVAL, 1000). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -161,9 +162,16 @@ usage() -> %%---------------------------------------------------------------------------- -action(stop, Node, [], _Opts, Inform) -> +action(stop, Node, Args, _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), - call(Node, {rabbit, stop_and_halt, []}); + Res = call(Node, {rabbit, stop_and_halt, []}), + case {Res, Args} of + {ok, [PidFile]} -> wait_for_process_death( + read_pid_file(PidFile, false)); + {ok, [_, _| _]} -> exit({badarg, Args}); + _ -> ok + end, + Res; action(stop_app, Node, [], _Opts, Inform) -> Inform("Stopping node ~p", [Node]), @@ -325,7 +333,10 @@ action(trace_off, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> - Frac = list_to_float(Arg), + Frac = list_to_float(case string:chr(Arg, $.) of + 0 -> Arg ++ ".0"; + _ -> Arg + end), Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]), rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); @@ -362,7 +373,7 @@ action(report, Node, _Args, _Opts, Inform) -> %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Inform) -> - Pid = wait_and_read_pid_file(PidFile), + Pid = read_pid_file(PidFile, true), Inform("pid is ~s", [Pid]), wait_for_application(Node, Pid). @@ -370,18 +381,33 @@ wait_for_application(Node, Pid) -> case process_up(Pid) of true -> case rabbit:is_running(Node) of true -> ok; - false -> timer:sleep(1000), + false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), wait_for_application(Node, Pid) end; false -> {error, process_not_running} end. -wait_and_read_pid_file(PidFile) -> - case file:read_file(PidFile) of - {ok, Bin} -> string:strip(binary_to_list(Bin), right, $\n); - {error, enoent} -> timer:sleep(500), - wait_and_read_pid_file(PidFile); - {error, _} = E -> exit({error, {could_not_read_pid, E}}) +wait_for_process_death(Pid) -> + case process_up(Pid) of + true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + wait_for_process_death(Pid); + false -> ok + end. + +read_pid_file(PidFile, Wait) -> + case {file:read_file(PidFile), Wait} of + {{ok, Bin}, _} -> + S = string:strip(binary_to_list(Bin), right, $\n), + try list_to_integer(S) + catch error:badarg -> + exit({error, {garbage_in_pid_file, PidFile}}) + end, + S; + {{error, enoent}, true} -> + timer:sleep(?EXTERNAL_CHECK_INTERVAL), + read_pid_file(PidFile, Wait); + {{error, _} = E, _} -> + exit({error, {could_not_read_pid, E}}) end. % Test using some OS clunkiness since we shouldn't trust diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index afa48355..a15b9be4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -257,6 +257,8 @@ route1(Delivery, {WorkList, SeenXs, QNames}) -> DstNames)) end. +process_alternate(#exchange{arguments = []}, Results) -> %% optimisation + Results; process_alternate(#exchange{name = XName, arguments = Args}, []) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of undefined -> []; @@ -355,5 +357,9 @@ peek_serial(XName) -> %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_registry:lookup_module(exchange, T), - Module. + case get({xtype_to_module, T}) of + undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), + put({xtype_to_module, T}, Module), + Module; + Module -> Module + end. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 3bd8eeef..02f3158f 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -24,7 +24,7 @@ -behaviour(gen_server2). --export([start_link/0, update/0, register/2, deregister/1, +-export([start_link/0, register/2, deregister/1, report_ram_duration/2, stop/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -69,7 +69,6 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(update/0 :: () -> 'ok'). -spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). -spec(deregister/1 :: (pid()) -> 'ok'). -spec(report_ram_duration/2 :: @@ -85,9 +84,6 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -update() -> - gen_server2:cast(?SERVER, update). - register(Pid, MFA = {_M, _F, _A}) -> gen_server2:call(?SERVER, {register, Pid, MFA}, infinity). @@ -106,8 +102,7 @@ stop() -> %%---------------------------------------------------------------------------- init([]) -> - {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, - ?SERVER, update, []), + {ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update), Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), @@ -153,9 +148,6 @@ handle_call({register, Pid, MFA}, _From, handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast(update, State) -> - {noreply, internal_update(State)}; - handle_cast({deregister, Pid}, State) -> {noreply, internal_deregister(Pid, true, State)}; @@ -165,6 +157,9 @@ handle_cast(stop, State) -> handle_cast(_Request, State) -> {noreply, State}. +handle_info(update, State) -> + {noreply, internal_update(State)}; + handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> {noreply, internal_deregister(Pid, false, State)}; @@ -216,17 +211,19 @@ internal_update(State = #state { queue_durations = Durations, queue_duration_sum = Sum, queue_duration_count = Count }) -> MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(), - MemoryRatio = erlang:memory(total) / MemoryLimit, + MemoryRatio = case MemoryLimit > 0.0 of + true -> erlang:memory(total) / MemoryLimit; + false -> infinity + end, DesiredDurationAvg1 = - case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of - true -> + if MemoryRatio =:= infinity -> + 0.0; + MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 -> infinity; - false -> - Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of - true -> Sum + ?SUM_INC_AMOUNT; - false -> Sum - end, - (Sum1 / Count) / MemoryRatio + MemoryRatio < ?SUM_INC_THRESHOLD -> + ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; + true -> + (Sum / Count) / MemoryRatio end, State1 = State #state { desired_duration = DesiredDurationAvg1 }, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 328fe639..f60562ef 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3]). @@ -248,11 +248,11 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -requeue(AckTags, MsgPropsFun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), - ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}), +requeue(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f423760a..7182042d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -827,14 +827,14 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; -process_instruction({requeue, MsgPropsFun, MsgIds}, +process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {ok, case length(AckTags) =:= length(MsgIds) of true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }; false -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index a9b15af8..dcfbcaff 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -236,18 +236,32 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). +type_class(byte) -> int; +type_class(short) -> int; +type_class(signedint) -> int; +type_class(long) -> int; +type_class(decimal) -> int; +type_class(float) -> float; +type_class(double) -> float; +type_class(Other) -> Other. + assert_args_equivalence(Orig, New, Name, Keys) -> [assert_args_equivalence1(Orig, New, Name, Key) || Key <- Keys], ok. assert_args_equivalence1(Orig, New, Name, Key) -> case {table_lookup(Orig, Key), table_lookup(New, Key)} of - {Same, Same} -> ok; - {Orig1, New1} -> protocol_error( - precondition_failed, - "inequivalent arg '~s' for ~s: " - "received ~s but current is ~s", - [Key, rs(Name), val(New1), val(Orig1)]) + {Same, Same} -> + ok; + {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} -> + case type_class(OrigType) == type_class(NewType) andalso + OrigVal == NewVal of + true -> ok; + false -> protocol_error(precondition_failed, "inequivalent arg" + " '~s' for ~s: received ~s but current" + " is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) + end end. val(undefined) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e4691b81..e6a32b90 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -68,6 +68,7 @@ file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table cur_file_cache_ets, %% tid of current file cache table + flying_ets, %% tid of writes/removes in flight dying_clients, %% set of dying clients clients, %% map of references of all registered clients %% to callbacks @@ -86,7 +87,8 @@ gc_pid, file_handles_ets, file_summary_ets, - cur_file_cache_ets + cur_file_cache_ets, + flying_ets }). -record(file_summary, @@ -128,12 +130,13 @@ gc_pid :: pid(), file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid()}). + cur_file_cache_ets :: ets:tid(), + flying_ets :: ets:tid()}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). -type(maybe_msg_id_fun() :: - 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())). + 'undefined' | fun ((gb_set(), 'written' | 'ignored') -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). -type(deletion_thunk() :: fun (() -> boolean())). @@ -375,6 +378,45 @@ %% performance with many healthy clients and few, if any, dying %% clients, which is the typical case. %% +%% When the msg_store has a backlog (i.e. it has unprocessed messages +%% in its mailbox / gen_server priority queue), a further optimisation +%% opportunity arises: we can eliminate pairs of 'write' and 'remove' +%% from the same client for the same message. A typical occurrence of +%% these is when an empty durable queue delivers persistent messages +%% to ack'ing consumers. The queue will asynchronously ask the +%% msg_store to 'write' such messages, and when they are acknowledged +%% it will issue a 'remove'. That 'remove' may be issued before the +%% msg_store has processed the 'write'. There is then no point going +%% ahead with the processing of that 'write'. +%% +%% To detect this situation a 'flying_ets' table is shared between the +%% clients and the server. The table is keyed on the combination of +%% client (reference) and msg id, and the value represents an +%% integration of all the writes and removes currently "in flight" for +%% that message between the client and server - '+1' means all the +%% writes/removes add up to a single 'write', '-1' to a 'remove', and +%% '0' to nothing. (NB: the integration can never add up to more than +%% one 'write' or 'read' since clients must not write/remove a message +%% more than once without first removing/writing it). +%% +%% Maintaining this table poses two challenges: 1) both the clients +%% and the server access and update the table, which causes +%% concurrency issues, 2) we must ensure that entries do not stay in +%% the table forever, since that would constitute a memory leak. We +%% address the former by carefully modelling all operations as +%% sequences of atomic actions that produce valid results in all +%% possible interleavings. We address the latter by deleting table +%% entries whenever the server finds a 0-valued entry during the +%% processing of a write/remove. 0 is essentially equivalent to "no +%% entry". If, OTOH, the value is non-zero we know there is at least +%% one other 'write' or 'remove' in flight, so we get an opportunity +%% later to delete the table entry when processing these. +%% +%% There are two further complications. We need to ensure that 1) +%% eliminated writes still get confirmed, and 2) the write-back cache +%% doesn't grow unbounded. These are quite straightforward to +%% address. See the comments in the code. +%% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -392,7 +434,7 @@ successfully_recovered_state(Server) -> client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, CurFileCacheEts} = + FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, @@ -404,7 +446,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -420,6 +463,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), ok = server_cast(CState, {write, CRef, MsgId}). @@ -440,6 +484,7 @@ read(MsgId, contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> + [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). set_maximum_since_use(Server, Age) -> @@ -566,6 +611,21 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, end end. +client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, + client_ref = CRef }) -> + Key = {MsgId, CRef}, + case ets:insert_new(FlyingEts, {Key, Diff}) of + true -> ok; + false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) + catch error:badarg -> + %% this is guaranteed to succeed since the + %% server only removes and updates flying_ets + %% entries; it never inserts them + true = ets:insert_new(FlyingEts, {Key, Diff}) + end, + ok + end. + clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, dying_clients = DyingClients }) -> State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), @@ -619,6 +679,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -645,6 +706,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, dying_clients = sets:new(), clients = Clients, successfully_recovered = CleanShutdown, @@ -700,11 +762,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, - CurFileCacheEts}, State #msstate { clients = Clients1 }); + CurFileCacheEts, FlyingEts}, + State #msstate { clients = Clients1 }); handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); @@ -723,40 +787,54 @@ handle_cast({client_dying, CRef}, noreply(write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 })); -handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> +handle_cast({client_delete, CRef}, + State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, MsgId}, State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), - [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId), - noreply( - case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of - {write, State1} -> - write_message(CRef, MsgId, Msg, State1); - {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> - State1; - {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - State1; - {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, MsgId, State1); - {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - update_pending_confirms( - fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(gb_sets:singleton(MsgId), written), - CTM - end, CRef, State1) - end); + case update_flying(-1, MsgId, CRef, State) of + process -> + [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId), + noreply(write_message(MsgId, Msg, CRef, State)); + ignore -> + %% A 'remove' has already been issued and eliminated the + %% 'write'. + State1 = blind_confirm(CRef, gb_sets:singleton(MsgId), + ignored, State), + %% If all writes get eliminated, cur_file_cache_ets could + %% grow unbounded. To prevent that we delete the cache + %% entry here, but only if the message isn't in the + %% current file. That way reads of the message can + %% continue to be done client side, from either the cache + %% or the non-current files. If the message *is* in the + %% current file then the cache entry will be removed by + %% the normal logic for that in write_message/4 and + %% maybe_roll_to_new_file/2. + case index_lookup(MsgId, State1) of + [#msg_location { file = File }] + when File == State1 #msstate.current_file -> + ok; + _ -> + true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0}) + end, + noreply(State1) + end; handle_cast({remove, CRef, MsgIds}, State) -> - State1 = lists:foldl( - fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end, - State, MsgIds), - noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), - removed, State1))); + {RemovedMsgIds, State1} = + lists:foldl( + fun (MsgId, {Removed, State2}) -> + case update_flying(+1, MsgId, CRef, State2) of + process -> {[MsgId | Removed], + remove_message(MsgId, CRef, State2)}; + ignore -> {Removed, State2} + end + end, {[], State}, MsgIds), + noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds), + ignored, State1))); handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, @@ -797,6 +875,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, clients = Clients, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull @@ -810,8 +889,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), ok = store_file_summary(FileSummaryEts, Dir), - [true = ets:delete(T) || - T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], + [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, + CurFileCacheEts, FlyingEts]], IndexModule:terminate(IndexState), ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), @@ -874,6 +953,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, client_confirm(CRef, MsgIds, written, StateN) end, State1, CGs). +update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) -> + Key = {MsgId, CRef}, + NDiff = -Diff, + case ets:lookup(FlyingEts, Key) of + [] -> ignore; + [{_, Diff}] -> ignore; + [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}), + true = ets:delete_object(FlyingEts, {Key, 0}), + process; + [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}), + ignore + end. + write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; write_action({true, #msg_location { file = File }}, _MsgId, State) -> @@ -905,8 +997,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, %% field otherwise bad interaction with concurrent GC {confirm, File, State}. -write_message(CRef, MsgId, Msg, State) -> - write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)). +write_message(MsgId, Msg, CRef, + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of + {write, State1} -> + write_message(MsgId, Msg, + record_pending_confirm(CRef, MsgId, State1)); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, MsgId, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM + end, CRef, State1) + end. + +remove_message(MsgId, CRef, + State = #msstate { file_summary_ets = FileSummaryEts }) -> + case should_mask_action(CRef, MsgId, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg whilst + %% it's being GC'd. + %% + %% ASSERTION: [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} + when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = fun () -> index_update_ref_count( + MsgId, RefCount - 1, State) end, + case RefCount of + %% don't remove from cur_file_cache_ets here because + %% there may be further writes in the mailbox for the + %% same msg. + 1 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, MsgId, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size( + File, -TotalSize, State)) + end; + _ -> ok = Dec(), + State + end + end. write_message(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, @@ -1004,43 +1153,6 @@ contains_message(MsgId, From, end end. -remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts }) -> - case should_mask_action(CRef, MsgId, State) of - {true, _Location} -> - State; - {false_if_increment, #msg_location { ref_count = 0 }} -> - %% CRef has tried to both write and remove this msg - %% whilst it's being GC'd. ASSERTION: - %% [#file_summary { locked = true }] = - %% ets:lookup(FileSummaryEts, File), - State; - {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize }} when RefCount > 0 -> - %% only update field, otherwise bad interaction with - %% concurrent GC - Dec = fun () -> - index_update_ref_count(MsgId, RefCount - 1, State) - end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here - %% because there may be further writes in the mailbox - %% for the same msg. - 1 -> case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> - add_to_pending_gc_completion( - {remove, MsgId, CRef}, File, State); - [#file_summary {}] -> - ok = Dec(), - delete_file_if_empty( - File, adjust_valid_total_size(File, -TotalSize, - State)) - end; - _ -> ok = Dec(), - State - end - end. - add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = @@ -1069,8 +1181,13 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> catch error:badarg -> FailThunk() end. -safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> - safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +update_msg_cache(CacheEts, MsgId, Msg) -> + case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of + true -> ok; + false -> safe_ets_update_counter( + CacheEts, MsgId, {3, +1}, fun (_) -> ok end, + fun () -> update_msg_cache(CacheEts, MsgId, Msg) end) + end. adjust_valid_total_size(File, Delta, State = #msstate { sum_valid_data = SumValid, @@ -1115,6 +1232,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> end end, CRef, State). +blind_confirm(CRef, MsgIds, ActionTaken, State) -> + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end, + CRef, State). + %% Detect whether the MsgId is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not @@ -1257,18 +1379,6 @@ list_sorted_file_names(Dir, Ext) -> filelib:wildcard("*" ++ Ext, Dir)). %%---------------------------------------------------------------------------- -%% message cache helper functions -%%---------------------------------------------------------------------------- - -update_msg_cache(CacheEts, MsgId, Msg) -> - case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of - true -> ok; - false -> safe_ets_update_counter_ok( - CacheEts, MsgId, {3, +1}, - fun () -> update_msg_cache(CacheEts, MsgId, Msg) end) - end. - -%%---------------------------------------------------------------------------- %% index %%---------------------------------------------------------------------------- diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 27dadfc5..b06bcd83 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -67,7 +67,7 @@ start() -> print_error("~p", [Reason]), rabbit_misc:quit(2); Other -> - print_error("~s", [Other]), + print_error("~p", [Other]), rabbit_misc:quit(2) end. diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index d34ed44a..50444dc4 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -22,6 +22,7 @@ -define(BaseApps, [rabbit]). -define(ERROR_CODE, 1). +-define(EPMD_TIMEOUT, 30000). %%---------------------------------------------------------------------------- %% Specs @@ -233,7 +234,8 @@ post_process_script(ScriptFile) -> end. process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> - [{apply,{rabbit,prepare,[]}}, Entry]; + [{apply,{rabbit,maybe_hipe_compile,[]}}, + {apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. @@ -244,7 +246,7 @@ duplicate_node_check([]) -> duplicate_node_check(NodeStr) -> Node = rabbit_misc:makenode(NodeStr), {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - case net_adm:names(NodeHost) of + case names(NodeHost) of {ok, NamePorts} -> case proplists:is_defined(NodeName, NamePorts) of true -> io:format("node with name ~p " @@ -260,6 +262,7 @@ duplicate_node_check(NodeStr) -> [NodeHost, EpmdReason, case EpmdReason of address -> "unable to establish tcp connection"; + timeout -> "timed out establishing tcp connection"; _ -> inet:format_error(EpmdReason) end]) end. @@ -276,3 +279,15 @@ terminate(Status) -> after infinity -> ok end end. + +names(Hostname) -> + Self = self(), + process_flag(trap_exit, true), + Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), + timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), + Res = receive + {names, Names} -> Names; + {'EXIT', Pid, Reason} -> {error, Reason} + end, + process_flag(trap_exit, false), + Res. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f1751e95..4b545466 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -188,7 +188,7 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_msg_ids :: [rabbit_types:msg_id()] + unsynced_msg_ids :: gb_set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -263,9 +263,10 @@ publish(MsgId, SeqId, MsgProps, IsPersistent, State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), - {JournalHdl, State1} = get_journal_handle( - State #qistate { - unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }), + {JournalHdl, State1} = + get_journal_handle( + State #qistate { + unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -285,7 +286,7 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> - sync_if([] =/= MsgIds, State). + sync_if(not gb_sets:is_empty(MsgIds), State). sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack to @@ -387,7 +388,7 @@ blank_state(QueueName) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_msg_ids = [] }. + unsynced_msg_ids = gb_sets:new() }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -712,8 +713,8 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> notify_sync(State). notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> - OnSyncFun(gb_sets:from_list(UG)), - State #qistate { unsynced_msg_ids = [] }. + OnSyncFun(UG), + State #qistate { unsynced_msg_ids = gb_sets:new() }. %%---------------------------------------------------------------------------- %% segment manipulation diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index e9c4479a..31f5ad14 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -44,6 +44,11 @@ %%---------------------------------------------------------------------------- +deliver([], #delivery{mandatory = false, + immediate = false}) -> + %% /dev/null optimisation + {routed, []}; + deliver(QNames, Delivery = #delivery{mandatory = false, immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7a36dd60..00d46f5a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -30,12 +30,6 @@ -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). -test_content_prop_roundtrip(Datum, Binary) -> - Types = [element(1, E) || E <- Datum], - Values = [element(2, E) || E <- Datum], - Values = rabbit_binary_parser:parse_properties(Types, Binary), %% assertion - Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion - all_tests() -> passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), @@ -44,7 +38,6 @@ all_tests() -> passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), - passed = test_bpqueue(), passed = test_pg_local(), passed = test_unfold(), passed = test_supervisor_delayed_restart(), @@ -262,143 +255,6 @@ test_priority_queue(Q) -> priority_queue:to_list(Q), priority_queue_out_all(Q)}. -test_bpqueue() -> - Q = bpqueue:new(), - true = bpqueue:is_empty(Q), - 0 = bpqueue:len(Q), - [] = bpqueue:to_list(Q), - - Q1 = bpqueue_test(fun bpqueue:in/3, fun bpqueue:out/1, - fun bpqueue:to_list/1, - fun bpqueue:foldl/3, fun bpqueue:map_fold_filter_l/4), - Q2 = bpqueue_test(fun bpqueue:in_r/3, fun bpqueue:out_r/1, - fun (QR) -> lists:reverse( - [{P, lists:reverse(L)} || - {P, L} <- bpqueue:to_list(QR)]) - end, - fun bpqueue:foldr/3, fun bpqueue:map_fold_filter_r/4), - - [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)), - [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)), - [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] = - bpqueue:to_list(bpqueue:join(Q1, Q2)), - - [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] = - bpqueue:to_list(bpqueue:join(Q1, Q1)), - - [{foo, [1, 2]}, {bar, [3]}] = - bpqueue:to_list( - bpqueue:from_list( - [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])), - - [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])), - - {4, [a,b,c,d]} = - bpqueue:foldl( - fun (Prefix, Value, {Prefix, Acc}) -> - {Prefix + 1, [Value | Acc]} - end, - {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])), - - [{bar,3}, {foo,2}, {foo,1}] = - bpqueue:foldr(fun (P, V, I) -> [{P,V} | I] end, [], Q2), - - BPQL = [{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], - BPQ = bpqueue:from_list(BPQL), - - %% no effect - {BPQL, 0} = bpqueue_mffl([none], {none, []}, BPQ), - {BPQL, 0} = bpqueue_mffl([foo,bar], {none, [1]}, BPQ), - {BPQL, 0} = bpqueue_mffl([bar], {none, [3]}, BPQ), - {BPQL, 0} = bpqueue_mffr([bar], {foo, [5]}, BPQ), - - %% process 1 item - {[{foo,[-1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 1} = - bpqueue_mffl([foo,bar], {foo, [2]}, BPQ), - {[{foo,[1,2,2]}, {bar,[-3,4,5]}, {foo,[5,6,7]}], 1} = - bpqueue_mffl([bar], {bar, [4]}, BPQ), - {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,-7]}], 1} = - bpqueue_mffr([foo,bar], {foo, [6]}, BPQ), - {[{foo,[1,2,2]}, {bar,[3,4]}, {baz,[-5]}, {foo,[5,6,7]}], 1} = - bpqueue_mffr([bar], {baz, [4]}, BPQ), - - %% change prefix - {[{bar,[-1,-2,-2,-3,-4,-5,-5,-6,-7]}], 9} = - bpqueue_mffl([foo,bar], {bar, []}, BPQ), - {[{bar,[-1,-2,-2,3,4,5]}, {foo,[5,6,7]}], 3} = - bpqueue_mffl([foo], {bar, [5]}, BPQ), - {[{bar,[-1,-2,-2,3,4,5,-5,-6]}, {foo,[7]}], 5} = - bpqueue_mffl([foo], {bar, [7]}, BPQ), - {[{foo,[1,2,2,-3,-4]}, {bar,[5]}, {foo,[5,6,7]}], 2} = - bpqueue_mffl([bar], {foo, [5]}, BPQ), - {[{bar,[-1,-2,-2,3,4,5,-5,-6,-7]}], 6} = - bpqueue_mffl([foo], {bar, []}, BPQ), - {[{foo,[1,2,2,-3,-4,-5,5,6,7]}], 3} = - bpqueue_mffl([bar], {foo, []}, BPQ), - - %% edge cases - {[{foo,[-1,-2,-2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 3} = - bpqueue_mffl([foo], {foo, [5]}, BPQ), - {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[-5,-6,-7]}], 3} = - bpqueue_mffr([foo], {foo, [2]}, BPQ), - - passed. - -bpqueue_test(In, Out, List, Fold, MapFoldFilter) -> - Q = bpqueue:new(), - {empty, _Q} = Out(Q), - - ok = Fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, ok, Q), - {Q1M, 0} = MapFoldFilter(fun(_P) -> throw(explosion) end, - fun(_V, _N) -> throw(explosion) end, 0, Q), - [] = bpqueue:to_list(Q1M), - - Q1 = In(bar, 3, In(foo, 2, In(foo, 1, Q))), - false = bpqueue:is_empty(Q1), - 3 = bpqueue:len(Q1), - [{foo, [1, 2]}, {bar, [3]}] = List(Q1), - - {{value, foo, 1}, Q3} = Out(Q1), - {{value, foo, 2}, Q4} = Out(Q3), - {{value, bar, 3}, _Q5} = Out(Q4), - - F = fun (QN) -> - MapFoldFilter(fun (foo) -> true; - (_) -> false - end, - fun (2, _Num) -> stop; - (V, Num) -> {bar, -V, V - Num} end, - 0, QN) - end, - {Q6, 0} = F(Q), - [] = bpqueue:to_list(Q6), - {Q7, 1} = F(Q1), - [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = List(Q7), - - Q1. - -bpqueue_mffl(FF1A, FF2A, BPQ) -> - bpqueue_mff(fun bpqueue:map_fold_filter_l/4, FF1A, FF2A, BPQ). - -bpqueue_mffr(FF1A, FF2A, BPQ) -> - bpqueue_mff(fun bpqueue:map_fold_filter_r/4, FF1A, FF2A, BPQ). - -bpqueue_mff(Fold, FF1A, FF2A, BPQ) -> - FF1 = fun (Prefixes) -> - fun (P) -> lists:member(P, Prefixes) end - end, - FF2 = fun ({Prefix, Stoppers}) -> - fun (Val, Num) -> - case lists:member(Val, Stoppers) of - true -> stop; - false -> {Prefix, -Val, 1 + Num} - end - end - end, - Queue_to_list = fun ({LHS, RHS}) -> {bpqueue:to_list(LHS), RHS} end, - - Queue_to_list(Fold(FF1(FF1A), FF2(FF2A), 0, BPQ)). - test_simple_n_element_queue(N) -> Items = lists:seq(1, N), Q = priority_queue_in_all(priority_queue:new(), Items), @@ -444,67 +300,69 @@ test_parsing() -> passed = test_field_values(), passed. +test_content_prop_encoding(Datum, Binary) -> + Types = [element(1, E) || E <- Datum], + Values = [element(2, E) || E <- Datum], + Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion + test_content_properties() -> - test_content_prop_roundtrip([], <<0, 0>>), - test_content_prop_roundtrip([{bit, true}, {bit, false}, {bit, true}, {bit, false}], - <<16#A0, 0>>), - test_content_prop_roundtrip([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined}, - {bit, true}], - <<16#E8,0,123>>), - test_content_prop_roundtrip([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}], - <<16#F0,0,123,123>>), - test_content_prop_roundtrip([{bit, true}, {shortstr, <<"hi">>}, {bit, true}, - {shortint, 54321}, {bit, true}], - <<16#F8,0,2,"hi",16#D4,16#31>>), - test_content_prop_roundtrip([{bit, true}, {shortstr, undefined}, {bit, true}, - {shortint, 54321}, {bit, true}], - <<16#B8,0,16#D4,16#31>>), - test_content_prop_roundtrip([{table, [{<<"a signedint">>, signedint, 12345678}, - {<<"a longstr">>, longstr, <<"yes please">>}, - {<<"a decimal">>, decimal, {123, 12345678}}, - {<<"a timestamp">>, timestamp, 123456789012345}, - {<<"a nested table">>, table, - [{<<"one">>, signedint, 1}, - {<<"two">>, signedint, 2}]}]}], - << - %% property-flags - 16#8000:16, - - %% property-list: - - %% table - 117:32, % table length in bytes - - 11,"a signedint", % name - "I",12345678:32, % type and value - - 9,"a longstr", - "S",10:32,"yes please", - - 9,"a decimal", - "D",123,12345678:32, - - 11,"a timestamp", - "T", 123456789012345:64, - - 14,"a nested table", - "F", - 18:32, - - 3,"one", - "I",1:32, - - 3,"two", - "I",2:32 >>), - case catch rabbit_binary_parser:parse_properties([bit, bit, bit, bit], <<16#A0,0,1>>) of - {'EXIT', content_properties_binary_overflow} -> passed; - V -> exit({got_success_but_expected_failure, V}) - end. + test_content_prop_encoding([], <<0, 0>>), + test_content_prop_encoding([{bit, true}, {bit, false}, {bit, true}, {bit, false}], + <<16#A0, 0>>), + test_content_prop_encoding([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined}, + {bit, true}], + <<16#E8,0,123>>), + test_content_prop_encoding([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}], + <<16#F0,0,123,123>>), + test_content_prop_encoding([{bit, true}, {shortstr, <<"hi">>}, {bit, true}, + {shortint, 54321}, {bit, true}], + <<16#F8,0,2,"hi",16#D4,16#31>>), + test_content_prop_encoding([{bit, true}, {shortstr, undefined}, {bit, true}, + {shortint, 54321}, {bit, true}], + <<16#B8,0,16#D4,16#31>>), + test_content_prop_encoding([{table, [{<<"a signedint">>, signedint, 12345678}, + {<<"a longstr">>, longstr, <<"yes please">>}, + {<<"a decimal">>, decimal, {123, 12345678}}, + {<<"a timestamp">>, timestamp, 123456789012345}, + {<<"a nested table">>, table, + [{<<"one">>, signedint, 1}, + {<<"two">>, signedint, 2}]}]}], + << + %% property-flags + 16#8000:16, + + %% property-list: + + %% table + 117:32, % table length in bytes + + 11,"a signedint", % name + "I",12345678:32, % type and value + + 9,"a longstr", + "S",10:32,"yes please", + + 9,"a decimal", + "D",123,12345678:32, + + 11,"a timestamp", + "T", 123456789012345:64, + + 14,"a nested table", + "F", + 18:32, + + 3,"one", + "I",1:32, + + 3,"two", + "I",2:32 >>), + passed. test_field_values() -> %% FIXME this does not test inexact numbers (double and float) yet, %% because they won't pass the equality assertions - test_content_prop_roundtrip( + test_content_prop_encoding( [{table, [{<<"longstr">>, longstr, <<"Here is a long string">>}, {<<"signedint">>, signedint, 12345}, {<<"decimal">>, decimal, {3, 123456}}, @@ -1842,6 +1700,8 @@ on_disk_capture() -> stop -> done end. +on_disk_capture([_|_], _Awaiting, Pid) -> + Pid ! {self(), surplus}; on_disk_capture(OnDisk, Awaiting, Pid) -> receive {on_disk, MsgIdsS} -> @@ -1850,12 +1710,10 @@ on_disk_capture(OnDisk, Awaiting, Pid) -> Pid); stop -> done - after 200 -> - case {OnDisk, Awaiting} of - {[], []} -> Pid ! {self(), arrived}, on_disk_capture(); - {_, []} -> Pid ! {self(), surplus}; - {[], _} -> Pid ! {self(), timeout}; - {_, _} -> Pid ! {self(), surplus_timeout} + after (case Awaiting of [] -> 200; _ -> 1000 end) -> + case Awaiting of + [] -> Pid ! {self(), arrived}, on_disk_capture(); + _ -> Pid ! {self(), timeout} end end. @@ -1920,7 +1778,7 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> test_msg_store() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], - {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), + {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds), Ref = rabbit_guid:guid(), {Cap, MSCState} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref), @@ -1931,6 +1789,8 @@ test_msg_store() -> false = msg_store_contains(false, MsgIds, MSCState), %% test confirm logic passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, MsgIds, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half @@ -2038,12 +1898,12 @@ test_msg_store() -> false = msg_store_contains(false, MsgIdsBig, MSCStateM), MSCStateM end), + %% + passed = test_msg_store_client_delete_and_terminate(), %% restart empty restart_msg_store_empty(), passed. -%% We want to test that writes that get eliminated due to removes still -%% get confirmed. Removes themselves do not. test_msg_store_confirms(MsgIds, Cap, MSCState) -> %% write -> confirmed ok = msg_store_write(MsgIds, MSCState), @@ -2069,6 +1929,45 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) -> ok = msg_store_write(MsgIds, MSCState), ok = msg_store_remove(MsgIds, MSCState), ok = on_disk_await(Cap, MsgIds), + %% confirmation on timer-based sync + passed = test_msg_store_confirm_timer(), + passed. + +test_msg_store_confirm_timer() -> + Ref = rabbit_guid:guid(), + MsgId = msg_id_bin(1), + Self = self(), + MSCState = rabbit_msg_store:client_init( + ?PERSISTENT_MSG_STORE, Ref, + fun (MsgIds, _ActionTaken) -> + case gb_sets:is_member(MsgId, MsgIds) of + true -> Self ! on_disk; + false -> ok + end + end, undefined), + ok = msg_store_write([MsgId], MSCState), + ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState), + ok = msg_store_remove([MsgId], MSCState), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), + passed. + +msg_store_keep_busy_until_confirm(MsgIds, MSCState) -> + receive + on_disk -> ok + after 0 -> + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + msg_store_keep_busy_until_confirm(MsgIds, MSCState) + end. + +test_msg_store_client_delete_and_terminate() -> + restart_msg_store_empty(), + MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], + Ref = rabbit_guid:guid(), + MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + ok = msg_store_write(MsgIds, MSCState), + %% test the 'dying client' fast path for writes + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), passed. queue_name(Name) -> @@ -2374,22 +2273,16 @@ test_variable_queue_requeue(VQ0) -> Seq = lists:seq(1, Count), VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, Count, VQ1), - {VQ3, Acks} = lists:foldl( - fun (_N, {VQN, AckTags}) -> - {{#basic_message{}, false, AckTag, _}, VQM} = - rabbit_variable_queue:fetch(true, VQN), - {VQM, [AckTag | AckTags]} - end, {VQ2, []}, Seq), + {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2), Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 -> [Ack | Acc]; (_, Acc) -> Acc end, [], lists:zip(Acks, Seq)), - {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, - fun(X) -> X end, VQ3), + {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3), VQ5 = lists:foldl(fun (AckTag, VQN) -> {_MsgId, VQM} = rabbit_variable_queue:requeue( - [AckTag], fun(X) -> X end, VQN), + [AckTag], VQN), VQM end, VQ4, Subset), VQ6 = lists:foldl(fun (AckTag, VQa) -> @@ -2507,7 +2400,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), VQ3 = check_variable_queue_status( rabbit_variable_queue:set_ram_duration_target(0, VQ2), - %% one segment in q3 as betas, and half a segment in delta + %% one segment in q3, and half a segment in delta [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, {q3, SegmentSize}, {len, SegmentSize + HalfSegment}]), @@ -2523,7 +2416,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize + HalfSegment + 1, VQ5), VQ7 = check_variable_queue_status( VQ6, - %% the half segment should now be in q3 as betas + %% the half segment should now be in q3 [{q1, 1}, {delta, {delta, undefined, 0, undefined}}, {q3, HalfSegment}, @@ -2573,7 +2466,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), {_Guids, VQ4} = - rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b853d983..63a0927f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1, + dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, @@ -49,17 +49,15 @@ %% within the queue are always held on disk, *in addition* to being in %% one of the above classifications. %% -%% Also note that within this code, the term gamma never -%% appears. Instead, gammas are defined by betas who have had their -%% queue position recorded on disk. +%% Also note that within this code, the term gamma seldom +%% appears. It's frequently the case that gammas are defined by betas +%% who have had their queue position recorded on disk. %% %% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though %% many of these steps are frequently skipped. q1 and q4 only hold -%% alphas, q2 and q3 hold both betas and gammas (as queues of queues, -%% using the bpqueue module where the block prefix determines whether -%% they're betas or gammas). When a message arrives, its -%% classification is determined. It is then added to the rightmost -%% appropriate queue. +%% alphas, q2 and q3 hold both betas and gammas. When a message +%% arrives, its classification is determined. It is then added to the +%% rightmost appropriate queue. %% %% If a new message is determined to be a beta or gamma, q1 is %% empty. If a new message is determined to be a delta, q1 and q2 are @@ -74,14 +72,15 @@ %% %% The duration indicated to us by the memory_monitor is used to %% calculate, given our current ingress and egress rates, how many -%% messages we should hold in RAM. We track the ingress and egress -%% rates for both messages and pending acks and rates for both are -%% considered when calculating the number of messages to hold in -%% RAM. When we need to push alphas to betas or betas to gammas, we -%% favour writing out messages that are further from the head of the -%% queue. This minimises writes to disk, as the messages closer to the -%% tail of the queue stay in the queue for longer, thus do not need to -%% be replaced as quickly by sending other messages to disk. +%% messages we should hold in RAM (i.e. as alphas). We track the +%% ingress and egress rates for both messages and pending acks and +%% rates for both are considered when calculating the number of +%% messages to hold in RAM. When we need to push alphas to betas or +%% betas to gammas, we favour writing out messages that are further +%% from the head of the queue. This minimises writes to disk, as the +%% messages closer to the tail of the queue stay in the queue for +%% longer, thus do not need to be replaced as quickly by sending other +%% messages to disk. %% %% Whilst messages are pushed to disk and forgotten from RAM as soon %% as requested by a new setting of the queue RAM duration, the @@ -119,17 +118,43 @@ %% getting rid of messages as fast as possible and remaining %% responsive, and using only the egress rate impacts that goal. %% -%% If a queue is full of transient messages, then the transition from -%% betas to deltas will be potentially very expensive as millions of -%% entries must be written to disk by the queue_index module. This can -%% badly stall the queue. In order to avoid this, the proportion of -%% gammas / (betas+gammas) must not be lower than (betas+gammas) / -%% (alphas+betas+gammas). As the queue grows or available memory -%% shrinks, the latter ratio increases, requiring the conversion of -%% more gammas to betas in order to maintain the invariant. At the -%% point at which betas and gammas must be converted to deltas, there -%% should be very few betas remaining, thus the transition is fast (no -%% work needs to be done for the gamma -> delta transition). +%% Once the queue has more alphas than the target_ram_count, the +%% surplus must be converted to betas, if not gammas, if not rolled +%% into delta. The conditions under which these transitions occur +%% reflect the conflicting goals of minimising RAM cost per msg, and +%% minimising CPU cost per msg. Once the msg has become a beta, its +%% payload is no longer in RAM, thus a read from the msg_store must +%% occur before the msg can be delivered, but the RAM cost of a beta +%% is the same as a gamma, so converting a beta to gamma will not free +%% up any further RAM. To reduce the RAM cost further, the gamma must +%% be rolled into delta. Whilst recovering a beta or a gamma to an +%% alpha requires only one disk read (from the msg_store), recovering +%% a msg from within delta will require two reads (queue_index and +%% then msg_store). But delta has a near-0 per-msg RAM cost. So the +%% conflict is between using delta more, which will free up more +%% memory, but require additional CPU and disk ops, versus using delta +%% less and gammas and betas more, which will cost more memory, but +%% require fewer disk ops and less CPU overhead. +%% +%% In the case of a persistent msg published to a durable queue, the +%% msg is immediately written to the msg_store and queue_index. If +%% then additionally converted from an alpha, it'll immediately go to +%% a gamma (as it's already in queue_index), and cannot exist as a +%% beta. Thus a durable queue with a mixture of persistent and +%% transient msgs in it which has more messages than permitted by the +%% target_ram_count may contain an interspersed mixture of betas and +%% gammas in q2 and q3. +%% +%% There is then a ratio that controls how many betas and gammas there +%% can be. This is based on the target_ram_count and thus expresses +%% the fact that as the number of permitted alphas in the queue falls, +%% so should the number of betas and gammas fall (i.e. delta +%% grows). If q2 and q3 contain more than the permitted number of +%% betas and gammas, then the surplus are forcibly converted to gammas +%% (as necessary) and then rolled into delta. The ratio is that +%% delta/(betas+gammas+delta) equals +%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as +%% the target_ram_count shrinks to 0, so must betas and gammas. %% %% The conversion of betas to gammas is done in batches of exactly %% ?IO_BATCH_SIZE. This value should not be too small, otherwise the @@ -137,8 +162,7 @@ %% effectively amortised (switching the direction of queue access %% defeats amortisation), nor should it be too big, otherwise %% converting a batch stalls the queue for too long. Therefore, it -%% must be just right. ram_index_count is used here and is the number -%% of betas. +%% must be just right. %% %% The conversion from alphas to betas is also chunked, but only to %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at @@ -248,7 +272,6 @@ ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, - ram_index_count, out_counter, in_counter, rates, @@ -280,8 +303,6 @@ end_seq_id %% end_seq_id is exclusive }). --record(merge_funs, {new, join, out, in, publish}). - %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -291,6 +312,7 @@ -define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(QUEUE, lqueue). -include("rabbit.hrl"). @@ -315,11 +337,11 @@ end_seq_id :: non_neg_integer() }). -type(state() :: #vqstate { - q1 :: queue(), - q2 :: bpqueue:bpqueue(), + q1 :: ?QUEUE:?QUEUE(), + q2 :: ?QUEUE:?QUEUE(), delta :: delta(), - q3 :: bpqueue:bpqueue(), - q4 :: queue(), + q3 :: ?QUEUE:?QUEUE(), + q4 :: ?QUEUE:?QUEUE(), next_seq_id :: seq_id(), pending_ack :: gb_tree(), ram_ack_index :: gb_tree(), @@ -337,7 +359,6 @@ target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), @@ -475,23 +496,22 @@ purge(State = #vqstate { q4 = Q4, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( - fun rabbit_misc:queue_fold/3, Q4, + fun ?QUEUE:foldl/3, Q4, orddict:new(), IndexState, MSCState), {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2, msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = queue:new(), + State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( - fun rabbit_misc:queue_fold/3, Q1, + fun ?QUEUE:foldl/3, Q1, LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), - {Len, a(State1 #vqstate { q1 = queue:new(), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, ram_msg_count = 0, - ram_index_count = 0, persistent_count = PCount1 })}. publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -507,9 +527,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case bpqueue:is_empty(Q3) of - false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } + State2 = case ?QUEUE:is_empty(Q3) of + false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), @@ -608,21 +628,19 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> +requeue(AckTags, #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), - alpha_funs(), - MsgPropsFun, State), + fun publish_alpha/2, State), {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, delta_limit(Delta), - beta_funs(), - MsgPropsFun, State1), + fun publish_beta/2, State1), {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, - MsgPropsFun, State2), + State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( State3 #vqstate { delta = Delta1, @@ -718,7 +736,6 @@ needs_timeout(State) -> false -> case reduce_memory_use( fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, fun (_Quota, State1) -> {0, State1} end, State) of {true, _State} -> idle; @@ -740,24 +757,22 @@ status(#vqstate { ram_ack_index = RAI, target_ram_count = TargetRamCount, ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, rates = #rates { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate }, ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, + [ {q1 , ?QUEUE:len(Q1)}, + {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, + {q3 , ?QUEUE:len(Q3)}, + {q4 , ?QUEUE:len(Q4)}, {len , Len}, {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, - {ram_index_count , RamIndexCount}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -776,15 +791,14 @@ discard(_Msg, _ChPid, State) -> State. %%---------------------------------------------------------------------------- a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - persistent_count = PersistentCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount }) -> - E1 = queue:is_empty(Q1), - E2 = bpqueue:is_empty(Q2), + len = Len, + persistent_count = PersistentCount, + ram_msg_count = RamMsgCount }) -> + E1 = ?QUEUE:is_empty(Q1), + E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, - E3 = bpqueue:is_empty(Q3), - E4 = queue:is_empty(Q4), + E3 = ?QUEUE:is_empty(Q3), + E4 = ?QUEUE:is_empty(Q4), LZ = Len == 0, true = E1 or not E3, @@ -795,10 +809,13 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = Len >= 0, true = PersistentCount >= 0, true = RamMsgCount >= 0, - true = RamIndexCount >= 0, State. +d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) + when Start + Count =< End -> + Delta. + m(MsgStatus = #msg_status { msg = Msg, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk, @@ -891,54 +908,39 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; false -> case gb_trees:is_defined(SeqId, PA) of - false -> {[m(#msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_props = MsgProps - }) | Filtered1], - Delivers1, - Acks1}; - true -> Acc + false -> + {?QUEUE:in_r( + m(#msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps + }), Filtered1), + Delivers1, Acks1}; + true -> + Acc end end - end, {[], [], []}, List), - {bpqueue:from_list([{true, Filtered}]), - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState))}. - -%% the first arg is the older delta -combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) -> - ?BLANK_DELTA; -combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start, - count = Count, - end_seq_id = End } = B) -> - true = Start + Count =< End, %% ASSERTION - B; -combine_deltas(#delta { start_seq_id = Start, - count = Count, - end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) -> - true = Start + Count =< End, %% ASSERTION - A; -combine_deltas(#delta { start_seq_id = StartLow, - count = CountLow, - end_seq_id = EndLow }, - #delta { start_seq_id = StartHigh, - count = CountHigh, - end_seq_id = EndHigh }) -> - Count = CountLow + CountHigh, - true = (StartLow =< StartHigh) %% ASSERTIONS - andalso ((StartLow + CountLow) =< EndLow) - andalso ((StartHigh + CountHigh) =< EndHigh) - andalso ((StartLow + Count) =< EndHigh), - #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. - -beta_fold(Fun, Init, Q) -> - bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q). + end, {?QUEUE:new(), [], []}, List), + {Filtered, rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. + +expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> + d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); +expand_delta(SeqId, #delta { start_seq_id = StartSeqId, + count = Count } = Delta) + when SeqId < StartSeqId -> + d(Delta #delta { start_seq_id = SeqId, count = Count + 1 }); +expand_delta(SeqId, #delta { count = Count, + end_seq_id = EndSeqId } = Delta) + when SeqId >= EndSeqId -> + d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 }); +expand_delta(_SeqId, #delta { count = Count } = Delta) -> + d(Delta #delta { count = Count + 1 }). update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -955,17 +957,17 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; - false -> #delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId } + false -> d(#delta { start_seq_id = LowSeqId, + count = DeltaCount1, + end_seq_id = NextSeqId }) end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), + q1 = ?QUEUE:new(), + q2 = ?QUEUE:new(), delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), + q3 = ?QUEUE:new(), + q4 = ?QUEUE:new(), next_seq_id = NextSeqId, pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty(), @@ -983,7 +985,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, - ram_index_count = 0, out_counter = 0, in_counter = 0, rates = blank_rate(Now, DeltaCount1), @@ -1003,21 +1004,19 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - case queue:is_empty(Q4) of - true -> State #vqstate { - q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; +in_r(MsgStatus = #msg_status { msg = undefined }, + State = #vqstate { q3 = Q3, q4 = Q4 }) -> + case ?QUEUE:is_empty(Q4) of + true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) } end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. queue_out(State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of + case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; @@ -1095,15 +1094,15 @@ purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> - case bpqueue:is_empty(Q3) of + case ?QUEUE:is_empty(Q3) of true -> {LensByStore, State}; false -> {LensByStore1, IndexState1} = - remove_queue_entries(fun beta_fold/3, Q3, + remove_queue_entries(fun ?QUEUE:foldl/3, Q3, LensByStore, IndexState, MSCState), purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { - q3 = bpqueue:new(), + q3 = ?QUEUE:new(), index_state = IndexState1 })) end. @@ -1291,7 +1290,7 @@ blind_confirm(Callback, MsgIdSet) -> Callback(?MODULE, fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). -msgs_written_to_disk(Callback, MsgIdSet, removed) -> +msgs_written_to_disk(Callback, MsgIdSet, ignored) -> blind_confirm(Callback, MsgIdSet); msgs_written_to_disk(Callback, MsgIdSet, written) -> Callback(?MODULE, @@ -1321,116 +1320,70 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %% Internal plumbing for requeue %%---------------------------------------------------------------------------- -alpha_funs() -> - #merge_funs { - new = fun queue:new/0, - join = fun queue:join/2, - out = fun queue:out/1, - in = fun queue:in/2, - publish = fun (#msg_status { msg = undefined } = MsgStatus, State) -> - read_msg(MsgStatus, State); - (MsgStatus, #vqstate { - ram_msg_count = RamMsgCount } = State) -> - {MsgStatus, State #vqstate { - ram_msg_count = RamMsgCount + 1 }} - end}. - -beta_funs() -> - #merge_funs { - new = fun bpqueue:new/0, - join = fun bpqueue:join/2, - out = fun (Q) -> - case bpqueue:out(Q) of - {{value, _IndexOnDisk, MsgStatus}, Q1} -> - {{value, MsgStatus}, Q1}; - {empty, _Q1} = X -> - X - end - end, - in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) -> - bpqueue:in(IOD, MsgStatus, Q) - end, - publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, - State) -> - {#msg_status { index_on_disk = IndexOnDisk, - msg = Msg} = MsgStatus1, - #vqstate { ram_index_count = RamIndexCount, - ram_msg_count = RamMsgCount } = - State1} = - maybe_write_to_disk(not MsgOnDisk, false, - MsgStatus, State), - {MsgStatus1, State1 #vqstate { - ram_msg_count = RamMsgCount + - one_if(Msg =/= undefined), - ram_index_count = RamIndexCount + - one_if(not IndexOnDisk) }} - end}. +publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> + read_msg(MsgStatus, State); +publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> + {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. + +publish_beta(MsgStatus, State) -> + {#msg_status { msg = Msg} = MsgStatus1, + #vqstate { ram_msg_count = RamMsgCount } = State1} = + maybe_write_to_disk(true, false, MsgStatus, State), + {MsgStatus1, State1 #vqstate { + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. %% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs, - MsgPropsFun, State) -> - queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State). +queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> + queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, + Limit, PubFun, State). -queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, - #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, - MsgPropsFun, State) +queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, + Limit, PubFun, State) when Limit == undefined orelse SeqId < Limit -> - case QOut(Q) of + case ?QUEUE:out(Q) of {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} when SeqIdQ < SeqId -> %% enqueue from the remaining queue - queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds, - Limit, Funs, MsgPropsFun, State); + queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, + Limit, PubFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, - State), + {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - QPublish(MsgStatus, State1), - queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds], - Limit, Funs, MsgPropsFun, State2) + PubFun(MsgStatus, State1), + queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], + Limit, PubFun, State2) end; -queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, - _MsgPropsFun, State) -> - {SeqIds, QJoin(Front, Q), MsgIds, State}. +queue_merge(SeqIds, Q, Front, MsgIds, + _Limit, _PubFun, State) -> + {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. -delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> +delta_merge([], Delta, MsgIds, State) -> {Delta, MsgIds, State}; -delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, - count = Count, - end_seq_id = EndSeqId} = Delta, - MsgIds, MsgPropsFun, State) -> +delta_merge(SeqIds, Delta, MsgIds, State) -> lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId, - index_on_disk = IndexOnDisk, - msg_on_disk = MsgOnDisk} = MsgStatus, - State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State0), + {#msg_status { msg_id = MsgId } = MsgStatus, State1} = + msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = - maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk, - MsgStatus, State1), - {Delta0 #delta { - start_seq_id = lists:min([SeqId, StartSeqId]), - count = Count + 1, - end_seq_id = lists:max([SeqId + 1, EndSeqId]) }, - [MsgId | MsgIds0], State2} + maybe_write_to_disk(true, true, MsgStatus, State1), + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 -msg_from_pending_ack(SeqId, MsgPropsFun, State) -> +msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = remove_pending_ack(SeqId, State), {MsgStatus #msg_status { - msg_props = (MsgPropsFun(MsgProps)) #message_properties { - needs_confirming = false } }, State1}. + msg_props = MsgProps #message_properties { needs_confirming = false } }, + State1}. -beta_limit(BPQ) -> - case bpqueue:out(BPQ) of - {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId; - {empty, _BPQ} -> undefined +beta_limit(Q) -> + case ?QUEUE:peek(Q) of + {value, #msg_status { seq_id = SeqId }} -> SeqId; + empty -> undefined end. -delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; +delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %%---------------------------------------------------------------------------- @@ -1456,10 +1409,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, +reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, State = #vqstate {target_ram_count = infinity}) -> {false, State}; -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, +reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, @@ -1470,7 +1423,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, avg_egress = AvgAckEgress } }) -> - {Reduce, State1} = + {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; @@ -1491,13 +1444,10 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, {true, State2} end, - case State1 #vqstate.target_ram_count of - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} - end + case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), + permitted_beta_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; + _ -> {Reduce, State1} end. limit_ram_acks(0, State) -> @@ -1519,54 +1469,25 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI1 }) end. - reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, - fun limit_ram_index/2, - fun push_betas_to_deltas/1, + fun push_betas_to_deltas/2, fun limit_ram_acks/2, State), State1. -limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3, - index_state = IndexState, - ram_index_count = RamIndexCount }) -> - {Q2a, {Quota1, IndexState1}} = limit_ram_index( - fun bpqueue:map_fold_filter_r/4, - Q2, {Quota, IndexState}), - %% TODO: we shouldn't be writing index entries for messages that - %% can never end up in delta due them residing in the only segment - %% held by q3. - {Q3a, {Quota2, IndexState2}} = limit_ram_index( - fun bpqueue:map_fold_filter_r/4, - Q3, {Quota1, IndexState1}), - State #vqstate { q2 = Q2a, q3 = Q3a, - index_state = IndexState2, - ram_index_count = RamIndexCount - (Quota - Quota2) }. - -limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) -> - {Q, {0, IndexState}}; -limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) -> - MapFoldFilterFun( - fun erlang:'not'/1, - fun (MsgStatus, {0, _IndexStateN}) -> - false = MsgStatus #msg_status.index_on_disk, %% ASSERTION - stop; - (MsgStatus, {N, IndexStateN}) when N > 0 -> - false = MsgStatus #msg_status.index_on_disk, %% ASSERTION - {MsgStatus1, IndexStateN1} = - maybe_write_index_to_disk(true, MsgStatus, IndexStateN), - {true, m(MsgStatus1), {N-1, IndexStateN1}} - end, {Quota, IndexState}, Q). - -permitted_ram_index_count(#vqstate { len = 0 }) -> +permitted_beta_count(#vqstate { len = 0 }) -> infinity; -permitted_ram_index_count(#vqstate { len = Len, - q2 = Q2, - q3 = Q3, - delta = #delta { count = DeltaCount } }) -> - BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), - BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). +permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> + lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); +permitted_beta_count(#vqstate { q1 = Q1, + q4 = Q4, + target_ram_count = TargetRamCount, + len = Len }) -> + BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), + lists:max([rabbit_queue_index:next_segment_boundary(0), + BetaDelta - ((BetaDelta * BetaDelta) div + (BetaDelta + TargetRamCount))]). chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> @@ -1574,41 +1495,35 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). -fetch_from_q3(State = #vqstate { - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4, - ram_index_count = RamIndexCount}) -> - case bpqueue:out(Q3) of +fetch_from_q3(State = #vqstate { q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4 }) -> + case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, IndexOnDisk, MsgStatus}, Q3a} -> - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, - ram_index_count = RamIndexCount1 }, - State2 = - case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of - {true, true} -> - %% q3 is now empty, it wasn't before; delta is - %% still empty. So q2 must be empty, and we - %% know q4 is empty otherwise we wouldn't be - %% loading from q3. As such, we can just set - %% q4 to Q1. - true = bpqueue:is_empty(Q2), %% ASSERTION - true = queue:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = queue:new(), - q4 = Q1 }; - {true, false} -> - maybe_deltas_to_betas(State1); - {false, _} -> - %% q3 still isn't empty, we've not touched - %% delta, so the invariants between q1, q2, - %% delta and q3 are maintained - State1 - end, + {{value, MsgStatus}, Q3a} -> + State1 = State #vqstate { q3 = Q3a }, + State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of + {true, true} -> + %% q3 is now empty, it wasn't before; + %% delta is still empty. So q2 must be + %% empty, and we know q4 is empty + %% otherwise we wouldn't be loading from + %% q3. As such, we can just set q4 to Q1. + true = ?QUEUE:is_empty(Q2), %% ASSERTION + true = ?QUEUE:is_empty(Q4), %% ASSERTION + State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; + {true, false} -> + maybe_deltas_to_betas(State1); + {false, _} -> + %% q3 still isn't empty, we've not + %% touched delta, so the invariants + %% between q1, q2, delta and q3 are + %% maintained + State1 + end, {loaded, {MsgStatus, State2}} end. @@ -1632,143 +1547,120 @@ maybe_deltas_to_betas(State = #vqstate { {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, PA, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, - case bpqueue:len(Q3a) of + case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold maybe_deltas_to_betas( State1 #vqstate { - delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); + delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); Q3aLen -> - Q3b = bpqueue:join(Q3, Q3a), + Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { q2 = bpqueue:new(), + State1 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, - q3 = bpqueue:join(Q3b, Q2) }; + q3 = ?QUEUE:join(Q3b, Q2) }; N when N > 0 -> - Delta1 = #delta { start_seq_id = DeltaSeqId1, - count = N, - end_seq_id = DeltaSeqIdEnd }, + Delta1 = d(#delta { start_seq_id = DeltaSeqId1, + count = N, + end_seq_id = DeltaSeqIdEnd }), State1 #vqstate { delta = Delta1, q3 = Q3b } end end. push_alphas_to_betas(Quota, State) -> - {Quota1, State1} = maybe_push_q1_to_betas(Quota, State), - {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + {Quota1, State1} = + push_alphas_to_betas( + fun ?QUEUE:out/1, + fun (MsgStatus, Q1a, + State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> + State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; + (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> + State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } + end, Quota, State #vqstate.q1, State), + {Quota2, State2} = + push_alphas_to_betas( + fun ?QUEUE:out_r/1, + fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> + State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } + end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}. -maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> - maybe_push_alphas_to_betas( - fun queue:out/1, - fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, - Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> - State1 #vqstate { q1 = Q1a, - q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) }; - (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, - Q1a, State1 = #vqstate { q2 = Q2 }) -> - State1 #vqstate { q1 = Q1a, - q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) } - end, Quota, Q1, State). - -maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> - maybe_push_alphas_to_betas( - fun queue:out_r/1, - fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, - Q4a, State1 = #vqstate { q3 = Q3 }) -> - State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), - q4 = Q4a } - end, Quota, Q4, State). - -maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, - State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) +push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, + State = #vqstate { ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) when Quota =:= 0 orelse TargetRamCount =:= infinity orelse TargetRamCount >= RamMsgCount -> {Quota, State}; -maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> +push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount }} = + {MsgStatus1 = #msg_status { msg_on_disk = true }, + State1 = #vqstate { ram_msg_count = RamMsgCount }} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, - ram_index_count = RamIndexCount1 }, - maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, - Consumer(MsgStatus2, Qa, State2)) + State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 }, + push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, + Consumer(MsgStatus2, Qa, State2)) end. -push_betas_to_deltas(State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState, - ram_index_count = RamIndexCount }) -> - {Delta2, Q2a, RamIndexCount2, IndexState2} = - push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, - fun bpqueue:out/1, Q2, - RamIndexCount, IndexState), - {Delta3, Q3a, RamIndexCount3, IndexState3} = - push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, - fun bpqueue:out_r/1, Q3, - RamIndexCount2, IndexState2), - Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), - State #vqstate { q2 = Q2a, - delta = Delta4, - q3 = Q3a, - index_state = IndexState3, - ram_index_count = RamIndexCount3 }. - -push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) -> - case bpqueue:out(Q) of - {empty, _Q} -> - {?BLANK_DELTA, Q, RamIndexCount, IndexState}; - {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} -> - {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} = - bpqueue:out_r(Q), +push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3, + index_state = IndexState }) -> + PushState = {Quota, Delta, IndexState}, + {Q3a, PushState1} = push_betas_to_deltas( + fun ?QUEUE:out_r/1, + fun rabbit_queue_index:next_segment_boundary/1, + Q3, PushState), + {Q2a, PushState2} = push_betas_to_deltas( + fun ?QUEUE:out/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q2, PushState1), + {_, Delta1, IndexState1} = PushState2, + State #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a, + index_state = IndexState1 }. + +push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> + case ?QUEUE:is_empty(Q) of + true -> + {Q, PushState}; + false -> + {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q), + {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of - true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; - false -> {Len, Qc, RamIndexCount1, IndexState1} = - push_betas_to_deltas(Generator, Limit, Q, 0, - RamIndexCount, IndexState), - {#delta { start_seq_id = Limit, - count = Len, - end_seq_id = MaxSeqId + 1 }, - Qc, RamIndexCount1, IndexState1} + true -> {Q, PushState}; + false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) end end. -push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, + {0, _Delta, _IndexState} = PushState) -> + {Q, PushState}; +push_betas_to_deltas1(Generator, Limit, Q, + {Quota, Delta, IndexState} = PushState) -> case Generator(Q) of {empty, _Q} -> - {Count, Q, RamIndexCount, IndexState}; - {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} + {Q, PushState}; + {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - {Count, Q, RamIndexCount, IndexState}; - {{value, IndexOnDisk, MsgStatus}, Qa} -> - {RamIndexCount1, IndexState1} = - case IndexOnDisk of - true -> {RamIndexCount, IndexState}; - false -> {#msg_status { index_on_disk = true }, - IndexState2} = - maybe_write_index_to_disk(true, MsgStatus, - IndexState), - {RamIndexCount - 1, IndexState2} - end, - push_betas_to_deltas( - Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) + {Q, PushState}; + {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> + {#msg_status { index_on_disk = true }, IndexState1} = + maybe_write_index_to_disk(true, MsgStatus, IndexState), + Delta1 = expand_delta(SeqId, Delta), + push_betas_to_deltas1(Generator, Limit, Qa, + {Quota - 1, Delta1, IndexState1}) end. %%---------------------------------------------------------------------------- diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 35ee1e51..8973a4f7 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -32,7 +32,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([update/0, get_total_memory/0, get_vm_limit/0, +-export([get_total_memory/0, get_vm_limit/0, get_check_interval/0, set_check_interval/1, get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1, get_memory_limit/0]). @@ -59,7 +59,6 @@ -ifdef(use_specs). -spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()). --spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_vm_limit/0 :: () -> non_neg_integer()). -spec(get_check_interval/0 :: () -> non_neg_integer()). @@ -74,9 +73,6 @@ %% Public API %%---------------------------------------------------------------------------- -update() -> - gen_server:cast(?SERVER, update). - get_total_memory() -> get_total_memory(os:type()). @@ -135,12 +131,12 @@ handle_call(get_memory_limit, _From, State) -> handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast(update, State) -> - {noreply, internal_update(State)}; - handle_cast(_Request, State) -> {noreply, State}. +handle_info(update, State) -> + {noreply, internal_update(State)}; + handle_info(_Info, State) -> {noreply, State}. @@ -200,7 +196,7 @@ emit_update_info(State, MemUsed, MemLimit) -> [State, MemUsed, MemLimit]). start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + {ok, TRef} = timer:send_interval(Timeout, update), TRef. %% According to http://msdn.microsoft.com/en-us/library/aa366778(VS.85).aspx |