diff options
author | Simon MacMullen <simon@lshift.net> | 2008-11-07 11:25:23 +0000 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2008-11-07 11:25:23 +0000 |
commit | 78e2a8e7fdff0850e873a9f85b475a4ec36f97da (patch) | |
tree | e4c34f05516628f7773040ad62f11bf7e9ff4272 | |
parent | 13caad9a22d27fa9c70e25e6404a6aa615fdc6e3 (diff) | |
parent | f58bbbdcb3d1265d8507fc5b0261fb1c017417f6 (diff) | |
download | rabbitmq-server-78e2a8e7fdff0850e873a9f85b475a4ec36f97da.tar.gz |
Verified
-rw-r--r-- | include/rabbit.hrl | 20 | ||||
-rw-r--r-- | include/rabbit_framing_spec.hrl | 1 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/Makefile | 31 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.logrotate | 2 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 30 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rabbitmq-server.logrotate | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 4 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 4 | ||||
-rw-r--r-- | src/buffering_proxy.erl | 20 | ||||
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 126 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 142 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 78 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 91 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 406 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 18 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 10 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 31 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 14 | ||||
-rw-r--r-- | src/tcp_listener.erl | 6 |
21 files changed, 614 insertions, 427 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 180a0dc3..706a92af 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,11 +43,14 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, binding_specs, pid}). --record(binding_spec, {exchange_name, routing_key, arguments}). +-record(amqqueue, {name, durable, auto_delete, arguments, pid}). --record(binding, {key, handlers}). --record(handler, {binding_spec, queue, qpid}). +%% mnesia doesn't like unary records, so we add a dummy 'value' field +-record(route, {binding, value = const}). +-record(reverse_route, {reverse_binding, value = const}). + +-record(binding, {exchange_name, key, queue_name, args = []}). +-record(reverse_binding, {queue_name, key, exchange_name, args = []}). -record(listener, {node, protocol, host, port}). @@ -77,16 +80,11 @@ -type(user() :: #user{username :: username(), password :: password()}). --type(binding_spec() :: - #binding_spec{exchange_name :: exchange_name(), - routing_key :: routing_key(), - arguments :: amqp_table()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), durable :: bool(), auto_delete :: bool(), arguments :: amqp_table(), - binding_specs :: [binding_spec()], pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), @@ -94,6 +92,10 @@ durable :: bool(), auto_delete :: bool(), arguments :: amqp_table()}). +-type(binding() :: + #binding{exchange_name :: exchange_name(), + queue_name :: queue_name(), + key :: binding_key()}). %% TODO: make this more precise by tying specific class_ids to %% specific properties -type(undecoded_content() :: diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index e9e65092..13000153 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -53,3 +53,4 @@ -type(vhost() :: binary()). -type(ctag() :: binary()). -type(exchange_type() :: 'direct' | 'topic' | 'fanout'). +-type(binding_key() :: binary()). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 814c79f0..33032f11 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -4,32 +4,23 @@ VERSION=0.0.0 SOURCE_TARBALL_DIR=../../../dist TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) -RPM_VERSION=$(shell echo $(VERSION) | tr - _) -DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' --define 'debian 1' +DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1' rpms: clean server #Create proper environment for making rpms prepare: - mkdir -p $(TOP_DIR)/BUILD - mkdir -p $(TOP_DIR)/SOURCES - mkdir -p $(TOP_DIR)/SPECS - mkdir -p $(TOP_DIR)/SRPMS - mkdir -p $(TOP_DIR)/RPMS - mkdir -p $(TOP_DIR)/tmp - cp $(TOP_DIR)/$(TARBALL) $(TOP_DIR)/SOURCES - cp $(TOP_DIR)/rabbitmq-server.spec $(TOP_DIR)/SPECS - cp $(TOP_DIR)/init.d $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmqctl_wrapper $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmq-server.logrotate $(TOP_DIR)/BUILD + mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp + cp $(TOP_DIR)/$(TARBALL) SOURCES + cp rabbitmq-server.spec SPECS + sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec + + cp init.d SOURCES/rabbitmq-server.init + cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper + cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba $(TOP_DIR)/SPECS/rabbitmq-server.spec $(DEFINES) --target noarch + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch clean: - rm -rf $(TOP_DIR)/SOURCES/ - rm -rf $(TOP_DIR)/SPECS/ - rm -rf $(TOP_DIR)/RPMS/ - rm -rf $(TOP_DIR)/SRPMS/ - rm -rf $(TOP_DIR)/BUILD/ - rm -rf $(TOP_DIR)/tmp/ + rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.logrotate b/packaging/RPMS/Fedora/rabbitmq-server.logrotate index 64cd01a1..ab87e4a5 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.logrotate +++ b/packaging/RPMS/Fedora/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /sbin/service rabbitmq-server rotate-logs endscript -}
\ No newline at end of file +} diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 43837ba3..214f6918 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,9 +1,12 @@ Name: rabbitmq-server -Version: %{rpm_version} +Version: %%VERSION%% Release: 1 License: MPLv1.1 Group: Development/Libraries -Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz +Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz +Source1: rabbitmq-server.init +Source2: rabbitmq-server.wrapper +Source3: rabbitmq-server.logrotate URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. %if 0%{?debian} @@ -12,7 +15,7 @@ BuildRequires: python, python-json %endif Requires: erlang, logrotate Packager: Hubert Plociniczak <hubert@lshift.net> -BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root Summary: The RabbitMQ server Requires(post): chkconfig Requires(pre): chkconfig initscripts @@ -22,10 +25,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _mandir /usr/share/man -%define _sbindir /usr/sbin -%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") -%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version} + +%define _erllibdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") +%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} + %pre if [ $1 -gt 1 ]; then @@ -35,7 +38,7 @@ if [ $1 -gt 1 ]; then fi %prep -%setup -n %{name}-%{main_version} +%setup -n %{name}-%{version} %build make @@ -46,24 +49,23 @@ rm -rf %{buildroot} make install TARGET_DIR=%{_maindir} \ SBIN_DIR=%{buildroot}%{_sbindir} \ MAN_DIR=%{buildroot}%{_mandir} - VERSION=%{main_version} + VERSION=%{version} mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia mkdir -p %{buildroot}/var/log/rabbitmq mkdir -p %{buildroot}/etc/rc.d/init.d/ #Copy all necessary lib files etc. -cp ../init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server +install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server mv %{buildroot}/usr/sbin/rabbitmqctl %{buildroot}/usr/sbin/rabbitmqctl_real -cp ../rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl -chmod 0755 %{buildroot}/usr/sbin/rabbitmqctl +install -m 0755 %SOURCE2 %{buildroot}/usr/sbin/rabbitmqctl cp %{buildroot}%{_mandir}/man1/rabbitmqctl.1.gz %{buildroot}%{_mandir}/man1/rabbitmqctl_real.1.gz mkdir -p %{buildroot}/etc/logrotate.d -cp ../rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server +install %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server %post # create rabbitmq group @@ -95,7 +97,7 @@ fi %files %defattr(-,root,root,-) -%{_libdir}/rabbitmq_server-%{main_version}/ +%{_erllibdir}/rabbitmq_server-%{version}/ %{_mandir}/man1/rabbitmq-multi.1.gz %{_mandir}/man1/rabbitmq-server.1.gz %{_mandir}/man1/rabbitmqctl.1.gz diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 675e15f4..749791a4 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> -Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python, python-json +Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-json Standards-Version: 3.7.2 Package: rabbitmq-server diff --git a/packaging/debs/Debian/debian/rabbitmq-server.logrotate b/packaging/debs/Debian/debian/rabbitmq-server.logrotate index 247635d1..bfd6b8da 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.logrotate +++ b/packaging/debs/Debian/debian/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /etc/init.d/rabbitmq-server rotate-logs endscript -}
\ No newline at end of file +} diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b930c8ed..c953a753 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -66,8 +66,10 @@ erl \ -sasl sasl_error_logger '{file,"'${SASL_LOGS}'"}' \ -os_mon start_cpu_sup true \ -os_mon start_disksup false \ - -os_mon start_memsup false \ + -os_mon start_memsup true \ -os_mon start_os_sup false \ + -os_mon memsup_system_only true \ + -os_mon system_memory_high_watermark 0.95 \ -mnesia dir "\"${MNESIA_DIR}\"" \ ${CLUSTER_CONFIG} \ ${RABBIT_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index f08027d2..38b8cc53 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -107,8 +107,10 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia -sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
--os_mon start_memsup false ^
+-os_mon start_memsup true ^
-os_mon start_os_sup false ^
+-os_mon memsup_system_only true ^
+-os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBIT_ARGS% ^
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl index d2505701..fcb7b412 100644 --- a/src/buffering_proxy.erl +++ b/src/buffering_proxy.erl @@ -32,6 +32,8 @@ -export([mainloop/4, drain/2]). -export([proxy_loop/3]). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- start_link(M, A) -> @@ -40,7 +42,8 @@ start_link(M, A) -> ProxyPid = self(), Ref = make_ref(), Pid = spawn_link( - fun () -> mainloop(ProxyPid, Ref, M, + fun () -> ProxyPid ! Ref, + mainloop(ProxyPid, Ref, M, M:init(ProxyPid, A)) end), proxy_loop(Ref, Pid, empty) end). @@ -48,14 +51,19 @@ start_link(M, A) -> %%---------------------------------------------------------------------------- mainloop(ProxyPid, Ref, M, State) -> - ProxyPid ! Ref, NewState = receive {Ref, Messages} -> - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)); + NewSt = + lists:foldl(fun (Msg, S) -> + drain(M, M:handle_message(Msg, S)) + end, State, lists:reverse(Messages)), + ProxyPid ! Ref, + NewSt; Msg -> M:handle_message(Msg, State) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, + [ProxyPid, Ref, M, State]) end, ?MODULE:mainloop(ProxyPid, Ref, M, NewState). @@ -89,4 +97,6 @@ proxy_loop(Ref, Pid, State) -> waiting -> Pid ! {Ref, [Msg]}, empty; Messages -> [Msg | Messages] end) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) end. diff --git a/src/rabbit.erl b/src/rabbit.erl index c6ef1749..a33c5b7b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -157,6 +157,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), + ok = rabbit_alarm:start(), + ok = rabbit_binary_generator: check_empty_content_body_frame_size(), @@ -198,6 +200,7 @@ start(normal, []) -> stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), + ok = rabbit_alarm:stop(), ok. %--------------------------------------------------------------------------- diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl new file mode 100644 index 00000000..d9c1c450 --- /dev/null +++ b/src/rabbit_alarm.erl @@ -0,0 +1,126 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_alarm). + +-behaviour(gen_event). + +-export([start/0, stop/0, register/2]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-define(MEMSUP_CHECK_INTERVAL, 1000). + +-record(alarms, {alertees, system_memory_high_watermark = false}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(mfa_tuple() :: {atom(), atom(), list()}). +-spec(start/0 :: () -> 'ok'). +-spec(stop/0 :: () -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + %% The default memsup check interval is 1 minute, which is way too + %% long - rabbit can gobble up all memory in a matter of + %% seconds. Unfortunately the memory_check_interval configuration + %% parameter and memsup:set_check_interval/1 function only provide + %% a granularity of minutes. So we have to peel off one layer of + %% the API to get to the underlying layer which operates at the + %% granularity of milliseconds. + %% + %% Note that the new setting will only take effect after the first + %% check has completed, i.e. after one minute. So if rabbit eats + %% all the memory within the first minute after startup then we + %% are out of luck. + ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, + infinity), + + ok = alarm_handler:add_alarm_handler(?MODULE). + +stop() -> + ok = alarm_handler:delete_alarm_handler(?MODULE). + +register(Pid, HighMemMFA) -> + ok = gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #alarms{alertees = dict:new()}}. + +handle_call({register, Pid, HighMemMFA}, + State = #alarms{alertees = Alertess}) -> + _MRef = erlang:monitor(process, Pid), + case State#alarms.system_memory_high_watermark of + true -> {M, F, A} = HighMemMFA, + ok = erlang:apply(M, F, A ++ [Pid, true]); + false -> ok + end, + NewAlertees = dict:store(Pid, HighMemMFA, Alertess), + {ok, ok, State#alarms{alertees = NewAlertees}}; + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + ok = alert(true, State#alarms.alertees), + {ok, State#alarms{system_memory_high_watermark = true}}; + +handle_event({clear_alarm, system_memory_high_watermark}, State) -> + ok = alert(false, State#alarms.alertees), + {ok, State#alarms{system_memory_high_watermark = false}}; + +handle_event(_Event, State) -> + {ok, State}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #alarms{alertees = Alertess}) -> + {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +alert(Alert, Alertees) -> + dict:fold(fun (Pid, {M, F, A}, Acc) -> + ok = erlang:apply(M, F, A ++ [Pid, Alert]), + Acc + end, ok, Alertees). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bd64f1e4..56d2c35d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,6 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). --export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2]). @@ -53,21 +52,12 @@ -type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). --type(bind_res() :: {'ok', non_neg_integer()} | - {'error', 'queue_not_found' | 'exchange_not_found'}). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). --spec(add_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'binding_not_found'}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -89,7 +79,6 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -131,7 +120,7 @@ recover_durable_queues() -> Queues = lists:map(fun start_queue_process/1, R), rabbit_misc:execute_mnesia_transaction( fun () -> - lists:foreach(fun recover_queue/1, Queues), + lists:foreach(fun store_queue/1, Queues), ok end). @@ -140,12 +129,12 @@ declare(QueueName, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args, - binding_specs = [], pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({amqqueue, QueueName}) of - [] -> ok = recover_queue(Q), + [] -> ok = store_queue(Q), + ok = add_default_binding(Q), Q; [ExistingQ] -> ExistingQ end @@ -167,83 +156,12 @@ start_queue_process(Q) -> {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), Q#amqqueue{pid = Pid}. -recover_queue(Q) -> - ok = store_queue(Q), - ok = recover_bindings(Q), - ok. - -default_binding_spec(#resource{virtual_host = VHost, name = Name}) -> - #binding_spec{exchange_name = rabbit_misc:r(VHost, exchange, <<>>), - routing_key = Name, - arguments = []}. - -recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) -> - ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), - lists:foreach(fun (B) -> - ok = rabbit_exchange:add_binding(B, Q) - end, Specs), +add_default_binding(#amqqueue{name = QueueName}) -> + Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + RoutingKey = QueueName#resource.name, + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), ok. -modify_bindings(QueueName, ExchangeName, RoutingKey, Arguments, - SpecPresentFun, SpecAbsentFun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [Q = #amqqueue{binding_specs = Specs0}] -> - Spec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey, - arguments = Arguments}, - case (case lists:member(Spec, Specs0) of - true -> SpecPresentFun; - false -> SpecAbsentFun - end)(Q, Spec) of - {ok, #amqqueue{binding_specs = Specs}} -> - {ok, length(Specs)}; - {error, not_found} -> - {error, exchange_not_found}; - Other -> Other - end; - [] -> {error, queue_not_found} - end - end). - -update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec, - UpdateSpecFun, UpdateExchangeFun) -> - Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)}, - case UpdateExchangeFun(Spec, Q1) of - ok -> store_queue(Q1), - {ok, Q1}; - Other -> Other - end. - -add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, _Spec) -> {ok, Q} end, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun (S, Specs) -> [S | Specs] end, - fun rabbit_exchange:add_binding/2) - end). - -delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun lists:delete/2, - fun rabbit_exchange:delete_binding/2) - end, - fun (Q, Spec) -> - %% the following is essentially a no-op, though crucially - %% it produces {error, not_found} when the exchange does - %% not exist. - case rabbit_exchange:delete_binding(Spec, Q) of - ok -> {error, binding_not_found}; - Other -> Other - end - end). - lookup(Name) -> rabbit_misc:dirty_read({amqqueue, Name}). @@ -295,38 +213,25 @@ ack(QPid, Txn, MsgIds, ChPid) -> commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( - fun (QPid) -> - rabbit_misc:with_exit_handler( - %% we don't care if the queue process has terminated - %% in the meantime - fun () -> ok end, - fun () -> gen_server:call(QPid, {notify_down, ChPid}, - Timeout) end) - end, + %% we don't care if the queue process has terminated in the + %% meantime + fun (_) -> ok end, + fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). -binding_forcibly_removed(BindingSpec, QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [] -> ok; - [Q = #amqqueue{binding_specs = Specs}] -> - store_queue(Q#amqqueue{binding_specs = - lists:delete(BindingSpec, Specs)}) - end - end). - claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). @@ -344,12 +249,6 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). -delete_bindings(Q = #amqqueue{binding_specs = Specs}) -> - lists:foreach(fun (BindingSpec) -> - ok = rabbit_exchange:delete_binding( - BindingSpec, Q) - end, Specs). - internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -362,10 +261,8 @@ internal_delete(QueueName) -> end end). -delete_queue(Q = #amqqueue{name = QueueName}) -> - ok = delete_bindings(Q), - ok = rabbit_exchange:delete_binding( - default_binding_spec(QueueName), Q), +delete_queue(#amqqueue{name = QueueName}) -> + ok = rabbit_exchange:delete_bindings_for_queue(QueueName), ok = mnesia:delete({amqqueue, QueueName}), ok. @@ -385,13 +282,15 @@ pseudo_queue(QueueName, Pid) -> durable = false, auto_delete = false, arguments = [], - binding_specs = [], pid = Pid}. -safe_pmap_ok(F, L) -> +safe_pmap_ok(H, F, L) -> case [R || R <- rabbit_misc:upmap( fun (V) -> - try F(V) + try + rabbit_misc:with_exit_handler( + fun () -> H(V) end, + fun () -> F(V) end) catch Class:Reason -> {Class, Reason} end end, L), @@ -399,4 +298,3 @@ safe_pmap_ok(F, L) -> [] -> ok; Errors -> {error, Errors} end. - diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7716ef16..e687df84 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -30,6 +30,7 @@ -behaviour(gen_server). -define(UNSENT_MESSAGE_LIMIT, 100). +-define(HIBERNATE_AFTER, 1000). -export([start_link/1]). @@ -75,7 +76,7 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}}. + round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; @@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) -> handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), @@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, end, round_robin = NewActive})) of {continue, NewState} -> - {noreply, NewState}; + noreply(NewState); {stop, NewState} -> {stop, normal, NewState} end @@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% queues discarding the message? %% {Delivered, NewState} = attempt_delivery(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({deliver, Txn, Message}, _From, State) -> %% Synchronous, "mandatory" delivery mode {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), @@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) -> gen_server:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), - {noreply, NewState}; + noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From, persist_auto_ack(QName, Message) end, Msg = {QName, self(), NextId, Delivered, Message}, - {reply, {ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}}; + reply({ok, queue:len(BufferTail), Msg}, + State#q{message_buffer = BufferTail, + next_msg_id = NextId + 1}); {empty, _} -> - {reply, empty, State} + reply(empty, State) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, @@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, round_robin = RoundRobin}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> - {reply, {error, queue_owned_by_another_connection}, State}; + reply({error, queue_owned_by_another_connection}, State); ok -> case check_exclusive_access(ExistingHolder, ExclusiveConsume) of in_use -> - {reply, {error, exclusive_consume_unavailable}, State}; + reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, @@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, end, round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, run_poke_burst(State1)} + reply(ok, run_poke_burst(State1)) end end; @@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, State}; + reply(ok, State); C = #cr{consumers = Consumers} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, @@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ConsumerTag, RoundRobin)}) of {continue, State1} -> - {reply, ok, State1}; + reply(ok, State1); {stop, State1} -> {stop, normal, ok, State1} end @@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, round_robin = RoundRobin}) -> - {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State}; + reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> @@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IsUnused = is_unused(), if IfEmpty and not(IsEmpty) -> - {reply, {error, not_empty}, State}; + reply({error, not_empty}, State); IfUnused and not(IsUnused) -> - {reply, {error, in_use}, State}; + reply({error, in_use}, State); true -> {stop, normal, {ok, queue:len(MessageBuffer)}, State} end; handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), - {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}}; + reply({ok, queue:len(MessageBuffer)}, + State#q{message_buffer = queue:new()}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% to check, we'd need to hold not just the ch %% pid for each consumer, but also its reader %% pid... - {reply, locked, State}; + reply(locked, State); ok -> - {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}} + reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) end; {ReaderPid, _MonitorRef} -> - {reply, ok, State}; + reply(ok, State); _ -> - {reply, locked, State} + reply(locked, State) end. handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {noreply, NewState}; + noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), @@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> _ -> record_pending_acks(Txn, ChPid, MsgIds) end, - {noreply, State} + noreply(State) end; handle_cast({rollback, Txn}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), - {noreply, State}; + noreply(State); handle_cast({redeliver, Messages}, State) -> - {noreply, deliver_or_enqueue_n(Messages, State)}; + noreply(deliver_or_enqueue_n(Messages, State)); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Messages, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - {noreply, deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)} + noreply(deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)) end; handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); T = #cr{unsent_message_count =Count} -> - {noreply, possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)} + noreply(possibly_unblock( + T#cr{unsent_message_count = Count - 1}, + State)) end. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, @@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); +handle_info(timeout, State) -> + {noreply, State, hibernate}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a9278898..1eb421ca 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -28,7 +28,7 @@ -include("rabbit.hrl"). -export([start_link/4, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4]). +-export([send_command/2, deliver/4, conserve_memory/2]). %% callbacks -export([init/2, handle_message/2]). @@ -49,6 +49,7 @@ -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -endif. @@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> Pid ! {deliver, ConsumerTag, AckRequired, Msg}, ok. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + %%--------------------------------------------------------------------------- init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), + %% this is bypassing the proxy so alarms can "jump the queue" and + %% be handled promptly + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), #ch{state = starting, proxy_pid = ProxyPid, reader_pid = ReaderPid, @@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg}, true, ConsumerTag, DeliveryTag, Msg), State1#ch{next_tag = DeliveryTag + 1}; +handle_message({conserve_memory, Conserve}, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), + State; + handle_message({'EXIT', _Pid, Reason}, State) -> terminate(Reason, State); @@ -572,29 +585,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, nowait = NoWait, - arguments = Arguments}, - _, State = #ch{ virtual_host = VHostPath }) -> - %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case rabbit_amqqueue:add_binding(QueueName, ExchangeName, - ActualRoutingKey, Arguments) of - {error, queue_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, #'queue.bind_ok'{}) - end; + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, + NoWait, State); + +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, + false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -630,12 +632,47 @@ handle_method(#'channel.flow'{active = _}, _, State) -> %% FIXME: implement {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow_ok'{active = _}, _, State) -> + %% TODO: We may want to correlate this to channel.flow messages we + %% have sent, and complain if we get an unsolicited + %% channel.flow_ok, or the client refuses our flow request. + {noreply, State}; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). %%---------------------------------------------------------------------------- +binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + %% FIXME: connection exception (!) on failure?? + %% (see rule named "failure" in spec-XML) + %% FIXME: don't allow binding to internal exchanges - + %% including the one named "" ! + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, + State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + {error, queue_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, binding_not_found} -> + rabbit_misc:protocol_error( + not_found, "no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); + {error, durability_settings_incompatible} -> + rabbit_misc:protocol_error( + not_allowed, "durability settings of ~s incompatible with ~s", + [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + ok -> return_ok(State, NoWait, ReturnMethod) + end. + publish(Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> Handled = deliver(QPids, Mandatory, Immediate, TxnKey, @@ -717,7 +754,8 @@ internal_commit(State = #ch{transaction_id = TxnKey, case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of ok -> new_tx(State); - {error, Errors} -> exit({commit_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "commit failed: ~w", [Errors]) end. internal_rollback(State = #ch{transaction_id = TxnKey, @@ -732,7 +770,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> exit({rollback_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "rollback failed: ~w", [Errors]) end. fold_per_queue(F, Acc0, UAQ) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index bb132a50..a8c54438 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -29,13 +29,18 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list_vhost_exchanges/1, list_exchange_bindings/1, + list_vhost_exchanges/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/2, delete_binding/2]). +-export([add_binding/4, delete_binding/4]). -export([delete/2]). +-export([delete_bindings_for_queue/1]). -export([check_type/1, assert_type/2, topic_matches/2]). +%% EXTENDED API +-export([list_exchange_bindings/1]). +-export([list_queue_bindings/1]). + -import(mnesia). -import(sets). -import(lists). @@ -48,7 +53,8 @@ -type(publish_res() :: {'ok', [pid()]} | not_found() | {'error', 'unroutable' | 'not_delivered'}). - +-type(bind_res() :: 'ok' | + {'error', 'queue_not_found' | 'exchange_not_found'}). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). @@ -57,37 +63,46 @@ -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> - [{queue_name(), routing_key(), amqp_table()}]). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). --spec(add_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). --spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found()). +-spec(add_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'durability_settings_incompatible'}). +-spec(delete_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'binding_not_found'}). +-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). +-spec(list_queue_bindings/1 :: (queue_name()) -> + [{exchange_name(), routing_key(), amqp_table()}]). +-spec(list_exchange_bindings/1 :: (exchange_name()) -> + [{queue_name(), routing_key(), amqp_table()}]). -endif. %%---------------------------------------------------------------------------- recover() -> - ok = recover_durable_exchanges(), - ok. - -recover_durable_exchanges() -> rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:foldl(fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges) + mnesia:foldl( + fun (Exchange, Acc) -> + ok = mnesia:write(Exchange), + Acc + end, ok, durable_exchanges), + mnesia:foldl( + fun (Route, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute), + Acc + end, ok, durable_routes), + ok end). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> @@ -143,22 +158,9 @@ list_vhost_exchanges(VHostPath) -> mnesia:dirty_match_object( #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). -list_exchange_bindings(Name) -> - [{QueueName, RoutingKey, Arguments} || - #binding{handlers = Handlers} <- bindings_for_exchange(Name), - #handler{binding_spec = #binding_spec{routing_key = RoutingKey, - arguments = Arguments}, - queue = QueueName} <- Handlers]. - -bindings_for_exchange(Name) -> - qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), - element(1, K) == Name])). - -empty_handlers() -> - []. - %% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> +simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, + ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, properties = #'P_basic'{content_type = ContentTypeBin}, @@ -188,121 +190,173 @@ simple_publish(Mandatory, Immediate, %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. +%% +%% TODO: Maybe this should be handled by a cursor instead. route(#exchange{name = Name, type = topic}, RoutingKey) -> - sets:to_list( - sets:union( - mnesia:activity( - async_dirty, - fun () -> - qlc:e(qlc:q([handler_qpids(H) || - #binding{key = {Name1, PatternKey}, - handlers = H} - <- mnesia:table(binding), - Name == Name1, - topic_matches(PatternKey, RoutingKey)])) - end))); - -route(#exchange{name = Name, type = Type}, RoutingKey) -> - BindingKey = delivery_key_for_type(Type, Name, RoutingKey), - case rabbit_misc:dirty_read({binding, BindingKey}) of - {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H)); - {error, not_found} -> [] - end. + Query = qlc:q([QName || + #route{binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName, + key = BindingKey}} <- mnesia:table(route), + ExchangeName == Name, + %% TODO: This causes a full scan for each entry + %% with the same exchange (see bug 19336) + topic_matches(BindingKey, RoutingKey)]), + lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])); + +route(X = #exchange{type = fanout}, _) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey) -> + route_internal(X, RoutingKey). + +route_internal(#exchange{name = Name}, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}), + [QPid | Acc] + end, [], sets:from_list(Queues)). + +%% TODO: Should all of the route and binding management not be +%% refactored to its own module, especially seeing as unbind will have +%% to be implemented for 0.91 ? + +delete_bindings_for_exchange(ExchangeName) -> + indexed_delete( + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + fun delete_forward_routes/1, fun mnesia:delete_object/1). + +delete_bindings_for_queue(QueueName) -> + Exchanges = exchanges_for_queue(QueueName), + indexed_delete( + reverse_route(#route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + fun mnesia:delete_object/1, fun delete_forward_routes/1), + [begin + [X] = mnesia:read({exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. -delivery_key_for_type(fanout, Name, _RoutingKey) -> - {Name, fanout}; -delivery_key_for_type(_Type, Name, RoutingKey) -> - {Name, RoutingKey}. +indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> + [begin + ok = ReverseDeleteFun(reverse_route(Route)), + ok = ForwardsDeleteFun(Route) + end || Route <- mnesia:match_object(Match)], + ok. -call_with_exchange(Name, Fun) -> - case mnesia:wread({exchange, Name}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end. +delete_forward_routes(Route) -> + ok = mnesia:delete_object(Route), + ok = mnesia:delete_object(durable_routes, Route, write). -make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> - #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). -add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> - call_with_exchange( - ExchangeName, - fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - internal_add_binding( - X, RoutingKey, make_handler(BindingSpec, Q)) - end +has_bindings(ExchangeName) -> + MatchHead = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = '$1', + _ = '_'}}, + continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)). + +continue('$end_of_table') -> false; +continue({[_|_], _}) -> true; +continue({[], Continuation}) -> continue(mnesia:select(Continuation)). + +call_with_exchange(Exchange, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> case mnesia:read({exchange, Exchange}) of + [] -> {error, exchange_not_found}; + [X] -> Fun(X) + end end). -delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> +call_with_exchange_and_queue(Exchange, Queue, Fun) -> call_with_exchange( - ExchangeName, - fun (X) -> ok = internal_delete_binding( - X, RoutingKey, make_handler(BindingSpec, Q)), - maybe_auto_delete(X) + Exchange, + fun(X) -> case mnesia:read({amqqueue, Queue}) of + [] -> {error, queue_not_found}; + [Q] -> Fun(X, Q) + end end). -%% Must run within a transaction. -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; -maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> - case internal_delete(ExchangeName, true) of - {error, in_use} -> ok; - ok -> ok - end. - -handlers_isempty([]) -> true; -handlers_isempty([_|_]) -> false. - -extend_handlers(Handlers, Handler) -> [Handler | Handlers]. - -delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers). - -handler_qpids(Handlers) -> - sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). +add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:write/3) + end + end). -%% Must run within a transaction. -internal_add_binding(#exchange{name = ExchangeName, type = Type}, - RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - ok = add_handler_to_binding(BindingKey, Handler). +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:delete_object/3), + maybe_auto_delete(X) + end). -%% Must run within a transaction. -internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - remove_handler_from_binding(BindingKey, Handler), +sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> + Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = Arguments}, + ok = case Durable of + true -> Fun(durable_routes, #route{binding = Binding}, write); + false -> ok + end, + [ok, ok] = [Fun(element(1, R), R, write) || + R <- tuple_to_list(route_with_reverse(Binding))], ok. -%% Must run within a transaction. -add_handler_to_binding(BindingKey, Handler) -> - ok = case mnesia:wread({binding, BindingKey}) of - [] -> - ok = mnesia:write( - #binding{key = BindingKey, - handlers = extend_handlers( - empty_handlers(), Handler)}); - [B = #binding{handlers = H}] -> - ok = mnesia:write( - B#binding{handlers = extend_handlers(H, Handler)}) - end. - -%% Must run within a transaction. -remove_handler_from_binding(BindingKey, Handler) -> - case mnesia:wread({binding, BindingKey}) of - [] -> empty; - [B = #binding{handlers = H}] -> - H1 = delete_handler(H, Handler), - case handlers_isempty(H1) of - true -> - ok = mnesia:delete({binding, BindingKey}), - empty; - _ -> - ok = mnesia:write(B#binding{handlers = H1}), - not_empty - end - end. +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> + Route = #route{binding = Binding}, + {Route, reverse_route(Route)}. + +reverse_route(#route{binding = Binding}) -> + #reverse_route{reverse_binding = reverse_binding(Binding)}; + +reverse_route(#reverse_route{reverse_binding = Binding}) -> + #route{binding = reverse_binding(Binding)}. + +reverse_binding(#reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}; + +reverse_binding(#binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}. split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), @@ -331,46 +385,50 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). -delete(ExchangeName, IfUnused) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> internal_delete(ExchangeName, IfUnused) end). - -internal_delete(ExchangeName, _IfUnused = true) -> - Bindings = bindings_for_exchange(ExchangeName), - case Bindings of - [] -> do_internal_delete(ExchangeName, Bindings); - _ -> - case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end, - Bindings) of - true -> - %% There are no handlers anywhere in any of the - %% bindings for this exchange. - do_internal_delete(ExchangeName, Bindings); - false -> - %% There was at least one real handler - %% present. It's still in use. - {error, in_use} - end - end; -internal_delete(ExchangeName, false) -> - do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)). - -forcibly_remove_handlers(Handlers) -> - lists:foreach( - fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> - ok = rabbit_amqqueue:binding_forcibly_removed( - BindingSpec, QueueName) - end, Handlers), +delete(ExchangeName, _IfUnused = true) -> + call_with_exchange(ExchangeName, fun conditional_delete/1); +delete(ExchangeName, _IfUnused = false) -> + call_with_exchange(ExchangeName, fun unconditional_delete/1). + +maybe_auto_delete(#exchange{auto_delete = false}) -> + ok; +maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> + conditional_delete(Exchange), ok. -do_internal_delete(ExchangeName, Bindings) -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> {error, not_found}; - _ -> - lists:foreach(fun (#binding{key = K, handlers = H}) -> - ok = forcibly_remove_handlers(H), - ok = mnesia:delete({binding, K}) - end, Bindings), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}) +conditional_delete(Exchange = #exchange{name = ExchangeName}) -> + case has_bindings(ExchangeName) of + false -> unconditional_delete(Exchange); + true -> {error, in_use} end. + +unconditional_delete(#exchange{name = ExchangeName}) -> + ok = delete_bindings_for_exchange(ExchangeName), + ok = mnesia:delete({durable_exchanges, ExchangeName}), + ok = mnesia:delete({exchange, ExchangeName}). + +%%---------------------------------------------------------------------------- +%% EXTENDED API +%% These are API calls that are not used by the server internally, +%% they are exported for embedded clients to use + +%% This is currently used in mod_rabbit.erl (XMPP) and expects this to +%% return {QueueName, RoutingKey, Arguments} tuples +list_exchange_bindings(ExchangeName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + [{QueueName, RoutingKey, Arguments} || + #route{binding = #binding{queue_name = QueueName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. + +% Refactoring is left as an exercise for the reader +list_queue_bindings(QueueName) -> + Route = #route{binding = #binding{queue_name = QueueName, + _ = '_'}}, + [{ExchangeName, RoutingKey, Arguments} || + #route{binding = #binding{exchange_name = ExchangeName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3e4ed8f3..7638af58 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -34,7 +34,7 @@ -export([dirty_read/1]). -export([r/3, r/2, rs/1]). -export([enable_cover/0, report_cover/0]). --export([with_exit_handler/2]). +-export([throw_on_error/2, with_exit_handler/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). @@ -68,7 +68,8 @@ -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) + when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} @@ -76,6 +77,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). +-spec(throw_on_error/2 :: + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). @@ -197,11 +200,19 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +throw_on_error(E, Thunk) -> + case Thunk() of + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res + end. + with_exit_handler(Handler, Thunk) -> try Thunk() catch - exit:{R, _} when R =:= noproc; R =:= normal -> Handler() + exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> + Handler() end. with_user(Username, Thunk) -> @@ -227,6 +238,7 @@ with_vhost(VHostPath, Thunk) -> with_user_and_vhost(Username, VHostPath, Thunk) -> with_user(Username, with_vhost(VHostPath, Thunk)). + execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4ae367ba..9b67135d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -105,7 +105,13 @@ table_definitions() -> {rabbit_config, [{disc_copies, [node()]}]}, {listener, [{type, bag}, {attributes, record_info(fields, listener)}]}, - {binding, [{attributes, record_info(fields, binding)}]}, + {durable_routes, [{disc_copies, [node()]}, + {record_name, route}, + {attributes, record_info(fields, route)}]}, + {route, [{type, ordered_set}, + {attributes, record_info(fields, route)}]}, + {reverse_route, [{type, ordered_set}, + {attributes, record_info(fields, reverse_route)}]}, {durable_exchanges, [{disc_copies, [node()]}, {record_name, exchange}, {attributes, record_info(fields, exchange)}]}, @@ -255,7 +261,7 @@ init_db(ClusterNodes) -> end. create_schema() -> - mnesia:stop(), + mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7e68b3ed..ce26c11a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -166,14 +166,27 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +peername(Sock) -> + try + {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + AddressS = inet_parse:ntoa(Address), + {AddressS, Port} + catch + Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Ex]), + rabbit_log:info("closing TCP connection ~p", [self()]), + exit(normal) + end. + start_connection(Parent, Deb, ClientSock) -> - ProfilingValue = setup_profiling(), process_flag(trap_exit, true), - {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock), - PeerAddressS = inet_parse:ntoa(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + {PeerAddressS, PeerPort} = peername(ClientSock), + ProfilingValue = setup_profiling(), try + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), mainloop(Parent, Deb, switch_callback( @@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1), + Ref = inet_op(fun () -> prim_inet:async_recv( + OldState#v1.sock, Length, -1) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>), + ok = inet_op(fun () -> gen_tcp:send( + Sock, <<"AMQP",1,1, + ?PROTOCOL_VERSION_MAJOR, + ?PROTOCOL_VERSION_MINOR>>) end), throw({bad_header, Other}); handle_input(Callback, Data, _State) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 0f6bca91..a2688625 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -36,6 +36,8 @@ -record(wstate, {sock, channel, frame_max}). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -63,6 +65,8 @@ start(Sock, Channel, FrameMax) -> mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, [State]) end. handle_message({send_command, MethodRecord}, @@ -127,12 +131,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> Channel, Content, FrameMax), [MethodFrame | ContentFrames]. +tcp_send(Sock, Data) -> + rabbit_misc:throw_on_error(inet_error, + fun () -> gen_tcp:send(Sock, Data) end). + internal_send_command(Sock, Channel, MethodRecord) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 3943161a..dc38b594 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -58,9 +58,9 @@ init({IPAddress, Port, SocketOpts, AcceptorSup, [LSock]) end, lists:duplicate(ConcurrentAcceptorCount, dummy)), - error_logger:info_msg( - "started TCP listener on ~s:~p~n", - [inet_parse:ntoa(IPAddress), Port]), + {ok, {LIPAddress, LPort}} = inet:sockname(LSock), + error_logger:info_msg("started TCP listener on ~s:~p~n", + [inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), {ok, #state{sock=LSock, on_startup = OnStartup, on_shutdown = OnShutdown}}; |