summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-11-27 12:48:02 +0000
committerTim Watson <tim@rabbitmq.com>2012-11-27 12:48:02 +0000
commitb71fbc7c285dc19c15c36a1282a08ce862632ea2 (patch)
tree637d6b00c3d6b336ce72d638d326f03012fb1116
parent721de630fdd3e7817ece0e788475a0081fcbc749 (diff)
parentbf87506cea0640482af76fcb580229bed2e2c74e (diff)
downloadrabbitmq-server-b71fbc7c285dc19c15c36a1282a08ce862632ea2.tar.gz
merge default
-rw-r--r--codegen.py158
-rw-r--r--docs/rabbitmq-plugins.1.xml13
-rw-r--r--docs/rabbitmqctl.1.xml18
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl10
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/background_gc.erl81
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_alarm.erl8
-rw-r--r--src/rabbit_amqqueue.erl185
-rw-r--r--src/rabbit_amqqueue_process.erl288
-rw-r--r--src/rabbit_backing_queue.erl19
-rw-r--r--src/rabbit_backing_queue_qc.erl95
-rw-r--r--src/rabbit_basic.erl70
-rw-r--r--src/rabbit_binary_generator.erl156
-rw-r--r--src/rabbit_binary_parser.erl55
-rw-r--r--src/rabbit_binding.erl29
-rw-r--r--src/rabbit_channel.erl228
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_control_main.erl4
-rw-r--r--src/rabbit_event.erl10
-rw-r--r--src/rabbit_exchange.erl22
-rw-r--r--src/rabbit_mirror_queue_master.erl69
-rw-r--r--src/rabbit_mirror_queue_misc.erl33
-rw-r--r--src/rabbit_mirror_queue_slave.erl22
-rw-r--r--src/rabbit_misc.erl40
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_net.erl42
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_node_monitor.erl17
-rw-r--r--src/rabbit_plugins.erl21
-rw-r--r--src/rabbit_plugins_main.erl60
-rw-r--r--src/rabbit_reader.erl173
-rw-r--r--src/rabbit_tests.erl324
-rw-r--r--src/rabbit_variable_queue.erl118
-rw-r--r--src/rabbit_writer.erl83
37 files changed, 1461 insertions, 1019 deletions
diff --git a/codegen.py b/codegen.py
index 9483e854..5624658b 100644
--- a/codegen.py
+++ b/codegen.py
@@ -24,18 +24,6 @@ from amqp_codegen import *
import string
import re
-erlangTypeMap = {
- 'octet': 'octet',
- 'shortstr': 'shortstr',
- 'longstr': 'longstr',
- 'short': 'shortint',
- 'long': 'longint',
- 'longlong': 'longlongint',
- 'bit': 'bit',
- 'table': 'table',
- 'timestamp': 'timestamp',
-}
-
# Coming up with a proper encoding of AMQP tables in JSON is too much
# hassle at this stage. Given that the only default value we are
# interested in is for the empty table, we only support that.
@@ -123,7 +111,7 @@ def printFileHeader():
def genErl(spec):
def erlType(domain):
- return erlangTypeMap[spec.resolveDomain(domain)]
+ return erlangize(spec.resolveDomain(domain))
def fieldTypeList(fields):
return '[' + ', '.join([erlType(f.domain) for f in fields]) + ']'
@@ -186,11 +174,11 @@ def genErl(spec):
return p+'Len:32/unsigned, '+p+':'+p+'Len/binary'
elif type == 'octet':
return p+':8/unsigned'
- elif type == 'shortint':
+ elif type == 'short':
return p+':16/unsigned'
- elif type == 'longint':
+ elif type == 'long':
return p+':32/unsigned'
- elif type == 'longlongint':
+ elif type == 'longlong':
return p+':64/unsigned'
elif type == 'timestamp':
return p+':64/unsigned'
@@ -233,29 +221,23 @@ def genErl(spec):
def presentBin(fields):
ps = ', '.join(['P' + str(f.index) + ':1' for f in fields])
return '<<' + ps + ', _:%d, R0/binary>>' % (16 - len(fields),)
- def mkMacroName(field):
- return '?' + field.domain.upper() + '_PROP'
- def writePropFieldLine(field, bin_next = None):
+ def writePropFieldLine(field):
i = str(field.index)
- if not bin_next:
- bin_next = 'R' + str(field.index + 1)
- if field.domain in ['octet', 'timestamp']:
- print (" {%s, %s} = %s(%s, %s, %s, %s)," %
- ('F' + i, bin_next, mkMacroName(field), 'P' + i,
- 'R' + i, 'I' + i, 'X' + i))
+ if field.domain == 'bit':
+ print " {F%s, R%s} = {P%s =/= 0, R%s}," % \
+ (i, str(field.index + 1), i, i)
else:
- print (" {%s, %s} = %s(%s, %s, %s, %s, %s)," %
- ('F' + i, bin_next, mkMacroName(field), 'P' + i,
- 'R' + i, 'L' + i, 'S' + i, 'X' + i))
+ print " {F%s, R%s} = if P%s =:= 0 -> {undefined, R%s}; true -> ?%s_VAL(R%s, L%s, V%s, X%s) end," % \
+ (i, str(field.index + 1), i, i, erlType(field.domain).upper(), i, i, i, i)
if len(c.fields) == 0:
- print "decode_properties(%d, _) ->" % (c.index,)
+ print "decode_properties(%d, <<>>) ->" % (c.index,)
else:
print ("decode_properties(%d, %s) ->" %
(c.index, presentBin(c.fields)))
- for field in c.fields[:-1]:
+ for field in c.fields:
writePropFieldLine(field)
- writePropFieldLine(c.fields[-1], "<<>>")
+ print " <<>> = %s," % ('R' + str(len(c.fields)))
print " #'P_%s'{%s};" % (erlangize(c.name), fieldMapList(c.fields))
def genFieldPreprocessing(packed):
@@ -283,9 +265,27 @@ def genErl(spec):
print " <<%s>>;" % (', '.join([methodFieldFragment(f) for f in packedFields]))
def genEncodeProperties(c):
+ def presentBin(fields):
+ ps = ', '.join(['P' + str(f.index) + ':1' for f in fields])
+ return '<<' + ps + ', 0:%d>>' % (16 - len(fields),)
+ def writePropFieldLine(field):
+ i = str(field.index)
+ if field.domain == 'bit':
+ print " {P%s, R%s} = {F%s =:= 1, R%s}," % \
+ (i, str(field.index + 1), i, i)
+ else:
+ print " {P%s, R%s} = if F%s =:= undefined -> {0, R%s}; true -> {1, [?%s_PROP(F%s, L%s) | R%s]} end," % \
+ (i, str(field.index + 1), i, i, erlType(field.domain).upper(), i, i, i)
+
print "encode_properties(#'P_%s'{%s}) ->" % (erlangize(c.name), fieldMapList(c.fields))
- print " rabbit_binary_generator:encode_properties(%s, %s);" % \
- (fieldTypeList(c.fields), fieldTempList(c.fields))
+ if len(c.fields) == 0:
+ print " <<>>;"
+ else:
+ print " R0 = [<<>>],"
+ for field in c.fields:
+ writePropFieldLine(field)
+ print " list_to_binary([%s | lists:reverse(R%s)]);" % \
+ (presentBin(c.fields), str(len(c.fields)))
def messageConstantClass(cls):
# We do this because 0.8 uses "soft error" and 8.1 uses "soft-error".
@@ -350,8 +350,8 @@ def genErl(spec):
'table' | 'byte' | 'double' | 'float' | 'long' |
'short' | 'bool' | 'binary' | 'void' | 'array').
-type(amqp_property_type() ::
- 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
- 'longlongint' | 'timestamp' | 'bit' | 'table').
+ 'shortstr' | 'longstr' | 'octet' | 'short' | 'long' |
+ 'longlong' | 'timestamp' | 'bit' | 'table').
-type(amqp_table() :: [{binary(), amqp_field_type(), amqp_value()}]).
-type(amqp_array() :: [{amqp_field_type(), amqp_value()}]).
@@ -429,25 +429,78 @@ shortstr_size(S) ->
_ -> exit(method_field_shortstr_overflow)
end.
--define(SHORTSTR_PROP(P, R, L, S, X),
- if P =:= 0 -> {undefined, R};
- true -> <<L:8/unsigned, S:L/binary, X/binary>> = R,
- {S, X}
+-define(SHORTSTR_VAL(R, L, V, X),
+ begin
+ <<L:8/unsigned, V:L/binary, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(LONGSTR_VAL(R, L, V, X),
+ begin
+ <<L:32/unsigned, V:L/binary, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(SHORT_VAL(R, L, V, X),
+ begin
+ <<V:8/unsigned, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(LONG_VAL(R, L, V, X),
+ begin
+ <<V:32/unsigned, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(LONGLONG_VAL(R, L, V, X),
+ begin
+ <<V:64/unsigned, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(OCTET_VAL(R, L, V, X),
+ begin
+ <<V:8/unsigned, X/binary>> = R,
+ {V, X}
end).
--define(TABLE_PROP(P, R, L, T, X),
- if P =:= 0 -> {undefined, R};
- true -> <<L:32/unsigned, T:L/binary, X/binary>> = R,
- {rabbit_binary_parser:parse_table(T), X}
+
+-define(TABLE_VAL(R, L, V, X),
+ begin
+ <<L:32/unsigned, V:L/binary, X/binary>> = R,
+ {rabbit_binary_parser:parse_table(V), X}
end).
--define(OCTET_PROP(P, R, I, X),
- if P =:= 0 -> {undefined, R};
- true -> <<I:8/unsigned, X/binary>> = R,
- {I, X}
+
+-define(TIMESTAMP_VAL(R, L, V, X),
+ begin
+ <<V:64/unsigned, X/binary>> = R,
+ {V, X}
end).
--define(TIMESTAMP_PROP(P, R, I, X),
- if P =:= 0 -> {undefined, R};
- true -> <<I:64/unsigned, X/binary>> = R,
- {I, X}
+
+-define(SHORTSTR_PROP(X, L),
+ begin
+ L = size(X),
+ if L < 256 -> <<L:8, X:L/binary>>;
+ true -> exit(content_properties_shortstr_overflow)
+ end
+ end).
+
+-define(LONGSTR_PROP(X, L),
+ begin
+ L = size(X),
+ <<L:32, X:L/binary>>
+ end).
+
+-define(OCTET_PROP(X, L), <<X:8/unsigned>>).
+-define(SHORT_PROP(X, L), <<X:16/unsigned>>).
+-define(LONG_PROP(X, L), <<X:32/unsigned>>).
+-define(LONGLONG_PROP(X, L), <<X:64/unsigned>>).
+-define(TIMESTAMP_PROP(X, L), <<X:64/unsigned>>).
+
+-define(TABLE_PROP(X, T),
+ begin
+ T = rabbit_binary_generator:generate_table(X),
+ <<(size(T)):32, T/binary>>
end).
"""
version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision)
@@ -497,9 +550,6 @@ shortstr_size(S) ->
print "amqp_exception(_Code) -> undefined."
def genHrl(spec):
- def erlType(domain):
- return erlangTypeMap[spec.resolveDomain(domain)]
-
def fieldNameList(fields):
return ', '.join([erlangize(f.name) for f in fields])
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
index 5d74c6e1..8ecb4fc8 100644
--- a/docs/rabbitmq-plugins.1.xml
+++ b/docs/rabbitmq-plugins.1.xml
@@ -96,11 +96,13 @@
</varlistentry>
</variablelist>
<para>
- Lists available plugins, their versions, dependencies and
+ Lists all plugins, their versions, dependencies and
descriptions. Each plugin is prefixed with a status
indicator - [ ] to indicate that the plugin is not
enabled, [E] to indicate that it is explicitly enabled,
- and [e] to indicate that it is implicitly enabled.
+ [e] to indicate that it is implicitly enabled, and [!] to
+ indicate that it is enabled but missing and thus not
+ operational.
</para>
<para>
If the optional pattern is given, only plugins whose
@@ -109,16 +111,15 @@
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmq-plugins list</screen>
<para role="example">
- This command lists all the plugins available, on one line each.
+ This command lists all plugins, on one line each.
</para>
<screen role="example">rabbitmq-plugins list -v </screen>
<para role="example">
- This command lists all the plugins available.
+ This command lists all plugins.
</para>
<screen role="example">rabbitmq-plugins list -v management</screen>
<para role="example">
- This command lists all the plugins available, but does not
- display plugins whose name does not contain "management".
+ This command lists all plugins whose name contains "management".
</para>
<screen role="example">rabbitmq-plugins list -e rabbit</screen>
<para role="example">
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 3082fe14..34947b66 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1282,22 +1282,26 @@
<listitem><para>Readable name for the connection.</para></listitem>
</varlistentry>
<varlistentry>
- <term>address</term>
- <listitem><para>Server IP address.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>port</term>
<listitem><para>Server port.</para></listitem>
</varlistentry>
<varlistentry>
- <term>peer_address</term>
- <listitem><para>Peer address.</para></listitem>
+ <term>host</term>
+ <listitem><para>Server hostname obtained via reverse
+ DNS, or its IP address if reverse DNS failed or was
+ not enabled.</para></listitem>
</varlistentry>
<varlistentry>
<term>peer_port</term>
<listitem><para>Peer port.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>peer_host</term>
+ <listitem><para>Peer hostname obtained via reverse
+ DNS, or its IP address if reverse DNS failed or was
+ not enabled.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>ssl</term>
<listitem><para>Boolean indicating whether the
connection is secured with SSL.</para></listitem>
@@ -1414,7 +1418,7 @@
</variablelist>
<para>
If no <command>connectioninfoitem</command>s are
- specified then user, peer address, peer port, time since
+ specified then user, peer host, peer port, time since
flow control and memory block state are displayed.
</para>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9b1ff8bd..16dfd196 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -43,6 +43,7 @@
{trace_vhosts, []},
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
+ {reverse_dns_lookups, false},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 3db2b68a..b2832b45 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -95,11 +95,21 @@
-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
+%% EMPTY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
+%% - 1 byte of frame type
+%% - 2 bytes of channel number
+%% - 4 bytes of frame payload length
+%% - 1 byte of payload trailer FRAME_END byte
+%% See rabbit_binary_generator:check_empty_frame_size/0, an assertion
+%% called at startup.
+-define(EMPTY_FRAME_SIZE, 8).
+
-define(MAX_WAIT, 16#ffffffff).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
+-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index d73c5634..5d9b9e2e 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -123,6 +123,9 @@ done
rm -rf %{buildroot}
%changelog
+* Fri Nov 16 2012 simon@rabbitmq.com 3.0.0-1
+- New Upstream Release
+
* Fri Dec 16 2011 steve@rabbitmq.com 2.7.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index b3743c39..17327133 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.0.0-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Fri, 16 Nov 2012 14:15:29 +0000
+
rabbitmq-server (2.7.1-1) natty; urgency=low
* New Upstream Release
diff --git a/src/background_gc.erl b/src/background_gc.erl
new file mode 100644
index 00000000..3dbce330
--- /dev/null
+++ b/src/background_gc.erl
@@ -0,0 +1,81 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(background_gc).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, run/0]).
+-export([gc/0]). %% For run_interval only
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(MAX_RATIO, 0.01).
+-define(IDEAL_INTERVAL, 60000).
+
+-record(state, {last_interval}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(run/0 :: () -> 'ok').
+-spec(gc/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [],
+ [{timeout, infinity}]).
+
+run() -> gen_server2:cast(?MODULE, run).
+
+%%----------------------------------------------------------------------------
+
+init([]) -> {ok, interval_gc(#state{last_interval = ?IDEAL_INTERVAL})}.
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
+
+handle_cast(run, State) -> gc(), {noreply, State};
+
+handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(run, State) -> {noreply, interval_gc(State)};
+
+handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+terminate(_Reason, State) -> State.
+
+%%----------------------------------------------------------------------------
+
+interval_gc(State = #state{last_interval = LastInterval}) ->
+ {ok, Interval} = rabbit_misc:interval_operation(
+ {?MODULE, gc, []},
+ ?MAX_RATIO, ?IDEAL_INTERVAL, LastInterval),
+ erlang:send_after(Interval, self(), run),
+ State#state{last_interval = Interval}.
+
+gc() ->
+ [garbage_collect(P) || P <- processes(),
+ {status, waiting} == process_info(P, status)],
+ garbage_collect(), %% since we will never be waiting...
+ ok.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 69f77824..c3a6d283 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -36,7 +36,7 @@
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
{mfa, {rabbit_binary_generator,
- check_empty_content_body_frame_size,
+ check_empty_frame_size,
[]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).
@@ -179,6 +179,12 @@
{mfa, {rabbit_node_monitor, notify_node_up, []}},
{requires, networking}]}).
+-rabbit_boot_step({background_gc,
+ [{description, "background garbage collection"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [background_gc]}},
+ {enables, networking}]}).
+
%%---------------------------------------------------------------------------
-include("rabbit_framing.hrl").
@@ -570,7 +576,10 @@ boot_delegate() ->
rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
- rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
+ Qs = rabbit_amqqueue:recover(),
+ ok = rabbit_binding:recover(rabbit_exchange:recover(),
+ [QName || #amqqueue{name = QName} <- Qs]),
+ rabbit_amqqueue:start(Qs).
maybe_insert_default_data() ->
case rabbit_table:is_empty() of
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index e6625b2b..d7d4d82a 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -55,8 +55,12 @@ start() ->
ok = gen_event:add_handler(?SERVER, ?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
rabbit_sup:start_restartable_child(
- vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1,
- fun rabbit_alarm:clear_alarm/1]),
+ vm_memory_monitor, [MemoryWatermark,
+ fun (Alarm) ->
+ background_gc:run(),
+ set_alarm(Alarm)
+ end,
+ fun clear_alarm/1]),
{ok, DiskLimit} = application:get_env(disk_free_limit),
rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6ad85b24..be7c7867 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -16,9 +16,11 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
+-export([recover/0, stop/0, start/1, declare/5,
+ delete_immediately/1, delete/3, purge/1]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
+ assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -32,7 +34,7 @@
-export([start_mirroring/1, stop_mirroring/1]).
%% internal
--export([internal_declare/2, internal_delete/2, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -40,8 +42,6 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
-define(FAILOVER_WAIT_MILLIS, 100).
@@ -61,18 +61,21 @@
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-type(routing_result() :: 'routed' | 'unroutable').
--type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-
--spec(start/0 :: () -> [name()]).
+-type(queue_or_absent() :: rabbit_types:amqqueue() |
+ {'absent', rabbit_types:amqqueue()}).
+-type(not_found_or_absent() :: 'not_found' |
+ {'absent', rabbit_types:amqqueue()}).
+-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
+-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> {'new' | 'existing', rabbit_types:amqqueue()} |
+ -> {'new' | 'existing' | 'absent', rabbit_types:amqqueue()} |
rabbit_types:channel_exit()).
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
- -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
+ -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
@@ -80,7 +83,10 @@
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
([name()]) -> [rabbit_types:amqqueue()]).
--spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
+-spec(not_found_or_absent/1 :: (name()) -> not_found_or_absent()).
+-spec(with/2 :: (name(), qfun(A)) ->
+ A | rabbit_types:error(not_found_or_absent())).
+-spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(assert_equivalence/5 ::
@@ -150,11 +156,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/2 ::
- (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -175,7 +181,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required]).
-start() ->
+recover() ->
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
on_node_down(node()),
@@ -195,6 +201,14 @@ stop() ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop().
+start(Qs) ->
+ %% At this point all recovered queues and their bindings are
+ %% visible to routing, so now it is safe for them to complete
+ %% their initialisation (which may involve interacting with other
+ %% queues).
+ [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
+ ok.
+
find_durable_queues() ->
Node = node(),
%% TODO: use dirty ops instead
@@ -207,8 +221,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
+ [Q || Q = #amqqueue{pid = Pid} <- Qs,
+ gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -223,10 +237,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
- case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
- not_found -> rabbit_misc:not_found(QueueName);
- Q2 -> Q2
- end.
+ gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -236,18 +247,17 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
- case mnesia:read({rabbit_durable_queue, QueueName}) of
- [] -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
- B = add_default_binding(Q1),
- fun () -> B(), Q1 end;
- %% Q exists on stopped node
- [_] -> rabbit_misc:const(not_found)
+ case not_found_or_absent(QueueName) of
+ not_found -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
+ {absent, _Q} = R -> rabbit_misc:const(R)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName, QPid),
+ false -> TailFun = internal_delete(QueueName),
fun () -> TailFun(), ExistingQ end
end
end
@@ -274,8 +284,7 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) ->
- rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
+policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -296,28 +305,47 @@ lookup(Names) when is_list(Names) ->
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
+not_found_or_absent(Name) ->
+ %% NB: we assume that the caller has already performed a lookup on
+ %% rabbit_queue and not found anything
+ case mnesia:read({rabbit_durable_queue, Name}) of
+ [] -> not_found;
+ [Q] -> {absent, Q} %% Q exists on stopped node
+ end.
+
+not_found_or_absent_dirty(Name) ->
+ %% We should read from both tables inside a tx, to get a
+ %% consistent view. But the chances of an inconsistency are small,
+ %% and only affect the error kind.
+ case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
+ {error, not_found} -> not_found;
+ {ok, Q} -> {absent, Q}
+ end.
+
with(Name, F, E) ->
case lookup(Name) of
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid.
- E1 = fun () ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> E();
- false -> timer:sleep(25),
- with(Name, F, E)
- end
- end,
- rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
+ rabbit_misc:with_exit_handler(
+ fun () ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> E(not_found_or_absent_dirty(Name));
+ false -> timer:sleep(25),
+ with(Name, F, E)
+ end
+ end, fun () -> F(Q) end);
{error, not_found} ->
- E()
+ E(not_found_or_absent_dirty(Name))
end.
-with(Name, F) ->
- with(Name, F, fun () -> {error, not_found} end).
+with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
+
with_or_die(Name, F) ->
- with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+ with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
+ ({absent, Q}) -> rabbit_misc:absent(Q)
+ end).
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
@@ -352,8 +380,8 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
check_declare_arguments(QueueName, Args) ->
- Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
- {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
+ Checks = [{<<"x-expires">>, fun check_expires_arg/2},
+ {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
@@ -369,10 +397,8 @@ check_declare_arguments(QueueName, Args) ->
end || {Key, Fun} <- Checks],
ok.
-check_string_arg({longstr, _}, _Args) ->
- ok;
-check_string_arg({Type, _}, _) ->
- {error, {unacceptable_type, Type}}.
+check_string_arg({longstr, _}, _Args) -> ok;
+check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
@@ -380,20 +406,17 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
-check_positive_int_arg({Type, Val}, Args) ->
+check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val > 0 -> ok;
- ok -> {error, {value_zero_or_less, Val}};
- Error -> Error
+ ok when Val == 0 -> {error, {value_zero, Val}};
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
-check_non_neg_int_arg({Type, Val}, Args) ->
+check_message_ttl_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val >= 0 -> ok;
- ok -> {error, {value_less_than_zero, Val}};
- Error -> Error
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
@@ -401,11 +424,10 @@ check_dlxrk_arg({longstr, _}, Args) ->
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
-check_dlxrk_arg({Type, _}, _Args) ->
+check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-list() ->
- mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -416,8 +438,7 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-info(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, info).
+info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
case delegate_call(QPid, {info, Items}) of
@@ -434,8 +455,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
%% the first place since a node failed). Therefore we keep poking at
%% the list of queues until we were able to talk to a live process or
%% the queue no longer exists.
-force_event_refresh() ->
- force_event_refresh([Q#amqqueue.name || Q <- list()]).
+force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]).
force_event_refresh(QNames) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
@@ -452,8 +472,7 @@ force_event_refresh(QNames) ->
wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
-consumers(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, consumers).
+consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -467,8 +486,7 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) ->
- delegate_call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat).
delete_immediately(QPids) ->
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
@@ -483,11 +501,9 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, ChPid) ->
- delegate_call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, MsgIds, ChPid) ->
- delegate_cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
@@ -529,8 +545,7 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) ->
- delegate_cast(QPid, {unblock, ChPid}).
+unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
@@ -548,7 +563,7 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName, QPid) ->
+internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
@@ -558,8 +573,7 @@ internal_delete(QueueName, QPid) ->
fun() ->
ok = T(),
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QueueName}])
+ [{name, QueueName}])
end
end
end).
@@ -574,12 +588,12 @@ set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
-stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
+stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
- qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
@@ -592,10 +606,9 @@ on_node_down(Node) ->
fun () ->
T(),
lists:foreach(
- fun({QName, QPid}) ->
+ fun(QName) ->
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QName}])
+ [{name, QName}])
end, Qs)
end
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8d05a78c..5ddafba8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,7 +85,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -101,16 +101,14 @@
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
@@ -183,7 +181,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -195,10 +193,8 @@ code_change(_OldVsn, State, _Extra) ->
declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q1 ->
+ case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
+ #amqqueue{} = Q1 ->
case matches(Recover, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
@@ -208,6 +204,7 @@ declare(Recover, From, State = #q{q = Q,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = bq_init(BQ, Q, Recover),
+ recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -216,28 +213,39 @@ declare(Recover, From, State = #q{q = Q,
noreply(State1);
false ->
{stop, normal, {existing, Q1}, State}
- end
- end.
+ end;
+ Err ->
+ {stop, normal, Err, State}
+ end.
-matches(true, Q, Q) -> true;
-matches(true, _Q, _Q1) -> false;
-matches(false, Q1, Q2) ->
+matches(new, Q1, Q2) ->
%% i.e. not policy
- Q1#amqqueue.name =:= Q2#amqqueue.name andalso
- Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
- Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
- Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
- Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids.
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
+matches(_, Q, Q) -> true;
+matches(_, _Q, _Q1) -> false.
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover,
+ BQ:init(Q, Recover =/= new,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
+recovery_barrier(new) ->
+ ok;
+recovery_barrier(BarrierPid) ->
+ MRef = erlang:monitor(process, BarrierPid),
+ receive
+ {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
+ {'DOWN', MRef, process, _, _} -> ok
+ end.
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(
fun({Arg, Fun}, State1) ->
@@ -247,9 +255,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
end
end, State,
[{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
- {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]).
+ {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
+ {<<"x-message-ttl">>, fun init_ttl/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -267,7 +275,8 @@ terminate_shutdown(Fun, State) ->
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -475,9 +484,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, Remaining}, State1} =
- fetch(AckRequired, State),
- {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
+ {Result, State1} = fetch(AckRequired, State),
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ drop_expired_messages(State1),
+ {Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -553,7 +563,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Confirm, Delivered, State),
+ Props = message_properties(Message, Confirm, Delivered, State),
case attempt_delivery(Delivery, Props, State1) of
{true, State2} ->
State2;
@@ -576,14 +586,26 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
+ack(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end).
+
+requeue(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
+ fun (State1) -> requeue_and_run(AckTags, State1) end).
+
remove_consumer(ChPid, ConsumerTag, Queue) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -624,7 +646,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -632,7 +655,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
@@ -680,28 +704,36 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL),
+message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(Message, TTL),
needs_confirming = Confirm == eventually,
delivered = Delivered}.
-calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
+calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
+ #content{properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ %% We assert that the expiration must be valid - we check in the channel.
+ {ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
+ case lists:min([TTL, MsgTTL]) of
+ undefined -> undefined;
+ T -> now_micros() + T * 1000
+ end.
-drop_expired_messages(State = #q{ttl = undefined}) ->
- State;
-drop_expired_messages(State = #q{backing_queue_state = BQS,
+drop_expired_messages(State = #q{dlx = DLX,
+ backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
- DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLXFun of
+ {Props, BQS1} = case DLX of
undefined -> {Next, undefined, BQS2} =
BQ:dropwhile(ExpirePred, false, BQS),
{Next, BQS2};
_ -> {Next, Msgs, BQS2} =
BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun(Msgs),
+ case Msgs of
+ [] -> ok;
+ _ -> (dead_letter_fun(expired))(Msgs)
+ end,
{Next, BQS2}
end,
ensure_ttl_timer(case Props of
@@ -711,8 +743,6 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
ensure_ttl_timer(undefined, State) ->
State;
-ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
- State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
After = (case Expiry - now_micros() of
V when V > 0 -> V + 999; %% always fire later
@@ -730,17 +760,7 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ensure_ttl_timer(_Expiry, State) ->
State.
-ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State#q{backing_queue_state = BQS1};
-ack_if_no_dlx(_AckTags, State) ->
- State.
-
-dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- undefined;
-dead_letter_fun(Reason, _State) ->
+dead_letter_fun(Reason) ->
fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
@@ -749,8 +769,8 @@ dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(
+ rabbit_amqqueue:lookup(Queues), Delivery),
DeliveredQPids.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
@@ -772,17 +792,16 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC1})
end.
-stop_later(Reason, State) ->
- stop_later(Reason, undefined, noreply, State).
+stop(State) -> stop(undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
+stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
{true, noreply} ->
- {stop, Reason, State};
+ {stop, normal, State};
{true, _} ->
- {stop, Reason, Reply, State};
+ {stop, normal, Reply, State};
{false, _} ->
- noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
+ noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -793,11 +812,10 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
State1 = State#q{backing_queue_state = BQS1},
case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
- {_, {_, noreply}} -> ok;
- {_, {From, Reply}} -> gen_server2:reply(From, Reply)
+ {_, noreply} -> ok;
+ {From, Reply} -> gen_server2:reply(From, Reply)
end,
- {Reason, _} = DS,
- {stop, Reason, State1};
+ {stop, normal, State1};
false -> noreply(State1)
end.
@@ -853,8 +871,8 @@ make_dead_letter_msg(Reason,
{<<"time">>, timestamp, TimeSec},
{<<"exchange">>, longstr, Exchange#resource.name},
{<<"routing-keys">>, array, RKs1}],
- HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
- Info, Headers))
+ HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
+ Info, Headers))
end,
Content1 = rabbit_basic:map_headers(HeadersFun2, Content),
Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
@@ -937,19 +955,19 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
@@ -998,9 +1016,9 @@ handle_call({init, Recover}, From,
q = #amqqueue{name = QName} = Q} = State,
gen_server2:reply(From, not_found),
case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
+ new -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName]);
+ _ -> ok
end,
BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.
@@ -1029,10 +1047,11 @@ handle_call({notify_down, ChPid}, From, State) ->
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
- %% gen_server2 *before* the reply is sent.
+ %% gen_server2 *before* the reply is sent. FIXME: in case of a
+ %% delayed stop the reply is sent earlier.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop_later(normal, From, ok, State1)
+ {stop, State1} -> stop(From, ok, State1)
end;
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -1042,8 +1061,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case fetch(AckRequired, drop_expired_messages(State1)) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
ChAckTags1 = sets:add_element(AckTag, ChAckTags),
@@ -1052,14 +1071,13 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
@@ -1068,7 +1086,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -1083,7 +1101,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ not NoAck, qname(State2)),
reply(ok, State2)
end;
@@ -1094,7 +1112,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
not_found ->
reply(ok, State);
C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
@@ -1104,10 +1122,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop_later(normal, From, ok, State1)
+ true -> stop(From, ok, State1)
end
end;
@@ -1118,13 +1136,12 @@ handle_call(stat, _From, State) ->
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- IsEmpty = BQ:is_empty(BQS),
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
- IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop_later(normal, From,
- {ok, BQ:len(BQS)}, State)
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ true -> stop(From, {ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1134,9 +1151,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end));
+ noreply(requeue(AckTags, ChPid, State));
handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1158,11 +1173,13 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
@@ -1196,36 +1213,27 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
+ noreply(ack(AckTags, ChPid, State));
+
+handle_cast({reject, AckTags, true, ChPid}, State) ->
+ noreply(requeue(AckTags, ChPid, State));
+
+handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
+ noreply(ack(AckTags, ChPid, State));
+
+handle_cast({reject, AckTags, false, ChPid}, State) ->
+ DLXFun = dead_letter_fun(rejected),
noreply(subtract_acks(
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
+ BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
- noreply(subtract_acks(
- ChPid, AckTags, State,
- case Requeue of
- true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- Fun =
- case dead_letter_fun(rejected, State1) of
- undefined -> undefined;
- F -> fun(M, A) -> F([{M, A}])
- end
- end,
- BQS1 = BQ:fold(Fun, BQS, AckTags),
- ack_if_no_dlx(
- AckTags,
- State1#q{backing_queue_state = BQS1})
- end
- end));
-
handle_cast(delete_immediately, State) ->
- stop_later(normal, State);
+ stop(State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1271,18 +1279,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} ->
- noreply(lists:foldl(
- fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMon}) ->
- QPids = dead_letter_publish(Msg, Reason, X,
- State1),
- UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
- QMons = pmon:monitor_all(QPids, QMon),
- State1#q{queue_monitors = QMons,
- publish_seqno = SeqNo + 1,
- unconfirmed = UC1}
- end, State, Msgs));
+ {AckImmediately, State2} =
+ lists:foldl(
+ fun({Msg, AckTag},
+ {Acks, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMons}}) ->
+ case dead_letter_publish(Msg, Reason, X, State1) of
+ [] -> {[AckTag | Acks], State1};
+ QPids -> UC1 = dtree:insert(
+ SeqNo, QPids, AckTag, UC),
+ QMons1 = pmon:monitor_all(QPids, QMons),
+ {Acks,
+ State1#q{publish_seqno = SeqNo + 1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1}}
+ end
+ end, {[], State}, Msgs),
+ cleanup_after_confirm(AckImmediately, State2);
{error, not_found} ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
@@ -1301,7 +1315,7 @@ handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> stop_later(normal, State);
+ true -> stop(State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1323,12 +1337,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
- stop_later(normal, State);
+ stop(State);
handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, State1} -> handle_queue_down(DownPid, Reason, State1);
- {stop, State1} -> stop_later(normal, State1)
+ {stop, State1} -> stop(State1)
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index af660c60..9e99ca5e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,9 +26,9 @@
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
+-type(drop_result(Ack) ::
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
@@ -139,18 +139,27 @@
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}.
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
+-callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(), A) -> A), A, state())
+ -> {A, state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -212,7 +221,7 @@ behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index b37fbb29..03808859 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -85,17 +85,19 @@ backing_queue_test(Cmds) ->
%% Commands
-%% Command frequencies are tuned so that queues are normally reasonably
-%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
-%% and purging cause extreme queue lengths, so these have lower probabilities.
-%% Fetches are sufficiently frequent so that commands that need acktags
-%% get decent coverage.
+%% Command frequencies are tuned so that queues are normally
+%% reasonably short, but they may sometimes exceed
+%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue
+%% lengths, so these have lower probabilities. Fetches/drops are
+%% sufficiently frequent so that commands that need acktags get decent
+%% coverage.
command(S) ->
frequency([{10, qc_publish(S)},
{1, qc_publish_delivered(S)},
{1, qc_publish_multiple(S)}, %% very slow
- {15, qc_fetch(S)}, %% needed for ack and requeue
+ {9, qc_fetch(S)}, %% needed for ack and requeue
+ {6, qc_drop(S)}, %%
{15, qc_ack(S)},
{15, qc_requeue(S)},
{3, qc_set_ram_duration_target(S)},
@@ -104,7 +106,8 @@ command(S) ->
{1, qc_dropwhile(S)},
{1, qc_is_empty(S)},
{1, qc_timeout(S)},
- {1, qc_purge(S)}]).
+ {1, qc_purge(S)},
+ {1, qc_fold(S)}]).
qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
@@ -124,6 +127,9 @@ qc_publish_delivered(#state{bqstate = BQ}) ->
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
+qc_drop(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drop, [boolean(), BQ]}.
+
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
@@ -152,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) ->
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
+qc_fold(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fold, [fun foldfun/2, foldacc(), BQ]}.
+
%% Preconditions
%% Create long queues by only allowing publishing
@@ -217,22 +226,10 @@ next_state(S, Res,
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
- #state{len = Len, messages = Messages, acks = Acks} = S,
- ResultInfo = {call, erlang, element, [1, Res]},
- BQ1 = {call, erlang, element, [2, Res]},
- AckTag = {call, erlang, element, [3, ResultInfo]},
- S1 = S#state{bqstate = BQ1},
- case gb_trees:is_empty(Messages) of
- true -> S1;
- false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
- false ->
- S2
- end
- end;
+ next_state_fetch_and_drop(S, Res, AckReq, 3);
+
+next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) ->
+ next_state_fetch_and_drop(S, Res, AckReq, 2);
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
@@ -278,19 +275,38 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()};
+
+next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1}.
%% Postconditions
postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {{MsgFetched, _IsDelivered, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
+ {empty, _BQ} ->
+ Len =:= 0
+ end;
+
+postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
+ #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
+ case Res of
+ {{MsgIdFetched, AckTag}, _BQ} ->
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
+ MsgId = eval({call, erlang, element,
+ [?RECORD_INDEX(id, basic_message), Msg]}),
+ MsgIdFetched =:= MsgId andalso
+ not proplists:is_defined(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
@@ -313,6 +329,12 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
ReportedConfirmed);
+postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) ->
+ #state{messages = Messages} = S,
+ lists:foldl(fun ({_SeqId, {_MsgProps, Msg}}, Acc) ->
+ foldfun(Msg, Acc)
+ end, foldacc(), gb_trees:to_list(Messages)) =:= Res;
+
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -371,6 +393,9 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
+foldfun(Msg, Acc) -> [Msg | Acc].
+foldacc() -> [].
+
dropfun(Props) ->
Expiry = eval({call, erlang, element,
[?RECORD_INDEX(expiry, message_properties), Props]}),
@@ -388,6 +413,24 @@ drop_messages(Messages) ->
end
end.
+next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) ->
+ #state{len = Len, messages = Messages, acks = Acks} = S,
+ ResultInfo = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]},
+ S1 = S#state{bqstate = BQ1},
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
+ end.
+
-else.
-export([prop_disabled/0]).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index db2b7e95..9bd1fad9 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,8 +19,9 @@
-include("rabbit_framing.hrl").
-export([publish/4, publish/5, publish/1,
- message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, map_headers/2, delivery/3, header_routes/1]).
+ message/3, message/4, properties/1, prepend_table_header/3,
+ extract_headers/1, map_headers/2, delivery/3, header_routes/1,
+ parse_expiration/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -58,7 +59,7 @@
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(append_table_header/3 ::
+-spec(prepend_table_header/3 ::
(binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
@@ -72,6 +73,9 @@
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
+-spec(parse_expiration/1 ::
+ (rabbit_framing:amqp_property_record())
+ -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
-endif.
@@ -177,15 +181,45 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
-append_table_header(Name, Info, undefined) ->
- append_table_header(Name, Info, []);
-append_table_header(Name, Info, Headers) ->
- Prior = case rabbit_misc:table_lookup(Headers, Name) of
- undefined -> [];
- {array, Existing} -> Existing
- end,
+prepend_table_header(Name, Info, undefined) ->
+ prepend_table_header(Name, Info, []);
+prepend_table_header(Name, Info, Headers) ->
+ case rabbit_misc:table_lookup(Headers, Name) of
+ {array, Existing} ->
+ prepend_table(Name, Info, Existing, Headers);
+ undefined ->
+ prepend_table(Name, Info, [], Headers);
+ Other ->
+ Headers2 = prepend_table(Name, Info, [], Headers),
+ set_invalid_header(Name, Other, Headers2)
+ end.
+
+prepend_table(Name, Info, Prior, Headers) ->
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
+ case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
+ undefined ->
+ set_invalid([{Name, array, [Value]}], Headers);
+ {table, ExistingHdr} ->
+ update_invalid(Name, Value, ExistingHdr, Headers);
+ Other ->
+ %% somehow the x-invalid-headers header is corrupt
+ Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
+ set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
+ end.
+
+set_invalid(NewHdr, Headers) ->
+ rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
+
+update_invalid(Name, Value, ExistingHdr, Header) ->
+ Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
+ undefined -> [Value];
+ {array, Prior} -> [Value | Prior]
+ end,
+ NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
+ set_invalid(NewHdr, Header).
+
extract_headers(Content) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
@@ -224,3 +258,19 @@ header_routes(HeadersTable) ->
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
+
+parse_expiration(#'P_basic'{expiration = undefined}) ->
+ {ok, undefined};
+parse_expiration(#'P_basic'{expiration = Expiration}) ->
+ case string:to_integer(binary_to_list(Expiration)) of
+ {error, no_integer} = E ->
+ E;
+ {N, ""} ->
+ case rabbit_misc:check_expiry(N) of
+ ok -> {ok, N};
+ E = {error, _} -> E
+ end;
+ {_, S} ->
+ {error, {leftover_string, S}}
+ end.
+
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index d69376fb..a333c1ce 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -18,20 +18,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-%% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
-%% - 1 byte of frame type
-%% - 2 bytes of channel number
-%% - 4 bytes of frame payload length
-%% - 1 byte of payload trailer FRAME_END byte
-%% See definition of check_empty_content_body_frame_size/0,
-%% an assertion called at startup.
--define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
-
-export([build_simple_method_frame/3,
build_simple_content_frames/4,
build_heartbeat_frame/0]).
--export([generate_table/1, encode_properties/2]).
--export([check_empty_content_body_frame_size/0]).
+-export([generate_table/1]).
+-export([check_empty_frame_size/0]).
-export([ensure_content_encoded/2, clear_encoded_content/1]).
-export([map_exception/3]).
@@ -51,9 +42,7 @@
-> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
--spec(encode_properties/2 ::
- ([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
--spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
+-spec(check_empty_frame_size/0 :: () -> 'ok').
-spec(ensure_content_encoded/2 ::
(rabbit_types:content(), rabbit_types:protocol()) ->
rabbit_types:encoded_content()).
@@ -88,10 +77,8 @@ build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) ->
[HeaderFrame | ContentFrames].
build_content_frames(FragsRev, FrameMax, ChannelInt) ->
- BodyPayloadMax = if FrameMax == 0 ->
- iolist_size(FragsRev);
- true ->
- FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
+ BodyPayloadMax = if FrameMax == 0 -> iolist_size(FragsRev);
+ true -> FrameMax - ?EMPTY_FRAME_SIZE
end,
build_content_frames(0, [], BodyPayloadMax, [],
lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
@@ -129,51 +116,24 @@ create_frame(TypeInt, ChannelInt, Payload) ->
%% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S,
%% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x,
%% and V.
-
-table_field_to_binary({FName, Type, Value}) ->
- [short_string_to_binary(FName) | field_value_to_binary(Type, Value)].
-
-field_value_to_binary(longstr, Value) ->
- ["S", long_string_to_binary(Value)];
-
-field_value_to_binary(signedint, Value) ->
- ["I", <<Value:32/signed>>];
-
-field_value_to_binary(decimal, {Before, After}) ->
- ["D", Before, <<After:32>>];
-
-field_value_to_binary(timestamp, Value) ->
- ["T", <<Value:64>>];
-
-field_value_to_binary(table, Value) ->
- ["F", table_to_binary(Value)];
-
-field_value_to_binary(array, Value) ->
- ["A", array_to_binary(Value)];
-
-field_value_to_binary(byte, Value) ->
- ["b", <<Value:8/unsigned>>];
-
-field_value_to_binary(double, Value) ->
- ["d", <<Value:64/float>>];
-
-field_value_to_binary(float, Value) ->
- ["f", <<Value:32/float>>];
-
-field_value_to_binary(long, Value) ->
- ["l", <<Value:64/signed>>];
-
-field_value_to_binary(short, Value) ->
- ["s", <<Value:16/signed>>];
-
-field_value_to_binary(bool, Value) ->
- ["t", if Value -> 1; true -> 0 end];
-
-field_value_to_binary(binary, Value) ->
- ["x", long_string_to_binary(Value)];
-
-field_value_to_binary(void, _Value) ->
- ["V"].
+table_field_to_binary({FName, T, V}) ->
+ [short_string_to_binary(FName) | field_value_to_binary(T, V)].
+
+field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)];
+field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>];
+field_value_to_binary(decimal, V) -> {Before, After} = V,
+ ["D", Before, <<After:32>>];
+field_value_to_binary(timestamp, V) -> ["T", <<V:64>>];
+field_value_to_binary(table, V) -> ["F", table_to_binary(V)];
+field_value_to_binary(array, V) -> ["A", array_to_binary(V)];
+field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>];
+field_value_to_binary(double, V) -> ["d", <<V:64/float>>];
+field_value_to_binary(float, V) -> ["f", <<V:32/float>>];
+field_value_to_binary(long, V) -> ["l", <<V:64/signed>>];
+field_value_to_binary(short, V) -> ["s", <<V:16/signed>>];
+field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end];
+field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)];
+field_value_to_binary(void, _V) -> ["V"].
table_to_binary(Table) when is_list(Table) ->
BinTable = generate_table(Table),
@@ -187,9 +147,8 @@ generate_table(Table) when is_list(Table) ->
list_to_binary(lists:map(fun table_field_to_binary/1, Table)).
generate_array(Array) when is_list(Array) ->
- list_to_binary(lists:map(
- fun ({Type, Value}) -> field_value_to_binary(Type, Value) end,
- Array)).
+ list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end,
+ Array)).
short_string_to_binary(String) when is_binary(String) ->
Len = size(String),
@@ -207,65 +166,12 @@ long_string_to_binary(String) when is_binary(String) ->
long_string_to_binary(String) ->
[<<(length(String)):32>>, String].
-encode_properties([], []) ->
- <<0, 0>>;
-encode_properties(TypeList, ValueList) ->
- encode_properties(0, TypeList, ValueList, 0, [], []).
-
-encode_properties(_Bit, [], [], FirstShortAcc, FlagsAcc, PropsAcc) ->
- list_to_binary([lists:reverse(FlagsAcc), <<FirstShortAcc:16>>, lists:reverse(PropsAcc)]);
-encode_properties(_Bit, [], _ValueList, _FirstShortAcc, _FlagsAcc, _PropsAcc) ->
- exit(content_properties_values_overflow);
-encode_properties(15, TypeList, ValueList, FirstShortAcc, FlagsAcc, PropsAcc) ->
- NewFlagsShort = FirstShortAcc bor 1, % set the continuation low bit
- encode_properties(0, TypeList, ValueList, 0, [<<NewFlagsShort:16>> | FlagsAcc], PropsAcc);
-encode_properties(Bit, [bit | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) ->
- case Value of
- true -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc bor (1 bsl (15 - Bit)), FlagsAcc, PropsAcc);
- false -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc, FlagsAcc, PropsAcc);
- Other -> exit({content_properties_illegal_bit_value, Other})
- end;
-encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) ->
- case Value of
- undefined -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc, FlagsAcc, PropsAcc);
- _ -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc bor (1 bsl (15 - Bit)),
- FlagsAcc,
- [encode_property(T, Value) | PropsAcc])
- end.
-
-encode_property(shortstr, String) ->
- Len = size(String),
- if Len < 256 -> <<Len:8, String:Len/binary>>;
- true -> exit(content_properties_shortstr_overflow)
- end;
-encode_property(longstr, String) ->
- Len = size(String), <<Len:32, String:Len/binary>>;
-encode_property(octet, Int) ->
- <<Int:8/unsigned>>;
-encode_property(shortint, Int) ->
- <<Int:16/unsigned>>;
-encode_property(longint, Int) ->
- <<Int:32/unsigned>>;
-encode_property(longlongint, Int) ->
- <<Int:64/unsigned>>;
-encode_property(timestamp, Int) ->
- <<Int:64/unsigned>>;
-encode_property(table, Table) ->
- table_to_binary(Table).
-
-check_empty_content_body_frame_size() ->
- %% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is
- %% defined correctly.
- ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)),
- if ComputedSize == ?EMPTY_CONTENT_BODY_FRAME_SIZE ->
- ok;
- true ->
- exit({incorrect_empty_content_body_frame_size,
- ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
+check_empty_frame_size() ->
+ %% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly.
+ case iolist_size(create_frame(?FRAME_BODY, 0, <<>>)) of
+ ?EMPTY_FRAME_SIZE -> ok;
+ ComputedSize -> exit({incorrect_empty_frame_size,
+ ComputedSize, ?EMPTY_FRAME_SIZE})
end.
ensure_content_encoded(Content = #content{properties_bin = PropBin,
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 5f0016b6..53878d6a 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -50,47 +50,36 @@ parse_array(<<ValueAndRest/binary>>) ->
{Type, Value, Rest} = parse_field_value(ValueAndRest),
[{Type, Value} | parse_array(Rest)].
-parse_field_value(<<"S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
- {longstr, ValueString, Rest};
+parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+ {longstr, V, R};
-parse_field_value(<<"I", Value:32/signed, Rest/binary>>) ->
- {signedint, Value, Rest};
+parse_field_value(<<"I", V:32/signed, R/binary>>) ->
+ {signedint, V, R};
-parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
- {decimal, {Before, After}, Rest};
+parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) ->
+ {decimal, {Before, After}, R};
-parse_field_value(<<"T", Value:64/unsigned, Rest/binary>>) ->
- {timestamp, Value, Rest};
+parse_field_value(<<"T", V:64/unsigned, R/binary>>) ->
+ {timestamp, V, R};
-parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) ->
- {table, parse_table(Table), Rest};
+parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
+ {table, parse_table(Table), R};
-parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, Rest/binary>>) ->
- {array, parse_array(Array), Rest};
+parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
+ {array, parse_array(Array), R};
-parse_field_value(<<"b", Value:8/unsigned, Rest/binary>>) ->
- {byte, Value, Rest};
+parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R};
+parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R};
+parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R};
+parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R};
+parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R};
+parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
-parse_field_value(<<"d", Value:64/float, Rest/binary>>) ->
- {double, Value, Rest};
+parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+ {binary, V, R};
-parse_field_value(<<"f", Value:32/float, Rest/binary>>) ->
- {float, Value, Rest};
-
-parse_field_value(<<"l", Value:64/signed, Rest/binary>>) ->
- {long, Value, Rest};
-
-parse_field_value(<<"s", Value:16/signed, Rest/binary>>) ->
- {short, Value, Rest};
-
-parse_field_value(<<"t", Value:8/unsigned, Rest/binary>>) ->
- {bool, (Value /= 0), Rest};
-
-parse_field_value(<<"x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
- {binary, ValueString, Rest};
-
-parse_field_value(<<"V", Rest/binary>>) ->
- {void, undefined, Rest}.
+parse_field_value(<<"V", R/binary>>) ->
+ {void, undefined, R}.
ensure_content_decoded(Content = #content{properties = Props})
when Props =/= none ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 0d23f716..2d486651 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -35,9 +35,11 @@
-type(key() :: binary()).
--type(bind_errors() :: rabbit_types:error('source_not_found' |
- 'destination_not_found' |
- 'source_and_destination_not_found')).
+-type(bind_errors() :: rabbit_types:error(
+ {'resources_missing',
+ [{'not_found', (rabbit_types:binding_source() |
+ rabbit_types:binding_destination())} |
+ {'absent', rabbit_types:amqqueue()}]})).
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error('binding_not_found')).
-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
@@ -330,21 +332,32 @@ sync_transient_route(Route, Fun) ->
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end,
+ ErrFun = fun (Names) ->
+ Errs = [not_found_or_absent(Name) || Name <- Names],
+ rabbit_misc:const({error, {resources_missing, Errs}})
+ end,
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:read({SrcTable, SrcName}),
mnesia:read({DstTable, DstName})} of
{[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> ErrFun(source_not_found);
- {[_], [] } -> ErrFun(destination_not_found);
- {[], [] } -> ErrFun(source_and_destination_not_found)
- end
+ {[], [_] } -> ErrFun([SrcName]);
+ {[_], [] } -> ErrFun([DstName]);
+ {[], [] } -> ErrFun([SrcName, DstName])
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+not_found_or_absent(#resource{kind = exchange} = Name) ->
+ {not_found, Name};
+not_found_or_absent(#resource{kind = queue} = Name) ->
+ case rabbit_amqqueue:not_found_or_absent(Name) of
+ not_found -> {not_found, Name};
+ {absent, _Q} = R -> R
+ end.
+
contains(Table, MatchHead) ->
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0d13312b..b1ef3b6b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,8 +35,9 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State3#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -474,6 +480,13 @@ check_user_id_header(#'P_basic'{user_id = Claimed},
"'~s'", [Claimed, Actual])
end.
+check_expiration_header(Props) ->
+ case rabbit_basic:parse_expiration(Props) of
+ {ok, _} -> ok;
+ {error, E} -> precondition_failed("invalid expiration '~s': ~p",
+ [Props#'P_basic'.expiration, E])
+ end.
+
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
"cannot publish to internal ~s",
@@ -614,8 +627,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- check_user_id_header(DecodedContent#content.properties, State),
+ DecodedContent = #content {properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ check_user_id_header(Props, State),
+ check_expiration_header(Props),
{MsgSeqNo, State1} =
case {TxStatus, ConfirmEnabled} of
{none, false} -> {undefined, State};
@@ -668,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -680,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -719,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -960,8 +976,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, none, State)
- end
+ handle_method(Declare, none, State);
+ {absent, Q} ->
+ rabbit_misc:absent(Q)
+ end;
+ {error, {absent, Q}} ->
+ rabbit_misc:absent(Q)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
@@ -1113,12 +1133,16 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(true, _QPid, State) ->
- State;
-monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
- delivering_queues = sets:add_element(QPid, DQ)}.
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
+ delivering_queues = case NoAck of
+ true -> DQ;
+ false -> sets:add_element(QPid, DQ)
+ end}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
case rabbit_misc:is_abnormal_exit(Reason) of
@@ -1153,10 +1177,6 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid }) ->
- %% FIXME: connection exception (!) on failure??
- %% (see rule named "failure" in spec-XML)
- %% FIXME: don't allow binding to internal exchanges -
- %% including the one named "" !
{DestinationName, ActualRoutingKey} =
expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
check_write_permitted(DestinationName, State),
@@ -1174,14 +1194,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
(_X, #exchange{}) ->
ok
end) of
- {error, source_not_found} ->
- rabbit_misc:not_found(ExchangeName);
- {error, destination_not_found} ->
- rabbit_misc:not_found(DestinationName);
- {error, source_and_destination_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(DestinationName)]);
+ {error, {resources_missing, [{not_found, Name} | _]}} ->
+ rabbit_misc:not_found(Name);
+ {error, {resources_missing, [{absent, Q} | _]}} ->
+ rabbit_misc:absent(Q);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
@@ -1227,17 +1243,20 @@ reject(Requeue, Acked, Limiter) ->
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ case Redelivered of
+ true -> incr_stats([{queue_stats, QName, 1}], redeliver, State);
+ false -> ok
+ end,
rabbit_trace:tap_trace_out(Msg, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
@@ -1268,14 +1287,18 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
- QIncs = fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{QPid, length(MsgIds)} | L]
- end, [], Acked),
+ack(Acked, State = #ch{queue_names = QNames}) ->
+ Incs = fold_per_queue(
+ fun (QPid, MsgIds, L) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count} | L];
+ error -> L
+ end
+ end, [], Acked),
ok = notify_limiter(State#ch.limiter, Acked),
- maybe_incr_stats(QIncs, ack, State).
+ incr_stats(Incs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
uncommitted_acks = [],
@@ -1330,23 +1353,42 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
- maybe_incr_stats([{XName, 1} |
- [{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
+ incr_stats([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
+ incr_stats([{exchange_stats, XName, 1}], return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1371,10 +1413,11 @@ send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
MsgSeqNos =
- lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
- maybe_incr_stats([{XName, 1}], confirm, State),
- [MsgSeqNo | MSNs]
- end, [], lists:append(C)),
+ lists:foldl(
+ fun ({MsgSeqNo, XName}, MSNs) ->
+ incr_stats([{exchange_stats, XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1457,26 +1500,15 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-maybe_incr_redeliver_stats(true, QPid, State) ->
- maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, _State) ->
- ok.
-
-maybe_incr_stats(QXIncs, Measure, State) ->
+incr_stats(Incs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ fine -> [update_measures(Type, Key, Inc, Measure) ||
+ {Type, Key, Inc} <- Incs];
_ -> ok
end.
-incr_stats({_, _} = QX, Inc, Measure) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure);
-incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(X, Inc, Measure) ->
- update_measures(exchange_stats, X, Inc, Measure).
-
-update_measures(Type, QX, Inc, Measure) ->
- Measures = case get({Type, QX}) of
+update_measures(Type, Key, Inc, Measure) ->
+ Measures = case get({Type, Key}) of
undefined -> [];
D -> D
end,
@@ -1484,31 +1516,29 @@ update_measures(Type, QX, Inc, Measure) ->
error -> 0;
{ok, C} -> C
end,
- put({Type, QX},
- orddict:store(Measure, Cur + Inc, Measures)).
+ put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)).
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index bcb83851..42459833 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -83,7 +83,7 @@ init(Type) ->
child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) ->
[{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid]},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, true]},
intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
child_specs(direct) ->
[{limiter, {rabbit_limiter, start_link, []},
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 25f7d758..669a0787 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -386,7 +386,7 @@ action(list_bindings, Node, Args, Opts, Inform) ->
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
+ ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -611,7 +611,7 @@ display_info_list(Results, InfoItemKeys) when is_list(Results) ->
fun (Result) -> display_row(
[format_info_item(proplists:get_value(X, Result)) ||
X <- InfoItemKeys])
- end, Results),
+ end, lists:sort(Results)),
ok;
display_info_list(Other, _) ->
Other.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 3f1b20fe..7d91b6fa 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -19,8 +19,8 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]).
--export([reset_stats_timer/2]).
+-export([init_stats_timer/2, init_disabled_stats_timer/2,
+ ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify_if/3]).
@@ -51,6 +51,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(init_disabled_stats_timer/2 :: (container(), pos()) -> container()).
-spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()).
-spec(stop_stats_timer/2 :: (container(), pos()) -> container()).
-spec(reset_stats_timer/2 :: (container(), pos()) -> container()).
@@ -90,10 +91,13 @@ start_link() ->
init_stats_timer(C, P) ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
- {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
+ {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
setelement(P, C, #state{level = StatsLevel, interval = Interval,
timer = undefined}).
+init_disabled_stats_timer(C, P) ->
+ setelement(P, C, #state{level = none, interval = 0, timer = undefined}).
+
ensure_stats_timer(C, P, Msg) ->
case element(P, C) of
#state{level = Level, interval = Interval, timer = undefined} = State
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a205b23d..e72cbafe 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -39,8 +39,7 @@
-spec(recover/0 :: () -> [name()]).
-spec(callback/4::
(rabbit_types:exchange(), fun_name(),
- fun((boolean()) -> non_neg_integer()) | atom(),
- [any()]) -> 'ok').
+ fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
@@ -114,26 +113,19 @@ recover() ->
[XName || #exchange{name = XName} <- Xs].
callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
- Serial = fun (Bool) ->
- case Serial0 of
- _ when is_atom(Serial0) -> Serial0;
- _ -> Serial0(Bool)
- end
+ Serial = if is_function(Serial0) -> Serial0;
+ is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
- [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
- || M <- decorators()],
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
+ M <- decorators()],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
serialise_events(X = #exchange{type = Type}) ->
- case [Serialise || M <- decorators(),
- Serialise <- [M:serialise_events(X)],
- Serialise == true] of
- [] -> (type_to_module(Type)):serialise_events();
- _ -> true
- end.
+ lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
Serial = case serialise_events(X) of
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index cce19c90..8fcd1893 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,11 +17,12 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ purge/1, publish/4, publish_delivered/4,
+ discard/3, fetch/2, drop/2, ack/2,
+ requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -88,12 +89,10 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) ->
+init(Q, Recover, AsyncCallback) ->
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State.
@@ -109,6 +108,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
ok = rabbit_amqqueue:store_queue(
Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
end),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -229,7 +230,10 @@ dropwhile(Pred, AckRequired,
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
Dropped = Len - Len1,
- ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}),
+ case Dropped of
+ 0 -> ok;
+ _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
+ end,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
@@ -264,27 +268,28 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
+fetch(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+ set_delivered = SetDelivered }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
{Result, State1};
- {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
- Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- SetDelivered1 = lists:max([0, SetDelivered - 1]),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1,
- ack_msg_id = AM1 }}
+ {Message, IsDelivered, AckTag} ->
+ {{Message, IsDelivered orelse SetDelivered > 0, AckTag},
+ drop(Message#basic_message.id, AckTag, State1)}
end.
+drop(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:drop(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ {Result, case Result of
+ empty -> State1;
+ {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ end}.
+
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -298,9 +303,9 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+foreach_ack(MsgFun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -309,6 +314,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
@@ -437,6 +447,19 @@ depth_fun() ->
end)
end.
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
+ ack_msg_id = AM,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+ State #state { set_delivered = lists:max([0, SetDelivered - 1]),
+ ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4a00846e..2b3bd027 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -137,11 +137,11 @@ on_node_up() ->
ok.
drop_mirrors(QName, Nodes) ->
- [ok = drop_mirror(QName, Node) || Node <- Nodes],
+ [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes],
ok.
drop_mirror(QName, MirrorNode) ->
- if_mirrored_queue(
+ rabbit_amqqueue:with(
QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
@@ -154,7 +154,7 @@ drop_mirror(QName, MirrorNode) ->
"Dropping queue mirror on node ~p for ~s~n",
[MirrorNode, rabbit_misc:rs(Name)]),
exit(Pid, {shutdown, dropped}),
- ok
+ {ok, dropped}
end
end).
@@ -163,7 +163,7 @@ add_mirrors(QName, Nodes) ->
ok.
add_mirror(QName, MirrorNode) ->
- if_mirrored_queue(
+ rabbit_amqqueue:with(
QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
@@ -206,14 +206,6 @@ start_child(Name, MirrorNode, Q) ->
Other
end.
-if_mirrored_queue(QName, Fun) ->
- rabbit_amqqueue:with(QName, fun (Q) ->
- case is_mirrored(Q) of
- false -> ok;
- true -> Fun(Q)
- end
- end).
-
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
@@ -268,7 +260,11 @@ policy(Policy, Q) ->
suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
{MNode, Possible -- [MNode]};
suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
- Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ %% If the current master is currently not in the nodes specified,
+ %% act like it is for the purposes below - otherwise we will not
+ %% return it in the results...
+ Nodes = lists:usort([MNode | Nodes1]),
Unavailable = Nodes -- Possible,
Available = Nodes -- Unavailable,
case Available of
@@ -314,20 +310,13 @@ is_mirrored(Q) ->
_ -> false
end.
-
-%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to
-%% master and start any needed slaves. However, if node(QPid) is not
-%% in the nodes for the policy, it won't switch it. So this is for the
-%% case where we kill the existing queue and restart elsewhere. TODO:
-%% is this TRTTD? All alternatives seem ugly.
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
{true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid),
- update_mirrors0(OldQ, NewQ); %% [1]
- {true, true} -> update_mirrors0(OldQ, NewQ)
+ {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
+ {true, true} -> update_mirrors0(OldQ, NewQ)
end.
update_mirrors0(OldQ = #amqqueue{name = QName},
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1ba1420f..cb7a2135 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -28,7 +28,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -329,6 +329,8 @@ prioritise_info(Msg, _State) ->
_ -> 0
end.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -725,8 +727,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
@@ -735,21 +736,6 @@ process_instruction({drop, Length, Dropped, AckRequired},
true -> State1;
false -> update_delta(ToDrop - Dropped, State1)
end};
-process_instruction({fetch, AckRequired, MsgId, Remaining},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- _ when QLen =< Remaining andalso AckRequired ->
- State;
- _ when QLen =< Remaining ->
- update_delta(-1, State)
- end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ab9a9ceb..81bb6769 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -21,7 +21,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1, assert_args_equivalence/4]).
+-export([not_found/1, absent/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
-export([table_lookup/2, set_table_value/4]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -63,13 +63,18 @@
-export([version/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([check_expiry/1]).
-export([base64url/1]).
+-export([interval_operation/4]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
R =:= shutdown).
+%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
+-define(MAX_EXPIRY_TIMER, 4294967295).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -111,6 +116,7 @@
-spec(protocol_error/1 ::
(rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
+-spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
@@ -228,7 +234,11 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(check_expiry/1 :: (integer()) -> rabbit_types:ok_or_error(any())).
-spec(base64url/1 :: (binary()) -> string()).
+-spec(interval_operation/4 ::
+ ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
+ -> {any(), non_neg_integer()}).
-endif.
@@ -266,6 +276,15 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) ->
+ %% The assertion of durability is mainly there because we mention
+ %% durability in the error message. That way we will hopefully
+ %% notice if at some future point our logic changes s.t. we get
+ %% here with non-durable queues.
+ protocol_error(not_found,
+ "home node '~s' of durable ~s is down or inaccessible",
+ [node(QPid), rs(QueueName)]).
+
type_class(byte) -> int;
type_class(short) -> int;
type_class(signedint) -> int;
@@ -990,9 +1009,28 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
+check_expiry(N) when N < 0 -> {error, {value_negative, N}};
+check_expiry(_N) -> ok.
+
base64url(In) ->
lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];
($\=, Acc) -> Acc;
(Chr, Acc) -> [Chr | Acc]
end, [], base64:encode_to_string(In))).
+
+%% Ideally, you'd want Fun to run every IdealInterval. but you don't
+%% want it to take more than MaxRatio of IdealInterval. So if it takes
+%% more then you want to run it less often. So we time how long it
+%% takes to run, and then suggest how long you should wait before
+%% running it again. Times are in millis.
+interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
+ {Micros, Res} = timer:tc(M, F, A),
+ {Res, case {Micros > 1000 * (MaxRatio * IdealInterval),
+ Micros > 1000 * (MaxRatio * LastInterval)} of
+ {true, true} -> round(LastInterval * 1.5);
+ {true, false} -> LastInterval;
+ {false, false} -> lists:max([IdealInterval,
+ round(LastInterval / 1.5)])
+ end}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8df8e653..942048f9 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -833,7 +833,7 @@ error_description(cannot_connect_to_cluster) ->
"'update_cluster_nodes' command to point to the new cluster nodes.";
error_description(no_online_cluster_nodes) ->
"Could not find any online cluster nodes. If the cluster has changed, "
- "you can use the 'recluster' command.";
+ "you can use the 'update_cluster_nodes' command.";
error_description(cannot_connect_to_node) ->
"Could not connect to the cluster node provided.";
error_description(inconsistent_cluster) ->
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 038154c3..562fc197 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
close/1, fast_close/1, sockname/1, peername/1, peercert/1,
- tune_buffer_size/1, connection_string/2]).
+ tune_buffer_size/1, connection_string/2, socket_ends/2]).
%%---------------------------------------------------------------------------
@@ -36,7 +36,7 @@
-type(socket() :: port() | #ssl_socket{}).
-type(opts() :: [{atom(), any()} |
{raw, non_neg_integer(), non_neg_integer(), binary()}]).
-
+-type(host_or_ip() :: binary() | inet:ip_address()).
-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(ssl_info/1 :: (socket())
-> 'nossl' | ok_val_or_error(
@@ -72,6 +72,10 @@
-spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
+-spec(socket_ends/2 ::
+ (socket(), 'inbound' | 'outbound')
+ -> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(),
+ host_or_ip(), rabbit_networking:ip_port()})).
-endif.
@@ -193,17 +197,37 @@ tune_buffer_size(Sock) ->
end.
connection_string(Sock, Direction) ->
- {From, To} = case Direction of
- inbound -> {fun peername/1, fun sockname/1};
- outbound -> {fun sockname/1, fun peername/1}
- end,
+ case socket_ends(Sock, Direction) of
+ {ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
+ {ok, rabbit_misc:format(
+ "~s:~p -> ~s:~p",
+ [maybe_ntoab(FromAddress), FromPort,
+ maybe_ntoab(ToAddress), ToPort])};
+ Error ->
+ Error
+ end.
+
+socket_ends(Sock, Direction) ->
+ {From, To} = sock_funs(Direction),
case {From(Sock), To(Sock)} of
{{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
- {ok, rabbit_misc:format("~s:~p -> ~s:~p",
- [rabbit_misc:ntoab(FromAddress), FromPort,
- rabbit_misc:ntoab(ToAddress), ToPort])};
+ {ok, {rdns(FromAddress), FromPort,
+ rdns(ToAddress), ToPort}};
{{error, _Reason} = Error, _} ->
Error;
{_, {error, _Reason} = Error} ->
Error
end.
+
+maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
+maybe_ntoab(Host) -> Host.
+
+rdns(Addr) ->
+ {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups),
+ case Lookup of
+ true -> list_to_binary(rabbit_networking:tcp_host(Addr));
+ _ -> Addr
+ end.
+
+sock_funs(inbound) -> {fun peername/1, fun sockname/1};
+sock_funs(outbound) -> {fun sockname/1, fun peername/1}.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 5cf8d1ae..31eeef73 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -21,7 +21,7 @@
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
- close_connection/2, force_connection_event_refresh/0]).
+ close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/6,
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index b11c9d04..8d0e4456 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -85,10 +85,10 @@ cluster_status_filename() ->
prepare_cluster_status_files() ->
rabbit_mnesia:ensure_mnesia_dir(),
- CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end,
+ Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end,
RunningNodes1 = case try_read_file(running_nodes_filename()) of
{ok, [Nodes]} when is_list(Nodes) -> Nodes;
- {ok, _ } -> CorruptFiles();
+ {ok, Other} -> Corrupt(Other);
{error, enoent} -> []
end,
ThisNode = [node()],
@@ -102,8 +102,8 @@ prepare_cluster_status_files() ->
{ok, [AllNodes0]} when is_list(AllNodes0) ->
{legacy_cluster_nodes(AllNodes0),
legacy_should_be_disc_node(AllNodes0)};
- {ok, _} ->
- CorruptFiles();
+ {ok, Files} ->
+ Corrupt(Files);
{error, enoent} ->
{legacy_cluster_nodes([]), true}
end,
@@ -134,8 +134,8 @@ read_cluster_status() ->
try_read_file(running_nodes_filename())} of
{{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
{All, Disc, Running};
- {_, _} ->
- throw({error, corrupt_or_missing_cluster_files})
+ {Stat, Run} ->
+ throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
end.
update_cluster_status() ->
@@ -184,6 +184,11 @@ partitions() ->
%%----------------------------------------------------------------------------
init([]) ->
+ %% We trap exits so that the supervisor will not just kill us. We
+ %% want to be sure that we are not going to be killed while
+ %% writing out the cluster status files - bad things can then
+ %% happen.
+ process_flag(trap_exit, true),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
partitions = []}}.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index ecb19611..9f94af7d 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -19,18 +19,6 @@
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
--define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
--define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
--define(ENABLED_DEF, {?ENABLED_OPT, flag}).
--define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
-
--define(GLOBAL_DEFS, []).
-
--define(COMMANDS,
- [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
- enable,
- disable]).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -100,8 +88,13 @@ dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
fun (App, _Deps) -> [{App, App}] end,
fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- [{Name, Deps}
- || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
+ lists:ukeysort(
+ 1, [{Name, Deps} ||
+ #plugin{name = Name,
+ dependencies = Deps} <- AllPlugins] ++
+ [{Dep, []} ||
+ #plugin{dependencies = Deps} <- AllPlugins,
+ Dep <- Deps])),
Dests = case Reverse of
false -> digraph_utils:reachable(Sources, G);
true -> digraph_utils:reaching(Sources, G)
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 572cf150..2158d1da 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -108,16 +108,19 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> throw({error_string,
- fmt_list("The following plugins could not be found:",
- Missing)})
- end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
- write_enabled_plugins(PluginsFile, NewEnabled),
NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
+ MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing,
+ case {Missing, MissingDeps} of
+ {[], []} -> ok;
+ {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)});
+ {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)});
+ {_, _} -> throw({error_string,
+ fmt_missing("plugins", Missing) ++
+ fmt_missing("dependencies", MissingDeps)})
+ end,
+ write_enabled_plugins(PluginsFile, NewEnabled),
maybe_warn_mochiweb(NewImplicitlyEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
@@ -183,9 +186,12 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
EnabledImplicitly =
rabbit_plugins:dependencies(false, EnabledExplicitly,
AvailablePlugins) -- EnabledExplicitly,
+ Missing = [#plugin{name = Name, dependencies = []} ||
+ Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
+ plugin_names(AvailablePlugins))],
{ok, RE} = re:compile(Pattern),
Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins,
+ Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
OnlyEnabledAll -> (lists:member(Name,
@@ -196,30 +202,35 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
#plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
- MaxWidth) || P <- Plugins1],
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly,
+ plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
+ EnabledExplicitly, EnabledImplicitly, Missing,
+ Format, MaxWidth) ->
Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly)} of
- {true, false} -> "[E]";
- {false, true} -> "[e]";
- _ -> "[ ]"
+ lists:member(Name, EnabledImplicitly),
+ lists:member(Name, Missing)} of
+ {true, false, false} -> "[E]";
+ {false, true, false} -> "[e]";
+ {_, _, true} -> "[!]";
+ _ -> "[ ]"
end,
+ Opt = fun (_F, A, A) -> ok;
+ ( F, A, _) -> io:format(F, [A])
+ end,
case Format of
minimal -> io:format("~s~n", [Name]);
- normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
- "w ~s~n", [Glyph, Name, Version]);
+ normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ "w ",
+ [Glyph, Name]),
+ Opt("~s", Version, undefined),
+ io:format("~n");
verbose -> io:format("~s ~w~n", [Glyph, Name]),
- io:format(" Version: \t~s~n", [Version]),
- case Deps of
- [] -> ok;
- _ -> io:format(" Dependencies:\t~p~n", [Deps])
- end,
- io:format(" Description:\t~s~n", [Description]),
+ Opt(" Version: \t~s~n", Version, undefined),
+ Opt(" Dependencies:\t~p~n", Deps, []),
+ Opt(" Description: \t~s~n", Description, undefined),
io:format("~n")
end.
@@ -230,6 +241,9 @@ fmt_list(Header, Plugins) ->
lists:flatten(
[Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
+fmt_missing(Desc, Missing) ->
+ fmt_list("The following " ++ Desc ++ " could not be found:", Missing).
+
usort_plugins(Plugins) ->
lists:usort(fun plugins_cmp/2, Plugins).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index aef48b20..928786e9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,23 +35,23 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
+-record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at}).
+ last_blocked_by, last_blocked_at, host, peer_host,
+ port, peer_port}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
channels]).
--define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port,
- ssl, peer_cert_subject, peer_cert_issuer,
- peer_cert_validity, auth_mechanism,
- ssl_protocol, ssl_key_exchange,
- ssl_cipher, ssl_hash,
- protocol, user, vhost, timeout, frame_max,
- client_properties]).
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -192,16 +192,20 @@ socket_op(Sock, Fun) ->
name(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
+socket_ends(Sock) ->
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
+
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- ConnStr = name(Sock),
- log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
+ Name = name(Sock),
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ {PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
+ name = list_to_binary(Name),
connection = #connection{
protocol = none,
user = none,
@@ -224,19 +228,23 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
auth_state = none,
conserve_resources = false,
last_blocked_by = none,
- last_blocked_at = never},
+ last_blocked_at = never,
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8)),
- log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
connection_closed_abruptly -> warning;
_ -> error
end, "closing AMQP connection ~p (~s):~n~p~n",
- [self(), ConnStr, Ex])
+ [self(), Name, Ex])
after
%% We don't call gen_tcp:close/1 here since it waits for
%% pending output to be sent, which results in unnecessary
@@ -341,6 +349,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State)
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
%% Ignore, we will emit a created event once we start running.
mainloop(Deb, State);
+handle_other(ensure_stats, Deb, State) ->
+ mainloop(Deb, ensure_stats_timer(State));
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
@@ -491,6 +501,14 @@ handle_exception(State, Channel, Reason) ->
timer:sleep(?SILENT_CLOSE_DELAY * 1000),
throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+%% we've "lost sync" with the client and hence must not accept any
+%% more input
+fatal_frame_error(Error, Type, Channel, Payload, State) ->
+ frame_error(Error, Type, Channel, Payload, State),
+ %% grace period to allow transmission of error
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw(fatal_frame_error).
+
frame_error(Error, Type, Channel, Payload, State) ->
{Str, Bin} = payload_snippet(Payload),
handle_exception(State, Channel,
@@ -513,7 +531,7 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
+ #v1{sock = Sock, name = Name, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
frame_max = FrameMax,
@@ -522,7 +540,7 @@ create_channel(Channel, State) ->
capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
@@ -613,6 +631,17 @@ post_process_frame(_Frame, _ChPid, State) ->
%%--------------------------------------------------------------------------
+%% We allow clients to exceed the frame size a little bit since quite
+%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
+-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
+
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+ State = #v1{connection = #connection{frame_max = FrameMax}})
+ when FrameMax /= 0 andalso
+ PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
+ fatal_frame_error(
+ {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
+ Type, Channel, <<>>, State);
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -623,8 +652,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
switch_callback(State1, frame_header, 7);
- _ -> frame_error({invalid_frame_end_marker, EndMarker},
- Type, Channel, Payload, State)
+ _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -871,82 +900,66 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, #v1{}) ->
- self();
-i(name, #v1{sock = Sock}) ->
- list_to_binary(name(Sock));
-i(address, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock);
-i(port, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock);
-i(peer_address, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
-i(peer_port, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
-i(ssl, #v1{sock = Sock}) ->
- rabbit_net:is_ssl(Sock);
-i(ssl_protocol, #v1{sock = Sock}) ->
- ssl_info(fun ({P, _}) -> P end, Sock);
-i(ssl_key_exchange, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
-i(ssl_cipher, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
-i(ssl_hash, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
-i(peer_cert_issuer, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
-i(peer_cert_subject, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock);
-i(peer_cert_validity, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock);
-i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
- SockStat =:= recv_cnt;
- SockStat =:= send_oct;
- SockStat =:= send_cnt;
- SockStat =:= send_pend ->
- socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end,
- fun ([{_, I}]) -> I end, Sock);
-i(state, #v1{connection_state = S}) ->
- S;
-i(last_blocked_by, #v1{last_blocked_by = By}) ->
- By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(pid, #v1{}) -> self();
+i(name, #v1{name = Name}) -> Name;
+i(host, #v1{host = Host}) -> Host;
+i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
+i(port, #v1{port = Port}) -> Port;
+i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
+i(SockStat, S) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
+ fun ([{_, I}]) -> I end, S);
+i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
+i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
+i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
+i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
+i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
+i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
+i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
+i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
+i(state, #v1{connection_state = CS}) -> CS;
+i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
timer:now_diff(erlang:now(), T) / 1000000;
-i(channels, #v1{}) ->
- length(all_channels());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
+i(channels, #v1{}) -> length(all_channels());
+i(auth_mechanism, #v1{auth_mechanism = none}) ->
none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
- Username;
-i(user, #v1{connection = #connection{user = none}}) ->
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
+i(user, #v1{connection = #connection{user = none}}) ->
'';
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
+i(user, #v1{connection = #connection{user = #user{
+ username = Username}}}) ->
+ Username;
+i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
+i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
+i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
FrameMax;
-i(client_properties, #v1{connection = #connection{
- client_properties = ClientProperties}}) ->
+i(client_properties, #v1{connection = #connection{client_properties =
+ ClientProperties}}) ->
ClientProperties;
i(Item, #v1{}) ->
throw({bad_argument, Item}).
-socket_info(Get, Select, Sock) ->
+socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
-ssl_info(F, Sock) ->
+ssl_info(F, #v1{sock = Sock}) ->
%% The first ok form is R14
%% The second is R13 - the extra term is exportability (by inspection,
%% the docs are wrong)
@@ -957,7 +970,7 @@ ssl_info(F, Sock) ->
{ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
end.
-cert_info(F, Sock) ->
+cert_info(F, #v1{sock = Sock}) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
{error, no_peercert} -> '';
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 962bb648..81180ebe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -18,7 +18,7 @@
-compile([export_all]).
--export([all_tests/0, test_parsing/0]).
+-export([all_tests/0]).
-import(rabbit_misc, [pget/2]).
@@ -41,11 +41,12 @@ all_tests() ->
passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
+ passed = test_rabbit_basic_header_handling(),
passed = test_priority_queue(),
passed = test_pg_local(),
passed = test_unfold(),
passed = test_supervisor_delayed_restart(),
- passed = test_parsing(),
+ passed = test_table_codec(),
passed = test_content_framing(),
passed = test_content_transcoding(),
passed = test_topic_matching(),
@@ -71,6 +72,7 @@ all_tests() ->
passed = test_configurable_server_properties(),
passed.
+
do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
@@ -159,6 +161,78 @@ test_multi_call() ->
exit(Pid3, bang),
passed.
+test_rabbit_basic_header_handling() ->
+ passed = write_table_with_invalid_existing_type_test(),
+ passed = invalid_existing_headers_test(),
+ passed = disparate_invalid_header_entries_accumulate_separately_test(),
+ passed = corrupt_or_invalid_headers_are_overwritten_test(),
+ passed = invalid_same_header_entry_accumulation_test(),
+ passed.
+
+-define(XDEATH_TABLE,
+ [{<<"reason">>, longstr, <<"blah">>},
+ {<<"queue">>, longstr, <<"foo.bar.baz">>},
+ {<<"exchange">>, longstr, <<"my-exchange">>},
+ {<<"routing-keys">>, array, []}]).
+
+-define(ROUTE_TABLE, [{<<"redelivered">>, bool, <<"true">>}]).
+
+-define(BAD_HEADER(K), {<<K>>, longstr, <<"bad ", K>>}).
+-define(BAD_HEADER2(K, Suf), {<<K>>, longstr, <<"bad ", K, Suf>>}).
+-define(FOUND_BAD_HEADER(K), {<<K>>, array, [{longstr, <<"bad ", K>>}]}).
+
+write_table_with_invalid_existing_type_test() ->
+ prepend_check(<<"header1">>, ?XDEATH_TABLE, [?BAD_HEADER("header1")]),
+ passed.
+
+invalid_existing_headers_test() ->
+ Headers =
+ prepend_check(<<"header2">>, ?ROUTE_TABLE, [?BAD_HEADER("header2")]),
+ {array, [{table, ?ROUTE_TABLE}]} =
+ rabbit_misc:table_lookup(Headers, <<"header2">>),
+ passed.
+
+disparate_invalid_header_entries_accumulate_separately_test() ->
+ BadHeaders = [?BAD_HEADER("header2")],
+ Headers = prepend_check(<<"header2">>, ?ROUTE_TABLE, BadHeaders),
+ Headers2 = prepend_check(<<"header1">>, ?XDEATH_TABLE,
+ [?BAD_HEADER("header1") | Headers]),
+ {table, [?FOUND_BAD_HEADER("header1"),
+ ?FOUND_BAD_HEADER("header2")]} =
+ rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
+ passed.
+
+corrupt_or_invalid_headers_are_overwritten_test() ->
+ Headers0 = [?BAD_HEADER("header1"),
+ ?BAD_HEADER("x-invalid-headers")],
+ Headers1 = prepend_check(<<"header1">>, ?XDEATH_TABLE, Headers0),
+ {table,[?FOUND_BAD_HEADER("header1"),
+ ?FOUND_BAD_HEADER("x-invalid-headers")]} =
+ rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
+ passed.
+
+invalid_same_header_entry_accumulation_test() ->
+ BadHeader1 = ?BAD_HEADER2("header1", "a"),
+ Headers = prepend_check(<<"header1">>, ?ROUTE_TABLE, [BadHeader1]),
+ Headers2 = prepend_check(<<"header1">>, ?ROUTE_TABLE,
+ [?BAD_HEADER2("header1", "b") | Headers]),
+ {table, InvalidHeaders} =
+ rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
+ {array, [{longstr,<<"bad header1b">>},
+ {longstr,<<"bad header1a">>}]} =
+ rabbit_misc:table_lookup(InvalidHeaders, <<"header1">>),
+ passed.
+
+prepend_check(HeaderKey, HeaderTable, Headers) ->
+ Headers1 = rabbit_basic:prepend_table_header(
+ HeaderKey, HeaderTable, Headers),
+ {table, Invalid} =
+ rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
+ {Type, Value} = rabbit_misc:table_lookup(Headers, HeaderKey),
+ {array, [{Type, Value} | _]} =
+ rabbit_misc:table_lookup(Invalid, HeaderKey),
+ Headers1.
+
test_priority_queue() ->
false = priority_queue:is_queue(not_a_queue),
@@ -350,113 +424,45 @@ test_unfold() ->
end, 10),
passed.
-test_parsing() ->
- passed = test_content_properties(),
- passed = test_field_values(),
- passed.
-
-test_content_prop_encoding(Datum, Binary) ->
- Types = [element(1, E) || E <- Datum],
- Values = [element(2, E) || E <- Datum],
- Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
-
-test_content_properties() ->
- test_content_prop_encoding([], <<0, 0>>),
- test_content_prop_encoding([{bit, true}, {bit, false}, {bit, true}, {bit, false}],
- <<16#A0, 0>>),
- test_content_prop_encoding([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined},
- {bit, true}],
- <<16#E8,0,123>>),
- test_content_prop_encoding([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}],
- <<16#F0,0,123,123>>),
- test_content_prop_encoding([{bit, true}, {shortstr, <<"hi">>}, {bit, true},
- {shortint, 54321}, {bit, true}],
- <<16#F8,0,2,"hi",16#D4,16#31>>),
- test_content_prop_encoding([{bit, true}, {shortstr, undefined}, {bit, true},
- {shortint, 54321}, {bit, true}],
- <<16#B8,0,16#D4,16#31>>),
- test_content_prop_encoding([{table, [{<<"a signedint">>, signedint, 12345678},
- {<<"a longstr">>, longstr, <<"yes please">>},
- {<<"a decimal">>, decimal, {123, 12345678}},
- {<<"a timestamp">>, timestamp, 123456789012345},
- {<<"a nested table">>, table,
- [{<<"one">>, signedint, 1},
- {<<"two">>, signedint, 2}]}]}],
- <<
- %% property-flags
- 16#8000:16,
-
- %% property-list:
-
- %% table
- 117:32, % table length in bytes
-
- 11,"a signedint", % name
- "I",12345678:32, % type and value
-
- 9,"a longstr",
- "S",10:32,"yes please",
-
- 9,"a decimal",
- "D",123,12345678:32,
-
- 11,"a timestamp",
- "T", 123456789012345:64,
-
- 14,"a nested table",
- "F",
- 18:32,
-
- 3,"one",
- "I",1:32,
-
- 3,"two",
- "I",2:32 >>),
- passed.
-
-test_field_values() ->
+test_table_codec() ->
%% FIXME this does not test inexact numbers (double and float) yet,
%% because they won't pass the equality assertions
- test_content_prop_encoding(
- [{table, [{<<"longstr">>, longstr, <<"Here is a long string">>},
- {<<"signedint">>, signedint, 12345},
- {<<"decimal">>, decimal, {3, 123456}},
- {<<"timestamp">>, timestamp, 109876543209876},
- {<<"table">>, table, [{<<"one">>, signedint, 54321},
- {<<"two">>, longstr, <<"A long string">>}]},
- {<<"byte">>, byte, 255},
- {<<"long">>, long, 1234567890},
- {<<"short">>, short, 655},
- {<<"bool">>, bool, true},
- {<<"binary">>, binary, <<"a binary string">>},
- {<<"void">>, void, undefined},
- {<<"array">>, array, [{signedint, 54321},
- {longstr, <<"A long string">>}]}
-
- ]}],
- <<
- %% property-flags
- 16#8000:16,
- %% table length in bytes
- 228:32,
-
- 7,"longstr", "S", 21:32, "Here is a long string", % = 34
- 9,"signedint", "I", 12345:32/signed, % + 15 = 49
- 7,"decimal", "D", 3, 123456:32, % + 14 = 63
- 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
- 5,"table", "F", 31:32, % length of table % + 11 = 93
- 3,"one", "I", 54321:32, % + 9 = 102
- 3,"two", "S", 13:32, "A long string", % + 22 = 124
- 4,"byte", "b", 255:8, % + 7 = 131
- 4,"long", "l", 1234567890:64, % + 14 = 145
- 5,"short", "s", 655:16, % + 9 = 154
- 4,"bool", "t", 1, % + 7 = 161
- 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
- 4,"void", "V", % + 6 = 194
- 5,"array", "A", 23:32, % + 11 = 205
- "I", 54321:32, % + 5 = 210
- "S", 13:32, "A long string" % + 18 = 228
- >>),
+ Table = [{<<"longstr">>, longstr, <<"Here is a long string">>},
+ {<<"signedint">>, signedint, 12345},
+ {<<"decimal">>, decimal, {3, 123456}},
+ {<<"timestamp">>, timestamp, 109876543209876},
+ {<<"table">>, table, [{<<"one">>, signedint, 54321},
+ {<<"two">>, longstr,
+ <<"A long string">>}]},
+ {<<"byte">>, byte, 255},
+ {<<"long">>, long, 1234567890},
+ {<<"short">>, short, 655},
+ {<<"bool">>, bool, true},
+ {<<"binary">>, binary, <<"a binary string">>},
+ {<<"void">>, void, undefined},
+ {<<"array">>, array, [{signedint, 54321},
+ {longstr, <<"A long string">>}]}
+ ],
+ Binary = <<
+ 7,"longstr", "S", 21:32, "Here is a long string",
+ 9,"signedint", "I", 12345:32/signed,
+ 7,"decimal", "D", 3, 123456:32,
+ 9,"timestamp", "T", 109876543209876:64,
+ 5,"table", "F", 31:32, % length of table
+ 3,"one", "I", 54321:32,
+ 3,"two", "S", 13:32, "A long string",
+ 4,"byte", "b", 255:8,
+ 4,"long", "l", 1234567890:64,
+ 5,"short", "s", 655:16,
+ 4,"bool", "t", 1,
+ 6,"binary", "x", 15:32, "a binary string",
+ 4,"void", "V",
+ 5,"array", "A", 23:32,
+ "I", 54321:32,
+ "S", 13:32, "A long string"
+ >>,
+ Binary = rabbit_binary_generator:generate_table(Table),
+ Table = rabbit_binary_parser:parse_table(Binary),
passed.
%% Test that content frames don't exceed frame-max
@@ -914,12 +920,12 @@ test_dynamic_mirroring() ->
Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
%% Add two nodes and drop one
Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
- %% Promote slave to master by policy
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
%% Don't try to include nodes that are not running
Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed
+ Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]),
Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
@@ -1125,6 +1131,9 @@ test_server_status() ->
HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
ok = control_action(set_vm_memory_high_watermark, ["1"]),
ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
+ %% this will trigger an alarm
+ ok = control_action(set_vm_memory_high_watermark, ["0.0"]),
+ %% reset
ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
%% eval
@@ -1277,8 +1286,7 @@ test_statistics() ->
QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
- {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
- QPid = Q#amqqueue.pid,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
@@ -1302,9 +1310,9 @@ test_statistics() ->
length(proplists:get_value(
channel_queue_exchange_stats, E)) > 0
end),
- [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QPid,X},[{publish,1}]}] =
+ [{{QRes,X},[{publish,1}]}] =
proplists:get_value(channel_queue_exchange_stats, Event2),
%% Check the stats remove stuff on queue deletion
@@ -1329,31 +1337,31 @@ test_refresh_events(SecondaryNode) ->
[channel_created, queue_created]),
{_Writer, Ch} = test_spawn(),
- expect_events(Ch, channel_created),
+ expect_events(pid, Ch, channel_created),
rabbit_channel:shutdown(Ch),
{_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(Ch2, channel_created),
+ expect_events(pid, Ch2, channel_created),
rabbit_channel:shutdown(Ch2),
- {new, #amqqueue { pid = QPid } = Q} =
+ {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(test_queue(), false, false, [], none),
- expect_events(QPid, queue_created),
+ expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false),
rabbit_tests_event_receiver:stop(),
passed.
-expect_events(Pid, Type) ->
- expect_event(Pid, Type),
+expect_events(Tag, Key, Type) ->
+ expect_event(Tag, Key, Type),
rabbit:force_event_refresh(),
- expect_event(Pid, Type).
+ expect_event(Tag, Key, Type).
-expect_event(Pid, Type) ->
+expect_event(Tag, Key, Type) ->
receive #event{type = Type, props = Props} ->
- case pget(pid, Props) of
- Pid -> ok;
- _ -> expect_event(Pid, Type)
+ case pget(Tag, Props) of
+ Key -> ok;
+ _ -> expect_event(Tag, Key, Type)
end
after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
@@ -2208,6 +2216,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
+ variable_queue_publish(IsPersistent, Count, PropFun,
+ fun (_N) -> <<>> end, VQ).
+
+variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2216,7 +2228,8 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
+ end},
+ PayloadFun(N)),
PropFun(N, #message_properties{}), self(), VQN)
end, VQ, lists:seq(1, Count)).
@@ -2224,8 +2237,9 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
+ Rem = rabbit_variable_queue:len(VQM),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -2291,12 +2305,27 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_drop/1,
+ fun test_variable_queue_fold_msg_on_disk/1,
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_requeue/1]],
+ fun test_variable_queue_requeue/1,
+ fun test_variable_queue_fold/1]],
passed.
+test_variable_queue_fold(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1,
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(
+ true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, A) -> [M | A] end, [], VQ2),
+ true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] ==
+ [list_to_binary(lists:reverse(P)) ||
+ #basic_message{ content = #content{ payload_fragments_rev = P}} <-
+ Acc],
+ VQ3.
+
test_variable_queue_requeue(VQ0) ->
Interval = 50,
Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
@@ -2316,7 +2345,7 @@ test_variable_queue_requeue(VQ0) ->
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag, _}, VQb} =
+ {{#basic_message{}, true, AckTag}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
VQb
end, VQ5, lists:reverse(Acks)),
@@ -2352,6 +2381,22 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6.
+test_drop(VQ0) ->
+ %% start by sending a messages
+ VQ1 = variable_queue_publish(false, 1, VQ0),
+ %% drop message with AckRequired = true
+ {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = rabbit_variable_queue:is_empty(VQ2),
+ true = AckTag =/= undefinded,
+ %% drop again -> empty
+ {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
+ %% requeue
+ {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
+ %% drop message with AckRequired = false
+ {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ true = rabbit_variable_queue:is_empty(VQ5),
+ VQ5.
+
test_dropwhile(VQ0) ->
Count = 10,
@@ -2368,7 +2413,7 @@ test_dropwhile(VQ0) ->
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _, _}, VQM} =
+ {{#basic_message{}, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
VQM
end, VQ2, lists:seq(6, Count)),
@@ -2421,7 +2466,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ Len = rabbit_variable_queue:len(VQ2),
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
@@ -2486,8 +2532,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
- rabbit_variable_queue:fetch(true, VQ7),
+ {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
+ Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
@@ -2506,6 +2552,13 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
+test_variable_queue_fold_msg_on_disk(VQ0) ->
+ VQ1 = variable_queue_publish(true, 1, VQ0),
+ {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
+ VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end,
+ VQ2, AckTags),
+ VQ3.
+
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
@@ -2518,7 +2571,7 @@ test_queue_recover() ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(),
- rabbit_amqqueue:start(),
+ rabbit_amqqueue:start(rabbit_amqqueue:recover()),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
@@ -2527,10 +2580,11 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
+ CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName, QPid1)
+ rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a3fd9d9..e2566e10 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,10 +18,10 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- depth/1, set_ram_duration_target/2, ram_duration/1,
+ dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -255,7 +255,6 @@
q4,
next_seq_id,
pending_ack,
- pending_ack_index,
ram_ack_index,
index_state,
msg_store_clients,
@@ -349,7 +348,7 @@
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_ack_index :: gb_set(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -592,8 +591,8 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
+ {{Msg, _IsDelivered, AckTag}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
{true, false} ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
@@ -615,6 +614,16 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {{_Msg, _IsDelivered, AckTag}, State2} =
+ internal_fetch(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
+ end.
+
ack([], State) ->
{[], State};
ack(AckTags, State) ->
@@ -638,16 +647,15 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
+foreach_ack(undefined, State, _AckTags) ->
State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- lists:foldl(
- fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags).
+foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
+ a(lists:foldl(fun(SeqId, State1) ->
+ {MsgStatus, State2} =
+ read_msg(gb_trees:get(SeqId, PA), false, State1),
+ MsgFun(MsgStatus#msg_status.msg, SeqId),
+ State2
+ end, State, AckTags)).
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
@@ -670,6 +678,24 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+fold(Fun, Acc, #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd },
+ q3 = Q3,
+ q4 = Q4 } = State) ->
+ QFun = fun(MsgStatus, {Acc0, State0}) ->
+ {#msg_status { msg = Msg }, State1 } =
+ read_msg(MsgStatus, false, State0),
+ {Fun(Msg, Acc0), State1}
+ end,
+ {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
+ {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
+ {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2),
+ {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
+ {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1),
+ {Acc5, State5}.
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -723,7 +749,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_sets:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -802,7 +828,7 @@ status(#vqstate {
{pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_sets:size(RAI)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -837,6 +863,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
+ true = RamMsgCount =< Len,
State.
@@ -1006,7 +1033,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_ack_index = gb_sets:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1062,17 +1089,19 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
+read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
+
read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent },
- State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+ CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
{MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + 1,
+ State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, State) ->
+read_msg(MsgStatus, _CountDiskToRam, State) ->
{MsgStatus, State}.
internal_fetch(AckRequired, MsgStatus = #msg_status {
@@ -1116,14 +1145,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, IsDelivered, AckTag},
State1 #vqstate { ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
- len = Len1,
+ len = Len - 1,
persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,
@@ -1222,7 +1250,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
msg_on_disk = MsgOnDisk } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
@@ -1230,7 +1257,7 @@ record_pending_ack(#msg_status { seq_id = SeqId,
{AckEntry, RAI1} =
case MsgOnDisk of
true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
+ false -> {MsgStatus, gb_sets:insert(SeqId, RAI)}
end,
State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
ram_ack_index = RAI1,
@@ -1240,7 +1267,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
{gb_trees:get(SeqId, PA),
State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+ ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
@@ -1251,7 +1278,7 @@ purge_pending_ack(KeepPersistent,
accumulate_ack(MsgStatus, Acc)
end, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ ram_ack_index = gb_sets:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1344,7 +1371,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
+%% Internal plumbing for requeue and fold
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1413,6 +1440,27 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
+ {Acc, State};
+delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState } = State) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Acc1, MSCState1} =
+ lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent,
+ _IsDelivered}, {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {Fun(Msg, Acc0), MSCState1}
+ end, {Acc, MSCState}, List),
+ delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1451,7 +1499,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
@@ -1479,12 +1527,12 @@ limit_ram_acks(0, State) ->
{0, State};
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+ case gb_sets:is_empty(RAI) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
+ {SeqId, RAI1} = gb_sets:take_largest(RAI),
+ MsgStatus = #msg_status { is_persistent = false} =
gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f3a8cacf..a7ea3d99 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,13 +18,17 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([start/5, start_link/5, start/6, start_link/6]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--record(wstate, {sock, channel, frame_max, protocol, pending}).
+%% internal
+-export([mainloop/1, mainloop1/1]).
+
+-record(wstate, {sock, channel, frame_max, protocol, reader,
+ stats_timer, pending}).
-define(HIBERNATE_AFTER, 5000).
@@ -40,6 +44,14 @@
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
+-spec(start/6 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ -> rabbit_types:ok(pid())).
+-spec(start_link/6 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ -> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -67,50 +79,58 @@
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
--spec(mainloop/2 :: (_,_) -> 'done').
--spec(mainloop1/2 :: (_,_) -> any()).
-
-endif.
%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
+ start(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
-
-mainloop(ReaderPid, State) ->
+ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
+ ReaderWantsStats),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
+
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
+ ReaderWantsStats),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+
+initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ (case ReaderWantsStats of
+ true -> fun rabbit_event:init_stats_timer/2;
+ false -> fun rabbit_event:init_disabled_stats_timer/2
+ end)(#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ reader = ReaderPid,
+ pending = []},
+ #wstate.stats_timer).
+
+mainloop(State) ->
try
- mainloop1(ReaderPid, State)
+ mainloop1(State)
catch
- exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
+ ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(ReaderPid, State = #wstate{pending = []}) ->
+mainloop1(State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
+ erlang:hibernate(?MODULE, mainloop, [State])
end;
-mainloop1(ReaderPid, State) ->
+mainloop1(State) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(ReaderPid, flush(State))
+ ?MODULE:mainloop1(flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -139,9 +159,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
- State;
+ rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
+handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
+ ReaderPid ! ensure_stats,
+ rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).