diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-11-24 11:51:29 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-11-24 11:51:29 +0000 |
commit | 99a0a09b6736eb6e535f54efa834c31dccc7a427 (patch) | |
tree | 59852472aab6b517b5a913ab7cb6c24accb45cde | |
parent | 1327df8fc4821abd824c23242bcf07236fd5df06 (diff) | |
parent | ad4d8b090b45f47569c40dde99b630537083f403 (diff) | |
download | rabbitmq-server-99a0a09b6736eb6e535f54efa834c31dccc7a427.tar.gz |
Merge from bug23467 (and hence default)
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 9 | ||||
-rw-r--r-- | packaging/common/rabbitmq-asroot-script-wrapper | 45 | ||||
-rw-r--r-- | packaging/macports/Makefile | 4 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 5 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 8 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 8 | ||||
-rw-r--r-- | src/rabbit.erl | 81 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 59 | ||||
-rw-r--r-- | src/rabbit_control.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 15 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 53 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 53 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 3 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 6 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 121 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 41 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 9 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 32 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 86 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 53 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 468 |
23 files changed, 706 insertions, 472 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 209a90ee..5032f471 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -9,8 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate -Source4: rabbitmq-asroot-script-wrapper -Source5: rabbitmq-server.ocf +Source4: rabbitmq-server.ocf URL: http://www.rabbitmq.com/ BuildArch: noarch BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt @@ -29,8 +28,7 @@ scalable implementation of an AMQP broker. %define _rabbit_libdir %{_exec_prefix}/lib/rabbitmq %define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version} %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` -%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` -%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}` +%define _rabbit_server_ocf %{_builddir}/`basename %{S:4}` %define _plugins_state_dir %{_localstatedir}/lib/rabbitmq/plugins %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -40,8 +38,7 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} -cp %{S:4} %{_rabbit_asroot_wrapper} -cp %{S:5} %{_rabbit_server_ocf} +cp %{S:4} %{_rabbit_server_ocf} make %{?_smp_mflags} %install diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper deleted file mode 100644 index 693a6f0b..00000000 --- a/packaging/common/rabbitmq-asroot-script-wrapper +++ /dev/null @@ -1,45 +0,0 @@ -#!/bin/sh -## The contents of this file are subject to the Mozilla Public License -## Version 1.1 (the "License"); you may not use this file except in -## compliance with the License. 
You may obtain a copy of the License at -## http://www.mozilla.org/MPL/ -## -## Software distributed under the License is distributed on an "AS IS" -## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -## License for the specific language governing rights and limitations -## under the License. -## -## The Original Code is RabbitMQ. -## -## The Initial Developers of the Original Code are LShift Ltd, -## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -## -## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -## Technologies LLC, and Rabbit Technologies Ltd. -## -## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2010 Cohesive Financial Technologies -## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2010 Rabbit Technologies Ltd. -## -## All Rights Reserved. -## -## Contributor(s): ______________________________________. -## - -cd /var/lib/rabbitmq - -SCRIPT=`basename $0` - -if [ `id -u` = 0 ] ; then - /usr/lib/rabbitmq/bin/${SCRIPT} "$@" -else - echo - echo "Only root should run ${SCRIPT}" - echo - exit 1 -fi - diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index ee79c95a..47da02dc 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -38,9 +38,7 @@ $(DEST)/Portfile: Portfile.in # needs vars such as HOME to be set. So we have to set them # explicitly. 
macports: dirs $(DEST)/Portfile - for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \ - cp $(COMMON_DIR)/$$f $(DEST)/files ; \ - done + cp $(COMMON_DIR)/rabbitmq-script-wrapper $(DEST)/files sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh HOME=/var/lib/rabbitmq USER=rabbitmq LOGNAME=rabbitmq PATH="$$(eval `PATH=MACPORTS_PREFIX/bin /usr/libexec/path_helper -s`; echo $$PATH)" su -m rabbitmq -c|' \ $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 0eb7092d..c5d883c3 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -89,13 +89,10 @@ RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ -pa "$RABBITMQ_EBIN_ROOT" \ - -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ - -rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \ -noinput \ -hidden \ -s rabbit_plugin_activator \ - -extra "$@" + -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" then RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" RABBITMQ_EBIN_PATH="" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index bd4120fa..94180de9 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -118,12 +118,10 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
--rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
--rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
--extra !STAR!
+-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
-set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
if not exist "!RABBITMQ_BOOT_FILE!.boot" (
echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index ff25b146..2c96b6fd 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -187,12 +187,10 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
--rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
--rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
--extra !STAR!
+-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
-set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
if not exist "!RABBITMQ_BOOT_FILE!.boot" (
echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
diff --git a/src/rabbit.erl b/src/rabbit.erl index 64c75102..e6657b32 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -291,11 +291,10 @@ run_boot_step({StepName, Attributes}) -> io:format("-- ~s~n", [Description]); MFAs -> io:format("starting ~-60s ...", [Description]), - [case catch apply(M,F,A) of - {'EXIT', Reason} -> - boot_error("FAILED~nReason: ~p~n", [Reason]); - ok -> - ok + [try + apply(M,F,A) + catch + _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason]) end || {M,F,A} <- MFAs], io:format("done~n"), ok @@ -315,43 +314,43 @@ edges(_Module, Steps) -> {Key, OtherStep} <- Atts, Key =:= requires orelse Key =:= enables]. -graph_build_error({vertex, duplicate, StepName}) -> - boot_error("Duplicate boot step name: ~w~n", [StepName]); -graph_build_error({edge, Reason, From, To}) -> - boot_error( - "Could not add boot step dependency of ~w on ~w:~n~s", - [To, From, - case Reason of - {bad_vertex, V} -> - io_lib:format("Boot step not registered: ~w~n", [V]); - {bad_edge, [First | Rest]} -> - [io_lib:format("Cyclic dependency: ~w", [First]), - [io_lib:format(" depends on ~w", [Next]) || Next <- Rest], - io_lib:format(" depends on ~w~n", [First])] - end]). - sort_boot_steps(UnsortedSteps) -> - G = rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps), - - %% Use topological sort to find a consistent ordering (if there is - %% one, otherwise fail). - SortedStepsRev = [begin - {StepName, Step} = digraph:vertex(G, StepName), - Step - end || StepName <- digraph_utils:topsort(G)], - SortedSteps = lists:reverse(SortedStepsRev), - - digraph:delete(G), - - %% Check that all mentioned {M,F,A} triples are exported. 
- case [{StepName, {M,F,A}} - || {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, - not erlang:function_exported(M, F, length(A))] of - [] -> SortedSteps; - MissingFunctions -> boot_error("Boot step functions not exported: ~p~n", - [MissingFunctions]) + case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2, + UnsortedSteps) of + {ok, G} -> + %% Use topological sort to find a consistent ordering (if + %% there is one, otherwise fail). + SortedSteps = lists:reverse( + [begin + {StepName, Step} = digraph:vertex(G, StepName), + Step + end || StepName <- digraph_utils:topsort(G)]), + digraph:delete(G), + %% Check that all mentioned {M,F,A} triples are exported. + case [{StepName, {M,F,A}} || + {StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, + not erlang:function_exported(M, F, length(A))] of + [] -> SortedSteps; + MissingFunctions -> boot_error( + "Boot step functions not exported: ~p~n", + [MissingFunctions]) + end; + {error, {vertex, duplicate, StepName}} -> + boot_error("Duplicate boot step name: ~w~n", [StepName]); + {error, {edge, Reason, From, To}} -> + boot_error( + "Could not add boot step dependency of ~w on ~w:~n~s", + [To, From, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) || + Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]) end. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 3a68346c..7d8ccb36 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ check_resource_access/3, list_vhosts/1]). -export([add_user/2, delete_user/1, change_password/2, set_admin/1, clear_admin/1, list_users/0, lookup_user/1]). --export([change_password_hash/2]). 
+-export([change_password_hash/2, hash_password/1]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, clear_permissions/2, list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, @@ -73,6 +73,7 @@ -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). -spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok'). +-spec(hash_password/1 :: (password()) -> password_hash()). -spec(set_admin/1 :: (username()) -> 'ok'). -spec(clear_admin/1 :: (username()) -> 'ok'). -spec(list_users/0 :: () -> [{username(), boolean()}]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 75f285df..a999fe58 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -669,7 +669,10 @@ i(Item, _) -> throw({bad_argument, Item}). emit_stats(State) -> - rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)). + emit_stats(State, []). + +emit_stats(State, Extra) -> + rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). %--------------------------------------------------------------------------- @@ -1053,7 +1056,10 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end), + rabbit_event:if_enabled(StatsTimer, + fun () -> + emit_stats(State, [{idle_since, now()}]) + end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), backing_queue_state = BQS2}, {hibernate, stop_rate_timer(State1)}. 
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 863081a7..d0018ea5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -245,7 +245,9 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> - State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), + State1 = lock_message(AckRequired, + ack_record(DeliveryTag, ConsumerTag, Msg), + State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, maybe_incr_stats([{QPid, 1}], @@ -267,9 +269,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - rabbit_event:if_enabled(StatsTimer, fun () -> - internal_emit_stats(State) - end), + rabbit_event:if_enabled(StatsTimer, + fun () -> + internal_emit_stats( + State, [{idle_since, now()}]) + end), {hibernate, State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. @@ -504,7 +508,9 @@ handle_method(#'basic.get'{queue = QueueNameBin, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> - State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + State1 = lock_message(not(NoAck), + ack_record(DeliveryTag, none, Msg), + State), maybe_incr_stats([{QPid, 1}], case NoAck of true -> get_no_ack; @@ -640,30 +646,8 @@ handle_method(#'basic.recover_async'{requeue = true}, %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{writer_pid = WriterPid, - unacked_message_q = UAMQ}) -> - ok = rabbit_misc:queue_fold( - fun ({_DeliveryTag, none, _Msg}, ok) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? 
- ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> - %% Was sent as a proper consumer delivery. Resend - %% it as before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - internal_deliver( - WriterPid, false, ConsumerTag, DeliveryTag, - {QName, QPid, MsgId, true, Message}) - end, ok, UAMQ), - %% No answer required - basic.recover is the newer, synchronous - %% variant of this method - {noreply, State}; +handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "requeue=false", []); handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> {noreply, State2 = #ch{writer_pid = WriterPid}} = @@ -985,6 +969,10 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +ack_record(DeliveryTag, ConsumerTag, + _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> + {DeliveryTag, ConsumerTag, {QPid, MsgId}}. + collect_acks(Q, 0, true) -> {Q, queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> @@ -1057,8 +1045,7 @@ rollback_and_notify(State) -> fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, - {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> + fun ({_DTag, _CTag, {QPid, MsgId}}, D) -> %% dict:append would avoid the lists:reverse in %% handle_message({recover, true}, ...). However, it %% is significantly slower when going beyond a few @@ -1201,11 +1188,14 @@ update_measures(Type, QX, Inc, Measure) -> put({Type, QX}, orddict:store(Measure, Cur + Inc, Measures)). -internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> +internal_emit_stats(State) -> + internal_emit_stats(State, []). 
+ +internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> CoarseStats = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(StatsTimer) of coarse -> - rabbit_event:notify(channel_stats, CoarseStats); + rabbit_event:notify(channel_stats, Extra ++ CoarseStats); fine -> FineStats = [{channel_queue_stats, @@ -1215,7 +1205,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], - rabbit_event:notify(channel_stats, CoarseStats ++ FineStats) + rabbit_event:notify(channel_stats, + Extra ++ CoarseStats ++ FineStats) end. erase_queue_stats(QPid) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6c0a727b..72b77b1f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -94,9 +94,7 @@ start() -> halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> print_error("invalid command '~s'", - [lists:flatten( - rabbit_misc:intersperse( - " ", [atom_to_list(Command) | Args]))]), + [string:join([atom_to_list(Command) | Args], " ")]), usage(); {error, Reason} -> print_error("~p", [Reason]), @@ -321,7 +319,7 @@ display_info_list(Other, _) -> Other. display_row(Row) -> - io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), + io:fwrite(string:join(Row, "\t")), io:nl(). -define(IS_U8(X), (X >= 0 andalso X =< 255)). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index f41b76d8..9cc70a26 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -228,7 +228,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> rabbit_router:deliver( - route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}), + route(Delivery, {queue:from_list([X]), XName, []}), Delivery). route(Delivery, {WorkList, SeenXs, QNames}) -> @@ -252,13 +252,22 @@ process_alternate(_X, Results) -> Results. 
process_route(#resource{kind = exchange} = XName, + {_WorkList, XName, _QNames} = Acc) -> + Acc; +process_route(#resource{kind = exchange} = XName, + {WorkList, #resource{kind = exchange} = SeenX, QNames}) -> + {case lookup(XName) of + {ok, X} -> queue:in(X, WorkList); + {error, not_found} -> WorkList + end, gb_sets:from_list([SeenX, XName]), QNames}; +process_route(#resource{kind = exchange} = XName, {WorkList, SeenXs, QNames} = Acc) -> - case sets:is_element(XName, SeenXs) of + case gb_sets:is_element(XName, SeenXs) of true -> Acc; false -> {case lookup(XName) of {ok, X} -> queue:in(X, WorkList); {error, not_found} -> WorkList - end, sets:add_element(XName, SeenXs), QNames} + end, gb_sets:add_element(XName, SeenXs), QNames} end; process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0522afdc..230f4db5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,7 +50,7 @@ -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). --export([intersperse/2, upmap/2, map_in_order/2]). +-export([upmap/2, map_in_order/2]). -export([table_fold/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2]). @@ -64,7 +64,7 @@ -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). -export([get_options/2]). --export([all_module_attributes/1, build_acyclic_graph/4]). +-export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -import(mnesia). @@ -86,10 +86,9 @@ :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). -type(digraph_label() :: term()). -type(graph_vertex_fun() :: - fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})). + fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). 
-type(graph_edge_fun() :: - fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})). --type(graph_error_fun() :: fun ((any()) -> any() | no_return())). + fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -155,7 +154,6 @@ -spec(tcp_name/3 :: (atom(), inet:ip_address(), rabbit_networking:ip_port()) -> atom()). --spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). @@ -191,9 +189,13 @@ -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). --spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(), - graph_error_fun(), [{atom(), [term()]}]) -> - digraph()). +-spec(build_acyclic_graph/3 :: + (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) + -> rabbit_types:ok_or_error2(digraph(), + {'vertex', 'duplicate', digraph:vertex()} | + {'edge', ({bad_vertex, digraph:vertex()} | + {bad_edge, [digraph:vertex()]}), + digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -endif. @@ -414,10 +416,6 @@ tcp_name(Prefix, IPAddress, Port) io_lib:format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))). -intersperse(_, []) -> []; -intersperse(_, [E]) -> [E]; -intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)]. - %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about %% the order in which results are received. @@ -765,16 +763,21 @@ all_module_attributes(Name) -> end, [], Modules). 
-build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) -> +build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), - [ case digraph:vertex(G, Vertex) of - false -> digraph:add_vertex(G, Vertex, Label); - _ -> ErrorFun({vertex, duplicate, Vertex}) - end || {Module, Atts} <- Graph, - {Vertex, Label} <- VertexFun(Module, Atts) ], - [ case digraph:add_edge(G, From, To) of - {error, E} -> ErrorFun({edge, E, From, To}); - _ -> ok - end || {Module, Atts} <- Graph, - {From, To} <- EdgeFun(Module, Atts) ], - G. + try + [case digraph:vertex(G, Vertex) of + false -> digraph:add_vertex(G, Vertex, Label); + _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) + end || {Module, Atts} <- Graph, + {Vertex, Label} <- VertexFun(Module, Atts)], + [case digraph:add_edge(G, From, To) of + {error, E} -> throw({graph_error, {edge, E, From, To}}); + _ -> ok + end || {Module, Atts} <- Graph, + {From, To} <- EdgeFun(Module, Atts)], + {ok, G} + catch {graph_error, Reason} -> + true = digraph:delete(G), + {error, Reason} + end. 
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 7efc2c21..72e7264d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -361,16 +361,15 @@ init_db(ClusterNodes, Force) -> case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of {ok, Nodes} -> case Force of - false -> - FailedClusterNodes = ProperClusterNodes -- Nodes, - case FailedClusterNodes of - [] -> ok; - _ -> - throw({error, {failed_to_cluster_with, - FailedClusterNodes, - "Mnesia could not connect to some nodes."}}) - end; - _ -> ok + false -> FailedClusterNodes = ProperClusterNodes -- Nodes, + case FailedClusterNodes of + [] -> ok; + _ -> throw({error, {failed_to_cluster_with, + FailedClusterNodes, + "Mnesia could not connect " + "to some nodes."}}) + end; + true -> ok end, case {Nodes, mnesia:system_info(use_dir), mnesia:system_info(db_nodes)} of @@ -379,7 +378,7 @@ init_db(ClusterNodes, Force) -> wait_for_tables(), case rabbit_upgrade:maybe_upgrade() of ok -> - schema_ok_or_exit(); + ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; @@ -387,15 +386,15 @@ init_db(ClusterNodes, Force) -> %% "Master" (i.e. 
without config) disc node in cluster, %% verify schema wait_for_tables(), - version_ok_or_exit(rabbit_upgrade:read_version()), - schema_ok_or_exit(); + ensure_version_ok(rabbit_upgrade:read_version()), + ensure_schema_ok(); {[], false, _} -> %% First RAM node in cluster, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up - version_ok_or_exit(rabbit_upgrade:read_version()), - version_ok_or_exit( + ensure_version_ok(rabbit_upgrade:read_version()), + ensure_version_ok( rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), @@ -405,14 +404,13 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - schema_ok_or_exit() + ensure_schema_ok() end; {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or %% are members of a different cluster - throw({error, {unable_to_join_cluster, - ClusterNodes, Reason}}) + throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. schema_ok_or_move() -> @@ -430,22 +428,19 @@ schema_ok_or_move() -> ok = create_schema() end. -version_ok_or_exit({ok, DiscVersion}) -> +ensure_version_ok({ok, DiscVersion}) -> case rabbit_upgrade:desired_version() of - DiscVersion -> - ok; - DesiredVersion -> - exit({schema_mismatch, DesiredVersion, DiscVersion}) + DiscVersion -> ok; + DesiredVersion -> throw({error, {schema_mismatch, + DesiredVersion, DiscVersion}}) end; -version_ok_or_exit({error, _}) -> +ensure_version_ok({error, _}) -> ok = rabbit_upgrade:write_version(). -schema_ok_or_exit() -> +ensure_schema_ok() -> case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - exit({schema_invalid, Reason}) + ok -> ok; + {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. 
create_schema() -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 0440dbe4..0030216e 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -65,8 +65,7 @@ start() -> halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> print_error("invalid command '~s'", - [lists:flatten( - rabbit_misc:intersperse(" ", FullCommand))]), + [string:join(FullCommand, " ")]), usage(); timeout -> print_error("timeout starting some nodes.", []), diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index ef81ddf2..072f297e 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -50,13 +50,9 @@ start() -> io:format("Activating RabbitMQ plugins ...~n"), - %% Ensure Rabbit is loaded so we can access it's environment - application:load(rabbit), %% Determine our various directories - {ok, PluginDir} = application:get_env(rabbit, plugins_dir), - {ok, UnpackedPluginDir} = application:get_env(rabbit, plugins_expand_dir), - + [PluginDir, UnpackedPluginDir] = init:get_plain_arguments(), RootName = UnpackedPluginDir ++ "/rabbit", %% Unpack any .ez plugins diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bde9b3d3..248c1fbc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -36,6 +36,8 @@ publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). +-export([add_queue_ttl/0]). + -define(CLEAN_FILENAME, "clean.dot"). %%---------------------------------------------------------------------------- @@ -180,6 +182,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({add_queue_ttl, []}). + -ifdef(use_specs). -type(hdl() :: ('undefined' | any())). @@ -226,6 +230,8 @@ -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}). +-spec(add_queue_ttl/0 :: () -> 'ok'). + -endif. 
@@ -345,35 +351,36 @@ recover(DurableQueues) -> DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), - Directories = case file:list_dir(QueuesDir) of - {ok, Entries} -> [ Entry || Entry <- Entries, - filelib:is_dir( - filename:join( - QueuesDir, Entry)) ]; - {error, enoent} -> [] - end, + QueueDirNames = all_queue_directory_names(QueuesDir), DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), {DurableQueueNames, DurableTerms} = lists:foldl( - fun (QueueDir, {DurableAcc, TermsAcc}) -> - case sets:is_element(QueueDir, DurableDirectories) of + fun (QueueDirName, {DurableAcc, TermsAcc}) -> + QueueDirPath = filename:join(QueuesDir, QueueDirName), + case sets:is_element(QueueDirName, DurableDirectories) of true -> TermsAcc1 = - case read_shutdown_terms( - filename:join(QueuesDir, QueueDir)) of + case read_shutdown_terms(QueueDirPath) of {error, _} -> TermsAcc; {ok, Terms} -> [Terms | TermsAcc] end, - {[dict:fetch(QueueDir, DurableDict) | DurableAcc], + {[dict:fetch(QueueDirName, DurableDict) | DurableAcc], TermsAcc1}; false -> - Dir = filename:join(queues_dir(), QueueDir), - ok = rabbit_misc:recursive_delete([Dir]), + ok = rabbit_misc:recursive_delete([QueueDirPath]), {DurableAcc, TermsAcc} end - end, {[], []}, Directories), + end, {[], []}, QueueDirNames), {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. +all_queue_directory_names(Dir) -> + case file:list_dir(Dir) of + {ok, Entries} -> [ Entry || Entry <- Entries, + filelib:is_dir( + filename:join(Dir, Entry)) ]; + {error, enoent} -> [] + end. 
+ %%---------------------------------------------------------------------------- %% startup and shutdown %%---------------------------------------------------------------------------- @@ -972,3 +979,87 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> {{no_pub, no_del, ack}, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> {undefined, -1}. + +%%---------------------------------------------------------------------------- +%% upgrade +%%---------------------------------------------------------------------------- + +add_queue_ttl() -> + foreach_queue_index({fun add_queue_ttl_journal/1, + fun add_queue_ttl_segment/1}). + +add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Guid:?GUID_BYTES/binary, Rest/binary>>) -> + {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid, + expiry_to_binary(undefined)], Rest}; +add_queue_ttl_journal(_) -> + stop. + +add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary, + Rest/binary>>) -> + {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest}; +add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +add_queue_ttl_segment(_) -> + stop. 
+ +%%---------------------------------------------------------------------------- + +foreach_queue_index(Funs) -> + QueuesDir = queues_dir(), + QueueDirNames = all_queue_directory_names(QueuesDir), + {ok, Gatherer} = gatherer:start_link(), + [begin + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> + transform_queue(filename:join(QueuesDir, QueueDirName), + Gatherer, Funs) + end) + end || QueueDirName <- QueueDirNames], + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), + ok = rabbit_misc:unlink_and_capture_exit(Gatherer). + +transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> + ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), + [ok = transform_file(filename:join(Dir, Seg), SegmentFun) + || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], + ok = gatherer:finish(Gatherer). + +transform_file(Path, Fun) -> + PathTmp = Path ++ ".upgrade", + Size = filelib:file_size(Path), + + {ok, PathTmpHdl} = + file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE], + [{write_buffer, infinity}]), + + {ok, PathHdl} = + file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []), + {ok, Content} = file_handle_cache:read(PathHdl, Size), + ok = file_handle_cache:close(PathHdl), + + ok = drive_transform_fun(Fun, PathTmpHdl, Content), + + ok = file_handle_cache:close(PathTmpHdl), + ok = file:rename(PathTmp, Path). + +drive_transform_fun(Fun, Hdl, Contents) -> + case Fun(Contents) of + stop -> + ok; + {Output, Contents1} -> + ok = file_handle_cache:append(Hdl, Output), + drive_transform_fun(Fun, Hdl, Contents1) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index cda6fbb1..7e4f31a1 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -45,10 +45,6 @@ -export([emit_stats/1]). --import(gen_tcp). --import(inet). --import(prim_inet). - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). 
@@ -325,13 +321,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, done. mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> - %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), receive {inet_async, Sock, Ref, {ok, Data}} -> - {State1, Callback1, Length1} = - handle_input(State#v1.callback, Data, - State#v1{recv_ref = none}), - mainloop(Deb, switch_callback(State1, Callback1, Length1)); + mainloop(Deb, handle_input(State#v1.callback, Data, + State#v1{recv_ref = none})); {inet_async, Sock, Ref, {error, closed}} -> if State#v1.connection_state =:= closed -> State; @@ -571,7 +564,6 @@ handle_frame(Type, Channel, Payload, error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> - %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {ch_fr_pid, ChFrPid} -> ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), @@ -635,18 +627,18 @@ analyze_frame(_Type, _Body, _Protocol) -> error. 
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]), - {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1}; + ensure_stats_timer( + switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1)); -handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> +handle_input({frame_payload, Type, Channel, PayloadSize}, + PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - %%?LOGDEBUG("Frame completed: ~p/~p/~p~n", [Type, Channel, Payload]), - NewState = handle_frame(Type, Channel, Payload, State), - {NewState, frame_header, 7}; + handle_frame(Type, Channel, Payload, + switch_callback(State, frame_header, 7)); _ -> - throw({bad_payload, PayloadAndMarker}) + throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; %% The two rules pertaining to version negotiation: @@ -698,11 +690,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, mechanisms = auth_mechanisms_binary(Sock), locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT, - protocol = Protocol}, - connection_state = starting}, - frame_header, 7}. + switch_callback(State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7). refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), @@ -862,8 +854,7 @@ auth_mechanism_to_module(TypeBin, Sock) -> auth_mechanisms(Sock) -> {ok, Configured} = application:get_env(auth_mechanisms), [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism), - Module:should_offer(Sock), - lists:any(fun (N) -> N == Name end, Configured)]. 
+ Module:should_offer(Sock), lists:member(Name, Configured)]. auth_mechanisms_binary(Sock) -> list_to_binary( diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 4452b075..a4da23e2 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -114,14 +114,11 @@ find_by_type(Type, {rdnSequence, RDNs}) -> %% Format an rdnSequence as an RFC4514 subject string. format_rdn_sequence({rdnSequence, Seq}) -> - lists:flatten( - rabbit_misc:intersperse( - ",", lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]))). + string:join(lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]), ","). %% Format an RDN set. format_complex_rdn(RDNs) -> - lists:flatten( - rabbit_misc:intersperse("+", [format_rdn(RDN) || RDN <- RDNs])). + string:join([format_rdn(RDN) || RDN <- RDNs], "+"). %% Format an RDN. If the type name is unknown, use the dotted decimal %% representation. See RFC4514, section 2.3. @@ -148,7 +145,7 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) -> io_lib:format(Fmt ++ "=~s", [FV]); none when is_tuple(T) -> TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)], - io_lib:format("~s:~s", [rabbit_misc:intersperse(".", TypeL), FV]); + io_lib:format("~s:~s", [string:join(TypeL, "."), FV]); none -> io_lib:format("~p:~s", [T, FV]) end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 71b23e01..27e4d925 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1865,9 +1865,39 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_dropwhile/1]], + fun test_dropwhile/1, + fun test_variable_queue_ack_limiting/1]], passed.
+test_variable_queue_ack_limiting(VQ0) -> + %% start by sending in a bunch of messages + Len = 1024, + VQ1 = variable_queue_publish(false, Len, VQ0), + + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + + %% update stats for duration + {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + + %% fetch half the messages + {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), + + VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, + {ram_ack_count, Len div 2}, + {ram_msg_count, Len div 2}]), + + %% ensure all acks go to disk on 0 duration target + VQ6 = check_variable_queue_status( + rabbit_variable_queue:set_ram_duration_target(0, VQ5), + [{len, Len div 2}, + {target_ram_item_count, 0}, + {ram_msg_count, 0}, + {ram_ack_count, 0}]), + + VQ6. + test_dropwhile(VQ0) -> Count = 10, diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 0071a08a..27a94f6f 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -32,11 +32,13 @@ -ifdef(use_specs). +-type(step() :: atom()). +-type(version() :: [step()]). + -spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). --spec(read_version/0 :: - () -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). --spec(desired_version/0 :: () -> [atom()]). +-spec(desired_version/0 :: () -> version()). -endif. 
@@ -48,26 +50,25 @@ maybe_upgrade() -> case read_version() of {ok, CurrentHeads} -> - G = load_graph(), - case unknown_heads(CurrentHeads, G) of - [] -> - case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> - exit({future_upgrades_found, Unknown}) - end, - true = digraph:delete(G), - ok; + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> case upgrades_to_apply(CurrentHeads, G) of + [] -> ok; + Upgrades -> apply_upgrades(Upgrades) + end; + Unknown -> throw({error, + {future_upgrades_found, Unknown}}) + end + end); {error, enoent} -> version_not_available end. read_version() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [Heads]} -> {ok, Heads}; - {error, E} -> {error, E} + {ok, [Heads]} -> {ok, Heads}; + {error, _} = Err -> Err end. write_version() -> @@ -75,17 +76,26 @@ write_version() -> ok. desired_version() -> - G = load_graph(), - Version = heads(G), - true = digraph:delete(G), - Version. + with_upgrade_graph(fun (G) -> heads(G) end). %% ------------------------------------------------------------------- -load_graph() -> - Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade), - rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades). +with_upgrade_graph(Fun) -> + case rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, + rabbit_misc:all_module_attributes(rabbit_upgrade)) of + {ok, G} -> try + Fun(G) + after + true = digraph:delete(G) + end; + {error, {vertex, duplicate, StepName}} -> + throw({error, {duplicate_upgrade_step, StepName}}); + {error, {edge, {bad_vertex, StepName}, _From, _To}} -> + throw({error, {dependency_on_unknown_upgrade_step, StepName}}); + {error, {edge, {bad_edge, StepNames}, _From, _To}} -> + throw({error, {cycle_in_upgrade_steps, StepNames}}) + end. vertices(Module, Steps) -> [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. 
@@ -93,10 +103,6 @@ vertices(Module, Steps) -> edges(_Module, Steps) -> [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. -graph_build_error({vertex, duplicate, StepName}) -> - exit({duplicate_upgrade, StepName}); -graph_build_error({edge, E, From, To}) -> - exit({E, From, To}). unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. @@ -111,8 +117,8 @@ upgrades_to_apply(Heads, G) -> sets:from_list(digraph_utils:reaching(Heads, G)))), %% Form a subgraph from that list and find a topological ordering %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) - || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. heads(G) -> lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). @@ -130,9 +136,9 @@ apply_upgrades(Upgrades) -> ok = write_version(), ok = file:delete(LockFile); {error, eexist} -> - exit(previous_upgrade_failed); + throw({error, previous_upgrade_failed}); {error, _} = Error -> - exit(Error) + throw(Error) end. apply_upgrade({M, F}) -> @@ -141,16 +147,12 @@ apply_upgrade({M, F}) -> %% ------------------------------------------------------------------- -schema_filename() -> - filename:join(dir(), ?VERSION_FILENAME). +dir() -> rabbit_mnesia:dir(). -lock_filename() -> - filename:join(dir(), ?LOCK_FILENAME). +schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). + +lock_filename() -> filename:join(dir(), ?LOCK_FILENAME). %% NB: we cannot use rabbit_log here since it may not have been %% started yet -info(Msg, Args) -> - error_logger:info_msg(Msg, Args). - -dir() -> - rabbit_mnesia:dir(). +info(Msg, Args) -> error_logger:info_msg(Msg, Args). 
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 59b8705d..1c56d51d 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -24,28 +24,55 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). +-rabbit_upgrade({remove_user_scope, []}). +-rabbit_upgrade({hash_passwords, []}). +-rabbit_upgrade({add_ip_to_listener, []}). %% ------------------------------------------------------------------- -ifdef(use_specs). --spec(remove_user_scope/0 :: () -> 'ok'). +-spec(remove_user_scope/0 :: () -> 'ok'). +-spec(hash_passwords/0 :: () -> 'ok'). +-spec(add_ip_to_listener/0 :: () -> 'ok'). -endif. %%-------------------------------------------------------------------- +%% It's a bad idea to use records or record_info here, even for the +%% destination form. Because in the future, the destination form of +%% your current transform may not match the record any more, and it +%% would be messy to have to go back and fix old transforms at that +%% point. + remove_user_scope() -> - {atomic, ok} = mnesia:transform_table( - rabbit_user_permission, - fun (Perm = #user_permission{ - permission = {permission, - _Scope, Conf, Write, Read}}) -> - Perm#user_permission{ - permission = #permission{configure = Conf, - write = Write, - read = Read}} - end, - record_info(fields, user_permission)), + mnesia( + rabbit_user_permission, + fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) -> + {user_permission, UV, {permission, Conf, Write, Read}} + end, + [user_vhost, permission]). + +hash_passwords() -> + mnesia( + rabbit_user, + fun ({user, Username, Password, IsAdmin}) -> + Hash = rabbit_access_control:hash_password(Password), + {user, Username, Hash, IsAdmin} + end, + [username, password_hash, is_admin]). 
+ +add_ip_to_listener() -> + mnesia( + rabbit_listener, + fun ({listener, Node, Protocol, Host, Port}) -> + {listener, Node, Protocol, Host, {0,0,0,0}, Port} + end, + [node, protocol, host, ip_address, port]). + +%%-------------------------------------------------------------------- + +mnesia(TableName, Fun, FieldList) -> + {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 69d62fde..5ac042a2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -89,12 +89,14 @@ %% %% The duration indicated to us by the memory_monitor is used to %% calculate, given our current ingress and egress rates, how many -%% messages we should hold in RAM. When we need to push alphas to -%% betas or betas to gammas, we favour writing out messages that are -%% further from the head of the queue. This minimises writes to disk, -%% as the messages closer to the tail of the queue stay in the queue -%% for longer, thus do not need to be replaced as quickly by sending -%% other messages to disk. +%% messages we should hold in RAM. We track the ingress and egress +%% rates for both messages and pending acks and rates for both are +%% considered when calculating the number of messages to hold in +%% RAM. When we need to push alphas to betas or betas to gammas, we +%% favour writing out messages that are further from the head of the +%% queue. This minimises writes to disk, as the messages closer to the +%% tail of the queue stay in the queue for longer, thus do not need to +%% be replaced as quickly by sending other messages to disk. %% %% Whilst messages are pushed to disk and forgotten from RAM as soon %% as requested by a new setting of the queue RAM duration, the @@ -156,7 +158,7 @@ %% The conversion from alphas to betas is also chunked, but only to %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at %% any one time. 
This further smooths the effects of changes to the -%% target_ram_msg_count and ensures the queue remains responsive +%% target_ram_item_count and ensures the queue remains responsive %% even when there is a large amount of IO work to do. The %% idle_timeout callback is utilised to ensure that conversions are %% done as promptly as possible whilst ensuring the queue remains @@ -168,6 +170,29 @@ %% the latter) are both cheap and do not require any scanning through qi %% segments. %% +%% Pending acks are recorded in memory either as the tuple {IsPersistent, +%% Guid, MsgProps} (tuple-form) or as the message itself (message- +%% form). Acks for persistent messages are always stored in the tuple- +%% form. Acks for transient messages are also stored in tuple-form if +%% the message has been sent to disk as part of the memory reduction +%% process. For transient messages that haven't already been written +%% to disk, acks are stored in message-form. +%% +%% During memory reduction, acks stored in message-form are converted +%% to tuple-form, and the corresponding messages are pushed out to +%% disk. +%% +%% The order in which alphas are pushed to betas and message-form acks +%% are pushed to disk is determined dynamically. We always prefer to +%% push messages for the source (alphas or acks) that is growing the +%% fastest (with growth measured as avg. ingress - avg. egress). In +%% each round of memory reduction a chunk of messages at most +%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The +%% fastest growing source will be reduced by as much of this chunk as +%% possible. If there is any remaining allocation in the chunk after +%% the first source has been reduced to zero, the second source will +%% be reduced by as much of the remaining chunk as possible. +%% %% Notes on Clean Shutdown %% (This documents behaviour in variable_queue, queue_index and %% msg_store.)
@@ -220,6 +245,8 @@ q4, next_seq_id, pending_ack, + pending_ack_index, + ram_ack_index, index_state, msg_store_clients, on_sync, @@ -229,13 +256,17 @@ len, persistent_count, - target_ram_msg_count, + target_ram_item_count, ram_msg_count, ram_msg_count_prev, + ram_ack_count_prev, ram_index_count, out_counter, in_counter, - rates + ack_out_counter, + ack_in_counter, + rates, + ack_rates }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -299,30 +330,34 @@ funs :: [fun (() -> any())] }). -type(state() :: #vqstate { - q1 :: queue(), - q2 :: bpqueue:bpqueue(), - delta :: delta(), - q3 :: bpqueue:bpqueue(), - q4 :: queue(), - next_seq_id :: seq_id(), - pending_ack :: dict(), - index_state :: any(), - msg_store_clients :: 'undefined' | {{any(), binary()}, + q1 :: queue(), + q2 :: bpqueue:bpqueue(), + delta :: delta(), + q3 :: bpqueue:bpqueue(), + q4 :: queue(), + next_seq_id :: seq_id(), + pending_ack :: dict(), + ram_ack_index :: gb_tree(), + index_state :: any(), + msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: sync(), - durable :: boolean(), - - len :: non_neg_integer(), - persistent_count :: non_neg_integer(), - - transient_threshold :: non_neg_integer(), - target_ram_msg_count :: non_neg_integer() | 'infinity', - ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), - out_counter :: non_neg_integer(), - in_counter :: non_neg_integer(), - rates :: rates() }). 
+ on_sync :: sync(), + durable :: boolean(), + + len :: non_neg_integer(), + persistent_count :: non_neg_integer(), + + transient_threshold :: non_neg_integer(), + target_ram_item_count :: non_neg_integer() | 'infinity', + ram_msg_count :: non_neg_integer(), + ram_msg_count_prev :: non_neg_integer(), + ram_index_count :: non_neg_integer(), + out_counter :: non_neg_integer(), + in_counter :: non_neg_integer(), + ack_out_counter :: non_neg_integer(), + ack_in_counter :: non_neg_integer(), + rates :: rates(), + ack_rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). @@ -479,19 +514,18 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, - pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - PA1 = record_pending_ack(m(MsgStatus1), PA), + State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + {SeqId, a(reduce_memory_use( + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1 }))}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -561,8 +595,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + persistent_count = PCount }) -> %% 1. 
Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -582,12 +615,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {blank_ack, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), @@ -595,12 +628,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })}. + a(State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 })}. ack(AckTags, State) -> a(ack(fun msg_store_remove/3, @@ -678,40 +710,62 @@ is_empty(State) -> 0 == len(State). 
set_ram_duration_target(DurationTarget, State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - target_ram_msg_count = TargetRamMsgCount }) -> - Rate = AvgEgressRate + AvgIngressRate, - TargetRamMsgCount1 = + rates = + #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = + #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, + target_ram_item_count = TargetRamItemCount }) -> + Rate = + AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, + TargetRamItemCount1 = case DurationTarget of infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, - a(case TargetRamMsgCount1 == infinity orelse - (TargetRamMsgCount =/= infinity andalso - TargetRamMsgCount1 >= TargetRamMsgCount) of + State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 }, + a(case TargetRamItemCount1 == infinity orelse + (TargetRamItemCount =/= infinity andalso + TargetRamItemCount1 >= TargetRamItemCount) of true -> State1; false -> reduce_memory_use(State1) end). 
ram_duration(State = #vqstate { - rates = #rates { egress = Egress, - ingress = Ingress, - timestamp = Timestamp } = Rates, + rates = #rates { timestamp = Timestamp, + egress = Egress, + ingress = Ingress } = Rates, + ack_rates = #rates { timestamp = AckTimestamp, + egress = AckEgress, + ingress = AckIngress } = ARates, in_counter = InCount, out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev }) -> + ram_msg_count_prev = RamMsgCountPrev, + ram_ack_index = RamAckIndex, + ram_ack_count_prev = RamAckCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - Duration = %% msgs / (msgs/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 of + {AvgAckEgressRate, AckEgress1} = + update_rate(Now, AckTimestamp, AckOutCount, AckEgress), + {AvgAckIngressRate, AckIngress1} = + update_rate(Now, AckTimestamp, AckInCount, AckIngress), + + RamAckCount = gb_trees:size(RamAckIndex), + + Duration = %% msgs+acks / (msgs+acks/sec) == sec + case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount) / - (2 * (AvgEgressRate + AvgIngressRate)) + false -> (RamMsgCountPrev + RamMsgCount + + RamAckCount + RamAckCountPrev) / + (4 * (AvgEgressRate + AvgIngressRate + + AvgAckEgressRate + AvgAckIngressRate)) end, {Duration, State #vqstate { @@ -721,14 +775,24 @@ ram_duration(State = #vqstate { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate, timestamp = Now }, + ack_rates = ARates #rates { + egress = AckEgress1, + ingress = AckIngress1, + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate, + timestamp = Now }, in_counter = 0, out_counter = 0, - ram_msg_count_prev = RamMsgCount }}. 
+ ack_in_counter = 0, + ack_out_counter = 0, + ram_msg_count_prev = RamMsgCount, + ram_ack_count_prev = RamAckCount }}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, State), Res; needs_idle_timeout(_State) -> @@ -740,32 +804,39 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - pending_ack = PA, - on_sync = #sync { funs = From }, - target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - next_seq_id = NextSeqId, - persistent_count = PersistentCount, - rates = #rates { + len = Len, + pending_ack = PA, + ram_ack_index = RAI, + on_sync = #sync { funs = From }, + target_ram_item_count = TargetRamItemCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + next_seq_id = NextSeqId, + persistent_count = PersistentCount, + rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, - {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, - {len , Len}, - {pending_acks , dict:size(PA)}, - {outstanding_txns , length(From)}, - {target_ram_msg_count , TargetRamMsgCount}, - {ram_msg_count , RamMsgCount}, - {ram_index_count , RamIndexCount}, - {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, - {avg_egress_rate , AvgEgressRate}, - {avg_ingress_rate , AvgIngressRate} ]. 
+ avg_ingress = AvgIngressRate }, + ack_rates = #rates { + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate } }) -> + [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {pending_acks , dict:size(PA)}, + {outstanding_txns , length(From)}, + {target_ram_item_count , TargetRamItemCount}, + {ram_msg_count , RamMsgCount}, + {ram_ack_count , gb_trees:size(RAI)}, + {ram_index_count , RamIndexCount}, + {next_seq_id , NextSeqId}, + {persistent_count , PersistentCount}, + {avg_ingress_rate , AvgIngressRate}, + {avg_egress_rate , AvgEgressRate}, + {avg_ack_ingress_rate , AvgAckIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -955,35 +1026,43 @@ init(IsDurable, IndexState, DeltaCount, Terms, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now } }, + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + ram_ack_index = gb_trees:empty(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + 
persistent_count = DeltaCount1, + + target_ram_item_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_ack_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + ack_out_counter = 0, + ack_in_counter = 0, + rates = blank_rate(Now, DeltaCount1), + ack_rates = blank_rate(Now, 0) }, a(maybe_deltas_to_betas(State)). +blank_rate(Timestamp, IngressLength) -> + #rates { egress = {Timestamp, 0}, + ingress = {Timestamp, IngressLength}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Timestamp }. + msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( @@ -1191,12 +1270,21 @@ record_pending_ack(#msg_status { seq_id = SeqId, guid = Guid, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk, - msg_props = MsgProps } = MsgStatus, PA) -> - AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid, MsgProps}; - false -> MsgStatus - end, - dict:store(SeqId, AckEntry, PA). + msg_props = MsgProps } = MsgStatus, + State = #vqstate { pending_ack = PA, + ram_ack_index = RAI, + ack_in_counter = AckInCount}) -> + {AckEntry, RAI1} = + case MsgOnDisk of + true -> + {{IsPersistent, Guid, MsgProps}, RAI}; + false -> + {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + end, + PA1 = dict:store(SeqId, AckEntry, PA), + State #vqstate { pending_ack = PA1, + ram_ack_index = RAI1, + ack_in_counter = AckInCount + 1}. 
remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, @@ -1204,7 +1292,8 @@ remove_pending_ack(KeepPersistent, msg_store_clients = MSCState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), - State1 = State #vqstate { pending_ack = dict:new() }, + State1 = State #vqstate { pending_ack = dict:new(), + ram_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(false, GuidsByStore) of error -> State1; @@ -1226,13 +1315,17 @@ ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount }} = + persistent_count = PCount, + ack_out_counter = AckOutCount }} = lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> + fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, + ram_ack_index = RAI }}) -> AckEntry = dict:fetch(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA) })} + pending_ack = dict:erase(SeqId, PA), + ram_ack_index = + gb_trees:delete_any(SeqId, RAI)})} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (IsPersistent, Guids, ok) -> @@ -1241,7 +1334,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1270,7 +1364,7 @@ find_persistent_count(LensByStore) -> %% though the conversion function for that is called as necessary. The %% reason is twofold. 
Firstly, this is safe because the conversion is %% only ever necessary just after a transition to a -%% target_ram_msg_count of zero or after an incremental alpha->beta +%% target_ram_item_count of zero or after an incremental alpha->beta %% conversion. In the former case the conversion is performed straight %% away (i.e. any betas present at the time are converted to deltas), %% and in the latter case the need for a conversion is flagged up @@ -1280,26 +1374,87 @@ find_persistent_count(LensByStore) -> %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, - State #vqstate.target_ram_msg_count) of - 0 -> {false, State}; - S1 -> {true, AlphaBetaFun(S1, State)} - end, - case State1 #vqstate.target_ram_msg_count of - infinity -> {Reduce, State1}; - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} - end +reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, + State = #vqstate {target_ram_item_count = infinity}) -> + {false, State}; +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, + State = #vqstate { + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_item_count = TargetRamItemCount, + rates = #rates { + avg_ingress = AvgIngress, + avg_egress = AvgEgress }, + ack_rates = #rates { + avg_ingress = AvgAckIngress, + avg_egress = AvgAckEgress } }) -> + + {Reduce, State1} = + case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + TargetRamItemCount) of + 0 -> + {false, State}; + S1 -> + ReduceFuns = + case (AvgAckIngress - AvgAckEgress) > + (AvgIngress - AvgEgress) of + 
true -> + %% ACKs are growing faster than the queue, + %% push messages from there first. + [AckFun, AlphaBetaFun]; + false -> + %% The queue is growing faster than the + %% acks, push queue messages first. + [AlphaBetaFun, AckFun] + end, + {_, State2} = + %% Both reduce functions get a chance to reduce + %% memory. The second may very well get a quota of + %% 0 if the first function managed to push out the + %% maximum number of messages. + lists:foldl( + fun (ReduceFun, {QuotaN, StateN}) -> + ReduceFun(QuotaN, StateN) + end, {S1, State}, ReduceFuns), + {true, State2} + end, + + case State1 #vqstate.target_ram_item_count of + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end end. +limit_ram_acks(0, State) -> + {0, State}; +limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, + ram_ack_index = RAI }) -> + case gb_trees:is_empty(RAI) of + true -> + {Quota, State}; + false -> + {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), + MsgStatus = #msg_status { + guid = Guid, %% ASSERTION + is_persistent = false, %% ASSERTION + msg_props = MsgProps } = dict:fetch(SeqId, PA), + {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + limit_ram_acks(Quota - 1, + State1 #vqstate { + pending_ack = + dict:store(SeqId, {false, Guid, MsgProps}, PA), + ram_ack_index = RAI1 }) + end. + + reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, fun push_betas_to_deltas/1, + fun limit_ram_acks/2, State), State1. @@ -1432,9 +1587,9 @@ maybe_deltas_to_betas(State = #vqstate { end. push_alphas_to_betas(Quota, State) -> - { Quota1, State1} = maybe_push_q1_to_betas(Quota, State), - {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), - State2. 
+ {Quota1, State1} = maybe_push_q1_to_betas(Quota, State), + {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + {Quota2, State2}. maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( @@ -1460,10 +1615,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) + ram_msg_count = RamMsgCount, + target_ram_item_count = TargetRamItemCount }) when Quota =:= 0 orelse - TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> + TargetRamItemCount =:= infinity orelse + TargetRamItemCount >= RamMsgCount -> {Quota, State}; maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of |