summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-11-03 15:37:23 +0000
committerBen Hood <0x6e6562@gmail.com>2008-11-03 15:37:23 +0000
commitefda2e06e4f60663c5f2d4f0e3447f39d8be7a80 (patch)
tree67033b13ed03e6e09d0ca34563e0b29685081221
parent15fe46007c2df9f2bceade06d9b8b5cb2e27bfc1 (diff)
parentbae21727b74c793967d147ea112b8d87528b07c4 (diff)
downloadrabbitmq-server-efda2e06e4f60663c5f2d4f0e3447f39d8be7a80.tar.gz
Merged default into 18776bug18776
-rw-r--r--packaging/RPMS/Fedora/Makefile31
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.logrotate2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec32
-rw-r--r--packaging/debs/Debian/Makefile4
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.logrotate2
-rwxr-xr-xscripts/rabbitmq-server4
-rw-r--r--scripts/rabbitmq-server.bat4
-rw-r--r--src/buffering_proxy.erl20
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_alarm.erl125
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl78
-rw-r--r--src/rabbit_channel.erl91
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_reader.erl31
-rw-r--r--src/rabbit_router.erl8
-rw-r--r--src/rabbit_writer.erl14
18 files changed, 352 insertions, 132 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 6cc3579b..33032f11 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -4,32 +4,23 @@ VERSION=0.0.0
SOURCE_TARBALL_DIR=../../../dist
TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz
TOP_DIR=$(shell pwd)
-RPM_VERSION=$(shell echo $(VERSION) | tr - _)
-DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)'
+DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1'
rpms: clean server
#Create proper environment for making rpms
prepare:
- mkdir -p $(TOP_DIR)/BUILD
- mkdir -p $(TOP_DIR)/SOURCES
- mkdir -p $(TOP_DIR)/SPECS
- mkdir -p $(TOP_DIR)/SRPMS
- mkdir -p $(TOP_DIR)/RPMS
- mkdir -p $(TOP_DIR)/tmp
- cp $(TOP_DIR)/$(TARBALL) $(TOP_DIR)/SOURCES
- cp $(TOP_DIR)/rabbitmq-server.spec $(TOP_DIR)/SPECS
- cp $(TOP_DIR)/init.d $(TOP_DIR)/BUILD
- cp $(TOP_DIR)/rabbitmqctl_wrapper $(TOP_DIR)/BUILD
- cp $(TOP_DIR)/rabbitmq-server.logrotate $(TOP_DIR)/BUILD
+ mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
+ cp $(TOP_DIR)/$(TARBALL) SOURCES
+ cp rabbitmq-server.spec SPECS
+ sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec
+
+ cp init.d SOURCES/rabbitmq-server.init
+ cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper
+ cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
- rpmbuild -ba $(TOP_DIR)/SPECS/rabbitmq-server.spec $(DEFINES) --target noarch
+ rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch
clean:
- rm -rf $(TOP_DIR)/SOURCES/
- rm -rf $(TOP_DIR)/SPECS/
- rm -rf $(TOP_DIR)/RPMS/
- rm -rf $(TOP_DIR)/SRPMS/
- rm -rf $(TOP_DIR)/BUILD/
- rm -rf $(TOP_DIR)/tmp/
+ rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.logrotate b/packaging/RPMS/Fedora/rabbitmq-server.logrotate
index 64cd01a1..ab87e4a5 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.logrotate
+++ b/packaging/RPMS/Fedora/rabbitmq-server.logrotate
@@ -9,4 +9,4 @@
postrotate
/sbin/service rabbitmq-server rotate-logs
endscript
-} \ No newline at end of file
+}
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 08694c09..214f6918 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -1,14 +1,21 @@
Name: rabbitmq-server
-Version: %{rpm_version}
+Version: %%VERSION%%
Release: 1
License: MPLv1.1
Group: Development/Libraries
-Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz
+Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
+Source1: rabbitmq-server.init
+Source2: rabbitmq-server.wrapper
+Source3: rabbitmq-server.logrotate
URL: http://www.rabbitmq.com/
Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd.
+%if 0%{?debian}
+%else
+BuildRequires: python, python-json
+%endif
Requires: erlang, logrotate
Packager: Hubert Plociniczak <hubert@lshift.net>
-BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root
+BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
Summary: The RabbitMQ server
Requires(post): chkconfig
Requires(pre): chkconfig initscripts
@@ -19,10 +26,8 @@ performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
-%define _mandir /usr/share/man
-%define _sbindir /usr/sbin
-%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().")
-%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version}
+%define _erllibdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().")
+%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version}
%pre
@@ -33,7 +38,7 @@ if [ $1 -gt 1 ]; then
fi
%prep
-%setup -n %{name}-%{main_version}
+%setup -n %{name}-%{version}
%build
make
@@ -44,24 +49,23 @@ rm -rf %{buildroot}
make install TARGET_DIR=%{_maindir} \
SBIN_DIR=%{buildroot}%{_sbindir} \
MAN_DIR=%{buildroot}%{_mandir}
- VERSION=%{main_version}
+ VERSION=%{version}
mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia
mkdir -p %{buildroot}/var/log/rabbitmq
mkdir -p %{buildroot}/etc/rc.d/init.d/
#Copy all necessary lib files etc.
-cp ../init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server
+install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
mv %{buildroot}/usr/sbin/rabbitmqctl %{buildroot}/usr/sbin/rabbitmqctl_real
-cp ../rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl
-chmod 0755 %{buildroot}/usr/sbin/rabbitmqctl
+install -m 0755 %SOURCE2 %{buildroot}/usr/sbin/rabbitmqctl
cp %{buildroot}%{_mandir}/man1/rabbitmqctl.1.gz %{buildroot}%{_mandir}/man1/rabbitmqctl_real.1.gz
mkdir -p %{buildroot}/etc/logrotate.d
-cp ../rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server
+install %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server
%post
# create rabbitmq group
@@ -93,7 +97,7 @@ fi
%files
%defattr(-,root,root,-)
-%{_libdir}/rabbitmq_server-%{main_version}/
+%{_erllibdir}/rabbitmq_server-%{version}/
%{_mandir}/man1/rabbitmq-multi.1.gz
%{_mandir}/man1/rabbitmq-server.1.gz
%{_mandir}/man1/rabbitmqctl.1.gz
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index dd74c31e..3e74cb52 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -1,5 +1,6 @@
TARBALL_DIR=../../../dist
TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz))
+DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
UNPACKED_DIR=rabbitmq-server-$(VERSION)
PACKAGENAME=rabbitmq-server
@@ -16,7 +17,8 @@ all:
package: clean
make -C ../.. check_tools
- tar -zxvf $(TARBALL_DIR)/$(TARBALL)
+ cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL)
+ tar -zxvf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
chmod a+x $(UNPACKED_DIR)/debian/rules
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index bc691bc7..749791a4 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python
+Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-json
Standards-Version: 3.7.2
Package: rabbitmq-server
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.logrotate b/packaging/debs/Debian/debian/rabbitmq-server.logrotate
index 247635d1..bfd6b8da 100644
--- a/packaging/debs/Debian/debian/rabbitmq-server.logrotate
+++ b/packaging/debs/Debian/debian/rabbitmq-server.logrotate
@@ -9,4 +9,4 @@
postrotate
/etc/init.d/rabbitmq-server rotate-logs
endscript
-} \ No newline at end of file
+}
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b930c8ed..c953a753 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -66,8 +66,10 @@ erl \
-sasl sasl_error_logger '{file,"'${SASL_LOGS}'"}' \
-os_mon start_cpu_sup true \
-os_mon start_disksup false \
- -os_mon start_memsup false \
+ -os_mon start_memsup true \
-os_mon start_os_sup false \
+ -os_mon memsup_system_only true \
+ -os_mon system_memory_high_watermark 0.95 \
-mnesia dir "\"${MNESIA_DIR}\"" \
${CLUSTER_CONFIG} \
${RABBIT_ARGS} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index f08027d2..38b8cc53 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -107,8 +107,10 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia
-sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
--os_mon start_memsup false ^
+-os_mon start_memsup true ^
-os_mon start_os_sup false ^
+-os_mon memsup_system_only true ^
+-os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBIT_ARGS% ^
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
index d2505701..fcb7b412 100644
--- a/src/buffering_proxy.erl
+++ b/src/buffering_proxy.erl
@@ -32,6 +32,8 @@
-export([mainloop/4, drain/2]).
-export([proxy_loop/3]).
+-define(HIBERNATE_AFTER, 5000).
+
%%----------------------------------------------------------------------------
start_link(M, A) ->
@@ -40,7 +42,8 @@ start_link(M, A) ->
ProxyPid = self(),
Ref = make_ref(),
Pid = spawn_link(
- fun () -> mainloop(ProxyPid, Ref, M,
+ fun () -> ProxyPid ! Ref,
+ mainloop(ProxyPid, Ref, M,
M:init(ProxyPid, A)) end),
proxy_loop(Ref, Pid, empty)
end).
@@ -48,14 +51,19 @@ start_link(M, A) ->
%%----------------------------------------------------------------------------
mainloop(ProxyPid, Ref, M, State) ->
- ProxyPid ! Ref,
NewState =
receive
{Ref, Messages} ->
- lists:foldl(fun (Msg, S) ->
- drain(M, M:handle_message(Msg, S))
- end, State, lists:reverse(Messages));
+ NewSt =
+ lists:foldl(fun (Msg, S) ->
+ drain(M, M:handle_message(Msg, S))
+ end, State, lists:reverse(Messages)),
+ ProxyPid ! Ref,
+ NewSt;
Msg -> M:handle_message(Msg, State)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop,
+ [ProxyPid, Ref, M, State])
end,
?MODULE:mainloop(ProxyPid, Ref, M, NewState).
@@ -89,4 +97,6 @@ proxy_loop(Ref, Pid, State) ->
waiting -> Pid ! {Ref, [Msg]}, empty;
Messages -> [Msg | Messages]
end)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State])
end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c6ef1749..a33c5b7b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -157,6 +157,8 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
+ ok = rabbit_alarm:start(),
+
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
@@ -198,6 +200,7 @@ start(normal, []) ->
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
+ ok = rabbit_alarm:stop(),
ok.
%---------------------------------------------------------------------------
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
new file mode 100644
index 00000000..346f5361
--- /dev/null
+++ b/src/rabbit_alarm.erl
@@ -0,0 +1,125 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial Technologies
+%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
+%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_alarm).
+
+-behaviour(gen_event).
+
+-export([start/0, stop/0, register/2]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(MEMSUP_CHECK_INTERVAL, 1000).
+
+-record(alarms, {alertees, system_memory_high_watermark = false}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+-spec(register/2 :: (pid(), mfa()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ %% The default memsup check interval is 1 minute, which is way too
+ %% long - rabbit can gobble up all memory in a matter of
+ %% seconds. Unfortunately the memory_check_interval configuration
+ %% parameter and memsup:set_check_interval/1 function only provide
+ %% a granularity of minutes. So we have to peel off one layer of
+ %% the API to get to the underlying layer which operates at the
+ %% granularity of milliseconds.
+ %%
+ %% Note that the new setting will only take effect after the first
+ %% check has completed, i.e. after one minute. So if rabbit eats
+ %% all the memory within the first minute after startup then we
+ %% are out of luck.
+ ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL},
+ infinity),
+
+ ok = alarm_handler:add_alarm_handler(?MODULE).
+
+stop() ->
+ ok = alarm_handler:delete_alarm_handler(?MODULE).
+
+register(Pid, HighMemMFA) ->
+ ok = gen_event:call(alarm_handler, ?MODULE,
+ {register, Pid, HighMemMFA}).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #alarms{alertees = dict:new()}}.
+
+handle_call({register, Pid, HighMemMFA},
+ State = #alarms{alertees = Alertess}) ->
+ _MRef = erlang:monitor(process, Pid),
+ case State#alarms.system_memory_high_watermark of
+ true -> {M, F, A} = HighMemMFA,
+ ok = erlang:apply(M, F, A ++ [Pid, true]);
+ false -> ok
+ end,
+ NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
+ {ok, ok, State#alarms{alertees = NewAlertees}};
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
+ ok = alert(true, State#alarms.alertees),
+ {ok, State#alarms{system_memory_high_watermark = true}};
+
+handle_event({clear_alarm, system_memory_high_watermark}, State) ->
+ ok = alert(false, State#alarms.alertees),
+ {ok, State#alarms{system_memory_high_watermark = false}};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #alarms{alertees = Alertess}) ->
+ {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+alert(Alert, Alertees) ->
+ dict:fold(fun (Pid, {M, F, A}, Acc) ->
+ ok = erlang:apply(M, F, A ++ [Pid, Alert]),
+ Acc
+ end, ok, Alertees).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 00ea2541..56d2c35d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -213,25 +213,23 @@ ack(QPid, Txn, MsgIds, ChPid) ->
commit_all(QPids, Txn) ->
Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
+ fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
QPids).
rollback_all(QPids, Txn) ->
safe_pmap_ok(
+ fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
QPids).
notify_down_all(QPids, ChPid) ->
Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
- fun (QPid) ->
- rabbit_misc:with_exit_handler(
- %% we don't care if the queue process has terminated
- %% in the meantime
- fun () -> ok end,
- fun () -> gen_server:call(QPid, {notify_down, ChPid},
- Timeout) end)
- end,
+ %% we don't care if the queue process has terminated in the
+ %% meantime
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
QPids).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
@@ -286,10 +284,13 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_pmap_ok(F, L) ->
+safe_pmap_ok(H, F, L) ->
case [R || R <- rabbit_misc:upmap(
fun (V) ->
- try F(V)
+ try
+ rabbit_misc:with_exit_handler(
+ fun () -> H(V) end,
+ fun () -> F(V) end)
catch Class:Reason -> {Class, Reason}
end
end, L),
@@ -297,4 +298,3 @@ safe_pmap_ok(F, L) ->
[] -> ok;
Errors -> {error, Errors}
end.
-
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7716ef16..e687df84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -30,6 +30,7 @@
-behaviour(gen_server).
-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(HIBERNATE_AFTER, 1000).
-export([start_link/1]).
@@ -75,7 +76,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}}.
+ round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;
@@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) ->
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
round_robin = ActiveConsumers}) ->
case lookup_ch(DownPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
@@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
end,
round_robin = NewActive})) of
{continue, NewState} ->
- {noreply, NewState};
+ noreply(NewState);
{stop, NewState} ->
{stop, normal, NewState}
end
@@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% queues discarding the message?
%%
{Delivered, NewState} = attempt_delivery(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({deliver, Txn, Message}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
{Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
@@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) ->
gen_server:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
- {noreply, NewState};
+ noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
persist_auto_ack(QName, Message)
end,
Msg = {QName, self(), NextId, Delivered, Message},
- {reply, {ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}};
+ reply({ok, queue:len(BufferTail), Msg},
+ State#q{message_buffer = BufferTail,
+ next_msg_id = NextId + 1});
{empty, _} ->
- {reply, empty, State}
+ reply(empty, State)
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
@@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
round_robin = RoundRobin}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
- {reply, {error, queue_owned_by_another_connection}, State};
+ reply({error, queue_owned_by_another_connection}, State);
ok ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
in_use ->
- {reply, {error, exclusive_consume_unavailable}, State};
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
@@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
end,
round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, run_poke_burst(State1)}
+ reply(ok, run_poke_burst(State1))
end
end;
@@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, State};
+ reply(ok, State);
C = #cr{consumers = Consumers} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
@@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ConsumerTag,
RoundRobin)}) of
{continue, State1} ->
- {reply, ok, State1};
+ reply(ok, State1);
{stop, State1} ->
{stop, normal, ok, State1}
end
@@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
round_robin = RoundRobin}) ->
- {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State};
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
@@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IsUnused = is_unused(),
if
IfEmpty and not(IsEmpty) ->
- {reply, {error, not_empty}, State};
+ reply({error, not_empty}, State);
IfUnused and not(IsUnused) ->
- {reply, {error, in_use}, State};
+ reply({error, in_use}, State);
true ->
{stop, normal, {ok, queue:len(MessageBuffer)}, State}
end;
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
- {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}};
+ reply({ok, queue:len(MessageBuffer)},
+ State#q{message_buffer = queue:new()});
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
@@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% to check, we'd need to hold not just the ch
%% pid for each consumer, but also its reader
%% pid...
- {reply, locked, State};
+ reply(locked, State);
ok ->
- {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}}
+ reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
end;
{ReaderPid, _MonitorRef} ->
- {reply, ok, State};
+ reply(ok, State);
_ ->
- {reply, locked, State}
+ reply(locked, State)
end.
handle_cast({deliver, Txn, Message}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {noreply, NewState};
+ noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
@@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
_ ->
record_pending_acks(Txn, ChPid, MsgIds)
end,
- {noreply, State}
+ noreply(State)
end;
handle_cast({rollback, Txn}, State) ->
ok = rollback_work(Txn, qname(State)),
erase_tx(Txn),
- {noreply, State};
+ noreply(State);
handle_cast({redeliver, Messages}, State) ->
- {noreply, deliver_or_enqueue_n(Messages, State)};
+ noreply(deliver_or_enqueue_n(Messages, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Messages, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {noreply, deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State)}
+ noreply(deliver_or_enqueue_n(
+ [{Message, true} || Message <- Messages], State))
end;
handle_cast({notify_sent, ChPid}, State) ->
case lookup_ch(ChPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
T = #cr{unsent_message_count =Count} ->
- {noreply, possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State)}
+ noreply(possibly_unblock(
+ T#cr{unsent_message_count = Count - 1},
+ State))
end.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
@@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
+handle_info(timeout, State) ->
+ {noreply, State, hibernate};
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c432ffd3..1eb421ca 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -28,7 +28,7 @@
-include("rabbit.hrl").
-export([start_link/4, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4]).
+-export([send_command/2, deliver/4, conserve_memory/2]).
%% callbacks
-export([init/2, handle_message/2]).
@@ -49,6 +49,7 @@
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
-endif.
@@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
Pid ! {deliver, ConsumerTag, AckRequired, Msg},
ok.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
%%---------------------------------------------------------------------------
init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
+ %% this is bypassing the proxy so alarms can "jump the queue" and
+ %% be handled promptly
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
#ch{state = starting,
proxy_pid = ProxyPid,
reader_pid = ReaderPid,
@@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg},
true, ConsumerTag, DeliveryTag, Msg),
State1#ch{next_tag = DeliveryTag + 1};
+handle_message({conserve_memory, Conserve}, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
+ State;
+
handle_message({'EXIT', _Pid, Reason}, State) ->
terminate(Reason, State);
@@ -572,29 +585,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
nowait = NoWait,
- arguments = Arguments},
- _, State = #ch{ virtual_host = VHostPath }) ->
- %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML)
- %% FIXME: don't allow binding to internal exchanges - including the one named "" !
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
- State),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- case rabbit_exchange:add_binding(ExchangeName, QueueName,
- ActualRoutingKey, Arguments) of
- {error, queue_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
- {error, exchange_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
- {error, durability_settings_incompatible} ->
- rabbit_misc:protocol_error(
- not_allowed, "durability settings of ~s incompatible with ~s",
- [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
- ok ->
- return_ok(State, NoWait, #'queue.bind_ok'{})
- end;
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
+ NoWait, State);
+
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
+ false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@@ -630,12 +632,47 @@ handle_method(#'channel.flow'{active = _}, _, State) ->
%% FIXME: implement
{reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow_ok'{active = _}, _, State) ->
+ %% TODO: We may want to correlate this to channel.flow messages we
+ %% have sent, and complain if we get an unsolicited
+ %% channel.flow_ok, or the client refuses our flow request.
+ {noreply, State};
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
%%----------------------------------------------------------------------------
+binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ %% FIXME: connection exception (!) on failure??
+ %% (see rule named "failure" in spec-XML)
+ %% FIXME: don't allow binding to internal exchanges -
+ %% including the one named "" !
+ QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
+ State),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ {error, queue_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
+ {error, exchange_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ {error, binding_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no binding ~s between ~s and ~s",
+ [RoutingKey, rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)]);
+ {error, durability_settings_incompatible} ->
+ rabbit_misc:protocol_error(
+ not_allowed, "durability settings of ~s incompatible with ~s",
+ [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
+ ok -> return_ok(State, NoWait, ReturnMethod)
+ end.
+
publish(Mandatory, Immediate, Message, QPids,
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
@@ -717,7 +754,8 @@ internal_commit(State = #ch{transaction_id = TxnKey,
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
ok -> new_tx(State);
- {error, Errors} -> exit({commit_failed, Errors})
+ {error, Errors} -> rabbit_misc:protocol_error(
+ internal_error, "commit failed: ~w", [Errors])
end.
internal_rollback(State = #ch{transaction_id = TxnKey,
@@ -732,7 +770,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
TxnKey) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
- {error, Errors} -> exit({rollback_failed, Errors})
+ {error, Errors} -> rabbit_misc:protocol_error(
+ internal_error, "rollback failed: ~w", [Errors])
end.
fold_per_queue(F, Acc0, UAQ) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 60c32da4..09779e50 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -34,7 +34,7 @@
-export([dirty_read/1]).
-export([r/3, r/2, rs/1]).
-export([enable_cover/0, report_cover/0]).
--export([with_exit_handler/2]).
+-export([throw_on_error/2, with_exit_handler/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
@@ -77,6 +77,8 @@
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
+-spec(throw_on_error/2 ::
+ (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
@@ -198,6 +200,13 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
end,
Mod]).
+throw_on_error(E, Thunk) ->
+ case Thunk() of
+ {error, Reason} -> throw({E, Reason});
+ {ok, Res} -> Res;
+ Res -> Res
+ end.
+
with_exit_handler(Handler, Thunk) ->
try
Thunk()
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7e68b3ed..ce26c11a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -166,14 +166,27 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
+
+peername(Sock) ->
+ try
+ {Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
+ AddressS = inet_parse:ntoa(Address),
+ {AddressS, Port}
+ catch
+ Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
+ [self(), Ex]),
+ rabbit_log:info("closing TCP connection ~p", [self()]),
+ exit(normal)
+ end.
+
start_connection(Parent, Deb, ClientSock) ->
- ProfilingValue = setup_profiling(),
process_flag(trap_exit, true),
- {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock),
- PeerAddressS = inet_parse:ntoa(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ {PeerAddressS, PeerPort} = peername(ClientSock),
+ ProfilingValue = setup_profiling(),
try
+ rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
+ [self(), PeerAddressS, PeerPort]),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
mainloop(Parent, Deb, switch_callback(
@@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end.
switch_callback(OldState, NewCallback, Length) ->
- {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1),
+ Ref = inet_op(fun () -> prim_inet:async_recv(
+ OldState#v1.sock, Length, -1) end),
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
@@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
end;
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>),
+ ok = inet_op(fun () -> gen_tcp:send(
+ Sock, <<"AMQP",1,1,
+ ?PROTOCOL_VERSION_MAJOR,
+ ?PROTOCOL_VERSION_MINOR>>) end),
throw({bad_header, Other});
handle_input(Callback, Data, _State) ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 41a8d64c..a2337647 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -150,11 +150,9 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
fun (QPid, {Routed, Handled}) ->
case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate,
Txn, Message, QPid) of
- true -> {true, [QPid | Handled]};
- false -> {true, Handled};
- {'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n",
- [QPid, Reason]),
- {Routed, Handled}
+ true -> {true, [QPid | Handled]};
+ false -> {true, Handled};
+ {'EXIT', _Reason} -> {Routed, Handled}
end
end,
{false, []},
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 0f6bca91..a2688625 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -36,6 +36,8 @@
-record(wstate, {sock, channel, frame_max}).
+-define(HIBERNATE_AFTER, 5000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -63,6 +65,8 @@ start(Sock, Channel, FrameMax) ->
mainloop(State) ->
receive
Message -> ?MODULE:mainloop(handle_message(Message, State))
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop, [State])
end.
handle_message({send_command, MethodRecord},
@@ -127,12 +131,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
Channel, Content, FrameMax),
[MethodFrame | ContentFrames].
+tcp_send(Sock, Data) ->
+ rabbit_misc:throw_on_error(inet_error,
+ fun () -> gen_tcp:send(Sock, Data) end).
+
internal_send_command(Sock, Channel, MethodRecord) ->
- ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)).
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
- ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)).
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
+ Content, FrameMax)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from