summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-11-02 11:02:53 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-11-02 11:02:53 +0000
commitb59f0f574a068ea65ddace332451ec56ed76ef3b (patch)
tree1cf9e95d94ee28060386847001c1365263eefe20
parent94058959138d8643464e7a6b8d76d3e9becee64c (diff)
parent92e7f5bdc6efcae672a295e025ea6ce3d88a7125 (diff)
downloadrabbitmq-server-bug23411.tar.gz
Merging default into bug23411bug23411
-rw-r--r--docs/html-to-website-xml.xsl4
-rw-r--r--docs/rabbitmq-server.1.xml12
-rw-r--r--docs/rabbitmq-service.xml12
-rw-r--r--docs/rabbitmqctl.1.xml12
-rw-r--r--docs/remove-namespaces.xsl3
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--include/rabbit.hrl6
-rw-r--r--include/rabbit_backing_queue_spec.hrl29
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/macports/Portfile.in2
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--scripts/rabbitmq-server.bat1
-rw-r--r--scripts/rabbitmq-service.bat1
-rw-r--r--src/delegate.erl23
-rw-r--r--src/delegate_sup.erl10
-rw-r--r--src/rabbit_access_control.erl75
-rw-r--r--src/rabbit_amqqueue.erl33
-rw-r--r--src/rabbit_amqqueue_process.erl153
-rw-r--r--src/rabbit_backing_queue.erl14
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_invariable_queue.erl106
-rw-r--r--src/rabbit_mnesia.erl6
-rw-r--r--src/rabbit_msg_store.erl466
-rw-r--r--src/rabbit_msg_store_gc.erl96
-rw-r--r--src/rabbit_networking.erl25
-rw-r--r--src/rabbit_persister.erl25
-rw-r--r--src/rabbit_queue_index.erl151
-rw-r--r--src/rabbit_reader.erl29
-rw-r--r--src/rabbit_ssl.erl1
-rw-r--r--src/rabbit_tests.erl301
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_variable_queue.erl780
33 files changed, 1420 insertions, 981 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index c325bb5a..ec8f87e5 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -17,7 +17,7 @@
<!-- Copy the root node, and munge the outer part of the page -->
<xsl:template match="/html">
<xsl:processing-instruction name="xml-stylesheet">type="text/xml" href="page.xsl"</xsl:processing-instruction>
-<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc">
+<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" xmlns="http://www.w3.org/1999/xhtml">
<head>
<title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
</head>
@@ -42,7 +42,7 @@
</xsl:choose>
<p>
For more general documentation, please see the
- <a href="admin-guide.html">administrator's guide</a>.
+ <a href="../admin-guide.html">administrator's guide</a>.
</p>
<doc:toc class="compact">
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index 921da4f1..03e76c79 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -98,18 +98,6 @@ Defaults to 5672.
</listitem>
</varlistentry>
- <varlistentry>
- <term>RABBITMQ_CLUSTER_CONFIG_FILE</term>
- <listitem>
- <para>
-Defaults to <filename>/etc/rabbitmq/rabbitmq_cluster.config</filename>. If this file is
-present it is used by the server to auto-configure a RabbitMQ cluster.
-See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>
-for details.
- </para>
- </listitem>
- </varlistentry>
-
</variablelist>
</refsect1>
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
index 2b416e3e..e95f9889 100644
--- a/docs/rabbitmq-service.xml
+++ b/docs/rabbitmq-service.xml
@@ -193,18 +193,6 @@ manager.
</varlistentry>
<varlistentry>
- <term>RABBITMQ_CLUSTER_CONFIG_FILE</term>
- <listitem>
- <para>
-If this file is
-present it is used by the server to auto-configure a RabbitMQ cluster.
-See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>
-for details.
- </para>
- </listitem>
- </varlistentry>
-
- <varlistentry>
<term>RABBITMQ_CONSOLE_LOG</term>
<listitem>
<para>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 3b7244c7..acb99bc8 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -589,7 +589,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -597,16 +597,6 @@
<listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem>
</varlistentry>
<varlistentry>
- <term>scope</term>
- <listitem><para>Scope of the permissions: either
- <command>client</command> (the default) or
- <command>all</command>. This determines whether
- permissions are checked for server-generated resource
- names (<command>all</command>) or only for
- client-specified resource names
- (<command>client</command>).</para></listitem>
- </varlistentry>
- <varlistentry>
<term>user</term>
<listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem>
</varlistentry>
diff --git a/docs/remove-namespaces.xsl b/docs/remove-namespaces.xsl
index 58a1e826..7f7f3c12 100644
--- a/docs/remove-namespaces.xsl
+++ b/docs/remove-namespaces.xsl
@@ -1,13 +1,14 @@
<?xml version='1.0'?>
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"
+ xmlns="http://www.w3.org/1999/xhtml"
version='1.0'>
<xsl:output method="xml" />
<!-- Copy every element through with local name only -->
<xsl:template match="*">
- <xsl:element name="{local-name()}">
+ <xsl:element name="{local-name()}" namespace="">
<xsl:apply-templates select="@*|node()"/>
</xsl:element>
</xsl:template>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 4be09c5a..17d05a99 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -29,4 +29,6 @@
{default_user_is_admin, true},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {cluster_nodes, []},
+ {server_properties, []},
{collect_statistics, none}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index af6e257a..4d4f6fe6 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -30,7 +30,7 @@
%%
-record(user, {username, password, is_admin}).
--record(permission, {scope, configure, write, read}).
+-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
@@ -63,7 +63,7 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
--record(listener, {node, protocol, host, port}).
+-record(listener, {node, protocol, host, ip_address, port}).
-record(basic_message, {exchange_name, routing_key, content, guid,
is_persistent}).
@@ -74,6 +74,8 @@
-record(event, {type, props, timestamp}).
+-record(message_properties, {expiry}).
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 38c6f939..20230b24 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -31,12 +31,15 @@
-type(fetch_result() ::
('empty' |
- %% Message, IsDelivered, AckTag, RemainingLen
+ %% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(ack_required() :: boolean()).
+-type(message_properties_transformer() ::
+ fun ((rabbit_types:message_properties())
+ -> rabbit_types:message_properties())).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -45,19 +48,25 @@
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
--spec(publish_delivered/3 ::
- (ack_required(), rabbit_types:basic_message(), state()) ->
- {ack(), state()}).
+-spec(publish/3 :: (rabbit_types:basic_message(),
+ rabbit_types:message_properties(), state()) -> state()).
+-spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(),
+ rabbit_types:message_properties(), state())
+ -> {ack(), state()}).
+-spec(dropwhile/2 ::
+ (fun ((rabbit_types:message_properties()) -> boolean()), state())
+ -> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(),
- state()) -> state()).
+-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ rabbit_types:message_properties(), state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
--spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) ->
- {[ack()], state()}).
--spec(requeue/2 :: ([ack()], state()) -> state()).
+-spec(tx_commit/4 ::
+ (rabbit_types:txn(), fun (() -> any()),
+ message_properties_transformer(), state()) -> {[ack()], state()}).
+-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
+ -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(set_ram_duration_target/2 ::
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index eb0a2a51..209a90ee 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -127,6 +127,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Oct 19 2010 vlad@rabbitmq.com 2.1.1-1
+- New Upstream Release
+
* Tue Sep 14 2010 marek@rabbitmq.com 2.1.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 9927cfbc..e81fda24 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.1.1-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Vlad Alexandru Ionescu <vlad@rabbitmq.com> Tue, 19 Oct 2010 17:20:10 +0100
+
rabbitmq-server (2.1.0-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index f30460d3..e37a45b3 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -75,7 +75,7 @@ post-destroot {
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
${realsbin}/rabbitmq-env
- foreach var {CONFIG_FILE CLUSTER_CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} {
+ foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} {
reinplace -E "s:^($var)=/:\\1=${prefix}/:" \
${realsbin}/rabbitmq-multi \
${realsbin}/rabbitmq-server \
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 8e26663a..ef0a3521 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -35,7 +35,6 @@ NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
-CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
@@ -59,7 +58,6 @@ else
fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
-[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE}
[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 5bcbc6ba..193f1c8a 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -165,7 +165,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 4b3961d4..51ef6ecc 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -232,7 +232,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
-!CLUSTER_CONFIG! ^
!RABBITMQ_SERVER_START_ARGS! ^
!STAR!
diff --git a/src/delegate.erl b/src/delegate.erl
index c8aa3092..e50b99f1 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]).
+-export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -44,7 +44,8 @@
-ifdef(use_specs).
--spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/2 ::
+ (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
@@ -60,8 +61,8 @@
%%----------------------------------------------------------------------------
-start_link(Hash) ->
- gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
+start_link(Prefix, Hash) ->
+ gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
[Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
@@ -147,7 +148,8 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
local_server(Node) ->
case get({delegate_local_server_name, Node}) of
undefined ->
- Name = server(erlang:phash2({self(), Node}, process_count())),
+ Name = server(outgoing,
+ erlang:phash2({self(), Node}, process_count())),
put({delegate_local_server_name, Node}, Name),
Name;
Name -> Name
@@ -160,17 +162,20 @@ remote_server(Node) ->
{badrpc, _} ->
%% Have to return something, if we're just casting
%% then we don't want to blow up
- server(1);
+ server(incoming, 1);
Count ->
- Name = server(erlang:phash2({self(), Node}, Count)),
+ Name = server(incoming,
+ erlang:phash2({self(), Node}, Count)),
put({delegate_remote_server_name, Node}, Name),
Name
end;
Name -> Name
end.
-server(Hash) ->
- list_to_atom("delegate_process_" ++ integer_to_list(Hash)).
+server(Prefix, Hash) ->
+ list_to_atom("delegate_" ++
+ atom_to_list(Prefix) ++ "_" ++
+ integer_to_list(Hash)).
safe_invoke(Pids, Fun) when is_list(Pids) ->
[safe_invoke(Pid, Fun) || Pid <- Pids];
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index ff303ee2..544546f1 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -55,9 +55,11 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- {ok, {{one_for_one, 10, 10},
- [{Hash, {delegate, start_link, [Hash]},
- transient, 16#ffffffff, worker, [delegate]} ||
- Hash <- lists:seq(0, delegate:process_count() - 1)]}}.
+ {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}.
+
+specs(Prefix) ->
+ [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Hash <- lists:seq(0, delegate:process_count() - 1)].
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 73fd6f0e..85452abf 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -38,7 +38,7 @@
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
clear_admin/1, list_users/0, lookup_user/1]).
-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
--export([set_permissions/5, set_permissions/6, clear_permissions/2,
+-export([set_permissions/5, clear_permissions/2,
list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
list_user_vhost_permissions/2]).
@@ -52,9 +52,6 @@
-type(username() :: binary()).
-type(password() :: binary()).
-type(regexp() :: binary()).
--type(scope() :: binary()).
--type(scope_atom() :: 'client' | 'all').
-
-spec(check_login/2 ::
(binary(), binary()) -> rabbit_types:user() |
rabbit_types:channel_exit()).
@@ -82,21 +79,15 @@
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
--spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(),
- regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
-spec(list_permissions/0 ::
- () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(),
- scope_atom()}]).
+ () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(),
- scope_atom()}]).
+ (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
-spec(list_user_permissions/1 ::
- (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(),
- scope_atom()}]).
+ (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
-spec(list_user_vhost_permissions/2 ::
- (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(),
- scope_atom()}]).
+ (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]).
-endif.
@@ -188,20 +179,15 @@ check_resource_access(Username,
[] ->
false;
[#user_permission{permission = P}] ->
- case {Name, P} of
- {<<"amq.gen",_/binary>>, #permission{scope = client}} ->
- true;
- _ ->
- PermRegexp =
- case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
- end
+ PermRegexp =
+ case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
end
end,
if Res -> ok;
@@ -334,7 +320,7 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _, _}) ->
+ lists:foreach(fun ({Username, _, _, _}) ->
ok = clear_permissions(Username, VHostPath)
end,
list_vhost_permissions(VHostPath)),
@@ -355,16 +341,7 @@ validate_regexp(RegexpBin) ->
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
- set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm,
- WritePerm, ReadPerm).
-
-set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
- Scope = case ScopeBin of
- <<"client">> -> client;
- <<"all">> -> all;
- _ -> throw({error, {invalid_scope, ScopeBin}})
- end,
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
@@ -374,7 +351,6 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer
username = Username,
virtual_host = VHostPath},
permission = #permission{
- scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}},
@@ -393,35 +369,34 @@ clear_permissions(Username, VHostPath) ->
end)).
list_permissions() ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(match_user_vhost('_', '_'))].
list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(rabbit_misc:with_vhost(
VHostPath, match_user_vhost('_', VHostPath)))].
list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(rabbit_misc:with_user(
Username, match_user_vhost(Username, '_')))].
list_user_vhost_permissions(Username, VHostPath) ->
- [{ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(rabbit_misc:with_user_and_vhost(
Username, VHostPath,
match_user_vhost(Username, VHostPath)))].
list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
#user_permission{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
- permission = #permission{ scope = Scope,
- configure = ConfigurePerm,
+ permission = #permission{ configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}} <-
%% TODO: use dirty ops instead
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2389ec86..9d78bafa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1]).
+ set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -56,7 +56,7 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(EXPIRES_TYPES, [byte, short, signedint, long]).
+-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
%%----------------------------------------------------------------------------
@@ -310,18 +310,30 @@ check_declare_arguments(QueueName, Args) ->
precondition_failed,
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]],
+ end || {Key, Fun} <-
+ [{<<"x-expires">>, fun check_expires_argument/1},
+ {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
ok.
-check_expires_argument(undefined) ->
+check_expires_argument(Val) ->
+ check_integer_argument(Val,
+ expires_not_of_acceptable_type,
+ expires_zero_or_less).
+
+check_message_ttl_argument(Val) ->
+ check_integer_argument(Val,
+ ttl_not_of_acceptable_type,
+ ttl_zero_or_less).
+
+check_integer_argument(undefined, _, _) ->
ok;
-check_expires_argument({Type, Expires}) when Expires > 0 ->
- case lists:member(Type, ?EXPIRES_TYPES) of
+check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 ->
+ case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
- false -> {error, {expires_not_of_acceptable_type, Type, Expires}}
+ false -> {error, {InvalidTypeError, Type, Val}}
end;
-check_expires_argument({_Type, _Expires}) ->
- {error, expires_zero_or_less}.
+check_integer_argument({_Type, _Val}, _, ZeroOrLessError) ->
+ {error, ZeroOrLessError}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -466,6 +478,9 @@ set_maximum_since_use(QPid, Age) ->
maybe_expire(QPid) ->
gen_server2:cast(QPid, maybe_expire).
+drop_expired(QPid) ->
+ gen_server2:cast(QPid, drop_expired).
+
on_node_down(Node) ->
rabbit_binding:process_deletions(
lists:foldl(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 19db731a..6048920e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,8 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}).
+
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -61,7 +63,9 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer
+ stats_timer,
+ ttl,
+ ttl_timer_ref
}).
-record(consumer, {tag, ack_required}).
@@ -123,6 +127,7 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
+ ttl = undefined,
stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -145,12 +150,6 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
- {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
- undefined -> State
- end.
-
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined,
@@ -165,7 +164,7 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- State1 = init_expires(State#q{backing_queue_state = BQS}),
+ State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(StatsTimer,
@@ -174,6 +173,19 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
+process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ lists:foldl(fun({Arg, Fun}, State1) ->
+ case rabbit_misc:table_lookup(Arguments, Arg) of
+ {_Type, Val} -> Fun(Val, State1);
+ undefined -> State1
+ end
+ end, State, [{<<"x-expires">>, fun init_expires/2},
+ {<<"x-message-ttl">>, fun init_ttl/2}]).
+
+init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
+
+init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -388,35 +400,40 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_from_queue_pred(IsEmpty, _State) ->
not IsEmpty.
-deliver_from_queue_deliver(AckRequired, false,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
- BQ:fetch(AckRequired, BQS),
- {{Message, IsDelivered, AckTag}, 0 == Remaining,
- State #q { backing_queue_state = BQS1 }}.
+deliver_from_queue_deliver(AckRequired, false, State) ->
+ {{Message, IsDelivered, AckTag, Remaining}, State1} =
+ fetch(AckRequired, State),
+ {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ drop_expired_messages(State),
IsEmpty = BQ:is_empty(BQS),
- {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
- State1.
+ {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
+ State2.
attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
+ %% we don't need an expiry here because messages are
+ %% not being enqueued, so we use an empty
+ %% message_properties.
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message, BQS),
+ BQ:publish_delivered(AckRequired, Message,
+ ?BASE_MESSAGE_PROPERTIES, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
+ {true,
+ State#q{backing_queue_state =
+ BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -424,13 +441,22 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, State #q.backing_queue_state),
- {false, NewState#q{backing_queue_state = BQS}}
+ BQS = BQ:publish(Message,
+ message_properties(State),
+ State #q.backing_queue_state),
+ {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
+ fun (BQS) ->
+ BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)
+ end, State).
+
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ {Result, BQS1} = BQ:fetch(AckRequired, BQS),
+ {Result, State#q{backing_queue_state = BQS1}}.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -525,10 +551,13 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
-commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckTags, BQS1} =
- BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
+commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL}) ->
+ {AckTags, BQS1} = BQ:tx_commit(Txn,
+ fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(TTL),
+ BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
@@ -547,6 +576,44 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+reset_msg_expiry_fun(TTL) ->
+ fun(MsgProps) ->
+ MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
+ end.
+
+message_properties(#q{ttl=TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(TTL)}.
+
+calculate_msg_expiry(undefined) -> undefined;
+calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
+
+drop_expired_messages(State = #q{ttl = undefined}) ->
+ State;
+drop_expired_messages(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ Now = now_millis(),
+ BQS1 = BQ:dropwhile(
+ fun (#message_properties{expiry = Expiry}) ->
+ Now > Expiry
+ end, BQS),
+ ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL,
+ ttl_timer_ref = undefined})
+ when TTL =/= undefined ->
+ case BQ:is_empty(BQS) of
+ true -> State;
+ false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired,
+ [self()]),
+ State#q{ttl_timer_ref = TRef}
+ end;
+ensure_ttl_timer(State) ->
+ State.
+
+now_millis() -> timer:now_diff(now(), {0,0,0}).
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -605,6 +672,7 @@ prioritise_cast(Msg, _State) ->
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
maybe_expire -> 8;
+ drop_expired -> 8;
emit_stats -> 7;
{ack, _Txn, _MsgIds, _ChPid} -> 7;
{reject, _MsgIds, _Requeue, _ChPid} -> 7;
@@ -695,21 +763,21 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck}, _From,
- State = #q{q = #amqqueue{name = QName},
- backing_queue_state = BQS, backing_queue = BQ}) ->
+ State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case fetch(AckRequired, drop_expired_messages(State1)) of
+ {empty, State2} ->
+ reply(empty, State2);
+ {{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
store_ch_record(
C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -781,11 +849,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end
end;
-handle_call(stat, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- active_consumers = ActiveConsumers}) ->
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)},
- ensure_expiry_timer(State));
+handle_call(stat, _From, State) ->
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS,
+ active_consumers = ActiveConsumers} =
+ drop_expired_messages(ensure_expiry_timer(State)),
+ reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -918,6 +986,9 @@ handle_cast(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
+handle_cast(drop_expired, State) ->
+ noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+
handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2230c507..352e76fd 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,12 +62,16 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 2},
+ {publish, 3},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 3},
+ {publish_delivered, 4},
+
+ %% Drop messages from the head of the queue while the supplied
+ %% predicate returns true.
+ {dropwhile, 2},
%% Produce the next message.
{fetch, 2},
@@ -77,7 +81,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 3},
+ {tx_publish, 4},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -89,11 +93,11 @@ behaviour_info(callbacks) ->
%% Commit a transaction. The Fun passed in must be called once
%% the messages have really been commited. This CPS permits the
%% possibility of commit coalescing.
- {tx_commit, 3},
+ {tx_commit, 4},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
- {requeue, 2},
+ {requeue, 3},
%% How long is my queue?
{len, 1},
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8facaf16..6b212745 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -39,7 +39,6 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
--define(SCOPE_OPT, "-s").
%%----------------------------------------------------------------------------
@@ -67,7 +66,7 @@ start() ->
{[Command0 | Args], Opts} =
rabbit_misc:get_options(
[{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
- {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}],
+ {option, ?VHOST_OPT, "/"}],
FullCommand),
Opts1 = lists:map(fun({K, V}) ->
case K of
@@ -289,10 +288,9 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Scope = proplists:get_value(?SCOPE_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, set_permissions,
- [Scope, Username, VHost, CPerm, WPerm, RPerm]});
+ [Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 4e0dad84..9855f18c 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,9 +31,9 @@
-module(rabbit_invariable_queue).
--export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
- publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
- tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
+ dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -89,40 +89,61 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
- fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
+ fun ({#basic_message { is_persistent = false },
+ _MsgProps, _IsDelivered}, Acc) ->
Acc;
- ({Msg = #basic_message { guid = Guid }, IsDelivered},
+ ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered},
{AckTagsN, PAN}) ->
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
+ {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)}
end, {[], dict:new()}, Q),
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
- len = Len }) ->
- ok = persist_message(QName, IsDurable, none, Msg),
- State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
+publish(Msg, MsgProps, State = #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
+ State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }.
-publish_delivered(false, _Msg, State) ->
+publish_delivered(false, _Msg, _MsgProps, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
+ MsgProps,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
- ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
ok = persist_delivery(QName, IsDurable, false, Msg),
- {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
+ {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
+
+dropwhile(_Pred, State = #iv_state { len = 0 }) ->
+ State;
+dropwhile(Pred, State = #iv_state { queue = Q }) ->
+ {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
+ case Pred(MsgProps) of
+ true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps,
+ IsDelivered, State),
+ dropwhile(Pred, State1);
+ false -> State
+ end.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
- durable = IsDurable,
- pending_ack = PA }) ->
- {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
- queue:out(Q),
+fetch(AckRequired, State = #iv_state { queue = Q }) ->
+ {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
+ fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State).
+
+fetch_internal(AckRequired, Q1,
+ Msg = #basic_message { guid = Guid },
+ MsgProps, IsDelivered,
+ State = #iv_state { len = Len,
+ qname = QName,
+ durable = IsDurable,
+ pending_ack = PA }) ->
Len1 = Len - 1,
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- PA1 = dict:store(Guid, Msg, PA),
+ PA1 = store_ack(Msg, MsgProps, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
false -> ok = persist_acks(QName, IsDurable, none,
@@ -138,11 +159,11 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName,
- durable = IsDurable }) ->
+tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(QName, IsDurable, Txn, Msg),
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
+ ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps),
State.
tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
@@ -159,8 +180,10 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) ->
erase_tx(Txn),
{lists:flatten(AckTags), State}.
-tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
- queue = Q, len = Len }) ->
+tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName,
+ pending_ack = PA,
+ queue = Q,
+ len = Len }) ->
#tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
Txn, QName),
@@ -168,13 +191,16 @@ tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
Fun(),
AckTags1 = lists:flatten(AckTags),
PA1 = remove_acks(AckTags1, PA),
- {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) ->
- {queue:in({Msg, false}, QN), LenN + 1}
+ {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
+ {enqueue(Msg, MsgPropsFun(MsgProps),
+ false, QN),
+ LenN + 1}
end, {Q, Len}, PubsRev),
{AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
-requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
- len = Len }) ->
+requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA,
+ queue = Q,
+ len = Len }) ->
%% We don't need to touch the persister here - the persister will
%% already have these messages published and delivered as
%% necessary. The complication is that the persister's seq_id will
@@ -186,12 +212,17 @@ requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
%% order to the last known state of our queue, prior to shutdown.
{Q1, Len1} = lists:foldl(
fun (Guid, {QN, LenN}) ->
- {ok, Msg = #basic_message {}} = dict:find(Guid, PA),
- {queue:in({Msg, true}, QN), LenN + 1}
+ {Msg = #basic_message {}, MsgProps}
+ = dict:fetch(Guid, PA),
+ {enqueue(Msg, MsgPropsFun(MsgProps), true, QN),
+ LenN + 1}
end, {Q, Len}, AckTags),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
+enqueue(Msg, MsgProps, IsDelivered, Q) ->
+ queue:in({Msg, MsgProps, IsDelivered}, Q).
+
len(#iv_state { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -212,6 +243,9 @@ status(_State) -> [].
remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
+store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) ->
+ dict:store(Guid, {Msg, MsgProps}, PA).
+
%%----------------------------------------------------------------------------
lookup_tx(Txn) ->
@@ -243,14 +277,15 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
persist_message(QName, true, Txn, Msg = #basic_message {
- is_persistent = true }) ->
+ is_persistent = true }, MsgProps) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
-persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ [{publish, Msg1, MsgProps,
+ {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) ->
ok.
persist_delivery(QName, true, false, #basic_message { is_persistent = true,
@@ -263,7 +298,8 @@ persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
- {ok, Msg} = dict:find(Guid, PA),
+ {ok, {Msg, _MsgProps}}
+ = dict:find(Guid, PA),
Msg #basic_message.is_persistent
end]);
persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 577d206d..8de2f0d6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -340,10 +340,8 @@ read_cluster_nodes_config() ->
case rabbit_misc:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
- case application:get_env(cluster_nodes) of
- undefined -> [];
- {ok, ClusterNodes} -> ClusterNodes
- end;
+ {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
+ ClusterNodes;
{error, Reason} ->
throw({error, {cannot_read_cluster_nodes_config,
FileName, Reason}})
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 66cc06cf..24b017b5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,10 +34,12 @@
-behaviour(gen_server2).
-export([start_link/4, successfully_recovered_state/1,
- client_init/2, client_terminate/2, client_delete_and_terminate/3,
- write/4, read/3, contains/2, remove/2, release/2, sync/3]).
+ client_init/2, client_terminate/1, client_delete_and_terminate/1,
+ client_ref/1,
+ write/3, read/2, contains/2, remove/2, release/2, sync/3]).
--export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
+-export([sync/1, set_maximum_since_use/2,
+ has_readers/2, combine_files/3, delete_file/2]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -74,7 +76,6 @@
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
pending_gc_completion, %% things to do once GC completes
- gc_active, %% is the GC currently working?
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
@@ -86,7 +87,9 @@
}).
-record(client_msstate,
- { file_handle_cache,
+ { server,
+ client_ref,
+ file_handle_cache,
index_state,
index_module,
dir,
@@ -100,13 +103,33 @@
-record(file_summary,
{file, valid_total_size, left, right, file_size, locked, readers}).
+-record(gc_state,
+ { dir,
+ index_module,
+ index_state,
+ file_summary_ets,
+ msg_store
+ }).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([gc_state/0, file_num/0]).
+
+-opaque(gc_state() :: #gc_state { dir :: file:filename(),
+ index_module :: atom(),
+ index_state :: any(),
+ file_summary_ets :: ets:tid(),
+ msg_store :: server()
+ }).
+
-type(server() :: pid() | atom()).
+-type(client_ref() :: binary()).
-type(file_num() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate {
+ server :: server(),
+ client_ref :: client_ref(),
file_handle_cache :: dict:dictionary(),
index_state :: any(),
index_module :: atom(),
@@ -124,26 +147,25 @@
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
--spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
--spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
- rabbit_types:ok(client_msstate())).
--spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
- {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
--spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()).
--spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
--spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
--spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
+-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()).
+-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
+-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok').
+-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) ->
+ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
+-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()).
+-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
+-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
+-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) ->
+ 'ok').
-spec(sync/1 :: (server()) -> 'ok').
--spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
- 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {ets:tid(), file:filename(), atom(), any()}) ->
- 'concurrent_readers' | non_neg_integer()).
+-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
+-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
+ non_neg_integer()).
+-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> non_neg_integer()).
-endif.
@@ -316,7 +338,9 @@ client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
gen_server2:call(Server, {new_client_state, Ref}, infinity),
- #client_msstate { file_handle_cache = dict:new(),
+ #client_msstate { server = Server,
+ client_ref = Ref,
+ file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
dir = Dir,
@@ -326,20 +350,22 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState, Server) ->
+client_terminate(CState) ->
close_all_handles(CState),
- ok = gen_server2:call(Server, client_terminate, infinity).
+ ok = server_call(CState, client_terminate).
-client_delete_and_terminate(CState, Server, Ref) ->
+client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
- ok = gen_server2:cast(Server, {client_delete, Ref}).
+ ok = server_cast(CState, {client_delete, Ref}).
-write(Server, Guid, Msg,
+client_ref(#client_msstate { client_ref = Ref }) -> Ref.
+
+write(Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, Guid}), CState}.
+ ok = server_cast(CState, {write, Guid}).
-read(Server, Guid,
+read(Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
@@ -348,13 +374,12 @@ read(Server, Guid,
%% 2. Check the cur file cache
case ets:lookup(CurFileCacheEts, Guid) of
[] ->
- Defer = fun() -> {gen_server2:call(
- Server, {read, Guid}, infinity),
- CState} end,
+ Defer = fun() ->
+ {server_call(CState, {read, Guid}), CState}
+ end,
case index_lookup_positive_ref_count(Guid, CState) of
not_found -> Defer();
- MsgLocation -> client_read1(Server, MsgLocation, Defer,
- CState)
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
[{Guid, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
@@ -365,19 +390,16 @@ read(Server, Guid,
{{ok, Msg}, CState}
end.
-contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity).
-remove(_Server, []) -> ok;
-remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
-release(_Server, []) -> ok;
-release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
-sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
+contains(Guid, CState) -> server_call(CState, {contains, Guid}).
+remove([], _CState) -> ok;
+remove(Guids, CState) -> server_cast(CState, {remove, Guids}).
+release([], _CState) -> ok;
+release(Guids, CState) -> server_cast(CState, {release, Guids}).
+sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
sync(Server) ->
gen_server2:cast(Server, sync).
-gc_done(Server, Reclaimed, Source, Destination) ->
- gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
-
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -385,18 +407,22 @@ set_maximum_since_use(Server, Age) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
-client_read1(Server,
- #msg_location { guid = Guid, file = File } = MsgLocation,
- Defer,
+server_call(#client_msstate { server = Server }, Msg) ->
+ gen_server2:call(Server, Msg, infinity).
+
+server_cast(#client_msstate { server = Server }, Msg) ->
+ gen_server2:cast(Server, Msg).
+
+client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(Server, Guid, CState);
+ read(Guid, CState);
[#file_summary { locked = Locked, right = Right }] ->
- client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
+ client_read2(Locked, Right, MsgLocation, Defer, CState)
end.
-client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
+client_read2(false, undefined, _MsgLocation, Defer, _CState) ->
%% Although we've already checked both caches and not found the
%% message there, the message is apparently in the
%% current_file. We can only arrive here if we are trying to read
@@ -407,12 +433,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
%% contents of the current file, thus reads from the current file
%% will end up here and will need to be deferred.
Defer();
-client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
+client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
%% Of course, in the mean time, the GC could have run and our msg
%% is actually in a different file, unlocked. However, defering is
%% the safest and simplest thing to do.
Defer();
-client_read2(Server, false, _Right,
+client_read2(false, _Right,
MsgLocation = #msg_location { guid = Guid, file = File },
Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
@@ -421,10 +447,10 @@ client_read2(Server, false, _Right,
%% finished.
safe_ets_update_counter(
FileSummaryEts, File, {#file_summary.readers, +1},
- fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end,
- fun () -> read(Server, Guid, CState) end).
+ fun (_) -> client_read3(MsgLocation, Defer, CState) end,
+ fun () -> read(Guid, CState) end).
-client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
+client_read3(#msg_location { guid = Guid, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -448,7 +474,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% too).
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
- read(Server, Guid, CState);
+ read(Guid, CState);
[#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
@@ -459,7 +485,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% unlocks the dest)
try Release(),
Defer()
- catch error:badarg -> read(Server, Guid, CState)
+ catch error:badarg -> read(Guid, CState)
end;
[#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
@@ -482,9 +508,14 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
read_from_disk(MsgLocation, CState1, DedupCacheEts),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
- MsgLocation -> %% different file!
+ #msg_location {} = MsgLocation -> %% different file!
Release(), %% this MUST NOT fail with badarg
- client_read1(Server, MsgLocation, Defer, CState)
+ client_read1(MsgLocation, Defer, CState);
+ not_found -> %% it seems not to exist. Defer, just to be sure.
+ try Release() %% this can badarg, same as locked case, above
+ catch error:badarg -> ok
+ end,
+ Defer()
end
end.
@@ -547,8 +578,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
sync_timer_ref = undefined,
sum_valid_data = 0,
sum_file_size = 0,
- pending_gc_completion = [],
- gc_active = false,
+ pending_gc_completion = orddict:new(),
gc_pid = undefined,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
@@ -570,8 +600,13 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{ok, Offset} = file_handle_cache:position(CurHdl, Offset),
ok = file_handle_cache:truncate(CurHdl),
- {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
- FileSummaryEts),
+ {ok, GCPid} = rabbit_msg_store_gc:start_link(
+ #gc_state { dir = Dir,
+ index_module = IndexModule,
+ index_state = IndexState,
+ file_summary_ets = FileSummaryEts,
+ msg_store = self()
+ }),
{ok, maybe_compact(
State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
@@ -588,10 +623,11 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- sync -> 8;
- {gc_done, _Reclaimed, _Source, _Destination} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- _ -> 0
+ sync -> 8;
+ {combine_files, _Source, _Destination, _Reclaimed} -> 8;
+ {delete_file, _File, _Reclaimed} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ _ -> 0
end.
handle_call(successfully_recovered_state, _From, State) ->
@@ -686,37 +722,23 @@ handle_cast({sync, Guids, K},
handle_cast(sync, State) ->
noreply(internal_sync(State));
-handle_cast({gc_done, Reclaimed, Src, Dst},
+handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
- gc_active = {Src, Dst},
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts }) ->
- %% GC done, so now ensure that any clients that have open fhs to
- %% those files close them before using them again. This has to be
- %% done here (given it's done in the msg_store, and not the gc),
- %% and not when starting up the GC, because if done when starting
- %% up the GC, the client could find the close, and close and
- %% reopen the fh, whilst the GC is waiting for readers to
- %% disappear, before it's actually done the GC.
- true = mark_handle_to_close(FileHandlesEts, Src),
- true = mark_handle_to_close(FileHandlesEts, Dst),
- %% we always move data left, so Src has gone and was on the
- %% right, so need to make dest = source.right.left, and also
- %% dest.right = source.right
- [#file_summary { left = Dst,
- right = SrcRight,
- locked = true,
- readers = 0 }] = ets:lookup(FileSummaryEts, Src),
- %% this could fail if SrcRight =:= undefined
- ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}),
- true = ets:update_element(FileSummaryEts, Dst,
- [{#file_summary.locked, false},
- {#file_summary.right, SrcRight}]),
- true = ets:delete(FileSummaryEts, Src),
- noreply(
- maybe_compact(run_pending(
- State #msstate { sum_file_size = SumFileSize - Reclaimed,
- gc_active = false })));
+ ok = cleanup_after_file_deletion(Source, State),
+ %% see comment in cleanup_after_file_deletion
+ true = mark_handle_to_close(FileHandlesEts, Destination),
+ true = ets:update_element(FileSummaryEts, Destination,
+ {#file_summary.locked, false}),
+ State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
+ noreply(maybe_compact(run_pending([Source, Destination], State1)));
+
+handle_cast({delete_file, File, Reclaimed},
+ State = #msstate { sum_file_size = SumFileSize }) ->
+ ok = cleanup_after_file_deletion(File, State),
+ State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
+ noreply(maybe_compact(run_pending([File], State1)));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -867,7 +889,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
ets:lookup(FileSummaryEts, File),
case Locked of
true -> add_to_pending_gc_completion({read, Guid, From},
- State);
+ File, State);
false -> {Msg, State1} =
read_from_disk(MsgLoc, State, DedupCacheEts),
gen_server2:reply(From, {ok, Msg}),
@@ -897,19 +919,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
{Msg, State1}.
-contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
+contains_message(Guid, From,
+ State = #msstate { pending_gc_completion = Pending }) ->
case index_lookup_positive_ref_count(Guid, State) of
not_found ->
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
- case GCActive of
- {A, B} when File =:= A orelse File =:= B ->
- add_to_pending_gc_completion(
- {contains, Guid, From}, State);
- _ ->
- gen_server2:reply(From, true),
- State
+ case orddict:is_key(File, Pending) of
+ true -> add_to_pending_gc_completion(
+ {contains, Guid, From}, File, State);
+ false -> gen_server2:reply(From, true),
+ State
end
end.
@@ -928,7 +949,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, State);
+ add_to_pending_gc_completion({remove, Guid}, File, State);
[#file_summary {}] ->
ok = Dec(),
[_] = ets:update_counter(
@@ -944,20 +965,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
end.
add_to_pending_gc_completion(
- Op, State = #msstate { pending_gc_completion = Pending }) ->
- State #msstate { pending_gc_completion = [Op | Pending] }.
-
-run_pending(State = #msstate { pending_gc_completion = [] }) ->
- State;
-run_pending(State = #msstate { pending_gc_completion = Pending }) ->
- State1 = State #msstate { pending_gc_completion = [] },
- lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
+ Op, File, State = #msstate { pending_gc_completion = Pending }) ->
+ State #msstate { pending_gc_completion =
+ rabbit_misc:orddict_cons(File, Op, Pending) }.
-run_pending({read, Guid, From}, State) ->
+run_pending(Files, State) ->
+ lists:foldl(
+ fun (File, State1 = #msstate { pending_gc_completion = Pending }) ->
+ Pending1 = orddict:erase(File, Pending),
+ lists:foldl(
+ fun run_pending_action/2,
+ State1 #msstate { pending_gc_completion = Pending1 },
+ lists:reverse(orddict:fetch(File, Pending)))
+ end, State, Files).
+
+run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
-run_pending({contains, Guid, From}, State) ->
+run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending({remove, Guid}, State) ->
+run_pending_action({remove, Guid}, State) ->
remove_message(Guid, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
@@ -969,6 +995,10 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+orddict_store(Key, Val, Dict) ->
+ false = orddict:is_key(Key, Dict),
+ orddict:store(Key, Val, Dict).
+
%%----------------------------------------------------------------------------
%% file helper functions
%%----------------------------------------------------------------------------
@@ -1429,12 +1459,12 @@ maybe_roll_to_new_file(
maybe_roll_to_new_file(_, State) ->
State.
-maybe_compact(State = #msstate { sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- gc_active = false,
- gc_pid = GCPid,
- file_summary_ets = FileSummaryEts,
- file_size_limit = FileSizeLimit })
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ gc_pid = GCPid,
+ pending_gc_completion = Pending,
+ file_summary_ets = FileSummaryEts,
+ file_size_limit = FileSizeLimit })
when (SumFileSize > 2 * FileSizeLimit andalso
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
%% TODO: the algorithm here is sub-optimal - it may result in a
@@ -1443,27 +1473,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
'$end_of_table' ->
State;
First ->
- case find_files_to_gc(FileSummaryEts, FileSizeLimit,
- ets:lookup(FileSummaryEts, First)) of
+ case find_files_to_combine(FileSummaryEts, FileSizeLimit,
+ ets:lookup(FileSummaryEts, First)) of
not_found ->
State;
{Src, Dst} ->
+ Pending1 = orddict_store(Dst, [],
+ orddict_store(Src, [], Pending)),
State1 = close_handle(Src, close_handle(Dst, State)),
true = ets:update_element(FileSummaryEts, Src,
{#file_summary.locked, true}),
true = ets:update_element(FileSummaryEts, Dst,
{#file_summary.locked, true}),
- ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst),
- State1 #msstate { gc_active = {Src, Dst} }
+ ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst),
+ State1 #msstate { pending_gc_completion = Pending1 }
end
end;
maybe_compact(State) ->
State.
-find_files_to_gc(FileSummaryEts, FileSizeLimit,
- [#file_summary { file = Dst,
- valid_total_size = DstValid,
- right = Src }]) ->
+find_files_to_combine(FileSummaryEts, FileSizeLimit,
+ [#file_summary { file = Dst,
+ valid_total_size = DstValid,
+ right = Src,
+ locked = DstLocked }]) ->
case Src of
undefined ->
not_found;
@@ -1471,13 +1504,16 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit,
[#file_summary { file = Src,
valid_total_size = SrcValid,
left = Dst,
- right = SrcRight }] = Next =
+ right = SrcRight,
+ locked = SrcLocked }] = Next =
ets:lookup(FileSummaryEts, Src),
case SrcRight of
undefined -> not_found;
- _ -> case DstValid + SrcValid =< FileSizeLimit of
+ _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso
+ (DstValid > 0) andalso (SrcValid > 0) andalso
+ not (DstLocked orelse SrcLocked) of
true -> {Src, Dst};
- false -> find_files_to_gc(
+ false -> find_files_to_combine(
FileSummaryEts, FileSizeLimit, Next)
end
end
@@ -1486,85 +1522,86 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit,
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
delete_file_if_empty(File, State = #msstate {
- dir = Dir,
- sum_file_size = SumFileSize,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
+ gc_pid = GCPid,
+ file_summary_ets = FileSummaryEts,
+ pending_gc_completion = Pending }) ->
[#file_summary { valid_total_size = ValidData,
- left = Left,
- right = Right,
- file_size = FileSize,
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- %% we should NEVER find the current file in here hence right
- %% should always be a file, not undefined
- 0 -> case {Left, Right} of
- {undefined, _} when Right =/= undefined ->
- %% the eldest file is empty.
- true = ets:update_element(
- FileSummaryEts, Right,
- {#file_summary.left, undefined});
- {_, _} when Right =/= undefined ->
- true = ets:update_element(FileSummaryEts, Right,
- {#file_summary.left, Left}),
- true = ets:update_element(FileSummaryEts, Left,
- {#file_summary.right, Right})
- end,
- true = mark_handle_to_close(FileHandlesEts, File),
- true = ets:delete(FileSummaryEts, File),
- {ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
- [index_delete(Guid, State) ||
- {Guid, _TotalSize, _Offset} <- Messages],
- State1 = close_handle(File, State),
- ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- State1 #msstate { sum_file_size = SumFileSize - FileSize };
+ 0 -> %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ true = ets:update_element(FileSummaryEts, File,
+ {#file_summary.locked, true}),
+ ok = rabbit_msg_store_gc:delete(GCPid, File),
+ Pending1 = orddict_store(File, [], Pending),
+ close_handle(File,
+ State #msstate { pending_gc_completion = Pending1 });
_ -> State
end.
+cleanup_after_file_deletion(File,
+ #msstate { file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts }) ->
+ %% Ensure that any clients that have open fhs to the file close
+ %% them before using them again. This has to be done here (given
+ %% it's done in the msg_store, and not the gc), and not when
+ %% starting up the GC, because if done when starting up the GC,
+ %% the client could find the close, and close and reopen the fh,
+ %% whilst the GC is waiting for readers to disappear, before it's
+ %% actually done the GC.
+ true = mark_handle_to_close(FileHandlesEts, File),
+ [#file_summary { left = Left,
+ right = Right,
+ locked = true,
+ readers = 0 }] = ets:lookup(FileSummaryEts, File),
+ %% We'll never delete the current file, so right is never undefined
+ true = Right =/= undefined, %% ASSERTION
+ true = ets:update_element(FileSummaryEts, Right,
+ {#file_summary.left, Left}),
+ %% ensure the double linked list is maintained
+ true = case Left of
+ undefined -> true; %% File is the eldest file (left-most)
+ _ -> ets:update_element(FileSummaryEts, Left,
+ {#file_summary.right, Right})
+ end,
+ true = ets:delete(FileSummaryEts, File),
+ ok.
+
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
-gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
- [SrcObj = #file_summary {
- readers = SrcReaders,
- left = DstFile,
- file_size = SrcFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, SrcFile),
- [DstObj = #file_summary {
- readers = DstReaders,
- right = SrcFile,
- file_size = DstFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, DstFile),
-
- case SrcReaders =:= 0 andalso DstReaders =:= 0 of
- true -> TotalValidData = combine_files(SrcObj, DstObj, State),
- %% don't update dest.right, because it could be
- %% changing at the same time
- true = ets:update_element(
- FileSummaryEts, DstFile,
- [{#file_summary.valid_total_size, TotalValidData},
- {#file_summary.file_size, TotalValidData}]),
- SrcFileSize + DstFileSize - TotalValidData;
- false -> concurrent_readers
- end.
-
-combine_files(#file_summary { file = Source,
- valid_total_size = SourceValid,
- left = Destination },
- #file_summary { file = Destination,
- valid_total_size = DestinationValid,
- right = Source },
- State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
- SourceName = filenum_to_name(Source),
- DestinationName = filenum_to_name(Destination),
+has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
+ [#file_summary { locked = true, readers = Count }] =
+ ets:lookup(FileSummaryEts, File),
+ Count /= 0.
+
+combine_files(Source, Destination,
+ State = #gc_state { file_summary_ets = FileSummaryEts,
+ dir = Dir,
+ msg_store = Server }) ->
+ [#file_summary {
+ readers = 0,
+ left = Destination,
+ valid_total_size = SourceValid,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Source),
+ [#file_summary {
+ readers = 0,
+ right = Source,
+ valid_total_size = DestinationValid,
+ file_size = DestinationFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Destination),
+
+ SourceName = filenum_to_name(Source),
+ DestinationName = filenum_to_name(Destination),
{ok, SourceHdl} = open_file(Dir, SourceName,
?READ_AHEAD_MODE),
{ok, DestinationHdl} = open_file(Dir, DestinationName,
?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ExpectedSize = SourceValid + DestinationValid,
+ TotalValidData = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't
%% need a tmp file
%% if they're not equal, then we need to write out everything past
@@ -1577,7 +1614,7 @@ combine_files(#file_summary { file = Source,
drop_contiguous_block_prefix(DestinationWorkList),
case DestinationWorkListTail of
[] -> ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize);
+ DestinationHdl, DestinationContiguousTop, TotalValidData);
_ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
ok = copy_messages(
@@ -1591,7 +1628,7 @@ combine_files(#file_summary { file = Source,
%% Destination and copy from Tmp back to the end
{ok, 0} = file_handle_cache:position(TmpHdl, 0),
ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ DestinationHdl, DestinationContiguousTop, TotalValidData),
{ok, TmpSize} =
file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
%% position in DestinationHdl should now be DestinationValid
@@ -1599,14 +1636,36 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:delete(TmpHdl)
end,
{SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State),
- ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
ok = file_handle_cache:close(DestinationHdl),
ok = file_handle_cache:delete(SourceHdl),
- ExpectedSize.
-load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) ->
+ %% don't update dest.right, because it could be changing at the
+ %% same time
+ true = ets:update_element(
+ FileSummaryEts, Destination,
+ [{#file_summary.valid_total_size, TotalValidData},
+ {#file_summary.file_size, TotalValidData}]),
+
+ Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
+ gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}).
+
+delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
+ dir = Dir,
+ msg_store = Server }) ->
+ [#file_summary { valid_total_size = 0,
+ locked = true,
+ file_size = FileSize,
+ readers = 0 }] = ets:lookup(FileSummaryEts, File),
+ {[], 0} = load_and_vacuum_message_file(File, State),
+ ok = file:delete(form_filename(Dir, filenum_to_name(File))),
+ gen_server2:cast(Server, {delete_file, File, FileSize}).
+
+load_and_vacuum_message_file(File, #gc_state { dir = Dir,
+ index_module = Index,
+ index_state = IndexState }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -1627,7 +1686,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) ->
end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
+ Destination, #gc_state { index_module = Index,
+ index_state = IndexState }) ->
Copy = fun ({BlockStart, BlockEnd}) ->
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index a7855bbf..cd9fd497 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,20 +33,16 @@
-behaviour(gen_server2).
--export([start_link/4, gc/3, no_readers/2, stop/1]).
+-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/2]).
--record(gcstate,
- {dir,
- index_state,
- index_module,
- parent,
- file_summary_ets,
- scheduled
+-record(state,
+ { pending_no_readers,
+ msg_store_state
}).
-include("rabbit.hrl").
@@ -55,10 +51,12 @@
-ifdef(use_specs).
--spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) ->
+-spec(start_link/1 :: (rabbit_msg_store:gc_state()) ->
rabbit_types:ok_pid_or_error()).
--spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
--spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(combine/3 :: (pid(), rabbit_msg_store:file_num(),
+ rabbit_msg_store:file_num()) -> 'ok').
+-spec(delete/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok').
+-spec(no_readers/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok').
-spec(stop/1 :: (pid()) -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -66,13 +64,15 @@
%%----------------------------------------------------------------------------
-start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
- gen_server2:start_link(
- ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts],
- [{timeout, infinity}]).
+start_link(MsgStoreState) ->
+ gen_server2:start_link(?MODULE, [MsgStoreState],
+ [{timeout, infinity}]).
-gc(Server, Source, Destination) ->
- gen_server2:cast(Server, {gc, Source, Destination}).
+combine(Server, Source, Destination) ->
+ gen_server2:cast(Server, {combine, Source, Destination}).
+
+delete(Server, File) ->
+ gen_server2:cast(Server, {delete, File}).
no_readers(Server, File) ->
gen_server2:cast(Server, {no_readers, File}).
@@ -85,16 +85,11 @@ set_maximum_since_use(Pid, Age) ->
%%----------------------------------------------------------------------------
-init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
+init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- {ok, #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- parent = Parent,
- file_summary_ets = FileSummaryEts,
- scheduled = undefined },
- hibernate,
+ {ok, #state { pending_no_readers = dict:new(),
+ msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
@@ -103,18 +98,23 @@ prioritise_cast(_Msg, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
-handle_cast({gc, Source, Destination},
- State = #gcstate { scheduled = undefined }) ->
- {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }),
- hibernate};
+handle_cast({combine, Source, Destination}, State) ->
+ {noreply, attempt_action(combine, [Source, Destination], State), hibernate};
-handle_cast({no_readers, File},
- State = #gcstate { scheduled = {Source, Destination} })
- when File =:= Source orelse File =:= Destination ->
- {noreply, attempt_gc(State), hibernate};
+handle_cast({delete, File}, State) ->
+ {noreply, attempt_action(delete, [File], State), hibernate};
-handle_cast({no_readers, _File}, State) ->
- {noreply, State, hibernate};
+handle_cast({no_readers, File},
+ State = #state { pending_no_readers = Pending }) ->
+ {noreply, case dict:find(File, Pending) of
+ error ->
+ State;
+ {ok, {Action, Files}} ->
+ Pending1 = dict:erase(File, Pending),
+ attempt_action(
+ Action, Files,
+ State #state { pending_no_readers = Pending1 })
+ end, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -129,16 +129,18 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-attempt_gc(State = #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = Index,
- parent = Parent,
- file_summary_ets = FileSummaryEts,
- scheduled = {Source, Destination} }) ->
- case rabbit_msg_store:gc(Source, Destination,
- {FileSummaryEts, Dir, Index, IndexState}) of
- concurrent_readers -> State;
- Reclaimed -> ok = rabbit_msg_store:gc_done(
- Parent, Reclaimed, Source, Destination),
- State #gcstate { scheduled = undefined }
+attempt_action(Action, Files,
+ State = #state { pending_no_readers = Pending,
+ msg_store_state = MsgStoreState }) ->
+ case [File || File <- Files,
+ rabbit_msg_store:has_readers(File, MsgStoreState)] of
+ [] -> do_action(Action, Files, MsgStoreState),
+ State;
+ [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending),
+ State #state { pending_no_readers = Pending1 }
end.
+
+do_action(combine, [Source, Destination], MsgStoreState) ->
+ rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
+do_action(delete, [File], MsgStoreState) ->
+ rabbit_msg_store:delete_file(File, MsgStoreState).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index db5c71f6..03a9b386 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -41,7 +41,7 @@
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
--export([tcp_listener_started/2, tcp_listener_stopped/2,
+-export([tcp_listener_started/3, tcp_listener_stopped/3,
start_client/1, start_ssl_client/2]).
-include("rabbit.hrl").
@@ -160,14 +160,14 @@ check_tcp_listener_address(NamePrefix, Host, Port) ->
{IPAddress, Name}.
start_tcp_listener(Host, Port) ->
- start_listener(Host, Port, "TCP Listener",
+ start_listener(Host, Port, amqp, "TCP Listener",
{?MODULE, start_client, []}).
start_ssl_listener(Host, Port, SslOpts) ->
- start_listener(Host, Port, "SSL Listener",
+ start_listener(Host, Port, 'amqp/ssl', "SSL Listener",
{?MODULE, start_ssl_client, [SslOpts]}).
-start_listener(Host, Port, Label, OnConnect) ->
+start_listener(Host, Port, Protocol, Label, OnConnect) ->
{IPAddress, Name} =
check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
{ok,_} = supervisor:start_child(
@@ -175,8 +175,8 @@ start_listener(Host, Port, Label, OnConnect) ->
{Name,
{tcp_listener_sup, start_link,
[IPAddress, Port, ?RABBIT_TCP_OPTS ,
- {?MODULE, tcp_listener_started, []},
- {?MODULE, tcp_listener_stopped, []},
+ {?MODULE, tcp_listener_started, [Protocol]},
+ {?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}),
ok.
@@ -188,20 +188,25 @@ stop_tcp_listener(Host, Port) ->
ok = supervisor:delete_child(rabbit_sup, Name),
ok.
-tcp_listener_started(IPAddress, Port) ->
+tcp_listener_started(Protocol, IPAddress, Port) ->
+ %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
+ %% We need the host so we can distinguish multiple instances of the above
+ %% in a cluster.
ok = mnesia:dirty_write(
rabbit_listener,
#listener{node = node(),
- protocol = tcp,
+ protocol = Protocol,
host = tcp_host(IPAddress),
+ ip_address = IPAddress,
port = Port}).
-tcp_listener_stopped(IPAddress, Port) ->
+tcp_listener_stopped(Protocol, IPAddress, Port) ->
ok = mnesia:dirty_delete_object(
rabbit_listener,
#listener{node = node(),
- protocol = tcp,
+ protocol = Protocol,
host = tcp_host(IPAddress),
+ ip_address = IPAddress,
port = Port}).
active_listeners() ->
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 66e5cf63..11056c8e 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -69,7 +69,8 @@
-type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
-type(work_item() ::
- {publish, rabbit_types:message(), pmsg()} |
+ {publish,
+ rabbit_types:message(), rabbit_types:message_properties(), pmsg()} |
{deliver, pmsg()} |
{ack, pmsg()}).
@@ -173,9 +174,10 @@ handle_call(force_snapshot, _From, State) ->
handle_call({queue_content, QName}, _From,
State = #pstate{snapshot = #psnapshot{messages = Messages,
queues = Queues}}) ->
- MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
- do_reply([{ets:lookup_element(Messages, K, 2), D} ||
- {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [],
+ [{{'$4', '$1', '$2', '$3'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), MP, D} ||
+ {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))],
State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -243,9 +245,9 @@ log_work(CreateWorkUnit, MessageList,
snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
- fun (M = {publish, Message, QK = {_QName, PKey}}) ->
+ fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) ->
case ets:lookup(Messages, PKey) of
- [_] -> {tied, QK};
+ [_] -> {tied, MsgProps, QK};
[] -> ets:insert(Messages, {PKey, Message}),
M
end;
@@ -356,7 +358,8 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered,
+ _MsgProps, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues),
prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
@@ -474,14 +477,14 @@ perform_work(MessageList, Messages, Queues, SeqId) ->
perform_work_item(Item, Messages, Queues, NextSeqId)
end, SeqId, MessageList).
-perform_work_item({publish, Message, QK = {_QName, PKey}},
+perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}},
Messages, Queues, NextSeqId) ->
true = ets:insert(Messages, {PKey, Message}),
- true = ets:insert(Queues, {QK, false, NextSeqId}),
+ true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
NextSeqId + 1;
-perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:insert(Queues, {QK, false, NextSeqId}),
+perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
NextSeqId + 1;
perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0b98290c..28d0b47d 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,8 +31,9 @@
-module(rabbit_queue_index).
--export([init/4, terminate/2, delete_and_terminate/1, publish/4,
- deliver/2, ack/2, sync/2, flush/1, read/3,
+-export([init/1, shutdown_terms/1, recover/4,
+ terminate/2, delete_and_terminate/1,
+ publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -98,12 +99,12 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'),
-%% ('ack'|'no_ack')} is richer than strictly necessary for most
-%% operations. However, for startup, and to ensure the safe and
-%% correct combination of journal entries with entries read from the
-%% segment on disk, this richer representation vastly simplifies and
-%% clarifies the code.
+%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}),
+%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
+%% necessary for most operations. However, for startup, and to ensure
+%% the safe and correct combination of journal entries with entries
+%% read from the segment on disk, this richer representation vastly
+%% simplifies and clarifies the code.
%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -141,14 +142,19 @@
-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, and 128 bits of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
+%% of md5sum msg id
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).
+-define(EXPIRY_BYTES, 8).
+-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
+-define(NO_EXPIRY, 0).
+
-define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(GUID_BITS, (?GUID_BYTES * 8)).
-%% 16 bytes for md5sum + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2).
+%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
+-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
@@ -157,7 +163,7 @@
%% ---- misc ----
--define(PUB, {_, _}). %% {Guid, IsPersistent}
+-define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
@@ -194,21 +200,26 @@
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
+-type(shutdown_terms() :: [any()]).
--spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean())) ->
- {'undefined' | non_neg_integer(), [any()], qistate()}).
+-spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()).
+-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
+-spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ fun ((rabbit_guid:guid()) -> boolean())) ->
+ {'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) ->
- qistate()).
+-spec(publish/5 :: (rabbit_guid:guid(), seq_id(),
+ rabbit_types:message_properties(), boolean(), qistate())
+ -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}],
- qistate()}).
+ {[{rabbit_guid:guid(), seq_id(),
+ rabbit_types:message_properties(),
+ boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -222,25 +233,26 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) ->
+init(Name) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = filelib:is_file(Dir), %% is_file == is file or dir
- {0, [], State};
+ State.
+
+shutdown_terms(Name) ->
+ #qistate { dir = Dir } = blank_state(Name),
+ case read_shutdown_terms(Dir) of
+ {error, _} -> [];
+ {ok, Terms1} -> Terms1
+ end.
-init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
- Terms = case read_shutdown_terms(Dir) of
- {error, _} -> [];
- {ok, Terms1} -> Terms1
- end,
CleanShutdown = detect_clean_shutdown(Dir),
- {Count, State1} =
- case CleanShutdown andalso MsgStoreRecovered of
- true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
- init_clean(RecoveredCounts, State);
- false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
- end,
- {Count, Terms, State1}.
+ case CleanShutdown andalso MsgStoreRecovered of
+ true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
+ init_clean(RecoveredCounts, State);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
+ end.
terminate(Terms, State) ->
{SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
@@ -252,15 +264,18 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
- maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
+ end):?JPREFIX_BITS,
+ SeqId:?SEQ_BITS>>,
+ create_pub_record_body(Guid, MsgProps)]),
+ maybe_flush_journal(
+ add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -453,7 +468,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) ->
+ fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
end,
@@ -506,7 +521,8 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
+ fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ ok) ->
gatherer:in(Gatherer, {Guid, 1});
(_RelSeq, _Value, Acc) ->
Acc
@@ -516,6 +532,32 @@ queue_index_walker_reader(QueueName, Gatherer) ->
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
+%% expiry/binary manipulation
+%%----------------------------------------------------------------------------
+
+create_pub_record_body(Guid, #message_properties{expiry = Expiry}) ->
+ [Guid, expiry_to_binary(Expiry)].
+
+expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
+expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
+
+read_pub_record_body(Hdl) ->
+ case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of
+ {ok, Bin} ->
+ %% work around for binary data fragmentation. See
+ %% rabbit_msg_file:read_next/2
+ <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
+ <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
+ Exp = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ {Guid, #message_properties{expiry = Exp}};
+ Error ->
+ Error
+ end.
+
+%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------
@@ -636,17 +678,13 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES) of
- {ok, <<GuidNum:?GUID_BITS>>} ->
- %% work around for binary data
- %% fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<Guid:?GUID_BYTES/binary>> =
- <<GuidNum:?GUID_BITS>>,
- Publish = {Guid, case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end},
+ case read_pub_record_body(Hdl) of
+ {Guid, MsgProps} ->
+ Publish = {Guid, MsgProps,
+ case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end},
load_journal_entries(
add_to_journal(SeqId, Publish, State));
_ErrOrEoF -> %% err, we've lost at least a publish
@@ -744,11 +782,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, IsPersistent} ->
+ {Guid, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>, Guid])
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(Guid, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -768,10 +807,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
+ [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -801,10 +840,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- %% because we specify /binary, and binaries are complete
- %% bytes, the size spec is in bytes, not bits.
- {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES),
- Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack},
+ {Guid, MsgProps} = read_pub_record_body(Hdl),
+ Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 29004bd5..127467bb 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -235,12 +235,29 @@ conserve_memory(Pid, Conserve) ->
server_properties() ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
- [{list_to_binary(K), longstr, list_to_binary(V)} ||
- {K, V} <- [{"product", Product},
- {"version", Version},
- {"platform", "Erlang/OTP"},
- {"copyright", ?COPYRIGHT_MESSAGE},
- {"information", ?INFORMATION_MESSAGE}]].
+
+ %% Get any configuration-specified server properties
+ {ok, RawConfigServerProps} = application:get_env(rabbit,
+ server_properties),
+
+ %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
+ %% from the config and merge them with the generated built-in properties
+ NormalizedConfigServerProps =
+ [case X of
+ {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
+ longstr,
+ list_to_binary(Value)};
+ {BinKey, Type, Value} -> {BinKey, Type, Value}
+ end || X <- RawConfigServerProps ++
+ [{product, Product},
+ {version, Version},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]],
+
+ %% Filter duplicated properties in favor of config file provided values
+ lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
+ NormalizedConfigServerProps).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index be451af6..4335dd2e 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -34,7 +34,6 @@
-include("rabbit.hrl").
-include_lib("public_key/include/public_key.hrl").
--include_lib("ssl/src/ssl_int.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1b47cdb7..71b23e01 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,8 +41,8 @@
-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
@@ -73,6 +73,7 @@ all_tests() ->
passed = test_user_management(),
passed = test_server_status(),
passed = maybe_run_cluster_dependent_tests(),
+ passed = test_configurable_server_properties(),
passed.
maybe_run_cluster_dependent_tests() ->
@@ -962,9 +963,6 @@ test_user_management() ->
control_action(list_permissions, [], [{"-p", "/testhost"}]),
{error, {invalid_regexp, _, _}} =
control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
- {error, {invalid_scope, _}} =
- control_action(set_permissions, ["guest", "foo", ".*", ".*"],
- [{"-s", "cilent"}]),
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
@@ -987,9 +985,7 @@ test_user_management() ->
ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
[{"-p", "/testhost"}]),
ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
- [{"-p", "/testhost"}, {"-s", "client"}]),
- ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
- [{"-p", "/testhost"}, {"-s", "all"}]),
+ [{"-p", "/testhost"}]),
ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
ok = control_action(list_user_permissions, ["foo"]),
@@ -1297,7 +1293,7 @@ info_action(Command, Args, CheckVHost) ->
{bad_argument, dummy} = control_action(Command, ["dummy"]),
ok.
-default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}].
+default_options() -> [{"-p", "/"}, {"-q", "false"}].
expand_options(As, Bs) ->
lists:foldl(fun({K, _}=A, R) ->
@@ -1413,6 +1409,7 @@ test_backing_queue() ->
application:set_env(rabbit, msg_store_file_size_limit,
FileSizeLimit, infinity),
passed = test_queue_index(),
+ passed = test_queue_index_props(),
passed = test_variable_queue(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
@@ -1430,17 +1427,17 @@ restart_msg_store_empty() ->
guid_bin(X) ->
erlang:md5(term_to_binary(X)).
-msg_store_contains(Atom, Guids) ->
+msg_store_contains(Atom, Guids, MSCState) ->
Atom = lists:foldl(
fun (Guid, Atom1) when Atom1 =:= Atom ->
- rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end,
+ rabbit_msg_store:contains(Guid, MSCState) end,
Atom, Guids).
-msg_store_sync(Guids) ->
+msg_store_sync(Guids, MSCState) ->
Ref = make_ref(),
Self = self(),
- ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids,
- fun () -> Self ! {sync, Ref} end),
+ ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end,
+ MSCState),
receive
{sync, Ref} -> ok
after
@@ -1452,55 +1449,64 @@ msg_store_sync(Guids) ->
msg_store_read(Guids, MSCState) ->
lists:foldl(fun (Guid, MSCStateM) ->
{{ok, Guid}, MSCStateN} = rabbit_msg_store:read(
- ?PERSISTENT_MSG_STORE,
Guid, MSCStateM),
MSCStateN
end, MSCState, Guids).
msg_store_write(Guids, MSCState) ->
- lists:foldl(fun (Guid, {ok, MSCStateN}) ->
- rabbit_msg_store:write(?PERSISTENT_MSG_STORE,
- Guid, Guid, MSCStateN)
- end, {ok, MSCState}, Guids).
+ ok = lists:foldl(
+ fun (Guid, ok) -> rabbit_msg_store:write(Guid, Guid, MSCState) end,
+ ok, Guids).
+
+msg_store_remove(Guids, MSCState) ->
+ rabbit_msg_store:remove(Guids, MSCState).
-msg_store_remove(Guids) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids).
+msg_store_remove(MsgStore, Ref, Guids) ->
+ with_msg_store_client(MsgStore, Ref,
+ fun (MSCStateM) ->
+ ok = msg_store_remove(Guids, MSCStateM),
+ MSCStateM
+ end).
+
+with_msg_store_client(MsgStore, Ref, Fun) ->
+ rabbit_msg_store:client_terminate(
+ Fun(rabbit_msg_store:client_init(MsgStore, Ref))).
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
- lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
+ lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
+ rabbit_msg_store:client_init(MsgStore, Ref), L)).
test_msg_store() ->
restart_msg_store_empty(),
Self = self(),
Guids = [guid_bin(M) || M <- lists:seq(1,100)],
{Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
- %% check we don't contain any of the msgs we're about to publish
- false = msg_store_contains(false, Guids),
Ref = rabbit_guid:guid(),
MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ %% check we don't contain any of the msgs we're about to publish
+ false = msg_store_contains(false, Guids, MSCState),
%% publish the first half
- {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState),
+ ok = msg_store_write(Guids1stHalf, MSCState),
%% sync on the first half
- ok = msg_store_sync(Guids1stHalf),
+ ok = msg_store_sync(Guids1stHalf, MSCState),
%% publish the second half
- {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1),
+ ok = msg_store_write(Guids2ndHalf, MSCState),
%% sync on the first half again - the msg_store will be dirty, but
%% we won't need the fsync
- ok = msg_store_sync(Guids1stHalf),
+ ok = msg_store_sync(Guids1stHalf, MSCState),
%% check they're all in there
- true = msg_store_contains(true, Guids),
+ true = msg_store_contains(true, Guids, MSCState),
%% publish the latter half twice so we hit the caching and ref count code
- {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2),
+ ok = msg_store_write(Guids2ndHalf, MSCState),
%% check they're still all in there
- true = msg_store_contains(true, Guids),
+ true = msg_store_contains(true, Guids, MSCState),
%% sync on the 2nd half, but do lots of individual syncs to try
%% and cause coalescing to happen
ok = lists:foldl(
fun (Guid, ok) -> rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE,
- [Guid], fun () -> Self ! {sync, Guid} end)
+ [Guid], fun () -> Self ! {sync, Guid} end,
+ MSCState)
end, ok, Guids2ndHalf),
lists:foldl(
fun(Guid, ok) ->
@@ -1515,24 +1521,24 @@ test_msg_store() ->
end, ok, Guids2ndHalf),
%% it's very likely we're not dirty here, so the 1st half sync
%% should hit a different code path
- ok = msg_store_sync(Guids1stHalf),
+ ok = msg_store_sync(Guids1stHalf, MSCState),
%% read them all
- MSCState4 = msg_store_read(Guids, MSCState3),
+ MSCState1 = msg_store_read(Guids, MSCState),
%% read them all again - this will hit the cache, not disk
- MSCState5 = msg_store_read(Guids, MSCState4),
+ MSCState2 = msg_store_read(Guids, MSCState1),
%% remove them all
- ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids),
+ ok = rabbit_msg_store:remove(Guids, MSCState2),
%% check first half doesn't exist
- false = msg_store_contains(false, Guids1stHalf),
+ false = msg_store_contains(false, Guids1stHalf, MSCState2),
%% check second half does exist
- true = msg_store_contains(true, Guids2ndHalf),
+ true = msg_store_contains(true, Guids2ndHalf, MSCState2),
%% read the second half again
- MSCState6 = msg_store_read(Guids2ndHalf, MSCState5),
+ MSCState3 = msg_store_read(Guids2ndHalf, MSCState2),
%% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
+ ok = rabbit_msg_store:release(Guids2ndHalf, MSCState3),
%% read the second half again, just for fun (aka code coverage)
- MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
- ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE),
+ MSCState4 = msg_store_read(Guids2ndHalf, MSCState3),
+ ok = rabbit_msg_store:client_terminate(MSCState4),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
@@ -1543,22 +1549,26 @@ test_msg_store() ->
([Guid|GuidsTail]) ->
{Guid, 0, GuidsTail}
end, Guids2ndHalf}),
+ MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we have the right msgs left
lists:foldl(
fun (Guid, Bool) ->
- not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid))
+ not(Bool = rabbit_msg_store:contains(Guid, MSCState5))
end, false, Guids2ndHalf),
+ ok = rabbit_msg_store:client_terminate(MSCState5),
%% restart empty
restart_msg_store_empty(),
+ MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs
- false = msg_store_contains(false, Guids),
+ false = msg_store_contains(false, Guids, MSCState6),
%% publish the first half again
- MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
- {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
+ ok = msg_store_write(Guids1stHalf, MSCState6),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE),
- ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
+ msg_store_read(Guids1stHalf, MSCState6)),
+ MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7),
+ ok = rabbit_msg_store:client_terminate(MSCState7),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse guids
%% push a lot of msgs in... at least 100 files worth
@@ -1567,31 +1577,39 @@ test_msg_store() ->
BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)],
Payload = << 0:PayloadSizeBits >>,
- ok = foreach_with_msg_store_client(
+ ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
- fun (Guid, MsgStore, MSCStateM) ->
- {ok, MSCStateN} = rabbit_msg_store:write(
- MsgStore, Guid, Payload, MSCStateM),
- MSCStateN
- end, GuidsBig),
+ fun (MSCStateM) ->
+ [ok = rabbit_msg_store:write(Guid, Payload, MSCStateM) ||
+ Guid <- GuidsBig],
+ MSCStateM
+ end),
%% now read them to ensure we hit the fast client-side reading
ok = foreach_with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
- fun (Guid, MsgStore, MSCStateM) ->
+ fun (Guid, MSCStateM) ->
{{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
- MsgStore, Guid, MSCStateM),
+ Guid, MSCStateM),
MSCStateN
end, GuidsBig),
%% .., then 3s by 1...
- ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
+ [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
%% .., then remove 3s by 2, from the young end first. This hits
%% GC (under 50% good data left, but no empty files. Must GC).
- ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
+ [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
%% .., then remove 3s by 3, from the young end first. This hits
%% GC...
- ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
+ [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
%% ensure empty
- false = msg_store_contains(false, GuidsBig),
+ ok = with_msg_store_client(
+ ?PERSISTENT_MSG_STORE, Ref,
+ fun (MSCStateM) ->
+ false = msg_store_contains(false, GuidsBig, MSCStateM),
+ MSCStateM
+ end),
%% restart empty
restart_msg_store_empty(),
passed.
@@ -1603,11 +1621,18 @@ test_queue() ->
queue_name(<<"test">>).
init_test_queue() ->
- rabbit_queue_index:init(
- test_queue(), true, false,
- fun (Guid) ->
- rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
- end).
+ TestQueue = test_queue(),
+ Terms = rabbit_queue_index:shutdown_terms(TestQueue),
+ PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
+ PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
+ PRef),
+ Res = rabbit_queue_index:recover(
+ TestQueue, Terms, false,
+ fun (Guid) ->
+ rabbit_msg_store:contains(Guid, PersistentClient)
+ end),
+ ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
+ Res.
restart_test_queue(Qi) ->
_ = rabbit_queue_index:terminate([], Qi),
@@ -1618,13 +1643,13 @@ restart_test_queue(Qi) ->
empty_test_queue() ->
ok = rabbit_variable_queue:stop(),
ok = rabbit_variable_queue:start([]),
- {0, _Terms, Qi} = init_test_queue(),
+ {0, Qi} = init_test_queue(),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
with_empty_test_queue(Fun) ->
ok = empty_test_queue(),
- {0, _Terms, Qi} = init_test_queue(),
+ {0, Qi} = init_test_queue(),
rabbit_queue_index:delete_and_terminate(Fun(Qi)).
queue_index_publish(SeqIds, Persistent, Qi) ->
@@ -1633,29 +1658,44 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
end,
- {A, B, MSCStateEnd} =
+ MSCState = rabbit_msg_store:client_init(MsgStore, Ref),
+ {A, B} =
lists:foldl(
- fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) ->
+ fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
Guid = rabbit_guid:guid(),
QiM = rabbit_queue_index:publish(
- Guid, SeqId, Persistent, QiN),
- {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
- Guid, MSCStateN),
- {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
- end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
- ok = rabbit_msg_store:client_delete_and_terminate(
- MSCStateEnd, MsgStore, Ref),
+ Guid, SeqId, #message_properties{}, Persistent, QiN),
+ ok = rabbit_msg_store:write(Guid, Guid, MSCState),
+ {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]}
+ end, {Qi, []}, SeqIds),
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
{A, B}.
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
verify_read_with_published(Delivered, Persistent,
- [{Guid, SeqId, Persistent, Delivered}|Read],
+ [{Guid, SeqId, _Props, Persistent, Delivered}|Read],
[{SeqId, Guid}|Published]) ->
verify_read_with_published(Delivered, Persistent, Read, Published);
verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
ko.
+test_queue_index_props() ->
+ with_empty_test_queue(
+ fun(Qi0) ->
+ Guid = rabbit_guid:guid(),
+ Props = #message_properties{expiry=12345},
+ Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
+ {[{Guid, 1, Props, _, _}], Qi2} =
+ rabbit_queue_index:read(1, 2, Qi1),
+ Qi2
+ end),
+
+ ok = rabbit_variable_queue:stop(),
+ ok = rabbit_variable_queue:start([]),
+
+ passed.
+
test_queue_index() ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
TwoSegs = SegmentSize + SegmentSize,
@@ -1674,7 +1714,7 @@ test_queue_index() ->
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
%% should get length back as 0, as all the msgs were transient
- {0, _Terms1, Qi6} = restart_test_queue(Qi4),
+ {0, Qi6} = restart_test_queue(Qi4),
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
@@ -1683,7 +1723,7 @@ test_queue_index() ->
lists:reverse(SeqIdsGuidsB)),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
- {LenB, _Terms2, Qi12} = restart_test_queue(Qi10),
+ {LenB, Qi12} = restart_test_queue(Qi10),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
@@ -1695,7 +1735,7 @@ test_queue_index() ->
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
%% should get length back as 0 because all persistent
%% msgs have been acked
- {0, _Terms3, Qi19} = restart_test_queue(Qi18),
+ {0, Qi19} = restart_test_queue(Qi18),
Qi19
end),
@@ -1767,11 +1807,11 @@ test_queue_index() ->
true, Qi0),
Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
Qi3 = rabbit_queue_index:ack([0], Qi2),
- {5, _Terms9, Qi4} = restart_test_queue(Qi3),
+ {5, Qi4} = restart_test_queue(Qi3),
{Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4),
Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
- {5, _Terms10, Qi8} = restart_test_queue(Qi7),
+ {5, Qi8} = restart_test_queue(Qi7),
Qi8
end),
@@ -1789,7 +1829,8 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>), VQN)
+ end}, <<>>),
+ #message_properties{}, VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1823,9 +1864,41 @@ test_variable_queue() ->
F <- [fun test_variable_queue_dynamic_duration_change/1,
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
- fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]],
+ fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_dropwhile/1]],
passed.
+test_dropwhile(VQ0) ->
+ Count = 10,
+
+ %% add messages with sequential expiry
+ VQ1 = lists:foldl(
+ fun (N, VQN) ->
+ rabbit_variable_queue:publish(
+ rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{}, <<>>),
+ #message_properties{expiry = N}, VQN)
+ end, VQ0, lists:seq(1, Count)),
+
+ %% drop the first 5 messages
+ VQ2 = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, VQ1),
+
+ %% fetch five now
+ VQ3 = lists:foldl(fun (_N, VQN) ->
+ {{#basic_message{}, _, _, _}, VQM} =
+ rabbit_variable_queue:fetch(false, VQN),
+ VQM
+ end, VQ2, lists:seq(6, Count)),
+
+ %% should be empty now
+ {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
+
+ VQ4.
+
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -1836,6 +1909,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% squeeze and relax queue
Churn = Len div 32,
VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
{Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
VQ7 = lists:foldl(
fun (Duration1, VQ4) ->
@@ -1934,7 +2008,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
+ VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
@@ -1974,3 +2048,56 @@ test_queue_recover() ->
rabbit_amqqueue:internal_delete(QName)
end),
passed.
+
+test_configurable_server_properties() ->
+ %% List of the names of the built-in properties do we expect to find
+ BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
+ <<"copyright">>, <<"information">>],
+
+ %% Verify that the built-in properties are initially present
+ ActualPropNames = [Key ||
+ {Key, longstr, _} <- rabbit_reader:server_properties()],
+ true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
+ BuiltInPropNames),
+
+ %% Get the initial server properties configured in the environment
+ {ok, ServerProperties} = application:get_env(rabbit, server_properties),
+
+ %% Helper functions
+ ConsProp = fun (X) -> application:set_env(rabbit,
+ server_properties,
+ [X | ServerProperties]) end,
+ IsPropPresent = fun (X) -> lists:member(X,
+ rabbit_reader:server_properties())
+ end,
+
+ %% Add a wholly new property of the simplified {KeyAtom, StringValue} form
+ NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
+ ConsProp(NewSimplifiedProperty),
+ %% Do we find hare soup, appropriately formatted in the generated properties?
+ ExpectedHareImage = {list_to_binary(atom_to_list(NewHareKey)),
+ longstr,
+ list_to_binary(NewHareVal)},
+ true = IsPropPresent(ExpectedHareImage),
+
+ %% Add a wholly new property of the {BinaryKey, Type, Value} form
+ %% and check for it
+ NewProperty = {<<"new-bin-key">>, signedint, -1},
+ ConsProp(NewProperty),
+ %% Do we find the new property?
+ true = IsPropPresent(NewProperty),
+
+ %% Add a property that clobbers a built-in, and verify correct clobbering
+ {NewVerKey, NewVerVal} = NewVersion = {version, "X.Y.Z."},
+ {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
+ list_to_binary(NewVerVal)},
+ ConsProp(NewVersion),
+ ClobberedServerProps = rabbit_reader:server_properties(),
+ %% Is the clobbering insert present?
+ true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
+ %% Is the clobbering insert the only thing with the clobbering key?
+ [{BinNewVerKey, longstr, BinNewVerVal}] =
+ [E || {K, longstr, _V} = E <- ClobberedServerProps, K =:= BinNewVerKey],
+
+ application:set_env(rabbit, server_properties, ServerProperties),
+ passed.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index b971a63f..91f2c4ca 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -37,8 +37,8 @@
-export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0,
delivery/0, content/0, decoded_content/0, undecoded_content/0,
- unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
- amqp_error/0, r/1, r2/2, r3/3, listener/0,
+ unencoded_content/0, encoded_content/0, message_properties/0,
+ vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
@@ -88,6 +88,8 @@
txn :: maybe(txn()),
sender :: pid(),
message :: message()}).
+-type(message_properties() ::
+ #message_properties{expiry :: pos_integer() | 'undefined'}).
%% this is really an abstract type, but dialyzer does not support them
-type(txn() :: rabbit_guid:guid()).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index fef44037..ac8fcfee 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,9 +32,9 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
- tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
- requeue/2, len/1, is_empty/1,
+ purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
+ tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -247,7 +247,8 @@
is_persistent,
is_delivered,
msg_on_disk,
- index_on_disk
+ index_on_disk,
+ msg_props
}).
-record(delta,
@@ -293,7 +294,8 @@
-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
acks_all :: [[seq_id()]],
- pubs :: [[rabbit_guid:guid()]],
+ pubs :: [{message_properties_transformer(),
+ [rabbit_types:basic_message()]}],
funs :: [fun (() -> any())] }).
-type(state() :: #vqstate {
@@ -366,16 +368,17 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
- {DeltaCount, Terms, IndexState} =
- rabbit_queue_index:init(
- QueueName, Recover,
- rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- fun (Guid) ->
- rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
- end),
- {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
+init(QueueName, IsDurable, false) ->
+ IndexState = rabbit_queue_index:init(QueueName),
+ init(IsDurable, IndexState, 0, [],
+ case IsDurable of
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE);
+ false -> undefined
+ end,
+ msg_store_client_init(?TRANSIENT_MSG_STORE));
+init(QueueName, true, true) ->
+ Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
[] -> {proplists:get_value(persistent_ref, Terms),
@@ -383,63 +386,32 @@ init(QueueName, IsDurable, Recover) ->
Terms};
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
- DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount),
- Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
- true -> ?BLANK_DELTA;
- false -> #delta { start_seq_id = LowSeqId,
- count = DeltaCount1,
- end_seq_id = NextSeqId }
- end,
- Now = now(),
- PersistentClient =
- case IsDurable of
- true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
- false -> undefined
- end,
- TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
- State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
- next_seq_id = NextSeqId,
- pending_ack = dict:new(),
- index_state = IndexState1,
- msg_store_clients = {{PersistentClient, PRef},
- {TransientClient, TRef}},
- on_sync = ?BLANK_SYNC,
- durable = IsDurable,
- transient_threshold = NextSeqId,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
-
- target_ram_msg_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_index_count = 0,
- out_counter = 0,
- in_counter = 0,
- rates = #rates { egress = {Now, 0},
- ingress = {Now, DeltaCount1},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Now } },
- a(maybe_deltas_to_betas(State)).
+ PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
+ PRef),
+ TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE,
+ TRef),
+ {DeltaCount, IndexState} =
+ rabbit_queue_index:recover(
+ QueueName, Terms1,
+ rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ fun (Guid) ->
+ rabbit_msg_store:contains(Guid, PersistentClient)
+ end),
+ init(true, IndexState, DeltaCount, Terms1,
+ PersistentClient, TransientClient).
terminate(State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef},
- {MSCStateT, TRef}} } =
+ msg_store_clients = {MSCStateP, MSCStateT} } =
remove_pending_ack(true, tx_commit_index(State)),
- case MSCStateP of
- undefined -> ok;
- _ -> rabbit_msg_store:client_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE)
- end,
- rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE),
+ PRef = case MSCStateP of
+ undefined -> undefined;
+ _ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
+ rabbit_msg_store:client_ref(MSCStateP)
+ end,
+ ok = rabbit_msg_store:client_terminate(MSCStateT),
+ TRef = rabbit_msg_store:client_ref(MSCStateT),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
@@ -455,191 +427,247 @@ delete_and_terminate(State) ->
%% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef},
- {MSCStateT, TRef}} } =
+ msg_store_clients = {MSCStateP, MSCStateT} } =
remove_pending_ack(false, State1),
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
case MSCStateP of
undefined -> ok;
- _ -> rabbit_msg_store:client_delete_and_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE, PRef)
+ _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
end,
- rabbit_msg_store:client_delete_and_terminate(
- MSCStateT, ?TRANSIENT_MSG_STORE, TRef),
+ rabbit_msg_store:client_delete_and_terminate(MSCStateT),
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
-purge(State = #vqstate { q4 = Q4,
- index_state = IndexState,
- len = Len,
- persistent_count = PCount }) ->
+purge(State = #vqstate { q4 = Q4,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
{LensByStore, IndexState1} = remove_queue_entries(
fun rabbit_misc:queue_fold/3, Q4,
- orddict:new(), IndexState),
- {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} =
+ orddict:new(), IndexState, MSCState),
+ {LensByStore1, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(LensByStore,
State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
{LensByStore2, IndexState3} = remove_queue_entries(
fun rabbit_misc:queue_fold/3, Q1,
- LensByStore1, IndexState2),
+ LensByStore1, IndexState2, MSCState1),
PCount1 = PCount - find_persistent_count(LensByStore2),
- {Len, a(State1 #vqstate { q1 = queue:new(),
- index_state = IndexState3,
- len = 0,
- ram_msg_count = 0,
- ram_index_count = 0,
- persistent_count = PCount1 })}.
-
-publish(Msg, State) ->
- {_SeqId, State1} = publish(Msg, false, false, State),
+ {Len, a(State1 #vqstate { q1 = queue:new(),
+ index_state = IndexState3,
+ len = 0,
+ ram_msg_count = 0,
+ ram_index_count = 0,
+ persistent_count = PCount1 })}.
+
+publish(Msg, MsgProps, State) ->
+ {_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
+publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
- State = #vqstate { len = 0,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- pending_ack = PA,
- durable = IsDurable }) ->
+ MsgProps,
+ State = #vqstate { len = 0,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ pending_ack = PA,
+ durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
-
-fetch(AckRequired, State = #vqstate { q4 = Q4,
- ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- len = Len,
- persistent_count = PCount,
- pending_ack = PA }) ->
+ {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}.
+
+dropwhile(Pred, State) ->
+ {_OkOrEmpty, State1} = dropwhile1(Pred, State),
+ State1.
+
+dropwhile1(Pred, State) ->
+ internal_queue_out(
+ fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
+ case Pred(MsgProps) of
+ true ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile1(Pred, State2);
+ false ->
+ %% message needs to go back into Q4 (or maybe go
+ %% in for the first time if it was loaded from
+ %% Q3). Also the msg contents might not be in
+ %% RAM, so read them in now
+ {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
+ read_msg(MsgStatus, State1),
+ {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }}
+ end
+ end, State).
+
+fetch(AckRequired, State) ->
+ internal_queue_out(
+ fun(MsgStatus, State1) ->
+ %% it's possible that the message wasn't read from disk
+ %% at this point, so read it in.
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ internal_fetch(AckRequired, MsgStatus1, State2)
+ end, State).
+
+internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->
- case fetch_from_q3_to_q4(State) of
- {empty, State1} = Result -> a(State1), Result;
- {loaded, State1} -> fetch(AckRequired, State1)
+ case fetch_from_q3(State) of
+ {empty, State1} = Result -> a(State1), Result;
+ {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1)
end;
- {{value, MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
- Q4a} ->
-
- %% 1. Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- %% 2. Remove from msg_store and queue index, if necessary
- MsgStore = find_msg_store(IsPersistent),
- Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end,
- Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
- end,
-
- %% 3. If an ack is required, add something sensible to PA
- {AckTag, PA1} = case AckRequired of
- true -> PA2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, PA),
- {SeqId, PA2};
- false -> {blank_ack, PA}
- end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
- {{Msg, IsDelivered, AckTag, Len1},
- a(State #vqstate { q4 = Q4a,
- ram_msg_count = RamMsgCount - 1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 })}
+ {{value, MsgStatus}, Q4a} ->
+ Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
+read_msg(MsgStatus = #msg_status { msg = undefined,
+ guid = Guid,
+ is_persistent = IsPersistent },
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, Guid),
+ {MsgStatus #msg_status { msg = Msg },
+ State #vqstate { ram_msg_count = RamMsgCount + 1,
+ msg_store_clients = MSCState1 }};
+read_msg(MsgStatus, State) ->
+ {MsgStatus, State}.
+
+internal_fetch(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ guid = Guid,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount,
+ pending_ack = PA }) ->
+ %% 1. Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ %% 2. Remove from msg_store and queue index, if necessary
+ Rem = fun () ->
+ ok = msg_store_remove(MSCState, IsPersistent, [Guid])
+ end,
+ Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
+ IndexState2 =
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
+ {false, true, false, _} -> Rem(), IndexState1;
+ {false, true, true, _} -> Rem(), Ack();
+ { true, true, true, false} -> Ack();
+ _ -> IndexState1
+ end,
+
+ %% 3. If an ack is required, add something sensible to PA
+ {AckTag, PA1} = case AckRequired of
+ true -> PA2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, PA),
+ {SeqId, PA2};
+ false -> {blank_ack, PA}
+ end,
+
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ Len1 = Len - 1,
+ RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
+
+ {{Msg, IsDelivered, AckTag, Len1},
+ a(State #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}.
+
ack(AckTags, State) ->
- a(ack(fun rabbit_msg_store:remove/2,
+ a(ack(fun msg_store_remove/3,
fun (_AckEntry, State1) -> State1 end,
AckTags, State)).
-tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
+tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- a(case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg),
- {#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
- State #vqstate { msg_store_clients = MSCState1 };
- false -> State
- end).
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
+ case IsPersistent andalso IsDurable of
+ true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps),
+ #msg_status { msg_on_disk = true } =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState);
+ false -> ok
+ end,
+ a(State).
tx_ack(Txn, AckTags, State) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
State.
-tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
+tx_rollback(Txn, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
ok = case IsDurable of
- true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE,
- persistent_guids(Pubs));
+ true -> msg_store_remove(MSCState, true, persistent_guids(Pubs));
false -> ok
end,
{lists:append(AckTags), a(State)}.
-tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
+tx_commit(Txn, Fun, MsgPropsFun,
+ State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- PubsOrdered = lists:reverse(Pubs),
AckTags1 = lists:append(AckTags),
- PersistentGuids = persistent_guids(PubsOrdered),
+ PersistentGuids = persistent_guids(Pubs),
HasPersistentPubs = PersistentGuids =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
- true -> ok = rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE, PersistentGuids,
- msg_store_callback(PersistentGuids,
- PubsOrdered, AckTags1, Fun)),
+ true -> ok = msg_store_sync(
+ MSCState, true, PersistentGuids,
+ msg_store_callback(PersistentGuids, Pubs, AckTags1,
+ Fun, MsgPropsFun)),
State;
- false -> tx_commit_post_msg_store(
- HasPersistentPubs, PubsOrdered, AckTags1, Fun, State)
+ false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
+ Fun, MsgPropsFun, State)
end)}.
-requeue(AckTags, State) ->
+requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
- ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
+ ack(fun msg_store_release/3,
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
State2;
- ({IsPersistent, Guid}, State1) ->
+ ({IsPersistent, Guid, MsgProps}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ true, true, State2),
State3
end,
AckTags, State))).
@@ -783,27 +811,54 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+ MsgProps) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
- msg_on_disk = false, index_on_disk = false }.
+ msg_on_disk = false, index_on_disk = false,
+ msg_props = MsgProps }.
+
+with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
+ {Result, MSCStateP1} = Fun(MSCStateP),
+ {Result, {MSCStateP1, MSCStateT}};
+with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) ->
+ {Result, MSCStateT1} = Fun(MSCStateT),
+ {Result, {MSCStateP, MSCStateT1}}.
+
+with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
+ {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
+ fun (MSCState1) ->
+ {Fun(MSCState1), MSCState1}
+ end),
+ Res.
+
+msg_store_client_init(MsgStore) ->
+ rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()).
+
+msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end).
-find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
-find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
+msg_store_read(MSCState, IsPersistent, Guid) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end).
-with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) ->
- {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP),
- {Result, {{MSCStateP1, PRef}, MSCStateT}};
-with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) ->
- {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
- {Result, {MSCStateP, {MSCStateT1, TRef}}}.
+msg_store_remove(MSCState, IsPersistent, Guids) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end).
-read_from_msg_store(MSCState, IsPersistent, Guid) ->
- with_msg_store_state(
+msg_store_release(MSCState, IsPersistent, Guids) ->
+ with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MsgStore, MSCState1) ->
- rabbit_msg_store:read(MsgStore, Guid, MSCState1)
- end).
+ fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end).
+
+msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
maybe_write_delivered(false, _SeqId, IndexState) ->
IndexState;
@@ -821,12 +876,13 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
persistent_guids(Pubs) ->
- [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs].
+ [Guid || {#basic_message { guid = Guid,
+ is_persistent = true }, _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, IsPersistent, IsDelivered},
+ fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -838,7 +894,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_on_disk = true,
- index_on_disk = true
+ index_on_disk = true,
+ msg_props = MsgProps
}) | Filtered1],
Delivers1,
Acks1}
@@ -885,22 +942,69 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) ->
+init(IsDurable, IndexState, DeltaCount, Terms,
+ PersistentClient, TransientClient) ->
+ {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
+
+ DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
+ Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
+ true -> ?BLANK_DELTA;
+ false -> #delta { start_seq_id = LowSeqId,
+ count = DeltaCount1,
+ end_seq_id = NextSeqId }
+ end,
+ Now = now(),
+ State = #vqstate {
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ target_ram_msg_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ rates = #rates { egress = {Now, 0},
+ ingress = {Now, DeltaCount1},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = Now } },
+ a(maybe_deltas_to_betas(State)).
+
+msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags, Fun, StateN)
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () -> rabbit_msg_store:remove(
- ?PERSISTENT_MSG_STORE,
+ fun () -> remove_persistent_messages(
PersistentGuids)
end, F)
end)
end.
-tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
+remove_persistent_messages(Guids) ->
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE),
+ ok = rabbit_msg_store:remove(Guids, PersistentClient),
+ rabbit_msg_store:client_delete_and_terminate(PersistentClient).
+
+tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
State = #vqstate {
on_sync = OnSync = #sync {
acks_persistent = SPAcks,
@@ -913,23 +1017,27 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
case IsDurable of
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
- #msg_status {} -> false;
- {IsPersistent, _Guid} -> IsPersistent
+ #msg_status {} ->
+ false;
+ {IsPersistent, _Guid, _MsgProps} ->
+ IsPersistent
end];
false -> []
end,
case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
- true -> State #vqstate { on_sync = #sync {
- acks_persistent = [PersistentAcks | SPAcks],
- acks_all = [AckTags | SAcks],
- pubs = [Pubs | SPubs],
- funs = [Fun | SFuns] }};
+ true -> State #vqstate {
+ on_sync = #sync {
+ acks_persistent = [PersistentAcks | SPAcks],
+ acks_all = [AckTags | SAcks],
+ pubs = [{MsgPropsFun, Pubs} | SPubs],
+ funs = [Fun | SFuns] }};
false -> State1 = tx_commit_index(
- State #vqstate { on_sync = #sync {
- acks_persistent = [],
- acks_all = [AckTags],
- pubs = [Pubs],
- funs = [Fun] } }),
+ State #vqstate {
+ on_sync = #sync {
+ acks_persistent = [],
+ acks_all = [AckTags],
+ pubs = [{MsgPropsFun, Pubs}],
+ funs = [Fun] } }),
State1 #vqstate { on_sync = OnSync }
end.
@@ -943,13 +1051,16 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- Pubs = lists:append(lists:reverse(SPubs)),
+ Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
+ {Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent },
+ fun ({Msg = #basic_message { is_persistent = IsPersistent },
+ MsgProps},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ {SeqId, State3} =
+ publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -958,13 +1069,14 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(LensByStore,
- State = #vqstate { q3 = Q3,
- index_state = IndexState }) ->
+ State = #vqstate { q3 = Q3,
+ index_state = IndexState,
+ msg_store_clients = MSCState }) ->
case bpqueue:is_empty(Q3) of
true -> {LensByStore, State};
- false -> {LensByStore1, IndexState1} = remove_queue_entries(
- fun beta_fold/3, Q3,
- LensByStore, IndexState),
+ false -> {LensByStore1, IndexState1} =
+ remove_queue_entries(fun beta_fold/3, Q3,
+ LensByStore, IndexState, MSCState),
purge_betas_and_deltas(LensByStore1,
maybe_deltas_to_betas(
State #vqstate {
@@ -972,11 +1084,11 @@ purge_betas_and_deltas(LensByStore,
index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, LensByStore, IndexState) ->
+remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
{GuidsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
- ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
+ ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
+ msg_store_remove(MSCState, IsPersistent, Guids)
end, ok, GuidsByStore),
{sum_guids_by_store_to_len(LensByStore, GuidsByStore),
rabbit_queue_index:ack(Acks,
@@ -988,8 +1100,7 @@ remove_queue_entries1(
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
{GuidsByStore, Delivers, Acks}) ->
{case MsgOnDisk of
- true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid,
- GuidsByStore);
+ true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore);
false -> GuidsByStore
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
@@ -997,8 +1108,8 @@ remove_queue_entries1(
sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
orddict:fold(
- fun (MsgStore, Guids, LensByStore1) ->
- orddict:update_counter(MsgStore, length(Guids), LensByStore1)
+ fun (IsPersistent, Guids, LensByStore1) ->
+ orddict:update_counter(IsPersistent, length(Guids), LensByStore1)
end, LensByStore, GuidsByStore).
%%----------------------------------------------------------------------------
@@ -1006,7 +1117,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
%%----------------------------------------------------------------------------
publish(Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, MsgOnDisk,
+ MsgProps, IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1015,8 +1126,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
+ #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
@@ -1030,38 +1141,35 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
ram_msg_count = RamMsgCount + 1}}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_on_disk = true }, MSCState) ->
- {MsgStatus, MSCState};
+ msg_on_disk = true }, _MSCState) ->
+ MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, guid = Guid,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- {ok, MSCState1} =
- with_msg_store_state(
- MSCState, IsPersistent,
- fun (MsgStore, MSCState2) ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2)
- end),
- {MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
-maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
- {MsgStatus, MSCState}.
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
+ ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1),
+ MsgStatus #msg_status { msg_on_disk = true };
+maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
+ MsgStatus.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, IndexState) ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid, seq_id = SeqId,
+ guid = Guid,
+ seq_id = SeqId,
is_persistent = IsPersistent,
- is_delivered = IsDelivered }, IndexState)
+ is_delivered = IsDelivered,
+ msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
- IndexState),
+ IndexState1 = rabbit_queue_index:publish(
+ Guid, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1070,43 +1178,44 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
State = #vqstate { index_state = IndexState,
msg_store_clients = MSCState }) ->
- {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(
- ForceMsg, MsgStatus, MSCState),
- {MsgStatus2, IndexState1} = maybe_write_index_to_disk(
- ForceIndex, MsgStatus1, IndexState),
- {MsgStatus2, State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }}.
+ MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
+ {MsgStatus2, IndexState1} =
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
+ {MsgStatus2, State #vqstate { index_state = IndexState1 }}.
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
+record_pending_ack(#msg_status { seq_id = SeqId,
+ guid = Guid,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ msg_on_disk = MsgOnDisk,
+ msg_props = MsgProps } = MsgStatus, PA) ->
AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid};
+ true -> {IsPersistent, Guid, MsgProps};
false -> MsgStatus
end,
dict:store(SeqId, AckEntry, PA).
remove_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
- index_state = IndexState }) ->
+ State = #vqstate { pending_ack = PA,
+ index_state = IndexState,
+ msg_store_clients = MSCState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
{[], orddict:new()}, PA),
State1 = State #vqstate { pending_ack = dict:new() },
case KeepPersistent of
- true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
+ true -> case orddict:find(false, GuidsByStore) of
error -> State1;
- {ok, Guids} -> ok = rabbit_msg_store:remove(
- ?TRANSIENT_MSG_STORE, Guids),
+ {ok, Guids} -> ok = msg_store_remove(MSCState, false,
+ Guids),
State1
end;
false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(
- fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
+ fun (IsPersistent, Guids, ok) ->
+ msg_store_remove(MSCState, IsPersistent, Guids)
end, ok, GuidsByStore),
State1 #vqstate { index_state = IndexState1 }
end.
@@ -1114,8 +1223,10 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
+ {{SeqIds, GuidsByStore},
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount }} =
lists:foldl(
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
@@ -1124,8 +1235,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA) })}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- MsgStoreFun(MsgStore, Guids)
+ ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
+ MsgStoreFun(MSCState, IsPersistent, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
@@ -1136,12 +1247,12 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}.
find_persistent_count(LensByStore) ->
- case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of
+ case orddict:find(true, LensByStore) of
error -> 0;
{ok, Len} -> Len
end.
@@ -1238,40 +1349,33 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
-fetch_from_q3_to_q4(State = #vqstate {
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- msg_store_clients = MSCState }) ->
+fetch_from_q3(State = #vqstate {
+ q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4,
+ ram_index_count = RamIndexCount}) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, IndexOnDisk, MsgStatus = #msg_status {
- msg = undefined, guid = Guid,
- is_persistent = IsPersistent }}, Q3a} ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
- Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4),
+ {{value, IndexOnDisk, MsgStatus}, Q3a} ->
RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a,
- q4 = Q4a,
- ram_msg_count = RamMsgCount + 1,
- ram_index_count = RamIndexCount1,
- msg_store_clients = MSCState1 },
+ State1 = State #vqstate { q3 = Q3a,
+ ram_index_count = RamIndexCount1 },
State2 =
case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
- %% still empty. So q2 must be empty, and q1
- %% can now be joined onto q4
+ %% still empty. So q2 must be empty, and we
+ %% know q4 is empty otherwise we wouldn't be
+ %% loading from q3. As such, we can just set
+ %% q4 to Q1.
true = bpqueue:is_empty(Q2), %% ASSERTION
+ true = queue:is_empty(Q4), %% ASSERTION
State1 #vqstate { q1 = queue:new(),
- q4 = queue:join(Q4a, Q1) };
+ q4 = Q1 };
{true, false} ->
maybe_deltas_to_betas(State1);
{false, _} ->
@@ -1280,7 +1384,7 @@ fetch_from_q3_to_q4(State = #vqstate {
%% delta and q3 are maintained
State1
end,
- {loaded, State2}
+ {loaded, {MsgStatus, State2}}
end.
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
@@ -1290,46 +1394,40 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
- target_ram_msg_count = TargetRamMsgCount,
transient_threshold = TransientThreshold }) ->
- case bpqueue:is_empty(Q3) orelse (TargetRamMsgCount /= 0) of
- false ->
- State;
- true ->
- #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
- end_seq_id = DeltaSeqIdEnd } = Delta,
- DeltaSeqId1 =
- lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} =
- rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
- {Q3a, IndexState2} = betas_from_index_entries(
- List, TransientThreshold, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
- case bpqueue:len(Q3a) of
+ #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ end_seq_id = DeltaSeqIdEnd } = Delta,
+ DeltaSeqId1 =
+ lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
+ {Q3a, IndexState2} =
+ betas_from_index_entries(List, TransientThreshold, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2 },
+ case bpqueue:len(Q3a) of
+ 0 ->
+ %% we ignored every message in the segment due to it being
+ %% transient and below the threshold
+ maybe_deltas_to_betas(
+ State1 #vqstate {
+ delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
+ Q3aLen ->
+ Q3b = bpqueue:join(Q3, Q3a),
+ case DeltaCount - Q3aLen of
0 ->
- %% we ignored every message in the segment due to
- %% it being transient and below the threshold
- maybe_deltas_to_betas(
- State #vqstate {
- delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
- Q3aLen ->
- Q3b = bpqueue:join(Q3, Q3a),
- case DeltaCount - Q3aLen of
- 0 ->
- %% delta is now empty, but it wasn't
- %% before, so can now join q2 onto q3
- State1 #vqstate { q2 = bpqueue:new(),
- delta = ?BLANK_DELTA,
- q3 = bpqueue:join(Q3b, Q2) };
- N when N > 0 ->
- Delta1 = #delta { start_seq_id = DeltaSeqId1,
- count = N,
- end_seq_id = DeltaSeqIdEnd },
- State1 #vqstate { delta = Delta1,
- q3 = Q3b }
- end
+ %% delta is now empty, but it wasn't before, so
+ %% can now join q2 onto q3
+ State1 #vqstate { q2 = bpqueue:new(),
+ delta = ?BLANK_DELTA,
+ q3 = bpqueue:join(Q3b, Q2) };
+ N when N > 0 ->
+ Delta1 = #delta { start_seq_id = DeltaSeqId1,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd },
+ State1 #vqstate { delta = Delta1,
+ q3 = Q3b }
end
end.