diff options
author | Marek Majkowski <marek@rabbitmq.com> | 2010-11-02 15:58:12 +0000 |
---|---|---|
committer | Marek Majkowski <marek@rabbitmq.com> | 2010-11-02 15:58:12 +0000 |
commit | f525c97b059b62cf68361cd70e346339aa088582 (patch) | |
tree | 318832fd2cd6081cce862917d74b214d71b5538b | |
parent | 82a46e94dec53c89456af1c5efe68f9ec02271cd (diff) | |
parent | 8b94244399370a084921181ccd16567aa3e52143 (diff) | |
download | rabbitmq-server-f525c97b059b62cf68361cd70e346339aa088582.tar.gz |
bug23416 merged into default
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | docs/html-to-website-xml.xsl | 2 | ||||
-rw-r--r-- | docs/rabbitmq-server.1.xml | 12 | ||||
-rw-r--r-- | docs/rabbitmq-service.xml | 12 | ||||
-rw-r--r-- | docs/remove-namespaces.xsl | 3 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 2 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | packaging/macports/Portfile.in | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 9 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 7 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 6 | ||||
-rw-r--r-- | src/delegate.erl | 25 | ||||
-rw-r--r-- | src/delegate_sup.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 6 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 121 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 25 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 12 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 43 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 29 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 1 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 227 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 466 |
25 files changed, 586 insertions, 453 deletions
@@ -3,6 +3,7 @@ TMPDIR ?= /tmp RABBITMQ_NODENAME ?= rabbit RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia +RABBITMQ_PLUGINS_EXPAND_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-plugins-scratch RABBITMQ_LOG_BASE ?= $(TMPDIR) DEPS_FILE=deps.mk @@ -150,7 +151,8 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ RABBITMQ_NODE_IP_ADDRESS="$(RABBITMQ_NODE_IP_ADDRESS)" \ RABBITMQ_NODE_PORT="$(RABBITMQ_NODE_PORT)" \ RABBITMQ_LOG_BASE="$(RABBITMQ_LOG_BASE)" \ - RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" + RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" \ + RABBITMQ_PLUGINS_EXPAND_DIR="$(RABBITMQ_PLUGINS_EXPAND_DIR)" run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 3e7a5326..ec8f87e5 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -17,7 +17,7 @@ <!-- Copy the root node, and munge the outer part of the page --> <xsl:template match="/html"> <xsl:processing-instruction name="xml-stylesheet">type="text/xml" href="page.xsl"</xsl:processing-instruction> -<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"> +<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" xmlns="http://www.w3.org/1999/xhtml"> <head> <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title> </head> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index 921da4f1..03e76c79 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -98,18 +98,6 @@ Defaults to 5672. </listitem> </varlistentry> - <varlistentry> - <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> - <listitem> - <para> -Defaults to <filename>/etc/rabbitmq/rabbitmq_cluster.config</filename>. If this file is -present it is used by the server to auto-configure a RabbitMQ cluster. -See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> -for details. - </para> - </listitem> - </varlistentry> - </variablelist> </refsect1> diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index 2b416e3e..e95f9889 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -193,18 +193,6 @@ manager. </varlistentry> <varlistentry> - <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> - <listitem> - <para> -If this file is -present it is used by the server to auto-configure a RabbitMQ cluster. -See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> -for details. - </para> - </listitem> - </varlistentry> - - <varlistentry> <term>RABBITMQ_CONSOLE_LOG</term> <listitem> <para> diff --git a/docs/remove-namespaces.xsl b/docs/remove-namespaces.xsl index 58a1e826..7f7f3c12 100644 --- a/docs/remove-namespaces.xsl +++ b/docs/remove-namespaces.xsl @@ -1,13 +1,14 @@ <?xml version='1.0'?> <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" + xmlns="http://www.w3.org/1999/xhtml" version='1.0'> <xsl:output method="xml" /> <!-- Copy every element through with local name only --> <xsl:template match="*"> - <xsl:element name="{local-name()}"> + <xsl:element name="{local-name()}" namespace=""> <xsl:apply-templates select="@*|node()"/> </xsl:element> </xsl:template> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 4be09c5a..17d05a99 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,4 +29,6 @@ {default_user_is_admin, true}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {cluster_nodes, []}, + {server_properties, []}, {collect_statistics, none}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 861a9770..a1987fb2 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -63,7 +63,7 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). --record(listener, {node, protocol, host, port}). +-record(listener, {node, protocol, host, ip_address, port}). -record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}). diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index f30460d3..e37a45b3 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -75,7 +75,7 @@ post-destroot { reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ ${realsbin}/rabbitmq-env - foreach var {CONFIG_FILE CLUSTER_CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { + foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ ${realsbin}/rabbitmq-multi \ ${realsbin}/rabbitmq-server \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 8e26663a..0eb7092d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -35,7 +35,6 @@ NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" -CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia @@ -59,7 +58,6 @@ else fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} @@ -68,6 +66,9 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand + [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" ## Log rotation @@ -89,14 +90,14 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ -pa "$RABBITMQ_EBIN_ROOT" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"${RABBITMQ_MNESIA_DIR}/plugins-scratch\"" \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ -rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \ -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" then - RABBITMQ_BOOT_FILE="${RABBITMQ_MNESIA_DIR}/plugins-scratch/rabbit" + RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" RABBITMQ_EBIN_PATH="" else exit 1 diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5bcbc6ba..bd4120fa 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -107,6 +107,10 @@ if "!RABBITMQ_MNESIA_DIR!"=="" ( set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
+if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
+)
+
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
@@ -115,7 +119,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
@@ -165,7 +169,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 4b3961d4..ff25b146 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -140,6 +140,9 @@ if "!RABBITMQ_MNESIA_DIR!"=="" ( set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
+if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
+)
if "!P1!" == "install" goto INSTALL_SERVICE
for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
@@ -185,7 +188,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
@@ -232,7 +235,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/src/delegate.erl b/src/delegate.erl index c8aa3092..11abe73b 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,7 +44,8 @@ -ifdef(use_specs). --spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/2 :: + (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). @@ -60,8 +61,8 @@ %%---------------------------------------------------------------------------- -start_link(Hash) -> - gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). +start_link(Prefix, Hash) -> + gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), @@ -147,7 +148,8 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) -> local_server(Node) -> case get({delegate_local_server_name, Node}) of undefined -> - Name = server(erlang:phash2({self(), Node}, process_count())), + Name = server(outgoing, + erlang:phash2({self(), Node}, process_count())), put({delegate_local_server_name, Node}, Name), Name; Name -> Name @@ -160,17 +162,20 @@ remote_server(Node) -> {badrpc, _} -> %% Have to return something, if we're just casting %% then we don't want to blow up - server(1); + server(incoming, 1); Count -> - Name = server(erlang:phash2({self(), Node}, Count)), + Name = server(incoming, + erlang:phash2({self(), Node}, Count)), put({delegate_remote_server_name, Node}, Name), Name end; Name -> Name end. -server(Hash) -> - list_to_atom("delegate_process_" ++ integer_to_list(Hash)). +server(Prefix, Hash) -> + list_to_atom("delegate_" ++ + atom_to_list(Prefix) ++ "_" ++ + integer_to_list(Hash)). safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; @@ -201,7 +206,7 @@ handle_cast({thunk, Thunk}, State) -> {noreply, State, hibernate}. handle_info(_Info, State) -> - {noreply, State}. + {noreply, State, hibernate}. terminate(_Reason, _State) -> ok. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index ff303ee2..544546f1 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -55,9 +55,11 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - {ok, {{one_for_one, 10, 10}, - [{Hash, {delegate, start_link, [Hash]}, - transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(0, delegate:process_count() - 1)]}}. + {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}. + +specs(Prefix) -> + [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]}, + transient, 16#ffffffff, worker, [delegate]} || + Hash <- lists:seq(0, delegate:process_count() - 1)]. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6048920e..fe2c975b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -994,7 +994,7 @@ handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1}. + {noreply, State1, hibernate}. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58c8e341..873268cd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -233,7 +233,7 @@ handle_cast({method, Method, Content}, State) -> end; handle_cast({flushed, QPid}, State) -> - {noreply, queue_blocked(QPid, State)}; + {noreply, queue_blocked(QPid, State), hibernate}; handle_cast(terminate, State) -> {stop, normal, State}; @@ -258,11 +258,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + hibernate}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}. + {noreply, queue_blocked(QPid, State), hibernate}. handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 9855f18c..671634c4 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -298,8 +298,8 @@ persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin - {ok, {Msg, _MsgProps}} - = dict:find(Guid, PA), + {Msg, _MsgProps} + = dict:fetch(Guid, PA), Msg #basic_message.is_persistent end]); persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 577d206d..8de2f0d6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -340,10 +340,8 @@ read_cluster_nodes_config() -> case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> - case application:get_env(cluster_nodes) of - undefined -> []; - {ok, ClusterNodes} -> ClusterNodes - end; + {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), + ClusterNodes; {error, Reason} -> throw({error, {cannot_read_cluster_nodes_config, FileName, Reason}}) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b9f3e1a3..682a7faa 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,8 +34,9 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/2, client_delete_and_terminate/3, - write/4, read/3, contains/2, remove/2, release/2, sync/3]). + client_init/2, client_terminate/1, client_delete_and_terminate/1, + client_ref/1, + write/3, read/2, contains/2, remove/2, release/2, sync/3]). -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -86,7 +87,9 @@ }). -record(client_msstate, - { file_handle_cache, + { server, + client_ref, + file_handle_cache, index_state, index_module, dir, @@ -114,16 +117,19 @@ -export_type([gc_state/0, file_num/0]). --opaque(gc_state() :: #gc_state { dir :: file:filename(), - index_module :: atom(), - index_state :: any(), - file_summary_ets :: ets:tid(), - msg_store :: server() - }). +-type(gc_state() :: #gc_state { dir :: file:filename(), + index_module :: atom(), + index_state :: any(), + file_summary_ets :: ets:tid(), + msg_store :: server() + }). -type(server() :: pid() | atom()). +-type(client_ref() :: binary()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { + server :: server(), + client_ref :: client_ref(), file_handle_cache :: dict:dictionary(), index_state :: any(), index_module :: atom(), @@ -141,18 +147,18 @@ (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). --spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). --spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). --spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). +-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_ref/1 :: (client_msstate()) -> client_ref()). +-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok'). +-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). +-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). +-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) -> + 'ok'). -spec(sync/1 :: (server()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). @@ -332,7 +338,9 @@ client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), + #client_msstate { server = Server, + client_ref = Ref, + file_handle_cache = dict:new(), index_state = IState, index_module = IModule, dir = Dir, @@ -342,20 +350,22 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState, Server) -> +client_terminate(CState) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = server_call(CState, client_terminate). -client_delete_and_terminate(CState, Server, Ref) -> +client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). + ok = server_cast(CState, {client_delete, Ref}). -write(Server, Guid, Msg, +client_ref(#client_msstate { client_ref = Ref }) -> Ref. + +write(Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + ok = server_cast(CState, {write, Guid}). -read(Server, Guid, +read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache @@ -364,13 +374,12 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:call( - Server, {read, Guid}, infinity), - CState} end, + Defer = fun() -> + {server_call(CState, {read, Guid}), CState} + end, case index_lookup_positive_ref_count(Guid, CState) of not_found -> Defer(); - MsgLocation -> client_read1(Server, MsgLocation, Defer, - CState) + MsgLocation -> client_read1(MsgLocation, Defer, CState) end; [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -381,12 +390,12 @@ read(Server, Guid, {{ok, Msg}, CState} end. -contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). -remove(_Server, []) -> ok; -remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). -release(_Server, []) -> ok; -release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). -sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). +contains(Guid, CState) -> server_call(CState, {contains, Guid}). +remove([], _CState) -> ok; +remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +release([], _CState) -> ok; +release(Guids, CState) -> server_cast(CState, {release, Guids}). +sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). sync(Server) -> gen_server2:cast(Server, sync). @@ -398,18 +407,22 @@ set_maximum_since_use(Server, Age) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -client_read1(Server, - #msg_location { guid = Guid, file = File } = MsgLocation, - Defer, +server_call(#client_msstate { server = Server }, Msg) -> + gen_server2:call(Server, Msg, infinity). + +server_cast(#client_msstate { server = Server }, Msg) -> + gen_server2:cast(Server, Msg). + +client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = Locked, right = Right }] -> - client_read2(Server, Locked, Right, MsgLocation, Defer, CState) + client_read2(Locked, Right, MsgLocation, Defer, CState) end. -client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> +client_read2(false, undefined, _MsgLocation, Defer, _CState) -> %% Although we've already checked both caches and not found the %% message there, the message is apparently in the %% current_file. We can only arrive here if we are trying to read @@ -420,12 +433,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> %% contents of the current file, thus reads from the current file %% will end up here and will need to be deferred. Defer(); -client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> +client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg %% is actually in a different file, unlocked. However, defering is %% the safest and simplest thing to do. Defer(); -client_read2(Server, false, _Right, +client_read2(false, _Right, MsgLocation = #msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> @@ -434,10 +447,10 @@ client_read2(Server, false, _Right, %% finished. safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, - fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end, - fun () -> read(Server, Guid, CState) end). + fun (_) -> client_read3(MsgLocation, Defer, CState) end, + fun () -> read(Guid, CState) end). -client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, +client_read3(#msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -461,7 +474,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% too). case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, @@ -472,7 +485,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Server, Guid, CState) + catch error:badarg -> read(Guid, CState) end; [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving @@ -497,7 +510,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, {{ok, Msg}, CState2}; #msg_location {} = MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(Server, MsgLocation, Defer, CState); + client_read1(MsgLocation, Defer, CState); not_found -> %% it seems not to exist. Defer, just to be sure. try Release() %% this can badarg, same as locked case, above catch error:badarg -> ok diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index db5c71f6..03a9b386 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -41,7 +41,7 @@ %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, tcp_listener_stopped/2, +-export([tcp_listener_started/3, tcp_listener_stopped/3, start_client/1, start_ssl_client/2]). -include("rabbit.hrl"). @@ -160,14 +160,14 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> {IPAddress, Name}. start_tcp_listener(Host, Port) -> - start_listener(Host, Port, "TCP Listener", + start_listener(Host, Port, amqp, "TCP Listener", {?MODULE, start_client, []}). start_ssl_listener(Host, Port, SslOpts) -> - start_listener(Host, Port, "SSL Listener", + start_listener(Host, Port, 'amqp/ssl', "SSL Listener", {?MODULE, start_ssl_client, [SslOpts]}). -start_listener(Host, Port, Label, OnConnect) -> +start_listener(Host, Port, Protocol, Label, OnConnect) -> {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), {ok,_} = supervisor:start_child( @@ -175,8 +175,8 @@ start_listener(Host, Port, Label, OnConnect) -> {Name, {tcp_listener_sup, start_link, [IPAddress, Port, ?RABBIT_TCP_OPTS , - {?MODULE, tcp_listener_started, []}, - {?MODULE, tcp_listener_stopped, []}, + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}), ok. @@ -188,20 +188,25 @@ stop_tcp_listener(Host, Port) -> ok = supervisor:delete_child(rabbit_sup, Name), ok. -tcp_listener_started(IPAddress, Port) -> +tcp_listener_started(Protocol, IPAddress, Port) -> + %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1 + %% We need the host so we can distinguish multiple instances of the above + %% in a cluster. ok = mnesia:dirty_write( rabbit_listener, #listener{node = node(), - protocol = tcp, + protocol = Protocol, host = tcp_host(IPAddress), + ip_address = IPAddress, port = Port}). -tcp_listener_stopped(IPAddress, Port) -> +tcp_listener_stopped(Protocol, IPAddress, Port) -> ok = mnesia:dirty_delete_object( rabbit_listener, #listener{node = node(), - protocol = tcp, + protocol = Protocol, host = tcp_host(IPAddress), + ip_address = IPAddress, port = Port}). active_listeners() -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 88300ab4..ef81ddf2 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -33,8 +33,6 @@ -export([start/0, stop/0]). --define(DefaultPluginDir, "plugins"). --define(DefaultUnpackedPluginDir, "priv/plugins"). -define(BaseApps, [rabbit]). %%---------------------------------------------------------------------------- @@ -56,8 +54,8 @@ start() -> application:load(rabbit), %% Determine our various directories - PluginDir = get_env(plugins_dir, ?DefaultPluginDir), - UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), + {ok, PluginDir} = application:get_env(rabbit, plugins_dir), + {ok, UnpackedPluginDir} = application:get_env(rabbit, plugins_expand_dir), RootName = UnpackedPluginDir ++ "/rabbit", @@ -142,12 +140,6 @@ start() -> stop() -> ok. -get_env(Key, Default) -> - case application:get_env(rabbit, Key) of - {ok, V} -> V; - _ -> Default - end. - determine_version(App) -> application:load(App), {ok, Vsn} = application:get_key(App, vsn), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 1b837128..28d0b47d 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,8 +31,9 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/5, - deliver/2, ack/2, sync/2, flush/1, read/3, +-export([init/1, shutdown_terms/1, recover/4, + terminate/2, delete_and_terminate/1, + publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -define(CLEAN_FILENAME, "clean.dot"). @@ -199,10 +200,13 @@ -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(shutdown_terms() :: [any()]). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> - {'undefined' | non_neg_integer(), [any()], qistate()}). +-spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()). +-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). +-spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + fun ((rabbit_guid:guid()) -> boolean())) -> + {'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/5 :: (rabbit_guid:guid(), seq_id(), @@ -229,25 +233,26 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> +init(Name) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - {0, [], State}; + State. + +shutdown_terms(Name) -> + #qistate { dir = Dir } = blank_state(Name), + case read_shutdown_terms(Dir) of + {error, _} -> []; + {ok, Terms1} -> Terms1 + end. -init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) -> State = #qistate { dir = Dir } = blank_state(Name), - Terms = case read_shutdown_terms(Dir) of - {error, _} -> []; - {ok, Terms1} -> Terms1 - end, CleanShutdown = detect_clean_shutdown(Dir), - {Count, State1} = - case CleanShutdown andalso MsgStoreRecovered of - true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) - end, - {Count, Terms, State1}. + case CleanShutdown andalso MsgStoreRecovered of + true -> RecoveredCounts = proplists:get_value(segments, Terms, []), + init_clean(RecoveredCounts, State); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + end. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 29004bd5..127467bb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -235,12 +235,29 @@ conserve_memory(Pid, Conserve) -> server_properties() -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), - [{list_to_binary(K), longstr, list_to_binary(V)} || - {K, V} <- [{"product", Product}, - {"version", Version}, - {"platform", "Erlang/OTP"}, - {"copyright", ?COPYRIGHT_MESSAGE}, - {"information", ?INFORMATION_MESSAGE}]]. + + %% Get any configuration-specified server properties + {ok, RawConfigServerProps} = application:get_env(rabbit, + server_properties), + + %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms + %% from the config and merge them with the generated built-in properties + NormalizedConfigServerProps = + [case X of + {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), + longstr, + list_to_binary(Value)}; + {BinKey, Type, Value} -> {BinKey, Type, Value} + end || X <- RawConfigServerProps ++ + [{product, Product}, + {version, Version}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]], + + %% Filter duplicated properties in favor of config file provided values + lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, + NormalizedConfigServerProps). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index be451af6..4335dd2e 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -34,7 +34,6 @@ -include("rabbit.hrl"). -include_lib("public_key/include/public_key.hrl"). --include_lib("ssl/src/ssl_int.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f2a65eeb..71b23e01 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -73,6 +73,7 @@ all_tests() -> passed = test_user_management(), passed = test_server_status(), passed = maybe_run_cluster_dependent_tests(), + passed = test_configurable_server_properties(), passed. maybe_run_cluster_dependent_tests() -> @@ -1426,17 +1427,17 @@ restart_msg_store_empty() -> guid_bin(X) -> erlang:md5(term_to_binary(X)). -msg_store_contains(Atom, Guids) -> +msg_store_contains(Atom, Guids, MSCState) -> Atom = lists:foldl( fun (Guid, Atom1) when Atom1 =:= Atom -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, + rabbit_msg_store:contains(Guid, MSCState) end, Atom, Guids). -msg_store_sync(Guids) -> +msg_store_sync(Guids, MSCState) -> Ref = make_ref(), Self = self(), - ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids, - fun () -> Self ! {sync, Ref} end), + ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end, + MSCState), receive {sync, Ref} -> ok after @@ -1448,55 +1449,64 @@ msg_store_sync(Guids) -> msg_store_read(Guids, MSCState) -> lists:foldl(fun (Guid, MSCStateM) -> {{ok, Guid}, MSCStateN} = rabbit_msg_store:read( - ?PERSISTENT_MSG_STORE, Guid, MSCStateM), MSCStateN end, MSCState, Guids). msg_store_write(Guids, MSCState) -> - lists:foldl(fun (Guid, {ok, MSCStateN}) -> - rabbit_msg_store:write(?PERSISTENT_MSG_STORE, - Guid, Guid, MSCStateN) - end, {ok, MSCState}, Guids). + ok = lists:foldl( + fun (Guid, ok) -> rabbit_msg_store:write(Guid, Guid, MSCState) end, + ok, Guids). + +msg_store_remove(Guids, MSCState) -> + rabbit_msg_store:remove(Guids, MSCState). + +msg_store_remove(MsgStore, Ref, Guids) -> + with_msg_store_client(MsgStore, Ref, + fun (MSCStateM) -> + ok = msg_store_remove(Guids, MSCStateM), + MSCStateM + end). -msg_store_remove(Guids) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids). +with_msg_store_client(MsgStore, Ref, Fun) -> + rabbit_msg_store:client_terminate( + Fun(rabbit_msg_store:client_init(MsgStore, Ref))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( - lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). + lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, + rabbit_msg_store:client_init(MsgStore, Ref), L)). test_msg_store() -> restart_msg_store_empty(), Self = self(), Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), - %% check we don't contain any of the msgs we're about to publish - false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, Guids, MSCState), %% publish the first half - {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), + ok = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% publish the second half - {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1), + ok = msg_store_write(Guids2ndHalf, MSCState), %% sync on the first half again - the msg_store will be dirty, but %% we won't need the fsync - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% check they're all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState), %% publish the latter half twice so we hit the caching and ref count code - {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2), + ok = msg_store_write(Guids2ndHalf, MSCState), %% check they're still all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen ok = lists:foldl( fun (Guid, ok) -> rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, - [Guid], fun () -> Self ! {sync, Guid} end) + [Guid], fun () -> Self ! {sync, Guid} end, + MSCState) end, ok, Guids2ndHalf), lists:foldl( fun(Guid, ok) -> @@ -1511,24 +1521,24 @@ test_msg_store() -> end, ok, Guids2ndHalf), %% it's very likely we're not dirty here, so the 1st half sync %% should hit a different code path - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% read them all - MSCState4 = msg_store_read(Guids, MSCState3), + MSCState1 = msg_store_read(Guids, MSCState), %% read them all again - this will hit the cache, not disk - MSCState5 = msg_store_read(Guids, MSCState4), + MSCState2 = msg_store_read(Guids, MSCState1), %% remove them all - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids), + ok = rabbit_msg_store:remove(Guids, MSCState2), %% check first half doesn't exist - false = msg_store_contains(false, Guids1stHalf), + false = msg_store_contains(false, Guids1stHalf, MSCState2), %% check second half does exist - true = msg_store_contains(true, Guids2ndHalf), + true = msg_store_contains(true, Guids2ndHalf, MSCState2), %% read the second half again - MSCState6 = msg_store_read(Guids2ndHalf, MSCState5), + MSCState3 = msg_store_read(Guids2ndHalf, MSCState2), %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), + ok = rabbit_msg_store:release(Guids2ndHalf, MSCState3), %% read the second half again, just for fun (aka code coverage) - MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), + MSCState4 = msg_store_read(Guids2ndHalf, MSCState3), + ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1539,22 +1549,26 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), + MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> - not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)) + not(Bool = rabbit_msg_store:contains(Guid, MSCState5)) end, false, Guids2ndHalf), + ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), + MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs - false = msg_store_contains(false, Guids), + false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), - {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), + ok = msg_store_write(Guids1stHalf, MSCState6), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), + msg_store_read(Guids1stHalf, MSCState6)), + MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids %% push a lot of msgs in... at least 100 files worth @@ -1563,31 +1577,39 @@ test_msg_store() -> BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)), GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:PayloadSizeBits >>, - ok = foreach_with_msg_store_client( + ok = with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> - {ok, MSCStateN} = rabbit_msg_store:write( - MsgStore, Guid, Payload, MSCStateM), - MSCStateN - end, GuidsBig), + fun (MSCStateM) -> + [ok = rabbit_msg_store:write(Guid, Payload, MSCStateM) || + Guid <- GuidsBig], + MSCStateM + end), %% now read them to ensure we hit the fast client-side reading ok = foreach_with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> + fun (Guid, MSCStateM) -> {{ok, Payload}, MSCStateN} = rabbit_msg_store:read( - MsgStore, Guid, MSCStateM), + Guid, MSCStateM), MSCStateN end, GuidsBig), %% .., then 3s by 1... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), %% .., then remove 3s by 3, from the young end first. This hits %% GC... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), %% ensure empty - false = msg_store_contains(false, GuidsBig), + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (MSCStateM) -> + false = msg_store_contains(false, GuidsBig, MSCStateM), + MSCStateM + end), %% restart empty restart_msg_store_empty(), passed. @@ -1599,11 +1621,18 @@ test_queue() -> queue_name(<<"test">>). init_test_queue() -> - rabbit_queue_index:init( - test_queue(), true, false, - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end). + TestQueue = test_queue(), + Terms = rabbit_queue_index:shutdown_terms(TestQueue), + PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + Res = rabbit_queue_index:recover( + TestQueue, Terms, false, + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), + Res. restart_test_queue(Qi) -> _ = rabbit_queue_index:terminate([], Qi), @@ -1614,13 +1643,13 @@ restart_test_queue(Qi) -> empty_test_queue() -> ok = rabbit_variable_queue:stop(), ok = rabbit_variable_queue:start([]), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. with_empty_test_queue(Fun) -> ok = empty_test_queue(), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). queue_index_publish(SeqIds, Persistent, Qi) -> @@ -1629,18 +1658,17 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - {A, B, MSCStateEnd} = + MSCState = rabbit_msg_store:client_init(MsgStore, Ref), + {A, B} = lists:foldl( - fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> + fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( Guid, SeqId, #message_properties{}, Persistent, QiN), - {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, - Guid, MSCStateN), - {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} - end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), - ok = rabbit_msg_store:client_delete_and_terminate( - MSCStateEnd, MsgStore, Ref), + ok = rabbit_msg_store:write(Guid, Guid, MSCState), + {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} + end, {Qi, []}, SeqIds), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. verify_read_with_published(_Delivered, _Persistent, [], _) -> @@ -1686,7 +1714,7 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), %% should get length back as 0, as all the msgs were transient - {0, _Terms1, Qi6} = restart_test_queue(Qi4), + {0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), @@ -1695,7 +1723,7 @@ test_queue_index() -> lists:reverse(SeqIdsGuidsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), - {LenB, _Terms2, Qi12} = restart_test_queue(Qi10), + {LenB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), @@ -1707,7 +1735,7 @@ test_queue_index() -> {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), %% should get length back as 0 because all persistent %% msgs have been acked - {0, _Terms3, Qi19} = restart_test_queue(Qi18), + {0, Qi19} = restart_test_queue(Qi18), Qi19 end), @@ -1779,11 +1807,11 @@ test_queue_index() -> true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, _Terms9, Qi4} = restart_test_queue(Qi3), + {5, Qi4} = restart_test_queue(Qi3), {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, _Terms10, Qi8} = restart_test_queue(Qi7), + {5, Qi8} = restart_test_queue(Qi7), Qi8 end), @@ -2020,3 +2048,56 @@ test_queue_recover() -> rabbit_amqqueue:internal_delete(QName) end), passed. + +test_configurable_server_properties() -> + %% List of the names of the built-in properties do we expect to find + BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>, + <<"copyright">>, <<"information">>], + + %% Verify that the built-in properties are initially present + ActualPropNames = [Key || + {Key, longstr, _} <- rabbit_reader:server_properties()], + true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end, + BuiltInPropNames), + + %% Get the initial server properties configured in the environment + {ok, ServerProperties} = application:get_env(rabbit, server_properties), + + %% Helper functions + ConsProp = fun (X) -> application:set_env(rabbit, + server_properties, + [X | ServerProperties]) end, + IsPropPresent = fun (X) -> lists:member(X, + rabbit_reader:server_properties()) + end, + + %% Add a wholly new property of the simplified {KeyAtom, StringValue} form + NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"}, + ConsProp(NewSimplifiedProperty), + %% Do we find hare soup, appropriately formatted in the generated properties? + ExpectedHareImage = {list_to_binary(atom_to_list(NewHareKey)), + longstr, + list_to_binary(NewHareVal)}, + true = IsPropPresent(ExpectedHareImage), + + %% Add a wholly new property of the {BinaryKey, Type, Value} form + %% and check for it + NewProperty = {<<"new-bin-key">>, signedint, -1}, + ConsProp(NewProperty), + %% Do we find the new property? + true = IsPropPresent(NewProperty), + + %% Add a property that clobbers a built-in, and verify correct clobbering + {NewVerKey, NewVerVal} = NewVersion = {version, "X.Y.Z."}, + {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)), + list_to_binary(NewVerVal)}, + ConsProp(NewVersion), + ClobberedServerProps = rabbit_reader:server_properties(), + %% Is the clobbering insert present? + true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}), + %% Is the clobbering insert the only thing with the clobbering key? + [{BinNewVerKey, longstr, BinNewVerVal}] = + [E || {K, longstr, _V} = E <- ClobberedServerProps, K =:= BinNewVerKey], + + application:set_env(rabbit, server_properties, ServerProperties), + passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a1c442d3..f88e49c2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -229,7 +229,6 @@ len, persistent_count, - duration_target, target_ram_msg_count, ram_msg_count, ram_msg_count_prev, @@ -317,7 +316,6 @@ persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer(), - duration_target :: number() | 'infinity', target_ram_msg_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), @@ -370,16 +368,17 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> - {DeltaCount, Terms, IndexState} = - rabbit_queue_index:init( - QueueName, Recover, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end), - {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), +init(QueueName, IsDurable, false) -> + IndexState = rabbit_queue_index:init(QueueName), + init(IsDurable, IndexState, 0, [], + case IsDurable of + true -> msg_store_client_init(?PERSISTENT_MSG_STORE); + false -> undefined + end, + msg_store_client_init(?TRANSIENT_MSG_STORE)); +init(QueueName, true, true) -> + Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of [] -> {proplists:get_value(persistent_ref, Terms), @@ -387,64 +386,32 @@ init(QueueName, IsDurable, Recover) -> Terms}; _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, - DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount), - Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of - true -> ?BLANK_DELTA; - false -> #delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId } - end, - Now = now(), - PersistentClient = - case IsDurable of - true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); - false -> undefined - end, - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), - State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - index_state = IndexState1, - msg_store_clients = {{PersistentClient, PRef}, - {TransientClient, TRef}}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - duration_target = infinity, - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now } }, - a(maybe_deltas_to_betas(State)). + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, + TRef), + {DeltaCount, IndexState} = + rabbit_queue_index:recover( + QueueName, Terms1, + rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + init(true, IndexState, DeltaCount, Terms1, + PersistentClient, TransientClient). terminate(State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(true, tx_commit_index(State)), - case MSCStateP of - undefined -> ok; - _ -> rabbit_msg_store:client_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE) - end, - rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), + PRef = case MSCStateP of + undefined -> undefined; + _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), + rabbit_msg_store:client_ref(MSCStateP) + end, + ok = rabbit_msg_store:client_terminate(MSCStateT), + TRef = rabbit_msg_store:client_ref(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -460,44 +427,44 @@ delete_and_terminate(State) -> %% deleting it. {_PurgeCount, State1} = purge(State), State2 = #vqstate { index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(false, State1), IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef) + _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) end, - rabbit_msg_store:client_delete_and_terminate( - MSCStateT, ?TRANSIENT_MSG_STORE, TRef), + rabbit_msg_store:client_delete_and_terminate(MSCStateT), a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). -purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - len = Len, - persistent_count = PCount }) -> +purge(State = #vqstate { q4 = Q4, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q4, - orddict:new(), IndexState), - {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} = + orddict:new(), IndexState, MSCState), + {LensByStore1, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, State #vqstate { q4 = queue:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q1, - LensByStore1, IndexState2), + LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), - {Len, a(State1 #vqstate { q1 = queue:new(), - index_state = IndexState3, - len = 0, - ram_msg_count = 0, - ram_index_count = 0, - persistent_count = PCount1 })}. + {Len, a(State1 #vqstate { q1 = queue:new(), + index_state = IndexState3, + len = 0, + ram_msg_count = 0, + ram_index_count = 0, + persistent_count = PCount1 })}. publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), @@ -574,7 +541,7 @@ read_msg(MsgStatus = #msg_status { msg = undefined, State = #vqstate { ram_msg_count = RamMsgCount, msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, Guid), {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, msg_store_clients = MSCState1 }}; @@ -589,20 +556,22 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, SeqId, IndexState), %% 2. Remove from msg_store and queue index, if necessary - MsgStore = find_msg_store(IsPersistent), - Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [Guid]) + end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of @@ -634,7 +603,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { pending_ack = PA1 })}. ack(AckTags, State) -> - a(ack(fun rabbit_msg_store:remove/2, + a(ack(fun msg_store_remove/3, fun (_AckEntry, State1) -> State1 end, AckTags, State)). @@ -643,30 +612,32 @@ tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), - {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), - State #vqstate { msg_store_clients = MSCState1 }; - false -> State - end). + case IsPersistent andalso IsDurable of + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), + #msg_status { msg_on_disk = true } = + maybe_write_msg_to_disk(false, MsgStatus, MSCState); + false -> ok + end, + a(State). tx_ack(Txn, AckTags, State) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), State. -tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> +tx_rollback(Txn, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), ok = case IsDurable of - true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, - persistent_guids(Pubs)); + true -> msg_store_remove(MSCState, true, persistent_guids(Pubs)); false -> ok end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, MsgPropsFun, + State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), AckTags1 = lists:append(AckTags), @@ -674,8 +645,8 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> HasPersistentPubs = PersistentGuids =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of - true -> ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, + true -> ok = msg_store_sync( + MSCState, true, PersistentGuids, msg_store_callback(PersistentGuids, Pubs, AckTags1, Fun, MsgPropsFun)), State; @@ -685,7 +656,7 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( - ack(fun rabbit_msg_store:release/2, + ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), true, false, State1), @@ -693,7 +664,7 @@ requeue(AckTags, MsgPropsFun, State) -> ({IsPersistent, Guid, MsgProps}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), true, true, State2), @@ -716,8 +687,7 @@ set_ram_duration_target(DurationTarget, infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, - duration_target = DurationTarget }, + State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, a(case TargetRamMsgCount1 == infinity orelse (TargetRamMsgCount =/= infinity andalso TargetRamMsgCount1 >= TargetRamMsgCount) of @@ -732,7 +702,6 @@ ram_duration(State = #vqstate { in_counter = InCount, out_counter = OutCount, ram_msg_count = RamMsgCount, - duration_target = DurationTarget, ram_msg_count_prev = RamMsgCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), @@ -745,18 +714,16 @@ ram_duration(State = #vqstate { (2 * (AvgEgressRate + AvgIngressRate)) end, - {Duration, set_ram_duration_target( - DurationTarget, - State #vqstate { - rates = Rates #rates { - egress = Egress1, - ingress = Ingress1, - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate, - timestamp = Now }, - in_counter = 0, - out_counter = 0, - ram_msg_count_prev = RamMsgCount })}. + {Duration, State #vqstate { + rates = Rates #rates { + egress = Egress1, + ingress = Ingress1, + avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate, + timestamp = Now }, + in_counter = 0, + out_counter = 0, + ram_msg_count_prev = RamMsgCount }}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, @@ -851,22 +818,47 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. -find_msg_store(true) -> ?PERSISTENT_MSG_STORE; -find_msg_store(false) -> ?TRANSIENT_MSG_STORE. - -with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) -> - {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP), - {Result, {{MSCStateP1, PRef}, MSCStateT}}; -with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) -> - {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), - {Result, {MSCStateP, {MSCStateT1, TRef}}}. +with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> + {Result, MSCStateP1} = Fun(MSCStateP), + {Result, {MSCStateP1, MSCStateT}}; +with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) -> + {Result, MSCStateT1} = Fun(MSCStateT), + {Result, {MSCStateP, MSCStateT1}}. + +with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> + {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, + fun (MSCState1) -> + {Fun(MSCState1), MSCState1} + end), + Res. + +msg_store_client_init(MsgStore) -> + rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()). + +msg_store_write(MSCState, IsPersistent, Guid, Msg) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end). -read_from_msg_store(MSCState, IsPersistent, Guid) -> +msg_store_read(MSCState, IsPersistent, Guid) -> with_msg_store_state( MSCState, IsPersistent, - fun (MsgStore, MSCState1) -> - rabbit_msg_store:read(MsgStore, Guid, MSCState1) - end). + fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end). + +msg_store_remove(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end). + +msg_store_release(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end). + +msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; @@ -950,6 +942,48 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- +init(IsDurable, IndexState, DeltaCount, Terms, + PersistentClient, TransientClient) -> + {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), + + DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of + true -> ?BLANK_DELTA; + false -> #delta { start_seq_id = LowSeqId, + count = DeltaCount1, + end_seq_id = NextSeqId } + end, + Now = now(), + State = #vqstate { + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_msg_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now } }, + a(maybe_deltas_to_betas(State)). + msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( @@ -959,13 +993,17 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> rabbit_msg_store:remove( - ?PERSISTENT_MSG_STORE, + fun () -> remove_persistent_messages( PersistentGuids) end, F) end) end. +remove_persistent_messages(Guids) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE), + ok = rabbit_msg_store:remove(Guids, PersistentClient), + rabbit_msg_store:client_delete_and_terminate(PersistentClient). + tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, State = #vqstate { on_sync = OnSync = #sync { @@ -1031,13 +1069,14 @@ tx_commit_index(State = #vqstate { on_sync = #sync { State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, - State = #vqstate { q3 = Q3, - index_state = IndexState }) -> + State = #vqstate { q3 = Q3, + index_state = IndexState, + msg_store_clients = MSCState }) -> case bpqueue:is_empty(Q3) of true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = remove_queue_entries( - fun beta_fold/3, Q3, - LensByStore, IndexState), + false -> {LensByStore1, IndexState1} = + remove_queue_entries(fun beta_fold/3, Q3, + LensByStore, IndexState, MSCState), purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { @@ -1045,11 +1084,11 @@ purge_betas_and_deltas(LensByStore, index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState) -> +remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> {GuidsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), {sum_guids_by_store_to_len(LensByStore, GuidsByStore), rabbit_queue_index:ack(Acks, @@ -1061,8 +1100,7 @@ remove_queue_entries1( index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, {GuidsByStore, Delivers, Acks}) -> {case MsgOnDisk of - true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, - GuidsByStore); + true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore); false -> GuidsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -1070,8 +1108,8 @@ remove_queue_entries1( sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> orddict:fold( - fun (MsgStore, Guids, LensByStore1) -> - orddict:update_counter(MsgStore, length(Guids), LensByStore1) + fun (IsPersistent, Guids, LensByStore1) -> + orddict:update_counter(IsPersistent, length(Guids), LensByStore1) end, LensByStore, GuidsByStore). %%---------------------------------------------------------------------------- @@ -1103,8 +1141,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount + 1}}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, MSCState) -> - {MsgStatus, MSCState}; + msg_on_disk = true }, _MSCState) -> + MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) @@ -1113,15 +1151,10 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, - {ok, MSCState1} = with_msg_store_state( - MSCState, IsPersistent, - fun (MsgStore, MSCState2) -> - rabbit_msg_store:write(MsgStore, Guid, Msg1, - MSCState2) - end), - {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; -maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> - {MsgStatus, MSCState}. + ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1), + MsgStatus #msg_status { msg_on_disk = true }; +maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> + MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, IndexState) -> @@ -1145,12 +1178,10 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State = #vqstate { index_state = IndexState, msg_store_clients = MSCState }) -> - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk( - ForceMsg, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = maybe_write_index_to_disk( - ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }}. + MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), + {MsgStatus2, IndexState1} = + maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), + {MsgStatus2, State #vqstate { index_state = IndexState1 }}. %%---------------------------------------------------------------------------- %% Internal gubbins for acks @@ -1168,22 +1199,23 @@ record_pending_ack(#msg_status { seq_id = SeqId, dict:store(SeqId, AckEntry, PA). remove_pending_ack(KeepPersistent, - State = #vqstate { pending_ack = PA, - index_state = IndexState }) -> + State = #vqstate { pending_ack = PA, + index_state = IndexState, + msg_store_clients = MSCState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), State1 = State #vqstate { pending_ack = dict:new() }, case KeepPersistent of - true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + true -> case orddict:find(false, GuidsByStore) of error -> State1; - {ok, Guids} -> ok = rabbit_msg_store:remove( - ?TRANSIENT_MSG_STORE, Guids), + {ok, Guids} -> ok = msg_store_remove(MSCState, false, + Guids), State1 end; false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold( - fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), State1 #vqstate { index_state = IndexState1 } end. @@ -1191,18 +1223,20 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = + {{SeqIds, GuidsByStore}, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount }} = lists:foldl( fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> - {ok, AckEntry} = dict:find(SeqId, PA), + AckEntry = dict:fetch(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { pending_ack = dict:erase(SeqId, PA) })} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + MsgStoreFun(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), @@ -1215,10 +1249,10 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS Acc; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. find_persistent_count(LensByStore) -> - case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of + case orddict:find(true, LensByStore) of error -> 0; {ok, Len} -> Len end. @@ -1360,46 +1394,40 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, - target_ram_msg_count = TargetRamMsgCount, transient_threshold = TransientThreshold }) -> - case bpqueue:is_empty(Q3) orelse (TargetRamMsgCount /= 0) of - false -> - State; - true -> - #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - end_seq_id = DeltaSeqIdEnd } = Delta, - DeltaSeqId1 = - lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = - rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, IndexState2} = betas_from_index_entries( - List, TransientThreshold, IndexState1), - State1 = State #vqstate { index_state = IndexState2 }, - case bpqueue:len(Q3a) of + #delta { start_seq_id = DeltaSeqId, + count = DeltaCount, + end_seq_id = DeltaSeqIdEnd } = Delta, + DeltaSeqId1 = + lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), + DeltaSeqIdEnd]), + {List, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), + {Q3a, IndexState2} = + betas_from_index_entries(List, TransientThreshold, IndexState1), + State1 = State #vqstate { index_state = IndexState2 }, + case bpqueue:len(Q3a) of + 0 -> + %% we ignored every message in the segment due to it being + %% transient and below the threshold + maybe_deltas_to_betas( + State1 #vqstate { + delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); + Q3aLen -> + Q3b = bpqueue:join(Q3, Q3a), + case DeltaCount - Q3aLen of 0 -> - %% we ignored every message in the segment due to - %% it being transient and below the threshold - maybe_deltas_to_betas( - State #vqstate { - delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); - Q3aLen -> - Q3b = bpqueue:join(Q3, Q3a), - case DeltaCount - Q3aLen of - 0 -> - %% delta is now empty, but it wasn't - %% before, so can now join q2 onto q3 - State1 #vqstate { q2 = bpqueue:new(), - delta = ?BLANK_DELTA, - q3 = bpqueue:join(Q3b, Q2) }; - N when N > 0 -> - Delta1 = #delta { start_seq_id = DeltaSeqId1, - count = N, - end_seq_id = DeltaSeqIdEnd }, - State1 #vqstate { delta = Delta1, - q3 = Q3b } - end + %% delta is now empty, but it wasn't before, so + %% can now join q2 onto q3 + State1 #vqstate { q2 = bpqueue:new(), + delta = ?BLANK_DELTA, + q3 = bpqueue:join(Q3b, Q2) }; + N when N > 0 -> + Delta1 = #delta { start_seq_id = DeltaSeqId1, + count = N, + end_seq_id = DeltaSeqIdEnd }, + State1 #vqstate { delta = Delta1, + q3 = Q3b } end end. |