summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Alexandru Ionescu <vlad@rabbitmq.com>2010-10-05 12:31:12 +0100
committerVlad Alexandru Ionescu <vlad@rabbitmq.com>2010-10-05 12:31:12 +0100
commita131c276c4e2963ed3652a072acf128fcbeb808f (patch)
treeea1d8fe5f2fe1b5ab70e90db35a943cc30a87f84
parentad23d50cd83490c14b29f2ebb30a966586168653 (diff)
parent0ead8ebe9ded44546be397510a537c03a78e37fa (diff)
downloadrabbitmq-server-a131c276c4e2963ed3652a072acf128fcbeb808f.tar.gz
merging in from default
-rw-r--r--LICENSE2
-rw-r--r--Makefile17
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--include/rabbit_backing_queue_spec.hrl17
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control5
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmqctl1
-rw-r--r--src/file_handle_cache.erl2
-rw-r--r--src/gen_server2.erl926
-rw-r--r--src/rabbit_amqqueue.erl44
-rw-r--r--src/rabbit_amqqueue_process.erl73
-rw-r--r--src/rabbit_basic.erl26
-rw-r--r--src/rabbit_channel.erl128
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_dialyzer.erl25
-rw-r--r--src/rabbit_event.erl71
-rw-r--r--src/rabbit_limiter.erl26
-rw-r--r--src/rabbit_msg_store.erl32
-rw-r--r--src/rabbit_msg_store_gc.erl7
-rw-r--r--src/rabbit_net.erl28
-rw-r--r--src/rabbit_networking.erl25
-rw-r--r--src/rabbit_plugin_activator.erl2
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl43
-rw-r--r--src/rabbit_reader.erl40
-rw-r--r--src/rabbit_ssl.erl173
-rw-r--r--src/rabbit_tests.erl1
-rw-r--r--src/rabbit_variable_queue.erl67
-rw-r--r--src/vm_memory_monitor.erl17
-rw-r--r--src/worker_pool_worker.erl7
32 files changed, 1113 insertions, 730 deletions
diff --git a/LICENSE b/LICENSE
index d7042b92..89640485 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,5 +1,5 @@
This package, the RabbitMQ server is licensed under the MPL. For the
MPL, please see LICENSE-MPL-RabbitMQ.
-If you have any questions regarding licensing, please contact us at
+If you have any questions regarding licensing, please contact us at
info@rabbitmq.com.
diff --git a/Makefile b/Makefile
index 46b18425..ee0c0838 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,3 @@
-
TMPDIR ?= /tmp
RABBITMQ_NODENAME ?= rabbit
@@ -92,12 +91,13 @@ endif
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
+ rm -f $@
escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
-$(EBIN_DIR)/%.beam:
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl | $(DEPS_FILE)
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
@@ -111,7 +111,11 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
- "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))."
+ "rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\")." \
+ -eval \
+ "init:stop()."
+
+
# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
create-plt: $(RABBIT_PLT)
@@ -308,11 +312,6 @@ else
TESTABLEGOALS:=$(MAKECMDGOALS)
endif
-ifneq "$(strip $(TESTABLEGOALS))" "$(DEPS_FILE)"
ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
-ifeq "$(strip $(wildcard $(DEPS_FILE)))" ""
-$(info $(shell $(MAKE) $(DEPS_FILE)))
-endif
-include $(DEPS_FILE)
-endif
+-include $(DEPS_FILE)
endif
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 5179eb25..73882861 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -967,6 +967,21 @@
<listitem><para>Peer port.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>peer_cert_subject</term>
+ <listitem><para>The subject of the peer's SSL
+ certificate, in RFC4514 form.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>peer_cert_issuer</term>
+ <listitem><para>The issuer of the peer's SSL
+ certificate, in RFC4514 form.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>peer_cert_validity</term>
+ <listitem><para>The period for which the peer's SSL
+ certificate is valid.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
<command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
@@ -1101,6 +1116,14 @@
<term>prefetch_count</term>
<listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>client_flow_blocked</term>
+ <listitem><para>True if the client issued a
+ <command>channel.flow{active=false}</command>
+ command, blocking the server from delivering
+ messages to the channel's consumers.
+ </para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 005994f0..38c6f939 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -30,8 +30,9 @@
%%
-type(fetch_result() ::
- %% Message, IsDelivered, AckTag, Remaining_Len
- ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
+ ('empty' |
+ %% Message, IsDelivered, AckTag, RemainingLen
+ {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
@@ -39,19 +40,23 @@
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> state()).
+-spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) ->
+ state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
-spec(publish_delivered/3 ::
- (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}).
+ (ack_required(), rabbit_types:basic_message(), state()) ->
+ {ack(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()).
+-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
--spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) ->
+ {[ack()], state()}).
-spec(requeue/2 :: ([ack()], state()) -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 17518ddf..eb0a2a51 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -127,6 +127,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Sep 14 2010 marek@rabbitmq.com 2.1.0-1
+- New Upstream Release
+
* Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 7ee60016..9927cfbc 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.1.0-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Marek Majkowski <marek@rabbitmq.com> Tue, 14 Sep 2010 14:20:17 +0100
+
rabbitmq-server (2.0.0-1) karmic; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 4afc66ac..02da0cc6 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -7,7 +7,10 @@ Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+# erlang-inets is not a strict dependency, but it's needed to allow
+# the installation of plugins that use mochiweb. Ideally it would be a
+# "Recommends" instead, but gdebi does not install those.
+Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 9310752f..8e26663a 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -82,7 +82,7 @@ fi
[ -f "${RABBITMQ_SASL_LOGS}" ] && cat "${RABBITMQ_SASL_LOGS}" >> "${RABBITMQ_SASL_LOGS}${RABBITMQ_BACKUP_EXTENSION}"
RABBITMQ_START_RABBIT=
-[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
+[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 92e5312b..76ce25fd 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -47,4 +47,3 @@ exec erl \
-s rabbit_control \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
-
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d2830a25..6a948d49 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -1171,7 +1171,7 @@ ulimit() ->
?FILE_HANDLES_LIMIT_WINDOWS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
- %% builtin, not a command. In OS X, it's a command.
+ %% builtin, not a command. In OS X and AIX it's a command.
%% Fortunately, os:cmd invokes the cmd in a shell env, so
%% we're safe in all cases.
case os:cmd("ulimit -n") of
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 9fb9e2fe..230d1f2a 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -16,10 +16,12 @@
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
%%
-%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
-%% allow callers to attach priorities to requests. Requests with
-%% higher priorities are processed before requests with lower
-%% priorities. The default priority is 0.
+%% 4) The callback module can optionally implement prioritise_call/3,
+%% prioritise_cast/2 and prioritise_info/2. These functions take
+%% Message, From and State or just Message and State and return a
+%% single integer representing the priority attached to the message.
+%% Messages with higher priorities are processed before requests with
+%% lower priorities. The default priority is 0.
%%
%% 5) The callback module can optionally implement
%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
@@ -64,16 +66,16 @@
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
-%%
+%%
%% $Id$
%%
-module(gen_server2).
@@ -82,13 +84,13 @@
%%%
%%% The idea behind THIS server is that the user module
%%% provides (different) functions to handle different
-%%% kind of inputs.
+%%% kind of inputs.
%%% If the Parent process terminates the Module:terminate/2
%%% function is called.
%%%
%%% The user module should export:
%%%
-%%% init(Args)
+%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
%%% {ok, State, Timeout, Backoff}
@@ -101,21 +103,21 @@
%%% {reply, Reply, State, Timeout}
%%% {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, Reply, State}
+%%% {stop, Reason, Reply, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_cast(Msg, State)
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, State}
+%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, State}
+%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% terminate(Reason, State) Let the user module clean up
@@ -159,37 +161,41 @@
%% API
-export([start/3, start/4,
- start_link/3, start_link/4,
- call/2, call/3, pcall/3, pcall/4,
- cast/2, pcast/3, reply/2,
- abcast/2, abcast/3,
- multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
+ start_link/3, start_link/4,
+ call/2, call/3,
+ cast/2, reply/2,
+ abcast/2, abcast/3,
+ multi_call/2, multi_call/3, multi_call/4,
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
-export([behaviour_info/1]).
%% System exports
-export([system_continue/3,
- system_terminate/4,
- system_code_change/4,
- format_status/2]).
+ system_terminate/4,
+ system_code_change/4,
+ format_status/2]).
%% Internal exports
-export([init_it/6, print_event/3]).
-import(error_logger, [format/2]).
+%% State record
+-record(gs2_state, {parent, name, state, mod, time,
+ timeout_state, queue, debug, prioritise_call,
+ prioritise_cast, prioritise_info}).
+
%%%=========================================================================
%%% Specs. These exist only to shut up dialyzer's warnings
%%%=========================================================================
-ifdef(use_specs).
--spec(handle_common_termination/6 ::
- (any(), any(), any(), atom(), any(), any()) -> no_return()).
+-spec(handle_common_termination/3 ::
+ (any(), atom(), #gs2_state{}) -> no_return()).
--spec(hibernate/7 ::
- (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()).
+-spec(hibernate/1 :: (#gs2_state{}) -> no_return()).
-endif.
@@ -238,37 +244,21 @@ start_link(Name, Mod, Args, Options) ->
%% be monitored.
%% If the client is trapping exits and is linked server termination
%% is handled here (? Shall we do that here (or rely on timeouts) ?).
-%% -----------------------------------------------------------------
+%% -----------------------------------------------------------------
call(Name, Request) ->
case catch gen:call(Name, '$gen_call', Request) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, call, [Name, Request]}})
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request]}})
end.
call(Name, Request, Timeout) ->
case catch gen:call(Name, '$gen_call', Request, Timeout) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
- end.
-
-pcall(Name, Priority, Request) ->
- case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
- end.
-
-pcall(Name, Priority, Request, Timeout) ->
- case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.
%% -----------------------------------------------------------------
@@ -277,34 +267,18 @@ pcall(Name, Priority, Request, Timeout) ->
cast({global,Name}, Request) ->
catch global:send(Name, cast_msg(Request)),
ok;
-cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_atom(Dest) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_pid(Dest) ->
do_cast(Dest, Request).
-do_cast(Dest, Request) ->
+do_cast(Dest, Request) ->
do_send(Dest, cast_msg(Request)),
ok.
-
-cast_msg(Request) -> {'$gen_cast',Request}.
-pcast({global,Name}, Priority, Request) ->
- catch global:send(Name, cast_msg(Priority, Request)),
- ok;
-pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
- do_cast(Dest, Priority, Request);
-pcast(Dest, Priority, Request) when is_atom(Dest) ->
- do_cast(Dest, Priority, Request);
-pcast(Dest, Priority, Request) when is_pid(Dest) ->
- do_cast(Dest, Priority, Request).
-
-do_cast(Dest, Priority, Request) ->
- do_send(Dest, cast_msg(Priority, Request)),
- ok.
-
-cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+cast_msg(Request) -> {'$gen_cast',Request}.
%% -----------------------------------------------------------------
%% Send a reply to the client.
@@ -312,9 +286,9 @@ cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
reply({To, Tag}, Reply) ->
catch To ! {Tag, Reply}.
-%% -----------------------------------------------------------------
-%% Asyncronous broadcast, returns nothing, it's just send'n prey
-%%-----------------------------------------------------------------
+%% -----------------------------------------------------------------
+%% Asyncronous broadcast, returns nothing, it's just send'n pray
+%% -----------------------------------------------------------------
abcast(Name, Request) when is_atom(Name) ->
do_abcast([node() | nodes()], Name, cast_msg(Request)).
@@ -330,36 +304,36 @@ do_abcast([], _,_) -> abcast.
%%% Make a call to servers at several nodes.
%%% Returns: {[Replies],[BadNodes]}
%%% A Timeout can be given
-%%%
+%%%
%%% A middleman process is used in case late answers arrives after
%%% the timeout. If they would be allowed to glog the callers message
-%%% queue, it would probably become confused. Late answers will
+%%% queue, it would probably become confused. Late answers will
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Req)
when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Req, infinity).
-multi_call(Nodes, Name, Req)
+multi_call(Nodes, Name, Req)
when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Req, infinity).
multi_call(Nodes, Name, Req, infinity) ->
do_multi_call(Nodes, Name, Req, infinity);
-multi_call(Nodes, Name, Req, Timeout)
+multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
-%%
-%% Description: Makes an existing process into a gen_server.
-%% The calling process will enter the gen_server receive
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
+%%
+%% Description: Makes an existing process into a gen_server.
+%% The calling process will enter the gen_server receive
%% loop and become a gen_server process.
-%% The process *must* have been started using one of the
-%% start functions in proc_lib, see proc_lib(3).
-%% The user is responsible for any initialization of the
+%% The process *must* have been started using one of the
+%% start functions in proc_lib, see proc_lib(3).
+%% The user is responsible for any initialization of the
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
@@ -386,7 +360,10 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
Backoff1 = extend_backoff(Backoff),
- loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
+ loop(find_prioritisers(
+ #gs2_state { parent = Parent, name = Name, state = State,
+ mod = Mod, time = Timeout, timeout_state = Backoff1,
+ queue = Queue, debug = Debug })).
%%%========================================================================
%%% Gen-callback functions
@@ -405,39 +382,51 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
+ GS2State = find_prioritisers(
+ #gs2_state { parent = Parent,
+ name = Name,
+ mod = Mod,
+ queue = Queue,
+ debug = Debug }),
case catch Mod:init(Args) of
- {ok, State} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
- {ok, State, Timeout} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
- {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ {ok, State} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = infinity,
+ timeout_state = undefined });
+ {ok, State, Timeout} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = Timeout,
+ timeout_state = undefined });
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
Backoff1 = extend_backoff(Backoff),
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
- {stop, Reason} ->
- %% For consistency, we must make sure that the
- %% registered name (if any) is unregistered before
- %% the parent process is notified about the failure.
- %% (Otherwise, the parent process could get
- %% an 'already_started' error if it immediately
- %% tried starting the process again.)
- unregister_name(Name0),
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- ignore ->
- unregister_name(Name0),
- proc_lib:init_ack(Starter, ignore),
- exit(normal);
- {'EXIT', Reason} ->
- unregister_name(Name0),
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- Else ->
- Error = {bad_return_value, Else},
- proc_lib:init_ack(Starter, {error, Error}),
- exit(Error)
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = Timeout,
+ timeout_state = Backoff1 });
+ {stop, Reason} ->
+ %% For consistency, we must make sure that the
+ %% registered name (if any) is unregistered before
+ %% the parent process is notified about the failure.
+ %% (Otherwise, the parent process could get
+ %% an 'already_started' error if it immediately
+ %% tried starting the process again.)
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ ignore ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, ignore),
+ exit(normal);
+ {'EXIT', Reason} ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ Else ->
+ Error = {bad_return_value, Else},
+ proc_lib:init_ack(Starter, {error, Error}),
+ exit(Error)
end.
name({local,Name}) -> Name;
@@ -467,23 +456,24 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
- pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
-loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
- process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- drain(Queue), Debug).
+loop(GS2State = #gs2_state { time = hibernate,
+ timeout_state = undefined }) ->
+ pre_hibernate(GS2State);
+loop(GS2State) ->
+ process_next_msg(drain(GS2State)).
-drain(Queue) ->
+drain(GS2State) ->
receive
- Input -> drain(in(Input, Queue))
- after 0 -> Queue
+ Input -> drain(in(Input, GS2State))
+ after 0 -> GS2State
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+process_next_msg(GS2State = #gs2_state { time = Time,
+ timeout_state = TimeoutState,
+ queue = Queue }) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
- process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Msg);
+ process_msg(Msg, GS2State #gs2_state { queue = Queue1 });
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
@@ -504,68 +494,64 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
Input ->
%% Time could be 'hibernate' here, so *don't* call loop
process_next_msg(
- Parent, Name, State, Mod, Time, TimeoutState,
- drain(in(Input, Queue1)), Debug)
+ drain(in(Input, GS2State #gs2_state { queue = Queue1 })))
after Time1 ->
case HibOnTimeout of
true ->
pre_hibernate(
- Parent, Name, State, Mod, TimeoutState, Queue1,
- Debug);
+ GS2State #gs2_state { queue = Queue1 });
false ->
- process_msg(
- Parent, Name, State, Mod, Time, TimeoutState,
- Queue1, Debug, timeout)
+ process_msg(timeout,
+ GS2State #gs2_state { queue = Queue1 })
end
end
end.
-wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) ->
+wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
TimeoutState1 = case TS of
undefined ->
undefined;
{SleptAt, TimeoutState} ->
adjust_timeout_state(SleptAt, now(), TimeoutState)
end,
- post_hibernate(Parent, Name, State, Mod, TimeoutState1,
- drain(Queue), Debug).
+ post_hibernate(
+ drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
-hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
TS = case TimeoutState of
undefined -> undefined;
{backoff, _, _, _, _} -> {now(), TimeoutState}
end,
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
- TS, Queue, Debug]).
+ proc_lib:hibernate(?MODULE, wake_hib,
+ [GS2State #gs2_state { timeout_state = TS }]).
-pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+pre_hibernate(GS2State = #gs2_state { state = State,
+ mod = Mod }) ->
case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
true ->
case catch Mod:handle_pre_hibernate(State) of
{hibernate, NState} ->
- hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
- Debug);
+ hibernate(GS2State #gs2_state { state = NState } );
Reply ->
- handle_common_termination(Reply, Name, pre_hibernate,
- Mod, State, Debug)
+ handle_common_termination(Reply, pre_hibernate, GS2State)
end;
false ->
- hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug)
+ hibernate(GS2State)
end.
-post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+post_hibernate(GS2State = #gs2_state { state = State,
+ mod = Mod }) ->
case erlang:function_exported(Mod, handle_post_hibernate, 1) of
true ->
case catch Mod:handle_post_hibernate(State) of
{noreply, NState} ->
- process_next_msg(Parent, Name, NState, Mod, infinity,
- TimeoutState, Queue, Debug);
+ process_next_msg(GS2State #gs2_state { state = NState,
+ time = infinity });
{noreply, NState, Time} ->
- process_next_msg(Parent, Name, NState, Mod, Time,
- TimeoutState, Queue, Debug);
+ process_next_msg(GS2State #gs2_state { state = NState,
+ time = Time });
Reply ->
- handle_common_termination(Reply, Name, post_hibernate,
- Mod, State, Debug)
+ handle_common_termination(Reply, post_hibernate, GS2State)
end;
false ->
%% use hibernate here, not infinity. This matches
@@ -574,8 +560,7 @@ post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
%% still set to hibernate, iff that msg is the very msg
%% that woke us up (or the first msg we receive after
%% waking up).
- process_next_msg(Parent, Name, State, Mod, hibernate,
- TimeoutState, Queue, Debug)
+ process_next_msg(GS2State #gs2_state { time = hibernate })
end.
adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
@@ -596,32 +581,40 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
CurrentTO1 = Base + Extra,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
-in({'$gen_pcast', {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
-in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
-in(Input, Queue) ->
- priority_queue:in(Input, Queue).
-
-process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, Msg) ->
+in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC,
+ queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ {'$gen_cast', Msg},
+ PC(Msg, GS2State), Queue) };
+in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC,
+ queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ {'$gen_call', From, Msg},
+ PC(Msg, From, GS2State), Queue) };
+in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ Input, PI(Input, GS2State), Queue) }.
+
+process_msg(Msg,
+ GS2State = #gs2_state { parent = Parent,
+ name = Name,
+ debug = Debug }) ->
case Msg of
- {system, From, Req} ->
- sys:handle_system_msg(
+ {system, From, Req} ->
+ sys:handle_system_msg(
Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, TimeoutState, Queue]);
+ GS2State);
%% gen_server puts Hib on the end as the 7th arg, but that
%% version of the function seems not to be documented so
%% leaving out for now.
- {'EXIT', Parent, Reason} ->
- terminate(Reason, Name, Msg, Mod, State, Debug);
- _Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
- _Msg ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
- Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
- Debug1)
+ {'EXIT', Parent, Reason} ->
+ terminate(Reason, Msg, GS2State);
+ _Msg when Debug =:= [] ->
+ handle_msg(Msg, GS2State);
+ _Msg ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Name, {in, Msg}),
+ handle_msg(Msg, GS2State #gs2_state { debug = Debug1 })
end.
%%% ---------------------------------------------------
@@ -638,35 +631,35 @@ do_multi_call(Nodes, Name, Req, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver =
- spawn(
- fun () ->
- %% Middleman process. Should be unsensitive to regular
- %% exit signals. The sychronization is needed in case
- %% the receiver would exit before the caller started
- %% the monitor.
- process_flag(trap_exit, true),
- Mref = erlang:monitor(process, Caller),
- receive
- {Caller,Tag} ->
- Monitors = send_nodes(Nodes, Name, Tag, Req),
- TimerId = erlang:start_timer(Timeout, self(), ok),
- Result = rec_nodes(Tag, Monitors, Name, TimerId),
- exit({self(),Tag,Result});
- {'DOWN',Mref,_,_,_} ->
- %% Caller died before sending us the go-ahead.
- %% Give up silently.
- exit(normal)
- end
- end),
+ spawn(
+ fun () ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals. The sychronization is needed in case
+ %% the receiver would exit before the caller started
+ %% the monitor.
+ process_flag(trap_exit, true),
+ Mref = erlang:monitor(process, Caller),
+ receive
+ {Caller,Tag} ->
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ TimerId = erlang:start_timer(Timeout, self(), ok),
+ Result = rec_nodes(Tag, Monitors, Name, TimerId),
+ exit({self(),Tag,Result});
+ {'DOWN',Mref,_,_,_} ->
+ %% Caller died before sending us the go-ahead.
+ %% Give up silently.
+ exit(normal)
+ end
+ end),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(),Tag},
receive
- {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
- Result;
- {'DOWN',Mref,_,_,Reason} ->
- %% The middleman code failed. Or someone did
- %% exit(_, kill) on the middleman process => Reason==killed
- exit(Reason)
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ Result;
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ exit(Reason)
end.
send_nodes(Nodes, Name, Tag, Req) ->
@@ -681,7 +674,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors)
send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
%% Skip non-atom Node
send_nodes(Tail, Name, Tag, Req, Monitors);
-send_nodes([], _Name, _Tag, _Req, Monitors) ->
+send_nodes([], _Name, _Tag, _Req, Monitors) ->
Monitors.
%% Against old nodes:
@@ -691,89 +684,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) ->
%% Against contemporary nodes:
%% Wait for reply, server 'DOWN', or timeout from TimerId.
-rec_nodes(Tag, Nodes, Name, TimerId) ->
+rec_nodes(Tag, Nodes, Name, TimerId) ->
rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
receive
- {'DOWN', R, _, _, _} ->
- rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- unmonitor(R),
- rec_nodes(Tag, Tail, Name, Badnodes,
- [{N,Reply}|Replies], Time, TimerId);
- {timeout, TimerId, _} ->
- unmonitor(R),
- %% Collect all replies that already have arrived
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], Time, TimerId);
+ {timeout, TimerId, _} ->
+ unmonitor(R),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
%% R6 node
receive
- {nodedown, N} ->
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, Badnodes,
- [{N,Reply}|Replies], 2000, TimerId);
- {timeout, TimerId, _} ->
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- %% Collect all replies that already have arrived
- rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], 2000, TimerId);
+ {timeout, TimerId, _} ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
after Time ->
- case rpc:call(N, erlang, whereis, [Name]) of
- Pid when is_pid(Pid) -> % It exists try again.
- rec_nodes(Tag, [N|Tail], Name, Badnodes,
- Replies, infinity, TimerId);
- _ -> % badnode
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, [N|Badnodes],
- Replies, 2000, TimerId)
- end
+ case rpc:call(N, erlang, whereis, [Name]) of
+ Pid when is_pid(Pid) -> % It exists try again.
+ rec_nodes(Tag, [N|Tail], Name, Badnodes,
+ Replies, infinity, TimerId);
+ _ -> % badnode
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes],
+ Replies, 2000, TimerId)
+ end
end;
rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
case catch erlang:cancel_timer(TimerId) of
- false -> % It has already sent it's message
- receive
- {timeout, TimerId, _} -> ok
- after 0 ->
- ok
- end;
- _ -> % Timer was cancelled, or TimerId was 'undefined'
- ok
+ false -> % It has already sent it's message
+ receive
+ {timeout, TimerId, _} -> ok
+ after 0 ->
+ ok
+ end;
+ _ -> % Timer was cancelled, or TimerId was 'undefined'
+ ok
end,
{Replies, Badnodes}.
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
receive
- {'DOWN', R, _, _, _} ->
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- unmonitor(R),
- rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ {'DOWN', R, _, _, _} ->
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
after 0 ->
- unmonitor(R),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
%% R6 node
receive
- {nodedown, N} ->
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
after 0 ->
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
{Replies, Badnodes}.
@@ -785,28 +778,28 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
if node() =:= nonode@nohost, Node =/= nonode@nohost ->
- Ref = make_ref(),
- self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
- {Node, Ref};
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
true ->
- case catch erlang:monitor(process, {Name, Node}) of
- {'EXIT', _} ->
- %% Remote node is R6
- monitor_node(Node, true),
- Node;
- Ref when is_reference(Ref) ->
- {Node, Ref}
- end
+ case catch erlang:monitor(process, {Name, Node}) of
+ {'EXIT', _} ->
+ %% Remote node is R6
+ monitor_node(Node, true),
+ Node;
+ Ref when is_reference(Ref) ->
+ {Node, Ref}
+ end
end.
%% Cancels a monitor started with Ref=erlang:monitor(_, _).
unmonitor(Ref) when is_reference(Ref) ->
erlang:demonitor(Ref),
receive
- {'DOWN', Ref, _, _, _} ->
- true
+ {'DOWN', Ref, _, _, _} ->
+ true
after 0 ->
- true
+ true
end.
%%% ---------------------------------------------------
@@ -818,130 +811,114 @@ dispatch({'$gen_cast', Msg}, Mod, State) ->
dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
-handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, TimeoutState, Queue) ->
- case catch Mod:handle_call(Msg, From, State) of
- {reply, Reply, NState} ->
- reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {reply, Reply, NState, Time1} ->
- reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
- {noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Name, Msg, Mod, NState, [])),
- reply(From, Reply),
- exit(R);
- Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue)
- end;
-handle_msg(Msg,
- Parent, Name, State, Mod, TimeoutState, Queue) ->
- Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue).
-
-handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
+ reply(From, Reply),
+ [];
+common_reply(Name, From, Reply, NState, Debug) ->
+ reply(Name, From, Reply, NState, Debug).
+
+common_debug([] = _Debug, _Func, _Info, _Event) ->
+ [];
+common_debug(Debug, Func, Info, Event) ->
+ sys:handle_debug(Debug, Func, Info, Event).
+
+handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
+ state = State,
+ name = Name,
+ debug = Debug }) ->
case catch Mod:handle_call(Msg, From, State) of
- {reply, Reply, NState} ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {reply, Reply, NState, Time1} ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
- {noreply, NState} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {noreply, NState, Time1} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
- reply(Name, From, Reply, NState, Debug),
- exit(R);
- Other ->
- handle_common_reply(Other, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue, Debug)
+ {reply, Reply, NState} ->
+ Debug1 = common_reply(Name, From, Reply, NState, Debug),
+ loop(GS2State #gs2_state { state = NState,
+ time = infinity,
+ debug = Debug1 });
+ {reply, Reply, NState, Time1} ->
+ Debug1 = common_reply(Name, From, Reply, NState, Debug),
+ loop(GS2State #gs2_state { state = NState,
+ time = Time1,
+ debug = Debug1});
+ {noreply, NState} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state {state = NState,
+ time = infinity,
+ debug = Debug1});
+ {noreply, NState, Time1} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state {state = NState,
+ time = Time1,
+ debug = Debug1});
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Msg,
+ GS2State #gs2_state { state = NState })),
+ reply(Name, From, Reply, NState, Debug),
+ exit(R);
+ Other ->
+ handle_common_reply(Other, Msg, GS2State)
end;
-handle_msg(Msg,
- Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue, Debug).
+ handle_common_reply(Reply, Msg, GS2State).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue) ->
+handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
+ debug = Debug}) ->
case Reply of
- {noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ {noreply, NState} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state { state = NState,
+ time = infinity,
+ debug = Debug1 });
+ {noreply, NState, Time1} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state { state = NState,
+ time = Time1,
+ debug = Debug1 });
_ ->
- handle_common_termination(Reply, Name, Msg, Mod, State, [])
+ handle_common_termination(Reply, Msg, GS2State)
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
- Debug) ->
+handle_common_termination(Reply, Msg, GS2State) ->
case Reply of
- {noreply, NState} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {noreply, NState, Time1} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ {stop, Reason, NState} ->
+ terminate(Reason, Msg, GS2State #gs2_state { state = NState });
+ {'EXIT', What} ->
+ terminate(What, Msg, GS2State);
_ ->
- handle_common_termination(Reply, Name, Msg, Mod, State, Debug)
- end.
-
-handle_common_termination(Reply, Name, Msg, Mod, State, Debug) ->
- case Reply of
- {stop, Reason, NState} ->
- terminate(Reason, Name, Msg, Mod, NState, Debug);
- {'EXIT', What} ->
- terminate(What, Name, Msg, Mod, State, Debug);
- _ ->
- terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+ terminate({bad_return_value, Reply}, Msg, GS2State)
end.
reply(Name, {To, Tag}, Reply, State, Debug) ->
reply({To, Tag}, Reply),
- sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {out, Reply, To, State} ).
+ sys:handle_debug(
+ Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}).
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
- loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
+system_continue(Parent, Debug, GS2State) ->
+ loop(GS2State #gs2_state { parent = Parent, debug = Debug }).
-ifdef(use_specs).
-spec system_terminate(_, _, _, [_]) -> no_return().
-endif.
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
- _TimeoutState, _Queue]) ->
- terminate(Reason, Name, [], Mod, State, Debug).
+system_terminate(Reason, _Parent, Debug, GS2State) ->
+ terminate(Reason, [], GS2State #gs2_state { debug = Debug }).
-system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
- OldVsn, Extra) ->
+system_code_change(GS2State = #gs2_state { mod = Mod,
+ state = State },
+ _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} ->
- {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
- Else ->
+ {ok, NewState} ->
+ NewGS2State = find_prioritisers(
+ GS2State #gs2_state { state = NewState }),
+ {ok, [NewGS2State]};
+ Else ->
Else
end.
@@ -951,18 +928,18 @@ system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
%%-----------------------------------------------------------------
print_event(Dev, {in, Msg}, Name) ->
case Msg of
- {'$gen_call', {From, _Tag}, Call} ->
- io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
- [Name, Call, From]);
- {'$gen_cast', Cast} ->
- io:format(Dev, "*DBG* ~p got cast ~p~n",
- [Name, Cast]);
- _ ->
- io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+ {'$gen_call', {From, _Tag}, Call} ->
+ io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+ [Name, Call, From]);
+ {'$gen_cast', Cast} ->
+ io:format(Dev, "*DBG* ~p got cast ~p~n",
+ [Name, Cast]);
+ _ ->
+ io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
end;
print_event(Dev, {out, Msg, To, State}, Name) ->
- io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
- [Name, Msg, To, State]);
+ io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+ [Name, Msg, To, State]);
print_event(Dev, {noreply, State}, Name) ->
io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
print_event(Dev, Event, Name) ->
@@ -973,23 +950,26 @@ print_event(Dev, Event, Name) ->
%%% Terminate the server.
%%% ---------------------------------------------------
-terminate(Reason, Name, Msg, Mod, State, Debug) ->
+terminate(Reason, Msg, #gs2_state { name = Name,
+ mod = Mod,
+ state = State,
+ debug = Debug }) ->
case catch Mod:terminate(Reason, State) of
- {'EXIT', R} ->
- error_info(R, Reason, Name, Msg, State, Debug),
- exit(R);
- _ ->
- case Reason of
- normal ->
- exit(normal);
- shutdown ->
- exit(shutdown);
- {shutdown,_}=Shutdown ->
- exit(Shutdown);
- _ ->
- error_info(Reason, undefined, Name, Msg, State, Debug),
- exit(Reason)
- end
+ {'EXIT', R} ->
+ error_info(R, Reason, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
+ _ ->
+ error_info(Reason, undefined, Name, Msg, State, Debug),
+ exit(Reason)
+ end
end.
error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
@@ -1038,74 +1018,109 @@ opt(_, []) ->
debug_options(Name, Opts) ->
case opt(debug, Opts) of
- {ok, Options} -> dbg_options(Name, Options);
- _ -> dbg_options(Name, [])
+ {ok, Options} -> dbg_options(Name, Options);
+ _ -> dbg_options(Name, [])
end.
dbg_options(Name, []) ->
- Opts =
- case init:get_argument(generic_debug) of
- error ->
- [];
- _ ->
- [log, statistics]
- end,
+ Opts =
+ case init:get_argument(generic_debug) of
+ error ->
+ [];
+ _ ->
+ [log, statistics]
+ end,
dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
dbg_opts(Name, Opts).
dbg_opts(Name, Opts) ->
case catch sys:debug_options(Opts) of
- {'EXIT',_} ->
- format("~p: ignoring erroneous debug options - ~p~n",
- [Name, Opts]),
- [];
- Dbg ->
- Dbg
+ {'EXIT',_} ->
+ format("~p: ignoring erroneous debug options - ~p~n",
+ [Name, Opts]),
+ [];
+ Dbg ->
+ Dbg
end.
get_proc_name(Pid) when is_pid(Pid) ->
Pid;
get_proc_name({local, Name}) ->
case process_info(self(), registered_name) of
- {registered_name, Name} ->
- Name;
- {registered_name, _Name} ->
- exit(process_not_registered);
- [] ->
- exit(process_not_registered)
- end;
+ {registered_name, Name} ->
+ Name;
+ {registered_name, _Name} ->
+ exit(process_not_registered);
+ [] ->
+ exit(process_not_registered)
+ end;
get_proc_name({global, Name}) ->
case global:safe_whereis_name(Name) of
- undefined ->
- exit(process_not_registered_globally);
- Pid when Pid =:= self() ->
- Name;
- _Pid ->
- exit(process_not_registered_globally)
+ undefined ->
+ exit(process_not_registered_globally);
+ Pid when Pid =:= self() ->
+ Name;
+ _Pid ->
+ exit(process_not_registered_globally)
end.
get_parent() ->
case get('$ancestors') of
- [Parent | _] when is_pid(Parent)->
+ [Parent | _] when is_pid(Parent)->
Parent;
[Parent | _] when is_atom(Parent)->
name_to_pid(Parent);
- _ ->
- exit(process_was_not_started_by_proc_lib)
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
end.
name_to_pid(Name) ->
case whereis(Name) of
- undefined ->
- case global:safe_whereis_name(Name) of
- undefined ->
- exit(could_not_find_registerd_name);
- Pid ->
- Pid
- end;
- Pid ->
- Pid
+ undefined ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
+ PrioriCall = function_exported_or_default(
+ Mod, 'prioritise_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritise_call = PrioriCall,
+ prioritise_cast = PrioriCast,
+ prioritise_info = PrioriInfo }.
+
+function_exported_or_default(Mod, Fun, Arity, Default) ->
+ case erlang:function_exported(Mod, Fun, Arity) of
+ true -> case Arity of
+ 2 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
+ case catch Mod:Fun(Msg, State) of
+ Res when is_integer(Res) ->
+ Res;
+ Err ->
+ handle_common_termination(Err, Msg, GS2State)
+ end
+ end;
+ 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
+ case catch Mod:Fun(Msg, From, State) of
+ Res when is_integer(Res) ->
+ Res;
+ Err ->
+ handle_common_termination(Err, Msg, GS2State)
+ end
+ end
+ end;
+ false -> Default
end.
%%-----------------------------------------------------------------
@@ -1113,27 +1128,26 @@ name_to_pid(Name) ->
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
[PDict, SysState, Parent, Debug,
- [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
+ #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] =
+ StatusData,
NameTag = if is_pid(Name) ->
- pid_to_list(Name);
- is_atom(Name) ->
- Name
- end,
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
Header = lists:concat(["Status for generic server ", NameTag]),
Log = sys:get_debug(log, Debug, []),
- Specfic =
- case erlang:function_exported(Mod, format_status, 2) of
- true ->
- case catch Mod:format_status(Opt, [PDict, State]) of
- {'EXIT', _} -> [{data, [{"State", State}]}];
- Else -> Else
- end;
- _ ->
- [{data, [{"State", State}]}]
- end,
+ Specfic =
+ case erlang:function_exported(Mod, format_status, 2) of
+ true -> case catch Mod:format_status(Opt, [PDict, State]) of
+ {'EXIT', _} -> [{data, [{"State", State}]}];
+ Else -> Else
+ end;
+ _ -> [{data, [{"State", State}]}]
+ end,
[{header, Header},
{data, [{"Status", SysState},
- {"Parent", Parent},
- {"Logged events", Log},
+ {"Parent", Parent},
+ {"Logged events", Log},
{"Queued messages", priority_queue:to_list(Queue)}]} |
Specfic].
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7116653c..42bddc5e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,6 +115,9 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
+ -> rabbit_types:ok_or_error2(qlen(),
+ 'not_exclusive')).
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -332,10 +335,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, info, infinity).
+ delegate_call(QPid, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -345,7 +348,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, consumers, infinity).
+ delegate_call(QPid, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -357,7 +360,10 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
- delegate_pcast(QPid, 7, emit_stats).
+ delegate_cast(QPid, emit_stats).
+
+delete_exclusive(#amqqueue{ pid = QPid }) ->
+ gen_server2:call(QPid, delete_exclusive, infinity).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -380,10 +386,10 @@ requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
- delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+ delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
@@ -419,10 +425,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate_cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {unblock, ChPid}).
+ delegate_cast(QPid, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
@@ -452,20 +458,19 @@ internal_delete(QueueName) ->
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun},
- infinity).
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
update_ram_duration(QPid) ->
- gen_server2:pcast(QPid, 8, update_ram_duration).
+ gen_server2:cast(QPid, update_ram_duration).
set_ram_duration_target(QPid, Duration) ->
- gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}).
+ gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
- gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
maybe_expire(QPid) ->
- gen_server2:pcast(QPid, 8, maybe_expire).
+ gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
[Hook() ||
@@ -505,11 +510,6 @@ safe_delegate_call_ok(F, Pids) ->
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
-delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
- fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-
-delegate_pcast(Pid, Pri, Msg) ->
- delegate:invoke_no_result(Pid,
- fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
+delegate_cast(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9a40580e..2c53a8e3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,8 @@
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2, prioritise_info/2]).
-import(queue).
-import(erlang).
@@ -152,7 +153,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
+ backing_queue = BQ, backing_queue_state = undefined,
+ stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
@@ -163,9 +165,12 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
+ State1 = init_expires(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State)),
- noreply(init_expires(State#q{backing_queue_state = BQS}));
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -265,14 +270,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer,
q = Q}) ->
State#q{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(State) end,
fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
-stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
- State#q{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> emit_stats(State) end)}.
-
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -589,6 +588,34 @@ emit_stats(State) ->
%---------------------------------------------------------------------------
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ delete_exclusive -> 8;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ _ -> 0
+ end.
+
+prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
+prioritise_info(_Msg, _State) -> 0.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -758,6 +785,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
active_consumers = ActiveConsumers}) ->
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+handle_call(delete_exclusive, _From,
+ State = #q{ backing_queue_state = BQS,
+ backing_queue = BQ,
+ q = #amqqueue{exclusive_owner = Owner}
+ }) when Owner =/= none ->
+ {stop, normal, {ok, BQ:len(BQS)}, State};
+
+handle_call(delete_exclusive, _From, State) ->
+ reply({error, not_exclusive}, State);
+
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
@@ -886,9 +923,12 @@ handle_cast(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+ %% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- noreply(State).
+ State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
+ assert_invariant(State1),
+ {noreply, State1}.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -919,11 +959,14 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ stats_timer = StatsTimer}) ->
BQS1 = BQ:handle_pre_hibernate(BQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_stats_timer(
- stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
+ rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end),
+ State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
+ backing_queue_state = BQS2},
+ {hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d62fc07c..38412982 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -55,28 +55,24 @@
rabbit_types:message()) -> rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary())
- -> (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) ->
+ (rabbit_types:message() | rabbit_types:error(any()))).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary())
- -> publish_result()).
+ properties_input(), binary()) -> publish_result()).
-spec(publish/7 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- properties_input(), binary())
- -> publish_result()).
--spec(build_content/2 ::
- (rabbit_framing:amqp_property_record(), binary())
- -> rabbit_types:content()).
--spec(from_content/1 ::
- (rabbit_types:content())
- -> {rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 ::
- (rabbit_types:decoded_content())
- -> (boolean() | {'invalid', non_neg_integer()})).
+ properties_input(), binary()) -> publish_result()).
+-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) ->
+ rabbit_types:content()).
+-spec(from_content/1 :: (rabbit_types:content()) ->
+ {rabbit_framing:amqp_property_record(), binary()}).
+-spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
+ (boolean() |
+ {'invalid', non_neg_integer()})).
-endif.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 174eab40..fe36cef9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,7 +41,8 @@
-export([emit_stats/1, flush/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
@@ -57,7 +58,8 @@
consumer_count,
messages_unacknowledged,
acks_uncommitted,
- prefetch_count]).
+ prefetch_count,
+ client_flow_blocked]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -131,10 +133,10 @@ list() ->
info_keys() -> ?INFO_KEYS.
info(Pid) ->
- gen_server2:pcall(Pid, 9, info, infinity).
+ gen_server2:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
+ case gen_server2:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -146,7 +148,7 @@ info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
emit_stats(Pid) ->
- gen_server2:pcast(Pid, 7, emit_stats).
+ gen_server2:cast(Pid, emit_stats).
flush(Pid) ->
gen_server2:call(Pid, flush).
@@ -157,6 +159,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
+ StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -174,11 +177,26 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = rabbit_event:init_stats_timer()},
+ stats_timer = StatsTimer},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State) end),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ emit_stats -> 7;
+ _ -> 0
+ end.
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -237,17 +255,22 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
- {noreply, State}.
+ {noreply,
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}.
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- {hibernate, stop_stats_timer(State)}.
+ rabbit_event:if_enabled(StatsTimer, fun () ->
+ internal_emit_stats(State)
+ end),
+ {hibernate,
+ State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -277,14 +300,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> internal_emit_stats(State) end,
fun() -> emit_stats(ChPid) end)}.
-stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- State#ch{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> internal_emit_stats(State) end)}.
-
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -298,14 +315,10 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
- NewState = State#ch{most_recently_declared_queue = ActualName},
- case NoWait of
- true -> {noreply, NewState};
- false -> Reply = #'queue.declare_ok'{queue = ActualName,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
- end.
+ return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
+ #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount}).
check_resource_access(Username, Resource, Perm) ->
V = {Resource, Perm},
@@ -327,30 +340,30 @@ clear_permission_cache() ->
erase(permission_cache),
ok.
-check_configure_permitted(Resource, #ch{ username = Username}) ->
+check_configure_permitted(Resource, #ch{username = Username}) ->
check_resource_access(Username, Resource, configure).
-check_write_permitted(Resource, #ch{ username = Username}) ->
+check_write_permitted(Resource, #ch{username = Username}) ->
check_resource_access(Username, Resource, write).
-check_read_permitted(Resource, #ch{ username = Username}) ->
+check_read_permitted(Resource, #ch{username = Username}) ->
check_resource_access(Username, Resource, read).
-expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
- most_recently_declared_queue = MRDQ }) ->
+expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath,
+ most_recently_declared_queue = MRDQ}) ->
rabbit_misc:r(VHostPath, queue, MRDQ);
-expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
+expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) ->
rabbit_misc:r(VHostPath, queue, QueueNameBin).
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{ most_recently_declared_queue = <<>> }) ->
+ #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{ most_recently_declared_queue = MRDQ }) ->
+ #ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
@@ -421,11 +434,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -464,9 +477,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{ writer_pid = WriterPid,
- reader_pid = ReaderPid,
- next_tag = DeliveryTag }) ->
+ _, State = #ch{writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
@@ -496,15 +509,15 @@ handle_method(#'basic.get'{queue = QueueNameBin,
{reply, #'basic.get_empty'{}, State}
end;
-handle_method(#'basic.consume'{queue = QueueNameBin,
+handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
- no_local = _, % FIXME: implement
- no_ack = NoAck,
- exclusive = ExclusiveConsume,
- nowait = NoWait},
- _, State = #ch{ reader_pid = ReaderPid,
- limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping }) ->
+ no_local = _, % FIXME: implement
+ no_ack = NoAck,
+ exclusive = ExclusiveConsume,
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
+ consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -599,7 +612,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{ unacked_message_q = UAMQ }) ->
+ _, State = #ch{unacked_message_q = UAMQ}) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
@@ -614,8 +627,8 @@ handle_method(#'basic.recover_async'{requeue = true},
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover_async'{requeue = false},
- _, State = #ch{ writer_pid = WriterPid,
- unacked_message_q = UAMQ }) ->
+ _, State = #ch{writer_pid = WriterPid,
+ unacked_message_q = UAMQ}) ->
ok = rabbit_misc:queue_fold(
fun ({_DeliveryTag, none, _Msg}, ok) ->
%% Was sent as a basic.get_ok. Don't redeliver
@@ -648,7 +661,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{ unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -665,7 +678,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
internal = false,
nowait = NoWait,
arguments = Args},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
@@ -693,7 +706,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true,
nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -702,7 +715,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused,
nowait = NoWait},
- _, State = #ch { virtual_host = VHostPath }) ->
+ _, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
@@ -856,6 +869,7 @@ handle_method(#'channel.flow'{active = true}, _,
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
+
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -1111,6 +1125,8 @@ i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
queue:len(UAQ);
i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
+i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
+ rabbit_limiter:is_blocked(LimiterPid);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index a3b6f369..57efe7cc 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -347,6 +347,8 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([C|_] = Value) when is_number(C), C >= 32, C =< 255 ->
+ Value;
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index 51bd6b1f..a9806305 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -61,26 +61,27 @@ add_to_plt(PltPath, FilesString) ->
{init_plt, PltPath},
{output_plt, PltPath},
{files, Files}]),
- print_warnings(DialyzerWarnings),
+ print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1),
ok.
dialyze_files(PltPath, ModifiedFiles) ->
Files = string:tokens(ModifiedFiles, " "),
DialyzerWarnings = dialyzer:run([{init_plt, PltPath},
- {files, Files}]),
+ {files, Files},
+ {warnings, [behaviours,
+ race_conditions]}]),
case DialyzerWarnings of
- [] -> io:format("~nOk~n"),
- ok;
- _ -> io:format("~nFAILED with the following warnings:~n"),
- print_warnings(DialyzerWarnings),
- fail
- end.
-
-print_warnings(Warnings) ->
- [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings],
- io:format("~n"),
+ [] -> io:format("~nOk~n");
+ _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n",
+ [length(DialyzerWarnings)]),
+ print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1)
+ end,
ok.
+print_warnings(Warnings, FormatFun) ->
+ [io:format("~s~n", [FormatFun(W)]) || W <- Warnings],
+ io:format("~n").
+
otp_apps_dependencies_paths() ->
[code:lib_dir(App, ebin) ||
App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]].
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 0f00537a..2b236531 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -34,9 +34,9 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
--export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
--export([stats_level/1]).
+-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
+-export([reset_stats_timer/1]).
+-export([stats_level/1, if_enabled/2]).
-export([notify/2]).
%%----------------------------------------------------------------------------
@@ -71,11 +71,11 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
--spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
--spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
--spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(stop_stats_timer/1 :: (state()) -> state()).
+-spec(reset_stats_timer/1 :: (state()) -> state()).
-spec(stats_level/1 :: (state()) -> level()).
+-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-endif.
@@ -85,44 +85,61 @@
start_link() ->
gen_event:start_link({local, ?MODULE}).
+%% The idea is, for each stat-emitting object:
+%%
+%% On startup:
+%% Timer = init_stats_timer()
+%% notify(created event)
+%% if_enabled(internal_emit_stats) - so we immediately send something
+%%
+%% On wakeup:
+%% ensure_stats_timer(Timer, emit_stats)
+%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
+%%
+%% emit_stats:
+%% if_enabled(internal_emit_stats)
+%% reset_stats_timer(Timer) - just bookkeeping
+%%
+%% Pre-hibernation:
+%% if_enabled(internal_emit_stats)
+%% stop_stats_timer(Timer)
+%%
+%% internal_emit_stats:
+%% notify(stats)
+
init_stats_timer() ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
#state{level = StatsLevel, timer = undefined}.
-ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
-ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
- NowFun(),
- {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
+ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
-ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ensure_stats_timer(State, _Fun) ->
State.
-stop_stats_timer(State = #state{level = none}, _NowFun) ->
+stop_stats_timer(State = #state{level = none}) ->
State;
-stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
+stop_stats_timer(State = #state{timer = undefined}) ->
State;
-stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+stop_stats_timer(State = #state{timer = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- NowFun(),
State#state{timer = undefined}.
-ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
- State;
-ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
- {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
- State#state{timer = TRef};
-ensure_stats_timer_after(State, _TimerFun) ->
- State.
-
-reset_stats_timer_after(State) ->
+reset_stats_timer(State) ->
State#state{timer = undefined}.
stats_level(#state{level = Level}) ->
Level.
+if_enabled(#state{level = none}, _Fun) ->
+ ok;
+if_enabled(_State, Fun) ->
+ Fun(),
+ ok.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index da7078f1..be1dcad1 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -34,10 +34,10 @@
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2]).
+ handle_info/2, prioritise_call/3]).
-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1]).
+-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
%%----------------------------------------------------------------------------
@@ -55,6 +55,7 @@
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
-spec(block/1 :: (maybe_pid()) -> 'ok').
-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
+-spec(is_blocked/1 :: (maybe_pid()) -> boolean()).
-endif.
@@ -107,7 +108,7 @@ get_limit(undefined) ->
get_limit(Pid) ->
rabbit_misc:with_exit_handler(
fun () -> 0 end,
- fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+ fun () -> gen_server2:call(Pid, get_limit, infinity) end).
block(undefined) ->
ok;
@@ -119,6 +120,11 @@ unblock(undefined) ->
unblock(LimiterPid) ->
gen_server2:call(LimiterPid, unblock, infinity).
+is_blocked(undefined) ->
+ false;
+is_blocked(LimiterPid) ->
+ gen_server2:call(LimiterPid, is_blocked, infinity).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -126,6 +132,9 @@ unblock(LimiterPid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+prioritise_call(get_limit, _From, _State) -> 9;
+prioritise_call(_Msg, _From, _State) -> 0.
+
handle_call({can_send, _QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, State};
@@ -154,7 +163,10 @@ handle_call(unblock, _From, State) ->
case maybe_notify(State, State#lim{blocked = false}) of
{cont, State1} -> {reply, ok, State1};
{stop, State1} -> {stop, normal, stopped, State1}
- end.
+ end;
+
+handle_call(is_blocked, _From, State) ->
+ {reply, blocked(State), State}.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
@@ -183,8 +195,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse is_blocked(OldState)) andalso
- not (limit_reached(NewState) orelse is_blocked(NewState)) of
+ case (limit_reached(OldState) orelse blocked(OldState)) andalso
+ not (limit_reached(NewState) orelse blocked(NewState)) of
true -> NewState1 = notify_queues(NewState),
{case NewState1#lim.prefetch_count of
0 -> stop;
@@ -196,7 +208,7 @@ maybe_notify(OldState, NewState) ->
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-is_blocked(#lim{blocked = Blocked}) -> Blocked.
+blocked(#lim{blocked = Blocked}) -> Blocked.
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 18810833..bbecbfe2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -40,7 +40,7 @@
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
@@ -322,8 +322,8 @@ read(Server, Guid,
%% 2. Check the cur file cache
case ets:lookup(CurFileCacheEts, Guid) of
[] ->
- Defer = fun() -> {gen_server2:pcall(
- Server, 2, {read, Guid}, infinity),
+ Defer = fun() -> {gen_server2:call(
+ Server, {read, Guid}, infinity),
CState} end,
case index_lookup(Guid, CState) of
not_found -> Defer();
@@ -345,18 +345,18 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
+sync(Server) -> gen_server2:cast(Server, sync). %% internal
gc_done(Server, Reclaimed, Source, Destination) ->
- gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}).
+ gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
set_maximum_since_use(Server, Age) ->
- gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Server, {set_maximum_since_use, Age}).
client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -376,7 +376,7 @@ client_delete_and_terminate(CState, Server, Ref) ->
ok = gen_server2:cast(Server, {client_delete, Ref}).
successfully_recovered_state(Server) ->
- gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
+ gen_server2:call(Server, successfully_recovered_state, infinity).
%%----------------------------------------------------------------------------
%% Client-side-only helpers
@@ -575,6 +575,22 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ {new_client_state, _Ref} -> 7;
+ successfully_recovered_state -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ sync -> 8;
+ {gc_done, _Reclaimed, _Source, _Destination} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ _ -> 0
+ end.
+
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
noreply(State1);
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index c7948b7e..a7855bbf 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -38,7 +38,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
-record(gcstate,
{dir,
@@ -81,7 +81,7 @@ stop(Server) ->
gen_server2:call(Server, stop, infinity).
set_maximum_since_use(Pid, Age) ->
- gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Pid, {set_maximum_since_use, Age}).
%%----------------------------------------------------------------------------
@@ -97,6 +97,9 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
+prioritise_cast(_Msg, _State) -> 0.
+
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 2286896b..53d0d5cb 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-export([async_recv/3, close/1, controlling_process/2,
- getstat/2, peername/1, port_command/2,
+ getstat/2, peername/1, peercert/1, port_command/2,
send/2, sockname/1]).
%%---------------------------------------------------------------------------
@@ -45,28 +45,29 @@
-type(stat_option() ::
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
--type(error() :: rabbit_types:error(any())).
+-type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())).
+-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())).
-type(socket() :: port() | #ssl_socket{}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
--spec(close/1 :: (socket()) -> rabbit_types:ok_or_error(any())).
--spec(controlling_process/2 ::
- (socket(), pid()) -> rabbit_types:ok_or_error(any())).
+-spec(close/1 :: (socket()) -> ok_or_any_error()).
+-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
-spec(send/2 ::
- (socket(), binary() | iolist()) -> rabbit_types:ok_or_error(any())).
+ (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(peername/1 ::
(socket())
- -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
- error()).
+ -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
+-spec(peercert/1 ::
+ (socket())
+ -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
-spec(sockname/1 ::
(socket())
- -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
- error()).
+ -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
-spec(getstat/2 ::
(socket(), [stat_option()])
- -> rabbit_types:ok([{stat_option(), integer()}]) | error()).
+ -> ok_val_or_error([{stat_option(), integer()}])).
-endif.
@@ -108,6 +109,11 @@ peername(Sock) when ?IS_SSL(Sock) ->
peername(Sock) when is_port(Sock) ->
inet:peername(Sock).
+peercert(Sock) when ?IS_SSL(Sock) ->
+ ssl:peercert(Sock#ssl_socket.ssl);
+peercert(Sock) when is_port(Sock) ->
+ nossl.
+
port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
ok -> self() ! {inet_reply, Sock, ok},
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index da4eb66a..db5c71f6 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -46,8 +46,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--include_lib("ssl/src/ssl_record.hrl").
-
-define(RABBIT_TCP_OPTS, [
binary,
@@ -110,6 +108,8 @@ boot_ssl() ->
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(ssl_options),
+ % unknown_ca errors are silently ignored prior to R14B unless we
+ % supply this verify_fun - remove when at least R14B is required
SslOpts =
case proplists:get_value(verify, SslOptsConfig, verify_none) of
verify_none -> SslOptsConfig;
@@ -118,26 +118,7 @@ boot_ssl() ->
end}
| SslOptsConfig]
end,
- % In R13B04 and R14A (at least), rc4 is incorrectly implemented.
- CipherSuites = proplists:get_value(ciphers,
- SslOpts,
- ssl:cipher_suites()),
- FilteredCipherSuites =
- [C || C <- CipherSuites,
- begin
- SuiteCode =
- if is_tuple(C) -> ssl_cipher:suite(C);
- is_list(C) -> ssl_cipher:openssl_suite(C)
- end,
- SP = ssl_cipher:security_parameters(
- SuiteCode,
- #security_parameters{}),
- SP#security_parameters.bulk_cipher_algorithm =/= ?RC4
- end],
- SslOpts1 = [{ciphers, FilteredCipherSuites}
- | [{K, V} || {K, V} <- SslOpts, K =/= ciphers]],
- [start_ssl_listener(Host, Port, SslOpts1)
- || {Host, Port} <- SslListeners],
+ [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
end.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index b23776cd..c38ef8d2 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -90,7 +90,7 @@ start() ->
%% Compile the script
ScriptFile = RootName ++ ".script",
- case systools:make_script(RootName, [local, silent]) of
+ case systools:make_script(RootName, [local, silent, exref]) of
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0a49b94d..0b8efc8f 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -81,7 +81,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete(Q, false, false)
+ rabbit_amqqueue:delete_exclusive(Q)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d6b8bb28..0b98290c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -159,7 +159,9 @@
-define(PUB, {_, _}). %% {Guid, IsPersistent}
--define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
+-define(READ_MODE, [binary, raw, read]).
+-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(WRITE_MODE, [write | ?READ_MODE]).
%%----------------------------------------------------------------------------
@@ -220,8 +222,13 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name, not Recover),
+init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
+ false = filelib:is_file(Dir), %% is_file == is file or dir
+ {0, [], State};
+
+init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
@@ -356,15 +363,8 @@ recover(DurableQueues) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
-blank_state(QueueName, EnsureFresh) ->
- StrName = queue_name_to_dir_name(QueueName),
- Dir = filename:join(queues_dir(), StrName),
- ok = case EnsureFresh of
- true -> false = filelib:is_file(Dir), %% is_file == is file or dir
- ok;
- false -> ok
- end,
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+blank_state(QueueName) ->
+ Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -373,17 +373,21 @@ blank_state(QueueName, EnsureFresh) ->
dirty_count = 0,
max_journal_entries = MaxJournal }.
+clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
+
detect_clean_shutdown(Dir) ->
- case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ case file:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)).
+ rabbit_misc:read_term_file(clean_file_name(Dir)).
store_clean_shutdown(Terms, Dir) ->
- rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+ CleanFileName = clean_file_name(Dir),
+ ok = filelib:ensure_dir(CleanFileName),
+ rabbit_misc:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -500,7 +504,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName, false)),
+ recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
gatherer:in(Gatherer, {Guid, 1});
@@ -578,7 +582,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
ok = file_handle_cache:close(Hdl),
@@ -588,7 +592,8 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ ok = filelib:ensure_dir(Path),
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
@@ -785,7 +790,7 @@ segment_entries_foldr(Fun, Init,
load_segment(KeepAcked, #segment { path = Path }) ->
case filelib:is_file(Path) of
false -> {array_new(), 0};
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
ok = file_handle_cache:close(Hdl),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a370c97f..e500b111 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -66,6 +66,8 @@
send_pend, state, channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+ peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -369,10 +371,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
end),
mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
- internal_emit_stats(State),
- mainloop(Deb, State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ State1 = internal_emit_stats(State),
+ mainloop(Deb, State1);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -690,11 +690,14 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
-ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
+ connection_state = running}) ->
Self = self(),
- State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(Self) end)}.
+ fun() -> emit_stats(Self) end)};
+ensure_stats_timer(State) ->
+ State.
%%--------------------------------------------------------------------------
@@ -765,7 +768,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- sock = Sock}) ->
+ sock = Sock,
+ stats_timer = StatsTimer}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -775,6 +779,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = NewConnection}),
rabbit_event:notify(connection_created,
infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -820,6 +826,12 @@ i(peer_address, #v1{sock = Sock}) ->
i(peer_port, #v1{sock = Sock}) ->
{ok, {_, P}} = rabbit_net:peername(Sock),
P;
+i(peer_cert_issuer, #v1{sock = Sock}) ->
+ cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
+i(peer_cert_subject, #v1{sock = Sock}) ->
+ cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock);
+i(peer_cert_validity, #v1{sock = Sock}) ->
+ cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock);
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
@@ -854,6 +866,13 @@ i(client_properties, #v1{connection = #connection{
i(Item, #v1{}) ->
throw({bad_argument, Item}).
+cert_info(F, Sock) ->
+ case rabbit_net:peercert(Sock) of
+ nossl -> '';
+ {error, no_peercert} -> '';
+ {ok, Cert} -> F(Cert)
+ end.
+
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame, State) ->
@@ -897,5 +916,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}},
NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
NewState.
-internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).
+internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
+ State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
new file mode 100644
index 00000000..be451af6
--- /dev/null
+++ b/src/rabbit_ssl.erl
@@ -0,0 +1,173 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_ssl).
+
+-include("rabbit.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+-include_lib("ssl/src/ssl_int.hrl").
+
+-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
+
+%%--------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([certificate/0]).
+
+-type(certificate() :: binary()).
+
+-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject/1 :: (certificate()) -> string()).
+-spec(peer_cert_validity/1 :: (certificate()) -> string()).
+
+-endif.
+
+%%--------------------------------------------------------------------------
+%% High-level functions used by reader
+%%--------------------------------------------------------------------------
+
+%% Return a string describing the certificate's issuer.
+peer_cert_issuer(Cert) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ issuer = Issuer }}) ->
+ format_rdn_sequence(Issuer)
+ end, Cert).
+
+%% Return a string describing the certificate's subject, as per RFC4514.
+peer_cert_subject(Cert) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ subject = Subject }}) ->
+ format_rdn_sequence(Subject)
+ end, Cert).
+
+%% Return a string describing the certificate's validity.
+peer_cert_validity(Cert) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ validity = {'Validity', Start, End} }}) ->
+ lists:flatten(
+ io_lib:format("~s - ~s", [format_asn1_value(Start),
+ format_asn1_value(End)]))
+ end, Cert).
+
+%%--------------------------------------------------------------------------
+
+cert_info(F, Cert) ->
+ F(case public_key:pkix_decode_cert(Cert, otp) of
+ {ok, DecCert} -> DecCert;
+ DecCert -> DecCert
+ end).
+
+%%--------------------------------------------------------------------------
+%% Formatting functions
+%%--------------------------------------------------------------------------
+
+%% Format and rdnSequence as a RFC4514 subject string.
+format_rdn_sequence({rdnSequence, Seq}) ->
+ lists:flatten(
+ rabbit_misc:intersperse(
+ ",", lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]))).
+
+%% Format an RDN set.
+format_complex_rdn(RDNs) ->
+ lists:flatten(
+ rabbit_misc:intersperse("+", [format_rdn(RDN) || RDN <- RDNs])).
+
+%% Format an RDN. If the type name is unknown, use the dotted decimal
+%% representation. See RFC4514, section 2.3.
+format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
+ FV = escape_rdn_value(format_asn1_value(V)),
+ Fmts = [{?'id-at-surname' , "SN"},
+ {?'id-at-givenName' , "GIVENNAME"},
+ {?'id-at-initials' , "INITIALS"},
+ {?'id-at-generationQualifier' , "GENERATIONQUALIFIER"},
+ {?'id-at-commonName' , "CN"},
+ {?'id-at-localityName' , "L"},
+ {?'id-at-stateOrProvinceName' , "ST"},
+ {?'id-at-organizationName' , "O"},
+ {?'id-at-organizationalUnitName' , "OU"},
+ {?'id-at-title' , "TITLE"},
+ {?'id-at-countryName' , "C"},
+ {?'id-at-serialNumber' , "SERIALNUMBER"},
+ {?'id-at-pseudonym' , "PSEUDONYM"},
+ {?'id-domainComponent' , "DC"},
+ {?'id-emailAddress' , "EMAILADDRESS"},
+ {?'street-address' , "STREET"}],
+ case proplists:lookup(T, Fmts) of
+ {_, Fmt} ->
+ io_lib:format(Fmt ++ "=~s", [FV]);
+ none when is_tuple(T) ->
+ TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
+ io_lib:format("~s:~s", [rabbit_misc:intersperse(".", TypeL), FV]);
+ none ->
+ io_lib:format("~p:~s", [T, FV])
+ end.
+
+%% Escape a string as per RFC4514.
+escape_rdn_value(V) ->
+ escape_rdn_value(V, start).
+
+escape_rdn_value([], _) ->
+ [];
+escape_rdn_value([C | S], start) when C =:= $ ; C =:= $# ->
+ [$\\, C | escape_rdn_value(S, middle)];
+escape_rdn_value(S, start) ->
+ escape_rdn_value(S, middle);
+escape_rdn_value([$ ], middle) ->
+ [$\\, $ ];
+escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
+ C =:= $<; C =:= $>; C =:= $\\ ->
+ [$\\, C | escape_rdn_value(S, middle)];
+escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 ->
+ %% only U+0000 needs escaping, but for display purposes it's handy
+ %% to escape all non-printable chars
+ lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
+ escape_rdn_value(S, middle);
+escape_rdn_value([C | S], middle) ->
+ [C | escape_rdn_value(S, middle)].
+
+%% Get the string representation of an OTPCertificate field.
+format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
+ ST =:= universalString; ST =:= utf8String;
+ ST =:= bmpString ->
+ if is_binary(S) -> binary_to_list(S);
+ true -> S
+ end;
+format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
+ Min1, Min2, S1, S2, $Z]}) ->
+ io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
+ [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+format_asn1_value(V) ->
+ io_lib:format("~p", [V]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a72656b7..b36ee0be 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -75,7 +75,6 @@ all_tests() ->
passed = maybe_run_cluster_dependent_tests(),
passed.
-
maybe_run_cluster_dependent_tests() ->
SecondaryNode = rabbit_misc:makenode("hare"),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 30d3a8ae..cbc71bcc 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -472,23 +472,30 @@ delete_and_terminate(State) ->
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
-purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
+purge(State = #vqstate { q4 = Q4,
+ index_state = IndexState,
+ len = Len,
+ persistent_count = PCount }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4,
- IndexState),
- State1 = #vqstate { q1 = Q1, index_state = IndexState2 } =
- purge_betas_and_deltas(State #vqstate { q4 = queue:new(),
+ {LensByStore, IndexState1} = remove_queue_entries(
+ fun rabbit_misc:queue_fold/3, Q4,
+ orddict:new(), IndexState),
+ {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} =
+ purge_betas_and_deltas(LensByStore,
+ State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
- IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1,
- IndexState2),
+ {LensByStore2, IndexState3} = remove_queue_entries(
+ fun rabbit_misc:queue_fold/3, Q1,
+ LensByStore1, IndexState2),
+ PCount1 = PCount - find_persistent_count(LensByStore2),
{Len, a(State1 #vqstate { q1 = queue:new(),
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
ram_index_count = 0,
- persistent_count = 0 })}.
+ persistent_count = PCount1 })}.
publish(Msg, State) ->
{_SeqId, State1} = publish(Msg, false, false, State),
@@ -957,26 +964,30 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
reduce_memory_use(
State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
-purge_betas_and_deltas(State = #vqstate { q3 = Q3,
+purge_betas_and_deltas(LensByStore,
+ State = #vqstate { q3 = Q3,
index_state = IndexState }) ->
case bpqueue:is_empty(Q3) of
- true -> State;
- false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3,
- IndexState),
- purge_betas_and_deltas(
- maybe_deltas_to_betas(
- State #vqstate { q3 = bpqueue:new(),
- index_state = IndexState1 }))
+ true -> {LensByStore, State};
+ false -> {LensByStore1, IndexState1} = remove_queue_entries(
+ fun beta_fold/3, Q3,
+ LensByStore, IndexState),
+ purge_betas_and_deltas(LensByStore1,
+ maybe_deltas_to_betas(
+ State #vqstate {
+ q3 = bpqueue:new(),
+ index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, IndexState) ->
+remove_queue_entries(Fold, Q, LensByStore, IndexState) ->
{GuidsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState)).
+ {sum_guids_by_store_to_len(LensByStore, GuidsByStore),
+ rabbit_queue_index:ack(Acks,
+ rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
@@ -991,6 +1002,12 @@ remove_queue_entries1(
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
+sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
+ orddict:fold(
+ fun (MsgStore, Guids, LensByStore1) ->
+ orddict:update_counter(MsgStore, length(Guids), LensByStore1)
+ end, LensByStore, GuidsByStore).
+
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1117,10 +1134,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
- error -> 0;
- {ok, Guids} -> length(Guids)
- end,
+ PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
+ orddict:new(), GuidsByStore)),
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }.
@@ -1132,6 +1147,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+find_persistent_count(LensByStore) ->
+ case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of
+ error -> 0;
+ {ok, Len} -> Len
+ end.
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index e658f005..067fa9f2 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -296,6 +296,12 @@ get_total_memory({unix, sunos}) ->
Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)),
dict:fetch('Memory size', Dict);
+get_total_memory({unix, aix}) ->
+ File = cmd("/usr/bin/vmstat -v"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_aix/1, Lines)),
+ dict:fetch('memory pages', Dict) * 4096;
+
get_total_memory(_OsType) ->
unknown.
@@ -341,6 +347,17 @@ parse_line_sunos(Line) ->
[Name] -> {list_to_atom(Name), none}
end.
+%% Lines look like " 12345 memory pages"
+%% or " 80.1 maxpin percentage"
+parse_line_aix(Line) ->
+ [Value | NameWords] = string:tokens(Line, " "),
+ Name = string:join(NameWords, " "),
+ {list_to_atom(Name),
+ case lists:member($., Value) of
+ true -> trunc(list_to_float(Value));
+ false -> list_to_integer(Value)
+ end}.
+
freebsd_sysctl(Def) ->
list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 42049d50..f461a539 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -38,7 +38,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
@@ -71,7 +71,7 @@ submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
set_maximum_since_use(Pid, Age) ->
- gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Pid, {set_maximum_since_use, Age}).
run({M, F, A}) ->
apply(M, F, A);
@@ -88,6 +88,9 @@ init([WId]) ->
{ok, WId, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
+prioritise_cast(_Msg, _State) -> 0.
+
handle_call({submit, Fun}, From, WId) ->
gen_server2:reply(From, run(Fun)),
ok = worker_pool:idle(WId),