diff options
author | Michael Bridgen <mikeb@rabbitmq.com> | 2010-11-12 17:54:52 +0000 |
---|---|---|
committer | Michael Bridgen <mikeb@rabbitmq.com> | 2010-11-12 17:54:52 +0000 |
commit | 8fffd3efed0ae3fe9af9b2b9b027850eaa5fc163 (patch) | |
tree | 9e5f86ef578e5e08f9cdf107b13e96b57f9d5e73 | |
parent | 14e57c73119b554e7e31238a95b80bfc9843f74c (diff) | |
parent | 0cef8cb483d0108cd21a252952376e4455559e71 (diff) | |
download | rabbitmq-server-8fffd3efed0ae3fe9af9b2b9b027850eaa5fc163.tar.gz |
Merge bug23354 (remove xmlto and half of universe as macports deps)
54 files changed, 2171 insertions, 1407 deletions
@@ -3,6 +3,7 @@ TMPDIR ?= /tmp RABBITMQ_NODENAME ?= rabbit RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia +RABBITMQ_PLUGINS_EXPAND_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-plugins-scratch RABBITMQ_LOG_BASE ?= $(TMPDIR) DEPS_FILE=deps.mk @@ -92,7 +93,7 @@ all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) rm -f $@ - escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@ + echo $(subst : ,:,$(foreach FILE,$^,$(FILE):)) | escript generate_deps $@ $(EBIN_DIR) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< @@ -110,27 +111,23 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) - $(ERL_EBIN) -eval \ - "rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\")." \ - -eval \ - "init:stop()." - - + dialyzer --plt $(BASIC_PLT) --no_native \ + -Wrace_conditions $(BEAM_TARGETS) # rabbit.plt is used by rabbitmq-erlang-client's dialyze make target create-plt: $(RABBIT_PLT) $(RABBIT_PLT): $(BEAM_TARGETS) $(BASIC_PLT) - cp $(BASIC_PLT) $@ - $(ERL_EBIN) -eval \ - "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:add_to_plt(\"$@\", \"$(BEAM_TARGETS)\"))." + dialyzer --plt $(BASIC_PLT) --output_plt $@ --no_native \ + --add_to_plt $(BEAM_TARGETS) $(BASIC_PLT): $(BEAM_TARGETS) if [ -f $@ ]; then \ touch $@; \ else \ - $(ERL_EBIN) -eval \ - "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:create_basic_plt(\"$@\"))."; \ + dialyzer --output_plt $@ --build_plt \ + --apps erts kernel stdlib compiler sasl os_mon mnesia tools \ + public_key crypto ssl; \ fi clean: @@ -150,7 +147,8 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ RABBITMQ_NODE_IP_ADDRESS="$(RABBITMQ_NODE_IP_ADDRESS)" \ RABBITMQ_NODE_PORT="$(RABBITMQ_NODE_PORT)" \ RABBITMQ_LOG_BASE="$(RABBITMQ_LOG_BASE)" \ - RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" + RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" \ + RABBITMQ_PLUGINS_EXPAND_DIR="$(RABBITMQ_PLUGINS_EXPAND_DIR)" run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ @@ -240,8 +240,6 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) - elif type == 'shortstr': - print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index) else: pass @@ -278,8 +276,7 @@ def genErl(spec): print " F%dTab = rabbit_binary_generator:generate_table(F%d)," % (f.index, f.index) print " F%dLen = size(F%dTab)," % (f.index, f.index) elif type == 'shortstr': - print " F%dLen = size(F%d)," % (f.index, f.index) - print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index) + print " F%dLen = shortstr_size(F%d)," % (f.index, f.index) elif type == 'longstr': print " F%dLen = size(F%d)," % (f.index, f.index) else: @@ -348,9 +345,11 @@ def genErl(spec): print "%% Various types" print "-ifdef(use_specs)." - print """-export_type([amqp_table/0, amqp_property_type/0, amqp_method_record/0, - amqp_method_name/0, amqp_method/0, amqp_class_id/0, - amqp_value/0, amqp_array/0, amqp_exception/0, amqp_property_record/0]). + print """-export_type([amqp_field_type/0, amqp_property_type/0, + amqp_table/0, amqp_array/0, amqp_value/0, + amqp_method_name/0, amqp_method/0, amqp_method_record/0, + amqp_method_field_name/0, amqp_property_record/0, + amqp_exception/0, amqp_exception_code/0, amqp_class_id/0]). -type(amqp_field_type() :: 'longstr' | 'signedint' | 'decimal' | 'timestamp' | @@ -418,7 +417,7 @@ def genErl(spec): (amqp_method_name(), binary()) -> amqp_method_record() | rabbit_types:connection_exit()). -spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()). -spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()). --spec(encode_properties/1 :: (amqp_method_record()) -> binary()). +-spec(encode_properties/1 :: (amqp_property_record()) -> binary()). -spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}). -spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()). -endif. % use_specs @@ -426,6 +425,12 @@ def genErl(spec): bitvalue(true) -> 1; bitvalue(false) -> 0; bitvalue(undefined) -> 0. + +shortstr_size(S) -> + case size(S) of + Len when Len =< 255 -> Len; + _ -> exit(method_field_shortstr_overflow) + end. """ version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision) if version == '{8, 0, 0}': version = '{0, 8, 0}' diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index c325bb5a..ec8f87e5 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -17,7 +17,7 @@ <!-- Copy the root node, and munge the outer part of the page --> <xsl:template match="/html"> <xsl:processing-instruction name="xml-stylesheet">type="text/xml" href="page.xsl"</xsl:processing-instruction> -<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"> +<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" xmlns="http://www.w3.org/1999/xhtml"> <head> <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title> </head> @@ -42,7 +42,7 @@ </xsl:choose> <p> For more general documentation, please see the - <a href="admin-guide.html">administrator's guide</a>. + <a href="../admin-guide.html">administrator's guide</a>. </p> <doc:toc class="compact"> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index 921da4f1..03e76c79 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -98,18 +98,6 @@ Defaults to 5672. </listitem> </varlistentry> - <varlistentry> - <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> - <listitem> - <para> -Defaults to <filename>/etc/rabbitmq/rabbitmq_cluster.config</filename>. If this file is -present it is used by the server to auto-configure a RabbitMQ cluster. -See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> -for details. - </para> - </listitem> - </varlistentry> - </variablelist> </refsect1> diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index 2b416e3e..e95f9889 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -193,18 +193,6 @@ manager. </varlistentry> <varlistentry> - <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> - <listitem> - <para> -If this file is -present it is used by the server to auto-configure a RabbitMQ cluster. -See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> -for details. - </para> - </listitem> - </varlistentry> - - <varlistentry> <term>RABBITMQ_CONSOLE_LOG</term> <listitem> <para> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 3b7244c7..acb99bc8 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -589,7 +589,7 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -597,16 +597,6 @@ <listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem> </varlistentry> <varlistentry> - <term>scope</term> - <listitem><para>Scope of the permissions: either - <command>client</command> (the default) or - <command>all</command>. This determines whether - permissions are checked for server-generated resource - names (<command>all</command>) or only for - client-specified resource names - (<command>client</command>).</para></listitem> - </varlistentry> - <varlistentry> <term>user</term> <listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem> </varlistentry> diff --git a/docs/remove-namespaces.xsl b/docs/remove-namespaces.xsl index 58a1e826..7f7f3c12 100644 --- a/docs/remove-namespaces.xsl +++ b/docs/remove-namespaces.xsl @@ -1,13 +1,14 @@ <?xml version='1.0'?> <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" + xmlns="http://www.w3.org/1999/xhtml" version='1.0'> <xsl:output method="xml" /> <!-- Copy every element through with local name only --> <xsl:template match="*"> - <xsl:element name="{local-name()}"> + <xsl:element name="{local-name()}" namespace=""> <xsl:apply-templates select="@*|node()"/> </xsl:element> </xsl:template> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 4be09c5a..17d05a99 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,4 +29,6 @@ {default_user_is_admin, true}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {cluster_nodes, []}, + {server_properties, []}, {collect_statistics, none}]}]}. diff --git a/generate_deps b/generate_deps index 29587b5a..ddfca816 100644 --- a/generate_deps +++ b/generate_deps @@ -2,18 +2,21 @@ %% -*- erlang -*- -mode(compile). -main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> - ErlDirContents = filelib:wildcard("*.erl", ErlDir), - ErlFiles = [filename:join(ErlDir, FileName) || FileName <- ErlDirContents], +%% We expect the list of Erlang source and header files to arrive on +%% stdin, with the entries colon-separated. +main([TargetFile, EbinDir]) -> + ErlsAndHrls = [ string:strip(S,left) || + S <- string:tokens(io:get_line(""), ":\n")], + ErlFiles = [F || F <- ErlsAndHrls, lists:suffix(".erl", F)], Modules = sets:from_list( [list_to_atom(filename:basename(FileName, ".erl")) || - FileName <- ErlDirContents]), - Headers = sets:from_list( - [filename:join(IncludeDir, FileName) || - FileName <- filelib:wildcard("*.hrl", IncludeDir)]), + FileName <- ErlFiles]), + HrlFiles = [F || F <- ErlsAndHrls, lists:suffix(".hrl", F)], + IncludeDirs = lists:usort([filename:dirname(Path) || Path <- HrlFiles]), + Headers = sets:from_list(HrlFiles), Deps = lists:foldl( fun (Path, Deps1) -> - dict:store(Path, detect_deps(IncludeDir, EbinDir, + dict:store(Path, detect_deps(IncludeDirs, EbinDir, Modules, Headers, Path), Deps1) end, dict:new(), ErlFiles), @@ -33,8 +36,8 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> ok = file:sync(Hdl), ok = file:close(Hdl). -detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) -> - {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]), +detect_deps(IncludeDirs, EbinDir, Modules, Headers, Path) -> + {ok, Forms} = epp:parse_file(Path, IncludeDirs, [{use_specs, true}]), lists:foldl( fun ({attribute, _LineNumber, Attribute, Behaviour}, Deps) when Attribute =:= behaviour orelse Attribute =:= behavior -> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index af6e257a..a1987fb2 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -29,8 +29,8 @@ %% Contributor(s): ______________________________________. %% --record(user, {username, password, is_admin}). --record(permission, {scope, configure, write, read}). +-record(user, {username, password_hash, is_admin}). +-record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). @@ -63,7 +63,7 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). --record(listener, {node, protocol, host, port}). +-record(listener, {node, protocol, host, ip_address, port}). -record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}). @@ -74,6 +74,8 @@ -record(event, {type, props, timestamp}). +-record(message_properties, {expiry}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 38c6f939..20230b24 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -31,12 +31,15 @@ -type(fetch_result() :: ('empty' | - %% Message, IsDelivered, AckTag, RemainingLen + %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(message_properties_transformer() :: + fun ((rabbit_types:message_properties()) + -> rabbit_types:message_properties())). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -45,19 +48,25 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). --spec(publish_delivered/3 :: - (ack_required(), rabbit_types:basic_message(), state()) -> - {ack(), state()}). +-spec(publish/3 :: (rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) -> state()). +-spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) + -> {ack(), state()}). +-spec(dropwhile/2 :: + (fun ((rabbit_types:message_properties()) -> boolean()), state()) + -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), - state()) -> state()). +-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). --spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> - {[ack()], state()}). --spec(requeue/2 :: ([ack()], state()) -> state()). +-spec(tx_commit/4 :: + (rabbit_types:txn(), fun (() -> any()), + message_properties_transformer(), state()) -> {[ack()], state()}). +-spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) + -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index eb0a2a51..209a90ee 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -127,6 +127,9 @@ done rm -rf %{buildroot} %changelog +* Tue Oct 19 2010 vlad@rabbitmq.com 2.1.1-1 +- New Upstream Release + * Tue Sep 14 2010 marek@rabbitmq.com 2.1.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 9927cfbc..e81fda24 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.1.1-1) lucid; urgency=low + + * New Upstream Release + + -- Vlad Alexandru Ionescu <vlad@rabbitmq.com> Tue, 19 Oct 2010 17:20:10 +0100 + rabbitmq-server (2.1.0-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 6e67d0cb..ce6b1e34 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -85,7 +85,7 @@ post-destroot { reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ ${realsbin}/rabbitmq-env - foreach var {CONFIG_FILE CLUSTER_CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { + foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ ${realsbin}/rabbitmq-multi \ ${realsbin}/rabbitmq-server \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 8e26663a..0eb7092d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -35,7 +35,6 @@ NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" -CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia @@ -59,7 +58,6 @@ else fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} @@ -68,6 +66,9 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand + [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" ## Log rotation @@ -89,14 +90,14 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ -pa "$RABBITMQ_EBIN_ROOT" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"${RABBITMQ_MNESIA_DIR}/plugins-scratch\"" \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ -rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \ -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" then - RABBITMQ_BOOT_FILE="${RABBITMQ_MNESIA_DIR}/plugins-scratch/rabbit" + RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" RABBITMQ_EBIN_PATH="" else exit 1 diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5bcbc6ba..bd4120fa 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -107,6 +107,10 @@ if "!RABBITMQ_MNESIA_DIR!"=="" ( set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
+if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
+)
+
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
@@ -115,7 +119,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
@@ -165,7 +169,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 4b3961d4..ff25b146 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -140,6 +140,9 @@ if "!RABBITMQ_MNESIA_DIR!"=="" ( set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
+if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
+)
if "!P1!" == "install" goto INSTALL_SERVICE
for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
@@ -185,7 +188,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
@@ -232,7 +235,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/src/delegate.erl b/src/delegate.erl index c8aa3092..11abe73b 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,7 +44,8 @@ -ifdef(use_specs). --spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/2 :: + (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). @@ -60,8 +61,8 @@ %%---------------------------------------------------------------------------- -start_link(Hash) -> - gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). +start_link(Prefix, Hash) -> + gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), @@ -147,7 +148,8 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) -> local_server(Node) -> case get({delegate_local_server_name, Node}) of undefined -> - Name = server(erlang:phash2({self(), Node}, process_count())), + Name = server(outgoing, + erlang:phash2({self(), Node}, process_count())), put({delegate_local_server_name, Node}, Name), Name; Name -> Name @@ -160,17 +162,20 @@ remote_server(Node) -> {badrpc, _} -> %% Have to return something, if we're just casting %% then we don't want to blow up - server(1); + server(incoming, 1); Count -> - Name = server(erlang:phash2({self(), Node}, Count)), + Name = server(incoming, + erlang:phash2({self(), Node}, Count)), put({delegate_remote_server_name, Node}, Name), Name end; Name -> Name end. -server(Hash) -> - list_to_atom("delegate_process_" ++ integer_to_list(Hash)). +server(Prefix, Hash) -> + list_to_atom("delegate_" ++ + atom_to_list(Prefix) ++ "_" ++ + integer_to_list(Hash)). safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; @@ -201,7 +206,7 @@ handle_cast({thunk, Thunk}, State) -> {noreply, State, hibernate}. handle_info(_Info, State) -> - {noreply, State}. + {noreply, State, hibernate}. terminate(_Reason, _State) -> ok. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index ff303ee2..544546f1 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -55,9 +55,11 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - {ok, {{one_for_one, 10, 10}, - [{Hash, {delegate, start_link, [Hash]}, - transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(0, delegate:process_count() - 1)]}}. + {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}. + +specs(Prefix) -> + [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]}, + transient, 16#ffffffff, worker, [delegate]} || + Hash <- lists:seq(0, delegate:process_count() - 1)]. %%---------------------------------------------------------------------------- diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 230d1f2a..6e02b23e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -177,7 +177,7 @@ format_status/2]). %% Internal exports --export([init_it/6, print_event/3]). +-export([init_it/6]). -import(error_logger, [format/2]). @@ -192,10 +192,13 @@ -ifdef(use_specs). --spec(handle_common_termination/3 :: - (any(), atom(), #gs2_state{}) -> no_return()). +-type(gs2_state() :: #gs2_state{}). --spec(hibernate/1 :: (#gs2_state{}) -> no_return()). +-spec(handle_common_termination/3 :: + (any(), atom(), gs2_state()) -> no_return()). +-spec(hibernate/1 :: (gs2_state()) -> no_return()). +-spec(pre_hibernate/1 :: (gs2_state()) -> no_return()). +-spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()). -endif. @@ -612,7 +615,7 @@ process_msg(Msg, _Msg when Debug =:= [] -> handle_msg(Msg, GS2State); _Msg -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}), handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) end. @@ -838,13 +841,13 @@ handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, time = Time1, debug = Debug1}); {noreply, NState} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state {state = NState, time = infinity, debug = Debug1}); {noreply, NState, Time1} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state {state = NState, time = Time1, @@ -866,13 +869,13 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, debug = Debug}) -> case Reply of {noreply, NState} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state { state = NState, time = infinity, debug = Debug1 }); {noreply, NState, Time1} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state { state = NState, time = Time1, @@ -894,7 +897,7 @@ handle_common_termination(Reply, Msg, GS2State) -> reply(Name, {To, Tag}, Reply, State, Debug) -> reply({To, Tag}, Reply), sys:handle_debug( - Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}). + Debug, fun print_event/3, Name, {out, Reply, To, State}). %%----------------------------------------------------------------- @@ -903,10 +906,6 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> system_continue(Parent, Debug, GS2State) -> loop(GS2State #gs2_state { parent = Parent, debug = Debug }). --ifdef(use_specs). --spec system_terminate(_, _, _, [_]) -> no_return(). --endif. - system_terminate(Reason, _Parent, Debug, GS2State) -> terminate(Reason, [], GS2State #gs2_state { debug = Debug }). diff --git a/src/rabbit.erl b/src/rabbit.erl index 8c36a9f0..a1dd2c2e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -301,49 +301,38 @@ run_boot_step({StepName, Attributes}) -> ok end. -module_attributes(Module) -> - case catch Module:module_info(attributes) of - {'EXIT', {undef, [{Module, module_info, _} | _]}} -> - io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", - [Module]), - []; - {'EXIT', Reason} -> - exit(Reason); - V -> - V - end. - boot_steps() -> - AllApps = [App || {App, _, _} <- application:loaded_applications()], - Modules = lists:usort( - lists:append([Modules - || {ok, Modules} <- - [application:get_key(App, modules) - || App <- AllApps]])), - UnsortedSteps = - lists:flatmap(fun (Module) -> - [{StepName, Attributes} - || {rabbit_boot_step, [{StepName, Attributes}]} - <- module_attributes(Module)] - end, Modules), - sort_boot_steps(UnsortedSteps). + sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). + +vertices(_Module, Steps) -> + [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. + +edges(_Module, Steps) -> + [case Key of + requires -> {StepName, OtherStep}; + enables -> {OtherStep, StepName} + end || {StepName, Atts} <- Steps, + {Key, OtherStep} <- Atts, + Key =:= requires orelse Key =:= enables]. + +graph_build_error({vertex, duplicate, StepName}) -> + boot_error("Duplicate boot step name: ~w~n", [StepName]); +graph_build_error({edge, Reason, From, To}) -> + boot_error( + "Could not add boot step dependency of ~w on ~w:~n~s", + [To, From, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) || Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]). sort_boot_steps(UnsortedSteps) -> - G = digraph:new([acyclic]), - - %% Add vertices, with duplicate checking. - [case digraph:vertex(G, StepName) of - false -> digraph:add_vertex(G, StepName, Step); - _ -> boot_error("Duplicate boot step name: ~w~n", [StepName]) - end || Step = {StepName, _Attrs} <- UnsortedSteps], - - %% Add edges, detecting cycles and missing vertices. - lists:foreach(fun ({StepName, Attributes}) -> - [add_boot_step_dep(G, StepName, PrecedingStepName) - || {requires, PrecedingStepName} <- Attributes], - [add_boot_step_dep(G, SucceedingStepName, StepName) - || {enables, SucceedingStepName} <- Attributes] - end, UnsortedSteps), + G = rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps), %% Use topological sort to find a consistent ordering (if there is %% one, otherwise fail). @@ -365,24 +354,6 @@ sort_boot_steps(UnsortedSteps) -> [MissingFunctions]) end. -add_boot_step_dep(G, RunsSecond, RunsFirst) -> - case digraph:add_edge(G, RunsSecond, RunsFirst) of - {error, Reason} -> - boot_error("Could not add boot step dependency of ~w on ~w:~n~s", - [RunsSecond, RunsFirst, - case Reason of - {bad_vertex, V} -> - io_lib:format("Boot step not registered: ~w~n", [V]); - {bad_edge, [First | Rest]} -> - [io_lib:format("Cyclic dependency: ~w", [First]), - [io_lib:format(" depends on ~w", [Next]) - || Next <- Rest], - io_lib:format(" depends on ~w~n", [First])] - end]); - _ -> - ok - end. - %%--------------------------------------------------------------------------- log_location(Type) -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 73fd6f0e..3388e5e7 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -33,12 +33,13 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([check_login/2, user_pass_login/2, +-export([check_login/2, user_pass_login/2, check_user_pass_login/2, check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, set_admin/1, clear_admin/1, list_users/0, lookup_user/1]). +-export([change_password_hash/2]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). --export([set_permissions/5, set_permissions/6, clear_permissions/2, +-export([set_permissions/5, clear_permissions/2, list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, list_user_vhost_permissions/2]). @@ -46,21 +47,22 @@ -ifdef(use_specs). --export_type([username/0, password/0]). +-export_type([username/0, password/0, password_hash/0]). -type(permission_atom() :: 'configure' | 'read' | 'write'). -type(username() :: binary()). -type(password() :: binary()). +-type(password_hash() :: binary()). -type(regexp() :: binary()). --type(scope() :: binary()). --type(scope_atom() :: 'client' | 'all'). - -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user() | rabbit_types:channel_exit()). +-spec(check_user_pass_login/2 :: + (username(), password()) + -> {'ok', rabbit_types:user()} | 'refused'). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). @@ -70,9 +72,10 @@ -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). +-spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok'). -spec(set_admin/1 :: (username()) -> 'ok'). -spec(clear_admin/1 :: (username()) -> 'ok'). --spec(list_users/0 :: () -> [username()]). +-spec(list_users/0 :: () -> [{username(), boolean()}]). -spec(lookup_user/1 :: (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). @@ -82,21 +85,15 @@ -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). --spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), - regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). -spec(list_permissions/0 :: - () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), - scope_atom()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]). -spec(list_user_permissions/1 :: - (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_user_vhost_permissions/2 :: - (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), - scope_atom()}]). + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]). -endif. @@ -133,17 +130,23 @@ check_login(Mechanism, _Response) -> user_pass_login(User, Pass) -> ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]), + case check_user_pass_login(User, Pass) of + refused -> + rabbit_misc:protocol_error( + access_refused, "login refused for user '~s'", [User]); + {ok, U} -> + U + end. + +check_user_pass_login(User, Pass) -> case lookup_user(User) of {ok, U} -> - if - Pass == U#user.password -> U; - true -> - rabbit_misc:protocol_error( - access_refused, "login refused for user '~s'", [User]) + case check_password(Pass, U#user.password_hash) of + true -> {ok, U}; + _ -> refused end; {error, not_found} -> - rabbit_misc:protocol_error( - access_refused, "login refused for user '~s'", [User]) + refused end. internal_lookup_vhost_access(Username, VHostPath) -> @@ -188,20 +191,15 @@ check_resource_access(Username, [] -> false; [#user_permission{permission = P}] -> - case {Name, P} of - {<<"amq.gen",_/binary>>, #permission{scope = client}} -> - true; - _ -> - PermRegexp = - case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false - end + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false end end, if Res -> ok; @@ -217,7 +215,8 @@ add_user(Username, Password) -> [] -> ok = mnesia:write(rabbit_user, #user{username = Username, - password = Password, + password_hash = + hash_password(Password), is_admin = false}, write); _ -> @@ -248,12 +247,33 @@ delete_user(Username) -> R. change_password(Username, Password) -> + change_password_hash(Username, hash_password(Password)). + +change_password_hash(Username, PasswordHash) -> R = update_user(Username, fun(User) -> - User#user{password = Password} + User#user{ password_hash = PasswordHash } end), rabbit_log:info("Changed password for user ~p~n", [Username]), R. +hash_password(Cleartext) -> + Salt = make_salt(), + Hash = salted_md5(Salt, Cleartext), + <<Salt/binary, Hash/binary>>. + +check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> + Hash =:= salted_md5(Salt, Cleartext). + +make_salt() -> + {A1,A2,A3} = now(), + random:seed(A1, A2, A3), + Salt = random:uniform(16#ffffffff), + <<Salt:32>>. + +salted_md5(Salt, Cleartext) -> + Salted = <<Salt/binary, Cleartext/binary>>, + erlang:md5(Salted). + set_admin(Username) -> set_admin(Username, true). @@ -334,7 +354,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _, _}) -> + lists:foreach(fun ({Username, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -355,16 +375,7 @@ validate_regexp(RegexpBin) -> end. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm, - WritePerm, ReadPerm). - -set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), - Scope = case ScopeBin of - <<"client">> -> client; - <<"all">> -> all; - _ -> throw({error, {invalid_scope, ScopeBin}}) - end, rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -374,7 +385,6 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}}, @@ -393,35 +403,34 @@ clear_permissions(Username, VHostPath) -> end)). list_permissions() -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(match_user_vhost('_', '_'))]. list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_user_vhost_permissions(Username, VHostPath) -> - [{ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{ConfigurePerm, WritePerm, ReadPerm} || + {_, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user_and_vhost( Username, VHostPath, match_user_vhost(Username, VHostPath)))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ scope = Scope, - configure = ConfigurePerm, + permission = #permission{ configure = ConfigurePerm, write = WritePerm, read = ReadPerm}} <- %% TODO: use dirty ops instead diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2389ec86..5cdd0e3c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1]). + set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -56,7 +56,7 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(EXPIRES_TYPES, [byte, short, signedint, long]). +-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). %%---------------------------------------------------------------------------- @@ -97,14 +97,14 @@ -spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()). -spec(info/2 :: - (rabbit_types:amqqueue(), [rabbit_types:info_key()]) - -> [rabbit_types:info()]). --spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). --spec(info_all/2 :: (rabbit_types:vhost(), [rabbit_types:info_key()]) - -> [[rabbit_types:info()]]). + (rabbit_types:amqqueue(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). +-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) + -> [rabbit_types:infos()]). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -162,7 +162,7 @@ -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()). +-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -endif. @@ -310,18 +310,30 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]], + end || {Key, Fun} <- + [{<<"x-expires">>, fun check_expires_argument/1}, + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. -check_expires_argument(undefined) -> +check_expires_argument(Val) -> + check_integer_argument(Val, + expires_not_of_acceptable_type, + expires_zero_or_less). + +check_message_ttl_argument(Val) -> + check_integer_argument(Val, + ttl_not_of_acceptable_type, + ttl_zero_or_less). + +check_integer_argument(undefined, _, _) -> ok; -check_expires_argument({Type, Expires}) when Expires > 0 -> - case lists:member(Type, ?EXPIRES_TYPES) of +check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 -> + case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; - false -> {error, {expires_not_of_acceptable_type, Type, Expires}} + false -> {error, {InvalidTypeError, Type, Val}} end; -check_expires_argument({_Type, _Expires}) -> - {error, expires_zero_or_less}. +check_integer_argument({_Type, _Val}, _, ZeroOrLessError) -> + {error, ZeroOrLessError}. list(VHostPath) -> mnesia:dirty_match_object( @@ -349,7 +361,7 @@ consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers, infinity). consumers_all(VHostPath) -> - lists:concat( + lists:append( map(VHostPath, fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} || {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] @@ -466,6 +478,9 @@ set_maximum_since_use(QPid, Age) -> maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). +drop_expired(QPid) -> + gen_server2:cast(QPid, drop_expired). + on_node_down(Node) -> rabbit_binding:process_deletions( lists:foldl( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 19db731a..75f285df 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}). + -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -61,7 +63,9 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + ttl, + ttl_timer_ref }). -record(consumer, {tag, ack_required}). @@ -123,6 +127,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, + ttl = undefined, stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -145,12 +150,6 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> - case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); - undefined -> State - end. - declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined, @@ -165,7 +164,7 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - State1 = init_expires(State#q{backing_queue_state = BQS}), + State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(StatsTimer, @@ -174,6 +173,19 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. +process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> + lists:foldl(fun({Arg, Fun}, State1) -> + case rabbit_misc:table_lookup(Arguments, Arg) of + {_Type, Val} -> Fun(Val, State1); + undefined -> State1 + end + end, State, [{<<"x-expires">>, fun init_expires/2}, + {<<"x-message-ttl">>, fun init_ttl/2}]). + +init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). + +init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -303,6 +315,25 @@ ch_record(ChPid) -> store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C). +maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + txn = Txn, + unsent_message_count = UnsentMessageCount}) -> + case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of + {0, 0, 0, none} -> ok = erase_ch_record(C), + false; + _ -> store_ch_record(C), + true + end. + +erase_ch_record(#cr{ch_pid = ChPid, + limiter_pid = LimiterPid, + monitor_ref = MonitorRef}) -> + ok = rabbit_limiter:unregister(LimiterPid, self()), + erlang:demonitor(MonitorRef), + erase({ch, ChPid}), + ok. + all_ch_record() -> [C || {{ch, _}, C} <- get()]. @@ -349,7 +380,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, - store_ch_record(NewC), + true = maybe_store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -368,7 +399,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> - store_ch_record(C#cr{is_limit_active = true}), + true = maybe_store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, @@ -388,35 +419,40 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. -deliver_from_queue_deliver(AckRequired, false, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = - BQ:fetch(AckRequired, BQS), - {{Message, IsDelivered, AckTag}, 0 == Remaining, - State #q { backing_queue_state = BQS1 }}. +deliver_from_queue_deliver(AckRequired, false, State) -> + {{Message, IsDelivered, AckTag, Remaining}, State1} = + fetch(AckRequired, State), + {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), - State1. + {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + State2. attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> + %% we don't need an expiry here because messages are + %% not being enqueued, so we use an empty + %% message_properties. {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, + ?BASE_MESSAGE_PROPERTIES, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. + {true, + State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -424,13 +460,22 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} + BQS = BQ:publish(Message, + message_properties(State), + State #q.backing_queue_state), + {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). + fun (BQS) -> + BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) + end, State). + +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + {Result, BQS1} = BQ:fetch(AckRequired, BQS), + {Result, State#q{backing_queue_state = BQS1}}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -453,7 +498,7 @@ possibly_unblock(State, ChPid, Update) -> State; C -> NewC = Update(C), - store_ch_record(NewC), + maybe_store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -474,10 +519,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - acktags = ChAckTags} -> - erlang:demonitor(MonitorRef), - erase({ch, ChPid}), + C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} -> + ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, _} -> none; @@ -525,15 +568,18 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = Fun(BQS)}). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> + {AckTags, BQS1} = BQ:tx_commit(Txn, + fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), + BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, @@ -547,6 +593,44 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +reset_msg_expiry_fun(TTL) -> + fun(MsgProps) -> + MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} + end. + +message_properties(#q{ttl=TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL)}. + +calculate_msg_expiry(undefined) -> undefined; +calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). + +drop_expired_messages(State = #q{ttl = undefined}) -> + State; +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + Now = now_millis(), + BQS1 = BQ:dropwhile( + fun (#message_properties{expiry = Expiry}) -> + Now > Expiry + end, BQS), + ensure_ttl_timer(State#q{backing_queue_state = BQS1}). + +ensure_ttl_timer(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL, + ttl_timer_ref = undefined}) + when TTL =/= undefined -> + case BQ:is_empty(BQS) of + true -> State; + false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, + [self()]), + State#q{ttl_timer_ref = TRef} + end; +ensure_ttl_timer(State) -> + State. + +now_millis() -> timer:now_diff(now(), {0,0,0}). + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -605,6 +689,7 @@ prioritise_cast(Msg, _State) -> {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; maybe_expire -> 8; + drop_expired -> 8; emit_stats -> 7; {ack, _Txn, _MsgIds, _ChPid} -> 7; {reject, _MsgIds, _Requeue, _ChPid} -> 7; @@ -695,21 +780,22 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - backing_queue_state = BQS, backing_queue = BQ}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + case fetch(AckRequired, drop_expired_messages(State1)) of + {empty, State2} -> + reply(empty, State2); + {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true = maybe_store_ch_record( + C#cr{acktags = sets:add_element(AckTag, + ChAckTags)}); false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -723,8 +809,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), ok = case ConsumerCount of 0 -> rabbit_limiter:register(LimiterPid, self()); _ -> ok @@ -758,12 +844,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> - store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); - _ -> ok - end, + C = #cr{consumer_count = ConsumerCount, + limiter_pid = LimiterPid} -> + C1 = C#cr{consumer_count = ConsumerCount -1}, + maybe_store_ch_record( + case ConsumerCount of + 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()), + C1#cr{limiter_pid = undefined}; + _ -> C1 + end), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -781,11 +870,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end end; -handle_call(stat, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - active_consumers = ActiveConsumers}) -> - reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, - ensure_expiry_timer(State)); +handle_call(stat, _From, State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS, + active_consumers = ActiveConsumers} = + drop_expired_messages(ensure_expiry_timer(State)), + reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -812,7 +901,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) end; @@ -836,7 +925,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, - store_ch_record(C1), + maybe_store_ch_record(C1), noreply(State#q{backing_queue_state = BQS1}) end; @@ -847,7 +936,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); false -> BQS1 = BQ:ack(AckTags, BQS), @@ -918,12 +1007,15 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; +handle_cast(drop_expired, State) -> + noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); + handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1}. + {noreply, State1, hibernate}. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507..352e76fd 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,12 +62,16 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 3}, + {publish_delivered, 4}, + + %% Drop messages from the head of the queue while the supplied + %% predicate returns true. + {dropwhile, 2}, %% Produce the next message. {fetch, 2}, @@ -77,7 +81,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 3}, + {tx_publish, 4}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -89,11 +93,11 @@ behaviour_info(callbacks) -> %% Commit a transaction. The Fun passed in must be called once %% the messages have really been commited. This CPS permits the %% possibility of commit coalescing. - {tx_commit, 3}, + {tx_commit, 4}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 2}, + {requeue, 3}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 722573c7..b2997ae2 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -75,9 +75,12 @@ rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). --spec(map_exception/3 :: (non_neg_integer(), rabbit_types:amqp_error(), +-spec(map_exception/3 :: (rabbit_channel:channel_number(), + rabbit_types:amqp_error() | any(), rabbit_types:protocol()) -> - {boolean(), non_neg_integer(), rabbit_framing:amqp_method()}). + {boolean(), + rabbit_channel:channel_number(), + rabbit_framing:amqp_method_record()}). -endif. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 1af213c4..9d1399f7 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -60,7 +60,7 @@ rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). --opaque(deletions() :: dict:dictionary()). +-opaque(deletions() :: dict()). -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). @@ -78,13 +78,13 @@ -spec(list_for_source_and_destination/2 :: (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> bindings()). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). --spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> - [rabbit_types:info()]). --spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). --spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) - -> [[rabbit_types:info()]]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (rabbit_types:binding()) -> rabbit_types:infos()). +-spec(info/2 :: (rabbit_types:binding(), rabbit_types:info_keys()) -> + rabbit_types:infos()). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). +-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) + -> [rabbit_types:infos()]). -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). -spec(remove_for_destination/1 :: @@ -94,9 +94,9 @@ -spec(process_deletions/1 :: (deletions()) -> 'ok'). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), - {'undefined' | rabbit_types:binding_source(), + {'undefined' | rabbit_types:exchange(), 'deleted' | 'not_deleted', - deletions()}, deletions()) -> deletions()). + bindings()}, deletions()) -> deletions()). -spec(new_deletions/0 :: () -> deletions()). -endif. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58c8e341..19613a57 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -87,17 +87,17 @@ -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). --spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (pid()) -> [rabbit_types:info()]). --spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). --spec(info_all/0 :: () -> [[rabbit_types:info()]]). --spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). +-spec(info_all/0 :: () -> [rabbit_types:infos()]). +-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). -endif. @@ -233,7 +233,7 @@ handle_cast({method, Method, Content}, State) -> end; handle_cast({flushed, QPid}, State) -> - {noreply, queue_blocked(QPid, State)}; + {noreply, queue_blocked(QPid, State), hibernate}; handle_cast(terminate, State) -> {stop, normal, State}; @@ -258,11 +258,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + hibernate}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}. + {noreply, queue_blocked(QPid, State), hibernate}. handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b3821d3b..22742fa9 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -66,7 +66,8 @@ start_link() -> supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + [ChannelSupSupPid, Collector, + rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. @@ -78,22 +79,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. -start_heartbeat_fun(SupPid) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver} - end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8facaf16..6b212745 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -39,7 +39,6 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). --define(SCOPE_OPT, "-s"). %%---------------------------------------------------------------------------- @@ -67,7 +66,7 @@ start() -> {[Command0 | Args], Opts} = rabbit_misc:get_options( [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, - {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}], + {option, ?VHOST_OPT, "/"}], FullCommand), Opts1 = lists:map(fun({K, V}) -> case K of @@ -289,10 +288,9 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Scope = proplists:get_value(?SCOPE_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Scope, Username, VHost, CPerm, WPerm, RPerm]}); + [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl deleted file mode 100644 index a9806305..00000000 --- a/src/rabbit_dialyzer.erl +++ /dev/null @@ -1,92 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_dialyzer). - --export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, - halt_with_code/1]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(create_basic_plt/1 :: (file:filename()) -> 'ok'). --spec(add_to_plt/2 :: (file:filename(), string()) -> 'ok'). --spec(dialyze_files/2 :: (file:filename(), string()) -> 'ok'). --spec(halt_with_code/1 :: (atom()) -> no_return()). - --endif. - -%%---------------------------------------------------------------------------- - -create_basic_plt(BasicPltPath) -> - OptsRecord = dialyzer_options:build( - [{analysis_type, plt_build}, - {output_plt, BasicPltPath}, - {files_rec, otp_apps_dependencies_paths()}]), - dialyzer_cl:start(OptsRecord), - ok. - -add_to_plt(PltPath, FilesString) -> - Files = string:tokens(FilesString, " "), - DialyzerWarnings = dialyzer:run([{analysis_type, plt_add}, - {init_plt, PltPath}, - {output_plt, PltPath}, - {files, Files}]), - print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1), - ok. - -dialyze_files(PltPath, ModifiedFiles) -> - Files = string:tokens(ModifiedFiles, " "), - DialyzerWarnings = dialyzer:run([{init_plt, PltPath}, - {files, Files}, - {warnings, [behaviours, - race_conditions]}]), - case DialyzerWarnings of - [] -> io:format("~nOk~n"); - _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n", - [length(DialyzerWarnings)]), - print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1) - end, - ok. - -print_warnings(Warnings, FormatFun) -> - [io:format("~s~n", [FormatFun(W)]) || W <- Warnings], - io:format("~n"). - -otp_apps_dependencies_paths() -> - [code:lib_dir(App, ebin) || - App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]]. - -halt_with_code(ok) -> - halt(); -halt_with_code(fail) -> - halt(1). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 46564233..61a24388 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -68,14 +68,14 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: - (rabbit_types:exchange(), [rabbit_types:info_key()]) - -> [rabbit_types:info()]). --spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). --spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) - -> [[rabbit_types:info()]]). + (rabbit_types:exchange(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). +-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) + -> [rabbit_types:infos()]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> {rabbit_router:routing_result(), [pid()]}). -spec(delete/2 :: diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl new file mode 100644 index 00000000..a0c8f4d5 --- /dev/null +++ b/src/rabbit_framing.erl @@ -0,0 +1,64 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% TODO auto-generate + +-module(rabbit_framing). + +-ifdef(use_specs). + +-export_type([protocol/0, + amqp_field_type/0, amqp_property_type/0, + amqp_table/0, amqp_array/0, amqp_value/0, + amqp_method_name/0, amqp_method/0, amqp_method_record/0, + amqp_method_field_name/0, amqp_property_record/0, + amqp_exception/0, amqp_exception_code/0, amqp_class_id/0]). + +-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1'). + +-define(protocol_type(T), type(T :: rabbit_framing_amqp_0_8:T | + rabbit_framing_amqp_0_9_1:T)). + +-?protocol_type(amqp_field_type()). +-?protocol_type(amqp_property_type()). +-?protocol_type(amqp_table()). +-?protocol_type(amqp_array()). +-?protocol_type(amqp_value()). +-?protocol_type(amqp_method_name()). +-?protocol_type(amqp_method()). +-?protocol_type(amqp_method_record()). +-?protocol_type(amqp_method_field_name()). +-?protocol_type(amqp_property_record()). +-?protocol_type(amqp_exception()). +-?protocol_type(amqp_exception_code()). +-?protocol_type(amqp_class_id()). + +-endif. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index a9945af1..589bf7cc 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -41,16 +41,28 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). +-export_type([start_heartbeat_fun/0]). --type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). + +-type(heartbeat_callback() :: fun (() -> any())). + +-type(start_heartbeat_fun() :: + fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> + no_return())). -spec(start_heartbeat_sender/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_fun/1 :: + (pid()) -> start_heartbeat_fun()). + + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -58,40 +70,60 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch rabbit_net:send( - Sock, rabbit_binary_generator:build_heartbeat_frame()), + SendFun(), continue end}). -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - Parent ! timeout, + ReceiveFun(), stop end}). -pause_monitor(none) -> +start_heartbeat_fun(SupPid) -> + fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver} + end. + +pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. -resume_monitor(none) -> +resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. %%---------------------------------------------------------------------------- +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> + {ok, none}; +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> + supervisor2:start_child( + SupPid, {Name, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4e0dad84..5a0532ea 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,9 +31,9 @@ -module(rabbit_invariable_queue). --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, - publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, - tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, + dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -52,7 +52,7 @@ -type(state() :: #iv_state { queue :: queue(), qname :: rabbit_amqqueue:name(), len :: non_neg_integer(), - pending_ack :: dict:dictionary() + pending_ack :: dict() }). -include("rabbit_backing_queue_spec.hrl"). @@ -89,40 +89,61 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) -> + fun ({#basic_message { is_persistent = false }, + _MsgProps, _IsDelivered}, Acc) -> Acc; - ({Msg = #basic_message { guid = Guid }, IsDelivered}, + ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, {AckTagsN, PAN}) -> ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} + {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)} end, {[], dict:new()}, Q), ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> - ok = persist_message(QName, IsDurable, none, Msg), - State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. +publish(Msg, MsgProps, State = #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), + State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }. -publish_delivered(false, _Msg, State) -> +publish_delivered(false, _Msg, _MsgProps, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, + MsgProps, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> - ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), ok = persist_delivery(QName, IsDurable, false, Msg), - {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. + {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. + +dropwhile(_Pred, State = #iv_state { len = 0 }) -> + State; +dropwhile(Pred, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + case Pred(MsgProps) of + true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, + IsDelivered, State), + dropwhile(Pred, State1); + false -> State + end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = - queue:out(Q), +fetch(AckRequired, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). + +fetch_internal(AckRequired, Q1, + Msg = #basic_message { guid = Guid }, + MsgProps, IsDelivered, + State = #iv_state { len = Len, + qname = QName, + durable = IsDurable, + pending_ack = PA }) -> Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - PA1 = dict:store(Guid, Msg, PA), + PA1 = store_ack(Msg, MsgProps, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; false -> ok = persist_acks(QName, IsDurable, none, @@ -138,11 +159,11 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName, - durable = IsDurable }) -> +tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(QName, IsDurable, Txn, Msg), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), + ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), State. tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, @@ -159,8 +180,10 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> erase_tx(Txn), {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, - queue = Q, len = Len }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, + pending_ack = PA, + queue = Q, + len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, Txn, QName), @@ -168,13 +191,16 @@ tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, Fun(), AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), - {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) -> - {queue:in({Msg, false}, QN), LenN + 1} + {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> + {enqueue(Msg, MsgPropsFun(MsgProps), + false, QN), + LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. -requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, - len = Len }) -> +requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, + queue = Q, + len = Len }) -> %% We don't need to touch the persister here - the persister will %% already have these messages published and delivered as %% necessary. The complication is that the persister's seq_id will @@ -186,12 +212,17 @@ requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, %% order to the last known state of our queue, prior to shutdown. {Q1, Len1} = lists:foldl( fun (Guid, {QN, LenN}) -> - {ok, Msg = #basic_message {}} = dict:find(Guid, PA), - {queue:in({Msg, true}, QN), LenN + 1} + {Msg = #basic_message {}, MsgProps} + = dict:fetch(Guid, PA), + {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), + LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. +enqueue(Msg, MsgProps, IsDelivered, Q) -> + queue:in({Msg, MsgProps, IsDelivered}, Q). + len(#iv_state { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -212,6 +243,9 @@ status(_State) -> []. remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). +store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) -> + dict:store(Guid, {Msg, MsgProps}, PA). + %%---------------------------------------------------------------------------- lookup_tx(Txn) -> @@ -243,14 +277,15 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- persist_message(QName, true, Txn, Msg = #basic_message { - is_persistent = true }) -> + is_persistent = true }, MsgProps) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); -persist_message(_QName, _IsDurable, _Txn, _Msg) -> + [{publish, Msg1, MsgProps, + {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> ok. persist_delivery(QName, true, false, #basic_message { is_persistent = true, @@ -263,7 +298,8 @@ persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin - {ok, Msg} = dict:find(Guid, PA), + {Msg, _MsgProps} + = dict:fetch(Guid, PA), Msg #basic_message.is_persistent end]); persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index f87b6271..bb4d77a4 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -63,8 +63,9 @@ %% have some space for when the queues don't quite respond as fast as %% we would like, or when there is buffering going on in other parts %% of the system. In short, we aim to stay some distance away from -%% when the memory alarms will go off, which cause channel.flow. -%% Note that all other Thresholds are relative to this scaling. +%% when the memory alarms will go off, which cause backpressure (of +%% some sort) on producers. Note that all other Thresholds are +%% relative to this scaling. -define(MEMORY_LIMIT_SCALING, 0.4). -define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 086d260e..0522afdc 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,6 +64,8 @@ -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). -export([get_options/2]). +-export([all_module_attributes/1, build_acyclic_graph/4]). +-export([now_ms/0]). -import(mnesia). -import(lists). @@ -82,6 +84,12 @@ -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). +-type(digraph_label() :: term()). +-type(graph_vertex_fun() :: + fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})). +-type(graph_edge_fun() :: + fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})). +-type(graph_error_fun() :: fun ((any()) -> any() | no_return())). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -128,8 +136,8 @@ -spec(enable_cover/0 :: () -> ok_or_error()). -spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok'). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (file:filename()) -> ok_or_error()). --spec(report_cover/1 :: (file:filename()) -> 'ok'). +-spec(enable_cover/1 :: ([file:filename() | atom()]) -> ok_or_error()). +-spec(report_cover/1 :: ([file:filename() | atom()]) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -177,13 +185,16 @@ -spec(recursive_delete/1 :: ([file:filename()]) -> rabbit_types:ok_or_error({file:filename(), any()})). --spec(dict_cons/3 :: (any(), any(), dict:dictionary()) -> - dict:dictionary()). --spec(orddict_cons/3 :: (any(), any(), orddict:dictionary()) -> - orddict:dictionary()). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). +-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(), + graph_error_fun(), [{atom(), [term()]}]) -> + digraph()). +-spec(now_ms/0 :: () -> non_neg_integer()). -endif. @@ -268,29 +279,30 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). -enable_cover() -> - enable_cover("."). +enable_cover() -> enable_cover(["."]). -enable_cover([Root]) when is_atom(Root) -> - enable_cover(atom_to_list(Root)); -enable_cover(Root) -> - case cover:compile_beam_directory(filename:join(Root, "ebin")) of - {error,Reason} -> {error,Reason}; - _ -> ok - end. +enable_cover(Dirs) -> + lists:foldl(fun (Dir, ok) -> + case cover:compile_beam_directory( + filename:join(lists:concat([Dir]),"ebin")) of + {error, _} = Err -> Err; + _ -> ok + end; + (_Dir, Err) -> + Err + end, ok, Dirs). start_cover(NodesS) -> {ok, _} = cover:start([makenode(N) || N <- NodesS]), ok. -report_cover() -> - report_cover("."). +report_cover() -> report_cover(["."]). + +report_cover(Dirs) -> [report_cover1(lists:concat([Dir])) || Dir <- Dirs], ok. -report_cover([Root]) when is_atom(Root) -> - report_cover(atom_to_list(Root)); -report_cover(Root) -> +report_cover1(Root) -> Dir = filename:join(Root, "cover"), - ok = filelib:ensure_dir(filename:join(Dir,"junk")), + ok = filelib:ensure_dir(filename:join(Dir, "junk")), lists:foreach(fun (F) -> file:delete(F) end, filelib:wildcard(filename:join(Dir, "*.html"))), {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), @@ -721,3 +733,48 @@ get_flag(K, [Nk | As]) -> {[Nk | As1], V}; get_flag(_, []) -> {[], false}. + +now_ms() -> + timer:now_diff(now(), {0,0,0}) div 1000. + +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. + +all_module_attributes(Name) -> + Modules = + lists:usort( + lists:append( + [Modules || {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl( + fun (Module, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{Module, Atts} | Acc] + end + end, [], Modules). + + +build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) -> + G = digraph:new([acyclic]), + [ case digraph:vertex(G, Vertex) of + false -> digraph:add_vertex(G, Vertex, Label); + _ -> ErrorFun({vertex, duplicate, Vertex}) + end || {Module, Atts} <- Graph, + {Vertex, Label} <- VertexFun(Module, Atts) ], + [ case digraph:add_edge(G, From, To) of + {error, E} -> ErrorFun({edge, E, From, To}); + _ -> ok + end || {Module, Atts} <- Graph, + {From, To} <- EdgeFun(Module, Atts) ], + G. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 577d206d..9d172269 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,9 +44,6 @@ -include("rabbit.hrl"). --define(SCHEMA_VERSION_SET, []). --define(SCHEMA_VERSION_FILENAME, "schema_version"). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -94,9 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = rabbit_misc:write_term_file(filename:join( - dir(), ?SCHEMA_VERSION_FILENAME), - [?SCHEMA_VERSION_SET]), ok. is_db_empty() -> @@ -256,12 +250,12 @@ ensure_mnesia_dir() -> ensure_mnesia_running() -> case mnesia:system_info(is_running) of yes -> ok; - no -> throw({error, mnesia_not_running}) + no -> throw({error, mnesia_not_running}) end. ensure_mnesia_not_running() -> case mnesia:system_info(is_running) of - no -> ok; + no -> ok; yes -> throw({error, mnesia_unexpectedly_running}) end. @@ -340,10 +334,8 @@ read_cluster_nodes_config() -> case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> - case application:get_env(cluster_nodes) of - undefined -> []; - {ok, ClusterNodes} -> ClusterNodes - end; + {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), + ClusterNodes; {error, Reason} -> throw({error, {cannot_read_cluster_nodes_config, FileName, Reason}}) @@ -380,28 +372,31 @@ init_db(ClusterNodes, Force) -> end; _ -> ok end, - case Nodes of - [] -> - case mnesia:system_info(use_dir) of - true -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - %% NB: we cannot use rabbit_log here since - %% it may not have been started yet - error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), - ok = move_db(), - ok = create_schema() - end; - false -> - ok = create_schema() + case {Nodes, mnesia:system_info(use_dir), + mnesia:system_info(db_nodes)} of + {[], true, [_]} -> + %% True single disc node, attempt upgrade + wait_for_tables(), + case rabbit_upgrade:maybe_upgrade() of + ok -> + schema_ok_or_exit(); + version_not_available -> + schema_ok_or_move() end; - [_|_] -> + {[], true, _} -> + %% "Master" (i.e. without config) disc node in cluster, + %% verify schema + wait_for_tables(), + version_ok_or_exit(rabbit_upgrade:read_version()), + schema_ok_or_exit(); + {[], false, _} -> + %% First RAM node in cluster, start from scratch + ok = create_schema(); + {[AnotherNode|_], _, _} -> + %% Subsequent node in cluster, catch up + version_ok_or_exit(rabbit_upgrade:read_version()), + version_ok_or_exit( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -410,7 +405,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ok = ensure_schema_integrity() + schema_ok_or_exit() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -420,6 +415,39 @@ init_db(ClusterNodes, Force) -> ClusterNodes, Reason}}) end. +schema_ok_or_move() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + %% NB: we cannot use rabbit_log here since it may not have been + %% started yet + error_logger:warning_msg("schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), + ok = move_db(), + ok = create_schema() + end. + +version_ok_or_exit({ok, DiscVersion}) -> + case rabbit_upgrade:desired_version() of + DiscVersion -> + ok; + DesiredVersion -> + exit({schema_mismatch, DesiredVersion, DiscVersion}) + end; +version_ok_or_exit({error, _}) -> + ok = rabbit_upgrade:write_version(). + +schema_ok_or_exit() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + exit({schema_invalid, Reason}) + end. + create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -428,7 +456,8 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = wait_for_tables(). + ok = wait_for_tables(), + ok = rabbit_upgrade:write_version(). move_db() -> mnesia:stop(), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 66cc06cf..fd84109b 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,10 +34,12 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/2, client_delete_and_terminate/3, - write/4, read/3, contains/2, remove/2, release/2, sync/3]). + client_init/2, client_terminate/1, client_delete_and_terminate/1, + client_ref/1, + write/3, read/2, contains/2, remove/2, release/2, sync/3]). --export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal +-export([sync/1, set_maximum_since_use/2, + has_readers/2, combine_files/3, delete_file/2]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -74,7 +76,6 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_active, %% is the GC currently working? gc_pid, %% pid of our GC file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table @@ -86,7 +87,9 @@ }). -record(client_msstate, - { file_handle_cache, + { server, + client_ref, + file_handle_cache, index_state, index_module, dir, @@ -100,14 +103,34 @@ -record(file_summary, {file, valid_total_size, left, right, file_size, locked, readers}). +-record(gc_state, + { dir, + index_module, + index_state, + file_summary_ets, + msg_store + }). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([gc_state/0, file_num/0]). + +-type(gc_state() :: #gc_state { dir :: file:filename(), + index_module :: atom(), + index_state :: any(), + file_summary_ets :: ets:tid(), + msg_store :: server() + }). + -type(server() :: pid() | atom()). +-type(client_ref() :: binary()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { - file_handle_cache :: dict:dictionary(), + server :: server(), + client_ref :: client_ref(), + file_handle_cache :: dict(), index_state :: any(), index_module :: atom(), dir :: file:filename(), @@ -124,26 +147,25 @@ (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). --spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). --spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). --spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). +-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_ref/1 :: (client_msstate()) -> client_ref()). +-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok'). +-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). +-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). +-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) -> + 'ok'). -spec(sync/1 :: (server()) -> 'ok'). --spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> - 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {ets:tid(), file:filename(), atom(), any()}) -> - 'concurrent_readers' | non_neg_integer()). +-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). +-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> + 'ok'). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok'). -endif. @@ -316,7 +338,9 @@ client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), + #client_msstate { server = Server, + client_ref = Ref, + file_handle_cache = dict:new(), index_state = IState, index_module = IModule, dir = Dir, @@ -326,20 +350,22 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState, Server) -> +client_terminate(CState) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = server_call(CState, client_terminate). -client_delete_and_terminate(CState, Server, Ref) -> +client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). + ok = server_cast(CState, {client_delete, Ref}). -write(Server, Guid, Msg, +client_ref(#client_msstate { client_ref = Ref }) -> Ref. + +write(Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + ok = server_cast(CState, {write, Guid}). -read(Server, Guid, +read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache @@ -348,13 +374,12 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:call( - Server, {read, Guid}, infinity), - CState} end, + Defer = fun() -> + {server_call(CState, {read, Guid}), CState} + end, case index_lookup_positive_ref_count(Guid, CState) of not_found -> Defer(); - MsgLocation -> client_read1(Server, MsgLocation, Defer, - CState) + MsgLocation -> client_read1(MsgLocation, Defer, CState) end; [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -365,19 +390,16 @@ read(Server, Guid, {{ok, Msg}, CState} end. -contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). -remove(_Server, []) -> ok; -remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). -release(_Server, []) -> ok; -release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). -sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). +contains(Guid, CState) -> server_call(CState, {contains, Guid}). +remove([], _CState) -> ok; +remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +release([], _CState) -> ok; +release(Guids, CState) -> server_cast(CState, {release, Guids}). +sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). sync(Server) -> gen_server2:cast(Server, sync). -gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). - set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -385,18 +407,22 @@ set_maximum_since_use(Server, Age) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -client_read1(Server, - #msg_location { guid = Guid, file = File } = MsgLocation, - Defer, +server_call(#client_msstate { server = Server }, Msg) -> + gen_server2:call(Server, Msg, infinity). + +server_cast(#client_msstate { server = Server }, Msg) -> + gen_server2:cast(Server, Msg). + +client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = Locked, right = Right }] -> - client_read2(Server, Locked, Right, MsgLocation, Defer, CState) + client_read2(Locked, Right, MsgLocation, Defer, CState) end. -client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> +client_read2(false, undefined, _MsgLocation, Defer, _CState) -> %% Although we've already checked both caches and not found the %% message there, the message is apparently in the %% current_file. We can only arrive here if we are trying to read @@ -407,12 +433,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> %% contents of the current file, thus reads from the current file %% will end up here and will need to be deferred. Defer(); -client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> +client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg %% is actually in a different file, unlocked. However, defering is %% the safest and simplest thing to do. Defer(); -client_read2(Server, false, _Right, +client_read2(false, _Right, MsgLocation = #msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> @@ -421,10 +447,10 @@ client_read2(Server, false, _Right, %% finished. safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, - fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end, - fun () -> read(Server, Guid, CState) end). + fun (_) -> client_read3(MsgLocation, Defer, CState) end, + fun () -> read(Guid, CState) end). -client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, +client_read3(#msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -448,7 +474,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% too). case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, @@ -459,7 +485,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Server, Guid, CState) + catch error:badarg -> read(Guid, CState) end; [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving @@ -482,9 +508,14 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, read_from_disk(MsgLocation, CState1, DedupCacheEts), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; - MsgLocation -> %% different file! + #msg_location {} = MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(Server, MsgLocation, Defer, CState) + client_read1(MsgLocation, Defer, CState); + not_found -> %% it seems not to exist. Defer, just to be sure. + try Release() %% this can badarg, same as locked case, above + catch error:badarg -> ok + end, + Defer() end end. @@ -547,8 +578,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = [], - gc_active = false, + pending_gc_completion = orddict:new(), gc_pid = undefined, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -570,8 +600,13 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {ok, Offset} = file_handle_cache:position(CurHdl, Offset), ok = file_handle_cache:truncate(CurHdl), - {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, - FileSummaryEts), + {ok, GCPid} = rabbit_msg_store_gc:start_link( + #gc_state { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + file_summary_ets = FileSummaryEts, + msg_store = self() + }), {ok, maybe_compact( State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), @@ -588,10 +623,11 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - sync -> 8; - {gc_done, _Reclaimed, _Source, _Destination} -> 8; - {set_maximum_since_use, _Age} -> 8; - _ -> 0 + sync -> 8; + {combine_files, _Source, _Destination, _Reclaimed} -> 8; + {delete_file, _File, _Reclaimed} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 end. handle_call(successfully_recovered_state, _From, State) -> @@ -686,37 +722,23 @@ handle_cast({sync, Guids, K}, handle_cast(sync, State) -> noreply(internal_sync(State)); -handle_cast({gc_done, Reclaimed, Src, Dst}, +handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, - gc_active = {Src, Dst}, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts }) -> - %% GC done, so now ensure that any clients that have open fhs to - %% those files close them before using them again. This has to be - %% done here (given it's done in the msg_store, and not the gc), - %% and not when starting up the GC, because if done when starting - %% up the GC, the client could find the close, and close and - %% reopen the fh, whilst the GC is waiting for readers to - %% disappear, before it's actually done the GC. - true = mark_handle_to_close(FileHandlesEts, Src), - true = mark_handle_to_close(FileHandlesEts, Dst), - %% we always move data left, so Src has gone and was on the - %% right, so need to make dest = source.right.left, and also - %% dest.right = source.right - [#file_summary { left = Dst, - right = SrcRight, - locked = true, - readers = 0 }] = ets:lookup(FileSummaryEts, Src), - %% this could fail if SrcRight =:= undefined - ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}), - true = ets:update_element(FileSummaryEts, Dst, - [{#file_summary.locked, false}, - {#file_summary.right, SrcRight}]), - true = ets:delete(FileSummaryEts, Src), - noreply( - maybe_compact(run_pending( - State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false }))); + ok = cleanup_after_file_deletion(Source, State), + %% see comment in cleanup_after_file_deletion + true = mark_handle_to_close(FileHandlesEts, Destination), + true = ets:update_element(FileSummaryEts, Destination, + {#file_summary.locked, false}), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([Source, Destination], State1))); + +handle_cast({delete_file, File, Reclaimed}, + State = #msstate { sum_file_size = SumFileSize }) -> + ok = cleanup_after_file_deletion(File, State), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([File], State1))); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -867,7 +889,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, ets:lookup(FileSummaryEts, File), case Locked of true -> add_to_pending_gc_completion({read, Guid, From}, - State); + File, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), gen_server2:reply(From, {ok, Msg}), @@ -897,19 +919,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), {Msg, State1}. -contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> +contains_message(Guid, From, + State = #msstate { pending_gc_completion = Pending }) -> case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, false), State; #msg_location { file = File } -> - case GCActive of - {A, B} when File =:= A orelse File =:= B -> - add_to_pending_gc_completion( - {contains, Guid, From}, State); - _ -> - gen_server2:reply(From, true), - State + case orddict:is_key(File, Pending) of + true -> add_to_pending_gc_completion( + {contains, Guid, From}, File, State); + false -> gen_server2:reply(From, true), + State end end. @@ -928,7 +949,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, State); + add_to_pending_gc_completion({remove, Guid}, File, State); [#file_summary {}] -> ok = Dec(), [_] = ets:update_counter( @@ -944,20 +965,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, end. add_to_pending_gc_completion( - Op, State = #msstate { pending_gc_completion = Pending }) -> - State #msstate { pending_gc_completion = [Op | Pending] }. - -run_pending(State = #msstate { pending_gc_completion = [] }) -> - State; -run_pending(State = #msstate { pending_gc_completion = Pending }) -> - State1 = State #msstate { pending_gc_completion = [] }, - lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). + Op, File, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = + rabbit_misc:orddict_cons(File, Op, Pending) }. -run_pending({read, Guid, From}, State) -> +run_pending(Files, State) -> + lists:foldl( + fun (File, State1 = #msstate { pending_gc_completion = Pending }) -> + Pending1 = orddict:erase(File, Pending), + lists:foldl( + fun run_pending_action/2, + State1 #msstate { pending_gc_completion = Pending1 }, + lists:reverse(orddict:fetch(File, Pending))) + end, State, Files). + +run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); -run_pending({contains, Guid, From}, State) -> +run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending({remove, Guid}, State) -> +run_pending_action({remove, Guid}, State) -> remove_message(Guid, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> @@ -969,6 +995,10 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +orddict_store(Key, Val, Dict) -> + false = orddict:is_key(Key, Dict), + orddict:store(Key, Val, Dict). + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- @@ -1429,12 +1459,12 @@ maybe_roll_to_new_file( maybe_roll_to_new_file(_, State) -> State. -maybe_compact(State = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize, - gc_active = false, - gc_pid = GCPid, - file_summary_ets = FileSummaryEts, - file_size_limit = FileSizeLimit }) +maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + gc_pid = GCPid, + pending_gc_completion = Pending, + file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit }) when (SumFileSize > 2 * FileSizeLimit andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> %% TODO: the algorithm here is sub-optimal - it may result in a @@ -1443,27 +1473,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, '$end_of_table' -> State; First -> - case find_files_to_gc(FileSummaryEts, FileSizeLimit, - ets:lookup(FileSummaryEts, First)) of + case find_files_to_combine(FileSummaryEts, FileSizeLimit, + ets:lookup(FileSummaryEts, First)) of not_found -> State; {Src, Dst} -> + Pending1 = orddict_store(Dst, [], + orddict_store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), true = ets:update_element(FileSummaryEts, Dst, {#file_summary.locked, true}), - ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst), - State1 #msstate { gc_active = {Src, Dst} } + ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst), + State1 #msstate { pending_gc_completion = Pending1 } end end; maybe_compact(State) -> State. -find_files_to_gc(FileSummaryEts, FileSizeLimit, - [#file_summary { file = Dst, - valid_total_size = DstValid, - right = Src }]) -> +find_files_to_combine(FileSummaryEts, FileSizeLimit, + [#file_summary { file = Dst, + valid_total_size = DstValid, + right = Src, + locked = DstLocked }]) -> case Src of undefined -> not_found; @@ -1471,13 +1504,16 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, [#file_summary { file = Src, valid_total_size = SrcValid, left = Dst, - right = SrcRight }] = Next = + right = SrcRight, + locked = SrcLocked }] = Next = ets:lookup(FileSummaryEts, Src), case SrcRight of undefined -> not_found; - _ -> case DstValid + SrcValid =< FileSizeLimit of + _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso + (DstValid > 0) andalso (SrcValid > 0) andalso + not (DstLocked orelse SrcLocked) of true -> {Src, Dst}; - false -> find_files_to_gc( + false -> find_files_to_combine( FileSummaryEts, FileSizeLimit, Next) end end @@ -1486,85 +1522,86 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, State = #msstate { - dir = Dir, - sum_file_size = SumFileSize, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + gc_pid = GCPid, + file_summary_ets = FileSummaryEts, + pending_gc_completion = Pending }) -> [#file_summary { valid_total_size = ValidData, - left = Left, - right = Right, - file_size = FileSize, locked = false }] = ets:lookup(FileSummaryEts, File), case ValidData of - %% we should NEVER find the current file in here hence right - %% should always be a file, not undefined - 0 -> case {Left, Right} of - {undefined, _} when Right =/= undefined -> - %% the eldest file is empty. - true = ets:update_element( - FileSummaryEts, Right, - {#file_summary.left, undefined}); - {_, _} when Right =/= undefined -> - true = ets:update_element(FileSummaryEts, Right, - {#file_summary.left, Left}), - true = ets:update_element(FileSummaryEts, Left, - {#file_summary.right, Right}) - end, - true = mark_handle_to_close(FileHandlesEts, File), - true = ets:delete(FileSummaryEts, File), - {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - [index_delete(Guid, State) || - {Guid, _TotalSize, _Offset} <- Messages], - State1 = close_handle(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), - State1 #msstate { sum_file_size = SumFileSize - FileSize }; + 0 -> %% don't delete the file_summary_ets entry for File here + %% because we could have readers which need to be able to + %% decrement the readers count. + true = ets:update_element(FileSummaryEts, File, + {#file_summary.locked, true}), + ok = rabbit_msg_store_gc:delete(GCPid, File), + Pending1 = orddict_store(File, [], Pending), + close_handle(File, + State #msstate { pending_gc_completion = Pending1 }); _ -> State end. +cleanup_after_file_deletion(File, + #msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> + %% Ensure that any clients that have open fhs to the file close + %% them before using them again. This has to be done here (given + %% it's done in the msg_store, and not the gc), and not when + %% starting up the GC, because if done when starting up the GC, + %% the client could find the close, and close and reopen the fh, + %% whilst the GC is waiting for readers to disappear, before it's + %% actually done the GC. + true = mark_handle_to_close(FileHandlesEts, File), + [#file_summary { left = Left, + right = Right, + locked = true, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + %% We'll never delete the current file, so right is never undefined + true = Right =/= undefined, %% ASSERTION + true = ets:update_element(FileSummaryEts, Right, + {#file_summary.left, Left}), + %% ensure the double linked list is maintained + true = case Left of + undefined -> true; %% File is the eldest file (left-most) + _ -> ets:update_element(FileSummaryEts, Left, + {#file_summary.right, Right}) + end, + true = ets:delete(FileSummaryEts, File), + ok. + %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> - [SrcObj = #file_summary { - readers = SrcReaders, - left = DstFile, - file_size = SrcFileSize, - locked = true }] = ets:lookup(FileSummaryEts, SrcFile), - [DstObj = #file_summary { - readers = DstReaders, - right = SrcFile, - file_size = DstFileSize, - locked = true }] = ets:lookup(FileSummaryEts, DstFile), - - case SrcReaders =:= 0 andalso DstReaders =:= 0 of - true -> TotalValidData = combine_files(SrcObj, DstObj, State), - %% don't update dest.right, because it could be - %% changing at the same time - true = ets:update_element( - FileSummaryEts, DstFile, - [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.file_size, TotalValidData}]), - SrcFileSize + DstFileSize - TotalValidData; - false -> concurrent_readers - end. - -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - right = Source }, - State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> - SourceName = filenum_to_name(Source), - DestinationName = filenum_to_name(Destination), +has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> + [#file_summary { locked = true, readers = Count }] = + ets:lookup(FileSummaryEts, File), + Count /= 0. + +combine_files(Source, Destination, + State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { + readers = 0, + left = Destination, + valid_total_size = SourceValid, + file_size = SourceFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Source), + [#file_summary { + readers = 0, + right = Source, + valid_total_size = DestinationValid, + file_size = DestinationFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Destination), + + SourceName = filenum_to_name(Source), + DestinationName = filenum_to_name(Destination), {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE), {ok, DestinationHdl} = open_file(Dir, DestinationName, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, + TotalValidData = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't %% need a tmp file %% if they're not equal, then we need to write out everything past @@ -1577,7 +1614,7 @@ combine_files(#file_summary { file = Source, drop_contiguous_block_prefix(DestinationWorkList), case DestinationWorkListTail of [] -> ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); + DestinationHdl, DestinationContiguousTop, TotalValidData); _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), ok = copy_messages( @@ -1591,7 +1628,7 @@ combine_files(#file_summary { file = Source, %% Destination and copy from Tmp back to the end {ok, 0} = file_handle_cache:position(TmpHdl, 0), ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), + DestinationHdl, DestinationContiguousTop, TotalValidData), {ok, TmpSize} = file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be DestinationValid @@ -1599,14 +1636,36 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData, SourceHdl, DestinationHdl, Destination, State), %% tidy up ok = file_handle_cache:close(DestinationHdl), ok = file_handle_cache:delete(SourceHdl), - ExpectedSize. -load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> + %% don't update dest.right, because it could be changing at the + %% same time + true = ets:update_element( + FileSummaryEts, Destination, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + + Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, + gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}). + +delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { valid_total_size = 0, + locked = true, + file_size = FileSize, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + {[], 0} = load_and_vacuum_message_file(File, State), + ok = file:delete(form_filename(Dir, filenum_to_name(File))), + gen_server2:cast(Server, {delete_file, File, FileSize}). + +load_and_vacuum_message_file(File, #gc_state { dir = Dir, + index_module = Index, + index_state = IndexState }) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), @@ -1627,7 +1686,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Destination, #gc_state { index_module = Index, + index_state = IndexState }) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index a7855bbf..cd9fd497 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,20 +33,16 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, no_readers/2, stop/1]). +-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). --record(gcstate, - {dir, - index_state, - index_module, - parent, - file_summary_ets, - scheduled +-record(state, + { pending_no_readers, + msg_store_state }). -include("rabbit.hrl"). @@ -55,10 +51,12 @@ -ifdef(use_specs). --spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) -> +-spec(start_link/1 :: (rabbit_msg_store:gc_state()) -> rabbit_types:ok_pid_or_error()). --spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). --spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(combine/3 :: (pid(), rabbit_msg_store:file_num(), + rabbit_msg_store:file_num()) -> 'ok'). +-spec(delete/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). +-spec(no_readers/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). -spec(stop/1 :: (pid()) -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -66,13 +64,15 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> - gen_server2:start_link( - ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], - [{timeout, infinity}]). +start_link(MsgStoreState) -> + gen_server2:start_link(?MODULE, [MsgStoreState], + [{timeout, infinity}]). -gc(Server, Source, Destination) -> - gen_server2:cast(Server, {gc, Source, Destination}). +combine(Server, Source, Destination) -> + gen_server2:cast(Server, {combine, Source, Destination}). + +delete(Server, File) -> + gen_server2:cast(Server, {delete, File}). no_readers(Server, File) -> gen_server2:cast(Server, {no_readers, File}). @@ -85,16 +85,11 @@ set_maximum_since_use(Pid, Age) -> %%---------------------------------------------------------------------------- -init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> +init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #gcstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = undefined }, - hibernate, + {ok, #state { pending_no_readers = dict:new(), + msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; @@ -103,18 +98,23 @@ prioritise_cast(_Msg, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, - State = #gcstate { scheduled = undefined }) -> - {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), - hibernate}; +handle_cast({combine, Source, Destination}, State) -> + {noreply, attempt_action(combine, [Source, Destination], State), hibernate}; -handle_cast({no_readers, File}, - State = #gcstate { scheduled = {Source, Destination} }) - when File =:= Source orelse File =:= Destination -> - {noreply, attempt_gc(State), hibernate}; +handle_cast({delete, File}, State) -> + {noreply, attempt_action(delete, [File], State), hibernate}; -handle_cast({no_readers, _File}, State) -> - {noreply, State, hibernate}; +handle_cast({no_readers, File}, + State = #state { pending_no_readers = Pending }) -> + {noreply, case dict:find(File, Pending) of + error -> + State; + {ok, {Action, Files}} -> + Pending1 = dict:erase(File, Pending), + attempt_action( + Action, Files, + State #state { pending_no_readers = Pending1 }) + end, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -129,16 +129,18 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -attempt_gc(State = #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = {Source, Destination} }) -> - case rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}) of - concurrent_readers -> State; - Reclaimed -> ok = rabbit_msg_store:gc_done( - Parent, Reclaimed, Source, Destination), - State #gcstate { scheduled = undefined } +attempt_action(Action, Files, + State = #state { pending_no_readers = Pending, + msg_store_state = MsgStoreState }) -> + case [File || File <- Files, + rabbit_msg_store:has_readers(File, MsgStoreState)] of + [] -> do_action(Action, Files, MsgStoreState), + State; + [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), + State #state { pending_no_readers = Pending1 } end. + +do_action(combine, [Source, Destination], MsgStoreState) -> + rabbit_msg_store:combine_files(Source, Destination, MsgStoreState); +do_action(delete, [File], MsgStoreState) -> + rabbit_msg_store:delete_file(File, MsgStoreState). diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index b48d0aa3..0440dbe4 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -227,11 +227,11 @@ run_rabbitmq_server_unix() -> run_rabbitmq_server_win32() -> Cmd = filename:nativename(os:find_executable("cmd")), - CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") - ++ "\\rabbitmq-server.bat\" -noinput", + CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") ++ + "\\rabbitmq-server.bat\" -noinput -detached", erlang:open_port({spawn_executable, Cmd}, [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]}, - nouse_stdio, hide]). + nouse_stdio]). is_rabbit_running(Node, RpcTimeout) -> case rpc:call(Node, rabbit, status, [], RpcTimeout) of @@ -315,7 +315,7 @@ is_dead(Pid) -> end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ - PidS ++ "\""), + PidS ++ "\" 2>&1"), case re:run(Res, "erl\\.exe", [{capture, none}]) of match -> false; _ -> true diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index db5c71f6..1c542ffe 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -41,7 +41,7 @@ %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, tcp_listener_stopped/2, +-export([tcp_listener_started/3, tcp_listener_stopped/3, start_client/1, start_ssl_client/2]). -include("rabbit.hrl"). @@ -67,21 +67,21 @@ -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). --spec(start_ssl_listener/3 :: (hostname(), ip_port(), [rabbit_types:info()]) +-spec(start_ssl_listener/3 :: (hostname(), ip_port(), rabbit_types:infos()) -> 'ok'). -spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(connections/0 :: () -> [rabbit_types:connection()]). --spec(connection_info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(connection_info/1 :: - (rabbit_types:connection()) -> [rabbit_types:info()]). + (rabbit_types:connection()) -> rabbit_types:infos()). -spec(connection_info/2 :: - (rabbit_types:connection(), [rabbit_types:info_key()]) - -> [rabbit_types:info()]). --spec(connection_info_all/0 :: () -> [[rabbit_types:info()]]). + (rabbit_types:connection(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(connection_info_all/0 :: () -> [rabbit_types:infos()]). -spec(connection_info_all/1 :: - ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). + (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/3 :: @@ -160,14 +160,14 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> {IPAddress, Name}. start_tcp_listener(Host, Port) -> - start_listener(Host, Port, "TCP Listener", + start_listener(Host, Port, amqp, "TCP Listener", {?MODULE, start_client, []}). start_ssl_listener(Host, Port, SslOpts) -> - start_listener(Host, Port, "SSL Listener", + start_listener(Host, Port, 'amqp/ssl', "SSL Listener", {?MODULE, start_ssl_client, [SslOpts]}). -start_listener(Host, Port, Label, OnConnect) -> +start_listener(Host, Port, Protocol, Label, OnConnect) -> {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), {ok,_} = supervisor:start_child( @@ -175,8 +175,8 @@ start_listener(Host, Port, Label, OnConnect) -> {Name, {tcp_listener_sup, start_link, [IPAddress, Port, ?RABBIT_TCP_OPTS , - {?MODULE, tcp_listener_started, []}, - {?MODULE, tcp_listener_stopped, []}, + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}), ok. @@ -188,20 +188,25 @@ stop_tcp_listener(Host, Port) -> ok = supervisor:delete_child(rabbit_sup, Name), ok. -tcp_listener_started(IPAddress, Port) -> +tcp_listener_started(Protocol, IPAddress, Port) -> + %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1 + %% We need the host so we can distinguish multiple instances of the above + %% in a cluster. ok = mnesia:dirty_write( rabbit_listener, #listener{node = node(), - protocol = tcp, + protocol = Protocol, host = tcp_host(IPAddress), + ip_address = IPAddress, port = Port}). -tcp_listener_stopped(IPAddress, Port) -> +tcp_listener_stopped(Protocol, IPAddress, Port) -> ok = mnesia:dirty_delete_object( rabbit_listener, #listener{node = node(), - protocol = tcp, + protocol = Protocol, host = tcp_host(IPAddress), + ip_address = IPAddress, port = Port}). active_listeners() -> diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 66e5cf63..11056c8e 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -69,7 +69,8 @@ -type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). -type(work_item() :: - {publish, rabbit_types:message(), pmsg()} | + {publish, + rabbit_types:message(), rabbit_types:message_properties(), pmsg()} | {deliver, pmsg()} | {ack, pmsg()}). @@ -173,9 +174,10 @@ handle_call(force_snapshot, _From, State) -> handle_call({queue_content, QName}, _From, State = #pstate{snapshot = #psnapshot{messages = Messages, queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], - do_reply([{ets:lookup_element(Messages, K, 2), D} || - {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], + [{{'$4', '$1', '$2', '$3'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || + {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -243,9 +245,9 @@ log_work(CreateWorkUnit, MessageList, snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( - fun (M = {publish, Message, QK = {_QName, PKey}}) -> + fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of - [_] -> {tied, QK}; + [_] -> {tied, MsgProps, QK}; [] -> ets:insert(Messages, {PKey, Message}), M end; @@ -356,7 +358,8 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, + _MsgProps, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues), prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), @@ -474,14 +477,14 @@ perform_work(MessageList, Messages, Queues, SeqId) -> perform_work_item(Item, Messages, Queues, NextSeqId) end, SeqId, MessageList). -perform_work_item({publish, Message, QK = {_QName, PKey}}, +perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}}, Messages, Queues, NextSeqId) -> true = ets:insert(Messages, {PKey, Message}), - true = ets:insert(Queues, {QK, false, NextSeqId}), + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; -perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> - true = ets:insert(Queues, {QK, false, NextSeqId}), +perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 88300ab4..ef81ddf2 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -33,8 +33,6 @@ -export([start/0, stop/0]). --define(DefaultPluginDir, "plugins"). --define(DefaultUnpackedPluginDir, "priv/plugins"). -define(BaseApps, [rabbit]). %%---------------------------------------------------------------------------- @@ -56,8 +54,8 @@ start() -> application:load(rabbit), %% Determine our various directories - PluginDir = get_env(plugins_dir, ?DefaultPluginDir), - UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), + {ok, PluginDir} = application:get_env(rabbit, plugins_dir), + {ok, UnpackedPluginDir} = application:get_env(rabbit, plugins_expand_dir), RootName = UnpackedPluginDir ++ "/rabbit", @@ -142,12 +140,6 @@ start() -> stop() -> ok. -get_env(Key, Default) -> - case application:get_env(rabbit, Key) of - {ok, V} -> V; - _ -> Default - end. - determine_version(App) -> application:load(App), {ok, Vsn} = application:get_key(App, vsn), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0b98290c..bde9b3d3 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,8 +31,9 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, - deliver/2, ack/2, sync/2, flush/1, read/3, +-export([init/1, shutdown_terms/1, recover/4, + terminate/2, delete_and_terminate/1, + publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -define(CLEAN_FILENAME, "clean.dot"). @@ -98,12 +99,12 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'), -%% ('ack'|'no_ack')} is richer than strictly necessary for most -%% operations. However, for startup, and to ensure the safe and -%% correct combination of journal entries with entries read from the -%% segment on disk, this richer representation vastly simplifies and -%% clarifies the code. +%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}), +%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly +%% necessary for most operations. However, for startup, and to ensure +%% the safe and correct combination of journal entries with entries +%% read from the segment on disk, this richer representation vastly +%% simplifies and clarifies the code. %% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -141,14 +142,19 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). +-define(EXPIRY_BYTES, 8). +-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). +-define(NO_EXPIRY, 0). + -define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(GUID_BITS, (?GUID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). +%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -157,7 +163,7 @@ %% ---- misc ---- --define(PUB, {_, _}). %% {Guid, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -184,7 +190,7 @@ unacked :: non_neg_integer() })). -type(seq_id() :: integer()). --type(seg_dict() :: {dict:dictionary(), [segment()]}). +-type(seg_dict() :: {dict(), [segment()]}). -type(qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), @@ -194,21 +200,26 @@ -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(shutdown_terms() :: [any()]). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> - {'undefined' | non_neg_integer(), [any()], qistate()}). +-spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()). +-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). +-spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + fun ((rabbit_guid:guid()) -> boolean())) -> + {'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> - qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), boolean(), qistate()) + -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}], - qistate()}). + {[{rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -222,25 +233,26 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> +init(Name) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - {0, [], State}; + State. + +shutdown_terms(Name) -> + #qistate { dir = Dir } = blank_state(Name), + case read_shutdown_terms(Dir) of + {error, _} -> []; + {ok, Terms1} -> Terms1 + end. -init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) -> State = #qistate { dir = Dir } = blank_state(Name), - Terms = case read_shutdown_terms(Dir) of - {error, _} -> []; - {ok, Terms1} -> Terms1 - end, CleanShutdown = detect_clean_shutdown(Dir), - {Count, State1} = - case CleanShutdown andalso MsgStoreRecovered of - true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) - end, - {Count, Terms, State1}. + case CleanShutdown andalso MsgStoreRecovered of + true -> RecoveredCounts = proplists:get_value(segments, Terms, []), + init_clean(RecoveredCounts, State); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + end. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), @@ -252,15 +264,18 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, MsgProps)]), + maybe_flush_journal( + add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -453,7 +468,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -506,7 +521,8 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> + fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> Acc @@ -516,6 +532,32 @@ queue_index_walker_reader(QueueName, Gatherer) -> ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- +%% expiry/binary manipulation +%%---------------------------------------------------------------------------- + +create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> + [Guid, expiry_to_binary(Expiry)]. + +expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; +expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. + +read_pub_record_body(Hdl) -> + case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of + {ok, Bin} -> + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 + <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, + <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {Guid, #message_properties{expiry = Exp}}; + Error -> + Error + end. + +%%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -636,17 +678,13 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <<GuidNum:?GUID_BITS>>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <<Guid:?GUID_BYTES/binary>> = - <<GuidNum:?GUID_BITS>>, - Publish = {Guid, case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, + case read_pub_record_body(Hdl) of + {Guid, MsgProps} -> + Publish = {Guid, MsgProps, + case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, load_journal_entries( add_to_journal(SeqId, Publish, State)); _ErrOrEoF -> %% err, we've lost at least a publish @@ -744,11 +782,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, IsPersistent} -> + {Guid, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, Guid]) + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(Guid, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -768,10 +807,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -801,10 +840,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - %% because we specify /binary, and binaries are complete - %% bytes, the size spec is in bytes, not bits. - {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), - Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProps} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 29004bd5..23eb3058 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -162,28 +162,25 @@ -ifdef(use_specs). --type(start_heartbeat_fun() :: - fun ((rabbit_networking:socket(), non_neg_integer()) -> - rabbit_heartbeat:heartbeaters())). - --spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> +-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> rabbit_types:ok(pid())). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (pid()) -> [rabbit_types:info()]). --spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) + -> no_return()). -spec(start_connection/7 :: - (pid(), pid(), pid(), start_heartbeat_fun(), any(), - rabbit_networking:socket(), - fun ((rabbit_networking:socket()) -> + (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), + rabbit_net:socket(), + fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( - rabbit_networking:socket(), any()))) -> no_return()). + rabbit_net:socket(), any()))) -> no_return()). -endif. @@ -235,12 +232,29 @@ conserve_memory(Pid, Conserve) -> server_properties() -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), - [{list_to_binary(K), longstr, list_to_binary(V)} || - {K, V} <- [{"product", Product}, - {"version", Version}, - {"platform", "Erlang/OTP"}, - {"copyright", ?COPYRIGHT_MESSAGE}, - {"information", ?INFORMATION_MESSAGE}]]. + + %% Get any configuration-specified server properties + {ok, RawConfigServerProps} = application:get_env(rabbit, + server_properties), + + %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms + %% from the config and merge them with the generated built-in properties + NormalizedConfigServerProps = + [case X of + {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), + longstr, + list_to_binary(Value)}; + {BinKey, Type, Value} -> {BinKey, Type, Value} + end || X <- RawConfigServerProps ++ + [{product, Product}, + {version, Version}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]], + + %% Filter duplicated properties in favor of config file provided values + lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, + NormalizedConfigServerProps). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -754,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat), + SendFun = + fun() -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + ReceiveFun = + fun() -> + Parent ! timeout + end, + Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, + ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index be451af6..5b905682 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -34,7 +34,6 @@ -include("rabbit.hrl"). -include_lib("public_key/include/public_key.hrl"). --include_lib("ssl/src/ssl_int.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). @@ -86,8 +85,8 @@ peer_cert_validity(Cert) -> cert_info(F, Cert) -> F(case public_key:pkix_decode_cert(Cert, otp) of - {ok, DecCert} -> DecCert; - DecCert -> DecCert + {ok, DecCert} -> DecCert; %%pre R14B + DecCert -> DecCert %%R14B onwards end). %%-------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1b47cdb7..71b23e01 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,8 +41,8 @@ -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -73,6 +73,7 @@ all_tests() -> passed = test_user_management(), passed = test_server_status(), passed = maybe_run_cluster_dependent_tests(), + passed = test_configurable_server_properties(), passed. maybe_run_cluster_dependent_tests() -> @@ -962,9 +963,6 @@ test_user_management() -> control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), - {error, {invalid_scope, _}} = - control_action(set_permissions, ["guest", "foo", ".*", ".*"], - [{"-s", "cilent"}]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -987,9 +985,7 @@ test_user_management() -> ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "client"}]), - ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "all"}]), + [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_user_permissions, ["foo"]), @@ -1297,7 +1293,7 @@ info_action(Command, Args, CheckVHost) -> {bad_argument, dummy} = control_action(Command, ["dummy"]), ok. -default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}]. +default_options() -> [{"-p", "/"}, {"-q", "false"}]. expand_options(As, Bs) -> lists:foldl(fun({K, _}=A, R) -> @@ -1413,6 +1409,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, @@ -1430,17 +1427,17 @@ restart_msg_store_empty() -> guid_bin(X) -> erlang:md5(term_to_binary(X)). -msg_store_contains(Atom, Guids) -> +msg_store_contains(Atom, Guids, MSCState) -> Atom = lists:foldl( fun (Guid, Atom1) when Atom1 =:= Atom -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, + rabbit_msg_store:contains(Guid, MSCState) end, Atom, Guids). -msg_store_sync(Guids) -> +msg_store_sync(Guids, MSCState) -> Ref = make_ref(), Self = self(), - ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids, - fun () -> Self ! {sync, Ref} end), + ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end, + MSCState), receive {sync, Ref} -> ok after @@ -1452,55 +1449,64 @@ msg_store_sync(Guids) -> msg_store_read(Guids, MSCState) -> lists:foldl(fun (Guid, MSCStateM) -> {{ok, Guid}, MSCStateN} = rabbit_msg_store:read( - ?PERSISTENT_MSG_STORE, Guid, MSCStateM), MSCStateN end, MSCState, Guids). msg_store_write(Guids, MSCState) -> - lists:foldl(fun (Guid, {ok, MSCStateN}) -> - rabbit_msg_store:write(?PERSISTENT_MSG_STORE, - Guid, Guid, MSCStateN) - end, {ok, MSCState}, Guids). + ok = lists:foldl( + fun (Guid, ok) -> rabbit_msg_store:write(Guid, Guid, MSCState) end, + ok, Guids). + +msg_store_remove(Guids, MSCState) -> + rabbit_msg_store:remove(Guids, MSCState). -msg_store_remove(Guids) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids). +msg_store_remove(MsgStore, Ref, Guids) -> + with_msg_store_client(MsgStore, Ref, + fun (MSCStateM) -> + ok = msg_store_remove(Guids, MSCStateM), + MSCStateM + end). + +with_msg_store_client(MsgStore, Ref, Fun) -> + rabbit_msg_store:client_terminate( + Fun(rabbit_msg_store:client_init(MsgStore, Ref))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( - lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). + lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, + rabbit_msg_store:client_init(MsgStore, Ref), L)). test_msg_store() -> restart_msg_store_empty(), Self = self(), Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), - %% check we don't contain any of the msgs we're about to publish - false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, Guids, MSCState), %% publish the first half - {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), + ok = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% publish the second half - {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1), + ok = msg_store_write(Guids2ndHalf, MSCState), %% sync on the first half again - the msg_store will be dirty, but %% we won't need the fsync - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% check they're all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState), %% publish the latter half twice so we hit the caching and ref count code - {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2), + ok = msg_store_write(Guids2ndHalf, MSCState), %% check they're still all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen ok = lists:foldl( fun (Guid, ok) -> rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, - [Guid], fun () -> Self ! {sync, Guid} end) + [Guid], fun () -> Self ! {sync, Guid} end, + MSCState) end, ok, Guids2ndHalf), lists:foldl( fun(Guid, ok) -> @@ -1515,24 +1521,24 @@ test_msg_store() -> end, ok, Guids2ndHalf), %% it's very likely we're not dirty here, so the 1st half sync %% should hit a different code path - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% read them all - MSCState4 = msg_store_read(Guids, MSCState3), + MSCState1 = msg_store_read(Guids, MSCState), %% read them all again - this will hit the cache, not disk - MSCState5 = msg_store_read(Guids, MSCState4), + MSCState2 = msg_store_read(Guids, MSCState1), %% remove them all - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids), + ok = rabbit_msg_store:remove(Guids, MSCState2), %% check first half doesn't exist - false = msg_store_contains(false, Guids1stHalf), + false = msg_store_contains(false, Guids1stHalf, MSCState2), %% check second half does exist - true = msg_store_contains(true, Guids2ndHalf), + true = msg_store_contains(true, Guids2ndHalf, MSCState2), %% read the second half again - MSCState6 = msg_store_read(Guids2ndHalf, MSCState5), + MSCState3 = msg_store_read(Guids2ndHalf, MSCState2), %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), + ok = rabbit_msg_store:release(Guids2ndHalf, MSCState3), %% read the second half again, just for fun (aka code coverage) - MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), + MSCState4 = msg_store_read(Guids2ndHalf, MSCState3), + ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1543,22 +1549,26 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), + MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> - not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)) + not(Bool = rabbit_msg_store:contains(Guid, MSCState5)) end, false, Guids2ndHalf), + ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), + MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs - false = msg_store_contains(false, Guids), + false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), - {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), + ok = msg_store_write(Guids1stHalf, MSCState6), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), + msg_store_read(Guids1stHalf, MSCState6)), + MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids %% push a lot of msgs in... at least 100 files worth @@ -1567,31 +1577,39 @@ test_msg_store() -> BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)), GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:PayloadSizeBits >>, - ok = foreach_with_msg_store_client( + ok = with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> - {ok, MSCStateN} = rabbit_msg_store:write( - MsgStore, Guid, Payload, MSCStateM), - MSCStateN - end, GuidsBig), + fun (MSCStateM) -> + [ok = rabbit_msg_store:write(Guid, Payload, MSCStateM) || + Guid <- GuidsBig], + MSCStateM + end), %% now read them to ensure we hit the fast client-side reading ok = foreach_with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> + fun (Guid, MSCStateM) -> {{ok, Payload}, MSCStateN} = rabbit_msg_store:read( - MsgStore, Guid, MSCStateM), + Guid, MSCStateM), MSCStateN end, GuidsBig), %% .., then 3s by 1... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), %% .., then remove 3s by 3, from the young end first. This hits %% GC... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), %% ensure empty - false = msg_store_contains(false, GuidsBig), + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (MSCStateM) -> + false = msg_store_contains(false, GuidsBig, MSCStateM), + MSCStateM + end), %% restart empty restart_msg_store_empty(), passed. @@ -1603,11 +1621,18 @@ test_queue() -> queue_name(<<"test">>). init_test_queue() -> - rabbit_queue_index:init( - test_queue(), true, false, - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end). + TestQueue = test_queue(), + Terms = rabbit_queue_index:shutdown_terms(TestQueue), + PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + Res = rabbit_queue_index:recover( + TestQueue, Terms, false, + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), + Res. restart_test_queue(Qi) -> _ = rabbit_queue_index:terminate([], Qi), @@ -1618,13 +1643,13 @@ restart_test_queue(Qi) -> empty_test_queue() -> ok = rabbit_variable_queue:stop(), ok = rabbit_variable_queue:start([]), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. with_empty_test_queue(Fun) -> ok = empty_test_queue(), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). queue_index_publish(SeqIds, Persistent, Qi) -> @@ -1633,29 +1658,44 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - {A, B, MSCStateEnd} = + MSCState = rabbit_msg_store:client_init(MsgStore, Ref), + {A, B} = lists:foldl( - fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> + fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( - Guid, SeqId, Persistent, QiN), - {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, - Guid, MSCStateN), - {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} - end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), - ok = rabbit_msg_store:client_delete_and_terminate( - MSCStateEnd, MsgStore, Ref), + Guid, SeqId, #message_properties{}, Persistent, QiN), + ok = rabbit_msg_store:write(Guid, Guid, MSCState), + {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} + end, {Qi, []}, SeqIds), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, - [{Guid, SeqId, Persistent, Delivered}|Read], + [{Guid, SeqId, _Props, Persistent, Delivered}|Read], [{SeqId, Guid}|Published]) -> verify_read_with_published(Delivered, Persistent, Read, Published); verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. +test_queue_index_props() -> + with_empty_test_queue( + fun(Qi0) -> + Guid = rabbit_guid:guid(), + Props = #message_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = + rabbit_queue_index:read(1, 2, Qi1), + Qi2 + end), + + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + + passed. + test_queue_index() -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), TwoSegs = SegmentSize + SegmentSize, @@ -1674,7 +1714,7 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), %% should get length back as 0, as all the msgs were transient - {0, _Terms1, Qi6} = restart_test_queue(Qi4), + {0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), @@ -1683,7 +1723,7 @@ test_queue_index() -> lists:reverse(SeqIdsGuidsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), - {LenB, _Terms2, Qi12} = restart_test_queue(Qi10), + {LenB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), @@ -1695,7 +1735,7 @@ test_queue_index() -> {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), %% should get length back as 0 because all persistent %% msgs have been acked - {0, _Terms3, Qi19} = restart_test_queue(Qi18), + {0, Qi19} = restart_test_queue(Qi18), Qi19 end), @@ -1767,11 +1807,11 @@ test_queue_index() -> true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, _Terms9, Qi4} = restart_test_queue(Qi3), + {5, Qi4} = restart_test_queue(Qi3), {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, _Terms10, Qi8} = restart_test_queue(Qi7), + {5, Qi8} = restart_test_queue(Qi7), Qi8 end), @@ -1789,7 +1829,8 @@ variable_queue_publish(IsPersistent, Count, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + #message_properties{}, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1823,9 +1864,41 @@ test_variable_queue() -> F <- [fun test_variable_queue_dynamic_duration_change/1, fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]], + fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_dropwhile/1]], passed. +test_dropwhile(VQ0) -> + Count = 10, + + %% add messages with sequential expiry + VQ1 = lists:foldl( + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #message_properties{expiry = N}, VQN) + end, VQ0, lists:seq(1, Count)), + + %% drop the first 5 messages + VQ2 = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, VQ1), + + %% fetch five now + VQ3 = lists:foldl(fun (_N, VQN) -> + {{#basic_message{}, _, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ2, lists:seq(6, Count)), + + %% should be empty now + {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + + VQ4. + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1836,6 +1909,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), VQ7 = lists:foldl( fun (Duration1, VQ4) -> @@ -1934,7 +2008,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), + VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), @@ -1974,3 +2048,56 @@ test_queue_recover() -> rabbit_amqqueue:internal_delete(QName) end), passed. + +test_configurable_server_properties() -> + %% List of the names of the built-in properties do we expect to find + BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>, + <<"copyright">>, <<"information">>], + + %% Verify that the built-in properties are initially present + ActualPropNames = [Key || + {Key, longstr, _} <- rabbit_reader:server_properties()], + true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end, + BuiltInPropNames), + + %% Get the initial server properties configured in the environment + {ok, ServerProperties} = application:get_env(rabbit, server_properties), + + %% Helper functions + ConsProp = fun (X) -> application:set_env(rabbit, + server_properties, + [X | ServerProperties]) end, + IsPropPresent = fun (X) -> lists:member(X, + rabbit_reader:server_properties()) + end, + + %% Add a wholly new property of the simplified {KeyAtom, StringValue} form + NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"}, + ConsProp(NewSimplifiedProperty), + %% Do we find hare soup, appropriately formatted in the generated properties? + ExpectedHareImage = {list_to_binary(atom_to_list(NewHareKey)), + longstr, + list_to_binary(NewHareVal)}, + true = IsPropPresent(ExpectedHareImage), + + %% Add a wholly new property of the {BinaryKey, Type, Value} form + %% and check for it + NewProperty = {<<"new-bin-key">>, signedint, -1}, + ConsProp(NewProperty), + %% Do we find the new property? + true = IsPropPresent(NewProperty), + + %% Add a property that clobbers a built-in, and verify correct clobbering + {NewVerKey, NewVerVal} = NewVersion = {version, "X.Y.Z."}, + {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)), + list_to_binary(NewVerVal)}, + ConsProp(NewVersion), + ClobberedServerProps = rabbit_reader:server_properties(), + %% Is the clobbering insert present? + true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}), + %% Is the clobbering insert the only thing with the clobbering key? + [{BinNewVerKey, longstr, BinNewVerVal}] = + [E || {K, longstr, _V} = E <- ClobberedServerProps, K =:= BinNewVerKey], + + application:set_env(rabbit, server_properties, ServerProperties), + passed. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index b971a63f..b9993823 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -35,10 +35,11 @@ -ifdef(use_specs). --export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, +-export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0, + message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, - unencoded_content/0, encoded_content/0, vhost/0, ctag/0, - amqp_error/0, r/1, r2/2, r3/3, listener/0, + unencoded_content/0, encoded_content/0, message_properties/0, + vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, @@ -88,12 +89,17 @@ txn :: maybe(txn()), sender :: pid(), message :: message()}). +-type(message_properties() :: + #message_properties{expiry :: pos_integer() | 'undefined'}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). -type(info_key() :: atom()). +-type(info_keys() :: [info_key()]). + -type(info() :: {info_key(), any()}). +-type(infos() :: [info()]). -type(amqp_error() :: #amqp_error{name :: rabbit_framing:amqp_exception(), @@ -141,12 +147,12 @@ -type(connection() :: pid()). --type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1'). +-type(protocol() :: rabbit_framing:protocol()). -type(user() :: - #user{username :: rabbit_access_control:username(), - password :: rabbit_access_control:password(), - is_admin :: boolean()}). + #user{username :: rabbit_access_control:username(), + password_hash :: rabbit_access_control:password_hash(), + is_admin :: boolean()}). -type(ok(A) :: {'ok', A}). -type(error(A) :: {'error', A}). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl new file mode 100644 index 00000000..0071a08a --- /dev/null +++ b/src/rabbit_upgrade.erl @@ -0,0 +1,156 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_upgrade). + +-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). + +-include("rabbit.hrl"). + +-define(VERSION_FILENAME, "schema_version"). +-define(LOCK_FILENAME, "schema_upgrade_lock"). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(read_version/0 :: + () -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(write_version/0 :: () -> 'ok'). +-spec(desired_version/0 :: () -> [atom()]). + +-endif. + +%% ------------------------------------------------------------------- + +%% Try to upgrade the schema. If no information on the existing schema +%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() +%% will catch the problem. +maybe_upgrade() -> + case read_version() of + {ok, CurrentHeads} -> + G = load_graph(), + case unknown_heads(CurrentHeads, G) of + [] -> + case upgrades_to_apply(CurrentHeads, G) of + [] -> ok; + Upgrades -> apply_upgrades(Upgrades) + end; + Unknown -> + exit({future_upgrades_found, Unknown}) + end, + true = digraph:delete(G), + ok; + {error, enoent} -> + version_not_available + end. + +read_version() -> + case rabbit_misc:read_term_file(schema_filename()) of + {ok, [Heads]} -> {ok, Heads}; + {error, E} -> {error, E} + end. + +write_version() -> + ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), + ok. + +desired_version() -> + G = load_graph(), + Version = heads(G), + true = digraph:delete(G), + Version. + +%% ------------------------------------------------------------------- + +load_graph() -> + Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade), + rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades). + +vertices(Module, Steps) -> + [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. + +edges(_Module, Steps) -> + [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. + +graph_build_error({vertex, duplicate, StepName}) -> + exit({duplicate_upgrade, StepName}); +graph_build_error({edge, E, From, To}) -> + exit({E, From, To}). + +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) + || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). + +%% ------------------------------------------------------------------- + +apply_upgrades(Upgrades) -> + LockFile = lock_filename(), + case file:open(LockFile, [write, exclusive]) of + {ok, Lock} -> + ok = file:close(Lock), + info("Upgrades: ~w to apply~n", [length(Upgrades)]), + [apply_upgrade(Upgrade) || Upgrade <- Upgrades], + info("Upgrades: All applied~n", []), + ok = write_version(), + ok = file:delete(LockFile); + {error, eexist} -> + exit(previous_upgrade_failed); + {error, _} = Error -> + exit(Error) + end. + +apply_upgrade({M, F}) -> + info("Upgrades: Applying ~w:~w~n", [M, F]), + ok = apply(M, F, []). + +%% ------------------------------------------------------------------- + +schema_filename() -> + filename:join(dir(), ?VERSION_FILENAME). + +lock_filename() -> + filename:join(dir(), ?LOCK_FILENAME). + +%% NB: we cannot use rabbit_log here since it may not have been +%% started yet +info(Msg, Args) -> + error_logger:info_msg(Msg, Args). + +dir() -> + rabbit_mnesia:dir(). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl new file mode 100644 index 00000000..59b8705d --- /dev/null +++ b/src/rabbit_upgrade_functions.erl @@ -0,0 +1,51 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-module(rabbit_upgrade_functions). + +-include("rabbit.hrl"). + +-compile([export_all]). + +-rabbit_upgrade({remove_user_scope, []}). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(remove_user_scope/0 :: () -> 'ok'). + +-endif. + +%%-------------------------------------------------------------------- + +remove_user_scope() -> + {atomic, ok} = mnesia:transform_table( + rabbit_user_permission, + fun (Perm = #user_permission{ + permission = {permission, + _Scope, Conf, Write, Read}}) -> + Perm#user_permission{ + permission = #permission{configure = Conf, + write = Write, + read = Read}} + end, + record_info(fields, user_permission)), + ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cbc71bcc..69d62fde 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,9 +32,9 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, - tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, + purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, + tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -229,7 +229,6 @@ len, persistent_count, - duration_target, target_ram_msg_count, ram_msg_count, ram_msg_count_prev, @@ -248,7 +247,8 @@ is_persistent, is_delivered, msg_on_disk, - index_on_disk + index_on_disk, + msg_props }). -record(delta, @@ -294,7 +294,8 @@ -type(sync() :: #sync { acks_persistent :: [[seq_id()]], acks_all :: [[seq_id()]], - pubs :: [[rabbit_guid:guid()]], + pubs :: [{message_properties_transformer(), + [rabbit_types:basic_message()]}], funs :: [fun (() -> any())] }). -type(state() :: #vqstate { @@ -304,7 +305,7 @@ q3 :: bpqueue:bpqueue(), q4 :: queue(), next_seq_id :: seq_id(), - pending_ack :: dict:dictionary(), + pending_ack :: dict(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -315,7 +316,6 @@ persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer(), - duration_target :: number() | 'infinity', target_ram_msg_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), @@ -368,16 +368,17 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> - {DeltaCount, Terms, IndexState} = - rabbit_queue_index:init( - QueueName, Recover, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end), - {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), +init(QueueName, IsDurable, false) -> + IndexState = rabbit_queue_index:init(QueueName), + init(IsDurable, IndexState, 0, [], + case IsDurable of + true -> msg_store_client_init(?PERSISTENT_MSG_STORE); + false -> undefined + end, + msg_store_client_init(?TRANSIENT_MSG_STORE)); +init(QueueName, true, true) -> + Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of [] -> {proplists:get_value(persistent_ref, Terms), @@ -385,64 +386,32 @@ init(QueueName, IsDurable, Recover) -> Terms}; _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, - DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount), - Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of - true -> ?BLANK_DELTA; - false -> #delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId } - end, - Now = now(), - PersistentClient = - case IsDurable of - true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); - false -> undefined - end, - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), - State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - index_state = IndexState1, - msg_store_clients = {{PersistentClient, PRef}, - {TransientClient, TRef}}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - duration_target = infinity, - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now } }, - a(maybe_deltas_to_betas(State)). + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, + TRef), + {DeltaCount, IndexState} = + rabbit_queue_index:recover( + QueueName, Terms1, + rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + init(true, IndexState, DeltaCount, Terms1, + PersistentClient, TransientClient). terminate(State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(true, tx_commit_index(State)), - case MSCStateP of - undefined -> ok; - _ -> rabbit_msg_store:client_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE) - end, - rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), + PRef = case MSCStateP of + undefined -> undefined; + _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), + rabbit_msg_store:client_ref(MSCStateP) + end, + ok = rabbit_msg_store:client_terminate(MSCStateT), + TRef = rabbit_msg_store:client_ref(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -458,191 +427,247 @@ delete_and_terminate(State) -> %% deleting it. {_PurgeCount, State1} = purge(State), State2 = #vqstate { index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(false, State1), IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef) + _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) end, - rabbit_msg_store:client_delete_and_terminate( - MSCStateT, ?TRANSIENT_MSG_STORE, TRef), + rabbit_msg_store:client_delete_and_terminate(MSCStateT), a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). -purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - len = Len, - persistent_count = PCount }) -> +purge(State = #vqstate { q4 = Q4, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q4, - orddict:new(), IndexState), - {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} = + orddict:new(), IndexState, MSCState), + {LensByStore1, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, State #vqstate { q4 = queue:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q1, - LensByStore1, IndexState2), + LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), - {Len, a(State1 #vqstate { q1 = queue:new(), - index_state = IndexState3, - len = 0, - ram_msg_count = 0, - ram_index_count = 0, - persistent_count = PCount1 })}. - -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), + {Len, a(State1 #vqstate { q1 = queue:new(), + index_state = IndexState3, + len = 0, + ram_msg_count = 0, + ram_index_count = 0, + persistent_count = PCount1 })}. + +publish(Msg, MsgProps, State) -> + {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - pending_ack = PA, - durable = IsDurable }) -> + MsgProps, + State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + pending_ack = PA, + durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. - -fetch(AckRequired, State = #vqstate { q4 = Q4, - ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 })}. + +dropwhile(Pred, State) -> + {_OkOrEmpty, State1} = dropwhile1(Pred, State), + State1. + +dropwhile1(Pred, State) -> + internal_queue_out( + fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> + case Pred(MsgProps) of + true -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile1(Pred, State2); + false -> + %% message needs to go back into Q4 (or maybe go + %% in for the first time if it was loaded from + %% Q3). Also the msg contents might not be in + %% RAM, so read them in now + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + read_msg(MsgStatus, State1), + {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }} + end + end, State). + +fetch(AckRequired, State) -> + internal_queue_out( + fun(MsgStatus, State1) -> + %% it's possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + internal_fetch(AckRequired, MsgStatus1, State2) + end, State). + +internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> - case fetch_from_q3_to_q4(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, State1} -> fetch(AckRequired, State1) + case fetch_from_q3(State) of + {empty, State1} = Result -> a(State1), Result; + {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) end; - {{value, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, - Q4a} -> - - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - MsgStore = find_msg_store(IsPersistent), - Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, - ram_msg_count = RamMsgCount - 1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })} + {{value, MsgStatus}, Q4a} -> + Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. +read_msg(MsgStatus = #msg_status { msg = undefined, + guid = Guid, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, Guid), + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. + +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + guid = Guid, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [Guid]) + end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, PA1} = case AckRequired of + true -> PA2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, PA), + {SeqId, PA2}; + false -> {blank_ack, PA} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + + {{Msg, IsDelivered, AckTag, Len1}, + a(State #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1, + pending_ack = PA1 })}. + ack(AckTags, State) -> - a(ack(fun rabbit_msg_store:remove/2, + a(ack(fun msg_store_remove/3, fun (_AckEntry, State1) -> State1 end, AckTags, State)). -tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, +tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg), - {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), - State #vqstate { msg_store_clients = MSCState1 }; - false -> State - end). + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), + case IsPersistent andalso IsDurable of + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), + #msg_status { msg_on_disk = true } = + maybe_write_msg_to_disk(false, MsgStatus, MSCState); + false -> ok + end, + a(State). tx_ack(Txn, AckTags, State) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), State. -tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> +tx_rollback(Txn, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), ok = case IsDurable of - true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, - persistent_guids(Pubs)); + true -> msg_store_remove(MSCState, true, persistent_guids(Pubs)); false -> ok end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, MsgPropsFun, + State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:reverse(Pubs), AckTags1 = lists:append(AckTags), - PersistentGuids = persistent_guids(PubsOrdered), + PersistentGuids = persistent_guids(Pubs), HasPersistentPubs = PersistentGuids =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of - true -> ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids, - PubsOrdered, AckTags1, Fun)), + true -> ok = msg_store_sync( + MSCState, true, PersistentGuids, + msg_store_callback(PersistentGuids, Pubs, AckTags1, + Fun, MsgPropsFun)), State; - false -> tx_commit_post_msg_store( - HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) + false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, + Fun, MsgPropsFun, State) end)}. -requeue(AckTags, State) -> +requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( - ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + ack(fun msg_store_release/3, + fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> + {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), + true, false, State1), State2; - ({IsPersistent, Guid}, State1) -> + ({IsPersistent, Guid, MsgProps}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), + true, true, State2), State3 end, AckTags, State))). @@ -662,8 +687,7 @@ set_ram_duration_target(DurationTarget, infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, - duration_target = DurationTarget }, + State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, a(case TargetRamMsgCount1 == infinity orelse (TargetRamMsgCount =/= infinity andalso TargetRamMsgCount1 >= TargetRamMsgCount) of @@ -678,7 +702,6 @@ ram_duration(State = #vqstate { in_counter = InCount, out_counter = OutCount, ram_msg_count = RamMsgCount, - duration_target = DurationTarget, ram_msg_count_prev = RamMsgCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), @@ -691,18 +714,16 @@ ram_duration(State = #vqstate { (2 * (AvgEgressRate + AvgIngressRate)) end, - {Duration, set_ram_duration_target( - DurationTarget, - State #vqstate { - rates = Rates #rates { - egress = Egress1, - ingress = Ingress1, - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate, - timestamp = Now }, - in_counter = 0, - out_counter = 0, - ram_msg_count_prev = RamMsgCount })}. + {Duration, State #vqstate { + rates = Rates #rates { + egress = Egress1, + ingress = Ingress1, + avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate, + timestamp = Now }, + in_counter = 0, + out_counter = 0, + ram_msg_count_prev = RamMsgCount }}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, @@ -790,27 +811,54 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, + MsgProps) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, - msg_on_disk = false, index_on_disk = false }. + msg_on_disk = false, index_on_disk = false, + msg_props = MsgProps }. + +with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> + {Result, MSCStateP1} = Fun(MSCStateP), + {Result, {MSCStateP1, MSCStateT}}; +with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) -> + {Result, MSCStateT1} = Fun(MSCStateT), + {Result, {MSCStateP, MSCStateT1}}. + +with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> + {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, + fun (MSCState1) -> + {Fun(MSCState1), MSCState1} + end), + Res. + +msg_store_client_init(MsgStore) -> + rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()). + +msg_store_write(MSCState, IsPersistent, Guid, Msg) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end). -find_msg_store(true) -> ?PERSISTENT_MSG_STORE; -find_msg_store(false) -> ?TRANSIENT_MSG_STORE. +msg_store_read(MSCState, IsPersistent, Guid) -> + with_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end). -with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) -> - {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP), - {Result, {{MSCStateP1, PRef}, MSCStateT}}; -with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) -> - {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), - {Result, {MSCStateP, {MSCStateT1, TRef}}}. +msg_store_remove(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end). -read_from_msg_store(MSCState, IsPersistent, Guid) -> - with_msg_store_state( +msg_store_release(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MsgStore, MSCState1) -> - rabbit_msg_store:read(MsgStore, Guid, MSCState1) - end). + fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end). + +msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; @@ -828,12 +876,13 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs]. + [Guid || {#basic_message { guid = Guid, + is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, IsPersistent, IsDelivered}, + fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -845,7 +894,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = true, - index_on_disk = true + index_on_disk = true, + msg_props = MsgProps }) | Filtered1], Delivers1, Acks1} @@ -892,22 +942,69 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> +init(IsDurable, IndexState, DeltaCount, Terms, + PersistentClient, TransientClient) -> + {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), + + DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of + true -> ?BLANK_DELTA; + false -> #delta { start_seq_id = LowSeqId, + count = DeltaCount1, + end_seq_id = NextSeqId } + end, + Now = now(), + State = #vqstate { + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_msg_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now } }, + a(maybe_deltas_to_betas(State)). + +msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, Fun, StateN) + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> rabbit_msg_store:remove( - ?PERSISTENT_MSG_STORE, + fun () -> remove_persistent_messages( PersistentGuids) end, F) end) end. -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, +remove_persistent_messages(Guids) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE), + ok = rabbit_msg_store:remove(Guids, PersistentClient), + rabbit_msg_store:client_delete_and_terminate(PersistentClient). + +tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, State = #vqstate { on_sync = OnSync = #sync { acks_persistent = SPAcks, @@ -920,23 +1017,27 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, case IsDurable of true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of - #msg_status {} -> false; - {IsPersistent, _Guid} -> IsPersistent + #msg_status {} -> + false; + {IsPersistent, _Guid, _MsgProps} -> + IsPersistent end]; false -> [] end, case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of - true -> State #vqstate { on_sync = #sync { - acks_persistent = [PersistentAcks | SPAcks], - acks_all = [AckTags | SAcks], - pubs = [Pubs | SPubs], - funs = [Fun | SFuns] }}; + true -> State #vqstate { + on_sync = #sync { + acks_persistent = [PersistentAcks | SPAcks], + acks_all = [AckTags | SAcks], + pubs = [{MsgPropsFun, Pubs} | SPubs], + funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( - State #vqstate { on_sync = #sync { - acks_persistent = [], - acks_all = [AckTags], - pubs = [Pubs], - funs = [Fun] } }), + State #vqstate { + on_sync = #sync { + acks_persistent = [], + acks_all = [AckTags], + pubs = [{MsgPropsFun, Pubs}], + funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. @@ -950,13 +1051,16 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:append(lists:reverse(SPubs)), + Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), + {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, + MsgProps}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {SeqId, State3} = + publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -965,13 +1069,14 @@ tx_commit_index(State = #vqstate { on_sync = #sync { State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, - State = #vqstate { q3 = Q3, - index_state = IndexState }) -> + State = #vqstate { q3 = Q3, + index_state = IndexState, + msg_store_clients = MSCState }) -> case bpqueue:is_empty(Q3) of true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = remove_queue_entries( - fun beta_fold/3, Q3, - LensByStore, IndexState), + false -> {LensByStore1, IndexState1} = + remove_queue_entries(fun beta_fold/3, Q3, + LensByStore, IndexState, MSCState), purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { @@ -979,11 +1084,11 @@ purge_betas_and_deltas(LensByStore, index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState) -> +remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> {GuidsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), {sum_guids_by_store_to_len(LensByStore, GuidsByStore), rabbit_queue_index:ack(Acks, @@ -995,8 +1100,7 @@ remove_queue_entries1( index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, {GuidsByStore, Delivers, Acks}) -> {case MsgOnDisk of - true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, - GuidsByStore); + true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore); false -> GuidsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -1004,8 +1108,8 @@ remove_queue_entries1( sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> orddict:fold( - fun (MsgStore, Guids, LensByStore1) -> - orddict:update_counter(MsgStore, length(Guids), LensByStore1) + fun (IsPersistent, Guids, LensByStore1) -> + orddict:update_counter(IsPersistent, length(Guids), LensByStore1) end, LensByStore, GuidsByStore). %%---------------------------------------------------------------------------- @@ -1013,7 +1117,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, + MsgProps, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1022,8 +1126,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) + #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1037,38 +1141,35 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount + 1}}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, MSCState) -> - {MsgStatus, MSCState}; + msg_on_disk = true }, _MSCState) -> + MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - {ok, MSCState1} = - with_msg_store_state( - MSCState, IsPersistent, - fun (MsgStore, MSCState2) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2) - end), - {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; -maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> - {MsgStatus, MSCState}. + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, + ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1), + MsgStatus #msg_status { msg_on_disk = true }; +maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> + MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, IndexState) -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, seq_id = SeqId, + guid = Guid, + seq_id = SeqId, is_persistent = IsPersistent, - is_delivered = IsDelivered }, IndexState) + is_delivered = IsDelivered, + msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent, - IndexState), + IndexState1 = rabbit_queue_index:publish( + Guid, SeqId, MsgProps, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> @@ -1077,43 +1178,44 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State = #vqstate { index_state = IndexState, msg_store_clients = MSCState }) -> - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk( - ForceMsg, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = maybe_write_index_to_disk( - ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }}. + MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), + {MsgStatus2, IndexState1} = + maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), + {MsgStatus2, State #vqstate { index_state = IndexState1 }}. %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, +record_pending_ack(#msg_status { seq_id = SeqId, + guid = Guid, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, + msg_props = MsgProps } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; + true -> {IsPersistent, Guid, MsgProps}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). remove_pending_ack(KeepPersistent, - State = #vqstate { pending_ack = PA, - index_state = IndexState }) -> + State = #vqstate { pending_ack = PA, + index_state = IndexState, + msg_store_clients = MSCState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), State1 = State #vqstate { pending_ack = dict:new() }, case KeepPersistent of - true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + true -> case orddict:find(false, GuidsByStore) of error -> State1; - {ok, Guids} -> ok = rabbit_msg_store:remove( - ?TRANSIENT_MSG_STORE, Guids), + {ok, Guids} -> ok = msg_store_remove(MSCState, false, + Guids), State1 end; false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold( - fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), State1 #vqstate { index_state = IndexState1 } end. @@ -1121,18 +1223,20 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = + {{SeqIds, GuidsByStore}, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount }} = lists:foldl( fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> - {ok, AckEntry} = dict:find(SeqId, PA), + AckEntry = dict:fetch(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { pending_ack = dict:erase(SeqId, PA) })} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + MsgStoreFun(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), @@ -1143,12 +1247,12 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. find_persistent_count(LensByStore) -> - case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of + case orddict:find(true, LensByStore) of error -> 0; {ok, Len} -> Len end. @@ -1245,40 +1349,33 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). -fetch_from_q3_to_q4(State = #vqstate { - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - msg_store_clients = MSCState }) -> +fetch_from_q3(State = #vqstate { + q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4, + ram_index_count = RamIndexCount}) -> case bpqueue:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, IndexOnDisk, MsgStatus = #msg_status { - msg = undefined, guid = Guid, - is_persistent = IsPersistent }}, Q3a} -> - {{ok, Msg = #basic_message {}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), - Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4), + {{value, IndexOnDisk, MsgStatus}, Q3a} -> RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, - q4 = Q4a, - ram_msg_count = RamMsgCount + 1, - ram_index_count = RamIndexCount1, - msg_store_clients = MSCState1 }, + State1 = State #vqstate { q3 = Q3a, + ram_index_count = RamIndexCount1 }, State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is - %% still empty. So q2 must be empty, and q1 - %% can now be joined onto q4 + %% still empty. So q2 must be empty, and we + %% know q4 is empty otherwise we wouldn't be + %% loading from q3. As such, we can just set + %% q4 to Q1. true = bpqueue:is_empty(Q2), %% ASSERTION + true = queue:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = queue:new(), - q4 = queue:join(Q4a, Q1) }; + q4 = Q1 }; {true, false} -> maybe_deltas_to_betas(State1); {false, _} -> @@ -1287,7 +1384,7 @@ fetch_from_q3_to_q4(State = #vqstate { %% delta and q3 are maintained State1 end, - {loaded, State2} + {loaded, {MsgStatus, State2}} end. maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> @@ -1297,46 +1394,40 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, - target_ram_msg_count = TargetRamMsgCount, transient_threshold = TransientThreshold }) -> - case bpqueue:is_empty(Q3) orelse (TargetRamMsgCount /= 0) of - false -> - State; - true -> - #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - end_seq_id = DeltaSeqIdEnd } = Delta, - DeltaSeqId1 = - lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = - rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, IndexState2} = betas_from_index_entries( - List, TransientThreshold, IndexState1), - State1 = State #vqstate { index_state = IndexState2 }, - case bpqueue:len(Q3a) of + #delta { start_seq_id = DeltaSeqId, + count = DeltaCount, + end_seq_id = DeltaSeqIdEnd } = Delta, + DeltaSeqId1 = + lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), + DeltaSeqIdEnd]), + {List, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), + {Q3a, IndexState2} = + betas_from_index_entries(List, TransientThreshold, IndexState1), + State1 = State #vqstate { index_state = IndexState2 }, + case bpqueue:len(Q3a) of + 0 -> + %% we ignored every message in the segment due to it being + %% transient and below the threshold + maybe_deltas_to_betas( + State1 #vqstate { + delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); + Q3aLen -> + Q3b = bpqueue:join(Q3, Q3a), + case DeltaCount - Q3aLen of 0 -> - %% we ignored every message in the segment due to - %% it being transient and below the threshold - maybe_deltas_to_betas( - State #vqstate { - delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); - Q3aLen -> - Q3b = bpqueue:join(Q3, Q3a), - case DeltaCount - Q3aLen of - 0 -> - %% delta is now empty, but it wasn't - %% before, so can now join q2 onto q3 - State1 #vqstate { q2 = bpqueue:new(), - delta = ?BLANK_DELTA, - q3 = bpqueue:join(Q3b, Q2) }; - N when N > 0 -> - Delta1 = #delta { start_seq_id = DeltaSeqId1, - count = N, - end_seq_id = DeltaSeqIdEnd }, - State1 #vqstate { delta = Delta1, - q3 = Q3b } - end + %% delta is now empty, but it wasn't before, so + %% can now join q2 onto q3 + State1 #vqstate { q2 = bpqueue:new(), + delta = ?BLANK_DELTA, + q3 = bpqueue:join(Q3b, Q2) }; + N when N > 0 -> + Delta1 = #delta { start_seq_id = DeltaSeqId1, + count = N, + end_seq_id = DeltaSeqIdEnd }, + State1 #vqstate { delta = Delta1, + q3 = Q3b } end end. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index aa986e54..50bca390 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -62,9 +62,10 @@ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). -spec(send_command_sync/2 :: - (pid(), rabbit_framing:amqp_method()) -> 'ok'). + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command_sync/3 :: - (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok'). + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 93adfcb1..46bab31d 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -357,8 +357,7 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) when ?is_simple(State) -> {ok, NState} = do_restart(RestartType, Reason, Child, State), {noreply, NState}; -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) - when not (?is_simple(State)) -> +handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of {value, Child} -> {ok, NState} = do_restart(RestartType, Reason, Child, State), |