summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-11-07 17:45:07 +0000
committerMatthias Radestock <matthias@lshift.net>2008-11-07 17:45:07 +0000
commitdb9617bb8dd76e23c067269d5bdc517a4bee27d4 (patch)
treee4c34f05516628f7773040ad62f11bf7e9ff4272
parent9b82bfcb2133bd40759b9cc063b86c06e2523825 (diff)
parent78e2a8e7fdff0850e873a9f85b475a4ec36f97da (diff)
downloadrabbitmq-server-db9617bb8dd76e23c067269d5bdc517a4bee27d4.tar.gz
merge bug19250 into default
-rw-r--r--include/rabbit.hrl20
-rw-r--r--include/rabbit_framing_spec.hrl1
-rw-r--r--packaging/Makefile3
-rwxr-xr-xpackaging/checks.sh45
-rw-r--r--packaging/debs/Debian/Makefile1
-rwxr-xr-xscripts/rabbitmq-server4
-rw-r--r--scripts/rabbitmq-server.bat4
-rw-r--r--src/buffering_proxy.erl7
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_alarm.erl126
-rw-r--r--src/rabbit_amqqueue.erl120
-rw-r--r--src/rabbit_amqqueue_process.erl78
-rw-r--r--src/rabbit_channel.erl30
-rw-r--r--src/rabbit_exchange.erl406
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_mnesia.erl10
-rw-r--r--src/rabbit_writer.erl4
-rw-r--r--src/tcp_listener.erl6
18 files changed, 483 insertions, 392 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 180a0dc3..706a92af 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,11 +43,14 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, binding_specs, pid}).
--record(binding_spec, {exchange_name, routing_key, arguments}).
+-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
--record(binding, {key, handlers}).
--record(handler, {binding_spec, queue, qpid}).
+%% mnesia doesn't like unary records, so we add a dummy 'value' field
+-record(route, {binding, value = const}).
+-record(reverse_route, {reverse_binding, value = const}).
+
+-record(binding, {exchange_name, key, queue_name, args = []}).
+-record(reverse_binding, {queue_name, key, exchange_name, args = []}).
-record(listener, {node, protocol, host, port}).
@@ -77,16 +80,11 @@
-type(user() ::
#user{username :: username(),
password :: password()}).
--type(binding_spec() ::
- #binding_spec{exchange_name :: exchange_name(),
- routing_key :: routing_key(),
- arguments :: amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
durable :: bool(),
auto_delete :: bool(),
arguments :: amqp_table(),
- binding_specs :: [binding_spec()],
pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
@@ -94,6 +92,10 @@
durable :: bool(),
auto_delete :: bool(),
arguments :: amqp_table()}).
+-type(binding() ::
+ #binding{exchange_name :: exchange_name(),
+ queue_name :: queue_name(),
+ key :: binding_key()}).
%% TODO: make this more precise by tying specific class_ids to
%% specific properties
-type(undecoded_content() ::
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index e9e65092..13000153 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -53,3 +53,4 @@
-type(vhost() :: binary()).
-type(ctag() :: binary()).
-type(exchange_type() :: 'direct' | 'topic' | 'fanout').
+-type(binding_key() :: binary()).
diff --git a/packaging/Makefile b/packaging/Makefile
deleted file mode 100644
index 44a9b328..00000000
--- a/packaging/Makefile
+++ /dev/null
@@ -1,3 +0,0 @@
-check_tools:
- @sh ./checks.sh
- @echo All the needed tools seem to be installed, great!
diff --git a/packaging/checks.sh b/packaging/checks.sh
deleted file mode 100755
index 63e88701..00000000
--- a/packaging/checks.sh
+++ /dev/null
@@ -1,45 +0,0 @@
-#! /bin/sh
-
-# We check for the presence of the tools necessary to build a release on a
-# Debian based OS.
-
-TOOLS_STOP=0
-
-checker () {
- if [ ! `which $1` ]
- then
- echo "$1 is missing, please install it"
- TOOLS_STOP=1
- NEW_NAME=`echo $1 | sed -e 's/-/_/g'`
- eval "$NEW_NAME=1"
- else
- echo "$1 found"
- fi
-};
-
-echo ~~~~~~~~~~~~ Looking for mandatory programs ~~~~~~~~~~~~
-
-for i in cdbs-edit-patch reprepro rpm elinks wget zip gpg rsync
-do
- checker $i
-done
-echo ~~~~~~~~~~~~~~~~~~~~~~~~~~ DONE ~~~~~~~~~~~~~~~~~~~~~~~
-
-if [ 1 = $TOOLS_STOP ]
-then
- [ $cdbs_edit_patch ] && cdbs_edit_patch="cdbs "
- [ $reprepro ] && reprepro="reprepro "
- [ $rpm ] && rpm="rpm "
- [ $elinks ] && elinks="elinks "
- [ $wget ] && wget="wget "
- [ $zip ] && zip="zip "
- [ $gpg ] && gpg="gpg "
- [ $rsync ] && rsync="rsync "
-
- echo
- echo We suggest you run the command
- echo "apt-get install ${cdbs_edit_patch}${reprepro}${rpm}${elinks}${wget}${zip}${gpg}${rsync}"
- echo
-fi
-
-exit $TOOLS_STOP
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 3e74cb52..9479feb0 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -16,7 +16,6 @@ all:
@echo 'Please choose a target from the Makefile.'
package: clean
- make -C ../.. check_tools
cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL)
tar -zxvf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b930c8ed..c953a753 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -66,8 +66,10 @@ erl \
-sasl sasl_error_logger '{file,"'${SASL_LOGS}'"}' \
-os_mon start_cpu_sup true \
-os_mon start_disksup false \
- -os_mon start_memsup false \
+ -os_mon start_memsup true \
-os_mon start_os_sup false \
+ -os_mon memsup_system_only true \
+ -os_mon system_memory_high_watermark 0.95 \
-mnesia dir "\"${MNESIA_DIR}\"" \
${CLUSTER_CONFIG} \
${RABBIT_ARGS} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index f08027d2..38b8cc53 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -107,8 +107,10 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia
-sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
--os_mon start_memsup false ^
+-os_mon start_memsup true ^
-os_mon start_os_sup false ^
+-os_mon memsup_system_only true ^
+-os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBIT_ARGS% ^
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
index dc168608..fcb7b412 100644
--- a/src/buffering_proxy.erl
+++ b/src/buffering_proxy.erl
@@ -32,6 +32,8 @@
-export([mainloop/4, drain/2]).
-export([proxy_loop/3]).
+-define(HIBERNATE_AFTER, 5000).
+
%%----------------------------------------------------------------------------
start_link(M, A) ->
@@ -59,6 +61,9 @@ mainloop(ProxyPid, Ref, M, State) ->
ProxyPid ! Ref,
NewSt;
Msg -> M:handle_message(Msg, State)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop,
+ [ProxyPid, Ref, M, State])
end,
?MODULE:mainloop(ProxyPid, Ref, M, NewState).
@@ -92,4 +97,6 @@ proxy_loop(Ref, Pid, State) ->
waiting -> Pid ! {Ref, [Msg]}, empty;
Messages -> [Msg | Messages]
end)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State])
end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c6ef1749..a33c5b7b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -157,6 +157,8 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
+ ok = rabbit_alarm:start(),
+
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
@@ -198,6 +200,7 @@ start(normal, []) ->
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
+ ok = rabbit_alarm:stop(),
ok.
%---------------------------------------------------------------------------
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
new file mode 100644
index 00000000..d9c1c450
--- /dev/null
+++ b/src/rabbit_alarm.erl
@@ -0,0 +1,126 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial Technologies
+%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
+%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_alarm).
+
+-behaviour(gen_event).
+
+-export([start/0, stop/0, register/2]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(MEMSUP_CHECK_INTERVAL, 1000).
+
+-record(alarms, {alertees, system_memory_high_watermark = false}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(mfa_tuple() :: {atom(), atom(), list()}).
+-spec(start/0 :: () -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ %% The default memsup check interval is 1 minute, which is way too
+ %% long - rabbit can gobble up all memory in a matter of
+ %% seconds. Unfortunately the memory_check_interval configuration
+ %% parameter and memsup:set_check_interval/1 function only provide
+ %% a granularity of minutes. So we have to peel off one layer of
+ %% the API to get to the underlying layer which operates at the
+ %% granularity of milliseconds.
+ %%
+ %% Note that the new setting will only take effect after the first
+ %% check has completed, i.e. after one minute. So if rabbit eats
+ %% all the memory within the first minute after startup then we
+ %% are out of luck.
+ ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL},
+ infinity),
+
+ ok = alarm_handler:add_alarm_handler(?MODULE).
+
+stop() ->
+ ok = alarm_handler:delete_alarm_handler(?MODULE).
+
+register(Pid, HighMemMFA) ->
+ ok = gen_event:call(alarm_handler, ?MODULE,
+ {register, Pid, HighMemMFA}).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #alarms{alertees = dict:new()}}.
+
+handle_call({register, Pid, HighMemMFA},
+ State = #alarms{alertees = Alertess}) ->
+ _MRef = erlang:monitor(process, Pid),
+ case State#alarms.system_memory_high_watermark of
+ true -> {M, F, A} = HighMemMFA,
+ ok = erlang:apply(M, F, A ++ [Pid, true]);
+ false -> ok
+ end,
+ NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
+ {ok, ok, State#alarms{alertees = NewAlertees}};
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
+ ok = alert(true, State#alarms.alertees),
+ {ok, State#alarms{system_memory_high_watermark = true}};
+
+handle_event({clear_alarm, system_memory_high_watermark}, State) ->
+ ok = alert(false, State#alarms.alertees),
+ {ok, State#alarms{system_memory_high_watermark = false}};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #alarms{alertees = Alertess}) ->
+ {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+alert(Alert, Alertees) ->
+ dict:fold(fun (Pid, {M, F, A}, Acc) ->
+ ok = erlang:apply(M, F, A ++ [Pid, Alert]),
+ Acc
+ end, ok, Alertees).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7b2f801a..56d2c35d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,6 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
--export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2]).
@@ -53,21 +52,12 @@
-type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
-type(qlen() :: {'ok', non_neg_integer()}).
-type(qfun(A) :: fun ((amqqueue()) -> A)).
--type(bind_res() :: {'ok', non_neg_integer()} |
- {'error', 'queue_not_found' | 'exchange_not_found'}).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
amqqueue()).
--spec(add_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
- bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
- bind_res() | {'error', 'binding_not_found'}).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -89,7 +79,6 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
@@ -131,7 +120,7 @@ recover_durable_queues() ->
Queues = lists:map(fun start_queue_process/1, R),
rabbit_misc:execute_mnesia_transaction(
fun () ->
- lists:foreach(fun recover_queue/1, Queues),
+ lists:foreach(fun store_queue/1, Queues),
ok
end).
@@ -140,12 +129,12 @@ declare(QueueName, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
- binding_specs = [],
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, QueueName}) of
- [] -> ok = recover_queue(Q),
+ [] -> ok = store_queue(Q),
+ ok = add_default_binding(Q),
Q;
[ExistingQ] -> ExistingQ
end
@@ -167,83 +156,12 @@ start_queue_process(Q) ->
{ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]),
Q#amqqueue{pid = Pid}.
-recover_queue(Q) ->
- ok = store_queue(Q),
- ok = recover_bindings(Q),
- ok.
-
-default_binding_spec(#resource{virtual_host = VHost, name = Name}) ->
- #binding_spec{exchange_name = rabbit_misc:r(VHost, exchange, <<>>),
- routing_key = Name,
- arguments = []}.
-
-recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) ->
- ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
- lists:foreach(fun (B) ->
- ok = rabbit_exchange:add_binding(B, Q)
- end, Specs),
+add_default_binding(#amqqueue{name = QueueName}) ->
+ Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
+ RoutingKey = QueueName#resource.name,
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
ok.
-modify_bindings(QueueName, ExchangeName, RoutingKey, Arguments,
- SpecPresentFun, SpecAbsentFun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
- [Q = #amqqueue{binding_specs = Specs0}] ->
- Spec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- arguments = Arguments},
- case (case lists:member(Spec, Specs0) of
- true -> SpecPresentFun;
- false -> SpecAbsentFun
- end)(Q, Spec) of
- {ok, #amqqueue{binding_specs = Specs}} ->
- {ok, length(Specs)};
- {error, not_found} ->
- {error, exchange_not_found};
- Other -> Other
- end;
- [] -> {error, queue_not_found}
- end
- end).
-
-update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec,
- UpdateSpecFun, UpdateExchangeFun) ->
- Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)},
- case UpdateExchangeFun(Spec, Q1) of
- ok -> store_queue(Q1),
- {ok, Q1};
- Other -> Other
- end.
-
-add_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
- modify_bindings(
- QueueName, ExchangeName, RoutingKey, Arguments,
- fun (Q, _Spec) -> {ok, Q} end,
- fun (Q, Spec) -> update_bindings(
- Q, Spec,
- fun (S, Specs) -> [S | Specs] end,
- fun rabbit_exchange:add_binding/2)
- end).
-
-delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
- modify_bindings(
- QueueName, ExchangeName, RoutingKey, Arguments,
- fun (Q, Spec) -> update_bindings(
- Q, Spec,
- fun lists:delete/2,
- fun rabbit_exchange:delete_binding/2)
- end,
- fun (Q, Spec) ->
- %% the following is essentially a no-op, though crucially
- %% it produces {error, not_found} when the exchange does
- %% not exist.
- case rabbit_exchange:delete_binding(Spec, Q) of
- ok -> {error, binding_not_found};
- Other -> Other
- end
- end).
-
lookup(Name) ->
rabbit_misc:dirty_read({amqqueue, Name}).
@@ -314,17 +232,6 @@ notify_down_all(QPids, ChPid) ->
fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
QPids).
-binding_forcibly_removed(BindingSpec, QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
- [] -> ok;
- [Q = #amqqueue{binding_specs = Specs}] ->
- store_queue(Q#amqqueue{binding_specs =
- lists:delete(BindingSpec, Specs)})
- end
- end).
-
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server:call(QPid, {claim_queue, ReaderPid}).
@@ -342,12 +249,6 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
-delete_bindings(Q = #amqqueue{binding_specs = Specs}) ->
- lists:foreach(fun (BindingSpec) ->
- ok = rabbit_exchange:delete_binding(
- BindingSpec, Q)
- end, Specs).
-
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -360,10 +261,8 @@ internal_delete(QueueName) ->
end
end).
-delete_queue(Q = #amqqueue{name = QueueName}) ->
- ok = delete_bindings(Q),
- ok = rabbit_exchange:delete_binding(
- default_binding_spec(QueueName), Q),
+delete_queue(#amqqueue{name = QueueName}) ->
+ ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
ok = mnesia:delete({amqqueue, QueueName}),
ok.
@@ -383,7 +282,6 @@ pseudo_queue(QueueName, Pid) ->
durable = false,
auto_delete = false,
arguments = [],
- binding_specs = [],
pid = Pid}.
safe_pmap_ok(H, F, L) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7716ef16..e687df84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -30,6 +30,7 @@
-behaviour(gen_server).
-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(HIBERNATE_AFTER, 1000).
-export([start_link/1]).
@@ -75,7 +76,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}}.
+ round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;
@@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) ->
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
round_robin = ActiveConsumers}) ->
case lookup_ch(DownPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
@@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
end,
round_robin = NewActive})) of
{continue, NewState} ->
- {noreply, NewState};
+ noreply(NewState);
{stop, NewState} ->
{stop, normal, NewState}
end
@@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% queues discarding the message?
%%
{Delivered, NewState} = attempt_delivery(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({deliver, Txn, Message}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
{Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
@@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) ->
gen_server:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
- {noreply, NewState};
+ noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
persist_auto_ack(QName, Message)
end,
Msg = {QName, self(), NextId, Delivered, Message},
- {reply, {ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}};
+ reply({ok, queue:len(BufferTail), Msg},
+ State#q{message_buffer = BufferTail,
+ next_msg_id = NextId + 1});
{empty, _} ->
- {reply, empty, State}
+ reply(empty, State)
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
@@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
round_robin = RoundRobin}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
- {reply, {error, queue_owned_by_another_connection}, State};
+ reply({error, queue_owned_by_another_connection}, State);
ok ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
in_use ->
- {reply, {error, exclusive_consume_unavailable}, State};
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
@@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
end,
round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, run_poke_burst(State1)}
+ reply(ok, run_poke_burst(State1))
end
end;
@@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, State};
+ reply(ok, State);
C = #cr{consumers = Consumers} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
@@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ConsumerTag,
RoundRobin)}) of
{continue, State1} ->
- {reply, ok, State1};
+ reply(ok, State1);
{stop, State1} ->
{stop, normal, ok, State1}
end
@@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
round_robin = RoundRobin}) ->
- {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State};
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
@@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IsUnused = is_unused(),
if
IfEmpty and not(IsEmpty) ->
- {reply, {error, not_empty}, State};
+ reply({error, not_empty}, State);
IfUnused and not(IsUnused) ->
- {reply, {error, in_use}, State};
+ reply({error, in_use}, State);
true ->
{stop, normal, {ok, queue:len(MessageBuffer)}, State}
end;
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
- {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}};
+ reply({ok, queue:len(MessageBuffer)},
+ State#q{message_buffer = queue:new()});
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
@@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% to check, we'd need to hold not just the ch
%% pid for each consumer, but also its reader
%% pid...
- {reply, locked, State};
+ reply(locked, State);
ok ->
- {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}}
+ reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
end;
{ReaderPid, _MonitorRef} ->
- {reply, ok, State};
+ reply(ok, State);
_ ->
- {reply, locked, State}
+ reply(locked, State)
end.
handle_cast({deliver, Txn, Message}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {noreply, NewState};
+ noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
@@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
_ ->
record_pending_acks(Txn, ChPid, MsgIds)
end,
- {noreply, State}
+ noreply(State)
end;
handle_cast({rollback, Txn}, State) ->
ok = rollback_work(Txn, qname(State)),
erase_tx(Txn),
- {noreply, State};
+ noreply(State);
handle_cast({redeliver, Messages}, State) ->
- {noreply, deliver_or_enqueue_n(Messages, State)};
+ noreply(deliver_or_enqueue_n(Messages, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Messages, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {noreply, deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State)}
+ noreply(deliver_or_enqueue_n(
+ [{Message, true} || Message <- Messages], State))
end;
handle_cast({notify_sent, ChPid}, State) ->
case lookup_ch(ChPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
T = #cr{unsent_message_count =Count} ->
- {noreply, possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State)}
+ noreply(possibly_unblock(
+ T#cr{unsent_message_count = Count - 1},
+ State))
end.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
@@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
+handle_info(timeout, State) ->
+ {noreply, State, hibernate};
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0544d32e..1eb421ca 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -28,7 +28,7 @@
-include("rabbit.hrl").
-export([start_link/4, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4]).
+-export([send_command/2, deliver/4, conserve_memory/2]).
%% callbacks
-export([init/2, handle_message/2]).
@@ -49,6 +49,7 @@
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
-endif.
@@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
Pid ! {deliver, ConsumerTag, AckRequired, Msg},
ok.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
%%---------------------------------------------------------------------------
init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
+ %% this is bypassing the proxy so alarms can "jump the queue" and
+ %% be handled promptly
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
#ch{state = starting,
proxy_pid = ProxyPid,
reader_pid = ReaderPid,
@@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg},
true, ConsumerTag, DeliveryTag, Msg),
State1#ch{next_tag = DeliveryTag + 1};
+handle_message({conserve_memory, Conserve}, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
+ State;
+
handle_message({'EXIT', _Pid, Reason}, State) ->
terminate(Reason, State);
@@ -573,7 +586,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -581,7 +594,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
@@ -619,6 +632,12 @@ handle_method(#'channel.flow'{active = _}, _, State) ->
%% FIXME: implement
{reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow_ok'{active = _}, _, State) ->
+ %% TODO: We may want to correlate this to channel.flow messages we
+ %% have sent, and complain if we get an unsolicited
+ %% channel.flow_ok, or the client refuses our flow request.
+ {noreply, State};
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
@@ -635,7 +654,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
@@ -651,8 +670,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
rabbit_misc:protocol_error(
not_allowed, "durability settings of ~s incompatible with ~s",
[rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
- {ok, _BindingCount} ->
- return_ok(State, NoWait, ReturnMethod)
+ ok -> return_ok(State, NoWait, ReturnMethod)
end.
publish(Mandatory, Immediate, Message, QPids,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index bb132a50..a8c54438 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -29,13 +29,18 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
- list_vhost_exchanges/1, list_exchange_bindings/1,
+ list_vhost_exchanges/1,
simple_publish/6, simple_publish/3,
route/2]).
--export([add_binding/2, delete_binding/2]).
+-export([add_binding/4, delete_binding/4]).
-export([delete/2]).
+-export([delete_bindings_for_queue/1]).
-export([check_type/1, assert_type/2, topic_matches/2]).
+%% EXTENDED API
+-export([list_exchange_bindings/1]).
+-export([list_queue_bindings/1]).
+
-import(mnesia).
-import(sets).
-import(lists).
@@ -48,7 +53,8 @@
-type(publish_res() :: {'ok', [pid()]} |
not_found() | {'error', 'unroutable' | 'not_delivered'}).
-
+-type(bind_res() :: 'ok' |
+ {'error', 'queue_not_found' | 'exchange_not_found'}).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
@@ -57,37 +63,46 @@
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]).
--spec(list_exchange_bindings/1 :: (exchange_name()) ->
- [{queue_name(), routing_key(), amqp_table()}]).
-spec(simple_publish/6 ::
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
--spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
- 'ok' | not_found() |
- {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
- 'ok' | not_found()).
+-spec(add_binding/4 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+ bind_res() | {'error', 'durability_settings_incompatible'}).
+-spec(delete_binding/4 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+ bind_res() | {'error', 'binding_not_found'}).
+-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
+-spec(list_queue_bindings/1 :: (queue_name()) ->
+ [{exchange_name(), routing_key(), amqp_table()}]).
+-spec(list_exchange_bindings/1 :: (exchange_name()) ->
+ [{queue_name(), routing_key(), amqp_table()}]).
-endif.
%%----------------------------------------------------------------------------
recover() ->
- ok = recover_durable_exchanges(),
- ok.
-
-recover_durable_exchanges() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:foldl(fun (Exchange, Acc) ->
- ok = mnesia:write(Exchange),
- Acc
- end, ok, durable_exchanges)
+ mnesia:foldl(
+ fun (Exchange, Acc) ->
+ ok = mnesia:write(Exchange),
+ Acc
+ end, ok, durable_exchanges),
+ mnesia:foldl(
+ fun (Route, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(Route),
+ ok = mnesia:write(ReverseRoute),
+ Acc
+ end, ok, durable_routes),
+ ok
end).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
@@ -143,22 +158,9 @@ list_vhost_exchanges(VHostPath) ->
mnesia:dirty_match_object(
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
-list_exchange_bindings(Name) ->
- [{QueueName, RoutingKey, Arguments} ||
- #binding{handlers = Handlers} <- bindings_for_exchange(Name),
- #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
- arguments = Arguments},
- queue = QueueName} <- Handlers].
-
-bindings_for_exchange(Name) ->
- qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
- element(1, K) == Name])).
-
-empty_handlers() ->
- [].
-
%% Usable by Erlang code that wants to publish messages.
-simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
+ ContentTypeBin, BodyBin) ->
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
properties = #'P_basic'{content_type = ContentTypeBin},
@@ -188,121 +190,173 @@ simple_publish(Mandatory, Immediate,
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
+%%
+%% TODO: Maybe this should be handled by a cursor instead.
route(#exchange{name = Name, type = topic}, RoutingKey) ->
- sets:to_list(
- sets:union(
- mnesia:activity(
- async_dirty,
- fun () ->
- qlc:e(qlc:q([handler_qpids(H) ||
- #binding{key = {Name1, PatternKey},
- handlers = H}
- <- mnesia:table(binding),
- Name == Name1,
- topic_matches(PatternKey, RoutingKey)]))
- end)));
-
-route(#exchange{name = Name, type = Type}, RoutingKey) ->
- BindingKey = delivery_key_for_type(Type, Name, RoutingKey),
- case rabbit_misc:dirty_read({binding, BindingKey}) of
- {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H));
- {error, not_found} -> []
- end.
+ Query = qlc:q([QName ||
+ #route{binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName,
+ key = BindingKey}} <- mnesia:table(route),
+ ExchangeName == Name,
+ %% TODO: This causes a full scan for each entry
+ %% with the same exchange (see bug 19336)
+ topic_matches(BindingKey, RoutingKey)]),
+ lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query]));
+
+route(X = #exchange{type = fanout}, _) ->
+ route_internal(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey) ->
+ route_internal(X, RoutingKey).
+
+route_internal(#exchange{name = Name}, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}),
+ [QPid | Acc]
+ end, [], sets:from_list(Queues)).
+
+%% TODO: Should all of the route and binding management not be
+%% refactored to its own module, especially seeing as unbind will have
+%% to be implemented for 0.91 ?
+
+delete_bindings_for_exchange(ExchangeName) ->
+ indexed_delete(
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ fun delete_forward_routes/1, fun mnesia:delete_object/1).
+
+delete_bindings_for_queue(QueueName) ->
+ Exchanges = exchanges_for_queue(QueueName),
+ indexed_delete(
+ reverse_route(#route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ fun mnesia:delete_object/1, fun delete_forward_routes/1),
+ [begin
+ [X] = mnesia:read({exchange, ExchangeName}),
+ ok = maybe_auto_delete(X)
+ end || ExchangeName <- Exchanges],
+ ok.
-delivery_key_for_type(fanout, Name, _RoutingKey) ->
- {Name, fanout};
-delivery_key_for_type(_Type, Name, RoutingKey) ->
- {Name, RoutingKey}.
+indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) ->
+ [begin
+ ok = ReverseDeleteFun(reverse_route(Route)),
+ ok = ForwardsDeleteFun(Route)
+ end || Route <- mnesia:match_object(Match)],
+ ok.
-call_with_exchange(Name, Fun) ->
- case mnesia:wread({exchange, Name}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end.
+delete_forward_routes(Route) ->
+ ok = mnesia:delete_object(Route),
+ ok = mnesia:delete_object(durable_routes, Route, write).
-make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) ->
- #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
+exchanges_for_queue(QueueName) ->
+ MatchHead = reverse_route(
+ #route{binding = #binding{exchange_name = '$1',
+ queue_name = QueueName,
+ _ = '_'}}),
+ sets:to_list(
+ sets:from_list(
+ mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
-add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey}, Q) ->
- call_with_exchange(
- ExchangeName,
- fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- internal_add_binding(
- X, RoutingKey, make_handler(BindingSpec, Q))
- end
+has_bindings(ExchangeName) ->
+ MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
+ queue_name = '$1',
+ _ = '_'}},
+ continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+call_with_exchange(Exchange, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> case mnesia:read({exchange, Exchange}) of
+ [] -> {error, exchange_not_found};
+ [X] -> Fun(X)
+ end
end).
-delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey}, Q) ->
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
call_with_exchange(
- ExchangeName,
- fun (X) -> ok = internal_delete_binding(
- X, RoutingKey, make_handler(BindingSpec, Q)),
- maybe_auto_delete(X)
+ Exchange,
+ fun(X) -> case mnesia:read({amqqueue, Queue}) of
+ [] -> {error, queue_not_found};
+ [Q] -> Fun(X, Q)
+ end
end).
-%% Must run within a transaction.
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
-maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) ->
- case internal_delete(ExchangeName, true) of
- {error, in_use} -> ok;
- ok -> ok
- end.
-
-handlers_isempty([]) -> true;
-handlers_isempty([_|_]) -> false.
-
-extend_handlers(Handlers, Handler) -> [Handler | Handlers].
-
-delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers).
-
-handler_qpids(Handlers) ->
- sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]).
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true -> ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ Q#amqqueue.durable, fun mnesia:write/3)
+ end
+ end).
-%% Must run within a transaction.
-internal_add_binding(#exchange{name = ExchangeName, type = Type},
- RoutingKey, Handler) ->
- BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
- ok = add_handler_to_binding(BindingKey, Handler).
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ Q#amqqueue.durable, fun mnesia:delete_object/3),
+ maybe_auto_delete(X)
+ end).
-%% Must run within a transaction.
-internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) ->
- BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
- remove_handler_from_binding(BindingKey, Handler),
+sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
+ Binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = Arguments},
+ ok = case Durable of
+ true -> Fun(durable_routes, #route{binding = Binding}, write);
+ false -> ok
+ end,
+ [ok, ok] = [Fun(element(1, R), R, write) ||
+ R <- tuple_to_list(route_with_reverse(Binding))],
ok.
-%% Must run within a transaction.
-add_handler_to_binding(BindingKey, Handler) ->
- ok = case mnesia:wread({binding, BindingKey}) of
- [] ->
- ok = mnesia:write(
- #binding{key = BindingKey,
- handlers = extend_handlers(
- empty_handlers(), Handler)});
- [B = #binding{handlers = H}] ->
- ok = mnesia:write(
- B#binding{handlers = extend_handlers(H, Handler)})
- end.
-
-%% Must run within a transaction.
-remove_handler_from_binding(BindingKey, Handler) ->
- case mnesia:wread({binding, BindingKey}) of
- [] -> empty;
- [B = #binding{handlers = H}] ->
- H1 = delete_handler(H, Handler),
- case handlers_isempty(H1) of
- true ->
- ok = mnesia:delete({binding, BindingKey}),
- empty;
- _ ->
- ok = mnesia:write(B#binding{handlers = H1}),
- not_empty
- end
- end.
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
+ Route = #route{binding = Binding},
+ {Route, reverse_route(Route)}.
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
@@ -331,46 +385,50 @@ last_topic_match(P, R, []) ->
last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-delete(ExchangeName, IfUnused) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> internal_delete(ExchangeName, IfUnused) end).
-
-internal_delete(ExchangeName, _IfUnused = true) ->
- Bindings = bindings_for_exchange(ExchangeName),
- case Bindings of
- [] -> do_internal_delete(ExchangeName, Bindings);
- _ ->
- case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end,
- Bindings) of
- true ->
- %% There are no handlers anywhere in any of the
- %% bindings for this exchange.
- do_internal_delete(ExchangeName, Bindings);
- false ->
- %% There was at least one real handler
- %% present. It's still in use.
- {error, in_use}
- end
- end;
-internal_delete(ExchangeName, false) ->
- do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)).
-
-forcibly_remove_handlers(Handlers) ->
- lists:foreach(
- fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
- ok = rabbit_amqqueue:binding_forcibly_removed(
- BindingSpec, QueueName)
- end, Handlers),
+delete(ExchangeName, _IfUnused = true) ->
+ call_with_exchange(ExchangeName, fun conditional_delete/1);
+delete(ExchangeName, _IfUnused = false) ->
+ call_with_exchange(ExchangeName, fun unconditional_delete/1).
+
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ ok;
+maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
+ conditional_delete(Exchange),
ok.
-do_internal_delete(ExchangeName, Bindings) ->
- case mnesia:wread({exchange, ExchangeName}) of
- [] -> {error, not_found};
- _ ->
- lists:foreach(fun (#binding{key = K, handlers = H}) ->
- ok = forcibly_remove_handlers(H),
- ok = mnesia:delete({binding, K})
- end, Bindings),
- ok = mnesia:delete({durable_exchanges, ExchangeName}),
- ok = mnesia:delete({exchange, ExchangeName})
+conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
+ case has_bindings(ExchangeName) of
+ false -> unconditional_delete(Exchange);
+ true -> {error, in_use}
end.
+
+unconditional_delete(#exchange{name = ExchangeName}) ->
+ ok = delete_bindings_for_exchange(ExchangeName),
+ ok = mnesia:delete({durable_exchanges, ExchangeName}),
+ ok = mnesia:delete({exchange, ExchangeName}).
+
+%%----------------------------------------------------------------------------
+%% EXTENDED API
+%% These are API calls that are not used by the server internally,
+%% they are exported for embedded clients to use
+
+%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
+%% return {QueueName, RoutingKey, Arguments} tuples
+list_exchange_bindings(ExchangeName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ [{QueueName, RoutingKey, Arguments} ||
+ #route{binding = #binding{queue_name = QueueName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(Route)].
+
+% Refactoring is left as an exercise for the reader
+list_queue_bindings(QueueName) ->
+ Route = #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}},
+ [{ExchangeName, RoutingKey, Arguments} ||
+ #route{binding = #binding{exchange_name = ExchangeName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(Route)].
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 89648f4f..7638af58 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -68,7 +68,8 @@
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())).
+-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K)
+ when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
@@ -210,7 +211,8 @@ with_exit_handler(Handler, Thunk) ->
try
Thunk()
catch
- exit:{R, _} when R =:= noproc; R =:= normal -> Handler()
+ exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown ->
+ Handler()
end.
with_user(Username, Thunk) ->
@@ -236,6 +238,7 @@ with_vhost(VHostPath, Thunk) ->
with_user_and_vhost(Username, VHostPath, Thunk) ->
with_user(Username, with_vhost(VHostPath, Thunk)).
+
execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4ae367ba..9b67135d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -105,7 +105,13 @@ table_definitions() ->
{rabbit_config, [{disc_copies, [node()]}]},
{listener, [{type, bag},
{attributes, record_info(fields, listener)}]},
- {binding, [{attributes, record_info(fields, binding)}]},
+ {durable_routes, [{disc_copies, [node()]},
+ {record_name, route},
+ {attributes, record_info(fields, route)}]},
+ {route, [{type, ordered_set},
+ {attributes, record_info(fields, route)}]},
+ {reverse_route, [{type, ordered_set},
+ {attributes, record_info(fields, reverse_route)}]},
{durable_exchanges, [{disc_copies, [node()]},
{record_name, exchange},
{attributes, record_info(fields, exchange)}]},
@@ -255,7 +261,7 @@ init_db(ClusterNodes) ->
end.
create_schema() ->
- mnesia:stop(),
+ mnesia:stop(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
cannot_create_schema),
rabbit_misc:ensure_ok(mnesia:start(),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 2c7fa2ab..a2688625 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -36,6 +36,8 @@
-record(wstate, {sock, channel, frame_max}).
+-define(HIBERNATE_AFTER, 5000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -63,6 +65,8 @@ start(Sock, Channel, FrameMax) ->
mainloop(State) ->
receive
Message -> ?MODULE:mainloop(handle_message(Message, State))
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop, [State])
end.
handle_message({send_command, MethodRecord},
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 3943161a..dc38b594 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -58,9 +58,9 @@ init({IPAddress, Port, SocketOpts,
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
- error_logger:info_msg(
- "started TCP listener on ~s:~p~n",
- [inet_parse:ntoa(IPAddress), Port]),
+ {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
+ error_logger:info_msg("started TCP listener on ~s:~p~n",
+ [inet_parse:ntoa(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock=LSock,
on_startup = OnStartup, on_shutdown = OnShutdown}};