summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-11-24 11:51:29 +0000
committerSimon MacMullen <simon@rabbitmq.com>2010-11-24 11:51:29 +0000
commit99a0a09b6736eb6e535f54efa834c31dccc7a427 (patch)
tree59852472aab6b517b5a913ab7cb6c24accb45cde
parent1327df8fc4821abd824c23242bcf07236fd5df06 (diff)
parentad4d8b090b45f47569c40dde99b630537083f403 (diff)
downloadrabbitmq-server-99a0a09b6736eb6e535f54efa834c31dccc7a427.tar.gz
Merge from bug23467 (and hence default)
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec9
-rw-r--r--packaging/common/rabbitmq-asroot-script-wrapper45
-rw-r--r--packaging/macports/Makefile4
-rwxr-xr-xscripts/rabbitmq-server5
-rw-r--r--scripts/rabbitmq-server.bat8
-rw-r--r--scripts/rabbitmq-service.bat8
-rw-r--r--src/rabbit.erl81
-rw-r--r--src/rabbit_access_control.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_channel.erl59
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_exchange.erl15
-rw-r--r--src/rabbit_misc.erl53
-rw-r--r--src/rabbit_mnesia.erl53
-rw-r--r--src/rabbit_multi.erl3
-rw-r--r--src/rabbit_plugin_activator.erl6
-rw-r--r--src/rabbit_queue_index.erl121
-rw-r--r--src/rabbit_reader.erl41
-rw-r--r--src/rabbit_ssl.erl9
-rw-r--r--src/rabbit_tests.erl32
-rw-r--r--src/rabbit_upgrade.erl86
-rw-r--r--src/rabbit_upgrade_functions.erl53
-rw-r--r--src/rabbit_variable_queue.erl468
23 files changed, 706 insertions, 472 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 209a90ee..5032f471 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -9,8 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v
Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
-Source4: rabbitmq-asroot-script-wrapper
-Source5: rabbitmq-server.ocf
+Source4: rabbitmq-server.ocf
URL: http://www.rabbitmq.com/
BuildArch: noarch
BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt
@@ -29,8 +28,7 @@ scalable implementation of an AMQP broker.
%define _rabbit_libdir %{_exec_prefix}/lib/rabbitmq
%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version}
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
-%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
-%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}`
+%define _rabbit_server_ocf %{_builddir}/`basename %{S:4}`
%define _plugins_state_dir %{_localstatedir}/lib/rabbitmq/plugins
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -40,8 +38,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
-cp %{S:4} %{_rabbit_asroot_wrapper}
-cp %{S:5} %{_rabbit_server_ocf}
+cp %{S:4} %{_rabbit_server_ocf}
make %{?_smp_mflags}
%install
diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper
deleted file mode 100644
index 693a6f0b..00000000
--- a/packaging/common/rabbitmq-asroot-script-wrapper
+++ /dev/null
@@ -1,45 +0,0 @@
-#!/bin/sh
-## The contents of this file are subject to the Mozilla Public License
-## Version 1.1 (the "License"); you may not use this file except in
-## compliance with the License. You may obtain a copy of the License at
-## http://www.mozilla.org/MPL/
-##
-## Software distributed under the License is distributed on an "AS IS"
-## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-## License for the specific language governing rights and limitations
-## under the License.
-##
-## The Original Code is RabbitMQ.
-##
-## The Initial Developers of the Original Code are LShift Ltd,
-## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-##
-## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-## Technologies LLC, and Rabbit Technologies Ltd.
-##
-## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2010 Cohesive Financial Technologies
-## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2010 Rabbit Technologies Ltd.
-##
-## All Rights Reserved.
-##
-## Contributor(s): ______________________________________.
-##
-
-cd /var/lib/rabbitmq
-
-SCRIPT=`basename $0`
-
-if [ `id -u` = 0 ] ; then
- /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
-else
- echo
- echo "Only root should run ${SCRIPT}"
- echo
- exit 1
-fi
-
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index ee79c95a..47da02dc 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -38,9 +38,7 @@ $(DEST)/Portfile: Portfile.in
# needs vars such as HOME to be set. So we have to set them
# explicitly.
macports: dirs $(DEST)/Portfile
- for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \
- cp $(COMMON_DIR)/$$f $(DEST)/files ; \
- done
+ cp $(COMMON_DIR)/rabbitmq-script-wrapper $(DEST)/files
sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh HOME=/var/lib/rabbitmq USER=rabbitmq LOGNAME=rabbitmq PATH="$$(eval `PATH=MACPORTS_PREFIX/bin /usr/libexec/path_helper -s`; echo $$PATH)" su -m rabbitmq -c|' \
$(DEST)/files/rabbitmq-script-wrapper
cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 0eb7092d..c5d883c3 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -89,13 +89,10 @@ RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
if erl \
-pa "$RABBITMQ_EBIN_ROOT" \
- -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
- -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
- -rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \
-noinput \
-hidden \
-s rabbit_plugin_activator \
- -extra "$@"
+ -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}"
then
RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
RABBITMQ_EBIN_PATH=""
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index bd4120fa..94180de9 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -118,12 +118,10 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
--rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
--rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
--extra !STAR!
+-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
-set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
if not exist "!RABBITMQ_BOOT_FILE!.boot" (
echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index ff25b146..2c96b6fd 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -187,12 +187,10 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
--rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
--rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
--extra !STAR!
+-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
-set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
if not exist "!RABBITMQ_BOOT_FILE!.boot" (
echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 64c75102..e6657b32 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -291,11 +291,10 @@ run_boot_step({StepName, Attributes}) ->
io:format("-- ~s~n", [Description]);
MFAs ->
io:format("starting ~-60s ...", [Description]),
- [case catch apply(M,F,A) of
- {'EXIT', Reason} ->
- boot_error("FAILED~nReason: ~p~n", [Reason]);
- ok ->
- ok
+ [try
+ apply(M,F,A)
+ catch
+ _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason])
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -315,43 +314,43 @@ edges(_Module, Steps) ->
{Key, OtherStep} <- Atts,
Key =:= requires orelse Key =:= enables].
-graph_build_error({vertex, duplicate, StepName}) ->
- boot_error("Duplicate boot step name: ~w~n", [StepName]);
-graph_build_error({edge, Reason, From, To}) ->
- boot_error(
- "Could not add boot step dependency of ~w on ~w:~n~s",
- [To, From,
- case Reason of
- {bad_vertex, V} ->
- io_lib:format("Boot step not registered: ~w~n", [V]);
- {bad_edge, [First | Rest]} ->
- [io_lib:format("Cyclic dependency: ~w", [First]),
- [io_lib:format(" depends on ~w", [Next]) || Next <- Rest],
- io_lib:format(" depends on ~w~n", [First])]
- end]).
-
sort_boot_steps(UnsortedSteps) ->
- G = rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps),
-
- %% Use topological sort to find a consistent ordering (if there is
- %% one, otherwise fail).
- SortedStepsRev = [begin
- {StepName, Step} = digraph:vertex(G, StepName),
- Step
- end || StepName <- digraph_utils:topsort(G)],
- SortedSteps = lists:reverse(SortedStepsRev),
-
- digraph:delete(G),
-
- %% Check that all mentioned {M,F,A} triples are exported.
- case [{StepName, {M,F,A}}
- || {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
- not erlang:function_exported(M, F, length(A))] of
- [] -> SortedSteps;
- MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
- [MissingFunctions])
+ case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ UnsortedSteps) of
+ {ok, G} ->
+ %% Use topological sort to find a consistent ordering (if
+ %% there is one, otherwise fail).
+ SortedSteps = lists:reverse(
+ [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)]),
+ digraph:delete(G),
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}} ||
+ {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error(
+ "Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ {error, {edge, Reason, From, To}} ->
+ boot_error(
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next]) ||
+ Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end])
end.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 3a68346c..7d8ccb36 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -38,7 +38,7 @@
check_resource_access/3, list_vhosts/1]).
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
clear_admin/1, list_users/0, lookup_user/1]).
--export([change_password_hash/2]).
+-export([change_password_hash/2, hash_password/1]).
-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
-export([set_permissions/5, clear_permissions/2,
list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
@@ -73,6 +73,7 @@
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
-spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
+-spec(hash_password/1 :: (password()) -> password_hash()).
-spec(set_admin/1 :: (username()) -> 'ok').
-spec(clear_admin/1 :: (username()) -> 'ok').
-spec(list_users/0 :: () -> [{username(), boolean()}]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 75f285df..a999fe58 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -669,7 +669,10 @@ i(Item, _) ->
throw({bad_argument, Item}).
emit_stats(State) ->
- rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
+ emit_stats(State, []).
+
+emit_stats(State, Extra) ->
+ rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
%---------------------------------------------------------------------------
@@ -1053,7 +1056,10 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end),
+ rabbit_event:if_enabled(StatsTimer,
+ fun () ->
+ emit_stats(State, [{idle_since, now()}])
+ end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
backing_queue_state = BQS2},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 863081a7..d0018ea5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -245,7 +245,9 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired, Msg},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
- State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
+ State1 = lock_message(AckRequired,
+ ack_record(DeliveryTag, ConsumerTag, Msg),
+ State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
{_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
maybe_incr_stats([{QPid, 1}],
@@ -267,9 +269,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- rabbit_event:if_enabled(StatsTimer, fun () ->
- internal_emit_stats(State)
- end),
+ rabbit_event:if_enabled(StatsTimer,
+ fun () ->
+ internal_emit_stats(
+ State, [{idle_since, now()}])
+ end),
{hibernate,
State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
@@ -504,7 +508,9 @@ handle_method(#'basic.get'{queue = QueueNameBin,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content}}} ->
- State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
+ State1 = lock_message(not(NoAck),
+ ack_record(DeliveryTag, none, Msg),
+ State),
maybe_incr_stats([{QPid, 1}],
case NoAck of
true -> get_no_ack;
@@ -640,30 +646,8 @@ handle_method(#'basic.recover_async'{requeue = true},
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover_async'{requeue = false},
- _, State = #ch{writer_pid = WriterPid,
- unacked_message_q = UAMQ}) ->
- ok = rabbit_misc:queue_fold(
- fun ({_DeliveryTag, none, _Msg}, ok) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
- %% Was sent as a proper consumer delivery. Resend
- %% it as before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- internal_deliver(
- WriterPid, false, ConsumerTag, DeliveryTag,
- {QName, QPid, MsgId, true, Message})
- end, ok, UAMQ),
- %% No answer required - basic.recover is the newer, synchronous
- %% variant of this method
- {noreply, State};
+handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
{noreply, State2 = #ch{writer_pid = WriterPid}} =
@@ -985,6 +969,10 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+ack_record(DeliveryTag, ConsumerTag,
+ _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
+ {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+
collect_acks(Q, 0, true) ->
{Q, queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
@@ -1057,8 +1045,7 @@ rollback_and_notify(State) ->
fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag,
- {_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
+ fun ({_DTag, _CTag, {QPid, MsgId}}, D) ->
%% dict:append would avoid the lists:reverse in
%% handle_message({recover, true}, ...). However, it
%% is significantly slower when going beyond a few
@@ -1201,11 +1188,14 @@ update_measures(Type, QX, Inc, Measure) ->
put({Type, QX},
orddict:store(Measure, Cur + Inc, Measures)).
-internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
+internal_emit_stats(State) ->
+ internal_emit_stats(State, []).
+
+internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
- rabbit_event:notify(channel_stats, CoarseStats);
+ rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
fine ->
FineStats =
[{channel_queue_stats,
@@ -1215,7 +1205,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
+ rabbit_event:notify(channel_stats,
+ Extra ++ CoarseStats ++ FineStats)
end.
erase_queue_stats(QPid) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6c0a727b..72b77b1f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -94,9 +94,7 @@ start() ->
halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(
- " ", [atom_to_list(Command) | Args]))]),
+ [string:join([atom_to_list(Command) | Args], " ")]),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
@@ -321,7 +319,7 @@ display_info_list(Other, _) ->
Other.
display_row(Row) ->
- io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
+ io:fwrite(string:join(Row, "\t")),
io:nl().
-define(IS_U8(X), (X >= 0 andalso X =< 255)).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index f41b76d8..9cc70a26 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -228,7 +228,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X = #exchange{name = XName}, Delivery) ->
rabbit_router:deliver(
- route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}),
+ route(Delivery, {queue:from_list([X]), XName, []}),
Delivery).
route(Delivery, {WorkList, SeenXs, QNames}) ->
@@ -252,13 +252,22 @@ process_alternate(_X, Results) ->
Results.
process_route(#resource{kind = exchange} = XName,
+ {_WorkList, XName, _QNames} = Acc) ->
+ Acc;
+process_route(#resource{kind = exchange} = XName,
+ {WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
+ {case lookup(XName) of
+ {ok, X} -> queue:in(X, WorkList);
+ {error, not_found} -> WorkList
+ end, gb_sets:from_list([SeenX, XName]), QNames};
+process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
- case sets:is_element(XName, SeenXs) of
+ case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
false -> {case lookup(XName) of
{ok, X} -> queue:in(X, WorkList);
{error, not_found} -> WorkList
- end, sets:add_element(XName, SeenXs), QNames}
+ end, gb_sets:add_element(XName, SeenXs), QNames}
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0522afdc..230f4db5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -50,7 +50,7 @@
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
--export([intersperse/2, upmap/2, map_in_order/2]).
+-export([upmap/2, map_in_order/2]).
-export([table_fold/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
@@ -64,7 +64,7 @@
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
-export([get_options/2]).
--export([all_module_attributes/1, build_acyclic_graph/4]).
+-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-import(mnesia).
@@ -86,10 +86,9 @@
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-type(digraph_label() :: term()).
-type(graph_vertex_fun() ::
- fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})).
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
- fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})).
--type(graph_error_fun() :: fun ((any()) -> any() | no_return())).
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -155,7 +154,6 @@
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
--spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
@@ -191,9 +189,13 @@
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
--spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(),
- graph_error_fun(), [{atom(), [term()]}]) ->
- digraph()).
+-spec(build_acyclic_graph/3 ::
+ (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
+ -> rabbit_types:ok_or_error2(digraph(),
+ {'vertex', 'duplicate', digraph:vertex()} |
+ {'edge', ({bad_vertex, digraph:vertex()} |
+ {bad_edge, [digraph:vertex()]}),
+ digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-endif.
@@ -414,10 +416,6 @@ tcp_name(Prefix, IPAddress, Port)
io_lib:format("~w_~s:~w",
[Prefix, inet_parse:ntoa(IPAddress), Port]))).
-intersperse(_, []) -> [];
-intersperse(_, [E]) -> [E];
-intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)].
-
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
@@ -765,16 +763,21 @@ all_module_attributes(Name) ->
end, [], Modules).
-build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) ->
+build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
- [ case digraph:vertex(G, Vertex) of
- false -> digraph:add_vertex(G, Vertex, Label);
- _ -> ErrorFun({vertex, duplicate, Vertex})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts) ],
- [ case digraph:add_edge(G, From, To) of
- {error, E} -> ErrorFun({edge, E, From, To});
- _ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts) ],
- G.
+ try
+ [case digraph:vertex(G, Vertex) of
+ false -> digraph:add_vertex(G, Vertex, Label);
+ _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
+ end || {Module, Atts} <- Graph,
+ {Vertex, Label} <- VertexFun(Module, Atts)],
+ [case digraph:add_edge(G, From, To) of
+ {error, E} -> throw({graph_error, {edge, E, From, To}});
+ _ -> ok
+ end || {Module, Atts} <- Graph,
+ {From, To} <- EdgeFun(Module, Atts)],
+ {ok, G}
+ catch {graph_error, Reason} ->
+ true = digraph:delete(G),
+ {error, Reason}
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 7efc2c21..72e7264d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -361,16 +361,15 @@ init_db(ClusterNodes, Force) ->
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
{ok, Nodes} ->
case Force of
- false ->
- FailedClusterNodes = ProperClusterNodes -- Nodes,
- case FailedClusterNodes of
- [] -> ok;
- _ ->
- throw({error, {failed_to_cluster_with,
- FailedClusterNodes,
- "Mnesia could not connect to some nodes."}})
- end;
- _ -> ok
+ false -> FailedClusterNodes = ProperClusterNodes -- Nodes,
+ case FailedClusterNodes of
+ [] -> ok;
+ _ -> throw({error, {failed_to_cluster_with,
+ FailedClusterNodes,
+ "Mnesia could not connect "
+ "to some nodes."}})
+ end;
+ true -> ok
end,
case {Nodes, mnesia:system_info(use_dir),
mnesia:system_info(db_nodes)} of
@@ -379,7 +378,7 @@ init_db(ClusterNodes, Force) ->
wait_for_tables(),
case rabbit_upgrade:maybe_upgrade() of
ok ->
- schema_ok_or_exit();
+ ensure_schema_ok();
version_not_available ->
schema_ok_or_move()
end;
@@ -387,15 +386,15 @@ init_db(ClusterNodes, Force) ->
%% "Master" (i.e. without config) disc node in cluster,
%% verify schema
wait_for_tables(),
- version_ok_or_exit(rabbit_upgrade:read_version()),
- schema_ok_or_exit();
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_schema_ok();
{[], false, _} ->
%% First RAM node in cluster, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
- version_ok_or_exit(rabbit_upgrade:read_version()),
- version_ok_or_exit(
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_version_ok(
rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
@@ -405,14 +404,13 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- schema_ok_or_exit()
+ ensure_schema_ok()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
%% are members of a different cluster
- throw({error, {unable_to_join_cluster,
- ClusterNodes, Reason}})
+ throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
end.
schema_ok_or_move() ->
@@ -430,22 +428,19 @@ schema_ok_or_move() ->
ok = create_schema()
end.
-version_ok_or_exit({ok, DiscVersion}) ->
+ensure_version_ok({ok, DiscVersion}) ->
case rabbit_upgrade:desired_version() of
- DiscVersion ->
- ok;
- DesiredVersion ->
- exit({schema_mismatch, DesiredVersion, DiscVersion})
+ DiscVersion -> ok;
+ DesiredVersion -> throw({error, {schema_mismatch,
+ DesiredVersion, DiscVersion}})
end;
-version_ok_or_exit({error, _}) ->
+ensure_version_ok({error, _}) ->
ok = rabbit_upgrade:write_version().
-schema_ok_or_exit() ->
+ensure_schema_ok() ->
case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- exit({schema_invalid, Reason})
+ ok -> ok;
+ {error, Reason} -> throw({error, {schema_invalid, Reason}})
end.
create_schema() ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 0440dbe4..0030216e 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -65,8 +65,7 @@ start() ->
halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(" ", FullCommand))]),
+ [string:join(FullCommand, " ")]),
usage();
timeout ->
print_error("timeout starting some nodes.", []),
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index ef81ddf2..072f297e 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -50,13 +50,9 @@
start() ->
io:format("Activating RabbitMQ plugins ...~n"),
- %% Ensure Rabbit is loaded so we can access it's environment
- application:load(rabbit),
%% Determine our various directories
- {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
- {ok, UnpackedPluginDir} = application:get_env(rabbit, plugins_expand_dir),
-
+ [PluginDir, UnpackedPluginDir] = init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bde9b3d3..248c1fbc 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -36,6 +36,8 @@
publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
+-export([add_queue_ttl/0]).
+
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
@@ -180,6 +182,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({add_queue_ttl, []}).
+
-ifdef(use_specs).
-type(hdl() :: ('undefined' | any())).
@@ -226,6 +230,8 @@
-spec(recover/1 ::
([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}).
+-spec(add_queue_ttl/0 :: () -> 'ok').
+
-endif.
@@ -345,35 +351,36 @@ recover(DurableQueues) ->
DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
- Directories = case file:list_dir(QueuesDir) of
- {ok, Entries} -> [ Entry || Entry <- Entries,
- filelib:is_dir(
- filename:join(
- QueuesDir, Entry)) ];
- {error, enoent} -> []
- end,
+ QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
{DurableQueueNames, DurableTerms} =
lists:foldl(
- fun (QueueDir, {DurableAcc, TermsAcc}) ->
- case sets:is_element(QueueDir, DurableDirectories) of
+ fun (QueueDirName, {DurableAcc, TermsAcc}) ->
+ QueueDirPath = filename:join(QueuesDir, QueueDirName),
+ case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case read_shutdown_terms(
- filename:join(QueuesDir, QueueDir)) of
+ case read_shutdown_terms(QueueDirPath) of
{error, _} -> TermsAcc;
{ok, Terms} -> [Terms | TermsAcc]
end,
- {[dict:fetch(QueueDir, DurableDict) | DurableAcc],
+ {[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
TermsAcc1};
false ->
- Dir = filename:join(queues_dir(), QueueDir),
- ok = rabbit_misc:recursive_delete([Dir]),
+ ok = rabbit_misc:recursive_delete([QueueDirPath]),
{DurableAcc, TermsAcc}
end
- end, {[], []}, Directories),
+ end, {[], []}, QueueDirNames),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+all_queue_directory_names(Dir) ->
+ case file:list_dir(Dir) of
+ {ok, Entries} -> [ Entry || Entry <- Entries,
+ filelib:is_dir(
+ filename:join(Dir, Entry)) ];
+ {error, enoent} -> []
+ end.
+
%%----------------------------------------------------------------------------
%% startup and shutdown
%%----------------------------------------------------------------------------
@@ -972,3 +979,87 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
{{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
{undefined, -1}.
+
+%%----------------------------------------------------------------------------
+%% upgrade
+%%----------------------------------------------------------------------------
+
+add_queue_ttl() ->
+ foreach_queue_index({fun add_queue_ttl_journal/1,
+ fun add_queue_ttl_segment/1}).
+
+add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Guid:?GUID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ expiry_to_binary(undefined)], Rest};
+add_queue_ttl_journal(_) ->
+ stop.
+
+add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+ Rest/binary>>) ->
+ {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+add_queue_ttl_segment(_) ->
+ stop.
+
+%%----------------------------------------------------------------------------
+
+foreach_queue_index(Funs) ->
+ QueuesDir = queues_dir(),
+ QueueDirNames = all_queue_directory_names(QueuesDir),
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ transform_queue(filename:join(QueuesDir, QueueDirName),
+ Gatherer, Funs)
+ end)
+ end || QueueDirName <- QueueDirNames],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
+ ok = rabbit_misc:unlink_and_capture_exit(Gatherer).
+
+transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
+ ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
+ [ok = transform_file(filename:join(Dir, Seg), SegmentFun)
+ || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ ok = gatherer:finish(Gatherer).
+
+transform_file(Path, Fun) ->
+ PathTmp = Path ++ ".upgrade",
+ Size = filelib:file_size(Path),
+
+ {ok, PathTmpHdl} =
+ file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE],
+ [{write_buffer, infinity}]),
+
+ {ok, PathHdl} =
+ file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []),
+ {ok, Content} = file_handle_cache:read(PathHdl, Size),
+ ok = file_handle_cache:close(PathHdl),
+
+ ok = drive_transform_fun(Fun, PathTmpHdl, Content),
+
+ ok = file_handle_cache:close(PathTmpHdl),
+ ok = file:rename(PathTmp, Path).
+
+drive_transform_fun(Fun, Hdl, Contents) ->
+ case Fun(Contents) of
+ stop ->
+ ok;
+ {Output, Contents1} ->
+ ok = file_handle_cache:append(Hdl, Output),
+ drive_transform_fun(Fun, Hdl, Contents1)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index cda6fbb1..7e4f31a1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -45,10 +45,6 @@
-export([emit_stats/1]).
--import(gen_tcp).
--import(inet).
--import(prim_inet).
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
@@ -325,13 +321,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
done.
mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
- %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
- {State1, Callback1, Length1} =
- handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}),
- mainloop(Deb, switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{recv_ref = none}));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
State;
@@ -571,7 +564,6 @@ handle_frame(Type, Channel, Payload,
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
- %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{ch_fr_pid, ChFrPid} ->
ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
@@ -635,18 +627,18 @@ analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1};
+ ensure_stats_timer(
+ switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
+handle_input({frame_payload, Type, Channel, PayloadSize},
+ PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- %%?LOGDEBUG("Frame completed: ~p/~p/~p~n", [Type, Channel, Payload]),
- NewState = handle_frame(Type, Channel, Payload, State),
- {NewState, frame_header, 7};
+ handle_frame(Type, Channel, Payload,
+ switch_callback(State, frame_header, 7));
_ ->
- throw({bad_payload, PayloadAndMarker})
+ throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;
%% The two rules pertaining to version negotiation:
@@ -698,11 +690,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
mechanisms = auth_mechanisms_binary(Sock),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT,
- protocol = Protocol},
- connection_state = starting},
- frame_header, 7}.
+ switch_callback(State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7).
refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
@@ -862,8 +854,7 @@ auth_mechanism_to_module(TypeBin, Sock) ->
auth_mechanisms(Sock) ->
{ok, Configured} = application:get_env(auth_mechanisms),
[Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
- Module:should_offer(Sock),
- lists:any(fun (N) -> N == Name end, Configured)].
+ Module:should_offer(Sock), lists:member(Name, Configured)].
auth_mechanisms_binary(Sock) ->
list_to_binary(
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 4452b075..a4da23e2 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -114,14 +114,11 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
%% Format and rdnSequence as a RFC4514 subject string.
format_rdn_sequence({rdnSequence, Seq}) ->
- lists:flatten(
- rabbit_misc:intersperse(
- ",", lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]))).
+ string:join(lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]), ",").
%% Format an RDN set.
format_complex_rdn(RDNs) ->
- lists:flatten(
- rabbit_misc:intersperse("+", [format_rdn(RDN) || RDN <- RDNs])).
+ string:join([format_rdn(RDN) || RDN <- RDNs], "+").
%% Format an RDN. If the type name is unknown, use the dotted decimal
%% representation. See RFC4514, section 2.3.
@@ -148,7 +145,7 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
io_lib:format(Fmt ++ "=~s", [FV]);
none when is_tuple(T) ->
TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
- io_lib:format("~s:~s", [rabbit_misc:intersperse(".", TypeL), FV]);
+ io_lib:format("~s:~s", [string:join(TypeL, "."), FV]);
none ->
io_lib:format("~p:~s", [T, FV])
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 71b23e01..27e4d925 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1865,9 +1865,39 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
- fun test_dropwhile/1]],
+ fun test_dropwhile/1,
+ fun test_variable_queue_ack_limiting/1]],
passed.
+test_variable_queue_ack_limiting(VQ0) ->
+ %% start by sending in a bunch of messages
+ Len = 1024,
+ VQ1 = variable_queue_publish(false, Len, VQ0),
+
+ %% squeeze and relax queue
+ Churn = Len div 32,
+ VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
+ %% update stats for duration
+ {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
+
+ %% fetch half the messages
+ {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
+
+ VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
+ {ram_ack_count, Len div 2},
+ {ram_msg_count, Len div 2}]),
+
+ %% ensure all acks go to disk on 0 duration target
+ VQ6 = check_variable_queue_status(
+ rabbit_variable_queue:set_ram_duration_target(0, VQ5),
+ [{len, Len div 2},
+ {target_ram_item_count, 0},
+ {ram_msg_count, 0},
+ {ram_ack_count, 0}]),
+
+ VQ6.
+
test_dropwhile(VQ0) ->
Count = 10,
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 0071a08a..27a94f6f 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -32,11 +32,13 @@
-ifdef(use_specs).
+-type(step() :: atom()).
+-type(version() :: [step()]).
+
-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
--spec(read_version/0 ::
- () -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
-spec(write_version/0 :: () -> 'ok').
--spec(desired_version/0 :: () -> [atom()]).
+-spec(desired_version/0 :: () -> version()).
-endif.
@@ -48,26 +50,25 @@
maybe_upgrade() ->
case read_version() of
{ok, CurrentHeads} ->
- G = load_graph(),
- case unknown_heads(CurrentHeads, G) of
- [] ->
- case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown ->
- exit({future_upgrades_found, Unknown})
- end,
- true = digraph:delete(G),
- ok;
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] -> case upgrades_to_apply(CurrentHeads, G) of
+ [] -> ok;
+ Upgrades -> apply_upgrades(Upgrades)
+ end;
+ Unknown -> throw({error,
+ {future_upgrades_found, Unknown}})
+ end
+ end);
{error, enoent} ->
version_not_available
end.
read_version() ->
case rabbit_misc:read_term_file(schema_filename()) of
- {ok, [Heads]} -> {ok, Heads};
- {error, E} -> {error, E}
+ {ok, [Heads]} -> {ok, Heads};
+ {error, _} = Err -> Err
end.
write_version() ->
@@ -75,17 +76,26 @@ write_version() ->
ok.
desired_version() ->
- G = load_graph(),
- Version = heads(G),
- true = digraph:delete(G),
- Version.
+ with_upgrade_graph(fun (G) -> heads(G) end).
%% -------------------------------------------------------------------
-load_graph() ->
- Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade),
- rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades).
+with_upgrade_graph(Fun) ->
+ case rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2,
+ rabbit_misc:all_module_attributes(rabbit_upgrade)) of
+ {ok, G} -> try
+ Fun(G)
+ after
+ true = digraph:delete(G)
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ throw({error, {duplicate_upgrade_step, StepName}});
+ {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
+ throw({error, {dependency_on_unknown_upgrade_step, StepName}});
+ {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
+ throw({error, {cycle_in_upgrade_steps, StepNames}})
+ end.
vertices(Module, Steps) ->
[{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
@@ -93,10 +103,6 @@ vertices(Module, Steps) ->
edges(_Module, Steps) ->
[{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
-graph_build_error({vertex, duplicate, StepName}) ->
- exit({duplicate_upgrade, StepName});
-graph_build_error({edge, E, From, To}) ->
- exit({E, From, To}).
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
@@ -111,8 +117,8 @@ upgrades_to_apply(Heads, G) ->
sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
- [element(2, digraph:vertex(G, StepName))
- || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+ [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
heads(G) ->
lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
@@ -130,9 +136,9 @@ apply_upgrades(Upgrades) ->
ok = write_version(),
ok = file:delete(LockFile);
{error, eexist} ->
- exit(previous_upgrade_failed);
+ throw({error, previous_upgrade_failed});
{error, _} = Error ->
- exit(Error)
+ throw(Error)
end.
apply_upgrade({M, F}) ->
@@ -141,16 +147,12 @@ apply_upgrade({M, F}) ->
%% -------------------------------------------------------------------
-schema_filename() ->
- filename:join(dir(), ?VERSION_FILENAME).
+dir() -> rabbit_mnesia:dir().
-lock_filename() ->
- filename:join(dir(), ?LOCK_FILENAME).
+schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
+
+lock_filename() -> filename:join(dir(), ?LOCK_FILENAME).
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
-info(Msg, Args) ->
- error_logger:info_msg(Msg, Args).
-
-dir() ->
- rabbit_mnesia:dir().
+info(Msg, Args) -> error_logger:info_msg(Msg, Args).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 59b8705d..1c56d51d 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -24,28 +24,55 @@
-compile([export_all]).
--rabbit_upgrade({remove_user_scope, []}).
+-rabbit_upgrade({remove_user_scope, []}).
+-rabbit_upgrade({hash_passwords, []}).
+-rabbit_upgrade({add_ip_to_listener, []}).
%% -------------------------------------------------------------------
-ifdef(use_specs).
--spec(remove_user_scope/0 :: () -> 'ok').
+-spec(remove_user_scope/0 :: () -> 'ok').
+-spec(hash_passwords/0 :: () -> 'ok').
+-spec(add_ip_to_listener/0 :: () -> 'ok').
-endif.
%%--------------------------------------------------------------------
+%% It's a bad idea to use records or record_info here, even for the
+%% destination form. Because in the future, the destination form of
+%% your current transform may not match the record any more, and it
+%% would be messy to have to go back and fix old transforms at that
+%% point.
+
remove_user_scope() ->
- {atomic, ok} = mnesia:transform_table(
- rabbit_user_permission,
- fun (Perm = #user_permission{
- permission = {permission,
- _Scope, Conf, Write, Read}}) ->
- Perm#user_permission{
- permission = #permission{configure = Conf,
- write = Write,
- read = Read}}
- end,
- record_info(fields, user_permission)),
+ mnesia(
+ rabbit_user_permission,
+ fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) ->
+ {user_permission, UV, {permission, Conf, Write, Read}}
+ end,
+ [user_vhost, permission]).
+
+hash_passwords() ->
+ mnesia(
+ rabbit_user,
+ fun ({user, Username, Password, IsAdmin}) ->
+ Hash = rabbit_access_control:hash_password(Password),
+ {user, Username, Hash, IsAdmin}
+ end,
+ [username, password_hash, is_admin]).
+
+add_ip_to_listener() ->
+ mnesia(
+ rabbit_listener,
+ fun ({listener, Node, Protocol, Host, Port}) ->
+ {listener, Node, Protocol, Host, {0,0,0,0}, Port}
+ end,
+ [node, protocol, host, ip_address, port]).
+
+%%--------------------------------------------------------------------
+
+mnesia(TableName, Fun, FieldList) ->
+ {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 69d62fde..5ac042a2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -89,12 +89,14 @@
%%
%% The duration indicated to us by the memory_monitor is used to
%% calculate, given our current ingress and egress rates, how many
-%% messages we should hold in RAM. When we need to push alphas to
-%% betas or betas to gammas, we favour writing out messages that are
-%% further from the head of the queue. This minimises writes to disk,
-%% as the messages closer to the tail of the queue stay in the queue
-%% for longer, thus do not need to be replaced as quickly by sending
-%% other messages to disk.
+%% messages we should hold in RAM. We track the ingress and egress
+%% rates for both messages and pending acks and rates for both are
+%% considered when calculating the number of messages to hold in
+%% RAM. When we need to push alphas to betas or betas to gammas, we
+%% favour writing out messages that are further from the head of the
+%% queue. This minimises writes to disk, as the messages closer to the
+%% tail of the queue stay in the queue for longer, thus do not need to
+%% be replaced as quickly by sending other messages to disk.
%%
%% Whilst messages are pushed to disk and forgotten from RAM as soon
%% as requested by a new setting of the queue RAM duration, the
@@ -156,7 +158,7 @@
%% The conversion from alphas to betas is also chunked, but only to
%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
%% any one time. This further smooths the effects of changes to the
-%% target_ram_msg_count and ensures the queue remains responsive
+%% target_ram_item_count and ensures the queue remains responsive
%% even when there is a large amount of IO work to do. The
%% idle_timeout callback is utilised to ensure that conversions are
%% done as promptly as possible whilst ensuring the queue remains
@@ -168,6 +170,29 @@
%% the latter) are both cheap and do require any scanning through qi
%% segments.
%%
+%% Pending acks are recorded in memory either as the tuple {SeqId,
+%% Guid, MsgProps} (tuple-form) or as the message itself (message-
+%% form). Acks for persistent messages are always stored in the tuple-
+%% form. Acks for transient messages are also stored in tuple-form if
+%% the message has been sent to disk as part of the memory reduction
+%% process. For transient messages that haven't already been written
+%% to disk, acks are stored in message-form.
+%%
+%% During memory reduction, acks stored in message-form are converted
+%% to tuple-form, and the corresponding messages are pushed out to
+%% disk.
+%%
+%% The order in which alphas are pushed to betas and message-form acks
+%% are pushed to disk is determined dynamically. We always prefer to
+%% push messages for the source (alphas or acks) that is growing the
+%% fastest (with growth measured as avg. ingress - avg. egress). In
+%% each round of memory reduction a chunk of messages at most
+%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The
+%% fastest growing source will be reduced by as much of this chunk as
+%% possible. If there is any remaining allocation in the chunk after
+%% the first source has been reduced to zero, the second source will
+%% be reduced by as much of the remaining chunk as possible.
+%%
%% Notes on Clean Shutdown
%% (This documents behaviour in variable_queue, queue_index and
%% msg_store.)
@@ -220,6 +245,8 @@
q4,
next_seq_id,
pending_ack,
+ pending_ack_index,
+ ram_ack_index,
index_state,
msg_store_clients,
on_sync,
@@ -229,13 +256,17 @@
len,
persistent_count,
- target_ram_msg_count,
+ target_ram_item_count,
ram_msg_count,
ram_msg_count_prev,
+ ram_ack_count_prev,
ram_index_count,
out_counter,
in_counter,
- rates
+ ack_out_counter,
+ ack_in_counter,
+ rates,
+ ack_rates
}).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -299,30 +330,34 @@
funs :: [fun (() -> any())] }).
-type(state() :: #vqstate {
- q1 :: queue(),
- q2 :: bpqueue:bpqueue(),
- delta :: delta(),
- q3 :: bpqueue:bpqueue(),
- q4 :: queue(),
- next_seq_id :: seq_id(),
- pending_ack :: dict(),
- index_state :: any(),
- msg_store_clients :: 'undefined' | {{any(), binary()},
+ q1 :: queue(),
+ q2 :: bpqueue:bpqueue(),
+ delta :: delta(),
+ q3 :: bpqueue:bpqueue(),
+ q4 :: queue(),
+ next_seq_id :: seq_id(),
+ pending_ack :: dict(),
+ ram_ack_index :: gb_tree(),
+ index_state :: any(),
+ msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
- on_sync :: sync(),
- durable :: boolean(),
-
- len :: non_neg_integer(),
- persistent_count :: non_neg_integer(),
-
- transient_threshold :: non_neg_integer(),
- target_ram_msg_count :: non_neg_integer() | 'infinity',
- ram_msg_count :: non_neg_integer(),
- ram_msg_count_prev :: non_neg_integer(),
- ram_index_count :: non_neg_integer(),
- out_counter :: non_neg_integer(),
- in_counter :: non_neg_integer(),
- rates :: rates() }).
+ on_sync :: sync(),
+ durable :: boolean(),
+
+ len :: non_neg_integer(),
+ persistent_count :: non_neg_integer(),
+
+ transient_threshold :: non_neg_integer(),
+ target_ram_item_count :: non_neg_integer() | 'infinity',
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
+ ram_index_count :: non_neg_integer(),
+ out_counter :: non_neg_integer(),
+ in_counter :: non_neg_integer(),
+ ack_out_counter :: non_neg_integer(),
+ ack_in_counter :: non_neg_integer(),
+ rates :: rates(),
+ ack_rates :: rates() }).
-include("rabbit_backing_queue_spec.hrl").
@@ -479,19 +514,18 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
- pending_ack = PA,
durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- PA1 = record_pending_ack(m(MsgStatus1), PA),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ {SeqId, a(reduce_memory_use(
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -561,8 +595,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount,
- pending_ack = PA }) ->
+ persistent_count = PCount }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -582,12 +615,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
%% 3. If an ack is required, add something sensible to PA
- {AckTag, PA1} = case AckRequired of
- true -> PA2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, PA),
- {SeqId, PA2};
- false -> {blank_ack, PA}
+ {AckTag, State1} = case AckRequired of
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {blank_ack, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
@@ -595,12 +628,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
{{Msg, IsDelivered, AckTag, Len1},
- a(State #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ a(State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1 })}.
ack(AckTags, State) ->
a(ack(fun msg_store_remove/3,
@@ -678,40 +710,62 @@ is_empty(State) -> 0 == len(State).
set_ram_duration_target(DurationTarget,
State = #vqstate {
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- target_ram_msg_count = TargetRamMsgCount }) ->
- Rate = AvgEgressRate + AvgIngressRate,
- TargetRamMsgCount1 =
+ rates =
+ #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates =
+ #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
+ target_ram_item_count = TargetRamItemCount }) ->
+ Rate =
+ AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
+ TargetRamItemCount1 =
case DurationTarget of
infinity -> infinity;
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
- State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 },
- a(case TargetRamMsgCount1 == infinity orelse
- (TargetRamMsgCount =/= infinity andalso
- TargetRamMsgCount1 >= TargetRamMsgCount) of
+ State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 },
+ a(case TargetRamItemCount1 == infinity orelse
+ (TargetRamItemCount =/= infinity andalso
+ TargetRamItemCount1 >= TargetRamItemCount) of
true -> State1;
false -> reduce_memory_use(State1)
end).
ram_duration(State = #vqstate {
- rates = #rates { egress = Egress,
- ingress = Ingress,
- timestamp = Timestamp } = Rates,
+ rates = #rates { timestamp = Timestamp,
+ egress = Egress,
+ ingress = Ingress } = Rates,
+ ack_rates = #rates { timestamp = AckTimestamp,
+ egress = AckEgress,
+ ingress = AckIngress } = ARates,
in_counter = InCount,
out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev }) ->
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_ack_index = RamAckIndex,
+ ram_ack_count_prev = RamAckCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
- Duration = %% msgs / (msgs/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
+ {AvgAckEgressRate, AckEgress1} =
+ update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
+ {AvgAckIngressRate, AckIngress1} =
+ update_rate(Now, AckTimestamp, AckInCount, AckIngress),
+
+ RamAckCount = gb_trees:size(RamAckIndex),
+
+ Duration = %% msgs+acks / (msgs+acks/sec) == sec
+ case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
true -> infinity;
- false -> (RamMsgCountPrev + RamMsgCount) /
- (2 * (AvgEgressRate + AvgIngressRate))
+ false -> (RamMsgCountPrev + RamMsgCount +
+ RamAckCount + RamAckCountPrev) /
+ (4 * (AvgEgressRate + AvgIngressRate +
+ AvgAckEgressRate + AvgAckIngressRate))
end,
{Duration, State #vqstate {
@@ -721,14 +775,24 @@ ram_duration(State = #vqstate {
avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate,
timestamp = Now },
+ ack_rates = ARates #rates {
+ egress = AckEgress1,
+ ingress = AckIngress1,
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate,
+ timestamp = Now },
in_counter = 0,
out_counter = 0,
- ram_msg_count_prev = RamMsgCount }}.
+ ack_in_counter = 0,
+ ack_out_counter = 0,
+ ram_msg_count_prev = RamMsgCount,
+ ram_ack_count_prev = RamAckCount }}.
needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
+ {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
State),
Res;
needs_idle_timeout(_State) ->
@@ -740,32 +804,39 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- pending_ack = PA,
- on_sync = #sync { funs = From },
- target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
- rates = #rates {
+ len = Len,
+ pending_ack = PA,
+ ram_ack_index = RAI,
+ on_sync = #sync { funs = From },
+ target_ram_item_count = TargetRamItemCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ next_seq_id = NextSeqId,
+ persistent_count = PersistentCount,
+ rates = #rates {
avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , bpqueue:len(Q2)},
- {delta , Delta},
- {q3 , bpqueue:len(Q3)},
- {q4 , queue:len(Q4)},
- {len , Len},
- {pending_acks , dict:size(PA)},
- {outstanding_txns , length(From)},
- {target_ram_msg_count , TargetRamMsgCount},
- {ram_msg_count , RamMsgCount},
- {ram_index_count , RamIndexCount},
- {next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
- {avg_egress_rate , AvgEgressRate},
- {avg_ingress_rate , AvgIngressRate} ].
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates {
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate } }) ->
+ [ {q1 , queue:len(Q1)},
+ {q2 , bpqueue:len(Q2)},
+ {delta , Delta},
+ {q3 , bpqueue:len(Q3)},
+ {q4 , queue:len(Q4)},
+ {len , Len},
+ {pending_acks , dict:size(PA)},
+ {outstanding_txns , length(From)},
+ {target_ram_item_count , TargetRamItemCount},
+ {ram_msg_count , RamMsgCount},
+ {ram_ack_count , gb_trees:size(RAI)},
+ {ram_index_count , RamIndexCount},
+ {next_seq_id , NextSeqId},
+ {persistent_count , PersistentCount},
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ack_ingress_rate , AvgAckIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -955,35 +1026,43 @@ init(IsDurable, IndexState, DeltaCount, Terms,
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
- next_seq_id = NextSeqId,
- pending_ack = dict:new(),
- index_state = IndexState1,
- msg_store_clients = {PersistentClient, TransientClient},
- on_sync = ?BLANK_SYNC,
- durable = IsDurable,
- transient_threshold = NextSeqId,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
-
- target_ram_msg_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_index_count = 0,
- out_counter = 0,
- in_counter = 0,
- rates = #rates { egress = {Now, 0},
- ingress = {Now, DeltaCount1},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Now } },
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ target_ram_item_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_ack_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ ack_out_counter = 0,
+ ack_in_counter = 0,
+ rates = blank_rate(Now, DeltaCount1),
+ ack_rates = blank_rate(Now, 0) },
a(maybe_deltas_to_betas(State)).
+blank_rate(Timestamp, IngressLength) ->
+ #rates { egress = {Timestamp, 0},
+ ingress = {Timestamp, IngressLength},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = Timestamp }.
+
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
@@ -1191,12 +1270,21 @@ record_pending_ack(#msg_status { seq_id = SeqId,
guid = Guid,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
- msg_props = MsgProps } = MsgStatus, PA) ->
- AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid, MsgProps};
- false -> MsgStatus
- end,
- dict:store(SeqId, AckEntry, PA).
+ msg_props = MsgProps } = MsgStatus,
+ State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI,
+ ack_in_counter = AckInCount}) ->
+ {AckEntry, RAI1} =
+ case MsgOnDisk of
+ true ->
+ {{IsPersistent, Guid, MsgProps}, RAI};
+ false ->
+ {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ end,
+ PA1 = dict:store(SeqId, AckEntry, PA),
+ State #vqstate { pending_ack = PA1,
+ ram_ack_index = RAI1,
+ ack_in_counter = AckInCount + 1}.
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
@@ -1204,7 +1292,8 @@ remove_pending_ack(KeepPersistent,
msg_store_clients = MSCState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
{[], orddict:new()}, PA),
- State1 = State #vqstate { pending_ack = dict:new() },
+ State1 = State #vqstate { pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, GuidsByStore) of
error -> State1;
@@ -1226,13 +1315,17 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount }} =
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }}) ->
AckEntry = dict:fetch(SeqId, PA),
{accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA) })}
+ pending_ack = dict:erase(SeqId, PA),
+ ram_ack_index =
+ gb_trees:delete_any(SeqId, RAI)})}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
@@ -1241,7 +1334,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1270,7 +1364,7 @@ find_persistent_count(LensByStore) ->
%% though the conversion function for that is called as necessary. The
%% reason is twofold. Firstly, this is safe because the conversion is
%% only ever necessary just after a transition to a
-%% target_ram_msg_count of zero or after an incremental alpha->beta
+%% target_ram_item_count of zero or after an incremental alpha->beta
%% conversion. In the former case the conversion is performed straight
%% away (i.e. any betas present at the time are converted to deltas),
%% and in the latter case the need for a conversion is flagged up
@@ -1280,26 +1374,87 @@ find_persistent_count(LensByStore) ->
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
- {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
- State #vqstate.target_ram_msg_count) of
- 0 -> {false, State};
- S1 -> {true, AlphaBetaFun(S1, State)}
- end,
- case State1 #vqstate.target_ram_msg_count of
- infinity -> {Reduce, State1};
- 0 -> {Reduce, BetaDeltaFun(State1)};
- _ -> case chunk_size(State1 #vqstate.ram_index_count,
- permitted_ram_index_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
- _ -> {Reduce, State1}
- end
+reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
+ State = #vqstate {target_ram_item_count = infinity}) ->
+ {false, State};
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
+ State = #vqstate {
+ ram_ack_index = RamAckIndex,
+ ram_msg_count = RamMsgCount,
+ target_ram_item_count = TargetRamItemCount,
+ rates = #rates {
+ avg_ingress = AvgIngress,
+ avg_egress = AvgEgress },
+ ack_rates = #rates {
+ avg_ingress = AvgAckIngress,
+ avg_egress = AvgAckEgress } }) ->
+
+ {Reduce, State1} =
+ case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ TargetRamItemCount) of
+ 0 ->
+ {false, State};
+ S1 ->
+ ReduceFuns =
+ case (AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress) of
+ true ->
+ %% ACKs are growing faster than the queue,
+ %% push messages from there first.
+ [AckFun, AlphaBetaFun];
+ false ->
+ %% The queue is growing faster than the
+ %% acks, push queue messages first.
+ [AlphaBetaFun, AckFun]
+ end,
+ {_, State2} =
+ %% Both reduce functions get a chance to reduce
+ %% memory. The second may very well get a quota of
+ %% 0 if the first function managed to push out the
+ %% maximum number of messages.
+ lists:foldl(
+ fun (ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end, {S1, State}, ReduceFuns),
+ {true, State2}
+ end,
+
+ case State1 #vqstate.target_ram_item_count of
+ 0 -> {Reduce, BetaDeltaFun(State1)};
+ _ -> case chunk_size(State1 #vqstate.ram_index_count,
+ permitted_ram_index_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end
end.
+limit_ram_acks(0, State) ->
+ {0, State};
+limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }) ->
+ case gb_trees:is_empty(RAI) of
+ true ->
+ {Quota, State};
+ false ->
+ {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
+ MsgStatus = #msg_status {
+ guid = Guid, %% ASSERTION
+ is_persistent = false, %% ASSERTION
+ msg_props = MsgProps } = dict:fetch(SeqId, PA),
+ {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ limit_ram_acks(Quota - 1,
+ State1 #vqstate {
+ pending_ack =
+ dict:store(SeqId, {false, Guid, MsgProps}, PA),
+ ram_ack_index = RAI1 })
+ end.
+
+
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,
fun push_betas_to_deltas/1,
+ fun limit_ram_acks/2,
State),
State1.
@@ -1432,9 +1587,9 @@ maybe_deltas_to_betas(State = #vqstate {
end.
push_alphas_to_betas(Quota, State) ->
- { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
- {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
- State2.
+ {Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
+ {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ {Quota2, State2}.
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
@@ -1460,10 +1615,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
+ ram_msg_count = RamMsgCount,
+ target_ram_item_count = TargetRamItemCount })
when Quota =:= 0 orelse
- TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
+ TargetRamItemCount =:= infinity orelse
+ TargetRamItemCount >= RamMsgCount ->
{Quota, State};
maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of