author     Alvaro Videla <alvaro@rabbitmq.com>  2013-12-11 15:38:57 +0000
committer  Alvaro Videla <alvaro@rabbitmq.com>  2013-12-11 15:38:57 +0000
commit     a76807d6df02b23b20148f4582b61af4f083462b (patch)
tree       f6af43bccc91333ca71f0663f94cd68e14261015
parent     ddbf072e71babfda40d2a980d5eb460bf9863301 (diff)
parent     6392ac1f070bf039b9eb37a49decff3230794ec6 (diff)
download   rabbitmq-server-a76807d6df02b23b20148f4582b61af4f083462b.tar.gz
merge default into bug25817
-rw-r--r--   .hgignore                                       1
-rw-r--r--   README                                          2
-rwxr-xr-x   check_xref                                     20
-rw-r--r--   codegen.py                                      8
-rw-r--r--   docs/rabbitmq.config.example                    7
-rw-r--r--   docs/rabbitmqctl.1.xml                         45
-rw-r--r--   ebin/rabbit_app.in                              6
-rw-r--r--   packaging/RPMS/Fedora/rabbitmq-server.spec     10
-rw-r--r--   packaging/common/README                        20
-rw-r--r--   packaging/debs/Debian/debian/changelog          6
-rw-r--r--   packaging/debs/Debian/debian/rules              1
-rw-r--r--   src/credit_flow.erl                            21
-rw-r--r--   src/rabbit.erl                                 32
-rw-r--r--   src/rabbit_amqqueue.erl                        12
-rw-r--r--   src/rabbit_amqqueue_process.erl                87
-rw-r--r--   src/rabbit_binary_parser.erl                   18
-rw-r--r--   src/rabbit_binding.erl                         27
-rw-r--r--   src/rabbit_channel.erl                         29
-rw-r--r--   src/rabbit_connection_helper_sup.erl (renamed from src/rabbit_intermediate_sup.erl)  23
-rw-r--r--   src/rabbit_connection_sup.erl                  20
-rw-r--r--   src/rabbit_control_main.erl                     9
-rw-r--r--   src/rabbit_error_logger.erl                     2
-rw-r--r--   src/rabbit_exchange.erl                        12
-rw-r--r--   src/rabbit_file.erl                             2
-rw-r--r--   src/rabbit_heartbeat.erl                       40
-rw-r--r--   src/rabbit_limiter.erl                         16
-rw-r--r--   src/rabbit_mirror_queue_master.erl              8
-rw-r--r--   src/rabbit_mirror_queue_misc.erl               61
-rw-r--r--   src/rabbit_mirror_queue_slave.erl              95
-rw-r--r--   src/rabbit_mnesia.erl                          13
-rw-r--r--   src/rabbit_networking.erl                      24
-rw-r--r--   src/rabbit_policy.erl                           8
-rw-r--r--   src/rabbit_reader.erl                         239
-rw-r--r--   src/rabbit_runtime_parameters.erl              22
-rw-r--r--   src/rabbit_tests.erl                            6
-rw-r--r--   src/rabbit_upgrade.erl                          9
-rw-r--r--   src/rabbit_upgrade_functions.erl               15
-rw-r--r--   src/rabbit_vhost.erl                           20
-rw-r--r--   src/rabbit_writer.erl                          22
-rw-r--r--   src/vm_memory_monitor.erl                       4
-rw-r--r--   src/worker_pool.erl                            18
-rw-r--r--   src/worker_pool_worker.erl                     49

42 files changed, 705 insertions, 384 deletions
diff --git a/.hgignore b/.hgignore
index cd017298..05850b03 100644
--- a/.hgignore
+++ b/.hgignore
@@ -4,6 +4,7 @@ syntax: glob
*.swp
*.patch
*.orig
+*.tmp
erl_crash.dump
deps.mk
diff --git a/README b/README
index 67e3a66a..90e99e62 100644
--- a/README
+++ b/README
@@ -1 +1 @@
-Please see http://www.rabbitmq.com/build-server.html for build instructions.
+Please see http://www.rabbitmq.com/build-server.html for build instructions. \ No newline at end of file
diff --git a/check_xref b/check_xref
index 24307fdb..21eb14b9 100755
--- a/check_xref
+++ b/check_xref
@@ -133,16 +133,20 @@ process_analysis(Query, Tag, Severity, Analysis) when is_list(Query) ->
checks() ->
[{"(XXL)(Lin) ((XC - UC) || (XU - X - B))",
"has call to undefined function(s)",
- error, filters()},
- {"(Lin) (L - LU)", "has unused local function(s)",
- error, filters()},
+ error, filters()},
+ {"(Lin) (L - LU)",
+ "has unused local function(s)",
+ error, filters()},
+ {"(E | \"(rabbit|amqp).*\":_/_ || \"gen_server2?\":call/2)",
+ "has 5 sec timeout in",
+ error, filters()},
{"(Lin) (LU * (X - XU))",
- "has exported function(s) only used locally",
- warning, filters()},
+ "has exported function(s) only used locally",
+ warning, filters()},
{"(Lin) (DF * (XU + LU))", "used deprecated function(s)",
- warning, filters()}].
-% {"(Lin) (X - XU)", "possibly unused export",
-% warning, fun filter_unused/1}].
+ warning, filters()}].
+%% {"(Lin) (X - XU)", "possibly unused export",
+%% warning, fun filter_unused/1}].
%%
%% noise filters (can be disabled with -X) - strip uninteresting analyses
diff --git a/codegen.py b/codegen.py
index 842549cf..91fa1154 100644
--- a/codegen.py
+++ b/codegen.py
@@ -187,7 +187,7 @@ def genErl(spec):
elif type == 'table':
return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary'
- def genFieldPostprocessing(packed):
+ def genFieldPostprocessing(packed, hasContent):
for f in packed:
type = erlType(f.domain)
if type == 'bit':
@@ -199,6 +199,10 @@ def genErl(spec):
elif type == 'table':
print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \
(f.index, f.index)
+ # We skip the check on content-bearing methods for
+ # speed. This is a sanity check, not a security thing.
+ elif type == 'shortstr' and not hasContent:
+ print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index)
else:
pass
@@ -214,7 +218,7 @@ def genErl(spec):
restSeparator = ''
recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments))
print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern)
- genFieldPostprocessing(packedFields)
+ genFieldPostprocessing(packedFields, m.hasContent)
print " %s;" % (recordConstructorExpr,)
def genDecodeProperties(c):
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 00aef8d9..c0d6cc70 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -138,6 +138,11 @@
%%
%% {frame_max, 131072},
+ %% Set the max permissible number of channels per connection.
+ %% 0 means "no limit".
+ %%
+ %% {channel_max, 128},
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
@@ -169,7 +174,7 @@
%% lower bound, a disk alarm will be set - see the documentation
%% listed above for more details.
%%
- %% {disk_free_limit, 1000000000},
+ %% {disk_free_limit, 50000000},
%% Alternatively, we can set a limit relative to total available RAM.
%%
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d7c93924..d2a3f7c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1135,25 +1135,11 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
- <term>active_consumers</term>
- <listitem>
- <para>
- Number of active consumers. An active consumer is
- one which could immediately receive any messages
- sent to the queue - i.e. it is not limited by its
- prefetch count, TCP congestion, flow control, or
- because it has issued channel.flow. At least one
- of messages_ready and active_consumers must always
- be zero.
- </para>
- <para>
- Note that this value is an instantaneous snapshot
- - when consumers are restricted by their prefetch
- count they may only appear to be active for small
- fractions of a second until more messages are sent
- out.
- </para>
- </listitem>
+ <term>consumer_utilisation</term>
+ <listitem><para>Fraction of the time (between 0.0 and 1.0)
+ that the queue is able to immediately deliver messages to
+ consumers. This can be less than 1.0 if consumers are limited
+ by network congestion or prefetch count.</para></listitem>
</varlistentry>
<varlistentry>
<term>memory</term>
@@ -1411,24 +1397,9 @@
</varlistentry>
<varlistentry>
- <term>last_blocked_by</term>
- <listitem><para>The reason for which this connection
- was last blocked. One of 'resource' - due to a memory
- or disk alarm, 'flow' - due to internal flow control, or
- 'none' if the connection was never
- blocked.</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>last_blocked_age</term>
- <listitem><para>Time, in seconds, since this
- connection was last blocked, or
- 'infinity'.</para></listitem>
- </varlistentry>
-
- <varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
@@ -1459,6 +1430,10 @@
<listitem><para>Maximum frame size (bytes).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>channel_max</term>
+ <listitem><para>Maximum number of channels on this connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>client_properties</term>
<listitem><para>Informational properties transmitted by the client
during connection establishment.</para></listitem>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 6ee0115b..29f06e79 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -1,4 +1,4 @@
-{application, rabbit, %% -*- erlang -*-
+{application, rabbit, %% -*- erlang -*-
[{description, "RabbitMQ"},
{id, "RabbitMQ"},
{vsn, "%%VSN%%"},
@@ -25,7 +25,8 @@
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
{frame_max, 131072},
- {heartbeat, 600},
+ {channel_max, 0},
+ {heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
{default_user, <<"guest">>},
@@ -52,6 +53,7 @@
{nodelay, true},
{linger, {true, 0}},
{exit_on_close, false}]},
+ {halt_on_upgrade_failure, true},
{hipe_compile, false},
%% see bug 24513 for how this list was created
{hipe_modules,
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index d1e3e380..05140e3c 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -10,10 +10,11 @@ Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
Source4: rabbitmq-server.ocf
+Source5: README
URL: http://www.rabbitmq.com/
BuildArch: noarch
-BuildRequires: erlang >= R13B03, python-simplejson, xmlto, libxslt
-Requires: erlang >= R13B03, logrotate
+BuildRequires: erlang >= R13B-03, python-simplejson, xmlto, libxslt
+Requires: erlang >= R13B-03, logrotate
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
Requires(post): %%REQUIRES%%
@@ -41,6 +42,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
cp %{S:4} %{_rabbit_server_ocf}
+cp %{S:5} %{_builddir}/rabbitmq-server-%{version}/README
make %{?_smp_mflags}
%install
@@ -121,12 +123,16 @@ done
%{_initrddir}/rabbitmq-server
%config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server
%doc LICENSE*
+%doc README
%doc docs/rabbitmq.config.example
%clean
rm -rf %{buildroot}
%changelog
+* Wed Oct 23 2013 emile@rabbitmq.com 3.2.0-1
+- New Upstream Release
+
* Thu Aug 15 2013 simon@rabbitmq.com 3.1.5-1
- New Upstream Release
diff --git a/packaging/common/README b/packaging/common/README
new file mode 100644
index 00000000..0a29ee27
--- /dev/null
+++ b/packaging/common/README
@@ -0,0 +1,20 @@
+This is rabbitmq-server, a message broker implementing AMQP, STOMP and MQTT.
+
+Most of the documentation for RabbitMQ is provided on the RabbitMQ web
+site. You can see documentation for the current version at:
+
+http://www.rabbitmq.com/documentation.html
+
+and for previous versions at:
+
+http://www.rabbitmq.com/previous.html
+
+Man pages are installed with this package. Of particular interest are
+rabbitmqctl(1), to interact with a running RabbitMQ server, and
+rabbitmq-plugins(1), to enable and disable plugins. These should be
+run as the superuser.
+
+An example configuration file is provided in the same directory as
+this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The
+RabbitMQ server must be restarted after changing the configuration
+file or enabling or disabling plugins.
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 3212514e..6c63478f 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.2.0-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Emile Joubert <emile@rabbitmq.com> Wed, 23 Oct 2013 12:44:10 +0100
+
rabbitmq-server (3.1.5-1) unstable; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index a1498979..b3c96069 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -9,6 +9,7 @@ RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/
DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/
DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) DOC_INSTALL_DIR=$(DOCDIR) MAN_DIR=$(DEB_DESTDIR)usr/share/man/
DEB_MAKE_CLEAN_TARGET:= distclean
+DEB_INSTALL_DOCS_ALL=debian/README
install/rabbitmq-server::
mkdir -p $(DOCDIR)
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index d48d649e..39a257ac 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -30,7 +30,7 @@
-define(DEFAULT_CREDIT, {200, 50}).
--export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of
_ -> true
end.
+state() -> case blocked() of
+ true -> flow;
+ false -> case get(credit_blocked_at) of
+ undefined -> running;
+ B -> Diff = timer:now_diff(erlang:now(), B),
+ case Diff < 5000000 of
+ true -> flow;
+ false -> running
+ end
+ end
+ end.
+
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain
@@ -128,7 +140,12 @@ grant(To, Quantity) ->
true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
+block(From) ->
+ case blocked() of
+ false -> put(credit_blocked_at, erlang:now());
+ true -> ok
+ end,
+ ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1b7fe6da..045c5d58 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -347,19 +347,25 @@ handle_app_error(App, Reason) ->
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
- register(rabbit_boot, Marker),
- try
- StartFun()
- catch
- throw:{could_not_start, _App, _Reason}=Err ->
- boot_error(Err, not_available);
- _:Reason ->
- boot_error(Reason, erlang:get_stacktrace())
- after
- unlink(Marker),
- Marker ! stop,
- %% give the error loggers some time to catch up
- timer:sleep(100)
+ case catch register(rabbit_boot, Marker) of
+ true -> try
+ case is_running() of
+ true -> ok;
+ false -> StartFun()
+ end
+ catch
+ throw:{could_not_start, _App, _Reason}=Err ->
+ boot_error(Err, not_available);
+ _:Reason ->
+ boot_error(Reason, erlang:get_stacktrace())
+ after
+ unlink(Marker),
+ Marker ! stop,
+ %% give the error loggers some time to catch up
+ timer:sleep(100)
+ end;
+ _ -> unlink(Marker),
+ Marker ! stop
end.
stop() ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8a84c9f4..8306f134 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -466,10 +466,16 @@ check_dlxrk_arg({Type, _}, _Args) ->
list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+%% Not dirty_match_object since that would not be transactional when used in a
+%% tx context
list(VHostPath) ->
- mnesia:dirty_match_object(
- rabbit_queue,
- #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
+ mnesia:async_dirty(
+ fun () ->
+ mnesia:match_object(
+ rabbit_queue,
+ #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'},
+ read)
+ end).
info_keys() -> rabbit_amqqueue_process:info_keys().
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3166c4ab..7002fd36 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
+ consumer_use,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -95,11 +96,12 @@
messages_unacknowledged,
messages,
consumers,
+ consumer_utilisation,
memory,
slave_pids,
synchronised_slave_pids,
backing_queue_status,
- status
+ state
]).
-define(CREATION_EVENT_KEYS,
@@ -149,6 +151,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = priority_queue:new(),
+ consumer_use = {inactive, now_micros(), 0, 0.0},
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -267,15 +270,16 @@ recovery_barrier(BarrierPid) ->
process_args_policy(State = #q{q = Q,
args_policy_version = N}) ->
- lists:foldl(
- fun({Name, Resolve, Fun}, StateN) ->
- Fun(args_policy_lookup(Name, Resolve, Q), StateN)
- end, State#q{args_policy_version = N + 1},
- [{<<"expires">>, fun res_min/2, fun init_exp/2},
- {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
- {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
- {<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
- {<<"max-length">>, fun res_min/2, fun init_max_length/2}]).
+ ArgsTable =
+ [{<<"expires">>, fun res_min/2, fun init_exp/2},
+ {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
+ {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
+ {<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
+ {<<"max-length">>, fun res_min/2, fun init_max_length/2}],
+ drop_expired_msgs(
+ lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
+ Fun(args_policy_lookup(Name, Resolve, Q), StateN)
+ end, State#q{args_policy_version = N + 1}, ArgsTable)).
args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
AName = <<"x-", Name/binary>>,
@@ -297,8 +301,7 @@ init_exp(Expires, State) -> State1 = init_exp(undefined, State),
ensure_expiry_timer(State1#q{expires = Expires}).
init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined});
-init_ttl(TTL, State) -> State1 = init_ttl(undefined, State),
- drop_expired_msgs(State1#q{ttl = TTL}).
+init_ttl(TTL, State) -> (init_ttl(undefined, State))#q{ttl = TTL}.
init_dlx(undefined, State) ->
State#q{dlx = undefined};
@@ -482,10 +485,12 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers}) ->
+ State = #q{active_consumers = ActiveConsumers,
+ consumer_use = CUInfo}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State};
+ {false,
+ State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -536,6 +541,26 @@ deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
{Result, is_empty(State1), State1}.
+update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_consumer_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_consumer_use({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_consumer_use({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
+
+consumer_use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -713,7 +738,7 @@ possibly_unblock(State, ChPid, Update) ->
end
end.
-unblock(State, C = #cr{limiter = Limiter}) ->
+unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -725,12 +750,14 @@ unblock(State, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- State1 = State#q{active_consumers = AC1},
+ State1 = State#q{consumer_use =
+ update_consumer_use(CUInfo, active)},
+ AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
+ State2 = State1#q{active_consumers = AC1},
[notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ consumer_unblocked, [{consumer_tag, CTag}], State2) ||
{_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State1)
+ run_message_queue(State2)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1037,6 +1064,16 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(consumer_utilisation, #q{consumer_use = ConsumerUse}) ->
+ case consumer_count() of
+ 0 -> '';
+ _ -> case ConsumerUse of
+ {active, Since, Avg} ->
+ consumer_use_avg(now_micros() - Since, 0, Avg);
+ {inactive, Since, Active, Avg} ->
+ consumer_use_avg(Active, now_micros() - Since, Avg)
+ end
+ end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1054,8 +1091,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(status, #q{status = Status}) ->
- Status;
+i(state, #q{status = running}) -> credit_flow:state();
+i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1076,7 +1113,10 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+ ExtraKs = [K || {K, _} <- Extra],
+ Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ not lists:member(K, ExtraKs)],
+ rabbit_event:notify(queue_stats, Extra ++ Infos).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
rabbit_event:notify(consumer_created,
@@ -1537,7 +1577,8 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
State, #q.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ fun () -> emit_stats(State, [{idle_since, now()},
+ {consumer_utilisation, ''}]) end),
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
#q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index dc6d090f..088ad0e5 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -20,6 +20,7 @@
-export([parse_table/1]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([validate_utf8/1, assert_utf8/1]).
%%----------------------------------------------------------------------------
@@ -30,6 +31,8 @@
(rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
+-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error').
+-spec(assert_utf8/1 :: (binary()) -> 'ok').
-endif.
@@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) ->
Content;
clear_decoded_content(Content = #content{}) ->
Content#content{properties = none}.
+
+assert_utf8(B) ->
+ case validate_utf8(B) of
+ ok -> ok;
+ error -> rabbit_misc:protocol_error(
+ frame_error, "Malformed UTF-8 in shortstr", [])
+ end.
+
+validate_utf8(Bin) ->
+ try
+ xmerl_ucs:from_utf8(Bin),
+ ok
+ catch exit:{ucs, _} ->
+ error
+ end.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 98b427c7..11e6bd38 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -281,9 +281,10 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
has_for_source(SrcName) ->
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- %% we need to check for durable routes here too in case a bunch of
- %% routes to durable queues have been removed temporarily as a
- %% result of a node failure
+ %% we need to check for semi-durable routes (which subsumes
+ %% durable routes) here too in case a bunch of routes to durable
+ %% queues have been removed temporarily as a result of a node
+ %% failure
contains(rabbit_route, Match) orelse
contains(rabbit_semi_durable_route, Match).
@@ -291,8 +292,9 @@ remove_for_source(SrcName) ->
lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
remove_routes(
- lists:usort(mnesia:match_object(rabbit_route, Match, write) ++
- mnesia:match_object(rabbit_durable_route, Match, write))).
+ lists:usort(
+ mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_semi_durable_route, Match, write))).
remove_for_destination(DstName) ->
remove_for_destination(DstName, fun remove_routes/1).
@@ -323,8 +325,6 @@ delete_object(Tab, Record, LockKind) ->
[_] -> mnesia:delete_object(Tab, Record, LockKind)
end.
-sync_route(R, Fun) -> sync_route(R, true, true, Fun).
-
sync_route(Route, true, true, Fun) ->
ok = Fun(rabbit_durable_route, Route, write),
sync_route(Route, false, true, Fun);
@@ -407,14 +407,17 @@ lock_route_tables() ->
remove_routes(Routes) ->
%% This partitioning allows us to suppress unnecessary delete
%% operations on disk tables, which require an fsync.
- {TransientRoutes, DurableRoutes} =
+ {RamRoutes, DiskRoutes} =
lists:partition(fun (R) -> mnesia:match_object(
rabbit_durable_route, R, write) == [] end,
Routes),
- [ok = sync_transient_route(R, fun mnesia:delete_object/3) ||
- R <- TransientRoutes],
- [ok = sync_route(R, fun mnesia:delete_object/3) ||
- R <- DurableRoutes],
+ %% Of course the destination might not really be durable but it's
+ %% just as easy to try to delete it from the semi-durable table
+ %% than check first
+ [ok = sync_route(R, false, true, fun mnesia:delete_object/3) ||
+ R <- RamRoutes],
+ [ok = sync_route(R, true, true, fun mnesia:delete_object/3) ||
+ R <- DiskRoutes],
[R#route.binding || R <- Routes].
remove_transient_routes(Routes) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ab977883..f71d85c4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -53,7 +53,8 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked]).
+ client_flow_blocked,
+ state]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -177,7 +178,8 @@ info_all(Items) ->
refresh_config_local() ->
rabbit_misc:upmap(
- fun (C) -> gen_server2:call(C, refresh_config) end, list_local()),
+ fun (C) -> gen_server2:call(C, refresh_config, infinity) end,
+ list_local()),
ok.
ready_for_close(Pid) ->
@@ -549,6 +551,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
check_not_default_exchange(_) ->
ok.
+check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>,
+ kind = exchange}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "deletion of system ~s not allowed",
+ [rabbit_misc:rs(XName)]);
+check_exchange_deletion(_) ->
+ ok.
+
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -591,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
+ State1 = State#ch{state = running},
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {reply, #'channel.open_ok'{}, State1};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -934,6 +948,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
@@ -1273,7 +1288,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid }) ->
{OrigDName, ActualRoutingKey} =
- expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
+ expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
DestinationName = intercept_binding_method(OrigDName, DestinationType, ReturnMethod),
check_write_permitted(DestinationName, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -1621,6 +1636,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(state, #ch{state = running}) -> credit_flow:state();
+i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
@@ -1682,10 +1699,10 @@ intercept_binding_method(OrigDName, _DestinationType, _Method) ->
intercept_method(M, Q) ->
case rabbit_channel_interceptor:run_filter_chain(M, Q,
rabbit_channel_interceptor:select(Q, M)) of
- {ok, QN} ->
+ {ok, QN} ->
QN;
{error, Reason} ->
rabbit_misc:protocol_error(
internal_error, "~s",
[Reason])
- end. \ No newline at end of file
+ end.
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_connection_helper_sup.erl
index a9381f20..e51615e8 100644
--- a/src/rabbit_intermediate_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -11,21 +11,27 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
--module(rabbit_intermediate_sup).
+-module(rabbit_connection_helper_sup).
-behaviour(supervisor2).
-export([start_link/0]).
+-export([start_channel_sup_sup/1,
+ start_queue_collector/1]).
-export([init/1]).
+-include("rabbit.hrl").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -33,7 +39,20 @@
start_link() ->
supervisor2:start_link(?MODULE, []).
+start_channel_sup_sup(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
+
+start_queue_collector(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
+
%%----------------------------------------------------------------------------
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.
+
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index c1fa17aa..9ed5dc77 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -37,27 +37,25 @@
start_link() ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
- {ok, Collector} =
- supervisor2:start_child(
- SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
- intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
%% them cleanly. But for 1.0 readers we can't start the real
%% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
%% so we add another supervisor into the hierarchy.
- {ok, ChannelSup3Pid} =
+ %%
+ %% This supervisor also acts as an intermediary for heartbeaters and
+ %% the queue collector process, since these must not be siblings of the
+ %% reader due to the potential for deadlock if they are added/restarted
+ %% whilst the supervision tree is shutting down.
+ {ok, HelperSup} =
supervisor2:start_child(
SupPid,
- {channel_sup3, {rabbit_intermediate_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}),
+ {helper_sup, {rabbit_connection_helper_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link,
- [ChannelSup3Pid, Collector,
- rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
+ {reader, {rabbit_reader, start_link, [HelperSup]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6f36f99d..f3463286 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
end.
call(Node, {Mod, Fun, Args}) ->
- rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
+ rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
+
+list_to_binary_utf8(L) ->
+ B = list_to_binary(L),
+ case rabbit_binary_parser:validate_utf8(B) of
+ ok -> B;
+ error -> throw({error, {not_utf_8, L}})
+ end.
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 17ed8563..ab8c62fe 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -51,7 +51,7 @@ stop() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, false, []),
+ topic, true, false, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index fc131519..bb5b63e9 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -244,10 +244,16 @@ lookup_or_die(Name) ->
{error, not_found} -> rabbit_misc:not_found(Name)
end.
+%% Not dirty_match_object since that would not be transactional when used in a
+%% tx context
list(VHostPath) ->
- mnesia:dirty_match_object(
- rabbit_exchange,
- #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
+ mnesia:async_dirty(
+ fun () ->
+ mnesia:match_object(
+ rabbit_exchange,
+ #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'},
+ read)
+ end).
lookup_scratch(Name, App) ->
case lookup(Name) of
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 4cf314ca..1a766b05 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -181,8 +181,8 @@ with_synced_copy(Path, Modes, Fun) ->
{ok, Hdl} ->
try
Result = Fun(Hdl),
- ok = prim_file:rename(Bak, Path),
ok = prim_file:sync(Hdl),
+ ok = prim_file:rename(Bak, Path),
Result
after
prim_file:close(Hdl)
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index fac74edb..ca67254b 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,9 @@
-module(rabbit_heartbeat).
+-export([start/6]).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
+ pause_monitor/1, resume_monitor/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -28,16 +29,15 @@
-ifdef(use_specs).
-export_type([heartbeaters/0]).
--export_type([start_heartbeat_fun/0]).
-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
-type(heartbeat_callback() :: fun (() -> any())).
--type(start_heartbeat_fun() ::
- fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
- non_neg_integer(), heartbeat_callback()) ->
- no_return())).
+-spec(start/6 ::
+ (pid(), rabbit_net:socket(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
-spec(start_heartbeat_sender/3 ::
(rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
@@ -46,10 +46,6 @@
(rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
--spec(start_heartbeat_fun/1 ::
- (pid()) -> start_heartbeat_fun()).
-
-
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -61,6 +57,17 @@
%%----------------------------------------------------------------------------
+start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ {ok, Sender} =
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
+ {ok, Receiver} =
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
+ {Sender, Receiver}.
+
start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
@@ -75,19 +82,6 @@ start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
fun () -> ReceiveFun(), stop end}).
-start_heartbeat_fun(SupPid) ->
- fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
- {ok, Sender} =
- start_heartbeater(SendTimeoutSec, SupPid, Sock,
- SendFun, heartbeat_sender,
- start_heartbeat_sender),
- {ok, Receiver} =
- start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
- ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
- {Sender, Receiver}
- end.
-
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 12a13c00..22da465b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -197,24 +197,25 @@ start_link() -> gen_server2:start_link(?MODULE, [], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
- ok = gen_server:call(Pid, {new, self()}),
+ ok = gen_server:call(Pid, {new, self()}, infinity),
#lstate{pid = Pid, prefetch_limited = false, blocked = false}.
limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
- ok = gen_server:call(L#lstate.pid,
- {limit_prefetch, PrefetchCount, UnackedCount}),
+ ok = gen_server:call(
+ L#lstate.pid,
+ {limit_prefetch, PrefetchCount, UnackedCount}, infinity),
L#lstate{prefetch_limited = true}.
unlimit_prefetch(L) ->
- ok = gen_server:call(L#lstate.pid, unlimit_prefetch),
+ ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity),
L#lstate{prefetch_limited = false}.
block(L) ->
- ok = gen_server:call(L#lstate.pid, block),
+ ok = gen_server:call(L#lstate.pid, block, infinity),
L#lstate{blocked = true}.
unblock(L) ->
- ok = gen_server:call(L#lstate.pid, unblock),
+ ok = gen_server:call(L#lstate.pid, unblock, infinity),
L#lstate{blocked = false}.
is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
@@ -224,7 +225,8 @@ is_blocked(#lstate{blocked = Blocked}) -> Blocked.
is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
-get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit).
+get_prefetch_limit(L) ->
+ gen_server:call(L#lstate.pid, get_prefetch_limit, infinity).
ack(#lstate{prefetch_limited = false}, _AckCount) -> ok;
ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 3abd81f5..d9cef642 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -110,7 +110,13 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
+ %% We need synchronous add here (i.e. do not return until the
+ %% slave is running) so that when queue declaration is finished
+ %% all slaves are up; we don't want to end up with unsynced slaves
+ %% just by declaring a new queue. But add can't be synchronous all
+ %% the time as it can be called by slaves and that's
+ %% deadlock-prone.
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
#state { name = QName,
gm = GM,
coordinator = CPid,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 8ad7c62f..ca495733 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,9 +17,10 @@
-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
--export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
+-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1]).
+ is_mirrored/1, update_mirrors/2, validate_policy/1,
+ maybe_auto_sync/1]).
%% for testing only
-export([module/1]).
@@ -45,10 +46,8 @@
(rabbit_amqqueue:name(), pid(), [pid()])
-> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
--spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
--spec(add_mirror/2 ::
- (rabbit_amqqueue:name(), node()) ->
- {'ok', atom()} | rabbit_types:error(any())).
+-spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async')
+ -> 'ok').
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
@@ -56,6 +55,7 @@
-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
-spec(update_mirrors/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
+-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -94,10 +94,15 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- store_updated_slaves(
- Q #amqqueue { pid = QPid1,
+ Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
- gm_pids = GMPids1 }),
+ gm_pids = GMPids1},
+ store_updated_slaves(Q1),
+ %% If we add and remove nodes at the same time we
+ %% might tell the old master we need to sync and
+ %% then shut it down. So let's check if the new
+ %% master needs to sync.
+ maybe_auto_sync(Q1),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
@@ -135,7 +140,7 @@ on_node_up() ->
end
end, [], rabbit_queue)
end),
- [add_mirror(QName, node()) || QName <- QNames],
+ [add_mirror(QName, node(), async) || QName <- QNames],
ok.
drop_mirrors(QName, Nodes) ->
@@ -160,45 +165,35 @@ drop_mirror(QName, MirrorNode) ->
end
end).
-add_mirrors(QName, Nodes) ->
- [add_mirror(QName, Node) || Node <- Nodes],
+add_mirrors(QName, Nodes, SyncMode) ->
+ [add_mirror(QName, Node, SyncMode) || Node <- Nodes],
ok.
-add_mirror(QName, MirrorNode) ->
+add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_amqqueue:with(
QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
- start_child(Name, MirrorNode, Q);
+ start_child(Name, MirrorNode, Q, SyncMode);
[SPid] ->
case rabbit_misc:is_process_alive(SPid) of
true -> {ok, already_mirrored};
- false -> start_child(Name, MirrorNode, Q)
+ false -> start_child(Name, MirrorNode, Q, SyncMode)
end
end
end).
-start_child(Name, MirrorNode, Q) ->
+start_child(Name, MirrorNode, Q, SyncMode) ->
case rabbit_misc:with_exit_handler(
- rabbit_misc:const({ok, down}),
+ rabbit_misc:const(down),
fun () ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
- {ok, SPid} when is_pid(SPid) ->
- maybe_auto_sync(Q),
- rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- {ok, started};
- {error, {{stale_master_pid, StalePid}, _}} ->
- rabbit_log:warning("Detected stale HA master while adding "
- "mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, StalePid]),
- {ok, stale_master};
- {error, {{duplicate_live_master, _}=Err, _}} ->
- Err;
- Other ->
- Other
+ {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
+ _ -> ok
end.
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
@@ -312,8 +307,10 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
{NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
- add_mirrors (QName, NewNodes -- OldNodes),
+ add_mirrors (QName, NewNodes -- OldNodes, async),
drop_mirrors(QName, OldNodes -- NewNodes),
+ %% This is for the case where no extra nodes were added but we changed to
+ %% a policy requiring auto-sync.
maybe_auto_sync(NewQ),
ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b1a86493..96f89ecc 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -24,7 +24,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2, info/1]).
+-export([start_link/1, set_maximum_since_use/2, info/1, go/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
@@ -78,7 +78,15 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(Q = #amqqueue { name = QName }) ->
+init(Q) ->
+ {ok, {not_started, Q}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}}.
+
+go(SPid, sync) -> gen_server2:call(SPid, go, infinity);
+go(SPid, async) -> gen_server2:cast(SPid, go).
+
+handle_go(Q = #amqqueue{name = QName}) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -124,22 +132,27 @@ init(Q = #amqqueue { name = QName }) ->
},
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
- {ok, State, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE}};
+ rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
+ {ok, State};
{stale, StalePid} ->
- {stop, {stale_master_pid, StalePid}};
+ rabbit_log:warning("Detected stale HA master while adding "
+ "mirror of ~s: ~p~n",
+ [rabbit_misc:rs(QName), StalePid]),
+ gm:leave(GM),
+ {error, {stale_master_pid, StalePid}};
duplicate_live_master ->
- {stop, {duplicate_live_master, Node}};
+ gm:leave(GM),
+ {error, {duplicate_live_master, Node}};
existing ->
gm:leave(GM),
- ignore;
+ {error, normal};
master_in_recovery ->
+ gm:leave(GM),
%% The queue record vanished - we must have a master starting
%% concurrently with us. In that case we can safely decide to do
%% nothing here, and the master will start us in
%% master:init_with_existing_bq/3
- ignore
+ {error, normal}
end.
init_it(Self, GM, Node, QName) ->
@@ -173,6 +186,12 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
rabbit_mirror_queue_misc:store_updated_slaves(
Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
+handle_call(go, _From, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {reply, ok, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
+
handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
@@ -208,6 +227,12 @@ handle_call({gm_deaths, LiveGMPids}, From,
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State).
+handle_cast(go, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {noreply, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
+
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -293,32 +318,43 @@ handle_info({bump_credit, Msg}, State) ->
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
-%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
-%% being deleted: it's just the node going down. Even though we're a
-%% slave, we have no idea whether or not we'll be the only copy coming
-%% back up. Thus we must assume we will be, and preserve anything we
-%% have on disk.
+terminate(normal, {not_started, _Q}) ->
+ ok;
terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
ok;
-terminate({shutdown, dropped} = R, #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+terminate({shutdown, dropped} = R, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
%% See rabbit_mirror_queue_master:terminate/2
+ terminate_common(State),
BQ:delete_and_terminate(R, BQS);
-terminate(Reason, #state { q = Q,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = RateTRef }) ->
- ok = gm:leave(GM),
- QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()),
- rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate(shutdown, State) ->
+ terminate_shutdown(shutdown, State);
+terminate({shutdown, _} = R, State) ->
+ terminate_shutdown(R, State);
+terminate(Reason, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ terminate_common(State),
+ BQ:delete_and_terminate(Reason, BQS);
terminate([_SPid], _Reason) ->
%% gm case
ok.
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+terminate_shutdown(Reason, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ terminate_common(State),
+ BQ:terminate(Reason, BQS).
+
+terminate_common(State) ->
+ ok = rabbit_memory_monitor:deregister(self()),
+ stop_rate_timer(stop_sync_timer(State)).
+
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -394,7 +430,9 @@ handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Live) ->
- case gen_server2:call(SPid, {gm_deaths, Live}, infinity) of
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of
ok -> ok;
{promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
end.
@@ -663,8 +701,13 @@ confirm_sender_death(Pid) ->
ok.
forget_sender(_, running) -> false;
+forget_sender(down_from_gm, down_from_gm) -> false; %% [1]
forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
+%% [1] If another slave goes through confirm_sender_death/1 before we
+%% do we can get two GM sender_death messages in a row for the same
+%% channel - don't treat that as anything special.
+
%% Record and process lifetime events from channels. Forget all about a channel
%% only when down notifications are received from both the channel and from gm.
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 3a8fae7f..f27f77c6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -434,10 +434,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
%% First disc node up
maybe_force_load(),
ok;
- {[AnotherNode | _], _, _} ->
+ {[_ | _], _, _} ->
%% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
maybe_force_load(),
ok = rabbit_table:wait_for_replicated(),
ok = rabbit_table:create_local_copy(NodeType)
@@ -639,15 +637,6 @@ schema_ok_or_move() ->
ok = create_schema()
end.
-ensure_version_ok({ok, DiscVersion}) ->
- DesiredVersion = rabbit_version:desired(),
- case rabbit_version:matches(DesiredVersion, DiscVersion) of
- true -> ok;
- false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}})
- end;
-ensure_version_ok({error, _}) ->
- ok = rabbit_version:record_desired().
-
%% We only care about disc nodes since ram nodes are supposed to catch
%% up only
create_schema() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 46cfabe3..91be4dcb 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -149,14 +149,22 @@ ensure_ssl() ->
ok = app_utils:start_applications(SslAppsConfig),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
- % unknown_ca errors are silently ignored prior to R14B unless we
- % supply this verify_fun - remove when at least R14B is required
- case proplists:get_value(verify, SslOptsConfig, verify_none) of
- verify_none -> SslOptsConfig;
- verify_peer -> [{verify_fun, fun([]) -> true;
- ([_|_]) -> false
- end}
- | SslOptsConfig]
+ case rabbit_misc:pget(verify_fun, SslOptsConfig) of
+ {Module, Function} ->
+ rabbit_misc:pset(verify_fun,
+ fun (ErrorList) ->
+ Module:Function(ErrorList)
+ end, SslOptsConfig);
+ undefined ->
+ % unknown_ca errors are silently ignored prior to R14B unless we
+ % supply this verify_fun - remove when at least R14B is required
+ case proplists:get_value(verify, SslOptsConfig, verify_none) of
+ verify_none -> SslOptsConfig;
+ verify_peer -> [{verify_fun, fun([]) -> true;
+ ([_|_]) -> false
+ end}
+ | SslOptsConfig]
+ end
end.
ssl_transform_fun(SslOpts) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index de10b30f..cd55381a 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -208,10 +208,16 @@ notify_clear(VHost, <<"policy">>, _Name) ->
%%----------------------------------------------------------------------------
+%% [1] We need to prevent this from becoming O(n^2) in a similar
+%% manner to rabbit_binding:remove_for_{source,destination}. So see
+%% the comment in rabbit_binding:lock_route_tables/0 for more rationale.
update_policies(VHost) ->
- Policies = list(VHost),
+ Tabs = [rabbit_queue, rabbit_durable_queue,
+ rabbit_exchange, rabbit_durable_exchange],
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() ->
+ [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
+ Policies = list(VHost),
{[update_exchange(X, Policies) ||
X <- rabbit_exchange:list(VHost)],
[update_queue(Q, Policies) ||
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 157b8270..67effab0 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,12 +18,12 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1,
+-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -32,16 +32,16 @@
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
- connection_state, queue_collector, heartbeater, stats_timer,
- ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
- buf, buf_len, throttle}).
+ connection_state, helper_sup, queue_collector, heartbeater,
+ stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, vhost,
+ protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
@@ -49,15 +49,14 @@
blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, last_blocked_by, last_blocked_age,
- channels]).
+ send_pend, state, channels]).
-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -74,8 +73,7 @@
-ifdef(use_specs).
--spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
- rabbit_types:ok(pid())).
+-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
@@ -86,11 +84,9 @@
rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
- -> no_return()).
--spec(start_connection/7 ::
- (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
- rabbit_net:socket(),
+-spec(init/2 :: (pid(), pid()) -> no_return()).
+-spec(start_connection/5 ::
+ (pid(), pid(), any(), rabbit_net:socket(),
fun ((rabbit_net:socket()) ->
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
@@ -104,20 +100,17 @@
%%--------------------------------------------------------------------------
-start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
- Collector, StartHeartbeatFun])}.
+start_link(HelperSup) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
+init(Parent, HelperSup) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(
- Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
- SockTransform)
+ start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -205,8 +198,7 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
- Sock, SockTransform) ->
+start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> Str;
@@ -242,11 +234,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
recv_len = 0,
pending_recv = false,
connection_state = pre_init,
- queue_collector = Collector,
+ queue_collector = undefined, %% started on tune-ok
+ helper_sup = HelperSup,
heartbeater = none,
- ch_sup3_pid = ChSup3Pid,
channel_sup_sup_pid = none,
- start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
throttle = #throttle{
@@ -615,17 +606,26 @@ create_channel(Channel, State) ->
connection = #connection{name = Name,
protocol = Protocol,
frame_max = FrameMax,
+ channel_max = ChannelMax,
user = User,
vhost = VHost,
capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ChPid, AState}.
+ N = length(all_channels()),
+ case ChannelMax == 0 orelse N < ChannelMax of
+ true -> {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities,
+ Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}};
+ false -> {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has "
+ "reached the negotiated channel_max (~w)",
+ [N, ChannelMax], 'none')}
+ end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
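
create_channel/2 now enforces the channel_max negotiated at tune time: 0 keeps the AMQP meaning of "no limit", any other value caps the number of concurrently open channels, and opening one more becomes a connection-level not_allowed error instead of silently succeeding. The decision itself, as a self-contained sketch (module and names are mine):

    -module(channel_cap).
    -export([allow_new_channel/2]).

    %% Sketch: ChannelMax = 0 means unlimited (AMQP convention); any
    %% other value caps how many channels a connection may have open.
    allow_new_channel(OpenCount, ChannelMax)
      when is_integer(OpenCount), is_integer(ChannelMax) ->
        case ChannelMax =:= 0 orelse OpenCount < ChannelMax of
            true  -> ok;
            false -> {error, {channel_max_reached, OpenCount, ChannelMax}}
        end.

    %% channel_cap:allow_new_channel(3, 0) -> ok
    %% channel_cap:allow_new_channel(4, 5) -> ok
    %% channel_cap:allow_new_channel(5, 5) -> {error, {channel_max_reached, 5, 5}}
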
@@ -673,24 +673,28 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- {ChPid, AState} = case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> Other
- end,
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -847,41 +851,40 @@ handle_method0(#'connection.secure_ok'{response = Response},
State = #v1{connection_state = securing}) ->
auth_phase(Response, State);
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- heartbeat = ClientHeartbeat},
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock,
- start_heartbeat_fun = SHF}) ->
- ServerFrameMax = server_frame_max(),
- if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w < ~w min size",
- [FrameMax, ?FRAME_MIN_SIZE]);
- ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ServerFrameMax]);
- true ->
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
- ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
- heartbeater = Heartbeater}
- end;
+ helper_sup = SupPid,
+ sock = Sock}) ->
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} =
+ rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat, ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
user = User,
protocol = Protocol},
- ch_sup3_pid = ChSup3Pid,
+ helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
@@ -890,10 +893,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Throttle1 = Throttle#throttle{alarmed_by = Conserve},
{ok, ChannelSupSupPid} =
- supervisor2:start_child(
- ChSup3Pid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
@@ -925,13 +925,28 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-server_frame_max() ->
- {ok, FrameMax} = application:get_env(rabbit, frame_max),
- FrameMax.
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, Min, ClientValue);
+ ServerValue /= 0 andalso ClientValue > ServerValue ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
+
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
-server_heartbeat() ->
- {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
- Heartbeat.
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
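
validate_negotiated_integer_value/3 treats 0 as "unlimited" on both sides and rejects a client value below the protocol floor or above the broker's configured ceiling. The rule is easy to invert by accident, so here is a standalone sketch (names are mine, not the module's) together with a few expected results:

    -module(negotiate_check).
    -export([check/3]).

    %% Sketch: Min is the protocol floor (?FRAME_MIN_SIZE, or 1 for
    %% channels), ServerMax the broker's configured ceiling; 0 from
    %% either side means "unlimited".
    check(ClientValue, Min, ServerMax) ->
        if ClientValue =/= 0 andalso ClientValue < Min       -> {error, too_low};
           ServerMax   =/= 0 andalso ClientValue > ServerMax -> {error, too_high};
           true                                              -> ok
        end.

    %% negotiate_check:check(0,      4096, 131072) -> ok   (client asked for "unlimited")
    %% negotiate_check:check(2048,   4096, 131072) -> {error, too_low}
    %% negotiate_check:check(200000, 4096, 131072) -> {error, too_high}
    %% negotiate_check:check(65536,  4096, 0)      -> ok   (broker imposes no ceiling)
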
@@ -997,9 +1012,9 @@ auth_phase(Response,
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
{ok, User} ->
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = server_frame_max(),
- heartbeat = server_heartbeat()},
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User,
@@ -1026,13 +1041,17 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
- infinity;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
- timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
+i(state, #v1{connection_state = ConnectionState,
+ throttle = #throttle{last_blocked_by = BlockedBy,
+ last_blocked_at = T}}) ->
+ Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000,
+ case {BlockedBy, ConnectionState, Recent} of
+ {resource, blocked, _} -> blocked;
+ {_, blocking, _} -> blocking;
+ {flow, _, true} -> flow;
+ {_, _, _} -> ConnectionState
+ end;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
ic(name, #connection{name = Name}) -> Name;
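
The reworked i(state, ...) folds the old last_blocked_by/last_blocked_age items (dropped from ?STATISTICS_KEYS above) into the state value itself: blocked while a resource alarm has the connection stopped, blocking while it is winding down, and flow if credit flow blocked it within the last five seconds. A pure-function sketch of that classification, with the elapsed time passed in as microseconds and all names hypothetical:

    -module(conn_state).
    -export([classify/3]).

    %% Sketch: BlockedBy is whatever last blocked the connection
    %% (resource | flow), ConnState the reader's own connection_state,
    %% ElapsedUs microseconds since it was last blocked (infinity if never).
    classify(resource, blocked, _ElapsedUs) ->
        blocked;
    classify(_BlockedBy, blocking, _ElapsedUs) ->
        blocking;
    classify(flow, _ConnState, ElapsedUs)
      when is_integer(ElapsedUs), ElapsedUs < 5000000 ->
        flow;
    classify(_BlockedBy, ConnState, _ElapsedUs) ->
        ConnState.
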
@@ -1047,6 +1066,7 @@ ic(user, #connection{user = U}) -> U#user.username;
ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
@@ -1081,8 +1101,16 @@ maybe_emit_stats(State) ->
fun() -> emit_stats(State) end).
emit_stats(State) ->
- rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
- rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+ Infos = infos(?STATISTICS_KEYS, State),
+ rabbit_event:notify(connection_stats, Infos),
+ State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
+ %% If we emit an event which looks like we are in flow control, it's not a
+ %% good idea for it to be our last event before we go idle. Keep emitting
+ %% events: either we stay busy or we drop out of flow control.
+ case proplists:get_value(state, Infos) of
+ flow -> ensure_stats_timer(State1);
+ _ -> State1
+ end.
%% 1.0 stub
-ifdef(use_specs).
@@ -1106,10 +1134,7 @@ pack_for_1_0(#v1{parent = Parent,
sock = Sock,
recv_len = RecvLen,
pending_recv = PendingRecv,
- queue_collector = QueueCollector,
- ch_sup3_pid = ChSup3Pid,
- start_heartbeat_fun = SHF,
+ helper_sup = SupPid,
buf = Buf,
buf_len = BufLen}) ->
- {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
- Buf, BufLen}.
+ {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index c13c333e..bcde0078 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -139,15 +139,21 @@ list() ->
list(VHost) -> list(VHost, '_').
list_component(Component) -> list('_', Component).
+%% Not dirty_match_object since that would not be transactional when used in a
+%% tx context
list(VHost, Component) ->
- case VHost of
- '_' -> ok;
- _ -> rabbit_vhost:assert(VHost)
- end,
- Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'},
- [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <-
- mnesia:dirty_match_object(?TABLE, Match),
- Comp =/= <<"policy">> orelse Component =:= <<"policy">>].
+ mnesia:async_dirty(
+ fun () ->
+ case VHost of
+ '_' -> ok;
+ _ -> rabbit_vhost:assert(VHost)
+ end,
+ Match = #runtime_parameters{key = {VHost, Component, '_'},
+ _ = '_'},
+ [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <-
+ mnesia:match_object(?TABLE, Match, read),
+ Comp =/= <<"policy">> orelse Component =:= <<"policy">>]
+ end).
list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
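
list/2 now runs mnesia:match_object/3 inside mnesia:async_dirty/1: on its own it is still a cheap dirty read, but (per the new comment) it no longer bypasses an enclosing transaction the way mnesia:dirty_match_object/1 would. Reduced to its essentials, the wrapper looks like this sketch:

    -module(dirty_or_tx).
    -export([match/2]).

    %% Sketch: a read that is dirty-cheap when called on its own but,
    %% per the comment in the hunk above, participates in an enclosing
    %% transaction instead of bypassing it as dirty_match_object/1 would.
    match(Tab, Pattern) ->
        mnesia:async_dirty(
          fun () -> mnesia:match_object(Tab, Pattern, read) end).
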
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 76421d1a..5fe319d3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -808,6 +808,7 @@ test_log_management_during_startup() ->
%% start application with logging to non-existing directory
TmpLog = "/tmp/rabbit-tests/test.log",
delete_file(TmpLog),
+ ok = control_action(stop_app, []),
ok = application:set_env(rabbit, error_logger, {file, TmpLog}),
ok = delete_log_handlers([rabbit_error_logger_file_h]),
@@ -816,6 +817,7 @@ test_log_management_during_startup() ->
%% start application with logging to directory with no
%% write permissions
+ ok = control_action(stop_app, []),
TmpDir = "/tmp/rabbit-tests",
ok = set_permissions(TmpDir, 8#00400),
ok = delete_log_handlers([rabbit_error_logger_file_h]),
@@ -830,6 +832,7 @@ test_log_management_during_startup() ->
%% start application with logging to a subdirectory which
%% parent directory has no write permissions
+ ok = control_action(stop_app, []),
TmpTestDir = "/tmp/rabbit-tests/no-permission/test/log",
ok = application:set_env(rabbit, error_logger, {file, TmpTestDir}),
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
@@ -849,12 +852,13 @@ test_log_management_during_startup() ->
%% start application with standard error_logger_file_h
%% handler not installed
+ ok = control_action(stop_app, []),
ok = application:set_env(rabbit, error_logger, {file, MainLog}),
ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
%% start application with standard sasl handler not installed
%% and rabbit main log handler installed correctly
+ ok = control_action(stop_app, []),
ok = delete_log_handlers([rabbit_sasl_report_file_h]),
ok = control_action(start_app, []),
passed.
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 1047b823..c1f142d7 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -191,9 +191,14 @@ die(Msg, Args) ->
%% straight out into do_boot, generating an erl_crash.dump
%% and displaying any error message in a confusing way.
error_logger:error_msg(Msg, Args),
- io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ Str = rabbit_misc:format(
+ "~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ io:format(Str),
error_logger:logfile(close),
- halt(1).
+ case application:get_env(rabbit, halt_on_upgrade_failure) of
+ {ok, false} -> throw({upgrade_error, Str});
+ _ -> halt(1) %% i.e. true or undefined
+ end.
primary_upgrade(Upgrades, Nodes) ->
Others = Nodes -- [node()],
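
die/2 keeps halt(1) as the default, but the rabbit application key halt_on_upgrade_failure it now consults can turn a failed upgrade into a catchable throw({upgrade_error, Str}), which is friendlier to tests and embedded nodes. A sketch of opting in via the standard Erlang app config file:

    %% rabbitmq.config sketch: don't halt the node on upgrade failure,
    %% raise a catchable throw({upgrade_error, Str}) instead.  Leaving
    %% the key unset (or set to true) keeps the old halt(1) behaviour.
    [
      {rabbit, [{halt_on_upgrade_failure, false}]}
    ].
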
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 6f95ef60..90372461 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -46,6 +46,7 @@
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
+-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
%% -------------------------------------------------------------------
@@ -74,6 +75,7 @@
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
+-spec(internal_system_x/0 :: () -> 'ok').
-endif.
@@ -340,6 +342,19 @@ queue_decorators(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, policy, gm_pids, decorators]).
+internal_system_x() ->
+ transform(
+ rabbit_durable_exchange,
+ fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>},
+ Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches,
+ Policy, Decorators};
+ (X) ->
+ X
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
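
internal_system_x/0 rewrites only the amq.rabbitmq.* rows of rabbit_durable_exchange, flipping their internal flag to true and leaving everything else alone; transform/3 (context above) presumably wraps mnesia:transform_table/3. With stock mnesia the same rewrite could be sketched as:

    -module(upgrade_sketch).
    -export([mark_system_exchanges_internal/0]).

    %% Sketch only: the same rewrite with stock mnesia.  The fun changes
    %% just the amq.rabbitmq.* rows; the attribute list is unchanged
    %% because the exchange record already carries an 'internal' field.
    mark_system_exchanges_internal() ->
        Fun = fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>},
                    Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) ->
                      {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches,
                       Policy, Decorators};
                  (X) ->
                      X
              end,
        {atomic, ok} = mnesia:transform_table(
                         rabbit_durable_exchange, Fun,
                         [name, type, durable, auto_delete, internal, arguments,
                          scratches, policy, decorators]),
        ok.
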
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8d013d43..047bce77 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -60,15 +60,17 @@ add(VHostPath) ->
(ok, false) ->
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout},
- {<<"amq.rabbitmq.trace">>, topic}]],
+ Type, true, false, Internal, []) ||
+ {Name, Type, Internal} <-
+ [{<<"">>, direct, false},
+ {<<"amq.direct">>, direct, false},
+ {<<"amq.topic">>, topic, false},
+ %% per 0-9-1 pdf
+ {<<"amq.match">>, headers, false},
+ %% per 0-9-1 xml
+ {<<"amq.headers">>, headers, false},
+ {<<"amq.fanout">>, fanout, false},
+ {<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
rabbit_event:notify(vhost_created, info(VHostPath)),
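
The behavioural change above is that amq.rabbitmq.trace is now declared internal, so clients cannot publish to it directly; only the broker's own tracing publishes into it. A sketch of what the flag implies (this is not RabbitMQ's actual check, just the idea):

    -module(internal_x_sketch).
    -export([check_publish/1]).

    -record(x, {name, internal = false}).

    %% Sketch: an internal exchange refuses publishes coming straight
    %% from a client; the broker itself may still route into it.
    check_publish(#x{internal = true, name = Name}) ->
        {error, {access_refused, Name}};
    check_publish(#x{internal = false}) ->
        ok.
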
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index bf6964d8..34dd3d3b 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -25,6 +25,7 @@
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5,
+ send_command_flow/2, send_command_flow/3,
flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
@@ -78,6 +79,11 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
+-spec(send_command_flow/2 ::
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(send_command_flow/3 ::
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
@@ -165,6 +171,12 @@ handle_message({send_command, MethodRecord}, State) ->
internal_send_command_async(MethodRecord, State);
handle_message({send_command, MethodRecord, Content}, State) ->
internal_send_command_async(MethodRecord, Content, State);
+handle_message({send_command_flow, MethodRecord, Sender}, State) ->
+ credit_flow:ack(Sender),
+ internal_send_command_async(MethodRecord, State);
+handle_message({send_command_flow, MethodRecord, Content, Sender}, State) ->
+ credit_flow:ack(Sender),
+ internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
State1 = internal_flush(
internal_send_command_async(MethodRecord, State)),
@@ -212,6 +224,16 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
+send_command_flow(W, MethodRecord) ->
+ credit_flow:send(W),
+ W ! {send_command_flow, MethodRecord, self()},
+ ok.
+
+send_command_flow(W, MethodRecord, Content) ->
+ credit_flow:send(W),
+ W ! {send_command_flow, MethodRecord, Content, self()},
+ ok.
+
send_command_sync(W, MethodRecord) ->
call(W, {send_command_sync, MethodRecord}).
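
send_command_flow/2,3 pair a credit_flow:send/1 in the caller with a credit_flow:ack/1 in the writer, so a fast channel gets throttled by a slow socket writer rather than flooding its mailbox. The sending side of that pattern, sketched with the same credit_flow calls used above (writer pid and method are placeholders):

    -module(flow_send_sketch).
    -export([publish/3]).

    %% Sketch: rabbit_writer:send_command_flow/3 debits credit via
    %% credit_flow:send/1 before posting, and the writer repays it with
    %% credit_flow:ack/1 once the frames are on the socket.  Checking
    %% credit_flow:blocked/0 afterwards lets the caller stop pulling in
    %% more work until credit is granted back.
    publish(WriterPid, MethodRecord, Content) ->
        ok = rabbit_writer:send_command_flow(WriterPid, MethodRecord, Content),
        case credit_flow:blocked() of
            true  -> blocked;  %% pause upstream work until credit returns
            false -> ok
        end.
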
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index a07f6c65..369ec655 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -221,11 +221,11 @@ get_vm_limit({win32,_OSname}) ->
8 -> 8*1024*1024*1024*1024 %% 8 TB for 64 bits 2^43
end;
-%% On a 32-bit machine, if you're using more than 4 gigs of RAM you're
+%% On a 32-bit machine, if you're using more than 2 gigs of RAM you're
%% in big trouble anyway.
get_vm_limit(_OsType) ->
case erlang:system_info(wordsize) of
- 4 -> 4*1024*1024*1024; %% 4 GB for 32 bits 2^32
+ 4 -> 2*1024*1024*1024; %% 2 GB for 32 bits 2^31
8 -> 256*1024*1024*1024*1024 %% 256 TB for 64 bits 2^48
%%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
end.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 488db5ec..e14c471c 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -63,7 +63,7 @@ start_link() ->
submit(Fun) ->
case get(worker_pool_worker) of
true -> worker_pool_worker:run(Fun);
- _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
+ _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
worker_pool_worker:submit(Pid, Fun)
end.
@@ -79,15 +79,17 @@ init([]) ->
{ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call(next_free, From, State = #state { available = Avail,
- pending = Pending }) ->
+handle_call({next_free, CPid}, From, State = #state { available = Avail,
+ pending = Pending }) ->
case queue:out(Avail) of
{empty, _Avail} ->
{noreply,
- State #state { pending = queue:in({next_free, From}, Pending) },
+ State#state{pending = queue:in({next_free, From, CPid}, Pending)},
hibernate};
{{value, WId}, Avail1} ->
- {reply, get_worker_pid(WId), State #state { available = Avail1 },
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 },
hibernate}
end;
@@ -99,8 +101,10 @@ handle_cast({idle, WId}, State = #state { available = Avail,
{noreply, case queue:out(Pending) of
{empty, _Pending} ->
State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From}}, Pending1} ->
- gen_server2:reply(From, get_worker_pid(WId)),
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
State #state { pending = Pending1 };
{{value, {run_async, Fun}}, Pending1} ->
worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
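
The pool now learns which client pid asked for a worker and forwards it to the chosen worker via next_job_from/2 before replying, whether the worker is free immediately or is picked up later from the pending queue. A compact sketch of that dispatch decision, storing worker pids directly rather than the pool's worker ids for brevity:

    -module(pool_dispatch_sketch).
    -export([next_free/3]).

    %% Sketch: hand out an idle worker at once (after telling it which
    %% client to expect), or park the request until a worker reports idle.
    next_free(ClientPid, From, {Available, Pending}) ->
        case queue:out(Available) of
            {empty, _} ->
                {wait, {Available, queue:in({next_free, From, ClientPid}, Pending)}};
            {{value, WorkerPid}, Available1} ->
                worker_pool_worker:next_job_from(WorkerPid, ClientPid),
                {reply, WorkerPid, {Available1, Pending}}
        end.
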
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index a976503f..724235bf 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, submit/2, submit_async/2, run/1]).
+-export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -32,6 +32,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
+-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
@@ -44,13 +45,18 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-record(state, {id, next}).
+
%%----------------------------------------------------------------------------
start_link(WId) ->
gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+next_job_from(Pid, CPid) ->
+ gen_server2:cast(Pid, {next_job_from, CPid}).
+
submit(Pid, Fun) ->
- gen_server2:call(Pid, {submit, Fun}, infinity).
+ gen_server2:call(Pid, {submit, Fun, self()}, infinity).
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
@@ -70,32 +76,57 @@ init([WId]) ->
[self()]),
ok = worker_pool:idle(WId),
put(worker_pool_worker, true),
- {ok, WId, hibernate,
+ {ok, #state{id = WId}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun}, From, WId) ->
+handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) ->
+ {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate};
+
+handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef},
+ id = WId}) ->
+ erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun)),
ok = worker_pool:idle(WId),
- {noreply, WId, hibernate};
+ {noreply, State#state{next = undefined}, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({submit_async, Fun}, WId) ->
+handle_cast({next_job_from, CPid}, State = #state{next = undefined}) ->
+ MRef = erlang:monitor(process, CPid),
+ {noreply, State#state{next = {from, CPid, MRef}}, hibernate};
+
+handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun},
+ id = WId}) ->
+ gen_server2:reply(From, run(Fun)),
+ ok = worker_pool:idle(WId),
+ {noreply, State#state{next = undefined}, hibernate};
+
+handle_cast({submit_async, Fun}, State = #state{id = WId}) ->
run(Fun),
ok = worker_pool:idle(WId),
- {noreply, WId, hibernate};
+ {noreply, State, hibernate};
-handle_cast({set_maximum_since_use, Age}, WId) ->
+handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- {noreply, WId, hibernate};
+ {noreply, State, hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', MRef, process, CPid, _Reason},
+ State = #state{id = WId,
+ next = {from, CPid, MRef}}) ->
+ ok = worker_pool:idle(WId),
+ {noreply, State#state{next = undefined}};
+
+handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
+ {noreply, State};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
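
The worker side mirrors this: the client's submit and the pool's next_job_from notification can arrive in either order, so the new #state.next slot parks whichever comes first and runs the job when its counterpart shows up, while the monitor taken in next_job_from lets a 'DOWN' clear a reservation whose client died before submitting. A toy model of that slot, as pure functions without the gen_server2 plumbing:

    -module(next_slot_sketch).
    -export([on_notify/2, on_submit/2]).

    %% Slot states: undefined | {expect, ClientPid} | {job, ClientPid, Fun}.

    %% The pool announces "the next job will come from ClientPid".
    on_notify(ClientPid, undefined)             -> {wait, {expect, ClientPid}};
    on_notify(ClientPid, {job, ClientPid, Fun}) -> {run, Fun, undefined}.

    %% The client submits its job.
    on_submit({ClientPid, Fun}, undefined)           -> {wait, {job, ClientPid, Fun}};
    on_submit({ClientPid, Fun}, {expect, ClientPid}) -> {run, Fun, undefined}.

    %% In the real module a 'DOWN' for ClientPid while the slot is
    %% {expect, ClientPid} clears it and marks the worker idle again,
    %% which is what the new monitor/handle_info clauses above do.
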