diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-11-07 17:45:07 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-11-07 17:45:07 +0000 |
commit | db9617bb8dd76e23c067269d5bdc517a4bee27d4 (patch) | |
tree | e4c34f05516628f7773040ad62f11bf7e9ff4272 | |
parent | 9b82bfcb2133bd40759b9cc063b86c06e2523825 (diff) | |
parent | 78e2a8e7fdff0850e873a9f85b475a4ec36f97da (diff) | |
download | rabbitmq-server-db9617bb8dd76e23c067269d5bdc517a4bee27d4.tar.gz |
merge bug19250 into default
-rw-r--r-- | include/rabbit.hrl | 20 | ||||
-rw-r--r-- | include/rabbit_framing_spec.hrl | 1 | ||||
-rw-r--r-- | packaging/Makefile | 3 | ||||
-rwxr-xr-x | packaging/checks.sh | 45 | ||||
-rw-r--r-- | packaging/debs/Debian/Makefile | 1 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 4 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 4 | ||||
-rw-r--r-- | src/buffering_proxy.erl | 7 | ||||
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 126 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 120 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 78 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 30 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 406 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 10 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 4 | ||||
-rw-r--r-- | src/tcp_listener.erl | 6 |
18 files changed, 483 insertions, 392 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 180a0dc3..706a92af 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,11 +43,14 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, binding_specs, pid}). --record(binding_spec, {exchange_name, routing_key, arguments}). +-record(amqqueue, {name, durable, auto_delete, arguments, pid}). --record(binding, {key, handlers}). --record(handler, {binding_spec, queue, qpid}). +%% mnesia doesn't like unary records, so we add a dummy 'value' field +-record(route, {binding, value = const}). +-record(reverse_route, {reverse_binding, value = const}). + +-record(binding, {exchange_name, key, queue_name, args = []}). +-record(reverse_binding, {queue_name, key, exchange_name, args = []}). -record(listener, {node, protocol, host, port}). @@ -77,16 +80,11 @@ -type(user() :: #user{username :: username(), password :: password()}). --type(binding_spec() :: - #binding_spec{exchange_name :: exchange_name(), - routing_key :: routing_key(), - arguments :: amqp_table()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), durable :: bool(), auto_delete :: bool(), arguments :: amqp_table(), - binding_specs :: [binding_spec()], pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), @@ -94,6 +92,10 @@ durable :: bool(), auto_delete :: bool(), arguments :: amqp_table()}). +-type(binding() :: + #binding{exchange_name :: exchange_name(), + queue_name :: queue_name(), + key :: binding_key()}). %% TODO: make this more precise by tying specific class_ids to %% specific properties -type(undecoded_content() :: diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index e9e65092..13000153 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -53,3 +53,4 @@ -type(vhost() :: binary()). -type(ctag() :: binary()). -type(exchange_type() :: 'direct' | 'topic' | 'fanout'). +-type(binding_key() :: binary()). diff --git a/packaging/Makefile b/packaging/Makefile deleted file mode 100644 index 44a9b328..00000000 --- a/packaging/Makefile +++ /dev/null @@ -1,3 +0,0 @@ -check_tools: - @sh ./checks.sh - @echo All the needed tools seem to be installed, great! diff --git a/packaging/checks.sh b/packaging/checks.sh deleted file mode 100755 index 63e88701..00000000 --- a/packaging/checks.sh +++ /dev/null @@ -1,45 +0,0 @@ -#! /bin/sh - -# We check for the presence of the tools necessary to build a release on a -# Debian based OS. - -TOOLS_STOP=0 - -checker () { - if [ ! `which $1` ] - then - echo "$1 is missing, please install it" - TOOLS_STOP=1 - NEW_NAME=`echo $1 | sed -e 's/-/_/g'` - eval "$NEW_NAME=1" - else - echo "$1 found" - fi -}; - -echo ~~~~~~~~~~~~ Looking for mandatory programs ~~~~~~~~~~~~ - -for i in cdbs-edit-patch reprepro rpm elinks wget zip gpg rsync -do - checker $i -done -echo ~~~~~~~~~~~~~~~~~~~~~~~~~~ DONE ~~~~~~~~~~~~~~~~~~~~~~~ - -if [ 1 = $TOOLS_STOP ] -then - [ $cdbs_edit_patch ] && cdbs_edit_patch="cdbs " - [ $reprepro ] && reprepro="reprepro " - [ $rpm ] && rpm="rpm " - [ $elinks ] && elinks="elinks " - [ $wget ] && wget="wget " - [ $zip ] && zip="zip " - [ $gpg ] && gpg="gpg " - [ $rsync ] && rsync="rsync " - - echo - echo We suggest you run the command - echo "apt-get install ${cdbs_edit_patch}${reprepro}${rpm}${elinks}${wget}${zip}${gpg}${rsync}" - echo -fi - -exit $TOOLS_STOP diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 3e74cb52..9479feb0 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -16,7 +16,6 @@ all: @echo 'Please choose a target from the Makefile.' package: clean - make -C ../.. check_tools cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL) tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b930c8ed..c953a753 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -66,8 +66,10 @@ erl \ -sasl sasl_error_logger '{file,"'${SASL_LOGS}'"}' \ -os_mon start_cpu_sup true \ -os_mon start_disksup false \ - -os_mon start_memsup false \ + -os_mon start_memsup true \ -os_mon start_os_sup false \ + -os_mon memsup_system_only true \ + -os_mon system_memory_high_watermark 0.95 \ -mnesia dir "\"${MNESIA_DIR}\"" \ ${CLUSTER_CONFIG} \ ${RABBIT_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index f08027d2..38b8cc53 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -107,8 +107,10 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia -sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
--os_mon start_memsup false ^
+-os_mon start_memsup true ^
-os_mon start_os_sup false ^
+-os_mon memsup_system_only true ^
+-os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBIT_ARGS% ^
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl index dc168608..fcb7b412 100644 --- a/src/buffering_proxy.erl +++ b/src/buffering_proxy.erl @@ -32,6 +32,8 @@ -export([mainloop/4, drain/2]). -export([proxy_loop/3]). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- start_link(M, A) -> @@ -59,6 +61,9 @@ mainloop(ProxyPid, Ref, M, State) -> ProxyPid ! Ref, NewSt; Msg -> M:handle_message(Msg, State) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, + [ProxyPid, Ref, M, State]) end, ?MODULE:mainloop(ProxyPid, Ref, M, NewState). @@ -92,4 +97,6 @@ proxy_loop(Ref, Pid, State) -> waiting -> Pid ! {Ref, [Msg]}, empty; Messages -> [Msg | Messages] end) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) end. diff --git a/src/rabbit.erl b/src/rabbit.erl index c6ef1749..a33c5b7b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -157,6 +157,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), + ok = rabbit_alarm:start(), + ok = rabbit_binary_generator: check_empty_content_body_frame_size(), @@ -198,6 +200,7 @@ start(normal, []) -> stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), + ok = rabbit_alarm:stop(), ok. %--------------------------------------------------------------------------- diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl new file mode 100644 index 00000000..d9c1c450 --- /dev/null +++ b/src/rabbit_alarm.erl @@ -0,0 +1,126 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_alarm). + +-behaviour(gen_event). + +-export([start/0, stop/0, register/2]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-define(MEMSUP_CHECK_INTERVAL, 1000). + +-record(alarms, {alertees, system_memory_high_watermark = false}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(mfa_tuple() :: {atom(), atom(), list()}). +-spec(start/0 :: () -> 'ok'). +-spec(stop/0 :: () -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + %% The default memsup check interval is 1 minute, which is way too + %% long - rabbit can gobble up all memory in a matter of + %% seconds. Unfortunately the memory_check_interval configuration + %% parameter and memsup:set_check_interval/1 function only provide + %% a granularity of minutes. So we have to peel off one layer of + %% the API to get to the underlying layer which operates at the + %% granularity of milliseconds. + %% + %% Note that the new setting will only take effect after the first + %% check has completed, i.e. after one minute. So if rabbit eats + %% all the memory within the first minute after startup then we + %% are out of luck. + ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, + infinity), + + ok = alarm_handler:add_alarm_handler(?MODULE). + +stop() -> + ok = alarm_handler:delete_alarm_handler(?MODULE). + +register(Pid, HighMemMFA) -> + ok = gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #alarms{alertees = dict:new()}}. + +handle_call({register, Pid, HighMemMFA}, + State = #alarms{alertees = Alertess}) -> + _MRef = erlang:monitor(process, Pid), + case State#alarms.system_memory_high_watermark of + true -> {M, F, A} = HighMemMFA, + ok = erlang:apply(M, F, A ++ [Pid, true]); + false -> ok + end, + NewAlertees = dict:store(Pid, HighMemMFA, Alertess), + {ok, ok, State#alarms{alertees = NewAlertees}}; + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + ok = alert(true, State#alarms.alertees), + {ok, State#alarms{system_memory_high_watermark = true}}; + +handle_event({clear_alarm, system_memory_high_watermark}, State) -> + ok = alert(false, State#alarms.alertees), + {ok, State#alarms{system_memory_high_watermark = false}}; + +handle_event(_Event, State) -> + {ok, State}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #alarms{alertees = Alertess}) -> + {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +alert(Alert, Alertees) -> + dict:fold(fun (Pid, {M, F, A}, Acc) -> + ok = erlang:apply(M, F, A ++ [Pid, Alert]), + Acc + end, ok, Alertees). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7b2f801a..56d2c35d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,6 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). --export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2]). @@ -53,21 +52,12 @@ -type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). --type(bind_res() :: {'ok', non_neg_integer()} | - {'error', 'queue_not_found' | 'exchange_not_found'}). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). --spec(add_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'binding_not_found'}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -89,7 +79,6 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -131,7 +120,7 @@ recover_durable_queues() -> Queues = lists:map(fun start_queue_process/1, R), rabbit_misc:execute_mnesia_transaction( fun () -> - lists:foreach(fun recover_queue/1, Queues), + lists:foreach(fun store_queue/1, Queues), ok end). @@ -140,12 +129,12 @@ declare(QueueName, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args, - binding_specs = [], pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({amqqueue, QueueName}) of - [] -> ok = recover_queue(Q), + [] -> ok = store_queue(Q), + ok = add_default_binding(Q), Q; [ExistingQ] -> ExistingQ end @@ -167,83 +156,12 @@ start_queue_process(Q) -> {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), Q#amqqueue{pid = Pid}. -recover_queue(Q) -> - ok = store_queue(Q), - ok = recover_bindings(Q), - ok. - -default_binding_spec(#resource{virtual_host = VHost, name = Name}) -> - #binding_spec{exchange_name = rabbit_misc:r(VHost, exchange, <<>>), - routing_key = Name, - arguments = []}. - -recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) -> - ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), - lists:foreach(fun (B) -> - ok = rabbit_exchange:add_binding(B, Q) - end, Specs), +add_default_binding(#amqqueue{name = QueueName}) -> + Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + RoutingKey = QueueName#resource.name, + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), ok. -modify_bindings(QueueName, ExchangeName, RoutingKey, Arguments, - SpecPresentFun, SpecAbsentFun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [Q = #amqqueue{binding_specs = Specs0}] -> - Spec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey, - arguments = Arguments}, - case (case lists:member(Spec, Specs0) of - true -> SpecPresentFun; - false -> SpecAbsentFun - end)(Q, Spec) of - {ok, #amqqueue{binding_specs = Specs}} -> - {ok, length(Specs)}; - {error, not_found} -> - {error, exchange_not_found}; - Other -> Other - end; - [] -> {error, queue_not_found} - end - end). - -update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec, - UpdateSpecFun, UpdateExchangeFun) -> - Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)}, - case UpdateExchangeFun(Spec, Q1) of - ok -> store_queue(Q1), - {ok, Q1}; - Other -> Other - end. - -add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, _Spec) -> {ok, Q} end, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun (S, Specs) -> [S | Specs] end, - fun rabbit_exchange:add_binding/2) - end). - -delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun lists:delete/2, - fun rabbit_exchange:delete_binding/2) - end, - fun (Q, Spec) -> - %% the following is essentially a no-op, though crucially - %% it produces {error, not_found} when the exchange does - %% not exist. - case rabbit_exchange:delete_binding(Spec, Q) of - ok -> {error, binding_not_found}; - Other -> Other - end - end). - lookup(Name) -> rabbit_misc:dirty_read({amqqueue, Name}). @@ -314,17 +232,6 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). -binding_forcibly_removed(BindingSpec, QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [] -> ok; - [Q = #amqqueue{binding_specs = Specs}] -> - store_queue(Q#amqqueue{binding_specs = - lists:delete(BindingSpec, Specs)}) - end - end). - claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). @@ -342,12 +249,6 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). -delete_bindings(Q = #amqqueue{binding_specs = Specs}) -> - lists:foreach(fun (BindingSpec) -> - ok = rabbit_exchange:delete_binding( - BindingSpec, Q) - end, Specs). - internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -360,10 +261,8 @@ internal_delete(QueueName) -> end end). -delete_queue(Q = #amqqueue{name = QueueName}) -> - ok = delete_bindings(Q), - ok = rabbit_exchange:delete_binding( - default_binding_spec(QueueName), Q), +delete_queue(#amqqueue{name = QueueName}) -> + ok = rabbit_exchange:delete_bindings_for_queue(QueueName), ok = mnesia:delete({amqqueue, QueueName}), ok. @@ -383,7 +282,6 @@ pseudo_queue(QueueName, Pid) -> durable = false, auto_delete = false, arguments = [], - binding_specs = [], pid = Pid}. safe_pmap_ok(H, F, L) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7716ef16..e687df84 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -30,6 +30,7 @@ -behaviour(gen_server). -define(UNSENT_MESSAGE_LIMIT, 100). +-define(HIBERNATE_AFTER, 1000). -export([start_link/1]). @@ -75,7 +76,7 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}}. + round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; @@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) -> handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), @@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, end, round_robin = NewActive})) of {continue, NewState} -> - {noreply, NewState}; + noreply(NewState); {stop, NewState} -> {stop, normal, NewState} end @@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% queues discarding the message? %% {Delivered, NewState} = attempt_delivery(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({deliver, Txn, Message}, _From, State) -> %% Synchronous, "mandatory" delivery mode {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), @@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) -> gen_server:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), - {noreply, NewState}; + noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From, persist_auto_ack(QName, Message) end, Msg = {QName, self(), NextId, Delivered, Message}, - {reply, {ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}}; + reply({ok, queue:len(BufferTail), Msg}, + State#q{message_buffer = BufferTail, + next_msg_id = NextId + 1}); {empty, _} -> - {reply, empty, State} + reply(empty, State) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, @@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, round_robin = RoundRobin}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> - {reply, {error, queue_owned_by_another_connection}, State}; + reply({error, queue_owned_by_another_connection}, State); ok -> case check_exclusive_access(ExistingHolder, ExclusiveConsume) of in_use -> - {reply, {error, exclusive_consume_unavailable}, State}; + reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, @@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, end, round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, run_poke_burst(State1)} + reply(ok, run_poke_burst(State1)) end end; @@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, State}; + reply(ok, State); C = #cr{consumers = Consumers} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, @@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ConsumerTag, RoundRobin)}) of {continue, State1} -> - {reply, ok, State1}; + reply(ok, State1); {stop, State1} -> {stop, normal, ok, State1} end @@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, round_robin = RoundRobin}) -> - {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State}; + reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> @@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IsUnused = is_unused(), if IfEmpty and not(IsEmpty) -> - {reply, {error, not_empty}, State}; + reply({error, not_empty}, State); IfUnused and not(IsUnused) -> - {reply, {error, in_use}, State}; + reply({error, in_use}, State); true -> {stop, normal, {ok, queue:len(MessageBuffer)}, State} end; handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), - {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}}; + reply({ok, queue:len(MessageBuffer)}, + State#q{message_buffer = queue:new()}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% to check, we'd need to hold not just the ch %% pid for each consumer, but also its reader %% pid... - {reply, locked, State}; + reply(locked, State); ok -> - {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}} + reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) end; {ReaderPid, _MonitorRef} -> - {reply, ok, State}; + reply(ok, State); _ -> - {reply, locked, State} + reply(locked, State) end. handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {noreply, NewState}; + noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), @@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> _ -> record_pending_acks(Txn, ChPid, MsgIds) end, - {noreply, State} + noreply(State) end; handle_cast({rollback, Txn}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), - {noreply, State}; + noreply(State); handle_cast({redeliver, Messages}, State) -> - {noreply, deliver_or_enqueue_n(Messages, State)}; + noreply(deliver_or_enqueue_n(Messages, State)); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Messages, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - {noreply, deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)} + noreply(deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)) end; handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); T = #cr{unsent_message_count =Count} -> - {noreply, possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)} + noreply(possibly_unblock( + T#cr{unsent_message_count = Count - 1}, + State)) end. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, @@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); +handle_info(timeout, State) -> + {noreply, State, hibernate}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0544d32e..1eb421ca 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -28,7 +28,7 @@ -include("rabbit.hrl"). -export([start_link/4, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4]). +-export([send_command/2, deliver/4, conserve_memory/2]). %% callbacks -export([init/2, handle_message/2]). @@ -49,6 +49,7 @@ -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -endif. @@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> Pid ! {deliver, ConsumerTag, AckRequired, Msg}, ok. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + %%--------------------------------------------------------------------------- init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), + %% this is bypassing the proxy so alarms can "jump the queue" and + %% be handled promptly + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), #ch{state = starting, proxy_pid = ProxyPid, reader_pid = ReaderPid, @@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg}, true, ConsumerTag, DeliveryTag, Msg), State1#ch{next_tag = DeliveryTag + 1}; +handle_message({conserve_memory, Conserve}, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), + State; + handle_message({'EXIT', _Pid, Reason}, State) -> terminate(Reason, State); @@ -573,7 +586,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -581,7 +594,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -619,6 +632,12 @@ handle_method(#'channel.flow'{active = _}, _, State) -> %% FIXME: implement {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow_ok'{active = _}, _, State) -> + %% TODO: We may want to correlate this to channel.flow messages we + %% have sent, and complain if we get an unsolicited + %% channel.flow_ok, or the client refuses our flow request. + {noreply, State}; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). @@ -635,7 +654,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(QueueName)]); @@ -651,8 +670,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, rabbit_misc:protocol_error( not_allowed, "durability settings of ~s incompatible with ~s", [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, ReturnMethod) + ok -> return_ok(State, NoWait, ReturnMethod) end. publish(Mandatory, Immediate, Message, QPids, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index bb132a50..a8c54438 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -29,13 +29,18 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list_vhost_exchanges/1, list_exchange_bindings/1, + list_vhost_exchanges/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/2, delete_binding/2]). +-export([add_binding/4, delete_binding/4]). -export([delete/2]). +-export([delete_bindings_for_queue/1]). -export([check_type/1, assert_type/2, topic_matches/2]). +%% EXTENDED API +-export([list_exchange_bindings/1]). +-export([list_queue_bindings/1]). + -import(mnesia). -import(sets). -import(lists). @@ -48,7 +53,8 @@ -type(publish_res() :: {'ok', [pid()]} | not_found() | {'error', 'unroutable' | 'not_delivered'}). - +-type(bind_res() :: 'ok' | + {'error', 'queue_not_found' | 'exchange_not_found'}). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). @@ -57,37 +63,46 @@ -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> - [{queue_name(), routing_key(), amqp_table()}]). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). --spec(add_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). --spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found()). +-spec(add_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'durability_settings_incompatible'}). +-spec(delete_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'binding_not_found'}). +-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). +-spec(list_queue_bindings/1 :: (queue_name()) -> + [{exchange_name(), routing_key(), amqp_table()}]). +-spec(list_exchange_bindings/1 :: (exchange_name()) -> + [{queue_name(), routing_key(), amqp_table()}]). -endif. %%---------------------------------------------------------------------------- recover() -> - ok = recover_durable_exchanges(), - ok. - -recover_durable_exchanges() -> rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:foldl(fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges) + mnesia:foldl( + fun (Exchange, Acc) -> + ok = mnesia:write(Exchange), + Acc + end, ok, durable_exchanges), + mnesia:foldl( + fun (Route, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute), + Acc + end, ok, durable_routes), + ok end). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> @@ -143,22 +158,9 @@ list_vhost_exchanges(VHostPath) -> mnesia:dirty_match_object( #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). -list_exchange_bindings(Name) -> - [{QueueName, RoutingKey, Arguments} || - #binding{handlers = Handlers} <- bindings_for_exchange(Name), - #handler{binding_spec = #binding_spec{routing_key = RoutingKey, - arguments = Arguments}, - queue = QueueName} <- Handlers]. - -bindings_for_exchange(Name) -> - qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), - element(1, K) == Name])). - -empty_handlers() -> - []. - %% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> +simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, + ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, properties = #'P_basic'{content_type = ContentTypeBin}, @@ -188,121 +190,173 @@ simple_publish(Mandatory, Immediate, %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. +%% +%% TODO: Maybe this should be handled by a cursor instead. route(#exchange{name = Name, type = topic}, RoutingKey) -> - sets:to_list( - sets:union( - mnesia:activity( - async_dirty, - fun () -> - qlc:e(qlc:q([handler_qpids(H) || - #binding{key = {Name1, PatternKey}, - handlers = H} - <- mnesia:table(binding), - Name == Name1, - topic_matches(PatternKey, RoutingKey)])) - end))); - -route(#exchange{name = Name, type = Type}, RoutingKey) -> - BindingKey = delivery_key_for_type(Type, Name, RoutingKey), - case rabbit_misc:dirty_read({binding, BindingKey}) of - {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H)); - {error, not_found} -> [] - end. + Query = qlc:q([QName || + #route{binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName, + key = BindingKey}} <- mnesia:table(route), + ExchangeName == Name, + %% TODO: This causes a full scan for each entry + %% with the same exchange (see bug 19336) + topic_matches(BindingKey, RoutingKey)]), + lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])); + +route(X = #exchange{type = fanout}, _) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey) -> + route_internal(X, RoutingKey). + +route_internal(#exchange{name = Name}, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}), + [QPid | Acc] + end, [], sets:from_list(Queues)). + +%% TODO: Should all of the route and binding management not be +%% refactored to its own module, especially seeing as unbind will have +%% to be implemented for 0.91 ? + +delete_bindings_for_exchange(ExchangeName) -> + indexed_delete( + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + fun delete_forward_routes/1, fun mnesia:delete_object/1). + +delete_bindings_for_queue(QueueName) -> + Exchanges = exchanges_for_queue(QueueName), + indexed_delete( + reverse_route(#route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + fun mnesia:delete_object/1, fun delete_forward_routes/1), + [begin + [X] = mnesia:read({exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. -delivery_key_for_type(fanout, Name, _RoutingKey) -> - {Name, fanout}; -delivery_key_for_type(_Type, Name, RoutingKey) -> - {Name, RoutingKey}. +indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> + [begin + ok = ReverseDeleteFun(reverse_route(Route)), + ok = ForwardsDeleteFun(Route) + end || Route <- mnesia:match_object(Match)], + ok. -call_with_exchange(Name, Fun) -> - case mnesia:wread({exchange, Name}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end. +delete_forward_routes(Route) -> + ok = mnesia:delete_object(Route), + ok = mnesia:delete_object(durable_routes, Route, write). -make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> - #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). -add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> - call_with_exchange( - ExchangeName, - fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - internal_add_binding( - X, RoutingKey, make_handler(BindingSpec, Q)) - end +has_bindings(ExchangeName) -> + MatchHead = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = '$1', + _ = '_'}}, + continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)). + +continue('$end_of_table') -> false; +continue({[_|_], _}) -> true; +continue({[], Continuation}) -> continue(mnesia:select(Continuation)). + +call_with_exchange(Exchange, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> case mnesia:read({exchange, Exchange}) of + [] -> {error, exchange_not_found}; + [X] -> Fun(X) + end end). -delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> +call_with_exchange_and_queue(Exchange, Queue, Fun) -> call_with_exchange( - ExchangeName, - fun (X) -> ok = internal_delete_binding( - X, RoutingKey, make_handler(BindingSpec, Q)), - maybe_auto_delete(X) + Exchange, + fun(X) -> case mnesia:read({amqqueue, Queue}) of + [] -> {error, queue_not_found}; + [Q] -> Fun(X, Q) + end end). -%% Must run within a transaction. -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; -maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> - case internal_delete(ExchangeName, true) of - {error, in_use} -> ok; - ok -> ok - end. - -handlers_isempty([]) -> true; -handlers_isempty([_|_]) -> false. - -extend_handlers(Handlers, Handler) -> [Handler | Handlers]. - -delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers). - -handler_qpids(Handlers) -> - sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). +add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:write/3) + end + end). -%% Must run within a transaction. -internal_add_binding(#exchange{name = ExchangeName, type = Type}, - RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - ok = add_handler_to_binding(BindingKey, Handler). +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:delete_object/3), + maybe_auto_delete(X) + end). -%% Must run within a transaction. -internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - remove_handler_from_binding(BindingKey, Handler), +sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> + Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = Arguments}, + ok = case Durable of + true -> Fun(durable_routes, #route{binding = Binding}, write); + false -> ok + end, + [ok, ok] = [Fun(element(1, R), R, write) || + R <- tuple_to_list(route_with_reverse(Binding))], ok. -%% Must run within a transaction. -add_handler_to_binding(BindingKey, Handler) -> - ok = case mnesia:wread({binding, BindingKey}) of - [] -> - ok = mnesia:write( - #binding{key = BindingKey, - handlers = extend_handlers( - empty_handlers(), Handler)}); - [B = #binding{handlers = H}] -> - ok = mnesia:write( - B#binding{handlers = extend_handlers(H, Handler)}) - end. - -%% Must run within a transaction. -remove_handler_from_binding(BindingKey, Handler) -> - case mnesia:wread({binding, BindingKey}) of - [] -> empty; - [B = #binding{handlers = H}] -> - H1 = delete_handler(H, Handler), - case handlers_isempty(H1) of - true -> - ok = mnesia:delete({binding, BindingKey}), - empty; - _ -> - ok = mnesia:write(B#binding{handlers = H1}), - not_empty - end - end. +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> + Route = #route{binding = Binding}, + {Route, reverse_route(Route)}. + +reverse_route(#route{binding = Binding}) -> + #reverse_route{reverse_binding = reverse_binding(Binding)}; + +reverse_route(#reverse_route{reverse_binding = Binding}) -> + #route{binding = reverse_binding(Binding)}. + +reverse_binding(#reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}; + +reverse_binding(#binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}. split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), @@ -331,46 +385,50 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). -delete(ExchangeName, IfUnused) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> internal_delete(ExchangeName, IfUnused) end). - -internal_delete(ExchangeName, _IfUnused = true) -> - Bindings = bindings_for_exchange(ExchangeName), - case Bindings of - [] -> do_internal_delete(ExchangeName, Bindings); - _ -> - case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end, - Bindings) of - true -> - %% There are no handlers anywhere in any of the - %% bindings for this exchange. - do_internal_delete(ExchangeName, Bindings); - false -> - %% There was at least one real handler - %% present. It's still in use. - {error, in_use} - end - end; -internal_delete(ExchangeName, false) -> - do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)). - -forcibly_remove_handlers(Handlers) -> - lists:foreach( - fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> - ok = rabbit_amqqueue:binding_forcibly_removed( - BindingSpec, QueueName) - end, Handlers), +delete(ExchangeName, _IfUnused = true) -> + call_with_exchange(ExchangeName, fun conditional_delete/1); +delete(ExchangeName, _IfUnused = false) -> + call_with_exchange(ExchangeName, fun unconditional_delete/1). + +maybe_auto_delete(#exchange{auto_delete = false}) -> + ok; +maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> + conditional_delete(Exchange), ok. -do_internal_delete(ExchangeName, Bindings) -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> {error, not_found}; - _ -> - lists:foreach(fun (#binding{key = K, handlers = H}) -> - ok = forcibly_remove_handlers(H), - ok = mnesia:delete({binding, K}) - end, Bindings), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}) +conditional_delete(Exchange = #exchange{name = ExchangeName}) -> + case has_bindings(ExchangeName) of + false -> unconditional_delete(Exchange); + true -> {error, in_use} end. + +unconditional_delete(#exchange{name = ExchangeName}) -> + ok = delete_bindings_for_exchange(ExchangeName), + ok = mnesia:delete({durable_exchanges, ExchangeName}), + ok = mnesia:delete({exchange, ExchangeName}). + +%%---------------------------------------------------------------------------- +%% EXTENDED API +%% These are API calls that are not used by the server internally, +%% they are exported for embedded clients to use + +%% This is currently used in mod_rabbit.erl (XMPP) and expects this to +%% return {QueueName, RoutingKey, Arguments} tuples +list_exchange_bindings(ExchangeName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + [{QueueName, RoutingKey, Arguments} || + #route{binding = #binding{queue_name = QueueName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. + +% Refactoring is left as an exercise for the reader +list_queue_bindings(QueueName) -> + Route = #route{binding = #binding{queue_name = QueueName, + _ = '_'}}, + [{ExchangeName, RoutingKey, Arguments} || + #route{binding = #binding{exchange_name = ExchangeName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 89648f4f..7638af58 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -68,7 +68,8 @@ -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) + when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} @@ -210,7 +211,8 @@ with_exit_handler(Handler, Thunk) -> try Thunk() catch - exit:{R, _} when R =:= noproc; R =:= normal -> Handler() + exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> + Handler() end. with_user(Username, Thunk) -> @@ -236,6 +238,7 @@ with_vhost(VHostPath, Thunk) -> with_user_and_vhost(Username, VHostPath, Thunk) -> with_user(Username, with_vhost(VHostPath, Thunk)). + execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4ae367ba..9b67135d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -105,7 +105,13 @@ table_definitions() -> {rabbit_config, [{disc_copies, [node()]}]}, {listener, [{type, bag}, {attributes, record_info(fields, listener)}]}, - {binding, [{attributes, record_info(fields, binding)}]}, + {durable_routes, [{disc_copies, [node()]}, + {record_name, route}, + {attributes, record_info(fields, route)}]}, + {route, [{type, ordered_set}, + {attributes, record_info(fields, route)}]}, + {reverse_route, [{type, ordered_set}, + {attributes, record_info(fields, reverse_route)}]}, {durable_exchanges, [{disc_copies, [node()]}, {record_name, exchange}, {attributes, record_info(fields, exchange)}]}, @@ -255,7 +261,7 @@ init_db(ClusterNodes) -> end. create_schema() -> - mnesia:stop(), + mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 2c7fa2ab..a2688625 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -36,6 +36,8 @@ -record(wstate, {sock, channel, frame_max}). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -63,6 +65,8 @@ start(Sock, Channel, FrameMax) -> mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, [State]) end. handle_message({send_command, MethodRecord}, diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 3943161a..dc38b594 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -58,9 +58,9 @@ init({IPAddress, Port, SocketOpts, AcceptorSup, [LSock]) end, lists:duplicate(ConcurrentAcceptorCount, dummy)), - error_logger:info_msg( - "started TCP listener on ~s:~p~n", - [inet_parse:ntoa(IPAddress), Port]), + {ok, {LIPAddress, LPort}} = inet:sockname(LSock), + error_logger:info_msg("started TCP listener on ~s:~p~n", + [inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), {ok, #state{sock=LSock, on_startup = OnStartup, on_shutdown = OnShutdown}}; |