summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-11-07 12:47:18 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-11-07 12:47:18 +0000
commita4460624b982c294bd3bf35d13474b6191c01998 (patch)
tree24ac469a2a701d91b01da2c7e985267a754bf7fc
parent8310e0bd8b4f2ddeae5549ec23883b675f20a73e (diff)
parent1d31bd97926db8fcce132906c17cf4105467df9a (diff)
downloadrabbitmq-server-a4460624b982c294bd3bf35d13474b6191c01998.tar.gz
Merge bug24430.
-rw-r--r--codegen.py55
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit_backing_queue_spec.hrl5
-rw-r--r--packaging/common/rabbitmq-server.init8
-rwxr-xr-xquickcheck9
-rwxr-xr-xscripts/rabbitmq-server.bat10
-rwxr-xr-xscripts/rabbitmq-service.bat23
-rw-r--r--src/bpqueue.erl271
-rw-r--r--src/lqueue.erl90
-rw-r--r--src/mirrored_supervisor_tests.erl1
-rw-r--r--src/rabbit.erl67
-rw-r--r--src/rabbit_alarm.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_backing_queue_qc.erl46
-rw-r--r--src/rabbit_binary_parser.erl55
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_control.erl48
-rw-r--r--src/rabbit_exchange.erl10
-rw-r--r--src/rabbit_memory_monitor.erl35
-rw-r--r--src/rabbit_mirror_queue_master.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_misc.erl26
-rw-r--r--src/rabbit_msg_store.erl284
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_prelaunch.erl19
-rw-r--r--src/rabbit_queue_index.erl17
-rw-r--r--src/rabbit_router.erl5
-rw-r--r--src/rabbit_tests.erl335
-rw-r--r--src/rabbit_variable_queue.erl704
-rw-r--r--src/vm_memory_monitor.erl14
32 files changed, 1011 insertions, 1197 deletions
diff --git a/codegen.py b/codegen.py
index 7636c196..494be73d 100644
--- a/codegen.py
+++ b/codegen.py
@@ -42,7 +42,8 @@ erlangTypeMap = {
def convertTable(d):
if len(d) == 0:
return "[]"
- else: raise 'Non-empty table defaults not supported', d
+ else:
+ raise Exception('Non-empty table defaults not supported: ' + repr(d))  # repr(): d is not a str, so plain '+' would raise TypeError
erlangDefaultValueTypeConvMap = {
bool : lambda x: str(x).lower(),
@@ -229,9 +230,32 @@ def genErl(spec):
print " %s;" % (recordConstructorExpr,)
def genDecodeProperties(c):
- print "decode_properties(%d, PropBin) ->" % (c.index)
- print " %s = rabbit_binary_parser:parse_properties(%s, PropBin)," % \
- (fieldTempList(c.fields), fieldTypeList(c.fields))
+ def presentBin(fields):
+ ps = ', '.join(['P' + str(f.index) + ':1' for f in fields])
+ return '<<' + ps + ', _:%d, R0/binary>>' % (16 - len(fields),)
+ def mkMacroName(field):
+ return '?' + field.domain.upper() + '_PROP'
+ def writePropFieldLine(field, bin_next = None):
+ i = str(field.index)
+ if not bin_next:
+ bin_next = 'R' + str(field.index + 1)
+ if field.domain in ['octet', 'timestamp']:
+ print (" {%s, %s} = %s(%s, %s, %s, %s)," %
+ ('F' + i, bin_next, mkMacroName(field), 'P' + i,
+ 'R' + i, 'I' + i, 'X' + i))
+ else:
+ print (" {%s, %s} = %s(%s, %s, %s, %s, %s)," %
+ ('F' + i, bin_next, mkMacroName(field), 'P' + i,
+ 'R' + i, 'L' + i, 'S' + i, 'X' + i))
+
+ if len(c.fields) == 0:
+ print "decode_properties(%d, _) ->" % (c.index,)
+ else:
+ print ("decode_properties(%d, %s) ->" %
+ (c.index, presentBin(c.fields)))
+ for field in c.fields[:-1]:
+ writePropFieldLine(field)
+ writePropFieldLine(c.fields[-1], "<<>>")
print " #'P_%s'{%s};" % (erlangize(c.name), fieldMapList(c.fields))
def genFieldPreprocessing(packed):
@@ -272,7 +296,7 @@ def genErl(spec):
if mCls == 'SOFT_ERROR': genLookupException1(c,'false')
elif mCls == 'HARD_ERROR': genLookupException1(c, 'true')
elif mCls == '': pass
- else: raise 'Unknown constant class', cls
+ else: raise Exception('Unknown constant class' + cls)
def genLookupException1(c,hardErrorBoolStr):
n = erlangConstantName(c)
@@ -404,6 +428,27 @@ shortstr_size(S) ->
Len when Len =< 255 -> Len;
_ -> exit(method_field_shortstr_overflow)
end.
+
+-define(SHORTSTR_PROP(P, R, L, S, X),
+ if P =:= 0 -> {undefined, R};
+ true -> <<L:8/unsigned, S:L/binary, X/binary>> = R,
+ {S, X}
+ end).
+-define(TABLE_PROP(P, R, L, T, X),
+ if P =:= 0 -> {undefined, R};
+ true -> <<L:32/unsigned, T:L/binary, X/binary>> = R,
+ {rabbit_binary_parser:parse_table(T), X}
+ end).
+-define(OCTET_PROP(P, R, I, X),
+ if P =:= 0 -> {undefined, R};
+ true -> <<I:8/unsigned, X/binary>> = R,
+ {I, X}
+ end).
+-define(TIMESTAMP_PROP(P, R, I, X),
+ if P =:= 0 -> {undefined, R};
+ true -> <<I:64/unsigned, X/binary>> = R,
+ {I, X}
+ end).
"""
version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision)
if version == '{8, 0, 0}': version = '{0, 8, 0}'
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 662a36c7..f21888bd 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -103,7 +103,7 @@
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>stop</command></cmdsynopsis></term>
+ <term><cmdsynopsis><command>stop</command> <arg choice="opt"><replaceable>pid_file</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Stops the Erlang node on which RabbitMQ is running. To
@@ -111,6 +111,12 @@
the Server</citetitle> in the <ulink url="http://www.rabbitmq.com/install.html">installation
guide</ulink>.
</para>
+ <para>
+ If a <option>pid_file</option> is specified, also waits
+ for the process specified there to terminate. See the
+ description of the <option>wait</option> command below
+ for details on this file.
+ </para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl stop</screen>
<para role="example">
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 65a3269a..5ead1051 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -14,7 +14,8 @@
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
- {env, [{tcp_listeners, [5672]},
+ {env, [{hipe_compile, false},
+ {tcp_listeners, [5672]},
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 20fe4234..4a657951 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -22,9 +22,6 @@
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(confirm_required() :: boolean()).
--type(message_properties_transformer() ::
- fun ((rabbit_types:message_properties())
- -> rabbit_types:message_properties())).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
@@ -51,7 +48,7 @@
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
--spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
+-spec(requeue/2 :: ([ack()], state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index 15fd5d5b..4084d8c7 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -56,8 +56,10 @@ start_rabbitmq () {
RETVAL=0
ensure_pid_dir
set +e
- setsid sh -c "RABBITMQ_PID_FILE=$PID_FILE $DAEMON > \
- ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err" &
+ RABBITMQ_PID_FILE=$PID_FILE setsid $DAEMON \
+ > "${INIT_LOG_DIR}/startup_log" \
+ 2> "${INIT_LOG_DIR}/startup_err" \
+ 0<&- &
$CONTROL wait $PID_FILE >/dev/null 2>&1
RETVAL=$?
set -e
@@ -81,7 +83,7 @@ stop_rabbitmq () {
status_rabbitmq quiet
if [ $RETVAL = 0 ] ; then
set +e
- $CONTROL stop > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err
+ $CONTROL stop ${PID_FILE} > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err
RETVAL=$?
set -e
if [ $RETVAL = 0 ] ; then
diff --git a/quickcheck b/quickcheck
index a36cf3ed..b5382d75 100755
--- a/quickcheck
+++ b/quickcheck
@@ -6,15 +6,16 @@
%% A helper to test quickcheck properties on a running broker
%% NodeStr is a local broker node name
%% ModStr is the module containing quickcheck properties
-%% The number of trials is optional
-main([NodeStr, ModStr | TrialsStr]) ->
+%% TrialsStr is the number of trials
+main([NodeStr, ModStr, TrialsStr]) ->
{ok, Hostname} = inet:gethostname(),
Node = list_to_atom(NodeStr ++ "@" ++ Hostname),
Mod = list_to_atom(ModStr),
- Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr),
+ Trials = erlang:list_to_integer(TrialsStr),
case rpc:call(Node, code, ensure_loaded, [proper]) of
{module, proper} ->
- case rpc:call(Node, proper, module, [Mod] ++ Trials) of
+ case rpc:call(Node, proper, module,
+ [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of
[] -> ok;
_ -> quit(1)
end;
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 0a78794f..c27b418a 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -57,13 +57,11 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-
if "!RABBITMQ_MNESIA_BASE!"=="" (
- set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
)
if "!RABBITMQ_LOG_BASE!"=="" (
- set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE!/log
)
@@ -96,7 +94,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-noinput -hidden ^
-s rabbit_prelaunch ^
-sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
"!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
@@ -144,7 +142,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index e671ba7a..4d3fce49 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -80,7 +80,7 @@ rem *** End of configuration ***
if not exist "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" (
echo.
echo **********************************************
- echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
+ echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
echo **********************************************
echo.
echo "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" not found
@@ -89,14 +89,11 @@ if not exist "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" (
exit /B 1
)
-rem erlang prefers forwardslash as separator in paths
-set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-
if "!RABBITMQ_MNESIA_BASE!"=="" (
- set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
)
if "!RABBITMQ_LOG_BASE!"=="" (
- set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE!/log
)
@@ -118,7 +115,7 @@ if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
)
if "!P1!" == "install" goto INSTALL_SERVICE
-for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
+for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
echo.
echo *********************
@@ -129,7 +126,7 @@ echo !TN0! help - Display this help
echo !TN0! install - Install the !RABBITMQ_SERVICENAME! service
echo !TN0! remove - Remove the !RABBITMQ_SERVICENAME! service
echo.
-echo The following actions can also be accomplished by using
+echo The following actions can also be accomplished by using
echo Windows Services Management Console (services.msc):
echo.
echo !TN0! start - Start the !RABBITMQ_SERVICENAME! service
@@ -143,7 +140,7 @@ exit /B
:INSTALL_SERVICE
if not exist "!RABBITMQ_BASE!" (
- echo Creating base directory !RABBITMQ_BASE! & md "!RABBITMQ_BASE!"
+ echo Creating base directory !RABBITMQ_BASE! & md "!RABBITMQ_BASE!"
)
"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" list !RABBITMQ_SERVICENAME! 2>NUL 1>NUL
@@ -164,7 +161,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
"!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
""
@@ -179,7 +176,7 @@ set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-
+
if exist "!RABBITMQ_CONFIG_FILE!.config" (
set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
@@ -210,7 +207,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
@@ -219,7 +216,7 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"!
"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" set !RABBITMQ_SERVICENAME! ^
-machine "!ERLANG_SERVICE_MANAGER_PATH!\erl.exe" ^
--env ERL_CRASH_DUMP="!RABBITMQ_BASE_UNIX!/erl_crash.dump" ^
+-env ERL_CRASH_DUMP="!RABBITMQ_BASE:\=/!/erl_crash.dump" ^
-workdir "!RABBITMQ_BASE!" ^
-stopaction "rabbit:stop_and_halt()." ^
-sname !RABBITMQ_NODENAME! ^
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
deleted file mode 100644
index 71a34262..00000000
--- a/src/bpqueue.erl
+++ /dev/null
@@ -1,271 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-%%
-
--module(bpqueue).
-
-%% Block-prefixed queue. From the perspective of the queue interface
-%% the datastructure acts like a regular queue where each value is
-%% paired with the prefix.
-%%
-%% This is implemented as a queue of queues, which is more space and
-%% time efficient, whilst supporting the normal queue interface. Each
-%% inner queue has a prefix, which does not need to be unique, and it
-%% is guaranteed that no two consecutive blocks have the same
-%% prefix. len/1 returns the flattened length of the queue and is
-%% O(1).
-
--export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1, map_fold_filter_l/4,
- map_fold_filter_r/4]).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--export_type([bpqueue/0]).
-
--type(bpqueue() :: {non_neg_integer(), queue()}).
--type(prefix() :: any()).
--type(value() :: any()).
--type(result() :: ({'empty', bpqueue()} |
- {{'value', prefix(), value()}, bpqueue()})).
-
--spec(new/0 :: () -> bpqueue()).
--spec(is_empty/1 :: (bpqueue()) -> boolean()).
--spec(len/1 :: (bpqueue()) -> non_neg_integer()).
--spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
--spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
--spec(out/1 :: (bpqueue()) -> result()).
--spec(out_r/1 :: (bpqueue()) -> result()).
--spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()).
--spec(foldl/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
--spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
--spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()).
--spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
--spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())),
- (fun ((value(), B) ->
- ({prefix(), value(), B} | 'stop'))),
- B,
- bpqueue()) ->
- {bpqueue(), B}).
--spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())),
- (fun ((value(), B) ->
- ({prefix(), value(), B} | 'stop'))),
- B,
- bpqueue()) ->
- {bpqueue(), B}).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-new() -> {0, queue:new()}.
-
-is_empty({0, _Q}) -> true;
-is_empty(_BPQ) -> false.
-
-len({N, _Q}) -> N.
-
-in(Prefix, Value, {0, Q}) ->
- {1, queue:in({Prefix, queue:from_list([Value])}, Q)};
-in(Prefix, Value, BPQ) ->
- in1({fun queue:in/2, fun queue:out_r/1}, Prefix, Value, BPQ).
-
-in_r(Prefix, Value, BPQ = {0, _Q}) ->
- in(Prefix, Value, BPQ);
-in_r(Prefix, Value, BPQ) ->
- in1({fun queue:in_r/2, fun queue:out/1}, Prefix, Value, BPQ).
-
-in1({In, Out}, Prefix, Value, {N, Q}) ->
- {N+1, case Out(Q) of
- {{value, {Prefix, InnerQ}}, Q1} ->
- In({Prefix, In(Value, InnerQ)}, Q1);
- {{value, {_Prefix, _InnerQ}}, _Q1} ->
- In({Prefix, queue:in(Value, queue:new())}, Q)
- end}.
-
-in_q(Prefix, Queue, BPQ = {0, Q}) ->
- case queue:len(Queue) of
- 0 -> BPQ;
- N -> {N, queue:in({Prefix, Queue}, Q)}
- end;
-in_q(Prefix, Queue, BPQ) ->
- in_q1({fun queue:in/2, fun queue:out_r/1,
- fun queue:join/2},
- Prefix, Queue, BPQ).
-
-in_q_r(Prefix, Queue, BPQ = {0, _Q}) ->
- in_q(Prefix, Queue, BPQ);
-in_q_r(Prefix, Queue, BPQ) ->
- in_q1({fun queue:in_r/2, fun queue:out/1,
- fun (T, H) -> queue:join(H, T) end},
- Prefix, Queue, BPQ).
-
-in_q1({In, Out, Join}, Prefix, Queue, BPQ = {N, Q}) ->
- case queue:len(Queue) of
- 0 -> BPQ;
- M -> {N + M, case Out(Q) of
- {{value, {Prefix, InnerQ}}, Q1} ->
- In({Prefix, Join(InnerQ, Queue)}, Q1);
- {{value, {_Prefix, _InnerQ}}, _Q1} ->
- In({Prefix, Queue}, Q)
- end}
- end.
-
-out({0, _Q} = BPQ) -> {empty, BPQ};
-out(BPQ) -> out1({fun queue:in_r/2, fun queue:out/1}, BPQ).
-
-out_r({0, _Q} = BPQ) -> {empty, BPQ};
-out_r(BPQ) -> out1({fun queue:in/2, fun queue:out_r/1}, BPQ).
-
-out1({In, Out}, {N, Q}) ->
- {{value, {Prefix, InnerQ}}, Q1} = Out(Q),
- {{value, Value}, InnerQ1} = Out(InnerQ),
- Q2 = case queue:is_empty(InnerQ1) of
- true -> Q1;
- false -> In({Prefix, InnerQ1}, Q1)
- end,
- {{value, Prefix, Value}, {N-1, Q2}}.
-
-join({0, _Q}, BPQ) ->
- BPQ;
-join(BPQ, {0, _Q}) ->
- BPQ;
-join({NHead, QHead}, {NTail, QTail}) ->
- {{value, {Prefix, InnerQHead}}, QHead1} = queue:out_r(QHead),
- {NHead + NTail,
- case queue:out(QTail) of
- {{value, {Prefix, InnerQTail}}, QTail1} ->
- queue:join(
- queue:in({Prefix, queue:join(InnerQHead, InnerQTail)}, QHead1),
- QTail1);
- {{value, {_Prefix, _InnerQTail}}, _QTail1} ->
- queue:join(QHead, QTail)
- end}.
-
-foldl(_Fun, Init, {0, _Q}) -> Init;
-foldl( Fun, Init, {_N, Q}) -> fold1(fun queue:out/1, Fun, Init, Q).
-
-foldr(_Fun, Init, {0, _Q}) -> Init;
-foldr( Fun, Init, {_N, Q}) -> fold1(fun queue:out_r/1, Fun, Init, Q).
-
-fold1(Out, Fun, Init, Q) ->
- case Out(Q) of
- {empty, _Q} ->
- Init;
- {{value, {Prefix, InnerQ}}, Q1} ->
- fold1(Out, Fun, fold1(Out, Fun, Prefix, Init, InnerQ), Q1)
- end.
-
-fold1(Out, Fun, Prefix, Init, InnerQ) ->
- case Out(InnerQ) of
- {empty, _Q} ->
- Init;
- {{value, Value}, InnerQ1} ->
- fold1(Out, Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1)
- end.
-
-from_list(List) ->
- {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} =
- lists:foldl(
- fun ({_Prefix, []}, Acc) ->
- Acc;
- ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
- {Prefix, queue:join(InnerQ, queue:from_list(InnerList)),
- ListOfPQs, LenAcc + length(InnerList)};
- ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
- {Prefix1, queue:from_list(InnerList),
- [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)}
- end, {undefined, queue:new(), [], 0}, List),
- ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1],
- [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2),
- {Len, queue:from_list(case queue:is_empty(InnerQ1) of
- true -> Rest;
- false -> All
- end)}.
-
-to_list({0, _Q}) -> [];
-to_list({_N, Q}) -> [{Prefix, queue:to_list(InnerQ)} ||
- {Prefix, InnerQ} <- queue:to_list(Q)].
-
-%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
-%% where FilterFun(Prefix) -> boolean()
-%% Fun(Value, Init) -> {Prefix, Value, Init} | stop
-%%
-%% The filter fun allows you to skip very quickly over blocks that
-%% you're not interested in. Such blocks appear in the resulting bpq
-%% without modification. The Fun is then used both to map the value,
-%% which also allows you to change the prefix (and thus block) of the
-%% value, and also to modify the Init/Acc (just like a fold). If the
-%% Fun returns 'stop' then it is not applied to any further items.
-map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
- {BPQ, Init};
-map_fold_filter_l(PFilter, Fun, Init, {N, Q}) ->
- map_fold_filter1({fun queue:out/1, fun queue:in/2,
- fun in_q/3, fun join/2},
- N, PFilter, Fun, Init, Q, new()).
-
-map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
- {BPQ, Init};
-map_fold_filter_r(PFilter, Fun, Init, {N, Q}) ->
- map_fold_filter1({fun queue:out_r/1, fun queue:in_r/2,
- fun in_q_r/3, fun (T, H) -> join(H, T) end},
- N, PFilter, Fun, Init, Q, new()).
-
-map_fold_filter1(Funs = {Out, _In, InQ, Join}, Len, PFilter, Fun,
- Init, Q, QNew) ->
- case Out(Q) of
- {empty, _Q} ->
- {QNew, Init};
- {{value, {Prefix, InnerQ}}, Q1} ->
- case PFilter(Prefix) of
- true ->
- {Init1, QNew1, Cont} =
- map_fold_filter2(Funs, Fun, Prefix, Prefix,
- Init, InnerQ, QNew, queue:new()),
- case Cont of
- false -> {Join(QNew1, {Len - len(QNew1), Q1}), Init1};
- true -> map_fold_filter1(Funs, Len, PFilter, Fun,
- Init1, Q1, QNew1)
- end;
- false ->
- map_fold_filter1(Funs, Len, PFilter, Fun,
- Init, Q1, InQ(Prefix, InnerQ, QNew))
- end
- end.
-
-map_fold_filter2(Funs = {Out, In, InQ, _Join}, Fun, OrigPrefix, Prefix,
- Init, InnerQ, QNew, InnerQNew) ->
- case Out(InnerQ) of
- {empty, _Q} ->
- {Init, InQ(OrigPrefix, InnerQ,
- InQ(Prefix, InnerQNew, QNew)), true};
- {{value, Value}, InnerQ1} ->
- case Fun(Value, Init) of
- stop ->
- {Init, InQ(OrigPrefix, InnerQ,
- InQ(Prefix, InnerQNew, QNew)), false};
- {Prefix1, Value1, Init1} ->
- {Prefix2, QNew1, InnerQNew1} =
- case Prefix1 =:= Prefix of
- true -> {Prefix, QNew, In(Value1, InnerQNew)};
- false -> {Prefix1, InQ(Prefix, InnerQNew, QNew),
- In(Value1, queue:new())}
- end,
- map_fold_filter2(Funs, Fun, OrigPrefix, Prefix2,
- Init1, InnerQ1, QNew1, InnerQNew1)
- end
- end.
diff --git a/src/lqueue.erl b/src/lqueue.erl
new file mode 100644
index 00000000..04b40706
--- /dev/null
+++ b/src/lqueue.erl
@@ -0,0 +1,90 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(lqueue).
+
+-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
+ foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]).
+
+-define(QUEUE, queue).
+
+-ifdef(use_specs).
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: {non_neg_integer(), ?QUEUE()}).
+-type(value() :: any()).
+-type(result() :: 'empty' | {'value', value()}).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+-spec(len/1 :: (?MODULE()) -> non_neg_integer()).
+-spec(in/2 :: (value(), ?MODULE()) -> ?MODULE()).
+-spec(in_r/2 :: (value(), ?MODULE()) -> ?MODULE()).
+-spec(out/1 :: (?MODULE()) -> {result(), ?MODULE()}).
+-spec(out_r/1 :: (?MODULE()) -> {result(), ?MODULE()}).
+-spec(join/2 :: (?MODULE(), ?MODULE()) -> ?MODULE()).
+-spec(foldl/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
+-spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
+-spec(from_list/1 :: ([value()]) -> ?MODULE()).
+-spec(to_list/1 :: (?MODULE()) -> [value()]).
+-spec(peek/1 :: (?MODULE()) -> result()).
+-spec(peek_r/1 :: (?MODULE()) -> result()).
+
+-endif.
+
+new() -> {0, ?QUEUE:new()}.
+
+is_empty({0, _Q}) -> true;
+is_empty(_) -> false.
+
+in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}.
+
+in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}.
+
+out({0, _Q} = Q) -> {empty, Q};
+out({L, Q}) -> {Result, Q1} = ?QUEUE:out(Q),
+ {Result, {L-1, Q1}}.
+
+out_r({0, _Q} = Q) -> {empty, Q};
+out_r({L, Q}) -> {Result, Q1} = ?QUEUE:out_r(Q),
+ {Result, {L-1, Q1}}.
+
+join({L1, Q1}, {L2, Q2}) -> {L1 + L2, ?QUEUE:join(Q1, Q2)}.
+
+to_list({_L, Q}) -> ?QUEUE:to_list(Q).
+
+from_list(L) -> {length(L), ?QUEUE:from_list(L)}.
+
+foldl(Fun, Init, Q) ->
+ case out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1)
+ end.
+
+foldr(Fun, Init, Q) ->
+ case out_r(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1)
+ end.
+
+len({L, _Q}) -> L.
+
+peek({ 0, _Q}) -> empty;
+peek({_L, Q}) -> ?QUEUE:peek(Q).
+
+peek_r({ 0, _Q}) -> empty;
+peek_r({_L, Q}) -> ?QUEUE:peek_r(Q).
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index ee9c7593..0900f56f 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -202,6 +202,7 @@ with_sups(Fun, Sups) ->
Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups],
Fun(Pids),
[kill(Pid) || Pid <- Pids, is_process_alive(Pid)],
+ timer:sleep(500),
passed.
start_sup(Spec) ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e98ca9be..0a2681a2 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,8 +18,8 @@
-behaviour(application).
--export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0,
- is_running/0 , is_running/1, environment/0,
+-export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0,
+ status/0, is_running/0, is_running/1, environment/0,
rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -177,6 +177,27 @@
-define(APPS, [os_mon, mnesia, rabbit]).
+%% see bug 24513 for how this list was created
+-define(HIPE_WORTHY,
+ [rabbit_reader, rabbit_channel, gen_server2,
+ rabbit_exchange, rabbit_command_assembler, rabbit_framing_amqp_0_9_1,
+ rabbit_basic, rabbit_event, lists, queue, priority_queue,
+ rabbit_router, rabbit_trace, rabbit_misc, rabbit_binary_parser,
+ rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
+ rabbit_amqqueue_process, rabbit_variable_queue,
+ rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
+ sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
+ rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
+ rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
+ rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
+
+%% HiPE compilation uses multiple cores anyway, but some bits are
+%% IO-bound so we can go faster if we parallelise a bit more. In
+%% practice 2 processes seems just as fast as any other number > 1,
+%% and keeps the progress bar realistic-ish.
+-define(HIPE_PROCESSES, 2).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -185,6 +206,7 @@
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
+-spec(maybe_hipe_compile/0 :: () -> 'ok').
-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -218,12 +240,53 @@
%%----------------------------------------------------------------------------
+maybe_hipe_compile() ->
+ {ok, Want} = application:get_env(rabbit, hipe_compile),
+ Can = code:which(hipe) =/= non_existing,
+ case {Want, Can} of
+ {true, true} -> hipe_compile();
+ {true, false} -> io:format("Not HiPE compiling: HiPE not found in "
+ "this Erlang installation.~n");
+ {false, _} -> ok
+ end.
+
+hipe_compile() ->
+ Count = length(?HIPE_WORTHY),
+ io:format("HiPE compiling: |~s|~n |",
+ [string:copies("-", Count)]),
+ T1 = erlang:now(),
+ PidMRefs = [spawn_monitor(fun () -> [begin
+ {ok, M} = hipe:c(M, [o3]),
+ io:format("#")
+ end || M <- Ms]
+ end) ||
+ Ms <- split(?HIPE_WORTHY, ?HIPE_PROCESSES)],
+ [receive
+ {'DOWN', MRef, process, _, normal} -> ok;
+ {'DOWN', MRef, process, _, Reason} -> exit(Reason)
+ end || {_Pid, MRef} <- PidMRefs],
+ T2 = erlang:now(),
+ io:format("|~n~nCompiled ~B modules in ~Bs~n",
+ [Count, timer:now_diff(T2, T1) div 1000000]).
+
+split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
+
+split0([], Ls) -> Ls;
+split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]).
+
prepare() ->
ok = ensure_working_log_handlers(),
ok = rabbit_upgrade:maybe_upgrade_mnesia().
start() ->
try
+ %% prepare/0 ends up looking at the rabbit app's env, so it
+ %% needs to be loaded, but during the tests, it may end up
+ %% getting loaded twice, so guard against that
+ case application:load(rabbit) of
+ ok -> ok;
+ {error, {already_loaded, rabbit}} -> ok
+ end,
ok = prepare(),
ok = rabbit_misc:start_applications(application_load_order())
after
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index d38ecb91..fd03ca85 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -45,11 +45,7 @@
start() ->
ok = alarm_handler:add_alarm_handler(?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true -> ok;
- false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
- [MemoryWatermark])
- end,
+ rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
ok.
stop() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 46f6674b..ba20b355 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -129,7 +129,7 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
- msg_id_to_channel = dict:new()},
+ msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -454,11 +454,11 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
- case dict:find(MsgId, MTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
+ case gb_trees:lookup(MsgId, MTC0) of
+ {value, {ChPid, MsgSeqNo}} ->
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(MsgId, MTC0)};
- _ ->
+ gb_trees:delete(MsgId, MTC0)};
+ none ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
@@ -482,7 +482,7 @@ needs_confirming(_) -> false.
maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)};
+ State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)};
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -551,13 +551,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
- run_backing_queue(
- BQ, fun (M, BQS) ->
- {_MsgIds, BQS1} =
- M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
- BQS1
- end, State).
+requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ run_backing_queue(BQ, fun (M, BQS) ->
+ {_MsgIds, BQS1} = M:requeue(AckTags, BQS),
+ BQS1
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -670,11 +668,6 @@ discard_delivery(#delivery{sender = ChPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
-reset_msg_expiry_fun(TTL) ->
- fun(MsgProps) ->
- MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
- end.
-
message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 77278416..c3b322ee 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -107,7 +107,7 @@ behaviour_info(callbacks) ->
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
- {requeue, 3},
+ {requeue, 2},
%% How long is my queue?
{len, 1},
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 095202dd..c61184a6 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -34,14 +34,15 @@
-export([initial_state/0, command/1, precondition/2, postcondition/3,
next_state/3]).
--export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
+-export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]).
-record(state, {bqstate,
len, %% int
next_seq_id, %% int
messages, %% gb_trees of seqid => {msg_props, basic_msg}
acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}]
- confirms}). %% set of msgid
+ confirms, %% set of msgid
+ publishing}).%% int
%% Initialise model
@@ -51,7 +52,8 @@ initial_state() ->
next_seq_id = 0,
messages = gb_trees:empty(),
acks = [],
- confirms = gb_sets:new()}.
+ confirms = gb_sets:new(),
+ publishing = 0}.
%% Property
@@ -112,10 +114,8 @@ qc_publish(#state{bqstate = BQ}) ->
expiry = oneof([undefined | lists:seq(1, 10)])},
self(), BQ]}.
-qc_publish_multiple(#state{bqstate = BQ}) ->
- {call, ?MODULE, publish_multiple,
- [qc_message(), #message_properties{}, BQ,
- resize(?QUEUE_MAXLEN, pos_integer())]}.
+qc_publish_multiple(#state{}) ->
+ {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
@@ -128,8 +128,7 @@ qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
- {call, ?BQMOD, requeue,
- [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+ {call, ?BQMOD, requeue, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_set_ram_duration_target(#state{bqstate = BQ}) ->
{call, ?BQMOD, set_ram_duration_target,
@@ -155,6 +154,10 @@ qc_purge(#state{bqstate = BQ}) ->
%% Preconditions
+%% Create long queues by only allowing publishing
+precondition(#state{publishing = Count}, {call, _Mod, Fun, _Arg})
+ when Count > 0, Fun /= publish ->
+ false;
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
when Fun =:= ack; Fun =:= requeue ->
length(Acks) > 0;
@@ -174,6 +177,7 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
+ publishing = PublishCount,
next_seq_id = NextSeq} = S,
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
NeedsConfirm =
@@ -183,21 +187,15 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
len = Len + 1,
next_seq_id = NextSeq + 1,
messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages),
+ publishing = {call, erlang, max, [0, {call, erlang, '-',
+ [PublishCount, 1]}]},
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end};
-next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
- #state{len = Len, messages = Messages} = S,
- {S1, Msgs1} = repeat({S, Messages},
- fun ({#state{next_seq_id = NextSeq} = State, Msgs}) ->
- {State #state { next_seq_id = NextSeq + 1},
- gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)}
- end, Count),
- S1#state{bqstate = BQ,
- len = Len + Count,
- messages = Msgs1};
+next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) ->
+ S#state{publishing = PublishCount};
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
@@ -245,7 +243,7 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
S#state{bqstate = BQ1,
acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
-next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
+next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _V]}) ->
#state{messages = Messages, acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
Messages1 = lists:foldl(fun (AckTag, Msgs) ->
@@ -322,12 +320,8 @@ postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
%% Helpers
-repeat(Result, _Fun, 0) -> Result;
-repeat(Result, Fun, Times) -> repeat(Fun(Result), Fun, Times - 1).
-
-publish_multiple(Msg, MsgProps, BQ, Count) ->
- repeat(BQ, fun(BQ1) -> ?BQMOD:publish(Msg, MsgProps, self(), BQ1) end,
- Count).
+publish_multiple(_C) ->
+ ok.
timeout(BQ, 0) ->
BQ;
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 88026bab..f3ca4e98 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([parse_table/1, parse_properties/2]).
+-export([parse_table/1]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
%%----------------------------------------------------------------------------
@@ -26,8 +26,6 @@
-ifdef(use_specs).
-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()).
--spec(parse_properties/2 ::
- ([rabbit_framing:amqp_property_type()], binary()) -> [any()]).
-spec(ensure_content_decoded/1 ::
(rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
@@ -94,57 +92,6 @@ parse_field_value(<<"x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>
parse_field_value(<<"V", Rest/binary>>) ->
{void, undefined, Rest}.
-
-parse_properties([], _PropBin) ->
- [];
-parse_properties(TypeList, PropBin) ->
- FlagCount = length(TypeList),
- %% round up to the nearest multiple of 15 bits, since the 16th bit
- %% in each short is a "continuation" bit.
- FlagsLengthBytes = trunc((FlagCount + 14) / 15) * 2,
- <<Flags:FlagsLengthBytes/binary, Properties/binary>> = PropBin,
- <<FirstShort:16, Remainder/binary>> = Flags,
- parse_properties(0, TypeList, [], FirstShort, Remainder, Properties).
-
-parse_properties(_Bit, [], Acc, _FirstShort,
- _Remainder, <<>>) ->
- lists:reverse(Acc);
-parse_properties(_Bit, [], _Acc, _FirstShort,
- _Remainder, _LeftoverBin) ->
- exit(content_properties_binary_overflow);
-parse_properties(15, TypeList, Acc, _OldFirstShort,
- <<NewFirstShort:16,Remainder/binary>>, Properties) ->
- parse_properties(0, TypeList, Acc, NewFirstShort, Remainder, Properties);
-parse_properties(Bit, [Type | TypeListRest], Acc, FirstShort,
- Remainder, Properties) ->
- {Value, Rest} =
- if (FirstShort band (1 bsl (15 - Bit))) /= 0 ->
- parse_property(Type, Properties);
- Type == bit -> {false , Properties};
- true -> {undefined, Properties}
- end,
- parse_properties(Bit + 1, TypeListRest, [Value | Acc], FirstShort,
- Remainder, Rest).
-
-parse_property(shortstr, <<Len:8/unsigned, String:Len/binary, Rest/binary>>) ->
- {String, Rest};
-parse_property(longstr, <<Len:32/unsigned, String:Len/binary, Rest/binary>>) ->
- {String, Rest};
-parse_property(octet, <<Int:8/unsigned, Rest/binary>>) ->
- {Int, Rest};
-parse_property(shortint, <<Int:16/unsigned, Rest/binary>>) ->
- {Int, Rest};
-parse_property(longint, <<Int:32/unsigned, Rest/binary>>) ->
- {Int, Rest};
-parse_property(longlongint, <<Int:64/unsigned, Rest/binary>>) ->
- {Int, Rest};
-parse_property(timestamp, <<Int:64/unsigned, Rest/binary>>) ->
- {Int, Rest};
-parse_property(bit, Rest) ->
- {true, Rest};
-parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
- {parse_table(Table), Rest}.
-
ensure_content_decoded(Content = #content{properties = Props})
when Props =/= none ->
Content;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 883e570a..9b2fe28c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1080,9 +1080,8 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
uncommitted_acks = TAL}) ->
- TAQ = queue:from_list(lists:reverse(TAL)),
- {reply, #'tx.rollback_ok'{},
- new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})};
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
rabbit_misc:protocol_error(
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 905e4fd0..fa8dd262 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -20,6 +20,7 @@
-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
+-define(EXTERNAL_CHECK_INTERVAL, 1000).
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
@@ -161,9 +162,16 @@ usage() ->
%%----------------------------------------------------------------------------
-action(stop, Node, [], _Opts, Inform) ->
+action(stop, Node, Args, _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
- call(Node, {rabbit, stop_and_halt, []});
+ Res = call(Node, {rabbit, stop_and_halt, []}),
+ case {Res, Args} of
+ {ok, [PidFile]} -> wait_for_process_death(
+ read_pid_file(PidFile, false));
+ {ok, [_, _| _]} -> exit({badarg, Args});
+ _ -> ok
+ end,
+ Res;
action(stop_app, Node, [], _Opts, Inform) ->
Inform("Stopping node ~p", [Node]),
@@ -325,7 +333,10 @@ action(trace_off, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
- Frac = list_to_float(Arg),
+ Frac = list_to_float(case string:chr(Arg, $.) of
+ 0 -> Arg ++ ".0";
+ _ -> Arg
+ end),
Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]),
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
@@ -362,7 +373,7 @@ action(report, Node, _Args, _Opts, Inform) ->
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Inform) ->
- Pid = wait_and_read_pid_file(PidFile),
+ Pid = read_pid_file(PidFile, true),
Inform("pid is ~s", [Pid]),
wait_for_application(Node, Pid).
@@ -370,18 +381,33 @@ wait_for_application(Node, Pid) ->
case process_up(Pid) of
true -> case rabbit:is_running(Node) of
true -> ok;
- false -> timer:sleep(1000),
+ false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
wait_for_application(Node, Pid)
end;
false -> {error, process_not_running}
end.
-wait_and_read_pid_file(PidFile) ->
- case file:read_file(PidFile) of
- {ok, Bin} -> string:strip(binary_to_list(Bin), right, $\n);
- {error, enoent} -> timer:sleep(500),
- wait_and_read_pid_file(PidFile);
- {error, _} = E -> exit({error, {could_not_read_pid, E}})
+wait_for_process_death(Pid) ->
+ case process_up(Pid) of
+ true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
+ wait_for_process_death(Pid);
+ false -> ok
+ end.
+
+read_pid_file(PidFile, Wait) ->
+ case {file:read_file(PidFile), Wait} of
+ {{ok, Bin}, _} ->
+ S = string:strip(binary_to_list(Bin), right, $\n),
+ try list_to_integer(S)
+ catch error:badarg ->
+ exit({error, {garbage_in_pid_file, PidFile}})
+ end,
+ S;
+ {{error, enoent}, true} ->
+ timer:sleep(?EXTERNAL_CHECK_INTERVAL),
+ read_pid_file(PidFile, Wait);
+ {{error, _} = E, _} ->
+ exit({error, {could_not_read_pid, E}})
end.
% Test using some OS clunkiness since we shouldn't trust
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index afa48355..a15b9be4 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -257,6 +257,8 @@ route1(Delivery, {WorkList, SeenXs, QNames}) ->
DstNames))
end.
+process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
+ Results;
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
@@ -355,5 +357,9 @@ peek_serial(XName) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- Module.
+ case get({xtype_to_module, T}) of
+ undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T),
+ put({xtype_to_module, T}, Module),
+ Module;
+ Module -> Module
+ end.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 3bd8eeef..02f3158f 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -24,7 +24,7 @@
-behaviour(gen_server2).
--export([start_link/0, update/0, register/2, deregister/1,
+-export([start_link/0, register/2, deregister/1,
report_ram_duration/2, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -69,7 +69,6 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(deregister/1 :: (pid()) -> 'ok').
-spec(report_ram_duration/2 ::
@@ -85,9 +84,6 @@
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-update() ->
- gen_server2:cast(?SERVER, update).
-
register(Pid, MFA = {_M, _F, _A}) ->
gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).
@@ -106,8 +102,7 @@ stop() ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
- ?SERVER, update, []),
+ {ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update),
Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
@@ -153,9 +148,6 @@ handle_call({register, Pid, MFA}, _From,
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast(update, State) ->
- {noreply, internal_update(State)};
-
handle_cast({deregister, Pid}, State) ->
{noreply, internal_deregister(Pid, true, State)};
@@ -165,6 +157,9 @@ handle_cast(stop, State) ->
handle_cast(_Request, State) ->
{noreply, State}.
+handle_info(update, State) ->
+ {noreply, internal_update(State)};
+
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
{noreply, internal_deregister(Pid, false, State)};
@@ -216,17 +211,19 @@ internal_update(State = #state { queue_durations = Durations,
queue_duration_sum = Sum,
queue_duration_count = Count }) ->
MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(),
- MemoryRatio = erlang:memory(total) / MemoryLimit,
+ MemoryRatio = case MemoryLimit > 0.0 of
+ true -> erlang:memory(total) / MemoryLimit;
+ false -> infinity
+ end,
DesiredDurationAvg1 =
- case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
- true ->
+ if MemoryRatio =:= infinity ->
+ 0.0;
+ MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 ->
infinity;
- false ->
- Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
- true -> Sum + ?SUM_INC_AMOUNT;
- false -> Sum
- end,
- (Sum1 / Count) / MemoryRatio
+ MemoryRatio < ?SUM_INC_THRESHOLD ->
+ ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
+ true ->
+ (Sum / Count) / MemoryRatio
end,
State1 = State #state { desired_duration = DesiredDurationAvg1 },
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 328fe639..f60562ef 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
+ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@@ -248,11 +248,11 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
- ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}),
+requeue(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f423760a..7182042d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -827,14 +827,14 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({requeue, MsgPropsFun, MsgIds},
+process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{ok, case length(AckTags) =:= length(MsgIds) of
true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 };
false ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a9b15af8..dcfbcaff 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -236,18 +236,32 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+type_class(byte) -> int;
+type_class(short) -> int;
+type_class(signedint) -> int;
+type_class(long) -> int;
+type_class(decimal) -> int;
+type_class(float) -> float;
+type_class(double) -> float;
+type_class(Other) -> Other.
+
assert_args_equivalence(Orig, New, Name, Keys) ->
[assert_args_equivalence1(Orig, New, Name, Key) || Key <- Keys],
ok.
assert_args_equivalence1(Orig, New, Name, Key) ->
case {table_lookup(Orig, Key), table_lookup(New, Key)} of
- {Same, Same} -> ok;
- {Orig1, New1} -> protocol_error(
- precondition_failed,
- "inequivalent arg '~s' for ~s: "
- "received ~s but current is ~s",
- [Key, rs(Name), val(New1), val(Orig1)])
+ %% Identical lookup results (including both being absent) are
+ %% trivially equivalent.
+ {Same, Same} ->
+ ok;
+ %% Both sides present: equivalent when the declared types fall in the
+ %% same class (see type_class/1) and the values compare equal.
+ {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} ->
+ case type_class(OrigType) == type_class(NewType) andalso
+ OrigVal == NewVal of
+ true -> ok;
+ false -> protocol_error(precondition_failed, "inequivalent arg"
+ " '~s' for ~s: received ~s but current"
+ " is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
+ end;
+ %% Any other combination - e.g. the arg is present on only one side,
+ %% so one lookup is not a {Type, Value} pair (presumably 'undefined',
+ %% which val/1 accepts) - is inequivalent. Without this clause the
+ %% case would crash with case_clause instead of raising the
+ %% precondition_failed protocol error.
+ {Orig1, New1} ->
+ protocol_error(precondition_failed, "inequivalent arg"
+ " '~s' for ~s: received ~s but current"
+ " is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
end.
val(undefined) ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e4691b81..e6a32b90 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -68,6 +68,7 @@
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
cur_file_cache_ets, %% tid of current file cache table
+ flying_ets, %% tid of writes/removes in flight
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
%% to callbacks
@@ -86,7 +87,8 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- cur_file_cache_ets
+ cur_file_cache_ets,
+ flying_ets
}).
-record(file_summary,
@@ -128,12 +130,13 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid()}).
+ cur_file_cache_ets :: ets:tid(),
+ flying_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
-type(maybe_msg_id_fun() ::
- 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())).
+ 'undefined' | fun ((gb_set(), 'written' | 'ignored') -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
-type(deletion_thunk() :: fun (() -> boolean())).
@@ -375,6 +378,45 @@
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%%
+%% When the msg_store has a backlog (i.e. it has unprocessed messages
+%% in its mailbox / gen_server priority queue), a further optimisation
+%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
+%% from the same client for the same message. A typical occurrence of
+%% these is when an empty durable queue delivers persistent messages
+%% to ack'ing consumers. The queue will asynchronously ask the
+%% msg_store to 'write' such messages, and when they are acknowledged
+%% it will issue a 'remove'. That 'remove' may be issued before the
+%% msg_store has processed the 'write'. There is then no point going
+%% ahead with the processing of that 'write'.
+%%
+%% To detect this situation a 'flying_ets' table is shared between the
+%% clients and the server. The table is keyed on the combination of
+%% client (reference) and msg id, and the value represents an
+%% integration of all the writes and removes currently "in flight" for
+%% that message between the client and server - '+1' means all the
+%% writes/removes add up to a single 'write', '-1' to a 'remove', and
+%% '0' to nothing. (NB: the integration can never add up to more than
+%% one 'write' or 'remove' since clients must not write/remove a message
+%% more than once without first removing/writing it).
+%%
+%% Maintaining this table poses two challenges: 1) both the clients
+%% and the server access and update the table, which causes
+%% concurrency issues, 2) we must ensure that entries do not stay in
+%% the table forever, since that would constitute a memory leak. We
+%% address the former by carefully modelling all operations as
+%% sequences of atomic actions that produce valid results in all
+%% possible interleavings. We address the latter by deleting table
+%% entries whenever the server finds a 0-valued entry during the
+%% processing of a write/remove. 0 is essentially equivalent to "no
+%% entry". If, OTOH, the value is non-zero we know there is at least
+%% one other 'write' or 'remove' in flight, so we get an opportunity
+%% later to delete the table entry when processing these.
+%%
+%% There are two further complications. We need to ensure that 1)
+%% eliminated writes still get confirmed, and 2) the write-back cache
+%% doesn't grow unbounded. These are quite straightforward to
+%% address. See the comments in the code.
+%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -392,7 +434,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -404,7 +446,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }.
+ cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -420,6 +463,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId}).
@@ -440,6 +484,7 @@ read(MsgId,
contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+ [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
set_maximum_since_use(Server, Age) ->
@@ -566,6 +611,21 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
end
end.
+client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
+ client_ref = CRef }) ->
+ Key = {MsgId, CRef},
+ case ets:insert_new(FlyingEts, {Key, Diff}) of
+ true -> ok;
+ false -> try ets:update_counter(FlyingEts, Key, {2, Diff})
+ catch error:badarg ->
+ %% this is guaranteed to succeed since the
+ %% server only removes and updates flying_ets
+ %% entries; it never inserts them
+ true = ets:insert_new(FlyingEts, {Key, Diff})
+ end,
+ ok
+ end.
+
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
@@ -619,6 +679,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
+ FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
@@ -645,6 +706,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
dying_clients = sets:new(),
clients = Clients,
successfully_recovered = CleanShutdown,
@@ -700,11 +762,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
- CurFileCacheEts}, State #msstate { clients = Clients1 });
+ CurFileCacheEts, FlyingEts},
+ State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -723,40 +787,54 @@ handle_cast({client_dying, CRef},
noreply(write_message(CRef, <<>>,
State #msstate { dying_clients = DyingClients1 }));
-handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
+handle_cast({client_delete, CRef},
+ State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, MsgId},
State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
- [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
- noreply(
- case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
- {write, State1} ->
- write_message(CRef, MsgId, Msg, State1);
- {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
- State1;
- {ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- State1;
- {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, MsgId, State1);
- {confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- update_pending_confirms(
- fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(gb_sets:singleton(MsgId), written),
- CTM
- end, CRef, State1)
- end);
+ case update_flying(-1, MsgId, CRef, State) of
+ process ->
+ [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
+ noreply(write_message(MsgId, Msg, CRef, State));
+ ignore ->
+ %% A 'remove' has already been issued and eliminated the
+ %% 'write'.
+ State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
+ ignored, State),
+ %% If all writes get eliminated, cur_file_cache_ets could
+ %% grow unbounded. To prevent that we delete the cache
+ %% entry here, but only if the message isn't in the
+ %% current file. That way reads of the message can
+ %% continue to be done client side, from either the cache
+ %% or the non-current files. If the message *is* in the
+ %% current file then the cache entry will be removed by
+ %% the normal logic for that in write_message/4 and
+ %% maybe_roll_to_new_file/2.
+ case index_lookup(MsgId, State1) of
+ [#msg_location { file = File }]
+ when File == State1 #msstate.current_file ->
+ ok;
+ _ ->
+ true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
+ end,
+ noreply(State1)
+ end;
handle_cast({remove, CRef, MsgIds}, State) ->
- State1 = lists:foldl(
- fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
- State, MsgIds),
- noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
- removed, State1)));
+ {RemovedMsgIds, State1} =
+ lists:foldl(
+ fun (MsgId, {Removed, State2}) ->
+ case update_flying(+1, MsgId, CRef, State2) of
+ process -> {[MsgId | Removed],
+ remove_message(MsgId, CRef, State2)};
+ ignore -> {Removed, State2}
+ end
+ end, {[], State}, MsgIds),
+ noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
+ ignored, State1)));
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
@@ -797,6 +875,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
clients = Clients,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
@@ -810,8 +889,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
end,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
- [true = ets:delete(T) ||
- T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
+ [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
+ CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -874,6 +953,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
client_confirm(CRef, MsgIds, written, StateN)
end, State1, CGs).
+update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
+ Key = {MsgId, CRef},
+ NDiff = -Diff,
+ case ets:lookup(FlyingEts, Key) of
+ [] -> ignore;
+ [{_, Diff}] -> ignore;
+ [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
+ true = ets:delete_object(FlyingEts, {Key, 0}),
+ process;
+ [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
+ ignore
+ end.
+
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
write_action({true, #msg_location { file = File }}, _MsgId, State) ->
@@ -905,8 +997,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, MsgId, Msg, State) ->
- write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
+write_message(MsgId, Msg, CRef,
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
+ {write, State1} ->
+ write_message(MsgId, Msg,
+ record_pending_confirm(CRef, MsgId, State1));
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, MsgId, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
+ end, CRef, State1)
+ end.
+
+remove_message(MsgId, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg whilst
+ %% it's being GC'd.
+ %%
+ %% ASSERTION: [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from cur_file_cache_ets here because
+ %% there may be further writes in the mailbox for the
+ %% same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
+ end.
write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -1004,43 +1153,6 @@ contains_message(MsgId, From,
end
end.
-remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this msg
- %% whilst it's being GC'd. ASSERTION:
- %% [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }} when RefCount > 0 ->
- %% only update field, otherwise bad interaction with
- %% concurrent GC
- Dec = fun () ->
- index_update_ref_count(MsgId, RefCount - 1, State)
- end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here
- %% because there may be further writes in the mailbox
- %% for the same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(File, -TotalSize,
- State))
- end;
- _ -> ok = Dec(),
- State
- end
- end.
-
add_to_pending_gc_completion(
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
State #msstate { pending_gc_completion =
@@ -1069,8 +1181,13 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
catch error:badarg -> FailThunk()
end.
-safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
- safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+%% Ensures MsgId has an entry in CacheEts and counts one more
+%% reference to it: a fresh {MsgId, Msg, 1} row is inserted if none
+%% exists, otherwise the counter in position 3 is incremented. If the
+%% counter update fails with badarg (presumably because the row was
+%% removed in the meantime) the whole operation is retried.
+update_msg_cache(CacheEts, MsgId, Msg) ->
+    Retry   = fun () -> update_msg_cache(CacheEts, MsgId, Msg) end,
+    Counted = fun (_NewCount) -> ok end,
+    case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
+        true  -> ok;
+        false -> safe_ets_update_counter(CacheEts, MsgId, {3, +1},
+                                         Counted, Retry)
+    end.
adjust_valid_total_size(File, Delta, State = #msstate {
sum_valid_data = SumValid,
@@ -1115,6 +1232,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
end
end, CRef, State).
+%% Invokes the on-disk callback registered for CRef with MsgIds and
+%% ActionTaken, handing the client's CTM back unmodified.
+blind_confirm(CRef, MsgIds, ActionTaken, State) ->
+    Notify = fun (MsgOnDiskFun, CTM) ->
+                     MsgOnDiskFun(MsgIds, ActionTaken),
+                     CTM
+             end,
+    update_pending_confirms(Notify, CRef, State).
+
%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
@@ -1257,18 +1379,6 @@ list_sorted_file_names(Dir, Ext) ->
filelib:wildcard("*" ++ Ext, Dir)).
%%----------------------------------------------------------------------------
-%% message cache helper functions
-%%----------------------------------------------------------------------------
-
-update_msg_cache(CacheEts, MsgId, Msg) ->
- case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
- true -> ok;
- false -> safe_ets_update_counter_ok(
- CacheEts, MsgId, {3, +1},
- fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
- end.
-
-%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 27dadfc5..b06bcd83 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -67,7 +67,7 @@ start() ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
Other ->
- print_error("~s", [Other]),
+ print_error("~p", [Other]),
rabbit_misc:quit(2)
end.
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d34ed44a..50444dc4 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -22,6 +22,7 @@
-define(BaseApps, [rabbit]).
-define(ERROR_CODE, 1).
+-define(EPMD_TIMEOUT, 30000).
%%----------------------------------------------------------------------------
%% Specs
@@ -233,7 +234,8 @@ post_process_script(ScriptFile) ->
end.
process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
- [{apply,{rabbit,prepare,[]}}, Entry];
+ [{apply,{rabbit,maybe_hipe_compile,[]}},
+ {apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
@@ -244,7 +246,7 @@ duplicate_node_check([]) ->
duplicate_node_check(NodeStr) ->
Node = rabbit_misc:makenode(NodeStr),
{NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case net_adm:names(NodeHost) of
+ case names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
true -> io:format("node with name ~p "
@@ -260,6 +262,7 @@ duplicate_node_check(NodeStr) ->
[NodeHost, EpmdReason,
case EpmdReason of
address -> "unable to establish tcp connection";
+ timeout -> "timed out establishing tcp connection";
_ -> inet:format_error(EpmdReason)
end])
end.
@@ -276,3 +279,15 @@ terminate(Status) ->
after infinity -> ok
end
end.
+
+%% Asks epmd on Hostname for its registered names via
+%% net_adm:names/1, but gives up after ?EPMD_TIMEOUT milliseconds.
+%% Returns whatever net_adm:names/1 returned, or {error, Reason} if
+%% the helper process exited before replying (Reason is 'timeout'
+%% when the timer killed it).
+names(Hostname) ->
+    Self = self(),
+    %% Remember the caller's trap_exit setting so that it is restored
+    %% afterwards rather than being unconditionally switched off.
+    OldTrapExit = process_flag(trap_exit, true),
+    Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
+    {ok, TRef} = timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
+    Res = receive
+              {names, Names} ->
+                  %% The helper terminates right after replying and
+                  %% we are still trapping exits, so its 'EXIT'
+                  %% message is guaranteed to arrive; consume it
+                  %% instead of leaving it in the mailbox.
+                  receive {'EXIT', Pid, _} -> ok end,
+                  Names;
+              {'EXIT', Pid, Reason} ->
+                  {error, Reason}
+          end,
+    %% The helper is gone either way; don't leave the timer pending.
+    timer:cancel(TRef),
+    process_flag(trap_exit, OldTrapExit),
+    Res.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f1751e95..4b545466 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -188,7 +188,7 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_msg_ids :: [rabbit_types:msg_id()]
+ unsynced_msg_ids :: gb_set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -263,9 +263,10 @@ publish(MsgId, SeqId, MsgProps, IsPersistent,
State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
- {JournalHdl, State1} = get_journal_handle(
- State #qistate {
- unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }),
+ {JournalHdl, State1} =
+ get_journal_handle(
+ State #qistate {
+ unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -285,7 +286,7 @@ ack(SeqIds, State) ->
%% This is only called when there are outstanding confirms and the
%% queue is idle.
sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
- sync_if([] =/= MsgIds, State).
+ sync_if(not gb_sets:is_empty(MsgIds), State).
sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack to
@@ -387,7 +388,7 @@ blank_state(QueueName) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_msg_ids = [] }.
+ unsynced_msg_ids = gb_sets:new() }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -712,8 +713,8 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
notify_sync(State).
notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
- OnSyncFun(gb_sets:from_list(UG)),
- State #qistate { unsynced_msg_ids = [] }.
+ OnSyncFun(UG),
+ State #qistate { unsynced_msg_ids = gb_sets:new() }.
%%----------------------------------------------------------------------------
%% segment manipulation
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index e9c4479a..31f5ad14 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -44,6 +44,11 @@
%%----------------------------------------------------------------------------
+deliver([], #delivery{mandatory = false,
+ immediate = false}) ->
+ %% /dev/null optimisation
+ {routed, []};
+
deliver(QNames, Delivery = #delivery{mandatory = false,
immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7a36dd60..00d46f5a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -30,12 +30,6 @@
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
-test_content_prop_roundtrip(Datum, Binary) ->
- Types = [element(1, E) || E <- Datum],
- Values = [element(2, E) || E <- Datum],
- Values = rabbit_binary_parser:parse_properties(Types, Binary), %% assertion
- Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
-
all_tests() ->
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
@@ -44,7 +38,6 @@ all_tests() ->
passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
- passed = test_bpqueue(),
passed = test_pg_local(),
passed = test_unfold(),
passed = test_supervisor_delayed_restart(),
@@ -262,143 +255,6 @@ test_priority_queue(Q) ->
priority_queue:to_list(Q),
priority_queue_out_all(Q)}.
-test_bpqueue() ->
- Q = bpqueue:new(),
- true = bpqueue:is_empty(Q),
- 0 = bpqueue:len(Q),
- [] = bpqueue:to_list(Q),
-
- Q1 = bpqueue_test(fun bpqueue:in/3, fun bpqueue:out/1,
- fun bpqueue:to_list/1,
- fun bpqueue:foldl/3, fun bpqueue:map_fold_filter_l/4),
- Q2 = bpqueue_test(fun bpqueue:in_r/3, fun bpqueue:out_r/1,
- fun (QR) -> lists:reverse(
- [{P, lists:reverse(L)} ||
- {P, L} <- bpqueue:to_list(QR)])
- end,
- fun bpqueue:foldr/3, fun bpqueue:map_fold_filter_r/4),
-
- [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)),
- [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)),
- [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] =
- bpqueue:to_list(bpqueue:join(Q1, Q2)),
-
- [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] =
- bpqueue:to_list(bpqueue:join(Q1, Q1)),
-
- [{foo, [1, 2]}, {bar, [3]}] =
- bpqueue:to_list(
- bpqueue:from_list(
- [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])),
-
- [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])),
-
- {4, [a,b,c,d]} =
- bpqueue:foldl(
- fun (Prefix, Value, {Prefix, Acc}) ->
- {Prefix + 1, [Value | Acc]}
- end,
- {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])),
-
- [{bar,3}, {foo,2}, {foo,1}] =
- bpqueue:foldr(fun (P, V, I) -> [{P,V} | I] end, [], Q2),
-
- BPQL = [{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}],
- BPQ = bpqueue:from_list(BPQL),
-
- %% no effect
- {BPQL, 0} = bpqueue_mffl([none], {none, []}, BPQ),
- {BPQL, 0} = bpqueue_mffl([foo,bar], {none, [1]}, BPQ),
- {BPQL, 0} = bpqueue_mffl([bar], {none, [3]}, BPQ),
- {BPQL, 0} = bpqueue_mffr([bar], {foo, [5]}, BPQ),
-
- %% process 1 item
- {[{foo,[-1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffl([foo,bar], {foo, [2]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[-3,4,5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffl([bar], {bar, [4]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,-7]}], 1} =
- bpqueue_mffr([foo,bar], {foo, [6]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4]}, {baz,[-5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffr([bar], {baz, [4]}, BPQ),
-
- %% change prefix
- {[{bar,[-1,-2,-2,-3,-4,-5,-5,-6,-7]}], 9} =
- bpqueue_mffl([foo,bar], {bar, []}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5]}, {foo,[5,6,7]}], 3} =
- bpqueue_mffl([foo], {bar, [5]}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5,-5,-6]}, {foo,[7]}], 5} =
- bpqueue_mffl([foo], {bar, [7]}, BPQ),
- {[{foo,[1,2,2,-3,-4]}, {bar,[5]}, {foo,[5,6,7]}], 2} =
- bpqueue_mffl([bar], {foo, [5]}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5,-5,-6,-7]}], 6} =
- bpqueue_mffl([foo], {bar, []}, BPQ),
- {[{foo,[1,2,2,-3,-4,-5,5,6,7]}], 3} =
- bpqueue_mffl([bar], {foo, []}, BPQ),
-
- %% edge cases
- {[{foo,[-1,-2,-2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 3} =
- bpqueue_mffl([foo], {foo, [5]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[-5,-6,-7]}], 3} =
- bpqueue_mffr([foo], {foo, [2]}, BPQ),
-
- passed.
-
-bpqueue_test(In, Out, List, Fold, MapFoldFilter) ->
- Q = bpqueue:new(),
- {empty, _Q} = Out(Q),
-
- ok = Fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, ok, Q),
- {Q1M, 0} = MapFoldFilter(fun(_P) -> throw(explosion) end,
- fun(_V, _N) -> throw(explosion) end, 0, Q),
- [] = bpqueue:to_list(Q1M),
-
- Q1 = In(bar, 3, In(foo, 2, In(foo, 1, Q))),
- false = bpqueue:is_empty(Q1),
- 3 = bpqueue:len(Q1),
- [{foo, [1, 2]}, {bar, [3]}] = List(Q1),
-
- {{value, foo, 1}, Q3} = Out(Q1),
- {{value, foo, 2}, Q4} = Out(Q3),
- {{value, bar, 3}, _Q5} = Out(Q4),
-
- F = fun (QN) ->
- MapFoldFilter(fun (foo) -> true;
- (_) -> false
- end,
- fun (2, _Num) -> stop;
- (V, Num) -> {bar, -V, V - Num} end,
- 0, QN)
- end,
- {Q6, 0} = F(Q),
- [] = bpqueue:to_list(Q6),
- {Q7, 1} = F(Q1),
- [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = List(Q7),
-
- Q1.
-
-bpqueue_mffl(FF1A, FF2A, BPQ) ->
- bpqueue_mff(fun bpqueue:map_fold_filter_l/4, FF1A, FF2A, BPQ).
-
-bpqueue_mffr(FF1A, FF2A, BPQ) ->
- bpqueue_mff(fun bpqueue:map_fold_filter_r/4, FF1A, FF2A, BPQ).
-
-bpqueue_mff(Fold, FF1A, FF2A, BPQ) ->
- FF1 = fun (Prefixes) ->
- fun (P) -> lists:member(P, Prefixes) end
- end,
- FF2 = fun ({Prefix, Stoppers}) ->
- fun (Val, Num) ->
- case lists:member(Val, Stoppers) of
- true -> stop;
- false -> {Prefix, -Val, 1 + Num}
- end
- end
- end,
- Queue_to_list = fun ({LHS, RHS}) -> {bpqueue:to_list(LHS), RHS} end,
-
- Queue_to_list(Fold(FF1(FF1A), FF2(FF2A), 0, BPQ)).
-
test_simple_n_element_queue(N) ->
Items = lists:seq(1, N),
Q = priority_queue_in_all(priority_queue:new(), Items),
@@ -444,67 +300,69 @@ test_parsing() ->
passed = test_field_values(),
passed.
+%% Asserts that encoding the properties described by Datum yields
+%% exactly Binary. The first element of each Datum entry is taken as
+%% the property type and the second as its value.
+test_content_prop_encoding(Datum, Binary) ->
+    Pick = fun (N) -> [element(N, E) || E <- Datum] end,
+    %% assertion: badmatch unless the encoder output equals Binary
+    Binary = rabbit_binary_generator:encode_properties(Pick(1), Pick(2)).
+
test_content_properties() ->
- test_content_prop_roundtrip([], <<0, 0>>),
- test_content_prop_roundtrip([{bit, true}, {bit, false}, {bit, true}, {bit, false}],
- <<16#A0, 0>>),
- test_content_prop_roundtrip([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined},
- {bit, true}],
- <<16#E8,0,123>>),
- test_content_prop_roundtrip([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}],
- <<16#F0,0,123,123>>),
- test_content_prop_roundtrip([{bit, true}, {shortstr, <<"hi">>}, {bit, true},
- {shortint, 54321}, {bit, true}],
- <<16#F8,0,2,"hi",16#D4,16#31>>),
- test_content_prop_roundtrip([{bit, true}, {shortstr, undefined}, {bit, true},
- {shortint, 54321}, {bit, true}],
- <<16#B8,0,16#D4,16#31>>),
- test_content_prop_roundtrip([{table, [{<<"a signedint">>, signedint, 12345678},
- {<<"a longstr">>, longstr, <<"yes please">>},
- {<<"a decimal">>, decimal, {123, 12345678}},
- {<<"a timestamp">>, timestamp, 123456789012345},
- {<<"a nested table">>, table,
- [{<<"one">>, signedint, 1},
- {<<"two">>, signedint, 2}]}]}],
- <<
- %% property-flags
- 16#8000:16,
-
- %% property-list:
-
- %% table
- 117:32, % table length in bytes
-
- 11,"a signedint", % name
- "I",12345678:32, % type and value
-
- 9,"a longstr",
- "S",10:32,"yes please",
-
- 9,"a decimal",
- "D",123,12345678:32,
-
- 11,"a timestamp",
- "T", 123456789012345:64,
-
- 14,"a nested table",
- "F",
- 18:32,
-
- 3,"one",
- "I",1:32,
-
- 3,"two",
- "I",2:32 >>),
- case catch rabbit_binary_parser:parse_properties([bit, bit, bit, bit], <<16#A0,0,1>>) of
- {'EXIT', content_properties_binary_overflow} -> passed;
- V -> exit({got_success_but_expected_failure, V})
- end.
+ test_content_prop_encoding([], <<0, 0>>),
+ test_content_prop_encoding([{bit, true}, {bit, false}, {bit, true}, {bit, false}],
+ <<16#A0, 0>>),
+ test_content_prop_encoding([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined},
+ {bit, true}],
+ <<16#E8,0,123>>),
+ test_content_prop_encoding([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}],
+ <<16#F0,0,123,123>>),
+ test_content_prop_encoding([{bit, true}, {shortstr, <<"hi">>}, {bit, true},
+ {shortint, 54321}, {bit, true}],
+ <<16#F8,0,2,"hi",16#D4,16#31>>),
+ test_content_prop_encoding([{bit, true}, {shortstr, undefined}, {bit, true},
+ {shortint, 54321}, {bit, true}],
+ <<16#B8,0,16#D4,16#31>>),
+ test_content_prop_encoding([{table, [{<<"a signedint">>, signedint, 12345678},
+ {<<"a longstr">>, longstr, <<"yes please">>},
+ {<<"a decimal">>, decimal, {123, 12345678}},
+ {<<"a timestamp">>, timestamp, 123456789012345},
+ {<<"a nested table">>, table,
+ [{<<"one">>, signedint, 1},
+ {<<"two">>, signedint, 2}]}]}],
+ <<
+ %% property-flags
+ 16#8000:16,
+
+ %% property-list:
+
+ %% table
+ 117:32, % table length in bytes
+
+ 11,"a signedint", % name
+ "I",12345678:32, % type and value
+
+ 9,"a longstr",
+ "S",10:32,"yes please",
+
+ 9,"a decimal",
+ "D",123,12345678:32,
+
+ 11,"a timestamp",
+ "T", 123456789012345:64,
+
+ 14,"a nested table",
+ "F",
+ 18:32,
+
+ 3,"one",
+ "I",1:32,
+
+ 3,"two",
+ "I",2:32 >>),
+ passed.
test_field_values() ->
%% FIXME this does not test inexact numbers (double and float) yet,
%% because they won't pass the equality assertions
- test_content_prop_roundtrip(
+ test_content_prop_encoding(
[{table, [{<<"longstr">>, longstr, <<"Here is a long string">>},
{<<"signedint">>, signedint, 12345},
{<<"decimal">>, decimal, {3, 123456}},
@@ -1842,6 +1700,8 @@ on_disk_capture() ->
stop -> done
end.
+on_disk_capture([_|_], _Awaiting, Pid) ->
+ Pid ! {self(), surplus};
on_disk_capture(OnDisk, Awaiting, Pid) ->
receive
{on_disk, MsgIdsS} ->
@@ -1850,12 +1710,10 @@ on_disk_capture(OnDisk, Awaiting, Pid) ->
Pid);
stop ->
done
- after 200 ->
- case {OnDisk, Awaiting} of
- {[], []} -> Pid ! {self(), arrived}, on_disk_capture();
- {_, []} -> Pid ! {self(), surplus};
- {[], _} -> Pid ! {self(), timeout};
- {_, _} -> Pid ! {self(), surplus_timeout}
+ after (case Awaiting of [] -> 200; _ -> 1000 end) ->
+ case Awaiting of
+ [] -> Pid ! {self(), arrived}, on_disk_capture();
+ _ -> Pid ! {self(), timeout}
end
end.
@@ -1920,7 +1778,7 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
- {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
+ {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
Ref = rabbit_guid:guid(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
@@ -1931,6 +1789,8 @@ test_msg_store() ->
false = msg_store_contains(false, MsgIds, MSCState),
%% test confirm logic
passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
+ %% check we don't contain any of the msgs we're about to publish
+ false = msg_store_contains(false, MsgIds, MSCState),
%% publish the first half
ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
@@ -2038,12 +1898,12 @@ test_msg_store() ->
false = msg_store_contains(false, MsgIdsBig, MSCStateM),
MSCStateM
end),
+ %%
+ passed = test_msg_store_client_delete_and_terminate(),
%% restart empty
restart_msg_store_empty(),
passed.
-%% We want to test that writes that get eliminated due to removes still
-%% get confirmed. Removes themselves do not.
test_msg_store_confirms(MsgIds, Cap, MSCState) ->
%% write -> confirmed
ok = msg_store_write(MsgIds, MSCState),
@@ -2069,6 +1929,45 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
ok = msg_store_write(MsgIds, MSCState),
ok = msg_store_remove(MsgIds, MSCState),
ok = on_disk_await(Cap, MsgIds),
+ %% confirmation on timer-based sync
+ passed = test_msg_store_confirm_timer(),
+ passed.
+
+%% Writes one message and then keeps the persistent store busy with
+%% write/remove traffic for a second message until the store reports
+%% the first one as on disk, after which the message and the client
+%% are cleaned up.
+test_msg_store_confirm_timer() ->
+    ClientRef = rabbit_guid:guid(),
+    MsgId = msg_id_bin(1),
+    TestPid = self(),
+    %% on-disk callback: poke the test process once MsgId is reported
+    NotifyOnDisk = fun (MsgIds, _ActionTaken) ->
+                           case gb_sets:is_member(MsgId, MsgIds) of
+                               true  -> TestPid ! on_disk;
+                               false -> ok
+                           end
+                   end,
+    MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, ClientRef,
+                                            NotifyOnDisk, undefined),
+    ok = msg_store_write([MsgId], MSCState),
+    ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState),
+    ok = msg_store_remove([MsgId], MSCState),
+    ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
+    passed.
+
+%% Polls the mailbox for an on_disk message without blocking
+%% ('after 0'). While none has arrived, issues another write/remove
+%% pair for BusyMsgIds and tries again; returns ok once it is seen.
+msg_store_keep_busy_until_confirm(BusyMsgIds, MSCState) ->
+    receive
+        on_disk ->
+            ok
+    after 0 ->
+            ok = msg_store_write(BusyMsgIds, MSCState),
+            ok = msg_store_remove(BusyMsgIds, MSCState),
+            msg_store_keep_busy_until_confirm(BusyMsgIds, MSCState)
+    end.
+
+%% Starts from an empty store, writes ten messages through a fresh
+%% client and then deletes and terminates that client straight away.
+test_msg_store_client_delete_and_terminate() ->
+    restart_msg_store_empty(),
+    MsgIds = lists:map(fun msg_id_bin/1, lists:seq(1, 10)),
+    ClientRef = rabbit_guid:guid(),
+    MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, ClientRef),
+    ok = msg_store_write(MsgIds, MSCState),
+    %% test the 'dying client' fast path for writes
+    ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
    passed.
queue_name(Name) ->
@@ -2374,22 +2273,16 @@ test_variable_queue_requeue(VQ0) ->
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = lists:foldl(
- fun (_N, {VQN, AckTags}) ->
- {{#basic_message{}, false, AckTag, _}, VQM} =
- rabbit_variable_queue:fetch(true, VQN),
- {VQM, [AckTag | AckTags]}
- end, {VQ2, []}, Seq),
+ {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
[Ack | Acc];
(_, Acc) ->
Acc
end, [], lists:zip(Acks, Seq)),
- {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset,
- fun(X) -> X end, VQ3),
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
VQ5 = lists:foldl(fun (AckTag, VQN) ->
{_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], fun(X) -> X end, VQN),
+ [AckTag], VQN),
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
@@ -2507,7 +2400,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1),
VQ3 = check_variable_queue_status(
rabbit_variable_queue:set_ram_duration_target(0, VQ2),
- %% one segment in q3 as betas, and half a segment in delta
+ %% one segment in q3, and half a segment in delta
[{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}},
{q3, SegmentSize},
{len, SegmentSize + HalfSegment}]),
@@ -2523,7 +2416,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
SegmentSize + HalfSegment + 1, VQ5),
VQ7 = check_variable_queue_status(
VQ6,
- %% the half segment should now be in q3 as betas
+ %% the half segment should now be in q3
[{q1, 1},
{delta, {delta, undefined, 0, undefined}},
{q3, HalfSegment},
@@ -2573,7 +2466,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
{_Guids, VQ4} =
- rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b853d983..63a0927f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1,
+ dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -49,17 +49,15 @@
%% within the queue are always held on disk, *in addition* to being in
%% one of the above classifications.
%%
-%% Also note that within this code, the term gamma never
-%% appears. Instead, gammas are defined by betas who have had their
-%% queue position recorded on disk.
+%% Also note that within this code, the term gamma seldom
+%% appears. It's frequently the case that gammas are defined by betas
+%% who have had their queue position recorded on disk.
%%
%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
%% many of these steps are frequently skipped. q1 and q4 only hold
-%% alphas, q2 and q3 hold both betas and gammas (as queues of queues,
-%% using the bpqueue module where the block prefix determines whether
-%% they're betas or gammas). When a message arrives, its
-%% classification is determined. It is then added to the rightmost
-%% appropriate queue.
+%% alphas, q2 and q3 hold both betas and gammas. When a message
+%% arrives, its classification is determined. It is then added to the
+%% rightmost appropriate queue.
%%
%% If a new message is determined to be a beta or gamma, q1 is
%% empty. If a new message is determined to be a delta, q1 and q2 are
@@ -74,14 +72,15 @@
%%
%% The duration indicated to us by the memory_monitor is used to
%% calculate, given our current ingress and egress rates, how many
-%% messages we should hold in RAM. We track the ingress and egress
-%% rates for both messages and pending acks and rates for both are
-%% considered when calculating the number of messages to hold in
-%% RAM. When we need to push alphas to betas or betas to gammas, we
-%% favour writing out messages that are further from the head of the
-%% queue. This minimises writes to disk, as the messages closer to the
-%% tail of the queue stay in the queue for longer, thus do not need to
-%% be replaced as quickly by sending other messages to disk.
+%% messages we should hold in RAM (i.e. as alphas). We track the
+%% ingress and egress rates for both messages and pending acks and
+%% rates for both are considered when calculating the number of
+%% messages to hold in RAM. When we need to push alphas to betas or
+%% betas to gammas, we favour writing out messages that are further
+%% from the head of the queue. This minimises writes to disk, as the
+%% messages closer to the tail of the queue stay in the queue for
+%% longer, thus do not need to be replaced as quickly by sending other
+%% messages to disk.
%%
%% Whilst messages are pushed to disk and forgotten from RAM as soon
%% as requested by a new setting of the queue RAM duration, the
@@ -119,17 +118,43 @@
%% getting rid of messages as fast as possible and remaining
%% responsive, and using only the egress rate impacts that goal.
%%
-%% If a queue is full of transient messages, then the transition from
-%% betas to deltas will be potentially very expensive as millions of
-%% entries must be written to disk by the queue_index module. This can
-%% badly stall the queue. In order to avoid this, the proportion of
-%% gammas / (betas+gammas) must not be lower than (betas+gammas) /
-%% (alphas+betas+gammas). As the queue grows or available memory
-%% shrinks, the latter ratio increases, requiring the conversion of
-%% more gammas to betas in order to maintain the invariant. At the
-%% point at which betas and gammas must be converted to deltas, there
-%% should be very few betas remaining, thus the transition is fast (no
-%% work needs to be done for the gamma -> delta transition).
+%% Once the queue has more alphas than the target_ram_count, the
+%% surplus must be converted to betas, if not gammas, if not rolled
+%% into delta. The conditions under which these transitions occur
+%% reflect the conflicting goals of minimising RAM cost per msg, and
+%% minimising CPU cost per msg. Once the msg has become a beta, its
+%% payload is no longer in RAM, thus a read from the msg_store must
+%% occur before the msg can be delivered, but the RAM cost of a beta
+%% is the same as a gamma, so converting a beta to gamma will not free
+%% up any further RAM. To reduce the RAM cost further, the gamma must
+%% be rolled into delta. Whilst recovering a beta or a gamma to an
+%% alpha requires only one disk read (from the msg_store), recovering
+%% a msg from within delta will require two reads (queue_index and
+%% then msg_store). But delta has a near-0 per-msg RAM cost. So the
+%% conflict is between using delta more, which will free up more
+%% memory, but require additional CPU and disk ops, versus using delta
+%% less and gammas and betas more, which will cost more memory, but
+%% require fewer disk ops and less CPU overhead.
+%%
+%% In the case of a persistent msg published to a durable queue, the
+%% msg is immediately written to the msg_store and queue_index. If it
+%% is then converted from an alpha, it'll immediately become a gamma
+%% (as it's already in queue_index), and cannot exist as a
+%% beta. Thus a durable queue with a mixture of persistent and
+%% transient msgs in it which has more messages than permitted by the
+%% target_ram_count may contain an interspersed mixture of betas and
+%% gammas in q2 and q3.
+%%
+%% There is then a ratio that controls how many betas and gammas there
+%% can be. This is based on the target_ram_count and thus expresses
+%% the fact that as the number of permitted alphas in the queue falls,
+%% so should the number of betas and gammas fall (i.e. delta
+%% grows). If q2 and q3 contain more than the permitted number of
+%% betas and gammas, then the surplus are forcibly converted to gammas
+%% (as necessary) and then rolled into delta. The ratio is that
+%% delta/(betas+gammas+delta) equals
+%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
+%% the target_ram_count shrinks to 0, so must betas and gammas.
%%
%% The conversion of betas to gammas is done in batches of exactly
%% ?IO_BATCH_SIZE. This value should not be too small, otherwise the
@@ -137,8 +162,7 @@
%% effectively amortised (switching the direction of queue access
%% defeats amortisation), nor should it be too big, otherwise
%% converting a batch stalls the queue for too long. Therefore, it
-%% must be just right. ram_index_count is used here and is the number
-%% of betas.
+%% must be just right.
%%
%% The conversion from alphas to betas is also chunked, but only to
%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
@@ -248,7 +272,6 @@
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
- ram_index_count,
out_counter,
in_counter,
rates,
@@ -280,8 +303,6 @@
end_seq_id %% end_seq_id is exclusive
}).
--record(merge_funs, {new, join, out, in, publish}).
-
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -291,6 +312,7 @@
-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(QUEUE, lqueue).
-include("rabbit.hrl").
@@ -315,11 +337,11 @@
end_seq_id :: non_neg_integer() }).
-type(state() :: #vqstate {
- q1 :: queue(),
- q2 :: bpqueue:bpqueue(),
+ q1 :: ?QUEUE:?QUEUE(),
+ q2 :: ?QUEUE:?QUEUE(),
delta :: delta(),
- q3 :: bpqueue:bpqueue(),
- q4 :: queue(),
+ q3 :: ?QUEUE:?QUEUE(),
+ q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
ram_ack_index :: gb_tree(),
@@ -337,7 +359,6 @@
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
- ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -475,23 +496,22 @@ purge(State = #vqstate { q4 = Q4,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
{LensByStore, IndexState1} = remove_queue_entries(
- fun rabbit_misc:queue_fold/3, Q4,
+ fun ?QUEUE:foldl/3, Q4,
orddict:new(), IndexState, MSCState),
{LensByStore1, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = queue:new(),
+ State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
{LensByStore2, IndexState3} = remove_queue_entries(
- fun rabbit_misc:queue_fold/3, Q1,
+ fun ?QUEUE:foldl/3, Q1,
LensByStore1, IndexState2, MSCState1),
PCount1 = PCount - find_persistent_count(LensByStore2),
- {Len, a(State1 #vqstate { q1 = queue:new(),
+ {Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
- ram_index_count = 0,
persistent_count = PCount1 })}.
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
@@ -507,9 +527,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case bpqueue:is_empty(Q3) of
- false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
+ State2 = case ?QUEUE:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
@@ -608,21 +628,19 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
+requeue(AckTags, #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
- alpha_funs(),
- MsgPropsFun, State),
+ fun publish_alpha/2, State),
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
delta_limit(Delta),
- beta_funs(),
- MsgPropsFun, State1),
+ fun publish_beta/2, State1),
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
- MsgPropsFun, State2),
+ State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
State3 #vqstate { delta = Delta1,
@@ -718,7 +736,6 @@ needs_timeout(State) ->
false -> case reduce_memory_use(
fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
fun (_Quota, State1) -> {0, State1} end,
State) of
{true, _State} -> idle;
@@ -740,24 +757,22 @@ status(#vqstate {
ram_ack_index = RAI,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
rates = #rates { avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate },
ack_rates = #rates { avg_egress = AvgAckEgressRate,
avg_ingress = AvgAckIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , bpqueue:len(Q2)},
+ [ {q1 , ?QUEUE:len(Q1)},
+ {q2 , ?QUEUE:len(Q2)},
{delta , Delta},
- {q3 , bpqueue:len(Q3)},
- {q4 , queue:len(Q4)},
+ {q3 , ?QUEUE:len(Q3)},
+ {q4 , ?QUEUE:len(Q4)},
{len , Len},
{pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
- {ram_index_count , RamIndexCount},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -776,15 +791,14 @@ discard(_Msg, _ChPid, State) -> State.
%%----------------------------------------------------------------------------
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }) ->
- E1 = queue:is_empty(Q1),
- E2 = bpqueue:is_empty(Q2),
+ len = Len,
+ persistent_count = PersistentCount,
+ ram_msg_count = RamMsgCount }) ->
+ E1 = ?QUEUE:is_empty(Q1),
+ E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
- E3 = bpqueue:is_empty(Q3),
- E4 = queue:is_empty(Q4),
+ E3 = ?QUEUE:is_empty(Q3),
+ E4 = ?QUEUE:is_empty(Q4),
LZ = Len == 0,
true = E1 or not E3,
@@ -795,10 +809,13 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
- true = RamIndexCount >= 0,
State.
+d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
+ when Start + Count =< End ->
+ Delta.
+
m(MsgStatus = #msg_status { msg = Msg,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
@@ -891,54 +908,39 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
false -> case gb_trees:is_defined(SeqId, PA) of
- false -> {[m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }) | Filtered1],
- Delivers1,
- Acks1};
- true -> Acc
+ false ->
+ {?QUEUE:in_r(
+ m(#msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps
+ }), Filtered1),
+ Delivers1, Acks1};
+ true ->
+ Acc
end
end
- end, {[], [], []}, List),
- {bpqueue:from_list([{true, Filtered}]),
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState))}.
-
-%% the first arg is the older delta
-combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
- ?BLANK_DELTA;
-combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = B) ->
- true = Start + Count =< End, %% ASSERTION
- B;
-combine_deltas(#delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) ->
- true = Start + Count =< End, %% ASSERTION
- A;
-combine_deltas(#delta { start_seq_id = StartLow,
- count = CountLow,
- end_seq_id = EndLow },
- #delta { start_seq_id = StartHigh,
- count = CountHigh,
- end_seq_id = EndHigh }) ->
- Count = CountLow + CountHigh,
- true = (StartLow =< StartHigh) %% ASSERTIONS
- andalso ((StartLow + CountLow) =< EndLow)
- andalso ((StartHigh + CountHigh) =< EndHigh)
- andalso ((StartLow + Count) =< EndHigh),
- #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
-
-beta_fold(Fun, Init, Q) ->
- bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q).
+ end, {?QUEUE:new(), [], []}, List),
+ {Filtered, rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+
+expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
+ d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count } = Delta)
+ when SeqId < StartSeqId ->
+ d(Delta #delta { start_seq_id = SeqId, count = Count + 1 });
+expand_delta(SeqId, #delta { count = Count,
+ end_seq_id = EndSeqId } = Delta)
+ when SeqId >= EndSeqId ->
+ d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 });
+expand_delta(_SeqId, #delta { count = Count } = Delta) ->
+ d(Delta #delta { count = Count + 1 }).
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -955,17 +957,17 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
- false -> #delta { start_seq_id = LowSeqId,
- count = DeltaCount1,
- end_seq_id = NextSeqId }
+ false -> d(#delta { start_seq_id = LowSeqId,
+ count = DeltaCount1,
+ end_seq_id = NextSeqId })
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
+ q1 = ?QUEUE:new(),
+ q2 = ?QUEUE:new(),
delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
+ q3 = ?QUEUE:new(),
+ q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty(),
@@ -983,7 +985,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
- ram_index_count = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rate(Now, DeltaCount1),
@@ -1003,21 +1004,19 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
- State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
- case queue:is_empty(Q4) of
- true -> State #vqstate {
- q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+in_r(MsgStatus = #msg_status { msg = undefined },
+ State = #vqstate { q3 = Q3, q4 = Q4 }) ->
+ case ?QUEUE:is_empty(Q4) of
+ true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) }
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
- State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+ State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
queue_out(State = #vqstate { q4 = Q4 }) ->
- case queue:out(Q4) of
+ case ?QUEUE:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3(State) of
{empty, _State1} = Result -> Result;
@@ -1095,15 +1094,15 @@ purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- case bpqueue:is_empty(Q3) of
+ case ?QUEUE:is_empty(Q3) of
true -> {LensByStore, State};
false -> {LensByStore1, IndexState1} =
- remove_queue_entries(fun beta_fold/3, Q3,
+ remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
LensByStore, IndexState, MSCState),
purge_betas_and_deltas(LensByStore1,
maybe_deltas_to_betas(
State #vqstate {
- q3 = bpqueue:new(),
+ q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
@@ -1291,7 +1290,7 @@ blind_confirm(Callback, MsgIdSet) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-msgs_written_to_disk(Callback, MsgIdSet, removed) ->
+msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
blind_confirm(Callback, MsgIdSet);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
Callback(?MODULE,
@@ -1321,116 +1320,70 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
-alpha_funs() ->
- #merge_funs {
- new = fun queue:new/0,
- join = fun queue:join/2,
- out = fun queue:out/1,
- in = fun queue:in/2,
- publish = fun (#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
- (MsgStatus, #vqstate {
- ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate {
- ram_msg_count = RamMsgCount + 1 }}
- end}.
-
-beta_funs() ->
- #merge_funs {
- new = fun bpqueue:new/0,
- join = fun bpqueue:join/2,
- out = fun (Q) ->
- case bpqueue:out(Q) of
- {{value, _IndexOnDisk, MsgStatus}, Q1} ->
- {{value, MsgStatus}, Q1};
- {empty, _Q1} = X ->
- X
- end
- end,
- in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
- bpqueue:in(IOD, MsgStatus, Q)
- end,
- publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
- State) ->
- {#msg_status { index_on_disk = IndexOnDisk,
- msg = Msg} = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } =
- State1} =
- maybe_write_to_disk(not MsgOnDisk, false,
- MsgStatus, State),
- {MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount +
- one_if(Msg =/= undefined),
- ram_index_count = RamIndexCount +
- one_if(not IndexOnDisk) }}
- end}.
+publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
+ read_msg(MsgStatus, State);
+publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
+ {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+
+publish_beta(MsgStatus, State) ->
+ {#msg_status { msg = Msg} = MsgStatus1,
+ #vqstate { ram_msg_count = RamMsgCount } = State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ {MsgStatus1, State1 #vqstate {
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
- MsgPropsFun, State) ->
- queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State).
+queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
+ queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
+ Limit, PubFun, State).
-queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
- #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
- MsgPropsFun, State)
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
+ Limit, PubFun, State)
when Limit == undefined orelse SeqId < Limit ->
- case QOut(Q) of
+ case ?QUEUE:out(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
- queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
- Limit, Funs, MsgPropsFun, State);
+ queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
+ Limit, PubFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
- {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
- State),
+ {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
- QPublish(MsgStatus, State1),
- queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, Funs, MsgPropsFun, State2)
+ PubFun(MsgStatus, State1),
+ queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, PubFun, State2)
end;
-queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
- _MsgPropsFun, State) ->
- {SeqIds, QJoin(Front, Q), MsgIds, State}.
+queue_merge(SeqIds, Q, Front, MsgIds,
+ _Limit, _PubFun, State) ->
+ {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
-delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
+delta_merge([], Delta, MsgIds, State) ->
{Delta, MsgIds, State};
-delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
- count = Count,
- end_seq_id = EndSeqId} = Delta,
- MsgIds, MsgPropsFun, State) ->
+delta_merge(SeqIds, Delta, MsgIds, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
- {#msg_status { msg_id = MsgId,
- index_on_disk = IndexOnDisk,
- msg_on_disk = MsgOnDisk} = MsgStatus,
- State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State0),
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
+ msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
- maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
- MsgStatus, State1),
- {Delta0 #delta {
- start_seq_id = lists:min([SeqId, StartSeqId]),
- count = Count + 1,
- end_seq_id = lists:max([SeqId + 1, EndSeqId]) },
- [MsgId | MsgIds0], State2}
+ maybe_write_to_disk(true, true, MsgStatus, State1),
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
-msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
+msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
remove_pending_ack(SeqId, State),
{MsgStatus #msg_status {
- msg_props = (MsgPropsFun(MsgProps)) #message_properties {
- needs_confirming = false } }, State1}.
+ msg_props = MsgProps #message_properties { needs_confirming = false } },
+ State1}.
-beta_limit(BPQ) ->
- case bpqueue:out(BPQ) of
- {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId;
- {empty, _BPQ} -> undefined
+beta_limit(Q) ->
+ case ?QUEUE:peek(Q) of
+ {value, #msg_status { seq_id = SeqId }} -> SeqId;
+ empty -> undefined
end.
-delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%%----------------------------------------------------------------------------
@@ -1456,10 +1409,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
+reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
State = #vqstate {target_ram_count = infinity}) ->
{false, State};
-reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
+reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount,
@@ -1470,7 +1423,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
avg_egress = AvgAckEgress }
}) ->
- {Reduce, State1} =
+ {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
@@ -1491,13 +1444,10 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
{true, State2}
end,
- case State1 #vqstate.target_ram_count of
- 0 -> {Reduce, BetaDeltaFun(State1)};
- _ -> case chunk_size(State1 #vqstate.ram_index_count,
- permitted_ram_index_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
- _ -> {Reduce, State1}
- end
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
+ _ -> {Reduce, State1}
end.
limit_ram_acks(0, State) ->
@@ -1519,54 +1469,25 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI1 })
end.
-
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
- fun limit_ram_index/2,
- fun push_betas_to_deltas/1,
+ fun push_betas_to_deltas/2,
fun limit_ram_acks/2,
State),
State1.
-limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
- index_state = IndexState,
- ram_index_count = RamIndexCount }) ->
- {Q2a, {Quota1, IndexState1}} = limit_ram_index(
- fun bpqueue:map_fold_filter_r/4,
- Q2, {Quota, IndexState}),
- %% TODO: we shouldn't be writing index entries for messages that
- %% can never end up in delta due them residing in the only segment
- %% held by q3.
- {Q3a, {Quota2, IndexState2}} = limit_ram_index(
- fun bpqueue:map_fold_filter_r/4,
- Q3, {Quota1, IndexState1}),
- State #vqstate { q2 = Q2a, q3 = Q3a,
- index_state = IndexState2,
- ram_index_count = RamIndexCount - (Quota - Quota2) }.
-
-limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) ->
- {Q, {0, IndexState}};
-limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) ->
- MapFoldFilterFun(
- fun erlang:'not'/1,
- fun (MsgStatus, {0, _IndexStateN}) ->
- false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- stop;
- (MsgStatus, {N, IndexStateN}) when N > 0 ->
- false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- {MsgStatus1, IndexStateN1} =
- maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
- {true, m(MsgStatus1), {N-1, IndexStateN1}}
- end, {Quota, IndexState}, Q).
-
-permitted_ram_index_count(#vqstate { len = 0 }) ->
+permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
-permitted_ram_index_count(#vqstate { len = Len,
- q2 = Q2,
- q3 = Q3,
- delta = #delta { count = DeltaCount } }) ->
- BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
- BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
+permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
+ lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
+permitted_beta_count(#vqstate { q1 = Q1,
+ q4 = Q4,
+ target_ram_count = TargetRamCount,
+ len = Len }) ->
+ BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
+ lists:max([rabbit_queue_index:next_segment_boundary(0),
+ BetaDelta - ((BetaDelta * BetaDelta) div
+ (BetaDelta + TargetRamCount))]).
chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
@@ -1574,41 +1495,35 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
-fetch_from_q3(State = #vqstate {
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4,
- ram_index_count = RamIndexCount}) ->
- case bpqueue:out(Q3) of
+fetch_from_q3(State = #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4 }) ->
+ case ?QUEUE:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, IndexOnDisk, MsgStatus}, Q3a} ->
- RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
- true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a,
- ram_index_count = RamIndexCount1 },
- State2 =
- case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
- {true, true} ->
- %% q3 is now empty, it wasn't before; delta is
- %% still empty. So q2 must be empty, and we
- %% know q4 is empty otherwise we wouldn't be
- %% loading from q3. As such, we can just set
- %% q4 to Q1.
- true = bpqueue:is_empty(Q2), %% ASSERTION
- true = queue:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = queue:new(),
- q4 = Q1 };
- {true, false} ->
- maybe_deltas_to_betas(State1);
- {false, _} ->
- %% q3 still isn't empty, we've not touched
- %% delta, so the invariants between q1, q2,
- %% delta and q3 are maintained
- State1
- end,
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
+ {true, true} ->
+ %% q3 is now empty, it wasn't before;
+ %% delta is still empty. So q2 must be
+ %% empty, and we know q4 is empty
+ %% otherwise we wouldn't be loading from
+ %% q3. As such, we can just set q4 to Q1.
+ true = ?QUEUE:is_empty(Q2), %% ASSERTION
+ true = ?QUEUE:is_empty(Q4), %% ASSERTION
+ State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
+ {true, false} ->
+ maybe_deltas_to_betas(State1);
+ {false, _} ->
+ %% q3 still isn't empty, we've not
+ %% touched delta, so the invariants
+ %% between q1, q2, delta and q3 are
+ %% maintained
+ State1
+ end,
{loaded, {MsgStatus, State2}}
end.
@@ -1632,143 +1547,120 @@ maybe_deltas_to_betas(State = #vqstate {
{Q3a, IndexState2} =
betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
- case bpqueue:len(Q3a) of
+ case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
maybe_deltas_to_betas(
State1 #vqstate {
- delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
+ delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
Q3aLen ->
- Q3b = bpqueue:join(Q3, Q3a),
+ Q3b = ?QUEUE:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { q2 = bpqueue:new(),
+ State1 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
- q3 = bpqueue:join(Q3b, Q2) };
+ q3 = ?QUEUE:join(Q3b, Q2) };
N when N > 0 ->
- Delta1 = #delta { start_seq_id = DeltaSeqId1,
- count = N,
- end_seq_id = DeltaSeqIdEnd },
+ Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd }),
State1 #vqstate { delta = Delta1,
q3 = Q3b }
end
end.
push_alphas_to_betas(Quota, State) ->
- {Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
- {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ {Quota1, State1} =
+ push_alphas_to_betas(
+ fun ?QUEUE:out/1,
+ fun (MsgStatus, Q1a,
+ State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
+ State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
+ (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
+ State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
+ end, Quota, State #vqstate.q1, State),
+ {Quota2, State2} =
+ push_alphas_to_betas(
+ fun ?QUEUE:out_r/1,
+ fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
+ State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
+ end, Quota1, State1 #vqstate.q4, State1),
{Quota2, State2}.
-maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
- maybe_push_alphas_to_betas(
- fun queue:out/1,
- fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
- Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
- State1 #vqstate { q1 = Q1a,
- q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) };
- (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
- Q1a, State1 = #vqstate { q2 = Q2 }) ->
- State1 #vqstate { q1 = Q1a,
- q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) }
- end, Quota, Q1, State).
-
-maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
- maybe_push_alphas_to_betas(
- fun queue:out_r/1,
- fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
- Q4a, State1 = #vqstate { q3 = Q3 }) ->
- State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
- q4 = Q4a }
- end, Quota, Q4, State).
-
-maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
- State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount })
+push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
TargetRamCount >= RamMsgCount ->
{Quota, State};
-maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }} =
+ {MsgStatus1 = #msg_status { msg_on_disk = true },
+ State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
- ram_index_count = RamIndexCount1 },
- maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
- Consumer(MsgStatus2, Qa, State2))
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
+ push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
+ Consumer(MsgStatus2, Qa, State2))
end.
-push_betas_to_deltas(State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState,
- ram_index_count = RamIndexCount }) ->
- {Delta2, Q2a, RamIndexCount2, IndexState2} =
- push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun bpqueue:out/1, Q2,
- RamIndexCount, IndexState),
- {Delta3, Q3a, RamIndexCount3, IndexState3} =
- push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
- fun bpqueue:out_r/1, Q3,
- RamIndexCount2, IndexState2),
- Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
- State #vqstate { q2 = Q2a,
- delta = Delta4,
- q3 = Q3a,
- index_state = IndexState3,
- ram_index_count = RamIndexCount3 }.
-
-push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
- case bpqueue:out(Q) of
- {empty, _Q} ->
- {?BLANK_DELTA, Q, RamIndexCount, IndexState};
- {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} ->
- {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} =
- bpqueue:out_r(Q),
+push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState }) ->
+ PushState = {Quota, Delta, IndexState},
+ {Q3a, PushState1} = push_betas_to_deltas(
+ fun ?QUEUE:out_r/1,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ Q3, PushState),
+ {Q2a, PushState2} = push_betas_to_deltas(
+ fun ?QUEUE:out/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q2, PushState1),
+ {_, Delta1, IndexState1} = PushState2,
+ State #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a,
+ index_state = IndexState1 }.
+
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+ case ?QUEUE:is_empty(Q) of
+ true ->
+ {Q, PushState};
+ false ->
+ {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
+ {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
- true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState};
- false -> {Len, Qc, RamIndexCount1, IndexState1} =
- push_betas_to_deltas(Generator, Limit, Q, 0,
- RamIndexCount, IndexState),
- {#delta { start_seq_id = Limit,
- count = Len,
- end_seq_id = MaxSeqId + 1 },
- Qc, RamIndexCount1, IndexState1}
+ true -> {Q, PushState};
+ false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
end
end.
-push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q,
+ {0, _Delta, _IndexState} = PushState) ->
+ {Q, PushState};
+push_betas_to_deltas1(Generator, Limit, Q,
+ {Quota, Delta, IndexState} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
- {Count, Q, RamIndexCount, IndexState};
- {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa}
+ {Q, PushState};
+ {{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- {Count, Q, RamIndexCount, IndexState};
- {{value, IndexOnDisk, MsgStatus}, Qa} ->
- {RamIndexCount1, IndexState1} =
- case IndexOnDisk of
- true -> {RamIndexCount, IndexState};
- false -> {#msg_status { index_on_disk = true },
- IndexState2} =
- maybe_write_index_to_disk(true, MsgStatus,
- IndexState),
- {RamIndexCount - 1, IndexState2}
- end,
- push_betas_to_deltas(
- Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
+ {Q, PushState};
+ {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
+ {#msg_status { index_on_disk = true }, IndexState1} =
+ maybe_write_index_to_disk(true, MsgStatus, IndexState),
+ Delta1 = expand_delta(SeqId, Delta),
+ push_betas_to_deltas1(Generator, Limit, Qa,
+ {Quota - 1, Delta1, IndexState1})
end.
%%----------------------------------------------------------------------------
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 35ee1e51..8973a4f7 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -32,7 +32,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([update/0, get_total_memory/0, get_vm_limit/0,
+-export([get_total_memory/0, get_vm_limit/0,
get_check_interval/0, set_check_interval/1,
get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
get_memory_limit/0]).
@@ -59,7 +59,6 @@
-ifdef(use_specs).
-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()).
--spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
@@ -74,9 +73,6 @@
%% Public API
%%----------------------------------------------------------------------------
-update() ->
- gen_server:cast(?SERVER, update).
-
get_total_memory() ->
get_total_memory(os:type()).
@@ -135,12 +131,12 @@ handle_call(get_memory_limit, _From, State) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast(update, State) ->
- {noreply, internal_update(State)};
-
handle_cast(_Request, State) ->
{noreply, State}.
+handle_info(update, State) ->
+ {noreply, internal_update(State)};
+
handle_info(_Info, State) ->
{noreply, State}.
@@ -200,7 +196,7 @@ emit_update_info(State, MemUsed, MemLimit) ->
[State, MemUsed, MemLimit]).
start_timer(Timeout) ->
- {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
+ {ok, TRef} = timer:send_interval(Timeout, update),
TRef.
%% According to http://msdn.microsoft.com/en-us/library/aa366778(VS.85).aspx