summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-05 17:30:56 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-05 17:30:56 +0000
commit970293125a3bba065840f5daa431e9946b301c3a (patch)
tree17dc5d0fcfe1f31d5f8b02957690821d00a65f95
parenteb1a554779ab77ef063ac7d7e27f33f7c39be3fa (diff)
parent97ea22dab345e1c237518f4e5a111462a7ee022c (diff)
downloadrabbitmq-server-970293125a3bba065840f5daa431e9946b301c3a.tar.gz
Merging bug 21939 into default
-rw-r--r--codegen.py15
-rw-r--r--include/rabbit_exchange_type_spec.hrl42
-rw-r--r--include/rabbit_framing_spec.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec11
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue.erl73
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_channel.erl105
-rw-r--r--src/rabbit_exchange.erl531
-rw-r--r--src/rabbit_exchange_type.erl61
-rw-r--r--src/rabbit_exchange_type_direct.erl63
-rw-r--r--src/rabbit_exchange_type_fanout.erl61
-rw-r--r--src/rabbit_exchange_type_headers.erl137
-rw-r--r--src/rabbit_exchange_type_registry.erl129
-rw-r--r--src/rabbit_exchange_type_topic.erl101
-rw-r--r--src/rabbit_limiter.erl77
-rw-r--r--src/rabbit_misc.erl28
-rw-r--r--src/rabbit_multi.erl5
-rw-r--r--src/rabbit_networking.erl28
-rw-r--r--src/rabbit_router.erl45
-rw-r--r--src/rabbit_tests.erl12
22 files changed, 1138 insertions, 406 deletions
diff --git a/codegen.py b/codegen.py
index 96109610..91c70e81 100644
--- a/codegen.py
+++ b/codegen.py
@@ -126,7 +126,7 @@ def printFileHeader():
%%
%% Contributor(s): ______________________________________.
%%"""
-
+
def genErl(spec):
def erlType(domain):
return erlangTypeMap[spec.resolveDomain(domain)]
@@ -151,7 +151,7 @@ def genErl(spec):
def genMethodHasContent(m):
print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower())
-
+
def genMethodIsSynchronous(m):
hasNoWait = "nowait" in fieldNameList(m.arguments)
if m.isSynchronous and hasNoWait:
@@ -219,6 +219,9 @@ def genErl(spec):
else:
pass
+ def genMethodRecord(m):
+ print "method_record(%s) -> #%s{};" % (m.erlangName(), m.erlangName())
+
def genDecodeMethodFields(m):
packedFields = packMethodFields(m.arguments)
binaryPattern = ', '.join([methodFieldFragment(f) for f in packedFields])
@@ -299,6 +302,7 @@ def genErl(spec):
-export([method_id/1]).
-export([method_has_content/1]).
-export([is_method_synchronous/1]).
+-export([method_record/1]).
-export([method_fieldnames/1]).
-export([decode_method_fields/2]).
-export([decode_properties/2]).
@@ -323,6 +327,9 @@ bitvalue(undefined) -> 0.
for m in methods: genMethodIsSynchronous(m)
print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})."
+ for m in methods: genMethodRecord(m)
+ print "method_record(Name) -> exit({unknown_method_name, Name})."
+
for m in methods: genMethodFieldNames(m)
print "method_fieldnames(Name) -> exit({unknown_method_name, Name})."
@@ -362,7 +369,7 @@ def genHrl(spec):
result += ' = ' + conv_fn(field.defaultvalue)
return result
return ', '.join([fillField(f) for f in fields])
-
+
methods = spec.allMethods()
printFileHeader()
@@ -386,7 +393,7 @@ def generateErl(specPath):
def generateHrl(specPath):
genHrl(AmqpSpec(specPath))
-
+
if __name__ == "__main__":
do_main(generateHrl, generateErl)
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
new file mode 100644
index 00000000..9864f1eb
--- /dev/null
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -0,0 +1,42 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
+-spec(validate/1 :: (exchange()) -> 'ok').
+-spec(create/1 :: (exchange()) -> 'ok').
+-spec(recover/2 :: (exchange(), list(binding())) -> 'ok').
+-spec(delete/2 :: (exchange(), list(binding())) -> 'ok').
+-spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
+-spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok').
+
+-endif.
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index 199a0f89..1a979899 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -56,5 +56,5 @@
-type(password() :: binary()).
-type(vhost() :: binary()).
-type(ctag() :: binary()).
--type(exchange_type() :: 'direct' | 'topic' | 'fanout').
+-type(exchange_type() :: atom()).
-type(binding_key() :: binary()).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 4dd22308..fa9bb2ee 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -65,12 +65,12 @@ mkdir -p %{buildroot}%{_sysconfdir}/rabbitmq
rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL
#Build the list of files
-rm -f %{_builddir}/filelist.%{name}.rpm
-echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm
+rm -f %{_builddir}/%{name}.files
+echo '%defattr(-,root,root, -)' >> %{_builddir}/%{name}.files
(cd %{buildroot}; \
find . -type f ! -regex '\.%{_sysconfdir}.*' \
! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \
- | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm)
+ | sed -e 's/^\.//' >> %{_builddir}/%{name}.files)
%pre
@@ -103,7 +103,7 @@ if [ $1 = 0 ]; then
# Leave rabbitmq user and group
fi
-%files -f ../filelist.%{name}.rpm
+%files -f ../%{name}.files
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
@@ -118,6 +118,9 @@ fi
rm -rf %{buildroot}
%changelog
+* Mon Feb 15 2010 Matthew Sackman <matthew@lshift.net> 1.7.2-1
+- New Upstream Release
+
* Fri Jan 22 2010 Matthew Sackman <matthew@lshift.net> 1.7.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 796a301a..63b50749 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.7.2-1) intrepid; urgency=low
+
+ * New Upstream Release
+
+ -- Matthew Sackman <matthew@lshift.net> Mon, 15 Feb 2010 15:54:47 +0000
+
rabbitmq-server (1.7.1-1) intrepid; urgency=low
* New Upstream Release
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 35d3ce4a..6084be1b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -53,6 +53,12 @@
[{mfa, {rabbit_mnesia, init, []}},
{enables, kernel_ready}]}).
+-rabbit_boot_step({rabbit_exchange_type_registry,
+ [{description, "exchange type registry"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_exchange_type_registry]}},
+ {enables, kernel_ready}]}).
+
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_child, [rabbit_log]}},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3f25d72e..ceec00fd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -107,6 +107,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -284,7 +285,7 @@ requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
safe_pmap_ok(
@@ -329,39 +330,53 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 7, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 8, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+
+flush_all(QPids, ChPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
+ QPids).
internal_delete(QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] ->
- ok = rabbit_exchange:delete_queue_bindings(QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- ok
- end
- end).
+ case
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [_] ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
+ rabbit_exchange:delete_queue_bindings(QueueName)
+ end
+ end) of
+ Err = {error, _} -> Err;
+ PostHook ->
+ PostHook(),
+ ok
+ end.
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:fold(
- fun (QueueName, Acc) ->
- ok = rabbit_exchange:delete_transient_queue_bindings(
- QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- Acc
- end,
- ok,
- qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end).
+ [Hook() ||
+ Hook <- rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end)],
+ ok.
+
+delete_queue(QueueName) ->
+ Post = rabbit_exchange:delete_transient_queue_bindings(QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ Post.
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e4791f95..19cb5c71 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -826,7 +826,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({flush, ChPid}, State) ->
+ ok = rabbit_channel:flushed(ChPid, self()),
+ noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 585c59dc..3597fcd7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1,4 +1,4 @@
-%% The contents of this file are subject to the Mozilla Public Licenses
+%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/5, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2]).
+-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([init/1, terminate/2, code_change/3,
@@ -45,8 +45,8 @@
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host,
- most_recently_declared_queue, consumer_mapping}).
+ username, virtual_host, most_recently_declared_queue,
+ consumer_mapping, blocking}).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -77,6 +77,7 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -110,7 +111,10 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
+ gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
+
+flushed(Pid, QPid) ->
+ gen_server2:cast(Pid, {flushed, QPid}).
list() ->
pg_local:get_members(rabbit_channels).
@@ -152,7 +156,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()},
+ consumer_mapping = dict:new(),
+ blocking = dict:new()},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -190,6 +195,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
+handle_cast({flushed, QPid}, State) ->
+ {noreply, queue_blocked(QPid, State)};
+
handle_cast(terminate, State) ->
{stop, normal, State};
@@ -215,7 +223,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State}.
+ {stop, Reason, State};
+handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ {noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -331,6 +341,20 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
+ case dict:find(QPid, Blocking) of
+ error -> State;
+ {ok, MRef} -> true = erlang:demonitor(MRef),
+ Blocking1 = dict:erase(QPid, Blocking),
+ ok = case dict:size(Blocking1) of
+ 0 -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ _ -> ok
+ end,
+ State#ch{blocking = Blocking1}
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -540,25 +564,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter_pid = LimiterPid,
- unacked_message_q = UAMQ }) ->
- NewLimiterPid = case {LimiterPid, PrefetchCount} of
- {undefined, 0} ->
- undefined;
- {undefined, _} ->
- LPid = rabbit_limiter:start_link(self(),
- queue:len(UAMQ)),
- ok = limit_queues(LPid, State),
- LPid;
- {_, 0} ->
- ok = rabbit_limiter:shutdown(LimiterPid),
- ok = limit_queues(undefined, State),
- undefined;
- {_, _} ->
- LimiterPid
- end,
- ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
- {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
+ _, State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} -> undefined;
+ {undefined, _} -> start_limiter(State);
+ {_, _} -> LimiterPid
+ end,
+ LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
+ ok -> LimiterPid1;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
@@ -791,9 +807,31 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
-handle_method(#'channel.flow'{active = _}, _, State) ->
- %% FIXME: implement
- {reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow'{active = true}, _,
+ State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
+ ok -> LimiterPid;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'channel.flow_ok'{active = true},
+ State#ch{limiter_pid = LimiterPid1}};
+
+handle_method(#'channel.flow'{active = false}, _,
+ State = #ch{limiter_pid = LimiterPid,
+ consumer_mapping = Consumers}) ->
+ LimiterPid1 = case LimiterPid of
+ undefined -> start_limiter(State);
+ Other -> Other
+ end,
+ ok = rabbit_limiter:block(LimiterPid1),
+ QPids = consumer_queues(Consumers),
+ Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ case Queues of
+ [] -> {reply, #'channel.flow_ok'{active = false}, State};
+ _ -> {noreply, State#ch{limiter_pid = LimiterPid1,
+ blocking = dict:from_list(Queues)}}
+ end;
handle_method(#'channel.flow_ok'{active = _}, _, State) ->
%% TODO: We may want to correlate this to channel.flow messages we
@@ -940,9 +978,18 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
+start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
+ LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+ ok = limit_queues(LPid, State),
+ LPid.
+
notify_queues(#ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+unlimit_queues(State) ->
+ ok = limit_queues(undefined, State),
+ undefined.
+
limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 832acd16..1cfba00e 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -30,7 +30,6 @@
%%
-module(rabbit_exchange).
--include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -40,7 +39,7 @@
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
+-export([check_type/1, assert_type/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -49,7 +48,6 @@
-import(mnesia).
-import(sets).
-import(lists).
--import(qlc).
-import(regexp).
%%----------------------------------------------------------------------------
@@ -82,10 +80,8 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -100,17 +96,37 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- ok = rabbit_misc:table_foreach(
- fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
- Exchange, write)
- end, rabbit_durable_exchange),
- ok = rabbit_misc:table_foreach(
- fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write)
- end, rabbit_durable_route).
+ Exs = rabbit_misc:table_fold(
+ fun(Exchange, Acc) ->
+ ok = mnesia:write(rabbit_exchange, Exchange, write),
+ [Exchange | Acc]
+ end, [], rabbit_durable_exchange),
+ Bs = rabbit_misc:table_fold(
+ fun(Route = #route{binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route,
+ Route, write),
+ ok = mnesia:write(rabbit_reverse_route,
+ ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route),
+ recover_with_bindings(Bs, Exs),
+ ok.
+
+recover_with_bindings(Bs, Exs) ->
+ recover_with_bindings(
+ lists:keysort(#binding.exchange_name, Bs),
+ lists:keysort(#exchange.name, Exs), []).
+
+recover_with_bindings([B = #binding{exchange_name = Name} | Rest],
+ Xs = [#exchange{name = Name} | _],
+ Bindings) ->
+ recover_with_bindings(Rest, Xs, [B | Bindings]);
+recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
+ (type_to_module(Type)):recover(X, Bindings),
+ recover_with_bindings(Bs, Xs, []);
+recover_with_bindings([], [], []) ->
+ ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -118,31 +134,53 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_exchange, ExchangeName}) of
- [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
- if Durable ->
- ok = mnesia:write(rabbit_durable_exchange,
- Exchange, write);
- true -> ok
- end,
- Exchange;
- [ExistingX] -> ExistingX
- end
- end).
+ %% We want to upset things if it isn't ok; this is different from
+ %% the other hooks invocations, where we tend to ignore the return
+ %% value.
+ TypeModule = type_to_module(Type),
+ ok = TypeModule:validate(Exchange),
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ [] ->
+ ok = mnesia:write(rabbit_exchange, Exchange, write),
+ ok = case Durable of
+ true ->
+ mnesia:write(rabbit_durable_exchange,
+ Exchange, write);
+ false ->
+ ok
+ end,
+ {new, Exchange};
+ [ExistingX] ->
+ {existing, ExistingX}
+ end
+ end) of
+ {new, X} -> TypeModule:create(X),
+ X;
+ {existing, X} -> X;
+ Err -> Err
+ end.
-check_type(<<"fanout">>) ->
- fanout;
-check_type(<<"direct">>) ->
- direct;
-check_type(<<"topic">>) ->
- topic;
-check_type(<<"headers">>) ->
- headers;
-check_type(T) ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]).
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ case rabbit_exchange_type_registry:lookup_module(T) of
+ {ok, Module} -> Module;
+ {error, not_found} -> rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid exchange type '~s'", [T])
+ end.
+
+%% Used with binaries sent over the wire; the type may not exist.
+check_type(TypeBin) ->
+ case rabbit_exchange_type_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown exchange type '~s'", [TypeBin]);
+ T ->
+ _Module = type_to_module(T),
+ T
+ end.
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
@@ -157,7 +195,7 @@ lookup(Name) ->
lookup_or_die(Name) ->
case lookup(Name) of
- {ok, X} -> X;
+ {ok, X} -> X;
{error, not_found} -> rabbit_misc:not_found(Name)
end.
@@ -193,9 +231,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X, Delivery) ->
publish(X, [], Delivery).
-publish(X, Seen, Delivery = #delivery{
- message = #basic_message{routing_key = RK, content = C}}) ->
- case rabbit_router:deliver(route(X, RK, C), Delivery) of
+publish(X = #exchange{type = Type}, Seen, Delivery) ->
+ case (type_to_module(Type)):publish(X, Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -205,95 +242,24 @@ publish(X, Seen, Delivery = #delivery{
AName ->
NewSeen = [XName | Seen],
case lists:member(AName, NewSeen) of
- true ->
- R;
- false ->
- case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
+ true -> R;
+ false -> case lookup(AName) of
+ {ok, AX} ->
+ publish(AX, NewSeen, Delivery);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "alternate exchange for ~s "
+ "does not exist: ~s",
+ [rabbit_misc:rs(XName),
+ rabbit_misc:rs(AName)]),
+ R
+ end
end
end;
R ->
R
end.
-%% return the list of qpids to which a message with a given routing
-%% key, sent to a particular exchange, should be delivered.
-%%
-%% The function ensures that a qpid appears in the return list exactly
-%% as many times as a message should be delivered to it. With the
-%% current exchange types that is at most once.
-route(X = #exchange{type = topic}, RoutingKey, _Content) ->
- match_bindings(X, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end);
-
-route(X = #exchange{type = headers}, _RoutingKey, Content) ->
- Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> sort_arguments(H)
- end,
- match_bindings(X, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end);
-
-route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
- match_routing_key(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey, _Content) ->
- match_routing_key(X, RoutingKey).
-
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
-%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(#exchange{name = Name}, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- ExchangeName == Name,
- Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
-
-match_routing_key(#exchange{name = Name}, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
- sets:fold(
- fun(Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], sets:from_list(Queues)).
-
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
@@ -302,13 +268,13 @@ delete_exchange_bindings(ExchangeName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
- ok = delete_forward_routes(Route)
+ ok = delete_forward_routes(Route),
+ Route#route.binding
end || Route <- mnesia:match_object(
rabbit_route,
#route{binding = #binding{exchange_name = ExchangeName,
_ = '_'}},
- write)],
- ok.
+ write)].
delete_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_forward_routes/1).
@@ -317,21 +283,55 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
- [begin
- ok = FwdDeleteFun(reverse_route(Route)),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
- ok.
+ DeletedBindings =
+ [begin
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute
+ <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(#route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ Cleanup = cleanup_deleted_queue_bindings(
+ lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ fun () ->
+ lists:foreach(
+ fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, Bs);
+ no_delete -> Module:remove_bindings(X, Bs)
+ end
+ end, Cleanup)
+ end.
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% cleanup_deleted_queue_bindings1) works properly.
+cleanup_deleted_queue_bindings([], Acc) ->
+ Acc;
+cleanup_deleted_queue_bindings(
+ [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
+ cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
+
+cleanup_deleted_queue_bindings(
+ ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
+ Bindings, Acc) ->
+ cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
+cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
+ %% either Deleted is [], or its head has a non-matching ExchangeName
+ NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
+ cleanup_deleted_queue_bindings(Deleted, NewAcc).
+
+cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ {maybe_auto_delete(X), Bindings}.
+
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -340,15 +340,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -385,37 +376,61 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
- end
- end).
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true ->
+ case mnesia:read(rabbit_route, B) of
+ [] ->
+ sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
+ end
+ end
+ end) of
+ {new, Exchange = #exchange{ type = Type }, Binding} ->
+ (type_to_module(Type)):add_binding(Exchange, Binding);
+ {existing, _, _} ->
+ ok;
+ Err = {error, _} ->
+ Err
+ end.
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- maybe_auto_delete(X)
- end
- end).
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B}
+ end
+ end) of
+ Err = {error, _} ->
+ Err;
+ {{Action, X = #exchange{ type = Type }}, B} ->
+ Module = type_to_module(Type),
+ case Action of
+ auto_delete -> Module:delete(X, [B]);
+ no_delete -> Module:remove_bindings(X, [B])
+ end
+ end.
binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
- Fun(X, Q, #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = sort_arguments(Arguments)})
+ Fun(X, Q, #binding{
+ exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = rabbit_misc:sort_field_table(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -440,8 +455,8 @@ list_bindings(VHostPath) ->
rabbit_route,
#route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
+ _ = '_'},
+ _ = '_'})].
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
@@ -456,136 +471,60 @@ reverse_route(#reverse_route{reverse_binding = Binding}) ->
#route{binding = reverse_binding(Binding)}.
reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
+ queue_name = Queue,
+ key = Key,
+ args = Args};
reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}.
-
-default_headers_match_kind() -> all.
-
-parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
-
-%% Horrendous matching algorithm. Depends for its merge-like
-%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
-%% route/3 and {add,delete}_binding/4 do.
-%%
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
-
-headers_match([], _Data, AllMatch, _AnyMatch, all) ->
- AllMatch;
-headers_match([], _Data, _AllMatch, AnyMatch, any) ->
- AnyMatch;
-headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
- AllMatch, AnyMatch, MatchKind) ->
- headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
-headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
- headers_match([], [], false, AnyMatch, MatchKind);
-headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK > DK ->
- headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
-headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
- _AllMatch, AnyMatch, MatchKind) when PK < DK ->
- headers_match(PRest, Data, false, AnyMatch, MatchKind);
-headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK == DK ->
- {AllMatch1, AnyMatch1} =
- if
- %% It's not properly specified, but a "no value" in a
- %% pattern field is supposed to mean simple presence of
- %% the corresponding data field. I've interpreted that to
- %% mean a type of "void" for the pattern field.
- PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
- PT =/= DT -> {false, AnyMatch};
- PV == DV -> {AllMatch, true};
- true -> {false, AnyMatch}
- end,
- headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-
-split_topic_key(Key) ->
- {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
- KeySplit.
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-
-delete(ExchangeName, _IfUnused = true) ->
- call_with_exchange(ExchangeName, fun conditional_delete/1);
-delete(ExchangeName, _IfUnused = false) ->
- call_with_exchange(ExchangeName, fun unconditional_delete/1).
-
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
+
+delete(ExchangeName, IfUnused) ->
+ Fun = case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end,
+ case call_with_exchange(ExchangeName, Fun) of
+ {deleted, X = #exchange{type = Type}, Bs} ->
+ (type_to_module(Type)):delete(X, Bs),
+ ok;
+ Error = {error, _InUseOrNotFound} ->
+ Error
+ end.
+
+maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
+ {no_delete, Exchange};
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- conditional_delete(Exchange),
- ok.
+ case conditional_delete(Exchange) of
+ {error, in_use} -> {no_delete, Exchange};
+ {deleted, Exchange, []} -> {auto_deleted, Exchange}
+ end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
+ case contains(rabbit_route, Match) orelse
+ contains(rabbit_durable_route, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
-unconditional_delete(#exchange{name = ExchangeName}) ->
- ok = delete_exchange_bindings(ExchangeName),
+unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
+ Bindings = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ {deleted, Exchange, Bindings}.
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
new file mode 100644
index 00000000..a8c071e6
--- /dev/null
+++ b/src/rabbit_exchange_type.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {description, 0},
+ {publish, 2},
+
+ %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
+ {validate, 1},
+
+ %% called after declaration when previously absent
+ {create, 1},
+
+ %% called when recovering
+ {recover, 2},
+
+ %% called after exchange deletion.
+ {delete, 2},
+
+ %% called after a binding has been added
+ {add_binding, 2},
+
+ %% called after bindings have been deleted.
+ {remove_bindings, 2}
+
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
new file mode 100644
index 00000000..9b71e0e1
--- /dev/null
+++ b/src/rabbit_exchange_type_direct.erl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_direct).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type direct"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"direct">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+description() ->
+ [{name, <<"direct">>},
+ {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name}, Delivery =
+ #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey),
+ Delivery).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
new file mode 100644
index 00000000..311654ab
--- /dev/null
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_fanout).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type fanout"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"fanout">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+description() ->
+ [{name, <<"fanout">>},
+ {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name}, Delivery) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
new file mode 100644
index 00000000..285dab1a
--- /dev/null
+++ b/src/rabbit_exchange_type_headers.erl
@@ -0,0 +1,137 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_headers).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type headers"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"headers">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+-ifdef(use_specs).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"headers">>},
+ {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{content = Content}}) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> rabbit_misc:sort_field_table(H)
+ end,
+ rabbit_router:deliver(rabbit_router:match_bindings(
+ Name, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end),
+ Delivery).
+
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort
+%% (rabbit_misc:sort_field_table) that route/3 and
+%% rabbit_exchange:{add,delete}_binding/4 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
new file mode 100644
index 00000000..175d15ad
--- /dev/null
+++ b/src/rabbit_exchange_type_registry.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_registry).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([register/2, binary_to_type/1, lookup_module/1]).
+
+-define(SERVER, ?MODULE).
+-define(ETS_NAME, ?MODULE).
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}).
+-spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}).
+-spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}).
+
+-endif.
+
+%%---------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%---------------------------------------------------------------------------
+
+register(TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+
+%% This is used with user-supplied arguments (e.g., on exchange
+%% declare), so we restrict it to existing atoms only. This means it
+%% can throw a badarg, indicating that the type cannot have been
+%% registered.
+binary_to_type(TypeBin) when is_binary(TypeBin) ->
+ case catch list_to_existing_atom(binary_to_list(TypeBin)) of
+ {'EXIT', {badarg, _}} -> {error, not_found};
+ TypeAtom -> TypeAtom
+ end.
+
+lookup_module(T) when is_atom(T) ->
+ case ets:lookup(?ETS_NAME, T) of
+ [{_, Module}] ->
+ {ok, Module};
+ [] ->
+ {error, not_found}
+ end.
+
+%%---------------------------------------------------------------------------
+
+internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
+ list_to_atom(binary_to_list(TypeBin)).
+
+internal_register(TypeName, ModuleName)
+ when is_binary(TypeName), is_atom(ModuleName) ->
+ ok = sanity_check_module(ModuleName),
+ true = ets:insert(?ETS_NAME,
+ {internal_binary_to_type(TypeName), ModuleName}),
+ ok.
+
+sanity_check_module(Module) ->
+ case catch lists:member(rabbit_exchange_type,
+ lists:flatten(
+ [Bs || {Attr, Bs} <-
+ Module:module_info(attributes),
+ Attr =:= behavior orelse
+ Attr =:= behaviour])) of
+ {'EXIT', {undef, _}} -> {error, not_module};
+ false -> {error, not_exchange_type};
+ true -> ok
+ end.
+
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
+ {ok, none}.
+
+handle_call({register, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(TypeName, ModuleName),
+ {reply, ok, State};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
new file mode 100644
index 00000000..8a3dceea
--- /dev/null
+++ b/src/rabbit_exchange_type_topic.erl
@@ -0,0 +1,101 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_topic).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type topic"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"topic">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+-export([topic_matches/2]).
+
+-ifdef(use_specs).
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"topic">>},
+ {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name}, Delivery =
+ #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_bindings(
+ Name, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end),
+ Delivery).
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest])
+ when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or
+ last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index c9f8183f..7d840861 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -37,7 +37,7 @@
handle_info/2]).
-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1]).
+-export([get_limit/1, block/1, unblock/1]).
%%----------------------------------------------------------------------------
@@ -47,12 +47,14 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
--spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
+-spec(block/1 :: (maybe_pid()) -> 'ok').
+-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
-endif.
@@ -60,6 +62,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
+ blocked = false,
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -77,13 +80,14 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
- unlink(LimiterPid),
+ true = unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:cast(LimiterPid, {limit, PrefetchCount}).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, {limit, PrefetchCount})).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -113,6 +117,17 @@ get_limit(Pid) ->
fun () -> 0 end,
fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+block(undefined) ->
+ ok;
+block(LimiterPid) ->
+ gen_server2:call(LimiterPid, block, infinity).
+
+unblock(undefined) ->
+ ok;
+unblock(LimiterPid) ->
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, unblock, infinity)).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -120,29 +135,45 @@ get_limit(Pid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+handle_call({can_send, _QPid, _AckRequired}, _From,
+ State = #lim{blocked = true}) ->
+ {reply, false, State};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State}.
+ {reply, PrefetchCount, State};
+
+handle_call({limit, PrefetchCount}, _From, State) ->
+ case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
+ end;
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ case maybe_notify(State, State#lim{blocked = false}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
+ end.
handle_cast(shutdown, State) ->
{stop, normal, State};
-handle_cast({limit, PrefetchCount}, State) ->
- {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
+ {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {noreply, State1};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -164,14 +195,21 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case limit_reached(OldState) andalso not(limit_reached(NewState)) of
- true -> notify_queues(NewState);
- false -> NewState
+ case (limit_reached(OldState) orelse is_blocked(OldState)) andalso
+ not (limit_reached(NewState) orelse is_blocked(NewState)) of
+ true -> NewState1 = notify_queues(NewState),
+ {case NewState1#lim.prefetch_count of
+ 0 -> stop;
+ _ -> cont
+ end, NewState1};
+ false -> {cont, NewState}
end.
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
+is_blocked(#lim{blocked = Blocked}) -> Blocked.
+
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
@@ -209,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
+
+unlink_on_stopped(LimiterPid, stopped) ->
+ true = unlink(LimiterPid),
+ ok = receive {'EXIT', LimiterPid, _Reason} -> ok
+ after 0 -> ok
+ end,
+ stopped;
+unlink_on_stopped(_LimiterPid, Result) ->
+ Result.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0654f58a..9abc1695 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -49,13 +49,14 @@
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
--export([table_foreach/2]).
+-export([table_fold/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
+-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
@@ -114,7 +115,7 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
+-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
@@ -129,6 +130,7 @@
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
+-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
@@ -357,20 +359,20 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
-%% For each entry in a table, execute a function in a transaction.
-%% This is often far more efficient than wrapping a tx around the lot.
+%% Fold over each entry in a table, executing the cons function in a
+%% transaction. This is often far more efficient than wrapping a tx
+%% around the lot.
%%
%% We ignore entries that have been modified or removed.
-table_foreach(F, TableName) ->
- lists:foreach(
- fun (E) -> execute_mnesia_transaction(
+table_fold(F, Acc0, TableName) ->
+ lists:foldl(
+ fun (E, Acc) -> execute_mnesia_transaction(
fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> ok;
- _ -> F(E)
+ [] -> Acc;
+ _ -> F(E, Acc)
end
end)
- end, dirty_read_all(TableName)),
- ok.
+ end, Acc0, dirty_read_all(TableName)).
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
@@ -504,6 +506,10 @@ queue_fold(Fun, Init, Q) ->
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
end.
+%% Sorts a list of AMQP table fields as per the AMQP spec
+sort_field_table(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% This provides a string representation of a pid that is the same
%% regardless of what node we are running on. The representation also
%% permits easy identification of the pid's node.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index bfafa6ff..8c898498 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -222,9 +222,8 @@ run_rabbitmq_server() ->
{win32, fun run_rabbitmq_server_win32/0}]).
run_rabbitmq_server_unix() ->
- FullPath = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server",
- erlang:open_port({spawn_executable, FullPath},
- [{arg0, FullPath}, {args, ["-noinput"]}, nouse_stdio]).
+ CmdLine = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server -noinput",
+ erlang:open_port({spawn, CmdLine}, [nouse_stdio]).
run_rabbitmq_server_win32() ->
Cmd = filename:nativename(os:find_executable("cmd")),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index cf04f05b..7978573d 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -117,15 +117,25 @@ start() ->
transient, infinity, supervisor, [tcp_client_sup]}),
ok.
+getaddr(Host) ->
+ %% inet_parse:address takes care of ip string, like "0.0.0.0"
+ %% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+ %% and runs 'inet_gethost' port process for dns lookups.
+ %% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+ case inet_parse:address(Host) of
+ {ok, IPAddress1} -> IPAddress1;
+ {error, _} ->
+ case inet:getaddr(Host, inet) of
+ {ok, IPAddress2} -> IPAddress2;
+ {error, Reason} ->
+ error_logger:error_msg("invalid host ~p - ~p~n",
+ [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}})
+ end
+ end.
+
check_tcp_listener_address(NamePrefix, Host, Port) ->
- IPAddress =
- case inet:getaddr(Host, inet) of
- {ok, IPAddress1} -> IPAddress1;
- {error, Reason} ->
- error_logger:error_msg("invalid host ~p - ~p~n",
- [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}})
- end,
+ IPAddress = getaddr(Host),
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
[Port]),
@@ -157,7 +167,7 @@ start_listener(Host, Port, Label, OnConnect) ->
ok.
stop_tcp_listener(Host, Port) ->
- {ok, IPAddress} = inet:getaddr(Host, inet),
+ IPAddress = getaddr(Host),
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name),
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ee2b82c5..884ea4ab 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -30,12 +30,15 @@
%%
-module(rabbit_router).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-behaviour(gen_server2).
-export([start_link/0,
- deliver/2]).
+ deliver/2,
+ match_bindings/2,
+ match_routing_key/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -129,6 +132,46 @@ deliver_per_node(NodeQPids, Delivery) ->
-endif.
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(Name, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ Match(Binding)]
+ end).
+
+match_routing_key(Name, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], sets:from_list(Queues)).
+
%%--------------------------------------------------------------------
init([]) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 277e6717..82f2d199 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -31,6 +31,8 @@
-module(rabbit_tests).
+-compile([export_all]).
+
-export([all_tests/0, test_parsing/0]).
%% Exported so the hook mechanism can call back
@@ -187,7 +189,7 @@ test_simple_n_element_queue(N) ->
test_pg_local() ->
[P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
check_pg_local(ok, [], []),
- check_pg_local(pg_local:join(a, P), [P], []),
+ check_pg_local(pg_local:join(a, P), [P], []),
check_pg_local(pg_local:join(b, P), [P], [P]),
check_pg_local(pg_local:join(a, P), [P, P], [P]),
check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
@@ -197,7 +199,10 @@ test_pg_local() ->
check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
- [X ! done || X <- [P, Q]],
+ [begin X ! done,
+ Ref = erlang:monitor(process, X),
+ receive {'DOWN', Ref, process, X, _Info} -> ok end
+ end || X <- [P, Q]],
check_pg_local(ok, [], []),
passed.
@@ -325,7 +330,8 @@ test_topic_match(P, R) ->
test_topic_match(P, R, true).
test_topic_match(P, R, Expected) ->
- case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of
+ case rabbit_exchange_type_topic:topic_matches(list_to_binary(P),
+ list_to_binary(R)) of
Expected ->
passed;
_ ->