diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-02 11:02:53 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-02 11:02:53 +0000 |
commit | b59f0f574a068ea65ddace332451ec56ed76ef3b (patch) | |
tree | 1cf9e95d94ee28060386847001c1365263eefe20 | |
parent | 94058959138d8643464e7a6b8d76d3e9becee64c (diff) | |
parent | 92e7f5bdc6efcae672a295e025ea6ce3d88a7125 (diff) | |
download | rabbitmq-server-bug23411.tar.gz |
Merging default into bug23411bug23411
33 files changed, 1420 insertions, 981 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index c325bb5a..ec8f87e5 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -17,7 +17,7 @@ <!-- Copy the root node, and munge the outer part of the page --> <xsl:template match="/html"> <xsl:processing-instruction name="xml-stylesheet">type="text/xml" href="page.xsl"</xsl:processing-instruction> -<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"> +<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" xmlns="http://www.w3.org/1999/xhtml"> <head> <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title> </head> @@ -42,7 +42,7 @@ </xsl:choose> <p> For more general documentation, please see the - <a href="admin-guide.html">administrator's guide</a>. + <a href="../admin-guide.html">administrator's guide</a>. </p> <doc:toc class="compact"> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index 921da4f1..03e76c79 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -98,18 +98,6 @@ Defaults to 5672. </listitem> </varlistentry> - <varlistentry> - <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> - <listitem> - <para> -Defaults to <filename>/etc/rabbitmq/rabbitmq_cluster.config</filename>. If this file is -present it is used by the server to auto-configure a RabbitMQ cluster. -See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> -for details. - </para> - </listitem> - </varlistentry> - </variablelist> </refsect1> diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index 2b416e3e..e95f9889 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -193,18 +193,6 @@ manager. </varlistentry> <varlistentry> - <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> - <listitem> - <para> -If this file is -present it is used by the server to auto-configure a RabbitMQ cluster. -See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> -for details. - </para> - </listitem> - </varlistentry> - - <varlistentry> <term>RABBITMQ_CONSOLE_LOG</term> <listitem> <para> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 3b7244c7..acb99bc8 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -589,7 +589,7 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -597,16 +597,6 @@ <listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem> </varlistentry> <varlistentry> - <term>scope</term> - <listitem><para>Scope of the permissions: either - <command>client</command> (the default) or - <command>all</command>. This determines whether - permissions are checked for server-generated resource - names (<command>all</command>) or only for - client-specified resource names - (<command>client</command>).</para></listitem> - </varlistentry> - <varlistentry> <term>user</term> <listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem> </varlistentry> diff --git a/docs/remove-namespaces.xsl b/docs/remove-namespaces.xsl index 58a1e826..7f7f3c12 100644 --- a/docs/remove-namespaces.xsl +++ b/docs/remove-namespaces.xsl @@ -1,13 +1,14 @@ <?xml version='1.0'?> <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" + xmlns="http://www.w3.org/1999/xhtml" version='1.0'> <xsl:output method="xml" /> <!-- Copy every element through with local name only --> <xsl:template match="*"> - <xsl:element name="{local-name()}"> + <xsl:element name="{local-name()}" namespace=""> <xsl:apply-templates select="@*|node()"/> </xsl:element> </xsl:template> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 4be09c5a..17d05a99 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,4 +29,6 @@ {default_user_is_admin, true}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {cluster_nodes, []}, + {server_properties, []}, {collect_statistics, none}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index af6e257a..4d4f6fe6 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -30,7 +30,7 @@ %% -record(user, {username, password, is_admin}). --record(permission, {scope, configure, write, read}). +-record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). @@ -63,7 +63,7 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). --record(listener, {node, protocol, host, port}). +-record(listener, {node, protocol, host, ip_address, port}). -record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}). @@ -74,6 +74,8 @@ -record(event, {type, props, timestamp}). +-record(message_properties, {expiry}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 38c6f939..20230b24 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -31,12 +31,15 @@ -type(fetch_result() :: ('empty' | - %% Message, IsDelivered, AckTag, RemainingLen + %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(message_properties_transformer() :: + fun ((rabbit_types:message_properties()) + -> rabbit_types:message_properties())). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -45,19 +48,25 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). --spec(publish_delivered/3 :: - (ack_required(), rabbit_types:basic_message(), state()) -> - {ack(), state()}). +-spec(publish/3 :: (rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) -> state()). +-spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) + -> {ack(), state()}). +-spec(dropwhile/2 :: + (fun ((rabbit_types:message_properties()) -> boolean()), state()) + -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), - state()) -> state()). +-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). --spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> - {[ack()], state()}). --spec(requeue/2 :: ([ack()], state()) -> state()). +-spec(tx_commit/4 :: + (rabbit_types:txn(), fun (() -> any()), + message_properties_transformer(), state()) -> {[ack()], state()}). +-spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) + -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index eb0a2a51..209a90ee 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -127,6 +127,9 @@ done rm -rf %{buildroot} %changelog +* Tue Oct 19 2010 vlad@rabbitmq.com 2.1.1-1 +- New Upstream Release + * Tue Sep 14 2010 marek@rabbitmq.com 2.1.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 9927cfbc..e81fda24 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.1.1-1) lucid; urgency=low + + * New Upstream Release + + -- Vlad Alexandru Ionescu <vlad@rabbitmq.com> Tue, 19 Oct 2010 17:20:10 +0100 + rabbitmq-server (2.1.0-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index f30460d3..e37a45b3 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -75,7 +75,7 @@ post-destroot { reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ ${realsbin}/rabbitmq-env - foreach var {CONFIG_FILE CLUSTER_CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { + foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ ${realsbin}/rabbitmq-multi \ ${realsbin}/rabbitmq-server \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 8e26663a..ef0a3521 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -35,7 +35,6 @@ NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" -CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia @@ -59,7 +58,6 @@ else fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5bcbc6ba..193f1c8a 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -165,7 +165,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 4b3961d4..51ef6ecc 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -232,7 +232,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/src/delegate.erl b/src/delegate.erl index c8aa3092..e50b99f1 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,7 +44,8 @@ -ifdef(use_specs). --spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/2 :: + (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). @@ -60,8 +61,8 @@ %%---------------------------------------------------------------------------- -start_link(Hash) -> - gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). +start_link(Prefix, Hash) -> + gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), @@ -147,7 +148,8 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) -> local_server(Node) -> case get({delegate_local_server_name, Node}) of undefined -> - Name = server(erlang:phash2({self(), Node}, process_count())), + Name = server(outgoing, + erlang:phash2({self(), Node}, process_count())), put({delegate_local_server_name, Node}, Name), Name; Name -> Name @@ -160,17 +162,20 @@ remote_server(Node) -> {badrpc, _} -> %% Have to return something, if we're just casting %% then we don't want to blow up - server(1); + server(incoming, 1); Count -> - Name = server(erlang:phash2({self(), Node}, Count)), + Name = server(incoming, + erlang:phash2({self(), Node}, Count)), put({delegate_remote_server_name, Node}, Name), Name end; Name -> Name end. -server(Hash) -> - list_to_atom("delegate_process_" ++ integer_to_list(Hash)). +server(Prefix, Hash) -> + list_to_atom("delegate_" ++ + atom_to_list(Prefix) ++ "_" ++ + integer_to_list(Hash)). safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index ff303ee2..544546f1 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -55,9 +55,11 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - {ok, {{one_for_one, 10, 10}, - [{Hash, {delegate, start_link, [Hash]}, - transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(0, delegate:process_count() - 1)]}}. + {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}. + +specs(Prefix) -> + [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]}, + transient, 16#ffffffff, worker, [delegate]} || + Hash <- lists:seq(0, delegate:process_count() - 1)]. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 73fd6f0e..85452abf 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ -export([add_user/2, delete_user/1, change_password/2, set_admin/1, clear_admin/1, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). --export([set_permissions/5, set_permissions/6, clear_permissions/2, +-export([set_permissions/5, clear_permissions/2, list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, list_user_vhost_permissions/2]). @@ -52,9 +52,6 @@ -type(username() :: binary()). -type(password() :: binary()). -type(regexp() :: binary()). --type(scope() :: binary()). --type(scope_atom() :: 'client' | 'all'). - -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | rabbit_types:channel_exit()). @@ -82,21 +79,15 @@ -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). --spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), - regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). -spec(list_permissions/0 :: - () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), - scope_atom()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]). -spec(list_user_permissions/1 :: - (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_user_vhost_permissions/2 :: - (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), - scope_atom()}]). + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]). -endif. @@ -188,20 +179,15 @@ check_resource_access(Username, [] -> false; [#user_permission{permission = P}] -> - case {Name, P} of - {<<"amq.gen",_/binary>>, #permission{scope = client}} -> - true; - _ -> - PermRegexp = - case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false - end + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false end end, if Res -> ok; @@ -334,7 +320,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _, _}) -> + lists:foreach(fun ({Username, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -355,16 +341,7 @@ validate_regexp(RegexpBin) -> end. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm, - WritePerm, ReadPerm). - -set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), - Scope = case ScopeBin of - <<"client">> -> client; - <<"all">> -> all; - _ -> throw({error, {invalid_scope, ScopeBin}}) - end, rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -374,7 +351,6 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}}, @@ -393,35 +369,34 @@ clear_permissions(Username, VHostPath) -> end)). list_permissions() -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(match_user_vhost('_', '_'))]. list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_user_vhost_permissions(Username, VHostPath) -> - [{ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{ConfigurePerm, WritePerm, ReadPerm} || + {_, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user_and_vhost( Username, VHostPath, match_user_vhost(Username, VHostPath)))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ scope = Scope, - configure = ConfigurePerm, + permission = #permission{ configure = ConfigurePerm, write = WritePerm, read = ReadPerm}} <- %% TODO: use dirty ops instead diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2389ec86..9d78bafa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1]). + set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -56,7 +56,7 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(EXPIRES_TYPES, [byte, short, signedint, long]). +-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). %%---------------------------------------------------------------------------- @@ -310,18 +310,30 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]], + end || {Key, Fun} <- + [{<<"x-expires">>, fun check_expires_argument/1}, + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. -check_expires_argument(undefined) -> +check_expires_argument(Val) -> + check_integer_argument(Val, + expires_not_of_acceptable_type, + expires_zero_or_less). + +check_message_ttl_argument(Val) -> + check_integer_argument(Val, + ttl_not_of_acceptable_type, + ttl_zero_or_less). + +check_integer_argument(undefined, _, _) -> ok; -check_expires_argument({Type, Expires}) when Expires > 0 -> - case lists:member(Type, ?EXPIRES_TYPES) of +check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 -> + case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; - false -> {error, {expires_not_of_acceptable_type, Type, Expires}} + false -> {error, {InvalidTypeError, Type, Val}} end; -check_expires_argument({_Type, _Expires}) -> - {error, expires_zero_or_less}. +check_integer_argument({_Type, _Val}, _, ZeroOrLessError) -> + {error, ZeroOrLessError}. list(VHostPath) -> mnesia:dirty_match_object( @@ -466,6 +478,9 @@ set_maximum_since_use(QPid, Age) -> maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). +drop_expired(QPid) -> + gen_server2:cast(QPid, drop_expired). + on_node_down(Node) -> rabbit_binding:process_deletions( lists:foldl( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 19db731a..6048920e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}). + -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -61,7 +63,9 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + ttl, + ttl_timer_ref }). -record(consumer, {tag, ack_required}). @@ -123,6 +127,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, + ttl = undefined, stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -145,12 +150,6 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> - case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); - undefined -> State - end. - declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined, @@ -165,7 +164,7 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - State1 = init_expires(State#q{backing_queue_state = BQS}), + State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(StatsTimer, @@ -174,6 +173,19 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. +process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> + lists:foldl(fun({Arg, Fun}, State1) -> + case rabbit_misc:table_lookup(Arguments, Arg) of + {_Type, Val} -> Fun(Val, State1); + undefined -> State1 + end + end, State, [{<<"x-expires">>, fun init_expires/2}, + {<<"x-message-ttl">>, fun init_ttl/2}]). + +init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). + +init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -388,35 +400,40 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. -deliver_from_queue_deliver(AckRequired, false, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = - BQ:fetch(AckRequired, BQS), - {{Message, IsDelivered, AckTag}, 0 == Remaining, - State #q { backing_queue_state = BQS1 }}. +deliver_from_queue_deliver(AckRequired, false, State) -> + {{Message, IsDelivered, AckTag, Remaining}, State1} = + fetch(AckRequired, State), + {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), - State1. + {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + State2. attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> + %% we don't need an expiry here because messages are + %% not being enqueued, so we use an empty + %% message_properties. {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, + ?BASE_MESSAGE_PROPERTIES, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. + {true, + State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -424,13 +441,22 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} + BQS = BQ:publish(Message, + message_properties(State), + State #q.backing_queue_state), + {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). + fun (BQS) -> + BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) + end, State). + +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + {Result, BQS1} = BQ:fetch(AckRequired, BQS), + {Result, State#q{backing_queue_state = BQS1}}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -525,10 +551,13 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = Fun(BQS)}). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> + {AckTags, BQS1} = BQ:tx_commit(Txn, + fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), + BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), @@ -547,6 +576,44 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +reset_msg_expiry_fun(TTL) -> + fun(MsgProps) -> + MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} + end. + +message_properties(#q{ttl=TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL)}. + +calculate_msg_expiry(undefined) -> undefined; +calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). + +drop_expired_messages(State = #q{ttl = undefined}) -> + State; +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + Now = now_millis(), + BQS1 = BQ:dropwhile( + fun (#message_properties{expiry = Expiry}) -> + Now > Expiry + end, BQS), + ensure_ttl_timer(State#q{backing_queue_state = BQS1}). + +ensure_ttl_timer(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL, + ttl_timer_ref = undefined}) + when TTL =/= undefined -> + case BQ:is_empty(BQS) of + true -> State; + false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, + [self()]), + State#q{ttl_timer_ref = TRef} + end; +ensure_ttl_timer(State) -> + State. + +now_millis() -> timer:now_diff(now(), {0,0,0}). + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -605,6 +672,7 @@ prioritise_cast(Msg, _State) -> {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; maybe_expire -> 8; + drop_expired -> 8; emit_stats -> 7; {ack, _Txn, _MsgIds, _ChPid} -> 7; {reject, _MsgIds, _Requeue, _ChPid} -> 7; @@ -695,21 +763,21 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - backing_queue_state = BQS, backing_queue = BQ}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + case fetch(AckRequired, drop_expired_messages(State1)) of + {empty, State2} -> + reply(empty, State2); + {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), store_ch_record( C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -781,11 +849,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end end; -handle_call(stat, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - active_consumers = ActiveConsumers}) -> - reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, - ensure_expiry_timer(State)); +handle_call(stat, _From, State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS, + active_consumers = ActiveConsumers} = + drop_expired_messages(ensure_expiry_timer(State)), + reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -918,6 +986,9 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; +handle_cast(drop_expired, State) -> + noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); + handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507..352e76fd 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,12 +62,16 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 3}, + {publish_delivered, 4}, + + %% Drop messages from the head of the queue while the supplied + %% predicate returns true. + {dropwhile, 2}, %% Produce the next message. {fetch, 2}, @@ -77,7 +81,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 3}, + {tx_publish, 4}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -89,11 +93,11 @@ behaviour_info(callbacks) -> %% Commit a transaction. The Fun passed in must be called once %% the messages have really been commited. This CPS permits the %% possibility of commit coalescing. - {tx_commit, 3}, + {tx_commit, 4}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 2}, + {requeue, 3}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8facaf16..6b212745 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -39,7 +39,6 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). --define(SCOPE_OPT, "-s"). %%---------------------------------------------------------------------------- @@ -67,7 +66,7 @@ start() -> {[Command0 | Args], Opts} = rabbit_misc:get_options( [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, - {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}], + {option, ?VHOST_OPT, "/"}], FullCommand), Opts1 = lists:map(fun({K, V}) -> case K of @@ -289,10 +288,9 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Scope = proplists:get_value(?SCOPE_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Scope, Username, VHost, CPerm, WPerm, RPerm]}); + [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4e0dad84..9855f18c 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,9 +31,9 @@ -module(rabbit_invariable_queue). --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, - publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, - tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, + dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -89,40 +89,61 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) -> + fun ({#basic_message { is_persistent = false }, + _MsgProps, _IsDelivered}, Acc) -> Acc; - ({Msg = #basic_message { guid = Guid }, IsDelivered}, + ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, {AckTagsN, PAN}) -> ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} + {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)} end, {[], dict:new()}, Q), ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> - ok = persist_message(QName, IsDurable, none, Msg), - State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. +publish(Msg, MsgProps, State = #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), + State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }. -publish_delivered(false, _Msg, State) -> +publish_delivered(false, _Msg, _MsgProps, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, + MsgProps, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> - ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), ok = persist_delivery(QName, IsDurable, false, Msg), - {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. + {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. + +dropwhile(_Pred, State = #iv_state { len = 0 }) -> + State; +dropwhile(Pred, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + case Pred(MsgProps) of + true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, + IsDelivered, State), + dropwhile(Pred, State1); + false -> State + end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = - queue:out(Q), +fetch(AckRequired, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). + +fetch_internal(AckRequired, Q1, + Msg = #basic_message { guid = Guid }, + MsgProps, IsDelivered, + State = #iv_state { len = Len, + qname = QName, + durable = IsDurable, + pending_ack = PA }) -> Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - PA1 = dict:store(Guid, Msg, PA), + PA1 = store_ack(Msg, MsgProps, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; false -> ok = persist_acks(QName, IsDurable, none, @@ -138,11 +159,11 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName, - durable = IsDurable }) -> +tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(QName, IsDurable, Txn, Msg), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), + ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), State. tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, @@ -159,8 +180,10 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> erase_tx(Txn), {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, - queue = Q, len = Len }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, + pending_ack = PA, + queue = Q, + len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, Txn, QName), @@ -168,13 +191,16 @@ tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, Fun(), AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), - {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) -> - {queue:in({Msg, false}, QN), LenN + 1} + {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> + {enqueue(Msg, MsgPropsFun(MsgProps), + false, QN), + LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. -requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, - len = Len }) -> +requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, + queue = Q, + len = Len }) -> %% We don't need to touch the persister here - the persister will %% already have these messages published and delivered as %% necessary. The complication is that the persister's seq_id will @@ -186,12 +212,17 @@ requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, %% order to the last known state of our queue, prior to shutdown. {Q1, Len1} = lists:foldl( fun (Guid, {QN, LenN}) -> - {ok, Msg = #basic_message {}} = dict:find(Guid, PA), - {queue:in({Msg, true}, QN), LenN + 1} + {Msg = #basic_message {}, MsgProps} + = dict:fetch(Guid, PA), + {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), + LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. +enqueue(Msg, MsgProps, IsDelivered, Q) -> + queue:in({Msg, MsgProps, IsDelivered}, Q). + len(#iv_state { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -212,6 +243,9 @@ status(_State) -> []. remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). +store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) -> + dict:store(Guid, {Msg, MsgProps}, PA). + %%---------------------------------------------------------------------------- lookup_tx(Txn) -> @@ -243,14 +277,15 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- persist_message(QName, true, Txn, Msg = #basic_message { - is_persistent = true }) -> + is_persistent = true }, MsgProps) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); -persist_message(_QName, _IsDurable, _Txn, _Msg) -> + [{publish, Msg1, MsgProps, + {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> ok. persist_delivery(QName, true, false, #basic_message { is_persistent = true, @@ -263,7 +298,8 @@ persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin - {ok, Msg} = dict:find(Guid, PA), + {ok, {Msg, _MsgProps}} + = dict:find(Guid, PA), Msg #basic_message.is_persistent end]); persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 577d206d..8de2f0d6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -340,10 +340,8 @@ read_cluster_nodes_config() -> case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> - case application:get_env(cluster_nodes) of - undefined -> []; - {ok, ClusterNodes} -> ClusterNodes - end; + {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), + ClusterNodes; {error, Reason} -> throw({error, {cannot_read_cluster_nodes_config, FileName, Reason}}) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 66cc06cf..24b017b5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,10 +34,12 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/2, client_delete_and_terminate/3, - write/4, read/3, contains/2, remove/2, release/2, sync/3]). + client_init/2, client_terminate/1, client_delete_and_terminate/1, + client_ref/1, + write/3, read/2, contains/2, remove/2, release/2, sync/3]). --export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal +-export([sync/1, set_maximum_since_use/2, + has_readers/2, combine_files/3, delete_file/2]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -74,7 +76,6 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_active, %% is the GC currently working? gc_pid, %% pid of our GC file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table @@ -86,7 +87,9 @@ }). -record(client_msstate, - { file_handle_cache, + { server, + client_ref, + file_handle_cache, index_state, index_module, dir, @@ -100,13 +103,33 @@ -record(file_summary, {file, valid_total_size, left, right, file_size, locked, readers}). +-record(gc_state, + { dir, + index_module, + index_state, + file_summary_ets, + msg_store + }). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([gc_state/0, file_num/0]). + +-opaque(gc_state() :: #gc_state { dir :: file:filename(), + index_module :: atom(), + index_state :: any(), + file_summary_ets :: ets:tid(), + msg_store :: server() + }). + -type(server() :: pid() | atom()). +-type(client_ref() :: binary()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { + server :: server(), + client_ref :: client_ref(), file_handle_cache :: dict:dictionary(), index_state :: any(), index_module :: atom(), @@ -124,26 +147,25 @@ (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). --spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). --spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). --spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). +-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_ref/1 :: (client_msstate()) -> client_ref()). +-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok'). +-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). +-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). +-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) -> + 'ok'). -spec(sync/1 :: (server()) -> 'ok'). --spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> - 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {ets:tid(), file:filename(), atom(), any()}) -> - 'concurrent_readers' | non_neg_integer()). +-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). +-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> + non_neg_integer()). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> non_neg_integer()). -endif. @@ -316,7 +338,9 @@ client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), + #client_msstate { server = Server, + client_ref = Ref, + file_handle_cache = dict:new(), index_state = IState, index_module = IModule, dir = Dir, @@ -326,20 +350,22 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState, Server) -> +client_terminate(CState) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = server_call(CState, client_terminate). -client_delete_and_terminate(CState, Server, Ref) -> +client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). + ok = server_cast(CState, {client_delete, Ref}). -write(Server, Guid, Msg, +client_ref(#client_msstate { client_ref = Ref }) -> Ref. + +write(Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + ok = server_cast(CState, {write, Guid}). -read(Server, Guid, +read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache @@ -348,13 +374,12 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:call( - Server, {read, Guid}, infinity), - CState} end, + Defer = fun() -> + {server_call(CState, {read, Guid}), CState} + end, case index_lookup_positive_ref_count(Guid, CState) of not_found -> Defer(); - MsgLocation -> client_read1(Server, MsgLocation, Defer, - CState) + MsgLocation -> client_read1(MsgLocation, Defer, CState) end; [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -365,19 +390,16 @@ read(Server, Guid, {{ok, Msg}, CState} end. -contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). -remove(_Server, []) -> ok; -remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). -release(_Server, []) -> ok; -release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). -sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). +contains(Guid, CState) -> server_call(CState, {contains, Guid}). +remove([], _CState) -> ok; +remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +release([], _CState) -> ok; +release(Guids, CState) -> server_cast(CState, {release, Guids}). +sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). sync(Server) -> gen_server2:cast(Server, sync). -gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). - set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -385,18 +407,22 @@ set_maximum_since_use(Server, Age) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -client_read1(Server, - #msg_location { guid = Guid, file = File } = MsgLocation, - Defer, +server_call(#client_msstate { server = Server }, Msg) -> + gen_server2:call(Server, Msg, infinity). + +server_cast(#client_msstate { server = Server }, Msg) -> + gen_server2:cast(Server, Msg). + +client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = Locked, right = Right }] -> - client_read2(Server, Locked, Right, MsgLocation, Defer, CState) + client_read2(Locked, Right, MsgLocation, Defer, CState) end. -client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> +client_read2(false, undefined, _MsgLocation, Defer, _CState) -> %% Although we've already checked both caches and not found the %% message there, the message is apparently in the %% current_file. We can only arrive here if we are trying to read @@ -407,12 +433,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> %% contents of the current file, thus reads from the current file %% will end up here and will need to be deferred. Defer(); -client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> +client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg %% is actually in a different file, unlocked. However, defering is %% the safest and simplest thing to do. Defer(); -client_read2(Server, false, _Right, +client_read2(false, _Right, MsgLocation = #msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> @@ -421,10 +447,10 @@ client_read2(Server, false, _Right, %% finished. safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, - fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end, - fun () -> read(Server, Guid, CState) end). + fun (_) -> client_read3(MsgLocation, Defer, CState) end, + fun () -> read(Guid, CState) end). -client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, +client_read3(#msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -448,7 +474,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% too). case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, @@ -459,7 +485,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Server, Guid, CState) + catch error:badarg -> read(Guid, CState) end; [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving @@ -482,9 +508,14 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, read_from_disk(MsgLocation, CState1, DedupCacheEts), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; - MsgLocation -> %% different file! + #msg_location {} = MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(Server, MsgLocation, Defer, CState) + client_read1(MsgLocation, Defer, CState); + not_found -> %% it seems not to exist. Defer, just to be sure. + try Release() %% this can badarg, same as locked case, above + catch error:badarg -> ok + end, + Defer() end end. @@ -547,8 +578,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = [], - gc_active = false, + pending_gc_completion = orddict:new(), gc_pid = undefined, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -570,8 +600,13 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {ok, Offset} = file_handle_cache:position(CurHdl, Offset), ok = file_handle_cache:truncate(CurHdl), - {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, - FileSummaryEts), + {ok, GCPid} = rabbit_msg_store_gc:start_link( + #gc_state { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + file_summary_ets = FileSummaryEts, + msg_store = self() + }), {ok, maybe_compact( State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), @@ -588,10 +623,11 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - sync -> 8; - {gc_done, _Reclaimed, _Source, _Destination} -> 8; - {set_maximum_since_use, _Age} -> 8; - _ -> 0 + sync -> 8; + {combine_files, _Source, _Destination, _Reclaimed} -> 8; + {delete_file, _File, _Reclaimed} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 end. handle_call(successfully_recovered_state, _From, State) -> @@ -686,37 +722,23 @@ handle_cast({sync, Guids, K}, handle_cast(sync, State) -> noreply(internal_sync(State)); -handle_cast({gc_done, Reclaimed, Src, Dst}, +handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, - gc_active = {Src, Dst}, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts }) -> - %% GC done, so now ensure that any clients that have open fhs to - %% those files close them before using them again. This has to be - %% done here (given it's done in the msg_store, and not the gc), - %% and not when starting up the GC, because if done when starting - %% up the GC, the client could find the close, and close and - %% reopen the fh, whilst the GC is waiting for readers to - %% disappear, before it's actually done the GC. - true = mark_handle_to_close(FileHandlesEts, Src), - true = mark_handle_to_close(FileHandlesEts, Dst), - %% we always move data left, so Src has gone and was on the - %% right, so need to make dest = source.right.left, and also - %% dest.right = source.right - [#file_summary { left = Dst, - right = SrcRight, - locked = true, - readers = 0 }] = ets:lookup(FileSummaryEts, Src), - %% this could fail if SrcRight =:= undefined - ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}), - true = ets:update_element(FileSummaryEts, Dst, - [{#file_summary.locked, false}, - {#file_summary.right, SrcRight}]), - true = ets:delete(FileSummaryEts, Src), - noreply( - maybe_compact(run_pending( - State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false }))); + ok = cleanup_after_file_deletion(Source, State), + %% see comment in cleanup_after_file_deletion + true = mark_handle_to_close(FileHandlesEts, Destination), + true = ets:update_element(FileSummaryEts, Destination, + {#file_summary.locked, false}), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([Source, Destination], State1))); + +handle_cast({delete_file, File, Reclaimed}, + State = #msstate { sum_file_size = SumFileSize }) -> + ok = cleanup_after_file_deletion(File, State), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([File], State1))); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -867,7 +889,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, ets:lookup(FileSummaryEts, File), case Locked of true -> add_to_pending_gc_completion({read, Guid, From}, - State); + File, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), gen_server2:reply(From, {ok, Msg}), @@ -897,19 +919,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), {Msg, State1}. -contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> +contains_message(Guid, From, + State = #msstate { pending_gc_completion = Pending }) -> case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, false), State; #msg_location { file = File } -> - case GCActive of - {A, B} when File =:= A orelse File =:= B -> - add_to_pending_gc_completion( - {contains, Guid, From}, State); - _ -> - gen_server2:reply(From, true), - State + case orddict:is_key(File, Pending) of + true -> add_to_pending_gc_completion( + {contains, Guid, From}, File, State); + false -> gen_server2:reply(From, true), + State end end. @@ -928,7 +949,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, State); + add_to_pending_gc_completion({remove, Guid}, File, State); [#file_summary {}] -> ok = Dec(), [_] = ets:update_counter( @@ -944,20 +965,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, end. add_to_pending_gc_completion( - Op, State = #msstate { pending_gc_completion = Pending }) -> - State #msstate { pending_gc_completion = [Op | Pending] }. - -run_pending(State = #msstate { pending_gc_completion = [] }) -> - State; -run_pending(State = #msstate { pending_gc_completion = Pending }) -> - State1 = State #msstate { pending_gc_completion = [] }, - lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). + Op, File, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = + rabbit_misc:orddict_cons(File, Op, Pending) }. -run_pending({read, Guid, From}, State) -> +run_pending(Files, State) -> + lists:foldl( + fun (File, State1 = #msstate { pending_gc_completion = Pending }) -> + Pending1 = orddict:erase(File, Pending), + lists:foldl( + fun run_pending_action/2, + State1 #msstate { pending_gc_completion = Pending1 }, + lists:reverse(orddict:fetch(File, Pending))) + end, State, Files). + +run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); -run_pending({contains, Guid, From}, State) -> +run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending({remove, Guid}, State) -> +run_pending_action({remove, Guid}, State) -> remove_message(Guid, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> @@ -969,6 +995,10 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +orddict_store(Key, Val, Dict) -> + false = orddict:is_key(Key, Dict), + orddict:store(Key, Val, Dict). + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- @@ -1429,12 +1459,12 @@ maybe_roll_to_new_file( maybe_roll_to_new_file(_, State) -> State. -maybe_compact(State = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize, - gc_active = false, - gc_pid = GCPid, - file_summary_ets = FileSummaryEts, - file_size_limit = FileSizeLimit }) +maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + gc_pid = GCPid, + pending_gc_completion = Pending, + file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit }) when (SumFileSize > 2 * FileSizeLimit andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> %% TODO: the algorithm here is sub-optimal - it may result in a @@ -1443,27 +1473,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, '$end_of_table' -> State; First -> - case find_files_to_gc(FileSummaryEts, FileSizeLimit, - ets:lookup(FileSummaryEts, First)) of + case find_files_to_combine(FileSummaryEts, FileSizeLimit, + ets:lookup(FileSummaryEts, First)) of not_found -> State; {Src, Dst} -> + Pending1 = orddict_store(Dst, [], + orddict_store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), true = ets:update_element(FileSummaryEts, Dst, {#file_summary.locked, true}), - ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst), - State1 #msstate { gc_active = {Src, Dst} } + ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst), + State1 #msstate { pending_gc_completion = Pending1 } end end; maybe_compact(State) -> State. -find_files_to_gc(FileSummaryEts, FileSizeLimit, - [#file_summary { file = Dst, - valid_total_size = DstValid, - right = Src }]) -> +find_files_to_combine(FileSummaryEts, FileSizeLimit, + [#file_summary { file = Dst, + valid_total_size = DstValid, + right = Src, + locked = DstLocked }]) -> case Src of undefined -> not_found; @@ -1471,13 +1504,16 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, [#file_summary { file = Src, valid_total_size = SrcValid, left = Dst, - right = SrcRight }] = Next = + right = SrcRight, + locked = SrcLocked }] = Next = ets:lookup(FileSummaryEts, Src), case SrcRight of undefined -> not_found; - _ -> case DstValid + SrcValid =< FileSizeLimit of + _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso + (DstValid > 0) andalso (SrcValid > 0) andalso + not (DstLocked orelse SrcLocked) of true -> {Src, Dst}; - false -> find_files_to_gc( + false -> find_files_to_combine( FileSummaryEts, FileSizeLimit, Next) end end @@ -1486,85 +1522,86 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, State = #msstate { - dir = Dir, - sum_file_size = SumFileSize, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + gc_pid = GCPid, + file_summary_ets = FileSummaryEts, + pending_gc_completion = Pending }) -> [#file_summary { valid_total_size = ValidData, - left = Left, - right = Right, - file_size = FileSize, locked = false }] = ets:lookup(FileSummaryEts, File), case ValidData of - %% we should NEVER find the current file in here hence right - %% should always be a file, not undefined - 0 -> case {Left, Right} of - {undefined, _} when Right =/= undefined -> - %% the eldest file is empty. - true = ets:update_element( - FileSummaryEts, Right, - {#file_summary.left, undefined}); - {_, _} when Right =/= undefined -> - true = ets:update_element(FileSummaryEts, Right, - {#file_summary.left, Left}), - true = ets:update_element(FileSummaryEts, Left, - {#file_summary.right, Right}) - end, - true = mark_handle_to_close(FileHandlesEts, File), - true = ets:delete(FileSummaryEts, File), - {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - [index_delete(Guid, State) || - {Guid, _TotalSize, _Offset} <- Messages], - State1 = close_handle(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), - State1 #msstate { sum_file_size = SumFileSize - FileSize }; + 0 -> %% don't delete the file_summary_ets entry for File here + %% because we could have readers which need to be able to + %% decrement the readers count. + true = ets:update_element(FileSummaryEts, File, + {#file_summary.locked, true}), + ok = rabbit_msg_store_gc:delete(GCPid, File), + Pending1 = orddict_store(File, [], Pending), + close_handle(File, + State #msstate { pending_gc_completion = Pending1 }); _ -> State end. +cleanup_after_file_deletion(File, + #msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> + %% Ensure that any clients that have open fhs to the file close + %% them before using them again. This has to be done here (given + %% it's done in the msg_store, and not the gc), and not when + %% starting up the GC, because if done when starting up the GC, + %% the client could find the close, and close and reopen the fh, + %% whilst the GC is waiting for readers to disappear, before it's + %% actually done the GC. + true = mark_handle_to_close(FileHandlesEts, File), + [#file_summary { left = Left, + right = Right, + locked = true, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + %% We'll never delete the current file, so right is never undefined + true = Right =/= undefined, %% ASSERTION + true = ets:update_element(FileSummaryEts, Right, + {#file_summary.left, Left}), + %% ensure the double linked list is maintained + true = case Left of + undefined -> true; %% File is the eldest file (left-most) + _ -> ets:update_element(FileSummaryEts, Left, + {#file_summary.right, Right}) + end, + true = ets:delete(FileSummaryEts, File), + ok. + %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> - [SrcObj = #file_summary { - readers = SrcReaders, - left = DstFile, - file_size = SrcFileSize, - locked = true }] = ets:lookup(FileSummaryEts, SrcFile), - [DstObj = #file_summary { - readers = DstReaders, - right = SrcFile, - file_size = DstFileSize, - locked = true }] = ets:lookup(FileSummaryEts, DstFile), - - case SrcReaders =:= 0 andalso DstReaders =:= 0 of - true -> TotalValidData = combine_files(SrcObj, DstObj, State), - %% don't update dest.right, because it could be - %% changing at the same time - true = ets:update_element( - FileSummaryEts, DstFile, - [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.file_size, TotalValidData}]), - SrcFileSize + DstFileSize - TotalValidData; - false -> concurrent_readers - end. - -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - right = Source }, - State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> - SourceName = filenum_to_name(Source), - DestinationName = filenum_to_name(Destination), +has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> + [#file_summary { locked = true, readers = Count }] = + ets:lookup(FileSummaryEts, File), + Count /= 0. + +combine_files(Source, Destination, + State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { + readers = 0, + left = Destination, + valid_total_size = SourceValid, + file_size = SourceFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Source), + [#file_summary { + readers = 0, + right = Source, + valid_total_size = DestinationValid, + file_size = DestinationFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Destination), + + SourceName = filenum_to_name(Source), + DestinationName = filenum_to_name(Destination), {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE), {ok, DestinationHdl} = open_file(Dir, DestinationName, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, + TotalValidData = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't %% need a tmp file %% if they're not equal, then we need to write out everything past @@ -1577,7 +1614,7 @@ combine_files(#file_summary { file = Source, drop_contiguous_block_prefix(DestinationWorkList), case DestinationWorkListTail of [] -> ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); + DestinationHdl, DestinationContiguousTop, TotalValidData); _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), ok = copy_messages( @@ -1591,7 +1628,7 @@ combine_files(#file_summary { file = Source, %% Destination and copy from Tmp back to the end {ok, 0} = file_handle_cache:position(TmpHdl, 0), ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), + DestinationHdl, DestinationContiguousTop, TotalValidData), {ok, TmpSize} = file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be DestinationValid @@ -1599,14 +1636,36 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData, SourceHdl, DestinationHdl, Destination, State), %% tidy up ok = file_handle_cache:close(DestinationHdl), ok = file_handle_cache:delete(SourceHdl), - ExpectedSize. -load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> + %% don't update dest.right, because it could be changing at the + %% same time + true = ets:update_element( + FileSummaryEts, Destination, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + + Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, + gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}). + +delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { valid_total_size = 0, + locked = true, + file_size = FileSize, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + {[], 0} = load_and_vacuum_message_file(File, State), + ok = file:delete(form_filename(Dir, filenum_to_name(File))), + gen_server2:cast(Server, {delete_file, File, FileSize}). + +load_and_vacuum_message_file(File, #gc_state { dir = Dir, + index_module = Index, + index_state = IndexState }) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), @@ -1627,7 +1686,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Destination, #gc_state { index_module = Index, + index_state = IndexState }) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index a7855bbf..cd9fd497 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,20 +33,16 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, no_readers/2, stop/1]). +-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). --record(gcstate, - {dir, - index_state, - index_module, - parent, - file_summary_ets, - scheduled +-record(state, + { pending_no_readers, + msg_store_state }). -include("rabbit.hrl"). @@ -55,10 +51,12 @@ -ifdef(use_specs). --spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) -> +-spec(start_link/1 :: (rabbit_msg_store:gc_state()) -> rabbit_types:ok_pid_or_error()). --spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). --spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(combine/3 :: (pid(), rabbit_msg_store:file_num(), + rabbit_msg_store:file_num()) -> 'ok'). +-spec(delete/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). +-spec(no_readers/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). -spec(stop/1 :: (pid()) -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -66,13 +64,15 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> - gen_server2:start_link( - ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], - [{timeout, infinity}]). +start_link(MsgStoreState) -> + gen_server2:start_link(?MODULE, [MsgStoreState], + [{timeout, infinity}]). -gc(Server, Source, Destination) -> - gen_server2:cast(Server, {gc, Source, Destination}). +combine(Server, Source, Destination) -> + gen_server2:cast(Server, {combine, Source, Destination}). + +delete(Server, File) -> + gen_server2:cast(Server, {delete, File}). no_readers(Server, File) -> gen_server2:cast(Server, {no_readers, File}). @@ -85,16 +85,11 @@ set_maximum_since_use(Pid, Age) -> %%---------------------------------------------------------------------------- -init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> +init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #gcstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = undefined }, - hibernate, + {ok, #state { pending_no_readers = dict:new(), + msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; @@ -103,18 +98,23 @@ prioritise_cast(_Msg, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, - State = #gcstate { scheduled = undefined }) -> - {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), - hibernate}; +handle_cast({combine, Source, Destination}, State) -> + {noreply, attempt_action(combine, [Source, Destination], State), hibernate}; -handle_cast({no_readers, File}, - State = #gcstate { scheduled = {Source, Destination} }) - when File =:= Source orelse File =:= Destination -> - {noreply, attempt_gc(State), hibernate}; +handle_cast({delete, File}, State) -> + {noreply, attempt_action(delete, [File], State), hibernate}; -handle_cast({no_readers, _File}, State) -> - {noreply, State, hibernate}; +handle_cast({no_readers, File}, + State = #state { pending_no_readers = Pending }) -> + {noreply, case dict:find(File, Pending) of + error -> + State; + {ok, {Action, Files}} -> + Pending1 = dict:erase(File, Pending), + attempt_action( + Action, Files, + State #state { pending_no_readers = Pending1 }) + end, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -129,16 +129,18 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -attempt_gc(State = #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = {Source, Destination} }) -> - case rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}) of - concurrent_readers -> State; - Reclaimed -> ok = rabbit_msg_store:gc_done( - Parent, Reclaimed, Source, Destination), - State #gcstate { scheduled = undefined } +attempt_action(Action, Files, + State = #state { pending_no_readers = Pending, + msg_store_state = MsgStoreState }) -> + case [File || File <- Files, + rabbit_msg_store:has_readers(File, MsgStoreState)] of + [] -> do_action(Action, Files, MsgStoreState), + State; + [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), + State #state { pending_no_readers = Pending1 } end. + +do_action(combine, [Source, Destination], MsgStoreState) -> + rabbit_msg_store:combine_files(Source, Destination, MsgStoreState); +do_action(delete, [File], MsgStoreState) -> + rabbit_msg_store:delete_file(File, MsgStoreState). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index db5c71f6..03a9b386 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -41,7 +41,7 @@ %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, tcp_listener_stopped/2, +-export([tcp_listener_started/3, tcp_listener_stopped/3, start_client/1, start_ssl_client/2]). -include("rabbit.hrl"). @@ -160,14 +160,14 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> {IPAddress, Name}. start_tcp_listener(Host, Port) -> - start_listener(Host, Port, "TCP Listener", + start_listener(Host, Port, amqp, "TCP Listener", {?MODULE, start_client, []}). start_ssl_listener(Host, Port, SslOpts) -> - start_listener(Host, Port, "SSL Listener", + start_listener(Host, Port, 'amqp/ssl', "SSL Listener", {?MODULE, start_ssl_client, [SslOpts]}). -start_listener(Host, Port, Label, OnConnect) -> +start_listener(Host, Port, Protocol, Label, OnConnect) -> {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), {ok,_} = supervisor:start_child( @@ -175,8 +175,8 @@ start_listener(Host, Port, Label, OnConnect) -> {Name, {tcp_listener_sup, start_link, [IPAddress, Port, ?RABBIT_TCP_OPTS , - {?MODULE, tcp_listener_started, []}, - {?MODULE, tcp_listener_stopped, []}, + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}), ok. @@ -188,20 +188,25 @@ stop_tcp_listener(Host, Port) -> ok = supervisor:delete_child(rabbit_sup, Name), ok. -tcp_listener_started(IPAddress, Port) -> +tcp_listener_started(Protocol, IPAddress, Port) -> + %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1 + %% We need the host so we can distinguish multiple instances of the above + %% in a cluster. ok = mnesia:dirty_write( rabbit_listener, #listener{node = node(), - protocol = tcp, + protocol = Protocol, host = tcp_host(IPAddress), + ip_address = IPAddress, port = Port}). -tcp_listener_stopped(IPAddress, Port) -> +tcp_listener_stopped(Protocol, IPAddress, Port) -> ok = mnesia:dirty_delete_object( rabbit_listener, #listener{node = node(), - protocol = tcp, + protocol = Protocol, host = tcp_host(IPAddress), + ip_address = IPAddress, port = Port}). active_listeners() -> diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 66e5cf63..11056c8e 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -69,7 +69,8 @@ -type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). -type(work_item() :: - {publish, rabbit_types:message(), pmsg()} | + {publish, + rabbit_types:message(), rabbit_types:message_properties(), pmsg()} | {deliver, pmsg()} | {ack, pmsg()}). @@ -173,9 +174,10 @@ handle_call(force_snapshot, _From, State) -> handle_call({queue_content, QName}, _From, State = #pstate{snapshot = #psnapshot{messages = Messages, queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], - do_reply([{ets:lookup_element(Messages, K, 2), D} || - {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], + [{{'$4', '$1', '$2', '$3'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || + {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -243,9 +245,9 @@ log_work(CreateWorkUnit, MessageList, snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( - fun (M = {publish, Message, QK = {_QName, PKey}}) -> + fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of - [_] -> {tied, QK}; + [_] -> {tied, MsgProps, QK}; [] -> ets:insert(Messages, {PKey, Message}), M end; @@ -356,7 +358,8 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, + _MsgProps, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues), prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), @@ -474,14 +477,14 @@ perform_work(MessageList, Messages, Queues, SeqId) -> perform_work_item(Item, Messages, Queues, NextSeqId) end, SeqId, MessageList). -perform_work_item({publish, Message, QK = {_QName, PKey}}, +perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}}, Messages, Queues, NextSeqId) -> true = ets:insert(Messages, {PKey, Message}), - true = ets:insert(Queues, {QK, false, NextSeqId}), + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; -perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> - true = ets:insert(Queues, {QK, false, NextSeqId}), +perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0b98290c..28d0b47d 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,8 +31,9 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, - deliver/2, ack/2, sync/2, flush/1, read/3, +-export([init/1, shutdown_terms/1, recover/4, + terminate/2, delete_and_terminate/1, + publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -define(CLEAN_FILENAME, "clean.dot"). @@ -98,12 +99,12 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'), -%% ('ack'|'no_ack')} is richer than strictly necessary for most -%% operations. However, for startup, and to ensure the safe and -%% correct combination of journal entries with entries read from the -%% segment on disk, this richer representation vastly simplifies and -%% clarifies the code. +%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}), +%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly +%% necessary for most operations. However, for startup, and to ensure +%% the safe and correct combination of journal entries with entries +%% read from the segment on disk, this richer representation vastly +%% simplifies and clarifies the code. %% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -141,14 +142,19 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). +-define(EXPIRY_BYTES, 8). +-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). +-define(NO_EXPIRY, 0). + -define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(GUID_BITS, (?GUID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). +%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -157,7 +163,7 @@ %% ---- misc ---- --define(PUB, {_, _}). %% {Guid, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -194,21 +200,26 @@ -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(shutdown_terms() :: [any()]). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> - {'undefined' | non_neg_integer(), [any()], qistate()}). +-spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()). +-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). +-spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + fun ((rabbit_guid:guid()) -> boolean())) -> + {'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> - qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), boolean(), qistate()) + -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}], - qistate()}). + {[{rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -222,25 +233,26 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> +init(Name) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - {0, [], State}; + State. + +shutdown_terms(Name) -> + #qistate { dir = Dir } = blank_state(Name), + case read_shutdown_terms(Dir) of + {error, _} -> []; + {ok, Terms1} -> Terms1 + end. -init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) -> State = #qistate { dir = Dir } = blank_state(Name), - Terms = case read_shutdown_terms(Dir) of - {error, _} -> []; - {ok, Terms1} -> Terms1 - end, CleanShutdown = detect_clean_shutdown(Dir), - {Count, State1} = - case CleanShutdown andalso MsgStoreRecovered of - true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) - end, - {Count, Terms, State1}. + case CleanShutdown andalso MsgStoreRecovered of + true -> RecoveredCounts = proplists:get_value(segments, Terms, []), + init_clean(RecoveredCounts, State); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + end. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), @@ -252,15 +264,18 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, MsgProps)]), + maybe_flush_journal( + add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -453,7 +468,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -506,7 +521,8 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> + fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> Acc @@ -516,6 +532,32 @@ queue_index_walker_reader(QueueName, Gatherer) -> ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- +%% expiry/binary manipulation +%%---------------------------------------------------------------------------- + +create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> + [Guid, expiry_to_binary(Expiry)]. + +expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; +expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. + +read_pub_record_body(Hdl) -> + case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of + {ok, Bin} -> + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 + <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, + <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {Guid, #message_properties{expiry = Exp}}; + Error -> + Error + end. + +%%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -636,17 +678,13 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <<GuidNum:?GUID_BITS>>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <<Guid:?GUID_BYTES/binary>> = - <<GuidNum:?GUID_BITS>>, - Publish = {Guid, case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, + case read_pub_record_body(Hdl) of + {Guid, MsgProps} -> + Publish = {Guid, MsgProps, + case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, load_journal_entries( add_to_journal(SeqId, Publish, State)); _ErrOrEoF -> %% err, we've lost at least a publish @@ -744,11 +782,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, IsPersistent} -> + {Guid, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, Guid]) + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(Guid, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -768,10 +807,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -801,10 +840,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - %% because we specify /binary, and binaries are complete - %% bytes, the size spec is in bytes, not bits. - {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), - Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProps} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 29004bd5..127467bb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -235,12 +235,29 @@ conserve_memory(Pid, Conserve) -> server_properties() -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), - [{list_to_binary(K), longstr, list_to_binary(V)} || - {K, V} <- [{"product", Product}, - {"version", Version}, - {"platform", "Erlang/OTP"}, - {"copyright", ?COPYRIGHT_MESSAGE}, - {"information", ?INFORMATION_MESSAGE}]]. + + %% Get any configuration-specified server properties + {ok, RawConfigServerProps} = application:get_env(rabbit, + server_properties), + + %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms + %% from the config and merge them with the generated built-in properties + NormalizedConfigServerProps = + [case X of + {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), + longstr, + list_to_binary(Value)}; + {BinKey, Type, Value} -> {BinKey, Type, Value} + end || X <- RawConfigServerProps ++ + [{product, Product}, + {version, Version}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]], + + %% Filter duplicated properties in favor of config file provided values + lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, + NormalizedConfigServerProps). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index be451af6..4335dd2e 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -34,7 +34,6 @@ -include("rabbit.hrl"). -include_lib("public_key/include/public_key.hrl"). --include_lib("ssl/src/ssl_int.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1b47cdb7..71b23e01 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,8 +41,8 @@ -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -73,6 +73,7 @@ all_tests() -> passed = test_user_management(), passed = test_server_status(), passed = maybe_run_cluster_dependent_tests(), + passed = test_configurable_server_properties(), passed. maybe_run_cluster_dependent_tests() -> @@ -962,9 +963,6 @@ test_user_management() -> control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), - {error, {invalid_scope, _}} = - control_action(set_permissions, ["guest", "foo", ".*", ".*"], - [{"-s", "cilent"}]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -987,9 +985,7 @@ test_user_management() -> ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "client"}]), - ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "all"}]), + [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_user_permissions, ["foo"]), @@ -1297,7 +1293,7 @@ info_action(Command, Args, CheckVHost) -> {bad_argument, dummy} = control_action(Command, ["dummy"]), ok. -default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}]. +default_options() -> [{"-p", "/"}, {"-q", "false"}]. expand_options(As, Bs) -> lists:foldl(fun({K, _}=A, R) -> @@ -1413,6 +1409,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, @@ -1430,17 +1427,17 @@ restart_msg_store_empty() -> guid_bin(X) -> erlang:md5(term_to_binary(X)). -msg_store_contains(Atom, Guids) -> +msg_store_contains(Atom, Guids, MSCState) -> Atom = lists:foldl( fun (Guid, Atom1) when Atom1 =:= Atom -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, + rabbit_msg_store:contains(Guid, MSCState) end, Atom, Guids). -msg_store_sync(Guids) -> +msg_store_sync(Guids, MSCState) -> Ref = make_ref(), Self = self(), - ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids, - fun () -> Self ! {sync, Ref} end), + ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end, + MSCState), receive {sync, Ref} -> ok after @@ -1452,55 +1449,64 @@ msg_store_sync(Guids) -> msg_store_read(Guids, MSCState) -> lists:foldl(fun (Guid, MSCStateM) -> {{ok, Guid}, MSCStateN} = rabbit_msg_store:read( - ?PERSISTENT_MSG_STORE, Guid, MSCStateM), MSCStateN end, MSCState, Guids). msg_store_write(Guids, MSCState) -> - lists:foldl(fun (Guid, {ok, MSCStateN}) -> - rabbit_msg_store:write(?PERSISTENT_MSG_STORE, - Guid, Guid, MSCStateN) - end, {ok, MSCState}, Guids). + ok = lists:foldl( + fun (Guid, ok) -> rabbit_msg_store:write(Guid, Guid, MSCState) end, + ok, Guids). + +msg_store_remove(Guids, MSCState) -> + rabbit_msg_store:remove(Guids, MSCState). -msg_store_remove(Guids) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids). +msg_store_remove(MsgStore, Ref, Guids) -> + with_msg_store_client(MsgStore, Ref, + fun (MSCStateM) -> + ok = msg_store_remove(Guids, MSCStateM), + MSCStateM + end). + +with_msg_store_client(MsgStore, Ref, Fun) -> + rabbit_msg_store:client_terminate( + Fun(rabbit_msg_store:client_init(MsgStore, Ref))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( - lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). + lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, + rabbit_msg_store:client_init(MsgStore, Ref), L)). test_msg_store() -> restart_msg_store_empty(), Self = self(), Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), - %% check we don't contain any of the msgs we're about to publish - false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, Guids, MSCState), %% publish the first half - {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), + ok = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% publish the second half - {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1), + ok = msg_store_write(Guids2ndHalf, MSCState), %% sync on the first half again - the msg_store will be dirty, but %% we won't need the fsync - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% check they're all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState), %% publish the latter half twice so we hit the caching and ref count code - {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2), + ok = msg_store_write(Guids2ndHalf, MSCState), %% check they're still all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen ok = lists:foldl( fun (Guid, ok) -> rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, - [Guid], fun () -> Self ! {sync, Guid} end) + [Guid], fun () -> Self ! {sync, Guid} end, + MSCState) end, ok, Guids2ndHalf), lists:foldl( fun(Guid, ok) -> @@ -1515,24 +1521,24 @@ test_msg_store() -> end, ok, Guids2ndHalf), %% it's very likely we're not dirty here, so the 1st half sync %% should hit a different code path - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState), %% read them all - MSCState4 = msg_store_read(Guids, MSCState3), + MSCState1 = msg_store_read(Guids, MSCState), %% read them all again - this will hit the cache, not disk - MSCState5 = msg_store_read(Guids, MSCState4), + MSCState2 = msg_store_read(Guids, MSCState1), %% remove them all - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids), + ok = rabbit_msg_store:remove(Guids, MSCState2), %% check first half doesn't exist - false = msg_store_contains(false, Guids1stHalf), + false = msg_store_contains(false, Guids1stHalf, MSCState2), %% check second half does exist - true = msg_store_contains(true, Guids2ndHalf), + true = msg_store_contains(true, Guids2ndHalf, MSCState2), %% read the second half again - MSCState6 = msg_store_read(Guids2ndHalf, MSCState5), + MSCState3 = msg_store_read(Guids2ndHalf, MSCState2), %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), + ok = rabbit_msg_store:release(Guids2ndHalf, MSCState3), %% read the second half again, just for fun (aka code coverage) - MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), + MSCState4 = msg_store_read(Guids2ndHalf, MSCState3), + ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1543,22 +1549,26 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), + MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> - not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)) + not(Bool = rabbit_msg_store:contains(Guid, MSCState5)) end, false, Guids2ndHalf), + ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), + MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs - false = msg_store_contains(false, Guids), + false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), - {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), + ok = msg_store_write(Guids1stHalf, MSCState6), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), + msg_store_read(Guids1stHalf, MSCState6)), + MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids %% push a lot of msgs in... at least 100 files worth @@ -1567,31 +1577,39 @@ test_msg_store() -> BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)), GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:PayloadSizeBits >>, - ok = foreach_with_msg_store_client( + ok = with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> - {ok, MSCStateN} = rabbit_msg_store:write( - MsgStore, Guid, Payload, MSCStateM), - MSCStateN - end, GuidsBig), + fun (MSCStateM) -> + [ok = rabbit_msg_store:write(Guid, Payload, MSCStateM) || + Guid <- GuidsBig], + MSCStateM + end), %% now read them to ensure we hit the fast client-side reading ok = foreach_with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> + fun (Guid, MSCStateM) -> {{ok, Payload}, MSCStateN} = rabbit_msg_store:read( - MsgStore, Guid, MSCStateM), + Guid, MSCStateM), MSCStateN end, GuidsBig), %% .., then 3s by 1... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), %% .., then remove 3s by 3, from the young end first. This hits %% GC... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), %% ensure empty - false = msg_store_contains(false, GuidsBig), + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (MSCStateM) -> + false = msg_store_contains(false, GuidsBig, MSCStateM), + MSCStateM + end), %% restart empty restart_msg_store_empty(), passed. @@ -1603,11 +1621,18 @@ test_queue() -> queue_name(<<"test">>). init_test_queue() -> - rabbit_queue_index:init( - test_queue(), true, false, - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end). + TestQueue = test_queue(), + Terms = rabbit_queue_index:shutdown_terms(TestQueue), + PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + Res = rabbit_queue_index:recover( + TestQueue, Terms, false, + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), + Res. restart_test_queue(Qi) -> _ = rabbit_queue_index:terminate([], Qi), @@ -1618,13 +1643,13 @@ restart_test_queue(Qi) -> empty_test_queue() -> ok = rabbit_variable_queue:stop(), ok = rabbit_variable_queue:start([]), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. with_empty_test_queue(Fun) -> ok = empty_test_queue(), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). queue_index_publish(SeqIds, Persistent, Qi) -> @@ -1633,29 +1658,44 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - {A, B, MSCStateEnd} = + MSCState = rabbit_msg_store:client_init(MsgStore, Ref), + {A, B} = lists:foldl( - fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> + fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( - Guid, SeqId, Persistent, QiN), - {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, - Guid, MSCStateN), - {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} - end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), - ok = rabbit_msg_store:client_delete_and_terminate( - MSCStateEnd, MsgStore, Ref), + Guid, SeqId, #message_properties{}, Persistent, QiN), + ok = rabbit_msg_store:write(Guid, Guid, MSCState), + {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} + end, {Qi, []}, SeqIds), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, - [{Guid, SeqId, Persistent, Delivered}|Read], + [{Guid, SeqId, _Props, Persistent, Delivered}|Read], [{SeqId, Guid}|Published]) -> verify_read_with_published(Delivered, Persistent, Read, Published); verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. +test_queue_index_props() -> + with_empty_test_queue( + fun(Qi0) -> + Guid = rabbit_guid:guid(), + Props = #message_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = + rabbit_queue_index:read(1, 2, Qi1), + Qi2 + end), + + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + + passed. + test_queue_index() -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), TwoSegs = SegmentSize + SegmentSize, @@ -1674,7 +1714,7 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), %% should get length back as 0, as all the msgs were transient - {0, _Terms1, Qi6} = restart_test_queue(Qi4), + {0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), @@ -1683,7 +1723,7 @@ test_queue_index() -> lists:reverse(SeqIdsGuidsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), - {LenB, _Terms2, Qi12} = restart_test_queue(Qi10), + {LenB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), @@ -1695,7 +1735,7 @@ test_queue_index() -> {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), %% should get length back as 0 because all persistent %% msgs have been acked - {0, _Terms3, Qi19} = restart_test_queue(Qi18), + {0, Qi19} = restart_test_queue(Qi18), Qi19 end), @@ -1767,11 +1807,11 @@ test_queue_index() -> true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, _Terms9, Qi4} = restart_test_queue(Qi3), + {5, Qi4} = restart_test_queue(Qi3), {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, _Terms10, Qi8} = restart_test_queue(Qi7), + {5, Qi8} = restart_test_queue(Qi7), Qi8 end), @@ -1789,7 +1829,8 @@ variable_queue_publish(IsPersistent, Count, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + #message_properties{}, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1823,9 +1864,41 @@ test_variable_queue() -> F <- [fun test_variable_queue_dynamic_duration_change/1, fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]], + fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_dropwhile/1]], passed. +test_dropwhile(VQ0) -> + Count = 10, + + %% add messages with sequential expiry + VQ1 = lists:foldl( + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #message_properties{expiry = N}, VQN) + end, VQ0, lists:seq(1, Count)), + + %% drop the first 5 messages + VQ2 = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, VQ1), + + %% fetch five now + VQ3 = lists:foldl(fun (_N, VQN) -> + {{#basic_message{}, _, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ2, lists:seq(6, Count)), + + %% should be empty now + {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + + VQ4. + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1836,6 +1909,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), VQ7 = lists:foldl( fun (Duration1, VQ4) -> @@ -1934,7 +2008,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), + VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), @@ -1974,3 +2048,56 @@ test_queue_recover() -> rabbit_amqqueue:internal_delete(QName) end), passed. + +test_configurable_server_properties() -> + %% List of the names of the built-in properties do we expect to find + BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>, + <<"copyright">>, <<"information">>], + + %% Verify that the built-in properties are initially present + ActualPropNames = [Key || + {Key, longstr, _} <- rabbit_reader:server_properties()], + true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end, + BuiltInPropNames), + + %% Get the initial server properties configured in the environment + {ok, ServerProperties} = application:get_env(rabbit, server_properties), + + %% Helper functions + ConsProp = fun (X) -> application:set_env(rabbit, + server_properties, + [X | ServerProperties]) end, + IsPropPresent = fun (X) -> lists:member(X, + rabbit_reader:server_properties()) + end, + + %% Add a wholly new property of the simplified {KeyAtom, StringValue} form + NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"}, + ConsProp(NewSimplifiedProperty), + %% Do we find hare soup, appropriately formatted in the generated properties? + ExpectedHareImage = {list_to_binary(atom_to_list(NewHareKey)), + longstr, + list_to_binary(NewHareVal)}, + true = IsPropPresent(ExpectedHareImage), + + %% Add a wholly new property of the {BinaryKey, Type, Value} form + %% and check for it + NewProperty = {<<"new-bin-key">>, signedint, -1}, + ConsProp(NewProperty), + %% Do we find the new property? + true = IsPropPresent(NewProperty), + + %% Add a property that clobbers a built-in, and verify correct clobbering + {NewVerKey, NewVerVal} = NewVersion = {version, "X.Y.Z."}, + {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)), + list_to_binary(NewVerVal)}, + ConsProp(NewVersion), + ClobberedServerProps = rabbit_reader:server_properties(), + %% Is the clobbering insert present? + true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}), + %% Is the clobbering insert the only thing with the clobbering key? + [{BinNewVerKey, longstr, BinNewVerVal}] = + [E || {K, longstr, _V} = E <- ClobberedServerProps, K =:= BinNewVerKey], + + application:set_env(rabbit, server_properties, ServerProperties), + passed. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index b971a63f..91f2c4ca 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -37,8 +37,8 @@ -export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, - unencoded_content/0, encoded_content/0, vhost/0, ctag/0, - amqp_error/0, r/1, r2/2, r3/3, listener/0, + unencoded_content/0, encoded_content/0, message_properties/0, + vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, @@ -88,6 +88,8 @@ txn :: maybe(txn()), sender :: pid(), message :: message()}). +-type(message_properties() :: + #message_properties{expiry :: pos_integer() | 'undefined'}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index fef44037..ac8fcfee 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,9 +32,9 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, - tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, + purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, + tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -247,7 +247,8 @@ is_persistent, is_delivered, msg_on_disk, - index_on_disk + index_on_disk, + msg_props }). -record(delta, @@ -293,7 +294,8 @@ -type(sync() :: #sync { acks_persistent :: [[seq_id()]], acks_all :: [[seq_id()]], - pubs :: [[rabbit_guid:guid()]], + pubs :: [{message_properties_transformer(), + [rabbit_types:basic_message()]}], funs :: [fun (() -> any())] }). -type(state() :: #vqstate { @@ -366,16 +368,17 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> - {DeltaCount, Terms, IndexState} = - rabbit_queue_index:init( - QueueName, Recover, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end), - {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), +init(QueueName, IsDurable, false) -> + IndexState = rabbit_queue_index:init(QueueName), + init(IsDurable, IndexState, 0, [], + case IsDurable of + true -> msg_store_client_init(?PERSISTENT_MSG_STORE); + false -> undefined + end, + msg_store_client_init(?TRANSIENT_MSG_STORE)); +init(QueueName, true, true) -> + Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of [] -> {proplists:get_value(persistent_ref, Terms), @@ -383,63 +386,32 @@ init(QueueName, IsDurable, Recover) -> Terms}; _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, - DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount), - Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of - true -> ?BLANK_DELTA; - false -> #delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId } - end, - Now = now(), - PersistentClient = - case IsDurable of - true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); - false -> undefined - end, - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), - State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - index_state = IndexState1, - msg_store_clients = {{PersistentClient, PRef}, - {TransientClient, TRef}}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now } }, - a(maybe_deltas_to_betas(State)). + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, + TRef), + {DeltaCount, IndexState} = + rabbit_queue_index:recover( + QueueName, Terms1, + rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + init(true, IndexState, DeltaCount, Terms1, + PersistentClient, TransientClient). terminate(State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(true, tx_commit_index(State)), - case MSCStateP of - undefined -> ok; - _ -> rabbit_msg_store:client_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE) - end, - rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), + PRef = case MSCStateP of + undefined -> undefined; + _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), + rabbit_msg_store:client_ref(MSCStateP) + end, + ok = rabbit_msg_store:client_terminate(MSCStateT), + TRef = rabbit_msg_store:client_ref(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -455,191 +427,247 @@ delete_and_terminate(State) -> %% deleting it. {_PurgeCount, State1} = purge(State), State2 = #vqstate { index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(false, State1), IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef) + _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) end, - rabbit_msg_store:client_delete_and_terminate( - MSCStateT, ?TRANSIENT_MSG_STORE, TRef), + rabbit_msg_store:client_delete_and_terminate(MSCStateT), a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). -purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - len = Len, - persistent_count = PCount }) -> +purge(State = #vqstate { q4 = Q4, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q4, - orddict:new(), IndexState), - {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} = + orddict:new(), IndexState, MSCState), + {LensByStore1, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, State #vqstate { q4 = queue:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q1, - LensByStore1, IndexState2), + LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), - {Len, a(State1 #vqstate { q1 = queue:new(), - index_state = IndexState3, - len = 0, - ram_msg_count = 0, - ram_index_count = 0, - persistent_count = PCount1 })}. - -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), + {Len, a(State1 #vqstate { q1 = queue:new(), + index_state = IndexState3, + len = 0, + ram_msg_count = 0, + ram_index_count = 0, + persistent_count = PCount1 })}. + +publish(Msg, MsgProps, State) -> + {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - pending_ack = PA, - durable = IsDurable }) -> + MsgProps, + State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + pending_ack = PA, + durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. - -fetch(AckRequired, State = #vqstate { q4 = Q4, - ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 })}. + +dropwhile(Pred, State) -> + {_OkOrEmpty, State1} = dropwhile1(Pred, State), + State1. + +dropwhile1(Pred, State) -> + internal_queue_out( + fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> + case Pred(MsgProps) of + true -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile1(Pred, State2); + false -> + %% message needs to go back into Q4 (or maybe go + %% in for the first time if it was loaded from + %% Q3). Also the msg contents might not be in + %% RAM, so read them in now + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + read_msg(MsgStatus, State1), + {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }} + end + end, State). + +fetch(AckRequired, State) -> + internal_queue_out( + fun(MsgStatus, State1) -> + %% it's possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + internal_fetch(AckRequired, MsgStatus1, State2) + end, State). + +internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> - case fetch_from_q3_to_q4(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, State1} -> fetch(AckRequired, State1) + case fetch_from_q3(State) of + {empty, State1} = Result -> a(State1), Result; + {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) end; - {{value, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, - Q4a} -> - - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - MsgStore = find_msg_store(IsPersistent), - Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, - ram_msg_count = RamMsgCount - 1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })} + {{value, MsgStatus}, Q4a} -> + Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. +read_msg(MsgStatus = #msg_status { msg = undefined, + guid = Guid, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, Guid), + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. + +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + guid = Guid, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [Guid]) + end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, PA1} = case AckRequired of + true -> PA2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, PA), + {SeqId, PA2}; + false -> {blank_ack, PA} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + + {{Msg, IsDelivered, AckTag, Len1}, + a(State #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1, + pending_ack = PA1 })}. + ack(AckTags, State) -> - a(ack(fun rabbit_msg_store:remove/2, + a(ack(fun msg_store_remove/3, fun (_AckEntry, State1) -> State1 end, AckTags, State)). -tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, +tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg), - {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), - State #vqstate { msg_store_clients = MSCState1 }; - false -> State - end). + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), + case IsPersistent andalso IsDurable of + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), + #msg_status { msg_on_disk = true } = + maybe_write_msg_to_disk(false, MsgStatus, MSCState); + false -> ok + end, + a(State). tx_ack(Txn, AckTags, State) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), State. -tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> +tx_rollback(Txn, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), ok = case IsDurable of - true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, - persistent_guids(Pubs)); + true -> msg_store_remove(MSCState, true, persistent_guids(Pubs)); false -> ok end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, MsgPropsFun, + State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:reverse(Pubs), AckTags1 = lists:append(AckTags), - PersistentGuids = persistent_guids(PubsOrdered), + PersistentGuids = persistent_guids(Pubs), HasPersistentPubs = PersistentGuids =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of - true -> ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids, - PubsOrdered, AckTags1, Fun)), + true -> ok = msg_store_sync( + MSCState, true, PersistentGuids, + msg_store_callback(PersistentGuids, Pubs, AckTags1, + Fun, MsgPropsFun)), State; - false -> tx_commit_post_msg_store( - HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) + false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, + Fun, MsgPropsFun, State) end)}. -requeue(AckTags, State) -> +requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( - ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + ack(fun msg_store_release/3, + fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> + {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), + true, false, State1), State2; - ({IsPersistent, Guid}, State1) -> + ({IsPersistent, Guid, MsgProps}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), + true, true, State2), State3 end, AckTags, State))). @@ -783,27 +811,54 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, + MsgProps) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, - msg_on_disk = false, index_on_disk = false }. + msg_on_disk = false, index_on_disk = false, + msg_props = MsgProps }. + +with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> + {Result, MSCStateP1} = Fun(MSCStateP), + {Result, {MSCStateP1, MSCStateT}}; +with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) -> + {Result, MSCStateT1} = Fun(MSCStateT), + {Result, {MSCStateP, MSCStateT1}}. + +with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> + {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, + fun (MSCState1) -> + {Fun(MSCState1), MSCState1} + end), + Res. + +msg_store_client_init(MsgStore) -> + rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()). + +msg_store_write(MSCState, IsPersistent, Guid, Msg) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end). -find_msg_store(true) -> ?PERSISTENT_MSG_STORE; -find_msg_store(false) -> ?TRANSIENT_MSG_STORE. +msg_store_read(MSCState, IsPersistent, Guid) -> + with_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end). -with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) -> - {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP), - {Result, {{MSCStateP1, PRef}, MSCStateT}}; -with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) -> - {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), - {Result, {MSCStateP, {MSCStateT1, TRef}}}. +msg_store_remove(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end). -read_from_msg_store(MSCState, IsPersistent, Guid) -> - with_msg_store_state( +msg_store_release(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MsgStore, MSCState1) -> - rabbit_msg_store:read(MsgStore, Guid, MSCState1) - end). + fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end). + +msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; @@ -821,12 +876,13 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs]. + [Guid || {#basic_message { guid = Guid, + is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, IsPersistent, IsDelivered}, + fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -838,7 +894,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = true, - index_on_disk = true + index_on_disk = true, + msg_props = MsgProps }) | Filtered1], Delivers1, Acks1} @@ -885,22 +942,69 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> +init(IsDurable, IndexState, DeltaCount, Terms, + PersistentClient, TransientClient) -> + {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), + + DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of + true -> ?BLANK_DELTA; + false -> #delta { start_seq_id = LowSeqId, + count = DeltaCount1, + end_seq_id = NextSeqId } + end, + Now = now(), + State = #vqstate { + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_msg_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now } }, + a(maybe_deltas_to_betas(State)). + +msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, Fun, StateN) + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> rabbit_msg_store:remove( - ?PERSISTENT_MSG_STORE, + fun () -> remove_persistent_messages( PersistentGuids) end, F) end) end. -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, +remove_persistent_messages(Guids) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE), + ok = rabbit_msg_store:remove(Guids, PersistentClient), + rabbit_msg_store:client_delete_and_terminate(PersistentClient). + +tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, State = #vqstate { on_sync = OnSync = #sync { acks_persistent = SPAcks, @@ -913,23 +1017,27 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, case IsDurable of true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of - #msg_status {} -> false; - {IsPersistent, _Guid} -> IsPersistent + #msg_status {} -> + false; + {IsPersistent, _Guid, _MsgProps} -> + IsPersistent end]; false -> [] end, case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of - true -> State #vqstate { on_sync = #sync { - acks_persistent = [PersistentAcks | SPAcks], - acks_all = [AckTags | SAcks], - pubs = [Pubs | SPubs], - funs = [Fun | SFuns] }}; + true -> State #vqstate { + on_sync = #sync { + acks_persistent = [PersistentAcks | SPAcks], + acks_all = [AckTags | SAcks], + pubs = [{MsgPropsFun, Pubs} | SPubs], + funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( - State #vqstate { on_sync = #sync { - acks_persistent = [], - acks_all = [AckTags], - pubs = [Pubs], - funs = [Fun] } }), + State #vqstate { + on_sync = #sync { + acks_persistent = [], + acks_all = [AckTags], + pubs = [{MsgPropsFun, Pubs}], + funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. @@ -943,13 +1051,16 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:append(lists:reverse(SPubs)), + Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), + {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, + MsgProps}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {SeqId, State3} = + publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -958,13 +1069,14 @@ tx_commit_index(State = #vqstate { on_sync = #sync { State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, - State = #vqstate { q3 = Q3, - index_state = IndexState }) -> + State = #vqstate { q3 = Q3, + index_state = IndexState, + msg_store_clients = MSCState }) -> case bpqueue:is_empty(Q3) of true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = remove_queue_entries( - fun beta_fold/3, Q3, - LensByStore, IndexState), + false -> {LensByStore1, IndexState1} = + remove_queue_entries(fun beta_fold/3, Q3, + LensByStore, IndexState, MSCState), purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { @@ -972,11 +1084,11 @@ purge_betas_and_deltas(LensByStore, index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState) -> +remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> {GuidsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), {sum_guids_by_store_to_len(LensByStore, GuidsByStore), rabbit_queue_index:ack(Acks, @@ -988,8 +1100,7 @@ remove_queue_entries1( index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, {GuidsByStore, Delivers, Acks}) -> {case MsgOnDisk of - true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, - GuidsByStore); + true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore); false -> GuidsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -997,8 +1108,8 @@ remove_queue_entries1( sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> orddict:fold( - fun (MsgStore, Guids, LensByStore1) -> - orddict:update_counter(MsgStore, length(Guids), LensByStore1) + fun (IsPersistent, Guids, LensByStore1) -> + orddict:update_counter(IsPersistent, length(Guids), LensByStore1) end, LensByStore, GuidsByStore). %%---------------------------------------------------------------------------- @@ -1006,7 +1117,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, + MsgProps, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1015,8 +1126,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) + #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1030,38 +1141,35 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount + 1}}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, MSCState) -> - {MsgStatus, MSCState}; + msg_on_disk = true }, _MSCState) -> + MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - {ok, MSCState1} = - with_msg_store_state( - MSCState, IsPersistent, - fun (MsgStore, MSCState2) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2) - end), - {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; -maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> - {MsgStatus, MSCState}. + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, + ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1), + MsgStatus #msg_status { msg_on_disk = true }; +maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> + MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, IndexState) -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, seq_id = SeqId, + guid = Guid, + seq_id = SeqId, is_persistent = IsPersistent, - is_delivered = IsDelivered }, IndexState) + is_delivered = IsDelivered, + msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent, - IndexState), + IndexState1 = rabbit_queue_index:publish( + Guid, SeqId, MsgProps, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> @@ -1070,43 +1178,44 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State = #vqstate { index_state = IndexState, msg_store_clients = MSCState }) -> - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk( - ForceMsg, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = maybe_write_index_to_disk( - ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }}. + MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), + {MsgStatus2, IndexState1} = + maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), + {MsgStatus2, State #vqstate { index_state = IndexState1 }}. %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, +record_pending_ack(#msg_status { seq_id = SeqId, + guid = Guid, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, + msg_props = MsgProps } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; + true -> {IsPersistent, Guid, MsgProps}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). remove_pending_ack(KeepPersistent, - State = #vqstate { pending_ack = PA, - index_state = IndexState }) -> + State = #vqstate { pending_ack = PA, + index_state = IndexState, + msg_store_clients = MSCState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), State1 = State #vqstate { pending_ack = dict:new() }, case KeepPersistent of - true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + true -> case orddict:find(false, GuidsByStore) of error -> State1; - {ok, Guids} -> ok = rabbit_msg_store:remove( - ?TRANSIENT_MSG_STORE, Guids), + {ok, Guids} -> ok = msg_store_remove(MSCState, false, + Guids), State1 end; false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold( - fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), State1 #vqstate { index_state = IndexState1 } end. @@ -1114,8 +1223,10 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = + {{SeqIds, GuidsByStore}, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount }} = lists:foldl( fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> {ok, AckEntry} = dict:find(SeqId, PA), @@ -1124,8 +1235,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> pending_ack = dict:erase(SeqId, PA) })} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + MsgStoreFun(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), @@ -1136,12 +1247,12 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. find_persistent_count(LensByStore) -> - case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of + case orddict:find(true, LensByStore) of error -> 0; {ok, Len} -> Len end. @@ -1238,40 +1349,33 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). -fetch_from_q3_to_q4(State = #vqstate { - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - msg_store_clients = MSCState }) -> +fetch_from_q3(State = #vqstate { + q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4, + ram_index_count = RamIndexCount}) -> case bpqueue:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, IndexOnDisk, MsgStatus = #msg_status { - msg = undefined, guid = Guid, - is_persistent = IsPersistent }}, Q3a} -> - {{ok, Msg = #basic_message {}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), - Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4), + {{value, IndexOnDisk, MsgStatus}, Q3a} -> RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, - q4 = Q4a, - ram_msg_count = RamMsgCount + 1, - ram_index_count = RamIndexCount1, - msg_store_clients = MSCState1 }, + State1 = State #vqstate { q3 = Q3a, + ram_index_count = RamIndexCount1 }, State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is - %% still empty. So q2 must be empty, and q1 - %% can now be joined onto q4 + %% still empty. So q2 must be empty, and we + %% know q4 is empty otherwise we wouldn't be + %% loading from q3. As such, we can just set + %% q4 to Q1. true = bpqueue:is_empty(Q2), %% ASSERTION + true = queue:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = queue:new(), - q4 = queue:join(Q4a, Q1) }; + q4 = Q1 }; {true, false} -> maybe_deltas_to_betas(State1); {false, _} -> @@ -1280,7 +1384,7 @@ fetch_from_q3_to_q4(State = #vqstate { %% delta and q3 are maintained State1 end, - {loaded, State2} + {loaded, {MsgStatus, State2}} end. maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> @@ -1290,46 +1394,40 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, - target_ram_msg_count = TargetRamMsgCount, transient_threshold = TransientThreshold }) -> - case bpqueue:is_empty(Q3) orelse (TargetRamMsgCount /= 0) of - false -> - State; - true -> - #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - end_seq_id = DeltaSeqIdEnd } = Delta, - DeltaSeqId1 = - lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = - rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, IndexState2} = betas_from_index_entries( - List, TransientThreshold, IndexState1), - State1 = State #vqstate { index_state = IndexState2 }, - case bpqueue:len(Q3a) of + #delta { start_seq_id = DeltaSeqId, + count = DeltaCount, + end_seq_id = DeltaSeqIdEnd } = Delta, + DeltaSeqId1 = + lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), + DeltaSeqIdEnd]), + {List, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), + {Q3a, IndexState2} = + betas_from_index_entries(List, TransientThreshold, IndexState1), + State1 = State #vqstate { index_state = IndexState2 }, + case bpqueue:len(Q3a) of + 0 -> + %% we ignored every message in the segment due to it being + %% transient and below the threshold + maybe_deltas_to_betas( + State1 #vqstate { + delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); + Q3aLen -> + Q3b = bpqueue:join(Q3, Q3a), + case DeltaCount - Q3aLen of 0 -> - %% we ignored every message in the segment due to - %% it being transient and below the threshold - maybe_deltas_to_betas( - State #vqstate { - delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); - Q3aLen -> - Q3b = bpqueue:join(Q3, Q3a), - case DeltaCount - Q3aLen of - 0 -> - %% delta is now empty, but it wasn't - %% before, so can now join q2 onto q3 - State1 #vqstate { q2 = bpqueue:new(), - delta = ?BLANK_DELTA, - q3 = bpqueue:join(Q3b, Q2) }; - N when N > 0 -> - Delta1 = #delta { start_seq_id = DeltaSeqId1, - count = N, - end_seq_id = DeltaSeqIdEnd }, - State1 #vqstate { delta = Delta1, - q3 = Q3b } - end + %% delta is now empty, but it wasn't before, so + %% can now join q2 onto q3 + State1 #vqstate { q2 = bpqueue:new(), + delta = ?BLANK_DELTA, + q3 = bpqueue:join(Q3b, Q2) }; + N when N > 0 -> + Delta1 = #delta { start_seq_id = DeltaSeqId1, + count = N, + end_seq_id = DeltaSeqIdEnd }, + State1 #vqstate { delta = Delta1, + q3 = Q3b } end end. |