summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Wragg <dpw@lshift.net>2009-08-20 11:00:35 +0100
committerDavid Wragg <dpw@lshift.net>2009-08-20 11:00:35 +0100
commit6d90687a6f58792e0cbd19365b318088fa32b27b (patch)
tree8626442d55fe67c644a8b46e7b577340b7a2d13b
parent2f0a488f376f45b93b6faeb0b9b91b1015d95be5 (diff)
parentbc7e3c07d4c262946f07ac5af28d8fa7016491fd (diff)
downloadrabbitmq-server-6d90687a6f58792e0cbd19365b318088fa32b27b.tar.gz
Merge fixes from bug20342 into default
-rw-r--r--Makefile19
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/RPMS/Fedora/Makefile9
-rw-r--r--packaging/RPMS/Fedora/init.d8
-rw-r--r--packaging/debs/Debian/Makefile5
-rw-r--r--packaging/debs/Debian/debian/init.d5
-rw-r--r--src/gen_server2.erl364
-rw-r--r--src/priority_queue.erl40
-rw-r--r--src/rabbit.erl18
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_basic.erl25
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_guid.erl26
-rw-r--r--src/rabbit_hooks.erl73
-rw-r--r--src/rabbit_misc.erl66
-rw-r--r--src/rabbit_mnesia.erl26
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_tests.erl135
-rw-r--r--src/rabbit_writer.erl36
21 files changed, 732 insertions, 160 deletions
diff --git a/Makefile b/Makefile
index 5f1e1c92..1dcf7362 100644
--- a/Makefile
+++ b/Makefile
@@ -20,10 +20,10 @@ PYTHON=python
ifndef USE_SPECS
# our type specs rely on features / bug fixes in dialyzer that are
-# only available in R12B-3 upwards
+# only available in R13B upwards (R13B is eshell 5.7.1)
#
# NB: the test assumes that version number will only contain single digits
-USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi)
+USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi)
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
@@ -39,9 +39,6 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
-# for the moment we don't use boot files because they introduce a
-# dependency on particular versions of OTP applications
-#all: $(EBIN_DIR)/rabbit.boot
all: $(TARGETS)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
@@ -101,7 +98,8 @@ run-tests: all
start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
- ./scripts/rabbitmq-server -detached; sleep 1
+ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
+ ./scripts/rabbitmq-server ; sleep 1
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
@@ -115,8 +113,11 @@ force-snapshot: all
stop-node:
-$(ERL_CALL) -q
+# code coverage will be created for subdirectory "ebin" of COVER_DIR
+COVER_DIR=.
+
start-cover: all
- echo "cover:start(), rabbit_misc:enable_cover()." | $(ERL_CALL)
+ echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
stop-cover: all
echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL)
@@ -133,7 +134,7 @@ srcdist: distclean
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
>> $(TARGET_SRC_DIR)/BUILD
- sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
+ sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR)
@@ -175,7 +176,7 @@ install: all docs_all install_dirs
for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \
cp scripts/$$script $(TARGET_DIR)/sbin; \
[ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
- done
+ done
for section in 1 5; do \
mkdir -p $(MAN_DIR)/man$$section; \
for manpage in docs/*.$$section.pod; do \
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 8e1c890e..0057ea04 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -1,7 +1,7 @@
{application, rabbit, %% -*- erlang -*-
[{description, "RabbitMQ"},
{id, "RabbitMQ"},
- {vsn, "%%VERSION%%"},
+ {vsn, "%%VSN%%"},
{modules, []},
{registered, [rabbit_amqqueue_sup,
rabbit_log,
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index c74d4533..89b73841 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -1,7 +1,8 @@
-VERSION=0.0.0
-SOURCE_TARBALL_DIR=../../../dist
+TARBALL_DIR=../../../dist
+TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
COMMON_DIR=../../common
-TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz
+VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
+
TOP_DIR=$(shell pwd)
#Under debian we do not want to check build dependencies, since that
#only checks build-dependencies using rpms, not debs
@@ -23,7 +24,7 @@ rpms: clean server
prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
- cp $(TOP_DIR)/$(TARBALL) SOURCES
+ cp $(TARBALL_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d
index 77a6a89a..21019c70 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/RPMS/Fedora/init.d
@@ -62,10 +62,12 @@ stop_rabbitmq () {
if [ $RETVAL = 0 ] ; then
$DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
RETVAL=$?
- if [ $RETVAL != 0 ] ; then
- echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
- else
+ if [ $RETVAL = 0 ] ; then
+ # Try to stop epmd if run by the rabbitmq user
+ pkill -u rabbitmq epmd || :
rm -rf $LOCK_FILE
+ else
+ echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
fi
else
echo No nodes running
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 67fabae0..7ab8b659 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -1,8 +1,9 @@
TARBALL_DIR=../../../dist
-TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz))
+TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
COMMON_DIR=../../common
-DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
+
+DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
UNPACKED_DIR=rabbitmq-server-$(VERSION)
PACKAGENAME=rabbitmq-server
SIGNING_KEY_ID=056E8E56
diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d
index a35a60ec..4a7909c5 100644
--- a/packaging/debs/Debian/debian/init.d
+++ b/packaging/debs/Debian/debian/init.d
@@ -53,7 +53,10 @@ stop_rabbitmq () {
if [ $RETVAL = 0 ] ; then
$DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
RETVAL=$?
- if [ $RETVAL != 0 ] ; then
+ if [ $RETVAL = 0 ] ; then
+ # Try to stop epmd if run by the rabbitmq user
+ pkill -u rabbitmq epmd || :
+ else
echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
fi
else
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ba8becfc..36fb4fa8 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1,4 +1,4 @@
-%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) the module name is gen_server2
@@ -21,6 +21,42 @@
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
%%
+%% 5) The callback module can optionally implement
+%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
+%% called immediately prior to and post hibernation, respectively. If
+%% handle_pre_hibernate returns {hibernate, NewState} then the process
+%% will hibernate. If the module does not implement
+%% handle_pre_hibernate/1 then the default action is to hibernate.
+%%
+%% 6) init can return a 4th arg, {backoff, InitialTimeout,
+%% MinimumTimeout, DesiredHibernatePeriod} (all in
+%% milliseconds). Then, on all callbacks which can return a timeout
+%% (including init), timeout can be 'hibernate'. When this is the
+%% case, the current timeout value will be used (initially, the
+%% InitialTimeout supplied from init). After this timeout has
+%% occurred, hibernation will occur as normal. Upon awaking, a new
+%% current timeout value will be calculated.
+%%
+%% The purpose is that the gen_server2 takes care of adjusting the
+%% current timeout value such that the process will increase the
+%% timeout value repeatedly if it is unable to sleep for the
+%% DesiredHibernatePeriod. If it is able to sleep for the
+%% DesiredHibernatePeriod it will decrease the current timeout down to
+%% the MinimumTimeout, so that the process is put to sleep sooner (and
+%% hopefully stays asleep for longer). In short, should a process
+%% using this receive a burst of messages, it should not hibernate
+%% between those messages, but as the messages become less frequent,
+%% the process will not only hibernate, it will do so sooner after
+%% each message.
+%%
+%% When using this backoff mechanism, normal timeout values (i.e. not
+%% 'hibernate') can still be used, and if they are used then the
+%% handle_info(timeout, State) will be called as normal. In this case,
+%% returning 'hibernate' from handle_info(timeout, State) will not
+%% hibernate the process immediately, as it would if backoff wasn't
+%% being used. Instead it'll wait for the current timeout as described
+%% above.
+
%% All modifications are (C) 2009 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
@@ -55,6 +91,7 @@
%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
+%%% {ok, State, Timeout, Backoff}
%%% ignore
%%% {stop, Reason}
%%%
@@ -86,6 +123,17 @@
%%%
%%% ==> ok
%%%
+%%% handle_pre_hibernate(State)
+%%%
+%%% ==> {hibernate, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% handle_post_hibernate(State)
+%%%
+%%% ==> {noreply, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% The work flow (of the server) can be described as follows:
%%%
@@ -116,7 +164,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
-export([behaviour_info/1]).
@@ -290,7 +338,7 @@ multi_call(Nodes, Name, Req, Timeout)
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
%%
%% Description: Makes an existing process into a gen_server.
%% The calling process will enter the gen_server receive
@@ -301,20 +349,30 @@ multi_call(Nodes, Name, Req, Timeout)
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
- enter_loop(Mod, Options, State, self(), infinity).
+ enter_loop(Mod, Options, State, self(), infinity, undefined).
+
+enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
+ enter_loop(Mod, Options, State, self(), infinity, Backoff);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity);
+ enter_loop(Mod, Options, State, ServerName, infinity, undefined);
enter_loop(Mod, Options, State, Timeout) ->
- enter_loop(Mod, Options, State, self(), Timeout).
+ enter_loop(Mod, Options, State, self(), Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+ Backoff1 = extend_backoff(Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -329,23 +387,37 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
%%% ---------------------------------------------------
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
-init_it(Starter, Parent, Name, Mod, Args, Options) ->
+init_it(Starter, Parent, Name0, Mod, Args, Options) ->
+ Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+ loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
{ok, State, Timeout} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ Backoff1 = extend_backoff(Backoff),
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
{stop, Reason} ->
+ %% For consistency, we must make sure that the
+ %% registered name (if any) is unregistered before
+ %% the parent process is notified about the failure.
+ %% (Otherwise, the parent process could get
+ %% an 'already_started' error if it immediately
+ %% tried starting the process again.)
+ unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
ignore ->
+ unregister_name(Name0),
proc_lib:init_ack(Starter, ignore),
exit(normal);
{'EXIT', Reason} ->
+ unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
Else ->
@@ -354,33 +426,159 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
exit(Error)
end.
+name({local,Name}) -> Name;
+name({global,Name}) -> Name;
+%% name(Pid) when is_pid(Pid) -> Pid;
+%% when R11 goes away, drop the line beneath and uncomment the line above
+name(Name) -> Name.
+
+unregister_name({local,Name}) ->
+ _ = (catch unregister(Name));
+unregister_name({global,Name}) ->
+ _ = global:unregister_name(Name);
+unregister_name(Pid) when is_pid(Pid) ->
+ Pid.
+
+extend_backoff(undefined) ->
+ undefined;
+extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
+ {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
+
%%%========================================================================
%%% Internal functions
%%%========================================================================
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
+ pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
+loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
+ drain(Queue), Debug).
+
+drain(Queue) ->
receive
- Input -> loop(Parent, Name, State, Mod,
- Time, in(Input, Queue), Debug)
- after 0 ->
- case priority_queue:out(Queue) of
- {{value, Msg}, Queue1} ->
- process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, Msg);
- {empty, Queue1} ->
- receive
- Input ->
- loop(Parent, Name, State, Mod,
- Time, in(Input, Queue1), Debug)
- after Time ->
- process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, timeout)
+ Input -> drain(in(Input, Queue))
+ after 0 -> Queue
+ end.
+
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ case priority_queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, TimeoutState, Queue1, Debug, Msg);
+ {empty, Queue1} ->
+ {Time1, HibOnTimeout}
+ = case {Time, TimeoutState} of
+ {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
+ {Current, true};
+ {hibernate, _} ->
+ %% wake_hib/7 will set Time to hibernate. If
+ %% we were woken and didn't receive a msg
+ %% then we will get here and need a sensible
+ %% value for Time1, otherwise we crash.
+ %% R13B1 always waits infinitely when waking
+ %% from hibernation, so that's what we do
+ %% here too.
+ {infinity, false};
+ _ -> {Time, false}
+ end,
+ receive
+ Input ->
+ %% Time could be 'hibernate' here, so *don't* call loop
+ process_next_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ drain(in(Input, Queue1)), Debug)
+ after Time1 ->
+ case HibOnTimeout of
+ true ->
+ pre_hibernate(
+ Parent, Name, State, Mod, TimeoutState, Queue1,
+ Debug);
+ false ->
+ process_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ Queue1, Debug, timeout)
end
end
end.
+wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) ->
+ TimeoutState1 = case TS of
+ undefined ->
+ undefined;
+ {SleptAt, TimeoutState} ->
+ adjust_timeout_state(SleptAt, now(), TimeoutState)
+ end,
+ post_hibernate(Parent, Name, State, Mod, TimeoutState1,
+ drain(Queue), Debug).
+
+hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ TS = case TimeoutState of
+ undefined -> undefined;
+ {backoff, _, _, _, _} -> {now(), TimeoutState}
+ end,
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
+ TS, Queue, Debug]).
+
+pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
+ true ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
+ Debug);
+ Reply ->
+ handle_common_termination(Reply, Name, pre_hibernate,
+ Mod, State, Debug)
+ end;
+ false ->
+ hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug)
+ end.
+
+post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_post_hibernate, 1) of
+ true ->
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, infinity,
+ TimeoutState, Queue, Debug);
+ {noreply, NState, Time} ->
+ process_next_msg(Parent, Name, NState, Mod, Time,
+ TimeoutState, Queue, Debug);
+ Reply ->
+ handle_common_termination(Reply, Name, post_hibernate,
+ Mod, State, Debug)
+ end;
+ false ->
+ %% use hibernate here, not infinity. This matches
+ %% R13B. The key is that we should be able to get through
+ %% to process_msg calling sys:handle_system_msg with Time
+ %% still set to hibernate, iff that msg is the very msg
+ %% that woke us up (or the first msg we receive after
+ %% waking up).
+ process_next_msg(Parent, Name, State, Mod, hibernate,
+ TimeoutState, Queue, Debug)
+ end.
+
+adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
+ DesiredHibPeriod, RandomState}) ->
+ NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ CurrentMicros = CurrentTO * 1000,
+ MinimumMicros = MinimumTO * 1000,
+ DesiredHibMicros = DesiredHibPeriod * 1000,
+ GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
+ Base =
+ %% If enough time has passed between the last two messages then we
+ %% should consider sleeping sooner. Otherwise stay awake longer.
+ case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
+ true -> lists:max([MinimumTO, CurrentTO div 2]);
+ false -> CurrentTO
+ end,
+ {Extra, RandomState1} = random:uniform_s(Base, RandomState),
+ CurrentTO1 = Base + Extra,
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
+
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
@@ -388,19 +586,25 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
in(Input, Queue) ->
priority_queue:in(Input, Queue).
-process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
+process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
+ Debug, Msg) ->
case Msg of
{system, From, Req} ->
- sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, Queue]);
+ sys:handle_system_msg
+ (Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, TimeoutState, Queue]);
+ %% gen_server puts Hib on the end as the 7th arg, but that
+ %% version of the function seems not to be documented so
+ %% leaving out for now.
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
+ Debug1)
end.
%%% ---------------------------------------------------
@@ -598,87 +802,95 @@ dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, _Time, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{reply, Reply, NState, Time1} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
reply(From, Reply),
exit(R);
- Other -> handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue)
+ Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, _Time, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
{reply, Reply, NState, Time1} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
reply(Name, From, Reply, NState, Debug),
exit(R);
Other ->
- handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue, Debug)
+ handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue, Debug)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply,
- Parent, Name, Msg, Mod, State, Queue, Debug).
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue, Debug).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue) ->
case Reply of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
- {stop, Reason, NState} ->
- terminate(Reason, Name, Msg, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, Msg, Mod, State, []);
- _ ->
- terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, [])
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
+ Debug) ->
case Reply of
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, Debug)
+ end.
+
+handle_common_termination(Reply, Name, Msg, Mod, State, Debug) ->
+ case Reply of
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
@@ -696,16 +908,24 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
- loop(Parent, Name, State, Mod, Time, Queue, Debug).
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+-ifdef(use_specs).
+-spec system_terminate(_, _, _, [_]) -> no_return().
+-endif.
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
+ _TimeoutState, _Queue]) ->
terminate(Reason, Name, [], Mod, State, Debug).
-system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
+ OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
- Else -> Else
+ {ok, NewState} ->
+ {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
+ Else ->
+ Else
end.
%%-----------------------------------------------------------------
@@ -747,6 +967,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) ->
exit(normal);
shutdown ->
exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
_ ->
error_info(Reason, Name, Msg, State, Debug),
exit(Reason)
@@ -871,8 +1093,8 @@ name_to_pid(Name) ->
%% Status information
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
- [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
- StatusData,
+ [PDict, SysState, Parent, Debug,
+ [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
NameTag = if is_pid(Name) ->
pid_to_list(Name);
is_atom(Name) ->
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 732757c4..c74b39a9 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -55,7 +55,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
+ out/1, join/2]).
%%----------------------------------------------------------------------------
@@ -73,6 +74,7 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+join(A, {queue, [], []}) ->
+ A;
+join({queue, [], []}, B) ->
+ B;
+join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
+join(A = {queue, _, _}, {pqueue, BPQ}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ Post1 = case Post of
+ [] -> [ {0, A} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
+ _ -> [ {0, A} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, B = {queue, _, _}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ Post1 = case Post of
+ [] -> [ {0, B} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
+ _ -> [ {0, B} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, {pqueue, BPQ}) ->
+ {pqueue, merge(APQ, BPQ, [])}.
+
+merge([], BPQ, Acc) ->
+ lists:reverse(Acc, BPQ);
+merge(APQ, [], Acc) ->
+ lists:reverse(Acc, APQ);
+merge([{P, A}|As], [{P, B}|Bs], Acc) ->
+ merge(As, Bs, [ {P, join(A, B)} | Acc ]);
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+ merge(As, Bs, [ {PA, A} | Acc ]);
+merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
+ merge(As, Bs, [ {PB, B} | Acc ]).
+
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 196212ea..b0d62b5a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -133,6 +133,7 @@ start(normal, []) ->
{"core processes",
fun () ->
ok = start_child(rabbit_log),
+ ok = rabbit_hooks:start(),
ok = rabbit_amqqueue:start(),
@@ -207,8 +208,21 @@ log_location(Type) ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- io:format("~s ~s (AMQP ~p-~p)~n~s~n~s~n~n",
- [Product, Version,
+ ProductLen = string:len(Product),
+ io:format("~n"
+ "+---+ +---+~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| +---+ +-------+~n"
+ "| |~n"
+ "| ~s +---+ |~n"
+ "| | | |~n"
+ "| ~s +---+ |~n"
+ "| |~n"
+ "+-------------------+~n"
+ "AMQP ~p-~p~n~s~n~s~n~n",
+ [Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 198e2782..f05f7880 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -51,8 +51,6 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(CALL_TIMEOUT, 5000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -305,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:cast(QPid, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 8, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf0ef44f..fe2e8509 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,7 +36,8 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-export([start_link/1]).
@@ -101,7 +102,8 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
+ blocked_consumers = queue:new()}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -116,9 +118,9 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+noreply(NewState) -> {noreply, NewState, hibernate}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -813,11 +815,6 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
-handle_info(timeout, State) ->
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
-
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2dc619c1..4033aaaf 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -35,6 +35,7 @@
-export([publish/1, message/4, properties/1, delivery/4]).
-export([publish/4, publish/7]).
+-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +54,8 @@
-spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(),
maybe(txn()), properties_input(), binary()) ->
publish_result()).
+-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
+-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
-endif.
@@ -72,16 +75,26 @@ delivery(Mandatory, Immediate, Txn, Message) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
sender = self(), message = Message}.
+build_content(Properties, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ #content{class_id = ClassId,
+ properties = Properties,
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]}.
+
+from_content(Content) ->
+ #content{class_id = ClassId,
+ properties = Props,
+ payload_fragments_rev = FragmentsRev} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ {Props, list_to_binary(lists:reverse(FragmentsRev))}.
+
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
- Content = #content{class_id = ClassId,
- properties = Properties,
- properties_bin = none,
- payload_fragments_rev = [BodyBin]},
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
- content = Content,
+ content = build_content(Properties, BodyBin),
persistent_key = none}.
properties(P = #'P_basic'{}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3089bb62..16b7c938 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -89,7 +89,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- gen_server2:cast(Pid, {conserve_memory, Conserve}).
+ gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
%%---------------------------------------------------------------------------
@@ -157,14 +157,16 @@ handle_cast({conserve_memory, Conserve}, State) ->
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
noreply(State).
+handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
+ State = #ch{writer_pid = WriterPid}) ->
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(timeout, State) ->
ok = clear_permission_cache(),
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
+ {noreply, State, hibernate}.
terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
state = terminating}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6649899a..37e4d189 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -36,7 +36,7 @@
-record(params, {quiet, node, command, args}).
--define(RPC_TIMEOUT, 30000).
+-define(RPC_TIMEOUT, infinity).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2be00503..b789fbd1 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -42,6 +42,7 @@
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
+-define(SERIAL_FILENAME, "rabbit_serial").
-record(state, {serial}).
@@ -59,17 +60,28 @@
%%----------------------------------------------------------------------------
start_link() ->
- %% The persister can get heavily loaded, and we don't want that to
- %% impact guid generation. We therefore keep the serial in a
- %% separate process rather than calling rabbit_persister:serial/0
- %% directly in the functions below.
gen_server:start_link({local, ?SERVER}, ?MODULE,
- [rabbit_persister:serial()], []).
+ [update_disk_serial()], []).
+
+update_disk_serial() ->
+ Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
+ Serial = case rabbit_misc:read_term_file(Filename) of
+ {ok, [Num]} -> Num;
+ {error, enoent} -> rabbit_persister:serial();
+ {error, Reason} ->
+ throw({error, {cannot_read_serial_file, Filename, Reason}})
+ end,
+ case rabbit_misc:write_term_file(Filename, [Serial + 1]) of
+ ok -> ok;
+ {error, Reason1} ->
+ throw({error, {cannot_write_serial_file, Filename, Reason1}})
+ end,
+ Serial.
%% generate a guid that is monotonically increasing per process.
%%
%% The id is only unique within a single cluster and as long as the
-%% persistent message store hasn't been deleted.
+%% serial store hasn't been deleted.
guid() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
@@ -77,7 +89,7 @@ guid() ->
%% now() to move ahead of the system time), and b) it is really
%% slow since it takes a global lock and makes a system call.
%%
- %% rabbit_persister:serial/0, in combination with self/0 (which
+ %% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl
new file mode 100644
index 00000000..b3d271c2
--- /dev/null
+++ b/src/rabbit_hooks.erl
@@ -0,0 +1,73 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_hooks).
+
+-export([start/0]).
+-export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]).
+
+-define(TableName, rabbit_hooks).
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> 'ok').
+-spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok').
+-spec(unsubscribe/2 :: (atom(), atom()) -> 'ok').
+-spec(trigger/2 :: (atom(), list()) -> 'ok').
+-spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok').
+
+-endif.
+
+start() ->
+ ets:new(?TableName, [bag, public, named_table]),
+ ok.
+
+subscribe(Hook, HandlerName, Handler) ->
+ ets:insert(?TableName, {Hook, HandlerName, Handler}),
+ ok.
+
+unsubscribe(Hook, HandlerName) ->
+ ets:match_delete(?TableName, {Hook, HandlerName, '_'}),
+ ok.
+
+trigger(Hook, Args) ->
+ Hooks = ets:lookup(?TableName, Hook),
+ [case catch apply(M, F, [Hook, Name, Args | A]) of
+ {'EXIT', Reason} ->
+ rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p",
+ [Name, Hook, Reason]);
+ _ -> ok
+ end || {_, Name, {M, F, A}} <- Hooks],
+ ok.
+
+notify_remote(Hook, HandlerName, Args, Pid, PidArgs) ->
+ Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]},
+ ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 72e16f0f..95a274e3 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -41,6 +41,7 @@
-export([dirty_read/1]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
+-export([enable_cover/1, report_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -49,9 +50,11 @@
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
+-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
+-export([unfold/2, ceil/1]).
-import(mnesia).
-import(lists).
@@ -64,6 +67,8 @@
-include_lib("kernel/include/inet.hrl").
+-type(ok_or_error() :: 'ok' | {'error', any()}).
+
-spec(method_record_type/1 :: (tuple()) -> atom()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
@@ -87,8 +92,10 @@
-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
--spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
+-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(report_cover/0 :: () -> 'ok').
+-spec(enable_cover/1 :: (string()) -> ok_or_error()).
+-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
@@ -97,7 +104,7 @@
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
--spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
+-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> erlang_node()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
@@ -107,12 +114,16 @@
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
--spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
+-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
+-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
+-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (string(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
+-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
+-spec(ceil/1 :: (number()) -> number()).
-endif.
@@ -188,17 +199,27 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
[Kind, Name, VHostPath])).
enable_cover() ->
- case cover:compile_beam_directory("ebin") of
+ enable_cover(".").
+
+enable_cover([Root]) when is_atom(Root) ->
+ enable_cover(atom_to_list(Root));
+enable_cover(Root) ->
+ case cover:compile_beam_directory(filename:join(Root, "ebin")) of
{error,Reason} -> {error,Reason};
_ -> ok
end.
report_cover() ->
- Dir = "cover/",
- ok = filelib:ensure_dir(Dir),
+ report_cover(".").
+
+report_cover([Root]) when is_atom(Root) ->
+ report_cover(atom_to_list(Root));
+report_cover(Root) ->
+ Dir = filename:join(Root, "cover"),
+ ok = filelib:ensure_dir(filename:join(Dir,"junk")),
lists:foreach(fun(F) -> file:delete(F) end,
- filelib:wildcard(Dir ++ "*.html")),
- {ok, SummaryFile} = file:open(Dir ++ "summary.txt", [write]),
+ filelib:wildcard(filename:join(Dir, "*.html"))),
+ {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
{CT, NCT} =
lists:foldl(
fun(M,{CovTot, NotCovTot}) ->
@@ -207,7 +228,7 @@ report_cover() ->
Cov, NotCov, M),
{ok,_} = cover:analyze_to_file(
M,
- Dir ++ atom_to_list(M) ++ ".html",
+ filename:join(Dir, atom_to_list(M) ++ ".html"),
[html]),
{CovTot+Cov, NotCovTot+NotCov}
end,
@@ -347,7 +368,9 @@ dirty_foreach_key1(F, TableName, K) ->
end.
dirty_dump_log(FileName) ->
- {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]),
+ {ok, LH} = disk_log:open([{name, dirty_dump_log},
+ {mode, read_only},
+ {file, FileName}]),
dirty_dump_log1(LH, disk_log:chunk(LH, start)),
disk_log:close(LH).
@@ -361,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+read_term_file(File) -> file:consult(File).
+
+write_term_file(File, Terms) ->
+ file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
+
append_file(File, Suffix) ->
case file:read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -431,3 +460,18 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
+unfold(Fun, Init) ->
+ unfold(Fun, [], Init).
+
+unfold(Fun, Acc, Init) ->
+ case Fun(Init) of
+ {true, E, I} -> unfold(Fun, [E|Acc], I);
+ false -> {Acc, Init}
+ end.
+
+ceil(N) ->
+ T = trunc(N),
+ case N - T of
+ 0 -> N;
+ _ -> 1 + T
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 575ecb0a..74856c41 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -192,28 +192,16 @@ cluster_nodes_config_filename() ->
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
- Handle = case file:open(FileName, [write]) of
- {ok, Device} -> Device;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end,
- try
- ok = io:write(Handle, ClusterNodes),
- ok = io:put_chars(Handle, [$.])
- after
- case file:close(Handle) of
- ok -> ok;
- {error, Reason1} ->
- throw({error, {cannot_close_cluster_nodes_config,
- FileName, Reason1}})
- end
- end,
- ok.
+ case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_cluster_nodes_config,
+ FileName, Reason}})
+ end.
read_cluster_nodes_config() ->
FileName = cluster_nodes_config_filename(),
- case file:consult(FileName) of
+ case rabbit_misc:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
case application:get_env(cluster_config) of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 426b99eb..7be92812 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -286,7 +286,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 01757509..8f278c2d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -33,6 +33,9 @@
-export([all_tests/0, test_parsing/0]).
+%% Exported so the hook mechanism can call back
+-export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]).
+
-import(lists).
-include("rabbit.hrl").
@@ -46,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_priority_queue(),
+ passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -54,6 +58,7 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
+ passed = test_hooks(),
passed.
test_priority_queue() ->
@@ -71,7 +76,8 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+ {true, false, 1, [{1, foo}], [foo]} =
+ test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
@@ -87,6 +93,71 @@ test_priority_queue() ->
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
{true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+ %% merge 2 * 1-element no-priority Qs
+ Q5 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q5),
+
+ %% merge 1-element no-priority Q with 1-element priority Q
+ Q6 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
+ test_priority_queue(Q6),
+
+ %% merge 1-element priority Q with 1-element no-priority Q
+ Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q7),
+
+ %% merge 2 * 1-element same-priority Qs
+ Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q8),
+
+ %% merge 2 * 1-element different-priority Qs
+ Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 2, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q9),
+
+ %% merge 2 * 1-element different-priority Qs (other way around)
+ Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
+ priority_queue:in(foo, 1, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q10),
+
+ %% merge 2 * 2-element multi-different-priority Qs
+ Q11 = priority_queue:join(Q6, Q5),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}],
+ [bar, foo, foo, bar]} = test_priority_queue(Q11),
+
+ %% and the other way around
+ Q12 = priority_queue:join(Q5, Q6),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}],
+ [bar, foo, bar, foo]} = test_priority_queue(Q12),
+
+ %% merge with negative priorities
+ Q13 = priority_queue:join(Q4, Q5),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q13),
+
+ %% and the other way around
+ Q14 = priority_queue:join(Q5, Q4),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q14),
+
+ %% joins with empty queues:
+ Q1 = priority_queue:join(Q, Q1),
+ Q1 = priority_queue:join(Q1, Q),
+
+ %% insert with priority into non-empty zero-priority queue
+ Q15 = priority_queue:in(baz, 1, Q5),
+ {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
+ test_priority_queue(Q15),
+
passed.
priority_queue_in_all(Q, L) ->
@@ -112,6 +183,14 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
+test_unfold() ->
+ {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
+ List = lists:seq(2,20,2),
+ {List, 0} = rabbit_misc:unfold(fun (0) -> false;
+ (N) -> {true, N*2, N-1}
+ end, 10),
+ passed.
+
test_parsing() ->
passed = test_content_properties(),
passed.
@@ -601,6 +680,52 @@ test_server_status() ->
passed.
+test_hooks() ->
+ %% Firing of hooks calls all hooks in an isolated manner
+ rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}),
+ rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}),
+ rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}),
+ rabbit_hooks:trigger(test_hook, [arg1, arg2]),
+ [arg1, arg2] = get(test_hook_test_fired),
+ [arg1, arg2] = get(test_hook_test2_fired),
+ undefined = get(test_hook2_test2_fired),
+
+ %% Hook Deletion works
+ put(test_hook_test_fired, undefined),
+ put(test_hook_test2_fired, undefined),
+ rabbit_hooks:unsubscribe(test_hook, test),
+ rabbit_hooks:trigger(test_hook, [arg3, arg4]),
+ undefined = get(test_hook_test_fired),
+ [arg3, arg4] = get(test_hook_test2_fired),
+ undefined = get(test_hook2_test2_fired),
+
+ %% Catches exceptions from bad hooks
+ rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}),
+ ok = rabbit_hooks:trigger(test_hook3, []),
+
+ %% Passing extra arguments to hooks
+ rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}),
+ rabbit_hooks:trigger(arg_hook, [arg1, arg2]),
+ {[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
+
+ %% Invoking Pids
+ Remote = fun() ->
+ receive
+ {rabbitmq_hook,[remote_test,test,[],Target]} ->
+ Target ! invoked
+ end
+ end,
+ P = spawn(Remote),
+ rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}),
+ rabbit_hooks:trigger(remote_test, []),
+ receive
+ invoked -> ok
+ after 100 ->
+ io:format("Remote hook not invoked"),
+ throw(timeout)
+ end,
+ passed.
+
%---------------------------------------------------------------------
control_action(Command, Args) -> control_action(Command, node(), Args).
@@ -684,3 +809,11 @@ delete_log_handlers(Handlers) ->
[[] = error_logger:delete_report_handler(Handler) ||
Handler <- Handlers],
ok.
+
+handle_hook(HookName, Handler, Args) ->
+ A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired",
+ put(list_to_atom(A), Args).
+bad_handle_hook(_, _, _) ->
+ bad:bad().
+extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
+ handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 9cf9f8ae..e338ddfe 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,9 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, shutdown/1, mainloop/1]).
--export([send_command/2, send_command/3,
- send_command_and_notify/5]).
+-export([start/3, start_link/3, shutdown/1, mainloop/1]).
+-export([send_command/2, send_command/3, send_command_and_signal_back/3,
+ send_command_and_signal_back/4, send_command_and_notify/5]).
-export([internal_send_command/3, internal_send_command/5]).
-import(gen_tcp).
@@ -49,8 +49,12 @@
-ifdef(use_specs).
-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
+-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok').
+-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
+-spec(send_command_and_signal_back/4 ::
+ (pid(), amqp_method(), content(), pid()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
-spec(internal_send_command/3 ::
@@ -68,6 +72,11 @@ start(Sock, Channel, FrameMax) ->
channel = Channel,
frame_max = FrameMax}]).
+start_link(Sock, Channel, FrameMax) ->
+ spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}]).
+
mainloop(State) ->
receive
Message -> ?MODULE:mainloop(handle_message(Message, State))
@@ -86,6 +95,19 @@ handle_message({send_command, MethodRecord, Content},
ok = internal_send_command_async(Sock, Channel, MethodRecord,
Content, FrameMax),
State;
+handle_message({send_command_and_signal_back, MethodRecord, Parent},
+ State = #wstate{sock = Sock, channel = Channel}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ Parent ! rabbit_writer_send_command_signal,
+ State;
+handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
+ State = #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord,
+ Content, FrameMax),
+ Parent ! rabbit_writer_send_command_signal,
+ State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
@@ -113,6 +135,14 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
+send_command_and_signal_back(W, MethodRecord, Parent) ->
+ W ! {send_command_and_signal_back, MethodRecord, Parent},
+ ok.
+
+send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
+ W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
+ ok.
+
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.